run_column.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232
  1. package charge
  2. import (
  3. "context"
  4. "fmt"
  5. "time"
  6. model "go-common/app/job/main/growup/model/charge"
  7. "go-common/library/log"
  8. "golang.org/x/sync/errgroup"
  9. )
  10. var (
  11. _cmWeeklyCharge = "column_weekly_charge"
  12. _cmMonthlyCharge = "column_monthly_charge"
  13. _cmDailyStatis = "column_charge_daily_statis"
  14. _cmWeeklyStatis = "column_charge_weekly_statis"
  15. _cmMonthlyStatis = "column_charge_monthly_statis"
  16. )
  17. func (s *Service) runColumn(c context.Context, date time.Time) (err error) {
  18. startWeeklyDate = getStartWeeklyDate(date)
  19. startMonthlyDate = getStartMonthlyDate(date)
  20. var (
  21. readGroup errgroup.Group
  22. sourceCh = make(chan []*model.Column, 1000)
  23. cmCh = make(chan []*model.Column, 1000)
  24. dailyStatisCh = make(chan []*model.Column, 1000)
  25. )
  26. readGroup.Go(func() (err error) {
  27. err = s.columnCharges(c, date, sourceCh)
  28. if err != nil {
  29. log.Error("s.columnCharges error(%v)", err)
  30. return
  31. }
  32. log.Info("read column_daily_charge finished")
  33. return
  34. })
  35. readGroup.Go(func() (err error) {
  36. defer func() {
  37. close(cmCh)
  38. close(dailyStatisCh)
  39. }()
  40. for charges := range sourceCh {
  41. cmCh <- charges
  42. dailyStatisCh <- charges
  43. }
  44. return
  45. })
  46. var (
  47. weeklyMap map[int64]*model.Column
  48. monthlyMap map[int64]*model.Column
  49. statisMap map[int64]*model.ColumnStatis
  50. weeklyCh = make(chan map[int64]*model.Column, 1)
  51. monthlyCh = make(chan map[int64]*model.Column, 1)
  52. )
  53. readGroup.Go(func() (err error) {
  54. defer func() {
  55. close(weeklyCh)
  56. close(monthlyCh)
  57. }()
  58. weeklyMap, monthlyMap, statisMap, err = s.handleColumn(c, date, cmCh)
  59. if err != nil {
  60. log.Error("s.handleColumn error(%v)", err)
  61. return
  62. }
  63. weeklyCh <- weeklyMap
  64. monthlyCh <- monthlyMap
  65. log.Info("handleColumn finished")
  66. return
  67. })
  68. var (
  69. dateStatis = &SectionEntries{}
  70. cmDaily = make(chan []*model.Archive, 2000)
  71. cmWeekly = make(chan []*model.Archive, 1)
  72. cmMonthly = make(chan []*model.Archive, 1)
  73. )
  74. readGroup.Go(func() (err error) {
  75. defer close(cmDaily)
  76. for cms := range dailyStatisCh {
  77. cmDaily <- transCm2Archive(cms)
  78. }
  79. return
  80. })
  81. readGroup.Go(func() (err error) {
  82. defer close(cmWeekly)
  83. cmWeekly <- transCmMap2Archive(<-weeklyCh)
  84. return
  85. })
  86. readGroup.Go(func() (err error) {
  87. defer close(cmMonthly)
  88. cmMonthly <- transCmMap2Archive(<-monthlyCh)
  89. return
  90. })
  91. readGroup.Go(func() (err error) {
  92. dateStatis.daily, err = s.handleDateStatis(c, cmDaily, date, _cmDailyStatis)
  93. if err != nil {
  94. log.Error("s.handleDateStatis(%s) error(%v)", _cmDailyStatis, err)
  95. }
  96. return
  97. })
  98. readGroup.Go(func() (err error) {
  99. dateStatis.weekly, err = s.handleDateStatis(c, cmWeekly, startWeeklyDate, _cmWeeklyStatis)
  100. if err != nil {
  101. log.Error("s.handleDateStatis(%s) error(%v)", _cmWeeklyStatis, err)
  102. }
  103. return
  104. })
  105. readGroup.Go(func() (err error) {
  106. dateStatis.monthly, err = s.handleDateStatis(c, cmMonthly, startMonthlyDate, _cmMonthlyStatis)
  107. if err != nil {
  108. log.Error("s.handleDateStatis(%s) error(%v)", _cmMonthlyStatis, err)
  109. }
  110. return
  111. })
  112. if err = readGroup.Wait(); err != nil {
  113. log.Error("run readGroup.Wait error(%v)", err)
  114. return
  115. }
  116. {
  117. if len(weeklyMap) == 0 {
  118. err = fmt.Errorf("Error: insert 0 column_weekly_charge")
  119. return
  120. }
  121. if len(monthlyMap) == 0 {
  122. err = fmt.Errorf("Error: insert 0 column_monthly_charge")
  123. return
  124. }
  125. if len(statisMap) == 0 {
  126. err = fmt.Errorf("Error: insert 0 column_charge_statis")
  127. return
  128. }
  129. if len(dateStatis.daily) == 0 {
  130. err = fmt.Errorf("Error: insert 0 column_charge_daily_statis")
  131. return
  132. }
  133. if len(dateStatis.weekly) == 0 {
  134. err = fmt.Errorf("Error: insert 0 column_charge_weekly_statis")
  135. return
  136. }
  137. if len(dateStatis.monthly) == 0 {
  138. err = fmt.Errorf("Error: insert 0 column_charge_monthly_statis")
  139. return
  140. }
  141. }
  142. // persist
  143. var writeGroup errgroup.Group
  144. // column_weekly_charge
  145. writeGroup.Go(func() (err error) {
  146. err = s.cmDBStore(c, _cmWeeklyCharge, weeklyMap)
  147. if err != nil {
  148. log.Error("s.cmDBStore column_weekly_charge error(%v)", err)
  149. return
  150. }
  151. log.Info("insert column_weekly_charge : %d", len(weeklyMap))
  152. // column_monthly_charge
  153. err = s.cmDBStore(c, _cmMonthlyCharge, monthlyMap)
  154. if err != nil {
  155. log.Error("s.cmDBStore column_monthly_charge error(%v)", err)
  156. return
  157. }
  158. log.Info("insert column_monthly_charge : %d", len(monthlyMap))
  159. // column_charge_statis
  160. err = s.cmStatisDBStore(c, statisMap)
  161. if err != nil {
  162. log.Error("s.cmStatisDBStore error(%v)", err)
  163. return
  164. }
  165. log.Info("insert column_charge_statis : %d", len(statisMap))
  166. // column_charge_daily_statis
  167. _, err = s.dateStatisInsert(c, dateStatis.daily, _cmDailyStatis)
  168. if err != nil {
  169. log.Error("s.dateStatisInsert error(%v)", err)
  170. return
  171. }
  172. log.Info("insert column_charge_daily_statis : %d", len(dateStatis.daily))
  173. // column_charge_weekly_statis
  174. _, err = s.dateStatisInsert(c, dateStatis.weekly, _cmWeeklyStatis)
  175. if err != nil {
  176. log.Error("s.dateStatisInsert error(%v)", err)
  177. return
  178. }
  179. log.Info("insert column_charge_weekly_statis : %d", len(dateStatis.weekly))
  180. // column_charge_monthly_statis
  181. _, err = s.dateStatisInsert(c, dateStatis.monthly, _cmMonthlyStatis)
  182. if err != nil {
  183. log.Error("s.dateStatisInsert error(%v)", err)
  184. return
  185. }
  186. log.Info("insert column_charge_monthly_statis : %d", len(dateStatis.monthly))
  187. return
  188. })
  189. // writeGroup.Go(func() (err error) {
  190. // return
  191. // })
  192. //
  193. // writeGroup.Go(func() (err error) {
  194. // return
  195. // })
  196. //
  197. // writeGroup.Go(func() (err error) {
  198. // return
  199. // })
  200. //
  201. // writeGroup.Go(func() (err error) {
  202. // return
  203. // })
  204. //
  205. // writeGroup.Go(func() (err error) {
  206. // return
  207. // })
  208. if err = writeGroup.Wait(); err != nil {
  209. log.Error("run writeGroup.Wait error(%v)", err)
  210. }
  211. return
  212. }