dao.go 11 KB


  1. package dao
  2. import (
  3. "context"
  4. "fmt"
  5. "sort"
  6. "strconv"
  7. "strings"
  8. "time"
  9. "github.com/pkg/errors"
  10. "go-common/app/service/live/recommend/internal/conf"
  11. "go-common/app/service/live/recommend/recconst"
  12. relation_api "go-common/app/service/live/relation/api/liverpc"
  13. room_api "go-common/app/service/live/room/api/liverpc"
  14. "go-common/library/cache/redis"
  15. "go-common/library/log"
  16. "go-common/library/net/rpc/liverpc"
  17. )
  // Redis key templates used by this package; each is expanded with fmt.Sprintf.
  var (
  	// _userRecCandidateKey is the per-user candidate list key (argument: uid).
  	_userRecCandidateKey = "rec_candidate_%d"
  	// _recommendOffsetKey is the per-user read-offset key (argument: uid).
  	_recommendOffsetKey = "rec_offset_%d"
  	// _recommendedKey is the pool of rooms already recommended, keyed by
  	// user and date (arguments: uid, day formatted as "20060102").
  	_recommendedKey = "recommended_%d_%s"
  )
  22. // RoomAPI room liverpc client
  23. var RoomAPI *room_api.Client
  24. // RelationAPI relation liverpc client
  25. var RelationAPI *relation_api.Client
  26. // Dao dao
  27. type Dao struct {
  28. c *conf.Config
  29. redis *redis.Pool
  30. }
  31. func init() {
  32. RoomAPI = room_api.New(getConf("room"))
  33. RelationAPI = relation_api.New(getConf("relation"))
  34. }
  35. func getConf(appName string) *liverpc.ClientConfig {
  36. c := conf.Conf.LiveRpc
  37. if c != nil {
  38. return c[appName]
  39. }
  40. return nil
  41. }
  42. // ClearRecommend 清空该用户相关的推荐缓存
  43. func (d *Dao) ClearRecommend(ctx context.Context, uid int64) error {
  44. candidateKey := fmt.Sprintf(_userRecCandidateKey, uid)
  45. recommendedKey := fmt.Sprintf(_recommendedKey, uid, time.Now().Format("20060102"))
  46. offsetKey := fmt.Sprintf(_recommendOffsetKey, uid)
  47. conn := d.redis.Get(ctx)
  48. defer conn.Close()
  49. _, err := conn.Do("DEL", candidateKey, recommendedKey, offsetKey)
  50. return errors.WithStack(err)
  51. }
  52. // New init mysql db
  53. func New(c *conf.Config) (dao *Dao) {
  54. dao = &Dao{
  55. c: c,
  56. redis: redis.NewPool(c.Redis),
  57. }
  58. return
  59. }
  60. // Close close the resource.
  61. func (d *Dao) Close() {
  62. d.redis.Close()
  63. }
  64. func (d *Dao) saveOffset(conn redis.Conn, uid int64, offset int) {
  65. conn.Do("SETEX", fmt.Sprintf(_recommendOffsetKey, uid), 86400, offset)
  66. }
  67. func (d *Dao) addToRecommended(conn redis.Conn, uid int64, ids []int64) {
  68. if len(ids) == 0 {
  69. return
  70. }
  71. day := time.Now().Format("20060102")
  72. key := fmt.Sprintf(_recommendedKey, uid, day)
  73. var is []interface{}
  74. is = append(is, key)
  75. for _, id := range ids {
  76. is = append(is, id)
  77. }
  78. conn.Send("EXPIRE", key, 86400)
  79. conn.Send("SADD", is...)
  80. conn.Flush()
  81. conn.Receive()
  82. _, err := conn.Receive()
  83. if err != nil {
  84. log.Info("addToRecommended error +%v", err)
  85. }
  86. }
  87. // GetRandomRoomIds 随机获取count个推荐
  88. // 如果总数量total比count小,则返回total个
  89. func (d *Dao) GetRandomRoomIds(ctx context.Context, uid int64, reqCount int, existRoomIDs []int64) (ret []int64, err error) {
  90. if reqCount == 0 {
  91. return
  92. }
  93. var (
  94. candidateLen int
  95. )
  96. r := d.redis.Get(ctx)
  97. defer r.Close()
  98. candidateKey := fmt.Sprintf(_userRecCandidateKey, uid)
  99. exists, err := redis.Int(r.Do("exists", candidateKey))
  100. if err != nil {
  101. err = errors.WithStack(err)
  102. return
  103. }
  104. existMap := map[int64]struct{}{}
  105. for _, id := range existRoomIDs {
  106. existMap[id] = struct{}{}
  107. }
  108. if exists == 0 {
  109. var candidate []int64
  110. var currentOffset = 0
  111. candidate, err = d.generateLrCandidateList(r, uid, candidateKey)
  112. if err != nil {
  113. return
  114. }
  115. Loop:
  116. for len(ret) < reqCount && currentOffset < len(candidate) {
  117. var tmp []int64
  118. if len(candidate)-currentOffset < int(reqCount) {
  119. tmp = candidate[currentOffset:]
  120. } else {
  121. tmp = candidate[currentOffset : currentOffset+reqCount]
  122. }
  123. //去重
  124. for _, id := range tmp {
  125. _, ok := existMap[id]
  126. currentOffset += 1
  127. if !ok {
  128. ret = append(ret, id)
  129. if len(ret) >= int(reqCount) {
  130. break Loop
  131. }
  132. }
  133. }
  134. }
  135. d.addToRecommended(r, uid, ret)
  136. d.saveOffset(r, uid, currentOffset)
  137. } else {
  138. candidateLen, err = redis.Int(r.Do("LLEN", candidateKey))
  139. if err != nil {
  140. return
  141. }
  142. var offset int
  143. offset, _ = redis.Int(r.Do("GET", fmt.Sprintf(_recommendOffsetKey, uid)))
  144. if offset > (candidateLen - 1) {
  145. return
  146. }
  147. var currentOffset = offset
  148. Loop2:
  149. for len(ret) < reqCount && currentOffset < candidateLen {
  150. var ids []int64
  151. ids, err = redis.Int64s(r.Do("LRANGE", candidateKey, currentOffset, currentOffset+reqCount-1))
  152. if err != nil {
  153. err = errors.WithStack(err)
  154. return
  155. }
  156. // 去重
  157. for _, id := range ids {
  158. currentOffset++
  159. _, ok := existMap[id]
  160. if !ok {
  161. ret = append(ret, id)
  162. if len(ret) >= int(reqCount) {
  163. break Loop2
  164. }
  165. }
  166. }
  167. if len(ids) == 0 {
  168. log.Error("Cannot get recommend candidate, key=%s, offset=%d, count=%d", candidateKey, offset, reqCount)
  169. break
  170. }
  171. }
  172. d.addToRecommended(r, uid, ret)
  173. d.saveOffset(r, uid, currentOffset)
  174. }
  175. return
  176. }
  177. // GetLrRecRoomIds 在GetRandomRoomIds的基础上进行LR计算并返回倒排的房间号列表
  178. // 与GetRandomRoomIds有相同的输入输出结构
  179. func (d *Dao) GetLrRecRoomIds(r redis.Conn, uid int64, candidateIds []int64) (ret []int64, err error) {
  180. var areas string
  181. areaIds := map[int64]struct{}{}
  182. areas, err = redis.String(r.Do("GET", fmt.Sprintf(recconst.UserAreaKey, uid)))
  183. if err != nil && err != redis.ErrNil {
  184. log.Error("redis GET error: %v", err)
  185. return
  186. }
  187. err = nil
  188. if areas != "" {
  189. split := strings.Split(areas, ";")
  190. for _, areaIdStr := range split {
  191. areaId, _ := strconv.ParseInt(areaIdStr, 10, 64)
  192. areaIds[areaId] = struct{}{}
  193. }
  194. }
  195. weightVector := makeWeightVec(d.c)
  196. roomFeatures, ok := roomFeatureValue.Load().(map[int64][]int64)
  197. if !ok {
  198. ret = candidateIds
  199. return
  200. }
  201. roomScoreSlice := ScoreSlice{}
  202. for _, roomId := range candidateIds {
  203. if fv, ok := roomFeatures[roomId]; ok {
  204. featureVector := make([]int64, len(fv))
  205. copy(featureVector, fv)
  206. areaId := featureVector[0]
  207. if _, ok := areaIds[areaId]; ok {
  208. featureVector[0] = 1
  209. } else {
  210. featureVector[0] = 0
  211. }
  212. counter := Counter{roomId: roomId, score: calcScore(weightVector, featureVector)}
  213. roomScoreSlice = append(roomScoreSlice, counter)
  214. }
  215. }
  216. sort.Sort(roomScoreSlice)
  217. for _, counter := range roomScoreSlice {
  218. ret = append(ret, counter.roomId)
  219. }
  220. return
  221. }
  222. // generateCandidateList 得到候选集
  223. func (d *Dao) generateCandidateList(r redis.Conn, uid int64, candidateKey string) (ret []int64, err error) {
  224. // 第一步 itemcf,优先级最高。
  225. itemCFKey := fmt.Sprintf(recconst.UserItemCFRecKey, uid)
  226. var itemCFList []int64
  227. itemCFList, err = redis.Int64s(r.Do("ZREVRANGE", itemCFKey, 0, -1))
  228. if err != nil {
  229. err = errors.WithStack(err)
  230. return
  231. }
  232. itemCFOnlineIds := d.FilterOnlineRoomIds(itemCFList)
  233. if len(itemCFOnlineIds) == 0 {
  234. log.Info("No item-cf room online for user, uid=%d, before online filter room ids: %+v", uid, itemCFList)
  235. }
  236. // 第二步 取兴趣分区的房间 人气超过100的房间
  237. var areas string
  238. areas, err = redis.String(r.Do("GET", fmt.Sprintf(recconst.UserAreaKey, uid)))
  239. if err != nil && err != redis.ErrNil {
  240. err = errors.WithStack(err)
  241. return
  242. }
  243. err = nil
  244. var areaRoomIDs []int64
  245. if areas != "" {
  246. split := strings.Split(areas, ";")
  247. for _, areaIdStr := range split {
  248. areaId, _ := strconv.ParseInt(areaIdStr, 10, 64)
  249. var ids = d.getAreaRoomIds(areaId)
  250. areaRoomIDs = append(areaRoomIDs, ids...)
  251. }
  252. }
  253. // 第三步 取兴趣分区大分区的100个 先不做
  254. // 第四步 减去已经推荐过的
  255. day := time.Now().Format("20060102")
  256. var recommendedList []int64
  257. edKey := fmt.Sprintf(_recommendedKey, uid, day)
  258. recommendedList, err = redis.Int64s(r.Do("SMEMBERS", edKey))
  259. if err != nil {
  260. err = errors.WithStack(err)
  261. return
  262. }
  263. recommended := map[int64]struct{}{}
  264. for _, id := range recommendedList {
  265. recommended[id] = struct{}{}
  266. }
  267. var itemCFFinalIDs []int64
  268. for _, id := range itemCFOnlineIds {
  269. _, exist := recommended[id]
  270. if !exist {
  271. itemCFFinalIDs = append(itemCFFinalIDs, id)
  272. }
  273. }
  274. var areaRoomFinalIDs []int64
  275. for _, id := range areaRoomIDs {
  276. _, exist := recommended[id]
  277. if !exist {
  278. areaRoomFinalIDs = append(areaRoomFinalIDs, id)
  279. }
  280. }
  281. ret = mergeArr(itemCFFinalIDs, areaRoomFinalIDs)
  282. log.Info("UserRecommend : uid=%d total=%d, "+
  283. "itemcf.original=%d, itemcf.online=%d, itemcf.noviewd=%d, "+
  284. "areaRoom.original=%d, itemcf.noviewd=%d viewed=%d",
  285. uid, len(ret), len(itemCFList), len(itemCFOnlineIds), len(itemCFFinalIDs),
  286. len(areaRoomIDs), len(areaRoomFinalIDs), len(recommendedList))
  287. return
  288. }
  289. // generateCandidateList 得到进过LR的候选集
  290. func (d *Dao) generateLrCandidateList(r redis.Conn, uid int64, candidateKey string) (ret []int64, err error) {
  291. roomIDs, err := d.generateCandidateList(r, uid, candidateKey)
  292. if err != nil {
  293. log.Error("generateLrCandidateList failed 1, error:%v", err)
  294. return
  295. }
  296. if len(ret) > 0 {
  297. ret, err = d.GetLrRecRoomIds(r, uid, roomIDs)
  298. if err != nil {
  299. log.Error("generateLrCandidateList failed 2, error:%v", err)
  300. return
  301. }
  302. }
  303. // 召回源不足的情况下补足推荐房间数
  304. if len(ret) < 150 {
  305. ids, ok := recDefaultRoomIds.Load().([]int64)
  306. if !ok {
  307. return
  308. }
  309. ret1, err1 := d.GetLrRecRoomIds(r, uid, ids)
  310. if err1 != nil {
  311. log.Error("generateLrCandidateList failed 3, error:%v", err1)
  312. return
  313. }
  314. ret = mergeArrWithOrder(ret, ret1, 150) // TODO:当前ret1的结果是没有过滤掉今天看过的房间的, 看后面是否需要优化
  315. }
  316. {
  317. for _, roomID := range ret {
  318. r.Send("RPUSH", candidateKey, roomID)
  319. }
  320. r.Send("EXPIRE", candidateKey, 60*2)
  321. err = r.Flush()
  322. if err != nil {
  323. err = errors.WithStack(err)
  324. return
  325. }
  326. for i := 0; i < len(ret)+1; i++ {
  327. r.Receive()
  328. }
  329. }
  330. return
  331. }
  332. // Ping dao ping
  333. func (d *Dao) Ping(ctx context.Context) (err error) {
  334. conn := d.redis.Get(ctx)
  335. defer conn.Close()
  336. _, err = conn.Do("ping")
  337. if err != nil {
  338. err = errors.Wrap(err, "dao Ping err")
  339. }
  340. return err
  341. }
  // Counter pairs a room with its score; it is the element type of ScoreSlice.
  type Counter struct {
  	roomId int64   // room identifier
  	score  float32 // score assigned to the room
  }

  // ScoreSlice is a slice of Counter that implements sort.Interface and orders
  // its elements by descending score.
  type ScoreSlice []Counter

  // Len reports the number of elements.
  func (s ScoreSlice) Len() int { return len(s) }

  // Swap exchanges the elements at positions i and j.
  func (s ScoreSlice) Swap(i, j int) {
  	tmp := s[i]
  	s[i] = s[j]
  	s[j] = tmp
  }

  // Less reports whether element i must sort before element j, i.e. whether
  // its score is strictly higher.
  func (s ScoreSlice) Less(i, j int) bool {
  	return s[i].score > s[j].score
  }
  // calcScore returns the dot product of weightVector and featureVector.
  //
  // The two vectors must have the same length; a mismatch is treated as a
  // configuration or logic error and panics (the message says that the number
  // of weights and the number of features do not match).
  func calcScore(weightVector []float32, featureVector []int64) (score float32) {
  	if len(weightVector) != len(featureVector) {
  		panic(fmt.Sprintf("权重数量和特征数量不匹配, 请检查配置或逻辑, weight: %+v, feature: %+v", weightVector, featureVector))
  	}
  	// Lengths are equal here, so indexing featureVector by i is safe.
  	for i, weight := range weightVector {
  		score += weight * float32(featureVector[i])
  	}
  	return
  }
  // min returns the smaller of x and y.
  func min(x int, y int) int {
  	if y < x {
  		return y
  	}
  	return x
  }
  // mergeArr returns the union of x and y without duplicates.
  //
  // The result is deterministic: IDs appear in order of first occurrence, all
  // of x before the IDs that only y contributes, so the priority of the first
  // source is kept. It is nil when both inputs are empty.
  func mergeArr(x []int64, y []int64) (ret []int64) {
  	total := len(x) + len(y)
  	if total == 0 {
  		return
  	}
  	seen := make(map[int64]struct{}, total)
  	ret = make([]int64, 0, total)
  	for _, ids := range [][]int64{x, y} {
  		for _, id := range ids {
  			if _, dup := seen[id]; dup {
  				continue
  			}
  			seen[id] = struct{}{}
  			ret = append(ret, id)
  		}
  	}
  	return
  }
  // mergeArrWithOrder concatenates x and then the entries of y that are not
  // already present, keeping their order, until the result holds limit entries.
  // When x already has at least limit entries, x itself is returned unchanged.
  func mergeArrWithOrder(x []int64, y []int64, limit int) (ret []int64) {
  	if limit <= len(x) {
  		return x
  	}

  	seen := map[int64]struct{}{}
  	for _, id := range x {
  		seen[id] = struct{}{}
  	}

  	ret = append(ret, x...)
  	for _, id := range y {
  		if _, dup := seen[id]; dup {
  			continue
  		}
  		seen[id] = struct{}{}
  		ret = append(ret, id)
  		if len(ret) >= limit {
  			break
  		}
  	}
  	return
  }
  412. func makeWeightVec(c *conf.Config) (ret []float32) {
  413. ret = append(ret, c.CommonFeature.UserAreaInterest.Weights...)
  414. ret = append(ret, c.CommonFeature.FansNum.Weights...)
  415. ret = append(ret, c.CommonFeature.CornerSign.Weights...)
  416. ret = append(ret, c.CommonFeature.Online.Weights...)
  417. return
  418. }