member.go 1.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "strings"
  6. "go-common/app/job/main/account-summary/model"
  7. "go-common/library/log"
  8. )
  9. func (s *Service) memberBinLogproc(ctx context.Context) {
  10. for msg := range s.MemberBinLog.Messages() {
  11. blog := &model.CanalBinLog{}
  12. if err := json.Unmarshal(msg.Value, blog); err != nil {
  13. log.Error("Failed to unmarshal canal bin log: %+v, value: %s: %+v", msg, string(msg.Value), err)
  14. msg.Commit()
  15. continue
  16. }
  17. log.Info("Handling message key: %s, value: %s", msg.Key, string(msg.Value))
  18. s.memberBinLogHandle(ctx, blog)
  19. msg.Commit()
  20. }
  21. }
  22. func (s *Service) memberBinLogHandle(ctx context.Context, blog *model.CanalBinLog) {
  23. if len(blog.New) == 0 {
  24. log.Error("Failed to sync to hbase with empty new field: %+v", blog)
  25. return
  26. }
  27. switch {
  28. case strings.HasPrefix(blog.Table, "user_base_") || strings.HasPrefix(blog.Table, "user_official") || strings.HasPrefix(blog.Table, "user_exp_"):
  29. midl := &model.MidBinLog{}
  30. if err := json.Unmarshal(blog.New, midl); err != nil {
  31. log.Error("Failed to unmarsha new data: %s: %+v", string(blog.New), err)
  32. return
  33. }
  34. // FIXME: 一段时间后该用 syncMember
  35. if err := s.SyncOne(ctx, midl.Mid); err != nil {
  36. log.Error("Failed to sync member with mid: %d: %+v", midl.Mid, err)
  37. return
  38. }
  39. default:
  40. log.Warn("Unable to hanlde binlog: %+v, old: %s, new: %s", blog, string(blog.Old), string(blog.New))
  41. }
  42. }