service.go 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "net"
  7. "go-common/app/interface/main/account/service/realname/crypto"
  8. "go-common/app/job/main/member/conf"
  9. "go-common/app/job/main/member/dao"
  10. "go-common/app/job/main/member/model"
  11. "go-common/app/job/main/member/model/queue"
  12. "go-common/app/job/main/member/service/block"
  13. memrpc "go-common/app/service/main/member/api/gorpc"
  14. "go-common/library/log"
  15. "go-common/library/log/infoc"
  16. "go-common/library/queue/databus"
  17. "go-common/library/queue/databus/databusutil"
  18. "golang.org/x/time/rate"
  19. )
  20. // Service struct of service.
  21. type Service struct {
  22. c *conf.Config
  23. dao *dao.Dao
  24. block *block.Service
  25. ds *databus.Databus
  26. accDs *databus.Databus
  27. passortDs *databus.Databus
  28. logDatabus *databus.Databus
  29. expDatabus *databus.Databus
  30. realnameDatabus *databus.Databus
  31. shareMidDatabus *databus.Databus
  32. loginGroup *databusutil.Group
  33. awardGroup *databusutil.Group
  34. memrpc *memrpc.Service
  35. cachepq *queue.PriorityQueue
  36. alipayCryptor *crypto.Alipay
  37. limiter *limiter
  38. ParsedRealnameInfoc *infoc.Infoc
  39. }
  40. type limiter struct {
  41. UpdateExp *rate.Limiter
  42. }
  43. // New create service instance and return.
  44. func New(c *conf.Config) (s *Service) {
  45. s = &Service{
  46. c: c,
  47. ds: databus.New(c.DataBus),
  48. accDs: databus.New(c.AccDataBus),
  49. passortDs: databus.New(c.PassortDataBus),
  50. logDatabus: databus.New(c.LogDatabus),
  51. expDatabus: databus.New(c.ExpDatabus),
  52. realnameDatabus: databus.New(c.RealnameDatabus),
  53. shareMidDatabus: databus.New(c.ShareMidDatabus),
  54. alipayCryptor: crypto.NewAlipay(string(c.RealnameAlipayPub), string(c.RealnameAlipayBiliPriv)),
  55. loginGroup: databusutil.NewGroup(c.Databusutil, databus.New(c.LoginDatabus).Messages()),
  56. awardGroup: databusutil.NewGroup(c.Databusutil, databus.New(c.AwardDatabus).Messages()),
  57. memrpc: memrpc.New(nil),
  58. cachepq: queue.NewPriorityQueue(1024, false),
  59. limiter: &limiter{
  60. UpdateExp: rate.NewLimiter(200, 10),
  61. },
  62. ParsedRealnameInfoc: infoc.New(c.ParsedRealnameInfoc),
  63. }
  64. s.dao = dao.New(c)
  65. s.block = block.New(c, s.dao.BlockImpl(), databus.New(c.BlockCreditDatabus))
  66. s.loginGroup.New = newMsg
  67. s.loginGroup.Split = split
  68. s.loginGroup.Do = s.awardDo
  69. s.awardGroup.New = newMsg
  70. s.awardGroup.Split = split
  71. s.awardGroup.Do = s.awardDo
  72. s.loginGroup.Start()
  73. s.awardGroup.Start()
  74. go s.passportSubproc()
  75. go s.realnameSubproc()
  76. go s.realnamealipaycheckproc()
  77. go s.cachedelayproc(context.Background())
  78. go s.shareMidproc()
  79. accproc := int32(10)
  80. expproc := int32(1)
  81. if c.Biz.AccprocCount > accproc {
  82. accproc = c.Biz.AccprocCount
  83. }
  84. if c.Biz.ExpprocCount > expproc {
  85. expproc = c.Biz.ExpprocCount
  86. }
  87. log.Info("Starting %d account sub proc", accproc)
  88. for i := 0; i < int(accproc); i++ {
  89. go s.subproc()
  90. go s.accSubproc()
  91. go s.logproc()
  92. }
  93. log.Info("Starting %d exp sub proc", expproc)
  94. for i := 0; i < int(expproc); i++ {
  95. go s.expproc()
  96. }
  97. if s.c.FeatureGates.DataFixer && s.dao.LeaderEleciton(context.Background()) {
  98. fmt.Println("Leader elected")
  99. // 数据检查
  100. s.makeChan(30)
  101. go s.dataCheckMids()
  102. for i := 0; i < 60; i++ {
  103. go s.dataFixer(csclice[i%30])
  104. }
  105. }
  106. return
  107. }
  108. func split(msg *databus.Message, data interface{}) int {
  109. t, ok := data.(*model.LoginLogIPString)
  110. if !ok {
  111. return 0
  112. }
  113. return int(t.Mid)
  114. }
  115. func newMsg(msg *databus.Message) (res interface{}, err error) {
  116. llm := new(model.LoginLogIPString)
  117. ll := new(model.LoginLog)
  118. if err = json.Unmarshal(msg.Value, &llm); err != nil {
  119. if err = json.Unmarshal(msg.Value, &ll); err != nil {
  120. msg.Commit()
  121. log.Error("json.Unmarshal(%s) error(%v)", msg.Value, err)
  122. return
  123. }
  124. llm.Mid = ll.Mid
  125. llm.Timestamp = ll.Timestamp
  126. llm.Loginip = inetNtoA(uint32(ll.Loginip))
  127. }
  128. res = llm
  129. return
  130. }
  131. func inetNtoA(sum uint32) string {
  132. ip := make(net.IP, net.IPv4len)
  133. ip[0] = byte((sum >> 24) & 0xFF)
  134. ip[1] = byte((sum >> 16) & 0xFF)
  135. ip[2] = byte((sum >> 8) & 0xFF)
  136. ip[3] = byte(sum & 0xFF)
  137. return ip.String()
  138. }
  139. // Ping check service health.
  140. func (s *Service) Ping(c context.Context) error {
  141. return s.dao.Ping(c)
  142. }
  143. // Close kafka consumer close.
  144. func (s *Service) Close() (err error) {
  145. if err = s.ds.Close(); err != nil {
  146. log.Error("s.ds.Close(),err(%v)", err)
  147. }
  148. if err = s.accDs.Close(); err != nil {
  149. log.Error("s.accDs.Close(),err(%v)", err)
  150. }
  151. if err = s.passortDs.Close(); err != nil {
  152. log.Error("s.passportDs.Close(),err(%v)", err)
  153. }
  154. s.block.Close()
  155. return
  156. }