dm.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "strconv"
  7. "sync"
  8. "go-common/app/admin/main/dm/model"
  9. "go-common/app/admin/main/dm/model/oplog"
  10. accountApi "go-common/app/service/main/account/api"
  11. account "go-common/app/service/main/account/model"
  12. "go-common/app/service/main/archive/model/archive"
  13. "go-common/library/log"
  14. "go-common/library/sync/errgroup"
  15. )
  16. const (
  17. _pageSize = 50
  18. )
  19. // dms get dm list from database.
  20. func (s *Service) dms(c context.Context, tp int32, oid int64, dmids []int64) (dms []*model.DM, err error) {
  21. if len(dmids) == 0 {
  22. return
  23. }
  24. contentSpe := make(map[int64]*model.ContentSpecial)
  25. idxMap, special, err := s.dao.IndexsByID(c, tp, oid, dmids)
  26. if err != nil || len(idxMap) == 0 {
  27. return
  28. }
  29. contents, err := s.dao.Contents(c, oid, dmids)
  30. if err != nil {
  31. return
  32. }
  33. if len(special) > 0 {
  34. if contentSpe, err = s.dao.SpecialContents(c, special); err != nil {
  35. return
  36. }
  37. }
  38. for _, content := range contents {
  39. if idx, ok := idxMap[content.ID]; ok {
  40. idx.Content = content
  41. if idx.Pool == model.PoolSpecial {
  42. if _, ok = contentSpe[idx.ID]; ok {
  43. idx.ContentSpe = contentSpe[idx.ID]
  44. }
  45. }
  46. dms = append(dms, idx)
  47. }
  48. }
  49. return
  50. }
  51. // DMSearch danmu list from search.
  52. func (s *Service) DMSearch(c context.Context, p *model.SearchDMParams) (res *model.SearchDMResult, err error) {
  53. var (
  54. mids, dmids []int64
  55. sorted []*model.DM
  56. protectCnt int64
  57. dmMap = make(map[int64]*model.DM)
  58. uidMap = make(map[int64]bool)
  59. )
  60. res = &model.SearchDMResult{}
  61. sub, err := s.dao.Subject(c, p.Type, p.Oid)
  62. if err != nil {
  63. log.Error("s.dao.Subject(%d,%d) error(%v)", p.Type, p.Oid, err)
  64. return
  65. }
  66. if sub == nil {
  67. return
  68. }
  69. srchData, err := s.dao.SearchDM(c, p)
  70. if err != nil {
  71. log.Error("s.dao.SearchDM(%v) error(%v)", p, err)
  72. return
  73. }
  74. if srchData == nil {
  75. return
  76. }
  77. for _, v := range srchData.Result {
  78. dmids = append(dmids, v.ID)
  79. }
  80. dms, err := s.dms(c, p.Type, p.Oid, dmids)
  81. if err != nil {
  82. log.Error("s.dms(%d,%v) error(%v)", p.Oid, dmids, err)
  83. return
  84. }
  85. for _, dm := range dms {
  86. dmMap[dm.ID] = dm
  87. if _, ok := uidMap[dm.Mid]; !ok && dm.Mid > 0 {
  88. uidMap[dm.Mid] = true
  89. mids = append(mids, dm.Mid)
  90. }
  91. }
  92. for _, dmid := range dmids {
  93. if dm, ok := dmMap[dmid]; ok {
  94. sorted = append(sorted, dm)
  95. }
  96. }
  97. total := len(mids)
  98. pageNum := total / _pageSize
  99. if total%_pageSize != 0 {
  100. pageNum++
  101. }
  102. var (
  103. g errgroup.Group
  104. lk sync.Mutex
  105. infoMap = make(map[int64]*account.Info, total)
  106. )
  107. for i := 0; i < pageNum; i++ {
  108. start := i * _pageSize
  109. end := (i + 1) * _pageSize
  110. if end > total {
  111. end = total
  112. }
  113. g.Go(func() (err error) {
  114. var (
  115. arg = &accountApi.MidsReq{Mids: mids[start:end]}
  116. res *accountApi.InfosReply
  117. )
  118. if res, err = s.accountRPC.Infos3(c, arg); err != nil {
  119. log.Error("s.accRPC.Infos3(%v) error(%v)", arg, err)
  120. } else {
  121. for mid, info := range res.GetInfos() {
  122. lk.Lock()
  123. infoMap[mid] = info
  124. lk.Unlock()
  125. }
  126. }
  127. return
  128. })
  129. }
  130. g.Go(func() (err error) {
  131. if protectCnt, err = s.dao.SearchProtectCount(context.TODO(), p.Type, p.Oid); err != nil {
  132. log.Error("s.dao.SearchProtectCount(%d,%d) error(%v)", p.Type, p.Oid, err)
  133. }
  134. return
  135. })
  136. if err = g.Wait(); err != nil {
  137. return
  138. }
  139. for _, dm := range sorted {
  140. msg := dm.Content.Msg
  141. if dm.Pool == model.PoolSpecial && dm.ContentSpe != nil {
  142. msg = dm.ContentSpe.Msg
  143. }
  144. item := &model.DMItem{
  145. IDStr: strconv.FormatInt(dm.ID, 10),
  146. ID: dm.ID,
  147. Type: dm.Type,
  148. Oid: dm.Oid,
  149. Mid: dm.Mid,
  150. Pool: dm.Pool,
  151. State: dm.State,
  152. Attrs: dm.AttrNtoA(),
  153. Msg: msg,
  154. Ctime: dm.Ctime,
  155. Mode: dm.Content.Mode,
  156. IP: dm.Content.IP,
  157. Color: fmt.Sprintf("#%06X", dm.Content.Color),
  158. Progress: dm.Progress,
  159. Fontsize: dm.Content.FontSize,
  160. }
  161. if info, ok := infoMap[dm.Mid]; ok {
  162. item.Uname = info.Name
  163. }
  164. res.Result = append(res.Result, item)
  165. }
  166. res.MaxLimit = sub.Maxlimit
  167. res.Total = sub.ACount
  168. res.Page = srchData.Page.Num
  169. res.Pagesize = srchData.Page.Size
  170. res.Deleted = sub.ACount - sub.Count
  171. res.Protected = protectCnt
  172. res.Count = srchData.Page.Total
  173. return
  174. }
  175. // XMLCacheFlush 刷新弹幕缓存
  176. func (s *Service) XMLCacheFlush(c context.Context, tp int32, oid int64) {
  177. v := make(map[string]interface{})
  178. v["type"] = tp
  179. v["oid"] = oid
  180. v["force"] = true
  181. data, err := json.Marshal(v)
  182. if err != nil {
  183. log.Error("json.Marshal(%v) error(%v)", v, err)
  184. return
  185. }
  186. action := &model.Action{Action: model.ActFlushDM, Data: data, Oid: oid}
  187. s.addAction(action)
  188. }
  189. // EditDMState multi edit dm state.
  190. func (s *Service) EditDMState(c context.Context, tp, state int32, oid int64, reason int8, dmids []int64, moral float64, adminID int64, operator, remark string) (err error) {
  191. if err = s.editDmState(c, tp, state, oid, reason, dmids, moral, adminID, operator, remark); err != nil {
  192. log.Error("s.dao.UpSearchDMState(%d,%d,%v) err (%v)", tp, oid, dmids, err)
  193. return
  194. }
  195. // update dm search index
  196. if err = s.uptSearchDmState(c, tp, state, map[int64][]int64{oid: dmids}); err != nil {
  197. log.Error("s.dao.UpSearchDMState(%d,%d,%v) err (%v)", tp, oid, dmids, err)
  198. }
  199. return
  200. }
  201. // editDmState multi edit dm state.
  202. func (s *Service) editDmState(c context.Context, tp, state int32, oid int64, reason int8, dmids []int64, moral float64, adminID int64, operator, remark string) (err error) {
  203. sub, err := s.dao.Subject(c, tp, oid)
  204. if err != nil || sub == nil {
  205. return
  206. }
  207. dms, err := s.dms(c, tp, oid, dmids)
  208. if err != nil {
  209. return
  210. }
  211. count := countDMNum(dms, state)
  212. affect, err := s.dao.SetStateByIDs(c, tp, oid, dmids, state)
  213. if err != nil || affect == 0 {
  214. return
  215. }
  216. if sub.Count+count < 0 {
  217. count = -sub.Count
  218. }
  219. // update dm_index count
  220. if _, err = s.dao.IncrSubjectCount(c, tp, oid, count); err != nil {
  221. return
  222. }
  223. // write dm admin log
  224. if affect > 0 {
  225. if remark == "" {
  226. remark = model.AdminRptReason[reason]
  227. }
  228. s.OpLog(c, oid, adminID, tp, dmids, "status", "", fmt.Sprint(state), remark, oplog.SourceManager, oplog.OperatorAdmin)
  229. }
  230. // update dm monitor count
  231. if sub.IsMonitoring() {
  232. s.oidLock.Lock()
  233. s.moniOidMap[sub.Oid] = struct{}{}
  234. s.oidLock.Unlock()
  235. }
  236. // flush xml cache
  237. s.XMLCacheFlush(c, tp, oid)
  238. // reduce moral
  239. uidMap := make(map[int64]struct{})
  240. for _, dm := range dms {
  241. if _, ok := uidMap[dm.Mid]; !ok {
  242. uidMap[dm.Mid] = struct{}{}
  243. }
  244. }
  245. if len(uidMap) > 0 && moral > 0 {
  246. for uid := range uidMap {
  247. s.reduceMoral(c, uid, int64(-moral), reason, operator, "弹幕管理")
  248. }
  249. }
  250. return
  251. }
  252. // EditDMPool edit dm pool.
  253. func (s *Service) EditDMPool(c context.Context, tp int32, oid int64, pool int32, dmids []int64, adminID int64) (err error) {
  254. sub, err := s.dao.Subject(c, tp, oid)
  255. if err != nil || sub == nil {
  256. return
  257. }
  258. if sub.Childpool < pool {
  259. if _, err = s.dao.UpSubjectPool(c, tp, oid, pool); err != nil {
  260. return
  261. }
  262. }
  263. affect, err := s.dao.SetPoolIDByIDs(c, tp, oid, pool, dmids)
  264. if err != nil {
  265. return
  266. }
  267. if affect > 0 {
  268. if pool == model.PoolNormal {
  269. s.dao.IncrSubMoveCount(c, tp, oid, -affect) // NOTE update move_count,ignore error
  270. } else {
  271. s.dao.IncrSubMoveCount(c, tp, oid, affect) // NOTE update move_count,ignore error
  272. }
  273. s.OpLog(c, oid, adminID, tp, dmids, "pool", "", fmt.Sprint(pool), "弹幕池变更", oplog.SourceManager, oplog.OperatorAdmin)
  274. }
  275. if err = s.uptSearchDMPool(c, tp, oid, pool, dmids); err != nil {
  276. log.Error("s.dao.UpSearchDMPool(%d,%d,%v) err (%v)", tp, oid, dmids, err)
  277. }
  278. return
  279. }
  280. // EditDMAttr change dm attr
  281. func (s *Service) EditDMAttr(c context.Context, tp int32, oid int64, dmids []int64, bit uint, value int32, adminID int64) (err error) {
  282. var (
  283. eg = errgroup.Group{}
  284. attrMap = make(map[int32][]int64)
  285. )
  286. dms, err := s.dms(c, tp, oid, dmids)
  287. if err != nil {
  288. log.Error("s.dms(oid:%d ids:%v) error(%v)", oid, dmids, err)
  289. return
  290. }
  291. for _, dm := range dms {
  292. dm.AttrSet(value, bit)
  293. attrMap[dm.Attr] = append(attrMap[dm.Attr], dm.ID)
  294. }
  295. for k, v := range attrMap {
  296. attr := k
  297. ids := v
  298. eg.Go(func() (err error) {
  299. affect, err := s.dao.SetAttrByIDs(c, tp, oid, ids, attr)
  300. if err != nil {
  301. log.Error("s.dao.SetAttrByIDs(oid:%d ids:%v) error(%v)", oid, ids, err)
  302. return
  303. }
  304. if affect > 0 {
  305. s.OpLog(c, oid, adminID, tp, ids, "attribute", "", fmt.Sprint(attr), "弹幕保护状态变更", oplog.SourceManager, oplog.OperatorAdmin)
  306. }
  307. if err = s.uptSearchDMAttr(c, tp, oid, attr, dmids); err != nil {
  308. log.Error("dao.UpSearchDMAttr(oid:%d,attr:%d) error(%v)", oid, attr, err)
  309. }
  310. return
  311. })
  312. }
  313. return eg.Wait()
  314. }
  315. // DMIndexInfo get dm index info
  316. func (s *Service) DMIndexInfo(c context.Context, cid int64) (info *model.DMIndexInfo, err error) {
  317. info = new(model.DMIndexInfo)
  318. sub, err := s.dao.Subject(c, model.SubTypeVideo, cid)
  319. if err != nil || sub == nil {
  320. return
  321. }
  322. argAid2 := &archive.ArgAid2{Aid: sub.Pid}
  323. arc, err := s.arcRPC.Archive3(c, argAid2)
  324. if err != nil {
  325. log.Error("s.arcRPC.Archive3(%v) error(%v)", argAid2, err)
  326. err = nil
  327. } else {
  328. info.Title = arc.Title
  329. info.Cover = arc.Pic
  330. }
  331. argVideo := &archive.ArgVideo2{Aid: sub.Pid, Cid: cid}
  332. video, err := s.arcRPC.Video3(c, argVideo)
  333. if err != nil {
  334. log.Error("s.arcRPC.Video3(%v) error(%v)", argVideo, err)
  335. err = nil
  336. } else {
  337. info.Duration = video.Duration
  338. info.ETitle = video.Part
  339. }
  340. argMid := &accountApi.MidReq{Mid: sub.Mid}
  341. uInfo, err := s.accountRPC.Info3(c, argMid)
  342. if err != nil {
  343. log.Error("s.accRPC.Info3(%v) error(%v)", argMid, err)
  344. err = nil
  345. } else {
  346. info.UName = uInfo.GetInfo().GetName()
  347. }
  348. info.AID = sub.Pid
  349. info.CID = sub.Oid
  350. info.MID = sub.Mid
  351. info.Limit = sub.Maxlimit
  352. if sub.State == model.SubStateOpen {
  353. info.Active = 1
  354. } else {
  355. info.Active = 0
  356. }
  357. info.CTime = int64(sub.Ctime)
  358. info.MTime = int64(sub.Mtime)
  359. return
  360. }
  361. // countDMNum count state changed dm count
  362. func countDMNum(dms []*model.DM, state int32) (count int64) {
  363. for _, dm := range dms {
  364. if model.DMVisible(dm.State) && !model.DMVisible(state) {
  365. count--
  366. } else if !model.DMVisible(dm.State) && model.DMVisible(state) {
  367. count++
  368. }
  369. }
  370. return count
  371. }
  372. // FixDMCount fix dm acount,count of aid.
  373. func (s *Service) FixDMCount(c context.Context, aid int64) (err error) {
  374. var (
  375. arg = archive.ArgAid2{Aid: aid}
  376. oids []int64
  377. states = []int64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13} // 弹幕所有状态
  378. normalState = []int64{0, 2, 6} // 前台可能展示的弹幕状态
  379. )
  380. pages, err := s.arcRPC.Page3(c, &arg)
  381. if err != nil {
  382. log.Error("arcRPC.Page3(%v) error(%v)", arg, err)
  383. return
  384. }
  385. if len(pages) == 0 {
  386. log.Warn("aid:%d have no pages", aid)
  387. return
  388. }
  389. for _, page := range pages {
  390. oids = append(oids, page.Cid)
  391. }
  392. subs, err := s.dao.Subjects(c, model.SubTypeVideo, oids)
  393. if err != nil {
  394. return
  395. }
  396. for _, sub := range subs {
  397. tp := sub.Type
  398. oid := sub.Oid
  399. s.cache.Do(c, func(ctx context.Context) {
  400. acount, err := s.dao.DMCount(ctx, tp, oid, states)
  401. if err != nil {
  402. return
  403. }
  404. count, err := s.dao.DMCount(ctx, tp, oid, normalState)
  405. if err != nil {
  406. return
  407. }
  408. s.dao.UpSubjectCount(ctx, tp, oid, acount, count) // 更新新库dm_subject
  409. log.Info("fix dm count,type:%d,oid:%d,acount:%d,count:%d", tp, oid, acount, count)
  410. })
  411. }
  412. return
  413. }
  414. func (s *Service) uptSearchDmState(c context.Context, tp int32, state int32, dmidM map[int64][]int64) (err error) {
  415. if err = s.dao.UpSearchDMState(c, tp, state, dmidM); err != nil {
  416. return
  417. }
  418. if err = s.dao.UpSearchRecentDMState(c, tp, state, dmidM); err != nil {
  419. return
  420. }
  421. return
  422. }
  423. func (s *Service) uptSearchDMPool(c context.Context, tp int32, oid int64, pool int32, dmids []int64) (err error) {
  424. if err = s.dao.UpSearchDMPool(c, tp, oid, pool, dmids); err != nil {
  425. return
  426. }
  427. if err = s.dao.UpSearchRecentDMPool(c, tp, oid, pool, dmids); err != nil {
  428. return
  429. }
  430. return
  431. }
  432. func (s *Service) uptSearchDMAttr(c context.Context, tp int32, oid int64, attr int32, dmids []int64) (err error) {
  433. if err = s.dao.UpSearchDMAttr(c, tp, oid, attr, dmids); err != nil {
  434. return
  435. }
  436. if err = s.dao.UpSearchRecentDMAttr(c, tp, oid, attr, dmids); err != nil {
  437. return
  438. }
  439. return
  440. }