client.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507
  1. // Copyright 2009 The Go Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. package rpc
  5. import (
  6. "bufio"
  7. "context"
  8. "encoding/gob"
  9. "errors"
  10. "io"
  11. "log"
  12. "net"
  13. "sync"
  14. "sync/atomic"
  15. "time"
  16. "go-common/library/conf/env"
  17. "go-common/library/ecode"
  18. xlog "go-common/library/log"
  19. "go-common/library/net/metadata"
  20. "go-common/library/net/netutil/breaker"
  21. "go-common/library/net/trace"
  22. "go-common/library/stat"
  23. xtime "go-common/library/time"
  24. perr "github.com/pkg/errors"
  25. )
  26. const (
  27. _family = "gorpc"
  28. _pingDuration = time.Second * 1
  29. )
  30. var (
  31. stats = stat.RPCClient
  32. // ErrShutdown shutdown error.
  33. ErrShutdown = errors.New("connection is shut down")
  34. // ErrNoClient current no rpc client.
  35. ErrNoClient = errors.New("no rpc client")
  36. errClient = new(client)
  37. )
  38. // ServerError represents an error that has been returned from
  39. // the remote side of the RPC connection.
  40. type ServerError string
  41. func (e ServerError) Error() string {
  42. return string(e)
  43. }
  44. // Call represents an active RPC.
  45. type Call struct {
  46. ServiceMethod string // The name of the service and method to call.
  47. Args interface{} // The argument to the function (*struct).
  48. Reply interface{} // The reply from the function (*struct).
  49. Trace trace.Trace
  50. Color string
  51. RemoteIP string
  52. Timeout time.Duration
  53. Error error // After completion, the error status.
  54. Done chan *Call // Strobes when call is complete.
  55. }
  56. // client represents an RPC Client.
  57. // There may be multiple outstanding Calls associated
  58. // with a single Client, and a Client may be used by
  59. // multiple goroutines simultaneously.
  60. type client struct {
  61. codec *clientCodec
  62. reqMutex sync.Mutex // protects following
  63. request Request
  64. mutex sync.Mutex // protects following
  65. seq uint64
  66. pending map[uint64]*Call
  67. closing bool // user has called Close
  68. shutdown bool // server has told us to stop
  69. timeout time.Duration // call timeout
  70. remoteAddr string // server address
  71. }
  72. func (client *client) send(call *Call) {
  73. client.reqMutex.Lock()
  74. defer client.reqMutex.Unlock()
  75. // Register this call.
  76. client.mutex.Lock()
  77. if client.shutdown || client.closing {
  78. call.Error = ErrShutdown
  79. client.mutex.Unlock()
  80. call.done()
  81. return
  82. }
  83. seq := client.seq
  84. client.seq++
  85. client.mutex.Unlock()
  86. // Encode and send the request.
  87. client.request.Seq = seq
  88. client.request.ServiceMethod = call.ServiceMethod
  89. client.request.Color = call.Color
  90. client.request.RemoteIP = call.RemoteIP
  91. client.request.Timeout = call.Timeout
  92. if call.Trace != nil {
  93. trace.Inject(call.Trace, nil, &client.request.Trace)
  94. } else {
  95. client.request.Trace = TraceInfo{}
  96. }
  97. err := client.codec.WriteRequest(&client.request, call.Args)
  98. if err != nil {
  99. err = perr.WithStack(err)
  100. if call != nil {
  101. call.Error = err
  102. call.done()
  103. }
  104. } else {
  105. client.mutex.Lock()
  106. client.pending[seq] = call
  107. client.mutex.Unlock()
  108. }
  109. }
  110. func (client *client) input() {
  111. var err error
  112. var response Response
  113. for err == nil {
  114. response = Response{}
  115. err = client.codec.ReadResponseHeader(&response)
  116. if err != nil {
  117. break
  118. }
  119. seq := response.Seq
  120. client.mutex.Lock()
  121. call := client.pending[seq]
  122. delete(client.pending, seq)
  123. client.mutex.Unlock()
  124. switch {
  125. case call == nil:
  126. // We've got no pending call. That usually means that
  127. // WriteRequest partially failed, and call was already
  128. // removed; response is a server telling us about an
  129. // error reading request body. We should still attempt
  130. // to read error body, but there's no one to give it to.
  131. err = client.codec.ReadResponseBody(nil)
  132. if err != nil {
  133. err = errors.New("reading error body: " + err.Error())
  134. }
  135. case response.Error != "":
  136. // We've got an error response. Give this to the request;
  137. // any subsequent requests will get the ReadResponseBody
  138. // error if there is one.
  139. call.Error = ServerError(response.Error)
  140. err = client.codec.ReadResponseBody(nil)
  141. if err != nil {
  142. err = errors.New("reading error body: " + err.Error())
  143. }
  144. call.done()
  145. default:
  146. err = client.codec.ReadResponseBody(call.Reply)
  147. if err != nil {
  148. call.Error = errors.New("reading body " + err.Error())
  149. }
  150. call.done()
  151. }
  152. }
  153. // Terminate pending calls.
  154. client.reqMutex.Lock()
  155. client.mutex.Lock()
  156. client.shutdown = true
  157. closing := client.closing
  158. if err == io.EOF {
  159. if closing {
  160. err = ErrShutdown
  161. } else {
  162. err = io.ErrUnexpectedEOF
  163. }
  164. }
  165. for _, call := range client.pending {
  166. call.Error = err
  167. call.done()
  168. }
  169. client.mutex.Unlock()
  170. client.reqMutex.Unlock()
  171. if err != io.EOF && !closing {
  172. log.Println("rpc: client protocol error:", err)
  173. }
  174. }
  175. func (call *Call) done() {
  176. select {
  177. case call.Done <- call:
  178. // ok
  179. default:
  180. // We don't want to block here. It is the caller's responsibility to make
  181. // sure the channel has enough buffer space. See comment in Go().
  182. log.Println("rpc: discarding Call reply due to insufficient Done chan capacity")
  183. }
  184. }
  185. // Finish must called after Go.
  186. func (call *Call) Finish() {
  187. if call.Trace != nil {
  188. call.Trace.Finish(&call.Error)
  189. }
  190. }
  191. // newClient returns a new Client to handle requests to the
  192. // set of services at the other end of the connection.
  193. // It adds a buffer to the write side of the connection so
  194. // the header and payload are sent as a unit.
  195. func newClient(timeout time.Duration, conn net.Conn) (*client, error) {
  196. encBuf := bufio.NewWriter(conn)
  197. client := &clientCodec{conn, gob.NewDecoder(conn), gob.NewEncoder(encBuf), encBuf}
  198. c := newClientWithCodec(client)
  199. c.timeout = timeout
  200. c.remoteAddr = conn.RemoteAddr().String()
  201. // handshake
  202. c.Call(_authServiceMethod, &Auth{User: env.AppID}, &struct{}{})
  203. return c, nil
  204. }
  205. // newClientWithCodec is like newClient but uses the specified
  206. // codec to encode requests and decode responses.
  207. func newClientWithCodec(codec *clientCodec) *client {
  208. client := &client{
  209. codec: codec,
  210. pending: make(map[uint64]*Call),
  211. }
  212. go client.input()
  213. return client
  214. }
  215. type clientCodec struct {
  216. rwc io.ReadWriteCloser
  217. dec *gob.Decoder
  218. enc *gob.Encoder
  219. encBuf *bufio.Writer
  220. }
  221. func (c *clientCodec) WriteRequest(r *Request, body interface{}) (err error) {
  222. if err = c.enc.Encode(r); err != nil {
  223. return perr.WithStack(err)
  224. }
  225. if err = c.enc.Encode(body); err != nil {
  226. return perr.WithStack(err)
  227. }
  228. return perr.WithStack(c.encBuf.Flush())
  229. }
  230. func (c *clientCodec) ReadResponseHeader(r *Response) error {
  231. return perr.WithStack(c.dec.Decode(r))
  232. }
  233. func (c *clientCodec) ReadResponseBody(body interface{}) error {
  234. return perr.WithStack(c.dec.Decode(body))
  235. }
  236. func (c *clientCodec) Close() error {
  237. return perr.WithStack(c.rwc.Close())
  238. }
  239. // dial connects to an RPC server at the specified network address.
  240. func dial(network, addr string, timeout time.Duration) (*client, error) {
  241. // TODO dial timeout
  242. conn, err := net.Dial(network, addr)
  243. if err != nil {
  244. err = perr.WithStack(err)
  245. return nil, err
  246. }
  247. return newClient(timeout, conn)
  248. }
  249. // Close close the rpc client.
  250. func (client *client) Close() error {
  251. client.mutex.Lock()
  252. if client.closing {
  253. client.mutex.Unlock()
  254. return ErrShutdown
  255. }
  256. client.closing = true
  257. client.mutex.Unlock()
  258. return client.codec.Close()
  259. }
  260. func (client *client) do(call *Call, done chan *Call) {
  261. if done == nil {
  262. done = make(chan *Call, 10) // buffered.
  263. } else {
  264. // If caller passes done != nil, it must arrange that
  265. // done has enough buffer for the number of simultaneous
  266. // RPCs that will be using that channel. If the channel
  267. // is totally unbuffered, it's best not to run at all.
  268. if cap(done) == 0 {
  269. log.Panic("rpc: done channel is unbuffered")
  270. }
  271. }
  272. call.Done = done
  273. client.send(call)
  274. }
  275. // Go invokes the function asynchronously. It returns the Call structure representing
  276. // the invocation. The done channel will signal when the call is complete by returning
  277. // the same Call object. If done is nil, Go will allocate a new channel.
  278. // If non-nil, done must be buffered or Go will deliberately crash.
  279. // Must call Finish() after call Go.
  280. func (client *client) Go(serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call {
  281. call := new(Call)
  282. call.ServiceMethod = serviceMethod
  283. call.Args = args
  284. call.Reply = reply
  285. client.do(call, done)
  286. return call
  287. }
  288. // Call invokes the named function, waits for it to complete, and returns its error status.
  289. func (client *client) Call(serviceMethod string, args interface{}, reply interface{}) (err error) {
  290. call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done
  291. return call.Error
  292. }
  293. // Do do a rpc call.
  294. func (client *client) Do(call *Call) {
  295. client.do(call, make(chan *Call, 1))
  296. }
  297. type td struct {
  298. path string
  299. timeout time.Duration
  300. }
  301. // Client wrapper is a client holder with implements pinger.
  302. type Client struct {
  303. addr string
  304. timeout xtime.Duration
  305. client atomic.Value
  306. quit chan struct{}
  307. breaker *breaker.Group
  308. }
  309. // Dial connects to an RPC server at the specified network address.
  310. func Dial(addr string, timeout xtime.Duration, bkc *breaker.Config) *Client {
  311. client := &Client{
  312. addr: addr,
  313. timeout: timeout,
  314. quit: make(chan struct{}),
  315. }
  316. // breaker
  317. client.breaker = breaker.NewGroup(bkc)
  318. // timeout
  319. if timeout <= 0 {
  320. client.timeout = xtime.Duration(300 * time.Millisecond)
  321. }
  322. client.timeout = timeout
  323. // dial
  324. rc, err := dial("tcp", addr, time.Duration(timeout))
  325. if err != nil {
  326. xlog.Error("dial(%s, %s) error(%v)", "tcp", addr, err)
  327. } else {
  328. client.client.Store(rc)
  329. }
  330. go client.ping()
  331. return client
  332. }
  333. // Call invokes the named function, waits for it to complete, and returns its error status.
  334. func (c *Client) Call(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) (err error) {
  335. var (
  336. ok bool
  337. code string
  338. rc *client
  339. call *Call
  340. cancel func()
  341. t trace.Trace
  342. timeout = time.Duration(c.timeout)
  343. )
  344. if rc, ok = c.client.Load().(*client); !ok || rc == errClient {
  345. xlog.Error("client is errClient (no rpc client) by ping addr(%s) error", c.addr)
  346. return ErrNoClient
  347. }
  348. if t, ok = trace.FromContext(ctx); !ok {
  349. t = trace.New(serviceMethod)
  350. }
  351. t = t.Fork(_family, serviceMethod)
  352. t.SetTag(trace.String(trace.TagAddress, rc.remoteAddr))
  353. defer t.Finish(&err)
  354. // breaker
  355. brk := c.breaker.Get(serviceMethod)
  356. if err = brk.Allow(); err != nil {
  357. code = "breaker"
  358. stats.Incr(serviceMethod, code)
  359. return
  360. }
  361. defer c.onBreaker(brk, &err)
  362. // stat
  363. now := time.Now()
  364. defer func() {
  365. stats.Timing(serviceMethod, int64(time.Since(now)/time.Millisecond))
  366. if code != "" {
  367. stats.Incr(serviceMethod, code)
  368. }
  369. }()
  370. // timeout: get from conf
  371. // if context > conf use conf else context
  372. deliver := true
  373. if deadline, ok := ctx.Deadline(); ok {
  374. if ctimeout := time.Until(deadline); ctimeout < timeout {
  375. timeout = ctimeout
  376. deliver = false
  377. }
  378. }
  379. if deliver {
  380. ctx, cancel = context.WithTimeout(ctx, timeout)
  381. defer cancel()
  382. }
  383. color := metadata.String(ctx, metadata.Color)
  384. remoteIP := metadata.String(ctx, metadata.RemoteIP)
  385. // call
  386. call = &Call{
  387. ServiceMethod: serviceMethod,
  388. Args: args,
  389. Reply: reply,
  390. Trace: t,
  391. Color: color,
  392. RemoteIP: remoteIP,
  393. Timeout: timeout,
  394. }
  395. rc.Do(call)
  396. select {
  397. case call = <-call.Done:
  398. err = call.Error
  399. code = ecode.Cause(err).Error()
  400. case <-ctx.Done():
  401. err = ecode.Deadline
  402. code = "timeout"
  403. }
  404. return
  405. }
  406. func (c *Client) onBreaker(breaker breaker.Breaker, err *error) {
  407. if err != nil && *err != nil && (*err == ErrShutdown || *err == io.ErrUnexpectedEOF || ecode.Deadline.Equal(*err) || ecode.ServiceUnavailable.Equal(*err) || ecode.ServerErr.Equal(*err)) {
  408. breaker.MarkFailed()
  409. } else {
  410. breaker.MarkSuccess()
  411. }
  412. }
  413. // ping ping the rpc connect and re connect when has an error.
  414. func (c *Client) ping() {
  415. var (
  416. err error
  417. cancel func()
  418. call *Call
  419. ctx context.Context
  420. client, _ = c.client.Load().(*client)
  421. )
  422. for {
  423. select {
  424. case <-c.quit:
  425. c.client.Store(errClient)
  426. if client != nil {
  427. client.Close()
  428. }
  429. return
  430. default:
  431. }
  432. if client == nil || err != nil {
  433. if client, err = dial("tcp", c.addr, time.Duration(c.timeout)); err != nil {
  434. xlog.Error("dial(%s, %s) error(%v)", "tcp", c.addr, err)
  435. time.Sleep(_pingDuration)
  436. continue
  437. }
  438. c.client.Store(client)
  439. }
  440. ctx, cancel = context.WithTimeout(context.Background(), time.Duration(c.timeout))
  441. select {
  442. case call = <-client.Go(_pingServiceMethod, _pingArg, _pingArg, make(chan *Call, 1)).Done:
  443. err = call.Error
  444. case <-ctx.Done():
  445. err = ecode.Deadline
  446. }
  447. cancel()
  448. if err != nil {
  449. if err == ErrShutdown || err == io.ErrUnexpectedEOF || ecode.Deadline.Equal(err) {
  450. xlog.Error("rpc ping error beiTle addr(%s)", c.addr)
  451. c.client.Store(errClient)
  452. client.Close()
  453. } else {
  454. err = nil // never touch here
  455. }
  456. }
  457. time.Sleep(_pingDuration)
  458. }
  459. }
  460. // Close close client connection.
  461. func (c *Client) Close() {
  462. close(c.quit)
  463. }