mysql.go 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124
  1. package dao
  2. import (
  3. "context"
  4. "database/sql"
  5. "time"
  6. "go-common/app/service/main/history/model"
  7. "go-common/library/database/tidb"
  8. "go-common/library/log"
  9. )
  10. var (
  11. _businessesSQL = "SELECT id, name, ttl FROM business"
  12. _addHistorySQL = "INSERT INTO histories(mid, kid, business_id, aid, sid, epid, sub_type, cid, device, progress, view_at) VALUES(?,?,?,?,?,?,?,?,?,?,?)" +
  13. "ON DUPLICATE KEY UPDATE aid =?, sid=?, epid=?, sub_type=?, cid=?, device=?, progress=?, view_at=?"
  14. _deleteSQL = "DELETE FROM histories WHERE business_id = ? AND mtime >= ? AND mtime < ? LIMIT ?"
  15. _allHisSQL = "SELECT mtime FROM histories WHERE mid = ? AND business_id = ? ORDER BY mtime desc"
  16. _earlyHistorySQL = "SELECT mtime FROM histories WHERE business_id = ? ORDER BY mtime LIMIT 1"
  17. _delUserHisSQL = "DELETE FROM histories WHERE mid = ? AND mtime < ? and business_id = ?"
  18. )
  19. // Businesses business
  20. func (d *Dao) Businesses(c context.Context) (res []*model.Business, err error) {
  21. var rows *tidb.Rows
  22. if rows, err = d.businessesStmt.Query(c); err != nil {
  23. log.Error("db.businessesStmt.Query error(%v)", err)
  24. return
  25. }
  26. defer rows.Close()
  27. for rows.Next() {
  28. b := &model.Business{}
  29. if err = rows.Scan(&b.ID, &b.Name, &b.TTL); err != nil {
  30. log.Error("rows.Business.Scan error(%v)", err)
  31. return
  32. }
  33. res = append(res, b)
  34. }
  35. err = rows.Err()
  36. return
  37. }
  38. // DeleteHistories delete histories
  39. func (d *Dao) DeleteHistories(c context.Context, bid int64, beginTime, endTime time.Time) (rows int64, err error) {
  40. var res sql.Result
  41. begin := time.Now()
  42. if res, err = d.longDB.Exec(c, _deleteSQL, bid, beginTime, endTime, d.conf.Job.DeleteLimit); err != nil {
  43. log.Error("DeleteHistories(%v %v %v) err: %v", bid, beginTime, endTime, err)
  44. return
  45. }
  46. rows, err = res.RowsAffected()
  47. log.Info("clean business histories bid: %v begin: %v end: %v rows: %v, time: %v", bid, beginTime, endTime, rows, time.Since(begin))
  48. return
  49. }
  50. // AddHistories add histories to db
  51. func (d *Dao) AddHistories(c context.Context, hs []*model.History) (err error) {
  52. if len(hs) == 0 {
  53. return
  54. }
  55. var tx *tidb.Tx
  56. if tx, err = d.db.Begin(c); err != nil {
  57. log.Error("tx.BeginTran() error(%v)", err)
  58. return
  59. }
  60. for _, h := range hs {
  61. if _, err = tx.Stmts(d.insertStmt).Exec(c, h.Mid, h.Kid, h.BusinessID, h.Aid, h.Sid, h.Epid, h.SubType, h.Cid, h.Device, h.Progress, h.ViewAt,
  62. h.Aid, h.Sid, h.Epid, h.SubType, h.Cid, h.Device, h.Progress, h.ViewAt); err != nil {
  63. log.Error("addHistories exec err mid: %v err: %+v", h.Mid, err)
  64. tx.Rollback()
  65. return
  66. }
  67. }
  68. if err = tx.Commit(); err != nil {
  69. log.Error("add histories commit(%+v) err: %v", hs, err)
  70. return
  71. }
  72. log.Infov(c, log.D{Key: "log", Value: "addHistories db"}, log.D{Key: "len", Value: len(hs)})
  73. return
  74. }
  75. // DeleteUserHistories .
  76. func (d *Dao) DeleteUserHistories(c context.Context, mid, bid int64, t time.Time) (rows int64, err error) {
  77. var res sql.Result
  78. if res, err = d.delUserStmt.Exec(c, mid, t, bid); err != nil {
  79. log.Error("DeleteUserHistories(%v %v) err: %v", bid, t, err)
  80. return
  81. }
  82. rows, err = res.RowsAffected()
  83. return
  84. }
  85. // UserHistories .
  86. func (d *Dao) UserHistories(c context.Context, mid, businessID int64) (res []time.Time, err error) {
  87. var rows *tidb.Rows
  88. if rows, err = d.allHisStmt.Query(c, mid, businessID); err != nil {
  89. log.Error("db.UserHistories.Query error(%v)", err)
  90. return
  91. }
  92. defer rows.Close()
  93. for rows.Next() {
  94. var t time.Time
  95. if err = rows.Scan(&t); err != nil {
  96. log.Error("rows.UserHistories.Scan error(%v)", err)
  97. return
  98. }
  99. res = append(res, t)
  100. }
  101. err = rows.Err()
  102. return
  103. }
  104. // EarlyHistory .
  105. func (d *Dao) EarlyHistory(c context.Context, businessID int64) (res time.Time, err error) {
  106. if err = d.longDB.QueryRow(c, _earlyHistorySQL, businessID).Scan(&res); err != nil {
  107. if err == tidb.ErrNoRows {
  108. res = time.Now()
  109. err = nil
  110. return
  111. }
  112. log.Error("db.EarlyHistory.Query error(%v)", err)
  113. }
  114. return
  115. }