client.go 9.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333
  1. package warden
  2. import (
  3. "context"
  4. "fmt"
  5. "net/url"
  6. "os"
  7. "strconv"
  8. "strings"
  9. "sync"
  10. "time"
  11. "go-common/library/conf/env"
  12. "go-common/library/conf/flagvar"
  13. "go-common/library/ecode"
  14. "go-common/library/naming"
  15. "go-common/library/naming/discovery"
  16. nmd "go-common/library/net/metadata"
  17. "go-common/library/net/netutil/breaker"
  18. "go-common/library/net/rpc/warden/balancer/wrr"
  19. "go-common/library/net/rpc/warden/resolver"
  20. "go-common/library/net/rpc/warden/status"
  21. "go-common/library/net/trace"
  22. xtime "go-common/library/time"
  23. "github.com/pkg/errors"
  24. "google.golang.org/grpc"
  25. "google.golang.org/grpc/credentials"
  26. "google.golang.org/grpc/metadata"
  27. "google.golang.org/grpc/peer"
  28. gstatus "google.golang.org/grpc/status"
  29. )
  30. var _grpcTarget flagvar.StringVars
  31. var (
  32. _once sync.Once
  33. _defaultCliConf = &ClientConfig{
  34. Dial: xtime.Duration(time.Second * 10),
  35. Timeout: xtime.Duration(time.Millisecond * 250),
  36. Subset: 50,
  37. }
  38. _defaultClient *Client
  39. )
  40. // ClientConfig is rpc client conf.
  41. type ClientConfig struct {
  42. Dial xtime.Duration
  43. Timeout xtime.Duration
  44. Breaker *breaker.Config
  45. Method map[string]*ClientConfig
  46. Clusters []string
  47. Zone string
  48. Subset int
  49. NonBlock bool
  50. }
  51. // Client is the framework's client side instance, it contains the ctx, opt and interceptors.
  52. // Create an instance of Client, by using NewClient().
  53. type Client struct {
  54. conf *ClientConfig
  55. breaker *breaker.Group
  56. mutex sync.RWMutex
  57. opt []grpc.DialOption
  58. handlers []grpc.UnaryClientInterceptor
  59. }
  60. // handle returns a new unary client interceptor for OpenTracing\Logging\LinkTimeout.
  61. func (c *Client) handle() grpc.UnaryClientInterceptor {
  62. return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) (err error) {
  63. var (
  64. ok bool
  65. cmd nmd.MD
  66. t trace.Trace
  67. gmd metadata.MD
  68. conf *ClientConfig
  69. cancel context.CancelFunc
  70. addr string
  71. p peer.Peer
  72. )
  73. var ec ecode.Codes = ecode.OK
  74. // apm tracing
  75. if t, ok = trace.FromContext(ctx); ok {
  76. t = t.Fork(_family, method)
  77. defer t.Finish(&err)
  78. }
  79. // setup metadata
  80. gmd = metadata.MD{}
  81. trace.Inject(t, trace.GRPCFormat, gmd)
  82. c.mutex.RLock()
  83. if conf, ok = c.conf.Method[method]; !ok {
  84. conf = c.conf
  85. }
  86. c.mutex.RUnlock()
  87. brk := c.breaker.Get(method)
  88. if err = brk.Allow(); err != nil {
  89. statsClient.Incr(method, "breaker")
  90. return
  91. }
  92. defer onBreaker(brk, &err)
  93. _, ctx, cancel = conf.Timeout.Shrink(ctx)
  94. defer cancel()
  95. // meta color
  96. if cmd, ok = nmd.FromContext(ctx); ok {
  97. var color, ip, port string
  98. if color, ok = cmd[nmd.Color].(string); ok {
  99. gmd[nmd.Color] = []string{color}
  100. }
  101. if ip, ok = cmd[nmd.RemoteIP].(string); ok {
  102. gmd[nmd.RemoteIP] = []string{ip}
  103. }
  104. if port, ok = cmd[nmd.RemotePort].(string); ok {
  105. gmd[nmd.RemotePort] = []string{port}
  106. }
  107. } else {
  108. // NOTE: dead code delete it after test in futrue
  109. cmd = nmd.MD{}
  110. ctx = nmd.NewContext(ctx, cmd)
  111. }
  112. gmd[nmd.Caller] = []string{env.AppID}
  113. // merge with old matadata if exists
  114. if oldmd, ok := metadata.FromOutgoingContext(ctx); ok {
  115. gmd = metadata.Join(gmd, oldmd)
  116. }
  117. ctx = metadata.NewOutgoingContext(ctx, gmd)
  118. opts = append(opts, grpc.Peer(&p))
  119. if err = invoker(ctx, method, req, reply, cc, opts...); err != nil {
  120. gst, _ := gstatus.FromError(err)
  121. ec = status.ToEcode(gst)
  122. err = errors.WithMessage(ec, gst.Message())
  123. }
  124. if p.Addr != nil {
  125. addr = p.Addr.String()
  126. }
  127. if t != nil {
  128. t.SetTag(trace.String(trace.TagAddress, addr), trace.String(trace.TagComment, ""))
  129. }
  130. return
  131. }
  132. }
  133. func onBreaker(breaker breaker.Breaker, err *error) {
  134. if err != nil && *err != nil {
  135. if ecode.ServerErr.Equal(*err) || ecode.ServiceUnavailable.Equal(*err) || ecode.Deadline.Equal(*err) || ecode.LimitExceed.Equal(*err) {
  136. breaker.MarkFailed()
  137. return
  138. }
  139. }
  140. breaker.MarkSuccess()
  141. }
  142. // NewConn will create a grpc conn by default config.
  143. func NewConn(target string, opt ...grpc.DialOption) (*grpc.ClientConn, error) {
  144. return DefaultClient().Dial(context.Background(), target, opt...)
  145. }
  146. // NewClient returns a new blank Client instance with a default client interceptor.
  147. // opt can be used to add grpc dial options.
  148. func NewClient(conf *ClientConfig, opt ...grpc.DialOption) *Client {
  149. resolver.Register(discovery.Builder())
  150. c := new(Client)
  151. if err := c.SetConfig(conf); err != nil {
  152. panic(err)
  153. }
  154. c.UseOpt(grpc.WithBalancerName(wrr.Name))
  155. c.UseOpt(opt...)
  156. c.Use(c.recovery(), clientLogging(), c.handle(), wrr.Stats())
  157. return c
  158. }
  159. // DefaultClient returns a new default Client instance with a default client interceptor and default dialoption.
  160. // opt can be used to add grpc dial options.
  161. func DefaultClient() *Client {
  162. resolver.Register(discovery.Builder())
  163. _once.Do(func() {
  164. _defaultClient = NewClient(nil)
  165. })
  166. return _defaultClient
  167. }
  168. // SetConfig hot reloads client config
  169. func (c *Client) SetConfig(conf *ClientConfig) (err error) {
  170. if conf == nil {
  171. conf = _defaultCliConf
  172. }
  173. if conf.Dial <= 0 {
  174. conf.Dial = xtime.Duration(time.Second * 10)
  175. }
  176. if conf.Timeout <= 0 {
  177. conf.Timeout = xtime.Duration(time.Millisecond * 250)
  178. }
  179. if conf.Subset <= 0 {
  180. conf.Subset = 50
  181. }
  182. // FIXME(maojian) check Method dial/timeout
  183. c.mutex.Lock()
  184. c.conf = conf
  185. if c.breaker == nil {
  186. c.breaker = breaker.NewGroup(conf.Breaker)
  187. } else {
  188. c.breaker.Reload(conf.Breaker)
  189. }
  190. c.mutex.Unlock()
  191. return nil
  192. }
  193. // Use attachs a global inteceptor to the Client.
  194. // For example, this is the right place for a circuit breaker or error management inteceptor.
  195. func (c *Client) Use(handlers ...grpc.UnaryClientInterceptor) *Client {
  196. finalSize := len(c.handlers) + len(handlers)
  197. if finalSize >= int(_abortIndex) {
  198. panic("warden: client use too many handlers")
  199. }
  200. mergedHandlers := make([]grpc.UnaryClientInterceptor, finalSize)
  201. copy(mergedHandlers, c.handlers)
  202. copy(mergedHandlers[len(c.handlers):], handlers)
  203. c.handlers = mergedHandlers
  204. return c
  205. }
  206. // UseOpt attachs a global grpc DialOption to the Client.
  207. func (c *Client) UseOpt(opt ...grpc.DialOption) *Client {
  208. c.opt = append(c.opt, opt...)
  209. return c
  210. }
  211. // Dial creates a client connection to the given target.
  212. // Target format is scheme://authority/endpoint?query_arg=value
  213. // example: discovery://default/account.account.service?cluster=shfy01&cluster=shfy02
  214. func (c *Client) Dial(ctx context.Context, target string, opt ...grpc.DialOption) (conn *grpc.ClientConn, err error) {
  215. if !c.conf.NonBlock {
  216. c.opt = append(c.opt, grpc.WithBlock())
  217. }
  218. c.opt = append(c.opt, grpc.WithInsecure())
  219. c.opt = append(c.opt, grpc.WithUnaryInterceptor(c.chainUnaryClient()))
  220. c.opt = append(c.opt, opt...)
  221. c.mutex.RLock()
  222. conf := c.conf
  223. c.mutex.RUnlock()
  224. if conf.Dial > 0 {
  225. var cancel context.CancelFunc
  226. ctx, cancel = context.WithTimeout(ctx, time.Duration(conf.Dial))
  227. defer cancel()
  228. }
  229. if u, e := url.Parse(target); e == nil {
  230. v := u.Query()
  231. for _, c := range c.conf.Clusters {
  232. v.Add(naming.MetaCluster, c)
  233. }
  234. if c.conf.Zone != "" {
  235. v.Add(naming.MetaZone, c.conf.Zone)
  236. }
  237. if v.Get("subset") == "" && c.conf.Subset > 0 {
  238. v.Add("subset", strconv.FormatInt(int64(c.conf.Subset), 10))
  239. }
  240. u.RawQuery = v.Encode()
  241. // 比较_grpcTarget中的appid是否等于u.path中的appid,并替换成mock的地址
  242. for _, t := range _grpcTarget {
  243. strs := strings.SplitN(t, "=", 2)
  244. if len(strs) == 2 && ("/"+strs[0]) == u.Path {
  245. u.Path = "/" + strs[1]
  246. u.Scheme = "passthrough"
  247. u.RawQuery = ""
  248. break
  249. }
  250. }
  251. target = u.String()
  252. }
  253. if conn, err = grpc.DialContext(ctx, target, c.opt...); err != nil {
  254. fmt.Fprintf(os.Stderr, "warden client: dial %s error %v!", target, err)
  255. }
  256. err = errors.WithStack(err)
  257. return
  258. }
  259. // DialTLS creates a client connection over tls transport to the given target.
  260. func (c *Client) DialTLS(ctx context.Context, target string, file string, name string) (conn *grpc.ClientConn, err error) {
  261. var creds credentials.TransportCredentials
  262. creds, err = credentials.NewClientTLSFromFile(file, name)
  263. if err != nil {
  264. err = errors.WithStack(err)
  265. return
  266. }
  267. c.opt = append(c.opt, grpc.WithBlock())
  268. c.opt = append(c.opt, grpc.WithTransportCredentials(creds))
  269. c.opt = append(c.opt, grpc.WithUnaryInterceptor(c.chainUnaryClient()))
  270. c.mutex.RLock()
  271. conf := c.conf
  272. c.mutex.RUnlock()
  273. if conf.Dial > 0 {
  274. var cancel context.CancelFunc
  275. ctx, cancel = context.WithTimeout(ctx, time.Duration(conf.Dial))
  276. defer cancel()
  277. }
  278. conn, err = grpc.DialContext(ctx, target, c.opt...)
  279. err = errors.WithStack(err)
  280. return
  281. }
  282. // chainUnaryClient creates a single interceptor out of a chain of many interceptors.
  283. //
  284. // Execution is done in left-to-right order, including passing of context.
  285. // For example ChainUnaryClient(one, two, three) will execute one before two before three.
  286. func (c *Client) chainUnaryClient() grpc.UnaryClientInterceptor {
  287. n := len(c.handlers)
  288. if n == 0 {
  289. return func(ctx context.Context, method string, req, reply interface{},
  290. cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
  291. return invoker(ctx, method, req, reply, cc, opts...)
  292. }
  293. }
  294. return func(ctx context.Context, method string, req, reply interface{},
  295. cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
  296. var (
  297. i int
  298. chainHandler grpc.UnaryInvoker
  299. )
  300. chainHandler = func(ictx context.Context, imethod string, ireq, ireply interface{}, ic *grpc.ClientConn, iopts ...grpc.CallOption) error {
  301. if i == n-1 {
  302. return invoker(ictx, imethod, ireq, ireply, ic, iopts...)
  303. }
  304. i++
  305. return c.handlers[i](ictx, imethod, ireq, ireply, ic, chainHandler, iopts...)
  306. }
  307. return c.handlers[0](ctx, method, req, reply, cc, chainHandler, opts...)
  308. }
  309. }