full_refresh.go 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215
  1. package ugc
  2. import (
  3. "context"
  4. "fmt"
  5. "time"
  6. "go-common/app/job/main/tv/dao/app"
  7. "go-common/app/job/main/tv/model/ugc"
  8. "go-common/library/log"
  9. )
  10. const (
  11. _errSleep = 500 * time.Millisecond
  12. _succSleep = 10 * time.Millisecond
  13. )
  14. func errMid(funcName string, mid int64, err error) {
  15. log.Error("Func:[%s], Step:[%s], Mid:[%d], Err:[%v]", "fullRefresh", funcName, mid, err)
  16. }
  17. func errArcPce(funcName string, mid int64, numPce int, err error) {
  18. log.Error("Func:[%s], Step:[%s], Mid:[%d], NumPce:[%d], Err:[%v]", "fullRefresh-ArcPce", funcName, mid, numPce, err)
  19. }
  20. func infoArc(funcName string, aid int64, msg string) {
  21. log.Info("Func:[%s], Step:[%s], Aid:[%d], Msg:[%s]", "fullRefresh-ArcPce-Arc", funcName, aid, msg)
  22. time.Sleep(_errSleep)
  23. }
  24. func errArc(funcName string, aid int64, err error) {
  25. log.Error("Func:[%s], Step:[%s], Aid:[%d], Err:[%v]", "fullRefresh-ArcPce-Arc", funcName, aid, err)
  26. time.Sleep(_errSleep)
  27. }
  28. func errArcVideos(funcName string, aid int64, cids []int64, err error) {
  29. log.Error("Func:[%s], Step:[%s], Aid:[%d], Cids: [%v], Err:[%v]", "fullRefresh-ArcPce-Arc-Videos", funcName, aid, cids, err)
  30. time.Sleep(_errSleep)
  31. }
  32. func infoArcVideos(funcName string, aid int64, cids []int64, msg string) {
  33. log.Info("Func:[%s], Step:[%s], Aid:[%d], Cids: [%v], Msg:[%s]", "fullRefresh-ArcPce-Arc-Videos", funcName, aid, cids, msg)
  34. time.Sleep(_succSleep)
  35. }
  36. func (s *Service) fullRefreshproc() {
  37. for {
  38. s.fullRefresh()
  39. time.Sleep(time.Duration(s.c.UgcSync.Frequency.FullRefreshFre))
  40. }
  41. }
  42. func (s *Service) fullRefresh() {
  43. var (
  44. fullName = "fullRefresh"
  45. pagesize = s.c.UgcSync.Batch.ProducerPS
  46. begin = time.Now()
  47. totalArcs = 0
  48. treatedUp = 0
  49. totalUp = len(s.activeUps)
  50. )
  51. if totalUp == 0 {
  52. log.Error("[%s] ActiveUps Empty", fullName)
  53. return
  54. }
  55. log.Info("fullRefresh Total Uppers Len %d", totalUp)
  56. for mid := range s.activeUps {
  57. var (
  58. upArcCnt int
  59. err error
  60. )
  61. if upArcCnt, err = s.dao.UpArcsCnt(ctx, int64(mid)); err != nil {
  62. errMid("CountUpArcs", mid, err)
  63. continue
  64. }
  65. if upArcCnt == 0 {
  66. errMid("CountUpArcs", mid, fmt.Errorf("Empty Arcs"))
  67. continue
  68. }
  69. for arcPce := 0; arcPce < app.NumPce(int(upArcCnt), pagesize); arcPce++ { // travel the upper's archive by piece
  70. var upArcs []*ugc.ArcFull
  71. if upArcs, err = s.dao.PickUpArcs(ctx, int(mid), arcPce, pagesize); err != nil {
  72. errArcPce("PickUpArcs", mid, arcPce, err)
  73. continue
  74. }
  75. if len(upArcs) == 0 {
  76. errArcPce("PickUpArcs", mid, arcPce, fmt.Errorf("Empty Arcs, Stop Picking"))
  77. break
  78. }
  79. if err = s.fullArcs(ctx, upArcs); err != nil {
  80. errArcPce("FullArcs", mid, arcPce, err)
  81. }
  82. time.Sleep(time.Duration(s.c.UgcSync.Frequency.FullRefArcFre)) // pause between each archives pce treatment
  83. }
  84. treatedUp = treatedUp + 1
  85. totalArcs = totalArcs + upArcCnt
  86. log.Info("fullRefresh Total Up %d, Treated Up %d, Treated Arcs %d, Time Used %v", totalUp, treatedUp, totalArcs, time.Since(begin))
  87. }
  88. log.Info("fullRefresh Ends! Len Uppers %d, Time Used %v", len(s.activeUps), time.Since(begin))
  89. }
  90. func (s *Service) fullArcs(ctx context.Context, arcs []*ugc.ArcFull) (err error) {
  91. for _, arc := range arcs {
  92. var (
  93. arcOk, actVideos, shouldAudit bool
  94. aid = arc.AID
  95. transCids []int64
  96. arcAllow = &ugc.ArcAllow{}
  97. )
  98. if err = s.dao.SetArcCMS(ctx, &arc.ArcCMS); err != nil { // set cache
  99. errArc("SetArcCMS", aid, err) // cache error, ignore
  100. }
  101. if arc.Deleted == 1 {
  102. if actVideos, err = s.dao.ActVideos(ctx, aid); err != nil {
  103. errArc("actVideos", aid, err) // db error
  104. continue
  105. }
  106. if !actVideos {
  107. infoArc("actVideos", aid, "Arc Deleted && No Active Videos, Jump to the next")
  108. continue
  109. } else {
  110. if err = s.dao.DelVideos(ctx, aid); err != nil { // delete also the videos
  111. errArc("actVideos", aid, err)
  112. continue
  113. }
  114. infoArc("actVideos", aid, "Arc Deleted, So we delete the rest videos")
  115. }
  116. }
  117. arcAllow.FromArcFull(arc)
  118. if arcOk = s.arcAllowImport(arcAllow); !arcOk {
  119. log.Warn("[fullRefresh-ArcPce-Arc]")
  120. continue
  121. }
  122. if arcOk, transCids, err = s.transFailTreat(ctx, aid); err != nil {
  123. errArcVideos("TransFailVideos-DelVideos", aid, transCids, err) // db error
  124. continue
  125. }
  126. if !arcOk {
  127. continue
  128. }
  129. if shouldAudit, err = s.dao.ShouldAudit(ctx, aid); err != nil {
  130. errArc("ShouldAudit", aid, err)
  131. continue
  132. }
  133. if shouldAudit {
  134. log.Info("fullRefresh addAudCid cAid %d", aid)
  135. s.audAidCh <- []int64{aid} // add aid into channel to treat
  136. }
  137. if err = s.refArcVideo(ctx, aid); err != nil {
  138. errArc("refArcVideo", aid, err)
  139. continue
  140. }
  141. time.Sleep(10 * time.Millisecond)
  142. }
  143. return
  144. }
  145. func (s *Service) transFailTreat(ctx context.Context, aid int64) (arcOk bool, failCids []int64, err error) {
  146. arcOk = true
  147. if failCids, err = s.dao.TransFailVideos(ctx, aid); err != nil { // delete transcoding failed cids
  148. errArc("TransFailVideos", aid, err) // db error, stop this archive here
  149. return
  150. }
  151. if len(failCids) == 0 {
  152. // infoArcVideos("TransFailVideos", aid, failCids, "No Fail Cids")
  153. return
  154. }
  155. if arcOk, err = s.dao.DelVideoArc(ctx, &ugc.DelVideos{
  156. AID: aid,
  157. CIDs: failCids,
  158. }); err != nil {
  159. return
  160. }
  161. if !arcOk {
  162. infoArcVideos("TransFailVideos", aid, failCids, " Delete Videos & Arc succ")
  163. return
  164. }
  165. infoArcVideos("TransFailVideos", aid, failCids, " Delete Videos succ")
  166. return
  167. }
  168. func (s *Service) refArcVideo(ctx context.Context, cAid int64) (err error) {
  169. var (
  170. proName = "videoProducer-video"
  171. pagesize = s.c.UgcSync.Batch.ProducerPS
  172. videoCnt int
  173. maxID = 0
  174. )
  175. if videoCnt, err = s.dao.ArcVideoCnt(ctx, cAid); err != nil {
  176. log.Error("[%s] CountArcs Aid %d, error [%v]", proName, cAid, err)
  177. return
  178. }
  179. if videoCnt == 0 {
  180. return
  181. }
  182. nbPiece := app.NumPce(videoCnt, pagesize)
  183. log.Info("[%s] NumPiece %d, Pagesize %d", proName, nbPiece, pagesize)
  184. for i := 0; i < nbPiece; i++ {
  185. videos, newMaxID, errR := s.dao.PickArcVideo(ctx, cAid, maxID, pagesize)
  186. if errR != nil {
  187. log.Error("[%s] Pick Piece %d Error, Ignore it", proName, i)
  188. continue
  189. }
  190. if newMaxID <= maxID {
  191. log.Error("[%s] MaxID is not increasing! [%d,%d]", proName, newMaxID, maxID)
  192. return
  193. }
  194. maxID = newMaxID
  195. for _, v := range videos {
  196. s.dao.SetVideoCMS(ctx, v)
  197. }
  198. time.Sleep(500 * time.Millisecond)
  199. }
  200. return
  201. }