model.go 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256
  1. package model
  2. import (
  3. "errors"
  4. "go-common/app/service/main/broadcast/libs/bufio"
  5. "go-common/app/service/main/broadcast/libs/bytes"
  6. "go-common/app/service/main/broadcast/libs/encoding/binary"
  7. "go-common/app/service/main/broadcast/libs/websocket"
  8. )
  9. const (
  10. // MaxBodySize max proto body size
  11. MaxBodySize = int32(1 << 12)
  12. )
  13. const (
  14. // size
  15. _packSize = 4
  16. _headerSize = 2
  17. _verSize = 2
  18. _operationSize = 4
  19. _seqIDSize = 4
  20. _compressSize = 1
  21. _contentTypeSize = 1
  22. _rawHeaderSize = _packSize + _headerSize + _verSize + _operationSize + _seqIDSize + _compressSize + _contentTypeSize
  23. _maxPackSize = MaxBodySize + int32(_rawHeaderSize)
  24. // offset
  25. _packOffset = 0
  26. _headerOffset = _packOffset + _packSize
  27. _verOffset = _headerOffset + _headerSize
  28. _operationOffset = _verOffset + _verSize
  29. _seqIDOffset = _operationOffset + _operationSize
  30. _compressOffset = _seqIDOffset + _seqIDSize
  31. _contentTypeOffset = _compressOffset + _compressSize
  32. )
  33. var (
  34. emptyJSONBody = []byte("{}")
  35. // ErrProtoPackLen proto packet len error
  36. ErrProtoPackLen = errors.New("default server codec pack length error")
  37. // ErrProtoHeaderLen proto header len error
  38. ErrProtoHeaderLen = errors.New("default server codec header length error")
  39. )
  40. var (
  41. // ProtoReady proto ready
  42. ProtoReady = &Proto{Operation: OpProtoReady}
  43. // ProtoFinish proto finish
  44. ProtoFinish = &Proto{Operation: OpProtoFinish}
  45. )
  46. // WriteTo write a proto to bytes writer.
  47. func (p *Proto) WriteTo(b *bytes.Writer) {
  48. var (
  49. packLen = _rawHeaderSize + int32(len(p.Body))
  50. buf = b.Peek(_rawHeaderSize)
  51. )
  52. binary.BigEndian.PutInt32(buf[_packOffset:], packLen)
  53. binary.BigEndian.PutInt16(buf[_headerOffset:], int16(_rawHeaderSize))
  54. binary.BigEndian.PutInt16(buf[_verOffset:], int16(p.Ver))
  55. binary.BigEndian.PutInt32(buf[_operationOffset:], p.Operation)
  56. binary.BigEndian.PutInt32(buf[_seqIDOffset:], p.SeqId)
  57. binary.BigEndian.PutInt8(buf[_compressOffset:], int8(p.Compress))
  58. binary.BigEndian.PutInt8(buf[_contentTypeOffset:], int8(p.ContentType))
  59. if p.Body != nil {
  60. b.Write(p.Body)
  61. }
  62. }
  63. // ReadTCP read a proto from TCP reader.
  64. func (p *Proto) ReadTCP(rr *bufio.Reader) (err error) {
  65. var (
  66. bodyLen int
  67. headerLen int16
  68. packLen int32
  69. buf []byte
  70. )
  71. if buf, err = rr.Pop(_rawHeaderSize); err != nil {
  72. return
  73. }
  74. packLen = binary.BigEndian.Int32(buf[_packOffset:_headerOffset])
  75. headerLen = binary.BigEndian.Int16(buf[_headerOffset:_verOffset])
  76. p.Ver = int32(binary.BigEndian.Int16(buf[_verOffset:_operationOffset]))
  77. p.Operation = binary.BigEndian.Int32(buf[_operationOffset:_seqIDOffset])
  78. p.SeqId = binary.BigEndian.Int32(buf[_seqIDOffset:_compressOffset])
  79. p.Compress = int32(binary.BigEndian.Int8(buf[_compressOffset:_contentTypeOffset]))
  80. p.ContentType = int32(binary.BigEndian.Int8(buf[_contentTypeOffset:]))
  81. if packLen > _maxPackSize {
  82. return ErrProtoPackLen
  83. }
  84. if headerLen != _rawHeaderSize {
  85. return ErrProtoHeaderLen
  86. }
  87. if bodyLen = int(packLen - int32(headerLen)); bodyLen > 0 {
  88. p.Body, err = rr.Pop(bodyLen)
  89. } else {
  90. p.Body = nil
  91. }
  92. return
  93. }
  94. // WriteTCP write a proto to TCP writer.
  95. func (p *Proto) WriteTCP(wr *bufio.Writer) (err error) {
  96. var (
  97. buf []byte
  98. packLen int32
  99. )
  100. if p.Operation == OpRaw {
  101. // write without buffer, job concact proto into raw buffer
  102. _, err = wr.WriteRaw(p.Body)
  103. return
  104. }
  105. packLen = _rawHeaderSize + int32(len(p.Body))
  106. if buf, err = wr.Peek(_rawHeaderSize); err != nil {
  107. return
  108. }
  109. binary.BigEndian.PutInt32(buf[_packOffset:], packLen)
  110. binary.BigEndian.PutInt16(buf[_headerOffset:], int16(_rawHeaderSize))
  111. binary.BigEndian.PutInt16(buf[_verOffset:], int16(p.Ver))
  112. binary.BigEndian.PutInt32(buf[_operationOffset:], p.Operation)
  113. binary.BigEndian.PutInt32(buf[_seqIDOffset:], p.SeqId)
  114. binary.BigEndian.PutInt8(buf[_compressOffset:], int8(p.Compress))
  115. binary.BigEndian.PutInt8(buf[_contentTypeOffset:], int8(p.ContentType))
  116. if p.Body != nil {
  117. _, err = wr.Write(p.Body)
  118. }
  119. return
  120. }
  121. // ReadWebsocket read a proto from websocket connection.
  122. func (p *Proto) ReadWebsocket(ws *websocket.Conn) (err error) {
  123. var (
  124. bodyLen int
  125. headerLen int16
  126. packLen int32
  127. buf []byte
  128. )
  129. if _, buf, err = ws.ReadMessage(); err != nil {
  130. return
  131. }
  132. if len(buf) < _rawHeaderSize {
  133. return ErrProtoPackLen
  134. }
  135. packLen = binary.BigEndian.Int32(buf[_packOffset:_headerOffset])
  136. headerLen = binary.BigEndian.Int16(buf[_headerOffset:_verOffset])
  137. p.Ver = int32(binary.BigEndian.Int16(buf[_verOffset:_operationOffset]))
  138. p.Operation = binary.BigEndian.Int32(buf[_operationOffset:_seqIDOffset])
  139. p.SeqId = binary.BigEndian.Int32(buf[_seqIDOffset:_compressOffset])
  140. p.Compress = int32(binary.BigEndian.Int8(buf[_compressOffset:_contentTypeOffset]))
  141. p.ContentType = int32(binary.BigEndian.Int8(buf[_contentTypeOffset:]))
  142. if packLen > _maxPackSize {
  143. return ErrProtoPackLen
  144. }
  145. if headerLen != _rawHeaderSize {
  146. return ErrProtoHeaderLen
  147. }
  148. if bodyLen = int(packLen - int32(headerLen)); bodyLen > 0 {
  149. p.Body = buf[headerLen:packLen]
  150. } else {
  151. p.Body = nil
  152. }
  153. return
  154. }
  155. // WriteWebsocket write a proto to websocket connection.
  156. func (p *Proto) WriteWebsocket(ws *websocket.Conn) (err error) {
  157. var (
  158. buf []byte
  159. packLen int
  160. )
  161. // NOTE: 通过 OpRaw = 9 为ws批量消息处理
  162. // if p.Operation == OpRaw {
  163. // err = ws.WriteMessage(websocket.BinaryMessage, p.Body)
  164. // return
  165. // }
  166. packLen = _rawHeaderSize + len(p.Body)
  167. if err = ws.WriteHeader(websocket.BinaryMessage, packLen); err != nil {
  168. return
  169. }
  170. if buf, err = ws.Peek(_rawHeaderSize); err != nil {
  171. return
  172. }
  173. binary.BigEndian.PutInt32(buf[_packOffset:], int32(packLen))
  174. binary.BigEndian.PutInt16(buf[_headerOffset:], int16(_rawHeaderSize))
  175. binary.BigEndian.PutInt16(buf[_verOffset:], int16(p.Ver))
  176. binary.BigEndian.PutInt32(buf[_operationOffset:], p.Operation)
  177. binary.BigEndian.PutInt32(buf[_seqIDOffset:], p.SeqId)
  178. binary.BigEndian.PutInt8(buf[_compressOffset:], int8(p.Compress))
  179. binary.BigEndian.PutInt8(buf[_contentTypeOffset:], int8(p.ContentType))
  180. if p.Body != nil {
  181. err = ws.WriteBody(p.Body)
  182. }
  183. return
  184. }
  185. // WriteWebsocketHeart write a heartbeat proto to websocket connnection.
  186. func (p *Proto) WriteWebsocketHeart(wr *websocket.Conn) (err error) {
  187. var (
  188. buf []byte
  189. packLen int
  190. )
  191. if len(p.Body) == 0 {
  192. p.Body = emptyJSONBody
  193. }
  194. packLen = _rawHeaderSize + len(p.Body)
  195. // websocket header
  196. if err = wr.WriteHeader(websocket.BinaryMessage, packLen); err != nil {
  197. return
  198. }
  199. if buf, err = wr.Peek(_rawHeaderSize); err != nil {
  200. return
  201. }
  202. // proto header
  203. binary.BigEndian.PutInt32(buf[_packOffset:], int32(packLen))
  204. binary.BigEndian.PutInt16(buf[_headerOffset:], int16(_rawHeaderSize))
  205. binary.BigEndian.PutInt16(buf[_verOffset:], int16(p.Ver))
  206. binary.BigEndian.PutInt32(buf[_operationOffset:], p.Operation)
  207. binary.BigEndian.PutInt32(buf[_seqIDOffset:], p.SeqId)
  208. binary.BigEndian.PutInt8(buf[_compressOffset:], int8(p.Compress))
  209. binary.BigEndian.PutInt8(buf[_contentTypeOffset:], int8(p.ContentType))
  210. // proto body
  211. if p.Body != nil {
  212. err = wr.WriteBody(p.Body)
  213. }
  214. return
  215. }
  216. // WriteTCPHeart write a heartbeat proto to TCP writer.
  217. func (p *Proto) WriteTCPHeart(wr *bufio.Writer) (err error) {
  218. var (
  219. buf []byte
  220. packLen int32
  221. )
  222. if len(p.Body) == 0 {
  223. p.Body = emptyJSONBody
  224. }
  225. packLen = _rawHeaderSize + int32(len(p.Body))
  226. if buf, err = wr.Peek(_rawHeaderSize); err != nil {
  227. return
  228. }
  229. // header
  230. binary.BigEndian.PutInt32(buf[_packOffset:], int32(packLen))
  231. binary.BigEndian.PutInt16(buf[_headerOffset:], int16(_rawHeaderSize))
  232. binary.BigEndian.PutInt16(buf[_verOffset:], int16(p.Ver))
  233. binary.BigEndian.PutInt32(buf[_operationOffset:], p.Operation)
  234. binary.BigEndian.PutInt32(buf[_seqIDOffset:], p.SeqId)
  235. binary.BigEndian.PutInt8(buf[_compressOffset:], int8(p.Compress))
  236. binary.BigEndian.PutInt8(buf[_contentTypeOffset:], int8(p.ContentType))
  237. // body
  238. if p.Body != nil {
  239. _, err = wr.Write(p.Body)
  240. }
  241. return
  242. }