|
- package service
- import (
- "context"
- "fmt"
- "math/rand"
- "time"
- "go-common/app/job/main/ugcpay/dao"
- "go-common/app/job/main/ugcpay/model"
- "go-common/app/job/main/ugcpay/service/pay"
- xsql "go-common/library/database/sql"
- "go-common/library/log"
- "github.com/pkg/errors"
- )
- type taskBillDaily struct {
- dao *dao.Dao
- pay *pay.Pay
- rnd *rand.Rand
- dayOffset int
- namePrefix string
- tl *taskLog
- }
- func (s *taskBillDaily) Run() (err error) {
- var (
- ctx = context.Background()
- finished bool
- expectFN = func(ctx context.Context) (expect int64, err error) {
- var (
- beginTime, endTime = dayRange(s.dayOffset)
- expectPaid, expectRefunded int64
- )
- if expectPaid, err = s.dao.CountPaidOrderUser(ctx, beginTime, endTime); err != nil {
- return
- }
- if expectRefunded, err = s.dao.CountRefundedOrderUser(ctx, beginTime, endTime); err != nil {
- return
- }
- expect = expectPaid + expectRefunded
- return
- }
- )
- if finished, err = checkOrCreateTaskFromLog(ctx, s, s.tl, expectFN); err != nil || finished {
- return
- }
- return s.run(ctx)
- }
- func (s *taskBillDaily) TTL() int32 {
- return 3600 * 2
- }
- func (s *taskBillDaily) Name() string {
- return fmt.Sprintf("%s_%d", s.namePrefix, dailyBillVer(time.Now()))
- }
- // 日账单生成
- func (s *taskBillDaily) run(ctx context.Context) (err error) {
- // 已支付成功订单入账
- paidLL := &orderPaidLL{
- limit: 1000,
- dao: s.dao,
- }
- paidLL.beginTime, paidLL.endTime = dayRange(s.dayOffset)
- if err = runLimitedList(ctx, paidLL, time.Millisecond*5, s.runPaidOrder); err != nil {
- return
- }
- // 已退款订单入账
- refundLL := &orderRefundedLL{
- limit: 1000,
- dao: s.dao,
- }
- refundLL.beginTime, refundLL.endTime = dayRange(s.dayOffset)
- return runLimitedList(ctx, refundLL, time.Millisecond*5, s.runRefundedOrder)
- }
- // 处理退款order
- func (s *taskBillDaily) runRefundedOrder(ctx context.Context, ele interface{}) (err error) {
- order, ok := ele.(*model.Order)
- if !ok {
- return errors.Errorf("refundedOrderHandler convert ele: %+v failed", order)
- }
- log.Info("runRefundedOrder handle order: %+v", order)
- logOrder := &model.LogOrder{
- OrderID: order.OrderID,
- FromState: order.State,
- ToState: model.OrderStateRefundFinished,
- }
- order.State = model.OrderStateRefundFinished
- fn := func(ctx context.Context, tx *xsql.Tx) (affected bool, err error) {
- var (
- bill *model.DailyBill
- asset *model.Asset
- ver = dailyBillVer(order.RefundTime)
- monthVer = monthlyBillVer(order.RefundTime)
- billDailyLog *model.LogBillDaily
- userIncome, _ = calcAssetIncome(order.RealFee) // 收入计算结果
- )
- affected = true
- // 获得订单对应的asset
- if asset, err = s.dao.Asset(ctx, order.OID, order.OType, order.Currency); err != nil {
- return
- }
- if asset == nil {
- err = errors.Errorf("dailyBillHander find invalid asset order, order: %+v", order)
- return
- }
- // 获得该mid对应的日账单
- if bill, err = s.dao.DailyBill(ctx, asset.MID, model.BizAsset, model.CurrencyBP, ver); err != nil {
- return
- }
- if bill == nil {
- if bill, err = s.initDailyBill(ctx, asset.MID, model.BizAsset, model.CurrencyBP, ver, monthVer); err != nil {
- return
- }
- }
- // 计算日账单
- billDailyLog = &model.LogBillDaily{
- BillID: bill.BillID,
- FromIn: bill.In,
- ToIn: bill.In,
- FromOut: bill.Out + userIncome,
- ToOut: bill.Out,
- OrderID: order.OrderID + "_r",
- }
- bill.Out += userIncome
- // 更新order
- rowAffected, err := s.dao.TXUpdateOrder(ctx, tx, order)
- if err != nil {
- tx.Rollback()
- return
- }
- if rowAffected <= 0 {
- tx.Rollback()
- log.Error("UpdateOrder no affected from order: %+v", order)
- affected = false
- return
- }
- // 添加 order log
- _, err = s.dao.TXInsertOrderUserLog(ctx, tx, logOrder)
- if err != nil {
- tx.Rollback()
- return
- }
- // 更新daily_bill
- rowAffected, err = s.dao.TXUpdateDailyBill(ctx, tx, bill)
- if err != nil {
- tx.Rollback()
- return
- }
- if rowAffected <= 0 {
- log.Error("TXUpdateDailyBill no affected bill: %+v", bill)
- tx.Rollback()
- affected = false
- return
- }
- // 添加 daily bill log , uk order_id
- _, err = s.dao.TXInsertLogDailyBill(ctx, tx, billDailyLog)
- if err != nil {
- tx.Rollback()
- return
- }
- // 更新 aggr
- aggrMonthlyAsset := &model.AggrIncomeUserAsset{
- MID: bill.MID,
- Currency: bill.Currency,
- Ver: monthVer,
- OID: order.OID,
- OType: order.OType,
- }
- aggrAllAsset := &model.AggrIncomeUserAsset{
- MID: bill.MID,
- Currency: bill.Currency,
- Ver: 0,
- OID: order.OID,
- OType: order.OType,
- }
- aggrUser := &model.AggrIncomeUser{
- MID: bill.MID,
- Currency: bill.Currency,
- }
- _, err = s.dao.TXUpsertDeltaAggrIncomeUserAsset(ctx, tx, aggrAllAsset, 0, 1, 0, userIncome)
- if err != nil {
- tx.Rollback()
- return
- }
- _, err = s.dao.TXUpsertDeltaAggrIncomeUserAsset(ctx, tx, aggrMonthlyAsset, 0, 1, 0, userIncome)
- if err != nil {
- tx.Rollback()
- return
- }
- _, err = s.dao.TXUpsertDeltaAggrIncomeUser(ctx, tx, aggrUser, 0, 1, 0, userIncome)
- if err != nil {
- tx.Rollback()
- return
- }
- log.Info("Settle daily bill: %+v, aggrAllAsset: %+v, aggrMonthlyAsset: %+v, aggrUser: %+v, from refunded order: %+v", bill, aggrAllAsset, aggrMonthlyAsset, aggrUser, order)
- return
- }
- return runTXCASTaskWithLog(ctx, s, s.tl, fn)
- }
- func (s *taskBillDaily) runPaidOrder(ctx context.Context, ele interface{}) (err error) {
- order, ok := ele.(*model.Order)
- if !ok {
- return errors.Errorf("runPaidOrder convert ele: %+v failed", order)
- }
- log.Info("runPaidOrder handle order: %+v", order)
- checkOK, payDesc, err := s.checkOrder(ctx, order) // 对支付订单对账
- if err != nil {
- return err
- }
- logOrder := &model.LogOrder{
- OrderID: order.OrderID,
- FromState: order.State,
- Desc: payDesc,
- }
- var fn func(context.Context, *xsql.Tx) (affected bool, err error)
- if checkOK { // 对账成功
- logOrder.ToState = model.OrderStateSettled
- order.State = model.OrderStateSettled
- fn = func(ctx context.Context, tx *xsql.Tx) (affected bool, err error) {
- var (
- bill *model.DailyBill
- asset *model.Asset
- ver = dailyBillVer(order.PayTime)
- monthVer = monthlyBillVer(order.PayTime)
- billDailyLog *model.LogBillDaily
- userIncome, _ = calcAssetIncome(order.RealFee) // 收入计算结果
- )
- affected = true
- // 获得订单对应的asset
- if asset, err = s.dao.Asset(ctx, order.OID, order.OType, order.Currency); err != nil {
- return
- }
- if asset == nil {
- log.Error("runPaidOrder find invalid asset order, order: %+v", order)
- return
- }
- // 获得该mid对应的日账单
- if bill, err = s.dao.DailyBill(ctx, asset.MID, model.BizAsset, model.CurrencyBP, ver); err != nil {
- return
- }
- if bill == nil {
- if bill, err = s.initDailyBill(ctx, asset.MID, model.BizAsset, model.CurrencyBP, ver, monthVer); err != nil {
- return
- }
- }
- // 计算日账单
- billDailyLog = &model.LogBillDaily{
- BillID: bill.BillID,
- FromIn: bill.In,
- ToIn: bill.In + userIncome,
- FromOut: bill.Out,
- ToOut: bill.Out,
- OrderID: order.OrderID,
- }
- bill.In += userIncome
- // 更新order
- rowAffected, err := s.dao.TXUpdateOrder(ctx, tx, order)
- if err != nil {
- tx.Rollback()
- return
- }
- if rowAffected <= 0 {
- tx.Rollback()
- log.Error("UpdateOrder no affected from order: %+v", order)
- affected = false
- return
- }
- // 添加 order log
- _, err = s.dao.TXInsertOrderUserLog(ctx, tx, logOrder)
- if err != nil {
- tx.Rollback()
- return
- }
- // 更新daily_bill
- rowAffected, err = s.dao.TXUpdateDailyBill(ctx, tx, bill)
- if err != nil {
- tx.Rollback()
- return
- }
- if rowAffected <= 0 {
- log.Error("TXUpsertDeltaDailyBill no affected bill: %+v", bill)
- tx.Rollback()
- affected = false
- return
- }
- // 添加 daily bill log , uk order_id
- _, err = s.dao.TXInsertLogDailyBill(ctx, tx, billDailyLog)
- if err != nil {
- tx.Rollback()
- return
- }
- // 更新 aggr
- aggrMonthlyAsset := &model.AggrIncomeUserAsset{
- MID: bill.MID,
- Currency: bill.Currency,
- Ver: monthVer,
- OID: order.OID,
- OType: order.OType,
- }
- aggrAllAsset := &model.AggrIncomeUserAsset{
- MID: bill.MID,
- Currency: bill.Currency,
- Ver: 0,
- OID: order.OID,
- OType: order.OType,
- }
- aggrUser := &model.AggrIncomeUser{
- MID: bill.MID,
- Currency: bill.Currency,
- }
- _, err = s.dao.TXUpsertDeltaAggrIncomeUserAsset(ctx, tx, aggrAllAsset, 1, 0, userIncome, 0)
- if err != nil {
- tx.Rollback()
- return
- }
- _, err = s.dao.TXUpsertDeltaAggrIncomeUserAsset(ctx, tx, aggrMonthlyAsset, 1, 0, userIncome, 0)
- if err != nil {
- tx.Rollback()
- return
- }
- _, err = s.dao.TXUpsertDeltaAggrIncomeUser(ctx, tx, aggrUser, 1, 0, userIncome, 0)
- if err != nil {
- tx.Rollback()
- return
- }
- log.Info("taskBillDaily: %+v, aggrAllAsset: %+v, aggrMonthlyAsset: %+v, aggrUser: %+v, from paid order: %+v", bill, aggrAllAsset, aggrMonthlyAsset, aggrUser, order)
- return
- }
- } else { // 对账失败
- logOrder.ToState = model.OrderStateBadDebt
- order.State = model.OrderStateBadDebt
- fn = func(ctx context.Context, tx *xsql.Tx) (affected bool, err error) {
- var (
- orderBadDebt *model.OrderBadDebt
- )
- affected = true
- if orderBadDebt, err = s.dao.OrderBadDebt(ctx, order.OrderID); err != nil {
- return
- }
- if orderBadDebt == nil {
- if orderBadDebt, err = s.initBadDebt(ctx, order.OrderID); err != nil {
- return
- }
- }
- orderBadDebt.Type = "unknown"
- orderBadDebt.State = "failed"
- // 更新order
- rowAffected, theErr := s.dao.TXUpdateOrder(ctx, tx, order)
- if theErr != nil {
- tx.Rollback()
- return
- }
- if rowAffected <= 0 {
- tx.Rollback()
- log.Error("UpdateOrder no affected from order: %+v", order)
- affected = false
- return
- }
- // 添加order log
- _, theErr = s.dao.TXInsertOrderUserLog(ctx, tx, logOrder)
- if theErr != nil {
- tx.Rollback()
- return
- }
- // 添加坏账表
- _, err = s.dao.TXUpdateOrderBadDebt(ctx, tx, orderBadDebt)
- if err != nil {
- tx.Rollback()
- return
- }
- log.Info("Add bad debt: %+v", orderBadDebt)
- return
- }
- }
- return runTXCASTaskWithLog(ctx, s, s.tl, fn)
- }
- func (s *taskBillDaily) checkOrder(ctx context.Context, order *model.Order) (ok bool, payDesc string, err error) {
- ok = false
- if order == nil {
- return
- }
- if order.PayID == "" {
- log.Error("Check order found baddebt order: %+v", order)
- return
- }
- payParam := s.pay.CheckOrder(order.PayID)
- s.pay.Sign(payParam)
- payJSON, err := s.pay.ToJSON(payParam)
- if err != nil {
- return
- }
- orders, err := s.dao.PayCheckOrder(ctx, payJSON)
- if err != nil {
- return
- }
- result, ok := orders[order.PayID]
- if !ok {
- return
- }
- payDesc = result.RecoStatusDesc
- switch result.RecoStatusDesc {
- case model.PayCheckOrderStateSuccess:
- ok = true
- default:
- ok = false
- }
- return
- }
- func (s *taskBillDaily) initDailyBill(ctx context.Context, mid int64, biz, currency string, ver, monthVer int64) (bill *model.DailyBill, err error) {
- bill = &model.DailyBill{}
- bill.BillID = orderID(s.rnd)
- bill.MID = mid
- bill.Biz = model.BizAsset
- bill.Currency = model.CurrencyBP
- bill.In = 0
- bill.Out = 0
- bill.Ver = ver
- bill.MonthVer = monthVer
- bill.Version = 1
- if bill.ID, err = s.dao.InsertDailyBill(ctx, bill); err != nil {
- return
- }
- return
- }
- func (s *taskBillDaily) initBadDebt(ctx context.Context, orderID string) (data *model.OrderBadDebt, err error) {
- data = &model.OrderBadDebt{
- OrderID: orderID,
- Type: "",
- State: "",
- }
- if data.ID, err = s.dao.InsertOrderBadDebt(ctx, data); err != nil {
- return
- }
- return
- }
|