bvcsub.go 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "go-common/app/job/bbq/video/model"
  7. videov1 "go-common/app/service/bbq/video/api/grpc/v1"
  8. "go-common/library/log"
  9. "strings"
  10. )
  11. //BvcTransSub ...
  12. func (s *Service) BvcTransSub() {
  13. msgs := s.bvcSub.Messages()
  14. for {
  15. var (
  16. err error
  17. vr *model.VideoRepRaw
  18. )
  19. c := context.Background()
  20. msg, ok := <-msgs
  21. //release subscription
  22. if s.c.SubBvcControl.Control == 2 {
  23. msg.Commit()
  24. continue
  25. }
  26. if !ok {
  27. log.Info("BvcTransSub databus Consumer exit")
  28. return
  29. }
  30. res := &model.DatabusBVCTransSub{}
  31. log.Infov(context.Background(), log.KV("log", fmt.Sprintf("databus message %s", string(msg.Value))))
  32. if err = json.Unmarshal(msg.Value, &res); err != nil {
  33. log.Error("json.Unmarshal(%s) error(%v)", msg.Value, err)
  34. msg.Commit()
  35. continue
  36. }
  37. if vr, err = s.dao.RawVideo(c, res.SVID); err != nil {
  38. msg.Commit()
  39. continue
  40. }
  41. //resource check
  42. if vr.SyncStatus&model.SourceXcodeCover > 0 {
  43. if err = s.importVideo(c, vr); err != nil {
  44. log.Errorw(c, "errmsg", "importVideo err", "req", vr, "err", err)
  45. msg.Commit()
  46. continue
  47. }
  48. s.syncTag(c, vr.Tag)
  49. s.dao.UpdateSyncStatus(c, vr.SVID, model.SourceOnshelf)
  50. }
  51. msg.Commit()
  52. }
  53. }
  54. //importVideo put video on shelf
  55. func (s *Service) importVideo(c context.Context, vr *model.VideoRepRaw) (err error) {
  56. // var (
  57. // st int64
  58. // )
  59. // if vr.From == model.VideoFromBILI {
  60. // st = model.VideoStPassReview
  61. // } else {
  62. // st = model.VideoStPendingPassReview
  63. // }
  64. req := &videov1.ImportVideoInfo{
  65. AVID: vr.AVID,
  66. Svid: vr.SVID,
  67. MID: vr.MID,
  68. CID: vr.CID,
  69. SubTID: vr.SubTID,
  70. TID: vr.TID,
  71. Title: vr.Title,
  72. Pubtime: vr.Pubtime,
  73. From: int64(vr.From),
  74. CoverUrl: vr.CoverURL,
  75. CoverHeight: vr.CoverHeight,
  76. CoverWidth: vr.CoverWidth,
  77. //State: st,
  78. HomeImgHeight: vr.HomeImgHeight,
  79. HomeImgUrl: vr.HomeImgURL,
  80. HomeImgWidth: vr.HomeImgWidth,
  81. }
  82. for i := 0; i < _retryTimes; i++ {
  83. if _, err = s.dao.VideoClient.ImportVideo(c, req); err == nil {
  84. break
  85. }
  86. }
  87. return
  88. }
  89. //syncTag sync video from bilibili common tag
  90. func (s *Service) syncTag(c context.Context, t string) (err error) {
  91. if t == "" {
  92. return
  93. }
  94. var (
  95. arrTag []string
  96. tag []*videov1.TagInfo
  97. )
  98. arrTag = strings.Split(t, ",")
  99. for _, v := range arrTag {
  100. tmp := &videov1.TagInfo{
  101. TagName: v,
  102. TagType: 3,
  103. }
  104. tag = append(tag, tmp)
  105. }
  106. reqTag := &videov1.SyncVideoTagRequest{
  107. TagInfos: tag,
  108. }
  109. if _, err = s.dao.VideoClient.SyncTag(c, reqTag); err != nil {
  110. log.Error("sync tag err :%v,tag:%v", err, tag)
  111. }
  112. return
  113. }