tidb.go 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208
  1. package dao
  2. import (
  3. "context"
  4. "fmt"
  5. "strconv"
  6. "strings"
  7. pb "go-common/app/service/main/history/api/grpc"
  8. "go-common/app/service/main/history/model"
  9. "go-common/library/database/sql"
  10. "go-common/library/database/tidb"
  11. "go-common/library/log"
  12. "go-common/library/xstr"
  13. )
  14. var (
  15. _addHistorySQL = "INSERT INTO histories(mid, kid, business_id, aid, sid, epid, sub_type, cid, device, progress, view_at) VALUES(?,?,?,?,?,?,?,?,?,?,?)" +
  16. "ON DUPLICATE KEY UPDATE aid =?, sid=?, epid=?, sub_type=?, cid=?, device=?, progress=?, view_at=?"
  17. _businessesSQL = "SELECT id, name, ttl FROM business"
  18. _historiesSQL = "SELECT ctime, mtime, business_id, kid, aid, sid, epid, sub_type, cid, device, progress, view_at FROM histories WHERE mid=? AND view_at < ? ORDER BY view_at DESC LIMIT ?"
  19. _partHistoriesSQL = "SELECT ctime, mtime, business_id, kid, aid, sid, epid, sub_type, cid, device, progress, view_at FROM histories WHERE mid=? AND business_id in (%s) AND view_at < ? ORDER BY view_at DESC LIMIT ? "
  20. _queryHistoriesSQL = "SELECT ctime, mtime, business_id, kid, aid, sid, epid, sub_type, cid, device, progress, view_at FROM histories WHERE mid=? AND kid in (%s) AND business_id = ?"
  21. _historySQL = "SELECT ctime, mtime, business_id, kid, aid, sid, epid, sub_type, cid, device, progress, view_at FROM histories WHERE mid=? AND kid = ? AND business_id = ?"
  22. _deleteHistoriesSQL = "DELETE FROM histories WHERE mid = ? AND kid = ? AND business_id = ?"
  23. _clearHistoriesSQL = "DELETE FROM histories WHERE mid = ? AND business_id in (%s)"
  24. _clearAllHistoriesSQL = "DELETE FROM histories WHERE mid = ?"
  25. _userHide = "SELECT hide FROM users WHERE mid = ?"
  26. _updateUserHide = "INSERT INTO users(mid, hide) VALUES(?,?) ON DUPLICATE KEY UPDATE hide =?"
  27. )
  28. // AddHistories add histories to db
  29. func (d *Dao) AddHistories(c context.Context, hs []*model.History) (err error) {
  30. if len(hs) == 0 {
  31. return
  32. }
  33. var tx *tidb.Tx
  34. if tx, err = d.tidb.Begin(c); err != nil {
  35. log.Error("tx.BeginTran() error(%v)", err)
  36. return
  37. }
  38. for _, h := range hs {
  39. if _, err = tx.Stmts(d.insertStmt).Exec(c, h.Mid, h.Kid, h.BusinessID, h.Aid, h.Sid, h.Epid, h.SubType, h.Cid, h.Device, h.Progress, h.ViewAt,
  40. h.Aid, h.Sid, h.Epid, h.SubType, h.Cid, h.Device, h.Progress, h.ViewAt); err != nil {
  41. log.Errorv(c, log.D{Key: "mid", Value: h.Mid}, log.D{Key: "err", Value: err}, log.D{Key: "detail", Value: h})
  42. tx.Rollback()
  43. return
  44. }
  45. }
  46. if err = tx.Commit(); err != nil {
  47. log.Error("add histories commit(%+v) err: %v", hs, err)
  48. }
  49. return
  50. }
  51. // QueryBusinesses business
  52. func (d *Dao) QueryBusinesses(c context.Context) (res []*model.Business, err error) {
  53. var rows *tidb.Rows
  54. if rows, err = d.businessesStmt.Query(c); err != nil {
  55. log.Error("db.businessesStmt.Query error(%v)", err)
  56. return
  57. }
  58. defer rows.Close()
  59. for rows.Next() {
  60. b := &model.Business{}
  61. if err = rows.Scan(&b.ID, &b.Name, &b.TTL); err != nil {
  62. log.Error("rows.Business.Scan error(%v)", err)
  63. return
  64. }
  65. res = append(res, b)
  66. }
  67. err = rows.Err()
  68. return
  69. }
  70. // UserHistories get histories by time
  71. func (d *Dao) UserHistories(c context.Context, businesses []string, mid, viewAt, ps int64) (res map[string][]*model.History, err error) {
  72. var rows *tidb.Rows
  73. if len(businesses) == 0 {
  74. if rows, err = d.historiesStmt.Query(c, mid, viewAt, ps); err != nil {
  75. log.Error("db.Histories.Query error(%v)", err)
  76. return
  77. }
  78. } else {
  79. var ids []int64
  80. for _, b := range businesses {
  81. ids = append(ids, d.BusinessNames[b].ID)
  82. }
  83. sqlStr := fmt.Sprintf(_partHistoriesSQL, xstr.JoinInts(ids))
  84. if rows, err = d.tidb.Query(c, sqlStr, mid, viewAt, ps); err != nil {
  85. log.Error("UserHistories(%v,%d,%d,%d),db.Histories.Query error(%v)", businesses, mid, viewAt, ps, err)
  86. return
  87. }
  88. }
  89. defer rows.Close()
  90. for rows.Next() {
  91. b := &model.History{Mid: mid}
  92. if err = rows.Scan(&b.Ctime, &b.Mtime, &b.BusinessID, &b.Kid, &b.Aid, &b.Sid, &b.Epid, &b.SubType, &b.Cid, &b.Device, &b.Progress, &b.ViewAt); err != nil {
  93. log.Error("UserHistories(%v,%d,%d,%d),rows.Scan error(%v)", businesses, mid, viewAt, ps, err)
  94. if strings.Contains(fmt.Sprintf("%v", err), "18446744073709551615") {
  95. err = nil
  96. continue
  97. }
  98. return
  99. }
  100. b.Business = d.Businesses[b.BusinessID].Name
  101. if res == nil {
  102. res = make(map[string][]*model.History)
  103. }
  104. res[b.Business] = append(res[b.Business], b)
  105. }
  106. err = rows.Err()
  107. return
  108. }
  109. // Histories get histories by id
  110. func (d *Dao) Histories(c context.Context, business string, mid int64, ids []int64) (res map[int64]*model.History, err error) {
  111. var rows *tidb.Rows
  112. bid := d.BusinessNames[business].ID
  113. if len(ids) == 1 {
  114. if rows, err = d.historyStmt.Query(c, mid, ids[0], bid); err != nil {
  115. log.Error("db.Histories.Query error(%v)", err)
  116. return
  117. }
  118. } else {
  119. sqlStr := fmt.Sprintf(_queryHistoriesSQL, xstr.JoinInts(ids))
  120. if rows, err = d.tidb.Query(c, sqlStr, mid, bid); err != nil {
  121. log.Error("tidb.Histories.Query error(%v)", err)
  122. return
  123. }
  124. }
  125. defer rows.Close()
  126. for rows.Next() {
  127. b := &model.History{Mid: mid}
  128. if err = rows.Scan(&b.Ctime, &b.Mtime, &b.BusinessID, &b.Kid, &b.Aid, &b.Sid, &b.Epid, &b.SubType, &b.Cid, &b.Device, &b.Progress, &b.ViewAt); err != nil {
  129. log.Error("rows.Business.Scan error(%v)", err)
  130. return
  131. }
  132. b.Business = d.Businesses[b.BusinessID].Name
  133. if res == nil {
  134. res = make(map[int64]*model.History)
  135. }
  136. res[b.Kid] = b
  137. }
  138. err = rows.Err()
  139. return
  140. }
  141. // DeleteHistories .
  142. func (d *Dao) DeleteHistories(c context.Context, h *pb.DelHistoriesReq) (err error) {
  143. var tx *tidb.Tx
  144. if tx, err = d.tidb.Begin(c); err != nil {
  145. log.Error("tx.BeginTran() error(%v)", err)
  146. return
  147. }
  148. for _, r := range h.Records {
  149. _, err = tx.Stmts(d.deleteHistoriesStmt).Exec(c, h.Mid, r.ID, d.BusinessNames[r.Business].ID)
  150. if err != nil {
  151. tx.Rollback()
  152. return err
  153. }
  154. }
  155. if err = tx.Commit(); err != nil {
  156. log.Error("DeleteHistories(%+v),commit err:%+v", h, err)
  157. }
  158. return
  159. }
  160. // ClearHistory clear histories
  161. func (d *Dao) ClearHistory(c context.Context, mid int64, businesses []string) (err error) {
  162. var ids []string
  163. for _, b := range businesses {
  164. ids = append(ids, strconv.FormatInt(d.BusinessNames[b].ID, 10))
  165. }
  166. sqlStr := fmt.Sprintf(_clearHistoriesSQL, strings.Join(ids, ","))
  167. if _, err = d.tidb.Exec(c, sqlStr, mid); err != nil {
  168. log.Error("mid: %d clear(%v) err: %v", mid, businesses, err)
  169. }
  170. return
  171. }
  172. // ClearAllHistory clear all histories
  173. func (d *Dao) ClearAllHistory(c context.Context, mid int64) (err error) {
  174. if _, err = d.clearAllHistoriesStmt.Exec(c, mid); err != nil {
  175. log.Error("mid: %d clear all err: %v", mid, err)
  176. }
  177. return
  178. }
  179. // UpdateUserHide update user hide
  180. func (d *Dao) UpdateUserHide(c context.Context, mid int64, hide bool) (err error) {
  181. if _, err = d.updateUserHideStmt.Exec(c, mid, hide, hide); err != nil {
  182. log.Error("mid: %d updateUserHide(%v) err: %v", mid, hide, err)
  183. }
  184. return
  185. }
  186. // UserHide get user hide
  187. func (d *Dao) UserHide(c context.Context, mid int64) (hide bool, err error) {
  188. if err = d.userHideStmt.QueryRow(c, mid).Scan(&hide); err != nil {
  189. if err == sql.ErrNoRows {
  190. err = nil
  191. return
  192. }
  193. log.Error("mid: %d UserHide err: %v", mid, err)
  194. }
  195. return
  196. }