service.go 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259
  1. package service
  2. import (
  3. "context"
  4. "sync"
  5. "time"
  6. "go-common/app/job/main/creative/conf"
  7. "go-common/app/job/main/creative/dao/academy"
  8. "go-common/app/job/main/creative/dao/archive"
  9. "go-common/app/job/main/creative/dao/monitor"
  10. "go-common/app/job/main/creative/dao/newcomer"
  11. "go-common/app/job/main/creative/dao/weeklyhonor"
  12. "go-common/app/job/main/creative/model"
  13. "go-common/library/conf/env"
  14. "go-common/library/queue/databus"
  15. "go-common/library/xstr"
  16. "github.com/robfig/cron"
  17. )
  18. // Service is service.
  19. type Service struct {
  20. c *conf.Config
  21. //arc databus
  22. arcSub *databus.Databus
  23. arcNotifySub *databus.Databus
  24. upPub *databus.Databus
  25. // wait group
  26. wg sync.WaitGroup
  27. // monitor
  28. monitor *monitor.Dao
  29. arc *archive.Dao
  30. arcNotifyMo int64
  31. arcMo int64
  32. // chan for mids
  33. midsChan chan map[int64]int
  34. //aca
  35. aca *academy.Dao
  36. // honDao
  37. honDao *weeklyhonor.Dao
  38. //task databus
  39. newc *newcomer.Dao
  40. taskSub, shareSub, relationSub, statLikeSub *databus.Databus
  41. statShareSub, statCoinSub, statFavSub *databus.Databus
  42. statReplySub, statDMSub, statViewSub, newUpSub *databus.Databus
  43. taskSubQueue []chan *databus.Message
  44. shareSubQueue []chan *model.ShareMsg
  45. relationQueue []chan *model.Relation //用户关注队列
  46. followerQueue []chan *model.Stat //粉丝数队列
  47. newUpQueue []chan *model.Up //新投稿
  48. oldUpQueue []chan *model.Up //进阶任务视频投稿超过5个
  49. mobileUpQueue []chan *model.Up //手机投稿
  50. databusQueueLen int //消费databus 队列长度
  51. statViewQueueLen int //播放消费databus 队列长度
  52. statLikeQueueLen int //点赞消费databus 队列长度
  53. chanSize int //chan 缓冲长度
  54. //单个稿件计数
  55. statViewSubQueue []chan *model.StatView
  56. statLikeSubQueue []chan *model.StatLike
  57. statShareSubQueue []chan *model.StatShare
  58. statCoinSubQueue []chan *model.StatCoin
  59. statFavSubQueue []chan *model.StatFav
  60. statReplySubQueue []chan *model.StatReply
  61. statDMSubQueue []chan *model.StatDM
  62. //db
  63. taskQueue []chan []*model.UserTask
  64. TaskCache []*model.Task
  65. TaskMapCache map[int64]*model.Task
  66. GiftRewardCache map[int8][]*model.GiftReward //gift-reward
  67. //notify
  68. taskNotifyQueue []chan []int64
  69. rewardNotifyQueue []chan []int64
  70. testNotifyMids map[int64]struct{}
  71. }
  72. // New is go-common/app/service/videoup service implementation.
  73. func New(c *conf.Config) (s *Service) {
  74. s = &Service{
  75. c: c,
  76. arcSub: databus.New(c.ArcSub),
  77. arcNotifySub: databus.New(c.ArcNotifySub),
  78. upPub: databus.New(c.UpPub),
  79. monitor: monitor.New(c),
  80. midsChan: make(chan map[int64]int, c.ChanSize),
  81. arc: archive.New(c),
  82. aca: academy.New(c),
  83. honDao: weeklyhonor.New(c),
  84. //task
  85. newc: newcomer.New(c),
  86. taskQueue: make([]chan []*model.UserTask, c.Task.TableConsumeNum),
  87. TaskMapCache: make(map[int64]*model.Task),
  88. GiftRewardCache: make(map[int8][]*model.GiftReward),
  89. //notify
  90. taskNotifyQueue: make([]chan []int64, c.Task.TaskTableConsumeNum),
  91. rewardNotifyQueue: make([]chan []int64, c.Task.RewardTableConsumeNum),
  92. testNotifyMids: make(map[int64]struct{}),
  93. chanSize: c.Task.ChanSize,
  94. }
  95. s.newTaskDatabus()
  96. if c.Consume {
  97. s.wg.Add(1)
  98. go s.arcCanalConsume()
  99. s.wg.Add(1)
  100. go s.arcNotifyCanalConsume()
  101. go s.monitorConsume()
  102. }
  103. if c.HotSwitch {
  104. go s.FlushHot(model.BusinessForArchvie) //计算视频稿件的hot
  105. go s.FlushHot(model.BusinessForArticle) //计算专栏稿件的hot
  106. }
  107. s.taskConsume()
  108. return
  109. }
  110. // InitCron init cron
  111. func (s *Service) InitCron() {
  112. c := cron.New()
  113. if s.c.HonorSwitch {
  114. c.AddFunc(s.c.HonorFlushSpec, s.FlushHonor)
  115. c.AddFunc(s.c.HonorMSGSpec, s.SendMsg)
  116. }
  117. c.Start()
  118. }
  119. func (s *Service) newTaskDatabus() {
  120. s.databusQueueLen = s.c.Task.DatabusQueueLen
  121. s.statViewQueueLen = s.c.Task.StatViewQueueLen //播放
  122. s.statLikeQueueLen = s.c.Task.StatLikeQueueLen //点赞
  123. s.taskSub = databus.New(s.c.TaskSub)
  124. s.shareSub = databus.New(s.c.ShareSub) //分享自己的稿件
  125. s.relationSub = databus.New(s.c.RelationSub)
  126. s.newUpSub = databus.New(s.c.NewUpSub) //新投稿up主
  127. //单个稿件计数
  128. s.statViewSub = databus.New(s.c.StatViewSub)
  129. s.statLikeSub = databus.New(s.c.StatLikeSub)
  130. s.statShareSub = databus.New(s.c.StatShareSub) //计数分享
  131. s.statCoinSub = databus.New(s.c.StatCoinSub)
  132. s.statFavSub = databus.New(s.c.StatFavSub)
  133. s.statReplySub = databus.New(s.c.StatReplySub)
  134. s.statDMSub = databus.New(s.c.StatDMSub)
  135. s.taskSubQueue = make([]chan *databus.Message, s.databusQueueLen) //设置水印、观看创作学院视频、参加激励计划、开通粉丝勋章
  136. s.shareSubQueue = make([]chan *model.ShareMsg, s.databusQueueLen) //分享自己的稿件
  137. s.relationQueue = make([]chan *model.Relation, s.databusQueueLen) //用户关注队列
  138. s.followerQueue = make([]chan *model.Stat, s.databusQueueLen) //粉丝数队列
  139. s.newUpQueue = make([]chan *model.Up, s.databusQueueLen) //新投稿up主
  140. s.oldUpQueue = make([]chan *model.Up, s.databusQueueLen) //进阶任务视频投稿超过5个
  141. s.mobileUpQueue = make([]chan *model.Up, s.databusQueueLen) //进阶任务手机投稿
  142. //单个稿件计数
  143. s.statViewSubQueue = make([]chan *model.StatView, s.statViewQueueLen)
  144. s.statLikeSubQueue = make([]chan *model.StatLike, s.statLikeQueueLen)
  145. s.statShareSubQueue = make([]chan *model.StatShare, s.databusQueueLen)
  146. s.statCoinSubQueue = make([]chan *model.StatCoin, s.databusQueueLen)
  147. s.statFavSubQueue = make([]chan *model.StatFav, s.databusQueueLen)
  148. s.statReplySubQueue = make([]chan *model.StatReply, s.databusQueueLen)
  149. s.statDMSubQueue = make([]chan *model.StatDM, s.databusQueueLen)
  150. }
  151. // TaskClose close task sub.
  152. func (s *Service) TaskClose() {
  153. s.taskSub.Close() //水印、激励、观看创作学院、开通粉丝勋章
  154. s.shareSub.Close() //个人稿件分享
  155. s.relationSub.Close() //关注哔哩哔哩创组中心,新手和进阶粉丝数
  156. s.newUpSub.Close() //投第一个稿件
  157. //计数
  158. s.statViewSub.Close()
  159. s.statLikeSub.Close()
  160. s.statShareSub.Close()
  161. s.statCoinSub.Close()
  162. s.statFavSub.Close()
  163. s.statReplySub.Close()
  164. s.statDMSub.Close()
  165. }
  166. func (s *Service) taskConsume() {
  167. s.loadTasks() //定时缓存所有任务
  168. s.loadGiftRewards() //定时缓存所有奖励
  169. go s.loadProc()
  170. //非实时任务状态变更
  171. s.initTaskQueue()
  172. go s.commitTask()
  173. //过期任务通知
  174. if s.c.Task.SwitchMsgNotify {
  175. mids, _ := xstr.SplitInts(s.c.Task.TestNotifyMids)
  176. for _, mid := range mids {
  177. s.testNotifyMids[mid] = struct{}{} //test mids
  178. }
  179. s.initTaskNotifyQueue()
  180. go s.expireTaskNotify()
  181. //奖励领取通知
  182. s.initRewardNotifyQueue()
  183. go s.rewardReceiveNotify()
  184. }
  185. //实时任务状态变更
  186. if s.c.Task.SwitchHighQPS { //消息qps 较高的消费
  187. s.initStatViewQueue()
  188. s.initStatLikeQueue()
  189. s.wg.Add(2)
  190. go s.statView() //1
  191. go s.statLike() //2
  192. }
  193. if s.c.Task.SwitchDatabus { //消息qps 较少的消费
  194. s.wg.Add(9)
  195. s.initDatabusQueue()
  196. go s.task() //1
  197. go s.share() //2
  198. go s.relation() //3
  199. go s.statShare() //4
  200. go s.statCoin() //5
  201. go s.statFav() //6
  202. go s.statReply() //7
  203. go s.statDM() //8
  204. go s.newUp() //9
  205. }
  206. }
  207. // Ping service
  208. func (s *Service) Ping(c context.Context) (err error) {
  209. return
  210. }
  211. func (s *Service) monitorConsume() {
  212. if s.c.Env != env.DeployEnvProd {
  213. return
  214. }
  215. var arcNotifyMo, arcmo int64
  216. for {
  217. time.Sleep(time.Minute * 1)
  218. if s.arcNotifyMo-arcNotifyMo == 0 {
  219. s.monitor.Send(context.TODO(), s.c.Monitor.UserName, "creative-job did not consume within a minute, moni url"+s.c.Monitor.Moni)
  220. }
  221. arcNotifyMo = s.arcNotifyMo
  222. if s.arcMo-arcmo == 0 {
  223. s.monitor.Send(context.TODO(), s.c.Monitor.UserName, "creative-job did not consume within a minute, moni url"+s.c.Monitor.Moni)
  224. }
  225. arcmo = s.arcMo
  226. }
  227. }
  228. // Close sub.
  229. func (s *Service) Close() {
  230. s.arcSub.Close()
  231. s.arcNotifySub.Close()
  232. s.upPub.Close()
  233. close(s.midsChan)
  234. s.TaskClose() //task databus close
  235. s.wg.Wait()
  236. }