weight.go 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "time"
  7. "go-common/app/admin/main/videoup-task/model"
  8. account "go-common/app/service/main/account/api"
  9. "go-common/library/log"
  10. "go-common/library/sync/errgroup"
  11. "go-common/library/xstr"
  12. )
  13. // weightConf 所有激活的配置项
  14. func (s *Service) weightConf(c context.Context) (wconfs map[int8]map[int64]*model.WCItem, err error) {
  15. items, err := s.dao.WeightConf(c)
  16. if err != nil {
  17. log.Error("WeightConf error(%v)", err)
  18. return
  19. }
  20. wconfs = map[int8]map[int64]*model.WCItem{}
  21. for _, item := range items {
  22. conf, ok := wconfs[item.Radio]
  23. if !ok {
  24. conf = make(map[int64]*model.WCItem)
  25. }
  26. conf[item.CID] = item
  27. wconfs[item.Radio] = conf
  28. }
  29. return
  30. }
  31. // 同一类型的同id配置只能有一个
  32. func (s *Service) checkConflict(c context.Context, mcases map[int64]*model.WCItem) (err error) {
  33. wcf, err := s.weightConf(c)
  34. if err != nil {
  35. log.Error("s.weightConf error(%v)", err)
  36. return
  37. }
  38. s.twConCache = wcf
  39. for _, item := range mcases {
  40. if cfgs, ok := s.twConCache[item.Radio]; ok {
  41. if cfg, ok := cfgs[item.CID]; ok {
  42. return fmt.Errorf("config(%d) conflict with (id=%d,desc=%s)", item.CID, cfg.ID, cfg.Desc)
  43. }
  44. }
  45. }
  46. return
  47. }
  48. // AddWeightConf 配置权重
  49. func (s *Service) AddWeightConf(c context.Context, cfg *model.WeightConf, uid int64, uname string) (err error) {
  50. //0. 一级分区转化为二级分区
  51. if cfg.Radio == model.WConfType {
  52. cfg.Ids = s.tarnsType(c, cfg.Ids)
  53. }
  54. //1. 解析表单提交的配置
  55. mcases, istaskid, err := model.ParseWeightConf(cfg, uid, uname)
  56. if err != nil {
  57. log.Error("model.ParseWeightConf(%v, %v, %v) error(%v)", cfg, uname, err)
  58. return
  59. }
  60. //2. 检查是否互相冲突
  61. if err = s.checkConflict(c, mcases); err != nil {
  62. log.Error("s.checkConflict error(%v)", err)
  63. return
  64. }
  65. //3. 更新任务的权重配置
  66. if istaskid {
  67. if err = s.setWeightConf(c, cfg.Ids, mcases); err != nil {
  68. log.Error("s.setWeightConf error(%v)", err)
  69. return
  70. }
  71. }
  72. //4. 将配置存储到数据库表
  73. if err = s.dao.InWeightConf(c, mcases); err != nil {
  74. log.Error("s.dao.InWeightConf(%v) error(%v)", mcases, err)
  75. return
  76. }
  77. return
  78. }
  79. // DelWeightConf 删除配置项
  80. func (s *Service) DelWeightConf(c context.Context, id int64) (err error) {
  81. if _, err = s.dao.DelWeightConf(c, id); err != nil {
  82. log.Error("s.DelWeightConf(%d) error(%v)", id, err)
  83. }
  84. return
  85. }
  86. // ListWeightConf 列出配置
  87. func (s *Service) ListWeightConf(c context.Context, v *model.Confs) (cfg []*model.WCItem, err error) {
  88. if cfg, err = s.dao.ListWeightConf(c, v); err != nil {
  89. log.Error("s.ListWeightConf(%+v) error(%v)", v, err)
  90. return
  91. }
  92. // 根据taskid补充filename和title
  93. switch v.Radio {
  94. case model.WConfMid: // 补充昵称和粉丝数
  95. s.singleIDtoName(c, cfg, func(c context.Context, cid int64) (res []interface{}, err error) {
  96. stat, err := s.profile(c, cid)
  97. if err != nil {
  98. log.Error("s.profile(%d) error(%v)", cid, err)
  99. return
  100. }
  101. res = []interface{}{stat.Profile.Name, stat.Follower}
  102. return
  103. }, false, "CID", "Creator", "Fans")
  104. case model.WConfTaskID: //补充title和filename
  105. s.mulIDtoName(c, cfg, s.dao.LWConfigHelp, "CID", "FileName", "Title", "Vid")
  106. case model.WConfType: //补充分区名称
  107. s.singleIDtoName(c, cfg, func(c context.Context, cid int64) (res []interface{}, err error) {
  108. if item, ok := s.typeCache[int16(cid)]; ok {
  109. res = []interface{}{item.Name}
  110. }
  111. return
  112. }, false, "CID", "TypeName")
  113. case model.WConfUpFrom: //补充投稿来源
  114. s.singleIDtoName(c, cfg, func(c context.Context, cid int64) (res []interface{}, err error) {
  115. res = []interface{}{model.UpFrom(int8(cid))}
  116. return
  117. }, false, "CID", "UpFrom")
  118. default:
  119. }
  120. return
  121. }
  122. // ListWeightLogs 权重变更日志
  123. func (s *Service) ListWeightLogs(c context.Context, taskid int64, page int) (cfg []*model.TaskWeightLog, items int64, err error) {
  124. cfg, err = s.dao.WeightLog(c, taskid)
  125. if err != nil {
  126. log.Info("s.ListWeightLogs(%d) error(%v)", taskid, err)
  127. return
  128. }
  129. cfg, items, err = s.weightLogHelp(c, taskid, cfg, page)
  130. return
  131. }
  132. // MaxWeight 当前的权重最大值, 供用户配置做参考
  133. func (s *Service) MaxWeight(c context.Context) (max int64, err error) {
  134. return s.dao.GetMaxWeight(c)
  135. }
  136. // 补充查询日志需要的信息
  137. func (s *Service) weightLogHelp(c context.Context, taskid int64, twl []*model.TaskWeightLog, page int) (retwl []*model.TaskWeightLog, items int64, err error) {
  138. var (
  139. mid int64
  140. ps *account.ProfileStatReply
  141. name string
  142. fans int
  143. upspecial []int8
  144. )
  145. task, err := s.dao.TaskByID(c, taskid)
  146. if task == nil {
  147. log.Error("s.dao.TaskByID(%d) err(%v)", taskid, err)
  148. return
  149. }
  150. // 1.获取用户信息
  151. items = int64(len(twl))
  152. if items == 0 {
  153. return
  154. }
  155. mid = twl[items-1].Mid
  156. if mid == 0 {
  157. log.Error("weightLogHelp taskid=%d miss mid", taskid)
  158. goto EMPTYMID
  159. }
  160. ps, err = s.profile(c, mid)
  161. if err != nil || ps == nil {
  162. log.Error("can't get Account.Card(%d) err(%v)", mid, err)
  163. err = nil
  164. } else {
  165. name = ps.Profile.Name
  166. fans = int(ps.Follower)
  167. }
  168. upspecial = s.getSpecial(mid)
  169. EMPTYMID:
  170. retwl = []*model.TaskWeightLog{}
  171. // 反转日志顺序
  172. var count int
  173. for i := len(twl) - 1 - (page-1)*20; i >= 0 && count <= 20; func() { i--; count++ }() {
  174. twl[i].Creator = name
  175. twl[i].UpSpecial = upspecial
  176. twl[i].Fans = int64(fans)
  177. twl[i].Wait = twl[i].Uptime.TimeValue().Sub(task.CTime.TimeValue()).Seconds() * 1000.0
  178. if !task.PTime.TimeValue().IsZero() {
  179. twl[i].Ptime = string(task.PTime)
  180. }
  181. if len(twl[i].CfItems) > 0 {
  182. for _, item := range twl[i].CfItems {
  183. var desc = item.Desc
  184. var v = new(model.WCItem)
  185. if err = json.Unmarshal([]byte(item.Desc), v); err == nil { //兼容新旧日志格式
  186. desc = v.Desc
  187. }
  188. err = nil
  189. twl[i].Desc += fmt.Sprintf("%s:%s; ", model.CfWeightDesc(item.Radio), desc)
  190. }
  191. }
  192. retwl = append(retwl, twl[i])
  193. }
  194. return
  195. }
  196. func (s *Service) getSpecial(mid int64) []int8 {
  197. upspecial := []int8{}
  198. for k, v := range s.upperCache {
  199. if _, ok := v[mid]; ok {
  200. upspecial = append(upspecial, k)
  201. }
  202. }
  203. return upspecial
  204. }
  205. func (s *Service) setWeightConf(c context.Context, ids string, mcases map[int64]*model.WCItem) (err error) {
  206. var mitems map[int64]*model.TaskPriority
  207. arrid, _ := xstr.SplitInts(ids)
  208. // 1. 获取旧的配置
  209. if mitems, err = s.getTWCache(c, arrid); err != nil {
  210. log.Error("s.getTWCache(%v) error(%v)", arrid, err)
  211. return
  212. }
  213. if len(mitems) == 0 {
  214. return fmt.Errorf("没有找到任务(%v)", arrid)
  215. }
  216. // 2. 将新加配置加入
  217. for _, item := range mitems {
  218. if one, ok := mcases[item.TaskID]; ok && one != nil {
  219. one.Mtime = model.NewFormatTime(time.Now())
  220. item.CfItems = append(item.CfItems, one)
  221. }
  222. }
  223. // 3. 更新配置到redis和数据库
  224. var wg errgroup.Group
  225. wg.Go(func() (err error) {
  226. return s.dao.SetWeightRedis(c, mitems)
  227. })
  228. for _, item := range mitems {
  229. var descb []byte
  230. v := item
  231. wg.Go(func() (err error) {
  232. if descb, err = json.Marshal(v.CfItems); err != nil {
  233. return
  234. }
  235. _, err = s.dao.UpCwAfterAdd(c, v.TaskID, string(descb))
  236. return
  237. })
  238. }
  239. err = wg.Wait()
  240. return
  241. }
  242. // ShowWeightVC 展示权重配置值
  243. func (s *Service) ShowWeightVC(c context.Context) (wvc *model.WeightVC, err error) {
  244. if wvc, err = s.dao.WeightVC(c); err != nil {
  245. log.Error("s.dao.WeightVC error(%v)", err)
  246. return
  247. }
  248. if wvc == nil {
  249. wvc = model.WLVConf
  250. s.dao.InWeightVC(c, wvc, "默认权重配置")
  251. }
  252. return
  253. }
  254. // SetWeightVC 设置权重配置
  255. func (s *Service) SetWeightVC(c context.Context, wvc *model.WeightVC) (err error) {
  256. _, err = s.dao.SetWeightVC(c, wvc, "设置权重配置")
  257. return
  258. }
  259. // 处理一级分区转二级分区
  260. func (s *Service) tarnsType(c context.Context, ids string) (res string) {
  261. var (
  262. mapNoNeed = make(map[int16]struct{}) // 不需要的一级分区
  263. mapNeed = make(map[int16]struct{}) // 需要的一级分区
  264. idres = []int64{}
  265. )
  266. arrid, _ := xstr.SplitInts(ids)
  267. for _, id := range arrid {
  268. if mod, ok := s.typeCache[int16(id)]; ok {
  269. if mod.PID == 0 {
  270. mapNeed[int16(id)] = struct{}{}
  271. } else {
  272. mapNoNeed[mod.PID] = struct{}{}
  273. idres = append(idres, id)
  274. }
  275. }
  276. }
  277. for k := range mapNoNeed {
  278. delete(mapNeed, k)
  279. }
  280. for k := range mapNeed {
  281. idres = append(idres, s.typeCache2[k]...)
  282. }
  283. return xstr.JoinInts(idres)
  284. }
  285. func (s *Service) getTWCache(c context.Context, ids []int64) (mcases map[int64]*model.TaskPriority, err error) {
  286. if mcases, err = s.dao.GetWeightRedis(c, ids); len(mcases) != len(ids) {
  287. mcases, err = s.dao.GetWeightDB(c, ids)
  288. }
  289. return
  290. }