redis.go 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286
  1. package pendant
  2. import (
  3. "context"
  4. "strconv"
  5. "encoding/json"
  6. "go-common/app/service/main/usersuit/model"
  7. "go-common/library/cache/redis"
  8. "go-common/library/log"
  9. "github.com/pkg/errors"
  10. )
  11. const (
  12. _pendantPKG = "pkg_" // key of
  13. _pendantEquip = "pe_"
  14. )
  15. func keyEquip(mid int64) string {
  16. return _pendantEquip + strconv.FormatInt(mid, 10)
  17. }
  18. // encode
  19. func (d *Dao) encode(mid, pid, expires, tp int64, status, isVIP int32, pendant *model.Pendant) (res []byte, err error) {
  20. ft := &model.PendantPackage{Mid: mid, Pid: pid, Expires: expires, Type: tp, Status: status, IsVIP: isVIP, Pendant: pendant}
  21. return json.Marshal(ft)
  22. }
  23. // decode
  24. func (d *Dao) decode(src []byte, v *model.PendantPackage) (err error) {
  25. return json.Unmarshal(src, v)
  26. }
  27. // AddPKGCache set package cache.
  28. func (d *Dao) AddPKGCache(c context.Context, mid int64, info []*model.PendantPackage) (err error) {
  29. var (
  30. key = _pendantPKG + strconv.FormatInt(mid, 10)
  31. args = redis.Args{}.Add(key)
  32. )
  33. for i := 0; i < len(info); i++ {
  34. var ef []byte
  35. if ef, err = d.encode(info[i].Mid, info[i].Pid, info[i].Expires, info[i].Type, info[i].Status, info[i].IsVIP, info[i].Pendant); err != nil {
  36. return
  37. }
  38. args = args.Add(i, ef)
  39. }
  40. conn := d.redis.Get(c)
  41. defer conn.Close()
  42. if err = conn.Send("DEL", key); err != nil {
  43. log.Error("conn.Send(DEL, %s) error(%v)", key, err)
  44. return
  45. }
  46. if err = conn.Send("HMSET", args...); err != nil {
  47. log.Error("conn.Send(HMSET, %s) error(%v)", key, err)
  48. return
  49. }
  50. if err = conn.Send("EXPIRE", key, d.pendantExpire); err != nil {
  51. log.Error("conn.Send(EXPIRE, %s) error(%v)", key, err)
  52. return
  53. }
  54. if err = conn.Flush(); err != nil {
  55. log.Error("conn.Flush() error(%v)", err)
  56. return
  57. }
  58. for i := 0; i < 3; i++ {
  59. if _, err = conn.Receive(); err != nil {
  60. log.Error("conn.Receive() %d error(%v)", i+1, err)
  61. break
  62. }
  63. }
  64. return
  65. }
  66. // PKGCache get package cache.
  67. func (d *Dao) PKGCache(c context.Context, mid int64) (info []*model.PendantPackage, err error) {
  68. var (
  69. key = _pendantPKG + strconv.FormatInt(mid, 10)
  70. tmp = make(map[string]string, len(info))
  71. )
  72. conn := d.redis.Get(c)
  73. defer conn.Close()
  74. if tmp, err = redis.StringMap(conn.Do("HGETALL", key)); err != nil {
  75. return
  76. }
  77. if err == nil && len(tmp) > 0 {
  78. for i := 0; i < len(tmp); i++ {
  79. s := strconv.FormatInt(int64(i), 10)
  80. vf := &model.PendantPackage{}
  81. vf.Pendant = &model.Pendant{}
  82. if err = d.decode([]byte(tmp[s]), vf); err != nil {
  83. return
  84. }
  85. info = append(info, &model.PendantPackage{
  86. Mid: vf.Mid,
  87. Pid: vf.Pid,
  88. Expires: vf.Expires,
  89. Type: vf.Type,
  90. Status: vf.Status,
  91. IsVIP: vf.IsVIP,
  92. Pendant: vf.Pendant,
  93. })
  94. }
  95. }
  96. return
  97. }
  98. // DelPKGCache del package cache
  99. func (d *Dao) DelPKGCache(c context.Context, mid int64) (err error) {
  100. key := _pendantPKG + strconv.FormatInt(mid, 10)
  101. conn := d.redis.Get(c)
  102. defer conn.Close()
  103. if err = conn.Send("DEL", key); err != nil {
  104. log.Error("conn.Send(DEL, %s) error(%v)", key, err)
  105. return
  106. }
  107. return
  108. }
  109. // equipCache return pendant info cache
  110. func (d *Dao) equipCache(c context.Context, mid int64) (info *model.PendantEquip, err error) {
  111. var (
  112. item []byte
  113. conn = d.redis.Get(c)
  114. )
  115. defer conn.Close()
  116. if item, err = redis.Bytes(conn.Do("GET", keyEquip(mid))); err != nil {
  117. if err == redis.ErrNil {
  118. err = nil
  119. }
  120. return
  121. }
  122. if err = json.Unmarshal(item, &info); err != nil {
  123. log.Error("json.Unmarshal(%v) err(%v)", item, err)
  124. }
  125. return
  126. }
  127. // equipsCache obtain equips from redis .
  128. func (d *Dao) equipsCache(c context.Context, mids []int64) (map[int64]*model.PendantEquip, []int64, error) {
  129. var (
  130. err error
  131. bss [][]byte
  132. key string
  133. args = redis.Args{}
  134. conn = d.redis.Get(c)
  135. )
  136. for _, v := range mids {
  137. key = keyEquip(v)
  138. args = args.Add(key)
  139. }
  140. defer conn.Close()
  141. if bss, err = redis.ByteSlices(conn.Do("MGET", args...)); err != nil {
  142. if err == redis.ErrNil {
  143. return nil, nil, nil
  144. }
  145. log.Error("Failed mget equip: keys: %+v: %+v", args, err)
  146. return nil, nil, err
  147. }
  148. info := make(map[int64]*model.PendantEquip, len(mids))
  149. for _, bs := range bss {
  150. if bs == nil {
  151. continue
  152. }
  153. pe := &model.PendantEquip{}
  154. if err = json.Unmarshal(bs, pe); err != nil {
  155. log.Error("json.Unmarshal(%s) error(%v)", string(bs), err)
  156. err = nil
  157. continue
  158. }
  159. info[pe.Mid] = pe
  160. }
  161. missed := make([]int64, 0, len(mids))
  162. for _, mid := range mids {
  163. if _, ok := info[mid]; !ok {
  164. missed = append(missed, mid)
  165. }
  166. }
  167. return info, missed, nil
  168. }
  169. // AddEquipCache set pendant info cache
  170. func (d *Dao) AddEquipCache(c context.Context, mid int64, info *model.PendantEquip) (err error) {
  171. var (
  172. key = keyEquip(mid)
  173. values []byte
  174. conn = d.redis.Get(c)
  175. )
  176. defer conn.Close()
  177. if values, err = json.Marshal(info); err != nil {
  178. return
  179. }
  180. if err = conn.Send("SET", keyEquip(mid), values); err != nil {
  181. log.Error("conn.Send(SET, %s, %d) error(%v)", key, values, err)
  182. return
  183. }
  184. if err = conn.Send("EXPIRE", key, d.pendantExpire); err != nil {
  185. log.Error("conn.Send(Expire, %s, %d) error(%v)", key, d.pendantExpire, err)
  186. return
  187. }
  188. if err = conn.Flush(); err != nil {
  189. err = errors.Wrap(err, "conn.Send Flush")
  190. return
  191. }
  192. for i := 0; i < 2; i++ {
  193. if _, err = conn.Receive(); err != nil {
  194. err = errors.Wrap(err, "conn.Send conn.Receive()")
  195. return
  196. }
  197. }
  198. return
  199. }
  200. // AddEquipsCache mset equips info to caache .
  201. func (d *Dao) AddEquipsCache(c context.Context, equips map[int64]*model.PendantEquip) (err error) {
  202. var (
  203. bs []byte
  204. key string
  205. keys []string
  206. argsMid = redis.Args{}
  207. conn = d.redis.Get(c)
  208. )
  209. defer conn.Close()
  210. for _, v := range equips {
  211. if bs, err = json.Marshal(v); err != nil {
  212. log.Error("json.Marshal err(%v)", err)
  213. continue
  214. }
  215. key = keyEquip(v.Mid)
  216. keys = append(keys, key)
  217. argsMid = argsMid.Add(key).Add(string(bs))
  218. }
  219. if err = conn.Send("MSET", argsMid...); err != nil {
  220. err = errors.Wrap(err, "conn.Send(MSET) error")
  221. return
  222. }
  223. count := 1
  224. for _, v := range keys {
  225. count++
  226. if err = conn.Send("EXPIRE", v, d.pendantExpire); err != nil {
  227. err = errors.Wrap(err, "conn.Send error")
  228. return
  229. }
  230. }
  231. if err = conn.Flush(); err != nil {
  232. err = errors.Wrap(err, "conn.Send Flush")
  233. return
  234. }
  235. for i := 0; i < count; i++ {
  236. if _, err = conn.Receive(); err != nil {
  237. err = errors.Wrap(err, "conn.Send conn.Receive()")
  238. return
  239. }
  240. }
  241. return
  242. }
  243. // DelEquipCache set pendant info cache
  244. func (d *Dao) DelEquipCache(c context.Context, mid int64) (err error) {
  245. key := keyEquip(mid)
  246. conn := d.redis.Get(c)
  247. defer conn.Close()
  248. if _, err = conn.Do("DEL", key); err != nil {
  249. log.Error("conn.Do(DEL, %s) error(%v)", key, err)
  250. }
  251. return
  252. }
  253. // DelEquipsCache del batch equip cache .
  254. func (d *Dao) DelEquipsCache(c context.Context, mids []int64) (err error) {
  255. var (
  256. args = redis.Args{}
  257. conn = d.redis.Get(c)
  258. )
  259. defer conn.Close()
  260. for _, v := range mids {
  261. args = args.Add(keyEquip(v))
  262. }
  263. if _, err = conn.Do("DEL", args...); err != nil {
  264. log.Error("conn.Do(DEL, %s) error(%v)", args, err)
  265. }
  266. return
  267. }