binlogv.go 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "go-common/app/job/bbq/video/model"
  7. "go-common/app/service/bbq/common"
  8. topic "go-common/app/service/bbq/topic/api"
  9. "go-common/library/log"
  10. "strconv"
  11. )
  12. // videoConsumeproc 视频表消费
  13. func (s *Service) videoBinlogSub() {
  14. var msgs = s.videoSub.Messages()
  15. for {
  16. var err error
  17. msg, ok := <-msgs
  18. if !ok {
  19. log.Info("userCanal databus Consumer exit")
  20. return
  21. }
  22. res := &model.DatabusRes{}
  23. log.Infov(context.Background(), log.KV("log", fmt.Sprintf("canal message %s", string(msg.Value))))
  24. if err = json.Unmarshal(msg.Value, &res); err != nil {
  25. log.Error("json.Unmarshal(%s) error(%v)", msg.Value, err)
  26. msg.Commit()
  27. continue
  28. }
  29. if res.Table != "video" || (res.Action != "update" && res.Action != "insert") {
  30. msg.Commit()
  31. continue
  32. }
  33. var vNew, vOld *model.VideoRaw
  34. if res.Action == "insert" || res.Action == "update" {
  35. if err = json.Unmarshal(res.New, &vNew); err != nil {
  36. log.Error("video unmarshal err(%v) data[%s]", err, string(res.New))
  37. continue
  38. }
  39. }
  40. if res.Action == "update" {
  41. if err = json.Unmarshal(res.Old, &vOld); err != nil {
  42. log.Error("video unmarshal err(%v) data[%s]", err, string(res.Old))
  43. continue
  44. }
  45. }
  46. //idempotent consume
  47. for i := 0; i < _retryTimes; i++ {
  48. //fetured video state subscription
  49. if err = s.VideoStateSub(vNew, vOld); err == nil {
  50. break
  51. }
  52. }
  53. //s.UpdateCms(context.Background(), vNew)
  54. //register comment
  55. for i := 0; i < _retryTimes; i++ {
  56. //merge related information subscription
  57. if err = s.CommentReg(context.Background(), vNew.SVID, model.StateActive); err == nil {
  58. break
  59. }
  60. }
  61. if res.Action == "insert" {
  62. for i := 0; i < _retryTimes; i++ {
  63. log.V(1).Infow(context.Background(), "log", "merge up info", "retry_time", i, "mid", vNew.MID, "svid", vNew.SVID)
  64. //merge related information subscription
  65. if err = s.MergeUpInfoSub(vNew); err == nil {
  66. break
  67. }
  68. }
  69. }
  70. //unidempotent consume
  71. if res.Action == "update" {
  72. s.UpdateStaInfoSub(vNew, vOld)
  73. } else if res.Action == "insert" {
  74. s.AddSVTotal(vNew)
  75. }
  76. msg.Commit()
  77. }
  78. }
  79. //UpdateCms ..
  80. func (s *Service) UpdateCms(c context.Context, vNew *model.VideoRaw) (err error) {
  81. if err = s.dao.UpdateCms(c, vNew); err != nil {
  82. log.Warnw(c, "event", fmt.Sprintf("updateCms err:%v,param:%v", err, vNew))
  83. }
  84. return
  85. }
  86. // VideoStateSub 视频状态变更消费
  87. func (s *Service) VideoStateSub(vNew *model.VideoRaw, vOld *model.VideoRaw) (err error) {
  88. log.Infow(context.Background(), "log", "one video state sub", "svid", vNew.SVID)
  89. s.SaveVideo2ES(strconv.Itoa(int(vNew.SVID)))
  90. if vOld == nil || vNew.State != vOld.State {
  91. var ids []int64
  92. ids, err = s.dao.GetRecallOpVideo(context.Background())
  93. if err != nil {
  94. log.Warnw(context.Background(), "log", "get recall op video fail")
  95. return
  96. }
  97. needSetRecallOpVideo := true
  98. if vNew.State == _selection {
  99. for _, id := range ids {
  100. if id == vNew.SVID {
  101. needSetRecallOpVideo = false
  102. break
  103. }
  104. }
  105. ids = append(ids, vNew.SVID)
  106. } else if vNew.State != _selection {
  107. index := -1
  108. for i, id := range ids {
  109. if id == vNew.SVID {
  110. index = i
  111. break
  112. }
  113. }
  114. if index != -1 {
  115. ids = append(ids[:index], ids[index+1:]...)
  116. } else {
  117. needSetRecallOpVideo = false
  118. }
  119. }
  120. if needSetRecallOpVideo {
  121. if err = s.dao.SetRecallOpVideo(context.Background(), ids); err != nil {
  122. log.Warnw(context.Background(), "log", "get recall op video fail")
  123. return
  124. }
  125. }
  126. }
  127. // 话题状态变更
  128. needUpdateTopicVideoState := false
  129. topicState := topic.TopicVideoStateUnAvailable
  130. if vOld == nil {
  131. needUpdateTopicVideoState = true
  132. // update topic video
  133. if common.IsTopicSvStateAvailable(int64(vNew.State)) {
  134. topicState = topic.TopicVideoStateAvailable
  135. }
  136. } else {
  137. if common.IsTopicSvStateAvailable(int64(vNew.State)) != common.IsTopicSvStateAvailable(int64(vOld.State)) {
  138. needUpdateTopicVideoState = true
  139. if common.IsTopicSvStateAvailable(int64(vNew.State)) {
  140. topicState = topic.TopicVideoStateAvailable
  141. }
  142. }
  143. }
  144. if needUpdateTopicVideoState {
  145. _, err = s.topicClient.UpdateVideoState(context.Background(), &topic.UpdateVideoStateReq{Svid: vNew.SVID, State: int32(topicState)})
  146. log.Infow(context.Background(), "log", "update topic video state", "svid", vNew.SVID, "new_state", vNew.State)
  147. if err != nil {
  148. log.Warnw(context.Background(), "log", "update topic video state", "svid", vNew.SVID, "new_state", vNew.State)
  149. return
  150. }
  151. }
  152. return
  153. }
  154. //MergeUpInfoSub ..
  155. func (s *Service) MergeUpInfoSub(vNew *model.VideoRaw) (err error) {
  156. mid := vNew.MID
  157. if err = s.dao.MergeUpInfo(mid); err != nil {
  158. log.Error("MergeUpInfo failed,err:%v,mid:%d", err, mid)
  159. }
  160. return
  161. }
  162. //UpdateStaInfoSub ...
  163. func (s *Service) UpdateStaInfoSub(vNew *model.VideoRaw, vOld *model.VideoRaw) {
  164. if vOld == nil || vNew.State == vOld.State {
  165. return
  166. }
  167. if vNew.State == model.VideoStInactive {
  168. s.dao.UpdateUVSt(vNew.MID, "unshelf_av_total")
  169. } else if vNew.State == model.VideoStDeleted {
  170. s.dao.UpdateUVStDel(vNew.MID, "av_total")
  171. }
  172. if vOld.State == model.VideoStInactive {
  173. s.dao.UpdateUVStDel(vNew.MID, "unshelf_av_total")
  174. } else if vNew.State == model.VideoStDeleted {
  175. s.dao.UpdateUVSt(vOld.MID, "av_total")
  176. }
  177. }
  178. //AddSVTotal ...
  179. func (s *Service) AddSVTotal(vNew *model.VideoRaw) {
  180. s.dao.AddSVTotal(vNew.MID)
  181. }