trend.go 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444
  1. package service
  2. import (
  3. "bytes"
  4. "context"
  5. "fmt"
  6. "strconv"
  7. "time"
  8. "go-common/app/job/main/up-rating/model"
  9. "go-common/library/log"
  10. "golang.org/x/sync/errgroup"
  11. )
  12. // CalTrend cal trend
  13. func (s *Service) CalTrend(c context.Context, date time.Time) (err error) {
  14. return s.calTrend(c, date)
  15. }
  16. func (s *Service) calTrend(c context.Context, date time.Time) (err error) {
  17. ds, err := s.getDiffs(c, date)
  18. if err != nil {
  19. return
  20. }
  21. var (
  22. magneticSec = sections(TotalType)
  23. creativeSec = sections(CreativeType)
  24. influenceSec = sections(InfluenceType)
  25. creditSec = sections(CreditType)
  26. g errgroup.Group
  27. // for concurrent write
  28. _ascMagneticCh = make(chan []*model.Diff, 5)
  29. _ascCreativeCh = make(chan []*model.Diff, 5)
  30. _ascInfluenceCh = make(chan []*model.Diff, 5)
  31. _ascCreditCh = make(chan []*model.Diff, 5)
  32. _descMagneticCh = make(chan []*model.Diff, 5)
  33. _descCreativeCh = make(chan []*model.Diff, 5)
  34. _descInfluenceCh = make(chan []*model.Diff, 5)
  35. _descCreditCh = make(chan []*model.Diff, 5)
  36. )
  37. // magnetic ascend
  38. g.Go(func() (err error) {
  39. defer close(_ascMagneticCh)
  40. mr := classify(ds, TotalType, "asc", magneticSec)
  41. push(_ascMagneticCh, mr)
  42. return
  43. })
  44. g.Go(func() (err error) {
  45. err = s.batchInsertDiffs(c, "asc", _ascMagneticCh)
  46. return
  47. })
  48. // creativity ascend
  49. g.Go(func() (err error) {
  50. defer close(_ascCreativeCh)
  51. cr := classify(ds, CreativeType, "asc", creativeSec)
  52. push(_ascCreativeCh, cr)
  53. return
  54. })
  55. g.Go(func() (err error) {
  56. err = s.batchInsertDiffs(c, "asc", _ascCreativeCh)
  57. return
  58. })
  59. // influence ascend
  60. g.Go(func() (err error) {
  61. defer close(_ascInfluenceCh)
  62. ir := classify(ds, InfluenceType, "asc", influenceSec)
  63. push(_ascInfluenceCh, ir)
  64. return
  65. })
  66. g.Go(func() (err error) {
  67. err = s.batchInsertDiffs(c, "asc", _ascInfluenceCh)
  68. return
  69. })
  70. // credit ascend
  71. g.Go(func() (err error) {
  72. defer close(_ascCreditCh)
  73. cr := classify(ds, CreditType, "asc", creditSec)
  74. push(_ascCreditCh, cr)
  75. return
  76. })
  77. g.Go(func() (err error) {
  78. err = s.batchInsertDiffs(c, "asc", _ascCreditCh)
  79. return
  80. })
  81. // magnetic descend
  82. g.Go(func() (err error) {
  83. defer close(_descMagneticCh)
  84. mr := classify(ds, TotalType, "desc", magneticSec)
  85. push(_descMagneticCh, mr)
  86. return
  87. })
  88. g.Go(func() (err error) {
  89. err = s.batchInsertDiffs(c, "desc", _descMagneticCh)
  90. return
  91. })
  92. // creativity descend
  93. g.Go(func() (err error) {
  94. defer close(_descCreativeCh)
  95. cr := classify(ds, CreativeType, "desc", creativeSec)
  96. push(_descCreativeCh, cr)
  97. return
  98. })
  99. g.Go(func() (err error) {
  100. err = s.batchInsertDiffs(c, "desc", _descCreativeCh)
  101. return
  102. })
  103. // influence descend
  104. g.Go(func() (err error) {
  105. defer close(_descInfluenceCh)
  106. ir := classify(ds, InfluenceType, "desc", influenceSec)
  107. push(_descInfluenceCh, ir)
  108. return
  109. })
  110. g.Go(func() (err error) {
  111. err = s.batchInsertDiffs(c, "desc", _descInfluenceCh)
  112. return
  113. })
  114. // credit descend
  115. g.Go(func() (err error) {
  116. defer close(_descCreditCh)
  117. cr := classify(ds, CreditType, "desc", creditSec)
  118. push(_descCreditCh, cr)
  119. return
  120. })
  121. g.Go(func() (err error) {
  122. err = s.batchInsertDiffs(c, "desc", _descCreditCh)
  123. return
  124. })
  125. if err = g.Wait(); err != nil {
  126. log.Error("g.Wait error(%v)", err)
  127. }
  128. return
  129. }
  130. func push(ch chan []*model.Diff, m map[int64]map[int]Heap) {
  131. for _, sm := range m {
  132. for _, h := range sm {
  133. ch <- h.Result()
  134. }
  135. }
  136. }
  137. func (s *Service) getDiffs(c context.Context, date time.Time) (ds map[int64][]*model.Diff, err error) {
  138. lastMonth := time.Date(date.Year(), date.Month()-1, 1, 0, 0, 0, 0, time.Local)
  139. last, err := s.getR(c, lastMonth)
  140. if err != nil {
  141. return
  142. }
  143. curMonth := time.Date(date.Year(), date.Month(), 1, 0, 0, 0, 0, time.Local)
  144. cur, err := s.getR(c, curMonth)
  145. if err != nil {
  146. return
  147. }
  148. ds = diff(last, cur)
  149. return
  150. }
  151. func sections(ctype int) (ss []*section) {
  152. switch ctype {
  153. case TotalType:
  154. ss = initSec(10, 600)
  155. case CreativeType, InfluenceType, CreditType:
  156. ss = initSec(10, 200)
  157. }
  158. return
  159. }
  // section is one inclusive score interval [start, end] together with the
  // label that is stored alongside diffs falling into it.
  type section struct {
  	start int64
  	end   int64
  	tips  string // label of the form "start-end", including the double quotes
  }

  // initSec splits the range 0..total into c consecutive sections of
  // total/c scores each. Every section is inclusive on both ends; the last
  // one is stretched so that its end is exactly total.
  //
  // A non-positive c yields no sections (nil) instead of dividing by zero.
  func initSec(c, total int64) (ss []*section) {
  	if c <= 0 {
  		return nil
  	}
  	size := total / c
  	ss = make([]*section, 0, c)
  	for index := int64(0); index < c; index++ {
  		start := index * size
  		end := start + size - 1
  		if index == c-1 {
  			// The last section absorbs the remainder and the upper bound itself.
  			end = total
  		}
  		ss = append(ss, &section{
  			start: start,
  			end:   end,
  			tips:  fmt.Sprintf("\"%d-%d\"", start, end),
  		})
  	}
  	return
  }

  // getSection returns the position within ss and the label of the first
  // section whose inclusive range contains score.
  //
  // When no section matches, it returns (0, ""). The index alone therefore
  // cannot distinguish "no match" from a match in the first section.
  func getSection(score int64, ss []*section) (index int, tips string) {
  	for i, sec := range ss {
  		if score >= sec.start && score <= sec.end {
  			return i, sec.tips
  		}
  	}
  	return 0, ""
  }
  190. func getHeap(order string, ctype int) Heap {
  191. if order == "asc" {
  192. return &AscHeap{heap: make([]*model.Diff, 0), ctype: ctype}
  193. }
  194. return &DescHeap{heap: make([]*model.Diff, 0), ctype: ctype}
  195. }
  196. // map[tag_id]map[section][]diffs
  197. func classify(ds map[int64][]*model.Diff, ctype int, order string, ss []*section) (m map[int64]map[int]Heap) {
  198. m = make(map[int64]map[int]Heap)
  199. for tagID, diffs := range ds {
  200. for _, diff := range diffs {
  201. sec, tips := getSection(diff.GetScore(ctype), ss)
  202. // need clone
  203. _diff := clone(diff)
  204. _diff.Section = sec
  205. _diff.CType = ctype
  206. _diff.Tips = tips
  207. if sm, ok := m[tagID]; ok {
  208. if _, ok = sm[sec]; ok {
  209. sm[sec].Put(_diff)
  210. } else {
  211. h := getHeap(order, ctype)
  212. h.Put(_diff)
  213. sm[sec] = h
  214. }
  215. } else {
  216. h := getHeap(order, ctype)
  217. h.Put(_diff)
  218. sm := make(map[int]Heap)
  219. sm[sec] = h
  220. m[tagID] = sm
  221. }
  222. }
  223. }
  224. return
  225. }
  226. func clone(a *model.Diff) *model.Diff {
  227. return &model.Diff{
  228. MID: a.MID,
  229. MagneticScore: a.MagneticScore,
  230. CreativityScore: a.CreativityScore,
  231. InfluenceScore: a.InfluenceScore,
  232. CreditScore: a.CreditScore,
  233. MagneticDiff: a.MagneticDiff,
  234. CreativityDiff: a.CreativityDiff,
  235. InfluenceDiff: a.InfluenceDiff,
  236. CreditDiff: a.CreditDiff,
  237. TotalAvs: a.TotalAvs,
  238. Fans: a.Fans,
  239. TagID: a.TagID,
  240. Date: a.Date,
  241. }
  242. }
  243. // ds map[tag_id][]*model.Diff
  244. func diff(last map[int64]*model.Rating, cur map[int64]*model.Rating) (ds map[int64][]*model.Diff) {
  245. ds = make(map[int64][]*model.Diff)
  246. for mid, r := range cur {
  247. if _, ok := last[mid]; !ok {
  248. continue
  249. }
  250. lr := last[mid]
  251. diff := &model.Diff{
  252. MID: mid,
  253. TagID: r.TagID,
  254. MagneticScore: r.MagneticScore,
  255. CreativityScore: r.CreativityScore,
  256. InfluenceScore: r.InfluenceScore,
  257. CreditScore: r.CreditScore,
  258. MagneticDiff: int(r.MagneticScore - lr.MagneticScore),
  259. CreativityDiff: int(r.CreativityScore - lr.CreativityScore),
  260. InfluenceDiff: int(r.InfluenceScore - lr.InfluenceScore),
  261. CreditDiff: int(r.CreditScore - lr.CreditScore),
  262. Date: r.Date,
  263. }
  264. if _, ok := ds[diff.TagID]; ok {
  265. ds[diff.TagID] = append(ds[diff.TagID], diff)
  266. } else {
  267. ds[diff.TagID] = []*model.Diff{diff}
  268. }
  269. }
  270. return
  271. }
  272. // GetR m[mid]*model.Rating
  273. func (s *Service) getR(c context.Context, date time.Time) (m map[int64]*model.Rating, err error) {
  274. var (
  275. g errgroup.Group
  276. routines = 5 // 4 core + 1
  277. ch = make(chan []*model.Rating, routines)
  278. )
  279. m = make(map[int64]*model.Rating)
  280. offset, end, total, err := s.RatingOffEnd(c, date)
  281. if err != nil {
  282. return
  283. }
  284. if total == 0 {
  285. return
  286. }
  287. t := end - offset
  288. section := (t - t%routines) / routines
  289. for i := 0; i < routines; i++ {
  290. begin := section*i + offset
  291. over := begin + section
  292. if i == routines-1 {
  293. over = end
  294. }
  295. g.Go(func() (err error) {
  296. err = s.Ratings(c, date, begin, over, ch)
  297. if err != nil {
  298. log.Error("get rating infos error(%v)", err)
  299. }
  300. return
  301. })
  302. }
  303. g.Go(func() (err error) {
  304. Loop:
  305. for rs := range ch {
  306. for _, r := range rs {
  307. m[r.MID] = r
  308. }
  309. if len(m) == total {
  310. break Loop
  311. }
  312. }
  313. return
  314. })
  315. if err = g.Wait(); err != nil {
  316. log.Error("get rating wait error(%v)", err)
  317. }
  318. return
  319. }
  320. // BatchInsertDiffs batch insert diffs
  321. func (s *Service) batchInsertDiffs(c context.Context, table string, wch chan []*model.Diff) (err error) {
  322. var (
  323. buff = make([]*model.Diff, _limit)
  324. buffEnd = 0
  325. )
  326. for ds := range wch {
  327. for _, d := range ds {
  328. buff[buffEnd] = d
  329. buffEnd++
  330. if buffEnd >= _limit {
  331. values := diffValues(buff[:buffEnd])
  332. buffEnd = 0
  333. _, err = s.dao.InsertTrend(c, table, values)
  334. if err != nil {
  335. return
  336. }
  337. }
  338. }
  339. if buffEnd > 0 {
  340. values := diffValues(buff[:buffEnd])
  341. buffEnd = 0
  342. _, err = s.dao.InsertTrend(c, table, values)
  343. }
  344. }
  345. return
  346. }
  347. func diffValues(ds []*model.Diff) (values string) {
  348. var buf bytes.Buffer
  349. for _, r := range ds {
  350. buf.WriteString("(")
  351. buf.WriteString(strconv.FormatInt(r.MID, 10))
  352. buf.WriteByte(',')
  353. buf.WriteString(strconv.FormatInt(r.TagID, 10))
  354. buf.WriteByte(',')
  355. buf.WriteString(strconv.FormatInt(r.CreativityScore, 10))
  356. buf.WriteByte(',')
  357. buf.WriteString(strconv.Itoa(r.CreativityDiff))
  358. buf.WriteByte(',')
  359. buf.WriteString(strconv.FormatInt(r.InfluenceScore, 10))
  360. buf.WriteByte(',')
  361. buf.WriteString(strconv.Itoa(r.InfluenceDiff))
  362. buf.WriteByte(',')
  363. buf.WriteString(strconv.FormatInt(r.CreditScore, 10))
  364. buf.WriteByte(',')
  365. buf.WriteString(strconv.Itoa(r.CreditDiff))
  366. buf.WriteByte(',')
  367. buf.WriteString(strconv.FormatInt(r.MagneticScore, 10))
  368. buf.WriteByte(',')
  369. buf.WriteString(strconv.Itoa(r.MagneticDiff))
  370. buf.WriteByte(',')
  371. buf.WriteString(fmt.Sprintf("'%s'", r.Date.Time().Format(_layout)))
  372. buf.WriteByte(',')
  373. buf.WriteString(strconv.Itoa(r.CType))
  374. buf.WriteByte(',')
  375. buf.WriteString(strconv.Itoa(r.Section))
  376. buf.WriteByte(',')
  377. buf.WriteString(fmt.Sprintf("'%s'", r.Tips))
  378. buf.WriteString(")")
  379. buf.WriteByte(',')
  380. }
  381. if buf.Len() > 0 {
  382. buf.Truncate(buf.Len() - 1)
  383. }
  384. values = buf.String()
  385. buf.Reset()
  386. return
  387. }
  388. // DelTrends del trends
  389. func (s *Service) DelTrends(c context.Context, table string) (err error) {
  390. for {
  391. var rows int64
  392. rows, err = s.dao.DelTrend(c, table, _limit)
  393. if err != nil {
  394. return
  395. }
  396. if rows == 0 {
  397. break
  398. }
  399. }
  400. return
  401. }