model_v1.go 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207
  1. package model
  2. import (
  3. "go-common/app/service/main/broadcast/libs/bufio"
  4. "go-common/app/service/main/broadcast/libs/bytes"
  5. "go-common/app/service/main/broadcast/libs/encoding/binary"
  6. "go-common/app/service/main/broadcast/libs/websocket"
  7. )
  // V1 wire-format constants. A frame starts with a fixed header made of, in
  // order: pack length, header length, version, operation and sequence id.
  // Each field is listed below as its byte offset followed by its byte width.
  const (
  	// maxBodySizeV1 is the largest body, in bytes, accepted by the V1 readers.
  	maxBodySizeV1 int32 = 1 << 10

  	// Pack length field.
  	packOffsetV1 = 0
  	packSizeV1   = 4

  	// Header length field.
  	headerOffsetV1 = packOffsetV1 + packSizeV1
  	headerSizeV1   = 2

  	// Version field.
  	verOffsetV1 = headerOffsetV1 + headerSizeV1
  	verSizeV1   = 2

  	// Operation field.
  	operationOffsetV1 = verOffsetV1 + verSizeV1
  	operationSizeV1   = 4

  	// Sequence id field.
  	seqIDOffsetV1 = operationOffsetV1 + operationSizeV1
  	seqIDSizeV1   = 4

  	// Heartbeat payload; it sits immediately after the header.
  	heartbeatOffsetV1 = seqIDOffsetV1 + seqIDSizeV1
  	heartbeatSizeV1   = 4

  	// rawHeaderSizeV1 is the total width of the fixed header.
  	rawHeaderSizeV1 = packSizeV1 + headerSizeV1 + verSizeV1 + operationSizeV1 + seqIDSizeV1

  	// maxPackSizeV1 is the largest whole frame (header plus body) accepted.
  	maxPackSizeV1 = maxBodySizeV1 + rawHeaderSizeV1
  )
  27. // WriteToV1 .
  28. func (p *Proto) WriteToV1(b *bytes.Writer) {
  29. var (
  30. packLen = rawHeaderSizeV1 + int32(len(p.Body))
  31. buf = b.Peek(rawHeaderSizeV1)
  32. )
  33. binary.BigEndian.PutInt32(buf[packOffsetV1:], packLen)
  34. binary.BigEndian.PutInt16(buf[headerOffsetV1:], int16(rawHeaderSizeV1))
  35. binary.BigEndian.PutInt16(buf[verOffsetV1:], int16(p.Ver))
  36. binary.BigEndian.PutInt32(buf[operationOffsetV1:], p.Operation)
  37. binary.BigEndian.PutInt32(buf[seqIDOffsetV1:], p.SeqId)
  38. if p.Body != nil {
  39. b.Write(p.Body)
  40. }
  41. }
  42. // ReadTCPV1 .
  43. func (p *Proto) ReadTCPV1(rr *bufio.Reader) (err error) {
  44. var (
  45. bodyLen int
  46. headerLen int16
  47. packLen int32
  48. buf []byte
  49. )
  50. if buf, err = rr.Pop(rawHeaderSizeV1); err != nil {
  51. return
  52. }
  53. packLen = binary.BigEndian.Int32(buf[packOffsetV1:headerOffsetV1])
  54. headerLen = binary.BigEndian.Int16(buf[headerOffsetV1:verOffsetV1])
  55. p.Ver = int32(binary.BigEndian.Int16(buf[verOffsetV1:operationOffsetV1]))
  56. p.Operation = binary.BigEndian.Int32(buf[operationOffsetV1:seqIDOffsetV1])
  57. p.SeqId = binary.BigEndian.Int32(buf[seqIDOffsetV1:])
  58. if packLen > maxPackSizeV1 {
  59. return ErrProtoPackLen
  60. }
  61. if headerLen != rawHeaderSizeV1 {
  62. return ErrProtoHeaderLen
  63. }
  64. if bodyLen = int(packLen - int32(headerLen)); bodyLen > 0 {
  65. p.Body, err = rr.Pop(bodyLen)
  66. } else {
  67. p.Body = nil
  68. }
  69. return
  70. }
  71. // WriteTCPV1 .
  72. func (p *Proto) WriteTCPV1(wr *bufio.Writer) (err error) {
  73. var (
  74. buf []byte
  75. packLen int32
  76. )
  77. if p.Operation == OpRaw {
  78. // write without buffer, job concact proto into raw buffer
  79. _, err = wr.WriteRaw(p.Body)
  80. return
  81. }
  82. packLen = rawHeaderSizeV1 + int32(len(p.Body))
  83. if buf, err = wr.Peek(rawHeaderSizeV1); err != nil {
  84. return
  85. }
  86. binary.BigEndian.PutInt32(buf[packOffsetV1:], packLen)
  87. binary.BigEndian.PutInt16(buf[headerOffsetV1:], int16(rawHeaderSizeV1))
  88. binary.BigEndian.PutInt16(buf[verOffsetV1:], int16(p.Ver))
  89. binary.BigEndian.PutInt32(buf[operationOffsetV1:], p.Operation)
  90. binary.BigEndian.PutInt32(buf[seqIDOffsetV1:], p.SeqId)
  91. if p.Body != nil {
  92. _, err = wr.Write(p.Body)
  93. }
  94. return
  95. }
  96. // WriteTCPHeartV1 .
  97. func (p *Proto) WriteTCPHeartV1(wr *bufio.Writer, online int32) (err error) {
  98. var (
  99. buf []byte
  100. packLen int
  101. )
  102. packLen = rawHeaderSizeV1 + heartbeatSizeV1
  103. if buf, err = wr.Peek(packLen); err != nil {
  104. return
  105. }
  106. // header
  107. binary.BigEndian.PutInt32(buf[packOffsetV1:], int32(packLen))
  108. binary.BigEndian.PutInt16(buf[headerOffsetV1:], int16(rawHeaderSizeV1))
  109. binary.BigEndian.PutInt16(buf[verOffsetV1:], int16(p.Ver))
  110. binary.BigEndian.PutInt32(buf[operationOffsetV1:], p.Operation)
  111. binary.BigEndian.PutInt32(buf[seqIDOffsetV1:], p.SeqId)
  112. // body
  113. binary.BigEndian.PutInt32(buf[heartbeatOffsetV1:], online)
  114. return
  115. }
  116. // ReadWebsocketV1 .
  117. func (p *Proto) ReadWebsocketV1(ws *websocket.Conn) (err error) {
  118. var (
  119. bodyLen int
  120. headerLen int16
  121. packLen int32
  122. buf []byte
  123. )
  124. if _, buf, err = ws.ReadMessage(); err != nil {
  125. return
  126. }
  127. if len(buf) < rawHeaderSizeV1 {
  128. return ErrProtoPackLen
  129. }
  130. packLen = binary.BigEndian.Int32(buf[packOffsetV1:headerOffsetV1])
  131. headerLen = binary.BigEndian.Int16(buf[headerOffsetV1:verOffsetV1])
  132. p.Ver = int32(binary.BigEndian.Int16(buf[verOffsetV1:operationOffsetV1]))
  133. p.Operation = binary.BigEndian.Int32(buf[operationOffsetV1:seqIDOffsetV1])
  134. p.SeqId = binary.BigEndian.Int32(buf[seqIDOffsetV1:])
  135. if packLen > maxPackSizeV1 {
  136. return ErrProtoPackLen
  137. }
  138. if headerLen != rawHeaderSizeV1 {
  139. return ErrProtoHeaderLen
  140. }
  141. if bodyLen = int(packLen - int32(headerLen)); bodyLen > 0 {
  142. p.Body = buf[headerLen:packLen]
  143. } else {
  144. p.Body = nil
  145. }
  146. return
  147. }
  148. // WriteWebsocketV1 .
  149. func (p *Proto) WriteWebsocketV1(ws *websocket.Conn) (err error) {
  150. var (
  151. buf []byte
  152. packLen int
  153. )
  154. if p.Operation == OpRaw {
  155. err = ws.WriteMessage(websocket.BinaryMessage, p.Body)
  156. return
  157. }
  158. packLen = rawHeaderSizeV1 + len(p.Body)
  159. if err = ws.WriteHeader(websocket.BinaryMessage, packLen); err != nil {
  160. return
  161. }
  162. if buf, err = ws.Peek(rawHeaderSizeV1); err != nil {
  163. return
  164. }
  165. binary.BigEndian.PutInt32(buf[packOffsetV1:], int32(packLen))
  166. binary.BigEndian.PutInt16(buf[headerOffsetV1:], int16(rawHeaderSizeV1))
  167. binary.BigEndian.PutInt16(buf[verOffsetV1:], int16(p.Ver))
  168. binary.BigEndian.PutInt32(buf[operationOffsetV1:], p.Operation)
  169. binary.BigEndian.PutInt32(buf[seqIDOffsetV1:], p.SeqId)
  170. if p.Body != nil {
  171. err = ws.WriteBody(p.Body)
  172. }
  173. return
  174. }
  175. // WriteWebsocketHeartV1 .
  176. func (p *Proto) WriteWebsocketHeartV1(wr *websocket.Conn, online int32) (err error) {
  177. var (
  178. buf []byte
  179. packLen int
  180. )
  181. packLen = rawHeaderSizeV1 + heartbeatSizeV1
  182. // websocket header
  183. if err = wr.WriteHeader(websocket.BinaryMessage, packLen); err != nil {
  184. return
  185. }
  186. if buf, err = wr.Peek(packLen); err != nil {
  187. return
  188. }
  189. // proto header
  190. binary.BigEndian.PutInt32(buf[packOffsetV1:], int32(packLen))
  191. binary.BigEndian.PutInt16(buf[headerOffsetV1:], int16(rawHeaderSizeV1))
  192. binary.BigEndian.PutInt16(buf[verOffsetV1:], int16(p.Ver))
  193. binary.BigEndian.PutInt32(buf[operationOffsetV1:], p.Operation)
  194. binary.BigEndian.PutInt32(buf[seqIDOffsetV1:], p.SeqId)
  195. // proto body
  196. binary.BigEndian.PutInt32(buf[heartbeatOffsetV1:], online)
  197. return
  198. }