user.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496
  1. package dao
  2. import (
  3. "context"
  4. "fmt"
  5. "strconv"
  6. "strings"
  7. "time"
  8. "unsafe"
  9. recsys "go-common/app/service/bbq/recsys/api/grpc/v1"
  10. "go-common/app/service/bbq/recsys/dao/parallel"
  11. "go-common/app/service/bbq/recsys/model"
  12. "go-common/library/cache/redis"
  13. "go-common/library/log"
  14. "github.com/Dai0522/workpool"
  15. "github.com/json-iterator/go"
  16. )
  17. //user const
  18. const (
  19. TaskLastPage = "TaskLastPage"
  20. TaskLastUpsPage = "TaskLastUpsPage"
  21. TaskBiliUserProfile = "TaskBiliUserProfile"
  22. TaskBBQUserProfile = "TaskBBQUserProfile"
  23. TaskBBQDeviceProfile = "TaskBBQDeviceProfile"
  24. TaskUserLike = "TaskUserLike"
  25. TaskUserLikeYesterday = "TaskUserLikeYesterday"
  26. TaskUserPlay = "TaskUserPlay"
  27. TaskUserPlayYesterday = "TaskUserPlayYesterday"
  28. TaskDevicePlay = "TaskDevicePlay"
  29. TaskDevicePlayYesterday = "TaskDevicePlayYesterday"
  30. TaskUserFollow = "TaskUserFollow"
  31. TaskUserFollowYesterday = "TaskUserFollowYesterday"
  32. //_BBQDeviceProfileKey = "bbq:device:profile:%s"
  33. _BBQDeviceProfileKey = "bbq:device:profile:{buvid}:%s"
  34. _BBQUserProfileKey = "bbq:user:profile:%d"
  35. _BiliUserProfileKey = "bbq:user:basic:%d"
  36. _LastFewPageRecords1 = "bbq:last:v1:mid:%d"
  37. _LastFewPageRecords2 = "bbq:last:v1:buvid:%s"
  38. _LastFewUpsPageRecords1 = "bbq:last:v1:ups:mid:%d"
  39. _LastFewUpsPageRecords2 = "bbq:last:v1:ups:buvid:%s"
  40. _RealTimeUserLike = "storm:v2:u:%d:like:%s"
  41. _RealTimeUserPlayMID = "storm:v2:u:%d:%s:view:100"
  42. _RealTimeUserPlayBuvID = "storm:v2:u:%s:%s:view:100"
  43. _RealTimeUserFollow = "storm:v2:u:%d:%s:follow:100"
  44. _ModelTest = "bbq:model:init"
  45. _Zone = "zone"
  46. _Tag = "tag"
  47. _Up = "up"
  48. )
  49. //LastPageRedisKey for main rec process
  50. func (d *Dao) LastPageRedisKey(mid int64, buvid string) (key string) {
  51. if mid > 0 {
  52. key = fmt.Sprintf(_LastFewPageRecords1, mid)
  53. } else {
  54. key = fmt.Sprintf(_LastFewPageRecords2, buvid)
  55. }
  56. return
  57. }
  58. //LastUpsPageRedisKey for ups rec process
  59. func (d *Dao) LastUpsPageRedisKey(mid int64, buvid string) (key string) {
  60. if mid > 0 {
  61. key = fmt.Sprintf(_LastFewUpsPageRecords1, mid)
  62. } else {
  63. key = fmt.Sprintf(_LastFewUpsPageRecords2, buvid)
  64. }
  65. return
  66. }
  67. //InitModel ...
  68. func (d *Dao) InitModel(c context.Context, weights map[string]float64) (err error) {
  69. conn := d.redis.Get(c)
  70. defer conn.Close()
  71. key := _ModelTest
  72. if result, err := redis.String(conn.Do("GET", key)); err == nil {
  73. for _, field := range strings.Split(result, ",") {
  74. featureWeightPair := strings.Split(field, ":")
  75. if len(featureWeightPair) >= 2 {
  76. feature := featureWeightPair[0]
  77. weight, _ := strconv.ParseFloat(featureWeightPair[1], 64)
  78. weights[feature] = weight
  79. }
  80. }
  81. }
  82. return
  83. }
  84. //StoreRecResults store rec or upsRec history according to getKeyFunc
  85. func (d *Dao) StoreRecResults(c context.Context, u *model.UserProfile, mid int64, buvid string, response *recsys.RecsysResponse, getKeyFunc func(int64, string) string, lastRecords []model.Record4Dup) (err error) {
  86. conn := d.redis.Get(c)
  87. defer conn.Close()
  88. key := getKeyFunc(mid, buvid)
  89. maxPageNum := 10
  90. size := len(response.List)
  91. if len(lastRecords) > maxPageNum*size {
  92. lastRecords = lastRecords[size:]
  93. }
  94. for _, record := range response.List {
  95. svid := record.Svid
  96. mid, ok1 := record.Map[model.UperMid]
  97. tag, ok2 := record.Map[model.ScatterTag]
  98. if ok1 && ok2 {
  99. lastRecords = append(lastRecords, model.Record4Dup{
  100. SVID: svid,
  101. MID: mid,
  102. Tag: tag,
  103. })
  104. }
  105. }
  106. bytes, _ := jsoniter.Marshal(lastRecords)
  107. _, err = conn.Do("SETEX", key, 86400, bytes)
  108. if err != nil {
  109. log.Error("store last few records error: ", err)
  110. }
  111. ////for test
  112. //if mid == 28272030 || mid == 390642849 {
  113. // return
  114. //}
  115. // write bloomfilter for es
  116. svids := make([]uint64, len(response.List))
  117. for i, v := range response.List {
  118. svids[i] = uint64(v.Svid)
  119. }
  120. if _, bfErr := d.WriteBF(c, mid, buvid, svids); bfErr != nil {
  121. log.Errorv(c, log.KV("Write BF error: ", bfErr))
  122. }
  123. return
  124. }
  125. //InitUserProfile ...
  126. func (d *Dao) InitUserProfile(c context.Context, mid int64, buvid string) (u *model.UserProfile) {
  127. u = &model.UserProfile{
  128. Mid: mid,
  129. Buvid: buvid,
  130. Name: "",
  131. Gender: -1,
  132. ViewVideos: []int64{},
  133. Zones1: map[string]float64{},
  134. BiliTags: map[string]float64{}, //bili
  135. Zones2: map[string]float64{}, //bili
  136. FollowUps: map[int64]int64{}, //bili
  137. BBQTags: map[string]float64{}, //bbq
  138. BBQZones: map[string]float64{}, //bbq
  139. BBQPrefUps: map[int64]int64{}, //bbq
  140. BBQFollowAction: map[int64]int64{}, //bbq
  141. BBQFollow: map[int64]int64{}, //bbq
  142. BBQBlack: map[int64]int64{}, //bbq
  143. PosVideos: map[int64]int64{},
  144. NegVideos: map[int64]int64{},
  145. LikeVideos: map[int64]int64{},
  146. LikeTags: map[string]float64{},
  147. LikeTagIDs: map[int64]int64{},
  148. LikeUPs: map[int64]int64{},
  149. PosTagIDs: map[int64]int64{},
  150. NegTagIDs: map[int64]int64{},
  151. PosTags: map[string]float64{},
  152. NegTags: map[string]float64{},
  153. LastRecords: []model.Record4Dup{},
  154. }
  155. return
  156. }
  157. //LoadUserProfile load user info from redis parallel
  158. func (d *Dao) LoadUserProfile(c context.Context, mid int64, buvid string) (userProfile *model.UserProfile, err error) {
  159. tasks := make(map[string]workpool.Task)
  160. userProfile = d.InitUserProfile(c, mid, buvid)
  161. // lastPage
  162. if mid != 0 || buvid != "" {
  163. taskName := TaskLastPage
  164. key := fmt.Sprintf(_LastFewPageRecords2, buvid)
  165. if mid != 0 {
  166. key = fmt.Sprintf(_LastFewPageRecords1, mid)
  167. }
  168. task := parallel.NewRedisTaskWithName(&c, taskName, d.redis, "GET", key)
  169. tasks[taskName] = task
  170. }
  171. if mid != 0 || buvid != "" {
  172. taskName := TaskLastUpsPage
  173. key := fmt.Sprintf(_LastFewUpsPageRecords2, buvid)
  174. if mid != 0 {
  175. key = fmt.Sprintf(_LastFewUpsPageRecords1, mid)
  176. }
  177. task := parallel.NewRedisTaskWithName(&c, taskName, d.redis, "GET", key)
  178. tasks[taskName] = task
  179. }
  180. // user profile bili
  181. if mid != 0 {
  182. taskName := TaskBiliUserProfile
  183. key := fmt.Sprintf(_BiliUserProfileKey, mid)
  184. task := parallel.NewRedisTaskWithName(&c, taskName, d.redis, "HGETALL", key)
  185. tasks[taskName] = task
  186. }
  187. // user profile bbq: mid
  188. if mid != 0 {
  189. taskName := TaskBBQUserProfile
  190. key := fmt.Sprintf(_BBQUserProfileKey, mid)
  191. task := parallel.NewRedisTaskWithName(&c, taskName, d.redis, "HGETALL", key)
  192. tasks[taskName] = task
  193. }
  194. // user profile bbq: buvid
  195. if mid == 0 && buvid != "" {
  196. taskName := TaskBBQDeviceProfile
  197. key := fmt.Sprintf(_BBQDeviceProfileKey, buvid)
  198. task := parallel.NewRedisTask(&c, d.redis, "HGETALL", key)
  199. tasks[taskName] = task
  200. }
  201. // user real time like
  202. today := time.Now().Format("20060102")
  203. yesterday := time.Now().AddDate(0, 0, -1).Format("20060102")
  204. if mid != 0 {
  205. taskName := TaskUserLike
  206. key := fmt.Sprintf(_RealTimeUserLike, mid, today)
  207. task := parallel.NewRedisTask(&c, d.redis, "HGETALL", key)
  208. tasks[taskName] = task
  209. }
  210. if mid != 0 {
  211. taskName := TaskUserLikeYesterday
  212. key := fmt.Sprintf(_RealTimeUserLike, mid, yesterday)
  213. task := parallel.NewRedisTask(&c, d.redis, "HGETALL", key)
  214. tasks[taskName] = task
  215. }
  216. if mid != 0 {
  217. taskName := TaskUserFollow
  218. key := fmt.Sprintf(_RealTimeUserFollow, mid, today)
  219. task := parallel.NewRedisTask(&c, d.redis, "HGETALL", key)
  220. tasks[taskName] = task
  221. }
  222. if mid != 0 {
  223. taskName := TaskUserFollowYesterday
  224. key := fmt.Sprintf(_RealTimeUserFollow, mid, yesterday)
  225. task := parallel.NewRedisTask(&c, d.redis, "HGETALL", key)
  226. tasks[taskName] = task
  227. }
  228. if mid != 0 {
  229. taskName := TaskUserPlay
  230. key := fmt.Sprintf(_RealTimeUserPlayMID, mid, today)
  231. task := parallel.NewRedisTask(&c, d.redis, "HGETALL", key)
  232. tasks[taskName] = task
  233. }
  234. if mid != 0 {
  235. taskName := TaskUserPlayYesterday
  236. key := fmt.Sprintf(_RealTimeUserPlayMID, mid, yesterday)
  237. task := parallel.NewRedisTask(&c, d.redis, "HGETALL", key)
  238. tasks[taskName] = task
  239. }
  240. if mid == 0 && buvid != "" {
  241. taskName := TaskDevicePlay
  242. key := fmt.Sprintf(_RealTimeUserPlayBuvID, buvid, today)
  243. task := parallel.NewRedisTask(&c, d.redis, "HGETALL", key)
  244. tasks[taskName] = task
  245. }
  246. if mid == 0 && buvid != "" {
  247. taskName := TaskDevicePlayYesterday
  248. key := fmt.Sprintf(_RealTimeUserPlayBuvID, buvid, yesterday)
  249. task := parallel.NewRedisTask(&c, d.redis, "HGETALL", key)
  250. tasks[taskName] = task
  251. }
  252. ftTasks := d.parallelTask2(tasks)
  253. for name, task := range ftTasks {
  254. var raw *[]byte
  255. raw, err = task.Wait(100 * time.Millisecond)
  256. if err != nil && err != redis.ErrNil {
  257. log.Errorv(c, log.KV("REDIS_GET_ERROR", err))
  258. continue
  259. }
  260. if raw == nil {
  261. continue
  262. }
  263. switch name {
  264. case TaskLastPage:
  265. setLastPage(raw, userProfile, "lastRecords")
  266. case TaskLastUpsPage:
  267. setLastPage(raw, userProfile, "lastUpsRecords")
  268. case TaskBiliUserProfile:
  269. setUserProfileBili(raw, err, userProfile)
  270. case TaskBBQDeviceProfile:
  271. setUserProfileBBQ(raw, err, userProfile)
  272. case TaskBBQUserProfile:
  273. setUserProfileBBQ(raw, err, userProfile)
  274. case TaskUserLikeYesterday:
  275. setUserLikeInfo(raw, err, userProfile)
  276. case TaskUserLike:
  277. setUserLikeInfo(raw, err, userProfile)
  278. case TaskUserFollowYesterday:
  279. setUserFollowInfo(raw, err, userProfile)
  280. case TaskUserFollow:
  281. setUserFollowInfo(raw, err, userProfile)
  282. case TaskUserPlayYesterday:
  283. setUserPlayInfo(raw, err, userProfile)
  284. case TaskDevicePlayYesterday:
  285. setUserPlayInfo(raw, err, userProfile)
  286. case TaskUserPlay:
  287. setUserPlayInfo(raw, err, userProfile)
  288. case TaskDevicePlay:
  289. setUserPlayInfo(raw, err, userProfile)
  290. }
  291. }
  292. if err == redis.ErrNil {
  293. err = nil
  294. }
  295. return
  296. }
  297. func setUserProfileBBQ(bytes *[]byte, inErr error, u *model.UserProfile) (err error) {
  298. var res map[string]string
  299. if res, err = redis.StringMap(*(*interface{})(unsafe.Pointer(bytes)), inErr); err != nil {
  300. if err == redis.ErrNil {
  301. err = nil
  302. } else {
  303. log.Error("redis HGETALL failed error(%v)", err)
  304. }
  305. }
  306. for key, value := range res {
  307. if key == _Zone {
  308. zone2s := strings.Split(value, ",")
  309. for _, zone2 := range zone2s {
  310. u.BBQZones[zone2] = 1.0
  311. }
  312. } else if key == _Tag {
  313. tags := strings.Split(value, ",")
  314. for _, tag := range tags {
  315. u.BBQTags[tag] = 1.0
  316. }
  317. } else if key == _Up {
  318. ups := strings.Split(value, ",")
  319. for _, upStr := range ups {
  320. upMID, _ := strconv.ParseInt(upStr, 10, 64)
  321. u.BBQPrefUps[upMID] = 1
  322. }
  323. }
  324. }
  325. return
  326. }
  327. func setUserProfileBili(bytes *[]byte, inErr error, u *model.UserProfile) {
  328. var res map[string]string
  329. var err error
  330. if res, err = redis.StringMap(*(*interface{})(unsafe.Pointer(bytes)), inErr); err != nil {
  331. if err == redis.ErrNil {
  332. err = nil
  333. } else {
  334. log.Error("redis HGETALL failed error(%v)", err)
  335. }
  336. }
  337. for key, value := range res {
  338. if key == _Zone {
  339. zone2s := strings.Split(value, ",")
  340. for _, zone2 := range zone2s {
  341. u.Zones2[zone2] = 1.0
  342. }
  343. } else if key == _Tag {
  344. tags := strings.Split(value, ",")
  345. for _, tag := range tags {
  346. u.BiliTags[tag] = 1.0
  347. }
  348. } else if key == _Up {
  349. ups := strings.Split(value, ",")
  350. for _, upStr := range ups {
  351. upMID, _ := strconv.ParseInt(upStr, 10, 64)
  352. u.FollowUps[upMID] = 1
  353. }
  354. }
  355. }
  356. }
  357. func setUserLikeInfo(bytes *[]byte, inErr error, u *model.UserProfile) {
  358. var object struct {
  359. SVID int64 `json:"svid"`
  360. CTime int64 `json:"ctime"`
  361. BuvID string `json:"buvid"`
  362. }
  363. var res map[string]string
  364. var err error
  365. if res, err = redis.StringMap(*(*interface{})(unsafe.Pointer(bytes)), inErr); err != nil {
  366. if err != redis.ErrNil {
  367. log.Error("redis HGETALL failed error(%v)", err)
  368. }
  369. }
  370. for _, value := range res {
  371. err = jsoniter.UnmarshalFromString(value, &object)
  372. if err != nil {
  373. log.Error("json parse error: %v", err)
  374. }
  375. u.LikeVideos[object.SVID] = object.CTime
  376. }
  377. }
  378. func setUserFollowInfo(bytes *[]byte, inErr error, u *model.UserProfile) {
  379. var object struct {
  380. UpID int64 `json:"upid"`
  381. CTime int64 `json:"ctime"`
  382. MID int64 `json:"mid"`
  383. }
  384. var res map[string]string
  385. var err error
  386. if res, err = redis.StringMap(*(*interface{})(unsafe.Pointer(bytes)), inErr); err != nil {
  387. if err != redis.ErrNil {
  388. log.Error("user real time follow redis HGETALL failed error(%v)", err)
  389. }
  390. }
  391. for _, value := range res {
  392. err = jsoniter.UnmarshalFromString(value, &object)
  393. if err != nil {
  394. log.Error("json parse error: %v", err)
  395. }
  396. u.BBQFollowAction[object.UpID] = object.CTime
  397. }
  398. }
  399. func setUserPlayInfo(bytes *[]byte, inErr error, u *model.UserProfile) {
  400. var object struct {
  401. Svid int64 `json:"svid"`
  402. CTime int64 `json:"ctime"`
  403. Duration int64 `json:"duration"`
  404. ViewDuration int64 `json:"viewDuration"`
  405. }
  406. var res map[string]string
  407. var err error
  408. if res, err = redis.StringMap(*(*interface{})(unsafe.Pointer(bytes)), inErr); err != nil {
  409. if err != redis.ErrNil {
  410. log.Error("redis HGETALL failed error(%v)", err)
  411. } else {
  412. err = nil
  413. }
  414. }
  415. for _, value := range res {
  416. err = jsoniter.UnmarshalFromString(value, &object)
  417. if err != nil {
  418. log.Error("json parse error: %v", err)
  419. continue
  420. }
  421. u.ViewVideos = append(u.ViewVideos, object.Svid)
  422. if object.ViewDuration >= 15000 || (object.Duration >= 5000 && float64(object.ViewDuration) >= 0.95*float64(object.Duration)) {
  423. u.PosVideos[object.Svid] = object.CTime
  424. }
  425. if object.ViewDuration <= 500 {
  426. u.NegVideos[object.Svid] = object.CTime
  427. }
  428. }
  429. }
  430. func setLastPage(bytes *[]byte, u *model.UserProfile, lastRecordType string) {
  431. var results []model.Record4Dup
  432. if len(*bytes) == 0 {
  433. return
  434. }
  435. err := jsoniter.Unmarshal(*bytes, &results)
  436. if err != nil {
  437. log.Error("UnmarshalFromString value(%v) error(%v)", bytes, err)
  438. } else {
  439. if lastRecordType == "lastRecords" {
  440. u.LastRecords = results
  441. } else {
  442. u.LastUpsRecords = results
  443. }
  444. }
  445. }