memcache.go 2.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
  1. package dao
  2. import (
  3. "context"
  4. "fmt"
  5. "sync"
  6. "time"
  7. pushmdl "go-common/app/service/main/push/model"
  8. "go-common/library/cache/memcache"
  9. "go-common/library/log"
  10. "go-common/library/sync/errgroup"
  11. )
const (
	// _prefixReport is the fmt layout used by reportKey to build a report
	// cache key; the %d verb receives the mid.
	_prefixReport = "r_%d"

	// _bulkSize is the maximum number of keys ReportsCacheByMids passes to a
	// single GetMulti call.
	_bulkSize = 10
)
  16. func reportKey(mid int64) string {
  17. return fmt.Sprintf(_prefixReport, mid)
  18. }
  19. // pingMc ping memcache
  20. func (d *Dao) pingMC(c context.Context) (err error) {
  21. conn := d.mc.Get(c)
  22. defer conn.Close()
  23. item := memcache.Item{Key: "ping", Value: []byte{1}, Expiration: int32(time.Now().Unix())}
  24. err = conn.Set(&item)
  25. return
  26. }
  27. // ReportsCacheByMids get report cache by mids.
  28. func (d *Dao) ReportsCacheByMids(c context.Context, mids []int64) (res map[int64][]*pushmdl.Report, missed []int64, err error) {
  29. res = make(map[int64][]*pushmdl.Report, len(mids))
  30. if len(mids) == 0 {
  31. return
  32. }
  33. allKeys := make([]string, 0, len(mids))
  34. midmap := make(map[string]int64, len(mids))
  35. for _, mid := range mids {
  36. k := reportKey(mid)
  37. allKeys = append(allKeys, k)
  38. midmap[k] = mid
  39. }
  40. group := errgroup.Group{}
  41. mutex := sync.Mutex{}
  42. keysLen := len(allKeys)
  43. for i := 0; i < keysLen; i += _bulkSize {
  44. var keys []string
  45. if (i + _bulkSize) > keysLen {
  46. keys = allKeys[i:]
  47. } else {
  48. keys = allKeys[i : i+_bulkSize]
  49. }
  50. group.Go(func() error {
  51. conn := d.mc.Get(context.TODO())
  52. defer conn.Close()
  53. replys, err := conn.GetMulti(keys)
  54. if err != nil {
  55. PromError("mc:获取上报")
  56. log.Error("conn.Gets(%v) error(%v)", keys, err)
  57. return nil
  58. }
  59. for k, item := range replys {
  60. rm := make(map[int64]map[string]*pushmdl.Report)
  61. if err = conn.Scan(item, &rm); err != nil {
  62. PromError("mc:解析上报")
  63. log.Error("item.Scan(%s) error(%v)", item.Value, err)
  64. continue
  65. }
  66. mutex.Lock()
  67. mid := midmap[k]
  68. for _, v := range rm {
  69. for _, r := range v {
  70. res[mid] = append(res[mid], r)
  71. }
  72. }
  73. delete(midmap, k)
  74. mutex.Unlock()
  75. }
  76. return nil
  77. })
  78. }
  79. group.Wait()
  80. missed = make([]int64, 0, len(midmap))
  81. for _, mid := range midmap {
  82. missed = append(missed, mid)
  83. }
  84. return
  85. }