server_tcp.go 9.2 KB


  1. package server
  2. import (
  3. "context"
  4. "io"
  5. "net"
  6. "strings"
  7. "time"
  8. iModel "go-common/app/interface/main/broadcast/model"
  9. "go-common/app/service/main/broadcast/libs/bufio"
  10. "go-common/app/service/main/broadcast/libs/bytes"
  11. itime "go-common/app/service/main/broadcast/libs/time"
  12. "go-common/app/service/main/broadcast/model"
  13. "go-common/library/log"
  14. "go-common/library/net/metadata"
  15. )
  16. // InitTCP listen all tcp.bind and start accept connections.
  17. func InitTCP(server *Server, addrs []string, accept int) (err error) {
  18. var (
  19. bind string
  20. listener *net.TCPListener
  21. addr *net.TCPAddr
  22. )
  23. for _, bind = range addrs {
  24. if addr, err = net.ResolveTCPAddr("tcp", bind); err != nil {
  25. log.Error("net.ResolveTCPAddr(\"tcp\", \"%s\") error(%v)", bind, err)
  26. return
  27. }
  28. if listener, err = net.ListenTCP("tcp", addr); err != nil {
  29. log.Error("net.ListenTCP(\"tcp\", \"%s\") error(%v)", bind, err)
  30. return
  31. }
  32. log.Info("start tcp listen: \"%s\"", bind)
  33. // split N core accept
  34. for i := 0; i < accept; i++ {
  35. go acceptTCP(server, listener)
  36. }
  37. }
  38. return
  39. }
  40. // Accept accepts connections on the listener and serves requests
  41. // for each incoming connection. Accept blocks; the caller typically
  42. // invokes it in a go statement.
  43. func acceptTCP(server *Server, lis *net.TCPListener) {
  44. var (
  45. conn *net.TCPConn
  46. err error
  47. r int
  48. )
  49. for {
  50. if conn, err = lis.AcceptTCP(); err != nil {
  51. // if listener close then return
  52. log.Error("listener.Accept(\"%s\") error(%v)", lis.Addr().String(), err)
  53. return
  54. }
  55. if err = conn.SetKeepAlive(server.c.TCP.Keepalive); err != nil {
  56. log.Error("conn.SetKeepAlive() error(%v)", err)
  57. return
  58. }
  59. if err = conn.SetReadBuffer(server.c.TCP.Rcvbuf); err != nil {
  60. log.Error("conn.SetReadBuffer() error(%v)", err)
  61. return
  62. }
  63. if err = conn.SetWriteBuffer(server.c.TCP.Sndbuf); err != nil {
  64. log.Error("conn.SetWriteBuffer() error(%v)", err)
  65. return
  66. }
  67. go serveTCP(server, conn, r)
  68. if r++; r == _maxInt {
  69. r = 0
  70. }
  71. }
  72. }
  73. func serveTCP(s *Server, conn *net.TCPConn, r int) {
  74. var (
  75. // timer
  76. tr = s.round.Timer(r)
  77. rp = s.round.Reader(r)
  78. wp = s.round.Writer(r)
  79. // ip addr
  80. lAddr = conn.LocalAddr().String()
  81. rAddr = conn.RemoteAddr().String()
  82. )
  83. if s.c.Broadcast.Debug {
  84. log.Info("start tcp serve \"%s\" with \"%s\"", lAddr, rAddr)
  85. }
  86. s.ServeTCP(conn, rp, wp, tr)
  87. }
  88. // ServeTCP .
  89. func (s *Server) ServeTCP(conn *net.TCPConn, rp, wp *bytes.Pool, tr *itime.Timer) {
  90. var (
  91. err error
  92. rid string
  93. accepts []int32
  94. white bool
  95. p *model.Proto
  96. b *Bucket
  97. trd *itime.TimerData
  98. lastHb = time.Now()
  99. rb = rp.Get()
  100. wb = wp.Get()
  101. ch = NewChannel(s.c.ProtoSection.CliProto, s.c.ProtoSection.SvrProto)
  102. rr = &ch.Reader
  103. wr = &ch.Writer
  104. )
  105. ch.Reader.ResetBuffer(conn, rb.Bytes())
  106. ch.Writer.ResetBuffer(conn, wb.Bytes())
  107. // handshake
  108. step := 0
  109. trd = tr.Add(time.Duration(s.c.ProtoSection.HandshakeTimeout), func() {
  110. conn.Close()
  111. log.Error("key: %s remoteIP: %s step: %d tcp handshake timeout", ch.Key, conn.RemoteAddr().String(), step)
  112. })
  113. ch.IP, _, _ = net.SplitHostPort(conn.RemoteAddr().String())
  114. // must not setadv, only used in auth
  115. step = 1
  116. md := metadata.MD{
  117. metadata.RemoteIP: ch.IP,
  118. }
  119. ctx := metadata.NewContext(context.Background(), md)
  120. ctx, cancel := context.WithCancel(ctx)
  121. defer cancel()
  122. if p, err = ch.CliProto.Set(); err == nil {
  123. if ch.Mid, ch.Key, rid, ch.Platform, accepts, err = s.authTCP(ctx, rr, wr, p); err == nil {
  124. ch.Watch(accepts...)
  125. b = s.Bucket(ch.Key)
  126. err = b.Put(rid, ch)
  127. if s.c.Broadcast.Debug {
  128. log.Info("tcp connnected key:%s mid:%d proto:%+v", ch.Key, ch.Mid, p)
  129. }
  130. }
  131. }
  132. step = 2
  133. if err != nil {
  134. conn.Close()
  135. rp.Put(rb)
  136. wp.Put(wb)
  137. tr.Del(trd)
  138. log.Error("key: %s handshake failed error(%v)", ch.Key, err)
  139. return
  140. }
  141. trd.Key = ch.Key
  142. tr.Set(trd, _clientHeartbeat)
  143. white = whitelist.Contains(ch.Mid)
  144. if white {
  145. whitelist.Printf("key: %s[%s] auth\n", ch.Key, rid)
  146. }
  147. step = 3
  148. reportCh(actionConnect, ch)
  149. // hanshake ok start dispatch goroutine
  150. go s.dispatchTCP(conn, wr, wp, wb, ch)
  151. serverHeartbeat := s.RandServerHearbeat()
  152. for {
  153. if p, err = ch.CliProto.Set(); err != nil {
  154. break
  155. }
  156. if white {
  157. whitelist.Printf("key: %s start read proto\n", ch.Key)
  158. }
  159. if err = p.ReadTCP(rr); err != nil {
  160. break
  161. }
  162. if white {
  163. whitelist.Printf("key: %s read proto:%v\n", ch.Key, p)
  164. }
  165. if p.Operation == model.OpHeartbeat {
  166. tr.Set(trd, _clientHeartbeat)
  167. p.Body = nil
  168. p.Operation = model.OpHeartbeatReply
  169. // last server heartbeat
  170. if now := time.Now(); now.Sub(lastHb) > serverHeartbeat {
  171. if err = s.Heartbeat(ctx, ch.Mid, ch.Key); err == nil {
  172. lastHb = now
  173. } else {
  174. err = nil
  175. }
  176. }
  177. if s.c.Broadcast.Debug {
  178. log.Info("tcp heartbeat receive key:%s, mid:%d", ch.Key, ch.Mid)
  179. }
  180. step++
  181. } else {
  182. if err = s.Operate(p, ch, b); err != nil {
  183. break
  184. }
  185. }
  186. if white {
  187. whitelist.Printf("key: %s process proto:%v\n", ch.Key, p)
  188. }
  189. ch.CliProto.SetAdv()
  190. ch.Signal()
  191. if white {
  192. whitelist.Printf("key: %s signal\n", ch.Key)
  193. }
  194. }
  195. if white {
  196. whitelist.Printf("key: %s server tcp error(%v)\n", ch.Key, err)
  197. }
  198. if err != nil && err != io.EOF && !strings.Contains(err.Error(), "closed") {
  199. log.Error("key: %s server tcp failed error(%v)", ch.Key, err)
  200. }
  201. b.Del(ch)
  202. tr.Del(trd)
  203. rp.Put(rb)
  204. conn.Close()
  205. ch.Close()
  206. if err = s.Disconnect(ctx, ch.Mid, ch.Key); err != nil {
  207. log.Error("key: %s operator do disconnect error(%v)", ch.Key, err)
  208. }
  209. if white {
  210. whitelist.Printf("key: %s disconnect error(%v)\n", ch.Key, err)
  211. }
  212. reportCh(actionDisconnect, ch)
  213. if s.c.Broadcast.Debug {
  214. log.Info("tcp disconnected key: %s mid:%d", ch.Key, ch.Mid)
  215. }
  216. }
  217. // dispatch accepts connections on the listener and serves requests
  218. // for each incoming connection. dispatch blocks; the caller typically
  219. // invokes it in a go statement.
  220. func (s *Server) dispatchTCP(conn *net.TCPConn, wr *bufio.Writer, wp *bytes.Pool, wb *bytes.Buffer, ch *Channel) {
  221. var (
  222. err error
  223. finish bool
  224. online int32
  225. white = whitelist.Contains(ch.Mid)
  226. )
  227. if s.c.Broadcast.Debug {
  228. log.Info("key: %s start dispatch tcp goroutine", ch.Key)
  229. }
  230. for {
  231. if white {
  232. whitelist.Printf("key: %s wait proto ready\n", ch.Key)
  233. }
  234. var p = ch.Ready()
  235. if white {
  236. whitelist.Printf("key: %s proto ready\n", ch.Key)
  237. }
  238. if s.c.Broadcast.Debug {
  239. log.Info("key:%s dispatch msg:%v", ch.Key, *p)
  240. }
  241. switch p {
  242. case model.ProtoFinish:
  243. if white {
  244. whitelist.Printf("key: %s receive proto finish\n", ch.Key)
  245. }
  246. if s.c.Broadcast.Debug {
  247. log.Info("key: %s wakeup exit dispatch goroutine", ch.Key)
  248. }
  249. finish = true
  250. goto failed
  251. case model.ProtoReady:
  252. // fetch message from svrbox(client send)
  253. for {
  254. if p, err = ch.CliProto.Get(); err != nil {
  255. err = nil // must be empty error
  256. break
  257. }
  258. if white {
  259. whitelist.Printf("key: %s start write client proto%v\n", ch.Key, p)
  260. }
  261. if p.Operation == model.OpHeartbeatReply {
  262. if ch.Room != nil {
  263. online = ch.Room.OnlineNum()
  264. b := map[string]interface{}{"room": map[string]interface{}{"online": online, "room_id": ch.Room.ID}}
  265. p.Body = iModel.Message(b, nil)
  266. }
  267. if err = p.WriteTCPHeart(wr); err != nil {
  268. goto failed
  269. }
  270. } else {
  271. if err = p.WriteTCP(wr); err != nil {
  272. goto failed
  273. }
  274. }
  275. if white {
  276. whitelist.Printf("key: %s write client proto%v\n", ch.Key, p)
  277. }
  278. p.Body = nil // avoid memory leak
  279. ch.CliProto.GetAdv()
  280. }
  281. default:
  282. if white {
  283. whitelist.Printf("key: %s start write server proto%v\n", ch.Key, p)
  284. }
  285. // server send
  286. if err = p.WriteTCP(wr); err != nil {
  287. goto failed
  288. }
  289. if white {
  290. whitelist.Printf("key: %s write server proto%v\n", ch.Key, p)
  291. }
  292. if s.c.Broadcast.Debug {
  293. log.Info("tcp sent a message key:%s mid:%d proto:%+v", ch.Key, ch.Mid, p)
  294. }
  295. }
  296. if white {
  297. whitelist.Printf("key: %s start flush \n", ch.Key)
  298. }
  299. // only hungry flush response
  300. if err = wr.Flush(); err != nil {
  301. break
  302. }
  303. if white {
  304. whitelist.Printf("key: %s flush\n", ch.Key)
  305. }
  306. }
  307. failed:
  308. if white {
  309. whitelist.Printf("key: %s dispatch tcp error(%v)\n", ch.Key, err)
  310. }
  311. if err != nil {
  312. log.Error("key: %s dispatch tcp error(%v)", ch.Key, err)
  313. }
  314. conn.Close()
  315. wp.Put(wb)
  316. // must ensure all channel message discard, for reader won't blocking Signal
  317. for !finish {
  318. finish = (ch.Ready() == model.ProtoFinish)
  319. }
  320. if s.c.Broadcast.Debug {
  321. log.Info("key: %s dispatch goroutine exit", ch.Key)
  322. }
  323. }
  324. // auth for goim handshake with client, use rsa & aes.
  325. func (s *Server) authTCP(ctx context.Context, rr *bufio.Reader, wr *bufio.Writer, p *model.Proto) (mid int64, key string, rid string, platform string, accepts []int32, err error) {
  326. if err = p.ReadTCP(rr); err != nil {
  327. log.Error("authTCP.ReadTCP(key:%v).err(%v)", key, err)
  328. return
  329. }
  330. if p.Operation != model.OpAuth {
  331. log.Warn("auth operation not valid: %d", p.Operation)
  332. err = ErrOperation
  333. return
  334. }
  335. if mid, key, rid, platform, accepts, err = s.Connect(ctx, p, ""); err != nil {
  336. log.Error("authTCP.Connect(key:%v).err(%v)", key, err)
  337. return
  338. }
  339. p.Body = []byte(`{"code":0,"message":"ok"}`)
  340. p.Operation = model.OpAuthReply
  341. if err = p.WriteTCP(wr); err != nil {
  342. log.Error("authTCP.WriteTCP(key:%v).err(%v)", key, err)
  343. return
  344. }
  345. err = wr.Flush()
  346. return
  347. }