mysql.go 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186
  1. package dao
  2. import (
  3. "context"
  4. xsql "database/sql"
  5. "time"
  6. "go-common/app/job/main/point/model"
  7. "go-common/library/database/sql"
  8. "github.com/pkg/errors"
  9. )
  10. const (
  11. _insertPoint = "INSERT INTO point_info(mid,point_balance,ver) VALUES(?,?,?);"
  12. _updatePoint = "UPDATE point_info SET point_balance=?,ver=? WHERE mid=? AND ver=?;"
  13. _insertPointHistory = "INSERT INTO point_change_history(mid,point,order_id,change_type,change_time,relation_id,point_balance,remark,operator) VALUES(?,?,?,?,?,?,?,?,?);"
  14. _checkHistoryCount = "SELECT COUNT(1) FROM point_change_history WHERE mid = ? AND order_id = ?;"
  15. _insertPointSQL = "INSERT INTO point_info(mid,point_balance,ver) VALUES(?,?,?)"
  16. _updatePointSQL = "UPDATE point_info SET point_balance = ?,ver=? WHERE mid=? AND ver=?"
  17. _pointInfoSQL = "SELECT mid,point_balance,ver FROM point_info WHERE mid=?"
  18. _InsertPointHistorySQL = "INSERT INTO point_change_history(mid,point,order_id,change_type,change_time,relation_id,point_balance,remark,operator) VALUES(?,?,?,?,?,?,?,?,?)"
  19. _midByMtime = "SELECT mid, point_balance FROM point_info where mtime > ?;"
  20. _lastOneHistory = "SELECT `point_balance` FROM `point_change_history` WHERE mid =? ORDER BY id DESC LIMIT 1;"
  21. _fixUpdatePointSQL = "UPDATE point_info SET point_balance = ? WHERE mid=?"
  22. )
  23. // BeginTran begin transaction.
  24. func (d *Dao) BeginTran(c context.Context) (*sql.Tx, error) {
  25. return d.db.Begin(c)
  26. }
  27. //TxPointInfo .
  28. func (d *Dao) TxPointInfo(c context.Context, tx *sql.Tx, mid int64) (pi *model.PointInfo, err error) {
  29. row := tx.QueryRow(_pointInfoSQL, mid)
  30. pi = new(model.PointInfo)
  31. if err = row.Scan(&pi.Mid, &pi.PointBalance, &pi.Ver); err != nil {
  32. if err == sql.ErrNoRows {
  33. err = nil
  34. pi = nil
  35. } else {
  36. err = errors.WithStack(err)
  37. }
  38. }
  39. return
  40. }
  41. //InsertPoint .
  42. func (d *Dao) InsertPoint(c context.Context, tx *sql.Tx, pi *model.PointInfo) (a int64, err error) {
  43. var res xsql.Result
  44. if res, err = tx.Exec(_insertPointSQL, pi.Mid, pi.PointBalance, pi.Ver); err != nil {
  45. err = errors.WithStack(err)
  46. return
  47. }
  48. if a, err = res.RowsAffected(); err != nil {
  49. err = errors.WithStack(err)
  50. return
  51. }
  52. return
  53. }
  54. //UpdatePointInfo .
  55. func (d *Dao) UpdatePointInfo(c context.Context, tx *sql.Tx, pi *model.PointInfo, ver int64) (a int64, err error) {
  56. var res xsql.Result
  57. if res, err = tx.Exec(_updatePointSQL, pi.PointBalance, pi.Ver, pi.Mid, ver); err != nil {
  58. err = errors.WithStack(err)
  59. return
  60. }
  61. if a, err = res.RowsAffected(); err != nil {
  62. err = errors.WithStack(err)
  63. return
  64. }
  65. return
  66. }
  67. //InsertPointHistory .
  68. func (d *Dao) InsertPointHistory(c context.Context, tx *sql.Tx, ph *model.PointHistory) (a int64, err error) {
  69. var res xsql.Result
  70. if res, err = tx.Exec(_InsertPointHistorySQL, ph.Mid, ph.Point, ph.OrderID, ph.ChangeType, ph.ChangeTime, ph.RelationID, ph.PointBalance, ph.Remark, ph.Operator); err != nil {
  71. err = errors.WithStack(err)
  72. return
  73. }
  74. if a, err = res.RowsAffected(); err != nil {
  75. err = errors.WithStack(err)
  76. }
  77. return
  78. }
  79. //AddPoint addPoint
  80. func (d *Dao) AddPoint(c context.Context, p *model.VipPoint) (a int64, err error) {
  81. var res xsql.Result
  82. if res, err = d.db.Exec(c, _insertPoint, &p.Mid, &p.PointBalance, &p.Ver); err != nil {
  83. err = errors.WithStack(err)
  84. return
  85. }
  86. if a, err = res.RowsAffected(); err != nil {
  87. err = errors.WithStack(err)
  88. return
  89. }
  90. return
  91. }
  92. //UpdatePoint UpdatePoint row
  93. func (d *Dao) UpdatePoint(c context.Context, p *model.VipPoint, oldver int64) (a int64, err error) {
  94. var res xsql.Result
  95. if res, err = d.db.Exec(c, _updatePoint, &p.PointBalance, &p.Ver, &p.Mid, oldver); err != nil {
  96. err = errors.WithStack(err)
  97. return
  98. }
  99. if a, err = res.RowsAffected(); err != nil {
  100. err = errors.WithStack(err)
  101. }
  102. return
  103. }
  104. //AddPointHistory add point history
  105. func (d *Dao) AddPointHistory(c context.Context, ph *model.VipPointChangeHistory) (a int64, err error) {
  106. var res xsql.Result
  107. if res, err = d.db.Exec(c, _insertPointHistory, &ph.Mid, &ph.Point, &ph.OrderID, &ph.ChangeType, &ph.ChangeTime, &ph.RelationID, &ph.PointBalance, &ph.Remark, &ph.Operator); err != nil {
  108. err = errors.WithStack(err)
  109. return
  110. }
  111. if a, err = res.RowsAffected(); err != nil {
  112. err = errors.WithStack(err)
  113. }
  114. return
  115. }
  116. //HistoryCount check if have repeat record.
  117. func (d *Dao) HistoryCount(c context.Context, mid int64, orderID string) (count int64, err error) {
  118. row := d.db.QueryRow(c, _checkHistoryCount, mid, orderID)
  119. if err = row.Scan(&count); err != nil {
  120. if err == xsql.ErrNoRows {
  121. err = nil
  122. } else {
  123. err = errors.WithStack(err)
  124. }
  125. }
  126. return
  127. }
  128. //MidsByMtime point mids by mtime.
  129. func (d *Dao) MidsByMtime(c context.Context, mtime time.Time) (pis []*model.PointInfo, err error) {
  130. var rows *sql.Rows
  131. if rows, err = d.db.Query(c, _midByMtime, mtime); err != nil {
  132. err = errors.WithStack(err)
  133. return
  134. }
  135. for rows.Next() {
  136. pi := new(model.PointInfo)
  137. if err = rows.Scan(&pi.Mid, &pi.PointBalance); err != nil {
  138. pis = nil
  139. err = errors.WithStack(err)
  140. return
  141. }
  142. pis = append(pis, pi)
  143. }
  144. return
  145. }
  146. //LastOneHistory last one history.
  147. func (d *Dao) LastOneHistory(c context.Context, mid int64) (point int64, err error) {
  148. row := d.db.QueryRow(c, _lastOneHistory, mid)
  149. if err = row.Scan(&point); err != nil {
  150. if err == xsql.ErrNoRows {
  151. err = nil
  152. } else {
  153. err = errors.WithStack(err)
  154. }
  155. }
  156. return
  157. }
  158. //FixPointInfo fix point data .
  159. func (d *Dao) FixPointInfo(c context.Context, mid int64, pointBalance int64) (a int64, err error) {
  160. var res xsql.Result
  161. if res, err = d.db.Exec(c, _fixUpdatePointSQL, pointBalance, mid); err != nil {
  162. err = errors.WithStack(err)
  163. return
  164. }
  165. if a, err = res.RowsAffected(); err != nil {
  166. err = errors.WithStack(err)
  167. return
  168. }
  169. return
  170. }