room_feature.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204
  1. package dao
  2. import (
  3. "context"
  4. "errors"
  5. "regexp"
  6. "sort"
  7. "sync"
  8. "sync/atomic"
  9. "time"
  10. "go-common/app/service/live/recommend/internal/conf"
  11. relationV1 "go-common/app/service/live/relation/api/liverpc/v1"
  12. roomV1 "go-common/app/service/live/room/api/liverpc/v1"
  13. roomV2 "go-common/app/service/live/room/api/liverpc/v2"
  14. "go-common/library/log"
  15. "go-common/library/sync/errgroup"
  16. )
  17. var roomFeatureValue atomic.Value
  18. var recDefaultRoomIds atomic.Value
  19. // StartRoomFeatureJob 更新在线房间的特征信息
  20. func StartRoomFeatureJob(c *conf.Config) {
  21. t := time.Tick(time.Second * 30)
  22. refreshRoomFeature(context.Background(), c)
  23. for range t {
  24. refreshRoomFeature(context.Background(), c)
  25. }
  26. }
  27. func refreshRoomFeature(ctx context.Context, c *conf.Config) (err error) {
  28. n := 20
  29. currentIds, ok := onlineRoomIdValue.Load().(map[int64]struct{})
  30. if !ok {
  31. log.Warn("cannot load current online room ids")
  32. err = errors.New("cannot load current online room ids")
  33. return
  34. }
  35. keys := make([]int64, 0, len(currentIds))
  36. for k := range currentIds {
  37. keys = append(keys, k)
  38. }
  39. chunkIdsArray := sliceArray(keys, n)
  40. roomFeatures := map[int64][]int64{}
  41. var lock sync.Mutex
  42. var eg errgroup.Group
  43. for _, tmp := range chunkIdsArray {
  44. chunkIds := tmp
  45. eg.Go(func() (err error) {
  46. resp, err := RoomAPI.V2Room.GetByIds(ctx, &roomV2.RoomGetByIdsReq{Ids: chunkIds})
  47. if err != nil || resp.GetCode() != 0 {
  48. log.Error("dao.RoomAPI.V2Room.GetByIds (%v) error(%v) resp(%v)", chunkIds, err, resp)
  49. return
  50. }
  51. resp1, err1 := RoomAPI.V1RoomPendant.GetPendantByIds(ctx, &roomV1.RoomPendantGetPendantByIdsReq{Ids: chunkIds, Type: "mobile_index_badge", Position: 2})
  52. if err1 != nil || resp1.GetCode() != 0 {
  53. log.Error("dao.RoomAPI.V1Room.GetPendantByIds (%v) error(%v) resp(%v)", chunkIds, err1, resp1)
  54. return
  55. }
  56. uids := make([]int64, 0, n)
  57. for _, r := range resp.Data {
  58. uids = append(uids, r.Uid)
  59. }
  60. resp2, err2 := RelationAPI.V1Feed.GetUserFcBatch(ctx, &relationV1.FeedGetUserFcBatchReq{Uids: uids})
  61. if err2 != nil || resp.GetCode() != 0 {
  62. log.Error("dao.RelationAPI.V1Relation.GetUserFcBatch (%v) error(%v) resp(%v)", chunkIds, err2, resp2)
  63. return
  64. }
  65. roomPendantInfo := resp1.Data.Result
  66. fansCountInfo := resp2.Data
  67. for roomId, r := range resp.Data {
  68. cornerTag := ""
  69. fansNum := int64(0)
  70. if PendantInfo, ok := roomPendantInfo[roomId]; ok && PendantInfo != nil {
  71. cornerTag = PendantInfo.Value
  72. }
  73. if fans, ok := fansCountInfo[r.Uid]; ok {
  74. fansNum = fans.Fc
  75. }
  76. featureVector := createFeature(c, r.AreaV2Id, cornerTag, fansNum, r.Online)
  77. lock.Lock()
  78. roomFeatures[roomId] = featureVector
  79. lock.Unlock()
  80. }
  81. return
  82. })
  83. }
  84. eg.Wait()
  85. roomFeatureValue.Store(roomFeatures)
  86. //创建默认推荐房间列表
  87. roomScoreSlice := ScoreSlice{}
  88. for roomId, vec := range roomFeatures {
  89. featureVector := make([]int64, len(vec))
  90. copy(featureVector, vec)
  91. featureVector[0] = 0
  92. counter := Counter{roomId: roomId, score: calcScore(makeWeightVec(c), featureVector)}
  93. roomScoreSlice = append(roomScoreSlice, counter)
  94. }
  95. sort.Sort(roomScoreSlice)
  96. //默认的召回源
  97. limit := 400
  98. recDefault := make([]int64, 0, limit)
  99. for _, counter := range roomScoreSlice {
  100. limit = limit - 1
  101. if limit < 0 {
  102. break
  103. }
  104. recDefault = append(recDefault, counter.roomId)
  105. }
  106. recDefaultRoomIds.Store(recDefault)
  107. log.Info("refreshRoomFeature success, total num:%d recDefault_num:%d, recDefault:%+v", len(roomFeatures), len(recDefault), recDefault)
  108. return
  109. }
  110. //建立房间相关的特征向量
  111. func createFeature(c *conf.Config, areaV2Id int64, cornerTag string, fansNum int64, onlineValue int64) (featureVector []int64) {
  112. fansMilestone := c.CommonFeature.FansNum.Values
  113. onlineMilestone := c.CommonFeature.Online.Values
  114. cornerSignList := c.CommonFeature.CornerSign.Values
  115. featureVector = append(featureVector, areaV2Id) //分区id, 留待在线计算的时候替换成0,1
  116. featureVector = append(featureVector, oneHotEncode(fansNum, fansMilestone)...)
  117. featureVector = append(featureVector, oneHotTextEncode(cornerTag, cornerSignList)...)
  118. featureVector = append(featureVector, oneHotEncode(onlineValue, onlineMilestone)...)
  119. return
  120. }
  121. //把slice按大小切成多个等大的小slice(除了最后一块)
  122. func sliceArray(arr []int64, n int) (ret [][]int64) {
  123. remainder := len(arr) % n
  124. quotient := (len(arr) - remainder) / n
  125. num := int(quotient)
  126. if remainder > 0 {
  127. num = num + 1
  128. }
  129. ret = make([][]int64, 0, num)
  130. for i := 0; i < num; i++ {
  131. if i < num-1 {
  132. ret = append(ret, arr[n*i:n*(i+1)])
  133. } else {
  134. ret = append(ret, arr[n*i:])
  135. }
  136. }
  137. return
  138. }
  139. //构建0,1组成的特征向量; 如果x<0, 返回全为0的向量
  140. func int2Slice(x int, n int) []int64 {
  141. p := make([]int64, n)
  142. if x < 0 {
  143. return p
  144. }
  145. p[x] = 1
  146. return p
  147. }
  148. func compAndSet(value int64, vList []int64) int {
  149. place := 0
  150. for _, v := range vList {
  151. if value < v {
  152. return place
  153. }
  154. place = place + 1
  155. }
  156. return place
  157. }
  158. func oneHotEncode(value int64, milestone []int64) []int64 {
  159. place := compAndSet(value, milestone)
  160. return int2Slice(place, len(milestone)+1)
  161. }
  162. // textList ["", A, B ]
  163. // 如果targetText空或者没匹配到 ret[0] = 1
  164. func oneHotTextEncode(targetText string, textList []string) (ret []int64) {
  165. place := 0
  166. ret = make([]int64, len(textList))
  167. if targetText == "" {
  168. ret[0] = 1
  169. return
  170. }
  171. for i, text := range textList {
  172. if text == "" {
  173. continue
  174. }
  175. match, err := regexp.MatchString(text, targetText)
  176. if err != nil {
  177. log.Error("oneHotTextEncode regex error " + text)
  178. place = 0
  179. break
  180. }
  181. if match {
  182. place = i
  183. break
  184. }
  185. }
  186. ret[place] = 1
  187. return
  188. }