video.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411
  1. package service
  2. import (
  3. "bufio"
  4. "context"
  5. "fmt"
  6. "go-common/app/job/bbq/video/dao"
  7. searchv1 "go-common/app/service/bbq/search/api/grpc/v1"
  8. "go-common/library/log"
  9. "io"
  10. "net/url"
  11. "os"
  12. "strconv"
  13. "time"
  14. )
  15. const (
  16. _retryTimes = 3
  17. _selection = 5 //运营精选状态
  18. )
  19. //taskCheckVideoDBVSES 视频全量脚本
  20. func (s *Service) taskSyncVideo2ES() {
  21. var step time.Duration
  22. var id int64
  23. for {
  24. ids, videos, err := s.dao.VideoList(context.Background(), id)
  25. if err != nil {
  26. log.Error("sync video err(%v)", err)
  27. return
  28. }
  29. if videos == nil {
  30. return
  31. }
  32. videoStatisticsHive, _ := s.dao.VideoStatisticsHiveList(context.Background(), ids)
  33. videoStatistics, _ := s.dao.VideoStatisticsList(context.Background(), ids)
  34. videoTags, _ := s.dao.VideoTagsList(context.Background(), ids)
  35. req := new(searchv1.SaveVideoRequest)
  36. for _, v := range videos {
  37. fmt.Println(v.SVID)
  38. id = v.SVID
  39. tmp := &searchv1.VideoESInfo{
  40. SVID: v.SVID,
  41. Title: v.Title,
  42. Content: v.Content,
  43. MID: v.MID,
  44. CID: v.CID,
  45. Pubtime: int64(v.Pubtime),
  46. Ctime: int64(v.Ctime),
  47. Mtime: int64(v.Mtime),
  48. Duration: v.Duration,
  49. Original: v.Original,
  50. State: v.State,
  51. VerID: v.VerID,
  52. Ver: v.Ver,
  53. From: v.From,
  54. AVID: v.AVID,
  55. Tid: v.Tid,
  56. SubTid: v.SubTid,
  57. ISFullScreen: v.ISFullScreen,
  58. Score: v.Score,
  59. }
  60. if videoStatisticsHive[id] != nil {
  61. tmp.PlayHive = videoStatisticsHive[id].PlayHive
  62. tmp.FavHive = videoStatisticsHive[id].FavHive
  63. tmp.CoinHive = videoStatisticsHive[id].CoinHive
  64. tmp.SubtitlesHive = videoStatisticsHive[id].SubtitlesHive
  65. tmp.LikesHive = videoStatisticsHive[id].LikesHive
  66. tmp.ShareHive = videoStatisticsHive[id].ShareHive
  67. tmp.ReportHive = videoStatisticsHive[id].ReportHive
  68. tmp.DurationDailyHive = videoStatisticsHive[id].DurationDailyHive
  69. tmp.DurationAllHive = videoStatisticsHive[id].DurationAllHive
  70. tmp.ReplyHive = videoStatisticsHive[id].ReplyHive
  71. tmp.ShareDailyHive = videoStatisticsHive[id].ShareDailyHive
  72. tmp.PlayDailyHive = videoStatisticsHive[id].PlayDailyHive
  73. tmp.SubtitlesDailyHive = videoStatisticsHive[id].SubtitlesDailyHive
  74. tmp.LikesDailyHive = videoStatisticsHive[id].LikesDailyHive
  75. tmp.FavDailyHive = videoStatisticsHive[id].FavDailyHive
  76. tmp.ReplyDailyHive = videoStatisticsHive[id].ReplyDailyHive
  77. tmp.AccessHive = videoStatisticsHive[id].AccessHive
  78. }
  79. if videoStatistics[id] != nil {
  80. tmp.Play = videoStatistics[id].Play
  81. tmp.Subtitles = videoStatistics[id].Subtitles
  82. tmp.Like = videoStatistics[id].Like
  83. tmp.Share = videoStatistics[id].Share
  84. tmp.Report = videoStatistics[id].Report
  85. }
  86. if videoTags[id] != nil {
  87. tmp.Tags = videoTags[id]
  88. }
  89. req.List = append(req.List, tmp)
  90. }
  91. step = 1
  92. for {
  93. if _, err := s.dao.SearchClient.SaveVideo(context.Background(), req); err != nil {
  94. time.Sleep(step * time.Second)
  95. step++
  96. continue
  97. }
  98. break
  99. }
  100. }
  101. }
  102. //SaveVideo2ES 保存视频到es
  103. func (s *Service) SaveVideo2ES(ids string) (res bool) {
  104. res = true
  105. if len(ids) == 0 {
  106. return
  107. }
  108. videos, err := s.dao.VideoListByIDs(context.Background(), ids)
  109. if err != nil || videos == nil {
  110. res = false
  111. return
  112. }
  113. videoStatisticsHive, _ := s.dao.VideoStatisticsHiveList(context.Background(), ids)
  114. videoStatistics, _ := s.dao.VideoStatisticsList(context.Background(), ids)
  115. // videoTags, _ := s.dao.VideoTagsList(context.Background(), ids)
  116. var step time.Duration
  117. var id int64
  118. req := new(searchv1.SaveVideoRequest)
  119. for _, v := range videos {
  120. id = v.SVID
  121. fmt.Println(id)
  122. tmp := &searchv1.VideoESInfo{
  123. SVID: v.SVID,
  124. Title: v.Title,
  125. Content: v.Content,
  126. MID: v.MID,
  127. CID: v.CID,
  128. Pubtime: int64(v.Pubtime),
  129. Ctime: int64(v.Ctime),
  130. Mtime: int64(v.Mtime),
  131. Duration: v.Duration,
  132. Original: v.Original,
  133. State: v.State,
  134. VerID: v.VerID,
  135. Ver: v.Ver,
  136. From: v.From,
  137. AVID: v.AVID,
  138. Tid: v.Tid,
  139. SubTid: v.SubTid,
  140. ISFullScreen: v.ISFullScreen,
  141. Score: v.Score,
  142. }
  143. if videoStatisticsHive[id] != nil {
  144. tmp.PlayHive = videoStatisticsHive[id].PlayHive
  145. tmp.FavHive = videoStatisticsHive[id].FavHive
  146. tmp.CoinHive = videoStatisticsHive[id].CoinHive
  147. tmp.SubtitlesHive = videoStatisticsHive[id].SubtitlesHive
  148. tmp.LikesHive = videoStatisticsHive[id].LikesHive
  149. tmp.ShareHive = videoStatisticsHive[id].ShareHive
  150. tmp.ReportHive = videoStatisticsHive[id].ReportHive
  151. tmp.DurationDailyHive = videoStatisticsHive[id].DurationDailyHive
  152. tmp.DurationAllHive = videoStatisticsHive[id].DurationAllHive
  153. tmp.ReplyHive = videoStatisticsHive[id].ReplyHive
  154. tmp.ShareDailyHive = videoStatisticsHive[id].ShareDailyHive
  155. tmp.PlayDailyHive = videoStatisticsHive[id].PlayDailyHive
  156. tmp.SubtitlesDailyHive = videoStatisticsHive[id].SubtitlesDailyHive
  157. tmp.LikesDailyHive = videoStatisticsHive[id].LikesDailyHive
  158. tmp.FavDailyHive = videoStatisticsHive[id].FavDailyHive
  159. tmp.ReplyDailyHive = videoStatisticsHive[id].ReplyDailyHive
  160. tmp.AccessHive = videoStatisticsHive[id].AccessHive
  161. }
  162. if videoStatistics[id] != nil {
  163. tmp.Play = videoStatistics[id].Play
  164. tmp.Subtitles = videoStatistics[id].Subtitles
  165. tmp.Like = videoStatistics[id].Like
  166. tmp.Share = videoStatistics[id].Share
  167. tmp.Report = videoStatistics[id].Report
  168. }
  169. // if videoTags[id] != nil {
  170. // tmp.Tags = videoTags[id]
  171. // }
  172. req.List = append(req.List, tmp)
  173. }
  174. step = 1
  175. for {
  176. if _, err := s.dao.SearchClient.SaveVideo(context.Background(), req); err != nil {
  177. if step == 11 {
  178. log.Error("save es err(%v) ids(%s)", err, ids)
  179. res = false
  180. break
  181. }
  182. time.Sleep(step * time.Second)
  183. step++
  184. continue
  185. }
  186. break
  187. }
  188. return
  189. }
  190. func formArrayString(arr []int64) string {
  191. var res string
  192. for i, v := range arr {
  193. if i != 0 {
  194. res += ","
  195. }
  196. res += strconv.FormatInt(v, 10)
  197. }
  198. return res
  199. }
  200. //deltaSync2ES 为不同表进行增量同步的脚本,baseTableQuery指明不同表的查询语句
  201. func (s *Service) deltaSync2ES(taskName string, baseTableQuery string) {
  202. task, err := s.dao.RawCheckTask(context.Background(), taskName)
  203. if err != nil {
  204. log.Error("get last_chek_time fail: task=%s", taskName)
  205. return
  206. }
  207. log.Info("get last_chek_time succ: task=%s, last_check_time=%d", taskName, task.LastCheck)
  208. // 获得所有变更的svid
  209. ids, mtime, err := s.dao.RawGetIDByMtime(baseTableQuery, task.LastCheck)
  210. if err != nil {
  211. log.Error("get raw id by mtime fail: task=%s, last_mtime=%d, base_table_query=%s",
  212. taskName, mtime, baseTableQuery)
  213. return
  214. }
  215. idsNum := len(ids)
  216. log.Info("get changed svids: task=%s, id_num=%d", taskName, idsNum)
  217. if idsNum == 0 {
  218. return
  219. }
  220. task.LastCheck = mtime
  221. // 对所有变更的svid分批次进行同步到es
  222. for i := 0; i < idsNum; i += dao.MaxSyncESNum {
  223. last := i + dao.MaxSyncESNum
  224. if last > idsNum {
  225. last = idsNum
  226. }
  227. selectedIDs := ids[i:last]
  228. idsStr := formArrayString(selectedIDs)
  229. if res := s.SaveVideo2ES(idsStr); !res {
  230. log.Error("sync video 2 es fail: task=%s, offset=%d, id_num=%d, base_table_query=%s",
  231. taskName, i, idsNum, baseTableQuery)
  232. return
  233. }
  234. log.Info("one sync video 2 es: task=%s, offset=%d, id_num=%d", taskName, i, idsNum)
  235. }
  236. // 更新task最近check的时间点
  237. if _, err := s.dao.UpdateTaskLastCheck(context.Background(), taskName, task.LastCheck); err != nil {
  238. log.Error("update task last check time fail: task=%s, last_mtime=%d, base_table_query=%s",
  239. taskName, task.LastCheck, baseTableQuery)
  240. return
  241. }
  242. log.Info("sync video 2 es: task=%s, id_num=%d, last_mtime=%d", taskName, idsNum, task.LastCheck)
  243. }
  244. //taskCheckVideo video表增量脚本
  245. func (s *Service) taskCheckVideo() {
  246. taskName := "checkVideo"
  247. s.deltaSync2ES(taskName, dao.QueryVideoByMtime)
  248. }
  249. //taskCheckVideoStatistics video_statistics表增量脚本
  250. func (s *Service) taskCheckVideoStatistics() {
  251. taskName := "checkVideoSt"
  252. s.deltaSync2ES(taskName, dao.QueryVideoStatisticsByMtime)
  253. }
  254. //taskCheckVideoStatisticsHive video_statistics_hive表增量脚本
  255. func (s *Service) taskCheckVideoStatisticsHive() {
  256. taskName := "checkVideoStHv"
  257. s.deltaSync2ES(taskName, dao.QueryVideoStatisticsHiveByMtime)
  258. }
  259. //taskCheckVideoTag video_tag表增量脚本
  260. func (s *Service) taskCheckVideoTag() {
  261. taskName := "checkVideoTag"
  262. s.deltaSync2ES(taskName, dao.QueryVideoTagByMtime)
  263. }
  264. //taskCheckTag tag表增量脚本
  265. func (s *Service) taskCheckTag() {
  266. taskName := "checkTag"
  267. task, err := s.dao.RawCheckTask(context.Background(), taskName)
  268. if err != nil {
  269. log.Error("get last_chek_time fail: task=%s", taskName)
  270. return
  271. }
  272. log.Info("get last_chek_time succ: task=%s, last_check_time=%d", taskName, task.LastCheck)
  273. for {
  274. ids, mtime, err := s.dao.RawTagByMtime(context.Background(), task.LastCheck)
  275. if err != nil || len(ids) == 0 {
  276. return
  277. }
  278. id := int64(0)
  279. for {
  280. svids, temp, err := s.dao.RawVideoTagByIDs(context.Background(), ids, id)
  281. if err != nil {
  282. return
  283. }
  284. if len(svids) == 0 {
  285. break
  286. }
  287. if flag := s.SaveVideo2ES(svids); !flag {
  288. return
  289. }
  290. id = temp
  291. }
  292. if num, err := s.dao.UpdateTaskLastCheck(context.Background(), taskName, mtime); err != nil || num == 0 {
  293. return
  294. }
  295. task.LastCheck = mtime
  296. }
  297. }
  298. // taskRmInvalidES 删除es中多余的视频
  299. func (s *Service) taskRmInvalidES() {
  300. fmt.Println("aaa")
  301. esReq := new(searchv1.ESVideoDataRequest)
  302. delReq := new(searchv1.DelVideoBySVIDRequest)
  303. svid := int64(0)
  304. query := `{"query":{"range":{"svid":{"gt":%d}}},"sort":[{"svid":"asc"}],"from":0,"size":10}`
  305. for {
  306. esReq.Query = fmt.Sprintf(query, svid)
  307. res, err := s.dao.SearchClient.ESVideoData(context.Background(), esReq)
  308. if err != nil {
  309. return
  310. }
  311. svids := make([]string, 0)
  312. for _, v := range res.List {
  313. svids = append(svids, strconv.Itoa(int(v.SVID)))
  314. svid = v.SVID
  315. }
  316. vs, err := s.dao.RawVideoBySVIDS(context.Background(), svids)
  317. if err != nil {
  318. return
  319. }
  320. notList := make([]int64, 0)
  321. for _, v := range res.List {
  322. if _, ok := vs[v.SVID]; !ok {
  323. fmt.Println(v.SVID)
  324. notList = append(notList, v.SVID)
  325. }
  326. }
  327. if len(notList) != 0 {
  328. delReq.SVIDs = notList
  329. s.dao.SearchClient.DelVideoBySVID(context.Background(), delReq)
  330. }
  331. }
  332. }
  333. func (s *Service) commitCID() {
  334. ctx := context.Background()
  335. path := s.c.URLs["bvc_push"]
  336. if path == "" {
  337. return
  338. }
  339. srcPath := s.c.Path["cids"]
  340. if srcPath == "" {
  341. return
  342. }
  343. if srcPath == "" {
  344. log.Error("sugsrc path is empty")
  345. return
  346. }
  347. src, err := os.Open(srcPath)
  348. if err != nil {
  349. log.Error("writeSug os.Open source sug error(%v)", err)
  350. return
  351. }
  352. defer src.Close()
  353. br := bufio.NewReader(src)
  354. i := 1
  355. for {
  356. a, _, c := br.ReadLine()
  357. if c == io.EOF {
  358. break
  359. }
  360. cid, err := strconv.ParseInt(string(a), 10, 64)
  361. if err != nil {
  362. log.Error("parse err [%v]", err)
  363. continue
  364. }
  365. svid, err := s.dao.GetSvidByCid(ctx, cid)
  366. if err != nil {
  367. continue
  368. }
  369. params := url.Values{}
  370. params.Set("svid", strconv.FormatInt(svid, 10))
  371. params.Set("cid", string(a))
  372. req, err := s.dao.HTTPClient.NewRequest("GET", path, "", params)
  373. if err != nil {
  374. log.Error("error(%v)", err)
  375. continue
  376. }
  377. var res struct {
  378. Code int `json:"code"`
  379. Msg string `json:"message"`
  380. }
  381. if err = s.dao.HTTPClient.Do(ctx, req, &res); err != nil {
  382. log.Errorv(ctx, log.KV("log", fmt.Sprintf("err[%v]", err)))
  383. continue
  384. }
  385. if res.Code != 0 {
  386. log.Errorv(ctx, log.KV("log", fmt.Sprintf("error(%v)", err)))
  387. } else {
  388. log.Info("commit svid:%d cid:%d success No.%d", svid, cid, i)
  389. }
  390. i++
  391. }
  392. }