service.go 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "sync"
  6. "time"
  7. "go-common/app/job/main/coin/conf"
  8. "go-common/app/job/main/coin/dao"
  9. "go-common/app/job/main/coin/model"
  10. accrpc "go-common/app/service/main/account/api"
  11. arcrpc "go-common/app/service/main/archive/api/gorpc"
  12. coinrpc "go-common/app/service/main/coin/api/gorpc"
  13. coinmdl "go-common/app/service/main/coin/model"
  14. memrpc "go-common/app/service/main/member/api"
  15. "go-common/library/log"
  16. "go-common/library/queue/databus"
  17. "go-common/library/queue/databus/databusutil"
  18. )
  19. // Service coin job service.
  20. type Service struct {
  21. coinDao *dao.Dao
  22. c *conf.Config
  23. waiter *sync.WaitGroup
  24. accRPC accrpc.AccountClient
  25. memRPC memrpc.MemberClient
  26. arcRPC *arcrpc.Service2
  27. coinRPC *coinrpc.Service
  28. databus *databus.Databus
  29. group *databusutil.Group
  30. expGroup *databusutil.Group
  31. }
  32. // New new and return service.
  33. func New(c *conf.Config) (s *Service) {
  34. s = &Service{
  35. coinDao: dao.New(c),
  36. c: c,
  37. waiter: new(sync.WaitGroup),
  38. arcRPC: arcrpc.New2(c.ArchiveRPC),
  39. coinRPC: coinrpc.New(c.CoinRPC),
  40. }
  41. var err error
  42. if s.memRPC, err = memrpc.NewClient(c.MemRPC); err != nil {
  43. panic(err)
  44. }
  45. if s.accRPC, err = accrpc.NewClient(c.AccountRPC); err != nil {
  46. panic(err)
  47. }
  48. s.databus = databus.New(c.Databus)
  49. g := databusutil.NewGroup(c.Databusutil, databus.New(c.LoginDatabus).Messages())
  50. g.New = newMsg
  51. g.Split = split
  52. g.Do = s.awardDo
  53. g.Start()
  54. s.group = g
  55. eg := databusutil.NewGroup(c.Databusutil, databus.New(c.ExpDatabus).Messages())
  56. eg.New = newExpMsg
  57. eg.Split = split
  58. eg.Do = s.awardDo
  59. eg.Start()
  60. s.expGroup = eg
  61. s.waiter.Add(1)
  62. go s.consumeproc()
  63. go s.settleproc()
  64. return
  65. }
  66. func (s *Service) consumeproc() {
  67. defer s.waiter.Done()
  68. var (
  69. msg *databus.Message
  70. err error
  71. ok bool
  72. period *model.CoinSettlePeriod
  73. ctx = context.TODO()
  74. )
  75. for {
  76. if msg, ok = <-s.databus.Messages(); !ok {
  77. log.Error("s.databus.Message err(%v)", err)
  78. return
  79. }
  80. r := &coinmdl.Record{}
  81. if err = json.Unmarshal([]byte(msg.Value), r); err != nil {
  82. log.Error("json.Unmarshal(%s) error(%v)", msg.Value, err)
  83. dao.PromError("msg:JSON")
  84. continue
  85. }
  86. var ip = r.IPV6
  87. for i := 0; i < 3; i++ {
  88. if err = s.addCoinExp(ctx, r.Mid, r.AvType, r.Multiply, ip); err != nil {
  89. time.Sleep(time.Millisecond * 50)
  90. continue
  91. }
  92. break
  93. }
  94. if err != nil {
  95. log.Errorv(ctx, log.KV("log", "fix: addCoinExp"), log.KV("mid", r.Mid), log.KV("type", r.AvType), log.KV("num", r.Multiply), log.KV("ip", ip))
  96. continue
  97. }
  98. if err = s.updateAddCoin(ctx, r); err != nil {
  99. continue
  100. }
  101. at := time.Unix(r.Timestamp, 0)
  102. if period, err = s.coinDao.HitSettlePeriod(ctx, at); err != nil {
  103. log.Errorv(ctx, log.KV("log", "s.coinDao.HitCoinPeriod"), log.KV("record", r), log.KV("err", err))
  104. dao.PromError("service:HitSettlePeriod")
  105. continue
  106. }
  107. for i := 0; ; i++ {
  108. if err = s.coinDao.UpsertSettle(ctx, period.ID, r.Up, r.Aid, r.AvType, r.Multiply, time.Now()); err != nil {
  109. log.Error("s.coinDao.UpsertCoinSettle(%d, %d, %d) error(%v)", r.Up, r.Aid, r.Multiply)
  110. dao.PromError("service:UpsertSettle")
  111. i++
  112. if i > 5 {
  113. // if env.DeployEnv == env.DeployEnvProd {
  114. // s.moni.Sms(ctx, s.c.Sms.Phone, s.c.Sms.Token, "coin-job upsetSettle fail for 5 time")
  115. // }
  116. break
  117. }
  118. continue
  119. }
  120. break
  121. }
  122. log.Info("key: %s,partion:%d,offset:%d success %s", msg.Key, msg.Partition, msg.Offset, msg.Value)
  123. err = msg.Commit()
  124. if err != nil {
  125. log.Error("msg.Commit partition:%d offset:%d err %v", msg.Partition, msg.Offset, err)
  126. dao.PromError("service:msgCommit")
  127. }
  128. }
  129. }
  130. // Close close service.
  131. func (s *Service) Close() {
  132. s.group.Close()
  133. s.expGroup.Close()
  134. s.databus.Close()
  135. }
  136. // Wait wait routine unitl all close.
  137. func (s *Service) Wait() {
  138. s.waiter.Wait()
  139. }
  140. // Ping check service health.
  141. func (s *Service) Ping(c context.Context) error {
  142. return s.coinDao.Ping(c)
  143. }
  144. func (s *Service) updateAddCoin(c context.Context, record *coinmdl.Record) (err error) {
  145. if record == nil {
  146. return
  147. }
  148. if err = s.coinRPC.UpdateAddCoin(c, record); err != nil {
  149. log.Errorv(c, log.KV("log", "UpdateAddCoin"), log.KV("mid", record.Mid), log.KV("record", record))
  150. dao.PromError("service:updateAddCoin")
  151. }
  152. return
  153. }