retry.go 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "time"
  6. "go-common/app/job/main/videoup-report/model/archive"
  7. "go-common/app/job/main/videoup-report/model/email"
  8. "go-common/library/log"
  9. )
  10. func (s *Service) addRetry(c context.Context, aid int64, action string, flag int64, flagA int64) (err error) {
  11. member := email.Retry{
  12. AID: aid,
  13. Action: action,
  14. Flag: flag,
  15. FlagA: flagA,
  16. CreateTime: time.Now().Unix(),
  17. }
  18. err = s.email.PushRedis(c, member, email.RetryListKey)
  19. return
  20. }
  21. //popRetry spop a retry element from redis set
  22. func (s *Service) popRetry(c context.Context, key string) (member email.Retry, err error) {
  23. var bs []byte
  24. if bs, err = s.email.PopRedis(c, key); err != nil {
  25. return
  26. }
  27. if err = json.Unmarshal(bs, &member); err != nil {
  28. log.Error("PopRetry(%s) json.Unmarshal(%s) error(%v)", key, string(bs), err)
  29. }
  30. return
  31. }
  32. func (s *Service) removeRetry(c context.Context, aid int64, action string) (err error) {
  33. var (
  34. bs []byte
  35. list []interface{}
  36. reply int
  37. )
  38. for _, snew := range archive.ReplyState {
  39. for _, sold := range archive.ReplyState {
  40. member := email.Retry{
  41. AID: aid,
  42. Action: action,
  43. Flag: snew,
  44. FlagA: sold,
  45. }
  46. if bs, err = json.Marshal(member); err != nil {
  47. log.Error("removeRetry json.Marshal error(%v) member(%+v)", err, member)
  48. err = nil
  49. continue
  50. }
  51. list = append(list, string(bs))
  52. }
  53. }
  54. if reply, err = s.email.RemoveRedis(c, email.RetryListKey, list...); err != nil {
  55. log.Error("removeRetry s.email.RemoveRedis error(%v) aid(%d) action(%s)", err, aid, action)
  56. } else {
  57. log.Info("removeRetry s.email.RemoveRedis success reply(%d) aid(%d) action(%s)", reply, aid, action)
  58. }
  59. return
  60. }
  61. func (s *Service) retryProc() {
  62. defer s.waiter.Done()
  63. for {
  64. if s.closed {
  65. return
  66. }
  67. c := context.TODO()
  68. member, err := s.popRetry(c, email.RetryListKey)
  69. if err != nil {
  70. time.Sleep(5 * time.Second)
  71. continue
  72. }
  73. log.Info("retry member(%+v)", member)
  74. if member.CreateTime > 0 && (time.Now().Unix()-member.CreateTime >= 180) {
  75. log.Error("retry list too long for 3min")
  76. time.Sleep(5 * time.Millisecond)
  77. continue
  78. }
  79. switch member.Action {
  80. case email.RetryActionReply:
  81. a, err := s.arc.ArchiveByAid(c, member.AID)
  82. if err != nil {
  83. log.Error("retryProc s.arc.ArchiveByAid(%d) error(%v) member(%+v)", member.AID, err, member)
  84. s.addRetry(c, member.AID, member.Action, member.Flag, member.FlagA)
  85. continue
  86. }
  87. isOpen := isOpenReplyState(a.State) > 0
  88. if member.Flag == archive.ReplyOn && isOpen {
  89. err = s.openReply(c, a, member.FlagA)
  90. }
  91. if member.Flag == archive.ReplyOff && !isOpen {
  92. err = s.closeReply(c, a, member.FlagA)
  93. }
  94. //if err != nil {
  95. //s.addRetry(c, member.AID, member.Action, member.Flag, member.FlagA)
  96. //}
  97. default:
  98. log.Warn("retryProc unknown action(%s) member(%+v)", member.Action, member)
  99. }
  100. time.Sleep(10 * time.Millisecond)
  101. }
  102. }