audit.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "go-common/app/job/main/videoup/model/archive"
  6. "go-common/app/job/main/videoup/model/message"
  7. "go-common/app/job/main/videoup/model/redis"
  8. xsql "go-common/library/database/sql"
  9. "go-common/library/log"
  10. "go-common/library/queue/databus"
  11. )
  12. // videoupConsumer is videoup message consumer.
  13. func (s *Service) videoupConsumer() {
  14. defer s.wg.Done()
  15. var (
  16. msgs = s.videoupSub.Messages()
  17. err error
  18. c = context.TODO()
  19. )
  20. for {
  21. func() {
  22. var (
  23. msg *databus.Message
  24. ok bool
  25. offset int64
  26. dbus *archive.Databus
  27. )
  28. msg, ok = <-msgs
  29. if !ok {
  30. for part, lastOffset := range s.videoupSubIdempotent {
  31. if _, err = s.arc.UpDBus(c, s.c.VideoupSub.Group, s.c.VideoupSub.Topic, part, lastOffset); err != nil {
  32. continue
  33. }
  34. }
  35. log.Error("s.videoupSub.Messages closed")
  36. return
  37. }
  38. defer s.Rescue(string(msg.Value))
  39. msg.Commit()
  40. s.videoupMo++
  41. m := &message.Videoup{}
  42. if err = json.Unmarshal(msg.Value, m); err != nil {
  43. log.Error("json.Unmarshal(%v) error(%v)", string(msg.Value), err)
  44. return
  45. }
  46. offset, ok = s.videoupSubIdempotent[msg.Partition]
  47. if !ok {
  48. if dbus, err = s.arc.DBus(c, s.c.VideoupSub.Group, msg.Topic, msg.Partition); err != nil {
  49. return
  50. }
  51. if dbus == nil {
  52. if _, err = s.arc.AddDBus(c, s.c.VideoupSub.Group, msg.Topic, msg.Partition, msg.Offset); err != nil {
  53. return
  54. }
  55. offset = msg.Offset
  56. } else {
  57. offset = dbus.Offset
  58. }
  59. }
  60. // if last offset > current offset -> continue
  61. if offset > msg.Offset {
  62. log.Error("key(%s) value(%s) partition(%s) offset(%d) is too early", msg.Key, msg.Value, msg.Partition, msg.Offset)
  63. return
  64. }
  65. s.videoupSubIdempotent[msg.Partition] = msg.Offset
  66. s.promDatabus.Incr(m.Route)
  67. log.Info("videoupMessage key(%s) value(%s) partition(%d) offset(%d) route(%s) commit start", msg.Key, msg.Value, msg.Partition, msg.Offset, m.Route)
  68. switch m.Route {
  69. case message.RouteSyncCid:
  70. err = s.syncCid(c, m)
  71. case message.RouteFirstRound:
  72. err = s.firstRound(c, m)
  73. case message.RouteUGCFirstRound:
  74. err = s.firstRound(c, m)
  75. case message.RouteSecondRound:
  76. err = s.secondRound(c, m)
  77. case message.RouteAddArchive:
  78. err = s.addArchive(c, m)
  79. case message.RouteModifyArchive:
  80. err = s.modifyArchive(c, m)
  81. case message.RouteDeleteArchive:
  82. err = s.deleteArchive(c, m)
  83. case message.RouteDeleteVideo:
  84. err = s.deleteVideo(c, m)
  85. case message.RouteModifyVideo:
  86. log.Info("databus modifyVideo(%+v)", m)
  87. default:
  88. log.Warn("videoupConsumer unknown message route(%s)", m.Route)
  89. }
  90. if err == nil {
  91. log.Info("videoupMessage key(%s) value(%s) partition(%d) offset(%d) end", msg.Key, msg.Value, msg.Partition, msg.Offset)
  92. } else {
  93. log.Error("videoupMessage key(%s) value(%s) partition(%d) offset(%d) error(%v)", msg.Key, msg.Value, msg.Partition, msg.Offset, err)
  94. }
  95. }()
  96. }
  97. }
  98. func (s *Service) syncCid(c context.Context, m *message.Videoup) (err error) {
  99. // make sure filename not exist in redis, otherwise videoup can not submit!!!
  100. s.redis.DelFilename(c, m.Filename)
  101. log.Info("filename(%s) del_filename from redis success", m.Filename)
  102. return
  103. }
  104. func (s *Service) firstRound(c context.Context, m *message.Videoup) (err error) {
  105. var (
  106. v *archive.Video
  107. a *archive.Archive
  108. )
  109. if m.Xcode == 1 {
  110. s.promVideoS.Incr("xcode_hd")
  111. }
  112. s.promVideoE.Incr("first_round")
  113. if v, a, err = s.archiveVideoByAid(c, m.Filename, m.Aid); err != nil {
  114. log.Error("s.archiveVideoByAid(%s, %d) error(%v)", m.Filename, m.Aid, err)
  115. return
  116. }
  117. if a.State == archive.StateForbidUpDelete {
  118. log.Warn("archive(%d) filename(%s) state(%d) is deleted", a.Aid, v.Filename, a.State)
  119. return
  120. }
  121. // make sure filename not exist in redis, otherwise videoup can not submit!!!
  122. s.redis.DelFilename(c, m.Filename)
  123. log.Info("filename(%s) del_filename from redis success", m.Filename)
  124. var (
  125. tx *xsql.Tx
  126. sChange, rChange bool
  127. round int8
  128. ad = archive.AuditParam{IsAudit: true}
  129. )
  130. if tx, err = s.arc.BeginTran(c); err != nil {
  131. log.Error("s.arc.BeginTran archive(%d) filename(%s) error(%v)", a.Aid, m.Filename, err)
  132. return
  133. }
  134. log.Info("archive(%d) filename(%s) begin first_round transcation a_state(%d) v_status(%d)", a.Aid, v.Filename, a.State, v.Status)
  135. if sChange, err = s.tranArchive(c, tx, a, v, &ad); err != nil {
  136. tx.Rollback()
  137. log.Error("s.tranArchive(%d, %s) error(%v)", a.Aid, v.Filename, err)
  138. return
  139. }
  140. log.Info("archive(%d) filename(%s) first_round tranArchive fininsh a_state(%d) v_status(%d)", a.Aid, v.Filename, a.State, v.Status)
  141. if round, err = s.tranRound(c, tx, a); err != nil {
  142. tx.Rollback()
  143. log.Error("s.tranRound error(%v)", err)
  144. return
  145. }
  146. rChange = round != a.Round
  147. log.Info("archive(%d) firstRound upRound(%d)", a.Aid, a.Round)
  148. a.Round = round
  149. if sChange || rChange {
  150. if err = s.tranArchiveOper(tx, a); err != nil {
  151. tx.Rollback()
  152. return
  153. }
  154. }
  155. log.Info("archive(%d) filename(%s) first_round tranRound fininsh a_round(%d)", a.Aid, v.Filename, a.Round)
  156. if err = tx.Commit(); err != nil {
  157. log.Error("tx.Commit(%d, %s) error(%v)", a.Aid, v.Filename, err)
  158. return
  159. }
  160. log.Info("archive(%d) filename(%s) end first_round transcation a_state(%d) v_status(%d)", a.Aid, v.Filename, a.State, v.Status)
  161. s.sendPostFirstRound(c, message.RoutePostFirstRound, a.Aid, v.Filename, m.AdminChange)
  162. if sChange {
  163. if a.IsForbid() {
  164. //todo fixbug 导致打回的再自动开放视频 16s
  165. s.syncBVC(c, a)
  166. s.sendAuditMsg(c, message.RouteFirstRoundForbid, a.Aid)
  167. }
  168. s.sendMsg(c, a, v)
  169. if archive.NormalState(a.State) {
  170. // add for open state after dispatch
  171. s.sendAuditMsg(c, message.RouteAutoOpen, a.Aid)
  172. if is, _ := s.IsUpperFirstPass(c, a.Mid, a.Aid); is {
  173. go s.sendNewUpperMsg(c, a.Mid, a.Aid)
  174. }
  175. log.Info("firstRound aid(%d) pubbed. Do sync.", a.Aid)
  176. }
  177. }
  178. if s.canDo(a.Mid) {
  179. s.syncRetry(context.TODO(), a.Aid, 0, redis.ActionForVideocovers, a.Cover, a.Cover)
  180. }
  181. return
  182. }
  183. func (s *Service) secondRound(c context.Context, m *message.Videoup) (err error) {
  184. var (
  185. a *archive.Archive
  186. addit *archive.Addit
  187. )
  188. if a, err = s.arc.Archive(c, m.Aid); err != nil {
  189. log.Error("s.arc.Archive(%d) error(%v)", m.Aid, err)
  190. return
  191. }
  192. if addit, err = s.arc.Addit(c, m.Aid); err != nil {
  193. //为了不影响主流程,这里只是记个错误日志
  194. //!!!注意!!! addit可能是nil,用之前需要判断
  195. log.Error("s.arc.Addit(%d) error(%v)", m.Aid, err)
  196. err = nil
  197. }
  198. if a == nil {
  199. log.Warn("archive(%d) is not exist", m.Aid)
  200. return
  201. }
  202. //check first admin audit
  203. var had bool
  204. if had, _ = s.redis.SetMonitorCache(c, a.Aid); had {
  205. s.redis.DelMonitorCache(c, a.Aid)
  206. s.promVideoE.Incr("second_round")
  207. }
  208. // start archive
  209. var tx *xsql.Tx
  210. if tx, err = s.arc.BeginTran(c); err != nil {
  211. log.Error("s.arc.BeginTran error(%v)", err)
  212. return
  213. }
  214. if err = s.tranSumDuration(c, tx, a); err != nil {
  215. tx.Rollback()
  216. log.Error("s.tranSumDuration error(%v)", err)
  217. return
  218. }
  219. if err = tx.Commit(); err != nil {
  220. log.Error("tx.Commit error(%v)", err)
  221. return
  222. }
  223. if a.State != archive.StateForbidFixed {
  224. s.syncBVC(c, a)
  225. }
  226. if archive.NormalState(a.State) {
  227. s.changeMission(c, a, m.MissionID)
  228. } else {
  229. s.unBindMission(c, a, m.MissionID)
  230. }
  231. if m.IsSendNotify {
  232. s.sendMsg(c, a, nil)
  233. }
  234. //TODO 将其它发送系统通知的逻辑都放到sendMsg2Upper()里
  235. go s.sendMsg2Upper(c, m, a, addit)
  236. //回查阶段修改了稿件信息需要发送通知
  237. if a.State >= archive.StateOpen && ((s.c.ChangeDebug && s.c.ChangeMid == a.Mid) || !s.c.ChangeDebug) && (m.ChangeTypeID || m.ChangeTitle || m.ChangeCopyright || m.ChangeCover) {
  238. log.Info("archive(%d) secondRound sendChangeMsg(%+v)", a.Aid, m)
  239. s.sendChangeMsg(c, a, nil, m)
  240. }
  241. if s.canDo(a.Mid) {
  242. s.syncRetry(context.TODO(), a.Aid, 0, redis.ActionForVideocovers, a.Cover, a.Cover)
  243. }
  244. return
  245. }
  246. // sendMsg2Upper 发送系统通知给UP主
  247. func (s *Service) sendMsg2Upper(c context.Context, m *message.Videoup, a *archive.Archive, addit *archive.Addit) {
  248. if archive.NormalState(a.State) && m.MissionID != 0 && addit != nil && addit.MissionID == 0 { //m.MissionID是审核人员提交修改之前的活动id
  249. s.sendMissionMsg(c, a)
  250. }
  251. is, err := s.IsUpperFirstPass(c, a.Mid, a.Aid)
  252. if err != nil {
  253. log.Error("s.IsUpperFirstPass(%d,%d) error(%v)", a.Mid, a.Aid, err)
  254. err = nil
  255. } else if is {
  256. // UP主第一次稿件过审发送
  257. s.sendNewUpperMsg(c, a.Mid, a.Aid)
  258. }
  259. }
  260. func (s *Service) addArchive(c context.Context, m *message.Videoup) (err error) {
  261. var a *archive.Archive
  262. if a, err = s.arc.Archive(c, m.Aid); err != nil {
  263. log.Error("s.arc.Archive(%d) error(%v)", m.Aid, err)
  264. return
  265. }
  266. if a == nil {
  267. log.Warn("archive(%d) is not exist", m.Aid)
  268. return
  269. }
  270. if s.canDo(a.Mid) {
  271. s.syncRetry(context.TODO(), a.Aid, 0, redis.ActionForVideocovers, a.Cover, a.Cover)
  272. }
  273. var (
  274. tx *xsql.Tx
  275. round int8
  276. state, _, _, _ = s.archiveState(c, a, nil, nil)
  277. )
  278. if archive.NormalState(state) { // NOTE: add archive must audit maybe code mode.
  279. state = archive.StateForbidWait
  280. }
  281. if state == a.State {
  282. log.Warn("archive(%d) add_archive newState(%d)==oldState(%d)", a.Aid, state, a.State)
  283. return
  284. }
  285. if tx, err = s.arc.BeginTran(c); err != nil {
  286. log.Error("s.arc.BeginTran archive(%d) error(%v)", a.Aid, err)
  287. return
  288. }
  289. log.Info("archive(%d) begin add_archive transcation a_state(%d)", a.Aid, a.State)
  290. // archive
  291. if _, err = s.arc.TxUpState(tx, a.Aid, state); err != nil {
  292. tx.Rollback()
  293. log.Error("s.arc.TxUpState(%d, %d) error(%v)", a.Aid, state, err)
  294. return
  295. }
  296. a.State = state
  297. log.Info("archive(%d) add_archive upState(%d)", a.Aid, a.State)
  298. if round, err = s.tranRound(c, tx, a); err != nil {
  299. tx.Rollback()
  300. log.Error("s.tranRound error(%v)", err)
  301. return
  302. }
  303. a.Round = round
  304. log.Info("archive(%d) add_archive upRound(%d)", a.Aid, a.Round)
  305. if err = s.tranArchiveOper(tx, a); err != nil {
  306. tx.Rollback()
  307. return
  308. }
  309. if err = tx.Commit(); err != nil {
  310. log.Error("tx.Commit(%d) error(%v)", a.Aid, err)
  311. return
  312. }
  313. log.Info("archive(%d) end add_archive transcation a_state(%d)", a.Aid, a.State)
  314. return
  315. }
  316. func (s *Service) modifyArchive(c context.Context, m *message.Videoup) (err error) {
  317. var a *archive.Archive
  318. if a, err = s.arc.Archive(c, m.Aid); err != nil {
  319. log.Error("s.arc.Archive(%d) error(%v)", m.Aid, err)
  320. return
  321. }
  322. // start archive
  323. if a.NotAllowUp() {
  324. log.Warn("archive(%d) modify_archive state(%d) not allow update", a.Aid, a.State)
  325. return
  326. }
  327. if s.canDo(a.Mid) {
  328. s.syncRetry(context.TODO(), a.Aid, 0, redis.ActionForVideocovers, a.Cover, a.Cover)
  329. }
  330. // TODO comment when online
  331. if !m.EditArchive && !m.EditVideo {
  332. // to sync pubbed arc if not edited or just edit order.
  333. if archive.NormalState(a.State) {
  334. s.sendAuditMsg(c, message.RouteForceSync, a.Aid)
  335. log.Info("modifyArchive aid(%d) not modified or changed order. Do sync.", a.Aid)
  336. }
  337. return
  338. }
  339. var (
  340. tx *xsql.Tx
  341. state, _, _, _ = s.archiveState(c, a, nil, nil)
  342. round int8
  343. )
  344. if archive.NormalState(state) || state == archive.StateForbidUserDelay {
  345. if state != archive.StateForbidFixed {
  346. state = archive.StateForbidWait
  347. }
  348. }
  349. if state == a.State {
  350. return
  351. }
  352. if tx, err = s.arc.BeginTran(c); err != nil {
  353. log.Error("s.arc.BeginTran archive(%d) error(%v)", a.Aid, err)
  354. return
  355. }
  356. log.Info("archive(%d) begin modify_archive transcation a_state(%d)", a.Aid, a.State)
  357. // archive
  358. if _, err = s.arc.TxUpState(tx, a.Aid, state); err != nil {
  359. tx.Rollback()
  360. log.Error("s.arc.TxUpState(%d, %d) error(%v)", a.Aid, state, err)
  361. return
  362. }
  363. log.Info("archive(%d) modify_archive a.State(%d) upState(%d)", a.Aid, a.State, state)
  364. a.State = state
  365. if round, err = s.tranRound(c, tx, a); err != nil {
  366. tx.Rollback()
  367. return
  368. }
  369. a.Round = round
  370. log.Info("archive(%d) modify_archive upRound(%d)", a.Aid, a.Round)
  371. if err = s.tranArchiveOper(tx, a); err != nil {
  372. tx.Rollback()
  373. return
  374. }
  375. if err = tx.Commit(); err != nil {
  376. log.Error("tx.Commit(%d) error(%v)", a.Aid, err)
  377. return
  378. }
  379. log.Info("archive(%d) end modify_archive transcation a_state(%d)", a.Aid, a.State)
  380. if a.State == archive.StateForbidFixed {
  381. s.addClickToRedis(c, a.Aid)
  382. }
  383. return
  384. }
  385. func (s *Service) deleteArchive(c context.Context, m *message.Videoup) (err error) {
  386. var a *archive.Archive
  387. if a, err = s.arc.Archive(c, m.Aid); err != nil {
  388. log.Error("s.arc.Archive(%d) error(%v)", m.Aid, err)
  389. return
  390. }
  391. log.Info("archive delete_archive aid(%d)", m.Aid)
  392. s.syncBVC(c, a)
  393. return
  394. }
  395. func (s *Service) deleteVideo(c context.Context, m *message.Videoup) (err error) {
  396. var (
  397. v *archive.Video
  398. tx *xsql.Tx
  399. sum int64
  400. )
  401. if v, err = s.arc.NewVideo(c, m.Filename); err != nil || v == nil {
  402. return
  403. }
  404. log.Info("archive delete_video filename(%s) cid(%d) begin", m.Filename, v.Cid)
  405. if tx, err = s.arc.BeginTran(c); err != nil {
  406. log.Error("s.arc.BeginTran error(%v)", err)
  407. return
  408. }
  409. if sum, err = s.arc.NewSumDuration(c, v.Aid); err != nil {
  410. tx.Rollback()
  411. log.Error("s.arc.SumDuration(%d) filename(%s) error(%v)", v.Aid, v.Filename, err)
  412. return
  413. }
  414. if _, err = s.arc.TxUpArcDuration(tx, v.Aid, sum); err != nil {
  415. tx.Rollback()
  416. log.Error("s.arc.TxUpArcDuration(%d, %d) filename(%s) error(%v)", v.Aid, sum, v.Filename, err)
  417. return
  418. }
  419. if err = tx.Commit(); err != nil {
  420. log.Error("tx.Commit error(%v)", err)
  421. return
  422. }
  423. log.Info("archive(%d) filename(%s) upArcDuration(%d)", v.Aid, v.Filename, sum)
  424. log.Info("archive delete_video filename(%s) cid(%d) end", m.Filename, v.Cid)
  425. return
  426. }