run_statis.go 16 KB


  1. package income
  2. import (
  3. "context"
  4. "fmt"
  5. "time"
  6. model "go-common/app/job/main/growup/model/income"
  7. task "go-common/app/job/main/growup/service"
  8. "go-common/library/log"
  9. "golang.org/x/sync/errgroup"
  10. )
  11. // RunStatis run income statistics
  12. func (s *Service) RunStatis(c context.Context, date time.Time) (err error) {
  13. var msg string
  14. mailReceivers := []string{"shaozhenyu@bilibili.com", "gaopeng@bilibili.com", "limengqing@bilibili.com"}
  15. startTime := time.Now().Unix()
  16. err = s.runStatis(c, date)
  17. if err != nil {
  18. msg = err.Error()
  19. } else {
  20. msg = fmt.Sprintf("%s 计算完成,耗时%ds", date.Format("2006-01-02"), time.Now().Unix()-startTime)
  21. }
  22. err = s.email.SendMail(date, msg, "创作激励每日统计%d年%d月%d日", mailReceivers...)
  23. if err != nil {
  24. log.Error("s.email.SendMail error(%v)", err)
  25. }
  26. return
  27. }
  28. func (s *Service) runStatis(c context.Context, date time.Time) (err error) {
  29. defer func() {
  30. task.GetTaskService().SetTaskStatus(c, task.TaskCreativeStatis, date.Format(_layout), err)
  31. }()
  32. // task status type
  33. err = task.GetTaskService().TaskReady(c, date.Format("2006-01-02"), task.TaskCreativeIncome)
  34. if err != nil {
  35. return
  36. }
  37. startWeeklyDate = getStartWeeklyDate(date)
  38. startMonthlyDate = getStartMonthlyDate(date)
  39. var (
  40. readGroup errgroup.Group
  41. avSections = SectionEntries{}
  42. cmSections = SectionEntries{}
  43. bgmSections = SectionEntries{}
  44. upSection = make([]*model.DateStatis, 0)
  45. upAvSection = make([]*model.DateStatis, 0)
  46. upCmSection = make([]*model.DateStatis, 0)
  47. upBgmSection = make([]*model.DateStatis, 0)
  48. upIncomeWeekly = make(map[int64]*model.UpIncome)
  49. upIncomeMonthly = make(map[int64]*model.UpIncome)
  50. upAvStatisCh = make(chan map[int64]*model.UpArchStatis, 1)
  51. upCmStatisCh = make(chan map[int64]*model.UpArchStatis, 1)
  52. upBgmStatisCh = make(chan map[int64]*model.UpArchStatis, 1)
  53. avSourceCh = make(chan []*model.ArchiveIncome, 1000)
  54. avDailyCh = make(chan []*model.ArchiveIncome, 1000)
  55. avWeeklyCh = make(chan []*model.ArchiveIncome, 1000)
  56. avMonthlyCh = make(chan []*model.ArchiveIncome, 1000)
  57. upAvCh = make(chan []*model.ArchiveIncome, 1000)
  58. cmSourceCh = make(chan []*model.ArchiveIncome, 1000)
  59. cmDailyCh = make(chan []*model.ArchiveIncome, 1000)
  60. cmWeeklyCh = make(chan []*model.ArchiveIncome, 1000)
  61. cmMonthlyCh = make(chan []*model.ArchiveIncome, 1000)
  62. upCmCh = make(chan []*model.ArchiveIncome, 1000)
  63. bgmSourceCh = make(chan []*model.ArchiveIncome, 1000)
  64. bgmDailyCh = make(chan []*model.ArchiveIncome, 1000)
  65. bgmWeeklyCh = make(chan []*model.ArchiveIncome, 1000)
  66. bgmMonthlyCh = make(chan []*model.ArchiveIncome, 1000)
  67. upBgmCh = make(chan []*model.ArchiveIncome, 1000)
  68. upSourceCh = make(chan []*model.UpIncome, 1000)
  69. upDailyCh = make(chan []*model.UpIncome, 1000)
  70. upStatisCh = make(chan []*model.UpIncome, 1000)
  71. )
  72. // get income by date to statistics
  73. preDate := startMonthlyDate
  74. if startWeeklyDate.Before(startMonthlyDate) {
  75. preDate = startWeeklyDate
  76. }
  77. // get av_income
  78. readGroup.Go(func() (err error) {
  79. err = s.income.dateStatisSvr.getArchiveByDate(c, avSourceCh, preDate, date, _video, _limitSize)
  80. if err != nil {
  81. log.Error("s.getArchiveByDate error(%v)", err)
  82. }
  83. return
  84. })
  85. // get column_income
  86. readGroup.Go(func() (err error) {
  87. err = s.income.dateStatisSvr.getArchiveByDate(c, cmSourceCh, preDate, date, _column, _limitSize)
  88. if err != nil {
  89. log.Error("s.getArchiveByDate error(%v)", err)
  90. }
  91. return
  92. })
  93. // get bgm_income
  94. readGroup.Go(func() (err error) {
  95. err = s.income.dateStatisSvr.getArchiveByDate(c, bgmSourceCh, preDate, date, _bgm, _limitSize)
  96. if err != nil {
  97. log.Error("s.getArchiveByDate error(%v)", err)
  98. }
  99. return
  100. })
  101. // get up_income
  102. readGroup.Go(func() (err error) {
  103. err = s.income.upIncomeSvr.getUpIncomeByDate(c, upSourceCh, date.Format(_layout), _limitSize)
  104. if err != nil {
  105. log.Error("s.getUpIncomeByDate error(%v)", err)
  106. }
  107. return
  108. })
  109. readGroup.Go(func() (err error) {
  110. defer func() {
  111. close(avDailyCh)
  112. close(avWeeklyCh)
  113. close(avMonthlyCh)
  114. close(upAvCh)
  115. }()
  116. for av := range avSourceCh {
  117. avDailyCh <- av
  118. avWeeklyCh <- av
  119. avMonthlyCh <- av
  120. upAvCh <- av
  121. }
  122. return
  123. })
  124. readGroup.Go(func() (err error) {
  125. defer func() {
  126. close(cmDailyCh)
  127. close(cmWeeklyCh)
  128. close(cmMonthlyCh)
  129. close(upCmCh)
  130. }()
  131. for cm := range cmSourceCh {
  132. cmDailyCh <- cm
  133. cmWeeklyCh <- cm
  134. cmMonthlyCh <- cm
  135. upCmCh <- cm
  136. }
  137. return
  138. })
  139. readGroup.Go(func() (err error) {
  140. defer func() {
  141. close(bgmDailyCh)
  142. close(bgmWeeklyCh)
  143. close(bgmMonthlyCh)
  144. close(upBgmCh)
  145. }()
  146. for bgm := range bgmSourceCh {
  147. bgmDailyCh <- bgm
  148. bgmWeeklyCh <- bgm
  149. bgmMonthlyCh <- bgm
  150. upBgmCh <- bgm
  151. }
  152. return
  153. })
  154. readGroup.Go(func() (err error) {
  155. defer func() {
  156. close(upStatisCh)
  157. close(upDailyCh)
  158. }()
  159. for up := range upSourceCh {
  160. upStatisCh <- up
  161. upDailyCh <- up
  162. }
  163. return
  164. })
  165. // up
  166. readGroup.Go(func() (err error) {
  167. upSection, upAvSection, upCmSection, upBgmSection, err = s.income.dateStatisSvr.handleDateUp(c, upStatisCh, date)
  168. if err != nil {
  169. log.Error("s.income.dateStatisSvr.HandleUp error(%v)", err)
  170. }
  171. return
  172. })
  173. // video
  174. readGroup.Go(func() (err error) {
  175. avSections.avDaily, err = s.income.dateStatisSvr.handleDateStatis(c, avDailyCh, date, _avIncomeDailyStatis)
  176. if err != nil {
  177. log.Error("s.income.dateStatisSvr.handleDateStatis error(%v)", err)
  178. }
  179. return
  180. })
  181. readGroup.Go(func() (err error) {
  182. avSections.avWeekly, err = s.income.dateStatisSvr.handleDateStatis(c, avWeeklyCh, startWeeklyDate, _avIncomeWeeklyStatis)
  183. if err != nil {
  184. log.Error("s.income.dateStatisSvr.handleDateStatis error(%v)", err)
  185. }
  186. return
  187. })
  188. readGroup.Go(func() (err error) {
  189. avSections.avMonthly, err = s.income.dateStatisSvr.handleDateStatis(c, avMonthlyCh, startMonthlyDate, _avIncomeMonthlyStatis)
  190. if err != nil {
  191. log.Error("s.income.dateStatisSvr.handleDateStatis error(%v)", err)
  192. }
  193. return
  194. })
  195. // column
  196. readGroup.Go(func() (err error) {
  197. cmSections.avDaily, err = s.income.dateStatisSvr.handleDateStatis(c, cmDailyCh, date, _cmIncomeDailyStatis)
  198. if err != nil {
  199. log.Error("s.income.dateStatisSvr.handleDateStatis error(%v)", err)
  200. }
  201. return
  202. })
  203. readGroup.Go(func() (err error) {
  204. cmSections.avWeekly, err = s.income.dateStatisSvr.handleDateStatis(c, cmWeeklyCh, startWeeklyDate, _cmIncomeWeeklyStatis)
  205. if err != nil {
  206. log.Error("s.income.dateStatisSvr.handleDateStatis error(%v)", err)
  207. }
  208. return
  209. })
  210. readGroup.Go(func() (err error) {
  211. cmSections.avMonthly, err = s.income.dateStatisSvr.handleDateStatis(c, cmMonthlyCh, startMonthlyDate, _cmIncomeMonthlyStatis)
  212. if err != nil {
  213. log.Error("s.income.dateStatisSvr.handleDateStatis error(%v)", err)
  214. }
  215. return
  216. })
  217. // bgm
  218. readGroup.Go(func() (err error) {
  219. bgmSections.avDaily, err = s.income.dateStatisSvr.handleDateStatis(c, bgmDailyCh, date, _bgmIncomeDailyStatis)
  220. if err != nil {
  221. log.Error("s.income.dateStatisSvr.handleDateStatis error(%v)", err)
  222. }
  223. return
  224. })
  225. readGroup.Go(func() (err error) {
  226. bgmSections.avWeekly, err = s.income.dateStatisSvr.handleDateStatis(c, bgmWeeklyCh, startWeeklyDate, _bgmIncomeWeeklyStatis)
  227. if err != nil {
  228. log.Error("s.income.dateStatisSvr.handleDateStatis error(%v)", err)
  229. }
  230. return
  231. })
  232. readGroup.Go(func() (err error) {
  233. bgmSections.avMonthly, err = s.income.dateStatisSvr.handleDateStatis(c, bgmMonthlyCh, startMonthlyDate, _bgmIncomeMonthlyStatis)
  234. if err != nil {
  235. log.Error("s.income.dateStatisSvr.handleDateStatis error(%v)", err)
  236. }
  237. return
  238. })
  239. // up_av_statis
  240. readGroup.Go(func() (err error) {
  241. err = s.income.upIncomeSvr.handleUpArchStatis(c, upAvStatisCh, upAvCh)
  242. if err != nil {
  243. log.Error("p.upIncomeSvr.handleUpArchStatis error(%v)", err)
  244. }
  245. return
  246. })
  247. // up_column_statis
  248. readGroup.Go(func() (err error) {
  249. err = s.income.upIncomeSvr.handleUpArchStatis(c, upCmStatisCh, upCmCh)
  250. if err != nil {
  251. log.Error("p.upIncomeSvr.handleUpArchStatis error(%v)", err)
  252. }
  253. return
  254. })
  255. // up_bgm_statis
  256. readGroup.Go(func() (err error) {
  257. err = s.income.upIncomeSvr.handleUpArchStatis(c, upBgmStatisCh, upBgmCh)
  258. if err != nil {
  259. log.Error("p.upIncomeSvr.handleUpArchStatis error(%v)", err)
  260. }
  261. return
  262. })
  263. // up_income_weekly up_income_monthly
  264. readGroup.Go(func() (err error) {
  265. upIncomeWeekly, upIncomeMonthly, err = s.income.upIncomeSvr.handleUpIncomeWeeklyAndMonthly(c, date, upAvStatisCh, upCmStatisCh, upBgmStatisCh, upDailyCh)
  266. if err != nil {
  267. log.Error("p.upIncomeSvr.handleUpIncomeWeeklyAndMonthly error(%v)", err)
  268. }
  269. return
  270. })
  271. if err = readGroup.Wait(); err != nil {
  272. log.Error("run readGroup.Wait error(%v)", err)
  273. return
  274. }
  275. {
  276. if len(avSections.avDaily) == 0 {
  277. err = fmt.Errorf("Error: insert 0 av_income_daily_statis")
  278. return
  279. }
  280. if len(avSections.avWeekly) == 0 {
  281. err = fmt.Errorf("Error: insert 0 av_income_weekly_statis")
  282. return
  283. }
  284. if len(avSections.avMonthly) == 0 {
  285. err = fmt.Errorf("Error: insert 0 av_income_monthly_statis")
  286. return
  287. }
  288. if len(cmSections.avDaily) == 0 {
  289. err = fmt.Errorf("Error: insert 0 cm_income_daily_statis")
  290. return
  291. }
  292. if len(cmSections.avWeekly) == 0 {
  293. err = fmt.Errorf("Error: insert 0 cm_income_weekly_statis")
  294. return
  295. }
  296. if len(cmSections.avMonthly) == 0 {
  297. err = fmt.Errorf("Error: insert 0 cm_income_monthly_statis")
  298. return
  299. }
  300. if len(bgmSections.avDaily) == 0 {
  301. err = fmt.Errorf("Error: insert 0 bgm_income_daily_statis")
  302. return
  303. }
  304. if len(bgmSections.avWeekly) == 0 {
  305. err = fmt.Errorf("Error: insert 0 bgm_income_weekly_statis")
  306. return
  307. }
  308. if len(bgmSections.avMonthly) == 0 {
  309. err = fmt.Errorf("Error: insert 0 bgm_income_monthly_statis")
  310. return
  311. }
  312. if len(upSection) == 0 {
  313. err = fmt.Errorf("Error: insert 0 up_income_daily_statis")
  314. return
  315. }
  316. if len(upAvSection) == 0 {
  317. err = fmt.Errorf("Error: insert 0 up_av_daily_statis")
  318. return
  319. }
  320. if len(upCmSection) == 0 {
  321. err = fmt.Errorf("Error: insert 0 up_column_daily_statis")
  322. return
  323. }
  324. if len(upBgmSection) == 0 {
  325. err = fmt.Errorf("Error: insert 0 up_bgm_daily_statis")
  326. return
  327. }
  328. if len(upIncomeWeekly) == 0 {
  329. err = fmt.Errorf("Error: insert 0 up_income_weekly")
  330. return
  331. }
  332. if len(upIncomeMonthly) == 0 {
  333. err = fmt.Errorf("Error: insert 0 up_income_monthly")
  334. return
  335. }
  336. }
  337. // persistent
  338. var writeGroup errgroup.Group
  339. // av_income_daily_statis
  340. writeGroup.Go(func() (err error) {
  341. _, err = s.income.dateStatisSvr.incomeDateStatisInsert(c, avSections.avDaily, _avIncomeDailyStatis)
  342. if err != nil {
  343. log.Error("s.income.dateStatisSvr.incomeDateStatisInsert(daily) error(%v)", err)
  344. return
  345. }
  346. log.Info("insert av_income_daily_statis : %d", len(avSections.avDaily))
  347. return
  348. })
  349. // av_income_weekly_statis
  350. writeGroup.Go(func() (err error) {
  351. _, err = s.income.dateStatisSvr.incomeDateStatisInsert(c, avSections.avWeekly, _avIncomeWeeklyStatis)
  352. if err != nil {
  353. log.Error("s.income.dateStatisSvr.incomeDateStatisInsert(weekly) error(%v)", err)
  354. return
  355. }
  356. log.Info("insert av_income_weekly_statis : %d", len(avSections.avWeekly))
  357. return
  358. })
  359. // av_income_monthly_statis
  360. writeGroup.Go(func() (err error) {
  361. _, err = s.income.dateStatisSvr.incomeDateStatisInsert(c, avSections.avMonthly, _avIncomeMonthlyStatis)
  362. if err != nil {
  363. log.Error("s.income.dateStatisSvr.incomeDateStatisInsert(monthly) error(%v)", err)
  364. return
  365. }
  366. log.Info("insert av_income_monthly_statis : %d", len(avSections.avMonthly))
  367. return
  368. })
  369. // column_income_daily_statis
  370. writeGroup.Go(func() (err error) {
  371. _, err = s.income.dateStatisSvr.incomeDateStatisInsert(c, cmSections.avDaily, _cmIncomeDailyStatis)
  372. if err != nil {
  373. log.Error("s.income.dateStatisSvr.incomeDateStatisInsert(daily) error(%v)", err)
  374. return
  375. }
  376. log.Info("insert column_income_daily_statis : %d", len(cmSections.avDaily))
  377. return
  378. })
  379. // column_income_weekly_statis
  380. writeGroup.Go(func() (err error) {
  381. _, err = s.income.dateStatisSvr.incomeDateStatisInsert(c, cmSections.avWeekly, _cmIncomeWeeklyStatis)
  382. if err != nil {
  383. log.Error("s.income.dateStatisSvr.incomeDateStatisInsert(weekly) error(%v)", err)
  384. return
  385. }
  386. log.Info("insert column_income_weekly_statis : %d", len(cmSections.avWeekly))
  387. return
  388. })
  389. // column_income_monthly_statis
  390. writeGroup.Go(func() (err error) {
  391. _, err = s.income.dateStatisSvr.incomeDateStatisInsert(c, cmSections.avMonthly, _cmIncomeMonthlyStatis)
  392. if err != nil {
  393. log.Error("s.income.dateStatisSvr.incomeDateStatisInsert(monthly) error(%v)", err)
  394. return
  395. }
  396. log.Info("insert column_income_monthly_statis : %d", len(cmSections.avMonthly))
  397. return
  398. })
  399. // bgm_income_daily_statis
  400. writeGroup.Go(func() (err error) {
  401. _, err = s.income.dateStatisSvr.incomeDateStatisInsert(c, bgmSections.avDaily, _bgmIncomeDailyStatis)
  402. if err != nil {
  403. log.Error("s.income.dateStatisSvr.incomeDateStatisInsert(daily) error(%v)", err)
  404. return
  405. }
  406. log.Info("insert bgm_income_daily_statis : %d", len(bgmSections.avDaily))
  407. return
  408. })
  409. // bgm_income_weekly_statis
  410. writeGroup.Go(func() (err error) {
  411. _, err = s.income.dateStatisSvr.incomeDateStatisInsert(c, bgmSections.avWeekly, _bgmIncomeWeeklyStatis)
  412. if err != nil {
  413. log.Error("s.income.dateStatisSvr.incomeDateStatisInsert(weekly) error(%v)", err)
  414. return
  415. }
  416. log.Info("insert bgm_income_weekly_statis : %d", len(bgmSections.avWeekly))
  417. return
  418. })
  419. // bgm_income_monthly_statis
  420. writeGroup.Go(func() (err error) {
  421. _, err = s.income.dateStatisSvr.incomeDateStatisInsert(c, bgmSections.avMonthly, _bgmIncomeMonthlyStatis)
  422. if err != nil {
  423. log.Error("s.income.dateStatisSvr.incomeDateStatisInsert(monthly) error(%v)", err)
  424. return
  425. }
  426. log.Info("insert bgm_income_monthly_statis : %d", len(bgmSections.avMonthly))
  427. return
  428. })
  429. // up_income_daily_statis
  430. writeGroup.Go(func() (err error) {
  431. _, err = s.income.dateStatisSvr.upIncomeDailyStatisInsert(c, upSection, "up_income_daily_statis")
  432. if err != nil {
  433. log.Error("s.upIncomeDailyStatisInsert error(%v)", err)
  434. return
  435. }
  436. log.Info("insert up_income_daily_statis : %d", len(upSection))
  437. return
  438. })
  439. // up_av_daily_statis
  440. writeGroup.Go(func() (err error) {
  441. _, err = s.income.dateStatisSvr.upIncomeDailyStatisInsert(c, upAvSection, "up_av_daily_statis")
  442. if err != nil {
  443. log.Error("s.upIncomeDailyStatisInsert error(%v)", err)
  444. return
  445. }
  446. log.Info("insert up_av_daily_statis : %d", len(upAvSection))
  447. return
  448. })
  449. // up_column_daily_statis
  450. writeGroup.Go(func() (err error) {
  451. _, err = s.income.dateStatisSvr.upIncomeDailyStatisInsert(c, upCmSection, "up_column_daily_statis")
  452. if err != nil {
  453. log.Error("s.upIncomeDailyStatisInsert error(%v)", err)
  454. return
  455. }
  456. log.Info("insert up_column_daily_statis : %d", len(upCmSection))
  457. return
  458. })
  459. // up_bgm_daily_statis
  460. writeGroup.Go(func() (err error) {
  461. _, err = s.income.dateStatisSvr.upIncomeDailyStatisInsert(c, upBgmSection, "up_bgm_daily_statis")
  462. if err != nil {
  463. log.Error("s.upIncomeDailyStatisInsert error(%v)", err)
  464. return
  465. }
  466. log.Info("insert up_bgm_daily_statis : %d", len(upBgmSection))
  467. return
  468. })
  469. // up_income_weekly
  470. writeGroup.Go(func() (err error) {
  471. err = s.income.upIncomeSvr.UpIncomeDBStoreBatch(c, _upIncomeWeekly, upIncomeWeekly)
  472. if err != nil {
  473. log.Error("s.UpIncomeDBStoreBatch up_income_weekly error(%v)", err)
  474. return
  475. }
  476. log.Info("insert up_income_weekly : %d", len(upIncomeWeekly))
  477. return
  478. })
  479. // up_income_monthly
  480. writeGroup.Go(func() (err error) {
  481. err = s.income.upIncomeSvr.UpIncomeDBStoreBatch(c, _upIncomeMonthly, upIncomeMonthly)
  482. if err != nil {
  483. log.Error("s.UpIncomeDBStoreBatch up_income_monthly error(%v)", err)
  484. return
  485. }
  486. log.Info("insert up_income_monthly : %d", len(upIncomeMonthly))
  487. return
  488. })
  489. if err = writeGroup.Wait(); err != nil {
  490. log.Error("run writeGroup.Wait error(%v)", err)
  491. }
  492. return
  493. }