up_income_date.go 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318
  1. package income
  2. import (
  3. "bytes"
  4. "context"
  5. "strconv"
  6. "strings"
  7. "time"
  8. model "go-common/app/job/main/growup/model/income"
  9. "go-common/library/log"
  10. xtime "go-common/library/time"
  11. )
  12. var (
  13. _upIncomeWeekly = "up_income_weekly"
  14. _upIncomeMonthly = "up_income_monthly"
  15. )
  16. func (s *UpIncomeSvr) handleUpIncomeWeeklyAndMonthly(
  17. c context.Context,
  18. date time.Time,
  19. upAvStatisCh chan map[int64]*model.UpArchStatis,
  20. upCmStatisCh chan map[int64]*model.UpArchStatis,
  21. upBgmStatisCh chan map[int64]*model.UpArchStatis,
  22. upSliceCh chan []*model.UpIncome) (weeklyMap, monthlyMap map[int64]*model.UpIncome, err error) {
  23. weeklyMap, monthlyMap, err = s.GetUpIncomeWeeklyAndMonthly(c, date)
  24. if err != nil {
  25. log.Error("s.GetUpIncomeWeeklyAndMonthly error(%v)", err)
  26. return
  27. }
  28. upAvStatis := <-upAvStatisCh
  29. upCmStatis := <-upCmStatisCh
  30. upBgmStatis := <-upBgmStatisCh
  31. s.calUpIncomeWeeklyAndMonthly(weeklyMap, monthlyMap, upAvStatis, upCmStatis, upBgmStatis, upSliceCh)
  32. return
  33. }
  34. // GetUpIncomeWeeklyAndMonthly get up_income_weekly and up_income_monthly
  35. func (s *UpIncomeSvr) GetUpIncomeWeeklyAndMonthly(c context.Context, date time.Time) (weeklyMap map[int64]*model.UpIncome, monthlyMap map[int64]*model.UpIncome, err error) {
  36. upIncomeWeekly, err := s.GetUpIncomeTable(c, startWeeklyDate, _upIncomeWeekly)
  37. if err != nil {
  38. log.Error("s.GetUpIncomeTable error(%v)", err)
  39. return
  40. }
  41. upIncomeMonthly, err := s.GetUpIncomeTable(c, startMonthlyDate, _upIncomeMonthly)
  42. if err != nil {
  43. log.Error("s.GetUpIncomeTable error(%v)", err)
  44. return
  45. }
  46. weeklyMap = make(map[int64]*model.UpIncome)
  47. monthlyMap = make(map[int64]*model.UpIncome)
  48. for _, weeklyIncome := range upIncomeWeekly {
  49. weeklyMap[weeklyIncome.MID] = weeklyIncome
  50. }
  51. for _, monthlyIncome := range upIncomeMonthly {
  52. monthlyMap[monthlyIncome.MID] = monthlyIncome
  53. }
  54. return
  55. }
  56. // GetUpIncomeTable get up income table
  57. func (s *UpIncomeSvr) GetUpIncomeTable(c context.Context, date time.Time, table string) (upIncomes []*model.UpIncome, err error) {
  58. var id int64
  59. for {
  60. upIncome, err1 := s.dao.GetUpIncomeTable(c, table, date.Format(_layout), id, _limitSize)
  61. if err1 != nil {
  62. err = err1
  63. return
  64. }
  65. upIncomes = append(upIncomes, upIncome...)
  66. if len(upIncome) < _limitSize {
  67. break
  68. }
  69. id = upIncome[len(upIncome)-1].ID
  70. }
  71. return
  72. }
  73. func (s *UpIncomeSvr) calUpIncomeWeeklyAndMonthly(weeklyMap, monthlyMap map[int64]*model.UpIncome,
  74. upAvStatis, upCmStatis, upBgmStatis map[int64]*model.UpArchStatis, upSliceCh chan []*model.UpIncome) {
  75. for upIncome := range upSliceCh {
  76. s.calUpIncome(upIncome, weeklyMap, monthlyMap, upAvStatis, upCmStatis, upBgmStatis)
  77. }
  78. }
  79. func (s *UpIncomeSvr) calUpIncome(upIncome []*model.UpIncome, weeklyMap, monthlyMap map[int64]*model.UpIncome,
  80. upAvStatis, upCmStatis, upBgmStatis map[int64]*model.UpArchStatis) {
  81. var weeklyAvCount, monthlyAvCount int
  82. var weeklyCmCount, monthlyCmCount int
  83. var weeklyBgmCount, monthlyBgmCount int
  84. for _, income := range upIncome {
  85. weeklyAvCount, monthlyAvCount = 0, 0
  86. weeklyCmCount, monthlyCmCount = 0, 0
  87. weeklyBgmCount, monthlyBgmCount = 0, 0
  88. if statis, ok := upAvStatis[income.MID]; ok {
  89. weeklyAvCount = len(strings.Split(statis.WeeklyAIDs, ","))
  90. monthlyAvCount = len(strings.Split(statis.MonthlyAIDs, ","))
  91. }
  92. if statis, ok := upCmStatis[income.MID]; ok {
  93. weeklyCmCount = len(strings.Split(statis.WeeklyAIDs, ","))
  94. monthlyCmCount = len(strings.Split(statis.MonthlyAIDs, ","))
  95. }
  96. if statis, ok := upBgmStatis[income.MID]; ok {
  97. weeklyBgmCount = len(strings.Split(statis.WeeklyAIDs, ","))
  98. monthlyBgmCount = len(strings.Split(statis.MonthlyAIDs, ","))
  99. }
  100. if weeklyIncome, ok := weeklyMap[income.MID]; ok {
  101. updateUpIncome(weeklyIncome, income, weeklyAvCount, weeklyCmCount, weeklyBgmCount)
  102. } else {
  103. weeklyMap[income.MID] = addUpIncome(income, startWeeklyDate, weeklyAvCount, weeklyCmCount, weeklyBgmCount)
  104. }
  105. if weeklyIncome, ok := monthlyMap[income.MID]; ok {
  106. updateUpIncome(weeklyIncome, income, monthlyAvCount, monthlyCmCount, monthlyBgmCount)
  107. } else {
  108. monthlyMap[income.MID] = addUpIncome(income, startMonthlyDate, monthlyAvCount, monthlyCmCount, monthlyBgmCount)
  109. }
  110. }
  111. }
  112. func addUpIncome(daily *model.UpIncome, fixDate time.Time, avCount, cmCount, bgmCount int) *model.UpIncome {
  113. return &model.UpIncome{
  114. MID: daily.MID,
  115. AvCount: int64(avCount),
  116. PlayCount: daily.PlayCount,
  117. AvIncome: daily.AvIncome,
  118. AvBaseIncome: daily.AvBaseIncome,
  119. AvTax: daily.AvTax,
  120. AvTotalIncome: daily.AvTotalIncome,
  121. ColumnCount: int64(cmCount),
  122. ColumnIncome: daily.ColumnIncome,
  123. ColumnBaseIncome: daily.ColumnBaseIncome,
  124. ColumnTax: daily.ColumnTax,
  125. ColumnTotalIncome: daily.ColumnTotalIncome,
  126. BgmCount: int64(bgmCount),
  127. BgmIncome: daily.BgmIncome,
  128. BgmBaseIncome: daily.BgmBaseIncome,
  129. BgmTax: daily.BgmTax,
  130. BgmTotalIncome: daily.BgmTotalIncome,
  131. AudioIncome: daily.AudioIncome,
  132. TaxMoney: daily.TaxMoney,
  133. Income: daily.Income,
  134. BaseIncome: daily.BaseIncome,
  135. TotalIncome: daily.TotalIncome,
  136. Date: xtime.Time(fixDate.Unix()),
  137. DBState: _dbInsert,
  138. }
  139. }
  140. func updateUpIncome(origin, daily *model.UpIncome, avCount, cmCount, bgmCount int) {
  141. origin.AvCount = int64(avCount)
  142. origin.PlayCount += daily.PlayCount
  143. origin.AvIncome += daily.AvIncome
  144. origin.AvBaseIncome += daily.AvBaseIncome
  145. origin.AvTax += daily.AvTax
  146. origin.AvTotalIncome = daily.AvTotalIncome
  147. origin.ColumnCount = int64(cmCount)
  148. origin.ColumnIncome += daily.ColumnIncome
  149. origin.ColumnBaseIncome += daily.ColumnBaseIncome
  150. origin.ColumnTax += daily.ColumnTax
  151. origin.ColumnTotalIncome = daily.ColumnTotalIncome
  152. origin.BgmCount = int64(bgmCount)
  153. origin.BgmIncome += daily.BgmIncome
  154. origin.BgmBaseIncome += daily.BgmBaseIncome
  155. origin.BgmTax += daily.BgmTax
  156. origin.BgmTotalIncome = daily.BgmTotalIncome
  157. origin.AudioIncome += daily.AudioIncome
  158. origin.TaxMoney += daily.TaxMoney
  159. origin.Income += daily.Income
  160. origin.BaseIncome += daily.BaseIncome
  161. origin.TotalIncome = daily.TotalIncome
  162. origin.DBState = _dbUpdate
  163. }
  164. // UpIncomeDBStore insert up_income
  165. func (s *UpIncomeSvr) UpIncomeDBStore(c context.Context, weeklyMap, monthlyMap map[int64]*model.UpIncome) (err error) {
  166. err = s.UpIncomeDBStoreBatch(c, _upIncomeWeekly, weeklyMap)
  167. if err != nil {
  168. log.Error("s.UpIncomeDBStoreBatch up_income_weekly error(%v)", err)
  169. return
  170. }
  171. err = s.UpIncomeDBStoreBatch(c, _upIncomeMonthly, monthlyMap)
  172. if err != nil {
  173. log.Error("s.UpIncomeDBStoreBatch up_income_monthly error(%v)", err)
  174. return
  175. }
  176. return
  177. }
  178. // UpIncomeDBStoreBatch up income db batch store
  179. func (s *UpIncomeSvr) UpIncomeDBStoreBatch(c context.Context, table string, upIncomeMap map[int64]*model.UpIncome) error {
  180. insert, update := make([]*model.UpIncome, batchSize), make([]*model.UpIncome, batchSize)
  181. insertIndex, updateIndex := 0, 0
  182. for _, income := range upIncomeMap {
  183. if income.DBState == _dbInsert {
  184. insert[insertIndex] = income
  185. insertIndex++
  186. } else if income.DBState == _dbUpdate {
  187. update[updateIndex] = income
  188. updateIndex++
  189. }
  190. if insertIndex >= batchSize {
  191. _, err := s.upIncomeBatchInsert(c, table, insert[:insertIndex])
  192. if err != nil {
  193. log.Error("s.upIncomeBatchInsert error(%v)", err)
  194. return err
  195. }
  196. insertIndex = 0
  197. }
  198. if updateIndex >= batchSize {
  199. _, err := s.upIncomeBatchInsert(c, table, update[:updateIndex])
  200. if err != nil {
  201. log.Error("s.upIncomeBatchInsert error(%v)", err)
  202. return err
  203. }
  204. updateIndex = 0
  205. }
  206. }
  207. if insertIndex > 0 {
  208. _, err := s.upIncomeBatchInsert(c, table, insert[:insertIndex])
  209. if err != nil {
  210. log.Error("s.upIncomeBatchInsert error(%v)", err)
  211. return err
  212. }
  213. }
  214. if updateIndex > 0 {
  215. _, err := s.upIncomeBatchInsert(c, table, update[:updateIndex])
  216. if err != nil {
  217. log.Error("s.upIncomeBatchInsert error(%v)", err)
  218. return err
  219. }
  220. }
  221. return nil
  222. }
  223. func (s *UpIncomeSvr) upIncomeBatchInsert(c context.Context, table string, us []*model.UpIncome) (rows int64, err error) {
  224. var buf bytes.Buffer
  225. for _, u := range us {
  226. buf.WriteString("(")
  227. buf.WriteString(strconv.FormatInt(u.MID, 10))
  228. buf.WriteByte(',')
  229. buf.WriteString(strconv.FormatInt(u.AvCount, 10))
  230. buf.WriteByte(',')
  231. buf.WriteString(strconv.FormatInt(u.PlayCount, 10))
  232. buf.WriteByte(',')
  233. buf.WriteString(strconv.FormatInt(u.AvIncome, 10))
  234. buf.WriteByte(',')
  235. buf.WriteString(strconv.FormatInt(u.AudioIncome, 10))
  236. buf.WriteByte(',')
  237. buf.WriteString(strconv.FormatInt(u.ColumnCount, 10))
  238. buf.WriteByte(',')
  239. buf.WriteString(strconv.FormatInt(u.ColumnIncome, 10))
  240. buf.WriteByte(',')
  241. buf.WriteString(strconv.FormatInt(u.BgmCount, 10))
  242. buf.WriteByte(',')
  243. buf.WriteString(strconv.FormatInt(u.BgmIncome, 10))
  244. buf.WriteByte(',')
  245. buf.WriteString(strconv.FormatInt(u.TaxMoney, 10))
  246. buf.WriteByte(',')
  247. buf.WriteString(strconv.FormatInt(u.Income, 10))
  248. buf.WriteByte(',')
  249. buf.WriteString(strconv.FormatInt(u.TotalIncome, 10))
  250. buf.WriteByte(',')
  251. buf.WriteString(strconv.FormatInt(u.AvBaseIncome, 10))
  252. buf.WriteByte(',')
  253. buf.WriteString(strconv.FormatInt(u.AvTax, 10))
  254. buf.WriteByte(',')
  255. buf.WriteString(strconv.FormatInt(u.ColumnBaseIncome, 10))
  256. buf.WriteByte(',')
  257. buf.WriteString(strconv.FormatInt(u.ColumnTax, 10))
  258. buf.WriteByte(',')
  259. buf.WriteString(strconv.FormatInt(u.BgmBaseIncome, 10))
  260. buf.WriteByte(',')
  261. buf.WriteString(strconv.FormatInt(u.BgmTax, 10))
  262. buf.WriteByte(',')
  263. buf.WriteString("'" + u.Date.Time().Format(_layout) + "'")
  264. buf.WriteByte(',')
  265. buf.WriteString(strconv.FormatInt(u.BaseIncome, 10))
  266. buf.WriteByte(',')
  267. buf.WriteString(strconv.FormatInt(u.AvTotalIncome, 10))
  268. buf.WriteByte(',')
  269. buf.WriteString(strconv.FormatInt(u.ColumnTotalIncome, 10))
  270. buf.WriteByte(',')
  271. buf.WriteString(strconv.FormatInt(u.BgmTotalIncome, 10))
  272. buf.WriteString(")")
  273. buf.WriteByte(',')
  274. }
  275. if buf.Len() > 0 {
  276. buf.Truncate(buf.Len() - 1)
  277. }
  278. values := buf.String()
  279. buf.Reset()
  280. rows, err = s.dao.InsertUpIncomeTable(c, table, values)
  281. return
  282. }