task_account_biz.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143
  1. package service
  2. import (
  3. "context"
  4. "fmt"
  5. "time"
  6. "go-common/app/job/main/ugcpay/dao"
  7. "go-common/app/job/main/ugcpay/model"
  8. xsql "go-common/library/database/sql"
  9. "go-common/library/log"
  10. "github.com/pkg/errors"
  11. )
  12. type taskAccountBiz struct {
  13. dao *dao.Dao
  14. taskPre TaskProcess
  15. dayOffset int
  16. namePrefix string
  17. tl *taskLog
  18. }
  19. func (s *taskAccountBiz) Run() (err error) {
  20. // 检查日账单任务是否完成
  21. if _, finished := s.tl.checkTask(s.taskPre); !finished {
  22. log.Info("taskAccountBiz check task: %s not finished", s.taskPre.Name())
  23. return nil
  24. }
  25. var (
  26. ctx = context.Background()
  27. finished bool
  28. expectFN = func(ctx context.Context) (expect int64, err error) {
  29. expect = 1
  30. return
  31. }
  32. )
  33. if finished, err = checkOrCreateTaskFromLog(ctx, s, s.tl, expectFN); err != nil || finished {
  34. return
  35. }
  36. return runTXCASTaskWithLog(ctx, s, s.tl, s.run)
  37. }
  38. func (s *taskAccountBiz) TTL() int32 {
  39. return 3600 * 2
  40. }
  41. func (s *taskAccountBiz) Name() string {
  42. return fmt.Sprintf("%s_%d", s.namePrefix, dailyBillVer(time.Now()))
  43. }
  44. func (s *taskAccountBiz) run(ctx context.Context, tx *xsql.Tx) (affected bool, err error) {
  45. var (
  46. timeFrom, timeTo time.Time = dayRange(s.dayOffset)
  47. ver = dailyBillVer(timeFrom)
  48. bizAccount *model.BizAccount
  49. bizAccountLog *model.AccountLog
  50. sumPaidOrderRealfee int64
  51. sumRefundedOrderRealfee int64
  52. sumBillDailyIn int64
  53. sumBillDailyOut int64
  54. bizProfit int64
  55. )
  56. affected = true
  57. if sumPaidOrderRealfee, err = s.dao.SumPaidOrderUserRealFee(ctx, timeFrom, timeTo); err != nil {
  58. return
  59. }
  60. if sumRefundedOrderRealfee, err = s.dao.SumRefundedOrderUserRealFee(ctx, timeFrom, timeTo); err != nil {
  61. return
  62. }
  63. if sumBillDailyIn, sumBillDailyOut, err = s.dao.SumDailyBill(ctx, ver); err != nil {
  64. return
  65. }
  66. log.Info("taskAccountBiz: %s, sumPaidOrderRealfee: %d, sumRefundedOrderRealfee: %d, sumBillDailyIn: %d, sumBillDailyOut: %d", s.Name(), sumPaidOrderRealfee, sumRefundedOrderRealfee, sumBillDailyIn, sumBillDailyOut)
  67. if sumPaidOrderRealfee < sumBillDailyIn {
  68. err = errors.Errorf("taskAccountBiz find sumPaidOrderRealfee(%d) < sumBillDailyIn(%d), ver: %d", sumPaidOrderRealfee, sumBillDailyIn, ver)
  69. return
  70. }
  71. if sumRefundedOrderRealfee < sumBillDailyOut {
  72. err = errors.Errorf("taskAccountBiz find sumRefundedOrderRealfee(%d) < sumBillDailyOut(%d), ver: %d", sumRefundedOrderRealfee, sumBillDailyOut, ver)
  73. return
  74. }
  75. // 日收益 - 日支出
  76. bizProfit = (sumPaidOrderRealfee - sumBillDailyIn) - (sumRefundedOrderRealfee - sumBillDailyOut)
  77. // 获得 biz_account
  78. if bizAccount, err = s.dao.BizAccount(ctx, model.BizAsset, model.CurrencyBP); err != nil {
  79. return
  80. }
  81. // 初始化 biz_account
  82. if bizAccount == nil {
  83. if bizAccount, err = initBizAccount(ctx, model.BizAsset, model.CurrencyBP, s.dao); err != nil {
  84. return
  85. }
  86. }
  87. bizAccountLog = &model.AccountLog{
  88. AccountID: bizAccount.ID,
  89. Name: s.Name(),
  90. From: bizAccount.Balance,
  91. To: bizAccount.Balance + bizProfit,
  92. Ver: bizAccount.Ver + 1,
  93. State: model.AccountStateProfit,
  94. }
  95. bizAccount.Balance = bizAccount.Balance + bizProfit
  96. // 更新 biz account
  97. rowAffected, err := s.dao.TXUpdateBizAccount(ctx, tx, bizAccount)
  98. if err != nil {
  99. tx.Rollback()
  100. return
  101. }
  102. if rowAffected <= 0 {
  103. log.Error("TXUpdateBizAccount no affected biz account: %+v", bizAccount)
  104. tx.Rollback()
  105. affected = false
  106. return
  107. }
  108. // 添加资金池账户 log
  109. err = s.dao.TXInsertBizAccountLog(ctx, tx, bizAccountLog)
  110. if err != nil {
  111. tx.Rollback()
  112. return
  113. }
  114. log.Info("taskAccountBiz: %+v ", bizAccount)
  115. return
  116. }
  117. func initBizAccount(ctx context.Context, biz, currency string, dao *dao.Dao) (bizAccount *model.BizAccount, err error) {
  118. bizAccount = &model.BizAccount{
  119. Biz: biz,
  120. Currency: currency,
  121. State: model.StateValid,
  122. Ver: 1,
  123. }
  124. if bizAccount.ID, err = dao.InsertBizAccount(ctx, bizAccount); err != nil {
  125. return
  126. }
  127. return
  128. }