service.go 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159
  1. package bnj
  2. import (
  3. "context"
  4. "strconv"
  5. "sync/atomic"
  6. "time"
  7. "go-common/app/interface/main/activity/conf"
  8. "go-common/app/interface/main/activity/dao/bnj"
  9. "go-common/app/interface/main/activity/dao/like"
  10. bnjmdl "go-common/app/interface/main/activity/model/bnj"
  11. arcclient "go-common/app/service/main/archive/api"
  12. "go-common/library/log"
  13. "go-common/library/queue/databus"
  14. )
  15. // Service .
  16. type Service struct {
  17. c *conf.Config
  18. arcClient arcclient.ArchiveClient
  19. dao *bnj.Dao
  20. likeDao *like.Dao
  21. resetPub *databus.Databus
  22. previewArcs map[int64]*arcclient.Arc
  23. bnjAdmins map[int64]struct{}
  24. likeCount int64
  25. timeReset int64
  26. resetMid int64
  27. timeFinish int64
  28. resetCD int32
  29. }
  30. // New init bnj service.
  31. func New(c *conf.Config) (s *Service) {
  32. s = &Service{
  33. c: c,
  34. dao: bnj.New(c),
  35. likeDao: like.New(c),
  36. resetPub: databus.New(c.Databus.Bnj),
  37. }
  38. var err error
  39. if s.arcClient, err = arcclient.NewClient(c.ArcClient); err != nil {
  40. panic(err)
  41. }
  42. if s.c.Bnj2019.AdminCheck != 0 {
  43. tmp := make(map[int64]struct{}, len(s.c.Bnj2019.Admins))
  44. for _, mid := range s.c.Bnj2019.Admins {
  45. tmp[mid] = struct{}{}
  46. }
  47. s.bnjAdmins = tmp
  48. }
  49. go s.bnjTimeproc()
  50. return s
  51. }
  52. // Close .
  53. func (s *Service) Close() {
  54. s.dao.Close()
  55. s.resetPub.Close()
  56. }
  57. func (s *Service) timeResetproc() {
  58. for {
  59. time.Sleep(time.Second)
  60. if s.timeFinish != 0 {
  61. log.Info("timeResetproc finish")
  62. break
  63. }
  64. if s.timeReset == 1 {
  65. // sub databus
  66. msg := &bnjmdl.ResetMsg{Mid: s.resetMid, Ts: time.Now().Unix()}
  67. if err := s.resetPub.Send(context.Background(), strconv.FormatInt(s.resetMid, 10), msg); err != nil {
  68. log.Error("timeResetproc s.resetPub.Send(%+v) error(%v)", msg, err)
  69. }
  70. atomic.StoreInt64(&s.timeReset, 0)
  71. atomic.StoreInt64(&s.resetMid, 0)
  72. }
  73. }
  74. }
  75. func (s *Service) timeFinishproc() {
  76. for {
  77. time.Sleep(time.Second)
  78. if value, err := s.dao.CacheTimeFinish(context.Background()); err != nil {
  79. log.Error("timeFinishproc s.dao.CacheTimeFinish error(%v)")
  80. } else if value > 0 {
  81. log.Info("timeFinishproc cache value finish")
  82. atomic.StoreInt64(&s.timeFinish, value)
  83. }
  84. }
  85. }
  86. func (s *Service) bnjTimeproc() {
  87. for {
  88. time.Sleep(time.Second)
  89. if time.Now().Unix() > s.c.Bnj2019.Start.Unix() {
  90. go s.timeResetproc()
  91. go s.timeFinishproc()
  92. go s.bnjResetCDproc()
  93. go s.bnjArcproc()
  94. log.Info("bnjTimeproc start")
  95. break
  96. }
  97. }
  98. }
  99. func (s *Service) bnjResetCDproc() {
  100. for {
  101. time.Sleep(time.Second)
  102. lid := s.c.Bnj2019.SubID
  103. scoreMap, err := s.likeDao.LikeActLidCounts(context.Background(), []int64{lid})
  104. if err != nil || scoreMap == nil {
  105. log.Error("bnjScoreproc s.likeDao.LikeActLidCounts(%d) error(%v)", lid, err)
  106. continue
  107. }
  108. if score, ok := scoreMap[lid]; ok {
  109. if score >= s.c.Bnj2019.Reward[len(s.c.Bnj2019.Reward)-1].Condition && s.resetCD != _lastCD {
  110. atomic.StoreInt32(&s.resetCD, _lastCD)
  111. log.Info("bnjResetCDproc finish")
  112. }
  113. if score > s.likeCount {
  114. atomic.StoreInt64(&s.likeCount, score)
  115. }
  116. }
  117. }
  118. }
  119. func (s *Service) bnjArcproc() {
  120. for {
  121. time.Sleep(time.Second)
  122. now := time.Now().Unix()
  123. var aids []int64
  124. for _, v := range s.c.Bnj2019.Info {
  125. if v.Publish.Unix() < now {
  126. if v.Aid > 0 {
  127. aids = append(aids, v.Aid)
  128. }
  129. }
  130. }
  131. if len(aids) > 0 {
  132. if arcsReply, err := s.arcClient.Arcs(context.Background(), &arcclient.ArcsRequest{Aids: aids}); err != nil {
  133. log.Error("bnjArcproc s.arcClient.Arcs(%v) error(%v)", aids, err)
  134. } else if len(arcsReply.Arcs) > 0 {
  135. tmp := make(map[int64]*arcclient.Arc, len(aids))
  136. for _, aid := range aids {
  137. if arc, ok := arcsReply.Arcs[aid]; ok && arc != nil {
  138. tmp[aid] = arc
  139. } else {
  140. log.Error("bnjArcproc aid(%d) data(%v)", aid, arc)
  141. continue
  142. }
  143. }
  144. s.previewArcs = tmp
  145. }
  146. }
  147. log.Error("bnjArcproc aids(%v) conf error", aids)
  148. }
  149. }