reply_set.go 2.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091
  1. package service
  2. import (
  3. "context"
  4. "go-common/app/job/main/reply-feed/model"
  5. "go-common/library/log"
  6. )
  7. // setReplySetBatch set reply set batch.
  8. func (s *Service) setReplySetBatch(ctx context.Context, oid int64, tp int) (err error) {
  9. var (
  10. stats []*model.ReplyStat
  11. rpIDs []int64
  12. )
  13. // 从DB查出满足热门评论条件的评论ID
  14. if rpIDs, err = s.dao.RpIDs(ctx, oid, tp); err != nil || len(rpIDs) <= 0 {
  15. return
  16. }
  17. // 从MC或者DB中取出reply stat
  18. if stats, err = s.GetStatsByID(ctx, oid, tp, rpIDs); err != nil {
  19. return
  20. }
  21. for _, stat := range stats {
  22. stat := stat
  23. s.statQ.Do(ctx, func(ctx context.Context) {
  24. s.dao.SetReplyStatMc(ctx, stat)
  25. })
  26. }
  27. return s.dao.SetReplySetRds(ctx, oid, tp, rpIDs)
  28. }
  29. // addReplySet add one rpID into redis reply set.
  30. func (s *Service) addReplySet(ctx context.Context, oid int64, tp int, rpID int64) (err error) {
  31. ok, err := s.dao.ExpireReplySetRds(ctx, oid, tp)
  32. if err != nil {
  33. return
  34. }
  35. if ok {
  36. if err = s.dao.AddReplySetRds(ctx, oid, tp, rpID); err != nil {
  37. return
  38. }
  39. } else {
  40. if err = s.setReplySetBatch(ctx, oid, tp); err != nil {
  41. return
  42. }
  43. }
  44. return
  45. }
  46. func (s *Service) remSet(ctx context.Context, oid, rpID int64, tp int) (err error) {
  47. if err = s.dao.RemReplySetRds(ctx, oid, rpID, tp); err != nil {
  48. log.Error("Remove rpID from set error (%v)", err)
  49. }
  50. return
  51. }
  52. // func (s *Service) delSet(ctx context.Context, oid int64, tp int) (err error) {
  53. // if err = s.dao.DelReplySetRds(ctx, oid, tp); err != nil {
  54. // log.Error("delete reply set(oid: %d, type: %d)", oid, tp)
  55. // }
  56. // return
  57. // }
  58. // func (s *Service) delReply(ctx context.Context, oid int64, tp int) {
  59. // var err error
  60. // if err = s.delSet(ctx, oid, tp); err != nil {
  61. // s.replyListQ.Do(ctx, func(ctx context.Context) {
  62. // s.delSet(ctx, oid, tp)
  63. // })
  64. // }
  65. // if err = s.delZSet(ctx, oid, tp); err != nil {
  66. // s.replyListQ.Do(ctx, func(ctx context.Context) {
  67. // s.delZSet(ctx, oid, tp)
  68. // })
  69. // }
  70. // }
  71. func (s *Service) remReply(ctx context.Context, oid int64, tp int, rpID int64) {
  72. var err error
  73. if err = s.remSet(ctx, oid, rpID, tp); err != nil {
  74. s.replyListQ.Do(ctx, func(ctx context.Context) {
  75. s.remSet(ctx, oid, rpID, tp)
  76. })
  77. }
  78. if err = s.remZSet(ctx, oid, tp, rpID); err != nil {
  79. s.replyListQ.Do(ctx, func(ctx context.Context) {
  80. s.remZSet(ctx, oid, tp, rpID)
  81. })
  82. }
  83. }