block.go 1.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "go-common/app/job/main/account-summary/model"
  6. "go-common/library/log"
  7. )
  8. func (s *Service) blockBinLogproc(ctx context.Context) {
  9. for msg := range s.BlockBinLog.Messages() {
  10. blog := &model.CanalBinLog{}
  11. if err := json.Unmarshal(msg.Value, blog); err != nil {
  12. log.Error("Failed to unmarshal canal bin log: %+v, value: %s: %+v", msg, string(msg.Value), err)
  13. msg.Commit()
  14. continue
  15. }
  16. log.Info("Handling message key: %s, value: %s", msg.Key, string(msg.Value))
  17. s.blockBinLogHandle(ctx, blog)
  18. msg.Commit()
  19. }
  20. }
  21. func (s *Service) blockBinLogHandle(ctx context.Context, blog *model.CanalBinLog) {
  22. if len(blog.New) == 0 {
  23. log.Error("Failed to sync to hbase with empty new field: %+v", blog)
  24. return
  25. }
  26. switch blog.Table {
  27. case "block_user":
  28. midl := &model.MidBinLog{}
  29. if err := json.Unmarshal(blog.New, midl); err != nil {
  30. log.Error("Failed to unmarsha new data: %s: %+v", string(blog.New), err)
  31. return
  32. }
  33. // FIXME: 一段时间后改用 syncBlock
  34. if err := s.SyncOne(ctx, midl.Mid); err != nil {
  35. log.Error("Failed to sync block with mid: %d: %+v", midl.Mid, err)
  36. return
  37. }
  38. default:
  39. log.Warn("Unable to hanlde binlog: %+v, old: %s, new: %s", blog, string(blog.Old), string(blog.New))
  40. }
  41. }