task.go 1.9 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485
  1. package redis
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "time"
  7. tmod "go-common/app/job/main/videoup-report/model/task"
  8. "go-common/library/cache/redis"
  9. "go-common/library/log"
  10. )
  11. const (
  12. _twexpire = 24 * 60 * 60 // 1 day
  13. )
  14. func key(id int64) string {
  15. return fmt.Sprintf("tw_%d", id)
  16. }
  17. //SetWeight 设置权重配置
  18. func (d *Dao) SetWeight(c context.Context, mcases map[int64]*tmod.WeightParams) (err error) {
  19. conn := d.secondary.Get(c)
  20. defer conn.Close()
  21. log.Info("SetWeight before len(%d) time(%v)", len(mcases), time.Now())
  22. for tid, mcase := range mcases {
  23. var bs []byte
  24. key := key(tid)
  25. if bs, err = json.Marshal(mcase); err != nil {
  26. log.Error("json.Marshal(%+v) error(%v)", mcase, err)
  27. continue
  28. }
  29. if err = conn.Send("SET", key, bs); err != nil {
  30. log.Error("SET error(%v)", err)
  31. continue
  32. }
  33. if err = conn.Send("EXPIRE", key, _twexpire); err != nil {
  34. log.Error("EXPIRE error(%v)", err)
  35. continue
  36. }
  37. }
  38. if err = conn.Flush(); err != nil {
  39. log.Error("conn.Flush error(%v)", err)
  40. return
  41. }
  42. for i := 0; i < 2*len(mcases); i++ {
  43. if _, err = conn.Receive(); err != nil {
  44. log.Error("conn.Receive() error(%v)", err)
  45. return
  46. }
  47. }
  48. log.Info("SetWeight end len(%d) time(%v)", len(mcases), time.Now())
  49. return
  50. }
  51. //GetWeight 获取实时任务的权重配置
  52. func (d *Dao) GetWeight(c context.Context, ids []int64) (mcases map[int64]*tmod.WeightParams, err error) {
  53. conn := d.secondary.Get(c)
  54. defer conn.Close()
  55. mcases = make(map[int64]*tmod.WeightParams)
  56. for _, id := range ids {
  57. var bs []byte
  58. key := key(int64(id))
  59. if bs, err = redis.Bytes(conn.Do("GET", key)); err != nil {
  60. if err == redis.ErrNil {
  61. err = nil
  62. } else {
  63. log.Error("conn.Do(GET, %v) error(%v)", key, err)
  64. }
  65. continue
  66. }
  67. p := &tmod.WeightParams{}
  68. if err = json.Unmarshal(bs, p); err != nil {
  69. log.Error("json.Unmarshal(%s) error(%v)", string(bs), err)
  70. err = nil
  71. continue
  72. }
  73. mcases[int64(id)] = p
  74. }
  75. return
  76. }