archive.go 31 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973
  1. package service
  2. import (
  3. "context"
  4. "fmt"
  5. "strconv"
  6. "strings"
  7. "go-common/app/admin/main/videoup/model/archive"
  8. "go-common/app/admin/main/videoup/model/manager"
  9. "go-common/app/admin/main/videoup/model/oversea"
  10. tagrpc "go-common/app/interface/main/tag/model"
  11. "go-common/library/database/sql"
  12. "go-common/library/ecode"
  13. "go-common/library/log"
  14. "go-common/library/sync/errgroup"
  15. "go-common/library/time"
  16. "go-common/library/xstr"
  17. )
  18. // Submit second_round submit,update archive_addit\archive_delay\archive.
  19. func (s *Service) Submit(c context.Context, ap *archive.ArcParam) (err error) {
  20. var (
  21. tx *sql.Tx
  22. a *archive.Archive
  23. addit *archive.Addit
  24. operConts, flowConts []string
  25. oldMissionID, missionID, descFormatID int64
  26. mUser *manager.User
  27. porderConts []string
  28. porder = &archive.Porder{}
  29. rel = &oversea.ArchiveRelation{}
  30. )
  31. if a, err = s.arc.Archive(c, ap.Aid); err != nil || a == nil {
  32. log.Error("s.arc.Archive(%d) error(%v) or a==nil", ap.Aid, err)
  33. return
  34. }
  35. if addit, _ = s.arc.Addit(c, ap.Aid); addit != nil {
  36. missionID = addit.MissionID
  37. descFormatID = addit.DescFormatID
  38. }
  39. forbid, _ := s.arc.Forbid(c, ap.Aid)
  40. if tx, err = s.arc.BeginTran(c); err != nil {
  41. log.Error("s.arc.BeginTran() error(%v)", err)
  42. return
  43. }
  44. defer func() {
  45. if r := recover(); r != nil {
  46. tx.Rollback()
  47. log.Error("wocao jingran recover le error(%v)", r)
  48. }
  49. }()
  50. //私单稿件 审核后台允许 私单只填写 流量TAG
  51. if ap.GroupID > 0 {
  52. porderConts, porder = s.diffPorder(c, a.Aid, ap)
  53. operConts = append(operConts, porderConts...)
  54. if err = s.TxUpPorder(tx, ap); err != nil {
  55. tx.Rollback()
  56. return
  57. }
  58. //自动匹配流量tag
  59. ap.OnFlowID = ap.GroupID
  60. if err = s.txUpFlowID(tx, ap); err != nil {
  61. tx.Rollback()
  62. return
  63. }
  64. }
  65. s.sendPorderLog(c, ap, porderConts, porder, a)
  66. //流量属性控制,比如:频道禁止
  67. if flowConts, err = s.txBatchUpFlowsState(c, tx, a.Aid, ap.UID, ap.FlowAttribute); err != nil {
  68. log.Error("s.txBatchUpFlowsState error(%v)", err)
  69. tx.Rollback()
  70. return
  71. }
  72. if len(flowConts) > 0 {
  73. operConts = append(operConts, flowConts...)
  74. }
  75. //审核更换流量TAG 或者新增私单 私单二期 flow_design.pool=2 聚合到 archive_forbid + archive.attr 由前端merge
  76. var delayCont string
  77. if ap.State, delayCont, err = s.txUpArcDelay(c, tx, ap.Aid, ap.Mid, ap.State, ap.Delay, ap.DTime); err != nil {
  78. tx.Rollback()
  79. log.Error("s.arc.txUpArcDelay(%d,%d,%d,%d,%v,%d) error(%v)", ap.Aid, ap.Mid, ap.TypeID, ap.State, ap.Delay, ap.DTime, err)
  80. return
  81. }
  82. if delayCont != "" {
  83. operConts = append(operConts, delayCont)
  84. }
  85. ap.Round = s.archiveRound(c, a, ap.Aid, a.Mid, a.TypeID, a.Round, ap.State, ap.CanCelMission)
  86. if ap.Access, err = s.txUpArcMainState(c, tx, ap.Aid, ap.Forward, ap.TypeID, ap.Access, ap.State, ap.Round, ap.RejectReason); err != nil {
  87. tx.Rollback()
  88. log.Error("s.arc.txUpArcMainState(%d,%d,%d,%d,%d,%d,%s) error(%v)", ap.Aid, ap.Forward, ap.TypeID, ap.Access, ap.State, ap.Round, ap.RejectReason, err)
  89. return
  90. }
  91. ap.PTime = s.archivePtime(c, ap.Aid, ap.State, ap.PTime)
  92. // access、cancel_mission、cover、reject_reason、attr、source、redirecturl、forward、state、pubtime、copyright、mtime、round、title、typeid、content、delay
  93. if _, err = s.arc.TxUpArchive(tx, ap.Aid, ap.Title, ap.Content, ap.Cover, ap.Note, ap.Copyright, ap.PTime); err != nil {
  94. tx.Rollback()
  95. log.Error("s.arc.TxUpArchive(%d,%s,%s,%s,%s,%d,%d) error(%v)", ap.Aid, ap.Title, ap.Content, ap.Cover, ap.Note, ap.Copyright, ap.PTime, err)
  96. return
  97. }
  98. log.Info("aid(%d) update archive title(%s) content(%s) cover(%s) copyright(%d) ,ptime(%d), round(%d), state(%d)", ap.Aid, ap.Title, ap.Content, ap.Cover, ap.Copyright, ap.PTime, ap.Round, ap.State)
  99. // cancel activity
  100. if ap.CanCelMission {
  101. oldMissionID = missionID
  102. missionID = 0
  103. }
  104. desc := ""
  105. if descFormatID > 0 {
  106. desc = ap.Content
  107. }
  108. if _, err = s.arc.TxUpAddit(tx, ap.Aid, missionID, ap.Source, desc, ap.Dynamic); err != nil {
  109. tx.Rollback()
  110. log.Error("s.arc.TxUpAddit(%d,%d,%s,%s,%s) error(%v)", ap.Aid, missionID, ap.Source, desc, ap.Dynamic, err)
  111. return
  112. }
  113. if ap.PolicyID > 1 {
  114. ap.Attrs.LimitArea = 1
  115. } else {
  116. ap.Attrs.LimitArea = 0
  117. }
  118. attrs, forbidAttrs := s.archiveAttr(c, ap, true)
  119. var attrConts []string
  120. if attrConts, err = s.TxUpArchiveAttr(c, tx, a, ap.Aid, ap.UID, attrs, forbidAttrs, ap.URL); err != nil {
  121. tx.Rollback()
  122. log.Error("s.TxUpArchiveAttr(%d) error(%v)", ap.Aid, err)
  123. return
  124. }
  125. operConts = append(operConts, attrConts...)
  126. log.Info("aid(%d) update archive attribute(%+v)", ap.Aid, attrs)
  127. if err = tx.Commit(); err != nil {
  128. log.Error("tx.Commit() error(%v)", err)
  129. return
  130. }
  131. if ap.PolicyID != 0 {
  132. if rel, err = s.oversea.PolicyRelation(c, ap.Aid); err != nil {
  133. log.Error("s.oversea.PolicyRelation(%d) error(%v)", ap.Aid, err)
  134. return
  135. }
  136. if rel == nil || rel.GroupID != ap.PolicyID {
  137. if _, err = s.oversea.UpPolicyRelation(c, ap.Aid, ap.PolicyID); err != nil {
  138. log.Error("s.oversea.UpPolicyRelation(%d,%d) error(%v)", ap.Aid, ap.PolicyID, err)
  139. return
  140. }
  141. operConts = append(operConts, fmt.Sprintf("[地区展示]应用策略组ID[%d]", ap.PolicyID))
  142. }
  143. }
  144. log.Info("aid(%d) end second_round submit pro", ap.Aid)
  145. // is send email
  146. var isChanged = true
  147. if (((ap.State == archive.StateOpen && ap.Access == archive.AccessDefault) || ap.State == archive.StateForbidUserDelay) && !ap.AdminChange) ||
  148. (ap.State == archive.StateForbidWait) {
  149. isChanged = false
  150. }
  151. archiveOperConts, changeTypeID, changeCopyright, changeTitle, changeCover := s.diffArchiveOper(ap, a, addit, forbid)
  152. operConts = append(operConts, archiveOperConts...)
  153. oper := &archive.ArcOper{Aid: ap.Aid, UID: ap.UID, TypeID: ap.TypeID, State: archive.AccessState(ap.State, ap.Access), Round: ap.Round, Attribute: a.Attribute, Remark: ap.Note}
  154. oper.Content = strings.Join(operConts, ",")
  155. if ap.ApplyUID != 0 {
  156. mUser, _ = s.mng.User(c, ap.ApplyUID)
  157. oper.Content = "[通过" + mUser.NickName + "(" + strconv.FormatInt(ap.ApplyUID, 10) + ")申请的工单]" + oper.Content
  158. }
  159. s.addArchiveOper(c, oper)
  160. // databus
  161. s.busSecondRound(ap.Aid, oldMissionID, ap.Notify, isChanged, changeTypeID, changeCopyright, changeTitle, changeCover, ap.FromList, ap)
  162. go s.busSecondRoundUpCredit(ap.Aid, 0, ap.Mid, ap.UID, ap.State, ap.Round, ap.ReasonID, ap.RejectReason)
  163. // log
  164. s.sendArchiveLog(c, ap, operConts, a)
  165. return
  166. }
  167. // BatchArchive batch async archive audit.
  168. func (s *Service) BatchArchive(c context.Context, aps []*archive.ArcParam, action string) (err error) {
  169. var mp = &archive.MultSyncParam{}
  170. var ok bool
  171. for _, ap := range aps {
  172. mp.Action = action
  173. mp.ArcParam = ap
  174. if ok, err = s.busCache.PushMultSync(c, mp); err != nil {
  175. log.Error("s.busCache.PushMultSync(ap(%+v)) error(%v)", ap, err)
  176. return
  177. }
  178. if !ok {
  179. log.Warn("s.busCache.PushMultSync(ap(%+v))", ap)
  180. continue
  181. }
  182. }
  183. return
  184. }
  185. func (s *Service) dealArchive(c context.Context, ap *archive.ArcParam) (err error) {
  186. var a *archive.Archive
  187. if a, err = s.arc.Archive(c, ap.Aid); err != nil || a == nil {
  188. log.Error("s.arc.Archive(%d) error(%v) or a==nil", ap.Aid, err)
  189. return
  190. }
  191. var tx *sql.Tx
  192. if tx, err = s.arc.BeginTran(c); err != nil {
  193. log.Error("s.arc.BeginTran() error(%v)", err)
  194. return
  195. }
  196. defer func() {
  197. if r := recover(); r != nil {
  198. tx.Rollback()
  199. log.Error("wocao jingran recover le error(%v)", r)
  200. }
  201. }()
  202. log.Info("aid(%d) start BatchSubmit pro params is (%+v)", ap.Aid, ap)
  203. if ap.State, _, err = s.txUpArcDelay(c, tx, ap.Aid, ap.Mid, ap.State, true, ap.DTime); err != nil {
  204. tx.Rollback()
  205. log.Error("s.arc.txUpArcDelay(%d,%d,%d,%d,%v,%d) error(%v)", ap.Aid, ap.Mid, ap.TypeID, ap.State, true, ap.DTime, err)
  206. return
  207. }
  208. ap.Round = s.archiveRound(c, a, ap.Aid, a.Mid, a.TypeID, a.Round, ap.State, ap.CanCelMission)
  209. if ap.Access, err = s.txUpArcMainState(c, tx, ap.Aid, ap.Forward, ap.TypeID, ap.Access, ap.State, ap.Round, ap.RejectReason); err != nil {
  210. tx.Rollback()
  211. log.Error("s.arc.txUpArcMainState(%d,%d,%d,%d,%d,%d,%s) error(%v)", ap.Aid, ap.Forward, ap.TypeID, ap.Access, ap.State, ap.Round, ap.RejectReason, err)
  212. return
  213. }
  214. ap.PTime = s.archivePtime(c, ap.Aid, ap.State, a.PTime) // NOTE: for batch no ptime...
  215. if _, err = s.arc.TxUpArcPTime(tx, ap.Aid, ap.PTime); err != nil {
  216. tx.Rollback()
  217. log.Error("s.arc.TxUpArcPTime(%d,%d) error(%v)", ap.Aid, ap.PTime, err)
  218. return
  219. }
  220. log.Info("aid(%d) update archive pubtime(%d)", ap.Aid, ap.PTime)
  221. if _, err = s.arc.TxUpArcNote(tx, ap.Aid, ap.Note); err != nil {
  222. tx.Rollback()
  223. log.Error("s.arc.TxUpArcNote(%d,%s) error(%v)", ap.Aid, ap.Note, err)
  224. return
  225. }
  226. log.Info("aid(%d) update archive note(%s)", ap.Aid, ap.Note)
  227. if ap.FlagCopyright {
  228. if _, err = s.arc.TxUpArcCopyRight(tx, ap.Aid, ap.Copyright); err != nil {
  229. tx.Rollback()
  230. log.Error("s.arc.TxUpArcCopyRight(%d,%d) error(%v)", ap.Aid, ap.Copyright, err)
  231. return
  232. }
  233. log.Info("aid(%d) update archive Copyright(%d)", ap.Aid, ap.Copyright)
  234. }
  235. if err = tx.Commit(); err != nil {
  236. log.Error("tx.Commit() error(%v)", err)
  237. return
  238. }
  239. log.Info("aid(%d) end BatchSubmit pro", ap.Aid)
  240. var isChanged = true
  241. if (((ap.State == archive.StateOpen && ap.Access == archive.AccessDefault) || ap.State == archive.StateForbidUserDelay) && !ap.AdminChange) ||
  242. (ap.State == archive.StateForbidWait) {
  243. isChanged = false
  244. }
  245. oper := &archive.ArcOper{Aid: ap.Aid, UID: ap.UID, TypeID: ap.TypeID, State: archive.AccessState(ap.State, ap.Access), Round: ap.Round, Attribute: a.Attribute, Remark: ap.Note}
  246. operConts := s.diffBatchArchiveOper(ap, a)
  247. oper.Content = strings.Join(operConts, ",")
  248. s.addArchiveOper(c, oper)
  249. s.busSecondRound(ap.Aid, 0, ap.Notify, isChanged, false, false, false, false, ap.FromList, ap)
  250. go s.busSecondRoundUpCredit(ap.Aid, 0, ap.Mid, ap.UID, ap.State, ap.Round, ap.ReasonID, ap.RejectReason)
  251. // log
  252. ap.CTime = a.CTime
  253. s.sendArchiveLog(c, ap, operConts, a)
  254. return
  255. }
  256. func (s *Service) dealAttrs(c context.Context, ap *archive.ArcParam) (err error) {
  257. var a *archive.Archive
  258. if a, err = s.arc.Archive(c, ap.Aid); err != nil || a == nil {
  259. log.Error("s.arc.Archive(%d) error(%v) or a==nil", ap.Aid, err)
  260. return
  261. }
  262. //付费稿件不支持批量属性修改 因为涉及到价格设置一致性问题
  263. if s.isUGCPay(a) {
  264. log.Info("dealAttrs skip UGCPay(%d) error(%v)", ap.Aid, err)
  265. return
  266. }
  267. var tx *sql.Tx
  268. if tx, err = s.arc.BeginTran(c); err != nil {
  269. log.Error("s.arc.BeginTran() error(%v)", err)
  270. return
  271. }
  272. defer func() {
  273. if r := recover(); r != nil {
  274. tx.Rollback()
  275. log.Error("wocao jingran recover le error(%v)", r)
  276. }
  277. }()
  278. log.Info("aid(%d) start BatchAttr pro", ap.Aid)
  279. var operConts, flowConts []string
  280. attrs, forbidAttrs := s.archiveAttr(c, ap, false)
  281. if operConts, err = s.TxUpArchiveAttr(c, tx, a, ap.Aid, ap.UID, attrs, forbidAttrs, ""); err != nil {
  282. tx.Rollback()
  283. log.Error("s.arc.txUpArcAttrs(%d) error(%v)", ap.Aid, err)
  284. return
  285. }
  286. log.Info("aid(%d) update archive attribute( %+v )", ap.Aid, attrs)
  287. if _, err = s.arc.TxUpArcNote(tx, ap.Aid, ap.Note); err != nil {
  288. tx.Rollback()
  289. log.Error("s.arc.TxUpArcNote(%d,%s) error(%v)", ap.Aid, ap.Note, err)
  290. return
  291. }
  292. log.Info("aid(%d) update archive note(%s)", ap.Aid, ap.Note)
  293. //流量属性控制,比如:频道禁止
  294. if flowConts, err = s.txBatchUpFlowsState(c, tx, a.Aid, ap.UID, ap.FlowAttribute); err != nil {
  295. log.Error("s.txBatchUpFlowsState error(%v)", err)
  296. tx.Rollback()
  297. return
  298. }
  299. if len(flowConts) > 0 {
  300. operConts = append(operConts, flowConts...)
  301. }
  302. if err = tx.Commit(); err != nil {
  303. log.Error("tx.Commit() error(%v)", err)
  304. return
  305. }
  306. log.Info("aid(%d) end BatchAttr pro", ap.Aid)
  307. oper := &archive.ArcOper{Aid: ap.Aid, UID: ap.UID, TypeID: a.TypeID, State: archive.AccessState(a.State, a.Access), Round: a.Round, Attribute: a.Attribute, Remark: ap.Note}
  308. oper.Content = strings.Join(operConts, ",")
  309. s.addArchiveOper(c, oper)
  310. s.busSecondRound(ap.Aid, 0, ap.Notify, ap.AdminChange, false, false, false, false, ap.FromList, ap)
  311. // log
  312. ap.CTime = a.CTime
  313. s.sendArchiveLog(c, ap, operConts, a)
  314. return
  315. }
  316. func (s *Service) dealArchiveSecondRound(c context.Context, ap *archive.ArcParam) (err error) {
  317. s.busSecondRound(ap.Aid, 0, false, false, false, false, false, false, ap.FromList, ap)
  318. return
  319. }
  320. func (s *Service) dealTypeID(c context.Context, ap *archive.ArcParam) (err error) {
  321. var a *archive.Archive
  322. if a, err = s.arc.Archive(c, ap.Aid); err != nil || a == nil {
  323. log.Error("s.arc.Archive(%d) error(%v) or a==nil", ap.Aid, err)
  324. return
  325. }
  326. var tx *sql.Tx
  327. if tx, err = s.arc.BeginTran(c); err != nil {
  328. log.Error("s.arc.BeginTran() error(%v)", err)
  329. return
  330. }
  331. defer func() {
  332. if r := recover(); r != nil {
  333. tx.Rollback()
  334. log.Error("wocao jingran recover le error(%v)", r)
  335. }
  336. }()
  337. log.Info("aid(%d) start BatchType pro", ap.Aid)
  338. if _, err = s.arc.TxUpArcTypeID(tx, ap.Aid, ap.TypeID); err != nil {
  339. tx.Rollback()
  340. log.Error("s.arc.TxUpArcTypeID(%d,%d) error(%v)", ap.Aid, ap.TypeID, err)
  341. return
  342. }
  343. log.Info("aid(%d) update archive type_id(%d)", ap.Aid, ap.TypeID)
  344. if _, err = s.arc.TxUpArcNote(tx, ap.Aid, ap.Note); err != nil {
  345. tx.Rollback()
  346. log.Error("s.arc.TxUpArcNote(%d,%s) error(%v)", ap.Aid, ap.Note, err)
  347. return
  348. }
  349. log.Info("aid(%d) update archive note(%s)", ap.Aid, ap.Note)
  350. if err = tx.Commit(); err != nil {
  351. log.Error("tx.Commit() error(%v)", err)
  352. return
  353. }
  354. log.Info("aid(%d) end BatchType pro", ap.Aid)
  355. var changeTypeID bool
  356. oper := &archive.ArcOper{Aid: ap.Aid, UID: ap.UID, TypeID: ap.TypeID, State: archive.AccessState(a.State, a.Access), Round: a.Round, Attribute: a.Attribute, Remark: ap.Note}
  357. oper.Content, changeTypeID = s.diffTypeID(ap.TypeID, a.TypeID, a.State)
  358. var operConts []string
  359. operConts = append(operConts, oper.Content)
  360. s.addArchiveOper(c, oper)
  361. if ap.ForceSync {
  362. s.busArchiveForceSync(ap.Aid)
  363. } else {
  364. s.busSecondRound(ap.Aid, 0, ap.Notify, ap.AdminChange, changeTypeID, false, false, false, ap.FromList, ap)
  365. }
  366. // log
  367. ap.CTime = a.CTime
  368. s.sendArchiveLog(c, ap, operConts, a)
  369. return
  370. }
  371. // // BatchZlimit batche modify zlimit.
  372. // func (s *Service) BatchZlimit(c context.Context, ap *archive.ArcParam) (err error) {
  373. // var (
  374. // tx *sql.Tx
  375. // aps []*archive.ArcParam
  376. // aids = xstr.JoinInts(ap.Aids)
  377. // )
  378. // if tx, err = s.mng.BeginTran(c); err != nil {
  379. // log.Error("s.arc.BeginTran() error(%v)", err)
  380. // return
  381. // }
  382. // defer func() {
  383. // if r := recover(); r != nil {
  384. // tx.Rollback()
  385. // log.Error("wocao jingran recover le error(%v)", r)
  386. // }
  387. // }()
  388. // log.Info("aids(%s) start BatchZlimit pro", aids)
  389. // for _, aid := range ap.Aids {
  390. // if _, err = s.mng.TxAddUpArea(tx, aid, ap.GroupID); err != nil {
  391. // tx.Rollback()
  392. // log.Error("s.arc.TxAddUpArea(%d,%d) error(%v)", aid, ap.GroupID, err)
  393. // return
  394. // }
  395. // log.Info("aid(%d) update archive gid(%d)", aid, ap.GroupID)
  396. // ap.Aid = aid
  397. // aps = append(aps, ap)
  398. // }
  399. // if err = tx.Commit(); err != nil {
  400. // log.Error("tx.Commit() error(%v)", err)
  401. // return
  402. // }
  403. // log.Info("aids(%s) end BatchZlimit pro", aids)
  404. // s.busSecondRound(aps)
  405. // return
  406. // }
  407. // UpAuther update owner.
  408. func (s *Service) UpAuther(c context.Context, ap *archive.ArcParam) (err error) {
  409. var tx *sql.Tx
  410. if tx, err = s.arc.BeginTran(c); err != nil {
  411. log.Error("s.arc.BeginTran() error(%v)", err)
  412. return
  413. }
  414. defer func() {
  415. if r := recover(); r != nil {
  416. tx.Rollback()
  417. log.Error("wocao jingran recover le error(%v)", r)
  418. }
  419. }()
  420. log.Info("aid(%d) start up author", ap.Aid)
  421. if _, err = s.arc.TxUpArcAuthor(tx, ap.Aid, ap.Mid, ap.Author); err != nil {
  422. tx.Rollback()
  423. log.Error("s.Auther s.dede.TxUpArcAuthor(%d,%d,%s) error(%v)", ap.Aid, ap.Mid, ap.Author, err)
  424. return
  425. }
  426. log.Info("aid(%d) update archive mid(%d) author(%s)", ap.Aid, ap.Mid, ap.Author)
  427. if err = tx.Commit(); err != nil {
  428. log.Error("tx.Commit() error(%v)", err)
  429. return
  430. }
  431. log.Info("aid(%d) end up author", ap.Aid)
  432. // databus
  433. s.busSecondRound(ap.Aid, 0, ap.Notify, false, false, false, false, false, ap.FromList, ap)
  434. return
  435. }
  436. // UpAccess update access.
  437. func (s *Service) UpAccess(c context.Context, ap *archive.ArcParam) (err error) {
  438. var tx *sql.Tx
  439. if tx, err = s.arc.BeginTran(c); err != nil {
  440. log.Error("s.arc.BeginTran() error(%v)", err)
  441. return
  442. }
  443. defer func() {
  444. if r := recover(); r != nil {
  445. tx.Rollback()
  446. log.Error("wocao jingran recover le error(%v)", r)
  447. }
  448. }()
  449. log.Info("aid(%d) start up access", ap.Aid)
  450. if _, err = s.arc.TxUpArcAccess(tx, ap.Aid, ap.Access); err != nil {
  451. tx.Rollback()
  452. log.Error("s.Access s.dede.TxUpArcAccess(%d,%d) error(%v)", ap.Aid, ap.Access, err)
  453. return
  454. }
  455. log.Info("aid(%d) update archive access(%d)", ap.Aid, ap.Access)
  456. if err = tx.Commit(); err != nil {
  457. log.Error("tx.Commit() error(%v)", err)
  458. return
  459. }
  460. log.Info("aid(%d) end up access", ap.Aid)
  461. // databus
  462. s.busSecondRound(ap.Aid, 0, ap.Notify, ap.AdminChange, false, false, false, false, ap.FromList, ap)
  463. return
  464. }
  465. // UpArchiveAttr update archive attr by aid.
  466. func (s *Service) UpArchiveAttr(c context.Context, aid, uid int64, attrs map[uint]int32, forbidAttrs map[string]map[uint]int32, redirectURL string) (err error) {
  467. var a *archive.Archive
  468. if a, err = s.arc.Archive(c, aid); err != nil || a == nil {
  469. log.Error("s.arc.Archive(%d) error(%v) or a==nil", aid, err)
  470. return
  471. }
  472. log.Info("aid(%d) begin tran change attribute", aid)
  473. var tx *sql.Tx
  474. if tx, err = s.arc.BeginTran(c); err != nil {
  475. log.Error("begin tran error(%v)", err)
  476. return
  477. }
  478. defer func() {
  479. if r := recover(); r != nil {
  480. tx.Rollback()
  481. log.Error("wocao jingran recover le error(%v)", r)
  482. }
  483. }()
  484. var conts []string
  485. if conts, err = s.txUpArcAttrs(tx, a, attrs, redirectURL); err != nil {
  486. tx.Rollback()
  487. return
  488. }
  489. var tmpCs []string
  490. if tmpCs, err = s.txUpArcForbidAttrs(c, tx, a, forbidAttrs); err != nil {
  491. tx.Rollback()
  492. return
  493. }
  494. conts = append(conts, tmpCs...)
  495. if err = tx.Commit(); err != nil {
  496. log.Error("tx.Commit error(%v)", err)
  497. return
  498. }
  499. log.Info("aid(%d) end tran change attribute", aid)
  500. if len(conts) > 0 {
  501. s.arc.AddArcOper(c, a.Aid, uid, a.Attribute, a.TypeID, int16(a.State), a.Round, 1, strings.Join(conts, ","), "")
  502. }
  503. // NOTE: send modify_archive for sync dede.
  504. s.busModifyArchive(aid, false, false)
  505. return
  506. }
  507. // UpArcDtime update archive dtime by aid.
  508. func (s *Service) UpArcDtime(c context.Context, aid int64, dtime time.Time) (err error) {
  509. if delay, _ := s.arc.Delay(c, aid, archive.DelayTypeForUser); delay == nil {
  510. err = ecode.NothingFound
  511. return
  512. }
  513. log.Info("aid(%d) dtime(%d) begin tran change archive delaytime", aid, dtime)
  514. var tx *sql.Tx
  515. if tx, err = s.arc.BeginTran(c); err != nil {
  516. log.Error("begin tran error(%v)", err)
  517. return
  518. }
  519. if _, err = s.arc.TxUpDelayDtime(tx, aid, archive.DelayTypeForUser, dtime); err != nil {
  520. tx.Rollback()
  521. log.Error("s.arc.TxUpDelayDtime() error(%v)", err)
  522. return
  523. }
  524. if _, err = s.arc.TxUpArcMtime(tx, aid); err != nil {
  525. tx.Rollback()
  526. log.Error("s.arc.TxUpArcMtime() error(%v)", err)
  527. return
  528. }
  529. if err = tx.Commit(); err != nil {
  530. log.Error("tx.Commit error(%v)", err)
  531. return
  532. }
  533. log.Info("aid(%d) dtime(%d) end tran change archive delaytime", aid, dtime)
  534. return
  535. }
  536. /**
  537. * archive map
  538. * return map[int64]*archive.ChannelReviewInfo, error/nil
  539. */
  540. func (s *Service) checkChannelReview(c context.Context, arcs map[int64]*archive.Archive) (res map[int64]*archive.ChannelReviewInfo, err error) {
  541. var reviewRes map[int64]*tagrpc.ResChannelCheckBack
  542. res = map[int64]*archive.ChannelReviewInfo{}
  543. aids := []int64{}
  544. //检查是否开放浏览
  545. for _, a := range arcs {
  546. if a == nil {
  547. continue
  548. }
  549. if archive.NormalState(a.State) {
  550. aids = append(aids, a.Aid)
  551. }
  552. res[a.Aid] = &archive.ChannelReviewInfo{AID: a.Aid}
  553. }
  554. if len(aids) <= 0 {
  555. return
  556. }
  557. //检查是否有回查数据
  558. if _, aids, err = s.arc.RecheckIDByAID(c, archive.TypeChannelRecheck, aids); err != nil || len(aids) <= 0 {
  559. return
  560. }
  561. //实时查询是否变更了频道,频道是否需要回查
  562. if reviewRes, err = s.tag.CheckChannelReview(c, aids); err != nil {
  563. log.Error("checkChannelReview s.arc.CheckChannelReview error(%v) aids(%v)", err, aids)
  564. err = nil
  565. }
  566. for _, aid := range aids {
  567. res[aid].CanOperRecheck = true
  568. if reviewRes != nil && reviewRes[aid] != nil {
  569. res[aid].NeedReview = reviewRes[aid].CheckBack == 1
  570. cids := []int64{}
  571. for chid := range reviewRes[aid].Channels {
  572. cids = append(cids, chid)
  573. }
  574. res[aid].ChannelIDs = xstr.JoinInts(cids)
  575. }
  576. }
  577. return
  578. }
  579. //UpArcTag 保存tag
  580. func (s *Service) UpArcTag(c context.Context, uid int64, pm *archive.TagParam) (err error) {
  581. var (
  582. arc *archive.Archive
  583. checkRes map[int64]*archive.ChannelReviewInfo
  584. needReview, fromChannel bool
  585. canOperRecheck bool
  586. )
  587. //archive check
  588. if arc, err = s.arc.Archive(c, pm.AID); err != nil {
  589. log.Error("UpArcTag s.arc.Archive(%d) error(%v) params(%+v)", pm.AID, err, pm)
  590. return
  591. }
  592. if arc == nil {
  593. err = ecode.NothingFound
  594. return
  595. }
  596. //check whether need channel review: channel changes every 3h, used to notice
  597. fromChannel = strings.TrimSpace(pm.FromChannelReview) == "1"
  598. if fromChannel {
  599. if checkRes, err = s.checkChannelReview(c, map[int64]*archive.Archive{pm.AID: arc}); err != nil {
  600. log.Error("UpArcTag s.checkChannelReview(%d) error(%v) params(%+v)", pm.AID, err, pm)
  601. return
  602. }
  603. if checkRes != nil && checkRes[pm.AID] != nil {
  604. needReview = checkRes[pm.AID].NeedReview
  605. canOperRecheck = checkRes[pm.AID].CanOperRecheck
  606. }
  607. }
  608. if err = s.saveTag(c, uid, arc, pm.Tags, "", canOperRecheck, nil); err != nil {
  609. log.Error("UpArcTag s.saveTag error(%v) uid(%d) canoperrecheck(%v) params(%+v)", err, uid, canOperRecheck, pm)
  610. return
  611. }
  612. if fromChannel && !needReview {
  613. err = ecode.VideoupChannelReviewNotTrigger
  614. }
  615. return
  616. }
  617. //saveTag 更新tag,可能触发频道回查
  618. func (s *Service) saveTag(c context.Context, uid int64, arc *archive.Archive, tags, note string, canOperRecheck bool, ap *archive.ArcParam) (err error) {
  619. var (
  620. tx *sql.Tx
  621. tagChange bool
  622. remark, flowDiff string
  623. content []string
  624. )
  625. //compare tag
  626. aid := arc.Aid
  627. arc.Tag = strings.Join(Slice2String(SliceUnique(Slice2Interface(strings.Split(arc.Tag, ",")))), ",")
  628. tags = strings.Join(Slice2String(SliceUnique(Slice2Interface(strings.Split(tags, ",")))), ",")
  629. tagChange = arc.Tag != tags
  630. //update db by transaction
  631. if tx, err = s.arc.BeginTran(c); err != nil {
  632. log.Error("saveTag s.arc.BeginTran error(%v)", err)
  633. return
  634. }
  635. if tagChange {
  636. if _, err = s.arc.TxUpTag(tx, aid, tags); err != nil {
  637. log.Error("saveTag s.arc.TxUpTag(%d,%s) error(%v)", aid, tags, err)
  638. tx.Rollback()
  639. return
  640. }
  641. content = append(content, archive.Operformat(archive.OperTypeTag, arc.Tag, tags, archive.OperStyleOne))
  642. }
  643. if canOperRecheck {
  644. if err = s.arc.TxUpRecheckState(tx, archive.TypeChannelRecheck, aid, archive.RecheckStateDone); err != nil {
  645. log.Error("saveTag s.arc.TxUpRecheckState(%d) error(%v)", aid, err)
  646. tx.Rollback()
  647. return
  648. }
  649. if _, flowDiff, err = s.txAddOrUpdateFlowState(c, tx, aid, archive.FlowGroupNoChannel, uid, archive.PoolArcForbid, archive.FlowDelete, "频道回查取消频道禁止"); err != nil {
  650. log.Error("saveTag s.txAddOrUpdateFlowState(%d,%d) error(%v)", aid, uid, err)
  651. tx.Rollback()
  652. return
  653. }
  654. if flowDiff != "" {
  655. content = append(content, flowDiff)
  656. }
  657. remark = "频道已回查"
  658. }
  659. if err = tx.Commit(); err != nil {
  660. log.Error("saveTag tx.Commit error(%v)", err)
  661. return
  662. }
  663. log.Info("saveTag aid(%d) origin old-tags(%s) new-tags(%s) canoperrecheck(%v)", aid, arc.Tag, tags, canOperRecheck)
  664. //log tag change/channel review history
  665. remark = remark + note
  666. if len(content) > 0 || remark != "" {
  667. oper := &archive.ArcOper{
  668. Aid: aid,
  669. UID: uid,
  670. TypeID: arc.TypeID,
  671. State: archive.AccessState(arc.State, arc.Access),
  672. Round: arc.Round,
  673. Content: strings.Join(content, ","),
  674. Attribute: arc.Attribute,
  675. Remark: remark,
  676. }
  677. s.arc.AddArcOper(c, oper.Aid, oper.UID, oper.Attribute, oper.TypeID, oper.State, oper.Round, 0, oper.Content, oper.Remark)
  678. }
  679. //同步tag服务
  680. if tagChange {
  681. if ap != nil && ap.IsUpBind {
  682. err = s.upBind(c, aid, arc.Mid, tags, arc.TypeID)
  683. } else {
  684. err = s.adminBind(c, aid, arc.Mid, tags, arc.TypeID)
  685. }
  686. }
  687. //仅同步隐藏tag
  688. if !tagChange && ap != nil && ap.SyncHiddenTag {
  689. err = s.upBind(c, aid, arc.Mid, arc.Tag, arc.TypeID)
  690. }
  691. return
  692. }
  693. //BatchUpTag batch update archive tag
  694. func (s *Service) BatchUpTag(c context.Context, uid int64, pm *archive.BatchTagParam) (warning string, err error) {
  695. var (
  696. arcList map[int64]*archive.Archive
  697. reviewResp map[int64]*archive.ChannelReviewInfo
  698. ok, fromChannel bool
  699. warningID []int64
  700. originTag string
  701. )
  702. //get archive list
  703. if arcList, err = s.arc.Archives(c, pm.AIDs); err != nil {
  704. log.Error("batchTag vdaSvc.GetArchives error(%v) params(%+v)", err, pm)
  705. return
  706. }
  707. fromChannel = strings.TrimSpace(pm.FromList) == archive.FromListChannelReview
  708. //batch check whether need channel review, if not mark aid to noneedaids
  709. if fromChannel {
  710. if reviewResp, err = s.checkChannelReview(c, arcList); err != nil {
  711. log.Error("batchTag s.checkChannelReview error(%v) params(%+v)", err, pm)
  712. return
  713. }
  714. }
  715. //save each archive tag
  716. mp := &archive.MultSyncParam{
  717. ArcParam: &archive.ArcParam{
  718. UID: uid,
  719. Note: pm.Note,
  720. FromList: pm.FromList,
  721. IsUpBind: pm.IsUpBind,
  722. SyncHiddenTag: pm.SyncHiddenTag,
  723. },
  724. }
  725. for id, arc := range arcList {
  726. if fromChannel && (reviewResp == nil || reviewResp[id] == nil || !reviewResp[id].NeedReview) {
  727. warningID = append(warningID, id)
  728. }
  729. originTag = arc.Tag
  730. //tag change
  731. if pm.Action != "" {
  732. arc.Tag = StringHandler(arc.Tag, pm.Tags, ",", pm.Action == "delete")
  733. }
  734. //批量修改tag
  735. mp.Action = archive.ActionArchiveTag
  736. //频道回查逻辑
  737. if reviewResp != nil && reviewResp[id] != nil && reviewResp[id].CanOperRecheck {
  738. mp.Action = archive.ActionArchiveTagRecheck
  739. }
  740. //非频道回查的tag未变更/频道回查tag未变更且无回查记录,提前返回
  741. if originTag == arc.Tag && ((!fromChannel && !pm.SyncHiddenTag) || (fromChannel && mp.Action == archive.ActionArchiveTag)) {
  742. continue
  743. }
  744. mp.ArcParam.Aid = id
  745. mp.ArcParam.Tag = arc.Tag
  746. log.Info("BatchUpTag begin to pushmultsync action(%s) arcparam(%+v)", mp.Action, mp.ArcParam)
  747. if ok, err = s.busCache.PushMultSync(c, mp); err != nil {
  748. log.Error("BatchUpTag s.busCache.PushMultSync(%d,%s,%s,%v) error(%v)", id, arc.Tag, pm.Note, pm.FromList, err)
  749. return
  750. }
  751. if !ok {
  752. log.Warn("BatchUpTag s.busCache.PushMultSync(%d,%s,%s,%v)", id, arc.Tag, pm.Note, pm.FromList)
  753. continue
  754. }
  755. }
  756. if len(warningID) > 0 {
  757. warning = fmt.Sprintf("稿件 %s 不需要频道回查", xstr.JoinInts(warningID))
  758. }
  759. return
  760. }
  761. func (s *Service) dealTag(c context.Context, canOperRecheck bool, ap *archive.ArcParam) (err error) {
  762. var arc *archive.Archive
  763. if arc, err = s.arc.Archive(c, ap.Aid); err != nil {
  764. log.Error(" UpArcTag s.arc.Archive(%d) error(%v)", ap.Aid, err)
  765. return
  766. }
  767. if arc == nil {
  768. err = ecode.NothingFound
  769. return
  770. }
  771. if err = s.saveTag(c, ap.UID, arc, ap.Tag, ap.Note, canOperRecheck, ap); err != nil {
  772. log.Error("dealTag s.saveTag(%d,%d,%s,%s,%v) error(%v)", ap.UID, ap.Aid, ap.Tag, ap.Note, canOperRecheck, err)
  773. }
  774. return
  775. }
  776. func (s *Service) adminBind(c context.Context, aid, mid int64, tags string, typeID int16) (err error) {
  777. log.Info("before sync tag for aid(%d) tags(%s)", aid, tags)
  778. typeName := ""
  779. if tp, ok := s.typeCache[typeID]; ok && tp != nil {
  780. typeName = tp.Name
  781. if tp, err = s.TypeTopParent(typeID); err != nil {
  782. log.Error("adminBind s.TypeTopParent(%d) error(%v) aid(%d)", typeID, err, aid)
  783. err = nil
  784. } else if tp != nil {
  785. typeName = fmt.Sprintf("%s,%s", typeName, tp.Name)
  786. }
  787. }
  788. if err = s.tag.AdminBind(c, aid, mid, tags, typeName, ""); err != nil {
  789. log.Error("adminBind sync tag error(%v) aid(%+v) tags(%s) typename(%s)", err, aid, tags, typeName)
  790. return
  791. }
  792. log.Info("end sync tag for aid(%d) tags(%s) successfully", aid, tags)
  793. return
  794. }
  795. func (s *Service) upBind(c context.Context, aid, mid int64, tags string, typeID int16) (err error) {
  796. log.Info("upBind before sync tag for aid(%d) tags(%s)", aid, tags)
  797. typeName := ""
  798. if tp, ok := s.typeCache[typeID]; ok && tp != nil {
  799. typeName = tp.Name
  800. if tp, err = s.TypeTopParent(typeID); err != nil {
  801. log.Error("upBind s.TypeTopParent(%d) error(%v) aid(%d)", typeID, err, aid)
  802. err = nil
  803. } else if tp != nil {
  804. typeName = fmt.Sprintf("%s,%s", typeName, tp.Name)
  805. }
  806. }
  807. if err = s.tag.UpBind(c, aid, mid, tags, typeName, ""); err != nil {
  808. log.Error("upBind sync tag error(%v) aid(%+v) tags(%s) typename(%s)", err, aid, tags, typeName)
  809. return
  810. }
  811. log.Info("upBind end sync tag for aid(%d) tags(%s) successfully", aid, tags)
  812. return
  813. }
  814. //GetChannelInfo get channel info & hit_rules & need review
  815. func (s *Service) GetChannelInfo(c context.Context, aids []int64) (info map[int64]*archive.ChannelInfo, err error) {
  816. info = make(map[int64]*archive.ChannelInfo, len(aids))
  817. for _, aid := range aids {
  818. info[aid] = &archive.ChannelInfo{}
  819. }
  820. res, err := s.tag.CheckChannelReview(c, aids)
  821. if err != nil || res == nil {
  822. log.Error("GetChannelInfo s.tag.GetChannelInfo error(%v)/resp=nil aids(%v)", err, aids)
  823. return
  824. }
  825. if len(res) <= 0 {
  826. return
  827. }
  828. for aid := range info {
  829. if res[aid] == nil {
  830. continue
  831. }
  832. chs := []*archive.Channel{}
  833. for _, ch := range res[aid].Channels {
  834. if ch == nil {
  835. continue
  836. }
  837. chs = append(chs, &archive.Channel{
  838. TID: ch.Tid,
  839. Tname: ch.TName,
  840. HitRules: ch.HitRules,
  841. HitTagNames: ch.HitTNames,
  842. })
  843. }
  844. info[aid].Channels = chs
  845. info[aid].CheckBack = res[aid].CheckBack
  846. }
  847. return
  848. }
  849. // AITrack .
  850. func (s *Service) AITrack(c context.Context, aid []int64) (aids string, err error) {
  851. return s.data.ArchiveRelated(c, aid)
  852. }
  853. //ChannelNamesByAids .
  854. func (s *Service) ChannelNamesByAids(c context.Context, aids []int64) (aidMap map[int64][]string) {
  855. var (
  856. size = len(aids)
  857. maxSize = 1000
  858. sliceLimit = 50
  859. cnt = 0
  860. aidSli = [][]int64{}
  861. sli = []int64{}
  862. grp = errgroup.Group{}
  863. chlist chan map[int64][]string
  864. )
  865. if size > maxSize {
  866. size = maxSize
  867. }
  868. aidMap = make(map[int64][]string, size)
  869. //去重分组
  870. for _, aid := range aids {
  871. if _, exist := aidMap[aid]; exist {
  872. continue
  873. }
  874. aidMap[aid] = []string{}
  875. if cnt >= sliceLimit {
  876. cnt = 0
  877. aidSli = append(aidSli, sli)
  878. sli = []int64{}
  879. }
  880. cnt++
  881. sli = append(sli, aid)
  882. }
  883. if cnt > 0 {
  884. aidSli = append(aidSli, sli)
  885. }
  886. chlist = make(chan map[int64][]string, size)
  887. //batch get channel names
  888. for i, aids := range aidSli {
  889. if i >= maxSize {
  890. break
  891. }
  892. aidtmp := aids
  893. grp.Go(func() error {
  894. resp, err := s.tag.CheckChannelReview(context.TODO(), aidtmp)
  895. if err != nil {
  896. log.Error("ChannelNamesByAids s.tag.CheckChannelReview error(%v) aids(%v)", err, aidtmp)
  897. return nil
  898. }
  899. tnames := map[int64][]string{}
  900. for aid, rp := range resp {
  901. if rp == nil && rp.Channels == nil {
  902. continue
  903. }
  904. if _, exist := tnames[aid]; !exist {
  905. tnames[aid] = []string{}
  906. }
  907. for _, ch := range rp.Channels {
  908. if ch == nil {
  909. continue
  910. }
  911. tnames[aid] = append(tnames[aid], ch.TName)
  912. }
  913. }
  914. chlist <- tnames
  915. return nil
  916. })
  917. }
  918. grp.Wait()
  919. close(chlist)
  920. for tnames := range chlist {
  921. for aid, tname := range tnames {
  922. aidMap[aid] = tname
  923. }
  924. }
  925. return
  926. }