123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507 |
- // Copyright 2009 The Go Authors. All rights reserved.
- // Use of this source code is governed by a BSD-style
- // license that can be found in the LICENSE file.
- package rpc
- import (
- "bufio"
- "context"
- "encoding/gob"
- "errors"
- "io"
- "log"
- "net"
- "sync"
- "sync/atomic"
- "time"
- "go-common/library/conf/env"
- "go-common/library/ecode"
- xlog "go-common/library/log"
- "go-common/library/net/metadata"
- "go-common/library/net/netutil/breaker"
- "go-common/library/net/trace"
- "go-common/library/stat"
- xtime "go-common/library/time"
- perr "github.com/pkg/errors"
- )
// Constants shared by the RPC client.
const (
	// _family is the first argument handed to Trace.Fork for outgoing calls.
	_family = "gorpc"
	// _pingDuration is how long the ping loop pauses between iterations.
	_pingDuration = time.Second
)
- var (
- stats = stat.RPCClient
- // ErrShutdown shutdown error.
- ErrShutdown = errors.New("connection is shut down")
- // ErrNoClient current no rpc client.
- ErrNoClient = errors.New("no rpc client")
- errClient = new(client)
- )
// ServerError is the error type used for failures reported by the remote
// side of the RPC connection. Its value is the error message itself.
type ServerError string

// Error implements the error interface by returning the message verbatim.
func (e ServerError) Error() string {
	msg := string(e)
	return msg
}
// Call represents an active RPC: the request parameters supplied by the
// caller plus the completion state filled in by the client.
type Call struct {
	ServiceMethod string      // The name of the service and method to call.
	Args          interface{} // The argument to the function (*struct).
	Reply         interface{} // The reply from the function (*struct).
	// Trace, when non-nil, is injected into the request header by send and
	// finished by Finish.
	Trace trace.Trace
	// Color, RemoteIP and Timeout are copied verbatim into the request
	// header by send.
	Color    string
	RemoteIP string
	Timeout  time.Duration
	Error    error      // After completion, the error status.
	Done     chan *Call // Strobes when call is complete.
}
// client represents an RPC Client bound to a single connection.
// There may be multiple outstanding Calls associated
// with a single Client, and a Client may be used by
// multiple goroutines simultaneously.
type client struct {
	codec *clientCodec // encodes requests and decodes responses on the connection

	reqMutex sync.Mutex // protects following
	request  Request    // request header, reused by send for every outgoing call

	mutex      sync.Mutex       // protects following
	seq        uint64           // sequence number handed to the next request
	pending    map[uint64]*Call // calls awaiting a response, keyed by sequence number
	closing    bool             // user has called Close
	shutdown   bool             // server has told us to stop
	timeout    time.Duration    // call timeout
	remoteAddr string           // server address
}
- func (client *client) send(call *Call) {
- client.reqMutex.Lock()
- defer client.reqMutex.Unlock()
- // Register this call.
- client.mutex.Lock()
- if client.shutdown || client.closing {
- call.Error = ErrShutdown
- client.mutex.Unlock()
- call.done()
- return
- }
- seq := client.seq
- client.seq++
- client.mutex.Unlock()
- // Encode and send the request.
- client.request.Seq = seq
- client.request.ServiceMethod = call.ServiceMethod
- client.request.Color = call.Color
- client.request.RemoteIP = call.RemoteIP
- client.request.Timeout = call.Timeout
- if call.Trace != nil {
- trace.Inject(call.Trace, nil, &client.request.Trace)
- } else {
- client.request.Trace = TraceInfo{}
- }
- err := client.codec.WriteRequest(&client.request, call.Args)
- if err != nil {
- err = perr.WithStack(err)
- if call != nil {
- call.Error = err
- call.done()
- }
- } else {
- client.mutex.Lock()
- client.pending[seq] = call
- client.mutex.Unlock()
- }
- }
- func (client *client) input() {
- var err error
- var response Response
- for err == nil {
- response = Response{}
- err = client.codec.ReadResponseHeader(&response)
- if err != nil {
- break
- }
- seq := response.Seq
- client.mutex.Lock()
- call := client.pending[seq]
- delete(client.pending, seq)
- client.mutex.Unlock()
- switch {
- case call == nil:
- // We've got no pending call. That usually means that
- // WriteRequest partially failed, and call was already
- // removed; response is a server telling us about an
- // error reading request body. We should still attempt
- // to read error body, but there's no one to give it to.
- err = client.codec.ReadResponseBody(nil)
- if err != nil {
- err = errors.New("reading error body: " + err.Error())
- }
- case response.Error != "":
- // We've got an error response. Give this to the request;
- // any subsequent requests will get the ReadResponseBody
- // error if there is one.
- call.Error = ServerError(response.Error)
- err = client.codec.ReadResponseBody(nil)
- if err != nil {
- err = errors.New("reading error body: " + err.Error())
- }
- call.done()
- default:
- err = client.codec.ReadResponseBody(call.Reply)
- if err != nil {
- call.Error = errors.New("reading body " + err.Error())
- }
- call.done()
- }
- }
- // Terminate pending calls.
- client.reqMutex.Lock()
- client.mutex.Lock()
- client.shutdown = true
- closing := client.closing
- if err == io.EOF {
- if closing {
- err = ErrShutdown
- } else {
- err = io.ErrUnexpectedEOF
- }
- }
- for _, call := range client.pending {
- call.Error = err
- call.done()
- }
- client.mutex.Unlock()
- client.reqMutex.Unlock()
- if err != io.EOF && !closing {
- log.Println("rpc: client protocol error:", err)
- }
- }
- func (call *Call) done() {
- select {
- case call.Done <- call:
- // ok
- default:
- // We don't want to block here. It is the caller's responsibility to make
- // sure the channel has enough buffer space. See comment in Go().
- log.Println("rpc: discarding Call reply due to insufficient Done chan capacity")
- }
- }
- // Finish must called after Go.
- func (call *Call) Finish() {
- if call.Trace != nil {
- call.Trace.Finish(&call.Error)
- }
- }
- // newClient returns a new Client to handle requests to the
- // set of services at the other end of the connection.
- // It adds a buffer to the write side of the connection so
- // the header and payload are sent as a unit.
- func newClient(timeout time.Duration, conn net.Conn) (*client, error) {
- encBuf := bufio.NewWriter(conn)
- client := &clientCodec{conn, gob.NewDecoder(conn), gob.NewEncoder(encBuf), encBuf}
- c := newClientWithCodec(client)
- c.timeout = timeout
- c.remoteAddr = conn.RemoteAddr().String()
- // handshake
- c.Call(_authServiceMethod, &Auth{User: env.AppID}, &struct{}{})
- return c, nil
- }
- // newClientWithCodec is like newClient but uses the specified
- // codec to encode requests and decode responses.
- func newClientWithCodec(codec *clientCodec) *client {
- client := &client{
- codec: codec,
- pending: make(map[uint64]*Call),
- }
- go client.input()
- return client
- }
// clientCodec is the gob-based codec used by client to write requests to
// and read responses from a connection.
type clientCodec struct {
	rwc    io.ReadWriteCloser // underlying connection; closed by Close
	dec    *gob.Decoder       // decodes response headers and bodies
	enc    *gob.Encoder       // encodes request headers and bodies into encBuf
	encBuf *bufio.Writer      // buffered writer, flushed once per request
}
- func (c *clientCodec) WriteRequest(r *Request, body interface{}) (err error) {
- if err = c.enc.Encode(r); err != nil {
- return perr.WithStack(err)
- }
- if err = c.enc.Encode(body); err != nil {
- return perr.WithStack(err)
- }
- return perr.WithStack(c.encBuf.Flush())
- }
- func (c *clientCodec) ReadResponseHeader(r *Response) error {
- return perr.WithStack(c.dec.Decode(r))
- }
- func (c *clientCodec) ReadResponseBody(body interface{}) error {
- return perr.WithStack(c.dec.Decode(body))
- }
- func (c *clientCodec) Close() error {
- return perr.WithStack(c.rwc.Close())
- }
- // dial connects to an RPC server at the specified network address.
- func dial(network, addr string, timeout time.Duration) (*client, error) {
- // TODO dial timeout
- conn, err := net.Dial(network, addr)
- if err != nil {
- err = perr.WithStack(err)
- return nil, err
- }
- return newClient(timeout, conn)
- }
- // Close close the rpc client.
- func (client *client) Close() error {
- client.mutex.Lock()
- if client.closing {
- client.mutex.Unlock()
- return ErrShutdown
- }
- client.closing = true
- client.mutex.Unlock()
- return client.codec.Close()
- }
- func (client *client) do(call *Call, done chan *Call) {
- if done == nil {
- done = make(chan *Call, 10) // buffered.
- } else {
- // If caller passes done != nil, it must arrange that
- // done has enough buffer for the number of simultaneous
- // RPCs that will be using that channel. If the channel
- // is totally unbuffered, it's best not to run at all.
- if cap(done) == 0 {
- log.Panic("rpc: done channel is unbuffered")
- }
- }
- call.Done = done
- client.send(call)
- }
- // Go invokes the function asynchronously. It returns the Call structure representing
- // the invocation. The done channel will signal when the call is complete by returning
- // the same Call object. If done is nil, Go will allocate a new channel.
- // If non-nil, done must be buffered or Go will deliberately crash.
- // Must call Finish() after call Go.
- func (client *client) Go(serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call {
- call := new(Call)
- call.ServiceMethod = serviceMethod
- call.Args = args
- call.Reply = reply
- client.do(call, done)
- return call
- }
- // Call invokes the named function, waits for it to complete, and returns its error status.
- func (client *client) Call(serviceMethod string, args interface{}, reply interface{}) (err error) {
- call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done
- return call.Error
- }
- // Do do a rpc call.
- func (client *client) Do(call *Call) {
- client.do(call, make(chan *Call, 1))
- }
// td pairs a path with a timeout.
// NOTE(review): nothing in this part of the file references td; presumably
// it is used elsewhere in the package — confirm before removing.
type td struct {
	path    string
	timeout time.Duration
}
// Client wraps a connection-level client. It keeps the connection alive
// with a background ping loop that redials on failure, and guards calls
// with per-method circuit breakers.
type Client struct {
	addr    string         // server address, dialed over tcp
	timeout xtime.Duration // per-call timeout; also bounds each ping
	client  atomic.Value   // current *client, or errClient when none is usable
	quit    chan struct{}  // closed by Close to stop the ping goroutine
	breaker *breaker.Group // circuit breakers, looked up by service method
}
- // Dial connects to an RPC server at the specified network address.
- func Dial(addr string, timeout xtime.Duration, bkc *breaker.Config) *Client {
- client := &Client{
- addr: addr,
- timeout: timeout,
- quit: make(chan struct{}),
- }
- // breaker
- client.breaker = breaker.NewGroup(bkc)
- // timeout
- if timeout <= 0 {
- client.timeout = xtime.Duration(300 * time.Millisecond)
- }
- client.timeout = timeout
- // dial
- rc, err := dial("tcp", addr, time.Duration(timeout))
- if err != nil {
- xlog.Error("dial(%s, %s) error(%v)", "tcp", addr, err)
- } else {
- client.client.Store(rc)
- }
- go client.ping()
- return client
- }
- // Call invokes the named function, waits for it to complete, and returns its error status.
- func (c *Client) Call(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) (err error) {
- var (
- ok bool
- code string
- rc *client
- call *Call
- cancel func()
- t trace.Trace
- timeout = time.Duration(c.timeout)
- )
- if rc, ok = c.client.Load().(*client); !ok || rc == errClient {
- xlog.Error("client is errClient (no rpc client) by ping addr(%s) error", c.addr)
- return ErrNoClient
- }
- if t, ok = trace.FromContext(ctx); !ok {
- t = trace.New(serviceMethod)
- }
- t = t.Fork(_family, serviceMethod)
- t.SetTag(trace.String(trace.TagAddress, rc.remoteAddr))
- defer t.Finish(&err)
- // breaker
- brk := c.breaker.Get(serviceMethod)
- if err = brk.Allow(); err != nil {
- code = "breaker"
- stats.Incr(serviceMethod, code)
- return
- }
- defer c.onBreaker(brk, &err)
- // stat
- now := time.Now()
- defer func() {
- stats.Timing(serviceMethod, int64(time.Since(now)/time.Millisecond))
- if code != "" {
- stats.Incr(serviceMethod, code)
- }
- }()
- // timeout: get from conf
- // if context > conf use conf else context
- deliver := true
- if deadline, ok := ctx.Deadline(); ok {
- if ctimeout := time.Until(deadline); ctimeout < timeout {
- timeout = ctimeout
- deliver = false
- }
- }
- if deliver {
- ctx, cancel = context.WithTimeout(ctx, timeout)
- defer cancel()
- }
- color := metadata.String(ctx, metadata.Color)
- remoteIP := metadata.String(ctx, metadata.RemoteIP)
- // call
- call = &Call{
- ServiceMethod: serviceMethod,
- Args: args,
- Reply: reply,
- Trace: t,
- Color: color,
- RemoteIP: remoteIP,
- Timeout: timeout,
- }
- rc.Do(call)
- select {
- case call = <-call.Done:
- err = call.Error
- code = ecode.Cause(err).Error()
- case <-ctx.Done():
- err = ecode.Deadline
- code = "timeout"
- }
- return
- }
- func (c *Client) onBreaker(breaker breaker.Breaker, err *error) {
- if err != nil && *err != nil && (*err == ErrShutdown || *err == io.ErrUnexpectedEOF || ecode.Deadline.Equal(*err) || ecode.ServiceUnavailable.Equal(*err) || ecode.ServerErr.Equal(*err)) {
- breaker.MarkFailed()
- } else {
- breaker.MarkSuccess()
- }
- }
- // ping ping the rpc connect and re connect when has an error.
- func (c *Client) ping() {
- var (
- err error
- cancel func()
- call *Call
- ctx context.Context
- client, _ = c.client.Load().(*client)
- )
- for {
- select {
- case <-c.quit:
- c.client.Store(errClient)
- if client != nil {
- client.Close()
- }
- return
- default:
- }
- if client == nil || err != nil {
- if client, err = dial("tcp", c.addr, time.Duration(c.timeout)); err != nil {
- xlog.Error("dial(%s, %s) error(%v)", "tcp", c.addr, err)
- time.Sleep(_pingDuration)
- continue
- }
- c.client.Store(client)
- }
- ctx, cancel = context.WithTimeout(context.Background(), time.Duration(c.timeout))
- select {
- case call = <-client.Go(_pingServiceMethod, _pingArg, _pingArg, make(chan *Call, 1)).Done:
- err = call.Error
- case <-ctx.Done():
- err = ecode.Deadline
- }
- cancel()
- if err != nil {
- if err == ErrShutdown || err == io.ErrUnexpectedEOF || ecode.Deadline.Equal(err) {
- xlog.Error("rpc ping error beiTle addr(%s)", c.addr)
- c.client.Store(errClient)
- client.Close()
- } else {
- err = nil // never touch here
- }
- }
- time.Sleep(_pingDuration)
- }
- }
// Close stops the client by closing the quit channel. The ping goroutine
// observes this, stores errClient and closes the underlying connection.
// Close must be called at most once: closing an already-closed channel
// panics.
func (c *Client) Close() {
	close(c.quit)
}
|