redis.go 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162
  1. package dao
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "go-common/app/interface/main/push-archive/model"
  7. "go-common/library/cache/redis"
  8. "go-common/library/log"
  9. )
  10. const (
  11. _prefixUpperLimit = "pau_%d"
  12. _prefixFanLimit = "paf_%d"
  13. _statisticsKey = "statistics_push_archive"
  14. _prefixPerUpperLimit = "perup_%d_%d"
  15. )
  16. func (d *Dao) do(c context.Context, command string, key string, args ...interface{}) (reply interface{}, err error) {
  17. conn := d.redis.Get(c)
  18. defer conn.Close()
  19. values := []interface{}{key}
  20. if len(args) > 0 {
  21. values = append(values, args...)
  22. }
  23. reply, err = conn.Do(command, values...)
  24. return
  25. }
  26. func upperLimitKey(mid int64) string {
  27. return fmt.Sprintf(_prefixUpperLimit, mid)
  28. }
  29. // pingRedis ping redis.
  30. func (d *Dao) pingRedis(c context.Context) (err error) {
  31. if _, err = d.do(c, "SET", "PING", "PONG"); err != nil {
  32. PromError("redis: ping remote")
  33. log.Error("remote redis: conn.Do(SET,PING,PONG) error(%v)", err)
  34. }
  35. return
  36. }
  37. // ExistUpperLimitCache judge that whether upper push limit cache exists.
  38. func (d *Dao) ExistUpperLimitCache(c context.Context, upper int64) (exist bool, err error) {
  39. key := upperLimitKey(upper)
  40. if exist, err = redis.Bool(d.do(c, "EXISTS", key)); err != nil {
  41. PromError("redis:读取upper推送限制")
  42. log.Error("ExistUpperLimitCache do(EXISTS, %s) error(%v)", key, err)
  43. }
  44. return
  45. }
  46. // AddUpperLimitCache sets upper push limit cache.
  47. func (d *Dao) AddUpperLimitCache(c context.Context, upper int64) (err error) {
  48. key := upperLimitKey(upper)
  49. if _, err = d.do(c, "SETEX", key, d.UpperLimitExpire, ""); err != nil {
  50. PromError("redis:添加upper推送限制")
  51. log.Error("AddUpperLimitCache do(SETEX, %s) error(%v)", key, err)
  52. }
  53. return
  54. }
  55. //fanLimitKey 粉丝推送总次数限制key
  56. func fanLimitKey(fan int64, relationType int) string {
  57. key := fmt.Sprintf(_prefixFanLimit, fan)
  58. if relationType != model.RelationSpecial {
  59. key = fmt.Sprintf("%s_%d", key, relationType)
  60. }
  61. return key
  62. }
  63. //GetFanLimitCache 读取粉丝限制的当前值
  64. func (d *Dao) GetFanLimitCache(c context.Context, fan int64, relationType int) (limit int, err error) {
  65. key := fanLimitKey(fan, relationType)
  66. if limit, err = redis.Int(d.do(c, "GET", key)); err != nil {
  67. if err == redis.ErrNil {
  68. err = nil
  69. } else {
  70. log.Error("GetFanLimitCache do(GET) error(%v)", err)
  71. }
  72. }
  73. return
  74. }
  75. //AddFanLimitCache 添加粉丝限制的缓存
  76. func (d *Dao) AddFanLimitCache(c context.Context, fan int64, relationType int, value int, expire int32) (err error) {
  77. key := fanLimitKey(fan, relationType)
  78. if _, err = d.do(c, "SETEX", key, expire, value); err != nil {
  79. log.Error("AddFanLimitCache do(SETEX) error(%v)", err)
  80. PromError("redis:添加fan推送限制")
  81. }
  82. return
  83. }
  84. //AddStatisticsCache 添加统计数据到redis
  85. func (d *Dao) AddStatisticsCache(c context.Context, ps *model.PushStatistic) (err error) {
  86. psByte, err := json.Marshal(*ps)
  87. if err != nil {
  88. log.Error("AddStatisticsCache json.Marshal error(%v), pushstatistic(%v)", err, ps)
  89. return
  90. }
  91. key := _statisticsKey
  92. if _, err = d.do(c, "LPUSH", key, string(psByte)); err != nil {
  93. log.Error("AddStatisticsCache do(LPUSH, %s) error(%v) pushstatistic(%v)", key, err, ps)
  94. PromError("redis:添加统计数据")
  95. }
  96. return
  97. }
  98. //GetStatisticsCache 读取一条统计数据
  99. func (d *Dao) GetStatisticsCache(c context.Context) (ps *model.PushStatistic, err error) {
  100. key := _statisticsKey
  101. psStr, err := redis.String(d.do(c, "RPOP", key))
  102. if err != nil {
  103. if err == redis.ErrNil {
  104. err = nil
  105. } else {
  106. log.Error("GetStatisticsCache do(RPOP, %s) error(%v)", key, err)
  107. }
  108. return
  109. }
  110. if err = json.Unmarshal([]byte(psStr), &ps); err != nil {
  111. log.Error("GetStatisticsCache json.Unmarshal error(%v), ps(%s)", err, psStr)
  112. return
  113. }
  114. return
  115. }
  116. //perUpperLimitKey 粉丝每个upper主的推送次数限制key
  117. func perUpperLimitKey(fan int64, upper int64) string {
  118. return fmt.Sprintf(_prefixPerUpperLimit, fan, upper)
  119. }
  120. //GetPerUpperLimitCache 粉丝每个upper主的已推送次数
  121. func (d *Dao) GetPerUpperLimitCache(c context.Context, fan int64, upper int64) (limit int, err error) {
  122. key := perUpperLimitKey(fan, upper)
  123. if limit, err = redis.Int(d.do(c, "GET", key)); err != nil {
  124. if err == redis.ErrNil {
  125. err = nil
  126. } else {
  127. log.Error("GetPerUpperLimitCache do(GET, %s) error(%v)", key, err)
  128. }
  129. }
  130. return
  131. }
  132. //AddPerUpperLimitCache 添加粉丝每个up主的推送次数
  133. func (d *Dao) AddPerUpperLimitCache(c context.Context, fan int64, upper int64, value int, expire int32) (err error) {
  134. key := perUpperLimitKey(fan, upper)
  135. if _, err = d.do(c, "SETEX", key, expire, value); err != nil {
  136. log.Error("AddPerUpperLimitCache do(SETEX, %s, %d, %d) error(%v)", key, expire, value, err)
  137. PromError("redis:添加perupper推送限制")
  138. }
  139. return
  140. }