mysql.go 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267
  1. package dao
  2. import (
  3. "context"
  4. "fmt"
  5. "time"
  6. pb "go-common/app/service/main/coin/api"
  7. "go-common/library/database/sql"
  8. "go-common/library/log"
  9. )
  10. const (
  11. _dayOfEachMonth = 25
  12. _sharding = 50
  13. _getTotalCoins = "SELECT aid,type,SUM(multiply) FROM coin_member_%d WHERE mid = ? GROUP BY aid,type ORDER BY cid desc LIMIT 20"
  14. _getTotalCoinsByMid = "SELECT IFNULL(SUM(multiply),0) FROM coin_member_%d WHERE mid=? AND aid=? AND type=?"
  15. _getTotalCoinsByMidAndUpMid = "SELECT IFNULL(coin_count,0) FROM coin_user_count_%02d WHERE mid=? AND up_mid=?"
  16. _getCoinCount = "SELECT IFNULL(count,0) FROM coin_count WHERE aid=? AND type=?"
  17. _updateCoinCount = "INSERT INTO coin_count(aid, type, `count`, ctime, mtime) VALUES(?,?,?,?,?) ON DUPLICATE KEY UPDATE count=count+?,mtime=?"
  18. _insertCoinArchive = "INSERT LOW_PRIORITY INTO coin_archive_%d (aid,type,mid,timestamp,multiply) VALUES (?,?,?,?,?)"
  19. _insertCoinMember = "INSERT LOW_PRIORITY INTO coin_member_%d (aid,type,mid,timestamp,multiply,up_mid) VALUES (?,?,?,?,?,?)"
  20. _updateCoinMemberCount = "INSERT INTO coin_user_count_%02d(mid, up_mid, coin_count, ctime) VALUES(?,?,?,?) ON DUPLICATE KEY UPDATE coin_count=coin_count+?"
  21. _getMemberVideoList = "SELECT aid,ip,timestamp,multiply FROM coin_member_%d WHERE mid = ? AND type=? AND timestamp > ? ORDER BY cid DESC LIMIT ?"
  22. // coin settle
  23. _hitSettlePeriod = "SELECT id FROM coin_settle_period WHERE to_year=? AND to_month=?"
  24. _updateSettleBD = "UPDATE coin_settle_%d SET exp_sub=?,description=?,itime=?,mtime=? WHERE aid=? and type=?" // update for bigdata
  25. _updateArchiveCoinsBD = "UPDATE coin_count SET count=?, mtime=? WHERE aid=? AND type=?" // update for bigdata
  26. )
  27. func (dao *Dao) midHit(mid int64) int64 {
  28. return mid % _sharding
  29. }
  30. func (dao *Dao) aidHit(aid int64) int64 {
  31. return aid % _sharding
  32. }
  33. func (dao *Dao) hitCoinPeriod(c context.Context, now time.Time) (id int64, err error) {
  34. year, month, day := now.Year(), now.Month(), now.Day()
  35. if now.Day() < _dayOfEachMonth {
  36. err = fmt.Errorf("opreation from bigdata must after the %dth of each month, today is (%d)", _dayOfEachMonth, day)
  37. log.Error("%v", err)
  38. return
  39. }
  40. row := dao.coin.QueryRow(c, _hitSettlePeriod, year, month)
  41. if err != nil {
  42. log.Error("dao.hitCoinPeriodStmt.QueryRow(%d, %d) error(%v)", year, month, err)
  43. return
  44. }
  45. if err = row.Scan(&id); err != nil {
  46. if err == sql.ErrNoRows {
  47. err = nil
  48. } else {
  49. log.Error("row.Scan error(%v)", err)
  50. }
  51. return
  52. }
  53. if id == 0 {
  54. err = fmt.Errorf("zero id at(%s)", now.String())
  55. log.Error("%v", err)
  56. }
  57. return
  58. }
  59. // UpdateCoinSettleBD update table coin_settle_%d.
  60. func (dao *Dao) UpdateCoinSettleBD(c context.Context, aid, tp, expSub int64, describe string, now time.Time) (affect int64, err error) {
  61. id, err := dao.hitCoinPeriod(c, now)
  62. if err != nil {
  63. return
  64. }
  65. sqlStr := fmt.Sprintf(_updateSettleBD, id)
  66. result, err := dao.coin.Exec(c, sqlStr, expSub, describe, now, now, aid, tp)
  67. if err != nil {
  68. log.Error("dao.coin.Exec(%s, %d, %s, %v, %v, %d) error(%v)", sqlStr, expSub, describe, now, now, aid, err)
  69. return
  70. }
  71. affect, err = result.LastInsertId()
  72. return
  73. }
  74. // CoinList return video list of coin added in one month
  75. func (dao *Dao) CoinList(c context.Context, mid, tp, ts, size int64) (rcs []*pb.ModelList, err error) {
  76. var rows *sql.Rows
  77. if rows, err = dao.coin.Query(c, fmt.Sprintf(_getMemberVideoList, dao.midHit(mid)), mid, tp, ts, size); err != nil {
  78. log.Error("dao.getMemberVideoList.Query() error(%v)", err)
  79. return
  80. }
  81. defer rows.Close()
  82. for rows.Next() {
  83. var rc = &pb.ModelList{}
  84. if err = rows.Scan(&rc.Aid, &rc.IP, &rc.Ts, &rc.Number); err != nil {
  85. log.Error("row.Scan(),error(%v)", err)
  86. rcs = nil
  87. return
  88. }
  89. rcs = append(rcs, rc)
  90. }
  91. return
  92. }
  93. // CoinsAddedByMid get coin added by mid of aid&tp.
  94. func (dao *Dao) CoinsAddedByMid(c context.Context, mid, aid, tp int64) (added int64, err error) {
  95. row := dao.coin.QueryRow(c, fmt.Sprintf(_getTotalCoinsByMid, dao.midHit(mid)), mid, aid, tp)
  96. if err = row.Scan(&added); err != nil {
  97. if err == sql.ErrNoRows {
  98. err = nil
  99. } else {
  100. PromError("mysql:CoinsAddedByMid")
  101. log.Errorv(c,
  102. log.KV("log", "row.Scan"),
  103. log.KV("err", err),
  104. log.KV("mid", mid),
  105. log.KV("aid", aid),
  106. )
  107. }
  108. }
  109. return
  110. }
  111. // AddedCoins get coins added to up_mid.
  112. func (dao *Dao) AddedCoins(c context.Context, mid, upMid int64) (added int64, err error) {
  113. row := dao.coin.QueryRow(c, fmt.Sprintf(_getTotalCoinsByMidAndUpMid, dao.midHit(mid)), mid, upMid)
  114. if err = row.Scan(&added); err != nil {
  115. if err == sql.ErrNoRows {
  116. err = nil
  117. } else {
  118. log.Error("row.Scan error(%v)", err)
  119. }
  120. }
  121. return
  122. }
  123. // UserCoinsAdded get user coin added.
  124. func (dao *Dao) UserCoinsAdded(c context.Context, mid int64) (addeds map[int64]int64, err error) {
  125. var rows *sql.Rows
  126. addeds = make(map[int64]int64)
  127. if rows, err = dao.coin.Query(c, fmt.Sprintf(_getTotalCoins, dao.midHit(mid)), mid); err != nil {
  128. log.Errorv(c,
  129. log.KV("log", "dao.getTotalCoins"),
  130. log.KV("err", err),
  131. log.KV("mid", mid),
  132. )
  133. PromError("db:UserCoinsAdded")
  134. return
  135. }
  136. defer rows.Close()
  137. for rows.Next() {
  138. var (
  139. added, aid, tp int64
  140. )
  141. if err = rows.Scan(&aid, &tp, &added); err != nil {
  142. log.Errorv(c,
  143. log.KV("log", "row.Scan"),
  144. log.KV("err", err),
  145. log.KV("mid", mid),
  146. )
  147. PromError("db:UserCoinsAdded")
  148. addeds = nil
  149. return
  150. }
  151. addeds[hashField(aid, tp)] = added
  152. }
  153. return
  154. }
  155. // InsertCoinArchive .
  156. func (dao *Dao) InsertCoinArchive(c context.Context, aid, tp, mid, timestamp, multiply int64) (err error) {
  157. if _, err = dao.coin.Exec(c, fmt.Sprintf(_insertCoinArchive, dao.aidHit(aid)), aid, tp, mid, timestamp, multiply); err != nil {
  158. log.Errorv(c,
  159. log.KV("log", "coin.Exec"),
  160. log.KV("err", err),
  161. log.KV("mid", mid),
  162. log.KV("aid", aid),
  163. )
  164. PromError("db:InsertCoinArchive")
  165. }
  166. return
  167. }
  168. // InsertCoinMember .
  169. func (dao *Dao) InsertCoinMember(c context.Context, aid, tp, mid, timestamp, multiply int64, upMid int64) (err error) {
  170. if _, err = dao.coin.Exec(c, fmt.Sprintf(_insertCoinMember, dao.midHit(mid)), aid, tp, mid, timestamp, multiply, upMid); err != nil {
  171. log.Errorv(c,
  172. log.KV("log", "coin.Exec"),
  173. log.KV("err", err),
  174. log.KV("mid", mid),
  175. log.KV("aid", aid),
  176. )
  177. PromError("db:InsertCoinMember")
  178. }
  179. return
  180. }
  181. // UpdateItemCoinCount update coin_count.
  182. func (dao *Dao) UpdateItemCoinCount(c context.Context, aid, tp, count int64) (err error) {
  183. now := time.Now()
  184. if _, err = dao.coin.Exec(c, _updateCoinCount, aid, tp, count, now, now, count, now); err != nil {
  185. log.Errorv(c,
  186. log.KV("log", "UpdateItemCoinCount("),
  187. log.KV("err", err),
  188. log.KV("count", count),
  189. log.KV("aid", aid),
  190. )
  191. PromError("db:UpdateItemCoinCount")
  192. }
  193. return
  194. }
  195. // RawItemCoin get count by aid.
  196. func (dao *Dao) RawItemCoin(c context.Context, aid, tp int64) (count int64, err error) {
  197. row := dao.coin.QueryRow(c, _getCoinCount, aid, tp)
  198. if err = row.Scan(&count); err != nil {
  199. if err == sql.ErrNoRows {
  200. err = nil
  201. } else {
  202. log.Errorv(c,
  203. log.KV("log", "row.Scan"),
  204. log.KV("err", err),
  205. )
  206. PromError("db:RawItemCoin")
  207. }
  208. }
  209. return
  210. }
  211. // UpdateCoinMemberCount for archive
  212. func (dao *Dao) UpdateCoinMemberCount(c context.Context, mid, upMid, count int64) (err error) {
  213. if _, err = dao.coin.Exec(c, fmt.Sprintf(_updateCoinMemberCount, dao.midHit(mid)), mid, upMid, count, time.Now(), count); err != nil {
  214. log.Errorv(c,
  215. log.KV("log", "updateCoinMemberCount"),
  216. log.KV("mid", mid),
  217. log.KV("upmid", upMid),
  218. log.KV("count", count),
  219. log.KV("err", err),
  220. )
  221. PromError("db:UpdateCoinMemberCount")
  222. }
  223. return
  224. }
  225. // BeginTran begin a tx.
  226. func (dao *Dao) BeginTran(c context.Context) (t *sql.Tx, err error) {
  227. t, err = dao.coin.Begin(c)
  228. if err != nil {
  229. PromError("db:BeginTran")
  230. log.Errorv(c,
  231. log.KV("log", "d.BeginTran"),
  232. log.KV("err", err),
  233. )
  234. }
  235. return
  236. }
  237. // UpdateItemCoins update table coin_archive
  238. func (dao *Dao) UpdateItemCoins(c context.Context, aid, tp, coins int64, now time.Time) (affect int64, err error) {
  239. if err != nil {
  240. return
  241. }
  242. result, err := dao.coin.Exec(c, _updateArchiveCoinsBD, coins, now, aid, tp)
  243. if err != nil {
  244. log.Error("dao.coin.Exec(%s, %d, %s, %d) error(%v)", _updateArchiveCoinsBD, coins, now, aid, err)
  245. return
  246. }
  247. affect, err = result.RowsAffected()
  248. return
  249. }