attention_notify.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "go-common/app/job/live/push-search/model"
  6. "go-common/library/log"
  7. "go-common/library/sync/errgroup"
  8. "strconv"
  9. )
  10. const (
  11. _retry = 3
  12. )
  13. func (s *Service) attentionNotifyConsumeProc() {
  14. defer s.waiter.Done()
  15. for {
  16. msg, ok := <-s.dao.AttentionDataBus.Messages()
  17. if !ok {
  18. log.Error("attentionNotifyConsumeProc closed")
  19. if err := s.dao.AttentionDataBus.Close(); err != nil {
  20. log.Error("s.dao.AttentionDataBus.Close() error(%v)", err)
  21. }
  22. return
  23. }
  24. m := &message{data: msg}
  25. p := new(model.LiveDatabusAttention)
  26. if err := json.Unmarshal(msg.Value, p); err != nil {
  27. msg.Commit()
  28. log.Error("[AttentionDataBus]json.Unmarshal(%s) error(%v)", string(msg.Value), err)
  29. continue
  30. }
  31. if p.MsgContent == nil {
  32. log.Error("[AttentionDataBus]attentionNotifyConsumeProc msg object msgContent is nil, msg:%+v", string(msg.Value))
  33. return
  34. }
  35. m.object = p
  36. s.attentionMergeChan[p.MsgContent.UpUid%int64(s.c.Group.Attention.Num)] <- m
  37. }
  38. }
  39. func (s *Service) attentionNotifyHandleProc(c chan *message) {
  40. defer s.waiterChan.Done()
  41. for {
  42. msgData, ok := <-c
  43. if !ok {
  44. log.Error("[AttentionDataBus]attentionNotifyHandleProc closed")
  45. return
  46. }
  47. //先提交防止阻塞,关闭时等待任务执行完
  48. msgData.data.Commit()
  49. p, assertOk := msgData.object.(*model.LiveDatabusAttention)
  50. if !assertOk {
  51. log.Error("[AttentionDataBus]attentionNotifyHandleProc msg object type conversion error, msg:%+v", msgData)
  52. return
  53. }
  54. uid := p.MsgContent.UpUid
  55. uName := ""
  56. newMap := &model.TableField{}
  57. wg := errgroup.Group{}
  58. wg.Go(func() (err error) {
  59. userInfo, err := s.getMultiUserInfo(uid)
  60. if err == nil && userInfo != nil && userInfo.Uname != "" {
  61. uName = userInfo.Uname
  62. }
  63. return
  64. })
  65. wg.Go(func() (err error) {
  66. roomInfo, err := s.getBaseRoomInfo(uid)
  67. if err == nil && roomInfo != nil {
  68. newMap.RoomId = int(roomInfo.Roomid)
  69. newMap.ShortId = int(roomInfo.ShortId)
  70. newMap.Uid = roomInfo.Uid
  71. newMap.UName = roomInfo.Uname
  72. newMap.Area = int(roomInfo.Area)
  73. newMap.Title = roomInfo.Title
  74. newMap.Tag = roomInfo.Tags
  75. newMap.TryTime = roomInfo.TryTime
  76. newMap.Cover = roomInfo.Cover
  77. newMap.UserCover = roomInfo.UserCover
  78. newMap.LockStatus = roomInfo.LockStatus
  79. newMap.HiddenStatus = roomInfo.HiddenStatus
  80. newMap.Attentions = int(roomInfo.Attentions)
  81. newMap.Online = int(roomInfo.Online)
  82. newMap.LiveTime = roomInfo.LiveTime
  83. newMap.AreaV2Id = int(roomInfo.AreaV2Id)
  84. newMap.AreaV2ParentId = int(roomInfo.AreaV2ParentId)
  85. newMap.Virtual = int(roomInfo.Virtual)
  86. newMap.AreaV2Name = roomInfo.AreaV2Name
  87. newMap.CTime = roomInfo.Ctime
  88. newMap.MTime = roomInfo.Mtime
  89. newMap.RoundStatus = int(roomInfo.RoundStatus)
  90. newMap.OnFlag = int(roomInfo.OnFlag)
  91. }
  92. return
  93. })
  94. err := wg.Wait()
  95. if err == nil && newMap.RoomId != 0 {
  96. ret, retByte := s.generateSearchInfo("update", _tableArchive, newMap, nil)
  97. if uName != "" {
  98. ret["new"].(map[string]interface{})["uname"] = uName
  99. retByte["uname"] = []byte(uName)
  100. }
  101. if p.MsgContent.ExtInfo != nil {
  102. ret["new"].(map[string]interface{})["attentions"] = p.MsgContent.ExtInfo.UpUidFans
  103. ret["new"].(map[string]interface{})["attention"] = p.MsgContent.ExtInfo.UpUidFans
  104. retByte["attentions"] = []byte(strconv.Itoa(p.MsgContent.ExtInfo.UpUidFans))
  105. retByte["attention"] = []byte(strconv.Itoa(p.MsgContent.ExtInfo.UpUidFans))
  106. }
  107. //构造假old
  108. ret["old"].(map[string]interface{})["attention"] = 0
  109. ret["old"].(map[string]interface{})["attentions"] = 0
  110. wg := errgroup.Group{}
  111. wg.Go(func() (err error) {
  112. for i := 0; i < _retry; i++ {
  113. hbaseErr := s.saveHBase(context.TODO(), s.rowKey(newMap.RoomId), retByte)
  114. err = hbaseErr
  115. if hbaseErr != nil {
  116. continue
  117. }
  118. break
  119. }
  120. if err != nil {
  121. log.Error("[AttentionDataBus]fail to write hbase, msg:(%v), err:(%v)", p, err)
  122. }
  123. return
  124. })
  125. wg.Go(func() (err error) {
  126. err = s.dao.Pub(context.TODO(), int64(newMap.RoomId), ret)
  127. if err != nil {
  128. log.Error("[AttentionDataBus]fail to pub, msg:(%v), err:(%v)", p, err)
  129. }
  130. return
  131. })
  132. wg.Wait()
  133. log.Info("[AttentionDataBus]success to handle, error(%v), msg:(%v)", err, ret)
  134. continue
  135. }
  136. log.Error("[AttentionDataBus]fail to getData, error(%v),msg:(%v)", err, p)
  137. }
  138. }