cluster.go 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243
  1. package redis
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "time"
  7. modtask "go-common/app/admin/main/aegis/model/task"
  8. "go-common/library/cache/redis"
  9. "go-common/library/ecode"
  10. "go-common/library/log"
  11. "github.com/pkg/errors"
  12. )
  // lockKey builds the Redis key of the distributed lock that guards one
  // (businessID, flowID) pair.
  func lockKey(businessID, flowID int64) string {
  	const layout = "aegis_lock_%d_%d"
  	key := fmt.Sprintf(layout, businessID, flowID)
  	return key
  }
  16. func (d *Dao) getlock(c context.Context, bizid, flowid int64) (ok bool) {
  17. var (
  18. conn = d.cluster.Get(c)
  19. key = lockKey(bizid, flowid)
  20. err error
  21. )
  22. defer conn.Close()
  23. if ok, err = redis.Bool(conn.Do("SETNX", key, "1")); err != nil {
  24. if err == redis.ErrNil {
  25. err = nil
  26. } else {
  27. log.Error("conn.Do(SETNX(%s)) error(%v)", key, err)
  28. return
  29. }
  30. }
  31. if ok {
  32. conn.Do("EXPIRE", key, 3)
  33. }
  34. return
  35. }
  36. //SeizeTask .
  37. func (d *Dao) SeizeTask(c context.Context, businessID, flowID, uid, count int64) (hitids []int64, missids []int64, others map[int64]int64, err error) {
  38. var (
  39. lock bool
  40. pubkey = publicKey(businessID, flowID)
  41. ids []int64
  42. )
  43. // 1. 抢占分布式锁
  44. for lc := 0; lc < 3; lc++ {
  45. if lock = d.getlock(c, businessID, flowID); lock {
  46. break
  47. }
  48. time.Sleep(10 * time.Millisecond)
  49. }
  50. if !lock {
  51. log.Error("getlock getlock fail(%d,%d,%d)", businessID, flowID, uid)
  52. err = ecode.AegisTaskBusy
  53. return
  54. }
  55. conn := d.cluster.Get(c)
  56. defer conn.Close()
  57. defer conn.Do("DEL", lockKey(businessID, flowID))
  58. var (
  59. head, tail = int64(0), int64(count)
  60. )
  61. // 2. 从 public 按权重从高到低取出一批来
  62. for {
  63. if ids, err = redis.Int64s(conn.Do("ZRANGE", pubkey, head, tail)); err != nil {
  64. log.Error("redis (ZRANGE,%s,%d,%d) error(%v)", pubkey, 0, count, err)
  65. return
  66. }
  67. head += count
  68. tail += count
  69. if len(ids) == 0 {
  70. break
  71. }
  72. for _, id := range ids {
  73. if err = conn.Send("GET", haskKey(id)); err != nil {
  74. log.Error("redis (GET,%s) error(%v)", haskKey(id), err)
  75. return
  76. }
  77. }
  78. conn.Flush()
  79. var enough bool
  80. for _, id := range ids {
  81. var (
  82. bs []byte
  83. e error
  84. )
  85. bs, e = redis.Bytes(conn.Receive())
  86. if e != nil {
  87. log.Error("Receive Weight(%d) error(%v)", id, errors.WithStack(e))
  88. missids = append(missids, id)
  89. continue
  90. }
  91. task := &modtask.Task{}
  92. if e = json.Unmarshal(bs, task); err != nil {
  93. log.Error("json.Unmarshal error(%v)", errors.WithStack(e))
  94. missids = append(missids, id)
  95. continue
  96. }
  97. if task.ID != id {
  98. log.Error("id(%d-%d)不匹配", task.ID, id)
  99. missids = append(missids, id)
  100. continue
  101. }
  102. if task.UID != 0 && task.UID != uid {
  103. log.Info("id(%d) 任务已经指派给(%d)", task.ID, task.UID)
  104. missids = append(missids, id)
  105. continue
  106. }
  107. hitids = append(hitids, id)
  108. if len(hitids) >= int(count) {
  109. enough = true
  110. break
  111. }
  112. }
  113. if enough {
  114. break
  115. }
  116. }
  117. personKey := personalKey(businessID, flowID, uid)
  118. for _, id := range hitids {
  119. conn.Send("ZREM", pubkey, formatID(id))
  120. conn.Send("LREM", personKey, 0, id)
  121. conn.Send("RPUSH", personKey, id)
  122. }
  123. conn.Flush()
  124. for i := 0; i < len(hitids)*3; i++ {
  125. conn.Receive()
  126. }
  127. log.Info("rangefunc count(%d) hitids(%v) missids(%v)", count, hitids, missids)
  128. return
  129. }
  130. /*
  131. 遍历personal,delay,public。
  132. 在缓存中进行状态校验,public还要补充缓存权重
  133. */
  134. func (d *Dao) rangefuncCluster(c context.Context, listtype string, opt *modtask.ListOptions) (tasks map[int64]*modtask.Task, count int64, hitids, missids []int64, err error) {
  135. var (
  136. key string
  137. LENCMD, RANGECMD = "LLEN", "LRANGE"
  138. ids []int64
  139. )
  140. conn := d.cluster.Get(c)
  141. defer conn.Close()
  142. switch listtype {
  143. case "public":
  144. LENCMD, RANGECMD = "ZCARD", "ZRANGE"
  145. key = publicKey(opt.BusinessID, opt.FlowID)
  146. case "personal":
  147. key = personalKey(opt.BusinessID, opt.FlowID, opt.UID)
  148. case "delay":
  149. key = delayKey(opt.BusinessID, opt.FlowID, opt.UID)
  150. }
  151. // 1. 长度
  152. if count, err = redis.Int64(conn.Do(LENCMD, key)); err != nil {
  153. log.Error("redis (%s,%s) error(%v)", LENCMD, key, err)
  154. return
  155. }
  156. if count == 0 {
  157. return
  158. }
  159. if ids, err = redis.Int64s(conn.Do(RANGECMD, key, (opt.Pn-1)*opt.Ps, opt.Pn*opt.Ps-1)); err != nil {
  160. log.Error("redis (%s,%s,%d,%d) error(%v)", LENCMD, key, (opt.Pn-1)*opt.Ps, opt.Pn*opt.Ps, err)
  161. return
  162. }
  163. for _, id := range ids {
  164. if err = conn.Send("GET", haskKey(id)); err != nil {
  165. log.Error("redis (GET,%s) error(%v)", haskKey(id), err)
  166. return
  167. }
  168. if listtype == "public" {
  169. if err = conn.Send("ZSCORE", key, formatID(id)); err != nil {
  170. log.Error("redis (ZSCORE,%s,%s) error(%v)", key, formatID(id), err)
  171. return
  172. }
  173. }
  174. }
  175. conn.Flush()
  176. tasks = make(map[int64]*modtask.Task)
  177. for _, id := range ids {
  178. var (
  179. bs []byte
  180. e error
  181. wt int64
  182. )
  183. bs, e = redis.Bytes(conn.Receive())
  184. if listtype == "public" {
  185. wt, _ = redis.Int64(conn.Receive())
  186. wt = -wt
  187. }
  188. if e != nil {
  189. log.Error("Receive Weight(%d) error(%v)", id, errors.WithStack(e))
  190. missids = append(missids, id)
  191. continue
  192. }
  193. task := &modtask.Task{}
  194. if e = json.Unmarshal(bs, task); err != nil {
  195. log.Error("json.Unmarshal error(%v)", errors.WithStack(e))
  196. missids = append(missids, id)
  197. continue
  198. }
  199. if task.ID != id {
  200. log.Error("id(%d-%d)不匹配", task.ID, id)
  201. missids = append(missids, id)
  202. continue
  203. }
  204. // 缓存里状态同步不实时,不能用作校验
  205. tasks[task.ID] = task
  206. hitids = append(hitids, id)
  207. }
  208. log.Info("rangefunc count(%d) hitids(%v) missids(%v)", count, hitids, missids)
  209. return
  210. }