push.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141
  1. package service
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. pb "go-common/app/interface/main/broadcast/api/grpc/v1"
  6. "go-common/app/service/main/broadcast/model"
  7. "go-common/library/log"
  8. )
  9. const (
  10. _pushMsg = "push"
  11. _broadcastMsg = "broadcast"
  12. _broadcastRoomMsg = "broadcast_room"
  13. )
  14. type databusMsg struct {
  15. Type string `json:"type,omitempty"`
  16. Operation int32 `json:"operation,omitempty"`
  17. Server string `json:"server,omitempty"`
  18. Keys []string `json:"keys,omitempty"`
  19. Room string `json:"room,omitempty"`
  20. Speed int32 `json:"speed,omitempty"`
  21. Platform string `json:"platform,omitempty"`
  22. ContentType int32 `json:"content_type,omitempty"`
  23. Message json.RawMessage `json:"message,omitempty"`
  24. }
  25. func (s *Service) pushMsg(msg []byte) (err error) {
  26. m := &databusMsg{}
  27. if err = json.Unmarshal(msg, m); err != nil {
  28. log.Error("json.Unmarshal(%s) error(%s)", msg, err)
  29. return
  30. }
  31. log.Info("push message:%s", msg)
  32. switch m.Type {
  33. case _pushMsg:
  34. if len(m.Keys) == 0 {
  35. log.Error("service push keys is invalid:%+v", m)
  36. return
  37. }
  38. s.pushKeys(m.Operation, m.Server, m.Keys, m.Message, m.ContentType)
  39. case _broadcastMsg:
  40. s.broadcast(m.Operation, m.Message, m.Speed, m.Platform, m.ContentType)
  41. case _broadcastRoomMsg:
  42. if m.Room == "" {
  43. log.Error("service broadcast room is invalid:%+v", m)
  44. return
  45. }
  46. if err = s.room(m.Room).Push(m.Operation, m.Message, m.ContentType); err != nil {
  47. log.Error("room.Push(%s) roomId:%s error(%v)", m.Message, m.Room, err)
  48. }
  49. // NOTE: 弹幕兼容老协议推送,等web播放器接完可以去掉了
  50. if m.Operation == 1000 {
  51. s.broadcastRoom(m.Room, 5, []byte(fmt.Sprintf(`{"cmd":"DM","info":%s}`, m.Message)))
  52. }
  53. default:
  54. log.Error("unknown message type:%s", m.Type)
  55. }
  56. return
  57. }
  58. // pushKeys push a message to a batch of subkeys.
  59. func (s *Service) pushKeys(operation int32, serverID string, subKeys []string, body []byte, contentType int32) {
  60. p := &model.Proto{
  61. Ver: 1,
  62. Operation: operation,
  63. ContentType: contentType,
  64. Body: body,
  65. }
  66. var args = pb.PushMsgReq{
  67. Keys: subKeys,
  68. ProtoOp: operation,
  69. Proto: p,
  70. }
  71. if c, ok := s.cometServers[serverID]; ok {
  72. if err := c.Push(&args); err != nil {
  73. log.Error("c.Push(%v) serverId:%s error(%v)", args, serverID, err)
  74. }
  75. }
  76. }
  77. // broadcast broadcast a message to all.
  78. func (s *Service) broadcast(operation int32, body []byte, speed int32, platform string, contentType int32) {
  79. p := &model.Proto{
  80. Ver: 1,
  81. Operation: operation,
  82. ContentType: contentType,
  83. Body: body,
  84. }
  85. comets := s.cometServers
  86. speed /= int32(len(comets))
  87. var args = pb.BroadcastReq{
  88. ProtoOp: operation,
  89. Proto: p,
  90. Speed: speed,
  91. Platform: platform,
  92. }
  93. for serverID, c := range comets {
  94. if err := c.Broadcast(&args); err != nil {
  95. log.Error("c.Broadcast(%v) serverId:%s error(%v)", args, serverID, err)
  96. }
  97. }
  98. }
  99. // broadcastRoomRawBytes broadcast aggregation messages to room.
  100. func (s *Service) broadcastRoomRawBytes(roomID string, body []byte) {
  101. args := pb.BroadcastRoomReq{
  102. RoomID: roomID,
  103. Proto: &model.Proto{
  104. Ver: 1,
  105. Operation: model.OpRaw,
  106. Body: body,
  107. },
  108. }
  109. comets := s.cometServers
  110. for serverID, c := range comets {
  111. if err := c.BroadcastRoom(&args); err != nil {
  112. log.Error("c.BroadcastRoom(%v) roomID:%s serverId:%s error(%v)", args, roomID, serverID, err)
  113. }
  114. }
  115. }
  116. // broadcastRoom broadcast messages to room.
  117. func (s *Service) broadcastRoom(roomID string, op int32, body []byte) {
  118. args := pb.BroadcastRoomReq{
  119. RoomID: roomID,
  120. Proto: &model.Proto{
  121. Ver: 1,
  122. Operation: op,
  123. Body: body,
  124. },
  125. }
  126. comets := s.cometServers
  127. for serverID, c := range comets {
  128. if err := c.BroadcastRoom(&args); err != nil {
  129. log.Error("c.BroadcastRoom(%v) roomID:%s serverId:%s error(%v)", args, roomID, serverID, err)
  130. }
  131. }
  132. }