task_shell_recharge.go 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223
  1. package service
  2. import (
  3. "context"
  4. "fmt"
  5. "math/rand"
  6. "time"
  7. "go-common/app/job/main/ugcpay/dao"
  8. "go-common/app/job/main/ugcpay/model"
  9. "go-common/app/job/main/ugcpay/service/pay"
  10. xsql "go-common/library/database/sql"
  11. "go-common/library/log"
  12. "github.com/pkg/errors"
  13. )
  14. // 结算贝壳任务
  15. type taskRechargeShell struct {
  16. dao *dao.Dao
  17. pay *pay.Pay
  18. rnd *rand.Rand
  19. monthOffset int
  20. namePrefix string
  21. tl *taskLog
  22. }
  23. func (s *taskRechargeShell) Run() (err error) {
  24. var (
  25. ctx = context.Background()
  26. finished bool
  27. expectFN = func(ctx context.Context) (expect int64, err error) {
  28. var (
  29. beginTime, _ = monthRange(s.monthOffset)
  30. monthVer = monthlyBillVer(beginTime)
  31. )
  32. if expect, err = s.dao.CountMonthlyBillByVer(ctx, monthVer); err != nil {
  33. return
  34. }
  35. return
  36. }
  37. )
  38. if finished, err = checkOrCreateTaskFromLog(ctx, s, s.tl, expectFN); err != nil || finished {
  39. return
  40. }
  41. return s.run(ctx)
  42. }
  43. func (s *taskRechargeShell) TTL() int32 {
  44. return 3600 * 2
  45. }
  46. func (s *taskRechargeShell) Name() string {
  47. return fmt.Sprintf("%s_%d", s.namePrefix, monthlyBillVer(time.Now()))
  48. }
  49. func (s *taskRechargeShell) run(ctx context.Context) (err error) {
  50. ll := &monthlyBillLL{
  51. limit: 1000,
  52. dao: s.dao,
  53. }
  54. beginTime, _ := monthRange(s.monthOffset)
  55. ll.ver = monthlyBillVer(beginTime)
  56. return runLimitedList(ctx, ll, time.Millisecond*2, s.runMonthlyBill)
  57. }
  58. func (s *taskRechargeShell) runMonthlyBill(ctx context.Context, ele interface{}) (err error) {
  59. monthlyBill, ok := ele.(*model.Bill)
  60. if !ok {
  61. return errors.Errorf("runMonthlyBill convert ele: %+v failed", monthlyBill)
  62. }
  63. log.Info("taskRechargeShell start handle monthly bill: %+v", monthlyBill)
  64. var fn func(ctx context.Context, tx *xsql.Tx) (affected bool, err error)
  65. if monthlyBill.In-monthlyBill.Out > 0 {
  66. fn = s.fnRechargeShell(ctx, monthlyBill)
  67. } else {
  68. fn = s.fnRecordRecharge(ctx, monthlyBill)
  69. }
  70. if err = runTXCASTaskWithLog(ctx, s, s.tl, fn); err != nil {
  71. return
  72. }
  73. return
  74. }
  75. func (s *taskRechargeShell) fnRecordRecharge(ctx context.Context, monthlyBill *model.Bill) func(ctx context.Context, tx *xsql.Tx) (affected bool, err error) {
  76. return func(ctx context.Context, tx *xsql.Tx) (affected bool, err error) {
  77. affected = true
  78. var (
  79. readyRecharge = monthlyBill.In - monthlyBill.Out
  80. )
  81. // 记录贝壳订单
  82. orderRechargeShell := &model.OrderRechargeShell{
  83. MID: monthlyBill.MID,
  84. OrderID: orderID(s.rnd),
  85. Biz: model.BizAsset,
  86. Amount: readyRecharge,
  87. State: "finished",
  88. Ver: monthlyBill.Ver,
  89. }
  90. var (
  91. orderRechargeShellLog = &model.OrderRechargeShellLog{
  92. OrderID: orderRechargeShell.OrderID,
  93. FromState: "finished",
  94. ToState: "finished",
  95. BillUserMonthlyID: monthlyBill.BillID,
  96. }
  97. )
  98. _, err = s.dao.TXInsertOrderRechargeShell(ctx, tx, orderRechargeShell)
  99. if err != nil {
  100. tx.Rollback()
  101. return
  102. }
  103. // 插入 order_recharge_shell_log, uk: bill_monthly_bill_id
  104. _, err = s.dao.TXInsertOrderRechargeShellLog(ctx, tx, orderRechargeShellLog)
  105. if err != nil {
  106. tx.Rollback()
  107. return
  108. }
  109. log.Info("fnRecordRecharge : %+v, orderRechargeShell: %+v", monthlyBill, orderRechargeShell)
  110. return
  111. }
  112. }
  113. func (s *taskRechargeShell) fnRechargeShell(ctx context.Context, monthlyBill *model.Bill) func(ctx context.Context, tx *xsql.Tx) (affected bool, err error) {
  114. return func(ctx context.Context, tx *xsql.Tx) (affected bool, err error) {
  115. affected = true
  116. var (
  117. account *model.UserAccount
  118. readyRecharge = monthlyBill.In - monthlyBill.Out
  119. accountLog *model.AccountLog
  120. )
  121. // 获得该 mid 的 虚拟账户
  122. if account, err = s.dao.UserAccount(ctx, monthlyBill.MID, model.BizAsset, model.CurrencyBP); err != nil {
  123. return
  124. }
  125. if account == nil {
  126. err = errors.Errorf("runMonthlyBill not found valid user_account, monthly_bill: %+v", monthlyBill)
  127. return
  128. }
  129. // 检查虚拟账户余额是否足够
  130. if account.Balance-readyRecharge < 0 {
  131. err = errors.Errorf("runMonthlyBill failed, account.Balance - readyRecharge < 0 !!!! account: %+v, monthly bill: %+v", account, monthlyBill)
  132. return
  133. }
  134. accountLog = &model.AccountLog{
  135. AccountID: account.ID,
  136. Name: s.Name(),
  137. From: account.Balance,
  138. To: account.Balance - readyRecharge,
  139. Ver: account.Ver + 1,
  140. State: model.AccountStateWithdraw,
  141. }
  142. account.Balance -= readyRecharge
  143. // 扣减虚拟账户余额
  144. rowAffected, err := s.dao.TXUpdateUserAccount(ctx, tx, account)
  145. if err != nil {
  146. tx.Rollback()
  147. return
  148. }
  149. if rowAffected <= 0 {
  150. log.Error("TXUpdateUserAccount no affected user account: %+v", account)
  151. affected = false
  152. tx.Rollback()
  153. return
  154. }
  155. err = s.dao.TXInsertUserAccountLog(ctx, tx, accountLog)
  156. if err != nil {
  157. tx.Rollback()
  158. return
  159. }
  160. // 开始转贝壳
  161. orderRechargeShell := &model.OrderRechargeShell{
  162. MID: monthlyBill.MID,
  163. OrderID: orderID(s.rnd),
  164. Biz: model.BizAsset,
  165. Amount: readyRecharge,
  166. State: "created",
  167. Ver: monthlyBill.Ver,
  168. }
  169. var (
  170. orderRechargeShellLog = &model.OrderRechargeShellLog{
  171. OrderID: orderRechargeShell.OrderID,
  172. FromState: "created",
  173. ToState: "created",
  174. BillUserMonthlyID: monthlyBill.BillID,
  175. }
  176. )
  177. _, err = s.dao.TXInsertOrderRechargeShell(ctx, tx, orderRechargeShell)
  178. if err != nil {
  179. tx.Rollback()
  180. return
  181. }
  182. // 插入 order_recharge_shell_log, uk: bill_monthly_bill_id
  183. _, err = s.dao.TXInsertOrderRechargeShellLog(ctx, tx, orderRechargeShellLog)
  184. if err != nil {
  185. tx.Rollback()
  186. return
  187. }
  188. // 请求支付中心转贝壳
  189. _, payJSON, err := s.pay.RechargeShell(orderRechargeShell.OrderID, orderRechargeShell.MID, orderRechargeShell.Amount, orderRechargeShell.Amount)
  190. if err != nil {
  191. tx.Rollback()
  192. return
  193. }
  194. if err = s.dao.PayRechargeShell(ctx, payJSON); err != nil {
  195. tx.Rollback()
  196. return
  197. }
  198. log.Info("fnRechargeShell: %+v, account: %+v, orderRechargeShell: %+v", monthlyBill, account, orderRechargeShell)
  199. return
  200. }
  201. }