task_weight.go 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371
  1. package service
  2. import (
  3. "context"
  4. "time"
  5. "go-common/app/job/main/videoup-report/model/archive"
  6. "go-common/app/job/main/videoup-report/model/task"
  7. "go-common/app/job/main/videoup-report/model/utils"
  8. "go-common/library/log"
  9. )
  10. const (
  11. _pm = int64(3) //period minute
  12. //普通任务参数
  13. _nc1 = int64(3) // 等待时长9分钟
  14. _nc2 = int64(5) // 等待时长15分钟
  15. _nc3 = int64(9) // 等待时长27分钟
  16. _nc4 = int64(15) // 等待时长45分钟
  17. //定时任务参数
  18. _tc1 = int64(80) // 距离发布4小时
  19. _tc2 = int64(40) // 距离发布2小时
  20. _tc3 = int64(20) // 距离发布1小时
  21. )
  22. func (s *Service) upTaskWeightCache() (err error) {
  23. var (
  24. ids []int64
  25. firstid int64
  26. lastid int64
  27. tasksMap map[int64]*task.WeightParams
  28. )
  29. c := context.TODO()
  30. s.jumplist.Reset()
  31. defer func() {
  32. s.lastjumpMap = make(map[int64]struct{})
  33. for item := s.jumplist.POP(); item != nil; item = s.jumplist.POP() {
  34. select {
  35. case s.jumpchan <- item:
  36. s.lastjumpMap[item.TaskID] = struct{}{}
  37. default:
  38. log.Warn("jumpchan full")
  39. }
  40. }
  41. }()
  42. for {
  43. cacheMap := make(map[int64]*task.WeightParams)
  44. // 先从数据库批量读取出id来
  45. if ids, lastid, err = s.arc.TaskIDforWeight(c, firstid); len(ids) == 0 {
  46. if err != nil {
  47. log.Error("s.task.TaskIDforWeight(%d) error(%v)", firstid, err)
  48. }
  49. return
  50. }
  51. // 先从redis取权重配置,redis挂了从数据库读取权重配置
  52. if tasksMap, err = s.redis.GetWeight(c, ids); len(tasksMap) != len(ids) {
  53. log.Warn("GetTaskWeight from redis need len(%d) while len(%d)", len(ids), len(tasksMap))
  54. if tasksMap, err = s.arc.GetTaskWeight(c, firstid); err != nil {
  55. log.Error("s.arc.GetTaskWeight(%d) error(%v)", firstid, err)
  56. return
  57. }
  58. }
  59. for id, tp := range tasksMap {
  60. if tp.State == 0 {
  61. s.setTask(c, id, tp, cacheMap)
  62. }
  63. }
  64. if len(cacheMap) > 0 {
  65. s.redis.SetWeight(c, cacheMap)
  66. }
  67. firstid = lastid
  68. time.Sleep(time.Second)
  69. }
  70. }
  71. func (s *Service) upTaskWeightDB(c context.Context, item *task.WeightLog) (err error) {
  72. // 超时还没入库的直接丢掉,免得阻塞
  73. if time.Since(item.Uptime.TimeValue()).Minutes() > 3.0 {
  74. log.Warn("task weight item(%v) expired! ", item)
  75. return
  76. }
  77. _, err = s.arc.UpTaskWeight(c, item.TaskID, item.Weight)
  78. if err != nil {
  79. log.Error("UpTaskWeight(%d,%d) error(%v)", item.Weight, item.TaskID, err)
  80. return
  81. }
  82. return
  83. }
  84. func (s *Service) setTask(c context.Context, taskid int64, tp *task.WeightParams, cm map[int64]*task.WeightParams) {
  85. tpc := s.newPriority(c, tp)
  86. // 上轮次权重前2000的这轮也必须更新
  87. if _, ok := s.lastjumpMap[tpc.TaskID]; ok {
  88. select {
  89. case s.jumpchan <- tpc:
  90. default:
  91. log.Warn("jumpchan full")
  92. }
  93. } else {
  94. s.jumplist.PUSH(tpc) //插队序列
  95. }
  96. tp.Weight = tpc.Weight
  97. if cm != nil {
  98. cm[taskid] = tp
  99. }
  100. select {
  101. case s.tasklogchan <- tpc:
  102. default:
  103. log.Info("s.tasklogchan full(%d)", len(s.tasklogchan))
  104. }
  105. }
  106. // 任务权重计算
  107. func (s *Service) newPriority(c context.Context, tp *task.WeightParams) (tpc *task.WeightLog) {
  108. var round int64
  109. var wcf = task.WLVConf
  110. tpc = &task.WeightLog{
  111. TaskID: tp.TaskID,
  112. Mid: tp.Mid,
  113. }
  114. if tp.AccFailed {
  115. if tp.Fans, tp.AccFailed = s.getUpperFans(c, tp.Mid); !tp.AccFailed {
  116. tp.Special = s.getUpSpecial(tp.Mid, tp.Fans)
  117. s.arc.SetUpSpecial(c, tp.TaskID, tp.Special)
  118. }
  119. }
  120. round = int64(time.Since(tp.Ctime.TimeValue()).Minutes()) / _pm
  121. //定时任务
  122. if !tp.Ptime.TimeValue().IsZero() {
  123. pround := int64((time.Until(tp.Ptime.TimeValue()).Minutes())) / 3
  124. sumtesmp := wcf.Tlv4 * (int64(tp.Ptime.TimeValue().Sub(tp.Ctime.TimeValue()).Minutes()) - 60*4) / 3
  125. switch {
  126. case pround >= _tc1:
  127. tpc.TWeight = round * wcf.Tlv4
  128. case _tc2 <= pround && pround < _tc1:
  129. tpc.TWeight = (_tc1-pround)*wcf.Tlv1 + sumtesmp
  130. case _tc3 <= pround && pround < _tc2:
  131. tpc.TWeight = (_tc2-pround)*wcf.Tlv2 + wcf.Tsum2h + sumtesmp
  132. case pround < _tc3:
  133. tpc.TWeight = (_tc3-pround)*wcf.Tlv3 + wcf.Tsum1h + sumtesmp
  134. }
  135. tpc.NWeight = 0
  136. } else { // 普通任务加权
  137. switch {
  138. case round < _nc1:
  139. tpc.NWeight = wcf.Nlv5 * round
  140. case _nc1 <= round && round < _nc2:
  141. tpc.NWeight = wcf.Nlv1*(round-_nc1) + wcf.Nsum9
  142. case _nc2 <= round && round < _nc3:
  143. tpc.NWeight = wcf.Nlv2*(round-_nc2) + wcf.Nsum15
  144. case _nc3 <= round && round < _nc4:
  145. tpc.NWeight = wcf.Nlv3*(round-_nc3) + wcf.Nsum27
  146. default:
  147. tpc.NWeight = wcf.Nlv4*(round-_nc4) + wcf.Nsum45
  148. }
  149. tpc.TWeight = 0
  150. }
  151. // 特殊任务加权
  152. switch tp.Special {
  153. case task.UpperBigNormal:
  154. tpc.SWeight = wcf.Slv1 * round
  155. case task.UpperSuperNormal:
  156. tpc.SWeight = wcf.Slv2 * round
  157. case task.UpperWhite:
  158. tpc.SWeight = wcf.Slv3 * round
  159. case task.UpperBigWhite:
  160. tpc.SWeight = wcf.Slv4 * round
  161. case task.UpperSuperWhite:
  162. tpc.SWeight = wcf.Slv5 * round
  163. case task.UpperSuperBlack:
  164. tpc.SWeight = wcf.Slv6 * round
  165. case task.UpperBlack:
  166. tpc.SWeight = wcf.Slv7 * round
  167. default:
  168. tpc.SWeight = 0
  169. }
  170. //配置任务加权
  171. tpc.CWeight = 0
  172. if len(tp.CfItems) > 0 {
  173. tpc.CfItems = tp.CfItems
  174. for _, item := range tp.CfItems {
  175. if item.Rule == 0 {
  176. round2 := int64(time.Since(item.Mtime.TimeValue()).Minutes()) / _pm
  177. if round2 < round {
  178. round = round2
  179. }
  180. tpc.CWeight += round * item.Weight
  181. } else {
  182. tpc.CWeight += item.Weight
  183. }
  184. }
  185. if tpc.CWeight <= wcf.MinWeight {
  186. tpc.CWeight = wcf.MinWeight
  187. }
  188. }
  189. tpc.Weight = tpc.NWeight + tpc.SWeight + tpc.TWeight + tpc.CWeight
  190. if tpc.Weight >= wcf.MaxWeight {
  191. tpc.Weight = wcf.MaxWeight
  192. }
  193. tpc.Uptime = utils.NewFormatTime(time.Now())
  194. return
  195. }
  196. // 设置特殊用户的审核任务
  197. func (s *Service) setTaskUPSpecial(c context.Context, t *task.Task, mid int64) (fans int64, failed bool) {
  198. fans, failed = s.getUpperFans(c, mid)
  199. t.UPSpecial = s.getUpSpecial(mid, fans)
  200. return
  201. }
  202. func (s *Service) getUpSpecial(mid int64, fans int64) (upspecial int8) {
  203. switch {
  204. case s.isWhite(mid): //优质
  205. if fans >= task.SuperUpperTH {
  206. upspecial = task.UpperSuperWhite
  207. } else if fans >= task.BigUpperTH {
  208. upspecial = task.UpperSuperWhite
  209. } else {
  210. upspecial = task.UpperWhite
  211. }
  212. case s.isBlack(mid):
  213. if fans >= task.SuperUpperTH {
  214. upspecial = task.UpperSuperBlack
  215. } else {
  216. upspecial = task.UpperBlack
  217. }
  218. default:
  219. if fans >= task.SuperUpperTH {
  220. upspecial = task.UpperSuperNormal
  221. } else if fans >= task.BigUpperTH {
  222. upspecial = task.UpperBigNormal
  223. }
  224. }
  225. return
  226. }
  227. // 设置定时发布的审核任务
  228. func (s *Service) setTaskTimed(c context.Context, t *task.Task) {
  229. adelay, err := s.arc.Delay(c, t.Aid)
  230. if err != nil {
  231. log.Error("s.arc.Delay(%d) error(%v)", t.Aid, err)
  232. return
  233. }
  234. if adelay != nil && adelay.State == 0 && !adelay.DTime.IsZero() {
  235. t.Ptime = utils.NewFormatTime(adelay.DTime)
  236. }
  237. }
  238. func (s *Service) setTaskUpFrom(c context.Context, aid int64, tp *task.WeightParams) {
  239. if addit, _ := s.arc.Addit(c, aid); addit != nil {
  240. tp.UpFrom = addit.UpFrom
  241. }
  242. }
  243. func (s *Service) setTaskUpGroup(c context.Context, mid int64, tp *task.WeightParams) {
  244. ugs := []int8{}
  245. for gid, cache := range s.upperCache {
  246. if _, ok := cache[mid]; ok {
  247. ugs = append(ugs, gid)
  248. }
  249. }
  250. tp.UpGroups = ugs
  251. }
  252. // 设置配置用户权重的审核任务
  253. func (s *Service) getConfWeight(c context.Context, t *task.Task, a *archive.Archive) (cfitems []*task.ConfigItem) {
  254. // 1. 按照用户
  255. if confs, ok := s.weightCache[task.WConfMid]; ok {
  256. if conf, ok := confs[a.Mid]; ok {
  257. cfitems = append(cfitems, conf)
  258. }
  259. }
  260. // 2. 按照分区
  261. if confs, ok := s.weightCache[task.WConfType]; ok {
  262. if conf, ok := confs[int64(a.TypeID)]; ok {
  263. cfitems = append(cfitems, conf)
  264. }
  265. }
  266. // 3. 按照投稿来源
  267. if confs, ok := s.weightCache[task.WConfUpFrom]; ok {
  268. addit, err := s.arc.Addit(c, a.ID)
  269. if addit == nil {
  270. if err != nil {
  271. log.Error(" s.arc.Addit(%d) error(%v)", t.Aid, err)
  272. }
  273. return
  274. }
  275. if conf, ok := confs[int64(addit.UpFrom)]; ok {
  276. cfitems = append(cfitems, conf)
  277. }
  278. }
  279. return
  280. }
  281. // 权重配置
  282. func (s *Service) weightConf(c context.Context) (cfc map[int8]map[int64]*task.ConfigItem, err error) {
  283. var (
  284. ids []int64
  285. arrcfs []*task.ConfigItem
  286. )
  287. if arrcfs, err = s.arc.WeightConf(context.TODO()); err != nil {
  288. log.Error("s.arc.WeightConf error(%v)", err)
  289. return
  290. }
  291. cfc = map[int8]map[int64]*task.ConfigItem{}
  292. for _, item := range arrcfs {
  293. if !item.Et.TimeValue().IsZero() && item.Et.TimeValue().Before(time.Now()) {
  294. ids = append(ids, item.ID)
  295. continue
  296. }
  297. if _, ok := cfc[item.Radio]; !ok {
  298. cfc[item.Radio] = make(map[int64]*task.ConfigItem)
  299. }
  300. cfc[item.Radio][item.CID] = item
  301. }
  302. if len(ids) > 0 {
  303. log.Info("task config(%v) 权重配置已过期,自动失效", ids)
  304. s.arc.DelWeightConfs(c, ids)
  305. }
  306. return
  307. }
  308. func (s *Service) taskWeightConsumer() {
  309. var c = context.TODO()
  310. defer s.waiter.Done()
  311. for {
  312. if s.closed {
  313. return
  314. }
  315. select {
  316. case item, ok := <-s.jumpchan: //插队序列
  317. if !ok {
  318. log.Error("s.jumpchan closed")
  319. return
  320. }
  321. s.upTaskWeightDB(c, item)
  322. case item, ok := <-s.tasklogchan:
  323. if !ok {
  324. log.Error("s.tasklogchan closed")
  325. return
  326. }
  327. s.hbase.AddLog(c, item)
  328. default:
  329. time.Sleep(time.Second)
  330. }
  331. }
  332. }
  333. func (s *Service) taskweightproc() {
  334. defer s.waiter.Done()
  335. for {
  336. if s.closed {
  337. return
  338. }
  339. s.upTaskWeightCache()
  340. log.Info("taskweightproc 插队序列(%d) 日志序列(%d)", len(s.jumpchan), len(s.tasklogchan))
  341. time.Sleep(3 * time.Minute)
  342. }
  343. }