123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141 |
- package service
- import (
- "encoding/json"
- "fmt"
- pb "go-common/app/interface/main/broadcast/api/grpc/v1"
- "go-common/app/service/main/broadcast/model"
- "go-common/library/log"
- )
- const (
- _pushMsg = "push"
- _broadcastMsg = "broadcast"
- _broadcastRoomMsg = "broadcast_room"
- )
- type databusMsg struct {
- Type string `json:"type,omitempty"`
- Operation int32 `json:"operation,omitempty"`
- Server string `json:"server,omitempty"`
- Keys []string `json:"keys,omitempty"`
- Room string `json:"room,omitempty"`
- Speed int32 `json:"speed,omitempty"`
- Platform string `json:"platform,omitempty"`
- ContentType int32 `json:"content_type,omitempty"`
- Message json.RawMessage `json:"message,omitempty"`
- }
- func (s *Service) pushMsg(msg []byte) (err error) {
- m := &databusMsg{}
- if err = json.Unmarshal(msg, m); err != nil {
- log.Error("json.Unmarshal(%s) error(%s)", msg, err)
- return
- }
- log.Info("push message:%s", msg)
- switch m.Type {
- case _pushMsg:
- if len(m.Keys) == 0 {
- log.Error("service push keys is invalid:%+v", m)
- return
- }
- s.pushKeys(m.Operation, m.Server, m.Keys, m.Message, m.ContentType)
- case _broadcastMsg:
- s.broadcast(m.Operation, m.Message, m.Speed, m.Platform, m.ContentType)
- case _broadcastRoomMsg:
- if m.Room == "" {
- log.Error("service broadcast room is invalid:%+v", m)
- return
- }
- if err = s.room(m.Room).Push(m.Operation, m.Message, m.ContentType); err != nil {
- log.Error("room.Push(%s) roomId:%s error(%v)", m.Message, m.Room, err)
- }
- // NOTE: 弹幕兼容老协议推送,等web播放器接完可以去掉了
- if m.Operation == 1000 {
- s.broadcastRoom(m.Room, 5, []byte(fmt.Sprintf(`{"cmd":"DM","info":%s}`, m.Message)))
- }
- default:
- log.Error("unknown message type:%s", m.Type)
- }
- return
- }
- // pushKeys push a message to a batch of subkeys.
- func (s *Service) pushKeys(operation int32, serverID string, subKeys []string, body []byte, contentType int32) {
- p := &model.Proto{
- Ver: 1,
- Operation: operation,
- ContentType: contentType,
- Body: body,
- }
- var args = pb.PushMsgReq{
- Keys: subKeys,
- ProtoOp: operation,
- Proto: p,
- }
- if c, ok := s.cometServers[serverID]; ok {
- if err := c.Push(&args); err != nil {
- log.Error("c.Push(%v) serverId:%s error(%v)", args, serverID, err)
- }
- }
- }
- // broadcast broadcast a message to all.
- func (s *Service) broadcast(operation int32, body []byte, speed int32, platform string, contentType int32) {
- p := &model.Proto{
- Ver: 1,
- Operation: operation,
- ContentType: contentType,
- Body: body,
- }
- comets := s.cometServers
- speed /= int32(len(comets))
- var args = pb.BroadcastReq{
- ProtoOp: operation,
- Proto: p,
- Speed: speed,
- Platform: platform,
- }
- for serverID, c := range comets {
- if err := c.Broadcast(&args); err != nil {
- log.Error("c.Broadcast(%v) serverId:%s error(%v)", args, serverID, err)
- }
- }
- }
- // broadcastRoomRawBytes broadcast aggregation messages to room.
- func (s *Service) broadcastRoomRawBytes(roomID string, body []byte) {
- args := pb.BroadcastRoomReq{
- RoomID: roomID,
- Proto: &model.Proto{
- Ver: 1,
- Operation: model.OpRaw,
- Body: body,
- },
- }
- comets := s.cometServers
- for serverID, c := range comets {
- if err := c.BroadcastRoom(&args); err != nil {
- log.Error("c.BroadcastRoom(%v) roomID:%s serverId:%s error(%v)", args, roomID, serverID, err)
- }
- }
- }
- // broadcastRoom broadcast messages to room.
- func (s *Service) broadcastRoom(roomID string, op int32, body []byte) {
- args := pb.BroadcastRoomReq{
- RoomID: roomID,
- Proto: &model.Proto{
- Ver: 1,
- Operation: op,
- Body: body,
- },
- }
- comets := s.cometServers
- for serverID, c := range comets {
- if err := c.BroadcastRoom(&args); err != nil {
- log.Error("c.BroadcastRoom(%v) roomID:%s serverId:%s error(%v)", args, roomID, serverID, err)
- }
- }
- }
|