redis.go 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227
  1. package dao
  2. import (
  3. "context"
  4. "fmt"
  5. "go-common/library/cache/redis"
  6. "go-common/library/log"
  7. )
  8. const (
  9. _keyWaitBlock = "wb_" // b_batch_no wait block
  10. _keyBlock = "bl_" // b_batch_no block
  11. _preLock = "lk_"
  12. _keyUniqueCheck = "uc:"
  13. times = 3
  14. )
  15. // keyWaitBlock return block cache key.
  16. func keyWaitBlock(batchNo int64) string {
  17. return _keyWaitBlock + fmt.Sprintf("%d", batchNo)
  18. }
  19. // keyBlock return block cache key.
  20. func keyBlock() string {
  21. return _keyBlock
  22. }
  23. func lockKey(key string) string {
  24. return _preLock + key
  25. }
  26. func uniqueCheckKey(uuid string) string {
  27. return _keyUniqueCheck + uuid
  28. }
  29. // BlockMidCache get wait block mids.
  30. func (d *Dao) BlockMidCache(c context.Context, batchNo int64, num int64) (res []int64, err error) {
  31. var (
  32. conn = d.redis.Get(c)
  33. key = keyWaitBlock(batchNo)
  34. )
  35. defer conn.Close()
  36. if res, err = redis.Int64s(conn.Do("ZREVRANGEBYSCORE", key, "+inf", "-inf", "LIMIT", 0, num)); err != nil {
  37. log.Error("redis(ZREVRANGEBYSCORE %s,%d) error(%v)", key, num, err)
  38. return
  39. }
  40. return
  41. }
  42. // DelBlockCache delete the wait block redis.
  43. func (d *Dao) DelBlockCache(c context.Context, batchNo int64, mid int64) (err error) {
  44. var (
  45. key = keyWaitBlock(batchNo)
  46. args = []interface{}{key, mid}
  47. )
  48. conn := d.redis.Get(c)
  49. defer conn.Close()
  50. if err = conn.Send("ZREM", args...); err != nil {
  51. log.Error("conn.Send(ZREM %s,%v) error(%v)", key, mid, err)
  52. return
  53. }
  54. if err = conn.Flush(); err != nil {
  55. log.Error("conn.Flush() error(%v)", err)
  56. return
  57. }
  58. if _, err = conn.Receive(); err != nil {
  59. log.Error("conn.Receive() error(%v)", err)
  60. return
  61. }
  62. return
  63. }
  64. //SetNXLockCache redis lock.
  65. func (d *Dao) SetNXLockCache(c context.Context, k string, times int64) (res bool, err error) {
  66. var (
  67. key = lockKey(k)
  68. conn = d.redis.Get(c)
  69. )
  70. defer conn.Close()
  71. if res, err = redis.Bool(conn.Do("SETNX", key, "1")); err != nil {
  72. if err == redis.ErrNil {
  73. err = nil
  74. } else {
  75. log.Error("conn.Do(SETNX(%d)) error(%v)", key, err)
  76. return
  77. }
  78. }
  79. if res {
  80. if _, err = redis.Bool(conn.Do("EXPIRE", key, times)); err != nil {
  81. log.Error("conn.Do(EXPIRE, %s, %d) error(%v)", key, times, err)
  82. return
  83. }
  84. }
  85. return
  86. }
  87. //DelLockCache del lock cache.
  88. func (d *Dao) DelLockCache(c context.Context, k string) (err error) {
  89. var (
  90. key = lockKey(k)
  91. conn = d.redis.Get(c)
  92. )
  93. defer conn.Close()
  94. if _, err = conn.Do("DEL", key); err != nil {
  95. log.Error("conn.Do(del,%v) err(%v)", key, err)
  96. }
  97. return
  98. }
  99. //AddBlockCache add block cache.
  100. func (d *Dao) AddBlockCache(c context.Context, mid int64, score int8, blockNo int64) (err error) {
  101. var (
  102. key = keyWaitBlock(blockNo)
  103. )
  104. conn := d.redis.Get(c)
  105. defer conn.Close()
  106. if err = conn.Send("ZADD", key, score, mid); err != nil {
  107. log.Error("conn.Send(ZADD %s,%d,%d) error(%v)", key, score, mid, err)
  108. return
  109. }
  110. if err = conn.Send("EXPIRE", key, d.expire); err != nil {
  111. log.Error("conn.Send(EXPIRE) error(%v)", err)
  112. return
  113. }
  114. if err = conn.Flush(); err != nil {
  115. log.Error("conn.Flush() error(%v)", err)
  116. return
  117. }
  118. for i := 0; i < 2; i++ {
  119. if _, err = conn.Receive(); err != nil {
  120. log.Error("conn.Receive() error(%v)", err)
  121. return
  122. }
  123. }
  124. return
  125. }
  126. // SetBlockCache block.
  127. func (d *Dao) SetBlockCache(c context.Context, mids []int64) (err error) {
  128. var (
  129. key = keyBlock()
  130. conn = d.redis.Get(c)
  131. )
  132. defer conn.Close()
  133. for _, mid := range mids {
  134. if err = conn.Send("SADD", key, mid); err != nil {
  135. log.Error("SADD conn.Send error(%v)", err)
  136. return
  137. }
  138. }
  139. if err = conn.Send("EXPIRE", key, d.expire); err != nil {
  140. log.Error("EXPIRE conn.Send error(%v)", err)
  141. return
  142. }
  143. if err = conn.Flush(); err != nil {
  144. log.Error("conn.Flush error(%v)", err)
  145. return
  146. }
  147. for i := 0; i < len(mids); i++ {
  148. if _, err = conn.Receive(); err != nil {
  149. log.Error("SetBlockCache Receive error(%v)", err)
  150. return
  151. }
  152. }
  153. return
  154. }
  155. //SPOPBlockCache pop mid.
  156. func (d *Dao) SPOPBlockCache(c context.Context) (mid int64, err error) {
  157. var (
  158. key = keyBlock()
  159. conn = d.redis.Get(c)
  160. )
  161. defer conn.Close()
  162. if mid, err = redis.Int64(conn.Do("SPOP", key)); err != nil {
  163. if err == redis.ErrNil {
  164. err = nil
  165. } else {
  166. log.Error("SPOP conn.Do(%s,%v) err(%v)", key, err)
  167. }
  168. }
  169. return
  170. }
  171. // PingRedis check redis connection
  172. func (d *Dao) PingRedis(c context.Context) (err error) {
  173. conn := d.redis.Get(c)
  174. _, err = conn.Do("SET", "PING", "PONG")
  175. conn.Close()
  176. return
  177. }
  178. // PfaddCache SetNX.
  179. func (d *Dao) PfaddCache(c context.Context, uuid string) (ok bool, err error) {
  180. conn := d.redis.Get(c)
  181. defer conn.Close()
  182. key := uniqueCheckKey(uuid)
  183. if err = conn.Send("SETNX", key, 1); err != nil {
  184. log.Error("SETNX conn.Send error(%v)", err)
  185. return
  186. }
  187. if err = conn.Send("EXPIRE", key, d.msgUUIDExpire); err != nil {
  188. log.Error("conn.Send(EXPIRE) error(%v)", err)
  189. return
  190. }
  191. if err = conn.Flush(); err != nil {
  192. log.Error("DelLock conn.Flush() error(%v)", err)
  193. return
  194. }
  195. if ok, err = redis.Bool(conn.Receive()); err != nil {
  196. log.Error("conn.Receive() error(%v)", err)
  197. return
  198. }
  199. if _, err = conn.Receive(); err != nil {
  200. log.Error("conn.Receive() error(%v)", err)
  201. }
  202. return
  203. }
  204. // TTL get redis cache ttl.
  205. func (d *Dao) TTL(c context.Context, key string) (ttl int64, err error) {
  206. conn := d.redis.Get(c)
  207. ttl, err = redis.Int64(conn.Do("TTL", key))
  208. conn.Close()
  209. return
  210. }