grpclb.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392
  1. /*
  2. *
  3. * Copyright 2016 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. //go:generate ./regenerate.sh
  19. // Package grpclb defines a grpclb balancer.
  20. //
  21. // To install grpclb balancer, import this package as:
  22. // import _ "google.golang.org/grpc/balancer/grpclb"
  23. package grpclb
  24. import (
  25. "context"
  26. "errors"
  27. "strconv"
  28. "strings"
  29. "sync"
  30. "time"
  31. durationpb "github.com/golang/protobuf/ptypes/duration"
  32. "google.golang.org/grpc"
  33. "google.golang.org/grpc/balancer"
  34. lbpb "google.golang.org/grpc/balancer/grpclb/grpc_lb_v1"
  35. "google.golang.org/grpc/connectivity"
  36. "google.golang.org/grpc/credentials"
  37. "google.golang.org/grpc/grpclog"
  38. "google.golang.org/grpc/internal"
  39. "google.golang.org/grpc/internal/backoff"
  40. "google.golang.org/grpc/resolver"
  41. )
  // Package-level constants for the grpclb balancer.
  const (
  	// lbTokeyKey holds the string "lb-token". It is not referenced in this
  	// part of the file.
  	// NOTE(review): the identifier looks like a typo of "lbTokenKey"; it is
  	// left unchanged because other files in the package may reference it.
  	lbTokeyKey = "lb-token"

  	// defaultFallbackTimeout is the fallback timeout that newLBBuilder
  	// passes to newLBBuilderWithFallbackTimeout.
  	defaultFallbackTimeout = 10 * time.Second

  	// grpclbName is the balancer name returned by lbBuilder.Name.
  	grpclbName = "grpclb"
  )
  47. var (
  48. // defaultBackoffConfig configures the backoff strategy that's used when the
  49. // init handshake in the RPC is unsuccessful. It's not for the clientconn
  50. // reconnect backoff.
  51. //
  52. // It has the same value as the default grpc.DefaultBackoffConfig.
  53. //
  54. // TODO: make backoff configurable.
  55. defaultBackoffConfig = backoff.Exponential{
  56. MaxDelay: 120 * time.Second,
  57. }
  58. errServerTerminatedConnection = errors.New("grpclb: failed to recv server list: server terminated connection")
  59. )
  60. func convertDuration(d *durationpb.Duration) time.Duration {
  61. if d == nil {
  62. return 0
  63. }
  64. return time.Duration(d.Seconds)*time.Second + time.Duration(d.Nanos)*time.Nanosecond
  65. }
  66. // Client API for LoadBalancer service.
  67. // Mostly copied from generated pb.go file.
  68. // To avoid circular dependency.
  69. type loadBalancerClient struct {
  70. cc *grpc.ClientConn
  71. }
  72. func (c *loadBalancerClient) BalanceLoad(ctx context.Context, opts ...grpc.CallOption) (*balanceLoadClientStream, error) {
  73. desc := &grpc.StreamDesc{
  74. StreamName: "BalanceLoad",
  75. ServerStreams: true,
  76. ClientStreams: true,
  77. }
  78. stream, err := c.cc.NewStream(ctx, desc, "/grpc.lb.v1.LoadBalancer/BalanceLoad", opts...)
  79. if err != nil {
  80. return nil, err
  81. }
  82. x := &balanceLoadClientStream{stream}
  83. return x, nil
  84. }
  85. type balanceLoadClientStream struct {
  86. grpc.ClientStream
  87. }
  88. func (x *balanceLoadClientStream) Send(m *lbpb.LoadBalanceRequest) error {
  89. return x.ClientStream.SendMsg(m)
  90. }
  91. func (x *balanceLoadClientStream) Recv() (*lbpb.LoadBalanceResponse, error) {
  92. m := new(lbpb.LoadBalanceResponse)
  93. if err := x.ClientStream.RecvMsg(m); err != nil {
  94. return nil, err
  95. }
  96. return m, nil
  97. }
  98. func init() {
  99. balancer.Register(newLBBuilder())
  100. }
  101. // newLBBuilder creates a builder for grpclb.
  102. func newLBBuilder() balancer.Builder {
  103. return newLBBuilderWithFallbackTimeout(defaultFallbackTimeout)
  104. }
  105. // newLBBuilderWithFallbackTimeout creates a grpclb builder with the given
  106. // fallbackTimeout. If no response is received from the remote balancer within
  107. // fallbackTimeout, the backend addresses from the resolved address list will be
  108. // used.
  109. //
  110. // Only call this function when a non-default fallback timeout is needed.
  111. func newLBBuilderWithFallbackTimeout(fallbackTimeout time.Duration) balancer.Builder {
  112. return &lbBuilder{
  113. fallbackTimeout: fallbackTimeout,
  114. }
  115. }
  // lbBuilder builds grpclb balancers.
  type lbBuilder struct {
  	// fallbackTimeout is copied into every balancer created by Build.
  	fallbackTimeout time.Duration
  }
  119. func (b *lbBuilder) Name() string {
  120. return grpclbName
  121. }
  122. func (b *lbBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer {
  123. // This generates a manual resolver builder with a random scheme. This
  124. // scheme will be used to dial to remote LB, so we can send filtered address
  125. // updates to remote LB ClientConn using this manual resolver.
  126. scheme := "grpclb_internal_" + strconv.FormatInt(time.Now().UnixNano(), 36)
  127. r := &lbManualResolver{scheme: scheme, ccb: cc}
  128. var target string
  129. targetSplitted := strings.Split(cc.Target(), ":///")
  130. if len(targetSplitted) < 2 {
  131. target = cc.Target()
  132. } else {
  133. target = targetSplitted[1]
  134. }
  135. lb := &lbBalancer{
  136. cc: newLBCacheClientConn(cc),
  137. target: target,
  138. opt: opt,
  139. fallbackTimeout: b.fallbackTimeout,
  140. doneCh: make(chan struct{}),
  141. manualResolver: r,
  142. csEvltr: &balancer.ConnectivityStateEvaluator{},
  143. subConns: make(map[resolver.Address]balancer.SubConn),
  144. scStates: make(map[balancer.SubConn]connectivity.State),
  145. picker: &errPicker{err: balancer.ErrNoSubConnAvailable},
  146. clientStats: newRPCStats(),
  147. backoff: defaultBackoffConfig, // TODO: make backoff configurable.
  148. }
  149. var err error
  150. if opt.CredsBundle != nil {
  151. lb.grpclbClientConnCreds, err = opt.CredsBundle.NewWithMode(internal.CredsBundleModeBalancer)
  152. if err != nil {
  153. grpclog.Warningf("lbBalancer: client connection creds NewWithMode failed: %v", err)
  154. }
  155. lb.grpclbBackendCreds, err = opt.CredsBundle.NewWithMode(internal.CredsBundleModeBackendFromBalancer)
  156. if err != nil {
  157. grpclog.Warningf("lbBalancer: backend creds NewWithMode failed: %v", err)
  158. }
  159. }
  160. return lb
  161. }
  162. type lbBalancer struct {
  163. cc *lbCacheClientConn
  164. target string
  165. opt balancer.BuildOptions
  166. // grpclbClientConnCreds is the creds bundle to be used to connect to grpclb
  167. // servers. If it's nil, use the TransportCredentials from BuildOptions
  168. // instead.
  169. grpclbClientConnCreds credentials.Bundle
  170. // grpclbBackendCreds is the creds bundle to be used for addresses that are
  171. // returned by grpclb server. If it's nil, don't set anything when creating
  172. // SubConns.
  173. grpclbBackendCreds credentials.Bundle
  174. fallbackTimeout time.Duration
  175. doneCh chan struct{}
  176. // manualResolver is used in the remote LB ClientConn inside grpclb. When
  177. // resolved address updates are received by grpclb, filtered updates will be
  178. // send to remote LB ClientConn through this resolver.
  179. manualResolver *lbManualResolver
  180. // The ClientConn to talk to the remote balancer.
  181. ccRemoteLB *grpc.ClientConn
  182. // backoff for calling remote balancer.
  183. backoff backoff.Strategy
  184. // Support client side load reporting. Each picker gets a reference to this,
  185. // and will update its content.
  186. clientStats *rpcStats
  187. mu sync.Mutex // guards everything following.
  188. // The full server list including drops, used to check if the newly received
  189. // serverList contains anything new. Each generate picker will also have
  190. // reference to this list to do the first layer pick.
  191. fullServerList []*lbpb.Server
  192. // All backends addresses, with metadata set to nil. This list contains all
  193. // backend addresses in the same order and with the same duplicates as in
  194. // serverlist. When generating picker, a SubConn slice with the same order
  195. // but with only READY SCs will be gerenated.
  196. backendAddrs []resolver.Address
  197. // Roundrobin functionalities.
  198. csEvltr *balancer.ConnectivityStateEvaluator
  199. state connectivity.State
  200. subConns map[resolver.Address]balancer.SubConn // Used to new/remove SubConn.
  201. scStates map[balancer.SubConn]connectivity.State // Used to filter READY SubConns.
  202. picker balancer.Picker
  203. // Support fallback to resolved backend addresses if there's no response
  204. // from remote balancer within fallbackTimeout.
  205. fallbackTimerExpired bool
  206. serverListReceived bool
  207. // resolvedBackendAddrs is resolvedAddrs minus remote balancers. It's set
  208. // when resolved address updates are received, and read in the goroutine
  209. // handling fallback.
  210. resolvedBackendAddrs []resolver.Address
  211. }
  212. // regeneratePicker takes a snapshot of the balancer, and generates a picker from
  213. // it. The picker
  214. // - always returns ErrTransientFailure if the balancer is in TransientFailure,
  215. // - does two layer roundrobin pick otherwise.
  216. // Caller must hold lb.mu.
  217. func (lb *lbBalancer) regeneratePicker() {
  218. if lb.state == connectivity.TransientFailure {
  219. lb.picker = &errPicker{err: balancer.ErrTransientFailure}
  220. return
  221. }
  222. var readySCs []balancer.SubConn
  223. for _, a := range lb.backendAddrs {
  224. if sc, ok := lb.subConns[a]; ok {
  225. if st, ok := lb.scStates[sc]; ok && st == connectivity.Ready {
  226. readySCs = append(readySCs, sc)
  227. }
  228. }
  229. }
  230. if len(lb.fullServerList) <= 0 {
  231. if len(readySCs) <= 0 {
  232. lb.picker = &errPicker{err: balancer.ErrNoSubConnAvailable}
  233. return
  234. }
  235. lb.picker = &rrPicker{subConns: readySCs}
  236. return
  237. }
  238. lb.picker = &lbPicker{
  239. serverList: lb.fullServerList,
  240. subConns: readySCs,
  241. stats: lb.clientStats,
  242. }
  243. }
  244. func (lb *lbBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
  245. grpclog.Infof("lbBalancer: handle SubConn state change: %p, %v", sc, s)
  246. lb.mu.Lock()
  247. defer lb.mu.Unlock()
  248. oldS, ok := lb.scStates[sc]
  249. if !ok {
  250. grpclog.Infof("lbBalancer: got state changes for an unknown SubConn: %p, %v", sc, s)
  251. return
  252. }
  253. lb.scStates[sc] = s
  254. switch s {
  255. case connectivity.Idle:
  256. sc.Connect()
  257. case connectivity.Shutdown:
  258. // When an address was removed by resolver, b called RemoveSubConn but
  259. // kept the sc's state in scStates. Remove state for this sc here.
  260. delete(lb.scStates, sc)
  261. }
  262. oldAggrState := lb.state
  263. lb.state = lb.csEvltr.RecordTransition(oldS, s)
  264. // Regenerate picker when one of the following happens:
  265. // - this sc became ready from not-ready
  266. // - this sc became not-ready from ready
  267. // - the aggregated state of balancer became TransientFailure from non-TransientFailure
  268. // - the aggregated state of balancer became non-TransientFailure from TransientFailure
  269. if (oldS == connectivity.Ready) != (s == connectivity.Ready) ||
  270. (lb.state == connectivity.TransientFailure) != (oldAggrState == connectivity.TransientFailure) {
  271. lb.regeneratePicker()
  272. }
  273. lb.cc.UpdateBalancerState(lb.state, lb.picker)
  274. }
  275. // fallbackToBackendsAfter blocks for fallbackTimeout and falls back to use
  276. // resolved backends (backends received from resolver, not from remote balancer)
  277. // if no connection to remote balancers was successful.
  278. func (lb *lbBalancer) fallbackToBackendsAfter(fallbackTimeout time.Duration) {
  279. timer := time.NewTimer(fallbackTimeout)
  280. defer timer.Stop()
  281. select {
  282. case <-timer.C:
  283. case <-lb.doneCh:
  284. return
  285. }
  286. lb.mu.Lock()
  287. if lb.serverListReceived {
  288. lb.mu.Unlock()
  289. return
  290. }
  291. lb.fallbackTimerExpired = true
  292. lb.refreshSubConns(lb.resolvedBackendAddrs, false)
  293. lb.mu.Unlock()
  294. }
  295. // HandleResolvedAddrs sends the updated remoteLB addresses to remoteLB
  296. // clientConn. The remoteLB clientConn will handle creating/removing remoteLB
  297. // connections.
  298. func (lb *lbBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) {
  299. grpclog.Infof("lbBalancer: handleResolvedResult: %+v", addrs)
  300. if len(addrs) <= 0 {
  301. return
  302. }
  303. var remoteBalancerAddrs, backendAddrs []resolver.Address
  304. for _, a := range addrs {
  305. if a.Type == resolver.GRPCLB {
  306. remoteBalancerAddrs = append(remoteBalancerAddrs, a)
  307. } else {
  308. backendAddrs = append(backendAddrs, a)
  309. }
  310. }
  311. if lb.ccRemoteLB == nil {
  312. if len(remoteBalancerAddrs) <= 0 {
  313. grpclog.Errorf("grpclb: no remote balancer address is available, should never happen")
  314. return
  315. }
  316. // First time receiving resolved addresses, create a cc to remote
  317. // balancers.
  318. lb.dialRemoteLB(remoteBalancerAddrs[0].ServerName)
  319. // Start the fallback goroutine.
  320. go lb.fallbackToBackendsAfter(lb.fallbackTimeout)
  321. }
  322. // cc to remote balancers uses lb.manualResolver. Send the updated remote
  323. // balancer addresses to it through manualResolver.
  324. lb.manualResolver.NewAddress(remoteBalancerAddrs)
  325. lb.mu.Lock()
  326. lb.resolvedBackendAddrs = backendAddrs
  327. // If serverListReceived is true, connection to remote balancer was
  328. // successful and there's no need to do fallback anymore.
  329. // If fallbackTimerExpired is false, fallback hasn't happened yet.
  330. if !lb.serverListReceived && lb.fallbackTimerExpired {
  331. // This means we received a new list of resolved backends, and we are
  332. // still in fallback mode. Need to update the list of backends we are
  333. // using to the new list of backends.
  334. lb.refreshSubConns(lb.resolvedBackendAddrs, false)
  335. }
  336. lb.mu.Unlock()
  337. }
  338. func (lb *lbBalancer) Close() {
  339. select {
  340. case <-lb.doneCh:
  341. return
  342. default:
  343. }
  344. close(lb.doneCh)
  345. if lb.ccRemoteLB != nil {
  346. lb.ccRemoteLB.Close()
  347. }
  348. lb.cc.close()
  349. }