databus.go 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294
  1. package ugc
  2. import (
  3. "context"
  4. "encoding/json"
  5. appDao "go-common/app/job/main/tv/dao/app"
  6. ugcmdl "go-common/app/job/main/tv/model/ugc"
  7. arcmdl "go-common/app/service/main/archive/api"
  8. "go-common/library/database/sql"
  9. "go-common/library/log"
  10. )
// Values matched against databus archive messages and DB rows in this file.
const (
	_tableArchive = "archive" // ArcMsg.Table value that routes a message to ArcHandle
	_updateAction = "update"  // ArcMsg.Action value handled by arcUpdate
	_insertAction = "insert"  // ArcMsg.Action value handled by arcInsert
	_deleted      = 1         // Archive.Deleted value for which arcExist reports false
)
  17. // arcConsumeproc consumer archive
  18. func (s *Service) arcConsumeproc() {
  19. var err error
  20. defer s.waiter.Done()
  21. for {
  22. msg, ok := <-s.archiveNotifySub.Messages()
  23. if !ok {
  24. log.Info("arc databus Consumer exit")
  25. break
  26. }
  27. var ms = &ugcmdl.ArcMsg{}
  28. log.Info("arcConsumeproc New message: %s", msg)
  29. if err = json.Unmarshal(msg.Value, ms); err != nil {
  30. msg.Commit()
  31. log.Error("json.Unmarshal(%s) error(%v)", msg.Value, err)
  32. continue
  33. }
  34. switch ms.Table {
  35. case _tableArchive:
  36. s.ArcHandle(ms)
  37. }
  38. msg.Commit()
  39. }
  40. }
  41. // ArcHandle treats the archive notify-T message to update the DB if there is any change
  42. func (s *Service) ArcHandle(arcMsg *ugcmdl.ArcMsg) {
  43. var (
  44. msgMID = arcMsg.New.Mid
  45. trustUp = false
  46. )
  47. if len(s.activeUps) > 0 { // check whether it's our trust upper
  48. if _, ok := s.activeUps[msgMID]; ok {
  49. trustUp = true
  50. }
  51. } else { // when the memory is not ready, check upper from DB
  52. mid, _ := s.dao.UpInList(ctx, arcMsg.New.Mid)
  53. trustUp = mid > 0
  54. }
  55. if !trustUp { // if it's not our trust upper, ignore the message except the archive was imported manually
  56. if arcMsg.Action == _updateAction && s.arcExist(arcMsg.New.Aid) { // if it's update and the archive exists ( added manually ), we allow it
  57. s.arcUpdate(arcMsg.Old, arcMsg.New)
  58. return
  59. }
  60. log.Info("Message Aid %d, Mid %d, Not in List, Ignore", arcMsg.New.Aid, arcMsg.New.Mid)
  61. appDao.PromInfo("DsInsert:Ignore")
  62. return
  63. }
  64. // arc update
  65. if arcMsg.Action == _updateAction {
  66. s.arcUpdate(arcMsg.Old, arcMsg.New)
  67. }
  68. // arc insert
  69. if arcMsg.Action == _insertAction {
  70. s.arcInsert(arcMsg.New)
  71. }
  72. }
  73. // distinguishes whether an arc exist
  74. func (s *Service) arcExist(aid int64) bool {
  75. var (
  76. res *ugcmdl.Archive
  77. err error
  78. )
  79. if res, err = s.dao.ParseArc(ctx, aid); err != nil || res == nil {
  80. return false
  81. }
  82. if res.Deleted == _deleted {
  83. return false
  84. }
  85. return true
  86. }
  87. // arcInsert inserts a new databus notified archive
  88. func (s *Service) arcInsert(arc *ugcmdl.ArchDatabus) (err error) {
  89. if exist := s.arcExist(arc.Aid); exist {
  90. appDao.PromError("DsInsert:Exist")
  91. log.Error("Databus Insert Data Aid %d Exist", arc.Aid)
  92. return
  93. }
  94. if err = s.importArc(context.Background(), arc.Aid, false); err != nil {
  95. appDao.PromError("DsInsert:Err")
  96. log.Error("Databus Import Arc %d Error %v", arc.Aid, err)
  97. return
  98. }
  99. appDao.PromInfo("DsInsert:Succ")
  100. return
  101. }
  102. // arcUpdate updates a databus notified archive
  103. func (s *Service) arcUpdate(old *ugcmdl.ArchDatabus, new *ugcmdl.ArchDatabus) (err error) {
  104. if !s.arcExist(new.Aid) { // if an archive is not existing yet in our DB, we insert it
  105. return s.arcInsert(new)
  106. }
  107. new.Cover = s.coverURL(new.Cover, s.c.UgcSync.Cfg.BFSPrefix)
  108. var (
  109. oldAllow = &ugcmdl.ArcAllow{}
  110. newAllow = &ugcmdl.ArcAllow{}
  111. )
  112. oldAllow.FromDatabus(old)
  113. newAllow.FromDatabus(new)
  114. if !oldAllow.CanPlay() && newAllow.CanPlay() { // if an archive is recovered, re-insert it
  115. log.Info("Aid %d is recovered, add it", new.Aid)
  116. return s.arcInsert(new)
  117. }
  118. if oldAllow.CanPlay() && !newAllow.CanPlay() { // if an archive is banned, delete it
  119. log.Info("Aid %d can't play, delete it", new.Aid)
  120. if err = s.dao.UpdateArc(ctx, new); err != nil {
  121. return
  122. }
  123. return s.delArc(new.Aid)
  124. }
  125. // if arc level changed or video level changed, treat and import data
  126. return s.arcCompare(old, new)
  127. }
  128. // arcCompare compares the archive & the videos of the old and the new, to update if needed
  129. func (s *Service) arcCompare(old *ugcmdl.ArchDatabus, new *ugcmdl.ArchDatabus) (err error) {
  130. var (
  131. diff *ugcmdl.VideoDiff
  132. hitPGC bool
  133. )
  134. if hitPGC, err = s.delPGC(new.TypeID, new.Aid); err != nil {
  135. return
  136. }
  137. if hitPGC { // if the archive hits PGC types, delete it
  138. log.Warn("arcCompare Del Aid %d, Because of its typeID %d", new.Aid, new.TypeID)
  139. return
  140. }
  141. if s.diffArc(old, new) { // archive level info update if different
  142. if err = s.dao.UpdateArc(ctx, new); err != nil {
  143. appDao.PromError("DsUpdArc:Err")
  144. return
  145. }
  146. s.modArcCh <- []int64{new.Aid} // add one archive to submit
  147. appDao.PromInfo("DsUpdArc:Succ")
  148. }
  149. // video level info update if different
  150. if diff, err = s.diffVideos(new.Aid); err != nil {
  151. appDao.PromError("DsUpdVideo:Err")
  152. return
  153. }
  154. log.Info("Diff Result For Aid %d, Equal %v, Updated %v, Removed %v, New %v", new.Aid, diff.Equal, diff.Updated, diff.Removed, diff.New)
  155. if err = s.treatDiffV(diff); err != nil {
  156. appDao.PromError("DsUpdVideo:Err")
  157. return
  158. }
  159. appDao.PromInfo("DsUpdVideo:Succ")
  160. return
  161. }
  162. // get first level of types name
  163. func (s *Service) getPTypeName(typeID int32) (name string) {
  164. var (
  165. second, first *arcmdl.Tp
  166. ok bool
  167. )
  168. if second, ok = s.arcTypes[typeID]; !ok {
  169. log.Error("can't find type for ID: %d ", typeID)
  170. return
  171. }
  172. if first, ok = s.arcTypes[second.Pid]; !ok {
  173. log.Error("can't find type for ID: %d, second Info: %v", second, second.Pid)
  174. return
  175. }
  176. return first.Name
  177. }
  178. // getPType first level of types name
  179. func (s *Service) getPType(typeID int) (pid int) {
  180. var (
  181. second *arcmdl.Tp
  182. ok bool
  183. )
  184. if second, ok = s.arcTypes[int32(typeID)]; !ok {
  185. log.Error("can't find type for ID: %d ", typeID)
  186. return
  187. }
  188. return int(second.Pid)
  189. }
  190. // diffArc distinguishes whether the key fields of an archive have been changed
  191. func (s *Service) diffArc(old *ugcmdl.ArchDatabus, new *ugcmdl.ArchDatabus) (diff bool) {
  192. diff = (old.Title != new.Title)
  193. diff = diff || (old.Content != new.Content)
  194. diff = diff || (old.PubTime != new.PubTime)
  195. diff = diff || (old.TypeID != new.TypeID)
  196. diff = diff || (old.Cover != new.Cover)
  197. diff = diff || (s.getPTypeName(old.TypeID) != s.getPTypeName(new.TypeID))
  198. return
  199. }
  200. // diffVideos distinguishes whethe
  201. func (s *Service) diffVideos(aid int64) (diff *ugcmdl.VideoDiff, err error) {
  202. var (
  203. rpcRes *arcmdl.ViewReply
  204. dbRes map[int64]*ugcmdl.SimpleVideo
  205. video *ugcmdl.SimpleVideo
  206. ok bool
  207. )
  208. diff = &ugcmdl.VideoDiff{
  209. Aid: aid,
  210. }
  211. if rpcRes, err = s.videoPick(ctx, aid); err != nil {
  212. log.Error("rpc video pick %d, error %v", aid, err)
  213. return
  214. }
  215. if dbRes, err = s.dao.PickVideos(ctx, aid); err != nil {
  216. log.Error("db video pick %d, error %v", aid, err)
  217. return
  218. }
  219. for _, page := range rpcRes.Pages {
  220. if video, ok = dbRes[page.Cid]; !ok { // not found in DB, means it's new
  221. diff.New = append(diff.New, page.Cid)
  222. continue
  223. }
  224. if video.IndexOrder == int64(page.Page) && video.Eptitle == page.Part { // if title & index_order equal
  225. diff.Equal = append(diff.Equal, page.Cid)
  226. } else { // otherwise it's updated
  227. diff.Updated = append(diff.Updated, page)
  228. }
  229. delete(dbRes, page.Cid)
  230. }
  231. for _, v := range dbRes {
  232. diff.Removed = append(diff.Removed, v.CID)
  233. }
  234. return
  235. }
  236. // treatDiffV treats the result of diffVideos, like we add new ones, we deleted removed ones, and we updated the modified ones
  237. func (s *Service) treatDiffV(diff *ugcmdl.VideoDiff) (err error) {
  238. var (
  239. newPages []*arcmdl.Page
  240. page *arcmdl.Page
  241. aid = diff.Aid
  242. tx *sql.Tx
  243. )
  244. // all the operations about this archive's videos, will be in one transaction
  245. if tx, err = s.dao.BeginTran(ctx); err != nil {
  246. log.Error("BeginTran Error %v", err)
  247. return
  248. }
  249. // add new videos
  250. if len(diff.New) > 0 {
  251. for _, v := range diff.New {
  252. if page, err = s.pagePick(ctx, v, aid, ""); err != nil {
  253. continue
  254. }
  255. newPages = append(newPages, page)
  256. }
  257. if err = s.dao.TxAddVideos(tx, newPages, aid); err != nil {
  258. tx.Rollback()
  259. return
  260. }
  261. }
  262. if len(diff.Removed) > 0 {
  263. for _, v := range diff.Removed {
  264. if err = s.dao.TxDelVideo(tx, v); err != nil {
  265. tx.Rollback()
  266. return
  267. }
  268. }
  269. }
  270. if len(diff.Updated) > 0 {
  271. for _, v := range diff.Updated {
  272. if err = s.dao.TxUpdateVideo(tx, v); err != nil {
  273. tx.Rollback()
  274. return
  275. }
  276. }
  277. }
  278. tx.Commit()
  279. return
  280. }