sync_ep.go 2.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798
  1. package pgc
  2. import (
  3. "context"
  4. "time"
  5. "go-common/app/job/main/tv/dao/lic"
  6. model "go-common/app/job/main/tv/model/pgc"
  7. "go-common/library/ecode"
  8. "go-common/library/log"
  9. )
  10. // pick the data from DB to audit and combine the XML for the license owner
  11. // producer, content data => channel
  12. func (s *Service) syncEPs() {
  13. defer s.waiter.Done()
  14. for {
  15. if s.daoClosed {
  16. log.Info("syncEPs DB closed!")
  17. return
  18. }
  19. readySids, err := s.dao.ReadySns(ctx)
  20. if err != nil || len(readySids) == 0 {
  21. time.Sleep(time.Duration(s.c.Sync.Frequency.ErrorWait))
  22. continue
  23. }
  24. for _, sid := range readySids {
  25. var contSlices [][]*model.Content
  26. if contSlices, err = s.dao.PickData(ctx, sid); err != nil || len(contSlices) == 0 {
  27. continue
  28. }
  29. for _, conts := range contSlices {
  30. if err = s.epsSync(sid, conts); err != nil {
  31. s.addRetryEps(conts)
  32. }
  33. s.dao.AuditingCont(ctx, conts) // update status to auditing
  34. }
  35. }
  36. time.Sleep(1 * time.Second)
  37. }
  38. }
  39. func (s *Service) epsSync(sid int64, conts []*model.Content) (err error) {
  40. var reqCall = &model.ReqEpLicCall{
  41. SID: sid,
  42. Conts: conts,
  43. }
  44. if reqCall.EpLic, err = s.epLicCreate(ctx, sid, conts); err != nil {
  45. return
  46. }
  47. return s.epLicCall(ctx, reqCall)
  48. }
  49. // epLicCreate picks the sid and conts to create the license model
  50. func (s *Service) epLicCreate(ctx context.Context, sid int64, conts []*model.Content) (epLic *model.License, err error) {
  51. var (
  52. season *model.TVEpSeason
  53. prefix = s.c.Sync.AuditPrefix
  54. programs []*model.Program
  55. )
  56. if season, err = s.dao.Season(ctx, int(sid)); err != nil {
  57. log.Error("Season ID %d, Err %v", sid, err)
  58. return
  59. }
  60. epLic = newLic(season, s.c.Sync)
  61. epLic.XMLData.Service.Head.Count = len(conts)
  62. for _, v := range conts {
  63. s.dao.WaitCall(ctx, v.EPID) // avoid always selecting the same data, give time to the caller
  64. url, _, errPlay := s.playurlDao.Playurl(ctx, v.CID)
  65. if errPlay != nil {
  66. log.Error("syncEPs EP Playurl EPID = %d, Error: %v", v.EPID, errPlay)
  67. s.addRetryEp(v)
  68. continue
  69. }
  70. ep, errEP := s.dao.EP(ctx, v.EPID)
  71. if errEP != nil {
  72. log.Error("EpContent EPID %d Can't found", v.EPID)
  73. continue
  74. }
  75. program := model.CreateProgram(prefix, ep)
  76. program.ProgramMediaList = &model.PMList{
  77. ProgramMedia: []*model.PMedia{model.CreatePMedia(s.c.Sync.AuditPrefix, v.EPID, url)},
  78. }
  79. programs = append(programs, program)
  80. }
  81. epLic.XMLData.Service.Body.ProgramSetList.ProgramSet[0].ProgramList.Program = programs
  82. return
  83. }
  84. // epLicCall picks the license and sync to audit
  85. func (s *Service) epLicCall(ctx context.Context, req *model.ReqEpLicCall) (err error) {
  86. var cfg = s.c.Sync
  87. res, err := s.licDao.CallRetry(ctx, cfg.API.AddURL, lic.PrepareXML(req.EpLic))
  88. if res == nil {
  89. err = ecode.TvSyncErr
  90. }
  91. return
  92. }