|
- package service
- import (
- "context"
- "fmt"
- "sort"
- "strconv"
- "time"
- "go-common/app/interface/main/history/model"
- arcmdl "go-common/app/service/main/archive/model/archive"
- "go-common/library/ecode"
- "go-common/library/log"
- "go-common/library/net/metadata"
- )
- var (
- _countLimit = 50
- _emptyVideos = []*model.Video{}
- _emptyHis = []*model.History{}
- _emptyHismap = make(map[int64]*model.History)
- )
- func (s *Service) key(aid int64, typ int8) string {
- if typ < model.TypeArticle {
- return strconv.FormatInt(aid, 10)
- }
- return fmt.Sprintf("%d_%d", aid, typ)
- }
- // AddHistories batch update history.
- // +wd:ignore
- func (s *Service) AddHistories(c context.Context, mid int64, typ int8, ip string, hs []*model.History) (err error) {
- var (
- ok bool
- his, h *model.History
- hm2 map[string]*model.History
- hmc, res map[int64]*model.History
- expire = time.Now().Unix() - 60*60*24*90
- )
- if len(hs) > _countLimit {
- return ecode.TargetNumberLimit
- }
- s.serviceAdds(mid, hs)
- if hm2, err = s.historyDao.Map(c, mid); err != nil {
- return
- }
- if typ < model.TypeArticle {
- if hmc, err = s.historyDao.CacheMap(c, mid); err != nil {
- return err
- }
- }
- res = make(map[int64]*model.History)
- for _, his = range hs {
- if his.Unix < expire {
- continue
- }
- if len(hm2) > 0 {
- if h, ok = hm2[s.key(his.Aid, his.TP)]; ok && his.Unix < h.Unix {
- continue
- }
- }
- if len(hmc) > 0 {
- if h, ok = hmc[his.Aid]; ok && his.Unix < h.Unix {
- continue
- }
- }
- // TODO comment && merge
- res[his.Aid] = his
- }
- if err = s.historyDao.AddMap(c, mid, res); err == nil {
- return
- }
- if typ < model.TypeArticle {
- s.historyDao.AddCacheMap(c, mid, res)
- }
- return nil
- }
- // AddHistory add hisotry progress into hbase.
- func (s *Service) AddHistory(c context.Context, mid, rtime int64, h *model.History) (err error) {
- if h.TP < model.TypeUnknown || h.TP > model.TypeComic {
- err = ecode.RequestErr
- return
- }
- if h.Aid == 0 {
- return ecode.RequestErr
- }
- if h.TP == model.TypeBangumi || h.TP == model.TypeMovie || h.TP == model.TypePGC {
- msg := playPro{
- Type: h.TP,
- SubType: h.STP,
- Mid: mid,
- Sid: h.Sid,
- Epid: h.Epid,
- Cid: h.Cid,
- Progress: h.Pro,
- IP: metadata.String(c, metadata.RemoteIP),
- Ts: h.Unix,
- RealTime: rtime,
- }
- s.addPlayPro(&msg)
- }
- // NOTE if login user to save history
- if mid == 0 {
- return
- }
- return s.addHistory(c, mid, h)
- }
- // Progress get view progress from cache/hbase.
- func (s *Service) Progress(c context.Context, mid int64, aids []int64) (res map[int64]*model.History, err error) {
- if mid == 0 {
- res = _emptyHismap
- return
- }
- if s.migration(mid) {
- res, err = s.servicePosition(c, mid, model.BusinessByTP(model.TypeUGC), aids)
- if err == nil {
- return
- }
- }
- if res, _, err = s.historyDao.Cache(c, mid, aids); err != nil {
- return
- } else if len(res) == 0 {
- res = _emptyHismap
- }
- return
- }
- // Position get view progress from cache/hbase.
- func (s *Service) Position(c context.Context, mid int64, aid int64, typ int8) (res *model.History, err error) {
- if mid == 0 {
- err = ecode.NothingFound
- return
- }
- if s.migration(mid) {
- var hm map[int64]*model.History
- hm, err = s.servicePosition(c, mid, model.BusinessByTP(typ), []int64{aid})
- if err == nil && hm != nil {
- if res = hm[aid]; res == nil {
- err = ecode.NothingFound
- }
- return
- }
- }
- if typ < model.TypeArticle {
- var hm map[int64]*model.History
- hm, _, err = s.historyDao.Cache(c, mid, []int64{aid})
- if err != nil {
- return
- }
- if len(hm) > 0 {
- if h, ok := hm[aid]; ok {
- res = h
- }
- }
- if res == nil {
- err = ecode.NothingFound
- }
- return
- }
- var mhis map[string]*model.History
- mhis, err = s.historyDao.Map(c, mid)
- if err != nil {
- return
- }
- if len(mhis) > 0 {
- key := fmt.Sprintf("%d_%d", aid, typ)
- if h, ok := mhis[key]; ok {
- res = h
- }
- }
- if res == nil {
- err = ecode.NothingFound
- }
- return
- }
- // addHistory add new history into set.
- func (s *Service) addHistory(c context.Context, mid int64, h *model.History) (err error) {
- var cmd int64
- // note: the type is video to increase experience of user .
- if h.TP < model.TypeArticle {
- s.historyDao.PushFirstQueue(c, mid, h.Aid, h.Unix)
- }
- if cmd, err = s.Shadow(c, mid); err != nil {
- return
- }
- if cmd == model.ShadowOn {
- return
- }
- h.Mid = mid
- s.serviceAdd(h)
- // note: the type is video to redis`cache .
- if h.TP >= model.TypeArticle {
- s.addProPub(h)
- if !s.conf.History.Pub {
- err = s.historyDao.Add(c, mid, h)
- }
- return
- }
- // NOTE first view
- if h.Pro < 30 && h.Pro != -1 {
- h.Pro = 0
- }
- // NOTE after 30s
- if err = s.historyDao.AddCache(c, mid, h); err != nil {
- return
- }
- s.addMerge(mid, h.Unix)
- return
- }
- // ClearHistory clear user's historys.
- func (s *Service) ClearHistory(c context.Context, mid int64, tps []int8) (err error) {
- s.serviceClear(mid, tps)
- if len(tps) == 0 {
- s.historyDao.ClearCache(c, mid)
- err = s.historyDao.Clear(c, mid)
- s.userActionLog(mid, model.HistoryClear)
- return
- }
- tpsMap := make(map[int8]bool)
- for _, tp := range tps {
- tpsMap[tp] = true
- }
- var (
- histories map[string]*model.History
- dels []*model.History
- chis map[int64]*model.History
- aids []int64
- )
- if histories, err = s.historyDao.Map(c, mid); err != nil {
- return
- }
- chis, _ = s.historyDao.CacheMap(c, mid)
- for _, v := range chis {
- histories[s.key(v.Aid, v.TP)] = v
- }
- logMap := make(map[int8]struct{})
- for _, h := range histories {
- oldType := h.TP
- h.ConvertType()
- if tpsMap[h.TP] {
- if (h.TP == model.TypeUGC) || (h.TP == model.TypePGC) {
- aids = append(aids, h.Aid)
- }
- h.TP = oldType
- dels = append(dels, h)
- logMap[oldType] = struct{}{}
- }
- }
- if len(dels) == 0 {
- return
- }
- if len(aids) > 0 {
- s.historyDao.DelCache(c, mid, aids)
- }
- if err = s.historyDao.Delete(c, mid, dels); err != nil {
- return
- }
- for k := range logMap {
- s.userActionLog(mid, fmt.Sprintf(model.HistoryClearTyp, model.BusinessByTP(k)))
- }
- return
- }
- // DelHistory delete user's history del archive .
- // +wd:ignore
- func (s *Service) DelHistory(ctx context.Context, mid int64, aids []int64, typ int8) (err error) {
- if err = s.serviceDels(ctx, mid, aids, typ); err != nil {
- return
- }
- if err = s.historyDao.DelAids(ctx, mid, aids); err != nil {
- return
- }
- if typ >= model.TypeArticle {
- return
- }
- return s.historyDao.DelCache(ctx, mid, aids)
- }
- // Videos get videos of user view history.
- // +wd:ignore
- func (s *Service) Videos(c context.Context, mid int64, pn, ps int, typ int8) (res []*model.Video, err error) {
- var (
- arc *arcmdl.View3
- ok bool
- arcs map[int64]*arcmdl.View3
- video *model.Video
- history []*model.History
- his *model.History
- aids, epids []int64
- aidFavs map[int64]bool
- epban map[int64]*model.Bangumi
- )
- res = _emptyVideos
- mOK := s.migration(mid)
- if mOK {
- businesses := []string{model.BusinessByTP(model.TypeUGC), model.BusinessByTP(model.TypePGC)}
- history, epids, err = s.servicePnPsCursor(c, mid, businesses, pn, ps)
- }
- if !mOK || err != nil {
- if history, epids, err = s.histories(c, mid, pn, ps, true); err != nil {
- return
- }
- }
- if len(history) == 0 {
- return
- }
- for _, his = range history {
- aids = append(aids, his.Aid)
- }
- if typ >= model.TypeArticle {
- // TODO Article info .
- return
- }
- // bangumi info
- if len(epids) > 0 {
- epban = s.bangumi(c, mid, epids)
- }
- // archive info
- arcAids := &arcmdl.ArgAids2{Aids: aids}
- if arcs, err = s.arcRPC.Views3(c, arcAids); err != nil {
- return
- } else if len(arcs) == 0 {
- return
- }
- // favorite info
- aidFavs = s.favoriteds(c, mid, aids)
- res = make([]*model.Video, 0, len(aids))
- for _, his = range history {
- if arc, ok = arcs[his.Aid]; !ok || arc.Archive3 == nil {
- continue
- }
- // NOTE all no pay
- arc.Rights.Movie = 0
- video = &model.Video{
- Archive3: arc.Archive3,
- ViewAt: his.Unix,
- DT: his.DT,
- STP: his.STP,
- TP: his.TP,
- Progress: his.Pro,
- Count: len(arc.Pages),
- }
- if aidFavs != nil {
- video.Favorite = aidFavs[his.Aid]
- }
- for n, p := range arc.Pages {
- if p.Cid == his.Cid {
- p.Page = int32(n + 1)
- video.Page = p
- break
- }
- }
- if video.TP == model.TypeBangumi || video.TP == model.TypeMovie || video.TP == model.TypePGC {
- if epban != nil {
- video.BangumiInfo = epban[his.Epid]
- }
- video.Count = 0
- }
- res = append(res, video)
- }
- return
- }
- // AVHistories return the user all av history.
- // +wd:ignore
- func (s *Service) AVHistories(c context.Context, mid int64) (hs []*model.History, err error) {
- if s.migration(mid) {
- businesses := []string{model.BusinessByTP(model.TypeUGC), model.BusinessByTP(model.TypePGC)}
- hs, _, err = s.servicePnPsCursor(c, mid, businesses, 1, s.conf.History.Max)
- if err == nil {
- return
- }
- }
- hs, _, err = s.histories(c, mid, 1, s.conf.History.Max, true)
- return
- }
- // Histories return the user all av history.
- func (s *Service) Histories(c context.Context, mid int64, typ int8, pn, ps int) (res []*model.Resource, err error) {
- var hs []*model.History
- mOK := s.migration(mid)
- if mOK {
- var businesses []string
- if typ > 0 {
- businesses = []string{model.BusinessByTP(typ)}
- }
- hs, _, err = s.servicePnPsCursor(c, mid, businesses, pn, ps)
- }
- if !mOK || err != nil {
- if typ >= model.TypeArticle {
- hs, err = s.platformHistories(c, mid, typ, pn, ps)
- } else {
- hs, _, err = s.histories(c, mid, pn, ps, false)
- }
- }
- if err != nil {
- return
- }
- if len(hs) == 0 {
- return
- }
- for _, h := range hs {
- h.ConvertType()
- r := &model.Resource{
- Mid: h.Mid,
- Oid: h.Aid,
- Sid: h.Sid,
- Epid: h.Epid,
- Cid: h.Cid,
- DT: h.DT,
- Pro: h.Pro,
- Unix: h.Unix,
- TP: h.TP,
- STP: h.STP,
- }
- res = append(res, r)
- }
- return
- }
- // histories get aids of user view history
- func (s *Service) histories(c context.Context, mid int64, pn, ps int, onlyAV bool) (his []*model.History, epids []int64, err error) {
- var (
- size int
- ok bool
- e, h *model.History
- mhis map[string]*model.History
- chis, ehis map[int64]*model.History
- dhis []*model.History
- start = (pn - 1) * ps
- end = start + ps - 1
- total = s.conf.History.Total
- )
- if mhis, err = s.historyDao.Map(c, mid); err != nil {
- err = nil
- mhis = make(map[string]*model.History)
- }
- chis, _ = s.historyDao.CacheMap(c, mid)
- for _, v := range chis {
- mhis[s.key(v.Aid, v.TP)] = v
- }
- if len(mhis) == 0 {
- his = _emptyHis
- return
- }
- ehis = make(map[int64]*model.History, len(mhis))
- for _, h = range mhis {
- if onlyAV && h.TP >= model.TypeArticle {
- continue
- }
- if (h.TP == model.TypeBangumi || h.TP == model.TypeMovie || h.TP == model.TypePGC) && h.Sid != 0 {
- if e, ok = ehis[h.Sid]; !ok || h.Unix > e.Unix {
- ehis[h.Sid] = h
- if e != nil {
- dhis = append(dhis, e)
- }
- }
- } else {
- his = append(his, h)
- }
- }
- for _, h = range ehis {
- if h.Epid != 0 {
- epids = append(epids, h.Epid)
- }
- his = append(his, h)
- }
- sort.Sort(model.Histories(his))
- if size = len(his); size > total {
- dhis = append(dhis, his[total:]...)
- s.delChan.Do(c, func(ctx context.Context) {
- s.historyDao.Delete(ctx, mid, dhis)
- })
- his = his[:total]
- size = total
- }
- switch {
- case size > start && size > end:
- his = his[start : end+1]
- case size > start && size <= end:
- his = his[start:]
- default:
- his = _emptyHis
- }
- return
- }
- // platformHistories get aids of user view history
- func (s *Service) platformHistories(c context.Context, mid int64, typ int8, pn, ps int) (his []*model.History, err error) {
- var (
- size int
- h *model.History
- mhis map[string]*model.History
- start = (pn - 1) * ps
- end = start + ps - 1
- total = s.conf.History.Total
- )
- if mhis, err = s.historyDao.Map(c, mid); err != nil {
- return
- }
- if len(mhis) == 0 {
- his = _emptyHis
- return
- }
- for _, h = range mhis {
- if typ != h.TP {
- continue
- }
- his = append(his, h)
- }
- sort.Sort(model.Histories(his))
- if size = len(his); size > total {
- his = his[:total]
- size = total
- }
- switch {
- case size > start && size > end:
- his = his[start : end+1]
- case size > start && size <= end:
- his = his[start:]
- default:
- his = _emptyHis
- }
- return
- }
- // HistoryType get aids of user view history
- // +wd:ignore
- func (s *Service) HistoryType(c context.Context, mid int64, typ int8, oids []int64, ip string) (his []*model.History, err error) {
- var (
- mhis map[string]*model.History
- )
- if s.migration(mid) {
- his, err = s.serviceHistoryType(c, mid, model.BusinessByTP(typ), oids)
- if err == nil {
- return
- }
- }
- if mhis, err = s.historyDao.Map(c, mid); err != nil {
- return
- }
- if len(mhis) == 0 {
- his = _emptyHis
- return
- }
- for _, oid := range oids {
- key := fmt.Sprintf("%d_%d", oid, typ)
- if h, ok := mhis[key]; ok && h != nil {
- his = append(his, h)
- }
- }
- return
- }
- // HistoryCursor return the user all av history.
- func (s *Service) HistoryCursor(c context.Context, mid, max, viewAt int64, ps int, tp int8, tps []int8, ip string) (res []*model.Resource, err error) {
- var hs []*model.History
- if s.migration(mid) {
- var businesses []string
- for _, b := range tps {
- businesses = append(businesses, model.BusinessByTP(b))
- }
- res, err = s.serviceHistoryCursor(c, mid, max, businesses, model.BusinessByTP(tp), viewAt, ps)
- if err == nil {
- return
- }
- }
- hs, err = s.historyCursor(c, mid, max, viewAt, ps, tp, tps, ip)
- if err != nil {
- return
- }
- if len(hs) == 0 {
- return
- }
- for _, h := range hs {
- r := &model.Resource{
- TP: h.TP,
- STP: h.STP,
- Mid: h.Mid,
- Oid: h.Aid,
- Sid: h.Sid,
- Epid: h.Epid,
- Cid: h.Cid,
- Business: h.Business,
- DT: h.DT,
- Pro: h.Pro,
- Unix: h.Unix,
- }
- res = append(res, r)
- }
- return
- }
- // historyCursor get aids of user view history.
- func (s *Service) historyCursor(c context.Context, mid, max, viewAt int64, ps int, tp int8, tps []int8, ip string) (his []*model.History, err error) {
- var (
- ok bool
- e, h *model.History
- mhis map[string]*model.History
- chis, ehis map[int64]*model.History
- dhis []*model.History
- total = s.conf.History.Total
- tpMap = make(map[int8]bool)
- )
- for _, tp := range tps {
- tpMap[tp] = true
- }
- if mhis, err = s.historyDao.Map(c, mid); err != nil {
- err = nil
- mhis = make(map[string]*model.History)
- }
- for k, v := range mhis {
- v.ConvertType()
- if (len(tps) > 0) && !tpMap[v.TP] {
- delete(mhis, k)
- }
- }
- chis, _ = s.historyDao.CacheMap(c, mid)
- for k, v := range chis {
- v.ConvertType()
- if (len(tps) > 0) && !tpMap[v.TP] {
- delete(chis, k)
- continue
- }
- mhis[s.key(v.Aid, v.TP)] = v
- }
- if len(mhis) == 0 {
- his = _emptyHis
- return
- }
- ehis = make(map[int64]*model.History, len(mhis))
- for _, h = range mhis {
- if (h.TP == model.TypePGC) && h.Sid != 0 {
- if e, ok = ehis[h.Sid]; !ok || h.Unix > e.Unix {
- ehis[h.Sid] = h
- if e != nil {
- dhis = append(dhis, e)
- }
- }
- } else {
- his = append(his, h)
- }
- }
- for _, h = range ehis {
- his = append(his, h)
- }
- sort.Sort(model.Histories(his))
- if len(his) > total {
- dhis = append(dhis, his[total:]...)
- s.delChan.Do(c, func(ctx context.Context) {
- s.historyDao.Delete(ctx, mid, dhis)
- })
- his = his[:total]
- }
- if viewAt != 0 || max != 0 && tp != 0 {
- for index, h := range his {
- if viewAt != 0 && h.Unix <= viewAt || h.Aid == max && h.TP == tp {
- index++
- if index+ps <= len(his) {
- return his[index : index+ps], nil
- }
- return his[index:], nil
- }
- }
- return _emptyHis, nil
- }
- if len(his) >= ps {
- return his[:ps], nil
- }
- return
- }
- // SetShadow set the user switch.
- // +wd:ignore
- func (s *Service) SetShadow(c context.Context, mid, value int64) (err error) {
- s.serviceHide(mid, value == model.ShadowOn)
- if err = s.historyDao.SetInfoShadow(c, mid, value); err != nil {
- return
- }
- return s.historyDao.SetShadowCache(c, mid, value)
- }
- // Delete .
- func (s *Service) Delete(ctx context.Context, mid int64, his []*model.History) (err error) {
- if err = s.serviceDel(ctx, mid, his); err != nil {
- return
- }
- if err = s.historyDao.Delete(ctx, mid, his); err != nil {
- return
- }
- var aids []int64
- for _, h := range his {
- if h.TP < model.TypeArticle {
- aids = append(aids, h.Aid)
- }
- }
- if len(aids) == 0 {
- return
- }
- return s.historyDao.DelCache(ctx, mid, aids)
- }
- // Shadow return the user switch by mid.
- // +wd:ignore
- func (s *Service) Shadow(c context.Context, mid int64) (value int64, err error) {
- var (
- ok bool
- cache = true
- )
- if s.migration(mid) {
- value, err = s.serviceHideState(c, mid)
- if err == nil {
- return
- }
- }
- if value, err = s.historyDao.ShadowCache(c, mid); err != nil {
- err = nil
- cache = false
- } else if value != model.ShadowUnknown {
- return
- }
- if value, err = s.historyDao.InfoShadow(c, mid); err != nil {
- ok = true
- }
- if cache {
- s.cache.Do(c, func(ctx context.Context) {
- s.historyDao.SetShadowCache(ctx, mid, value)
- if ok && value == model.ShadowOn {
- s.historyDao.SetInfoShadow(ctx, mid, value)
- }
- })
- }
- return
- }
- // FlushHistory flush to hbase from cache.
- func (s *Service) FlushHistory(c context.Context, mids []int64, stime int64) (err error) {
- var (
- aids, miss []int64
- res map[int64]*model.History
- limit = s.conf.History.Cache
- )
- for _, mid := range mids {
- if aids, err = s.historyDao.IndexCacheByTime(c, mid, stime); err != nil {
- log.Error("s.historyDao.IndexCacheByTime(%d,%v) error(%v)", mid, stime, err)
- err = nil
- continue
- }
- if len(aids) == 0 {
- continue
- }
- if res, miss, err = s.historyDao.Cache(c, mid, aids); err != nil {
- log.Error("historyDao.Cache(%d,%v) miss:%v error(%v)", mid, aids, miss, err)
- err = nil
- continue
- }
- // * typ < model.TypeArticle all can .
- if err = s.historyDao.AddMap(c, mid, res); err != nil {
- log.Error("historyDao.AddMap(%d,%+v) error(%v)", mid, res, err)
- err = nil
- }
- if err = s.historyDao.TrimCache(c, mid, limit); err != nil {
- log.Error("historyDao.TrimCache(%d,%d) error(%v)", mid, limit, err)
- err = nil
- }
- }
- return
- }
- func (s *Service) merge(hmap map[int64]int64) {
- var (
- size = int64(s.conf.History.Size)
- merges = make(map[int64][]*model.Merge, size)
- )
- for k, v := range hmap {
- merges[k%size] = append(merges[k%size], &model.Merge{Mid: k, Now: v})
- }
- for k, v := range merges {
- s.historyDao.Merge(context.TODO(), int64(k), v)
- }
- }
- func (s *Service) bangumi(c context.Context, mid int64, epids []int64) (bangumiMap map[int64]*model.Bangumi) {
- var n = 50
- bangumiMap = make(map[int64]*model.Bangumi, len(epids))
- for len(epids) > 0 {
- if n > len(epids) {
- n = len(epids)
- }
- epban, _ := s.historyDao.Bangumis(c, mid, epids[:n])
- epids = epids[n:]
- for k, v := range epban {
- bangumiMap[k] = v
- }
- }
- return
- }
- // ManagerHistory ManagerHistory.
- // +wd:ignore
- func (s *Service) ManagerHistory(c context.Context, onlyAV bool, mid int64) (his []*model.History, err error) {
- var (
- mhis map[string]*model.History
- chis, ehis map[int64]*model.History
- )
- if mhis, err = s.historyDao.Map(c, mid); err != nil {
- err = nil
- mhis = make(map[string]*model.History)
- }
- chis, _ = s.historyDao.CacheMap(c, mid)
- for _, v := range chis {
- mhis[s.key(v.Aid, v.TP)] = v
- }
- if len(mhis) == 0 {
- his = _emptyHis
- return
- }
- ehis = make(map[int64]*model.History, len(mhis))
- for _, h := range mhis {
- if onlyAV && h.TP >= model.TypeArticle {
- continue
- }
- if (h.TP == model.TypeBangumi || h.TP == model.TypeMovie || h.TP == model.TypePGC) && h.Sid != 0 {
- if e, ok := ehis[h.Sid]; !ok || h.Unix > e.Unix {
- ehis[h.Sid] = h
- }
- } else {
- his = append(his, h)
- }
- }
- for _, h := range ehis {
- his = append(his, h)
- }
- sort.Sort(model.Histories(his))
- return
- }
|