statistics.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "time"
  7. "go-common/app/interface/main/push-archive/dao"
  8. "go-common/app/interface/main/push-archive/model"
  9. "go-common/library/log"
  10. )
  11. // 切割需要存储的统计数据
  12. func (s *Service) formStatisticsProc(aid int64, group string, included []int64, excluded *[]int64) {
  13. params := model.NewBatchParam(map[string]interface{}{
  14. "aid": aid,
  15. "group": group,
  16. "type": model.StatisticsPush,
  17. "createdTime": time.Now(),
  18. }, nil)
  19. dao.Batch(&included, 1000, 2, params, s.formPushStatistic)
  20. params.Params["type"] = model.StatisticsUnpush
  21. dao.Batch(excluded, 1000, 2, params, s.formPushStatistic)
  22. }
  23. // 组建统计对象
  24. func (s *Service) formPushStatistic(fans *[]int64, params map[string]interface{}) (err error) {
  25. ln := len(*fans)
  26. b, err := json.Marshal(*fans)
  27. if err != nil {
  28. log.Error("formStatistic json.Marshal error(%v) fans(%v) params(%v)", err, fans, params)
  29. return
  30. }
  31. ps := &model.PushStatistic{
  32. Aid: params["aid"].(int64),
  33. Group: params["group"].(string),
  34. Type: params["type"].(int),
  35. Mids: string(b),
  36. MidsCounter: ln,
  37. CTime: params["createdTime"].(time.Time),
  38. }
  39. if err = s.dao.AddStatisticsCache(context.TODO(), ps); err != nil {
  40. log.Error("formPushStatistic s.dao.AddStatisticsCache error(%v), pushstatistic(%v)", err, ps)
  41. return
  42. }
  43. return
  44. }
  45. // 每日定时清除推送的统计数据,只保留最近几天的数据
  46. func (s *Service) clearStatisticsProc() {
  47. for {
  48. // 到指定时间
  49. clearTime, err := s.getTodayTime(s.c.ArcPush.PushStatisticsClearTime)
  50. if err != nil {
  51. log.Error("clearStatisticsProc getTodayTime(%s) error(%v)", s.c.ArcPush.PushStatisticsClearTime, err)
  52. continue
  53. }
  54. dur := clearTime.Unix() - time.Now().Unix()
  55. if dur < 0 || dur > 60 {
  56. time.Sleep(time.Second * 50)
  57. continue
  58. }
  59. // 需要删除数据的最大时间
  60. time.Sleep(time.Second * 60)
  61. deadline, err := s.getDeadline()
  62. if err != nil {
  63. log.Error("clearStatisticsProc getDeadline error(%v)", err)
  64. continue
  65. }
  66. log.Info("start to clear statistics before deadline(%s)", deadline.Format("2006-01-02 15:04:05"))
  67. var min, max, mid int64
  68. for i := 0; i < 3; i++ {
  69. if min == 0 && max == 0 {
  70. min, max, err = s.dao.GetStatisticsIDRange(context.TODO(), deadline)
  71. mid = min
  72. }
  73. for err == nil && mid < max {
  74. min = mid
  75. mid = min + 5000
  76. if mid > max {
  77. mid = max
  78. }
  79. _, err = s.dao.DelStatisticsByID(context.TODO(), min, mid)
  80. time.Sleep(time.Second)
  81. }
  82. if err == nil {
  83. log.Info("success end clear statistics before deadline(%s)", deadline.Format("2006-01-02 15:04:05"))
  84. break
  85. }
  86. }
  87. if err != nil {
  88. s.dao.WechatMessage(fmt.Sprintf("clearStatisticsProc: push-archive failed to clear expired(%s) push_statistics, error(%v)", deadline.Format("2006-01-02 15:04:05"), err))
  89. }
  90. }
  91. }
  92. // 获取需要删除数据的 最大时间
  93. func (s *Service) getDeadline() (deadline time.Time, err error) {
  94. dd := "00:00:00"
  95. today, err := s.getTodayTime(dd)
  96. if err != nil {
  97. log.Error("clearStatisticsProc getTodayTime(%s) error(%v)", dd, err)
  98. return
  99. }
  100. deadline = today.AddDate(0, 0, -1*s.c.ArcPush.PushStatisticsKeepDays+1)
  101. return
  102. }
  103. // 统计数据落库
  104. func (s *Service) saveStatisticsProc() {
  105. defer s.wg.Done()
  106. for {
  107. select {
  108. case _, ok := <-s.CloseCh:
  109. if !ok {
  110. log.Info("CloseCh is closed, close the saveStatisticsProc")
  111. return
  112. }
  113. default:
  114. }
  115. ps, err := s.dao.GetStatisticsCache(context.TODO())
  116. if err != nil {
  117. log.Error("saveStatisticsProc s.dao.GetStatisticsCache error(%v)", err)
  118. time.Sleep(time.Millisecond * 100)
  119. continue
  120. }
  121. if ps == nil {
  122. time.Sleep(time.Millisecond * 100)
  123. continue
  124. }
  125. if _, err := s.dao.SetStatistics(context.TODO(), ps); err != nil {
  126. log.Error("saveStatisticsProc s.dao.SetStatistics error(%v) pushstatistic(%v)", err, ps)
  127. s.dao.AddStatisticsCache(context.TODO(), ps)
  128. }
  129. time.Sleep(time.Millisecond * 100)
  130. }
  131. }