123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167 |
- package ugc
- import (
- "encoding/json"
- "time"
- model "go-common/app/job/main/tv/model/pgc"
- ugcmdl "go-common/app/job/main/tv/model/ugc"
- "go-common/library/log"
- "go-common/library/queue/databus"
- )
- func (s *Service) repCidproc() {
- defer s.waiter.Done()
- var toRepCids []int64
- for {
- cid, ok := <-s.repCidCh
- if !ok {
- log.Warn("[repCidproc] channel quit")
- return
- }
- toRepCids = append(toRepCids, cid)
- if len(toRepCids) < s.c.UgcSync.Batch.ReportCidPS { // not enough cid, stay waiting
- time.Sleep(5 * time.Second)
- continue
- }
- goCids := make([]int64, len(toRepCids))
- copy(goCids, toRepCids)
- toRepCids = []int64{}
- if err := s.reportCids(goCids); err != nil {
- log.Error("reportCids Cids %v, Err %v", goCids, err)
- continue
- }
- }
- }
- func (s *Service) audCidproc() {
- defer s.waiter.Done()
- var (
- toAudAids = make(map[int64]int)
- ps = s.c.UgcSync.Batch.ReportCidPS
- )
- for {
- aids, ok := <-s.audAidCh
- if !ok {
- log.Warn("[audCidproc] channel quit")
- return
- }
- for _, aid := range aids {
- toAudAids[aid] = 1 // use map to remove duplicated aids
- }
- if len(toAudAids) < ps { // not enough cid, stay waiting
- time.Sleep(3 * time.Second)
- continue
- }
- distinctAIDs := pickKeys(toAudAids)
- toAudAids = make(map[int64]int)
- if err := s.wrapSyncLic(ctx, distinctAIDs); err != nil {
- log.Error("audCidproc Aids %v, Err %v", distinctAIDs, err)
- continue
- }
- log.Info("audCidproc Apply %d Aids: %v", len(aids), distinctAIDs)
- }
- }
- func (s *Service) reshelfArcproc() {
- defer s.waiter.Done()
- var (
- reshelfAids = make(map[int64]int)
- ps = s.c.UgcSync.Batch.ReshelfPS
- )
- for {
- aid, ok := <-s.reshelfAidCh
- if !ok {
- log.Warn("[reshelfAid] channel quit")
- return
- }
- reshelfAids[aid] = 1 // use map to remove duplicated aids
- if len(reshelfAids) < ps { // not enough cid, stay waiting
- time.Sleep(3 * time.Second)
- continue
- }
- distinctAIDs := pickKeys(reshelfAids)
- reshelfAids = make(map[int64]int)
- if offAids, err := s.cmsDao.OffArcs(ctx, distinctAIDs); err != nil {
- log.Error("reshelfAid OffArcs Aids %v, Err %v", distinctAIDs, err)
- continue
- } else if len(offAids) == 0 {
- log.Warn("reshelfAid OffArcs Origin Aids %v, after filter it's empty", distinctAIDs)
- continue
- } else {
- if err = s.cmsDao.ReshelfArcs(ctx, offAids); err != nil {
- log.Error("reshelfAid OffAids %v, ReshelfArcs Err %v", offAids, err)
- continue
- }
- log.Info("reshelfAid Apply %d Aids: %v", len(offAids), offAids)
- }
- }
- }
- func (s *Service) reportCids(cids []int64) (err error) {
- var cidReq []*ugcmdl.CidReq
- for _, v := range cids {
- cidReq = append(cidReq, &ugcmdl.CidReq{CID: v})
- }
- for i := 0; i < _apiRetry; i++ {
- if err = s.dao.RepCidBatch(ctx, cidReq); err == nil {
- break
- }
- }
- if err != nil { // 3 times still error
- log.Error("ReportCid Cids %v Err %v", cids, err)
- return
- }
- err = s.dao.FinishReport(ctx, cids)
- log.Info("ReportCids %v, Len %d, Succ!", cids, len(cids))
- return
- }
- // consume Databus message; beause daily modification is not many, so use simple loop
- func (s *Service) consumeVideo() {
- defer s.waiter.Done()
- for {
- msg, ok := <-s.ugcSub.Messages()
- if !ok {
- log.Info("databus: tv-job video consumer exit!")
- return
- }
- msg.Commit()
- time.Sleep(1 * time.Millisecond)
- Loop:
- for {
- select {
- case s.consumerLimit <- struct{}{}: // would block if already 2 goroutines:
- go s.UgcDbus(msg)
- break Loop
- default:
- log.Warn("consumeVideo thread Full!!!")
- time.Sleep(1 * time.Second)
- }
- }
- }
- }
- // UgcDbus def.
- func (s *Service) UgcDbus(msg *databus.Message) {
- m := &model.DatabusRes{}
- log.Info("[consumeVideo] New Message: %s", msg)
- if err := json.Unmarshal(msg.Value, m); err != nil {
- log.Error("json.Unmarshal(%s) error(%v)", msg.Value, err)
- <-s.consumerLimit // clean the space for new consumer to begin
- return
- }
- if m.Action == "delete" {
- log.Info("[consumeVideo] Video Deletion, We ignore:<%v>,<%v>", m, msg.Value)
- <-s.consumerLimit // clean the space for new consumer to begin
- return
- }
- if m.Table == "ugc_video" {
- s.videoDatabus(msg.Value)
- } else if m.Table == "ugc_archive" {
- s.arcDatabus(msg.Value)
- } else {
- log.Error("[consumeVideo] Wrong Table Name: ", m.Table)
- }
- <-s.consumerLimit // clean the space for new consumer to begin
- }
|