top.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116
  1. package service
  2. import (
  3. "bytes"
  4. "container/heap"
  5. "context"
  6. "strconv"
  7. "time"
  8. "go-common/app/job/main/up-rating/model"
  9. )
  10. // ratingTop get top ups
  11. func (s *Service) ratingTop(c context.Context, date time.Time, source chan []*model.Rating) (topRating map[int]map[int64]*RatingHeap, err error) {
  12. topRating = make(map[int]map[int64]*RatingHeap) // map[ctype][tagID]
  13. topRating[CreativeType] = make(map[int64]*RatingHeap)
  14. topRating[InfluenceType] = make(map[int64]*RatingHeap)
  15. for rating := range source {
  16. for _, r := range rating {
  17. if _, ok := topRating[CreativeType][r.TagID]; !ok {
  18. topRating[CreativeType][r.TagID] = &RatingHeap{}
  19. }
  20. pushTopRating(topRating[CreativeType][r.TagID], CreativeType, r)
  21. if _, ok := topRating[InfluenceType][r.TagID]; !ok {
  22. topRating[InfluenceType][r.TagID] = &RatingHeap{}
  23. }
  24. pushTopRating(topRating[InfluenceType][r.TagID], InfluenceType, r)
  25. }
  26. }
  27. return
  28. }
  29. func pushTopRating(h *RatingHeap, ctype int, r *model.Rating) {
  30. tr := &model.TopRating{
  31. MID: r.MID,
  32. CType: ctype,
  33. TagID: r.TagID,
  34. }
  35. switch ctype {
  36. case CreativeType:
  37. tr.Score = r.CreativityScore
  38. case InfluenceType:
  39. tr.Score = r.InfluenceScore
  40. }
  41. heap.Push(h, tr)
  42. if h.Len() > 10 {
  43. heap.Pop(h)
  44. }
  45. }
  46. func (s *Service) insertTopRating(c context.Context, date time.Time, topRating map[int]map[int64]*RatingHeap, baseInfo map[int64]*model.BaseInfo) (rows int64, err error) {
  47. return s.dao.InsertTopRating(c, assemberTopRating(date, topRating, baseInfo))
  48. }
  49. func assemberTopRating(date time.Time, topRating map[int]map[int64]*RatingHeap, baseInfo map[int64]*model.BaseInfo) (vals string) {
  50. var buf bytes.Buffer
  51. for _, tagTop := range topRating {
  52. for _, h := range tagTop {
  53. for h.Len() > 0 {
  54. tr := heap.Pop(h).(*model.TopRating)
  55. info := baseInfo[tr.MID]
  56. if info == nil {
  57. info = &model.BaseInfo{}
  58. }
  59. buf.WriteString("(")
  60. buf.WriteString(strconv.FormatInt(tr.MID, 10))
  61. buf.WriteByte(',')
  62. buf.WriteString(strconv.Itoa(tr.CType))
  63. buf.WriteByte(',')
  64. buf.WriteString(strconv.FormatInt(tr.TagID, 10))
  65. buf.WriteByte(',')
  66. buf.WriteString(strconv.FormatInt(tr.Score, 10))
  67. buf.WriteByte(',')
  68. buf.WriteString(strconv.FormatInt(info.TotalFans, 10))
  69. buf.WriteByte(',')
  70. buf.WriteString(strconv.FormatInt(info.TotalPlay, 10))
  71. buf.WriteByte(',')
  72. buf.WriteString("'" + date.Format(_layout) + "'")
  73. buf.WriteString(")")
  74. buf.WriteByte(',')
  75. }
  76. }
  77. }
  78. if buf.Len() > 0 {
  79. buf.Truncate(buf.Len() - 1)
  80. }
  81. vals = buf.String()
  82. buf.Reset()
  83. return
  84. }
  85. // RatingHeap rating heap for topK
  86. type RatingHeap []*model.TopRating
  87. // Len len
  88. func (r RatingHeap) Len() int { return len(r) }
  89. // Less less
  90. func (r RatingHeap) Less(i, j int) bool { return r[i].Score < r[j].Score }
  91. // Swap swap
  92. func (r RatingHeap) Swap(i, j int) { r[i], r[j] = r[j], r[i] }
  93. // Push push to heap
  94. func (r *RatingHeap) Push(x interface{}) {
  95. *r = append(*r, x.(*model.TopRating))
  96. }
  97. // Pop pop from heap
  98. func (r *RatingHeap) Pop() interface{} {
  99. old := *r
  100. n := len(old)
  101. x := old[n-1]
  102. *r = old[0 : n-1]
  103. return x
  104. }