binlog.go 1.9 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "runtime/debug"
  6. "go-common/app/job/main/ugcpay/model"
  7. "go-common/library/log"
  8. "github.com/pkg/errors"
  9. )
  // Table names compared against the Table field of a decoded binlog message
  // to decide which cache deletion to perform.
  const (
  	// _tableAsset is the name of the asset table.
  	_tableAsset = "asset"
  	// _tableAssetRelation is the name of the asset_relation table.
  	_tableAssetRelation = "asset_relation"
  	// _tableOrderUser is the name of the order_user table.
  	_tableOrderUser = "order_user"
  )
  15. func (s *Service) binlogproc() (err error) {
  16. defer func() {
  17. if x := recover(); x != nil {
  18. log.Error("binlogproc panic(%+v) :\n %s", x, debug.Stack())
  19. go s.binlogproc()
  20. }
  21. }()
  22. var (
  23. c = context.Background()
  24. )
  25. for res := range s.binlogMQ.Messages() {
  26. if err != nil {
  27. log.Error("%+v", err)
  28. err = nil
  29. }
  30. msg := &model.Message{}
  31. if err = json.Unmarshal(res.Value, msg); err != nil {
  32. err = errors.WithStack(err)
  33. continue
  34. }
  35. switch msg.Table {
  36. case _tableOrderUser:
  37. ms := &model.BinlogOrderUser{}
  38. if err = json.Unmarshal(msg.New, ms); err != nil {
  39. err = errors.Wrapf(err, "%s", msg.New)
  40. continue
  41. }
  42. log.Info("Delete order_user cache : %+v", ms)
  43. if err = s.dao.DelCacheOrderUser(c, ms.OrderID); err != nil {
  44. continue
  45. }
  46. case _tableAsset:
  47. ms := &model.BinlogAsset{}
  48. if err = json.Unmarshal(msg.New, ms); err != nil {
  49. err = errors.Wrapf(err, "%s", msg.New)
  50. continue
  51. }
  52. log.Info("Delete asset cache : %+v", ms)
  53. if err = s.dao.DelCacheAsset(c, ms.OID, ms.OType, ms.Currency); err != nil {
  54. continue
  55. }
  56. case _tableAssetRelation:
  57. ms := &model.BinlogAssetRelation{}
  58. if err = json.Unmarshal(msg.New, ms); err != nil {
  59. err = errors.Wrapf(err, "%s", msg.New)
  60. continue
  61. }
  62. log.Info("Delete asset_relation cache : %+v", ms)
  63. if err = s.dao.DelCacheAssetRelationState(c, ms.OID, ms.OType, ms.MID); err != nil {
  64. continue
  65. }
  66. }
  67. if err = res.Commit(); err != nil {
  68. err = errors.Wrapf(err, "binlogproc commit")
  69. continue
  70. }
  71. log.Info("binlogproc consume key:%v, topic: %v, part:%v, offset:%v, message %s,", res.Key, res.Topic, res.Partition, res.Offset, res.Value)
  72. }
  73. return
  74. }