123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256 |
- package model
- import (
- "errors"
- "go-common/app/service/main/broadcast/libs/bufio"
- "go-common/app/service/main/broadcast/libs/bytes"
- "go-common/app/service/main/broadcast/libs/encoding/binary"
- "go-common/app/service/main/broadcast/libs/websocket"
- )
- const (
- // MaxBodySize max proto body size
- MaxBodySize = int32(1 << 12)
- )
- const (
- // size
- _packSize = 4
- _headerSize = 2
- _verSize = 2
- _operationSize = 4
- _seqIDSize = 4
- _compressSize = 1
- _contentTypeSize = 1
- _rawHeaderSize = _packSize + _headerSize + _verSize + _operationSize + _seqIDSize + _compressSize + _contentTypeSize
- _maxPackSize = MaxBodySize + int32(_rawHeaderSize)
- // offset
- _packOffset = 0
- _headerOffset = _packOffset + _packSize
- _verOffset = _headerOffset + _headerSize
- _operationOffset = _verOffset + _verSize
- _seqIDOffset = _operationOffset + _operationSize
- _compressOffset = _seqIDOffset + _seqIDSize
- _contentTypeOffset = _compressOffset + _compressSize
- )
- var (
- emptyJSONBody = []byte("{}")
- // ErrProtoPackLen proto packet len error
- ErrProtoPackLen = errors.New("default server codec pack length error")
- // ErrProtoHeaderLen proto header len error
- ErrProtoHeaderLen = errors.New("default server codec header length error")
- )
- var (
- // ProtoReady proto ready
- ProtoReady = &Proto{Operation: OpProtoReady}
- // ProtoFinish proto finish
- ProtoFinish = &Proto{Operation: OpProtoFinish}
- )
- // WriteTo write a proto to bytes writer.
- func (p *Proto) WriteTo(b *bytes.Writer) {
- var (
- packLen = _rawHeaderSize + int32(len(p.Body))
- buf = b.Peek(_rawHeaderSize)
- )
- binary.BigEndian.PutInt32(buf[_packOffset:], packLen)
- binary.BigEndian.PutInt16(buf[_headerOffset:], int16(_rawHeaderSize))
- binary.BigEndian.PutInt16(buf[_verOffset:], int16(p.Ver))
- binary.BigEndian.PutInt32(buf[_operationOffset:], p.Operation)
- binary.BigEndian.PutInt32(buf[_seqIDOffset:], p.SeqId)
- binary.BigEndian.PutInt8(buf[_compressOffset:], int8(p.Compress))
- binary.BigEndian.PutInt8(buf[_contentTypeOffset:], int8(p.ContentType))
- if p.Body != nil {
- b.Write(p.Body)
- }
- }
- // ReadTCP read a proto from TCP reader.
- func (p *Proto) ReadTCP(rr *bufio.Reader) (err error) {
- var (
- bodyLen int
- headerLen int16
- packLen int32
- buf []byte
- )
- if buf, err = rr.Pop(_rawHeaderSize); err != nil {
- return
- }
- packLen = binary.BigEndian.Int32(buf[_packOffset:_headerOffset])
- headerLen = binary.BigEndian.Int16(buf[_headerOffset:_verOffset])
- p.Ver = int32(binary.BigEndian.Int16(buf[_verOffset:_operationOffset]))
- p.Operation = binary.BigEndian.Int32(buf[_operationOffset:_seqIDOffset])
- p.SeqId = binary.BigEndian.Int32(buf[_seqIDOffset:_compressOffset])
- p.Compress = int32(binary.BigEndian.Int8(buf[_compressOffset:_contentTypeOffset]))
- p.ContentType = int32(binary.BigEndian.Int8(buf[_contentTypeOffset:]))
- if packLen > _maxPackSize {
- return ErrProtoPackLen
- }
- if headerLen != _rawHeaderSize {
- return ErrProtoHeaderLen
- }
- if bodyLen = int(packLen - int32(headerLen)); bodyLen > 0 {
- p.Body, err = rr.Pop(bodyLen)
- } else {
- p.Body = nil
- }
- return
- }
- // WriteTCP write a proto to TCP writer.
- func (p *Proto) WriteTCP(wr *bufio.Writer) (err error) {
- var (
- buf []byte
- packLen int32
- )
- if p.Operation == OpRaw {
- // write without buffer, job concact proto into raw buffer
- _, err = wr.WriteRaw(p.Body)
- return
- }
- packLen = _rawHeaderSize + int32(len(p.Body))
- if buf, err = wr.Peek(_rawHeaderSize); err != nil {
- return
- }
- binary.BigEndian.PutInt32(buf[_packOffset:], packLen)
- binary.BigEndian.PutInt16(buf[_headerOffset:], int16(_rawHeaderSize))
- binary.BigEndian.PutInt16(buf[_verOffset:], int16(p.Ver))
- binary.BigEndian.PutInt32(buf[_operationOffset:], p.Operation)
- binary.BigEndian.PutInt32(buf[_seqIDOffset:], p.SeqId)
- binary.BigEndian.PutInt8(buf[_compressOffset:], int8(p.Compress))
- binary.BigEndian.PutInt8(buf[_contentTypeOffset:], int8(p.ContentType))
- if p.Body != nil {
- _, err = wr.Write(p.Body)
- }
- return
- }
- // ReadWebsocket read a proto from websocket connection.
- func (p *Proto) ReadWebsocket(ws *websocket.Conn) (err error) {
- var (
- bodyLen int
- headerLen int16
- packLen int32
- buf []byte
- )
- if _, buf, err = ws.ReadMessage(); err != nil {
- return
- }
- if len(buf) < _rawHeaderSize {
- return ErrProtoPackLen
- }
- packLen = binary.BigEndian.Int32(buf[_packOffset:_headerOffset])
- headerLen = binary.BigEndian.Int16(buf[_headerOffset:_verOffset])
- p.Ver = int32(binary.BigEndian.Int16(buf[_verOffset:_operationOffset]))
- p.Operation = binary.BigEndian.Int32(buf[_operationOffset:_seqIDOffset])
- p.SeqId = binary.BigEndian.Int32(buf[_seqIDOffset:_compressOffset])
- p.Compress = int32(binary.BigEndian.Int8(buf[_compressOffset:_contentTypeOffset]))
- p.ContentType = int32(binary.BigEndian.Int8(buf[_contentTypeOffset:]))
- if packLen > _maxPackSize {
- return ErrProtoPackLen
- }
- if headerLen != _rawHeaderSize {
- return ErrProtoHeaderLen
- }
- if bodyLen = int(packLen - int32(headerLen)); bodyLen > 0 {
- p.Body = buf[headerLen:packLen]
- } else {
- p.Body = nil
- }
- return
- }
- // WriteWebsocket write a proto to websocket connection.
- func (p *Proto) WriteWebsocket(ws *websocket.Conn) (err error) {
- var (
- buf []byte
- packLen int
- )
- // NOTE: 通过 OpRaw = 9 为ws批量消息处理
- // if p.Operation == OpRaw {
- // err = ws.WriteMessage(websocket.BinaryMessage, p.Body)
- // return
- // }
- packLen = _rawHeaderSize + len(p.Body)
- if err = ws.WriteHeader(websocket.BinaryMessage, packLen); err != nil {
- return
- }
- if buf, err = ws.Peek(_rawHeaderSize); err != nil {
- return
- }
- binary.BigEndian.PutInt32(buf[_packOffset:], int32(packLen))
- binary.BigEndian.PutInt16(buf[_headerOffset:], int16(_rawHeaderSize))
- binary.BigEndian.PutInt16(buf[_verOffset:], int16(p.Ver))
- binary.BigEndian.PutInt32(buf[_operationOffset:], p.Operation)
- binary.BigEndian.PutInt32(buf[_seqIDOffset:], p.SeqId)
- binary.BigEndian.PutInt8(buf[_compressOffset:], int8(p.Compress))
- binary.BigEndian.PutInt8(buf[_contentTypeOffset:], int8(p.ContentType))
- if p.Body != nil {
- err = ws.WriteBody(p.Body)
- }
- return
- }
- // WriteWebsocketHeart write a heartbeat proto to websocket connnection.
- func (p *Proto) WriteWebsocketHeart(wr *websocket.Conn) (err error) {
- var (
- buf []byte
- packLen int
- )
- if len(p.Body) == 0 {
- p.Body = emptyJSONBody
- }
- packLen = _rawHeaderSize + len(p.Body)
- // websocket header
- if err = wr.WriteHeader(websocket.BinaryMessage, packLen); err != nil {
- return
- }
- if buf, err = wr.Peek(_rawHeaderSize); err != nil {
- return
- }
- // proto header
- binary.BigEndian.PutInt32(buf[_packOffset:], int32(packLen))
- binary.BigEndian.PutInt16(buf[_headerOffset:], int16(_rawHeaderSize))
- binary.BigEndian.PutInt16(buf[_verOffset:], int16(p.Ver))
- binary.BigEndian.PutInt32(buf[_operationOffset:], p.Operation)
- binary.BigEndian.PutInt32(buf[_seqIDOffset:], p.SeqId)
- binary.BigEndian.PutInt8(buf[_compressOffset:], int8(p.Compress))
- binary.BigEndian.PutInt8(buf[_contentTypeOffset:], int8(p.ContentType))
- // proto body
- if p.Body != nil {
- err = wr.WriteBody(p.Body)
- }
- return
- }
- // WriteTCPHeart write a heartbeat proto to TCP writer.
- func (p *Proto) WriteTCPHeart(wr *bufio.Writer) (err error) {
- var (
- buf []byte
- packLen int32
- )
- if len(p.Body) == 0 {
- p.Body = emptyJSONBody
- }
- packLen = _rawHeaderSize + int32(len(p.Body))
- if buf, err = wr.Peek(_rawHeaderSize); err != nil {
- return
- }
- // header
- binary.BigEndian.PutInt32(buf[_packOffset:], int32(packLen))
- binary.BigEndian.PutInt16(buf[_headerOffset:], int16(_rawHeaderSize))
- binary.BigEndian.PutInt16(buf[_verOffset:], int16(p.Ver))
- binary.BigEndian.PutInt32(buf[_operationOffset:], p.Operation)
- binary.BigEndian.PutInt32(buf[_seqIDOffset:], p.SeqId)
- binary.BigEndian.PutInt8(buf[_compressOffset:], int8(p.Compress))
- binary.BigEndian.PutInt8(buf[_contentTypeOffset:], int8(p.ContentType))
- // body
- if p.Body != nil {
- _, err = wr.Write(p.Body)
- }
- return
- }
|