task_bill_daily.go 12 KB


  1. package service
  2. import (
  3. "context"
  4. "fmt"
  5. "math/rand"
  6. "time"
  7. "go-common/app/job/main/ugcpay/dao"
  8. "go-common/app/job/main/ugcpay/model"
  9. "go-common/app/job/main/ugcpay/service/pay"
  10. xsql "go-common/library/database/sql"
  11. "go-common/library/log"
  12. "github.com/pkg/errors"
  13. )
  14. type taskBillDaily struct {
  15. dao *dao.Dao
  16. pay *pay.Pay
  17. rnd *rand.Rand
  18. dayOffset int
  19. namePrefix string
  20. tl *taskLog
  21. }
  22. func (s *taskBillDaily) Run() (err error) {
  23. var (
  24. ctx = context.Background()
  25. finished bool
  26. expectFN = func(ctx context.Context) (expect int64, err error) {
  27. var (
  28. beginTime, endTime = dayRange(s.dayOffset)
  29. expectPaid, expectRefunded int64
  30. )
  31. if expectPaid, err = s.dao.CountPaidOrderUser(ctx, beginTime, endTime); err != nil {
  32. return
  33. }
  34. if expectRefunded, err = s.dao.CountRefundedOrderUser(ctx, beginTime, endTime); err != nil {
  35. return
  36. }
  37. expect = expectPaid + expectRefunded
  38. return
  39. }
  40. )
  41. if finished, err = checkOrCreateTaskFromLog(ctx, s, s.tl, expectFN); err != nil || finished {
  42. return
  43. }
  44. return s.run(ctx)
  45. }
  46. func (s *taskBillDaily) TTL() int32 {
  47. return 3600 * 2
  48. }
  49. func (s *taskBillDaily) Name() string {
  50. return fmt.Sprintf("%s_%d", s.namePrefix, dailyBillVer(time.Now()))
  51. }
  52. // 日账单生成
  53. func (s *taskBillDaily) run(ctx context.Context) (err error) {
  54. // 已支付成功订单入账
  55. paidLL := &orderPaidLL{
  56. limit: 1000,
  57. dao: s.dao,
  58. }
  59. paidLL.beginTime, paidLL.endTime = dayRange(s.dayOffset)
  60. if err = runLimitedList(ctx, paidLL, time.Millisecond*5, s.runPaidOrder); err != nil {
  61. return
  62. }
  63. // 已退款订单入账
  64. refundLL := &orderRefundedLL{
  65. limit: 1000,
  66. dao: s.dao,
  67. }
  68. refundLL.beginTime, refundLL.endTime = dayRange(s.dayOffset)
  69. return runLimitedList(ctx, refundLL, time.Millisecond*5, s.runRefundedOrder)
  70. }
  71. // 处理退款order
  72. func (s *taskBillDaily) runRefundedOrder(ctx context.Context, ele interface{}) (err error) {
  73. order, ok := ele.(*model.Order)
  74. if !ok {
  75. return errors.Errorf("refundedOrderHandler convert ele: %+v failed", order)
  76. }
  77. log.Info("runRefundedOrder handle order: %+v", order)
  78. logOrder := &model.LogOrder{
  79. OrderID: order.OrderID,
  80. FromState: order.State,
  81. ToState: model.OrderStateRefundFinished,
  82. }
  83. order.State = model.OrderStateRefundFinished
  84. fn := func(ctx context.Context, tx *xsql.Tx) (affected bool, err error) {
  85. var (
  86. bill *model.DailyBill
  87. asset *model.Asset
  88. ver = dailyBillVer(order.RefundTime)
  89. monthVer = monthlyBillVer(order.RefundTime)
  90. billDailyLog *model.LogBillDaily
  91. userIncome, _ = calcAssetIncome(order.RealFee) // 收入计算结果
  92. )
  93. affected = true
  94. // 获得订单对应的asset
  95. if asset, err = s.dao.Asset(ctx, order.OID, order.OType, order.Currency); err != nil {
  96. return
  97. }
  98. if asset == nil {
  99. err = errors.Errorf("dailyBillHander find invalid asset order, order: %+v", order)
  100. return
  101. }
  102. // 获得该mid对应的日账单
  103. if bill, err = s.dao.DailyBill(ctx, asset.MID, model.BizAsset, model.CurrencyBP, ver); err != nil {
  104. return
  105. }
  106. if bill == nil {
  107. if bill, err = s.initDailyBill(ctx, asset.MID, model.BizAsset, model.CurrencyBP, ver, monthVer); err != nil {
  108. return
  109. }
  110. }
  111. // 计算日账单
  112. billDailyLog = &model.LogBillDaily{
  113. BillID: bill.BillID,
  114. FromIn: bill.In,
  115. ToIn: bill.In,
  116. FromOut: bill.Out + userIncome,
  117. ToOut: bill.Out,
  118. OrderID: order.OrderID + "_r",
  119. }
  120. bill.Out += userIncome
  121. // 更新order
  122. rowAffected, err := s.dao.TXUpdateOrder(ctx, tx, order)
  123. if err != nil {
  124. tx.Rollback()
  125. return
  126. }
  127. if rowAffected <= 0 {
  128. tx.Rollback()
  129. log.Error("UpdateOrder no affected from order: %+v", order)
  130. affected = false
  131. return
  132. }
  133. // 添加 order log
  134. _, err = s.dao.TXInsertOrderUserLog(ctx, tx, logOrder)
  135. if err != nil {
  136. tx.Rollback()
  137. return
  138. }
  139. // 更新daily_bill
  140. rowAffected, err = s.dao.TXUpdateDailyBill(ctx, tx, bill)
  141. if err != nil {
  142. tx.Rollback()
  143. return
  144. }
  145. if rowAffected <= 0 {
  146. log.Error("TXUpdateDailyBill no affected bill: %+v", bill)
  147. tx.Rollback()
  148. affected = false
  149. return
  150. }
  151. // 添加 daily bill log , uk order_id
  152. _, err = s.dao.TXInsertLogDailyBill(ctx, tx, billDailyLog)
  153. if err != nil {
  154. tx.Rollback()
  155. return
  156. }
  157. // 更新 aggr
  158. aggrMonthlyAsset := &model.AggrIncomeUserAsset{
  159. MID: bill.MID,
  160. Currency: bill.Currency,
  161. Ver: monthVer,
  162. OID: order.OID,
  163. OType: order.OType,
  164. }
  165. aggrAllAsset := &model.AggrIncomeUserAsset{
  166. MID: bill.MID,
  167. Currency: bill.Currency,
  168. Ver: 0,
  169. OID: order.OID,
  170. OType: order.OType,
  171. }
  172. aggrUser := &model.AggrIncomeUser{
  173. MID: bill.MID,
  174. Currency: bill.Currency,
  175. }
  176. _, err = s.dao.TXUpsertDeltaAggrIncomeUserAsset(ctx, tx, aggrAllAsset, 0, 1, 0, userIncome)
  177. if err != nil {
  178. tx.Rollback()
  179. return
  180. }
  181. _, err = s.dao.TXUpsertDeltaAggrIncomeUserAsset(ctx, tx, aggrMonthlyAsset, 0, 1, 0, userIncome)
  182. if err != nil {
  183. tx.Rollback()
  184. return
  185. }
  186. _, err = s.dao.TXUpsertDeltaAggrIncomeUser(ctx, tx, aggrUser, 0, 1, 0, userIncome)
  187. if err != nil {
  188. tx.Rollback()
  189. return
  190. }
  191. log.Info("Settle daily bill: %+v, aggrAllAsset: %+v, aggrMonthlyAsset: %+v, aggrUser: %+v, from refunded order: %+v", bill, aggrAllAsset, aggrMonthlyAsset, aggrUser, order)
  192. return
  193. }
  194. return runTXCASTaskWithLog(ctx, s, s.tl, fn)
  195. }
  196. func (s *taskBillDaily) runPaidOrder(ctx context.Context, ele interface{}) (err error) {
  197. order, ok := ele.(*model.Order)
  198. if !ok {
  199. return errors.Errorf("runPaidOrder convert ele: %+v failed", order)
  200. }
  201. log.Info("runPaidOrder handle order: %+v", order)
  202. checkOK, payDesc, err := s.checkOrder(ctx, order) // 对支付订单对账
  203. if err != nil {
  204. return err
  205. }
  206. logOrder := &model.LogOrder{
  207. OrderID: order.OrderID,
  208. FromState: order.State,
  209. Desc: payDesc,
  210. }
  211. var fn func(context.Context, *xsql.Tx) (affected bool, err error)
  212. if checkOK { // 对账成功
  213. logOrder.ToState = model.OrderStateSettled
  214. order.State = model.OrderStateSettled
  215. fn = func(ctx context.Context, tx *xsql.Tx) (affected bool, err error) {
  216. var (
  217. bill *model.DailyBill
  218. asset *model.Asset
  219. ver = dailyBillVer(order.PayTime)
  220. monthVer = monthlyBillVer(order.PayTime)
  221. billDailyLog *model.LogBillDaily
  222. userIncome, _ = calcAssetIncome(order.RealFee) // 收入计算结果
  223. )
  224. affected = true
  225. // 获得订单对应的asset
  226. if asset, err = s.dao.Asset(ctx, order.OID, order.OType, order.Currency); err != nil {
  227. return
  228. }
  229. if asset == nil {
  230. log.Error("runPaidOrder find invalid asset order, order: %+v", order)
  231. return
  232. }
  233. // 获得该mid对应的日账单
  234. if bill, err = s.dao.DailyBill(ctx, asset.MID, model.BizAsset, model.CurrencyBP, ver); err != nil {
  235. return
  236. }
  237. if bill == nil {
  238. if bill, err = s.initDailyBill(ctx, asset.MID, model.BizAsset, model.CurrencyBP, ver, monthVer); err != nil {
  239. return
  240. }
  241. }
  242. // 计算日账单
  243. billDailyLog = &model.LogBillDaily{
  244. BillID: bill.BillID,
  245. FromIn: bill.In,
  246. ToIn: bill.In + userIncome,
  247. FromOut: bill.Out,
  248. ToOut: bill.Out,
  249. OrderID: order.OrderID,
  250. }
  251. bill.In += userIncome
  252. // 更新order
  253. rowAffected, err := s.dao.TXUpdateOrder(ctx, tx, order)
  254. if err != nil {
  255. tx.Rollback()
  256. return
  257. }
  258. if rowAffected <= 0 {
  259. tx.Rollback()
  260. log.Error("UpdateOrder no affected from order: %+v", order)
  261. affected = false
  262. return
  263. }
  264. // 添加 order log
  265. _, err = s.dao.TXInsertOrderUserLog(ctx, tx, logOrder)
  266. if err != nil {
  267. tx.Rollback()
  268. return
  269. }
  270. // 更新daily_bill
  271. rowAffected, err = s.dao.TXUpdateDailyBill(ctx, tx, bill)
  272. if err != nil {
  273. tx.Rollback()
  274. return
  275. }
  276. if rowAffected <= 0 {
  277. log.Error("TXUpsertDeltaDailyBill no affected bill: %+v", bill)
  278. tx.Rollback()
  279. affected = false
  280. return
  281. }
  282. // 添加 daily bill log , uk order_id
  283. _, err = s.dao.TXInsertLogDailyBill(ctx, tx, billDailyLog)
  284. if err != nil {
  285. tx.Rollback()
  286. return
  287. }
  288. // 更新 aggr
  289. aggrMonthlyAsset := &model.AggrIncomeUserAsset{
  290. MID: bill.MID,
  291. Currency: bill.Currency,
  292. Ver: monthVer,
  293. OID: order.OID,
  294. OType: order.OType,
  295. }
  296. aggrAllAsset := &model.AggrIncomeUserAsset{
  297. MID: bill.MID,
  298. Currency: bill.Currency,
  299. Ver: 0,
  300. OID: order.OID,
  301. OType: order.OType,
  302. }
  303. aggrUser := &model.AggrIncomeUser{
  304. MID: bill.MID,
  305. Currency: bill.Currency,
  306. }
  307. _, err = s.dao.TXUpsertDeltaAggrIncomeUserAsset(ctx, tx, aggrAllAsset, 1, 0, userIncome, 0)
  308. if err != nil {
  309. tx.Rollback()
  310. return
  311. }
  312. _, err = s.dao.TXUpsertDeltaAggrIncomeUserAsset(ctx, tx, aggrMonthlyAsset, 1, 0, userIncome, 0)
  313. if err != nil {
  314. tx.Rollback()
  315. return
  316. }
  317. _, err = s.dao.TXUpsertDeltaAggrIncomeUser(ctx, tx, aggrUser, 1, 0, userIncome, 0)
  318. if err != nil {
  319. tx.Rollback()
  320. return
  321. }
  322. log.Info("taskBillDaily: %+v, aggrAllAsset: %+v, aggrMonthlyAsset: %+v, aggrUser: %+v, from paid order: %+v", bill, aggrAllAsset, aggrMonthlyAsset, aggrUser, order)
  323. return
  324. }
  325. } else { // 对账失败
  326. logOrder.ToState = model.OrderStateBadDebt
  327. order.State = model.OrderStateBadDebt
  328. fn = func(ctx context.Context, tx *xsql.Tx) (affected bool, err error) {
  329. var (
  330. orderBadDebt *model.OrderBadDebt
  331. )
  332. affected = true
  333. if orderBadDebt, err = s.dao.OrderBadDebt(ctx, order.OrderID); err != nil {
  334. return
  335. }
  336. if orderBadDebt == nil {
  337. if orderBadDebt, err = s.initBadDebt(ctx, order.OrderID); err != nil {
  338. return
  339. }
  340. }
  341. orderBadDebt.Type = "unknown"
  342. orderBadDebt.State = "failed"
  343. // 更新order
  344. rowAffected, theErr := s.dao.TXUpdateOrder(ctx, tx, order)
  345. if theErr != nil {
  346. tx.Rollback()
  347. return
  348. }
  349. if rowAffected <= 0 {
  350. tx.Rollback()
  351. log.Error("UpdateOrder no affected from order: %+v", order)
  352. affected = false
  353. return
  354. }
  355. // 添加order log
  356. _, theErr = s.dao.TXInsertOrderUserLog(ctx, tx, logOrder)
  357. if theErr != nil {
  358. tx.Rollback()
  359. return
  360. }
  361. // 添加坏账表
  362. _, err = s.dao.TXUpdateOrderBadDebt(ctx, tx, orderBadDebt)
  363. if err != nil {
  364. tx.Rollback()
  365. return
  366. }
  367. log.Info("Add bad debt: %+v", orderBadDebt)
  368. return
  369. }
  370. }
  371. return runTXCASTaskWithLog(ctx, s, s.tl, fn)
  372. }
  373. func (s *taskBillDaily) checkOrder(ctx context.Context, order *model.Order) (ok bool, payDesc string, err error) {
  374. ok = false
  375. if order == nil {
  376. return
  377. }
  378. if order.PayID == "" {
  379. log.Error("Check order found baddebt order: %+v", order)
  380. return
  381. }
  382. payParam := s.pay.CheckOrder(order.PayID)
  383. s.pay.Sign(payParam)
  384. payJSON, err := s.pay.ToJSON(payParam)
  385. if err != nil {
  386. return
  387. }
  388. orders, err := s.dao.PayCheckOrder(ctx, payJSON)
  389. if err != nil {
  390. return
  391. }
  392. result, ok := orders[order.PayID]
  393. if !ok {
  394. return
  395. }
  396. payDesc = result.RecoStatusDesc
  397. switch result.RecoStatusDesc {
  398. case model.PayCheckOrderStateSuccess:
  399. ok = true
  400. default:
  401. ok = false
  402. }
  403. return
  404. }
  405. func (s *taskBillDaily) initDailyBill(ctx context.Context, mid int64, biz, currency string, ver, monthVer int64) (bill *model.DailyBill, err error) {
  406. bill = &model.DailyBill{}
  407. bill.BillID = orderID(s.rnd)
  408. bill.MID = mid
  409. bill.Biz = model.BizAsset
  410. bill.Currency = model.CurrencyBP
  411. bill.In = 0
  412. bill.Out = 0
  413. bill.Ver = ver
  414. bill.MonthVer = monthVer
  415. bill.Version = 1
  416. if bill.ID, err = s.dao.InsertDailyBill(ctx, bill); err != nil {
  417. return
  418. }
  419. return
  420. }
  421. func (s *taskBillDaily) initBadDebt(ctx context.Context, orderID string) (data *model.OrderBadDebt, err error) {
  422. data = &model.OrderBadDebt{
  423. OrderID: orderID,
  424. Type: "",
  425. State: "",
  426. }
  427. if data.ID, err = s.dao.InsertOrderBadDebt(ctx, data); err != nil {
  428. return
  429. }
  430. return
  431. }