past.go 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210
  1. package service
  2. import (
  3. "bytes"
  4. "context"
  5. "strconv"
  6. "time"
  7. "go-common/app/job/main/up-rating/model"
  8. "go-common/library/log"
  9. "golang.org/x/sync/errgroup"
  10. )
  11. var (
  12. _layout = "2006-01-02"
  13. _limit = 2000
  14. )
  15. // RunPastScore run past score by date
  16. func (s *Service) RunPastScore(c context.Context, date time.Time) (err error) {
  17. date = time.Date(date.Year(), date.Month(), 1, 0, 0, 0, 0, time.Local)
  18. times, err := s.getPastRecord(c, date.Format(_layout))
  19. if err != nil {
  20. log.Error("s.getPastRecord error(%v)", err)
  21. return
  22. }
  23. if times < 0 {
  24. log.Info("This month's calculation did not start")
  25. return
  26. }
  27. // 创作力需要计算前22个月的数据
  28. if times >= 22 {
  29. log.Info("Last month's calculation has end")
  30. return
  31. }
  32. var (
  33. readGroup errgroup.Group
  34. cw float64 // 创作力当月权重
  35. iw int64 // 影响力当月权重
  36. pastScore []*model.Past
  37. pastCh = make(chan []*model.Rating, _limit)
  38. )
  39. // 获取前n个月的数据
  40. pastDate := date.AddDate(0, -1*(22-times), 0)
  41. times++ // update calculate times
  42. //csr = csm0 + csm1 + ... + csm11 + 11/12 * csm12 + 10/12 * csm13 + ... 1/12 * csm22
  43. cw = float64(times) / float64(12)
  44. if cw > 1.0 {
  45. cw = 1.0
  46. }
  47. // isr = mfans0 + mfans1 + ... + mfans12
  48. iw = int64(float64(times) / float64(12))
  49. // get past month data
  50. readGroup.Go(func() (err error) {
  51. err = s.RatingInfos(c, pastDate, pastCh)
  52. if err != nil {
  53. log.Error("s.RatingInfos error(%v)", err)
  54. }
  55. return
  56. })
  57. // cal past month data
  58. readGroup.Go(func() (err error) {
  59. pastScore, err = s.calPastScores(c, pastCh, cw, iw)
  60. if err != nil {
  61. log.Error("s.calPastScores error(%v)", err)
  62. }
  63. return
  64. })
  65. if err = readGroup.Wait(); err != nil {
  66. log.Error("run readGroup.Wait error(%v)", err)
  67. return
  68. }
  69. err = s.insertPastRecord(c, times, date.Format(_layout))
  70. if err != nil {
  71. log.Error("s.upPastRecord error(%v)", err)
  72. return
  73. }
  74. err = s.batchInsertPastScore(c, pastScore)
  75. if err != nil {
  76. log.Error("s.batchInsertPastScore error(%v)", err)
  77. }
  78. return
  79. }
  80. // InsertPastRecord insert past record
  81. func (s *Service) InsertPastRecord(c context.Context, date string) (err error) {
  82. return s.insertPastRecord(c, 0, date)
  83. }
  84. func (s *Service) calPastScores(c context.Context, pastRating chan []*model.Rating, cw float64, iw int64) (pastScore []*model.Past, err error) {
  85. pastScore = make([]*model.Past, 0)
  86. for rating := range pastRating {
  87. p := calPastScore(rating, cw, iw)
  88. pastScore = append(pastScore, p...)
  89. }
  90. return
  91. }
  92. func calPastScore(rating []*model.Rating, cw float64, iw int64) (pastScore []*model.Past) {
  93. pastScore = make([]*model.Past, 0, len(rating))
  94. for _, r := range rating {
  95. pastScore = append(pastScore, &model.Past{
  96. MID: r.MID,
  97. MetaCreativityScore: int64(float64(r.MetaCreativityScore) * cw),
  98. MetaInfluenceScore: r.MetaInfluenceScore * iw,
  99. CreditScore: r.CreditScore,
  100. })
  101. }
  102. return
  103. }
  104. // get past calculate record
  105. func (s *Service) getPastRecord(c context.Context, date string) (times int, err error) {
  106. return s.dao.GetPastRecord(c, date)
  107. }
  108. func (s *Service) insertPastRecord(c context.Context, times int, date string) (err error) {
  109. _, err = s.dao.InsertPastRecord(c, times, date)
  110. return err
  111. }
  112. func (s *Service) pastInfos(c context.Context) (past map[int64]*model.Past, err error) {
  113. past = make(map[int64]*model.Past)
  114. var id int64
  115. for {
  116. var p []*model.Past
  117. p, id, err = s.dao.GetPasts(c, id, int64(_limit))
  118. if err != nil {
  119. return
  120. }
  121. for i := 0; i < len(p); i++ {
  122. past[p[i].MID] = p[i]
  123. }
  124. if len(p) < _limit {
  125. break
  126. }
  127. }
  128. return
  129. }
  130. func (s *Service) batchInsertPastScore(c context.Context, past []*model.Past) (err error) {
  131. var (
  132. buff = make([]*model.Past, 2000)
  133. buffEnd = 0
  134. )
  135. for _, p := range past {
  136. buff[buffEnd] = p
  137. buffEnd++
  138. if buffEnd >= 2000 {
  139. values := assemblePastValues(buff[:buffEnd])
  140. buffEnd = 0
  141. _, err = s.dao.InsertPastScoreStat(c, values)
  142. if err != nil {
  143. return
  144. }
  145. }
  146. }
  147. if buffEnd > 0 {
  148. values := assemblePastValues(buff[:buffEnd])
  149. buffEnd = 0
  150. _, err = s.dao.InsertPastScoreStat(c, values)
  151. }
  152. return
  153. }
  154. func assemblePastValues(past []*model.Past) (values string) {
  155. var buf bytes.Buffer
  156. for _, p := range past {
  157. buf.WriteString("(")
  158. buf.WriteString(strconv.FormatInt(p.MID, 10))
  159. buf.WriteByte(',')
  160. buf.WriteString(strconv.FormatInt(p.MetaCreativityScore, 10))
  161. buf.WriteByte(',')
  162. buf.WriteString(strconv.FormatInt(p.MetaInfluenceScore, 10))
  163. buf.WriteByte(',')
  164. buf.WriteString(strconv.FormatInt(p.CreditScore, 10))
  165. buf.WriteString(")")
  166. buf.WriteByte(',')
  167. }
  168. if buf.Len() > 0 {
  169. buf.Truncate(buf.Len() - 1)
  170. }
  171. values = buf.String()
  172. buf.Reset()
  173. return
  174. }
  175. func (s *Service) delOldPastInfo(c context.Context, limit int64) (err error) {
  176. var rows int64
  177. for {
  178. rows, err = s.dao.DelPastStat(c, limit)
  179. if err != nil {
  180. return
  181. }
  182. if rows < limit {
  183. break
  184. }
  185. }
  186. return
  187. }
  188. // DelPastRecord del past record
  189. func (s *Service) DelPastRecord(c context.Context, date time.Time) (err error) {
  190. _, err = s.dao.DelPastRecord(c, date)
  191. return
  192. }