task_bill_monthly.go 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143
  1. package service
  2. import (
  3. "bytes"
  4. "context"
  5. "fmt"
  6. "math/rand"
  7. "time"
  8. "go-common/app/job/main/ugcpay/dao"
  9. "go-common/app/job/main/ugcpay/model"
  10. xsql "go-common/library/database/sql"
  11. "go-common/library/log"
  12. "github.com/pkg/errors"
  13. )
  14. type taskBillMonthly struct {
  15. dao *dao.Dao
  16. rnd *rand.Rand
  17. monthOffset int
  18. namePrefix string
  19. tl *taskLog
  20. }
  21. func (s *taskBillMonthly) Run() (err error) {
  22. var (
  23. ctx = context.Background()
  24. finished bool
  25. expectFN = func(ctx context.Context) (expect int64, err error) {
  26. var (
  27. beginTime, _ = monthRange(s.monthOffset)
  28. monthVer = monthlyBillVer(beginTime)
  29. )
  30. if expect, err = s.dao.CountDailyBillByMonthVer(ctx, monthVer); err != nil {
  31. return
  32. }
  33. return
  34. }
  35. )
  36. if finished, err = checkOrCreateTaskFromLog(ctx, s, s.tl, expectFN); err != nil || finished {
  37. return
  38. }
  39. return s.run(ctx)
  40. }
  41. func (s *taskBillMonthly) TTL() int32 {
  42. return 3600 * 2
  43. }
  44. func (s *taskBillMonthly) Name() string {
  45. return fmt.Sprintf("%s_%d", s.namePrefix, monthlyBillVer(time.Now()))
  46. }
  47. // 月账单生成
  48. func (s *taskBillMonthly) run(ctx context.Context) (err error) {
  49. ll := &dailyBillLLByMonthVer{
  50. limit: 1000,
  51. dao: s.dao,
  52. }
  53. beginTime, _ := monthRange(s.monthOffset)
  54. ll.monthVer = monthlyBillVer(beginTime)
  55. return runLimitedList(ctx, ll, time.Millisecond*2, s.runDailyBill)
  56. }
  57. func (s *taskBillMonthly) runDailyBill(ctx context.Context, ele interface{}) (err error) {
  58. dailyBill, ok := ele.(*model.DailyBill)
  59. if !ok {
  60. return errors.Errorf("taskBillMonthly convert ele: %+v failed", dailyBill)
  61. }
  62. log.Info("taskBillMonthly start handle daily biil: %+v", dailyBill)
  63. fn := func(ctx context.Context, tx *xsql.Tx) (affected bool, err error) {
  64. var (
  65. monthlyBill *model.Bill
  66. monthVer = dailyBill.MonthVer
  67. monthlyBillLog *model.LogBillMonthly
  68. )
  69. affected = true
  70. // 获得该 mid 的 daily_bill
  71. if monthlyBill, err = s.dao.MonthlyBill(ctx, dailyBill.MID, model.BizAsset, model.CurrencyBP, monthVer); err != nil {
  72. return
  73. }
  74. if monthlyBill == nil {
  75. if monthlyBill, err = s.initMonthlyBill(ctx, dailyBill.MID, dailyBill.Biz, dailyBill.Currency, dailyBill.MonthVer); err != nil {
  76. return
  77. }
  78. }
  79. monthlyBillLog = &model.LogBillMonthly{
  80. BillID: monthlyBill.BillID,
  81. FromIn: monthlyBill.In,
  82. ToIn: monthlyBill.In + dailyBill.In,
  83. FromOut: monthlyBill.Out,
  84. ToOut: monthlyBill.Out + dailyBill.Out,
  85. BillUserDailyID: dailyBill.BillID,
  86. }
  87. monthlyBill.In += dailyBill.In
  88. monthlyBill.Out += dailyBill.Out
  89. // 添加 monthly bill log , uk : daily_bill_id
  90. _, err = s.dao.TXInsertLogMonthlyBill(ctx, tx, monthlyBillLog)
  91. if err != nil {
  92. tx.Rollback()
  93. return
  94. }
  95. // 更新 monthly bill
  96. _, err = s.dao.TXUpdateMonthlyBill(ctx, tx, monthlyBill)
  97. if err != nil {
  98. tx.Rollback()
  99. return
  100. }
  101. log.Info("taskBillMonthly: %+v,from daily bill: %+v", monthlyBill, dailyBill)
  102. return
  103. }
  104. return runTXCASTaskWithLog(ctx, s, s.tl, fn)
  105. }
  106. func (s *taskBillMonthly) initMonthlyBill(ctx context.Context, mid int64, biz, currency string, ver int64) (data *model.Bill, err error) {
  107. data = &model.Bill{
  108. BillID: orderID(s.rnd),
  109. MID: mid,
  110. Biz: biz,
  111. Currency: currency,
  112. In: 0,
  113. Out: 0,
  114. Ver: ver,
  115. Version: 1,
  116. }
  117. if data.ID, err = s.dao.InsertMonthlyBill(ctx, data); err != nil {
  118. return
  119. }
  120. return
  121. }
  122. func orderID(rnd *rand.Rand) string {
  123. var b bytes.Buffer
  124. b.WriteString(fmt.Sprintf("%05d", rnd.Int63n(99999)))
  125. b.WriteString(fmt.Sprintf("%03d", time.Now().UnixNano()/1e6%1000))
  126. b.WriteString(time.Now().Format("060102150405"))
  127. return b.String()
  128. }