limited_list.go 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177
  1. package service
  2. import (
  3. "context"
  4. "time"
  5. "go-common/app/job/main/ugcpay/dao"
  6. "go-common/app/job/main/ugcpay/model"
  7. "go-common/library/log"
  8. )
  9. type limitedList interface {
  10. LimitSize() int
  11. BeginID(ctx context.Context) (id int64, err error)
  12. List(ctx context.Context, beginID int64) (maxID int64, list []interface{}, err error)
  13. }
  14. func runLimitedList(ctx context.Context, ll limitedList, sleep time.Duration, handler func(ctx context.Context, ele interface{}) error) (err error) {
  15. if ll.LimitSize() <= 0 {
  16. return
  17. }
  18. var (
  19. id int64
  20. list = make([]interface{}, ll.LimitSize())
  21. )
  22. if id, err = ll.BeginID(ctx); err != nil {
  23. return
  24. }
  25. for len(list) >= ll.LimitSize() {
  26. if id, list, err = ll.List(ctx, id); err != nil {
  27. return
  28. }
  29. for _, ele := range list {
  30. if err = handler(ctx, ele); err != nil {
  31. log.Error("handle failed, ele: %+v, err: %+v", ele, err)
  32. err = nil
  33. continue
  34. }
  35. if sleep > 0 {
  36. time.Sleep(sleep)
  37. }
  38. }
  39. }
  40. return
  41. }
  42. type orderPaidLL struct {
  43. beginTime time.Time
  44. endTime time.Time
  45. limit int
  46. dao *dao.Dao
  47. }
  48. func (o *orderPaidLL) LimitSize() int {
  49. return o.limit
  50. }
  51. func (o *orderPaidLL) BeginID(ctx context.Context) (id int64, err error) {
  52. return o.dao.MinIDOrderPaid(ctx, o.beginTime)
  53. }
  54. func (o *orderPaidLL) List(ctx context.Context, beginID int64) (maxID int64, list []interface{}, err error) {
  55. var rawList []*model.Order
  56. if maxID, rawList, err = o.dao.OrderPaidList(ctx, o.beginTime, o.endTime, beginID, o.limit); err != nil {
  57. return
  58. }
  59. log.Info("orderPaidLL beginID: %d, beginTime: %+v, endTime: %+v, limit: %d, size: %d", beginID, o.beginTime, o.endTime, o.limit, len(rawList))
  60. for _, r := range rawList {
  61. list = append(list, r)
  62. }
  63. return
  64. }
  65. type orderRefundedLL struct {
  66. beginTime time.Time
  67. endTime time.Time
  68. limit int
  69. dao *dao.Dao
  70. }
  71. func (o *orderRefundedLL) LimitSize() int {
  72. return o.limit
  73. }
  74. func (o *orderRefundedLL) BeginID(ctx context.Context) (id int64, err error) {
  75. return o.dao.MinIDOrderRefunded(ctx, o.beginTime)
  76. }
  77. func (o *orderRefundedLL) List(ctx context.Context, beginID int64) (maxID int64, list []interface{}, err error) {
  78. var rawList []*model.Order
  79. if maxID, rawList, err = o.dao.OrderRefundedList(ctx, o.beginTime, o.endTime, beginID, o.limit); err != nil {
  80. return
  81. }
  82. log.Info("orderRefundedLL beginID: %d, beginTime: %+v, endTime: %+v, limit: %d, size: %d", beginID, o.beginTime, o.endTime, o.limit, len(rawList))
  83. for _, r := range rawList {
  84. list = append(list, r)
  85. }
  86. return
  87. }
  88. type dailyBillLLByVer struct {
  89. ver int64
  90. limit int
  91. dao *dao.Dao
  92. }
  93. func (d *dailyBillLLByVer) LimitSize() int {
  94. return d.limit
  95. }
  96. func (d *dailyBillLLByVer) BeginID(ctx context.Context) (id int64, err error) {
  97. return d.dao.MinIDDailyBillByVer(ctx, d.ver)
  98. }
  99. func (d *dailyBillLLByVer) List(ctx context.Context, beginID int64) (maxID int64, list []interface{}, err error) {
  100. var rawList []*model.DailyBill
  101. if maxID, rawList, err = d.dao.DailyBillListByVer(ctx, d.ver, beginID, d.limit); err != nil {
  102. return
  103. }
  104. log.Info("dailyBillLLByVer beginID: %d, ver: %d, limit: %d, size: %d", beginID, d.ver, d.limit, len(rawList))
  105. for _, r := range rawList {
  106. list = append(list, r)
  107. }
  108. return
  109. }
  110. type dailyBillLLByMonthVer struct {
  111. monthVer int64
  112. limit int
  113. dao *dao.Dao
  114. }
  115. func (d *dailyBillLLByMonthVer) LimitSize() int {
  116. return d.limit
  117. }
  118. func (d *dailyBillLLByMonthVer) BeginID(ctx context.Context) (id int64, err error) {
  119. return d.dao.MinIDDailyBillByMonthVer(ctx, d.monthVer)
  120. }
  121. func (d *dailyBillLLByMonthVer) List(ctx context.Context, beginID int64) (maxID int64, list []interface{}, err error) {
  122. var rawList []*model.DailyBill
  123. if maxID, rawList, err = d.dao.DailyBillListByMonthVer(ctx, d.monthVer, beginID, d.limit); err != nil {
  124. return
  125. }
  126. log.Info("dailyBillLLByMonthVer beginID: %d, monthVer: %d, limit: %d, size: %d", beginID, d.monthVer, d.limit, len(rawList))
  127. for _, r := range rawList {
  128. list = append(list, r)
  129. }
  130. return
  131. }
  132. type monthlyBillLL struct {
  133. ver int64
  134. limit int
  135. dao *dao.Dao
  136. }
  137. func (m *monthlyBillLL) LimitSize() int {
  138. return m.limit
  139. }
  140. func (m *monthlyBillLL) BeginID(ctx context.Context) (id int64, err error) {
  141. return m.dao.MinIDMonthlyBill(ctx, m.ver)
  142. }
  143. func (m *monthlyBillLL) List(ctx context.Context, beginID int64) (maxID int64, list []interface{}, err error) {
  144. var rawList []*model.Bill
  145. if maxID, rawList, err = m.dao.MonthlyBillList(ctx, m.ver, beginID, m.limit); err != nil {
  146. return
  147. }
  148. log.Info("monthlyBillLL beginID: %d, ver: %d, limit: %d, size: %d", beginID, m.ver, m.limit, len(rawList))
  149. for _, r := range rawList {
  150. list = append(list, r)
  151. }
  152. return
  153. }