memcached.go 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147
  1. package dao
  2. import (
  3. "context"
  4. "fmt"
  5. "sync"
  6. "go-common/app/service/main/thumbup/model"
  7. "go-common/library/cache/memcache"
  8. "go-common/library/log"
  9. "go-common/library/xstr"
  10. "go-common/library/sync/errgroup"
  11. )
  // _bulkSize caps how many keys StatsCache requests in a single GetMulti call.
  const _bulkSize = 100
  // statsKey builds the memcache key under which the stats of one message of
  // one business are stored. The message ID comes first in the key, then the
  // business ID.
  func statsKey(businessID, messageID int64) string {
  	const layout = "m_%d_b_%d"
  	return fmt.Sprintf(layout, messageID, businessID)
  }
  18. func recoverStatsValue(c context.Context, s string) (res *model.Stats) {
  19. var (
  20. vs []int64
  21. err error
  22. )
  23. res = new(model.Stats)
  24. if s == "" {
  25. return
  26. }
  27. if vs, err = xstr.SplitInts(s); err != nil || len(vs) < 2 {
  28. PromError("mc:stats解析")
  29. log.Error("dao.recoverStatsValue(%s) err: %v", s, err)
  30. return
  31. }
  32. res = &model.Stats{Likes: vs[0], Dislikes: vs[1]}
  33. return
  34. }
  35. // AddStatsCache .
  36. func (d *Dao) AddStatsCache(c context.Context, businessID int64, vs ...*model.Stats) (err error) {
  37. if len(vs) == 0 {
  38. return
  39. }
  40. conn := d.mc.Get(c)
  41. defer conn.Close()
  42. for _, v := range vs {
  43. if v == nil {
  44. continue
  45. }
  46. key := statsKey(businessID, v.ID)
  47. bs := xstr.JoinInts([]int64{v.Likes, v.Dislikes})
  48. item := memcache.Item{Key: key, Value: []byte(bs), Expiration: d.mcStatsExpire}
  49. if err = conn.Set(&item); err != nil {
  50. PromError("mc:增加计数缓存")
  51. log.Error("conn.Set(%s) error(%v)", key, err)
  52. return
  53. }
  54. }
  55. return
  56. }
  57. // DelStatsCache del stats cache
  58. func (d *Dao) DelStatsCache(c context.Context, businessID int64, messageID int64) (err error) {
  59. conn := d.mc.Get(c)
  60. defer conn.Close()
  61. key := statsKey(businessID, messageID)
  62. if err = conn.Delete(key); err != nil {
  63. if err == memcache.ErrNotFound {
  64. err = nil
  65. return
  66. }
  67. PromError("mc:DelStatsCache")
  68. log.Error("d.DelStatsCache(%s) error(%+v)", key, err)
  69. }
  70. return
  71. }
  72. // AddStatsCacheMap .
  73. func (d *Dao) AddStatsCacheMap(c context.Context, businessID int64, stats map[int64]*model.Stats) (err error) {
  74. var s []*model.Stats
  75. for _, v := range stats {
  76. s = append(s, v)
  77. }
  78. return d.AddStatsCache(c, businessID, s...)
  79. }
  80. // StatsCache .
  81. func (d *Dao) StatsCache(c context.Context, businessID int64, messageIDs []int64) (cached map[int64]*model.Stats, missed []int64, err error) {
  82. if len(messageIDs) == 0 {
  83. return
  84. }
  85. cached = make(map[int64]*model.Stats, len(messageIDs))
  86. allKeys := make([]string, 0, len(messageIDs))
  87. midmap := make(map[string]int64, len(messageIDs))
  88. for _, id := range messageIDs {
  89. k := statsKey(businessID, id)
  90. allKeys = append(allKeys, k)
  91. midmap[k] = id
  92. }
  93. group, errCtx := errgroup.WithContext(c)
  94. mutex := sync.Mutex{}
  95. keysLen := len(allKeys)
  96. for i := 0; i < keysLen; i += _bulkSize {
  97. var keys []string
  98. if (i + _bulkSize) > keysLen {
  99. keys = allKeys[i:]
  100. } else {
  101. keys = allKeys[i : i+_bulkSize]
  102. }
  103. group.Go(func() (err error) {
  104. conn := d.mc.Get(errCtx)
  105. replys, err := conn.GetMulti(keys)
  106. defer conn.Close()
  107. if err != nil {
  108. PromError("mc:获取计数缓存")
  109. log.Error("conn.Gets(%v) error(%v)", keys, err)
  110. err = nil
  111. return
  112. }
  113. for _, reply := range replys {
  114. var s string
  115. if err = conn.Scan(reply, &s); err != nil {
  116. PromError("获取计数缓存json解析")
  117. log.Error("json.Unmarshal(%v) error(%v)", reply.Value, err)
  118. err = nil
  119. continue
  120. }
  121. stat := recoverStatsValue(c, s)
  122. stat.ID = midmap[reply.Key]
  123. mutex.Lock()
  124. cached[midmap[reply.Key]] = stat
  125. delete(midmap, reply.Key)
  126. mutex.Unlock()
  127. }
  128. return
  129. })
  130. }
  131. group.Wait()
  132. missed = make([]int64, 0, len(midmap))
  133. for _, aid := range midmap {
  134. missed = append(missed, aid)
  135. }
  136. return
  137. }