archive.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462
  1. package archive
  2. import (
  3. "context"
  4. "database/sql"
  5. "fmt"
  6. "strconv"
  7. "strings"
  8. "time"
  9. "go-common/app/service/main/videoup/model/archive"
  10. xsql "go-common/library/database/sql"
  11. "go-common/library/log"
  12. "go-common/library/xstr"
  13. )
  // Statements that insert rows; the archive_addit ones are upserts keyed on aid.
  const (
  	_inArcSQL        = "INSERT INTO archive (mid,typeid,title,author,cover,content,tag,attribute,copyright,state,round,pubtime,ctime,mtime,reject_reason) VALUES (?,?,?,?,?,?,?,?,?,-30,0,?,?,?,'')"
  	_inAddSQL        = "INSERT INTO archive_addit (aid,mission_id,up_from,ipv6,source,order_id,flow_id,advertiser,flow_remark,description,desc_format_id,dynamic) VALUES (?,?,?,?,?,?,?,?,?,?,?,?) ON DUPLICATE KEY UPDATE mission_id=?,source=?,order_id=?,flow_id=?,advertiser=?,flow_remark=?,description=?,desc_format_id=?,dynamic=?"
  	_inAddRdrSQL     = "INSERT INTO archive_addit (aid,redirect_url) VALUES (?,?) ON DUPLICATE KEY UPDATE redirect_url=?"
  	_inAddReaSQL     = "INSERT INTO archive_addit (aid,recheck_reason) VALUES (?,?) ON DUPLICATE KEY UPDATE recheck_reason=?"
  	_inAddMissionSQL = "INSERT INTO archive_addit (aid,mission_id) VALUES (?,?) ON DUPLICATE KEY UPDATE mission_id=?"
  )

  // Statements that update a single archive row selected by id.
  const (
  	_upArcSQL      = "UPDATE archive SET typeid=?,title=?,cover=?,content=?,tag=?,copyright=? WHERE id=?"
  	_upArcMidSQL   = "UPDATE archive SET mid=?,state=? WHERE id=?"
  	_upArcStateSQL = "UPDATE archive SET state=? WHERE id=?"
  	// Clears bit ? of attribute, then ORs in the value ? shifted to that bit.
  	_upArcAttrSQL = "UPDATE archive SET attribute=attribute&(~(1<<?))|(?<<?) WHERE id=?"
  	_upTagSQL     = "UPDATE archive SET tag=? WHERE id=?"
  )

  // Statements that read from archive and archive_addit.
  const (
  	_arcSQL    = "SELECT id,mid,typeid,copyright,author,title,cover,reject_reason,content,tag,duration,round,attribute,access,state,pubtime,ctime,mtime FROM archive WHERE id=?"
  	_arcAddSQL = "SELECT aid,mission_id,from_ip,ipv6,up_from,recheck_reason,redirect_url,source,order_id,flow_id,advertiser,flow_remark,description,desc_format_id,dynamic FROM archive_addit WHERE aid=?"
  	// %s is replaced with a comma-separated id list before the query is sent.
  	_arcMidsSQL          = "SELECT id,mid FROM archive WHERE id IN (%s)"
  	_arcUpAllSQL         = "SELECT id FROM archive WHERE mid = ? AND state != -100 ORDER BY id DESC LIMIT ?,?"
  	_arcUpOpenSQL        = "SELECT id FROM archive WHERE mid = ? AND (state >= 0 OR state = -6) ORDER BY id DESC LIMIT ?,?"
  	_arcUpUnOpenSQL      = "SELECT id FROM archive WHERE mid = ? AND state < 0 AND state != -100 AND state != -6 ORDER BY id DESC LIMIT ?,?"
  	_arcUpAllCountSQL    = "SELECT count(*) FROM archive WHERE mid = ? AND state != -100"
  	_arcUpOpenCountSQL   = "SELECT count(*) FROM archive WHERE mid = ? AND (state >= 0 OR state = -6)"
  	_arcUpUnOpenCountSQL = "SELECT count(*) FROM archive WHERE mid = ? AND state < 0 AND state != -100 AND state != -6"
  	_simpleArcSQL        = "SELECT id,title,mid FROM archive WHERE id = ?"
  	_rejectArcsSQL       = "SELECT id,mid,title,reject_reason,mtime FROM archive WHERE mid = ? AND state = ? AND mtime > ? ORDER BY mtime DESC LIMIT ?,?"
  	_rejectArcsCountSQL  = "SELECT count(*) FROM archive WHERE mid = ? AND state = ? AND mtime > ?"
  )

  // Statements for archive_recommend.
  const (
  	_getRecoSQL = "SELECT reco_aid FROM archive_recommend WHERE state= 0 and aid=? ORDER BY ctime asc"
  	_delRecoSQL = "UPDATE archive_recommend SET state=1 WHERE aid=?"
  	// %s is replaced with the "(aid,reco_aid),..." value tuples.
  	_batchAddRecoSQL = "INSERT IGNORE INTO archive_recommend (aid,reco_aid) VALUES %s on duplicate key update state=0"
  )

  // Statements for archive_biz (POI metadata and other per-type payloads).
  const (
  	_arcPOISQL = "SELECT data from archive_biz WHERE aid=? AND type= ?"
  	// NOTE(review): type 2 is presumably the vote payload, going by the constant name — confirm.
  	_arcVoteSQL  = "SELECT data from archive_biz WHERE aid=? AND type= 2"
  	_inADDBizSQL = "INSERT INTO archive_biz (aid,type,data) VALUES (?,?,?) ON DUPLICATE KEY UPDATE data=?"
  )
  48. // TxAddArchive insert archive.
  49. func (d *Dao) TxAddArchive(tx *xsql.Tx, a *archive.Archive) (aid int64, err error) {
  50. var now = time.Now()
  51. res, err := tx.Exec(_inArcSQL, a.Mid, a.TypeID, a.Title, a.Author, a.Cover, a.Desc, a.Tag, a.Attribute, a.Copyright, now, now, now)
  52. if err != nil {
  53. log.Error("d.inArc.Exec() error(%v)", err)
  54. return
  55. }
  56. if aid, err = res.LastInsertId(); err != nil {
  57. log.Error("res.LastInsertId() error(%v)", err)
  58. return
  59. }
  60. if strings.Contains(xstr.JoinInts(d.c.KeepArc.Aids)+",", strconv.FormatInt(aid, 10)+",") {
  61. keepAid := aid
  62. aid, err = d.TxAddArchive(tx, a)
  63. a.State = archive.StateForbidUpDelete
  64. a.Mid = d.c.KeepArc.Mid // 内部归属mid
  65. a.Aid = keepAid // 内部保留aid
  66. d.TxUpArchiveMid(tx, a)
  67. return
  68. }
  69. return
  70. }
  71. // TxUpArchive update archive.
  72. func (d *Dao) TxUpArchive(tx *xsql.Tx, a *archive.Archive) (rows int64, err error) {
  73. res, err := tx.Exec(_upArcSQL, a.TypeID, a.Title, a.Cover, a.Desc, a.Tag, a.Copyright, a.Aid)
  74. if err != nil {
  75. log.Error("d.upArc.Exec() error(%v)", err)
  76. return
  77. }
  78. rows, err = res.RowsAffected()
  79. return
  80. }
  81. // TxUpArchiveMid update archive mid.
  82. func (d *Dao) TxUpArchiveMid(tx *xsql.Tx, a *archive.Archive) (rows int64, err error) {
  83. res, err := tx.Exec(_upArcMidSQL, a.Mid, a.State, a.Aid)
  84. if err != nil {
  85. log.Error("d.upArcMid.Exec() error(%v)", err)
  86. return
  87. }
  88. rows, err = res.RowsAffected()
  89. return
  90. }
  91. // TxUpArchiveState update Archive state.
  92. func (d *Dao) TxUpArchiveState(tx *xsql.Tx, aid int64, state int8) (rows int64, err error) {
  93. res, err := tx.Exec(_upArcStateSQL, state, aid)
  94. if err != nil {
  95. log.Error("d.upVideoState.Exec error(%v)", err)
  96. return
  97. }
  98. rows, err = res.RowsAffected()
  99. return
  100. }
  101. // TxUpAddit update archive addit.
  102. func (d *Dao) TxUpAddit(tx *xsql.Tx, aid, missionID, orderID, flowID, descFormatID int64, ipv6 []byte, source, advertiser, flowRemark, desc, dynamic string, upFrom int8) (rows int64, err error) {
  103. if ipv6 == nil {
  104. ipv6 = []byte{}
  105. }
  106. res, err := tx.Exec(_inAddSQL, aid, missionID, upFrom, ipv6, source, orderID, flowID, advertiser, flowRemark, desc, descFormatID, dynamic, missionID, source, orderID, flowID, advertiser, flowRemark, desc, descFormatID, dynamic)
  107. if err != nil {
  108. log.Error("d.inArcAddit.Exec() error(%v)", err)
  109. return
  110. }
  111. rows, err = res.RowsAffected()
  112. return
  113. }
  114. // TxUpArchiveBiz update archive biz.
  115. func (d *Dao) TxUpArchiveBiz(tx *xsql.Tx, aid, bizType int64, data string) (rows int64, err error) {
  116. res, err := tx.Exec(_inADDBizSQL, aid, bizType, data, data)
  117. if err != nil {
  118. log.Error("d.TxUpArchiveBiz.Exec() error(%v)", err)
  119. return
  120. }
  121. rows, err = res.RowsAffected()
  122. return
  123. }
  124. // TxUpAdditReason update archive recheck_reason
  125. func (d *Dao) TxUpAdditReason(tx *xsql.Tx, aid int64, reason string) (rows int64, err error) {
  126. res, err := tx.Exec(_inAddReaSQL, aid, reason, reason)
  127. if err != nil {
  128. log.Error("d.inAdditReason.Exec() error(%v)", err)
  129. return
  130. }
  131. rows, err = res.RowsAffected()
  132. return
  133. }
  134. // TxUpAdditRedirect update archive redirect url.
  135. func (d *Dao) TxUpAdditRedirect(tx *xsql.Tx, aid int64, redirectURL string) (rows int64, err error) {
  136. res, err := tx.Exec(_inAddRdrSQL, aid, redirectURL, redirectURL)
  137. if err != nil {
  138. log.Error("d._inAdditRedirect.Exec() error(%v)", err)
  139. return
  140. }
  141. rows, err = res.RowsAffected()
  142. return
  143. }
  144. // TxUpArcAttr update attribute by aid.
  145. func (d *Dao) TxUpArcAttr(tx *xsql.Tx, aid int64, bit uint, val int32) (rows int64, err error) {
  146. res, err := tx.Exec(_upArcAttrSQL, bit, val, bit, aid)
  147. attSql := fmt.Sprintf("UPDATE archive SET attribute=attribute&(~(1<<%d))|(%d<<%d) WHERE id=%d", bit, val, bit, aid)
  148. log.Info("aid(%d) attribute update log sql (%s)", aid, attSql)
  149. if err != nil {
  150. log.Error("d.upArcAttr.Exec() error(%v)", err)
  151. return
  152. }
  153. rows, err = res.RowsAffected()
  154. return
  155. }
  156. // TxUpTag update tag by aid.
  157. func (d *Dao) TxUpTag(tx *xsql.Tx, aid int64, tag string) (rows int64, err error) {
  158. res, err := tx.Exec(_upTagSQL, tag, aid)
  159. if err != nil {
  160. log.Error("d.upTag.Exec() error(%v)", err)
  161. return
  162. }
  163. rows, err = res.RowsAffected()
  164. return
  165. }
  166. // Archive get a archive by avid.
  167. func (d *Dao) Archive(c context.Context, aid int64) (a *archive.Archive, err error) {
  168. var (
  169. row = d.rddb.QueryRow(c, _arcSQL, aid)
  170. reason, tag sql.NullString
  171. )
  172. a = &archive.Archive{}
  173. if err = row.Scan(&a.Aid, &a.Mid, &a.TypeID, &a.Copyright, &a.Author, &a.Title, &a.Cover, &reason, &a.Desc, &tag, &a.Duration,
  174. &a.Round, &a.Attribute, &a.Access, &a.State, &a.PTime, &a.CTime, &a.MTime); err != nil {
  175. if err == xsql.ErrNoRows {
  176. a = nil
  177. err = nil
  178. } else {
  179. log.Error("row.Scan error(%v)", err)
  180. }
  181. return
  182. }
  183. a.RejectReason = reason.String
  184. a.Tag = tag.String
  185. return
  186. }
  187. // POI get a archive POI by avid.
  188. func (d *Dao) POI(c context.Context, aid int64) (data []byte, err error) {
  189. var (
  190. row = d.rddb.QueryRow(c, _arcPOISQL, aid, archive.BIZPOI)
  191. )
  192. if err = row.Scan(&data); err != nil {
  193. if err == xsql.ErrNoRows {
  194. err = nil
  195. } else {
  196. log.Error("row.Scan error(%v)", err)
  197. }
  198. return
  199. }
  200. return
  201. }
  202. // Vote get a archive Vote by avid.
  203. func (d *Dao) Vote(c context.Context, aid int64) (data []byte, err error) {
  204. var (
  205. row = d.rddb.QueryRow(c, _arcVoteSQL, aid)
  206. )
  207. if err = row.Scan(&data); err != nil {
  208. if err == xsql.ErrNoRows {
  209. err = nil
  210. } else {
  211. log.Error("row.Scan error(%v)", err)
  212. }
  213. return
  214. }
  215. return
  216. }
  217. // Addit get a archive addit by avid.
  218. func (d *Dao) Addit(c context.Context, aid int64) (ad *archive.Addit, err error) {
  219. row := d.rddb.QueryRow(c, _arcAddSQL, aid)
  220. ad = &archive.Addit{}
  221. if err = row.Scan(&ad.Aid, &ad.MissionID, &ad.FromIP, &ad.IPv6, &ad.UpFrom, &ad.RecheckReason, &ad.RedirectURL, &ad.Source, &ad.OrderID, &ad.FlowID, &ad.Advertiser, &ad.FlowRemark, &ad.Desc, &ad.DescFormatID, &ad.Dynamic); err != nil {
  222. if err == xsql.ErrNoRows {
  223. err = nil
  224. ad = nil
  225. } else {
  226. log.Error("row.Scan error(%v)", err)
  227. }
  228. }
  229. return
  230. }
  231. // Mids multi get archive mid by aids.
  232. func (d *Dao) Mids(c context.Context, aids []int64) (mm map[int64]int64, err error) {
  233. rows, err := d.rddb.Query(c, fmt.Sprintf(_arcMidsSQL, xstr.JoinInts(aids)))
  234. if err != nil {
  235. log.Error("db.Query() error(%v)", err)
  236. return
  237. }
  238. defer rows.Close()
  239. mm = make(map[int64]int64, len(aids))
  240. for rows.Next() {
  241. var aid, mid int64
  242. if err = rows.Scan(&aid, &mid); err != nil {
  243. log.Error("rows.Scan error(%v)", err)
  244. return
  245. }
  246. mm[aid] = mid
  247. }
  248. return
  249. }
  250. // ArchivesUpAll get archive all aids by mid.
  251. func (d *Dao) ArchivesUpAll(c context.Context, mid int64, offset int, ps int) (aids []int64, err error) {
  252. rows, err := d.rddb.Query(c, _arcUpAllSQL, mid, offset, ps)
  253. if err != nil {
  254. log.Error("db.Query() error(%v)", err)
  255. return
  256. }
  257. defer rows.Close()
  258. for rows.Next() {
  259. var aid int64
  260. if err = rows.Scan(&aid); err != nil {
  261. log.Error("rows.Scan error(%v)", err)
  262. return
  263. }
  264. aids = append(aids, aid)
  265. }
  266. return
  267. }
  268. // ArchivesUpOpen get archive open aids by mid.
  269. func (d *Dao) ArchivesUpOpen(c context.Context, mid int64, offset int, ps int) (aids []int64, err error) {
  270. rows, err := d.rddb.Query(c, _arcUpOpenSQL, mid, offset, ps)
  271. if err != nil {
  272. log.Error("db.Query() error(%v)", err)
  273. return
  274. }
  275. defer rows.Close()
  276. for rows.Next() {
  277. var aid int64
  278. if err = rows.Scan(&aid); err != nil {
  279. log.Error("rows.Scan error(%v)", err)
  280. return
  281. }
  282. aids = append(aids, aid)
  283. }
  284. return
  285. }
  286. // ArchivesUpUnOpen get archive unopen aids by mid.
  287. func (d *Dao) ArchivesUpUnOpen(c context.Context, mid int64, offset int, ps int) (aids []int64, err error) {
  288. rows, err := d.rddb.Query(c, _arcUpUnOpenSQL, mid, offset, ps)
  289. if err != nil {
  290. log.Error("db.Query() error(%v)", err)
  291. return
  292. }
  293. defer rows.Close()
  294. for rows.Next() {
  295. var aid int64
  296. if err = rows.Scan(&aid); err != nil {
  297. log.Error("rows.Scan error(%v)", err)
  298. return
  299. }
  300. aids = append(aids, aid)
  301. }
  302. return
  303. }
  304. // ArchiveAllUpCount get all archive count by mid.
  305. func (d *Dao) ArchiveAllUpCount(c context.Context, mid int64) (count int64, err error) {
  306. row := d.rddb.QueryRow(c, _arcUpAllCountSQL, mid)
  307. if err = row.Scan(&count); err != nil {
  308. if err == sql.ErrNoRows {
  309. err = nil
  310. } else {
  311. log.Error("row.Scan error(%v)", err)
  312. }
  313. }
  314. return
  315. }
  316. // ArchiveOpenUpCount get open archive count by mid.
  317. func (d *Dao) ArchiveOpenUpCount(c context.Context, mid int64) (count int64, err error) {
  318. row := d.rddb.QueryRow(c, _arcUpOpenCountSQL, mid)
  319. if err = row.Scan(&count); err != nil {
  320. if err == sql.ErrNoRows {
  321. err = nil
  322. } else {
  323. log.Error("row.Scan error(%v)", err)
  324. }
  325. }
  326. return
  327. }
  328. // ArchiveUnOpenUpCount get un open archive count by mid.
  329. func (d *Dao) ArchiveUnOpenUpCount(c context.Context, mid int64) (count int64, err error) {
  330. row := d.rddb.QueryRow(c, _arcUpUnOpenCountSQL, mid)
  331. if err = row.Scan(&count); err != nil {
  332. if err == sql.ErrNoRows {
  333. err = nil
  334. } else {
  335. log.Error("row.Scan error(%v)", err)
  336. }
  337. }
  338. return
  339. }
  340. // SimpleArchive get a archive by avid.
  341. func (d *Dao) SimpleArchive(c context.Context, aid int64) (a *archive.SimpleArchive, err error) {
  342. row := d.rddb.QueryRow(c, _simpleArcSQL, aid)
  343. a = &archive.SimpleArchive{}
  344. if err = row.Scan(&a.Aid, &a.Title, &a.Mid); err != nil {
  345. if err == xsql.ErrNoRows {
  346. a = nil
  347. err = nil
  348. } else {
  349. log.Error("row.Scan error(%v)", err)
  350. }
  351. }
  352. return
  353. }
  354. // Recos fn
  355. func (d *Dao) Recos(c context.Context, aid int64) (aids []int64, err error) {
  356. rows, err := d.db.Query(c, _getRecoSQL, aid)
  357. if err != nil {
  358. log.Error("d.db.Query(%d) error(%v)", aid, err)
  359. return
  360. }
  361. defer rows.Close()
  362. for rows.Next() {
  363. var aid int64
  364. if err = rows.Scan(&aid); err != nil {
  365. log.Error("rows.Scan error(%v)", err)
  366. return
  367. }
  368. aids = append(aids, aid)
  369. }
  370. return
  371. }
  372. // RecoUpdate fn
  373. func (d *Dao) RecoUpdate(c context.Context, aid int64, recoIDs []int64) (effCnt int64, err error) {
  374. if len(recoIDs) == 0 {
  375. _, err = d.db.Query(c, _delRecoSQL, aid)
  376. if err != nil {
  377. log.Error("d.db.Query error(%v)| aid (%d)", err, aid)
  378. return
  379. }
  380. return
  381. }
  382. _, err = d.db.Query(c, _delRecoSQL, aid)
  383. if err != nil {
  384. log.Error("d.db.Query(%d) error(%v)", aid, err)
  385. return
  386. }
  387. var (
  388. batchVals = make([]string, 0, len(recoIDs))
  389. )
  390. for _, recoID := range recoIDs {
  391. batchVals = append(batchVals, fmt.Sprintf("(%d,%d)", aid, recoID))
  392. }
  393. res, err := d.db.Exec(c, fmt.Sprintf(_batchAddRecoSQL, strings.Join(batchVals, ",")))
  394. if err != nil {
  395. log.Error("d.db.Exe _batchAddRecoSQL batchVals(%+v) error(%+v)", batchVals, err)
  396. return
  397. }
  398. return res.RowsAffected()
  399. }
  400. // UpMissionID update mission_id for archive.
  401. func (d *Dao) UpMissionID(c context.Context, aa *archive.ArcMissionParam) (rows int64, err error) {
  402. res, err := d.db.Exec(c, _inAddMissionSQL, aa.AID, aa.MissionID, aa.MissionID)
  403. if err != nil {
  404. log.Error("UpMissionID.Exec error(%v)", err)
  405. return
  406. }
  407. return res.RowsAffected()
  408. }
  409. // RejectedArchives list rejected archives
  410. func (d *Dao) RejectedArchives(c context.Context, mid int64, state, offset, limit int32, start *time.Time) (arcs []*archive.Archive, count int32, err error) {
  411. row := d.slaveDB.QueryRow(c, _rejectArcsCountSQL, mid, state, start)
  412. if err = row.Scan(&count); err != nil {
  413. log.Error("rows.Scan error(%v)", err)
  414. return
  415. }
  416. if count == 0 {
  417. return
  418. }
  419. rows, err := d.slaveDB.Query(c, _rejectArcsSQL, mid, state, start, offset, limit)
  420. if err != nil {
  421. log.Error("db.Query error(%v)", err)
  422. return
  423. }
  424. for rows.Next() {
  425. a := archive.Archive{}
  426. if err = rows.Scan(&a.Aid, &a.Mid, &a.Title, &a.RejectReason, &a.MTime); err != nil {
  427. log.Error("rows.Scan error(%v)", err)
  428. return
  429. }
  430. arcs = append(arcs, &a)
  431. }
  432. return
  433. }