redis.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157
  1. package dao
  2. import (
  3. "context"
  4. "fmt"
  5. "strconv"
  6. "time"
  7. "go-common/library/cache/redis"
  8. "go-common/library/log"
  9. )
  10. const (
  11. _keyUperWeightGroup = "platform_uper_weight_param_2" // 工作台任务权重参数(用户维度)
  12. _prefixKeyMissionSortedSet = "platform_missions_" // 工作台任务池 有序集合(权重排序)
  13. _prefixKeySingleExpire = "platform_single_expire_" // 单条工单的开始处理时间 有序集合
  14. _relatedMissions = "platfrom_missions_%d_%d" // 当前认领任务
  15. )
  16. // SetList .
  17. func (d *Dao) SetList(c context.Context, key string, ids []int64) (err error) {
  18. conn := d.redis.Get(c)
  19. defer conn.Close()
  20. for _, id := range ids {
  21. now := time.Now().Format("2006-01-02 15:04:05")
  22. log.Info("enter queue id is %d time is %s", id, now)
  23. if err = conn.Send("LPUSH", key, id); err != nil {
  24. log.Error("d.LPUSH error(%v)", err)
  25. return
  26. }
  27. }
  28. if err = conn.Flush(); err != nil {
  29. log.Error("conn.Flush error(%v)", err)
  30. }
  31. return
  32. }
  33. // ExistKey .
  34. func (d *Dao) ExistKey(c context.Context, key string) (exist bool, err error) {
  35. conn := d.redis.Get(c)
  36. defer conn.Close()
  37. exist, err = redis.Bool(conn.Do("EXISTS", key))
  38. return
  39. }
  40. // SetString .
  41. func (d *Dao) SetString(c context.Context, key, val string) (err error) {
  42. var conn = d.redis.Get(c)
  43. defer conn.Close()
  44. _, err = conn.Do("SET", key, val)
  45. return
  46. }
  47. // SetCrash .
  48. func (d *Dao) SetCrash(c context.Context) (err error) {
  49. key, val := "dead", "1"
  50. err = d.SetString(c, key, val)
  51. return
  52. }
  53. // IsCrash .
  54. func (d *Dao) IsCrash(c context.Context) (exist bool, err error) {
  55. key := "dead"
  56. exist, err = d.ExistKey(c, key)
  57. return
  58. }
  59. // UperInfoCache 读取用户维度的申诉weight计算参数
  60. func (d *Dao) UperInfoCache(c context.Context, apIDs []int64) (params []int64, err error) {
  61. conn := d.redis.Get(c)
  62. defer conn.Close()
  63. args := redis.Args{}
  64. args = args.Add(_keyUperWeightGroup)
  65. for _, apid := range apIDs {
  66. args = args.Add(apid)
  67. }
  68. if params, err = redis.Int64s(conn.Do("HMGET", args...)); err != nil {
  69. log.Error("HMGET %v error(%v)", args, err)
  70. }
  71. return
  72. }
  73. // DelUperInfo .
  74. func (d *Dao) DelUperInfo(c context.Context, mids []int64) (err error) {
  75. conn := d.redis.Get(c)
  76. defer conn.Close()
  77. args := redis.Args{}
  78. args = args.Add(_keyUperWeightGroup)
  79. for _, mid := range mids {
  80. args = args.Add(mid)
  81. }
  82. if _, err = conn.Do("HDEL", args...); err != nil {
  83. log.Error("HDEL %v error(%v)", args, err)
  84. }
  85. return
  86. }
  87. // SetWeightSortedSet 覆盖sorted set
  88. func (d *Dao) SetWeightSortedSet(c context.Context, bid int, newWeight map[int64]int64) (err error) {
  89. key := _prefixKeyMissionSortedSet + strconv.Itoa(bid)
  90. conn := d.redis.Get(c)
  91. defer conn.Close()
  92. args := redis.Args{}
  93. args = args.Add(key)
  94. for id, weight := range newWeight {
  95. args = args.Add(weight, id)
  96. }
  97. if _, err = conn.Do("ZADD", args...); err != nil { // ZADD key score member [[score member] [score member] ...]
  98. log.Error("ZADD %v error(%v)", args, err)
  99. }
  100. return
  101. }
  102. // SingleExpire 获取所有的 single expire 信息
  103. func (d *Dao) SingleExpire(c context.Context, bid int) (delIDs []int64, err error) {
  104. key := _prefixKeySingleExpire + strconv.Itoa(bid)
  105. conn := d.redis.Get(c)
  106. defer conn.Close()
  107. floorTime := time.Now().Add(-480 * time.Second).Unix()
  108. // fixme if too hash field too many
  109. delIDs, err = redis.Int64s(conn.Do("ZRANGEBYSCORE", key, "0", floorTime, "LIMIT", "0", "50"))
  110. return
  111. }
  112. // DelSingleExpire .
  113. func (d *Dao) DelSingleExpire(c context.Context, bid int, ids []int64) (err error) {
  114. key := _prefixKeySingleExpire + strconv.Itoa(bid)
  115. conn := d.redis.Get(c)
  116. defer conn.Close()
  117. args := redis.Args{}
  118. args = args.Add(key)
  119. for _, apID := range ids {
  120. args = args.Add(apID)
  121. }
  122. if _, err = conn.Do("ZREM", args...); err != nil {
  123. log.Error("ZREM %v error(%v)", args, err)
  124. }
  125. return
  126. }
  127. // DelRelatedMissions .
  128. func (d *Dao) DelRelatedMissions(ctx context.Context, bid, transAdmin int, ids []int64) (err error) {
  129. if len(ids) == 0 {
  130. return
  131. }
  132. key := fmt.Sprintf(_relatedMissions, transAdmin, bid)
  133. conn := d.redis.Get(ctx)
  134. defer conn.Close()
  135. args := redis.Args{}.Add(key)
  136. for _, id := range ids {
  137. args = args.Add(id)
  138. }
  139. _, err = conn.Do("ZREM", args...)
  140. return
  141. }