memcache.go 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309
  1. package dao
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "sync"
  7. "go-common/app/service/main/push/model"
  8. "go-common/library/cache/memcache"
  9. "go-common/library/log"
  10. "go-common/library/sync/errgroup"
  11. )
  12. const (
  13. _prefixReport = "r_%d"
  14. _prefixToken = "t_%s"
  15. _bulkSize = 20
  16. )
  17. func reportKey(mid int64) string {
  18. return fmt.Sprintf(_prefixReport, mid)
  19. }
  20. func tokenKey(token string) string {
  21. return fmt.Sprintf(_prefixToken, token)
  22. }
  23. // pingMc ping memcache
  24. func (d *Dao) pingMC(c context.Context) (err error) {
  25. conn := d.mc.Get(c)
  26. defer conn.Close()
  27. item := memcache.Item{Key: "ping", Value: []byte{1}, Expiration: d.mcReportExpire}
  28. err = conn.Set(&item)
  29. return
  30. }
  31. // TokensCache get reports cache by tokens
  32. func (d *Dao) TokensCache(ctx context.Context, tokens []string) (res map[string]*model.Report, missed []string, err error) {
  33. res = make(map[string]*model.Report)
  34. if len(tokens) == 0 {
  35. return
  36. }
  37. var (
  38. mutex sync.Mutex
  39. allKeys []string
  40. tokenMap = make(map[string]string, len(tokens))
  41. )
  42. for _, t := range tokens {
  43. k := tokenKey(t)
  44. allKeys = append(allKeys, k)
  45. tokenMap[k] = t
  46. }
  47. keysLen := len(allKeys)
  48. group, errCtx := errgroup.WithContext(ctx)
  49. for i := 0; i < keysLen; i += _bulkSize {
  50. var keys []string
  51. if (i + _bulkSize) > keysLen {
  52. keys = allKeys[i:]
  53. } else {
  54. keys = allKeys[i : i+_bulkSize]
  55. }
  56. group.Go(func() (err error) {
  57. conn := d.mc.Get(errCtx)
  58. defer conn.Close()
  59. replys, err := conn.GetMulti(keys)
  60. if err != nil {
  61. PromError("mc:TokensCache GetMulti")
  62. log.Error("conn.Gets(%v) error(%+v)", keys, err)
  63. err = nil
  64. return
  65. }
  66. for key, item := range replys {
  67. r := &model.Report{}
  68. if err = conn.Scan(item, &r); err != nil {
  69. PromError("mc:TokensCache Scan")
  70. log.Error("item.Scan(%s) error(%+v)", item.Value, err)
  71. err = nil
  72. continue
  73. }
  74. mutex.Lock()
  75. res[tokenMap[key]] = r
  76. delete(tokenMap, key)
  77. mutex.Unlock()
  78. }
  79. return
  80. })
  81. }
  82. group.Wait()
  83. for _, t := range tokenMap {
  84. missed = append(missed, t)
  85. }
  86. return
  87. }
  88. // ReportsCacheByMid gets reports cache by mid.
  89. func (d *Dao) ReportsCacheByMid(c context.Context, mid int64) (res []*model.Report, err error) {
  90. key := reportKey(mid)
  91. conn := d.mc.Get(c)
  92. defer conn.Close()
  93. reply, err := conn.Get(key)
  94. if err != nil {
  95. if err == memcache.ErrNotFound {
  96. res = nil
  97. err = nil
  98. missedCount.Add("report", 1)
  99. return
  100. }
  101. PromError("mc:获取上报")
  102. log.Error("conn.Get(%v) error(%v)", key, err)
  103. return
  104. }
  105. rs := make(map[int64]map[string]*model.Report)
  106. if err = conn.Scan(reply, &rs); err != nil {
  107. PromError("mc:解析上报")
  108. log.Error("item.Scan(%s) error(%v)", reply.Value, err)
  109. return
  110. }
  111. for _, v := range rs {
  112. for _, r := range v {
  113. res = append(res, r)
  114. }
  115. }
  116. cachedCount.Add("report", 1)
  117. return
  118. }
  119. // ReportsCacheByMids get report cache by mids.
  120. func (d *Dao) ReportsCacheByMids(c context.Context, mids []int64) (res map[int64][]*model.Report, missed []int64, err error) {
  121. res = make(map[int64][]*model.Report, len(mids))
  122. if len(mids) == 0 {
  123. return
  124. }
  125. allKeys := make([]string, 0, len(mids))
  126. midmap := make(map[string]int64, len(mids))
  127. for _, mid := range mids {
  128. k := reportKey(mid)
  129. allKeys = append(allKeys, k)
  130. midmap[k] = mid
  131. }
  132. group := errgroup.Group{}
  133. mutex := sync.Mutex{}
  134. keysLen := len(allKeys)
  135. for i := 0; i < keysLen; i += _bulkSize {
  136. var keys []string
  137. if (i + _bulkSize) > keysLen {
  138. keys = allKeys[i:]
  139. } else {
  140. keys = allKeys[i : i+_bulkSize]
  141. }
  142. group.Go(func() error {
  143. conn := d.mc.Get(context.TODO())
  144. defer conn.Close()
  145. replys, err := conn.GetMulti(keys)
  146. if err != nil {
  147. PromError("mc:获取上报")
  148. log.Error("conn.Gets(%v) error(%v)", keys, err)
  149. return nil
  150. }
  151. for k, item := range replys {
  152. rm := make(map[int64]map[string]*model.Report)
  153. if err = conn.Scan(item, &rm); err != nil {
  154. PromError("mc:解析上报")
  155. log.Error("item.Scan(%s) error(%v)", item.Value, err)
  156. continue
  157. }
  158. mutex.Lock()
  159. mid := midmap[k]
  160. for _, v := range rm {
  161. for _, r := range v {
  162. res[mid] = append(res[mid], r)
  163. }
  164. }
  165. delete(midmap, k)
  166. mutex.Unlock()
  167. }
  168. return nil
  169. })
  170. }
  171. group.Wait()
  172. missed = make([]int64, 0, len(midmap))
  173. for _, mid := range midmap {
  174. missed = append(missed, mid)
  175. }
  176. missedCount.Add("report", int64(len(missed)))
  177. cachedCount.Add("report", int64(len(res)))
  178. return
  179. }
  180. // AddReportsCacheByMids add report cache by mids.
  181. func (d *Dao) AddReportsCacheByMids(c context.Context, mrs map[int64][]*model.Report) (err error) {
  182. var (
  183. bs []byte
  184. urs map[int64]map[string]*model.Report
  185. conn = d.mc.Get(c)
  186. )
  187. defer conn.Close()
  188. for mid, rs := range mrs {
  189. if urs, err = formatReport(rs); err != nil {
  190. log.Error("d.AddReportsCacheByMids() formatReport() data(%v) error(%v)", rs, err)
  191. continue
  192. }
  193. if bs, err = json.Marshal(urs); err != nil {
  194. PromError("增加上报缓存json解析")
  195. log.Error("json.Marshal(%v) error(%v)", mrs, err)
  196. continue
  197. }
  198. k := reportKey(mid)
  199. item := &memcache.Item{Key: k, Value: bs, Expiration: d.mcReportExpire}
  200. if err = conn.Set(item); err != nil {
  201. PromError("mc:批量添加上报")
  202. log.Error("conn.Store(%s) error(%v)", k, err)
  203. return
  204. }
  205. }
  206. return
  207. }
  208. func formatReport(rs []*model.Report) (mrs map[int64]map[string]*model.Report, err error) {
  209. mrs = make(map[int64]map[string]*model.Report)
  210. for _, r := range rs {
  211. if _, ok := mrs[r.APPID]; !ok {
  212. mrs[r.APPID] = make(map[string]*model.Report)
  213. }
  214. mrs[r.APPID][r.DeviceToken] = r
  215. }
  216. return
  217. }
  218. // // AddReportCache add report cache.
  219. // func (d *Dao) AddReportCache(c context.Context, r *model.Report) (err error) {
  220. // conn := d.mc.Get(c)
  221. // defer conn.Close()
  222. // k := reportKey(r.Mid)
  223. // reply, err := conn.Get(k)
  224. // if err != nil {
  225. // return
  226. // }
  227. // rs := make(map[int64]map[string]*model.Report)
  228. // if err = conn.Scan(reply, &rs); err != nil {
  229. // PromError("mc:解析上报")
  230. // log.Error("reply.Scan(%s) error(%v)", reply.Value, err)
  231. // return
  232. // }
  233. // if _, ok := rs[r.APPID]; !ok {
  234. // rs[r.APPID] = make(map[string]*model.Report)
  235. // }
  236. // rs[r.APPID][r.DeviceToken] = r
  237. // var bs []byte
  238. // if bs, err = json.Marshal(rs); err != nil {
  239. // PromError("增加上报缓存json解析")
  240. // log.Error("json.Marshal(%v) error(%v)", rs, err)
  241. // return
  242. // }
  243. // item := &memcache.Item{Key: k, Value: bs, Expiration: d.mcReportExpire}
  244. // if err = conn.Set(item); err != nil {
  245. // PromError("mc:添加上报")
  246. // log.Error("conn.Store(%s) error(%v)", k, err)
  247. // return
  248. // }
  249. // PromInfo("mc:新增上报缓存")
  250. // return
  251. // }
  252. // DelReportCache delete report cache.
  253. func (d *Dao) DelReportCache(c context.Context, mid int64, appID int64, deviceToken string) (err error) {
  254. conn := d.mc.Get(c)
  255. defer conn.Close()
  256. k := reportKey(mid)
  257. reply, err := conn.Get(k)
  258. if err != nil {
  259. if err == memcache.ErrNotFound {
  260. missedCount.Incr("report")
  261. err = nil
  262. return
  263. }
  264. PromError("mc:删除上报")
  265. log.Error("conn.Get(%v) error(%v)", k, err)
  266. return
  267. }
  268. rs := make(map[int64]map[string]*model.Report)
  269. if err = conn.Scan(reply, &rs); err != nil {
  270. PromError("mc:解析上报")
  271. log.Error("reply.Scan(%s) error(%v)", reply.Value, err)
  272. return
  273. }
  274. if _, ok := rs[appID]; !ok {
  275. return
  276. }
  277. if rs[appID][deviceToken] == nil {
  278. return
  279. }
  280. delete(rs[appID], deviceToken)
  281. var bs []byte
  282. if bs, err = json.Marshal(rs); err != nil {
  283. PromError("删除上报缓存 marshal")
  284. log.Error("json.Marshal(%v) error(%v)", rs, err)
  285. return
  286. }
  287. item := &memcache.Item{Key: k, Value: bs, Expiration: d.mcReportExpire}
  288. if err = conn.Set(item); err != nil {
  289. PromError("mc:删除上报缓存")
  290. log.Error("conn.Store(%s) error(%v)", k, err)
  291. return
  292. }
  293. PromInfo("mc:删除上报缓存")
  294. return
  295. }