mysql.go 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164
  1. package dao
  2. import (
  3. "context"
  4. "database/sql"
  5. "fmt"
  6. "strconv"
  7. "time"
  8. "go-common/app/job/main/coin/model"
  9. "go-common/library/log"
  10. )
  11. const (
  12. // SHARDING table shard.
  13. SHARDING = 50
  14. // coin_settle_x
  15. _getSettle = "SELECT id,mid,aid,type,coin_count,exp_sub,state FROM coin_settle_%d WHERE id> ? LIMIT 10000"
  16. _hitSettlePeriod = "SELECT id,from_year,from_month,from_day,to_year,to_month,to_day FROM coin_settle_period WHERE from_year*10000+from_month*100+from_day<=? AND to_year*10000+to_month*100+to_day>?"
  17. _getSettlePeriod = "SELECT id,from_year,from_month,from_day,to_year,to_month,to_day FROM coin_settle_period WHERE id=?"
  18. _clearCoinCount = "UPDATE coin_settle_%d SET coin_count=0, mtime=?"
  19. _updateCoinCount = "UPDATE coin_settle_%d SET coin_count=?, mtime=? WHERE aid=? AND type=?"
  20. _updateSettle = "UPDATE coin_settle_%d SET state=1, exp_total=?, mtime=? WHERE id=?"
  21. _upsertSettle = "INSERT INTO coin_settle_%d (mid,aid,type,coin_count,ctime,mtime) VALUES (?,?,?,?,?,?) ON DUPLICATE KEY UPDATE coin_count=coin_count+?,state=0, mtime=?"
  22. // coin_archive_x
  23. _getTotalCoins = "SELECT aid,type,SUM(multiply) FROM coin_archive_%d WHERE timestamp>=? AND timestamp<? GROUP BY aid,type"
  24. )
  25. func hashField(aid, tp int64) int64 {
  26. return aid*1000 + tp
  27. }
  28. // SettlePeriod settle coin by tableid.
  29. func (dao *Dao) SettlePeriod(c context.Context, id int64) (period *model.CoinSettlePeriod, err error) {
  30. row := dao.getSettlePeriodStmt.QueryRow(c, id)
  31. period = &model.CoinSettlePeriod{}
  32. if err = row.Scan(&period.ID,
  33. &period.FromYear,
  34. &period.FromMonth,
  35. &period.FromDay,
  36. &period.ToYear,
  37. &period.ToMonth,
  38. &period.ToDay); err != nil {
  39. if err == sql.ErrNoRows {
  40. err = nil
  41. } else {
  42. log.Error("row.Scan error(%v)", err)
  43. }
  44. }
  45. return
  46. }
  47. // HitSettlePeriod get table id.
  48. func (dao *Dao) HitSettlePeriod(c context.Context, now time.Time) (period *model.CoinSettlePeriod, err error) {
  49. ymd, _ := strconv.Atoi(now.Format("20060102"))
  50. row := dao.hitSettlePeriodStmt.QueryRow(c, ymd, ymd)
  51. period = &model.CoinSettlePeriod{}
  52. if err = row.Scan(&period.ID,
  53. &period.FromYear,
  54. &period.FromMonth,
  55. &period.FromDay,
  56. &period.ToYear,
  57. &period.ToMonth,
  58. &period.ToDay); err != nil {
  59. if err == sql.ErrNoRows {
  60. err = nil
  61. } else {
  62. log.Error("row.Scan error(%v)", err)
  63. }
  64. return
  65. }
  66. if period.ID == 0 {
  67. err = fmt.Errorf("zero id at(%d)", ymd)
  68. }
  69. return
  70. }
  71. // UpdateSettle update settle info.
  72. func (dao *Dao) UpdateSettle(c context.Context, tableID, id, expTotal int64, now time.Time) (err error) {
  73. sqlStr := fmt.Sprintf(_updateSettle, tableID)
  74. if _, err = dao.coinDB.Exec(c, sqlStr, expTotal, now, id); err != nil {
  75. log.Error("dao.coinDB.Exec(%s, %d, %v, %d) error(%v)", sqlStr, expTotal, now, id, err)
  76. PromError("db:UpdateSettle")
  77. }
  78. return
  79. }
  80. // UpdateCoinCount update coin.
  81. func (dao *Dao) UpdateCoinCount(c context.Context, tableID, aid, tp, count int64, now time.Time) (err error) {
  82. sqlStr := fmt.Sprintf(_updateCoinCount, tableID)
  83. if _, err = dao.coinDB.Exec(c, sqlStr, count, now, aid, tp); err != nil {
  84. log.Error("dao.coinDB.Exec(%s, %d, %v, %d) error(%v)", sqlStr, count, now, aid, err)
  85. }
  86. return
  87. }
  88. // Every10000 get 10000 coin record.
  89. func (dao *Dao) Every10000(c context.Context, tableID int64, idx int64) (settles []*model.CoinSettle, maxid int64, err error) {
  90. sqlStr := fmt.Sprintf(_getSettle, tableID)
  91. rows, err := dao.coinDB.Query(c, sqlStr, idx)
  92. if err != nil {
  93. log.Error("dao.coinDB.Query(%s, %d) error(%v)", sqlStr, idx, err)
  94. return
  95. }
  96. defer rows.Close()
  97. maxid = idx
  98. settles = make([]*model.CoinSettle, 0, 10000)
  99. for rows.Next() {
  100. settle := &model.CoinSettle{}
  101. if err = rows.Scan(&settle.ID,
  102. &settle.Mid,
  103. &settle.Aid,
  104. &settle.AvType,
  105. &settle.CoinCount,
  106. &settle.ExpSub,
  107. &settle.State); err != nil {
  108. log.Error("rows.Scan error(%v)", err)
  109. return
  110. }
  111. if settle.ID > maxid {
  112. maxid = settle.ID
  113. }
  114. settles = append(settles, settle)
  115. }
  116. return
  117. }
  118. // UpsertSettle update coin settle.
  119. func (dao *Dao) UpsertSettle(c context.Context, tableID, mid, aid, tp, coinCount int64, now time.Time) (err error) {
  120. sqlStr := fmt.Sprintf(_upsertSettle, tableID)
  121. if _, err = dao.coinDB.Exec(c, sqlStr, mid, aid, tp, coinCount, now, now, coinCount, now); err != nil {
  122. log.Error("dao.coinDB.Exec(%s,%d,%d,%d,%v,%v,%d,%v) error(%v)", sqlStr, mid, aid, coinCount, now, now, coinCount, now, err)
  123. }
  124. return
  125. }
  126. // ClearCoinCount clear settle.
  127. func (dao *Dao) ClearCoinCount(c context.Context, tableID int64, now time.Time) (err error) {
  128. sqlStr := fmt.Sprintf(_clearCoinCount, tableID)
  129. if _, err = dao.coinDB.Exec(c, sqlStr, now); err != nil {
  130. log.Error("dao.coinDB.Exec(%s, %v) error(%v)", sqlStr, now, err)
  131. }
  132. return
  133. }
  134. // TotalCoins get total coins.
  135. func (dao *Dao) TotalCoins(c context.Context, id int, start, end time.Time) (coins map[int64]int64, err error) {
  136. rows, err := dao.getTotalCoinsStmt[id].Query(c, start.Unix(), end.Unix())
  137. if err != nil {
  138. log.Error("dao.getTotalCoinsStmt[%d].Query(%v, %v) error(%v)", id, start, end, err)
  139. return
  140. }
  141. coins = make(map[int64]int64)
  142. defer rows.Close()
  143. for rows.Next() {
  144. var aid, tp, count int64
  145. if err = rows.Scan(&aid, &tp, &count); err != nil {
  146. log.Error("rows.Scan error(%v)", err)
  147. return
  148. }
  149. coins[hashField(aid, tp)] = count
  150. }
  151. return
  152. }