run_bgm.go 6.1 KB


  1. package charge
  2. import (
  3. "context"
  4. "fmt"
  5. "time"
  6. model "go-common/app/job/main/growup/model/charge"
  7. "go-common/library/log"
  8. "golang.org/x/sync/errgroup"
  9. )
  10. var (
  11. _bgmDailyCharge = "bgm_daily_charge"
  12. _bgmWeeklyCharge = "bgm_weekly_charge"
  13. _bgmMonthlyCharge = "bgm_monthly_charge"
  14. _bgmDailyStatis = "bgm_charge_daily_statis"
  15. _bgmWeeklyStatis = "bgm_charge_weekly_statis"
  16. _bgmMonthlyStatis = "bgm_charge_monthly_statis"
  17. )
  18. func (s *Service) runBgm(c context.Context, date time.Time, avBgmCharge chan []*model.AvCharge) (err error) {
  19. startWeeklyDate = getStartWeeklyDate(date)
  20. startMonthlyDate = getStartMonthlyDate(date)
  21. var (
  22. readGroup errgroup.Group
  23. dailyMap = make(map[string]*model.BgmCharge)
  24. sourceCh = make(chan []*model.BgmCharge, 1000)
  25. dailyStatisCh = make(chan []*model.BgmCharge, 1000)
  26. bgmCh = make(chan []*model.BgmCharge, 1000)
  27. )
  28. readGroup.Go(func() (err error) {
  29. dailyMap, err = s.bgmCharges(c, date, sourceCh, avBgmCharge)
  30. if err != nil {
  31. log.Error("s.bgmCharges error(%v)", err)
  32. return
  33. }
  34. log.Info("get bgm_daily_charge finished")
  35. return
  36. })
  37. readGroup.Go(func() (err error) {
  38. defer func() {
  39. close(bgmCh)
  40. close(dailyStatisCh)
  41. }()
  42. for charges := range sourceCh {
  43. bgmCh <- charges
  44. dailyStatisCh <- charges
  45. }
  46. return
  47. })
  48. var (
  49. weeklyMap map[string]*model.BgmCharge
  50. monthlyMap map[string]*model.BgmCharge
  51. statisMap map[string]*model.BgmStatis
  52. weeklyCh = make(chan map[string]*model.BgmCharge, 1)
  53. monthlyCh = make(chan map[string]*model.BgmCharge, 1)
  54. )
  55. readGroup.Go(func() (err error) {
  56. defer func() {
  57. close(weeklyCh)
  58. close(monthlyCh)
  59. }()
  60. weeklyMap, monthlyMap, statisMap, err = s.handleBgm(c, date, bgmCh)
  61. if err != nil {
  62. log.Error("s.handleBgm error(%v)", err)
  63. return
  64. }
  65. weeklyCh <- weeklyMap
  66. monthlyCh <- monthlyMap
  67. log.Info("handleBgm finished")
  68. return
  69. })
  70. var (
  71. dateStatis = &SectionEntries{}
  72. bgmDaily = make(chan []*model.Archive, 2000)
  73. bgmWeekly = make(chan []*model.Archive, 1)
  74. bgmMonthly = make(chan []*model.Archive, 1)
  75. )
  76. readGroup.Go(func() (err error) {
  77. defer close(bgmDaily)
  78. for bgms := range dailyStatisCh {
  79. bgmDaily <- transBgm2Archive(bgms)
  80. }
  81. return
  82. })
  83. readGroup.Go(func() (err error) {
  84. defer close(bgmWeekly)
  85. bgmWeekly <- transBgmMap2Archive(<-weeklyCh)
  86. return
  87. })
  88. readGroup.Go(func() (err error) {
  89. defer close(bgmMonthly)
  90. bgmMonthly <- transBgmMap2Archive(<-monthlyCh)
  91. return
  92. })
  93. readGroup.Go(func() (err error) {
  94. dateStatis.daily, err = s.handleDateStatis(c, bgmDaily, date, _bgmDailyStatis)
  95. if err != nil {
  96. log.Error("s.handleDateStatis(%s) error(%v)", _bgmDailyStatis, err)
  97. }
  98. return
  99. })
  100. readGroup.Go(func() (err error) {
  101. dateStatis.weekly, err = s.handleDateStatis(c, bgmWeekly, startWeeklyDate, _bgmWeeklyStatis)
  102. if err != nil {
  103. log.Error("s.handleDateStatis(%s) error(%v)", _bgmWeeklyStatis, err)
  104. }
  105. return
  106. })
  107. readGroup.Go(func() (err error) {
  108. dateStatis.monthly, err = s.handleDateStatis(c, bgmMonthly, startMonthlyDate, _bgmMonthlyStatis)
  109. if err != nil {
  110. log.Error("s.handleDateStatis(%s) error(%v)", _bgmMonthlyStatis, err)
  111. }
  112. return
  113. })
  114. if err = readGroup.Wait(); err != nil {
  115. log.Error("run readGroup.Wait error(%v)", err)
  116. return
  117. }
  118. {
  119. if len(dailyMap) == 0 {
  120. err = fmt.Errorf("Error: insert 0 bgm_daily_charge")
  121. return
  122. }
  123. if len(weeklyMap) == 0 {
  124. err = fmt.Errorf("Error: insert 0 bgm_weekly_charge")
  125. return
  126. }
  127. if len(monthlyMap) == 0 {
  128. err = fmt.Errorf("Error: insert 0 bgm_monthly_charge")
  129. return
  130. }
  131. if len(statisMap) == 0 {
  132. err = fmt.Errorf("Error: insert 0 bgm_charge_statis")
  133. return
  134. }
  135. if len(dateStatis.daily) == 0 {
  136. err = fmt.Errorf("Error: insert 0 bgm_charge_daily_statis")
  137. return
  138. }
  139. if len(dateStatis.weekly) == 0 {
  140. err = fmt.Errorf("Error: insert 0 bgm_charge_weekly_statis")
  141. return
  142. }
  143. if len(dateStatis.monthly) == 0 {
  144. err = fmt.Errorf("Error: insert 0 bgm_charge_monthly_statis")
  145. return
  146. }
  147. }
  148. // persist
  149. var writeGroup errgroup.Group
  150. // bgm_daily_charge
  151. writeGroup.Go(func() (err error) {
  152. err = s.bgmDBStore(c, _bgmDailyCharge, dailyMap)
  153. if err != nil {
  154. log.Error("s.bgmDBStore bgm_daily_charge error(%v)", err)
  155. return
  156. }
  157. log.Info("insert bgm_daily_charge : %d", len(dailyMap))
  158. // bgm_weekly_charge
  159. err = s.bgmDBStore(c, _bgmWeeklyCharge, weeklyMap)
  160. if err != nil {
  161. log.Error("s.bgmDBStore bgm_weekly_charge error(%v)", err)
  162. return
  163. }
  164. log.Info("insert bgm_weekly_charge : %d", len(weeklyMap))
  165. // bgm_monthly_charge
  166. err = s.bgmDBStore(c, _bgmMonthlyCharge, monthlyMap)
  167. if err != nil {
  168. log.Error("s.bgmDBStore bgm_monthly_charge error(%v)", err)
  169. return
  170. }
  171. log.Info("insert bgm_monthly_charge : %d", len(monthlyMap))
  172. // bgm_charge_statis
  173. err = s.bgmStatisDBStore(c, statisMap)
  174. if err != nil {
  175. log.Error("s.bgmStatisDBStore error(%v)", err)
  176. return
  177. }
  178. log.Info("insert bgm_charge_statis : %d", len(statisMap))
  179. // bgm_charge_daily_statis
  180. _, err = s.dateStatisInsert(c, dateStatis.daily, _bgmDailyStatis)
  181. if err != nil {
  182. log.Error("s.dateStatisInsert error(%v)", err)
  183. return
  184. }
  185. log.Info("insert bgm_charge_daily_statis : %d", len(dateStatis.daily))
  186. // bgm_charge_weekly_statis
  187. _, err = s.dateStatisInsert(c, dateStatis.weekly, _bgmWeeklyStatis)
  188. if err != nil {
  189. log.Error("s.dateStatisInsert error(%v)", err)
  190. return
  191. }
  192. log.Info("insert bgm_charge_weekly_statis : %d", len(dateStatis.weekly))
  193. // bgm_charge_monthly_statis
  194. _, err = s.dateStatisInsert(c, dateStatis.monthly, _bgmMonthlyStatis)
  195. if err != nil {
  196. log.Error("s.dateStatisInsert error(%v)", err)
  197. return
  198. }
  199. log.Info("insert bgm_charge_monthly_statis : %d", len(dateStatis.monthly))
  200. return
  201. })
  202. // writeGroup.Go(func() (err error) {
  203. // return
  204. // })
  205. //
  206. // writeGroup.Go(func() (err error) {
  207. // return
  208. // })
  209. //
  210. // writeGroup.Go(func() (err error) {
  211. // return
  212. // })
  213. //
  214. // writeGroup.Go(func() (err error) {
  215. // return
  216. // })
  217. //
  218. // writeGroup.Go(func() (err error) {
  219. // return
  220. // })
  221. //
  222. // writeGroup.Go(func() (err error) {
  223. // return
  224. // })
  225. if err = writeGroup.Wait(); err != nil {
  226. log.Error("run writeGroup.Wait error(%v)", err)
  227. }
  228. return
  229. }