123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333 |
- package warden
- import (
- "context"
- "fmt"
- "net/url"
- "os"
- "strconv"
- "strings"
- "sync"
- "time"
- "go-common/library/conf/env"
- "go-common/library/conf/flagvar"
- "go-common/library/ecode"
- "go-common/library/naming"
- "go-common/library/naming/discovery"
- nmd "go-common/library/net/metadata"
- "go-common/library/net/netutil/breaker"
- "go-common/library/net/rpc/warden/balancer/wrr"
- "go-common/library/net/rpc/warden/resolver"
- "go-common/library/net/rpc/warden/status"
- "go-common/library/net/trace"
- xtime "go-common/library/time"
- "github.com/pkg/errors"
- "google.golang.org/grpc"
- "google.golang.org/grpc/credentials"
- "google.golang.org/grpc/metadata"
- "google.golang.org/grpc/peer"
- gstatus "google.golang.org/grpc/status"
- )
- var _grpcTarget flagvar.StringVars
- var (
- _once sync.Once
- _defaultCliConf = &ClientConfig{
- Dial: xtime.Duration(time.Second * 10),
- Timeout: xtime.Duration(time.Millisecond * 250),
- Subset: 50,
- }
- _defaultClient *Client
- )
- // ClientConfig is rpc client conf.
- type ClientConfig struct {
- Dial xtime.Duration
- Timeout xtime.Duration
- Breaker *breaker.Config
- Method map[string]*ClientConfig
- Clusters []string
- Zone string
- Subset int
- NonBlock bool
- }
- // Client is the framework's client side instance, it contains the ctx, opt and interceptors.
- // Create an instance of Client, by using NewClient().
- type Client struct {
- conf *ClientConfig
- breaker *breaker.Group
- mutex sync.RWMutex
- opt []grpc.DialOption
- handlers []grpc.UnaryClientInterceptor
- }
- // handle returns a new unary client interceptor for OpenTracing\Logging\LinkTimeout.
- func (c *Client) handle() grpc.UnaryClientInterceptor {
- return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) (err error) {
- var (
- ok bool
- cmd nmd.MD
- t trace.Trace
- gmd metadata.MD
- conf *ClientConfig
- cancel context.CancelFunc
- addr string
- p peer.Peer
- )
- var ec ecode.Codes = ecode.OK
- // apm tracing
- if t, ok = trace.FromContext(ctx); ok {
- t = t.Fork(_family, method)
- defer t.Finish(&err)
- }
- // setup metadata
- gmd = metadata.MD{}
- trace.Inject(t, trace.GRPCFormat, gmd)
- c.mutex.RLock()
- if conf, ok = c.conf.Method[method]; !ok {
- conf = c.conf
- }
- c.mutex.RUnlock()
- brk := c.breaker.Get(method)
- if err = brk.Allow(); err != nil {
- statsClient.Incr(method, "breaker")
- return
- }
- defer onBreaker(brk, &err)
- _, ctx, cancel = conf.Timeout.Shrink(ctx)
- defer cancel()
- // meta color
- if cmd, ok = nmd.FromContext(ctx); ok {
- var color, ip, port string
- if color, ok = cmd[nmd.Color].(string); ok {
- gmd[nmd.Color] = []string{color}
- }
- if ip, ok = cmd[nmd.RemoteIP].(string); ok {
- gmd[nmd.RemoteIP] = []string{ip}
- }
- if port, ok = cmd[nmd.RemotePort].(string); ok {
- gmd[nmd.RemotePort] = []string{port}
- }
- } else {
- // NOTE: dead code delete it after test in futrue
- cmd = nmd.MD{}
- ctx = nmd.NewContext(ctx, cmd)
- }
- gmd[nmd.Caller] = []string{env.AppID}
- // merge with old matadata if exists
- if oldmd, ok := metadata.FromOutgoingContext(ctx); ok {
- gmd = metadata.Join(gmd, oldmd)
- }
- ctx = metadata.NewOutgoingContext(ctx, gmd)
- opts = append(opts, grpc.Peer(&p))
- if err = invoker(ctx, method, req, reply, cc, opts...); err != nil {
- gst, _ := gstatus.FromError(err)
- ec = status.ToEcode(gst)
- err = errors.WithMessage(ec, gst.Message())
- }
- if p.Addr != nil {
- addr = p.Addr.String()
- }
- if t != nil {
- t.SetTag(trace.String(trace.TagAddress, addr), trace.String(trace.TagComment, ""))
- }
- return
- }
- }
- func onBreaker(breaker breaker.Breaker, err *error) {
- if err != nil && *err != nil {
- if ecode.ServerErr.Equal(*err) || ecode.ServiceUnavailable.Equal(*err) || ecode.Deadline.Equal(*err) || ecode.LimitExceed.Equal(*err) {
- breaker.MarkFailed()
- return
- }
- }
- breaker.MarkSuccess()
- }
- // NewConn will create a grpc conn by default config.
- func NewConn(target string, opt ...grpc.DialOption) (*grpc.ClientConn, error) {
- return DefaultClient().Dial(context.Background(), target, opt...)
- }
- // NewClient returns a new blank Client instance with a default client interceptor.
- // opt can be used to add grpc dial options.
- func NewClient(conf *ClientConfig, opt ...grpc.DialOption) *Client {
- resolver.Register(discovery.Builder())
- c := new(Client)
- if err := c.SetConfig(conf); err != nil {
- panic(err)
- }
- c.UseOpt(grpc.WithBalancerName(wrr.Name))
- c.UseOpt(opt...)
- c.Use(c.recovery(), clientLogging(), c.handle(), wrr.Stats())
- return c
- }
- // DefaultClient returns a new default Client instance with a default client interceptor and default dialoption.
- // opt can be used to add grpc dial options.
- func DefaultClient() *Client {
- resolver.Register(discovery.Builder())
- _once.Do(func() {
- _defaultClient = NewClient(nil)
- })
- return _defaultClient
- }
- // SetConfig hot reloads client config
- func (c *Client) SetConfig(conf *ClientConfig) (err error) {
- if conf == nil {
- conf = _defaultCliConf
- }
- if conf.Dial <= 0 {
- conf.Dial = xtime.Duration(time.Second * 10)
- }
- if conf.Timeout <= 0 {
- conf.Timeout = xtime.Duration(time.Millisecond * 250)
- }
- if conf.Subset <= 0 {
- conf.Subset = 50
- }
- // FIXME(maojian) check Method dial/timeout
- c.mutex.Lock()
- c.conf = conf
- if c.breaker == nil {
- c.breaker = breaker.NewGroup(conf.Breaker)
- } else {
- c.breaker.Reload(conf.Breaker)
- }
- c.mutex.Unlock()
- return nil
- }
- // Use attachs a global inteceptor to the Client.
- // For example, this is the right place for a circuit breaker or error management inteceptor.
- func (c *Client) Use(handlers ...grpc.UnaryClientInterceptor) *Client {
- finalSize := len(c.handlers) + len(handlers)
- if finalSize >= int(_abortIndex) {
- panic("warden: client use too many handlers")
- }
- mergedHandlers := make([]grpc.UnaryClientInterceptor, finalSize)
- copy(mergedHandlers, c.handlers)
- copy(mergedHandlers[len(c.handlers):], handlers)
- c.handlers = mergedHandlers
- return c
- }
- // UseOpt attachs a global grpc DialOption to the Client.
- func (c *Client) UseOpt(opt ...grpc.DialOption) *Client {
- c.opt = append(c.opt, opt...)
- return c
- }
- // Dial creates a client connection to the given target.
- // Target format is scheme://authority/endpoint?query_arg=value
- // example: discovery://default/account.account.service?cluster=shfy01&cluster=shfy02
- func (c *Client) Dial(ctx context.Context, target string, opt ...grpc.DialOption) (conn *grpc.ClientConn, err error) {
- if !c.conf.NonBlock {
- c.opt = append(c.opt, grpc.WithBlock())
- }
- c.opt = append(c.opt, grpc.WithInsecure())
- c.opt = append(c.opt, grpc.WithUnaryInterceptor(c.chainUnaryClient()))
- c.opt = append(c.opt, opt...)
- c.mutex.RLock()
- conf := c.conf
- c.mutex.RUnlock()
- if conf.Dial > 0 {
- var cancel context.CancelFunc
- ctx, cancel = context.WithTimeout(ctx, time.Duration(conf.Dial))
- defer cancel()
- }
- if u, e := url.Parse(target); e == nil {
- v := u.Query()
- for _, c := range c.conf.Clusters {
- v.Add(naming.MetaCluster, c)
- }
- if c.conf.Zone != "" {
- v.Add(naming.MetaZone, c.conf.Zone)
- }
- if v.Get("subset") == "" && c.conf.Subset > 0 {
- v.Add("subset", strconv.FormatInt(int64(c.conf.Subset), 10))
- }
- u.RawQuery = v.Encode()
- // 比较_grpcTarget中的appid是否等于u.path中的appid,并替换成mock的地址
- for _, t := range _grpcTarget {
- strs := strings.SplitN(t, "=", 2)
- if len(strs) == 2 && ("/"+strs[0]) == u.Path {
- u.Path = "/" + strs[1]
- u.Scheme = "passthrough"
- u.RawQuery = ""
- break
- }
- }
- target = u.String()
- }
- if conn, err = grpc.DialContext(ctx, target, c.opt...); err != nil {
- fmt.Fprintf(os.Stderr, "warden client: dial %s error %v!", target, err)
- }
- err = errors.WithStack(err)
- return
- }
- // DialTLS creates a client connection over tls transport to the given target.
- func (c *Client) DialTLS(ctx context.Context, target string, file string, name string) (conn *grpc.ClientConn, err error) {
- var creds credentials.TransportCredentials
- creds, err = credentials.NewClientTLSFromFile(file, name)
- if err != nil {
- err = errors.WithStack(err)
- return
- }
- c.opt = append(c.opt, grpc.WithBlock())
- c.opt = append(c.opt, grpc.WithTransportCredentials(creds))
- c.opt = append(c.opt, grpc.WithUnaryInterceptor(c.chainUnaryClient()))
- c.mutex.RLock()
- conf := c.conf
- c.mutex.RUnlock()
- if conf.Dial > 0 {
- var cancel context.CancelFunc
- ctx, cancel = context.WithTimeout(ctx, time.Duration(conf.Dial))
- defer cancel()
- }
- conn, err = grpc.DialContext(ctx, target, c.opt...)
- err = errors.WithStack(err)
- return
- }
- // chainUnaryClient creates a single interceptor out of a chain of many interceptors.
- //
- // Execution is done in left-to-right order, including passing of context.
- // For example ChainUnaryClient(one, two, three) will execute one before two before three.
- func (c *Client) chainUnaryClient() grpc.UnaryClientInterceptor {
- n := len(c.handlers)
- if n == 0 {
- return func(ctx context.Context, method string, req, reply interface{},
- cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
- return invoker(ctx, method, req, reply, cc, opts...)
- }
- }
- return func(ctx context.Context, method string, req, reply interface{},
- cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
- var (
- i int
- chainHandler grpc.UnaryInvoker
- )
- chainHandler = func(ictx context.Context, imethod string, ireq, ireply interface{}, ic *grpc.ClientConn, iopts ...grpc.CallOption) error {
- if i == n-1 {
- return invoker(ictx, imethod, ireq, ireply, ic, iopts...)
- }
- i++
- return c.handlers[i](ictx, imethod, ireq, ireply, ic, chainHandler, iopts...)
- }
- return c.handlers[0](ctx, method, req, reply, cc, chainHandler, opts...)
- }
- }
|