redis.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113
  1. package databus
  2. import (
  3. "context"
  4. "encoding/json"
  5. "go-common/app/admin/main/videoup/model/archive"
  6. "go-common/app/admin/main/videoup/model/message"
  7. "go-common/library/cache/redis"
  8. "go-common/library/conf/env"
  9. "go-common/library/log"
  10. )
  // Redis key names used by the functions in this file. They are always passed
  // through fixRedisList before being sent to redis.
  const (
  	// _prefixMsgInfo is the key of the list that PushMsgCache/PopMsgCache use.
  	_prefixMsgInfo = "videoup_admin_msg"
  	// _multSyncList is the key of the set that PushMultSync/PopMultSync use.
  	_multSyncList = "m_sync_list"
  )
  15. // PopMsgCache get databus message from redis
  16. func (d *Dao) PopMsgCache(c context.Context) (msg *message.Videoup, err error) {
  17. var (
  18. conn = d.redis.Get(c)
  19. bs []byte
  20. )
  21. defer conn.Close()
  22. if bs, err = redis.Bytes(conn.Do("LPOP", fixRedisList(_prefixMsgInfo))); err != nil {
  23. if err == redis.ErrNil {
  24. err = nil
  25. } else {
  26. log.Error("conn.Do(LPOP, %s) error(%v)", fixRedisList(_prefixMsgInfo), err)
  27. }
  28. return
  29. }
  30. msg = &message.Videoup{}
  31. if err = json.Unmarshal(bs, msg); err != nil {
  32. log.Error("json.Unmarshal error(%v)", err)
  33. }
  34. return
  35. }
  36. // PushMsgCache add message into redis.
  37. func (d *Dao) PushMsgCache(c context.Context, msg *message.Videoup) (err error) {
  38. var (
  39. bs []byte
  40. conn = d.redis.Get(c)
  41. )
  42. defer conn.Close()
  43. if bs, err = json.Marshal(msg); err != nil {
  44. log.Error("json.Marshal(%s) error(%v)", bs, err)
  45. return
  46. }
  47. if _, err = conn.Do("RPUSH", fixRedisList(_prefixMsgInfo), bs); err != nil {
  48. log.Error("conn.Do(RPUSH, %s) error(%v)", bs, err)
  49. }
  50. return
  51. }
  52. func fixRedisList(list string) (target string) {
  53. if env.DeployEnv == env.DeployEnvPre {
  54. target = "pre_" + list
  55. } else {
  56. target = list
  57. }
  58. return
  59. }
  60. // PushMultSync rpush stuct item to redis
  61. func (d *Dao) PushMultSync(c context.Context, sync *archive.MultSyncParam) (ok bool, err error) {
  62. var (
  63. conn = d.redis.Get(c)
  64. bs []byte
  65. )
  66. defer conn.Close()
  67. if bs, err = json.Marshal(sync); err != nil {
  68. log.Error("json.Marshal(%v) error(%v)", sync, err)
  69. return
  70. }
  71. if err = conn.Send("SADD", fixRedisList(_multSyncList), bs); err != nil {
  72. log.Error("conn.Send(SADD, %s, %s) error(%v)", fixRedisList(_multSyncList), bs, err)
  73. return
  74. }
  75. if err = conn.Flush(); err != nil {
  76. log.Error("conn.Flush error(%v)", err)
  77. return
  78. }
  79. if ok, err = redis.Bool(conn.Receive()); err != nil {
  80. log.Error("conn.Receive error(%v)", err)
  81. }
  82. return
  83. }
  84. // PopMultSync lpop stuct item from redis
  85. func (d *Dao) PopMultSync(c context.Context) (res *archive.MultSyncParam, err error) {
  86. var (
  87. conn = d.redis.Get(c)
  88. bs []byte
  89. sync = &archive.MultSyncParam{}
  90. )
  91. defer conn.Close()
  92. if bs, err = redis.Bytes(conn.Do("SPOP", fixRedisList(_multSyncList))); err != nil && err != redis.ErrNil {
  93. log.Error("redis.Bytes(conn.Do(SPOP, %s)) error(%v)", fixRedisList(_multSyncList), err)
  94. return
  95. }
  96. if len(bs) == 0 {
  97. return
  98. }
  99. if err = json.Unmarshal(bs, sync); err != nil {
  100. log.Error("json.Unmarshal(%s) error(%v)", sync, err)
  101. return
  102. }
  103. res = sync
  104. return
  105. }