service.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109
  1. package pgc
  2. import (
  3. "context"
  4. "math/rand"
  5. "sync"
  6. "time"
  7. "go-common/app/job/main/tv/conf"
  8. "go-common/app/job/main/tv/dao/app"
  9. "go-common/app/job/main/tv/dao/cms"
  10. "go-common/app/job/main/tv/dao/lic"
  11. playdao "go-common/app/job/main/tv/dao/playurl"
  12. model "go-common/app/job/main/tv/model/pgc"
  13. "go-common/library/log"
  14. "go-common/library/queue/databus"
  15. "go-common/app/job/main/tv/dao/ftp"
  16. "github.com/robfig/cron"
  17. )
  // ctx is the package-level root context. It comes straight from
  // context.Background(), so it is never cancelled and carries no deadline
  // or values.
  var (
  	ctx = context.Background()
  )
  19. // Service struct of service.
  20. type Service struct {
  21. dao *app.Dao
  22. daoClosed bool // logic close the dao's DB
  23. playurlDao *playdao.Dao
  24. licDao *lic.Dao
  25. ftpDao *ftp.Dao
  26. cmsDao *cms.Dao
  27. c *conf.Config
  28. waiter *sync.WaitGroup // general waiter
  29. waiterConsumer *sync.WaitGroup
  30. contentSub *databus.Databus // consumer for state change
  31. cron *cron.Cron
  32. ResuEps []*model.Content
  33. ResuSns []*model.TVEpSeason
  34. resuRetry map[string]int
  35. }
  36. // New create service instance and return.
  37. func New(c *conf.Config) (s *Service) {
  38. s = &Service{
  39. c: c,
  40. dao: app.New(c),
  41. playurlDao: playdao.New(c),
  42. licDao: lic.New(c),
  43. ftpDao: ftp.New(c),
  44. cmsDao: cms.New(c),
  45. daoClosed: false,
  46. waiter: new(sync.WaitGroup),
  47. waiterConsumer: new(sync.WaitGroup),
  48. contentSub: databus.New(c.ContentSub),
  49. cron: cron.New(),
  50. resuRetry: make(map[string]int),
  51. }
  52. rand.Seed(time.Now().UnixNano())
  53. // flush Redis - zone list
  54. go s.ZoneIdx()
  55. if err := s.cron.AddFunc(s.c.Redis.CronPGC, s.ZoneIdx); err != nil {
  56. panic(err)
  57. }
  58. if err := s.cron.AddFunc(s.c.PlayControl.ProducerCron, s.refreshCache); err != nil {
  59. panic(err)
  60. }
  61. if err := s.cron.AddFunc(s.c.Cfg.Merak.Cron, s.cmsShelve); err != nil {
  62. panic(err)
  63. }
  64. s.cron.Start()
  65. go s.searchSugproc() // uploads the passed season's list to search sug's FTP
  66. go s.seaPgcContproc() // uploads pgc search content to sug's FTP
  67. s.waiter.Add(1)
  68. go s.syncEPs()
  69. s.waiter.Add(1)
  70. go s.resubEps()
  71. s.waiter.Add(1)
  72. go s.resubSns()
  73. s.waiter.Add(1)
  74. go s.syncSeason()
  75. s.waiter.Add(1)
  76. go s.delSeason()
  77. s.waiter.Add(1)
  78. go s.delCont()
  79. // Databus
  80. s.waiterConsumer.Add(1)
  81. go s.consumeContent() // consume Databus Message to update MC
  82. return
  83. }
  84. // Close dao.
  85. func (s *Service) Close() {
  86. if s.dao != nil {
  87. s.daoClosed = true
  88. log.Info("Dao Closed!")
  89. }
  90. log.Info("Crontab Closed!")
  91. s.cron.Stop()
  92. log.Info("Databus Closed!")
  93. s.contentSub.Close()
  94. log.Info("Wait Producer!")
  95. s.waiter.Wait()
  96. log.Info("Wait SyncMC Consumers")
  97. s.waiterConsumer.Wait()
  98. log.Info("Physical Dao Closed!")
  99. s.dao.Close()
  100. log.Info("tv-job has been closed.")
  101. }