server_tcp_v1.go 6.4 KB


  1. package server
  2. import (
  3. "io"
  4. "net"
  5. "time"
  6. "go-common/app/service/main/broadcast/libs/bufio"
  7. "go-common/app/service/main/broadcast/libs/bytes"
  8. itime "go-common/app/service/main/broadcast/libs/time"
  9. "go-common/app/service/main/broadcast/model"
  10. "go-common/library/log"
  11. )
  12. // InitTCPV1 listen all tcp.bind and start accept connections.
  13. func InitTCPV1(server *Server, addrs []string, accept int) (err error) {
  14. var (
  15. bind string
  16. listener *net.TCPListener
  17. addr *net.TCPAddr
  18. )
  19. for _, bind = range addrs {
  20. if addr, err = net.ResolveTCPAddr("tcp", bind); err != nil {
  21. log.Error("net.ResolveTCPAddr(\"tcp\", \"%s\") error(%v)", bind, err)
  22. return
  23. }
  24. if listener, err = net.ListenTCP("tcp", addr); err != nil {
  25. log.Error("net.ListenTCP(\"tcp\", \"%s\") error(%v)", bind, err)
  26. return
  27. }
  28. log.Info("start tcp listen: \"%s\"", bind)
  29. // split N core accept
  30. for i := 0; i < accept; i++ {
  31. go acceptTCPV1(server, listener)
  32. }
  33. }
  34. return
  35. }
  36. // Accept accepts connections on the listener and serves requests
  37. // for each incoming connection. Accept blocks; the caller typically
  38. // invokes it in a go statement.
  39. func acceptTCPV1(server *Server, lis *net.TCPListener) {
  40. var (
  41. conn *net.TCPConn
  42. err error
  43. r int
  44. )
  45. for {
  46. if conn, err = lis.AcceptTCP(); err != nil {
  47. // if listener close then return
  48. log.Error("listener.Accept(\"%s\") error(%v)", lis.Addr().String(), err)
  49. time.Sleep(time.Second)
  50. continue
  51. }
  52. if err = conn.SetKeepAlive(server.c.TCP.Keepalive); err != nil {
  53. log.Error("conn.SetKeepAlive() error(%v)", err)
  54. return
  55. }
  56. if err = conn.SetReadBuffer(server.c.TCP.Rcvbuf); err != nil {
  57. log.Error("conn.SetReadBuffer() error(%v)", err)
  58. return
  59. }
  60. if err = conn.SetWriteBuffer(server.c.TCP.Sndbuf); err != nil {
  61. log.Error("conn.SetWriteBuffer() error(%v)", err)
  62. return
  63. }
  64. go serveTCPV1(server, conn, r)
  65. if r++; r == _maxInt {
  66. r = 0
  67. }
  68. }
  69. }
  70. func serveTCPV1(s *Server, conn *net.TCPConn, r int) {
  71. var (
  72. // timer
  73. tr = s.round.Timer(r)
  74. rp = s.round.Reader(r)
  75. wp = s.round.Writer(r)
  76. // ip addr
  77. lAddr = conn.LocalAddr().String()
  78. rAddr = conn.RemoteAddr().String()
  79. )
  80. if s.c.Broadcast.Debug {
  81. log.Info("start tcp serve \"%s\" with \"%s\"", lAddr, rAddr)
  82. }
  83. s.serveTCPV1(conn, rp, wp, tr)
  84. }
  85. // TODO linger close?
  86. func (s *Server) serveTCPV1(conn *net.TCPConn, rp, wp *bytes.Pool, tr *itime.Timer) {
  87. var (
  88. err error
  89. roomID string
  90. hb time.Duration // heartbeat
  91. p *model.Proto
  92. b *Bucket
  93. trd *itime.TimerData
  94. rpt *Report
  95. rb = rp.Get()
  96. wb = wp.Get()
  97. ch = NewChannel(s.c.ProtoSection.CliProto, s.c.ProtoSection.SvrProto)
  98. rr = &ch.Reader
  99. wr = &ch.Writer
  100. )
  101. ch.Reader.ResetBuffer(conn, rb.Bytes())
  102. ch.Writer.ResetBuffer(conn, wb.Bytes())
  103. // handshake
  104. trd = tr.Add(time.Duration(s.c.ProtoSection.HandshakeTimeout), func() {
  105. conn.Close()
  106. })
  107. ch.V1 = true
  108. ch.IP, _, _ = net.SplitHostPort(conn.RemoteAddr().String())
  109. // must not setadv, only used in auth
  110. if p, err = ch.CliProto.Set(); err == nil {
  111. if ch.Key, roomID, ch.Mid, hb, rpt, err = s.authTCPV1(rr, wr, p, ch.IP); err == nil {
  112. b = s.Bucket(ch.Key)
  113. err = b.Put(roomID, ch)
  114. }
  115. }
  116. if err != nil {
  117. if err != io.EOF {
  118. log.Error("key: %s ip: %s handshake failed error(%v)", ch.Key, conn.RemoteAddr().String(), err)
  119. }
  120. conn.Close()
  121. rp.Put(rb)
  122. wp.Put(wb)
  123. tr.Del(trd)
  124. return
  125. }
  126. trd.Key = ch.Key
  127. tr.Set(trd, hb)
  128. var online int32
  129. if ch.Room != nil {
  130. online = ch.Room.OnlineNum()
  131. }
  132. report(actionConnect, rpt, online)
  133. // hanshake ok start dispatch goroutine
  134. go s.dispatchTCPV1(ch.Key, conn, wr, wp, wb, ch)
  135. for {
  136. if p, err = ch.CliProto.Set(); err != nil {
  137. break
  138. }
  139. if err = p.ReadTCPV1(rr); err != nil {
  140. break
  141. }
  142. if p.Operation == model.OpHeartbeat {
  143. tr.Set(trd, hb)
  144. p.Body = nil
  145. p.Operation = model.OpHeartbeatReply
  146. } else {
  147. if err = s.Operate(p, ch, b); err != nil {
  148. break
  149. }
  150. }
  151. ch.CliProto.SetAdv()
  152. ch.Signal()
  153. }
  154. if err != nil && err != io.EOF {
  155. log.Error("key: %s server tcp failed error(%v)", ch.Key, err)
  156. }
  157. b.Del(ch)
  158. tr.Del(trd)
  159. rp.Put(rb)
  160. conn.Close()
  161. ch.Close()
  162. //if err = s.Disconnect(context.Background(), ch.Mid, roomID); err != nil {
  163. // log.Error("key: %s operator do disconnect error(%v)", ch.Key, err)
  164. //}
  165. if ch.Room != nil {
  166. online = ch.Room.OnlineNum()
  167. }
  168. report(actionDisconnect, rpt, online)
  169. }
  170. // dispatch accepts connections on the listener and serves requests
  171. // for each incoming connection. dispatch blocks; the caller typically
  172. // invokes it in a go statement.
  173. func (s *Server) dispatchTCPV1(key string, conn *net.TCPConn, wr *bufio.Writer, wp *bytes.Pool, wb *bytes.Buffer, ch *Channel) {
  174. var (
  175. err error
  176. finish bool
  177. online int32
  178. )
  179. for {
  180. var p = ch.Ready()
  181. switch p {
  182. case model.ProtoFinish:
  183. finish = true
  184. goto failed
  185. case model.ProtoReady:
  186. // fetch message from svrbox(client send)
  187. for {
  188. if p, err = ch.CliProto.Get(); err != nil {
  189. err = nil // must be empty error
  190. break
  191. }
  192. if p.Operation == model.OpHeartbeatReply {
  193. if ch.Room != nil {
  194. online = ch.Room.OnlineNum()
  195. }
  196. if err = p.WriteTCPHeartV1(wr, online); err != nil {
  197. goto failed
  198. }
  199. } else {
  200. if err = p.WriteTCPV1(wr); err != nil {
  201. goto failed
  202. }
  203. }
  204. p.Body = nil // avoid memory leak
  205. ch.CliProto.GetAdv()
  206. }
  207. default:
  208. // server send
  209. if err = p.WriteTCPV1(wr); err != nil {
  210. goto failed
  211. }
  212. }
  213. // only hungry flush response
  214. if err = wr.Flush(); err != nil {
  215. break
  216. }
  217. }
  218. failed:
  219. if err != nil {
  220. log.Error("key: %s dispatch tcp error(%v)", key, err)
  221. }
  222. conn.Close()
  223. wp.Put(wb)
  224. // must ensure all channel message discard, for reader won't blocking Signal
  225. for !finish {
  226. finish = (ch.Ready() == model.ProtoFinish)
  227. }
  228. }
  229. // auth for goim handshake with client, use rsa & aes.
  230. func (s *Server) authTCPV1(rr *bufio.Reader, wr *bufio.Writer, p *model.Proto, ip string) (key, roomID string, userID int64, heartbeat time.Duration, rpt *Report, err error) {
  231. if err = p.ReadTCPV1(rr); err != nil {
  232. return
  233. }
  234. if p.Operation != model.OpAuth {
  235. log.Warn("auth operation not valid: %d", p.Operation)
  236. err = ErrOperation
  237. return
  238. }
  239. if userID, roomID, key, rpt, err = s.NoAuth(int16(p.Ver), p.Body, ip); err != nil {
  240. return
  241. }
  242. heartbeat = _clientHeartbeat
  243. p.Body = nil
  244. p.Operation = model.OpAuthReply
  245. if err = p.WriteTCPV1(wr); err != nil {
  246. return
  247. }
  248. err = wr.Flush()
  249. return
  250. }