retry.go 2.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "time"
  6. "go-common/app/job/main/archive/model/result"
  7. "go-common/app/job/main/archive/model/retry"
  8. "go-common/library/cache/redis"
  9. "go-common/library/log"
  10. )
  11. func (s *Service) retryproc() {
  12. defer s.waiter.Done()
  13. for {
  14. if s.closeRetry {
  15. return
  16. }
  17. var (
  18. c = context.TODO()
  19. bs []byte
  20. err error
  21. )
  22. bs, err = s.PopFail(c)
  23. if err != nil || bs == nil {
  24. time.Sleep(5 * time.Second)
  25. continue
  26. }
  27. msg := &retry.Info{}
  28. if err = json.Unmarshal(bs, msg); err != nil {
  29. log.Error("json.Unretry dedeSyncmarshal(%s) error(%v)", bs, err)
  30. continue
  31. }
  32. log.Info("retry %s %s", retry.FailUpCache, bs)
  33. switch msg.Action {
  34. case retry.FailUpCache:
  35. s.updateResultCache(&result.Archive{AID: msg.Data.Aid, State: msg.Data.State}, nil)
  36. case retry.FailDatabus:
  37. var upInfo = &result.ArchiveUpInfo{Table: msg.Data.DatabusMsg.Table, Action: msg.Data.DatabusMsg.Table, Nw: msg.Data.DatabusMsg.Nw, Old: msg.Data.DatabusMsg.Old}
  38. s.sendNotify(upInfo)
  39. case retry.FailUpVideoCache:
  40. s.upVideoCache(msg.Data.Aid, msg.Data.Cids)
  41. case retry.FailDelVideoCache:
  42. s.delVideoCache(msg.Data.Aid, msg.Data.Cids)
  43. case retry.FailResultAdd:
  44. s.arcUpdate(msg.Data.Aid)
  45. default:
  46. continue
  47. }
  48. }
  49. }
  50. // PushFail rpush fail item to redis
  51. func (s *Service) PushFail(c context.Context, a interface{}) (err error) {
  52. var (
  53. conn = s.redis.Get(c)
  54. bs []byte
  55. )
  56. defer conn.Close()
  57. if bs, err = json.Marshal(a); err != nil {
  58. log.Error("json.Marshal(%v) error(%v)", a, err)
  59. return
  60. }
  61. if _, err = conn.Do("RPUSH", retry.FailList, bs); err != nil {
  62. log.Error("conn.Do(RPUSH, %s, %s) error(%v)")
  63. }
  64. return
  65. }
  66. // PopFail lpop fail item from redis
  67. func (s *Service) PopFail(c context.Context) (bs []byte, err error) {
  68. var conn = s.redis.Get(c)
  69. defer conn.Close()
  70. if bs, err = redis.Bytes(conn.Do("LPOP", retry.FailList)); err != nil && err != redis.ErrNil {
  71. log.Error("redis.Bytes(conn.Do(LPOP, %s)) error(%v)", retry.FailList, err)
  72. return
  73. }
  74. return
  75. }