123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201 |
- package service
- import (
- "context"
- "fmt"
- "time"
- "go-common/app/job/main/ugcpay/conf"
- "go-common/app/job/main/ugcpay/dao"
- "go-common/app/job/main/ugcpay/model"
- xsql "go-common/library/database/sql"
- "go-common/library/log"
- "github.com/pkg/errors"
- )
- type taskAccountUser struct {
- dao *dao.Dao
- taskPre TaskProcess
- dayOffset int
- namePrefix string
- tl *taskLog
- }
- func (s *taskAccountUser) Run() (err error) {
- // 检查日账单任务是否完成
- if _, finished := s.tl.checkTask(s.taskPre); !finished {
- log.Info("taskAccountUser check task: %s not finished", s.taskPre.Name())
- return nil
- }
- var (
- ctx = context.Background()
- finished bool
- expectFN = func(ctx context.Context) (expect int64, err error) {
- var (
- beginTime, _ = dayRange(s.dayOffset)
- ver = dailyBillVer(beginTime)
- )
- if expect, err = s.dao.CountDailyBillByVer(ctx, ver); err != nil {
- return
- }
- return
- }
- )
- if finished, err = checkOrCreateTaskFromLog(ctx, s, s.tl, expectFN); err != nil || finished {
- return
- }
- return s.run(ctx)
- }
- func (s *taskAccountUser) TTL() int32 {
- return 3600 * 2
- }
- func (s *taskAccountUser) Name() string {
- return fmt.Sprintf("%s_%d", s.namePrefix, dailyBillVer(time.Now()))
- }
- func (s *taskAccountUser) run(ctx context.Context) (err error) {
- ll := &dailyBillLLByVer{
- limit: 1000,
- dao: s.dao,
- }
- beginTime, _ := dayRange(s.dayOffset)
- ll.ver = dailyBillVer(beginTime)
- return runLimitedList(ctx, ll, time.Millisecond*2, s.runDailyBill)
- }
- func (s *taskAccountUser) runDailyBill(ctx context.Context, ele interface{}) (err error) {
- dailyBill, ok := ele.(*model.DailyBill)
- if !ok {
- err = errors.Errorf("taskAccountUser convert ele: %+v failed", dailyBill)
- return
- }
- log.Info("taskAccountUser handle dailyBill: %+v", dailyBill)
- fn := func(ctx context.Context, tx *xsql.Tx) (affected bool, err error) {
- var (
- account *model.UserAccount
- accountLog *model.AccountLog
- userProfit = dailyBill.In - dailyBill.Out
- )
- affected = true
- // 获得该 mid 的 account
- if account, err = s.dao.UserAccount(ctx, dailyBill.MID, dailyBill.Biz, dailyBill.Currency); err != nil {
- return
- }
- // 初始化 biz_account
- if account == nil {
- if account, err = s.initUserAccount(ctx, dailyBill.MID, dailyBill.Biz, dailyBill.Currency); err != nil {
- return
- }
- }
- // 虚拟账户平账,低于一定阈值由虚拟账户转出
- if userProfit < 0 {
- bizRefund, userRefund := calcRefundFee(account.Balance, -userProfit, conf.Conf.Biz.AccountUserMin)
- userProfit = -userRefund
- if bizRefund > 0 {
- // 获得 biz_account
- var bizAccount *model.BizAccount
- if bizAccount, err = s.dao.BizAccount(ctx, model.BizAsset, model.CurrencyBP); err != nil {
- return
- }
- // 初始化 biz_account
- if bizAccount == nil {
- if bizAccount, err = initBizAccount(ctx, model.BizAsset, model.CurrencyBP, s.dao); err != nil {
- return
- }
- }
- bizAccountLog := &model.AccountLog{
- AccountID: bizAccount.ID,
- Name: s.Name(),
- From: bizAccount.Balance,
- To: bizAccount.Balance - bizRefund,
- Ver: bizAccount.Ver + 1,
- State: model.AccountStateLoss,
- }
- bizAccount.Balance = bizAccount.Balance - bizRefund
- // 更新 biz account
- var rowAffected int64
- if rowAffected, err = s.dao.TXUpdateBizAccount(ctx, tx, bizAccount); err != nil {
- tx.Rollback()
- return
- }
- if rowAffected <= 0 {
- log.Error("TXUpdateBizAccount no affected biz account: %+v", bizAccount)
- tx.Rollback()
- affected = false
- return
- }
- // 添加资金池账户 log
- if err = s.dao.TXInsertBizAccountLog(ctx, tx, bizAccountLog); err != nil {
- tx.Rollback()
- return
- }
- }
- }
- accountLog = &model.AccountLog{
- AccountID: account.ID,
- Name: fmt.Sprintf("%s_%d", s.Name(), dailyBill.MID),
- From: account.Balance,
- To: account.Balance + userProfit,
- Ver: account.Ver + 1,
- State: model.AccountStateIncome,
- }
- account.Balance = account.Balance + userProfit
- // 更新 user account
- rowAffected, err := s.dao.TXUpdateUserAccount(ctx, tx, account)
- if err != nil {
- tx.Rollback()
- return
- }
- if rowAffected <= 0 {
- log.Error("TXUpdateUserAccount no affected user account: %+v", account)
- tx.Rollback()
- affected = false
- return
- }
- // 添加资金池账户 log
- err = s.dao.TXInsertUserAccountLog(ctx, tx, accountLog)
- if err != nil {
- tx.Rollback()
- return
- }
- log.Info("taskAccountUser: %+v ", account)
- return
- }
- return runTXCASTaskWithLog(ctx, s, s.tl, fn)
- }
- func (s *taskAccountUser) initUserAccount(ctx context.Context, mid int64, biz, currency string) (account *model.UserAccount, err error) {
- account = &model.UserAccount{}
- account.MID = mid
- account.Biz = biz
- account.Currency = currency
- account.State = model.StateValid
- account.Ver = 1
- if account.ID, err = s.dao.InsertUserAccount(ctx, account); err != nil {
- return
- }
- return
- }
- // 虚拟账户平账
- func calcRefundFee(balance int64, loss int64, minBalance int64) (bizRefund int64, userRefund int64) {
- if balance-loss >= minBalance {
- userRefund = loss
- bizRefund = 0
- return
- }
- if balance > minBalance {
- userRefund = balance - minBalance
- }
- bizRefund = loss - userRefund
- return
- }
|