retry.go 1.7 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "time"
  6. "go-common/app/job/main/archive-shjd/model"
  7. "go-common/library/cache/redis"
  8. "go-common/library/log"
  9. "github.com/pkg/errors"
  10. )
  // _retryList is the Redis list key that PushItem appends to (RPUSH) and
  // PopItem reads from (LPOP).
  const _retryList = "retry_list"
  14. func (s *Service) retryconsumer() {
  15. defer s.waiter.Done()
  16. for {
  17. if s.close {
  18. log.Info("retryconsumer closed")
  19. return
  20. }
  21. var (
  22. c = context.TODO()
  23. err error
  24. item *model.RetryItem
  25. )
  26. if item, err = s.PopItem(c); err != nil {
  27. log.Error("%+v")
  28. time.Sleep(2 * time.Second)
  29. continue
  30. }
  31. if item == nil {
  32. time.Sleep(1 * time.Second)
  33. continue
  34. }
  35. log.Info("get retry item(%+v)", item)
  36. switch item.Tp {
  37. case model.TypeForDelVideo:
  38. s.DelteVideoCache(item.AID, item.CID)
  39. case model.TypeForUpdateVideo:
  40. s.UpdateVideoCache(item.AID, item.CID)
  41. case model.TypeForUpdateArchive:
  42. s.UpdateCache(item.Old, item.Nw, item.Action)
  43. }
  44. time.Sleep(10 * time.Millisecond)
  45. }
  46. }
  47. // PushItem is
  48. func (s *Service) PushItem(c context.Context, item *model.RetryItem) (err error) {
  49. conn := s.rds.Get(c)
  50. defer conn.Close()
  51. bs, err := json.Marshal(item)
  52. if err != nil {
  53. err = errors.Wrap(err, "json.Marshal")
  54. return
  55. }
  56. if _, err = conn.Do("RPUSH", _retryList, bs); err != nil {
  57. err = errors.Wrap(err, "conn.Send(RPUSH)")
  58. return
  59. }
  60. return
  61. }
  62. // PopItem is
  63. func (s *Service) PopItem(c context.Context) (item *model.RetryItem, err error) {
  64. conn := s.rds.Get(c)
  65. defer conn.Close()
  66. bs, err := redis.Bytes(conn.Do("LPOP", _retryList))
  67. if err != nil {
  68. if err == redis.ErrNil {
  69. err = nil
  70. return
  71. }
  72. err = errors.WithStack(err)
  73. return
  74. }
  75. item = &model.RetryItem{}
  76. if err = json.Unmarshal(bs, item); err != nil {
  77. err = errors.WithStack(err)
  78. return
  79. }
  80. return
  81. }