bnj.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "math/rand"
  6. "regexp"
  7. "strings"
  8. "time"
  9. "go-common/app/job/main/dm2/model"
  10. "go-common/app/service/main/archive/api"
  11. arcMdl "go-common/app/service/main/archive/model/archive"
  12. filterMdl "go-common/app/service/main/filter/api/grpc/v1"
  13. "go-common/library/ecode"
  14. "go-common/library/log"
  15. "go-common/library/net/metadata"
  16. "go-common/library/queue/databus"
  17. xtime "go-common/library/time"
  18. )
  var (
  	// msgRegex matches a message that is empty or made up solely of
  	// whitespace: ASCII whitespace (\s) and the ideographic (full-width)
  	// space U+3000.
  	//
  	// Go regexp \xNN escapes denote code points, not bytes, so the UTF-8 byte
  	// sequence E3 80 80 must be written as the single code point \x{3000}.
  	msgRegex = regexp.MustCompile(`^(\s|\x{3000})*$`)
  	// _bnjDmMsgLen is the maximum danmaku length, counted in runes.
  	_bnjDmMsgLen = 100
  	// _dateFormat is the time layout used to parse configured start times.
  	_dateFormat = "2006-01-02 15:04:05"
  )
  // init seeds the shared math/rand source once at package load, using the
  // current Unix time in seconds.
  func init() {
  	seed := time.Now().Unix()
  	rand.Seed(seed)
  }
  27. func (s *Service) initBnj() {
  28. var err error
  29. if s.conf.BNJ.Aid <= 0 {
  30. return
  31. }
  32. s.bnjAid = s.conf.BNJ.Aid
  33. //bnj count
  34. if s.conf.BNJ.BnjCounter != nil {
  35. bnjSubAids := make(map[int64]struct{})
  36. for _, aid := range s.conf.BNJ.BnjCounter.SubAids {
  37. bnjSubAids[aid] = struct{}{}
  38. }
  39. s.bnjSubAids = bnjSubAids
  40. }
  41. // bnj danmu
  42. s.bnjVideos(context.TODO())
  43. s.bnjLiveConfig(context.TODO())
  44. go func() {
  45. ticker := time.NewTicker(time.Second * 30)
  46. for range ticker.C {
  47. s.bnjVideos(context.TODO())
  48. s.bnjLiveConfig(context.TODO())
  49. }
  50. }()
  51. s.bnjIgnoreRate = s.conf.BNJ.BnjLiveDanmu.IgnoreRate
  52. s.bnjIgnoreBeginTime = time.Duration(s.conf.BNJ.BnjLiveDanmu.IgnoreBegin)
  53. s.bnjIgnoreEndTime = time.Duration(s.conf.BNJ.BnjLiveDanmu.IgnoreEnd)
  54. s.bnjliveRoomID = s.conf.BNJ.BnjLiveDanmu.RoomID
  55. s.bnjUserLevel = s.conf.BNJ.BnjLiveDanmu.Level
  56. if s.bnjStart, err = time.ParseInLocation(_dateFormat, s.conf.BNJ.BnjLiveDanmu.Start, time.Now().Location()); err != nil {
  57. panic(err)
  58. }
  59. s.bnjCsmr = databus.New(s.conf.Databus.BnjCsmr)
  60. log.Info("bnj init start:%v room_id:%v", s.bnjStart.String(), s.conf.BNJ.BnjLiveDanmu.RoomID)
  61. go s.bnjProc()
  62. }
  63. func (s *Service) bnjProc() {
  64. var (
  65. err error
  66. c = context.Background()
  67. )
  68. for {
  69. msg, ok := <-s.bnjCsmr.Messages()
  70. if !ok {
  71. log.Error("bnj bnjProc consumer exit")
  72. return
  73. }
  74. log.Info("bnj partition:%d,offset:%d,key:%s,value:%s", msg.Partition, msg.Offset, msg.Key, msg.Value)
  75. m := &model.LiveDanmu{}
  76. if err = json.Unmarshal(msg.Value, m); err != nil {
  77. log.Error("json.Unmarshal(%v) error(%v)", string(msg.Value), err)
  78. continue
  79. }
  80. if err = s.bnjLiveDanmu(c, m); err != nil {
  81. log.Error("bnj bnjLiveDanmu(msg:%+v),error(%v)", m, err)
  82. continue
  83. }
  84. if err = msg.Commit(); err != nil {
  85. log.Error("commit offset(%v) error(%v)", msg, err)
  86. }
  87. }
  88. }
  89. func (s *Service) bnjVideos(c context.Context) (err error) {
  90. var (
  91. videos []*model.Video
  92. )
  93. if videos, err = s.dao.Videos(c, s.bnjAid); err != nil {
  94. log.Error("bnj bnjVideos(aid:%v) error(%v)", s.bnjAid, err)
  95. return
  96. }
  97. if len(videos) >= 4 {
  98. videos = videos[:4]
  99. }
  100. for _, video := range videos {
  101. if err = s.syncBnjVideo(c, model.SubTypeVideo, video); err != nil {
  102. log.Error("bnj syncBnjVideo(video:%+v) error(%v)", video, err)
  103. return
  104. }
  105. }
  106. s.bnjArcVideos = videos
  107. return
  108. }
  109. func (s *Service) syncBnjVideo(c context.Context, tp int32, v *model.Video) (err error) {
  110. sub, err := s.dao.Subject(c, tp, v.Cid)
  111. if err != nil {
  112. return
  113. }
  114. if sub == nil {
  115. if v.XCodeState >= model.VideoXcodeHDFinish {
  116. if _, err = s.dao.AddSubject(c, tp, v.Cid, v.Aid, v.Mid, s.maxlimit(v.Duration), 0); err != nil {
  117. return
  118. }
  119. }
  120. } else {
  121. if sub.Mid != v.Mid {
  122. if _, err = s.dao.UpdateSubMid(c, tp, v.Cid, v.Mid); err != nil {
  123. return
  124. }
  125. }
  126. }
  127. return
  128. }
  129. // bnjDmCount laji bnj count
  130. func (s *Service) bnjDmCount(c context.Context, sub *model.Subject, dm *model.DM) (err error) {
  131. var (
  132. dmid int64
  133. pages []*api.Page
  134. chosen *api.Page
  135. choseSub *model.Subject
  136. )
  137. if _, ok := s.bnjSubAids[sub.Pid]; !ok {
  138. return
  139. }
  140. if pages, err = s.arcRPC.Page3(c, &arcMdl.ArgAid2{
  141. Aid: s.bnjAid,
  142. RealIP: metadata.String(c, metadata.RemoteIP),
  143. }); err != nil {
  144. log.Error("bnjDmCount Page3(aid:%v) error(%v)", sub.Pid, err)
  145. return
  146. }
  147. if len(pages) <= 0 {
  148. return
  149. }
  150. idx := time.Now().Unix() % int64(len(pages))
  151. if chosen = pages[idx]; chosen == nil {
  152. return
  153. }
  154. if choseSub, err = s.subject(c, model.SubTypeVideo, chosen.Cid); err != nil {
  155. return
  156. }
  157. if dmid, err = s.genDMID(c); err != nil {
  158. log.Error("bnjDmCount genDMID() error(%v)", err)
  159. return
  160. }
  161. forkDM := &model.DM{
  162. ID: dmid,
  163. Type: model.SubTypeVideo,
  164. Oid: chosen.Cid,
  165. Mid: dm.Mid,
  166. Progress: int32((chosen.Duration + 1) * 1000),
  167. Pool: dm.Pool,
  168. State: model.StateAdminDelete,
  169. Ctime: dm.Ctime,
  170. Mtime: dm.Mtime,
  171. Content: &model.Content{
  172. ID: dmid,
  173. FontSize: dm.Content.FontSize,
  174. Color: dm.Content.Color,
  175. Mode: dm.Content.Mode,
  176. IP: dm.Content.IP,
  177. Plat: dm.Content.Plat,
  178. Msg: dm.Content.Msg,
  179. Ctime: dm.Content.Ctime,
  180. Mtime: dm.Content.Mtime,
  181. },
  182. }
  183. if dm.Pool == model.PoolSpecial {
  184. forkDM.ContentSpe = &model.ContentSpecial{
  185. ID: dmid,
  186. Msg: dm.ContentSpe.Msg,
  187. Ctime: dm.ContentSpe.Ctime,
  188. Mtime: dm.ContentSpe.Mtime,
  189. }
  190. }
  191. if err = s.bnjAddDM(c, choseSub, forkDM); err != nil {
  192. return
  193. }
  194. return
  195. }
  196. // bnjAddDM add dm index and content to db by transaction.
  197. func (s *Service) bnjAddDM(c context.Context, sub *model.Subject, dm *model.DM) (err error) {
  198. if dm.State != model.StateAdminDelete {
  199. return
  200. }
  201. tx, err := s.dao.BeginTran(c)
  202. if err != nil {
  203. return
  204. }
  205. // special dm
  206. if dm.Pool == model.PoolSpecial && dm.ContentSpe != nil {
  207. if _, err = s.dao.TxAddContentSpecial(tx, dm.ContentSpe); err != nil {
  208. return tx.Rollback()
  209. }
  210. }
  211. if _, err = s.dao.TxAddContent(tx, dm.Oid, dm.Content); err != nil {
  212. return tx.Rollback()
  213. }
  214. if _, err = s.dao.TxAddIndex(tx, dm); err != nil {
  215. return tx.Rollback()
  216. }
  217. if _, err = s.dao.TxIncrSubjectCount(tx, sub.Type, sub.Oid, 1, 0, sub.Childpool); err != nil {
  218. return tx.Rollback()
  219. }
  220. return tx.Commit()
  221. }
  222. func (s *Service) genDMID(c context.Context) (dmid int64, err error) {
  223. if dmid, err = s.seqRPC.ID(c, s.seqArg); err != nil {
  224. log.Error("seqRPC.ID() error(%v)", err)
  225. return
  226. }
  227. return
  228. }
  229. // bnjLiveDanmu laji live to video
  230. // TODO stime
  231. func (s *Service) bnjLiveDanmu(c context.Context, liveDanmu *model.LiveDanmu) (err error) {
  232. var (
  233. cid, dmid int64
  234. progress float64
  235. )
  236. // ignore time before
  237. if time.Since(s.bnjStart) < 0 {
  238. return
  239. }
  240. // limit
  241. if liveDanmu == nil || s.bnjliveRoomID <= 0 || s.bnjliveRoomID != liveDanmu.RoomID || liveDanmu.MsgType != model.LiveDanmuMsgTypeNormal {
  242. return
  243. }
  244. if liveDanmu.UserLevel < s.bnjUserLevel {
  245. return
  246. }
  247. if s.bnjIgnoreRate <= 0 || rand.Int63n(s.bnjIgnoreRate) != 0 {
  248. return
  249. }
  250. if cid, progress, err = s.pickBnjVideo(c, liveDanmu.Time); err != nil {
  251. return
  252. }
  253. // ignore illegal progress
  254. if progress <= 0 {
  255. return
  256. }
  257. if err = s.checkBnjDmMsg(c, liveDanmu.Content); err != nil {
  258. log.Error("bnj bnjLiveDanmu checkBnjDmMsg(liveDanmu:%+v) error(%v)", liveDanmu, err)
  259. return
  260. }
  261. if dmid, err = s.genDMID(c); err != nil {
  262. log.Error("bnj bnjLiveDanmu genDMID() error(%v)", err)
  263. return
  264. }
  265. now := time.Now().Unix()
  266. forkDM := &model.DM{
  267. ID: dmid,
  268. Type: model.SubTypeVideo,
  269. Oid: cid,
  270. Mid: liveDanmu.UID,
  271. Progress: int32(progress * 1000),
  272. Pool: model.PoolNormal,
  273. State: model.StateMonitorAfter,
  274. Ctime: model.ConvertStime(time.Now()),
  275. Mtime: model.ConvertStime(time.Now()),
  276. Content: &model.Content{
  277. ID: dmid,
  278. FontSize: 25,
  279. Color: 16777215,
  280. Mode: model.ModeRolling,
  281. Plat: 0,
  282. Msg: liveDanmu.Content,
  283. Ctime: xtime.Time(now),
  284. Mtime: xtime.Time(now),
  285. },
  286. }
  287. if err = s.bnjCheckFilterService(c, forkDM); err != nil {
  288. log.Error("s.bnjCheckFilterService(%+v) error(%v)", forkDM, err)
  289. return
  290. }
  291. var (
  292. bs []byte
  293. )
  294. if bs, err = json.Marshal(forkDM); err != nil {
  295. log.Error("json.Marshal(%+v) error(%v)", forkDM, err)
  296. return
  297. }
  298. act := &model.Action{
  299. Action: model.ActAddDM,
  300. Data: bs,
  301. }
  302. if err = s.actionAct(c, act); err != nil {
  303. log.Error("s.actionAddDM(%+v) error(%v)", liveDanmu, err)
  304. return
  305. }
  306. return
  307. }
  308. func (s *Service) pickBnjVideo(c context.Context, timestamp int64) (cid int64, progress float64, err error) {
  309. var (
  310. idx int
  311. video *model.Video
  312. )
  313. progress = float64(timestamp - s.bnjStart.Unix())
  314. for idx, video = range s.bnjArcVideos {
  315. if progress > float64(video.Duration) {
  316. progress = progress - float64(video.Duration)
  317. continue
  318. }
  319. // ignore p1 start
  320. if idx != 0 && progress < s.bnjIgnoreBeginTime.Seconds() {
  321. err = ecode.DMProgressTooBig
  322. return
  323. }
  324. if float64(video.Duration)-progress < s.bnjIgnoreEndTime.Seconds() {
  325. err = ecode.DMProgressTooBig
  326. return
  327. }
  328. if progress >= 0 {
  329. progress = progress + float64(rand.Int31n(1000)/1000)
  330. }
  331. cid = video.Cid
  332. return
  333. }
  334. err = ecode.DMProgressTooBig
  335. return
  336. }
  337. func (s *Service) bnjCheckFilterService(c context.Context, dm *model.DM) (err error) {
  338. var (
  339. filterReply *filterMdl.FilterReply
  340. )
  341. if filterReply, err = s.filterRPC.Filter(c, &filterMdl.FilterReq{
  342. Area: "danmu",
  343. Message: dm.Content.Msg,
  344. Id: dm.ID,
  345. Oid: dm.Oid,
  346. Mid: dm.Mid,
  347. }); err != nil {
  348. log.Error("checkFilterService(dm:%+v),err(%v)", dm, err)
  349. return
  350. }
  351. if filterReply.Level > 0 || filterReply.Limit == model.SpamBlack || filterReply.Limit == model.SpamOverflow {
  352. dm.State = model.StateFilter
  353. log.Info("bnj filter service delete(dmid:%d,data:+%v)", dm.ID, filterReply)
  354. }
  355. return
  356. }
  357. func (s *Service) checkBnjDmMsg(c context.Context, msg string) (err error) {
  358. var (
  359. msgLen = len([]rune(msg))
  360. )
  361. if msgRegex.MatchString(msg) { // 空白弹幕
  362. err = ecode.DMMsgIlleagel
  363. return
  364. }
  365. if msgLen > _bnjDmMsgLen {
  366. err = ecode.DMMsgTooLong
  367. return
  368. }
  369. if strings.Contains(msg, `\n`) || strings.Contains(msg, `/n`) {
  370. err = ecode.DMMsgIlleagel
  371. return
  372. }
  373. return
  374. }
  375. func (s *Service) bnjLiveConfig(c context.Context) (err error) {
  376. var (
  377. bnjConfig *model.BnjLiveConfig
  378. start time.Time
  379. )
  380. if bnjConfig, err = s.dao.BnjConfig(c); err != nil {
  381. log.Error("bnjLiveConfig error current:%v err:%+v", time.Now().String(), err)
  382. return
  383. }
  384. if bnjConfig == nil {
  385. log.Error("bnjLiveConfig error current:%v bnjConfig nil", time.Now().String())
  386. return
  387. }
  388. if start, err = time.ParseInLocation(_dateFormat, bnjConfig.DanmuDtarTime, time.Now().Location()); err != nil {
  389. log.Error("bnjLiveConfig start time error current:%v config:%+v", time.Now().String(), bnjConfig)
  390. return
  391. }
  392. if bnjConfig.CommentID <= 0 || bnjConfig.RoomID <= 0 {
  393. log.Info("bnjLiveConfig illegal current:%v config:%+v", time.Now().String(), bnjConfig)
  394. return
  395. }
  396. s.bnjAid = bnjConfig.CommentID
  397. s.bnjliveRoomID = bnjConfig.RoomID
  398. s.bnjStart = start
  399. log.Info("bnjLiveConfig ok current:%v config:%+v", time.Now().String(), bnjConfig)
  400. return
  401. }