server.go 16 KB


  1. // Package grpc is generated by warden_gen.
  2. package grpc
  3. import (
  4. "context"
  5. "crypto/md5"
  6. "encoding/hex"
  7. "encoding/json"
  8. "fmt"
  9. "go-common/app/service/video/stream-mng/api/v1"
  10. "go-common/app/service/video/stream-mng/common"
  11. "go-common/app/service/video/stream-mng/service"
  12. "go-common/library/ecode"
  13. "go-common/library/log"
  14. nmd "go-common/library/net/metadata"
  15. "go-common/library/net/rpc/warden"
  16. "google.golang.org/grpc"
  17. "math/rand"
  18. "strconv"
  19. "strings"
  20. "time"
  21. )
  22. // New Stream warden rpc server
  23. func New(c *warden.ServerConfig, svr *service.Service) *warden.Server {
  24. //ws := warden.NewServer(c, grpc.MaxRecvMsgSize(32*1024*1024), grpc.MaxSendMsgSize(32*1024*1024)) 这里需要考虑配置问题
  25. ws := warden.NewServer(c)
  26. ws.Use(middleware())
  27. v1.RegisterStreamServer(ws.Server(), &server{svr})
  28. ws, err := ws.Start()
  29. if err != nil {
  30. panic(err)
  31. }
  32. return ws
  33. }
  34. type server struct {
  35. svr *service.Service
  36. }
  37. var _ v1.StreamServer = &server{}
  38. // GetSingleScreeShotByRoomID
  39. func (s *server) GetSingleScreeShot(ctx context.Context, req *v1.GetSingleScreeShotReq) (*v1.GetSingleScreeShotReply, error) {
  40. roomID := req.RoomId
  41. start := req.StartTime
  42. end := req.EndTime
  43. channel := req.Channel
  44. resp := &v1.GetSingleScreeShotReply{}
  45. if roomID <= 0 || start == "" || end == "" {
  46. st, _ := ecode.Error(ecode.RequestErr, "some fields are empty").WithDetails(resp)
  47. return nil, st
  48. }
  49. startTime, err := time.ParseInLocation("2006-01-02 15:04:05", start, time.Local)
  50. if err != nil {
  51. st, _ := ecode.Error(ecode.RequestErr, "Start time format is incorrect").WithDetails(resp)
  52. return nil, st
  53. }
  54. endTime, err := time.ParseInLocation("2006-01-02 15:04:05", end, time.Local)
  55. if err != nil {
  56. st, _ := ecode.Error(ecode.RequestErr, "End time format is incorrect").WithDetails(resp)
  57. return nil, st
  58. }
  59. info, err := s.svr.GetSingleScreeShot(ctx, roomID, startTime.Unix(), endTime.Unix(), channel)
  60. if err != nil {
  61. st, _ := ecode.Error(ecode.RequestErr, err.Error()).WithDetails(resp)
  62. return nil, st
  63. }
  64. return &v1.GetSingleScreeShotReply{
  65. List: info,
  66. }, nil
  67. }
  68. // GetMultiScreenShotByRommID
  69. func (s *server) GetMultiScreenShot(ctx context.Context, req *v1.GetMultiScreenShotReq) (*v1.GetMultiScreenShotReply, error) {
  70. rooms := req.RoomIds
  71. ts := req.Ts
  72. channel := req.Channel
  73. resp := &v1.GetMultiScreenShotReply{}
  74. if rooms == "" || ts == 0 {
  75. st, _ := ecode.Error(ecode.RequestErr, "some fields are empty").WithDetails(resp)
  76. return nil, st
  77. }
  78. // 切割room_id
  79. roomIDs := strings.Split(rooms, ",")
  80. if len(roomIDs) <= 0 {
  81. st, _ := ecode.Error(ecode.RequestErr, "room_ids is not right").WithDetails(resp)
  82. return nil, st
  83. }
  84. res := v1.GetMultiScreenShotReply{
  85. List: map[int64]string{},
  86. }
  87. rids := []int64{}
  88. for _, v := range roomIDs {
  89. roomID, err := strconv.ParseInt(v, 10, 64)
  90. if err != nil {
  91. continue
  92. }
  93. rids = append(rids, roomID)
  94. }
  95. urls, err := s.svr.GetMultiScreenShot(ctx, rids, ts, channel)
  96. if err != nil {
  97. st, _ := ecode.Error(ecode.RequestErr, err.Error()).WithDetails(resp)
  98. return nil, st
  99. }
  100. res.List = urls
  101. return &res, nil
  102. }
  103. // GetOriginScreenShotPic
  104. func (s *server) GetOriginScreenShotPic(ctx context.Context, req *v1.GetOriginScreenShotPicReq) (*v1.GetOriginScreenShotPicReply, error) {
  105. rooms := req.RoomIds
  106. ts := req.Ts
  107. resp := &v1.GetOriginScreenShotPicReply{}
  108. if rooms == "" || ts == 0 {
  109. st, _ := ecode.Error(ecode.RequestErr, "some fields are empty").WithDetails(resp)
  110. return nil, st
  111. }
  112. // 切割room_id
  113. roomIDs := strings.Split(rooms, ",")
  114. if len(roomIDs) <= 0 {
  115. st, _ := ecode.Error(ecode.RequestErr, "room_ids is not right").WithDetails(resp)
  116. return nil, st
  117. }
  118. res := v1.GetOriginScreenShotPicReply{
  119. List: map[int64]string{},
  120. }
  121. rids := []int64{}
  122. for _, v := range roomIDs {
  123. roomID, err := strconv.ParseInt(v, 10, 64)
  124. if err != nil {
  125. continue
  126. }
  127. rids = append(rids, roomID)
  128. }
  129. urls, err := s.svr.GetOriginScreenShotPic(ctx, rids, ts)
  130. if err != nil {
  131. st, _ := ecode.Error(ecode.RequestErr, err.Error()).WithDetails(resp)
  132. return nil, st
  133. }
  134. res.List = urls
  135. return &res, nil
  136. }
  137. // CreateOfficeStream 创建正式流
  138. func (s *server) CreateOfficalStream(ctx context.Context, req *v1.CreateOfficalStreamReq) (*v1.CreateOfficalStreamReply, error) {
  139. key := req.Key
  140. streamName := req.StreamName
  141. if req.Uid != 0 {
  142. key = mockStreamKey(fmt.Sprintf("%d", req.Uid))
  143. streamName = mockStreamName(fmt.Sprintf("%d", req.Uid))
  144. }
  145. resp := &v1.CreateOfficalStreamReply{}
  146. // 检查参数
  147. if streamName == "" || key == "" || req.RoomId <= 0 {
  148. st, _ := ecode.Error(ecode.RequestErr, "some fields are empty").WithDetails(resp)
  149. return nil, st
  150. }
  151. flag := s.svr.CreateOfficalStream(ctx, streamName, key, req.RoomId)
  152. return &v1.CreateOfficalStreamReply{
  153. Success: flag,
  154. }, nil
  155. }
  156. // GetStreamInfo 获取单个流信息
  157. func (s *server) GetStreamInfo(ctx context.Context, req *v1.GetStreamInfoReq) (*v1.GetStreamInfoReply, error) {
  158. rid := req.RoomId
  159. sname := req.StreamName
  160. resp := &v1.GetStreamInfoReply{}
  161. if rid == 0 && sname == "" {
  162. resp.Code = -400
  163. resp.Message = "some fields are empty"
  164. return resp, nil
  165. }
  166. info, err := s.svr.GetStreamInfo(ctx, int64(rid), sname)
  167. if err != nil {
  168. resp.Code = -400
  169. resp.Message = err.Error()
  170. return resp, nil
  171. }
  172. baseList := []*v1.StreamBase{}
  173. for _, v := range info.List {
  174. forward := []uint32{}
  175. for _, f := range v.Forward {
  176. forward = append(forward, uint32(f))
  177. }
  178. baseList = append(baseList, &v1.StreamBase{
  179. StreamName: v.StreamName,
  180. DefaultUpstream: uint32(v.DefaultUpStream),
  181. Origin: uint32(v.Origin),
  182. Forward: forward,
  183. Type: uint32(v.Type),
  184. Options: uint32(v.Options),
  185. //Key: v.Key,
  186. })
  187. }
  188. resp.Code = 0
  189. resp.Data = &v1.StreamFullInfo{
  190. RoomId: uint32(info.RoomID),
  191. Hot: uint32(info.Hot),
  192. List: baseList,
  193. }
  194. return resp, nil
  195. }
  196. // GetMultiStreamInfo 批量获取流信息
  197. func (s *server) GetMultiStreamInfo(ctx context.Context, req *v1.GetMultiStreamInfoReq) (*v1.GetMultiStreamInfoReply, error) {
  198. rids := req.RoomIds
  199. resp := &v1.GetMultiStreamInfoReply{}
  200. // 切割room_id
  201. if len(rids) <= 0 {
  202. resp.Code = 0
  203. resp.Message = "success"
  204. return resp, nil
  205. }
  206. if len(rids) > 30 {
  207. resp.Code = -400
  208. resp.Message = "The number of rooms must be less than 30"
  209. return resp, nil
  210. }
  211. roomIDs := []int64{}
  212. for _, v := range rids {
  213. roomID := int64(v)
  214. if roomID <= 0 {
  215. continue
  216. }
  217. roomIDs = append(roomIDs, roomID)
  218. }
  219. info, err := s.svr.GetMultiStreamInfo(ctx, roomIDs)
  220. if err != nil {
  221. log.Infov(ctx, log.KV("log", err.Error()))
  222. resp.Code = 0
  223. resp.Message = "success"
  224. return resp, nil
  225. }
  226. if info == nil || len(info) == 0 {
  227. log.Infov(ctx, log.KV("log", "can find any things"))
  228. resp.Code = 0
  229. resp.Message = "success"
  230. return resp, nil
  231. }
  232. res := map[uint32]*v1.StreamFullInfo{}
  233. for id, v := range info {
  234. item := &v1.StreamFullInfo{}
  235. item.Hot = uint32(v.Hot)
  236. item.RoomId = uint32(v.RoomID)
  237. baseList := []*v1.StreamBase{}
  238. for _, i := range v.List {
  239. forward := []uint32{}
  240. for _, f := range i.Forward {
  241. forward = append(forward, uint32(f))
  242. }
  243. baseList = append(baseList, &v1.StreamBase{
  244. StreamName: i.StreamName,
  245. DefaultUpstream: uint32(i.DefaultUpStream),
  246. Origin: uint32(i.Origin),
  247. Forward: forward,
  248. Type: uint32(i.Type),
  249. Options: uint32(i.Options),
  250. //Key: i.Key,
  251. })
  252. }
  253. item.List = baseList
  254. res[uint32(id)] = item
  255. }
  256. resp.Code = 0
  257. resp.Data = res
  258. return resp, nil
  259. }
  260. // ChangeSrc 切换cdn
  261. func (s *server) ChangeSrc(ctx context.Context, req *v1.ChangeSrcReq) (*v1.EmptyStruct, error) {
  262. resp := &v1.EmptyStruct{}
  263. if req.RoomId <= 0 || req.Src == 0 || req.Source == "" || req.OperateName == "" {
  264. st, _ := ecode.Error(ecode.RequestErr, "some fields are empty").WithDetails(resp)
  265. return nil, st
  266. }
  267. // todo 后续改为新的src
  268. src := int8(req.Src)
  269. if _, ok := common.SrcMapBitwise[src]; !ok {
  270. st, _ := ecode.Error(ecode.RequestErr, "src is not right").WithDetails(resp)
  271. return nil, st
  272. }
  273. err := s.svr.ChangeSrc(ctx, req.RoomId, common.SrcMapBitwise[src], req.Source, req.OperateName, req.Reason)
  274. if err != nil {
  275. st, _ := ecode.Error(ecode.RequestErr, err.Error()).WithDetails(resp)
  276. return nil, st
  277. }
  278. return resp, nil
  279. }
  280. // GetStreamLastTime 得到流到最后推流时间;主流的推流时间up_rank = 1
  281. func (s *server) GetStreamLastTime(ctx context.Context, req *v1.GetStreamLastTimeReq) (*v1.GetStreamLastTimeReply, error) {
  282. rid := req.RoomId
  283. resp := &v1.GetStreamLastTimeReply{}
  284. if rid <= 0 {
  285. st, _ := ecode.Error(ecode.RequestErr, "room_id is not right").WithDetails(resp)
  286. return nil, st
  287. }
  288. t, err := s.svr.GetStreamLastTime(ctx, rid)
  289. if err != nil {
  290. st, _ := ecode.Error(ecode.RequestErr, err.Error()).WithDetails(resp)
  291. return nil, st
  292. }
  293. return &v1.GetStreamLastTimeReply{
  294. LastTime: t,
  295. }, nil
  296. }
  297. // GetStreamNameByRoomID 需要考虑备用流 + 考虑短号
  298. func (s *server) GetStreamNameByRoomID(ctx context.Context, req *v1.GetStreamNameByRoomIDReq) (*v1.GetStreamNameByRoomIDReply, error) {
  299. rid := req.RoomId
  300. resp := &v1.GetStreamNameByRoomIDReply{}
  301. if rid <= 0 {
  302. st, _ := ecode.Error(ecode.RequestErr, "room_id is not right").WithDetails(resp)
  303. return nil, st
  304. }
  305. res, err := s.svr.GetStreamNameByRoomID(ctx, rid, false)
  306. if err != nil {
  307. st, _ := ecode.Error(ecode.RequestErr, err.Error()).WithDetails(resp)
  308. return nil, st
  309. }
  310. if len(res) == 0 {
  311. st, _ := ecode.Error(ecode.RequestErr, fmt.Sprintf("can not find info by room_id=%d", rid)).WithDetails(resp)
  312. return nil, st
  313. }
  314. return &v1.GetStreamNameByRoomIDReply{
  315. StreamName: res[0],
  316. }, nil
  317. }
  318. // GetRoomIDByStreamName 查询房间号
  319. func (s *server) GetRoomIDByStreamName(ctx context.Context, req *v1.GetRoomIDByStreamNameReq) (*v1.GetRoomIDByStreamNameReply, error) {
  320. resp := &v1.GetRoomIDByStreamNameReply{}
  321. if req.StreamName == "" {
  322. st, _ := ecode.Error(ecode.RequestErr, "stream name is empty").WithDetails(resp)
  323. return nil, st
  324. }
  325. res, err := s.svr.GetRoomIDByStreamName(ctx, req.StreamName)
  326. if err != nil {
  327. st, _ := ecode.Error(ecode.RequestErr, err.Error()).WithDetails(resp)
  328. return nil, st
  329. }
  330. if res <= 0 {
  331. st, _ := ecode.Error(ecode.RequestErr, fmt.Sprintf("can not find any info by name = %s", req.StreamName)).WithDetails(resp)
  332. return nil, st
  333. }
  334. return &v1.GetRoomIDByStreamNameReply{
  335. RoomId: res,
  336. }, nil
  337. }
  338. // GetAdapterStreamByStreamName 适配结果输出, 此处也可以输入备用流, 该结果只输出直推上行
  339. func (s *server) GetAdapterStreamByStreamName(ctx context.Context, req *v1.GetAdapterStreamByStreamNameReq) (*v1.GetAdapterStreamByStreamNameReply, error) {
  340. res := v1.GetAdapterStreamByStreamNameReply{
  341. List: map[string]*v1.AdapterStream{},
  342. }
  343. snames := req.StreamNames
  344. if snames == "" {
  345. st, _ := ecode.Error(ecode.RequestErr, "stream_names is empty").WithDetails(&res)
  346. return nil, st
  347. }
  348. nameSlice := strings.Split(snames, ",")
  349. if len(nameSlice) == 0 {
  350. st, _ := ecode.Error(ecode.RequestErr, "stream_names is empty").WithDetails(&res)
  351. return nil, st
  352. }
  353. if len(nameSlice) > 500 {
  354. st, _ := ecode.Error(ecode.RequestErr, "too many names").WithDetails(&res)
  355. return nil, st
  356. }
  357. info := s.svr.GetAdapterStreamByStreamName(ctx, nameSlice)
  358. if info != nil {
  359. for name, v := range info {
  360. res.List[name] = &v1.AdapterStream{
  361. Src: v.Src,
  362. RoomId: v.RoomID,
  363. UpRank: v.UpRank,
  364. SrcName: v.SrcName,
  365. }
  366. }
  367. }
  368. return &res, nil
  369. }
  370. // GetSrcByRoomID
  371. func (s *server) GetSrcByRoomID(ctx context.Context, req *v1.GetSrcByRoomIDReq) (*v1.GetSrcByRoomIDReply, error) {
  372. rid := req.RoomId
  373. resp := &v1.GetSrcByRoomIDReply{}
  374. if rid <= 0 {
  375. st, _ := ecode.Error(ecode.RequestErr, "room_id is not right").WithDetails(resp)
  376. return nil, st
  377. }
  378. info, err := s.svr.GetSrcByRoomID(ctx, rid)
  379. if err != nil {
  380. st, _ := ecode.Error(ecode.RequestErr, err.Error()).WithDetails(resp)
  381. return nil, st
  382. }
  383. if info == nil || len(info) == 0 {
  384. st, _ := ecode.Error(ecode.RequestErr, "获取线路失败").WithDetails(resp)
  385. return nil, st
  386. }
  387. res := &v1.GetSrcByRoomIDReply{
  388. List: []*v1.RoomSrcCheck{},
  389. }
  390. for _, v := range info {
  391. res.List = append(res.List, &v1.RoomSrcCheck{
  392. Src: v.Src,
  393. Checked: int32(v.Checked),
  394. Desc: v.Desc,
  395. })
  396. }
  397. return res, nil
  398. }
  399. // GetLineListByRoomID
  400. func (s *server) GetLineListByRoomID(ctx context.Context, req *v1.GetLineListByRoomIDReq) (*v1.GetLineListByRoomIDReply, error) {
  401. resp := &v1.GetLineListByRoomIDReply{}
  402. if req.RoomId <= 0 {
  403. st, _ := ecode.Error(ecode.RequestErr, "room_id is not right").WithDetails(resp)
  404. return nil, st
  405. }
  406. info, err := s.svr.GetLineListByRoomID(ctx, req.RoomId)
  407. if err != nil {
  408. st, _ := ecode.Error(ecode.RequestErr, err.Error()).WithDetails(resp)
  409. return nil, st
  410. }
  411. if info == nil || len(info) == 0 {
  412. st, _ := ecode.Error(ecode.RequestErr, "获取线路失败").WithDetails(resp)
  413. return nil, st
  414. }
  415. res := &v1.GetLineListByRoomIDReply{
  416. List: []*v1.LineList{},
  417. }
  418. for _, v := range info {
  419. res.List = append(res.List, &v1.LineList{
  420. Src: v.Src,
  421. Use: v.Use,
  422. Desc: v.Desc,
  423. })
  424. }
  425. return res, nil
  426. }
  427. // GetUpStreamRtmp UpStream
  428. func (s *server) GetUpStreamRtmp(ctx context.Context, req *v1.GetUpStreamRtmpReq) (*v1.GetUpStreamRtmpReply, error) {
  429. resp := &v1.GetUpStreamRtmpReply{}
  430. if req.RoomId == 0 || req.Platform == "" {
  431. st, _ := ecode.Error(ecode.RequestErr, "some fields are empty").WithDetails(resp)
  432. return nil, st
  433. }
  434. if req.Ip == "" {
  435. if cmd, ok := nmd.FromContext(ctx); ok {
  436. if ip, ok := cmd[nmd.RemoteIP].(string); ok {
  437. req.Ip = ip
  438. }
  439. }
  440. }
  441. info, err := s.svr.GetUpStreamRtmp(ctx, req.RoomId, req.FreeFlow, req.Ip, req.AreaId, int(req.Attentions), 0, req.Platform)
  442. if err != nil {
  443. st, _ := ecode.Error(ecode.RequestErr, err.Error()).WithDetails(resp)
  444. return nil, st
  445. }
  446. if info != nil {
  447. resp.Up = &v1.UpStreamRtmp{
  448. Addr: info.Addr,
  449. Code: info.Code,
  450. NewLink: info.NewLink,
  451. }
  452. }
  453. return resp, nil
  454. }
  455. // StreamCut 切流的房间和时间, 内部调用
  456. func (s *server) StreamCut(ctx context.Context, req *v1.StreamCutReq) (*v1.EmptyStruct, error) {
  457. roomID := req.RoomId
  458. cutTime := req.CutTime
  459. resp := &v1.EmptyStruct{}
  460. if roomID <= 0 {
  461. st, _ := ecode.Error(ecode.RequestErr, "room_ids is not right").WithDetails(resp)
  462. return nil, st
  463. }
  464. if cutTime == 0 {
  465. cutTime = 1
  466. }
  467. s.svr.StreamCut(ctx, roomID, cutTime, 0)
  468. return &v1.EmptyStruct{}, nil
  469. }
  470. // Ping Service
  471. func (s *server) Ping(ctx context.Context, req *v1.PingReq) (*v1.PingReply, error) {
  472. return &v1.PingReply{}, nil
  473. }
  474. // Close Service
  475. func (s *server) Close(ctx context.Context, req *v1.CloseReq) (*v1.CloseReply, error) {
  476. return &v1.CloseReply{}, nil
  477. }
  478. // ClearStreamStatus
  479. func (s *server) ClearStreamStatus(ctx context.Context, req *v1.ClearStreamStatusReq) (*v1.EmptyStruct, error) {
  480. rid := req.RoomId
  481. resp := &v1.EmptyStruct{}
  482. if rid <= 0 {
  483. st, _ := ecode.Error(ecode.RequestErr, "room_ids is not right").WithDetails(resp)
  484. return nil, st
  485. }
  486. err := s.svr.ClearStreamStatus(ctx, rid)
  487. if err != nil {
  488. st, _ := ecode.Error(ecode.RequestErr, err.Error()).WithDetails(resp)
  489. return nil, st
  490. }
  491. return &v1.EmptyStruct{}, nil
  492. }
  493. // CheckLiveStreamList
  494. func (s *server) CheckLiveStreamList(ctx context.Context, req *v1.CheckLiveStreamReq) (*v1.CheckLiveStreamReply, error) {
  495. resp := &v1.CheckLiveStreamReply{}
  496. rids := req.RoomId
  497. if len(rids) == 0 {
  498. st, _ := ecode.Error(ecode.RequestErr, "room_ids is empty").WithDetails(resp)
  499. return nil, st
  500. }
  501. res := s.svr.CheckLiveStreamList(ctx, rids)
  502. resp.List = res
  503. return resp, nil
  504. }
  505. // mockStream 模拟生成的流名
  506. func mockStreamName(uid string) string {
  507. num := rand.Int63n(88888888)
  508. return fmt.Sprintf("live_%s_%d", uid, num+1111111)
  509. }
  510. // mockStreamKey 模拟生成的key
  511. func mockStreamKey(uid string) string {
  512. str := fmt.Sprintf("nvijqwopW1%s%d", uid, time.Now().Unix())
  513. h := md5.New()
  514. h.Write([]byte(str))
  515. cipherStr := h.Sum(nil)
  516. md5Str := hex.EncodeToString(cipherStr)
  517. return md5Str
  518. }
  519. func middleware() grpc.UnaryServerInterceptor {
  520. return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
  521. resp, err = handler(ctx, req)
  522. out := ""
  523. if err != nil {
  524. out = err.Error()
  525. } else {
  526. jo, _ := json.Marshal(resp)
  527. out = string(jo)
  528. }
  529. // 记录调用方法
  530. log.Infov(ctx,
  531. log.KV("path", info.FullMethod),
  532. log.KV("caller", nmd.String(ctx, nmd.Caller)),
  533. log.KV("input_params", fmt.Sprintf("%s", req)),
  534. log.KV("output_data", out),
  535. )
  536. return
  537. }
  538. }