up_charge.go 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170
  1. package charge
  2. import (
  3. "bytes"
  4. "context"
  5. "fmt"
  6. "strconv"
  7. "time"
  8. model "go-common/app/job/main/growup/model/charge"
  9. xtime "go-common/library/time"
  10. )
  11. func (s *Service) calUpCharge(c context.Context, t time.Time, avc chan []*model.AvCharge) (daily, weekly, monthly map[int64]*model.UpCharge, err error) {
  12. od, err := s.getUpDailyCharge(c, t)
  13. if err != nil {
  14. return
  15. }
  16. daily = calUpDailyCharge(od, avc)
  17. weekly, err = s.getUpWeeklyCharge(c)
  18. if err != nil {
  19. return
  20. }
  21. // group by weekly
  22. upChargeStat(startWeeklyDate, weekly, daily)
  23. monthly, err = s.getUpMonthlyCharge(c)
  24. if err != nil {
  25. return
  26. }
  27. // group by monthly
  28. upChargeStat(startMonthlyDate, monthly, daily)
  29. return
  30. }
  31. // 1 od: old daily up charges
  32. func (s *Service) getUpDailyCharge(c context.Context, t time.Time) (od map[int64]*model.UpCharge, err error) {
  33. // old: yesterday
  34. old := t.AddDate(0, 0, -1)
  35. return s.getUpCharges(c, "up_daily_charge", old.Format("2006-01-02"))
  36. }
  37. // 2 ow: old weekly up charges
  38. func (s *Service) getUpWeeklyCharge(c context.Context) (ow map[int64]*model.UpCharge, err error) {
  39. return s.getUpCharges(c, "up_weekly_charge", startWeeklyDate.Format("2006-01-02"))
  40. }
  41. // 3 om: old monthly up charges
  42. func (s *Service) getUpMonthlyCharge(c context.Context) (om map[int64]*model.UpCharge, err error) {
  43. return s.getUpCharges(c, "up_monthly_charge", startMonthlyDate.Format("2006-01-02"))
  44. }
  45. func calUpDailyCharge(od map[int64]*model.UpCharge, avc chan []*model.AvCharge) (mu map[int64]*model.UpCharge) {
  46. mu = make(map[int64]*model.UpCharge)
  47. for charges := range avc {
  48. for _, charge := range charges {
  49. if charge.IncCharge <= 0 {
  50. continue
  51. }
  52. // udc: up daily charge
  53. if udc, ok := mu[charge.MID]; ok {
  54. udc.IncCharge += charge.IncCharge
  55. udc.TotalCharge += charge.IncCharge
  56. } else {
  57. var total int64
  58. if o, ok := od[charge.MID]; ok {
  59. total = o.TotalCharge
  60. }
  61. mu[charge.MID] = &model.UpCharge{
  62. MID: charge.MID,
  63. IncCharge: charge.IncCharge,
  64. TotalCharge: total + charge.IncCharge,
  65. Date: charge.Date,
  66. }
  67. }
  68. }
  69. }
  70. return
  71. }
  72. // os: old weekly/month charge chan, empty maybe
  73. func upChargeStat(t time.Time, os, daily map[int64]*model.UpCharge) {
  74. for mid, ucd := range daily {
  75. if charge, ok := os[mid]; ok {
  76. // update
  77. charge.IncCharge += ucd.IncCharge
  78. charge.TotalCharge += ucd.IncCharge
  79. charge.Date = xtime.Time(t.Unix())
  80. } else {
  81. // new
  82. os[mid] = &model.UpCharge{
  83. MID: mid,
  84. IncCharge: ucd.IncCharge,
  85. TotalCharge: ucd.TotalCharge,
  86. Date: xtime.Time(t.Unix()),
  87. }
  88. }
  89. }
  90. }
  91. // get up charges by date
  92. func (s *Service) getUpCharges(c context.Context, table, date string) (m map[int64]*model.UpCharge, err error) {
  93. var id int64
  94. m = make(map[int64]*model.UpCharge)
  95. for {
  96. var charges map[int64]*model.UpCharge
  97. id, charges, err = s.dao.GetUpCharges(c, table, date, id, 2000)
  98. if err != nil {
  99. return
  100. }
  101. if len(charges) == 0 {
  102. break
  103. }
  104. for k, v := range charges {
  105. m[k] = v
  106. }
  107. }
  108. return
  109. }
  110. // BatchInsertUpCharge batch insert up charge
  111. func (s *Service) BatchInsertUpCharge(c context.Context, table string, us map[int64]*model.UpCharge) (err error) {
  112. var (
  113. buff = make([]*model.UpCharge, _batchSize)
  114. buffEnd = 0
  115. )
  116. for _, u := range us {
  117. buff[buffEnd] = u
  118. buffEnd++
  119. if buffEnd >= _batchSize {
  120. values := upChargeValues(buff[:buffEnd])
  121. buffEnd = 0
  122. _, err = s.dao.InsertUpCharge(c, table, values)
  123. if err != nil {
  124. return
  125. }
  126. }
  127. }
  128. if buffEnd > 0 {
  129. values := upChargeValues(buff[:buffEnd])
  130. buffEnd = 0
  131. _, err = s.dao.InsertUpCharge(c, table, values)
  132. }
  133. return
  134. }
  135. func upChargeValues(us []*model.UpCharge) (values string) {
  136. var buf bytes.Buffer
  137. for _, charge := range us {
  138. buf.WriteString("(")
  139. buf.WriteString(strconv.FormatInt(charge.MID, 10))
  140. buf.WriteByte(',')
  141. buf.WriteString(strconv.FormatInt(charge.IncCharge, 10))
  142. buf.WriteByte(',')
  143. buf.WriteString(strconv.FormatInt(charge.TotalCharge, 10))
  144. buf.WriteByte(',')
  145. buf.WriteString(fmt.Sprintf("'%s'", charge.Date.Time().Format("2006-01-02")))
  146. buf.WriteString(")")
  147. buf.WriteByte(',')
  148. }
  149. if buf.Len() > 0 {
  150. buf.Truncate(buf.Len() - 1)
  151. }
  152. values = buf.String()
  153. buf.Reset()
  154. return
  155. }