sync_retry.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165
  1. package pgc
  2. import (
  3. "time"
  4. "go-common/app/job/main/tv/model/common"
  5. "go-common/app/job/main/tv/model/pgc"
  6. "go-common/library/log"
  7. )
  8. func (s *Service) addRetryEp(in *pgc.Content) {
  9. s.addRetryEps([]*pgc.Content{in})
  10. }
  11. // addRetryEps adds eps into retry list
  12. func (s *Service) addRetryEps(in []*pgc.Content) {
  13. var (
  14. epids []int
  15. newConts []*pgc.Content
  16. )
  17. for _, v := range in {
  18. if !s.retryLimit(false, int64(v.EPID)) { // filter retried too many times ep
  19. continue
  20. }
  21. newConts = append(newConts, v)
  22. epids = append(epids, v.EPID)
  23. }
  24. if len(newConts) == 0 {
  25. return
  26. }
  27. log.Warn("addRetryEps Add IDs %v", epids)
  28. s.ResuEps = append(s.ResuEps, newConts...)
  29. }
  30. // pickRetryEp picks the to-retry eps from memory
  31. func (s *Service) pickRetryEp() (res []*pgc.Content) {
  32. if len(s.ResuEps) == 0 {
  33. return
  34. }
  35. res = append(res, s.ResuEps...)
  36. log.Info("pickRetry EP Len %d", len(res))
  37. s.ResuEps = make([]*pgc.Content, 0)
  38. return
  39. }
  40. // re-submit eps
  41. func (s *Service) resubEps() {
  42. defer s.waiter.Done()
  43. for {
  44. if s.daoClosed {
  45. log.Info("resubEps DB closed!")
  46. return
  47. }
  48. readyEps := s.pickRetryEp() // pick to-retry eps from memory
  49. if len(readyEps) == 0 {
  50. log.Info("resubEps Empty")
  51. time.Sleep(time.Duration(s.c.Cfg.SyncRetry.RetryFre))
  52. continue
  53. }
  54. againEps := make([]*pgc.Content, 0)
  55. for _, ep := range readyEps { // retry them
  56. if err := s.epsSync(int64(ep.SeasonID), []*pgc.Content{ep}); err != nil { // if error, re-add this item into re-sub list
  57. log.Error("resubEps Sid %d, Epid %v, Err %v", ep.SeasonID, ep.EPID, err)
  58. againEps = append(againEps, ep)
  59. continue
  60. }
  61. retry := &common.SyncRetry{}
  62. retry.FromEp(0, int64(ep.EPID))
  63. s.dao.DelRetry(ctx, retry) // after succ, del it from MC
  64. }
  65. if len(againEps) > 0 {
  66. s.addRetryEps(againEps)
  67. }
  68. time.Sleep(time.Duration(s.c.Cfg.SyncRetry.RetryFre))
  69. }
  70. }
  71. func (s *Service) addRetrySn(in *pgc.TVEpSeason) {
  72. s.addRetrySns([]*pgc.TVEpSeason{in})
  73. }
  74. // addRetrySns adds sns into retry list
  75. func (s *Service) addRetrySns(in []*pgc.TVEpSeason) {
  76. var (
  77. sids []int64
  78. newConts []*pgc.TVEpSeason
  79. )
  80. for _, v := range in {
  81. if !s.retryLimit(true, v.ID) { // filter retried too many times ep
  82. continue
  83. }
  84. newConts = append(newConts, v)
  85. sids = append(sids, v.ID)
  86. }
  87. log.Warn("addRetrySns Add IDs %v", sids)
  88. s.ResuSns = append(s.ResuSns, newConts...)
  89. }
  90. // pickRetryEp picks the to-retry eps from memory
  91. func (s *Service) pickRetrySn() (res []*pgc.TVEpSeason) {
  92. if len(s.ResuSns) == 0 {
  93. return
  94. }
  95. res = append(res, s.ResuSns...)
  96. log.Info("pickRetry Sn Len %d", len(res))
  97. s.ResuSns = make([]*pgc.TVEpSeason, 0)
  98. return
  99. }
  100. // re-submit eps
  101. func (s *Service) resubSns() {
  102. defer s.waiter.Done()
  103. for {
  104. if s.daoClosed {
  105. log.Info("resubSns DB closed!")
  106. return
  107. }
  108. readySns := s.pickRetrySn()
  109. if len(readySns) == 0 {
  110. log.Info("resubSns Empty")
  111. time.Sleep(time.Duration(s.c.Cfg.SyncRetry.RetryFre))
  112. continue
  113. }
  114. againSns := make([]*pgc.TVEpSeason, 0)
  115. for _, sn := range readySns {
  116. if err := s.snSync(sn); err != nil { // if error, re-add this item into re-sub list
  117. log.Error("resubSns Sid %d, Err %v", sn.ID, err)
  118. againSns = append(againSns, sn)
  119. continue
  120. }
  121. retry := &common.SyncRetry{}
  122. retry.FromSn(0, sn.ID)
  123. s.dao.DelRetry(ctx, retry)
  124. }
  125. if len(againSns) > 0 {
  126. s.addRetrySns(againSns)
  127. }
  128. time.Sleep(time.Duration(s.c.Cfg.SyncRetry.RetryFre))
  129. }
  130. }
  131. // retryLimit limits the retry times
  132. func (s *Service) retryLimit(isSn bool, id int64) bool {
  133. var req = &common.SyncRetry{}
  134. if isSn {
  135. req.FromSn(0, id)
  136. } else {
  137. req.FromEp(0, id)
  138. }
  139. retryTms, err := s.dao.GetRetry(ctx, req)
  140. if err != nil {
  141. log.Error("GetRetry Req %s, Err %v", req.MCKey(), err)
  142. return true
  143. }
  144. if retryTms > s.c.Cfg.SyncRetry.MaxRetry {
  145. log.Error("retryLimit Req %s, Retry Already %d times, stop here", req.MCKey(), retryTms)
  146. return false
  147. }
  148. s.dao.SetRetry(ctx, &common.SyncRetry{
  149. Ctype: req.Ctype,
  150. CID: req.CID,
  151. Retry: retryTms + 1,
  152. })
  153. return true
  154. }