weight.go 1.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182
  1. package task
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "go-common/app/admin/main/videoup/model/archive"
  7. "go-common/library/cache/redis"
  8. "go-common/library/log"
  9. )
  10. const (
  11. _twexpire = 24 * 60 * 60 // 1 day
  12. )
  13. func key(id int64) string {
  14. return fmt.Sprintf("tw_%d", id)
  15. }
  16. //SetWeightRedis 设置权重配置
  17. func (d *Dao) SetWeightRedis(c context.Context, mcases map[int64]*archive.TaskPriority) (err error) {
  18. conn := d.redis.Get(c)
  19. defer conn.Close()
  20. for tid, mcase := range mcases {
  21. var bs []byte
  22. key := key(tid)
  23. if bs, err = json.Marshal(mcase); err != nil {
  24. log.Error("json.Marshal(%+v) error(%v)", mcase, err)
  25. continue
  26. }
  27. if err = conn.Send("SET", key, bs); err != nil {
  28. log.Error("SET error(%v)", err)
  29. continue
  30. }
  31. if err = conn.Send("EXPIRE", key, _twexpire); err != nil {
  32. log.Error("EXPIRE error(%v)", err)
  33. continue
  34. }
  35. }
  36. if err = conn.Flush(); err != nil {
  37. log.Error("conn.Flush error(%v)", err)
  38. return
  39. }
  40. for i := 0; i < 2*len(mcases); i++ {
  41. if _, err = conn.Receive(); err != nil {
  42. log.Error("conn.Receive() error(%v)", err)
  43. return
  44. }
  45. }
  46. return
  47. }
  48. //GetWeightRedis 获取实时任务的权重配置
  49. func (d *Dao) GetWeightRedis(c context.Context, ids []int64) (mcases map[int64]*archive.TaskPriority, err error) {
  50. conn := d.redis.Get(c)
  51. defer conn.Close()
  52. mcases = make(map[int64]*archive.TaskPriority)
  53. for _, id := range ids {
  54. var bs []byte
  55. key := key(int64(id))
  56. if bs, err = redis.Bytes(conn.Do("GET", key)); err != nil {
  57. if err == redis.ErrNil {
  58. err = nil
  59. } else {
  60. log.Error("conn.Do(GET, %v) error(%v)", key, err)
  61. }
  62. continue
  63. }
  64. p := &archive.TaskPriority{}
  65. if err = json.Unmarshal(bs, p); err != nil {
  66. log.Error("json.Unmarshal(%s) error(%v)", string(bs), err)
  67. err = nil
  68. continue
  69. }
  70. mcases[int64(id)] = p
  71. }
  72. return
  73. }