cover.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. package service
  2. import (
  3. "context"
  4. "strings"
  5. "time"
  6. "go-common/app/job/live/dao-anchor-job/internal/dao"
  7. "go-common/library/sync/errgroup"
  8. "go-common/library/log"
  9. )
  10. //封面图/关键帧相关脚本
  11. const ROOM_LEN_KEY_FRAME = 500
  12. //updateKeyFrame 更新关键帧
  13. func (s *Service) updateKeyFrame() {
  14. ctx := context.TODO()
  15. ctx, cancel := context.WithTimeout(ctx, time.Minute*5)
  16. defer cancel()
  17. log.Info("updateKeyFrame_start")
  18. //获取全量开播房间
  19. allLiveingRoom, err := s.dao.GetAllLiveRoomIds(ctx)
  20. if allLiveingRoom == nil || err != nil {
  21. log.Error("updateKeyFrame_allLiveingRoom_error:reply=%v;err=%v", allLiveingRoom, err)
  22. return
  23. }
  24. slice := make([]int64, 0)
  25. eg := errgroup.Group{}
  26. for i := 0; i < len(allLiveingRoom); {
  27. end := ROOM_LEN_KEY_FRAME + i
  28. if (ROOM_LEN_KEY_FRAME + i) >= len(allLiveingRoom) {
  29. end = len(allLiveingRoom)
  30. }
  31. slice = allLiveingRoom[i:end]
  32. if len(slice) <= 0 {
  33. break
  34. } else {
  35. eg.Go(func(sliceParam []int64) func() error {
  36. return func() (err error) {
  37. for _, roomId := range sliceParam {
  38. coverUrl, err := s.dealKeyFrame(ctx, roomId)
  39. if err != nil {
  40. time.Sleep(time.Second)
  41. log.Error("updateKeyFrame_deal_error:roomId=%d;ketFrame=%s", roomId, coverUrl)
  42. continue
  43. }
  44. if coverUrl == "" {
  45. continue
  46. }
  47. //更新关键帧
  48. coverUrlArr := strings.Split(coverUrl, "?")
  49. coverUrl = coverUrlArr[0] + "?" + time.Now().Format("01021504")
  50. s.dao.UpdateRoomEx(ctx, roomId, []string{"keyframe"}, coverUrl)
  51. time.Sleep(time.Millisecond * 10)
  52. }
  53. return
  54. }
  55. }(slice))
  56. }
  57. i = end
  58. }
  59. eg.Wait()
  60. log.Info("updateKeyFrame_end")
  61. return
  62. }
  63. func (s *Service) dealKeyFrame(ctx context.Context, roomId int64) (coverUrl string, err error) {
  64. //二次确认是否关播,关播不再做
  65. roomInfos, err := s.dao.GetInfosByRoomIds(ctx, []int64{roomId}, []string{"live_status"})
  66. if err != nil {
  67. log.Error("updateKeyFrame_GetInfosByRoomIds_error:room_id=%d;err=%v", roomId, err)
  68. return
  69. }
  70. roomInfo := roomInfos[roomId]
  71. //未开播,不更新关键帧
  72. if roomInfo == nil || roomInfo.LiveStatus != dao.LIVE_OPEN {
  73. return
  74. }
  75. //判断是否为pk房间,pk房间不更新关键帧
  76. pkReply, err := s.dao.GetPkStatus(ctx, roomId)
  77. if err != nil {
  78. log.Error("updateKeyFrame_GetPkStatus_error:room_id=%d", roomId)
  79. return
  80. }
  81. if pkReply.PkStatus > 0 {
  82. return
  83. }
  84. //获取关键帧
  85. startTime := time.Now().Add(-time.Minute)
  86. endTime := time.Now()
  87. pics, err := s.dao.GetPicsByRoomId(ctx, roomId, startTime, endTime)
  88. if err != nil || pics == nil || pics[0] == "" {
  89. log.Warn("updateKeyFrame_GetPicsByRoomId_error:room_id=%d;pics=%v;err=%v", roomId, pics, err)
  90. return
  91. }
  92. //上传至bfs
  93. reply, err := s.dao.ImgDownload(ctx, pics[0])
  94. if err != nil || reply == nil {
  95. log.Warn("updateKeyFrame_ImgDownload_error:room_id=%d;pic=%s;err=%v;reply=%v", roomId, pics[0], err, reply)
  96. return
  97. }
  98. coverUrl, err = s.dao.ImgUpload(ctx, roomId, pics[0], reply)
  99. if err != nil || coverUrl == "" {
  100. log.Error("updateKeyFrame_ImgUploadBfs_error:room_id=%d;pic=%s", roomId, pics[0])
  101. return
  102. }
  103. return
  104. }