databus.go 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "strconv"
  7. "go-common/app/job/openplatform/open-sug/conf"
  8. "go-common/app/job/openplatform/open-sug/model"
  9. "go-common/library/log"
  10. )
  11. func (s *Service) pgcConsumePROC() {
  12. var (
  13. msgs = s.pgcSub.Messages()
  14. err error
  15. )
  16. for {
  17. msg, ok := <-msgs
  18. if !ok {
  19. log.Error("pgc databus Consumer exit")
  20. return
  21. }
  22. s.pgcMsgCnt++
  23. msg.Commit()
  24. log.Info("message commit key:%s partition:%d offset:%d", msg.Key, msg.Partition, msg.Offset)
  25. m := &model.Message{}
  26. if err = json.Unmarshal(msg.Value, m); err != nil {
  27. log.Error("json.Unmarshal(%s) error(%v)", msg.Value, err)
  28. continue
  29. }
  30. func(m *model.Message) {
  31. s.wg.Add(1)
  32. //defer s.wg.Done()
  33. go s.subproc()
  34. if m.Table == "t_chn_season2" || m.Table == "t_jp_season2" {
  35. s.seasonUpdate(m.Action, m.New, m.Old)
  36. s.seasonMsgCnt++
  37. }
  38. }(m)
  39. log.Info("pgcConsumeproc table:%s key:%s partition:%d offset:%d", m.Table, msg.Key, msg.Partition, msg.Offset)
  40. }
  41. }
  42. func (s *Service) seasonUpdate(action string, n, o json.RawMessage) {
  43. newSeason := new(model.Season)
  44. if err := json.Unmarshal(n, newSeason); err != nil {
  45. log.Error("json.Unmarshal(%s) error(%v)", n, err)
  46. return
  47. }
  48. oldSeason := new(model.Season)
  49. if err := json.Unmarshal(o, oldSeason); err != nil {
  50. log.Error("json.Unmarshal(%s) (%s)error(%v)", string(o), o, err)
  51. return
  52. }
  53. if action == "insert" || action == "update" {
  54. if newSeason.FieldDiff(oldSeason) {
  55. s.dao.Index(
  56. context.TODO(),
  57. s.envIndex(s.c.ElasticSearch.Season.Index),
  58. s.c.ElasticSearch.Season.Type,
  59. strconv.Itoa(newSeason.ID),
  60. newSeason.EsFormat(),
  61. )
  62. }
  63. }
  64. }
  65. func (s *Service) envIndex(index string) string {
  66. return fmt.Sprintf("%s_%s", s.c.Env, index)
  67. }
  68. func (s *Service) existsOrCreate(c conf.EsIndex) {
  69. if s.dao.IndexExists(context.TODO(), s.envIndex(c.Index)) {
  70. log.Info("索引(%s)已存在", s.envIndex(c.Index))
  71. return
  72. }
  73. if !s.dao.CreateIndex(context.TODO(), s.envIndex(c.Index), c.Mapping) {
  74. panic("创建索引失败")
  75. }
  76. }
  77. func (s *Service) subproc() {
  78. defer s.wg.Done()
  79. }