dialoptions.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480
  1. /*
  2. *
  3. * Copyright 2018 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package grpc
  19. import (
  20. "context"
  21. "fmt"
  22. "net"
  23. "time"
  24. "google.golang.org/grpc/balancer"
  25. "google.golang.org/grpc/credentials"
  26. "google.golang.org/grpc/internal"
  27. "google.golang.org/grpc/internal/backoff"
  28. "google.golang.org/grpc/internal/envconfig"
  29. "google.golang.org/grpc/internal/transport"
  30. "google.golang.org/grpc/keepalive"
  31. "google.golang.org/grpc/resolver"
  32. "google.golang.org/grpc/stats"
  33. )
  34. // dialOptions configure a Dial call. dialOptions are set by the DialOption
  35. // values passed to Dial.
  36. type dialOptions struct {
  37. unaryInt UnaryClientInterceptor
  38. streamInt StreamClientInterceptor
  39. cp Compressor
  40. dc Decompressor
  41. bs backoff.Strategy
  42. block bool
  43. insecure bool
  44. timeout time.Duration
  45. scChan <-chan ServiceConfig
  46. authority string
  47. copts transport.ConnectOptions
  48. callOptions []CallOption
  49. // This is used by v1 balancer dial option WithBalancer to support v1
  50. // balancer, and also by WithBalancerName dial option.
  51. balancerBuilder balancer.Builder
  52. // This is to support grpclb.
  53. resolverBuilder resolver.Builder
  54. reqHandshake envconfig.RequireHandshakeSetting
  55. channelzParentID int64
  56. disableServiceConfig bool
  57. disableRetry bool
  58. disableHealthCheck bool
  59. }
  60. // DialOption configures how we set up the connection.
  61. type DialOption interface {
  62. apply(*dialOptions)
  63. }
  64. // EmptyDialOption does not alter the dial configuration. It can be embedded in
  65. // another structure to build custom dial options.
  66. //
  67. // This API is EXPERIMENTAL.
  68. type EmptyDialOption struct{}
  69. func (EmptyDialOption) apply(*dialOptions) {}
  70. // funcDialOption wraps a function that modifies dialOptions into an
  71. // implementation of the DialOption interface.
  72. type funcDialOption struct {
  73. f func(*dialOptions)
  74. }
  75. func (fdo *funcDialOption) apply(do *dialOptions) {
  76. fdo.f(do)
  77. }
  78. func newFuncDialOption(f func(*dialOptions)) *funcDialOption {
  79. return &funcDialOption{
  80. f: f,
  81. }
  82. }
  83. // WithWaitForHandshake blocks until the initial settings frame is received from
  84. // the server before assigning RPCs to the connection.
  85. //
  86. // Deprecated: this will become the default behavior in the 1.17 release, and
  87. // will be removed after the 1.18 release. To override the default behavior in
  88. // the 1.17 release, either use this dial option or set the environment
  89. // variable GRPC_GO_READY_BEFORE_HANDSHAKE=on.
  90. func WithWaitForHandshake() DialOption {
  91. return newFuncDialOption(func(o *dialOptions) {
  92. o.reqHandshake = envconfig.RequireHandshakeOn
  93. })
  94. }
  95. // WithWriteBufferSize determines how much data can be batched before doing a
  96. // write on the wire. The corresponding memory allocation for this buffer will
  97. // be twice the size to keep syscalls low. The default value for this buffer is
  98. // 32KB.
  99. //
  100. // Zero will disable the write buffer such that each write will be on underlying
  101. // connection. Note: A Send call may not directly translate to a write.
  102. func WithWriteBufferSize(s int) DialOption {
  103. return newFuncDialOption(func(o *dialOptions) {
  104. o.copts.WriteBufferSize = s
  105. })
  106. }
  107. // WithReadBufferSize lets you set the size of read buffer, this determines how
  108. // much data can be read at most for each read syscall.
  109. //
  110. // The default value for this buffer is 32KB. Zero will disable read buffer for
  111. // a connection so data framer can access the underlying conn directly.
  112. func WithReadBufferSize(s int) DialOption {
  113. return newFuncDialOption(func(o *dialOptions) {
  114. o.copts.ReadBufferSize = s
  115. })
  116. }
  117. // WithInitialWindowSize returns a DialOption which sets the value for initial
  118. // window size on a stream. The lower bound for window size is 64K and any value
  119. // smaller than that will be ignored.
  120. func WithInitialWindowSize(s int32) DialOption {
  121. return newFuncDialOption(func(o *dialOptions) {
  122. o.copts.InitialWindowSize = s
  123. })
  124. }
  125. // WithInitialConnWindowSize returns a DialOption which sets the value for
  126. // initial window size on a connection. The lower bound for window size is 64K
  127. // and any value smaller than that will be ignored.
  128. func WithInitialConnWindowSize(s int32) DialOption {
  129. return newFuncDialOption(func(o *dialOptions) {
  130. o.copts.InitialConnWindowSize = s
  131. })
  132. }
  133. // WithMaxMsgSize returns a DialOption which sets the maximum message size the
  134. // client can receive.
  135. //
  136. // Deprecated: use WithDefaultCallOptions(MaxCallRecvMsgSize(s)) instead.
  137. func WithMaxMsgSize(s int) DialOption {
  138. return WithDefaultCallOptions(MaxCallRecvMsgSize(s))
  139. }
  140. // WithDefaultCallOptions returns a DialOption which sets the default
  141. // CallOptions for calls over the connection.
  142. func WithDefaultCallOptions(cos ...CallOption) DialOption {
  143. return newFuncDialOption(func(o *dialOptions) {
  144. o.callOptions = append(o.callOptions, cos...)
  145. })
  146. }
  147. // WithCodec returns a DialOption which sets a codec for message marshaling and
  148. // unmarshaling.
  149. //
  150. // Deprecated: use WithDefaultCallOptions(CallCustomCodec(c)) instead.
  151. func WithCodec(c Codec) DialOption {
  152. return WithDefaultCallOptions(CallCustomCodec(c))
  153. }
  154. // WithCompressor returns a DialOption which sets a Compressor to use for
  155. // message compression. It has lower priority than the compressor set by the
  156. // UseCompressor CallOption.
  157. //
  158. // Deprecated: use UseCompressor instead.
  159. func WithCompressor(cp Compressor) DialOption {
  160. return newFuncDialOption(func(o *dialOptions) {
  161. o.cp = cp
  162. })
  163. }
  164. // WithDecompressor returns a DialOption which sets a Decompressor to use for
  165. // incoming message decompression. If incoming response messages are encoded
  166. // using the decompressor's Type(), it will be used. Otherwise, the message
  167. // encoding will be used to look up the compressor registered via
  168. // encoding.RegisterCompressor, which will then be used to decompress the
  169. // message. If no compressor is registered for the encoding, an Unimplemented
  170. // status error will be returned.
  171. //
  172. // Deprecated: use encoding.RegisterCompressor instead.
  173. func WithDecompressor(dc Decompressor) DialOption {
  174. return newFuncDialOption(func(o *dialOptions) {
  175. o.dc = dc
  176. })
  177. }
  178. // WithBalancer returns a DialOption which sets a load balancer with the v1 API.
  179. // Name resolver will be ignored if this DialOption is specified.
  180. //
  181. // Deprecated: use the new balancer APIs in balancer package and
  182. // WithBalancerName.
  183. func WithBalancer(b Balancer) DialOption {
  184. return newFuncDialOption(func(o *dialOptions) {
  185. o.balancerBuilder = &balancerWrapperBuilder{
  186. b: b,
  187. }
  188. })
  189. }
  190. // WithBalancerName sets the balancer that the ClientConn will be initialized
  191. // with. Balancer registered with balancerName will be used. This function
  192. // panics if no balancer was registered by balancerName.
  193. //
  194. // The balancer cannot be overridden by balancer option specified by service
  195. // config.
  196. //
  197. // This is an EXPERIMENTAL API.
  198. func WithBalancerName(balancerName string) DialOption {
  199. builder := balancer.Get(balancerName)
  200. if builder == nil {
  201. panic(fmt.Sprintf("grpc.WithBalancerName: no balancer is registered for name %v", balancerName))
  202. }
  203. return newFuncDialOption(func(o *dialOptions) {
  204. o.balancerBuilder = builder
  205. })
  206. }
  207. // withResolverBuilder is only for grpclb.
  208. func withResolverBuilder(b resolver.Builder) DialOption {
  209. return newFuncDialOption(func(o *dialOptions) {
  210. o.resolverBuilder = b
  211. })
  212. }
  213. // WithServiceConfig returns a DialOption which has a channel to read the
  214. // service configuration.
  215. //
  216. // Deprecated: service config should be received through name resolver, as
  217. // specified here.
  218. // https://github.com/grpc/grpc/blob/master/doc/service_config.md
  219. func WithServiceConfig(c <-chan ServiceConfig) DialOption {
  220. return newFuncDialOption(func(o *dialOptions) {
  221. o.scChan = c
  222. })
  223. }
  224. // WithBackoffMaxDelay configures the dialer to use the provided maximum delay
  225. // when backing off after failed connection attempts.
  226. func WithBackoffMaxDelay(md time.Duration) DialOption {
  227. return WithBackoffConfig(BackoffConfig{MaxDelay: md})
  228. }
  229. // WithBackoffConfig configures the dialer to use the provided backoff
  230. // parameters after connection failures.
  231. //
  232. // Use WithBackoffMaxDelay until more parameters on BackoffConfig are opened up
  233. // for use.
  234. func WithBackoffConfig(b BackoffConfig) DialOption {
  235. return withBackoff(backoff.Exponential{
  236. MaxDelay: b.MaxDelay,
  237. })
  238. }
  239. // withBackoff sets the backoff strategy used for connectRetryNum after a failed
  240. // connection attempt.
  241. //
  242. // This can be exported if arbitrary backoff strategies are allowed by gRPC.
  243. func withBackoff(bs backoff.Strategy) DialOption {
  244. return newFuncDialOption(func(o *dialOptions) {
  245. o.bs = bs
  246. })
  247. }
  248. // WithBlock returns a DialOption which makes caller of Dial blocks until the
  249. // underlying connection is up. Without this, Dial returns immediately and
  250. // connecting the server happens in background.
  251. func WithBlock() DialOption {
  252. return newFuncDialOption(func(o *dialOptions) {
  253. o.block = true
  254. })
  255. }
  256. // WithInsecure returns a DialOption which disables transport security for this
  257. // ClientConn. Note that transport security is required unless WithInsecure is
  258. // set.
  259. func WithInsecure() DialOption {
  260. return newFuncDialOption(func(o *dialOptions) {
  261. o.insecure = true
  262. })
  263. }
  264. // WithTransportCredentials returns a DialOption which configures a connection
  265. // level security credentials (e.g., TLS/SSL). This should not be used together
  266. // with WithCredentialsBundle.
  267. func WithTransportCredentials(creds credentials.TransportCredentials) DialOption {
  268. return newFuncDialOption(func(o *dialOptions) {
  269. o.copts.TransportCredentials = creds
  270. })
  271. }
  272. // WithPerRPCCredentials returns a DialOption which sets credentials and places
  273. // auth state on each outbound RPC.
  274. func WithPerRPCCredentials(creds credentials.PerRPCCredentials) DialOption {
  275. return newFuncDialOption(func(o *dialOptions) {
  276. o.copts.PerRPCCredentials = append(o.copts.PerRPCCredentials, creds)
  277. })
  278. }
  279. // WithCredentialsBundle returns a DialOption to set a credentials bundle for
  280. // the ClientConn.WithCreds. This should not be used together with
  281. // WithTransportCredentials.
  282. //
  283. // This API is experimental.
  284. func WithCredentialsBundle(b credentials.Bundle) DialOption {
  285. return newFuncDialOption(func(o *dialOptions) {
  286. o.copts.CredsBundle = b
  287. })
  288. }
  289. // WithTimeout returns a DialOption that configures a timeout for dialing a
  290. // ClientConn initially. This is valid if and only if WithBlock() is present.
  291. //
  292. // Deprecated: use DialContext and context.WithTimeout instead.
  293. func WithTimeout(d time.Duration) DialOption {
  294. return newFuncDialOption(func(o *dialOptions) {
  295. o.timeout = d
  296. })
  297. }
  298. func withContextDialer(f func(context.Context, string) (net.Conn, error)) DialOption {
  299. return newFuncDialOption(func(o *dialOptions) {
  300. o.copts.Dialer = f
  301. })
  302. }
  303. func init() {
  304. internal.WithContextDialer = withContextDialer
  305. internal.WithResolverBuilder = withResolverBuilder
  306. }
  307. // WithDialer returns a DialOption that specifies a function to use for dialing
  308. // network addresses. If FailOnNonTempDialError() is set to true, and an error
  309. // is returned by f, gRPC checks the error's Temporary() method to decide if it
  310. // should try to reconnect to the network address.
  311. func WithDialer(f func(string, time.Duration) (net.Conn, error)) DialOption {
  312. return withContextDialer(
  313. func(ctx context.Context, addr string) (net.Conn, error) {
  314. if deadline, ok := ctx.Deadline(); ok {
  315. return f(addr, deadline.Sub(time.Now()))
  316. }
  317. return f(addr, 0)
  318. })
  319. }
  320. // WithStatsHandler returns a DialOption that specifies the stats handler for
  321. // all the RPCs and underlying network connections in this ClientConn.
  322. func WithStatsHandler(h stats.Handler) DialOption {
  323. return newFuncDialOption(func(o *dialOptions) {
  324. o.copts.StatsHandler = h
  325. })
  326. }
  327. // FailOnNonTempDialError returns a DialOption that specifies if gRPC fails on
  328. // non-temporary dial errors. If f is true, and dialer returns a non-temporary
  329. // error, gRPC will fail the connection to the network address and won't try to
  330. // reconnect. The default value of FailOnNonTempDialError is false.
  331. //
  332. // FailOnNonTempDialError only affects the initial dial, and does not do
  333. // anything useful unless you are also using WithBlock().
  334. //
  335. // This is an EXPERIMENTAL API.
  336. func FailOnNonTempDialError(f bool) DialOption {
  337. return newFuncDialOption(func(o *dialOptions) {
  338. o.copts.FailOnNonTempDialError = f
  339. })
  340. }
  341. // WithUserAgent returns a DialOption that specifies a user agent string for all
  342. // the RPCs.
  343. func WithUserAgent(s string) DialOption {
  344. return newFuncDialOption(func(o *dialOptions) {
  345. o.copts.UserAgent = s
  346. })
  347. }
  348. // WithKeepaliveParams returns a DialOption that specifies keepalive parameters
  349. // for the client transport.
  350. func WithKeepaliveParams(kp keepalive.ClientParameters) DialOption {
  351. return newFuncDialOption(func(o *dialOptions) {
  352. o.copts.KeepaliveParams = kp
  353. })
  354. }
  355. // WithUnaryInterceptor returns a DialOption that specifies the interceptor for
  356. // unary RPCs.
  357. func WithUnaryInterceptor(f UnaryClientInterceptor) DialOption {
  358. return newFuncDialOption(func(o *dialOptions) {
  359. o.unaryInt = f
  360. })
  361. }
  362. // WithStreamInterceptor returns a DialOption that specifies the interceptor for
  363. // streaming RPCs.
  364. func WithStreamInterceptor(f StreamClientInterceptor) DialOption {
  365. return newFuncDialOption(func(o *dialOptions) {
  366. o.streamInt = f
  367. })
  368. }
  369. // WithAuthority returns a DialOption that specifies the value to be used as the
  370. // :authority pseudo-header. This value only works with WithInsecure and has no
  371. // effect if TransportCredentials are present.
  372. func WithAuthority(a string) DialOption {
  373. return newFuncDialOption(func(o *dialOptions) {
  374. o.authority = a
  375. })
  376. }
  377. // WithChannelzParentID returns a DialOption that specifies the channelz ID of
  378. // current ClientConn's parent. This function is used in nested channel creation
  379. // (e.g. grpclb dial).
  380. func WithChannelzParentID(id int64) DialOption {
  381. return newFuncDialOption(func(o *dialOptions) {
  382. o.channelzParentID = id
  383. })
  384. }
  385. // WithDisableServiceConfig returns a DialOption that causes grpc to ignore any
  386. // service config provided by the resolver and provides a hint to the resolver
  387. // to not fetch service configs.
  388. func WithDisableServiceConfig() DialOption {
  389. return newFuncDialOption(func(o *dialOptions) {
  390. o.disableServiceConfig = true
  391. })
  392. }
  393. // WithDisableRetry returns a DialOption that disables retries, even if the
  394. // service config enables them. This does not impact transparent retries, which
  395. // will happen automatically if no data is written to the wire or if the RPC is
  396. // unprocessed by the remote server.
  397. //
  398. // Retry support is currently disabled by default, but will be enabled by
  399. // default in the future. Until then, it may be enabled by setting the
  400. // environment variable "GRPC_GO_RETRY" to "on".
  401. //
  402. // This API is EXPERIMENTAL.
  403. func WithDisableRetry() DialOption {
  404. return newFuncDialOption(func(o *dialOptions) {
  405. o.disableRetry = true
  406. })
  407. }
  408. // WithMaxHeaderListSize returns a DialOption that specifies the maximum
  409. // (uncompressed) size of header list that the client is prepared to accept.
  410. func WithMaxHeaderListSize(s uint32) DialOption {
  411. return newFuncDialOption(func(o *dialOptions) {
  412. o.copts.MaxHeaderListSize = &s
  413. })
  414. }
  415. // WithDisableHealthCheck disables the LB channel health checking for all SubConns of this ClientConn.
  416. //
  417. // This API is EXPERIMENTAL.
  418. func WithDisableHealthCheck() DialOption {
  419. return newFuncDialOption(func(o *dialOptions) {
  420. o.disableHealthCheck = true
  421. })
  422. }
  423. func defaultDialOptions() dialOptions {
  424. return dialOptions{
  425. disableRetry: !envconfig.Retry,
  426. reqHandshake: envconfig.RequireHandshake,
  427. copts: transport.ConnectOptions{
  428. WriteBufferSize: defaultWriteBufSize,
  429. ReadBufferSize: defaultReadBufSize,
  430. },
  431. }
  432. }