task_weight.go 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "go-common/app/admin/main/videoup/model/utils"
  7. "go-common/library/sync/errgroup"
  8. "go-common/library/xstr"
  9. "time"
  10. "go-common/app/admin/main/videoup/model/archive"
  11. accApi "go-common/app/service/main/account/api"
  12. "go-common/library/log"
  13. )
  14. // weightConf 所有激活的配置项
  15. func (s *Service) weightConf(c context.Context) (wconfs map[int8]map[int64]*archive.WCItem, err error) {
  16. items, err := s.arc.WeightConf(c)
  17. if err != nil {
  18. log.Error("WeightConf error(%v)", err)
  19. return
  20. }
  21. wconfs = map[int8]map[int64]*archive.WCItem{}
  22. for _, item := range items {
  23. conf, ok := wconfs[item.Radio]
  24. if !ok {
  25. conf = make(map[int64]*archive.WCItem)
  26. }
  27. conf[item.CID] = item
  28. wconfs[item.Radio] = conf
  29. }
  30. return
  31. }
  32. // 同一类型的同id配置只能有一个
  33. func (s *Service) checkConflict(c context.Context, mcases map[int64]*archive.WCItem) (err error) {
  34. wcf, err := s.weightConf(c)
  35. if err != nil {
  36. log.Error("s.weightConf error(%v)", err)
  37. return
  38. }
  39. s.twConCache = wcf
  40. for _, item := range mcases {
  41. if cfgs, ok := s.twConCache[item.Radio]; ok {
  42. if cfg, ok := cfgs[item.CID]; ok {
  43. return fmt.Errorf("config(%d) conflict with (id=%d,desc=%s)", item.CID, cfg.ID, cfg.Desc)
  44. }
  45. }
  46. }
  47. return
  48. }
  49. // AddWeightConf 配置权重
  50. func (s *Service) AddWeightConf(c context.Context, cfg *archive.WeightConf, uid int64, uname string) (err error) {
  51. //0. 一级分区转化为二级分区
  52. if cfg.Radio == archive.WConfType {
  53. cfg.Ids = s.tarnsType(c, cfg.Ids)
  54. }
  55. //1. 解析表单提交的配置
  56. mcases, istaskid, err := archive.ParseWeightConf(cfg, uid, uname)
  57. if err != nil {
  58. log.Error("archive.ParseWeightConf(%v, %v) error(%v)", cfg, uname, err)
  59. return
  60. }
  61. //2. 检查是否互相冲突
  62. if err = s.checkConflict(c, mcases); err != nil {
  63. log.Error("s.checkConflict error(%v)", err)
  64. return
  65. }
  66. //3. 更新任务的权重配置
  67. if istaskid {
  68. if err = s.setWeightConf(c, cfg.Ids, mcases); err != nil {
  69. log.Error("s.setWeightConf error(%v)", err)
  70. return
  71. }
  72. }
  73. //4. 将配置存储到数据库表
  74. if err = s.arc.InWeightConf(c, mcases); err != nil {
  75. log.Error("s.arc.InWeightConf(%v) error(%v)", mcases, err)
  76. return
  77. }
  78. return
  79. }
  80. // DelWeightConf 删除配置项
  81. func (s *Service) DelWeightConf(c context.Context, id int64) (err error) {
  82. if _, err = s.arc.DelWeightConf(c, id); err != nil {
  83. log.Error("s.DelWeightConf(%d) error(%v)", id, err)
  84. }
  85. return
  86. }
  87. // ListWeightConf 列出配置
  88. func (s *Service) ListWeightConf(c context.Context, v *archive.Confs) (cfg []*archive.WCItem, err error) {
  89. if cfg, err = s.arc.ListWeightConf(c, v); err != nil {
  90. log.Error("s.ListWeightConf(%+v) error(%v)", v, err)
  91. return
  92. }
  93. // 根据taskid补充filename和title
  94. switch v.Radio {
  95. case archive.WConfMid: // 补充昵称和粉丝数
  96. s.singleIDtoName(c, cfg, func(c context.Context, cid int64) (res []interface{}, err error) {
  97. stat, err := s.profile(c, cid)
  98. if err != nil {
  99. log.Error("s.profile(%d) error(%v)", cid, err)
  100. return
  101. }
  102. res = []interface{}{stat.Profile.Name, stat.Follower}
  103. return
  104. }, false, "CID", "Creator", "Fans")
  105. case archive.WConfTaskID: //补充title和filename
  106. s.mulIDtoName(c, cfg, s.arc.LWConfigHelp, "CID", "FileName", "Title", "Vid")
  107. case archive.WConfType: //补充分区名称
  108. s.singleIDtoName(c, cfg, func(c context.Context, cid int64) (res []interface{}, err error) {
  109. if item, ok := s.typeCache[int16(cid)]; ok {
  110. res = []interface{}{item.Name}
  111. }
  112. return
  113. }, false, "CID", "TypeName")
  114. case archive.WConfUpFrom: //补充投稿来源
  115. s.singleIDtoName(c, cfg, func(c context.Context, cid int64) (res []interface{}, err error) {
  116. res = []interface{}{archive.UpFrom(int8(cid))}
  117. return
  118. }, false, "CID", "UpFrom")
  119. default:
  120. }
  121. return
  122. }
  123. // ListWeightLogs 权重变更日志
  124. func (s *Service) ListWeightLogs(c context.Context, taskid int64, page int) (cfg []*archive.TaskWeightLog, items int64, err error) {
  125. cfg, err = s.arc.WeightLog(c, taskid)
  126. if err != nil {
  127. log.Info("s.ListWeightLogs(%d) error(%v)", taskid, err)
  128. return
  129. }
  130. cfg, items, err = s.weightLogHelp(c, taskid, cfg, page)
  131. return
  132. }
  133. // MaxWeight 当前的权重最大值, 供用户配置做参考
  134. func (s *Service) MaxWeight(c context.Context) (max int64, err error) {
  135. return s.arc.GetMaxWeight(c)
  136. }
  137. // 补充查询日志需要的信息
  138. func (s *Service) weightLogHelp(c context.Context, taskid int64, twl []*archive.TaskWeightLog, page int) (retwl []*archive.TaskWeightLog, items int64, err error) {
  139. var (
  140. mid int64
  141. ps *accApi.ProfileStatReply
  142. name string
  143. fans int
  144. upspecial []int8
  145. )
  146. task, err := s.arc.TaskByID(c, taskid)
  147. if task == nil {
  148. log.Error("s.arc.TaskByID(%d) err(%v)", taskid, err)
  149. return
  150. }
  151. // 1.获取用户信息
  152. items = int64(len(twl))
  153. if items == 0 {
  154. return
  155. }
  156. mid = twl[items-1].Mid
  157. if mid == 0 {
  158. log.Error("weightLogHelp taskid=%d miss mid", taskid)
  159. goto EMPTYMID
  160. }
  161. ps, err = s.profile(c, mid)
  162. if err != nil || ps == nil {
  163. log.Error("can't get Account.Card(%d) err(%v)", mid, err)
  164. err = nil
  165. } else {
  166. name = ps.Profile.Name
  167. fans = int(ps.Follower)
  168. }
  169. upspecial = s.getSpecial(mid)
  170. EMPTYMID:
  171. retwl = []*archive.TaskWeightLog{}
  172. // 反转日志顺序
  173. var count int
  174. for i := len(twl) - 1 - (page-1)*20; i >= 0 && count <= 20; func() { i--; count++ }() {
  175. twl[i].Creator = name
  176. twl[i].UpSpecial = upspecial
  177. twl[i].Fans = int64(fans)
  178. twl[i].Wait = twl[i].Uptime.TimeValue().Sub(task.CTime.TimeValue()).Seconds() * 1000.0
  179. if !task.PTime.TimeValue().IsZero() {
  180. twl[i].Ptime = string(task.PTime)
  181. }
  182. if len(twl[i].CfItems) > 0 {
  183. for _, item := range twl[i].CfItems {
  184. var desc = item.Desc
  185. var v = new(archive.WCItem)
  186. if err = json.Unmarshal([]byte(item.Desc), v); err == nil { //兼容新旧日志格式
  187. desc = v.Desc
  188. }
  189. err = nil
  190. twl[i].Desc += fmt.Sprintf("%s:%s; ", archive.CfWeightDesc(item.Radio), desc)
  191. }
  192. }
  193. retwl = append(retwl, twl[i])
  194. }
  195. return
  196. }
  197. func (s *Service) getSpecial(mid int64) []int8 {
  198. upspecial := []int8{}
  199. for k, v := range s.upperCache {
  200. if _, ok := v[mid]; ok {
  201. upspecial = append(upspecial, k)
  202. }
  203. }
  204. return upspecial
  205. }
  206. func (s *Service) setWeightConf(c context.Context, ids string, mcases map[int64]*archive.WCItem) (err error) {
  207. var mitems map[int64]*archive.TaskPriority
  208. arrid, _ := xstr.SplitInts(ids)
  209. // 1. 获取旧的配置
  210. if mitems, err = s.getTWCache(c, arrid); err != nil {
  211. log.Error("s.getTWCache(%v) error(%v)", arrid, err)
  212. return
  213. }
  214. if len(mitems) == 0 {
  215. return fmt.Errorf("没有找到任务(%v)", arrid)
  216. }
  217. // 2. 将新加配置加入
  218. for _, item := range mitems {
  219. if one, ok := mcases[item.TaskID]; ok && one != nil {
  220. one.Mtime = utils.NewFormatTime(time.Now())
  221. item.CfItems = append(item.CfItems, one)
  222. }
  223. }
  224. // 3. 更新配置到redis和数据库
  225. var wg errgroup.Group
  226. wg.Go(func() (err error) {
  227. return s.task.SetWeightRedis(c, mitems)
  228. })
  229. for _, item := range mitems {
  230. var descb []byte
  231. v := item
  232. wg.Go(func() (err error) {
  233. if descb, err = json.Marshal(v.CfItems); err != nil {
  234. return
  235. }
  236. _, err = s.arc.UpCwAfterAdd(c, v.TaskID, string(descb))
  237. return
  238. })
  239. }
  240. err = wg.Wait()
  241. return
  242. }
  243. // ShowWeightVC 展示权重配置值
  244. func (s *Service) ShowWeightVC(c context.Context) (wvc *archive.WeightVC, err error) {
  245. if wvc, err = s.arc.WeightVC(c); err != nil {
  246. log.Error("s.arc.WeightVC error(%v)", err)
  247. return
  248. }
  249. if wvc == nil {
  250. wvc = archive.WLVConf
  251. s.arc.InWeightVC(c, wvc, "默认权重配置")
  252. }
  253. return
  254. }
  255. // SetWeightVC 设置权重配置
  256. func (s *Service) SetWeightVC(c context.Context, wvc *archive.WeightVC) (err error) {
  257. _, err = s.arc.SetWeightVC(c, wvc, "设置权重配置")
  258. return
  259. }
  260. // 处理一级分区转二级分区
  261. func (s *Service) tarnsType(c context.Context, ids string) (res string) {
  262. var (
  263. mapNoNeed = make(map[int16]struct{}) // 不需要的一级分区
  264. mapNeed = make(map[int16]struct{}) // 需要的一级分区
  265. idres = []int64{}
  266. )
  267. arrid, _ := xstr.SplitInts(ids)
  268. for _, id := range arrid {
  269. if mod, ok := s.typeCache[int16(id)]; ok {
  270. if mod.PID == 0 {
  271. mapNeed[int16(id)] = struct{}{}
  272. } else {
  273. mapNoNeed[mod.PID] = struct{}{}
  274. idres = append(idres, id)
  275. }
  276. }
  277. }
  278. for k := range mapNoNeed {
  279. delete(mapNeed, k)
  280. }
  281. for k := range mapNeed {
  282. idres = append(idres, s.typeCache2[k]...)
  283. }
  284. return xstr.JoinInts(idres)
  285. }