ugc.go 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164
  1. package ugc
  2. import (
  3. "context"
  4. "strconv"
  5. "sync"
  6. "go-common/app/job/main/tv/conf"
  7. appDao "go-common/app/job/main/tv/dao/app"
  8. arcdao "go-common/app/job/main/tv/dao/archive"
  9. "go-common/app/job/main/tv/dao/cms"
  10. "go-common/app/job/main/tv/dao/ftp"
  11. "go-common/app/job/main/tv/dao/lic"
  12. playdao "go-common/app/job/main/tv/dao/playurl"
  13. ugcdao "go-common/app/job/main/tv/dao/ugc"
  14. updao "go-common/app/job/main/tv/dao/upper"
  15. arccli "go-common/app/service/main/archive/api"
  16. archive "go-common/app/service/main/archive/api/gorpc"
  17. "go-common/library/log"
  18. "go-common/library/queue/databus"
  19. "github.com/robfig/cron"
  20. )
  21. const _chanSize = 10240
  22. var ctx = context.TODO()
  23. // Service is show service.
  24. type Service struct {
  25. c *conf.Config
  26. // dao
  27. dao *ugcdao.Dao
  28. playurlDao *playdao.Dao
  29. licDao *lic.Dao
  30. ftpDao *ftp.Dao
  31. appDao *appDao.Dao
  32. arcDao *arcdao.Dao
  33. upDao *updao.Dao
  34. cmsDao *cms.Dao
  35. // logic
  36. daoClosed bool
  37. // waiter
  38. waiter *sync.WaitGroup
  39. consumerLimit chan struct{}
  40. // rpc
  41. arcClient arccli.ArchiveClient
  42. arcRPC *archive.Service2
  43. // databus
  44. archiveNotifySub *databus.Databus
  45. ugcSub *databus.Databus
  46. // memory data
  47. ugcTypesRel map[int32]*conf.UgcType
  48. ugcTypesCat map[int32]int32
  49. arcTypes map[int32]*arccli.Tp // map for arc types
  50. pgcTypes map[string]int // filter pgc types data
  51. activeUps map[int64]int // store all the trusted uppers
  52. // cron
  53. cron *cron.Cron
  54. // channels
  55. modArcCh, audAidCh chan []int64
  56. repCidCh, reshelfAidCh chan int64
  57. }
  58. // New inits the ugc service
  59. func New(c *conf.Config) (s *Service) {
  60. s = &Service{
  61. c: c,
  62. dao: ugcdao.New(c),
  63. playurlDao: playdao.New(c),
  64. licDao: lic.New(c),
  65. ftpDao: ftp.New(c),
  66. arcDao: arcdao.New(c),
  67. appDao: appDao.New(c),
  68. upDao: updao.New(c),
  69. cmsDao: cms.New(c),
  70. waiter: new(sync.WaitGroup),
  71. archiveNotifySub: databus.New(c.ArchiveNotifySub),
  72. ugcSub: databus.New(c.UgcSub),
  73. cron: cron.New(),
  74. arcTypes: make(map[int32]*arccli.Tp),
  75. ugcTypesRel: make(map[int32]*conf.UgcType),
  76. pgcTypes: make(map[string]int),
  77. activeUps: make(map[int64]int),
  78. consumerLimit: make(chan struct{}, c.UgcSync.Cfg.ThreadLimit),
  79. modArcCh: make(chan []int64, _chanSize),
  80. audAidCh: make(chan []int64, _chanSize),
  81. reshelfAidCh: make(chan int64, _chanSize),
  82. repCidCh: make(chan int64, _chanSize),
  83. arcRPC: archive.New2(c.ArchiveRPC),
  84. }
  85. // transform cfg to map, in order to filter pgc types archive
  86. var err error
  87. if s.arcClient, err = arccli.NewClient(c.ArcClient); err != nil {
  88. panic(err)
  89. }
  90. for _, v := range s.c.Cfg.PgcTypes {
  91. s.pgcTypes[v] = 1
  92. }
  93. for k, v := range s.c.Cfg.UgcZones { // transform cfg map
  94. s.ugcTypesRel[atoi(k)] = v
  95. }
  96. if err := s.cron.AddFunc(s.c.Redis.CronUGC, s.ZoneIdx); err != nil { // load Zone Idx & types
  97. panic(err)
  98. }
  99. if err := s.cron.AddFunc(s.c.UgcSync.Frequency.TypesCron, s.loadTypes); err != nil {
  100. panic(err)
  101. }
  102. s.cron.Start()
  103. s.loadTypes() // load types
  104. s.loadTids() // load ugc idx relationship
  105. s.refreshUp(ctx, false) // init upper list
  106. go s.ZoneIdx()
  107. s.waiter.Add(1)
  108. go s.syncUpproc() // sync modified uppers' info to license owner
  109. go s.refreshUpproc() // refresh upper info
  110. s.waiter.Add(1)
  111. go s.manualproc() // manual import videos
  112. s.waiter.Add(1)
  113. go s.upImportproc() // import upper history data
  114. go s.fullRefreshproc() // full refrsh video data
  115. s.waiter.Add(1)
  116. go s.modArcproc() // sync modified archive data
  117. s.waiter.Add(1)
  118. go s.delArcproc() // sync deleted archive data
  119. s.waiter.Add(1)
  120. go s.delVideoproc() // sync deleted video data
  121. s.waiter.Add(1)
  122. go s.delUpproc() // treat deleted uppers
  123. go s.seaUgcContproc() // uploads ugc search content to sug's FTP
  124. s.waiter.Add(1)
  125. go s.arcConsumeproc() // archive Notify-T databus
  126. s.waiter.Add(1)
  127. go s.consumeVideo() // consume video databus, report cid info
  128. s.waiter.Add(1)
  129. go s.repCidproc() // consume channel and report cid to playurl
  130. s.waiter.Add(1)
  131. go s.audCidproc() // consume audit aid data
  132. s.waiter.Add(1)
  133. go s.reshelfArcproc() // reshelf the cms invalid arcs when they have at least one video that can play now
  134. return
  135. }
  136. // Close close the services
  137. func (s *Service) Close() {
  138. if s.dao != nil {
  139. s.daoClosed = true
  140. log.Info("Dao Closed!")
  141. }
  142. log.Info("Close ArcNotifySub!")
  143. s.archiveNotifySub.Close()
  144. log.Info("Close ugcSub!")
  145. s.ugcSub.Close()
  146. log.Info("Wait Sync!")
  147. s.waiter.Wait()
  148. log.Info("DB Closed Physically!")
  149. s.dao.DB.Close()
  150. }
  151. // transform string to int
  152. func atoi(number string) int32 {
  153. res, _ := strconv.Atoi(number)
  154. return int32(res)
  155. }