service.go 4.9 KB


  1. package service
  2. import (
  3. "context"
  4. "sync"
  5. "sync/atomic"
  6. "time"
  7. "go-common/app/job/main/figure-timer/conf"
  8. "go-common/app/job/main/figure-timer/dao"
  9. "go-common/app/job/main/figure-timer/model"
  10. "go-common/library/log"
  11. "github.com/robfig/cron"
  12. )
// Service holds the dependencies and runtime state of the figure-timer job.
type Service struct {
	// c is the configuration handed to New.
	c *conf.Config
	// dao is the data-access layer built from c by dao.New.
	dao dao.Int
	// missch queues functions that missproc executes one at a time.
	missch chan func()
	// curVer is the version most recently stored by cycleproc/cycleallproc
	// (written with atomic.StoreInt64).
	curVer int64
	// cron schedules cycleproc and, when enabled, cycleallproc.
	cron *cron.Cron
}
  21. // New create service instance and return.
  22. func New(c *conf.Config) (s *Service) {
  23. s = &Service{
  24. c: c,
  25. dao: dao.New(c),
  26. missch: make(chan func(), 1024),
  27. }
  28. s.cron = cron.New()
  29. s.cron.AddFunc(s.c.Property.CycleCron, s.cycleproc)
  30. if c.Property.CycleAll {
  31. s.cron.AddFunc(s.c.Property.CycleAllCron, s.cycleallproc)
  32. }
  33. go s.missproc()
  34. if c.Property.FixRecord {
  35. go s.fixproc()
  36. }
  37. s.cron.Start()
  38. return
  39. }
  40. func (s *Service) missproc() {
  41. defer func() {
  42. if x := recover(); x != nil {
  43. log.Error("s.missproc panic(%v)", x)
  44. go s.missproc()
  45. log.Info("s.missproc recover")
  46. }
  47. }()
  48. for {
  49. for fn := range s.missch {
  50. fn()
  51. }
  52. }
  53. }
  54. func (s *Service) cycleproc() {
  55. defer func() {
  56. if x := recover(); x != nil {
  57. log.Error("s.cycleproc panic(%v)", x)
  58. go s.cycleproc()
  59. log.Info("s.cycleproc recover")
  60. }
  61. }()
  62. var (
  63. err error
  64. mids []int64
  65. c = context.TODO()
  66. wg sync.WaitGroup
  67. newVer = weekVersion(time.Now().AddDate(0, 0, int(-s.c.Property.CalcWeekOffset*7)))
  68. )
  69. // Refresh Version
  70. atomic.StoreInt64(&s.curVer, newVer)
  71. log.Info("Calc active users start ver [%d]", s.curVer)
  72. rank.Init()
  73. // Calc figure concurrently
  74. for i := s.c.Property.PendingMidStart; i < s.c.Property.PendingMidShard; i++ {
  75. if mids, err = s.PendingMids(c, s.curVer, i, s.c.Property.PendingMidRetry); err != nil {
  76. log.Error("%+v", err)
  77. }
  78. if len(mids) == 0 {
  79. continue
  80. }
  81. smids := splitMids(mids, s.c.Property.ConcurrencySize)
  82. for c := range smids {
  83. csmids := smids[c]
  84. wg.Add(1)
  85. go func() {
  86. defer func() {
  87. wg.Done()
  88. }()
  89. for _, mid := range csmids {
  90. log.Info("Start handle mid [%d] figure ver [%d]", mid, s.curVer)
  91. if err = s.HandleFigure(context.TODO(), mid, s.curVer); err != nil {
  92. log.Error("%+v", err)
  93. }
  94. }
  95. }()
  96. }
  97. wg.Wait()
  98. }
  99. log.Info("Calc rank info start [%d]", s.curVer)
  100. s.calcRank(c, s.curVer)
  101. log.Info("Calc rank info finished [%d]", s.curVer)
  102. log.Info("Calc active users finished ver [%d]", s.curVer)
  103. }
  104. func splitMids(mids []int64, concurrencySize int64) (smids [][]int64) {
  105. if len(mids) == 0 {
  106. return
  107. }
  108. if concurrencySize == 0 {
  109. concurrencySize = 1
  110. }
  111. step := int64(len(mids))/concurrencySize + 1
  112. for c := int64(0); c < concurrencySize; c++ {
  113. var cMids []int64
  114. indexFrom := c * step
  115. indexTo := (c + 1) * step
  116. if indexFrom >= int64(len(mids)) {
  117. break
  118. }
  119. if indexTo >= int64(len(mids)) {
  120. cMids = mids[indexFrom:]
  121. } else {
  122. cMids = mids[indexFrom:indexTo]
  123. }
  124. smids = append(smids, cMids)
  125. }
  126. return
  127. }
  128. // PendingMids get pending mid list with retry
  129. func (s *Service) PendingMids(c context.Context, version int64, shard int64, retry int64) (mids []int64, err error) {
  130. var (
  131. maxDo = retry + 1
  132. doTimes int64
  133. )
  134. for doTimes < maxDo {
  135. if mids, err = s.dao.PendingMidsCache(c, s.curVer, shard); err != nil {
  136. doTimes++
  137. log.Info("s.dao.PendingMidsCache(%d,%d) retry (%d) error (%+v)", version, shard, doTimes, err)
  138. } else {
  139. doTimes = maxDo
  140. }
  141. }
  142. return
  143. }
  144. func (s *Service) cycleallproc() {
  145. defer func() {
  146. if x := recover(); x != nil {
  147. log.Error("cycleallproc panic(%+v)", x)
  148. }
  149. }()
  150. var (
  151. ctx = context.TODO()
  152. err error
  153. newVer = weekVersion(time.Now().AddDate(0, 0, int(-s.c.Property.CalcWeekOffset*7)))
  154. )
  155. // Refresh Version
  156. atomic.StoreInt64(&s.curVer, newVer)
  157. log.Info("cycleallproc active users start ver [%d]", s.curVer)
  158. rank.Init()
  159. for shard := s.c.Property.PendingMidStart; shard < 100; shard++ {
  160. log.Info("cycleallproc start run: %d", shard)
  161. var (
  162. figures []*model.Figure
  163. fromMid = int64(shard)
  164. end bool
  165. )
  166. for !end {
  167. if figures, end, err = s.dao.Figures(ctx, fromMid, 100); err != nil {
  168. log.Error("%+v", err)
  169. break
  170. }
  171. if len(figures) == 0 {
  172. continue
  173. }
  174. for _, figure := range figures {
  175. if fromMid < figure.Mid {
  176. fromMid = figure.Mid
  177. }
  178. log.Info("Start handle mid [%d] figure ver [%d]", figure.Mid, s.curVer)
  179. if err = s.HandleFigure(ctx, figure.Mid, s.curVer); err != nil {
  180. log.Error("%+v", err)
  181. continue
  182. }
  183. }
  184. }
  185. log.Info("cycleallproc rank info start [%d]", s.curVer)
  186. s.calcRank(ctx, s.curVer)
  187. log.Info("cycleallproc rank info finished [%d]", s.curVer)
  188. log.Info("cycleallproc active users finished ver [%d]", s.curVer)
  189. }
  190. }
  191. // Close kafka consumer close.
  192. func (s *Service) Close() (err error) {
  193. s.dao.Close()
  194. return
  195. }
// Wait is the hook for waiting until the service has ended. It currently has
// an empty body and returns immediately.
func (s *Service) Wait() {
}
  199. // Ping check service health.
  200. func (s *Service) Ping(c context.Context) error {
  201. return s.dao.Ping(c)
  202. }