123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147 |
- package dao
- import (
- "context"
- "fmt"
- "sync"
- "go-common/app/service/main/thumbup/model"
- "go-common/library/cache/memcache"
- "go-common/library/log"
- "go-common/library/xstr"
- "go-common/library/sync/errgroup"
- )
- const (
- _bulkSize = 100
- )
- func statsKey(businessID, messageID int64) string {
- return fmt.Sprintf("m_%d_b_%d", messageID, businessID)
- }
- func recoverStatsValue(c context.Context, s string) (res *model.Stats) {
- var (
- vs []int64
- err error
- )
- res = new(model.Stats)
- if s == "" {
- return
- }
- if vs, err = xstr.SplitInts(s); err != nil || len(vs) < 2 {
- PromError("mc:stats解析")
- log.Error("dao.recoverStatsValue(%s) err: %v", s, err)
- return
- }
- res = &model.Stats{Likes: vs[0], Dislikes: vs[1]}
- return
- }
- // AddStatsCache .
- func (d *Dao) AddStatsCache(c context.Context, businessID int64, vs ...*model.Stats) (err error) {
- if len(vs) == 0 {
- return
- }
- conn := d.mc.Get(c)
- defer conn.Close()
- for _, v := range vs {
- if v == nil {
- continue
- }
- key := statsKey(businessID, v.ID)
- bs := xstr.JoinInts([]int64{v.Likes, v.Dislikes})
- item := memcache.Item{Key: key, Value: []byte(bs), Expiration: d.mcStatsExpire}
- if err = conn.Set(&item); err != nil {
- PromError("mc:增加计数缓存")
- log.Error("conn.Set(%s) error(%v)", key, err)
- return
- }
- }
- return
- }
- // DelStatsCache del stats cache
- func (d *Dao) DelStatsCache(c context.Context, businessID int64, messageID int64) (err error) {
- conn := d.mc.Get(c)
- defer conn.Close()
- key := statsKey(businessID, messageID)
- if err = conn.Delete(key); err != nil {
- if err == memcache.ErrNotFound {
- err = nil
- return
- }
- PromError("mc:DelStatsCache")
- log.Error("d.DelStatsCache(%s) error(%+v)", key, err)
- }
- return
- }
- // AddStatsCacheMap .
- func (d *Dao) AddStatsCacheMap(c context.Context, businessID int64, stats map[int64]*model.Stats) (err error) {
- var s []*model.Stats
- for _, v := range stats {
- s = append(s, v)
- }
- return d.AddStatsCache(c, businessID, s...)
- }
- // StatsCache .
- func (d *Dao) StatsCache(c context.Context, businessID int64, messageIDs []int64) (cached map[int64]*model.Stats, missed []int64, err error) {
- if len(messageIDs) == 0 {
- return
- }
- cached = make(map[int64]*model.Stats, len(messageIDs))
- allKeys := make([]string, 0, len(messageIDs))
- midmap := make(map[string]int64, len(messageIDs))
- for _, id := range messageIDs {
- k := statsKey(businessID, id)
- allKeys = append(allKeys, k)
- midmap[k] = id
- }
- group, errCtx := errgroup.WithContext(c)
- mutex := sync.Mutex{}
- keysLen := len(allKeys)
- for i := 0; i < keysLen; i += _bulkSize {
- var keys []string
- if (i + _bulkSize) > keysLen {
- keys = allKeys[i:]
- } else {
- keys = allKeys[i : i+_bulkSize]
- }
- group.Go(func() (err error) {
- conn := d.mc.Get(errCtx)
- replys, err := conn.GetMulti(keys)
- defer conn.Close()
- if err != nil {
- PromError("mc:获取计数缓存")
- log.Error("conn.Gets(%v) error(%v)", keys, err)
- err = nil
- return
- }
- for _, reply := range replys {
- var s string
- if err = conn.Scan(reply, &s); err != nil {
- PromError("获取计数缓存json解析")
- log.Error("json.Unmarshal(%v) error(%v)", reply.Value, err)
- err = nil
- continue
- }
- stat := recoverStatsValue(c, s)
- stat.ID = midmap[reply.Key]
- mutex.Lock()
- cached[midmap[reply.Key]] = stat
- delete(midmap, reply.Key)
- mutex.Unlock()
- }
- return
- })
- }
- group.Wait()
- missed = make([]int64, 0, len(midmap))
- for _, aid := range midmap {
- missed = append(missed, aid)
- }
- return
- }
|