retry.go 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "strings"
  6. "time"
  7. "go-common/app/job/main/videoup/model/archive"
  8. "go-common/app/job/main/videoup/model/message"
  9. "go-common/app/job/main/videoup/model/redis"
  10. "go-common/library/log"
  11. )
  12. func (s *Service) syncRetry(c context.Context, aid, mid int64, action, route, content string) (err error) {
  13. retry := &redis.RetryJSON{}
  14. retry.Action = action
  15. retry.Data.Aid = aid
  16. retry.Data.Route = route
  17. retry.Data.Mid = mid
  18. retry.Data.Content = content
  19. if action == redis.ActionForVideocovers && (content == "" || strings.Contains(content, "/bfs/archive")) {
  20. return
  21. }
  22. s.redis.PushFail(c, retry)
  23. return
  24. }
  25. func (s *Service) retryproc() {
  26. defer s.wg.Done()
  27. for {
  28. if s.closed {
  29. return
  30. }
  31. var (
  32. c = context.TODO()
  33. bs []byte
  34. err error
  35. retry = &redis.RetryJSON{}
  36. )
  37. bs, err = s.redis.PopFail(c)
  38. if err != nil || bs == nil {
  39. time.Sleep(5 * time.Second)
  40. continue
  41. }
  42. msg := &redis.Retry{}
  43. if err = json.Unmarshal(bs, msg); err != nil {
  44. log.Error("json.Unretry syncmarshal(%s) error(%v)", bs, err)
  45. continue
  46. }
  47. log.Info("retry %s %s", msg.Action, bs)
  48. if err = json.Unmarshal(bs, retry); err != nil {
  49. log.Error("json.Unmarshal(%s) error(%v)", msg, err)
  50. continue
  51. }
  52. s.promRetry.Incr(msg.Action)
  53. switch msg.Action {
  54. case redis.ActionForBvcCapable:
  55. var a *archive.Archive
  56. if a, err = s.arc.Archive(c, retry.Data.Aid); err != nil {
  57. log.Error("retry bvcCapable archive(%d) error(%v)", retry.Data.Aid, err)
  58. continue
  59. }
  60. log.Info("retry aid(%d) syncBVC bvcCapable", a.Aid)
  61. s.syncBVC(c, a)
  62. case redis.ActionForSendOpenMsg:
  63. s.sendAuditMsg(c, retry.Data.Route, retry.Data.Aid)
  64. case redis.ActionForSendBblog:
  65. var a *archive.Archive
  66. dynamic := ""
  67. if a, err = s.arc.Archive(c, retry.Data.Aid); err != nil {
  68. log.Error("retry sendBblog archive(%d) error(%v)", retry.Data.Aid, err)
  69. s.syncRetry(c, retry.Data.Aid, retry.Data.Mid, redis.ActionForSendBblog, "", "")
  70. continue
  71. }
  72. if add, _ := s.arc.Addit(c, a.Aid); add != nil {
  73. dynamic = add.Dynamic
  74. }
  75. s.sendBblog(&archive.Result{Aid: a.Aid, Mid: a.Mid, Dynamic: dynamic})
  76. case redis.ActionForVideoshot:
  77. imgs := strings.Split(retry.Data.Content, ",")
  78. s.videoshotAdd(retry.Data.Aid, retry.Data.Route, imgs)
  79. case redis.ActionForVideocovers:
  80. if retry.Data.Mid > 20 {
  81. continue
  82. }
  83. var a *archive.Archive
  84. if a, err = s.arc.Archive(c, retry.Data.Aid); err != nil {
  85. s.syncRetry(c, retry.Data.Aid, retry.Data.Mid+1, redis.ActionForVideocovers, retry.Data.Content, retry.Data.Content)
  86. continue
  87. }
  88. log.Info("retryproc videocoverCopy aid(%d) old cover is(%s) retry count is(%d)", retry.Data.Aid, a.Cover, retry.Data.Mid)
  89. if strings.Index(a.Cover, "//") == 0 {
  90. a.Cover = "http:" + a.Cover
  91. }
  92. if strings.Index(a.Cover, "/bfs") == 0 && !strings.Contains(a.Cover, "bfs/archive") {
  93. a.Cover = "http://i0.hdslb.com" + a.Cover
  94. }
  95. if a == nil || a.Cover == "" || strings.Contains(a.Cover, "bfs/archive") || (!strings.HasPrefix(a.Cover, "http://") && !strings.HasPrefix(a.Cover, "https://")) {
  96. log.Error("retryproc videocoverCopy aid(%d) cover is(%s)", retry.Data.Aid, a.Cover)
  97. continue
  98. }
  99. s.videocoverCopy(retry.Data.Aid, retry.Data.Mid, a)
  100. case redis.ActionForPostFirstRound:
  101. pmsg := &message.Videoup{}
  102. if err = json.Unmarshal([]byte(retry.Data.Content), pmsg); err != nil {
  103. log.Error("retryproc postFirstRound json.Unmarshal(%s) error(%v)", retry.Data.Content, err)
  104. continue
  105. }
  106. s.sendPostFirstRound(c, retry.Data.Route, pmsg.Aid, pmsg.Filename, pmsg.AdminChange)
  107. default:
  108. log.Warn("retryproc unknown action(%s) message(%+v)", msg.Action, retry)
  109. }
  110. time.Sleep(10 * time.Millisecond)
  111. }
  112. }
  113. //QueueProc .
  114. func (s *Service) QueueProc() {
  115. defer s.wg.Done()
  116. for {
  117. if s.closed {
  118. return
  119. }
  120. var (
  121. c = context.TODO()
  122. bs []byte
  123. err error
  124. )
  125. bs, err = s.redis.PopQueue(c, message.RouteVideoshotpv)
  126. if err != nil || bs == nil {
  127. time.Sleep(5 * time.Second)
  128. continue
  129. }
  130. m := &message.BvcVideo{}
  131. if err = json.Unmarshal(bs, m); err != nil {
  132. log.Error("QueueProc json.Unmarshal(%v) error(%v)", string(bs), err)
  133. continue
  134. }
  135. log.Info("queue proc pop %+v", m)
  136. switch m.Route {
  137. case message.RouteVideoshotpv:
  138. err = s.videoshotPv(c, m)
  139. default:
  140. log.Warn("QueueProc unknown route(%s) message(%s)", m.Route, m.Route)
  141. }
  142. if err != nil {
  143. log.Error("QueueProc error(%+v)", err)
  144. }
  145. time.Sleep(10 * time.Millisecond)
  146. }
  147. }