mids.go 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596
  1. package service
  2. import (
  3. "context"
  4. "go-common/app/interface/live/push-live/dao"
  5. "go-common/app/interface/live/push-live/model"
  6. "go-common/library/log"
  7. "strings"
  8. "sync"
  9. "time"
  10. )
  11. // MidFilter 收敛所有mid过滤逻辑入口
  12. func (s *Service) midFilter(ml map[int64]bool, business int, task *model.ApPushTask) (midMap []int64) {
  13. var (
  14. mutex sync.Mutex
  15. i int
  16. midsList [][]int64
  17. wg sync.WaitGroup
  18. needDecrease = needDecrease(business)
  19. filterConf = &dao.FilterConfig{
  20. Business: business,
  21. IntervalExpired: s.safeGetExpired(),
  22. IntervalValue: intervalValueByLinkValue(task.LinkValue),
  23. DailyExpired: dailyExpired(time.Now()),
  24. Task: task}
  25. )
  26. midMap = make([]int64, 0, len(ml))
  27. // split mids by limit
  28. mids := make([]int64, 0, s.c.Push.IntervalLimit)
  29. for mid := range ml {
  30. mids = append(mids, mid)
  31. i++
  32. if i == s.c.Push.IntervalLimit {
  33. i = 0
  34. midsList = append(midsList, mids)
  35. mids = make([]int64, 0, s.c.Push.IntervalLimit)
  36. }
  37. }
  38. if len(mids) > 0 {
  39. midsList = append(midsList, mids)
  40. }
  41. // filter goroutines
  42. for i := 0; i < len(midsList); i++ {
  43. wg.Add(1)
  44. go func(index int, mids []int64) {
  45. var (
  46. filteredMids []int64
  47. f *dao.Filter
  48. err error
  49. ctx = context.TODO()
  50. )
  51. defer func() {
  52. log.Info("[service.mids|midFilter] BatchFilter before(%d), after(%d), task(%v), business(%d), err(%v)",
  53. len(mids), len(filteredMids), task, business, err)
  54. wg.Done()
  55. }()
  56. // new filter
  57. f, err = s.dao.NewFilter(filterConf)
  58. if err != nil {
  59. return
  60. }
  61. filteredMids = f.BatchFilter(ctx, s.dao.NewFilterChain(f), mids)
  62. if len(filteredMids) == 0 {
  63. f.Done()
  64. return
  65. }
  66. // after filter, do something
  67. if needDecrease {
  68. go f.BatchDecreaseLimit(ctx, filteredMids)
  69. }
  70. mutex.Lock()
  71. midMap = append(midMap, filteredMids...)
  72. mutex.Unlock()
  73. }(i, midsList[i])
  74. }
  75. wg.Wait()
  76. log.Info("[service.mids|midFilter] filtered task(%v), before(%d), after(%d), type(%d)",
  77. task, len(ml), len(midMap), business)
  78. return
  79. }
  // intervalValueByLinkValue returns the part of linkValue that precedes its
  // first comma, or linkValue unchanged when it contains no comma. The original
  // comment describes this leading field as the room id.
  func intervalValueByLinkValue(linkValue string) string {
  	if comma := strings.Index(linkValue, ","); comma >= 0 {
  		return linkValue[:comma]
  	}
  	return linkValue
  }
  85. // needDecrease
  86. func needDecrease(business int) bool {
  87. return business != model.ActivityBusiness
  88. }