redis.go 1.9 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586
  1. package databus
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "go-common/app/service/main/videoup/model/message"
  7. "go-common/library/cache/redis"
  8. "go-common/library/log"
  9. )
  10. const (
  11. _prefixMsgInfo = "videoup_service_msg"
  12. _preLock = "lock_"
  13. )
  14. func lockKey(key string) string {
  15. return fmt.Sprintf("%s%s", _preLock, key)
  16. }
  17. // PopMsgCache get databus message from redis
  18. func (d *Dao) PopMsgCache(c context.Context) (msg *message.Videoup, err error) {
  19. var (
  20. conn = d.redis.Get(c)
  21. bs []byte
  22. )
  23. defer conn.Close()
  24. if bs, err = redis.Bytes(conn.Do("LPOP", _prefixMsgInfo)); err != nil {
  25. if err == redis.ErrNil {
  26. err = nil
  27. } else {
  28. log.Error("conn.Do(LPOP, %s) error(%v)", _prefixMsgInfo, err)
  29. }
  30. return
  31. }
  32. msg = &message.Videoup{}
  33. if err = json.Unmarshal(bs, msg); err != nil {
  34. log.Error("json.Unmarshal error(%v)", err)
  35. }
  36. return
  37. }
  38. // PushMsgCache add message into redis.
  39. func (d *Dao) PushMsgCache(c context.Context, msg *message.Videoup) (err error) {
  40. var (
  41. bs []byte
  42. conn = d.redis.Get(c)
  43. )
  44. defer conn.Close()
  45. if bs, err = json.Marshal(msg); err != nil {
  46. log.Error("json.Marshal(%s) error(%v)", bs, err)
  47. return
  48. }
  49. if _, err = conn.Do("RPUSH", _prefixMsgInfo, bs); err != nil {
  50. log.Error("conn.Do(RPUSH, %s) error(%v)", bs, err)
  51. }
  52. return
  53. }
  54. //Lock .
  55. func (d *Dao) Lock(ctx context.Context, key string, ttl int) (gotLock bool, err error) {
  56. var lockValue = "1"
  57. conn := d.redis.Get(ctx)
  58. defer conn.Close()
  59. realKey := lockKey(key)
  60. var res interface{}
  61. //ttl 毫秒(PX) NX 其实就是 SetNX功能
  62. res, err = conn.Do("SET", realKey, lockValue, "PX", ttl, "NX")
  63. if err != nil {
  64. log.Error("redis_lock failed:%s:%s", realKey, err.Error())
  65. return
  66. }
  67. if res != nil {
  68. gotLock = true
  69. }
  70. return
  71. }
  72. //UnLock .
  73. func (d *Dao) UnLock(ctx context.Context, key string) (err error) {
  74. realKey := lockKey(key)
  75. conn := d.redis.Get(ctx)
  76. defer conn.Close()
  77. _, err = conn.Do("DEL", realKey)
  78. return
  79. }