common_message.go 2.9 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "go-common/app/interface/live/push-live/dao"
  6. "go-common/app/interface/live/push-live/model"
  7. "go-common/library/cache/redis"
  8. "go-common/library/log"
  9. "go-common/library/queue/databus"
  10. )
  11. // LiveCommonMessage 直播通用消息
  12. func (s *Service) LiveCommonMessage(ctx context.Context, msg *databus.Message) (err error) {
  13. defer msg.Commit()
  14. var (
  15. mids []int64
  16. mMap = make(map[int64]bool) // mid去重
  17. midMap = make(map[int][]int64) // 最终格式化后的mid map
  18. )
  19. m := new(model.LiveCommonMessage)
  20. if err = json.Unmarshal(msg.Value, &m); err != nil {
  21. log.Error("[service.common_message|LiveCommonMessage] json Unmarshal error(%v), model(%v)", err, m)
  22. return
  23. }
  24. task := s.InitCommonTask(m)
  25. if mids, err = s.convertStrToInt64(m.MsgContent.Mids); err != nil {
  26. log.Error("[service.push|LiveCommonMessage] format Mids error(%v), task(%v), model(%v)", err, task, m)
  27. return
  28. }
  29. // remove duplicated mid
  30. for _, mid := range mids {
  31. mMap[mid] = true
  32. }
  33. // mid filter
  34. business := m.MsgContent.Business
  35. filteredMids := s.midFilter(mMap, business, task)
  36. midMap[business] = filteredMids
  37. log.Info("[service.push|LiveCommonMessage] message info: before(%d), after(%d), model(%v), task(%v)",
  38. len(mMap), len(midMap[business]), m, task)
  39. total := s.Push(task, midMap)
  40. // create push task
  41. go s.CreatePushTask(task, total)
  42. go s.setPushInterval(business, s.safeGetExpired(), filteredMids, task)
  43. log.Info("[service.push|LiveCommonMessage] common message push done, total(%d), err(%v)", total, err)
  44. return
  45. }
  46. // InitCommonTask Init push task by common message model
  47. func (s *Service) InitCommonTask(m *model.LiveCommonMessage) (task *model.ApPushTask) {
  48. task = &model.ApPushTask{
  49. Type: model.LivePushType,
  50. TargetID: 0,
  51. AlertTitle: m.MsgContent.AlertTitle,
  52. AlertBody: m.MsgContent.AlertBody,
  53. MidSource: m.MsgContent.Business,
  54. LinkType: m.MsgContent.LinkType,
  55. LinkValue: m.MsgContent.LinkValue,
  56. ExpireTime: m.MsgContent.ExpireTime,
  57. Group: m.MsgContent.Group,
  58. }
  59. return task
  60. }
  61. // setPushInterval 活动预约,对每个mid设置推送平滑key
  62. func (s *Service) setPushInterval(business int, expired int32, mids []int64, task *model.ApPushTask) (total int, err error) {
  63. if business != 111 {
  64. return
  65. }
  66. var conn redis.Conn
  67. defer func() {
  68. if conn != nil {
  69. conn.Close()
  70. }
  71. }()
  72. // redis conn
  73. conn, err = redis.Dial(s.c.Redis.PushInterval.Proto, s.c.Redis.PushInterval.Addr, s.dao.RedisOption()...)
  74. if err != nil {
  75. log.Error("[service.common_message|setPushInterval] redis.Dial error(%v), task(%v), mids(%d)",
  76. err, task, len(mids))
  77. return
  78. }
  79. for _, mid := range mids {
  80. key := dao.GetIntervalKey(mid)
  81. _, err = conn.Do("SET", key, task.LinkValue, "EX", expired)
  82. if err != nil {
  83. log.Error("[service.common_message|setPushInterval] set redis error(%v), task(%v), mid(%d)",
  84. err, task, mid)
  85. continue
  86. }
  87. total++
  88. }
  89. return
  90. }