memcache.go 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200
  1. package dao
  2. import (
  3. "context"
  4. "strconv"
  5. "sync"
  6. "go-common/app/service/main/archive/api"
  7. feedmdl "go-common/app/service/main/feed/model"
  8. "go-common/library/cache/memcache"
  9. "go-common/library/sync/errgroup"
  10. )
  11. const (
  12. _prefixArc = "ap_"
  13. _prefixBangumi = "bp_"
  14. _bulkSize = 100
  15. )
  16. func arcKey(aid int64) string {
  17. return _prefixArc + strconv.FormatInt(aid, 10)
  18. }
  19. func bangumiKey(bid int64) string {
  20. return _prefixBangumi + strconv.FormatInt(bid, 10)
  21. }
  22. // pingMc ping memcache
  23. func (d *Dao) pingMC(c context.Context) (err error) {
  24. conn := d.mc.Get(c)
  25. item := memcache.Item{Key: "ping", Value: []byte{1}, Expiration: d.mcExpire}
  26. err = conn.Set(&item)
  27. conn.Close()
  28. return
  29. }
  30. // AddArchivesCache batch set archives cache.
  31. func (d *Dao) AddArchivesCache(c context.Context, vs ...*api.Arc) (err error) {
  32. conn := d.mc.Get(c)
  33. defer conn.Close()
  34. for _, v := range vs {
  35. if v == nil {
  36. continue
  37. }
  38. item := memcache.Item{Key: arcKey(v.Aid), Object: v, Flags: memcache.FlagProtobuf, Expiration: d.mcExpire}
  39. if err = conn.Set(&item); err != nil {
  40. PromError("mc:增加稿件缓存", "conn.Store(%s) error(%v)", arcKey(v.Aid), err)
  41. return
  42. }
  43. }
  44. return
  45. }
  46. // AddArchivesCacheMap batch set archives cache.
  47. func (d *Dao) AddArchivesCacheMap(c context.Context, arcm map[int64]*api.Arc) (err error) {
  48. var arcs []*api.Arc
  49. for _, arc := range arcm {
  50. arcs = append(arcs, arc)
  51. }
  52. return d.AddArchivesCache(c, arcs...)
  53. }
  54. // ArchivesCache batch get archive from cache.
  55. func (d *Dao) ArchivesCache(c context.Context, aids []int64) (cached map[int64]*api.Arc, missed []int64, err error) {
  56. if len(aids) == 0 {
  57. return
  58. }
  59. cached = make(map[int64]*api.Arc, len(aids))
  60. allKeys := make([]string, 0, len(aids))
  61. aidmap := make(map[string]int64, len(aids))
  62. for _, aid := range aids {
  63. k := arcKey(aid)
  64. allKeys = append(allKeys, k)
  65. aidmap[k] = aid
  66. }
  67. group, errCtx := errgroup.WithContext(c)
  68. mutex := sync.Mutex{}
  69. keysLen := len(allKeys)
  70. for i := 0; i < keysLen; i += _bulkSize {
  71. var keys []string
  72. if (i + _bulkSize) > keysLen {
  73. keys = allKeys[i:]
  74. } else {
  75. keys = allKeys[i : i+_bulkSize]
  76. }
  77. group.Go(func() (err error) {
  78. conn := d.mc.Get(errCtx)
  79. replys, err := conn.GetMulti(keys)
  80. defer conn.Close()
  81. if err != nil {
  82. PromError("mc:获取稿件缓存", "conn.Gets(%v) error(%v)", keys, err)
  83. err = nil
  84. return
  85. }
  86. for _, reply := range replys {
  87. arc := &api.Arc{}
  88. if err = conn.Scan(reply, arc); err != nil {
  89. PromError("获取稿件缓存json解析", "json.Unmarshal(%v) error(%v)", reply.Value, err)
  90. err = nil
  91. continue
  92. }
  93. mutex.Lock()
  94. cached[aidmap[reply.Key]] = arc
  95. delete(aidmap, reply.Key)
  96. mutex.Unlock()
  97. }
  98. return
  99. })
  100. }
  101. group.Wait()
  102. missed = make([]int64, 0, len(aidmap))
  103. for _, aid := range aidmap {
  104. missed = append(missed, aid)
  105. }
  106. MissedCount.Add("archive", int64(len(missed)))
  107. CachedCount.Add("archive", int64(len(cached)))
  108. return
  109. }
  110. // DelArchiveCache delete archive cache.
  111. func (d *Dao) DelArchiveCache(c context.Context, aid int64) (err error) {
  112. conn := d.mc.Get(c)
  113. defer conn.Close()
  114. if err = conn.Delete(arcKey(aid)); err != nil {
  115. if err == memcache.ErrNotFound {
  116. err = nil
  117. } else {
  118. PromError("mc:删除稿件缓存", "conn.Delete(%s) error(%v)", arcKey(aid), err)
  119. return
  120. }
  121. }
  122. return
  123. }
  124. // AddBangumisCacheMap batch set bangumis cache.
  125. func (d *Dao) AddBangumisCacheMap(c context.Context, bm map[int64]*feedmdl.Bangumi) (err error) {
  126. var bs []*feedmdl.Bangumi
  127. for _, b := range bm {
  128. bs = append(bs, b)
  129. }
  130. return d.AddBangumisCache(c, bs...)
  131. }
  132. // AddBangumisCache add batch set bangumi cache.
  133. func (d *Dao) AddBangumisCache(c context.Context, bs ...*feedmdl.Bangumi) (err error) {
  134. conn := d.mc.Get(c)
  135. defer conn.Close()
  136. for _, b := range bs {
  137. if b == nil {
  138. continue
  139. }
  140. item := memcache.Item{Key: bangumiKey(b.SeasonID), Object: b, Flags: memcache.FlagProtobuf, Expiration: d.bangumiExpire}
  141. if err = conn.Set(&item); err != nil {
  142. PromError("mc:增加番剧缓存", "conn.Store(%s) error(%v)", bangumiKey(b.SeasonID), err)
  143. return
  144. }
  145. }
  146. return
  147. }
  148. // BangumisCache batch get archive from cache.
  149. func (d *Dao) BangumisCache(c context.Context, bids []int64) (cached map[int64]*feedmdl.Bangumi, missed []int64, err error) {
  150. cached = make(map[int64]*feedmdl.Bangumi, len(bids))
  151. if len(bids) == 0 {
  152. return
  153. }
  154. keys := make([]string, 0, len(bids))
  155. bidmap := make(map[string]int64, len(bids))
  156. for _, bid := range bids {
  157. k := bangumiKey(bid)
  158. keys = append(keys, k)
  159. bidmap[k] = bid
  160. }
  161. conn := d.mc.Get(c)
  162. defer conn.Close()
  163. replys, err := conn.GetMulti(keys)
  164. if err != nil {
  165. PromError("mc:获取番剧", "conn.Gets(%v) error(%v)", keys, err)
  166. return
  167. }
  168. for _, reply := range replys {
  169. b := &feedmdl.Bangumi{}
  170. if err = conn.Scan(reply, b); err != nil {
  171. PromError("获取番剧json解析", "json.Unmarshal(%v) error(%v)", reply.Value, err)
  172. return
  173. }
  174. cached[bidmap[reply.Key]] = b
  175. delete(bidmap, reply.Key)
  176. }
  177. missed = make([]int64, 0, len(bidmap))
  178. for _, bid := range bidmap {
  179. missed = append(missed, bid)
  180. }
  181. MissedCount.Add("bangumi", int64(len(missed)))
  182. CachedCount.Add("bangumi", int64(len(cached)))
  183. return
  184. }