roominfo_notify.go 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194
  1. package service
  2. import (
  3. "encoding/json"
  4. "go-common/app/job/live/push-search/model"
  5. "go-common/library/log"
  6. "context"
  7. roomV1 "go-common/app/service/live/room/api/liverpc/v1"
  8. "go-common/library/sync/errgroup"
  9. "strconv"
  10. )
  11. const (
  12. _updateAct = "update"
  13. _insertAct = "insert"
  14. )
  15. func (s *Service) roomInfoNotifyConsumeProc() {
  16. defer s.waiter.Done()
  17. for {
  18. msg, ok := <-s.dao.RoomInfoDataBus.Messages()
  19. // databus关闭chan导致,服务自杀或异常退出
  20. if !ok {
  21. log.Error("roomInfoNotifyConsumeProc closed")
  22. if err := s.dao.RoomInfoDataBus.Close(); err != nil {
  23. log.Error("s.dao.RoomInfoDataBus.Close() error(%v)", err)
  24. }
  25. return
  26. }
  27. m := &message{data: msg}
  28. p := new(model.ApRoomNotifyInfo)
  29. if err := json.Unmarshal(msg.Value, p); err != nil {
  30. msg.Commit()
  31. log.Error("[RoomInfoDataBus]json.Unmarshal(%s) error(%v)", string(msg.Value), err)
  32. continue
  33. }
  34. if p.Action != _insertAct && p.Action != _updateAct {
  35. msg.Commit()
  36. log.Error("[RoomInfoDataBus]Action Invalid error(%v)", p.Action)
  37. continue
  38. }
  39. //判断是否是关注or昵称变更,如果是则跳过,顺便解出新旧map
  40. isAttentionUpdate, oldMap, newMap, err := isAttentionChange(p.Action, p.Old, p.New)
  41. if err != nil {
  42. msg.Commit()
  43. log.Error("[RoomInfoDataBus]isAttentionChange,json.Unmarshal(old:%s, new:%s) error(%v)", string(p.Old), string(p.New), err)
  44. continue
  45. }
  46. if isAttentionUpdate {
  47. msg.Commit()
  48. log.Error("[RoomInfoDataBus]attention change pass")
  49. continue
  50. }
  51. //hash chan
  52. if newMap == nil || newMap.RoomId <= 0 {
  53. msg.Commit()
  54. log.Error("[RoomInfoDataBus]roomId type conversion error, roomId:%+v", newMap)
  55. continue
  56. }
  57. dataMap := new(model.DataMap)
  58. dataMap.Action = p.Action
  59. dataMap.Table = p.Table
  60. dataMap.New = newMap
  61. dataMap.Old = oldMap
  62. m.object = dataMap
  63. log.Info("[RoomInfoDataBus]roomInfoNotifyConsumeProc key:%s partition:%d offset:%d", msg.Key, msg.Partition, msg.Offset)
  64. s.binLogMergeChan[newMap.RoomId%s.c.Group.RoomInfo.Num] <- m
  65. }
  66. }
  67. func isAttentionChange(action string, old []byte, new []byte) (bool, *model.TableField, *model.TableField, error) {
  68. newMap := &model.TableField{}
  69. oldMap := &model.TableField{}
  70. err := json.Unmarshal(new, newMap)
  71. if err != nil {
  72. return false, oldMap, newMap, err
  73. }
  74. if action == _updateAct {
  75. err := json.Unmarshal(old, oldMap)
  76. if err != nil {
  77. return false, oldMap, newMap, err
  78. }
  79. if oldMap != nil && oldMap.Attentions != newMap.Attentions {
  80. return true, oldMap, newMap, err
  81. }
  82. }
  83. if action == _insertAct {
  84. oldMap = nil
  85. }
  86. return false, oldMap, newMap, err
  87. }
  88. func (s *Service) roomInfoNotifyHandleProc(c chan *message) {
  89. defer s.waiterChan.Done()
  90. for {
  91. msgData, ok := <-c
  92. if !ok {
  93. log.Error("[RoomInfoDataBus]roomInfoNotifyHandleProc closed")
  94. return
  95. }
  96. msgData.data.Commit()
  97. p, assertOk := msgData.object.(*model.DataMap)
  98. if !assertOk {
  99. log.Error("[RoomInfoDataBus]roomInfoNotifyHandleProc msg object type conversion error, msg:%+v", msgData)
  100. return
  101. }
  102. uid := p.New.Uid
  103. wg := errgroup.Group{}
  104. uName := ""
  105. fc := 0
  106. areaInfo := &roomV1.AreaGetDetailResp_AreaInfo{}
  107. wg.Go(func() (err error) {
  108. userInfo, err := s.getMultiUserInfo(uid)
  109. if err == nil && userInfo != nil && userInfo.Uname != "" {
  110. uName = userInfo.Uname
  111. }
  112. return
  113. })
  114. //fc任何错误都要返回,不然fc为0无法判断是接口返回0还是初始化的0!!!!
  115. wg.Go(func() (err error) {
  116. fc, err = s.getFc(uid)
  117. return
  118. })
  119. wg.Go(func() (err error) {
  120. areaInfo, err = s.getAreaV2Detail(p.New.AreaV2Id)
  121. return
  122. })
  123. err := wg.Wait()
  124. //成功返回则替换,否则输出原数据
  125. ret, retByte := s.generateSearchInfo(p.Action, p.Table, p.New, p.Old)
  126. if err == nil {
  127. if uName != "" {
  128. ret["new"].(map[string]interface{})["uname"] = uName
  129. retByte["uname"] = []byte(uName)
  130. }
  131. if areaInfo != nil && areaInfo.Name != "" {
  132. ret["new"].(map[string]interface{})["s_category"] = areaInfo.Name
  133. retByte["s_category"] = []byte(areaInfo.Name)
  134. }
  135. ret["new"].(map[string]interface{})["attentions"] = fc
  136. ret["new"].(map[string]interface{})["attention"] = fc
  137. retByte["attentions"] = []byte(strconv.Itoa(fc))
  138. retByte["attention"] = []byte(strconv.Itoa(fc))
  139. }
  140. writeWg := errgroup.Group{}
  141. writeWg.Go(func() (err error) {
  142. for i := 0; i < _retry; i++ {
  143. hbaseErr := s.saveHBase(context.TODO(), s.rowKey(p.New.RoomId), retByte)
  144. err = hbaseErr
  145. if hbaseErr != nil {
  146. continue
  147. }
  148. break
  149. }
  150. if err != nil {
  151. log.Error("[RoomInfoDataBus]fail to write hbase, msg:(%v), err:(%v)", p, err)
  152. }
  153. return
  154. })
  155. writeWg.Go(func() (err error) {
  156. err = s.dao.Pub(context.TODO(), int64(p.New.RoomId), ret)
  157. if err != nil {
  158. log.Error("[RoomInfoDataBus]fail to pub, msg:(%v), err:(%v)", p, err)
  159. }
  160. return
  161. })
  162. wg.Wait()
  163. log.Info("[RoomInfoDataBus]success handle, error(%v),msg:(%v)", err, ret)
  164. }
  165. }