stat_redis.go 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304
  1. package dao
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "go-common/app/interface/main/playlist/model"
  7. "go-common/library/cache/redis"
  8. "go-common/library/log"
  9. )
  10. const (
  11. _statKey = "st_%d"
  12. _plKey = "pl_%d"
  13. )
  14. func keyStat(mid int64) string {
  15. return fmt.Sprintf(_statKey, mid)
  16. }
  17. func keyPl(pid int64) string {
  18. return fmt.Sprintf(_plKey, pid)
  19. }
  20. // PlStatCache get stat from cache.
  21. func (d *Dao) PlStatCache(c context.Context, mid, pid int64) (stat *model.PlStat, err error) {
  22. var (
  23. bs []byte
  24. key = keyStat(mid)
  25. conn = d.redis.Get(c)
  26. )
  27. defer conn.Close()
  28. if bs, err = redis.Bytes(conn.Do("HGET", key, pid)); err != nil {
  29. if err == redis.ErrNil {
  30. err = nil
  31. stat = nil
  32. } else {
  33. log.Error("conn.Do(HGET,%s,%d) error(%v)", key, pid, err)
  34. }
  35. return
  36. }
  37. stat = new(model.PlStat)
  38. if err = json.Unmarshal(bs, stat); err != nil {
  39. log.Error("json.Unmarshal(%s) error(%v)", string(bs), err)
  40. }
  41. return
  42. }
  43. // SetPlStatCache set playlist stat to cache.
  44. func (d *Dao) SetPlStatCache(c context.Context, mid, pid int64, stat *model.PlStat) (err error) {
  45. var (
  46. bs []byte
  47. ok bool
  48. keyMid = keyStat(mid)
  49. keyPid = keyPl(pid)
  50. conn = d.redis.Get(c)
  51. )
  52. defer conn.Close()
  53. if ok, err = redis.Bool(conn.Do("EXPIRE", keyMid, d.statExpire)); err != nil {
  54. log.Error("conn.Do(EXPIRE %s) error(%v)", keyMid, err)
  55. return
  56. }
  57. if ok {
  58. if bs, err = json.Marshal(stat); err != nil {
  59. log.Error("json.Marshal() error(%v)", err)
  60. return
  61. }
  62. if err = conn.Send("HSET", keyMid, pid, bs); err != nil {
  63. log.Error("conn.Send(HSET,%s,%d) error(%v)", keyMid, pid, err)
  64. return
  65. }
  66. if err = conn.Send("EXPIRE", keyMid, d.statExpire); err != nil {
  67. log.Error("conn.Send(EXPIRE,%s) error(%v)", keyMid, err)
  68. return
  69. }
  70. if err = conn.Send("SET", keyPid, bs); err != nil {
  71. log.Error("conn.Send(SET,%s,%s) error(%v)", keyPid, string(bs), err)
  72. return
  73. }
  74. if err = conn.Send("EXPIRE", keyPid, d.plExpire); err != nil {
  75. log.Error("conn.Send(EXPIRE,%s) error(%v)", keyPid, err)
  76. return
  77. }
  78. if err = conn.Flush(); err != nil {
  79. log.Error("add conn.Flush error(%v)", err)
  80. return
  81. }
  82. for i := 0; i < 4; i++ {
  83. if _, err = conn.Receive(); err != nil {
  84. log.Error("add conn.Receive()%d error(%v)", i+1, err)
  85. return
  86. }
  87. }
  88. }
  89. return
  90. }
  91. // SetStatsCache set playlist stat list to cache.
  92. func (d *Dao) SetStatsCache(c context.Context, mid int64, plStats []*model.PlStat) (err error) {
  93. var (
  94. bs []byte
  95. keyPid string
  96. keyPids []string
  97. argsPid = redis.Args{}
  98. )
  99. keyMid := keyStat(mid)
  100. conn := d.redis.Get(c)
  101. defer conn.Close()
  102. if _, err = redis.Bool(conn.Do("EXPIRE", keyMid, d.statExpire)); err != nil {
  103. log.Error("conn.Do(EXPIRE %s) error(%v)", keyMid, err)
  104. return
  105. }
  106. argsMid := redis.Args{}.Add(keyMid)
  107. for _, v := range plStats {
  108. if bs, err = json.Marshal(v); err != nil {
  109. log.Error("json.Marshal err(%v)", err)
  110. continue
  111. }
  112. argsMid = argsMid.Add(v.ID).Add(string(bs))
  113. keyPid = keyPl(v.ID)
  114. keyPids = append(keyPids, keyPid)
  115. argsPid = argsPid.Add(keyPid).Add(string(bs))
  116. }
  117. if err = conn.Send("HMSET", argsMid...); err != nil {
  118. log.Error("conn.Send(HMSET, %s) error(%v)", keyMid, err)
  119. return
  120. }
  121. if err = conn.Send("EXPIRE", keyMid, d.statExpire); err != nil {
  122. log.Error("conn.Send(Expire, %s, %d) error(%v)", keyMid, d.statExpire, err)
  123. return
  124. }
  125. if err = conn.Send("MSET", argsPid...); err != nil {
  126. log.Error("conn.Send(MSET) error(%v)", err)
  127. return
  128. }
  129. count := 3
  130. for _, v := range keyPids {
  131. count++
  132. if err = conn.Send("EXPIRE", v, d.plExpire); err != nil {
  133. log.Error("conn.Send(Expire, %s, %d) error(%v)", v, d.plExpire, err)
  134. return
  135. }
  136. }
  137. if err = conn.Flush(); err != nil {
  138. log.Error("conn.Flush error(%v)", err)
  139. return
  140. }
  141. for i := 0; i < count; i++ {
  142. if _, err = conn.Receive(); err != nil {
  143. log.Error("conn.Receive() error(%v)", err)
  144. return
  145. }
  146. }
  147. return
  148. }
  149. // PlsCache get playlist by pids from cache.
  150. func (d *Dao) PlsCache(c context.Context, pids []int64) (res []*model.PlStat, err error) {
  151. var (
  152. key string
  153. args = redis.Args{}
  154. )
  155. for _, pid := range pids {
  156. key = keyPl(pid)
  157. args = args.Add(key)
  158. }
  159. conn := d.redis.Get(c)
  160. defer conn.Close()
  161. var (
  162. bss [][]byte
  163. )
  164. if bss, err = redis.ByteSlices(conn.Do("MGET", args...)); err != nil {
  165. if err == redis.ErrNil {
  166. err = nil
  167. } else {
  168. log.Error("PlsCache conn.Do(MGET,%s) error(%v)", key, err)
  169. }
  170. return
  171. }
  172. for _, bs := range bss {
  173. stat := new(model.PlStat)
  174. if bs == nil {
  175. continue
  176. }
  177. if err = json.Unmarshal(bs, stat); err != nil {
  178. log.Error("json.Unmarshal(%s) error(%v)", string(bs), err)
  179. err = nil
  180. continue
  181. }
  182. res = append(res, stat)
  183. }
  184. return
  185. }
  186. // SetPlCache set playlist to cache.
  187. func (d *Dao) SetPlCache(c context.Context, plStats []*model.PlStat) (err error) {
  188. var (
  189. bs []byte
  190. keyPid string
  191. keyPids []string
  192. argsPid = redis.Args{}
  193. )
  194. conn := d.redis.Get(c)
  195. defer conn.Close()
  196. for _, v := range plStats {
  197. if bs, err = json.Marshal(v); err != nil {
  198. log.Error("json.Marshal err(%v)", err)
  199. continue
  200. }
  201. keyPid = keyPl(v.ID)
  202. keyPids = append(keyPids, keyPid)
  203. argsPid = argsPid.Add(keyPid).Add(string(bs))
  204. }
  205. if err = conn.Send("MSET", argsPid...); err != nil {
  206. log.Error("conn.Send(MSET) error(%v)", err)
  207. return
  208. }
  209. count := 1
  210. for _, v := range keyPids {
  211. count++
  212. if err = conn.Send("EXPIRE", v, d.plExpire); err != nil {
  213. log.Error("conn.Send(Expire, %s, %d) error(%v)", v, d.plExpire, err)
  214. return
  215. }
  216. }
  217. if err = conn.Flush(); err != nil {
  218. log.Error("conn.Flush error(%v)", err)
  219. return
  220. }
  221. for i := 0; i < count; i++ {
  222. if _, err = conn.Receive(); err != nil {
  223. log.Error("conn.Receive() error(%v)", err)
  224. return
  225. }
  226. }
  227. return
  228. }
  229. // DelPlCache delete playlist from redis.
  230. func (d *Dao) DelPlCache(c context.Context, mid, pid int64) (err error) {
  231. var (
  232. key = keyPl(pid)
  233. plaKey = keyPlArc(pid)
  234. pladKey = keyPlArcDesc(pid)
  235. keyStat = keyStat(mid)
  236. conn = d.redis.Get(c)
  237. )
  238. defer conn.Close()
  239. if err = conn.Send("DEL", key); err != nil {
  240. log.Error("conn.Send(DEL %s) error(%v)", key, err)
  241. return
  242. }
  243. if err = conn.Send("DEL", plaKey); err != nil {
  244. log.Error("conn.Send(DEL %s) error(%v)", plaKey, err)
  245. return
  246. }
  247. if err = conn.Send("DEL", pladKey); err != nil {
  248. log.Error("conn.Send(DEL %s) error(%v)", pladKey, err)
  249. return
  250. }
  251. if err = conn.Send("HDEL", keyStat, pid); err != nil {
  252. log.Error("conn.Send(HDEL,%s,%d) error(%v)", keyStat, pid, err)
  253. }
  254. if err = conn.Flush(); err != nil {
  255. log.Error("conn.Flush() error(%v)", err)
  256. return
  257. }
  258. for i := 0; i < 4; i++ {
  259. if _, err = conn.Receive(); err != nil {
  260. log.Error("conn.Receive() error(%v)", err)
  261. return
  262. }
  263. }
  264. return
  265. }
  266. // StatsCache get playlist stats from cache.
  267. func (d *Dao) StatsCache(c context.Context, mid int64) (res []*model.PlStat, err error) {
  268. key := keyStat(mid)
  269. conn := d.redis.Get(c)
  270. defer conn.Close()
  271. var (
  272. bss [][]byte
  273. )
  274. if bss, err = redis.ByteSlices(conn.Do("HGETALL", key)); err != nil {
  275. if err == redis.ErrNil {
  276. err = nil
  277. } else {
  278. log.Error("StatCache conn.Do(HGETALL,%s) error(%v)", key, err)
  279. }
  280. return
  281. }
  282. for i := 1; i <= len(bss); i += 2 {
  283. stat := new(model.PlStat)
  284. if err = json.Unmarshal(bss[i], stat); err != nil {
  285. log.Error("json.Unmarshal(%s) error(%v)", string(bss[i]), err)
  286. continue
  287. }
  288. res = append(res, stat)
  289. }
  290. return
  291. }