123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496 |
- package dao
- import (
- "context"
- "fmt"
- "strconv"
- "strings"
- "time"
- "unsafe"
- recsys "go-common/app/service/bbq/recsys/api/grpc/v1"
- "go-common/app/service/bbq/recsys/dao/parallel"
- "go-common/app/service/bbq/recsys/model"
- "go-common/library/cache/redis"
- "go-common/library/log"
- "github.com/Dai0522/workpool"
- "github.com/json-iterator/go"
- )
- //user const
- const (
- TaskLastPage = "TaskLastPage"
- TaskLastUpsPage = "TaskLastUpsPage"
- TaskBiliUserProfile = "TaskBiliUserProfile"
- TaskBBQUserProfile = "TaskBBQUserProfile"
- TaskBBQDeviceProfile = "TaskBBQDeviceProfile"
- TaskUserLike = "TaskUserLike"
- TaskUserLikeYesterday = "TaskUserLikeYesterday"
- TaskUserPlay = "TaskUserPlay"
- TaskUserPlayYesterday = "TaskUserPlayYesterday"
- TaskDevicePlay = "TaskDevicePlay"
- TaskDevicePlayYesterday = "TaskDevicePlayYesterday"
- TaskUserFollow = "TaskUserFollow"
- TaskUserFollowYesterday = "TaskUserFollowYesterday"
- //_BBQDeviceProfileKey = "bbq:device:profile:%s"
- _BBQDeviceProfileKey = "bbq:device:profile:{buvid}:%s"
- _BBQUserProfileKey = "bbq:user:profile:%d"
- _BiliUserProfileKey = "bbq:user:basic:%d"
- _LastFewPageRecords1 = "bbq:last:v1:mid:%d"
- _LastFewPageRecords2 = "bbq:last:v1:buvid:%s"
- _LastFewUpsPageRecords1 = "bbq:last:v1:ups:mid:%d"
- _LastFewUpsPageRecords2 = "bbq:last:v1:ups:buvid:%s"
- _RealTimeUserLike = "storm:v2:u:%d:like:%s"
- _RealTimeUserPlayMID = "storm:v2:u:%d:%s:view:100"
- _RealTimeUserPlayBuvID = "storm:v2:u:%s:%s:view:100"
- _RealTimeUserFollow = "storm:v2:u:%d:%s:follow:100"
- _ModelTest = "bbq:model:init"
- _Zone = "zone"
- _Tag = "tag"
- _Up = "up"
- )
- //LastPageRedisKey for main rec process
- func (d *Dao) LastPageRedisKey(mid int64, buvid string) (key string) {
- if mid > 0 {
- key = fmt.Sprintf(_LastFewPageRecords1, mid)
- } else {
- key = fmt.Sprintf(_LastFewPageRecords2, buvid)
- }
- return
- }
- //LastUpsPageRedisKey for ups rec process
- func (d *Dao) LastUpsPageRedisKey(mid int64, buvid string) (key string) {
- if mid > 0 {
- key = fmt.Sprintf(_LastFewUpsPageRecords1, mid)
- } else {
- key = fmt.Sprintf(_LastFewUpsPageRecords2, buvid)
- }
- return
- }
- //InitModel ...
- func (d *Dao) InitModel(c context.Context, weights map[string]float64) (err error) {
- conn := d.redis.Get(c)
- defer conn.Close()
- key := _ModelTest
- if result, err := redis.String(conn.Do("GET", key)); err == nil {
- for _, field := range strings.Split(result, ",") {
- featureWeightPair := strings.Split(field, ":")
- if len(featureWeightPair) >= 2 {
- feature := featureWeightPair[0]
- weight, _ := strconv.ParseFloat(featureWeightPair[1], 64)
- weights[feature] = weight
- }
- }
- }
- return
- }
- //StoreRecResults store rec or upsRec history according to getKeyFunc
- func (d *Dao) StoreRecResults(c context.Context, u *model.UserProfile, mid int64, buvid string, response *recsys.RecsysResponse, getKeyFunc func(int64, string) string, lastRecords []model.Record4Dup) (err error) {
- conn := d.redis.Get(c)
- defer conn.Close()
- key := getKeyFunc(mid, buvid)
- maxPageNum := 10
- size := len(response.List)
- if len(lastRecords) > maxPageNum*size {
- lastRecords = lastRecords[size:]
- }
- for _, record := range response.List {
- svid := record.Svid
- mid, ok1 := record.Map[model.UperMid]
- tag, ok2 := record.Map[model.ScatterTag]
- if ok1 && ok2 {
- lastRecords = append(lastRecords, model.Record4Dup{
- SVID: svid,
- MID: mid,
- Tag: tag,
- })
- }
- }
- bytes, _ := jsoniter.Marshal(lastRecords)
- _, err = conn.Do("SETEX", key, 86400, bytes)
- if err != nil {
- log.Error("store last few records error: ", err)
- }
- ////for test
- //if mid == 28272030 || mid == 390642849 {
- // return
- //}
- // write bloomfilter for es
- svids := make([]uint64, len(response.List))
- for i, v := range response.List {
- svids[i] = uint64(v.Svid)
- }
- if _, bfErr := d.WriteBF(c, mid, buvid, svids); bfErr != nil {
- log.Errorv(c, log.KV("Write BF error: ", bfErr))
- }
- return
- }
- //InitUserProfile ...
- func (d *Dao) InitUserProfile(c context.Context, mid int64, buvid string) (u *model.UserProfile) {
- u = &model.UserProfile{
- Mid: mid,
- Buvid: buvid,
- Name: "",
- Gender: -1,
- ViewVideos: []int64{},
- Zones1: map[string]float64{},
- BiliTags: map[string]float64{}, //bili
- Zones2: map[string]float64{}, //bili
- FollowUps: map[int64]int64{}, //bili
- BBQTags: map[string]float64{}, //bbq
- BBQZones: map[string]float64{}, //bbq
- BBQPrefUps: map[int64]int64{}, //bbq
- BBQFollowAction: map[int64]int64{}, //bbq
- BBQFollow: map[int64]int64{}, //bbq
- BBQBlack: map[int64]int64{}, //bbq
- PosVideos: map[int64]int64{},
- NegVideos: map[int64]int64{},
- LikeVideos: map[int64]int64{},
- LikeTags: map[string]float64{},
- LikeTagIDs: map[int64]int64{},
- LikeUPs: map[int64]int64{},
- PosTagIDs: map[int64]int64{},
- NegTagIDs: map[int64]int64{},
- PosTags: map[string]float64{},
- NegTags: map[string]float64{},
- LastRecords: []model.Record4Dup{},
- }
- return
- }
- //LoadUserProfile load user info from redis parallel
- func (d *Dao) LoadUserProfile(c context.Context, mid int64, buvid string) (userProfile *model.UserProfile, err error) {
- tasks := make(map[string]workpool.Task)
- userProfile = d.InitUserProfile(c, mid, buvid)
- // lastPage
- if mid != 0 || buvid != "" {
- taskName := TaskLastPage
- key := fmt.Sprintf(_LastFewPageRecords2, buvid)
- if mid != 0 {
- key = fmt.Sprintf(_LastFewPageRecords1, mid)
- }
- task := parallel.NewRedisTaskWithName(&c, taskName, d.redis, "GET", key)
- tasks[taskName] = task
- }
- if mid != 0 || buvid != "" {
- taskName := TaskLastUpsPage
- key := fmt.Sprintf(_LastFewUpsPageRecords2, buvid)
- if mid != 0 {
- key = fmt.Sprintf(_LastFewUpsPageRecords1, mid)
- }
- task := parallel.NewRedisTaskWithName(&c, taskName, d.redis, "GET", key)
- tasks[taskName] = task
- }
- // user profile bili
- if mid != 0 {
- taskName := TaskBiliUserProfile
- key := fmt.Sprintf(_BiliUserProfileKey, mid)
- task := parallel.NewRedisTaskWithName(&c, taskName, d.redis, "HGETALL", key)
- tasks[taskName] = task
- }
- // user profile bbq: mid
- if mid != 0 {
- taskName := TaskBBQUserProfile
- key := fmt.Sprintf(_BBQUserProfileKey, mid)
- task := parallel.NewRedisTaskWithName(&c, taskName, d.redis, "HGETALL", key)
- tasks[taskName] = task
- }
- // user profile bbq: buvid
- if mid == 0 && buvid != "" {
- taskName := TaskBBQDeviceProfile
- key := fmt.Sprintf(_BBQDeviceProfileKey, buvid)
- task := parallel.NewRedisTask(&c, d.redis, "HGETALL", key)
- tasks[taskName] = task
- }
- // user real time like
- today := time.Now().Format("20060102")
- yesterday := time.Now().AddDate(0, 0, -1).Format("20060102")
- if mid != 0 {
- taskName := TaskUserLike
- key := fmt.Sprintf(_RealTimeUserLike, mid, today)
- task := parallel.NewRedisTask(&c, d.redis, "HGETALL", key)
- tasks[taskName] = task
- }
- if mid != 0 {
- taskName := TaskUserLikeYesterday
- key := fmt.Sprintf(_RealTimeUserLike, mid, yesterday)
- task := parallel.NewRedisTask(&c, d.redis, "HGETALL", key)
- tasks[taskName] = task
- }
- if mid != 0 {
- taskName := TaskUserFollow
- key := fmt.Sprintf(_RealTimeUserFollow, mid, today)
- task := parallel.NewRedisTask(&c, d.redis, "HGETALL", key)
- tasks[taskName] = task
- }
- if mid != 0 {
- taskName := TaskUserFollowYesterday
- key := fmt.Sprintf(_RealTimeUserFollow, mid, yesterday)
- task := parallel.NewRedisTask(&c, d.redis, "HGETALL", key)
- tasks[taskName] = task
- }
- if mid != 0 {
- taskName := TaskUserPlay
- key := fmt.Sprintf(_RealTimeUserPlayMID, mid, today)
- task := parallel.NewRedisTask(&c, d.redis, "HGETALL", key)
- tasks[taskName] = task
- }
- if mid != 0 {
- taskName := TaskUserPlayYesterday
- key := fmt.Sprintf(_RealTimeUserPlayMID, mid, yesterday)
- task := parallel.NewRedisTask(&c, d.redis, "HGETALL", key)
- tasks[taskName] = task
- }
- if mid == 0 && buvid != "" {
- taskName := TaskDevicePlay
- key := fmt.Sprintf(_RealTimeUserPlayBuvID, buvid, today)
- task := parallel.NewRedisTask(&c, d.redis, "HGETALL", key)
- tasks[taskName] = task
- }
- if mid == 0 && buvid != "" {
- taskName := TaskDevicePlayYesterday
- key := fmt.Sprintf(_RealTimeUserPlayBuvID, buvid, yesterday)
- task := parallel.NewRedisTask(&c, d.redis, "HGETALL", key)
- tasks[taskName] = task
- }
- ftTasks := d.parallelTask2(tasks)
- for name, task := range ftTasks {
- var raw *[]byte
- raw, err = task.Wait(100 * time.Millisecond)
- if err != nil && err != redis.ErrNil {
- log.Errorv(c, log.KV("REDIS_GET_ERROR", err))
- continue
- }
- if raw == nil {
- continue
- }
- switch name {
- case TaskLastPage:
- setLastPage(raw, userProfile, "lastRecords")
- case TaskLastUpsPage:
- setLastPage(raw, userProfile, "lastUpsRecords")
- case TaskBiliUserProfile:
- setUserProfileBili(raw, err, userProfile)
- case TaskBBQDeviceProfile:
- setUserProfileBBQ(raw, err, userProfile)
- case TaskBBQUserProfile:
- setUserProfileBBQ(raw, err, userProfile)
- case TaskUserLikeYesterday:
- setUserLikeInfo(raw, err, userProfile)
- case TaskUserLike:
- setUserLikeInfo(raw, err, userProfile)
- case TaskUserFollowYesterday:
- setUserFollowInfo(raw, err, userProfile)
- case TaskUserFollow:
- setUserFollowInfo(raw, err, userProfile)
- case TaskUserPlayYesterday:
- setUserPlayInfo(raw, err, userProfile)
- case TaskDevicePlayYesterday:
- setUserPlayInfo(raw, err, userProfile)
- case TaskUserPlay:
- setUserPlayInfo(raw, err, userProfile)
- case TaskDevicePlay:
- setUserPlayInfo(raw, err, userProfile)
- }
- }
- if err == redis.ErrNil {
- err = nil
- }
- return
- }
- func setUserProfileBBQ(bytes *[]byte, inErr error, u *model.UserProfile) (err error) {
- var res map[string]string
- if res, err = redis.StringMap(*(*interface{})(unsafe.Pointer(bytes)), inErr); err != nil {
- if err == redis.ErrNil {
- err = nil
- } else {
- log.Error("redis HGETALL failed error(%v)", err)
- }
- }
- for key, value := range res {
- if key == _Zone {
- zone2s := strings.Split(value, ",")
- for _, zone2 := range zone2s {
- u.BBQZones[zone2] = 1.0
- }
- } else if key == _Tag {
- tags := strings.Split(value, ",")
- for _, tag := range tags {
- u.BBQTags[tag] = 1.0
- }
- } else if key == _Up {
- ups := strings.Split(value, ",")
- for _, upStr := range ups {
- upMID, _ := strconv.ParseInt(upStr, 10, 64)
- u.BBQPrefUps[upMID] = 1
- }
- }
- }
- return
- }
- func setUserProfileBili(bytes *[]byte, inErr error, u *model.UserProfile) {
- var res map[string]string
- var err error
- if res, err = redis.StringMap(*(*interface{})(unsafe.Pointer(bytes)), inErr); err != nil {
- if err == redis.ErrNil {
- err = nil
- } else {
- log.Error("redis HGETALL failed error(%v)", err)
- }
- }
- for key, value := range res {
- if key == _Zone {
- zone2s := strings.Split(value, ",")
- for _, zone2 := range zone2s {
- u.Zones2[zone2] = 1.0
- }
- } else if key == _Tag {
- tags := strings.Split(value, ",")
- for _, tag := range tags {
- u.BiliTags[tag] = 1.0
- }
- } else if key == _Up {
- ups := strings.Split(value, ",")
- for _, upStr := range ups {
- upMID, _ := strconv.ParseInt(upStr, 10, 64)
- u.FollowUps[upMID] = 1
- }
- }
- }
- }
- func setUserLikeInfo(bytes *[]byte, inErr error, u *model.UserProfile) {
- var object struct {
- SVID int64 `json:"svid"`
- CTime int64 `json:"ctime"`
- BuvID string `json:"buvid"`
- }
- var res map[string]string
- var err error
- if res, err = redis.StringMap(*(*interface{})(unsafe.Pointer(bytes)), inErr); err != nil {
- if err != redis.ErrNil {
- log.Error("redis HGETALL failed error(%v)", err)
- }
- }
- for _, value := range res {
- err = jsoniter.UnmarshalFromString(value, &object)
- if err != nil {
- log.Error("json parse error: %v", err)
- }
- u.LikeVideos[object.SVID] = object.CTime
- }
- }
- func setUserFollowInfo(bytes *[]byte, inErr error, u *model.UserProfile) {
- var object struct {
- UpID int64 `json:"upid"`
- CTime int64 `json:"ctime"`
- MID int64 `json:"mid"`
- }
- var res map[string]string
- var err error
- if res, err = redis.StringMap(*(*interface{})(unsafe.Pointer(bytes)), inErr); err != nil {
- if err != redis.ErrNil {
- log.Error("user real time follow redis HGETALL failed error(%v)", err)
- }
- }
- for _, value := range res {
- err = jsoniter.UnmarshalFromString(value, &object)
- if err != nil {
- log.Error("json parse error: %v", err)
- }
- u.BBQFollowAction[object.UpID] = object.CTime
- }
- }
- func setUserPlayInfo(bytes *[]byte, inErr error, u *model.UserProfile) {
- var object struct {
- Svid int64 `json:"svid"`
- CTime int64 `json:"ctime"`
- Duration int64 `json:"duration"`
- ViewDuration int64 `json:"viewDuration"`
- }
- var res map[string]string
- var err error
- if res, err = redis.StringMap(*(*interface{})(unsafe.Pointer(bytes)), inErr); err != nil {
- if err != redis.ErrNil {
- log.Error("redis HGETALL failed error(%v)", err)
- } else {
- err = nil
- }
- }
- for _, value := range res {
- err = jsoniter.UnmarshalFromString(value, &object)
- if err != nil {
- log.Error("json parse error: %v", err)
- continue
- }
- u.ViewVideos = append(u.ViewVideos, object.Svid)
- if object.ViewDuration >= 15000 || (object.Duration >= 5000 && float64(object.ViewDuration) >= 0.95*float64(object.Duration)) {
- u.PosVideos[object.Svid] = object.CTime
- }
- if object.ViewDuration <= 500 {
- u.NegVideos[object.Svid] = object.CTime
- }
- }
- }
- func setLastPage(bytes *[]byte, u *model.UserProfile, lastRecordType string) {
- var results []model.Record4Dup
- if len(*bytes) == 0 {
- return
- }
- err := jsoniter.Unmarshal(*bytes, &results)
- if err != nil {
- log.Error("UnmarshalFromString value(%v) error(%v)", bytes, err)
- } else {
- if lastRecordType == "lastRecords" {
- u.LastRecords = results
- } else {
- u.LastUpsRecords = results
- }
- }
- }
|