service.go 1.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101
  1. package Service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "strings"
  6. "sync"
  7. "time"
  8. "go-common/app/job/live/wallet/conf"
  9. "go-common/app/job/live/wallet/dao"
  10. "go-common/app/job/live/wallet/model"
  11. "go-common/library/log"
  12. "go-common/library/queue/databus"
  13. )
  14. // Service struct
  15. type Service struct {
  16. c *conf.Config
  17. dao *dao.Dao
  18. userSub *databus.Databus
  19. waiter *sync.WaitGroup
  20. userUpMo int64
  21. }
  22. // New init
  23. func New(c *conf.Config) (s *Service) {
  24. s = &Service{
  25. c: c,
  26. dao: dao.New(c),
  27. userSub: databus.New(c.UserSub),
  28. waiter: new(sync.WaitGroup),
  29. }
  30. s.waiter.Add(1)
  31. go s.userCanalConsumeproc()
  32. go s.checkUserCanalConsumeproc()
  33. return s
  34. }
  35. // Ping Service
  36. func (s *Service) Ping(c context.Context) (err error) {
  37. return s.dao.Ping(c)
  38. }
  39. // Close Service
  40. func (s *Service) Close() {
  41. defer s.waiter.Wait()
  42. s.userSub.Close()
  43. s.dao.Close()
  44. }
  45. // Wait goroutinue to close
  46. func (s *Service) Wait() {
  47. s.waiter.Wait()
  48. }
  49. // expCanalConsumeproc consumer archive
  50. func (s *Service) userCanalConsumeproc() {
  51. var (
  52. msgs = s.userSub.Messages()
  53. err error
  54. )
  55. defer s.waiter.Done()
  56. for {
  57. msg, ok := <-msgs
  58. if !ok {
  59. log.Info("userCanal databus Consumer exit")
  60. return
  61. }
  62. s.userUpMo++
  63. msg.Commit()
  64. m := &model.Message{}
  65. //log.Info("canal message %s", msg.Value)
  66. if err = json.Unmarshal(msg.Value, m); err != nil {
  67. log.Error("json.Unmarshal(%s) error(%v)", msg.Value, err)
  68. continue
  69. }
  70. if !strings.HasPrefix(m.Table, "user_") || (m.Action != "update" && m.Action != "insert") {
  71. continue
  72. }
  73. s.mergeData(m.New, m.Old, m.Action)
  74. }
  75. }
  76. // checkConsumeproc check consumer stat
  77. func (s *Service) checkUserCanalConsumeproc() {
  78. if s.c.Env != "pro" {
  79. return
  80. }
  81. var userMo int64
  82. for {
  83. time.Sleep(1 * time.Minute)
  84. if s.userUpMo-userMo == 0 {
  85. msg := "live-wallet-job userCanal did not consume within a minute"
  86. //s.dao.SendSMS(msg)
  87. log.Warn(msg)
  88. }
  89. userMo = s.userUpMo
  90. }
  91. }