action.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137
  1. package service
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/json"
  6. "strconv"
  7. "time"
  8. "go-common/app/job/main/activity/model/like"
  9. "go-common/library/log"
  10. )
  11. // actionDealProc .
  12. func (s *Service) actionDealProc(i int) {
  13. defer s.waiter.Done()
  14. var (
  15. ch = s.subActionCh[i]
  16. sm = s.actionSM[i]
  17. ls *like.LastTmStat
  18. )
  19. for {
  20. ms, ok := <-ch
  21. if !ok {
  22. s.multiUpActDB(i, sm)
  23. log.Warn("s.actionDealProc(%d) quit", i)
  24. return
  25. }
  26. if ls, ok = sm[ms.Lid]; !ok {
  27. ls = &like.LastTmStat{Last: time.Now().Unix()}
  28. sm[ms.Lid] = ls
  29. // the first time update db.
  30. s.updateActDB([]int64{ms.Lid})
  31. }
  32. if time.Now().Unix()-ls.Last > 60 {
  33. s.updateActDB([]int64{ms.Lid})
  34. delete(sm, ms.Lid)
  35. }
  36. log.Info("s.actionDealProc(%d) lid:%d time:%d", i, ms.Lid, ls.Last)
  37. }
  38. }
  39. // updateActDB batch to deal like_extend.
  40. func (s *Service) updateActDB(lids []int64) {
  41. var (
  42. c = context.Background()
  43. insertExt []*like.Extend
  44. )
  45. if len(lids) == 0 {
  46. return
  47. }
  48. lidLike, err := s.dao.BatchLikeActSum(c, lids)
  49. if err != nil {
  50. log.Error("s.dao.BatchLikeActSum(%v) error(%+v)", lids, err)
  51. return
  52. }
  53. insertExt = make([]*like.Extend, 0, len(lids))
  54. for _, v := range lids {
  55. if _, ok := lidLike[v]; ok {
  56. insertExt = append(insertExt, &like.Extend{Lid: v, Like: lidLike[v]})
  57. } else {
  58. log.Warn("s.updateActDB() data has not found")
  59. }
  60. }
  61. if len(insertExt) == 0 {
  62. return
  63. }
  64. s.BatchInsertLikeExtend(c, insertExt)
  65. }
  66. // multiUpActDB division sm data .
  67. func (s *Service) multiUpActDB(yu int, sm map[int64]*like.LastTmStat) {
  68. var (
  69. i int
  70. startLids = [1000]int64{}
  71. lids = startLids[:0]
  72. )
  73. log.Info("start close(%d) multiUpActDB start", yu)
  74. for lid := range sm {
  75. lids = append(lids, lid)
  76. i++
  77. if i%1000 == 0 {
  78. s.updateActDB(lids)
  79. lids = startLids[:0]
  80. }
  81. }
  82. if len(lids) > 0 {
  83. s.updateActDB(lids)
  84. }
  85. log.Info("start close(%d) multiUpActDB end", yu)
  86. }
  87. // BatchInsertLikeExtend batch insert like_extend table.
  88. func (s *Service) BatchInsertLikeExtend(c context.Context, extends []*like.Extend) (res int64, err error) {
  89. var buf bytes.Buffer
  90. cnt := 0
  91. rows := int64(0)
  92. for _, v := range extends {
  93. buf.WriteString("(")
  94. buf.WriteString(strconv.FormatInt(v.Lid, 10))
  95. buf.WriteString(",")
  96. buf.WriteString(strconv.FormatInt(v.Like, 10))
  97. buf.WriteString("),")
  98. cnt++
  99. if cnt%500 == 0 {
  100. buf.Truncate(buf.Len() - 1)
  101. if rows, err = s.dao.AddExtend(c, buf.String()); err != nil {
  102. log.Error("s.dao.dealAddExtend() error(%+v)", err)
  103. return
  104. }
  105. res += rows
  106. buf.Reset()
  107. }
  108. }
  109. if buf.Len() > 0 {
  110. buf.Truncate(buf.Len() - 1)
  111. if rows, err = s.dao.AddExtend(c, buf.String()); err != nil {
  112. log.Error("s.dao.dealAddExtend() error(%+v)", err)
  113. return
  114. }
  115. res += rows
  116. }
  117. return
  118. }
  119. // actionProc .
  120. func (s *Service) actionProc(c context.Context, msg json.RawMessage) (err error) {
  121. var (
  122. act = new(like.Action)
  123. )
  124. if err = json.Unmarshal(msg, act); err != nil {
  125. log.Error("actionProc json.Unmarshal(%s) error(%v)", msg, err)
  126. return
  127. }
  128. s.subActionCh[act.Lid%_sharding] <- act
  129. return
  130. }