import.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596
  1. package ugc
  2. import (
  3. "context"
  4. "fmt"
  5. "time"
  6. ugcmdl "go-common/app/job/main/tv/model/ugc"
  7. "go-common/app/service/main/archive/api"
  8. "go-common/library/database/sql"
  9. "go-common/library/log"
  10. "go-common/library/xstr"
  11. )
  12. const (
  13. _import = "SELECT mid FROM ugc_uploader WHERE toinit = 1 AND retry < UNIX_TIMESTAMP(now()) AND deleted = 0 LIMIT "
  14. _postponeUp = "UPDATE ugc_uploader SET retry = ? WHERE mid = ? AND deleted = 0"
  15. _finishUp = "UPDATE ugc_uploader SET toinit = 0 WHERE mid = ? AND deleted = 0"
  16. _filterAids = "SELECT aid FROM ugc_archive WHERE aid IN (%s) AND deleted = 0"
  17. _importArc = "REPLACE INTO ugc_archive(aid, videos, mid, typeid, title, cover, content, duration, copyright, pubtime, state) VALUES (?,?,?,?,?,?,?,?,?,?,?)"
  18. )
  19. // TxImportArc imports an arc
  20. func (d *Dao) TxImportArc(tx *sql.Tx, arc *api.Arc) (err error) {
  21. if _, err = tx.Exec(_importArc, arc.Aid,
  22. arc.Videos, arc.Author.Mid, arc.TypeID, arc.Title, arc.Pic, arc.Desc, arc.Duration,
  23. arc.Copyright, arc.PubDate, arc.State); err != nil {
  24. log.Error("_importArc, failed to update: (%v), Error: %v", arc, err)
  25. }
  26. return
  27. }
  28. // Import picks the uppers to init with the RPC data
  29. func (d *Dao) Import(c context.Context) (res []*ugcmdl.Upper, err error) {
  30. var rows *sql.Rows
  31. if rows, err = d.DB.Query(c, _import+fmt.Sprintf("%d", d.conf.UgcSync.Batch.ImportNum)); err != nil {
  32. log.Error("d.Import.Query error(%v)", err)
  33. return
  34. }
  35. defer rows.Close()
  36. for rows.Next() {
  37. var r = &ugcmdl.Upper{}
  38. if err = rows.Scan(&r.MID); err != nil {
  39. log.Error("Manual row.Scan() error(%v)", err)
  40. return
  41. }
  42. res = append(res, r)
  43. }
  44. if err = rows.Err(); err != nil {
  45. log.Error("d.Import.Query error(%v)", err)
  46. }
  47. return
  48. }
  49. // PpUpper means postpone upper init operation due to some error happened
  50. func (d *Dao) PpUpper(c context.Context, mid int64) (err error) {
  51. var delay = time.Now().Unix() + int64(d.conf.UgcSync.Frequency.ErrorWait)
  52. if _, err = d.DB.Exec(c, _postponeUp, delay, mid); err != nil {
  53. log.Error("PpUpper, failed to delay: (%v,%v), Error: %v", delay, mid, err)
  54. }
  55. return
  56. }
  57. // FinishUpper updates the upper's to_init status to 0 means we finish the import operation
  58. func (d *Dao) FinishUpper(c context.Context, mid int64) (err error) {
  59. if _, err = d.DB.Exec(c, _finishUp, mid); err != nil {
  60. log.Error("FinishUpper, failed to Update: (%v,%v), Error: %v", _finishUp, mid, err)
  61. }
  62. return
  63. }
  64. // FilterExist filters the existing archives and remove them from the res, to have only non-existing data to insert
  65. func (d *Dao) FilterExist(c context.Context, res *map[int64]*api.Arc, aids []int64) (err error) {
  66. var rows *sql.Rows
  67. if rows, err = d.DB.Query(c, fmt.Sprintf(_filterAids, xstr.JoinInts(aids))); err != nil {
  68. if err == sql.ErrNoRows {
  69. err = nil // if non of them exist, it's good, we do nothing
  70. return
  71. }
  72. log.Error("d._filterAids.Query error(%v)", err)
  73. return
  74. }
  75. defer rows.Close()
  76. for rows.Next() {
  77. var aidEx int64
  78. if err = rows.Scan(&aidEx); err != nil {
  79. log.Error("Manual row.Scan() error(%v)", err)
  80. return
  81. }
  82. delete(*res, aidEx) // remove existing data from the map
  83. }
  84. if err = rows.Err(); err != nil {
  85. log.Error("d.FilterExist.Query error(%v)", err)
  86. }
  87. return
  88. }