run.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164
  1. package service
  2. import (
  3. "context"
  4. "time"
  5. "go-common/app/job/main/up-rating/model"
  6. "go-common/library/log"
  7. "golang.org/x/sync/errgroup"
  8. )
  9. // Run run scores
  10. func (s *Service) Run(c context.Context, date time.Time) (err error) {
  11. date = time.Date(date.Year(), date.Month(), 1, 0, 0, 0, 0, time.Local)
  12. // get weight parameters
  13. params, err := s.getAllParamter(c)
  14. if err != nil {
  15. log.Error("s.getAllParamter error(%v)", err)
  16. return
  17. }
  18. // get past scores
  19. past, err := s.pastInfos(c)
  20. if err != nil {
  21. log.Error("s.pastInfos error(%v)", err)
  22. return
  23. }
  24. err = s.PrepareData(c, date, past, params)
  25. if err != nil {
  26. log.Error("s.PrepareDate error(%v)", err)
  27. return
  28. }
  29. err = s.CalScores(c, date, past, params)
  30. if err != nil {
  31. log.Error("s.CalScores error(%v)", err)
  32. return
  33. }
  34. err = s.delOldPastInfo(c, int64(_limit))
  35. if err != nil {
  36. log.Error("s.delOldPastInfo error(%v)", err)
  37. return
  38. }
  39. err = s.insertPastRecord(c, 0, date.AddDate(0, 1, 0).Format(_layout))
  40. if err != nil {
  41. log.Error("s.insertPastRecord error(%v)", err)
  42. return
  43. }
  44. log.Info("run data read finished")
  45. err = s.InsertTaskStatus(c, 0, 1, date.Format(_layout), "ok")
  46. return
  47. }
  48. // PrepareData prepare old data
  49. func (s *Service) PrepareData(c context.Context, date time.Time, past map[int64]*model.Past, params *model.RatingParameter) (err error) {
  50. var (
  51. g errgroup.Group
  52. routines = s.conf.Con.Concurrent
  53. lastMonth = time.Date(date.Year(), date.Month()-1, 1, 0, 0, 0, 0, time.Local)
  54. )
  55. offset, end, _, err := s.RatingOffEnd(c, lastMonth)
  56. if err != nil {
  57. return
  58. }
  59. total := end - offset
  60. section := (total - total%routines) / routines
  61. for i := 0; i < routines; i++ {
  62. begin := section*i + offset
  63. over := begin + section
  64. if i == routines-1 {
  65. over = end
  66. }
  67. // read chan: for last ratings
  68. rch := make(chan []*model.Rating, _limit)
  69. g.Go(func() (err error) {
  70. err = s.RatingFast(c, lastMonth, begin, over, rch)
  71. if err != nil {
  72. log.Error("s.RatingFast error(%v)", err)
  73. }
  74. return
  75. })
  76. wch := make(chan []*model.Rating, _limit)
  77. g.Go(func() (err error) {
  78. s.Copy(rch, wch, past, params)
  79. return
  80. })
  81. g.Go(func() (err error) {
  82. err = s.BatchInsertRatingStat(c, wch, date)
  83. if err != nil {
  84. log.Error("s.BatchInsertRatingStat error(%v)", err)
  85. }
  86. return
  87. })
  88. }
  89. if err = g.Wait(); err != nil {
  90. log.Error("run g.Wait error(%v)", err)
  91. return
  92. }
  93. return
  94. }
  95. // CalScores cal scores
  96. func (s *Service) CalScores(c context.Context, date time.Time, past map[int64]*model.Past, params *model.RatingParameter) (err error) {
  97. var (
  98. routines = s.conf.Con.Concurrent
  99. _limit = s.conf.Con.Limit
  100. )
  101. t := time.Now().UnixNano()
  102. var g errgroup.Group
  103. // get id start:end by date
  104. offset, end, err := s.BaseInfoOffEnd(c, date)
  105. if err != nil {
  106. return
  107. }
  108. total := end - offset
  109. section := (total - total%routines) / routines
  110. // parallelization and pipeling
  111. for i := 0; i < routines; i++ {
  112. begin := section*i + offset
  113. over := begin + section
  114. if i == routines-1 {
  115. over = end
  116. }
  117. // read chan: for origin datas
  118. rch := make(chan []*model.BaseInfo, _limit)
  119. g.Go(func() (err error) {
  120. err = s.BaseInfo(c, date, begin, over, rch)
  121. if err != nil {
  122. log.Error("s.BaseInfo error(%v)", err)
  123. }
  124. return
  125. })
  126. // write chan: for calculated results
  127. wch := make(chan []*model.Rating, _limit)
  128. g.Go(func() (err error) {
  129. s.CalScore(rch, wch, params, past, date)
  130. return
  131. })
  132. g.Go(func() (err error) {
  133. err = s.BatchInsertRatingStat(c, wch, date)
  134. if err != nil {
  135. log.Error("s.BatchInsertRatingStat error(%v)", err)
  136. }
  137. return
  138. })
  139. }
  140. if err = g.Wait(); err != nil {
  141. log.Error("run g.Wait error(%v)", err)
  142. return
  143. }
  144. log.Info("cal time cost:", time.Now().UnixNano()-t)
  145. return
  146. }