123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267 |
- package dao
- import (
- "context"
- "fmt"
- "time"
- pb "go-common/app/service/main/coin/api"
- "go-common/library/database/sql"
- "go-common/library/log"
- )
// Scheduling and sharding parameters.
const (
	// _dayOfEachMonth is the first day of the month on which hitCoinPeriod
	// accepts a settle-period lookup.
	_dayOfEachMonth = 25
	// _sharding is the modulus applied to a mid or aid to obtain a table suffix.
	_sharding = 50
)

// SQL statements. Those containing a %d / %02d verb are completed with a
// shard index (or settle period id) via fmt.Sprintf before execution.
const (
	// Per-item coin totals of one member.
	_getTotalCoins = "SELECT aid,type,SUM(multiply) FROM coin_member_%d WHERE mid = ? GROUP BY aid,type ORDER BY cid desc LIMIT 20"
	// Coins one member gave to one item.
	_getTotalCoinsByMid = "SELECT IFNULL(SUM(multiply),0) FROM coin_member_%d WHERE mid=? AND aid=? AND type=?"
	// Coins one member gave to one up_mid.
	_getTotalCoinsByMidAndUpMid = "SELECT IFNULL(coin_count,0) FROM coin_user_count_%02d WHERE mid=? AND up_mid=?"
	// Coin count of one item.
	_getCoinCount = "SELECT IFNULL(count,0) FROM coin_count WHERE aid=? AND type=?"
	// Insert-or-increment of an item's coin count.
	_updateCoinCount = "INSERT INTO coin_count(aid, type, `count`, ctime, mtime) VALUES(?,?,?,?,?) ON DUPLICATE KEY UPDATE count=count+?,mtime=?"
	// Coin record in the aid-sharded table.
	_insertCoinArchive = "INSERT LOW_PRIORITY INTO coin_archive_%d (aid,type,mid,timestamp,multiply) VALUES (?,?,?,?,?)"
	// Coin record in the mid-sharded table.
	_insertCoinMember = "INSERT LOW_PRIORITY INTO coin_member_%d (aid,type,mid,timestamp,multiply,up_mid) VALUES (?,?,?,?,?,?)"
	// Insert-or-increment of the member -> up_mid coin counter.
	_updateCoinMemberCount = "INSERT INTO coin_user_count_%02d(mid, up_mid, coin_count, ctime) VALUES(?,?,?,?) ON DUPLICATE KEY UPDATE coin_count=coin_count+?"
	// Recent coin records of one member for one type.
	_getMemberVideoList = "SELECT aid,ip,timestamp,multiply FROM coin_member_%d WHERE mid = ? AND type=? AND timestamp > ? ORDER BY cid DESC LIMIT ?"

	// coin settle

	// Settle period lookup by year and month.
	_hitSettlePeriod = "SELECT id FROM coin_settle_period WHERE to_year=? AND to_month=?"
	// update for bigdata
	_updateSettleBD = "UPDATE coin_settle_%d SET exp_sub=?,description=?,itime=?,mtime=? WHERE aid=? and type=?"
	// update for bigdata
	_updateArchiveCoinsBD = "UPDATE coin_count SET count=?, mtime=? WHERE aid=? AND type=?"
)
- func (dao *Dao) midHit(mid int64) int64 {
- return mid % _sharding
- }
- func (dao *Dao) aidHit(aid int64) int64 {
- return aid % _sharding
- }
- func (dao *Dao) hitCoinPeriod(c context.Context, now time.Time) (id int64, err error) {
- year, month, day := now.Year(), now.Month(), now.Day()
- if now.Day() < _dayOfEachMonth {
- err = fmt.Errorf("opreation from bigdata must after the %dth of each month, today is (%d)", _dayOfEachMonth, day)
- log.Error("%v", err)
- return
- }
- row := dao.coin.QueryRow(c, _hitSettlePeriod, year, month)
- if err != nil {
- log.Error("dao.hitCoinPeriodStmt.QueryRow(%d, %d) error(%v)", year, month, err)
- return
- }
- if err = row.Scan(&id); err != nil {
- if err == sql.ErrNoRows {
- err = nil
- } else {
- log.Error("row.Scan error(%v)", err)
- }
- return
- }
- if id == 0 {
- err = fmt.Errorf("zero id at(%s)", now.String())
- log.Error("%v", err)
- }
- return
- }
- // UpdateCoinSettleBD update table coin_settle_%d.
- func (dao *Dao) UpdateCoinSettleBD(c context.Context, aid, tp, expSub int64, describe string, now time.Time) (affect int64, err error) {
- id, err := dao.hitCoinPeriod(c, now)
- if err != nil {
- return
- }
- sqlStr := fmt.Sprintf(_updateSettleBD, id)
- result, err := dao.coin.Exec(c, sqlStr, expSub, describe, now, now, aid, tp)
- if err != nil {
- log.Error("dao.coin.Exec(%s, %d, %s, %v, %v, %d) error(%v)", sqlStr, expSub, describe, now, now, aid, err)
- return
- }
- affect, err = result.LastInsertId()
- return
- }
- // CoinList return video list of coin added in one month
- func (dao *Dao) CoinList(c context.Context, mid, tp, ts, size int64) (rcs []*pb.ModelList, err error) {
- var rows *sql.Rows
- if rows, err = dao.coin.Query(c, fmt.Sprintf(_getMemberVideoList, dao.midHit(mid)), mid, tp, ts, size); err != nil {
- log.Error("dao.getMemberVideoList.Query() error(%v)", err)
- return
- }
- defer rows.Close()
- for rows.Next() {
- var rc = &pb.ModelList{}
- if err = rows.Scan(&rc.Aid, &rc.IP, &rc.Ts, &rc.Number); err != nil {
- log.Error("row.Scan(),error(%v)", err)
- rcs = nil
- return
- }
- rcs = append(rcs, rc)
- }
- return
- }
- // CoinsAddedByMid get coin added by mid of aid&tp.
- func (dao *Dao) CoinsAddedByMid(c context.Context, mid, aid, tp int64) (added int64, err error) {
- row := dao.coin.QueryRow(c, fmt.Sprintf(_getTotalCoinsByMid, dao.midHit(mid)), mid, aid, tp)
- if err = row.Scan(&added); err != nil {
- if err == sql.ErrNoRows {
- err = nil
- } else {
- PromError("mysql:CoinsAddedByMid")
- log.Errorv(c,
- log.KV("log", "row.Scan"),
- log.KV("err", err),
- log.KV("mid", mid),
- log.KV("aid", aid),
- )
- }
- }
- return
- }
- // AddedCoins get coins added to up_mid.
- func (dao *Dao) AddedCoins(c context.Context, mid, upMid int64) (added int64, err error) {
- row := dao.coin.QueryRow(c, fmt.Sprintf(_getTotalCoinsByMidAndUpMid, dao.midHit(mid)), mid, upMid)
- if err = row.Scan(&added); err != nil {
- if err == sql.ErrNoRows {
- err = nil
- } else {
- log.Error("row.Scan error(%v)", err)
- }
- }
- return
- }
- // UserCoinsAdded get user coin added.
- func (dao *Dao) UserCoinsAdded(c context.Context, mid int64) (addeds map[int64]int64, err error) {
- var rows *sql.Rows
- addeds = make(map[int64]int64)
- if rows, err = dao.coin.Query(c, fmt.Sprintf(_getTotalCoins, dao.midHit(mid)), mid); err != nil {
- log.Errorv(c,
- log.KV("log", "dao.getTotalCoins"),
- log.KV("err", err),
- log.KV("mid", mid),
- )
- PromError("db:UserCoinsAdded")
- return
- }
- defer rows.Close()
- for rows.Next() {
- var (
- added, aid, tp int64
- )
- if err = rows.Scan(&aid, &tp, &added); err != nil {
- log.Errorv(c,
- log.KV("log", "row.Scan"),
- log.KV("err", err),
- log.KV("mid", mid),
- )
- PromError("db:UserCoinsAdded")
- addeds = nil
- return
- }
- addeds[hashField(aid, tp)] = added
- }
- return
- }
- // InsertCoinArchive .
- func (dao *Dao) InsertCoinArchive(c context.Context, aid, tp, mid, timestamp, multiply int64) (err error) {
- if _, err = dao.coin.Exec(c, fmt.Sprintf(_insertCoinArchive, dao.aidHit(aid)), aid, tp, mid, timestamp, multiply); err != nil {
- log.Errorv(c,
- log.KV("log", "coin.Exec"),
- log.KV("err", err),
- log.KV("mid", mid),
- log.KV("aid", aid),
- )
- PromError("db:InsertCoinArchive")
- }
- return
- }
- // InsertCoinMember .
- func (dao *Dao) InsertCoinMember(c context.Context, aid, tp, mid, timestamp, multiply int64, upMid int64) (err error) {
- if _, err = dao.coin.Exec(c, fmt.Sprintf(_insertCoinMember, dao.midHit(mid)), aid, tp, mid, timestamp, multiply, upMid); err != nil {
- log.Errorv(c,
- log.KV("log", "coin.Exec"),
- log.KV("err", err),
- log.KV("mid", mid),
- log.KV("aid", aid),
- )
- PromError("db:InsertCoinMember")
- }
- return
- }
- // UpdateItemCoinCount update coin_count.
- func (dao *Dao) UpdateItemCoinCount(c context.Context, aid, tp, count int64) (err error) {
- now := time.Now()
- if _, err = dao.coin.Exec(c, _updateCoinCount, aid, tp, count, now, now, count, now); err != nil {
- log.Errorv(c,
- log.KV("log", "UpdateItemCoinCount("),
- log.KV("err", err),
- log.KV("count", count),
- log.KV("aid", aid),
- )
- PromError("db:UpdateItemCoinCount")
- }
- return
- }
- // RawItemCoin get count by aid.
- func (dao *Dao) RawItemCoin(c context.Context, aid, tp int64) (count int64, err error) {
- row := dao.coin.QueryRow(c, _getCoinCount, aid, tp)
- if err = row.Scan(&count); err != nil {
- if err == sql.ErrNoRows {
- err = nil
- } else {
- log.Errorv(c,
- log.KV("log", "row.Scan"),
- log.KV("err", err),
- )
- PromError("db:RawItemCoin")
- }
- }
- return
- }
- // UpdateCoinMemberCount for archive
- func (dao *Dao) UpdateCoinMemberCount(c context.Context, mid, upMid, count int64) (err error) {
- if _, err = dao.coin.Exec(c, fmt.Sprintf(_updateCoinMemberCount, dao.midHit(mid)), mid, upMid, count, time.Now(), count); err != nil {
- log.Errorv(c,
- log.KV("log", "updateCoinMemberCount"),
- log.KV("mid", mid),
- log.KV("upmid", upMid),
- log.KV("count", count),
- log.KV("err", err),
- )
- PromError("db:UpdateCoinMemberCount")
- }
- return
- }
- // BeginTran begin a tx.
- func (dao *Dao) BeginTran(c context.Context) (t *sql.Tx, err error) {
- t, err = dao.coin.Begin(c)
- if err != nil {
- PromError("db:BeginTran")
- log.Errorv(c,
- log.KV("log", "d.BeginTran"),
- log.KV("err", err),
- )
- }
- return
- }
- // UpdateItemCoins update table coin_archive
- func (dao *Dao) UpdateItemCoins(c context.Context, aid, tp, coins int64, now time.Time) (affect int64, err error) {
- if err != nil {
- return
- }
- result, err := dao.coin.Exec(c, _updateArchiveCoinsBD, coins, now, aid, tp)
- if err != nil {
- log.Error("dao.coin.Exec(%s, %d, %s, %d) error(%v)", _updateArchiveCoinsBD, coins, now, aid, err)
- return
- }
- affect, err = result.RowsAffected()
- return
- }
|