123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169 |
- package service
- import (
- "context"
- "strconv"
- "time"
- "go-common/app/job/main/stat/model"
- "go-common/app/service/main/archive/api"
- "go-common/app/service/main/archive/model/archive"
- "go-common/library/cache/memcache"
- "go-common/library/log"
- )
// Memcache key prefixes; the archive id is appended in decimal.
const (
	// _prefixStatPB prefixes the key of an archive's stat entry.
	_prefixStatPB = "stp_"
	// _prefixClickPB prefixes the key of an archive's click entry.
	_prefixClickPB = "clkp_"
)
- func statPBKey(aid int64) string {
- return _prefixStatPB + strconv.FormatInt(aid, 10)
- }
- func clickPBKey(aid int64) string {
- return _prefixClickPB + strconv.FormatInt(aid, 10)
- }
- func (s *Service) statDealproc(i int64) {
- defer s.waiter.Done()
- var (
- ch = s.subStatCh[i]
- sm = s.statSM[i]
- c = context.TODO()
- ls *lastTmStat
- err error
- )
- for {
- now := time.Now().Unix()
- ms, ok := <-ch
- if !ok {
- s.multiUpdateDB(i, sm)
- log.Warn("statDealproc(%d) quit", i)
- return
- }
- if s.maxAid > 0 && s.maxAid+300 < ms.Aid {
- log.Warn("aid(%d) too big maxAid(%d)", ms.Aid, s.maxAid)
- continue
- }
- // get stat
- if ls, ok = sm[ms.Aid]; !ok {
- var stat *api.Stat
- if stat, err = s.dao.Stat(c, ms.Aid); err != nil {
- log.Error("s.dao.Stat(%d) error(%v)", ms.Aid, err)
- continue
- }
- ls = &lastTmStat{}
- if stat == nil {
- ls.stat = &api.Stat{Aid: ms.Aid}
- ls.last = 0 // NOTE: make sure update db in first.
- } else {
- ls.stat = stat
- ls.last = time.Now().Unix()
- }
- sm[ms.Aid] = ls
- }
- model.Merge(ms, ls.stat)
- if now-ms.Ts < 60 {
- // update cache
- s.updateCache(ls.stat)
- }
- // update db when after 60s
- if time.Now().Unix()-ls.last > 120 {
- s.updateDB(ls.stat)
- delete(sm, ms.Aid) // NOTE: delete make sure the normal scope of memory and can be save all in 120s when close chan.
- }
- }
- }
- // updateDB update stat in db.
- func (s *Service) updateDB(stat *api.Stat) (err error) {
- if _, err := s.dao.Update(context.TODO(), stat); err != nil {
- log.Error("s.dao.Update(%v) error(%v)", stat, err)
- }
- log.Info("update db aid(%d) stat(%+v) success", stat.Aid, stat)
- return
- }
- // multiUpdateDB update some stat in db.
- func (s *Service) multiUpdateDB(yu int64, sm map[int64]*lastTmStat) (err error) {
- log.Info("start close(%d) multi update stat start", yu)
- var (
- c = context.TODO()
- alloc = [1000]*api.Stat{}
- stats = alloc[:0]
- i int
- )
- for aid, ls := range sm {
- stats = append(stats, ls.stat)
- if i > 0 && i%1000 == 0 {
- s.dao.MultiUpdate(c, yu, stats...)
- } else if i+1 == len(sm) {
- s.dao.MultiUpdate(c, yu, stats...)
- } else {
- log.Info("start close(%d) aid(%d) append", i, aid)
- continue
- }
- log.Info("start close(%d) multi update stat %d", i, aid)
- stats = alloc[:0]
- }
- log.Info("start close(%d) multi update stat endm", yu)
- return
- }
- // updateCache purge stat info in cache
- func (s *Service) updateCache(st *api.Stat) (err error) {
- var (
- stat3 = &api.Stat{
- Aid: st.Aid,
- View: int32(st.View),
- Danmaku: int32(st.Danmaku),
- Reply: int32(st.Reply),
- Fav: int32(st.Fav),
- Coin: int32(st.Coin),
- Share: int32(st.Share),
- NowRank: int32(st.NowRank),
- HisRank: int32(st.HisRank),
- Like: int32(st.Like),
- DisLike: 0,
- }
- click *archive.Click3
- upclick = true
- )
- if click, err = s.dao.Click(context.TODO(), st.Aid); err != nil {
- upclick = false
- }
- if click == nil {
- click = &archive.Click3{}
- }
- for _, mc := range s.memcaches {
- var c = context.TODO()
- conn := mc.Get(c)
- if err = conn.Set(&memcache.Item{Key: statPBKey(stat3.Aid), Object: stat3, Flags: memcache.FlagProtobuf, Expiration: 0}); err != nil {
- log.Error("conn1.Set(%s, %+v) error(%v)", statPBKey(stat3.Aid), stat3, err)
- }
- if upclick {
- if err = conn.Set(&memcache.Item{Key: clickPBKey(stat3.Aid), Object: click, Flags: memcache.FlagProtobuf, Expiration: 0}); err != nil {
- log.Error("conn1.Set(%s, %+v) error(%v)", clickPBKey(stat3.Aid), click, err)
- }
- }
- if err == nil {
- log.Info("update cache aid(%d) stat(%+v) success", st.Aid, stat3)
- log.Info("update cache aid(%d) click(%+v) success", st.Aid, click)
- }
- conn.Close()
- }
- return
- }
- // Purge purge arc's stat cache
- func (s *Service) Purge(c context.Context, aids []int64) (err error) {
- for _, aid := range aids {
- var stat *api.Stat
- if stat, err = s.dao.Stat(c, aid); err != nil {
- return
- }
- s.updateCache(stat)
- }
- return
- }
|