report_cid.go 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167
  1. package ugc
  2. import (
  3. "encoding/json"
  4. "time"
  5. model "go-common/app/job/main/tv/model/pgc"
  6. ugcmdl "go-common/app/job/main/tv/model/ugc"
  7. "go-common/library/log"
  8. "go-common/library/queue/databus"
  9. )
  10. func (s *Service) repCidproc() {
  11. defer s.waiter.Done()
  12. var toRepCids []int64
  13. for {
  14. cid, ok := <-s.repCidCh
  15. if !ok {
  16. log.Warn("[repCidproc] channel quit")
  17. return
  18. }
  19. toRepCids = append(toRepCids, cid)
  20. if len(toRepCids) < s.c.UgcSync.Batch.ReportCidPS { // not enough cid, stay waiting
  21. time.Sleep(5 * time.Second)
  22. continue
  23. }
  24. goCids := make([]int64, len(toRepCids))
  25. copy(goCids, toRepCids)
  26. toRepCids = []int64{}
  27. if err := s.reportCids(goCids); err != nil {
  28. log.Error("reportCids Cids %v, Err %v", goCids, err)
  29. continue
  30. }
  31. }
  32. }
  33. func (s *Service) audCidproc() {
  34. defer s.waiter.Done()
  35. var (
  36. toAudAids = make(map[int64]int)
  37. ps = s.c.UgcSync.Batch.ReportCidPS
  38. )
  39. for {
  40. aids, ok := <-s.audAidCh
  41. if !ok {
  42. log.Warn("[audCidproc] channel quit")
  43. return
  44. }
  45. for _, aid := range aids {
  46. toAudAids[aid] = 1 // use map to remove duplicated aids
  47. }
  48. if len(toAudAids) < ps { // not enough cid, stay waiting
  49. time.Sleep(3 * time.Second)
  50. continue
  51. }
  52. distinctAIDs := pickKeys(toAudAids)
  53. toAudAids = make(map[int64]int)
  54. if err := s.wrapSyncLic(ctx, distinctAIDs); err != nil {
  55. log.Error("audCidproc Aids %v, Err %v", distinctAIDs, err)
  56. continue
  57. }
  58. log.Info("audCidproc Apply %d Aids: %v", len(aids), distinctAIDs)
  59. }
  60. }
  61. func (s *Service) reshelfArcproc() {
  62. defer s.waiter.Done()
  63. var (
  64. reshelfAids = make(map[int64]int)
  65. ps = s.c.UgcSync.Batch.ReshelfPS
  66. )
  67. for {
  68. aid, ok := <-s.reshelfAidCh
  69. if !ok {
  70. log.Warn("[reshelfAid] channel quit")
  71. return
  72. }
  73. reshelfAids[aid] = 1 // use map to remove duplicated aids
  74. if len(reshelfAids) < ps { // not enough cid, stay waiting
  75. time.Sleep(3 * time.Second)
  76. continue
  77. }
  78. distinctAIDs := pickKeys(reshelfAids)
  79. reshelfAids = make(map[int64]int)
  80. if offAids, err := s.cmsDao.OffArcs(ctx, distinctAIDs); err != nil {
  81. log.Error("reshelfAid OffArcs Aids %v, Err %v", distinctAIDs, err)
  82. continue
  83. } else if len(offAids) == 0 {
  84. log.Warn("reshelfAid OffArcs Origin Aids %v, after filter it's empty", distinctAIDs)
  85. continue
  86. } else {
  87. if err = s.cmsDao.ReshelfArcs(ctx, offAids); err != nil {
  88. log.Error("reshelfAid OffAids %v, ReshelfArcs Err %v", offAids, err)
  89. continue
  90. }
  91. log.Info("reshelfAid Apply %d Aids: %v", len(offAids), offAids)
  92. }
  93. }
  94. }
  95. func (s *Service) reportCids(cids []int64) (err error) {
  96. var cidReq []*ugcmdl.CidReq
  97. for _, v := range cids {
  98. cidReq = append(cidReq, &ugcmdl.CidReq{CID: v})
  99. }
  100. for i := 0; i < _apiRetry; i++ {
  101. if err = s.dao.RepCidBatch(ctx, cidReq); err == nil {
  102. break
  103. }
  104. }
  105. if err != nil { // 3 times still error
  106. log.Error("ReportCid Cids %v Err %v", cids, err)
  107. return
  108. }
  109. err = s.dao.FinishReport(ctx, cids)
  110. log.Info("ReportCids %v, Len %d, Succ!", cids, len(cids))
  111. return
  112. }
  113. // consume Databus message; beause daily modification is not many, so use simple loop
  114. func (s *Service) consumeVideo() {
  115. defer s.waiter.Done()
  116. for {
  117. msg, ok := <-s.ugcSub.Messages()
  118. if !ok {
  119. log.Info("databus: tv-job video consumer exit!")
  120. return
  121. }
  122. msg.Commit()
  123. time.Sleep(1 * time.Millisecond)
  124. Loop:
  125. for {
  126. select {
  127. case s.consumerLimit <- struct{}{}: // would block if already 2 goroutines:
  128. go s.UgcDbus(msg)
  129. break Loop
  130. default:
  131. log.Warn("consumeVideo thread Full!!!")
  132. time.Sleep(1 * time.Second)
  133. }
  134. }
  135. }
  136. }
  137. // UgcDbus def.
  138. func (s *Service) UgcDbus(msg *databus.Message) {
  139. m := &model.DatabusRes{}
  140. log.Info("[consumeVideo] New Message: %s", msg)
  141. if err := json.Unmarshal(msg.Value, m); err != nil {
  142. log.Error("json.Unmarshal(%s) error(%v)", msg.Value, err)
  143. <-s.consumerLimit // clean the space for new consumer to begin
  144. return
  145. }
  146. if m.Action == "delete" {
  147. log.Info("[consumeVideo] Video Deletion, We ignore:<%v>,<%v>", m, msg.Value)
  148. <-s.consumerLimit // clean the space for new consumer to begin
  149. return
  150. }
  151. if m.Table == "ugc_video" {
  152. s.videoDatabus(msg.Value)
  153. } else if m.Table == "ugc_archive" {
  154. s.arcDatabus(msg.Value)
  155. } else {
  156. log.Error("[consumeVideo] Wrong Table Name: ", m.Table)
  157. }
  158. <-s.consumerLimit // clean the space for new consumer to begin
  159. }