tx.go 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190
  1. package dao
  2. import (
  3. "context"
  4. "fmt"
  5. "go-common/app/service/live/wallet/model"
  6. "go-common/library/database/sql"
  7. "go-common/library/ecode"
  8. "go-common/library/log"
  9. "time"
  10. )
  11. const (
  12. _selWallet = "SELECT uid,gold,iap_gold,silver,cost_base,gold_recharge_cnt,gold_pay_cnt,silver_pay_cnt,snapshot_time,snapshot_gold,snapshot_iap_gold,snapshot_silver,reserved1,reserved2 FROM user_wallet_%d WHERE uid=? FOR UPDATE"
  13. _recharge = "UPDATE user_wallet_%d set %s = %s + %d,%s=%s+%d where uid = ?"
  14. _rechargeWihoutCnt = "UPDATE user_wallet_%d set %s=%s + %d where uid = ?"
  15. _rechargeWithSnap = "UPDATE user_wallet_%d set %s=%s + %d,%s = %s+ %d, snapshot_time = ? , snapshot_gold = ? , snapshot_iap_gold = ? , snapshot_silver = ? where uid = ?"
  16. _rechargeWithSnapWithoutCnt = "UPDATE user_wallet_%d set %s = %s + %d, snapshot_time = ? , snapshot_gold = ? , snapshot_iap_gold = ? , snapshot_silver = ? where uid = ?"
  17. _exchange = "UPDATE user_wallet_%d set %s = %s - %d , %s = %s + %d, %s = %s + %d, %s = %s + %d where uid = ?"
  18. _exhcangeWithSnap = "UPDATE user_wallet_%d set %s = %s - %d , %s = %s + %d, %s = %s + %d, %s = %s + %d, snapshot_time = ? , snapshot_gold = ? , snapshot_iap_gold = ? , snapshot_silver = ? where uid = ? "
  19. _modifyCnt = "UPDATE user_wallet_%d set gold_pay_cnt = gold_pay_cnt + %d, gold_recharge_cnt = gold_recharge_cnt + %d, silver_pay_cnt = silver_pay_cnt + %d where uid = ?"
  20. )
  21. // 开启事务
  22. func (d *Dao) BeginTx(c context.Context) (conn *sql.Tx, err error) {
  23. return d.db.Begin(c)
  24. }
  25. func (d *Dao) DoTx(c context.Context, doFunc func(conn *sql.Tx) (v interface{}, err error)) (v interface{}, err error) {
  26. conn, err := d.BeginTx(c)
  27. if err != nil {
  28. err = ecode.ServerErr
  29. return
  30. }
  31. v, err = doFunc(conn)
  32. var txErr error
  33. if err != nil {
  34. conn.Rollback()
  35. err = ecode.ServerErr
  36. } else {
  37. txErr = conn.Commit()
  38. if txErr != nil {
  39. err = ecode.ServerErr
  40. v = nil
  41. }
  42. }
  43. return v, err
  44. }
  45. // 为了后续的更新获取数据
  46. func (d *Dao) WalletForUpdate(conn *sql.Tx, uid int64) (wallet *model.DetailWithSnapShot, err error) {
  47. row := conn.QueryRow(fmt.Sprintf(_selWallet, tableIndex(uid)), uid)
  48. wallet = &model.DetailWithSnapShot{}
  49. var snapShotTime time.Time
  50. if err = row.Scan(&wallet.Uid, &wallet.Gold, &wallet.IapGold, &wallet.Silver, &wallet.CostBase, &wallet.GoldRechargeCnt,
  51. &wallet.GoldPayCnt, &wallet.SilverPayCnt, &snapShotTime, &wallet.SnapShotGold, &wallet.SnapShotIapGold,
  52. &wallet.SnapShotSilver, &wallet.Reserved1, &wallet.Reserved2); err == sql.ErrNoRows {
  53. // 查询结果为空时,初始化数据
  54. _, err = d.InitWalletInTx(conn, uid, 0, 0, 0)
  55. wallet.SnapShotTime = snapShotTime.Format("2006-01-02 15:04:05")
  56. return
  57. }
  58. if err != nil {
  59. log.Error("[tx.wallet|Melonseed] row.Scan err: %s", err.Error())
  60. return
  61. }
  62. wallet.SnapShotTime = snapShotTime.Format("2006-01-02 15:04:05")
  63. return
  64. }
  65. // InitExp 初始化用户钱包,用于首次查询
  66. func (d *Dao) InitWalletInTx(conn *sql.Tx, uid int64, gold int64, iap_gold int64, silver int64) (row int64, err error) {
  67. res, err := conn.Exec(fmt.Sprintf(_insWallet, tableIndex(uid)), uid, gold, iap_gold, silver)
  68. if err != nil {
  69. log.Error("[tx.wallet|InitWallet] Exec err: %v", err)
  70. return
  71. }
  72. return res.RowsAffected()
  73. }
  74. func (d *Dao) execSqlInTx(conn *sql.Tx, sql *string, params ...interface{}) (affect int64, err error) {
  75. res, err := conn.Exec(*sql, params...)
  76. if err != nil {
  77. log.Error("[tx.wallet|execSqlInTx] Exec err: %v sql:%s", err, *sql)
  78. return
  79. }
  80. return res.RowsAffected()
  81. }
  82. func (d *Dao) changeCoinInTx(conn *sql.Tx, uid int64, sysCoinTypeNo int32, num int64, originWallet *model.DetailWithSnapShot, cntField string) (affect int64, err error) {
  83. // 判断
  84. coinType := model.GetSysCoinTypeByNo(sysCoinTypeNo)
  85. var s string
  86. absNum := num
  87. if absNum < 0 {
  88. absNum = -absNum
  89. }
  90. if model.TodayNeedSnapShot(originWallet) {
  91. if cntField == "" {
  92. s = getRechargeWithoutCntWithSnapShotSQL(uid, coinType, num)
  93. } else {
  94. s = getRechargeWithSnapShotSQL(uid, coinType, num, cntField, absNum)
  95. }
  96. date := time.Now().Format("2006-01-02 15:04:05")
  97. return d.execSqlInTx(conn, &s, date, originWallet.Gold, originWallet.IapGold, originWallet.Silver, uid)
  98. } else {
  99. if cntField == "" {
  100. s = getRechargeWithoutCntSQL(uid, coinType, num)
  101. } else {
  102. s = getRechargeSQL(uid, coinType, num, cntField, absNum)
  103. }
  104. return d.execSqlInTx(conn, &s, uid)
  105. }
  106. }
  107. // RechargeGold 充值IOS金瓜子 记入充值总值
  108. func (d *Dao) RechargeCoinInTx(conn *sql.Tx, uid int64, sysCoinTypeNo int32, num int64, originWallet *model.DetailWithSnapShot) (affect int64, err error) {
  109. rechargerCntField := model.GetRechargeCnt(sysCoinTypeNo)
  110. return d.changeCoinInTx(conn, uid, sysCoinTypeNo, num, originWallet, rechargerCntField)
  111. }
  112. func (d *Dao) PayCoinInTx(conn *sql.Tx, uid int64, sysCoinTypeNo int32, num int64, originWallet *model.DetailWithSnapShot) (affect int64, err error) {
  113. cntField := model.GetPayCnt(sysCoinTypeNo)
  114. return d.changeCoinInTx(conn, uid, sysCoinTypeNo, -num, originWallet, cntField)
  115. }
  116. func (d *Dao) ModifyCoinInTx(conn *sql.Tx, uid int64, sysCoinTypeNo int32, num int64, originWallet *model.DetailWithSnapShot) (affect int64, err error) {
  117. return d.changeCoinInTx(conn, uid, sysCoinTypeNo, num, originWallet, "")
  118. }
  119. func (d *Dao) ExchangeCoinInTx(conn *sql.Tx, uid int64, srcSysCoinTypeNo int32, srcNum int64, destSysCoinTypeNo int32, destNum int64, originWallet *model.DetailWithSnapShot) (affect int64, err error) {
  120. rechargeCntNum := destNum
  121. var rechargerCntField string
  122. if destSysCoinTypeNo == model.SysCoinTypeSilver { // 如果为银瓜子则不存在充值统计总数字段 使用 消费统计字段代替 适配sql
  123. rechargerCntField = "silver_pay_cnt"
  124. rechargeCntNum = 0
  125. } else {
  126. rechargerCntField = model.GetRechargeCnt(destSysCoinTypeNo)
  127. }
  128. payCntField := model.GetPayCnt(srcSysCoinTypeNo)
  129. srcCoinType := model.GetSysCoinTypeByNo(srcSysCoinTypeNo)
  130. destCoinType := model.GetSysCoinTypeByNo(destSysCoinTypeNo)
  131. var s string
  132. if model.TodayNeedSnapShot(originWallet) {
  133. s = fmt.Sprintf(_exhcangeWithSnap, tableIndex(uid), srcCoinType, srcCoinType, srcNum, destCoinType, destCoinType, destNum, rechargerCntField, rechargerCntField, rechargeCntNum, payCntField, payCntField, srcNum)
  134. date := time.Now().Format("2006-01-02 15:04:05")
  135. return d.execSqlInTx(conn, &s, date, originWallet.Gold, originWallet.IapGold, originWallet.Silver, uid)
  136. } else {
  137. s = fmt.Sprintf(_exchange, tableIndex(uid), srcCoinType, srcCoinType, srcNum, destCoinType, destCoinType, destNum, rechargerCntField, rechargerCntField, rechargeCntNum, payCntField, payCntField, srcNum)
  138. return d.execSqlInTx(conn, &s, uid)
  139. }
  140. }
  141. func getRechargeWithSnapShotSQL(uid int64, coinType string, num int64, cntField string, cntNum int64) string {
  142. return fmt.Sprintf(_rechargeWithSnap, tableIndex(uid), coinType, coinType, num, cntField, cntField, cntNum)
  143. }
  144. func getRechargeWithoutCntWithSnapShotSQL(uid int64, coinType string, num int64) string {
  145. return fmt.Sprintf(_rechargeWithSnapWithoutCnt, tableIndex(uid), coinType, coinType, num)
  146. }
  147. func getRechargeSQL(uid int64, coinType string, num int64, cntField string, cntNum int64) string {
  148. return fmt.Sprintf(_recharge, tableIndex(uid), coinType, coinType, num, cntField, cntField, cntNum)
  149. }
  150. func getRechargeWithoutCntSQL(uid int64, coinType string, num int64) string {
  151. return fmt.Sprintf(_rechargeWihoutCnt, tableIndex(uid), coinType, coinType, num)
  152. }
  153. func (d *Dao) NewCoinStreamRecordInTx(conn *sql.Tx, record *model.CoinStreamRecord) (int64, error) {
  154. s := fmt.Sprintf(_newCoinStremaRecord, getCoinStreamTable(record.TransactionId))
  155. date := model.GetWalletFormatTime(record.OpTime)
  156. return d.execSqlInTx(conn, &s, record.Uid, record.TransactionId, record.ExtendTid, record.CoinType,
  157. record.DeltaCoinNum, record.OrgCoinNum, record.OpResult, record.OpReason, record.OpType, date,
  158. record.BizCode, record.Area, record.Source, record.MetaData, record.BizSource,
  159. record.Platform, record.Reserved1, record.Version)
  160. }
  161. func (d *Dao) NewCoinExchangeRecordInTx(conn *sql.Tx, record *model.CoinExchangeRecord) (int64, error) {
  162. s := fmt.Sprintf(_newCoinExchange, getCoinExchangeTable(record.Uid))
  163. date := model.GetWalletFormatTime(record.ExchangeTime)
  164. return d.execSqlInTx(conn, &s, record.Uid, record.TransactionId, record.SrcType, record.SrcNum, record.DestType, record.DestNum, record.Status, date)
  165. }
  166. func (d *Dao) ModifyCntInTx(conn *sql.Tx, uid int64, goldPay int, goldRecharge int, silverPay int) (int64, error) {
  167. s := fmt.Sprintf(_modifyCnt, tableIndex(uid), goldPay, goldRecharge, silverPay)
  168. return d.execSqlInTx(conn, &s, uid)
  169. }