av_charge_statis.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155
  1. package income
  2. import (
  3. "bytes"
  4. "context"
  5. "strconv"
  6. model "go-common/app/job/main/growup/model/income"
  7. "go-common/library/log"
  8. )
  9. // GetAvChargeStatisMap get av charge statis map
  10. func (s *AvChargeSvr) GetAvChargeStatisMap(c context.Context) (chargeStatisMap map[int64]*model.AvChargeStatis, err error) {
  11. avChargeStatis, err := s.GetAvChargeStatis(c)
  12. if err != nil {
  13. log.Error("s.GetAvChargeStatis error(%v)", err)
  14. return
  15. }
  16. chargeStatisMap = make(map[int64]*model.AvChargeStatis)
  17. for _, chargeStatis := range avChargeStatis {
  18. chargeStatisMap[chargeStatis.AvID] = chargeStatis
  19. }
  20. return
  21. }
  22. // GetAvChargeStatis get av charge statis
  23. func (s *AvChargeSvr) GetAvChargeStatis(c context.Context) (avChargeStatis []*model.AvChargeStatis, err error) {
  24. var id int64
  25. for {
  26. statis, err1 := s.dao.AvChargeStatis(c, id, _limitSize)
  27. if err1 != nil {
  28. err = err1
  29. return
  30. }
  31. avChargeStatis = append(avChargeStatis, statis...)
  32. if len(statis) < _limitSize {
  33. break
  34. }
  35. id = statis[len(statis)-1].ID
  36. }
  37. return
  38. }
  39. // CalAvChargeStatis cal av charge statis
  40. func (s *AvChargeSvr) CalAvChargeStatis(dailyCharge *model.AvCharge, chargeStatisMap map[int64]*model.AvChargeStatis) {
  41. if statisCharge, ok := chargeStatisMap[dailyCharge.AvID]; ok {
  42. updateAvChargeStatis(statisCharge, dailyCharge)
  43. } else {
  44. chargeStatisMap[dailyCharge.AvID] = addAvChargeStatis(dailyCharge)
  45. }
  46. }
  47. func addAvChargeStatis(daily *model.AvCharge) *model.AvChargeStatis {
  48. return &model.AvChargeStatis{
  49. AvID: daily.AvID,
  50. MID: daily.MID,
  51. TagID: daily.TagID,
  52. IsOriginal: daily.IsOriginal,
  53. UploadTime: daily.UploadTime,
  54. TotalCharge: daily.IncCharge,
  55. DBState: _dbInsert,
  56. }
  57. }
  58. func updateAvChargeStatis(avChargeStatis *model.AvChargeStatis, daily *model.AvCharge) {
  59. avChargeStatis.TotalCharge += daily.IncCharge
  60. avChargeStatis.DBState = _dbUpdate
  61. }
  62. // AvChargeStatisDBStore store charge statis
  63. func (s *AvChargeSvr) AvChargeStatisDBStore(c context.Context, chargeStatisMap map[int64]*model.AvChargeStatis) error {
  64. insert, update := make([]*model.AvChargeStatis, batchSize), make([]*model.AvChargeStatis, batchSize)
  65. insertIndex, updateIndex := 0, 0
  66. for _, charge := range chargeStatisMap {
  67. if charge.DBState == _dbInsert {
  68. insert[insertIndex] = charge
  69. insertIndex++
  70. } else if charge.DBState == _dbUpdate {
  71. update[updateIndex] = charge
  72. updateIndex++
  73. }
  74. if insertIndex >= batchSize {
  75. _, err := s.avChargeStatisBatchInsert(c, insert[:insertIndex])
  76. if err != nil {
  77. log.Error("s.avChargeStatisBatchInsert error(%v)", err)
  78. return err
  79. }
  80. insertIndex = 0
  81. }
  82. if updateIndex >= batchSize {
  83. _, err := s.avChargeStatisBatchInsert(c, update[:updateIndex])
  84. if err != nil {
  85. log.Error("s.avChargeStatisBatchInsert error(%v)", err)
  86. return err
  87. }
  88. updateIndex = 0
  89. }
  90. }
  91. if insertIndex > 0 {
  92. _, err := s.avChargeStatisBatchInsert(c, insert[:insertIndex])
  93. if err != nil {
  94. log.Error("s.avChargeStatisBatchInsert error(%v)", err)
  95. return err
  96. }
  97. }
  98. if updateIndex > 0 {
  99. _, err := s.avChargeStatisBatchInsert(c, update[:updateIndex])
  100. if err != nil {
  101. log.Error("s.avChargeStatisBatchInsert error(%v)", err)
  102. return err
  103. }
  104. }
  105. return nil
  106. }
  107. func assembleAvChargeStatis(avChargeStatis []*model.AvChargeStatis) (vals string) {
  108. var buf bytes.Buffer
  109. for _, row := range avChargeStatis {
  110. buf.WriteString("(")
  111. buf.WriteString(strconv.FormatInt(row.AvID, 10))
  112. buf.WriteByte(',')
  113. buf.WriteString(strconv.FormatInt(row.MID, 10))
  114. buf.WriteByte(',')
  115. buf.WriteString(strconv.FormatInt(row.TagID, 10))
  116. buf.WriteByte(',')
  117. buf.WriteString(strconv.Itoa(row.IsOriginal))
  118. buf.WriteByte(',')
  119. buf.WriteString(strconv.FormatInt(row.TotalCharge, 10))
  120. buf.WriteByte(',')
  121. buf.WriteString("'" + row.UploadTime.Time().Format(_layoutSec) + "'")
  122. buf.WriteString(")")
  123. buf.WriteByte(',')
  124. }
  125. if buf.Len() > 0 {
  126. buf.Truncate(buf.Len() - 1)
  127. }
  128. vals = buf.String()
  129. buf.Reset()
  130. return
  131. }
  132. func (s *AvChargeSvr) avChargeStatisBatchInsert(c context.Context, avChargeStatis []*model.AvChargeStatis) (rows int64, err error) {
  133. vals := assembleAvChargeStatis(avChargeStatis)
  134. rows, err = s.dao.InsertAvChargeStatisBatch(c, vals)
  135. return
  136. }