run.go 13 KB


  1. package income
  2. import (
  3. "context"
  4. "fmt"
  5. "sort"
  6. "time"
  7. model "go-common/app/job/main/growup/model/income"
  8. task "go-common/app/job/main/growup/service"
  9. "go-common/library/log"
  10. "golang.org/x/sync/errgroup"
  11. )
  12. // RunAndSendMail run and send email
  13. func (s *Service) RunAndSendMail(c context.Context, date time.Time) (err error) {
  14. var mailReceivers []string
  15. var msg string
  16. for _, v := range s.conf.Mail.Send {
  17. if v.Type == 3 {
  18. mailReceivers = v.Addr
  19. }
  20. }
  21. startTime := time.Now().Unix()
  22. err = s.run(c, date)
  23. if err != nil {
  24. msg = err.Error()
  25. mailReceivers = []string{"shaozhenyu@bilibili.com", "gaopeng@bilibili.com", "limengqing@bilibili.com"}
  26. } else {
  27. msg = fmt.Sprintf("%s 计算完成,耗时%ds", date.Format("2006-01-02"), time.Now().Unix()-startTime)
  28. }
  29. emailErr := s.email.SendMail(date, msg, "创作激励每日计算%d年%d月%d日", mailReceivers...)
  30. if emailErr != nil {
  31. log.Error("s.email.SendMail error(%v)", emailErr)
  32. }
  33. return
  34. }
  35. func (s *Service) run(c context.Context, date time.Time) (err error) {
  36. defer func() {
  37. task.GetTaskService().SetTaskStatus(c, task.TaskCreativeIncome, date.Format(_layout), err)
  38. }()
  39. err = task.GetTaskService().TaskReady(c, date.Format("2006-01-02"), task.TaskAvCharge, task.TaskCmCharge, task.TaskTagRatio, task.TaskBubbleMeta, task.TaskBlacklist, task.TaskBgmSync)
  40. if err != nil {
  41. return
  42. }
  43. startWeeklyDate = getStartWeeklyDate(date)
  44. startMonthlyDate = getStartMonthlyDate(date)
  45. /*################ Serializable Begin ################*/
  46. // av charge ratio
  47. ratios, err := s.ratio.ArchiveChargeRatio(c, int64(_limitSize))
  48. if err != nil {
  49. return
  50. }
  51. // up charge ratio
  52. urs, err := s.ratio.UpChargeRatio(c, int64(_limitSize))
  53. if err != nil {
  54. return
  55. }
  56. // av income statistics
  57. astat, err := s.income.avIncomeStatSvr.AvIncomeStat(c, int64(_limitSize))
  58. if err != nil {
  59. log.Error("s.income.avIncomeStatSvr.AvIncomeStat error(%v) ", err)
  60. return
  61. }
  62. log.Info("get av_income_statis : %d", len(astat))
  63. // bgm income statistics
  64. bstat, err := s.income.bgmIncomeStatSvr.BgmIncomeStat(c, int64(_limitSize))
  65. if err != nil {
  66. log.Error("s.income.bgmIncomeStatSvr.BgmIncomeStat error(%v) ", err)
  67. return
  68. }
  69. log.Info("get bgm_income_statis : %d", len(bstat))
  70. // column income statistics
  71. cstat, err := s.income.columnIncomeStatSvr.ColumnIncomeStat(c, int64(_limitSize))
  72. if err != nil {
  73. log.Error("s.income.columnIncomeStatSvr.ColumnIncomeStat error(%v) ", err)
  74. return
  75. }
  76. log.Info("get column_income_statis : %d", len(cstat))
  77. // up income statistics
  78. ustat, err := s.income.upIncomeStatSvr.UpIncomeStat(c, int64(_limitSize))
  79. if err != nil {
  80. log.Error("s.income.upIncomeStatSvr.UpIncomeStat error(%v) ", err)
  81. return
  82. }
  83. log.Info("get up_income_statis : %d", len(ustat))
  84. // up accounts
  85. accs, err := s.income.upAccountSvr.UpAccount(c, int64(_limitSize))
  86. if err != nil {
  87. log.Error("s.income.upAccountSvr.UpAccount error(%v)", err)
  88. return
  89. }
  90. log.Info("get up_account : %d", len(accs))
  91. // bubble meta
  92. bubbleMeta, err := s.GetBubbleMeta(c)
  93. if err != nil {
  94. log.Error("s.GetBubbleMeta error(%v)", err)
  95. return
  96. }
  97. log.Info("get lottery_av_info avids: %d", len(bubbleMeta))
  98. bubbleRatio := s.avToBubbleRatio(bubbleMeta)
  99. // av signed ups
  100. var (
  101. avFilters []AvFilter
  102. columnFilters []ColumnFilter
  103. bgmFilter BgmFilter
  104. )
  105. //black list
  106. blacks, err := s.Blacklist(c, 2000)
  107. if err != nil {
  108. return
  109. }
  110. // black ctype 0: av
  111. avBlackFilter := avFilter(blacks[0])
  112. // black ctype 2: column
  113. columnBlackFilter := columnFilter(blacks[2])
  114. // business orders
  115. bos, err := s.GetBusinessOrders(c, 2000)
  116. if err != nil {
  117. return
  118. }
  119. bosFilter := avFilter(bos)
  120. // signed up
  121. signed := make(map[int64]bool)
  122. signedAv, err := s.Signed(c, "video", 2000)
  123. if err != nil {
  124. return
  125. }
  126. savf := signedAvFilter(signedAv, date)
  127. for mid := range signedAv {
  128. signed[mid] = true
  129. }
  130. signedColumn, err := s.Signed(c, "column", 2000)
  131. if err != nil {
  132. return
  133. }
  134. for mid := range signedColumn {
  135. signed[mid] = true
  136. }
  137. signedBgm, err := s.Signed(c, "bgm", 2000)
  138. if err != nil {
  139. return
  140. }
  141. for mid := range signedBgm {
  142. signed[mid] = true
  143. }
  144. bgms, err := s.BGMs(c, 2000)
  145. if err != nil {
  146. return
  147. }
  148. {
  149. avFilters = append(avFilters, avBlackFilter)
  150. avFilters = append(avFilters, bosFilter)
  151. avFilters = append(avFilters, savf)
  152. bgmFilter = signedBgmFilter(signedBgm, date)
  153. columnFilters = append(columnFilters, signedColumnFilter(signedColumn, date))
  154. columnFilters = append(columnFilters, columnBlackFilter)
  155. }
  156. /*################ Serializable End ##################*/
  157. var (
  158. readGroup errgroup.Group
  159. sourceCh = make(chan []*model.AvCharge, 1000)
  160. // av
  161. incomeCh = make(chan []*model.AvCharge, 1000)
  162. // bgm
  163. bgmCh = make(chan []*model.AvCharge, 1000)
  164. // column
  165. columnSourceCh = make(chan []*model.ColumnCharge, 1000)
  166. // business income
  167. businessCh = make(chan map[int64]*model.UpBusinessIncome, 10)
  168. )
  169. // get av daily charge and repost to other channels
  170. readGroup.Go(func() (err error) {
  171. err = s.avCharge.AvCharges(c, date, sourceCh, bubbleRatio)
  172. if err != nil {
  173. log.Error("s.avCharge.AvCharges error(%v)", err)
  174. return
  175. }
  176. log.Info("av_daily_charge finished")
  177. return
  178. })
  179. // get column daily charge
  180. readGroup.Go(func() (err error) {
  181. err = s.columnCharges(c, date, columnSourceCh)
  182. if err != nil {
  183. log.Error("s.columnCharges error(%v)", err)
  184. return
  185. }
  186. log.Info("column_daily_charge finished")
  187. return
  188. })
  189. readGroup.Go(func() (err error) {
  190. defer func() {
  191. close(incomeCh)
  192. close(bgmCh)
  193. }()
  194. for charges := range sourceCh {
  195. incomeCh <- charges
  196. bgmCh <- charges
  197. }
  198. return
  199. })
  200. // up and av income compute
  201. var (
  202. um map[int64]*model.UpIncome
  203. am map[int64][]*model.AvIncome
  204. bm map[int64]map[int64]map[int64]*model.BgmIncome
  205. cm map[int64][]*model.ColumnIncome
  206. )
  207. readGroup.Go(func() (err error) {
  208. //um, am = s.income.Compute(c, date, incomeCh, urs, ars, ustat, astat, accs, filters, signed)
  209. var business map[int64]*model.UpBusinessIncome
  210. am, business = s.income.CalAvIncome(incomeCh, urs[1], ratios[1], avFilters, signed)
  211. businessCh <- business
  212. return
  213. })
  214. readGroup.Go(func() (err error) {
  215. var business map[int64]*model.UpBusinessIncome
  216. bm, business = s.income.CalBgmIncome(bgmCh, bgms, urs[3], ratios[3], avFilters, bgmFilter, blacks[3], signed)
  217. businessCh <- business
  218. return
  219. })
  220. readGroup.Go(func() (err error) {
  221. var business map[int64]*model.UpBusinessIncome
  222. cm, business = s.income.CalColumnIncome(columnSourceCh, urs[2], ratios[2], columnFilters, signed)
  223. businessCh <- business
  224. return
  225. })
  226. readGroup.Go(func() (err error) {
  227. um = s.income.CalUpIncome(businessCh, date)
  228. s.income.IncomeStat(um, am, bm, cm, ustat, astat, bstat, cstat)
  229. s.income.PurgeUpAccount(date, accs, um)
  230. return
  231. })
  232. if err = readGroup.Wait(); err != nil {
  233. log.Error("run readGroup.Wait error(%v)", err)
  234. return
  235. }
  236. // security verification
  237. {
  238. if len(am) == 0 {
  239. err = fmt.Errorf("Error: insert 0 av_income")
  240. return
  241. }
  242. if len(bm) == 0 {
  243. err = fmt.Errorf("Error: insert 0 bgm_income")
  244. return
  245. }
  246. if len(cm) == 0 {
  247. err = fmt.Errorf("Error: insert 0 column_income")
  248. return
  249. }
  250. if len(um) == 0 {
  251. err = fmt.Errorf("Error: insert 0 up_income")
  252. return
  253. }
  254. if len(astat) == 0 {
  255. err = fmt.Errorf("Error: insert 0 av_income_statis")
  256. return
  257. }
  258. if len(bstat) == 0 {
  259. err = fmt.Errorf("Error: insert 0 bgm_income_statis")
  260. return
  261. }
  262. if len(cstat) == 0 {
  263. err = fmt.Errorf("Error: insert 0 column_income_statis")
  264. return
  265. }
  266. if len(ustat) == 0 {
  267. err = fmt.Errorf("Error: insert 0 up_income_statis")
  268. return
  269. }
  270. if len(accs) == 0 {
  271. err = fmt.Errorf("Error: insert 0 up_account")
  272. return
  273. }
  274. }
  275. // persistent
  276. var writeGroup errgroup.Group
  277. // av_income
  278. writeGroup.Go(func() (err error) {
  279. err = s.income.avIncomeSvr.BatchInsertAvIncome(c, am)
  280. if err != nil {
  281. log.Error("s.income.BatchInsertAvIncome error(%v)", err)
  282. return
  283. }
  284. log.Info("insert av_income : %d", len(am))
  285. return
  286. })
  287. // column_income
  288. writeGroup.Go(func() (err error) {
  289. err = s.income.columnIncomeSvr.BatchInsertColumnIncome(c, cm)
  290. if err != nil {
  291. log.Error("s.income.BatchInsertColumnIncome error(%v)", err)
  292. return
  293. }
  294. log.Info("insert column_income : %d", len(cm))
  295. return
  296. })
  297. // bgm_income
  298. writeGroup.Go(func() (err error) {
  299. err = s.income.bgmIncomeSvr.BatchInsertBgmIncome(c, bm)
  300. if err != nil {
  301. log.Error("s.income.BatchInsertBgmIncome error(%v)", err)
  302. return
  303. }
  304. log.Info("insert bgm_income : %d", len(bm))
  305. return
  306. })
  307. // up income
  308. writeGroup.Go(func() (err error) {
  309. err = s.income.upIncomeSvr.BatchInsertUpIncome(c, um)
  310. if err != nil {
  311. log.Error("s.income.BatchInsertUpIncome error(%v)", err)
  312. return
  313. }
  314. log.Info("insert up_income : %d", len(um))
  315. return
  316. })
  317. // av_income_statis
  318. writeGroup.Go(func() (err error) {
  319. err = s.income.avIncomeStatSvr.BatchInsertAvIncomeStat(c, astat)
  320. if err != nil {
  321. log.Error("s.income.BatchInsertAvIncomeStat error(%v)", err)
  322. return
  323. }
  324. log.Info("insert av_income_statis : %d", len(astat))
  325. return
  326. })
  327. // column_income_statis
  328. writeGroup.Go(func() (err error) {
  329. err = s.income.columnIncomeStatSvr.BatchInsertColumnIncomeStat(c, cstat)
  330. if err != nil {
  331. log.Error("s.income.BatchInsertColumnIncomeStat error(%v)", err)
  332. return
  333. }
  334. log.Info("insert column_income_statis : %d", len(cstat))
  335. return
  336. })
  337. // bgm_income_statis
  338. writeGroup.Go(func() (err error) {
  339. err = s.income.bgmIncomeStatSvr.BatchInsertBgmIncomeStat(c, bstat)
  340. if err != nil {
  341. log.Error("s.income.BatchInsertBgmIncomeStat error(%v)", err)
  342. return
  343. }
  344. log.Info("insert bgm_income_statis : %d", len(bstat))
  345. return
  346. })
  347. // up_income_statis
  348. writeGroup.Go(func() (err error) {
  349. err = s.income.upIncomeStatSvr.BatchInsertUpIncomeStat(c, ustat)
  350. if err != nil {
  351. log.Error("s.income.BatchInsertUpIncomeStat error(%v)", err)
  352. return
  353. }
  354. log.Info("insert up_income_statis : %d", len(ustat))
  355. return
  356. })
  357. // up_account batch insert
  358. writeGroup.Go(func() (err error) {
  359. err = s.income.upAccountSvr.BatchInsertUpAccount(c, accs)
  360. if err != nil {
  361. log.Error("s.income.BatchInsertUpAccount error(%v)", err)
  362. return
  363. }
  364. log.Info("insert up_account : %d", len(accs))
  365. return
  366. })
  367. // up account single update
  368. writeGroup.Go(func() (err error) {
  369. err = s.income.upAccountSvr.UpdateUpAccount(c, accs)
  370. if err != nil {
  371. log.Error("s.income.UpdateUpAccount error(%v)", err)
  372. return
  373. }
  374. log.Info("update up_account : %d", len(accs))
  375. return
  376. })
  377. if err = writeGroup.Wait(); err != nil {
  378. log.Error("run writeGroup.Wait error(%v)", err)
  379. }
  380. return
  381. }
  382. func signedBgmFilter(m map[int64]*model.Signed, date time.Time) BgmFilter {
  383. return func(charge *model.AvCharge, bgm *model.BGM) bool {
  384. if up, ok := m[bgm.MID]; ok {
  385. if charge.Date.Time().Before(up.SignedAt.Time()) {
  386. return true
  387. }
  388. if (up.AccountState == 5 || up.AccountState == 6) && up.QuitAt.Time().Before(date.AddDate(0, 0, 1)) {
  389. return true
  390. }
  391. } else {
  392. return true
  393. }
  394. return false
  395. }
  396. }
  397. func signedAvFilter(m map[int64]*model.Signed, date time.Time) AvFilter {
  398. return func(charge *model.AvCharge) bool {
  399. if up, ok := m[charge.MID]; ok {
  400. if charge.UploadTime.Time().Before(up.SignedAt.Time()) {
  401. return true
  402. }
  403. if (up.AccountState == 5 || up.AccountState == 6) && up.QuitAt.Time().Before(date.AddDate(0, 0, 1)) {
  404. return true
  405. }
  406. } else {
  407. return true
  408. }
  409. return false
  410. }
  411. }
  412. func signedColumnFilter(m map[int64]*model.Signed, date time.Time) ColumnFilter {
  413. return func(charge *model.ColumnCharge) bool {
  414. if up, ok := m[charge.MID]; ok {
  415. if charge.UploadTime.Time().Before(up.SignedAt.Time()) {
  416. return true
  417. }
  418. if (up.AccountState == 5 || up.AccountState == 6) && up.QuitAt.Time().Before(date.AddDate(0, 0, 1)) {
  419. return true
  420. }
  421. } else {
  422. return true
  423. }
  424. return false
  425. }
  426. }
  427. func avFilter(m map[int64]bool) AvFilter {
  428. return func(charge *model.AvCharge) bool {
  429. return m[charge.AvID]
  430. }
  431. }
  432. func columnFilter(m map[int64]bool) ColumnFilter {
  433. return func(charge *model.ColumnCharge) bool {
  434. return m[charge.ArticleID]
  435. }
  436. }
  437. func (s *Service) avToBubbleRatio(bubbleMeta map[int64][]int) map[int64]float64 {
  438. var (
  439. res = make(map[int64]float64)
  440. typeToRatio = make(map[int]float64)
  441. chooseBType = func(bTypes []int) (bType int) {
  442. if len(bTypes) == 1 {
  443. return bTypes[0]
  444. }
  445. sort.Slice(bTypes, func(i, j int) bool {
  446. bti, btj := bTypes[i], bTypes[j]
  447. if typeToRatio[bti] == typeToRatio[btj] {
  448. return bti < btj
  449. }
  450. return typeToRatio[bti] < typeToRatio[btj]
  451. })
  452. return bTypes[0]
  453. }
  454. )
  455. for _, v := range s.conf.Bubble.BRatio {
  456. typeToRatio[v.BType] = v.Ratio
  457. }
  458. for avID, bTypes := range bubbleMeta {
  459. bType := chooseBType(bTypes)
  460. res[avID] = typeToRatio[bType]
  461. }
  462. return res
  463. }
  464. // BgmFilter av charge filter
  465. type BgmFilter func(*model.AvCharge, *model.BGM) bool
  466. // AvFilter av charge filter
  467. type AvFilter func(*model.AvCharge) bool
  468. // ColumnFilter column charge filter
  469. type ColumnFilter func(*model.ColumnCharge) bool
  470. // ChargeRegulator regulates av charge
  471. type ChargeRegulator func(*model.AvCharge)
  472. // UpdateBusinessIncome ..
  473. func (s *Service) UpdateBusinessIncome(c context.Context, date string) (err error) {
  474. return s.income.UpdateBusinessIncomeByDate(c, date)
  475. }