creative_activity.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448
  1. package service
  2. import (
  3. "bytes"
  4. "context"
  5. "fmt"
  6. "sort"
  7. "strconv"
  8. "strings"
  9. "time"
  10. "go-common/app/job/main/growup/model"
  11. "go-common/library/log"
  12. xtime "go-common/library/time"
  13. "go-common/library/xstr"
  14. )
  // Item codes that can appear in an activity's comma-separated RequireItems
  // string. getAvStatisState maps each code to one archive counter.
  const (
  	_like  = "1" // adds stat.Like
  	_share = "2" // adds stat.Share
  	_play  = "3" // adds stat.Play
  	_reply = "4" // adds stat.Reply
  	_dm    = "5" // adds stat.Dm
  )
  22. // CreativeActivity creative activity job
  23. func (s *Service) CreativeActivity(c context.Context, date time.Time) (err error) {
  24. activities, err := s.dao.GetCActivities(c)
  25. if err != nil {
  26. log.Error("s.dao.GetCActivities error(%v)", err)
  27. return
  28. }
  29. // get signed up
  30. ups, err := s.signed(c, int64(_dbLimit))
  31. if err != nil {
  32. log.Error("s.signed error(%v)", err)
  33. return
  34. }
  35. now := xtime.Time(date.Unix())
  36. // calculate activity within the statistical period
  37. for _, ac := range activities {
  38. if now >= ac.StatisticsStart && now <= ac.StatisticsEnd {
  39. log.Info("calculate ac: %d", ac.ID)
  40. err = s.handleActivity(c, ac, ups, date)
  41. if err != nil {
  42. log.Error("s.handleActivity error(%v)", err)
  43. return
  44. }
  45. }
  46. }
  47. return
  48. }
  49. func (s *Service) handleActivity(c context.Context, ac *model.CActivity, upInfo map[int64]*model.UpInfoVideo, date time.Time) (err error) {
  50. // 获取已报名并且在签约时间内的up主,
  51. signedUps, err := s.getSignedUps(c, ac, upInfo)
  52. if err != nil {
  53. log.Error("s.getSignedUps error(%v)", err)
  54. return
  55. }
  56. // 获取活动稿件点赞、分享、播放、评论、弹幕
  57. archiveInfo, err := s.getArchiveInfo(c, ac.ID)
  58. if err != nil {
  59. log.Error("s.getArchiveInfo error(%v)", err)
  60. return
  61. }
  62. if len(archiveInfo) == 0 {
  63. log.Info("activity(%d) get 0 archiveInfo", ac.ID)
  64. return
  65. }
  66. log.Info("get %d archiveInfo", len(archiveInfo))
  67. fmt.Printf("get %d archiveInfo\n", len(archiveInfo))
  68. // 获取在投稿时间内的稿件
  69. avs, err := s.getAvsUpload(c, signedUps)
  70. if err != nil {
  71. log.Error("s.getAvsByMID error(%v)", err)
  72. return
  73. }
  74. if len(avs) == 0 {
  75. log.Info("activity(%d) get 0 av", ac.ID)
  76. return
  77. }
  78. log.Info("get %d archives", len(avs))
  79. fmt.Printf("get %d archives\n", len(avs))
  80. // 筛选在签约时间内的稿件
  81. handleActivityAvs(signedUps, avs, ac)
  82. for _, u := range signedUps {
  83. // 获取up主对应稿件要求的和
  84. err = s.getUpStatisData(c, u, ac, date, archiveInfo)
  85. if err != nil {
  86. log.Error("s.getUpData error(%v)", err)
  87. return
  88. }
  89. }
  90. // 计算中奖up主
  91. upBonus, err := s.calUpWin(c, signedUps, ac)
  92. if err != nil {
  93. log.Error("s.calUpBonus error(%v)", err)
  94. return
  95. }
  96. err = s.updateUpActivity(c, ac.ID, upBonus)
  97. if err != nil {
  98. log.Error("s.updateUpActivity error(%v)", err)
  99. }
  100. return
  101. }
  102. func (s *Service) calUpWin(c context.Context, ups map[int64]*model.UpActivity, ac *model.CActivity) (bonusUp []*model.UpActivity, err error) {
  103. activityBonus, err := s.dao.GetActivityBonus(c, ac.ID)
  104. if err != nil {
  105. log.Error("s.dao.GetActivityBonus error(%v)", err)
  106. return
  107. }
  108. bonusUp = make([]*model.UpActivity, 0)
  109. var (
  110. upList = make([]*model.UpActivity, len(ups))
  111. index int
  112. )
  113. for _, up := range ups {
  114. upList[index] = up
  115. index++
  116. }
  117. sort.Slice(upList, func(i, j int) bool {
  118. return upList[i].ItemVal > upList[j].ItemVal
  119. })
  120. // 1:达标型 2:排序型
  121. if ac.WinType == 1 {
  122. for _, up := range upList {
  123. // uplist经过ItemVal,所以如果有ItemVal<RequireValue,可以直接break
  124. if up.ItemVal < ac.RequireValue {
  125. break
  126. }
  127. bonusUp = append(bonusUp, up)
  128. }
  129. } else if ac.WinType == 2 {
  130. // 去掉item为0的up
  131. for i := 0; i < len(upList); i++ {
  132. if upList[i].ItemVal == 0 {
  133. upList = upList[:i]
  134. break
  135. }
  136. }
  137. if len(upList) > int(ac.RequireValue) {
  138. upList = upList[:int(ac.RequireValue)]
  139. }
  140. bonusUp = upList
  141. }
  142. if len(bonusUp) == 0 {
  143. return
  144. }
  145. // 计算up主奖金
  146. calUpBonus(bonusUp, activityBonus, ac.BonusType, ac.WinType)
  147. return
  148. }
  149. // cal ups bonus
  150. func calUpBonus(bonusUp []*model.UpActivity, activityBonus map[int64]int64, bonusType, winType int) (err error) {
  151. // 中奖类型 1:达标型 2:排序型
  152. // 奖金类型 1:平分 2:各得
  153. if winType == 1 {
  154. money, ok := activityBonus[0]
  155. if !ok {
  156. err = fmt.Errorf("活动奖金设置错误:达标型未设置金额")
  157. return
  158. }
  159. if bonusType == 1 {
  160. money = money / int64(len(bonusUp))
  161. }
  162. for _, up := range bonusUp {
  163. up.Bonus = money
  164. up.Rank = 0
  165. if up.State < 2 {
  166. // 中奖
  167. up.State = 2
  168. up.SuccessTime = xtime.Time(time.Now().Unix())
  169. }
  170. }
  171. } else if winType == 2 {
  172. other := len(activityBonus)
  173. otherMoney, ok := activityBonus[int64(other)]
  174. if !ok {
  175. err = fmt.Errorf("活动奖金设置错误:排序型没有其他金额")
  176. return
  177. }
  178. for i := 0; i < len(bonusUp); i++ {
  179. var (
  180. rank = i + 1
  181. money int64
  182. ok bool
  183. )
  184. if rank >= other {
  185. money = otherMoney
  186. } else {
  187. money, ok = activityBonus[int64(rank)]
  188. if !ok {
  189. err = fmt.Errorf("活动奖金设置错误:没有名次金额")
  190. return
  191. }
  192. }
  193. if money == 0 {
  194. continue
  195. }
  196. bonusUp[i].Bonus = money
  197. bonusUp[i].Rank = i + 1
  198. if bonusUp[i].State < 2 {
  199. // 中奖
  200. bonusUp[i].State = 2
  201. bonusUp[i].SuccessTime = xtime.Time(time.Now().Unix())
  202. }
  203. }
  204. }
  205. return
  206. }
  207. func (s *Service) getUpStatisData(c context.Context, up *model.UpActivity, ac *model.CActivity, date time.Time, archiveInfo map[int64]map[int]*model.ArchiveStat) (err error) {
  208. if len(up.AIDs) == 0 {
  209. return
  210. }
  211. avItem := make([]*model.AvItem, 0)
  212. aIDs := up.AIDs
  213. for _, avID := range aIDs {
  214. // 稿件统计数据: 计算当天的和 - 统计开始前一天的和
  215. var itemSumStart, itemSumEnd int64
  216. itemSumEnd, err = s.getAvStatisState(c, avID, ac, archiveInfo, 2)
  217. if err != nil {
  218. log.Error("s.getAvStatisState error(%v)", err)
  219. return
  220. }
  221. itemSumStart, err = s.getAvStatisState(c, avID, ac, archiveInfo, 1)
  222. if err != nil {
  223. log.Error("s.getAvStatisState error(%v)", err)
  224. return
  225. }
  226. itemSum := itemSumEnd - itemSumStart
  227. if itemSum > 0 {
  228. avItem = append(avItem, &model.AvItem{
  229. AvID: avID,
  230. Value: itemSum,
  231. })
  232. }
  233. }
  234. if len(avItem) == 0 {
  235. return
  236. }
  237. sort.Slice(avItem, func(i, j int) bool {
  238. return avItem[i].Value > avItem[j].Value
  239. })
  240. // 1:uid 2 avid
  241. if ac.Object == 1 {
  242. up.AIDs = make([]int64, 0)
  243. for _, av := range avItem {
  244. if len(up.AIDs) < 5 {
  245. up.AIDs = append(up.AIDs, av.AvID)
  246. }
  247. up.ItemVal += av.Value
  248. }
  249. up.AIDNum = int64(len(avItem))
  250. } else if ac.Object == 2 {
  251. up.AIDs = []int64{avItem[0].AvID}
  252. up.ItemVal = avItem[0].Value
  253. up.AIDNum = 1
  254. }
  255. return
  256. }
  257. func (s *Service) getAvStatisState(c context.Context, avID int64, ac *model.CActivity, archiveInfo map[int64]map[int]*model.ArchiveStat, state int) (sum int64, err error) {
  258. if _, ok := archiveInfo[avID]; !ok {
  259. return
  260. }
  261. stat, ok := archiveInfo[avID][state]
  262. if !ok {
  263. return
  264. }
  265. requireItems := strings.Split(ac.RequireItems, ",")
  266. for _, item := range requireItems {
  267. switch item {
  268. case _like:
  269. sum += stat.Like
  270. case _share:
  271. sum += stat.Share
  272. case _play:
  273. sum += stat.Play
  274. case _reply:
  275. sum += stat.Reply
  276. case _dm:
  277. sum += stat.Dm
  278. }
  279. }
  280. return
  281. }
  282. func handleActivityAvs(ups map[int64]*model.UpActivity, avs []*model.AvUpload, ac *model.CActivity) {
  283. for _, av := range avs {
  284. if !(av.UploadTime >= ac.UploadStart && av.UploadTime <= ac.UploadEnd) {
  285. continue
  286. }
  287. if _, ok := ups[av.MID]; ok {
  288. if len(ups[av.MID].AIDs) == 0 {
  289. ups[av.MID].AIDs = make([]int64, 0)
  290. }
  291. ups[av.MID].AIDs = append(ups[av.MID].AIDs, av.AvID)
  292. }
  293. }
  294. }
  295. func (s *Service) getAvsUpload(c context.Context, ups map[int64]*model.UpActivity) (avs []*model.AvUpload, err error) {
  296. avs = make([]*model.AvUpload, 0)
  297. var id int64
  298. for {
  299. var av []*model.AvUpload
  300. av, err = s.dao.GetAvUploadByMID(c, id, _dbLimit)
  301. if err != nil {
  302. return
  303. }
  304. avs = append(avs, av...)
  305. if len(av) < _dbLimit {
  306. break
  307. }
  308. id = av[len(av)-1].ID
  309. }
  310. return
  311. }
  312. func (s *Service) getArchiveInfo(c context.Context, activityID int64) (archives map[int64]map[int]*model.ArchiveStat, err error) {
  313. archives = make(map[int64]map[int]*model.ArchiveStat) // map[av_id][state]*model.ArchiveStat
  314. var id int64
  315. for {
  316. var arch []*model.ArchiveStat
  317. arch, err = s.dao.GetArchiveInfo(c, activityID, id, _dbLimit)
  318. if err != nil {
  319. return
  320. }
  321. for _, a := range arch {
  322. if _, ok := archives[a.AvID]; !ok {
  323. archives[a.AvID] = make(map[int]*model.ArchiveStat)
  324. }
  325. archives[a.AvID][a.State] = a
  326. }
  327. if len(arch) < _dbLimit {
  328. break
  329. }
  330. id = arch[len(arch)-1].ID
  331. }
  332. return
  333. }
  334. func (s *Service) getSignedUps(c context.Context, ac *model.CActivity, upInfo map[int64]*model.UpInfoVideo) (signedUps map[int64]*model.UpActivity, err error) {
  335. signedUps = make(map[int64]*model.UpActivity)
  336. // if need sign up
  337. if ac.SignUp == 1 {
  338. var ups []*model.UpActivity
  339. ups, err = s.dao.ListUpActivity(c, ac.ID)
  340. if err != nil {
  341. return
  342. }
  343. for _, up := range ups {
  344. if info, ok := upInfo[up.MID]; ok && info.SignedAt >= ac.SignedStart && info.SignedAt <= ac.SignedEnd {
  345. signedUps[up.MID] = up
  346. }
  347. }
  348. } else {
  349. for _, info := range upInfo {
  350. if info.SignedAt >= ac.SignedStart && info.SignedAt <= ac.SignedEnd {
  351. signedUps[info.MID] = &model.UpActivity{MID: info.MID, ActivityID: ac.ID, State: 1, Nickname: info.Nickname}
  352. }
  353. }
  354. }
  355. return
  356. }
  357. func (s *Service) updateUpActivity(c context.Context, id int64, upBonus []*model.UpActivity) (err error) {
  358. // 更新所有之前获奖up主为已报名
  359. // update state 2->1 将所有状态设置为已报名
  360. _, err = s.dao.UpdateUpActivityState(c, id, 2, 1)
  361. if err != nil {
  362. log.Error("s.dao.UpdateUpActivityState error(%v)", err)
  363. return
  364. }
  365. if len(upBonus) > 0 {
  366. _, err = s.insertUpActivityBatch(c, upBonus)
  367. if err != nil {
  368. log.Error("s.insertUpActivity error(%v)", err)
  369. }
  370. }
  371. return
  372. }
  373. func (s *Service) insertUpActivityBatch(c context.Context, ups []*model.UpActivity) (rows int64, err error) {
  374. insert := make([]*model.UpActivity, _dbBatchSize)
  375. insertIndex := 0
  376. for _, up := range ups {
  377. insert[insertIndex] = up
  378. insertIndex++
  379. if insertIndex >= _dbBatchSize {
  380. _, err = s.insertUpActivity(c, insert[:insertIndex])
  381. if err != nil {
  382. return
  383. }
  384. insertIndex = 0
  385. }
  386. }
  387. if insertIndex > 0 {
  388. _, err = s.insertUpActivity(c, insert[:insertIndex])
  389. }
  390. return
  391. }
  392. func assembleUpActivity(ups []*model.UpActivity) (vals string) {
  393. var buf bytes.Buffer
  394. for _, row := range ups {
  395. buf.WriteString("(")
  396. buf.WriteString(strconv.FormatInt(row.MID, 10))
  397. buf.WriteByte(',')
  398. buf.WriteString("\"" + row.Nickname + "\"")
  399. buf.WriteByte(',')
  400. buf.WriteString(strconv.FormatInt(row.ActivityID, 10))
  401. buf.WriteByte(',')
  402. buf.WriteString("'" + xstr.JoinInts(row.AIDs) + "'")
  403. buf.WriteByte(',')
  404. buf.WriteString(strconv.FormatInt(row.AIDNum, 10))
  405. buf.WriteByte(',')
  406. buf.WriteString(strconv.Itoa(row.Rank))
  407. buf.WriteByte(',')
  408. buf.WriteString(strconv.FormatInt(row.Bonus, 10))
  409. buf.WriteByte(',')
  410. buf.WriteString(strconv.Itoa(row.State))
  411. buf.WriteByte(',')
  412. buf.WriteString("'" + row.SuccessTime.Time().Format("2006-01-02 15:04:05") + "'")
  413. buf.WriteString(")")
  414. buf.WriteByte(',')
  415. }
  416. if buf.Len() > 0 {
  417. buf.Truncate(buf.Len() - 1)
  418. }
  419. vals = buf.String()
  420. buf.Reset()
  421. return
  422. }
  423. func (s *Service) insertUpActivity(c context.Context, ups []*model.UpActivity) (rows int64, err error) {
  424. vals := assembleUpActivity(ups)
  425. rows, err = s.dao.InsertUpActivityBatch(c, vals)
  426. return
  427. }