video.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "math"
  6. "sort"
  7. "strconv"
  8. "time"
  9. "go-common/app/job/main/videoup-report/model/archive"
  10. "go-common/library/log"
  11. )
  12. // VideoReports get video report record from DB
  13. func (s *Service) VideoReports(c context.Context, t int8, stime, etime time.Time) (reports []*archive.Report, err error) {
  14. if reports, err = s.arc.Reports(c, t, stime, etime); err != nil {
  15. log.Error("s.arc.Reports(%d) err(%v)", t, err)
  16. return
  17. }
  18. return
  19. }
  20. // hdlVideoUpdateBinLog handle bilibili_archive's video table update bin log
  21. func (s *Service) hdlVideoUpdateBinLog(nMsg, oMsg []byte) {
  22. var (
  23. nv = &archive.Video{}
  24. ov = &archive.Video{}
  25. err error
  26. )
  27. if err = json.Unmarshal(nMsg, nv); err != nil {
  28. log.Error("json.Unmarshal(%s) error(%v)", nMsg, err)
  29. return
  30. }
  31. if err = json.Unmarshal(oMsg, ov); err != nil {
  32. log.Error("json.Unmarshal(%s) error(%v)", oMsg, err)
  33. return
  34. }
  35. if nv.Status != ov.Status {
  36. s.hdlVideoAudit(*nv, *ov)
  37. }
  38. if ov.XcodeState != nv.XcodeState {
  39. s.hdlXcodeTime(*nv, *ov)
  40. }
  41. // 视频状态变为待审核(视频信息改动或者一转完成)
  42. if nv.Status != ov.Status {
  43. if nv.Status == archive.VideoStatusWait { //待审核
  44. s.hdlVideoTask(context.TODO(), nv.Filename)
  45. }
  46. if nv.Status == archive.VideoStatusDelete { //视频删除
  47. s.arc.DelDispatch(context.TODO(), nv.Aid, nv.Cid)
  48. }
  49. }
  50. }
  51. // hdlVideoAudit handle video audit stats
  52. func (s *Service) hdlVideoAudit(video, oldVideo archive.Video) {
  53. var (
  54. err error
  55. arc = &archive.Archive{}
  56. )
  57. if arc, err = s.arc.ArchiveByAid(context.TODO(), video.Aid); err != nil {
  58. log.Error("s.arc.ArchiveByAid(%d) error(%v)", video.Aid, err)
  59. return
  60. }
  61. s.videoAuditCache.Lock()
  62. defer s.videoAuditCache.Unlock()
  63. if _, ok := s.videoAuditCache.Data[arc.TypeID]; !ok {
  64. s.videoAuditCache.Data[arc.TypeID] = make(map[string]int)
  65. }
  66. switch video.Status {
  67. case archive.VideoStatusWait:
  68. s.videoAuditCache.Data[arc.TypeID]["auditing"]++
  69. case archive.VideoStatusOpen:
  70. s.videoAuditCache.Data[arc.TypeID]["audited"]++
  71. }
  72. }
  73. // hdlVideoAuditCount handle audit stats count
  74. func (s *Service) hdlVideoAuditCount() {
  75. var (
  76. err error
  77. report *archive.Report
  78. ctime = time.Now()
  79. mtime = ctime
  80. bs []byte
  81. )
  82. if report, err = s.arc.ReportLast(context.TODO(), archive.ReportTypeVideoAudit); err != nil {
  83. log.Error("s.arc.ReportLast(%d) error(%v)", archive.ReportTypeVideoAudit, err)
  84. return
  85. }
  86. if report != nil && time.Now().Unix()-report.CTime.Unix() < 60*5 {
  87. log.Info("s.arc.ReportLast(%d) 距离上一次写入还没过5分钟!", archive.ReportTypeVideoAudit)
  88. return
  89. }
  90. s.videoAuditCache.Lock()
  91. defer s.videoAuditCache.Unlock()
  92. if bs, err = json.Marshal(s.videoAuditCache.Data); err != nil {
  93. log.Error("json.Marshal(%v) error(%v)", s.videoAuditCache.Data, err)
  94. return
  95. }
  96. if _, err = s.arc.ReportAdd(context.TODO(), archive.ReportTypeVideoAudit, string(bs), ctime, mtime); err != nil {
  97. log.Error("s.arc.ReportAdd(%d,%s,%v,%v) error(%v)", archive.ReportTypeVideoAudit, string(bs), ctime, mtime, err)
  98. return
  99. }
  100. s.videoAuditCache.Data = make(map[int16]map[string]int)
  101. }
  102. // VideoAudit get video audit by typeid
  103. func (s *Service) VideoAudit(c context.Context, stime, etime time.Time) (reports []*archive.Report, err error) {
  104. if reports, err = s.arc.Reports(c, archive.ReportTypeVideoAudit, stime, etime); err != nil {
  105. log.Error("s.arc.Reports(%d) err(%v)", archive.ReportTypeVideoAudit, err)
  106. return
  107. }
  108. return
  109. }
  110. // hdlXcodeTime Stats video xcode spend time.
  111. func (s *Service) hdlXcodeTime(nv, ov archive.Video) {
  112. if nv.XcodeState != archive.VideoXcodeSDFinish && nv.XcodeState != archive.VideoXcodeHDFinish && nv.XcodeState != archive.VideoDispatchFinish {
  113. return
  114. }
  115. var (
  116. nMt time.Time
  117. oMt time.Time
  118. err error
  119. )
  120. s.xcodeTimeCache.Lock()
  121. defer s.xcodeTimeCache.Unlock()
  122. if nMt, err = time.ParseInLocation("2006-01-02 15:04:05", nv.MTime, time.Local); err != nil {
  123. log.Error("time.ParseInLocation(%s) err(%v)", nv.MTime, err)
  124. return
  125. }
  126. if oMt, err = time.ParseInLocation("2006-01-02 15:04:05", ov.MTime, time.Local); err != nil {
  127. log.Error("time.ParseInLocation(%s) err(%v)", ov.MTime, err)
  128. return
  129. }
  130. t := int(nMt.Unix() - oMt.Unix())
  131. if t <= 0 {
  132. log.Info("warning: xcode spend time: %d", t)
  133. return
  134. }
  135. s.xcodeTimeCache.Data[nv.XcodeState] = append(s.xcodeTimeCache.Data[nv.XcodeState], t)
  136. }
  137. // hdlXcodeStats handle calculate and save hdlXcodeTime() stats result
  138. func (s *Service) hdlXcodeStats() {
  139. var (
  140. c = context.TODO()
  141. states = []int8{archive.VideoXcodeSDFinish, archive.VideoXcodeHDFinish, archive.VideoDispatchFinish} //xcode states need stats
  142. levels = []int8{50, 60, 80, 90}
  143. xcodeStats = make(map[int8]map[string]int)
  144. bs []byte
  145. err error
  146. ctime = time.Now()
  147. mtime = ctime
  148. )
  149. for _, st := range states {
  150. if _, ok := s.xcodeTimeCache.Data[st]; !ok {
  151. continue
  152. }
  153. sort.Ints(s.xcodeTimeCache.Data[st])
  154. seconds := s.xcodeTimeCache.Data[st]
  155. if len(seconds) < 1 {
  156. continue
  157. }
  158. for _, l := range levels {
  159. m := "m" + strconv.Itoa(int(l))
  160. o := int(math.Floor(float64(len(seconds))*(float64(l)/100)+0.5)) - 1 //seconds offset
  161. if o < 0 {
  162. continue
  163. }
  164. if o < 0 || o >= len(seconds) {
  165. log.Error("s.hdlVideoXcodeStats() index out of range. seconds(%d)", o)
  166. continue
  167. }
  168. if _, ok := xcodeStats[st]; !ok {
  169. xcodeStats[st] = make(map[string]int)
  170. }
  171. xcodeStats[st][m] = seconds[o]
  172. }
  173. }
  174. if bs, err = json.Marshal(xcodeStats); err != nil {
  175. log.Error("s.hdlVideoXcodeStats() json.Marshal error(%v)", err)
  176. return
  177. }
  178. log.Info("s.hdlVideoXcodeStats() end xcode stats xcodeStats:%s", bs)
  179. if len(xcodeStats) < 1 {
  180. log.Info("s.hdlVideoXcodeStats() end xcode stats ignore empty data")
  181. return
  182. }
  183. if _, err = s.arc.ReportAdd(c, archive.ReportTypeXcode, string(bs), ctime, mtime); err != nil {
  184. log.Error("s.hdlVideoXcodeStats() s.arc.ReportAdd error(%v)", err)
  185. return
  186. }
  187. s.xcodeTimeCache.Lock()
  188. defer s.xcodeTimeCache.Unlock()
  189. s.xcodeTimeCache.Data = make(map[int8][]int)
  190. }
  191. // hdlTraffic Calculate how long it took to check video flow in ten minutes.
  192. // Stats result include sd_xcode,video check,hd_xcode,dispatch time.
  193. func (s *Service) hdlTraffic() {
  194. var (
  195. err error
  196. ctx = context.TODO()
  197. report *archive.Report //Single report type
  198. reports []*archive.Report //Report type slice
  199. tooks []*archive.TaskTook //Task took time stats
  200. statsCache = make(map[int8]map[string][]int) //Event took time list
  201. traffic = make(map[int8]map[string]int) //Event took time stats result
  202. bs []byte //Json byte
  203. ctime = time.Now() //Stats create time
  204. mtime = ctime //Stats modify time
  205. states = []int8{archive.VideoUploadInfo, archive.VideoXcodeSDFinish, archive.VideoXcodeHDFinish, archive.VideoDispatchFinish} //xcode states need stats
  206. )
  207. //0.Get the last report write time. If less than 10 minutes, then return.
  208. if report, err = s.arc.ReportLast(ctx, archive.ReportTypeTraffic); err != nil {
  209. log.Error("s.arc.ReportLast(%d) error(%v)", archive.ReportTypeTraffic, err)
  210. return
  211. }
  212. if report != nil && time.Now().Unix()-report.CTime.Unix() < 60*6 {
  213. log.Info("s.arc.ReportLast(%d) 距离上一次写入还没过6分钟!", archive.ReportTypeTraffic)
  214. return
  215. }
  216. now := time.Now()
  217. stime := now.Add(-10 * time.Minute)
  218. //1.Get video task time stats.
  219. if tooks, err = s.arc.TaskTooks(ctx, stime); err != nil {
  220. log.Error("s.arc.TaskTooks(%v) error(%v)", stime, err)
  221. return
  222. }
  223. statsCache[archive.VideoUploadInfo] = make(map[string][]int)
  224. for _, v := range tooks {
  225. statsCache[archive.VideoUploadInfo]["m50"] = append(statsCache[archive.VideoUploadInfo]["m50"], v.M50)
  226. statsCache[archive.VideoUploadInfo]["m60"] = append(statsCache[archive.VideoUploadInfo]["m60"], v.M60)
  227. statsCache[archive.VideoUploadInfo]["m80"] = append(statsCache[archive.VideoUploadInfo]["m80"], v.M80)
  228. statsCache[archive.VideoUploadInfo]["m90"] = append(statsCache[archive.VideoUploadInfo]["m90"], v.M90)
  229. }
  230. //2.Get sd_xcode,hd_xcode,dispatch time stats.
  231. if reports, err = s.arc.Reports(ctx, archive.ReportTypeXcode, stime, now); err != nil {
  232. log.Error("s.arc.Reports(%d) err(%v)", archive.ReportTypeXcode, err)
  233. return
  234. }
  235. xcodeStats := make(map[int8]map[string]int)
  236. for _, v := range reports {
  237. err = json.Unmarshal([]byte(v.Content), &xcodeStats)
  238. if err != nil {
  239. log.Error("json.Unmarshal(%s) err(%v)", v.Content, err)
  240. continue
  241. }
  242. for state, stats := range xcodeStats {
  243. if _, ok := statsCache[state]; !ok {
  244. statsCache[state] = make(map[string][]int)
  245. }
  246. totalTime := 0
  247. for level, val := range stats {
  248. totalTime += val
  249. statsCache[state][level] = append(statsCache[state][level], val)
  250. }
  251. }
  252. }
  253. //3.Calculate total time stats.
  254. for state, stats := range statsCache {
  255. for level, vals := range stats {
  256. total := 0
  257. for _, v := range vals {
  258. total += v
  259. }
  260. if _, ok := traffic[state]; !ok {
  261. traffic[state] = make(map[string]int)
  262. }
  263. traffic[state][level] = total / len(vals)
  264. }
  265. }
  266. //4.Save stats result
  267. if len(traffic) < 1 {
  268. log.Info("s.hdlTraffic() end traffic stats ignore empty data")
  269. return
  270. }
  271. if bs, err = json.Marshal(traffic); err != nil {
  272. log.Error("s.hdlTraffic() json.Marshal error(%v)", err)
  273. return
  274. }
  275. log.Info("s.hdlTraffic() end traffic stats traffic:%s", bs)
  276. if _, err = s.arc.ReportAdd(ctx, archive.ReportTypeTraffic, string(bs), ctime, mtime); err != nil {
  277. log.Error("s.hdlVideoXcodeStats() s.arc.ReportAdd error(%v)", err)
  278. return
  279. }
  280. //5.Update video traffic jam time
  281. jamTime := 0
  282. stateOk := true
  283. for _, s := range states {
  284. if _, ok := traffic[s]; !ok {
  285. stateOk = false
  286. break
  287. }
  288. if _, ok := traffic[s]["m60"]; !ok {
  289. stateOk = false
  290. break
  291. }
  292. if _, ok := traffic[s]["m80"]; !ok {
  293. stateOk = false
  294. break
  295. }
  296. jamTime += traffic[s]["m60"]
  297. jamTime += traffic[s]["m80"]
  298. }
  299. if !stateOk {
  300. log.Error("s.hdlTraffic() 一审耗时计算失败!traffic:%v", traffic)
  301. } else {
  302. err = s.redis.SetVideoJam(ctx, jamTime)
  303. log.Info("s.hdlTraffic() s.redis.SetVideoJam(%d)", jamTime)
  304. if err != nil {
  305. log.Error("s.hdlTraffic() 更新Redis失败!error(%v)", err)
  306. }
  307. }
  308. }
  309. func (s *Service) putVideoChan(action string, nwMsg []byte, oldMsg []byte) {
  310. var (
  311. err error
  312. chanSize = int64(s.c.ChanSize)
  313. )
  314. nw := &archive.Video{}
  315. if err = json.Unmarshal(nwMsg, nw); err != nil {
  316. log.Error("json.Unmarshal(%s) error(%v)", nwMsg, err)
  317. return
  318. }
  319. switch action {
  320. case _insertAct:
  321. s.videoUpInfoChs[nw.Aid%chanSize] <- &archive.VideoUpInfo{Nw: nw, Old: nil}
  322. case _updateAct:
  323. old := &archive.Video{}
  324. if err = json.Unmarshal(oldMsg, old); err != nil {
  325. log.Error("json.Unmarshal(%s) error(%v)", oldMsg, err)
  326. return
  327. }
  328. s.videoUpInfoChs[nw.Aid%chanSize] <- &archive.VideoUpInfo{Nw: nw, Old: old}
  329. }
  330. }
  331. func (s *Service) upVideoproc(k int) {
  332. defer s.waiter.Done()
  333. for {
  334. var (
  335. ok bool
  336. upInfo *archive.VideoUpInfo
  337. )
  338. if upInfo, ok = <-s.videoUpInfoChs[k]; !ok {
  339. log.Info("s.videoUpInfoCh[%d] closed", k)
  340. return
  341. }
  342. s.trackVideo(upInfo.Nw, upInfo.Old)
  343. go s.hdlMonitorVideo(upInfo.Nw, upInfo.Old)
  344. }
  345. }