track.go 4.9 KB


  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "time"
  6. "go-common/app/job/main/dm2/model"
  7. "go-common/library/log"
  8. )
  9. func (s *Service) trackSubject(c context.Context, m *model.BinlogMsg) (err error) {
  10. nw := &model.Subject{}
  11. if err = json.Unmarshal(m.New, &nw); err != nil {
  12. log.Error("json.Unmarshal(%s) error(%v)", m.New, err)
  13. return
  14. }
  15. switch m.Action {
  16. case "insert":
  17. if err = s.dao.AddSubjectCache(c, nw); err != nil {
  18. log.Error("s.dao.AddSubjectCache(%v) error(%v)", nw, err)
  19. return
  20. }
  21. case "delete":
  22. if err = s.dao.DelSubjectCache(c, nw.Type, nw.Oid); err != nil {
  23. log.Error("s.dao.DelSubjectCahce(%v) error(%v)", nw, err)
  24. return
  25. }
  26. case "update":
  27. old := model.Subject{}
  28. if err = json.Unmarshal(m.Old, &old); err != nil {
  29. log.Error("json.Unmarshal(%s) error(%v)", m.Old, err)
  30. return
  31. }
  32. if err = s.dao.AddSubjectCache(c, nw); err != nil { // 全量缓存subject
  33. log.Error("s.dao.AddSubjectCache(%v) error(%v)", nw, err)
  34. return
  35. }
  36. if nw.Childpool != old.Childpool || nw.Maxlimit != old.Maxlimit || nw.State != old.State {
  37. // 立刻刷新全段弹幕缓存
  38. flush := &model.Flush{Oid: nw.Oid, Type: nw.Type, Force: true}
  39. s.flushDmCache(c, flush)
  40. // 立刻刷新分段弹幕缓存
  41. s.flushXMLSegCache(c, nw)
  42. }
  43. }
  44. return
  45. }
  46. func (s *Service) trackIndex(c context.Context, m *model.BinlogMsg) (err error) {
  47. if m.Action != "update" {
  48. return
  49. }
  50. dm := &model.DM{}
  51. old := &model.DM{}
  52. if err = json.Unmarshal(m.New, &dm); err != nil {
  53. log.Error("json.Unmarshal(%s) error(%v)", m.New, err)
  54. return
  55. }
  56. if err = json.Unmarshal(m.Old, &old); err != nil {
  57. log.Error("json.Unmarshal(%s) error(%v)", m.Old, err)
  58. return
  59. }
  60. s.asyncAddRecent(c, dm) // 更新up主最新1000条弹幕
  61. s.asyncAddFlushDM(c, &model.Flush{
  62. Type: dm.Type,
  63. Oid: dm.Oid,
  64. Force: true,
  65. }) // 刷新全段弹幕
  66. sub, err := s.subject(c, dm.Type, dm.Oid)
  67. if err != nil {
  68. return
  69. }
  70. p, err := s.pageinfo(c, sub.Pid, dm)
  71. if err != nil {
  72. return
  73. }
  74. if dm.NeedUpdateSpecial(old) {
  75. if err = s.specialLocationUpdate(c, dm.Type, dm.Oid); err != nil {
  76. return
  77. }
  78. }
  79. s.dao.DelIdxContentCaches(c, dm.Type, dm.Oid, dm.ID) // 删除content cache
  80. s.asyncAddFlushDMSeg(c, &model.FlushDMSeg{
  81. Type: dm.Type,
  82. Oid: dm.Oid,
  83. Force: true,
  84. Page: p,
  85. })
  86. return
  87. }
  88. func (s *Service) trackVideoup(c context.Context, aid int64) (err error) {
  89. var (
  90. retry = 5
  91. tp = model.SubTypeVideo
  92. videos []*model.Video
  93. )
  94. for i := 0; i < retry; i++ {
  95. if videos, err = s.dao.Videos(c, aid); err == nil {
  96. break
  97. }
  98. time.Sleep(time.Second)
  99. }
  100. if err != nil {
  101. log.Error("track video failed,aid(%d),error(%v)", aid, err)
  102. return
  103. }
  104. for _, v := range videos {
  105. for i := 0; i < retry; i++ {
  106. if err = s.syncVideo(c, tp, v); err == nil {
  107. break
  108. }
  109. time.Sleep(time.Second)
  110. }
  111. }
  112. return
  113. }
  114. func (s *Service) syncVideo(c context.Context, tp int32, v *model.Video) (err error) {
  115. log.Info("sync video:%+v", v)
  116. sub, err := s.dao.Subject(c, tp, v.Cid)
  117. if err != nil {
  118. return
  119. }
  120. if sub == nil {
  121. if v.XCodeState >= model.VideoXcodeHDFinish {
  122. // 生成弹幕蒙版
  123. var attr int32
  124. for _, mid := range s.maskMid {
  125. if mid == v.Mid {
  126. if err = s.dao.GenerateMask(c, v.Cid, mid, model.MaskPlatAll, model.MaskPriorityHgih, v.Aid, 0, 0); err != nil {
  127. break
  128. }
  129. attr = attr | (model.AttrYes << model.AttrSubMaskOpen)
  130. break
  131. }
  132. }
  133. if _, err = s.dao.AddSubject(c, tp, v.Cid, v.Aid, v.Mid, s.maxlimit(v.Duration), attr); err != nil {
  134. return
  135. }
  136. }
  137. } else {
  138. if sub.Mid != v.Mid {
  139. if _, err = s.dao.UpdateSubMid(c, tp, v.Cid, v.Mid); err != nil {
  140. return
  141. }
  142. if err = s.updateSubtilte(c, tp, v); err != nil {
  143. log.Error("updateSubtilte(params:%+v),error(%v)", v, err)
  144. return
  145. }
  146. }
  147. }
  148. return
  149. }
  150. func (s *Service) updateSubtilte(c context.Context, tp int32, v *model.Video) (err error) {
  151. var (
  152. subtitles []*model.Subtitle
  153. subtitle *model.Subtitle
  154. )
  155. if subtitles, err = s.dao.GetSubtitles(c, tp, v.Cid); err != nil {
  156. log.Error("updateSubtilte(params:%+v),error(%v)", v, err)
  157. return
  158. }
  159. for _, subtitle = range subtitles {
  160. subtitle.UpMid = v.Mid
  161. if err = s.dao.UpdateSubtitle(c, subtitle); err != nil {
  162. log.Error("updateSubtilte(params:%+v),error(%v)", v, err)
  163. return
  164. }
  165. s.dao.DelSubtitleCache(c, v.Cid, subtitle.ID)
  166. if subtitle.Status == model.SubtitleStatusDraft || subtitle.Status == model.SubtitleStatusToAudit {
  167. s.dao.DelSubtitleDraftCache(c, v.Cid, tp, subtitle.Mid, subtitle.Lan)
  168. }
  169. }
  170. s.dao.DelVideoSubtitleCache(c, v.Cid, tp)
  171. return
  172. }
  173. func (s *Service) maxlimit(duration int64) (limit int64) {
  174. switch {
  175. case duration == 0:
  176. limit = 1500
  177. case duration > 3600:
  178. limit = 8000
  179. case duration > 2400:
  180. limit = 6000
  181. case duration > 900:
  182. limit = 3000
  183. case duration > 600:
  184. limit = 1500
  185. case duration > 150:
  186. limit = 1000
  187. case duration > 60:
  188. limit = 500
  189. case duration > 30:
  190. limit = 300
  191. case duration <= 30:
  192. limit = 100
  193. }
  194. return
  195. }