task_account_user.go 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201
  1. package service
  2. import (
  3. "context"
  4. "fmt"
  5. "time"
  6. "go-common/app/job/main/ugcpay/conf"
  7. "go-common/app/job/main/ugcpay/dao"
  8. "go-common/app/job/main/ugcpay/model"
  9. xsql "go-common/library/database/sql"
  10. "go-common/library/log"
  11. "github.com/pkg/errors"
  12. )
// taskAccountUser is the task that applies daily bills to user accounts.
type taskAccountUser struct {
	// dao is the data-access handle used for all account and bill queries.
	dao *dao.Dao
	// taskPre is the prerequisite task; Run does nothing until it is finished.
	taskPre TaskProcess
	// dayOffset is passed to dayRange to pick the day whose bills are processed.
	dayOffset int
	// namePrefix is the leading part of the task name built by Name.
	namePrefix string
	// tl is the task log used to check and record task completion.
	tl *taskLog
}
  20. func (s *taskAccountUser) Run() (err error) {
  21. // 检查日账单任务是否完成
  22. if _, finished := s.tl.checkTask(s.taskPre); !finished {
  23. log.Info("taskAccountUser check task: %s not finished", s.taskPre.Name())
  24. return nil
  25. }
  26. var (
  27. ctx = context.Background()
  28. finished bool
  29. expectFN = func(ctx context.Context) (expect int64, err error) {
  30. var (
  31. beginTime, _ = dayRange(s.dayOffset)
  32. ver = dailyBillVer(beginTime)
  33. )
  34. if expect, err = s.dao.CountDailyBillByVer(ctx, ver); err != nil {
  35. return
  36. }
  37. return
  38. }
  39. )
  40. if finished, err = checkOrCreateTaskFromLog(ctx, s, s.tl, expectFN); err != nil || finished {
  41. return
  42. }
  43. return s.run(ctx)
  44. }
  45. func (s *taskAccountUser) TTL() int32 {
  46. return 3600 * 2
  47. }
  48. func (s *taskAccountUser) Name() string {
  49. return fmt.Sprintf("%s_%d", s.namePrefix, dailyBillVer(time.Now()))
  50. }
  51. func (s *taskAccountUser) run(ctx context.Context) (err error) {
  52. ll := &dailyBillLLByVer{
  53. limit: 1000,
  54. dao: s.dao,
  55. }
  56. beginTime, _ := dayRange(s.dayOffset)
  57. ll.ver = dailyBillVer(beginTime)
  58. return runLimitedList(ctx, ll, time.Millisecond*2, s.runDailyBill)
  59. }
  60. func (s *taskAccountUser) runDailyBill(ctx context.Context, ele interface{}) (err error) {
  61. dailyBill, ok := ele.(*model.DailyBill)
  62. if !ok {
  63. err = errors.Errorf("taskAccountUser convert ele: %+v failed", dailyBill)
  64. return
  65. }
  66. log.Info("taskAccountUser handle dailyBill: %+v", dailyBill)
  67. fn := func(ctx context.Context, tx *xsql.Tx) (affected bool, err error) {
  68. var (
  69. account *model.UserAccount
  70. accountLog *model.AccountLog
  71. userProfit = dailyBill.In - dailyBill.Out
  72. )
  73. affected = true
  74. // 获得该 mid 的 account
  75. if account, err = s.dao.UserAccount(ctx, dailyBill.MID, dailyBill.Biz, dailyBill.Currency); err != nil {
  76. return
  77. }
  78. // 初始化 biz_account
  79. if account == nil {
  80. if account, err = s.initUserAccount(ctx, dailyBill.MID, dailyBill.Biz, dailyBill.Currency); err != nil {
  81. return
  82. }
  83. }
  84. // 虚拟账户平账,低于一定阈值由虚拟账户转出
  85. if userProfit < 0 {
  86. bizRefund, userRefund := calcRefundFee(account.Balance, -userProfit, conf.Conf.Biz.AccountUserMin)
  87. userProfit = -userRefund
  88. if bizRefund > 0 {
  89. // 获得 biz_account
  90. var bizAccount *model.BizAccount
  91. if bizAccount, err = s.dao.BizAccount(ctx, model.BizAsset, model.CurrencyBP); err != nil {
  92. return
  93. }
  94. // 初始化 biz_account
  95. if bizAccount == nil {
  96. if bizAccount, err = initBizAccount(ctx, model.BizAsset, model.CurrencyBP, s.dao); err != nil {
  97. return
  98. }
  99. }
  100. bizAccountLog := &model.AccountLog{
  101. AccountID: bizAccount.ID,
  102. Name: s.Name(),
  103. From: bizAccount.Balance,
  104. To: bizAccount.Balance - bizRefund,
  105. Ver: bizAccount.Ver + 1,
  106. State: model.AccountStateLoss,
  107. }
  108. bizAccount.Balance = bizAccount.Balance - bizRefund
  109. // 更新 biz account
  110. var rowAffected int64
  111. if rowAffected, err = s.dao.TXUpdateBizAccount(ctx, tx, bizAccount); err != nil {
  112. tx.Rollback()
  113. return
  114. }
  115. if rowAffected <= 0 {
  116. log.Error("TXUpdateBizAccount no affected biz account: %+v", bizAccount)
  117. tx.Rollback()
  118. affected = false
  119. return
  120. }
  121. // 添加资金池账户 log
  122. if err = s.dao.TXInsertBizAccountLog(ctx, tx, bizAccountLog); err != nil {
  123. tx.Rollback()
  124. return
  125. }
  126. }
  127. }
  128. accountLog = &model.AccountLog{
  129. AccountID: account.ID,
  130. Name: fmt.Sprintf("%s_%d", s.Name(), dailyBill.MID),
  131. From: account.Balance,
  132. To: account.Balance + userProfit,
  133. Ver: account.Ver + 1,
  134. State: model.AccountStateIncome,
  135. }
  136. account.Balance = account.Balance + userProfit
  137. // 更新 user account
  138. rowAffected, err := s.dao.TXUpdateUserAccount(ctx, tx, account)
  139. if err != nil {
  140. tx.Rollback()
  141. return
  142. }
  143. if rowAffected <= 0 {
  144. log.Error("TXUpdateUserAccount no affected user account: %+v", account)
  145. tx.Rollback()
  146. affected = false
  147. return
  148. }
  149. // 添加资金池账户 log
  150. err = s.dao.TXInsertUserAccountLog(ctx, tx, accountLog)
  151. if err != nil {
  152. tx.Rollback()
  153. return
  154. }
  155. log.Info("taskAccountUser: %+v ", account)
  156. return
  157. }
  158. return runTXCASTaskWithLog(ctx, s, s.tl, fn)
  159. }
  160. func (s *taskAccountUser) initUserAccount(ctx context.Context, mid int64, biz, currency string) (account *model.UserAccount, err error) {
  161. account = &model.UserAccount{}
  162. account.MID = mid
  163. account.Biz = biz
  164. account.Currency = currency
  165. account.State = model.StateValid
  166. account.Ver = 1
  167. if account.ID, err = s.dao.InsertUserAccount(ctx, account); err != nil {
  168. return
  169. }
  170. return
  171. }
  172. // 虚拟账户平账
  173. func calcRefundFee(balance int64, loss int64, minBalance int64) (bizRefund int64, userRefund int64) {
  174. if balance-loss >= minBalance {
  175. userRefund = loss
  176. bizRefund = 0
  177. return
  178. }
  179. if balance > minBalance {
  180. userRefund = balance - minBalance
  181. }
  182. bizRefund = loss - userRefund
  183. return
  184. }