uname_notify.go 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "go-common/app/job/live/push-search/model"
  6. "go-common/library/log"
  7. "go-common/library/sync/errgroup"
  8. "strconv"
  9. )
  10. func (s *Service) unameNotifyConsumeProc() {
  11. defer s.waiter.Done()
  12. for {
  13. msg, ok := <-s.dao.UserNameDataBus.Messages()
  14. if !ok {
  15. log.Error("unameNotifyConsumeProc closed")
  16. if err := s.dao.UserNameDataBus.Close(); err != nil {
  17. log.Error("s.dao.UserNameDataBus.Close() error(%v)", err)
  18. }
  19. return
  20. }
  21. //先提交防止阻塞,关闭时等待任务执行完
  22. m := &message{data: msg}
  23. raw := new(model.LiveDatabus)
  24. if err := json.Unmarshal(msg.Value, raw); err != nil {
  25. msg.Commit()
  26. log.Error("[UnameDataBus]json.Unmarshal(%s) error(%v)", string(msg.Value), err)
  27. continue
  28. }
  29. p := new(model.UnameNotifyInfo)
  30. if err := json.Unmarshal([]byte(raw.MsgContent), p); err != nil {
  31. msg.Commit()
  32. log.Error("[UnameDataBus]json.Unmarshal(%s) error(%v)", raw.MsgContent, err)
  33. continue
  34. }
  35. m.object = p
  36. s.unameMergeChan[p.Uid%int64(s.c.Group.UserInfo.Num)] <- m
  37. }
  38. }
  39. func (s *Service) unameNotifyHandleProc(c chan *message) {
  40. defer s.waiterChan.Done()
  41. for {
  42. msgData, ok := <-c
  43. if !ok {
  44. log.Error("[UnameDataBus]unameNotifyHandleProc closed")
  45. return
  46. }
  47. //先提交防止阻塞,关闭时等待任务执行完
  48. msgData.data.Commit()
  49. p, assertOk := msgData.object.(*model.UnameNotifyInfo)
  50. if !assertOk {
  51. log.Error("[UnameDataBus]unameNotifyHandleProc msg object type conversion error, msg:%+v", msgData)
  52. return
  53. }
  54. uid := p.Uid
  55. if uid == 0 {
  56. log.Error("[UnameDataBus]empty uid, uid:%d", uid)
  57. continue
  58. }
  59. fc := 0
  60. newMap := &model.TableField{}
  61. wg := errgroup.Group{}
  62. wg.Go(func() (err error) {
  63. fc, err = s.getFc(uid)
  64. return
  65. })
  66. wg.Go(func() (err error) {
  67. roomInfo, err := s.getBaseRoomInfo(uid)
  68. if err == nil && roomInfo != nil {
  69. newMap.RoomId = int(roomInfo.Roomid)
  70. newMap.ShortId = int(roomInfo.ShortId)
  71. newMap.Uid = roomInfo.Uid
  72. newMap.UName = roomInfo.Uname
  73. newMap.Area = int(roomInfo.Area)
  74. newMap.Title = roomInfo.Title
  75. newMap.Tag = roomInfo.Tags
  76. newMap.TryTime = roomInfo.TryTime
  77. newMap.Cover = roomInfo.Cover
  78. newMap.UserCover = roomInfo.UserCover
  79. newMap.LockStatus = roomInfo.LockStatus
  80. newMap.HiddenStatus = roomInfo.HiddenStatus
  81. newMap.Attentions = int(roomInfo.Attentions)
  82. newMap.Online = int(roomInfo.Online)
  83. newMap.LiveTime = roomInfo.LiveTime
  84. newMap.AreaV2Id = int(roomInfo.AreaV2Id)
  85. newMap.AreaV2ParentId = int(roomInfo.AreaV2ParentId)
  86. newMap.Virtual = int(roomInfo.Virtual)
  87. newMap.AreaV2Name = roomInfo.AreaV2Name
  88. newMap.CTime = roomInfo.Ctime
  89. newMap.MTime = roomInfo.Mtime
  90. newMap.RoundStatus = int(roomInfo.RoundStatus)
  91. newMap.OnFlag = int(roomInfo.OnFlag)
  92. }
  93. return
  94. })
  95. err := wg.Wait()
  96. if err == nil && newMap.RoomId != 0 {
  97. //非uname更新
  98. if p.Uname == newMap.UName {
  99. log.Info("[UnameDataBus]uname no change, msg:(%v)", p)
  100. continue
  101. }
  102. ret, retByte := s.generateSearchInfo("update", _tableArchive, newMap, nil)
  103. if p.Uname != "" {
  104. ret["new"].(map[string]interface{})["uname"] = p.Uname
  105. retByte["uname"] = []byte(p.Uname)
  106. }
  107. ret["new"].(map[string]interface{})["attentions"] = fc
  108. ret["new"].(map[string]interface{})["attention"] = fc
  109. retByte["attentions"] = []byte(strconv.Itoa(fc))
  110. retByte["attention"] = []byte(strconv.Itoa(fc))
  111. ret["old"].(map[string]interface{})["uname"] = ""
  112. wg := errgroup.Group{}
  113. wg.Go(func() (err error) {
  114. for i := 0; i < _retry; i++ {
  115. hbaseErr := s.saveHBase(context.TODO(), s.rowKey(newMap.RoomId), retByte)
  116. err = hbaseErr
  117. if hbaseErr != nil {
  118. continue
  119. }
  120. break
  121. }
  122. if err != nil {
  123. log.Error("[UnameDataBus]fail to write hbase, msg:(%v), err:(%v)", p, err)
  124. }
  125. return
  126. })
  127. wg.Go(func() (err error) {
  128. err = s.dao.Pub(context.TODO(), int64(newMap.RoomId), ret)
  129. if err != nil {
  130. log.Error("[UnameDataBus]fail to pub, msg:(%v), err:(%v)", p, err)
  131. }
  132. return
  133. })
  134. wg.Wait()
  135. log.Info("[UnameDataBus]success to handle, error(%v), msg:(%v)", err, ret)
  136. continue
  137. }
  138. log.Error("[UnameDataBus]fail to getData, error(%v),msg:(%v)", err, p)
  139. }
  140. }