grpclb_remote_balancer.go 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304
  1. /*
  2. *
  3. * Copyright 2017 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package grpclb
  19. import (
  20. "context"
  21. "fmt"
  22. "io"
  23. "net"
  24. "reflect"
  25. "time"
  26. timestamppb "github.com/golang/protobuf/ptypes/timestamp"
  27. "google.golang.org/grpc"
  28. "google.golang.org/grpc/balancer"
  29. lbpb "google.golang.org/grpc/balancer/grpclb/grpc_lb_v1"
  30. "google.golang.org/grpc/connectivity"
  31. "google.golang.org/grpc/grpclog"
  32. "google.golang.org/grpc/internal"
  33. "google.golang.org/grpc/internal/channelz"
  34. "google.golang.org/grpc/metadata"
  35. "google.golang.org/grpc/resolver"
  36. )
  37. // processServerList updates balaner's internal state, create/remove SubConns
  38. // and regenerates picker using the received serverList.
  39. func (lb *lbBalancer) processServerList(l *lbpb.ServerList) {
  40. grpclog.Infof("lbBalancer: processing server list: %+v", l)
  41. lb.mu.Lock()
  42. defer lb.mu.Unlock()
  43. // Set serverListReceived to true so fallback will not take effect if it has
  44. // not hit timeout.
  45. lb.serverListReceived = true
  46. // If the new server list == old server list, do nothing.
  47. if reflect.DeepEqual(lb.fullServerList, l.Servers) {
  48. grpclog.Infof("lbBalancer: new serverlist same as the previous one, ignoring")
  49. return
  50. }
  51. lb.fullServerList = l.Servers
  52. var backendAddrs []resolver.Address
  53. for i, s := range l.Servers {
  54. if s.Drop {
  55. continue
  56. }
  57. md := metadata.Pairs(lbTokeyKey, s.LoadBalanceToken)
  58. ip := net.IP(s.IpAddress)
  59. ipStr := ip.String()
  60. if ip.To4() == nil {
  61. // Add square brackets to ipv6 addresses, otherwise net.Dial() and
  62. // net.SplitHostPort() will return too many colons error.
  63. ipStr = fmt.Sprintf("[%s]", ipStr)
  64. }
  65. addr := resolver.Address{
  66. Addr: fmt.Sprintf("%s:%d", ipStr, s.Port),
  67. Metadata: &md,
  68. }
  69. grpclog.Infof("lbBalancer: server list entry[%d]: ipStr:|%s|, port:|%d|, load balancer token:|%v|",
  70. i, ipStr, s.Port, s.LoadBalanceToken)
  71. backendAddrs = append(backendAddrs, addr)
  72. }
  73. // Call refreshSubConns to create/remove SubConns.
  74. lb.refreshSubConns(backendAddrs, true)
  75. // Regenerate and update picker no matter if there's update on backends (if
  76. // any SubConn will be newed/removed). Because since the full serverList was
  77. // different, there might be updates in drops or pick weights(different
  78. // number of duplicates). We need to update picker with the fulllist.
  79. //
  80. // Now with cache, even if SubConn was newed/removed, there might be no
  81. // state changes.
  82. lb.regeneratePicker()
  83. lb.cc.UpdateBalancerState(lb.state, lb.picker)
  84. }
  85. // refreshSubConns creates/removes SubConns with backendAddrs. It returns a bool
  86. // indicating whether the backendAddrs are different from the cached
  87. // backendAddrs (whether any SubConn was newed/removed).
  88. // Caller must hold lb.mu.
  89. func (lb *lbBalancer) refreshSubConns(backendAddrs []resolver.Address, fromGRPCLBServer bool) bool {
  90. opts := balancer.NewSubConnOptions{}
  91. if fromGRPCLBServer {
  92. opts.CredsBundle = lb.grpclbBackendCreds
  93. }
  94. lb.backendAddrs = nil
  95. var backendsUpdated bool
  96. // addrsSet is the set converted from backendAddrs, it's used to quick
  97. // lookup for an address.
  98. addrsSet := make(map[resolver.Address]struct{})
  99. // Create new SubConns.
  100. for _, addr := range backendAddrs {
  101. addrWithoutMD := addr
  102. addrWithoutMD.Metadata = nil
  103. addrsSet[addrWithoutMD] = struct{}{}
  104. lb.backendAddrs = append(lb.backendAddrs, addrWithoutMD)
  105. if _, ok := lb.subConns[addrWithoutMD]; !ok {
  106. backendsUpdated = true
  107. // Use addrWithMD to create the SubConn.
  108. sc, err := lb.cc.NewSubConn([]resolver.Address{addr}, opts)
  109. if err != nil {
  110. grpclog.Warningf("roundrobinBalancer: failed to create new SubConn: %v", err)
  111. continue
  112. }
  113. lb.subConns[addrWithoutMD] = sc // Use the addr without MD as key for the map.
  114. if _, ok := lb.scStates[sc]; !ok {
  115. // Only set state of new sc to IDLE. The state could already be
  116. // READY for cached SubConns.
  117. lb.scStates[sc] = connectivity.Idle
  118. }
  119. sc.Connect()
  120. }
  121. }
  122. for a, sc := range lb.subConns {
  123. // a was removed by resolver.
  124. if _, ok := addrsSet[a]; !ok {
  125. backendsUpdated = true
  126. lb.cc.RemoveSubConn(sc)
  127. delete(lb.subConns, a)
  128. // Keep the state of this sc in b.scStates until sc's state becomes Shutdown.
  129. // The entry will be deleted in HandleSubConnStateChange.
  130. }
  131. }
  132. return backendsUpdated
  133. }
  134. func (lb *lbBalancer) readServerList(s *balanceLoadClientStream) error {
  135. for {
  136. reply, err := s.Recv()
  137. if err != nil {
  138. if err == io.EOF {
  139. return errServerTerminatedConnection
  140. }
  141. return fmt.Errorf("grpclb: failed to recv server list: %v", err)
  142. }
  143. if serverList := reply.GetServerList(); serverList != nil {
  144. lb.processServerList(serverList)
  145. }
  146. }
  147. }
  148. func (lb *lbBalancer) sendLoadReport(s *balanceLoadClientStream, interval time.Duration) {
  149. ticker := time.NewTicker(interval)
  150. defer ticker.Stop()
  151. for {
  152. select {
  153. case <-ticker.C:
  154. case <-s.Context().Done():
  155. return
  156. }
  157. stats := lb.clientStats.toClientStats()
  158. t := time.Now()
  159. stats.Timestamp = &timestamppb.Timestamp{
  160. Seconds: t.Unix(),
  161. Nanos: int32(t.Nanosecond()),
  162. }
  163. if err := s.Send(&lbpb.LoadBalanceRequest{
  164. LoadBalanceRequestType: &lbpb.LoadBalanceRequest_ClientStats{
  165. ClientStats: stats,
  166. },
  167. }); err != nil {
  168. return
  169. }
  170. }
  171. }
  172. func (lb *lbBalancer) callRemoteBalancer() (backoff bool, _ error) {
  173. lbClient := &loadBalancerClient{cc: lb.ccRemoteLB}
  174. ctx, cancel := context.WithCancel(context.Background())
  175. defer cancel()
  176. stream, err := lbClient.BalanceLoad(ctx, grpc.FailFast(false))
  177. if err != nil {
  178. return true, fmt.Errorf("grpclb: failed to perform RPC to the remote balancer %v", err)
  179. }
  180. // grpclb handshake on the stream.
  181. initReq := &lbpb.LoadBalanceRequest{
  182. LoadBalanceRequestType: &lbpb.LoadBalanceRequest_InitialRequest{
  183. InitialRequest: &lbpb.InitialLoadBalanceRequest{
  184. Name: lb.target,
  185. },
  186. },
  187. }
  188. if err := stream.Send(initReq); err != nil {
  189. return true, fmt.Errorf("grpclb: failed to send init request: %v", err)
  190. }
  191. reply, err := stream.Recv()
  192. if err != nil {
  193. return true, fmt.Errorf("grpclb: failed to recv init response: %v", err)
  194. }
  195. initResp := reply.GetInitialResponse()
  196. if initResp == nil {
  197. return true, fmt.Errorf("grpclb: reply from remote balancer did not include initial response")
  198. }
  199. if initResp.LoadBalancerDelegate != "" {
  200. return true, fmt.Errorf("grpclb: Delegation is not supported")
  201. }
  202. go func() {
  203. if d := convertDuration(initResp.ClientStatsReportInterval); d > 0 {
  204. lb.sendLoadReport(stream, d)
  205. }
  206. }()
  207. // No backoff if init req/resp handshake was successful.
  208. return false, lb.readServerList(stream)
  209. }
  210. func (lb *lbBalancer) watchRemoteBalancer() {
  211. var retryCount int
  212. for {
  213. doBackoff, err := lb.callRemoteBalancer()
  214. select {
  215. case <-lb.doneCh:
  216. return
  217. default:
  218. if err != nil {
  219. if err == errServerTerminatedConnection {
  220. grpclog.Info(err)
  221. } else {
  222. grpclog.Warning(err)
  223. }
  224. }
  225. }
  226. if !doBackoff {
  227. retryCount = 0
  228. continue
  229. }
  230. timer := time.NewTimer(lb.backoff.Backoff(retryCount))
  231. select {
  232. case <-timer.C:
  233. case <-lb.doneCh:
  234. timer.Stop()
  235. return
  236. }
  237. retryCount++
  238. }
  239. }
  240. func (lb *lbBalancer) dialRemoteLB(remoteLBName string) {
  241. var dopts []grpc.DialOption
  242. if creds := lb.opt.DialCreds; creds != nil {
  243. if err := creds.OverrideServerName(remoteLBName); err == nil {
  244. dopts = append(dopts, grpc.WithTransportCredentials(creds))
  245. } else {
  246. grpclog.Warningf("grpclb: failed to override the server name in the credentials: %v, using Insecure", err)
  247. dopts = append(dopts, grpc.WithInsecure())
  248. }
  249. } else if bundle := lb.grpclbClientConnCreds; bundle != nil {
  250. dopts = append(dopts, grpc.WithCredentialsBundle(bundle))
  251. } else {
  252. dopts = append(dopts, grpc.WithInsecure())
  253. }
  254. if lb.opt.Dialer != nil {
  255. // WithDialer takes a different type of function, so we instead use a
  256. // special DialOption here.
  257. wcd := internal.WithContextDialer.(func(func(context.Context, string) (net.Conn, error)) grpc.DialOption)
  258. dopts = append(dopts, wcd(lb.opt.Dialer))
  259. }
  260. // Explicitly set pickfirst as the balancer.
  261. dopts = append(dopts, grpc.WithBalancerName(grpc.PickFirstBalancerName))
  262. wrb := internal.WithResolverBuilder.(func(resolver.Builder) grpc.DialOption)
  263. dopts = append(dopts, wrb(lb.manualResolver))
  264. if channelz.IsOn() {
  265. dopts = append(dopts, grpc.WithChannelzParentID(lb.opt.ChannelzParentID))
  266. }
  267. // DialContext using manualResolver.Scheme, which is a random scheme
  268. // generated when init grpclb. The target scheme here is not important.
  269. //
  270. // The grpc dial target will be used by the creds (ALTS) as the authority,
  271. // so it has to be set to remoteLBName that comes from resolver.
  272. cc, err := grpc.DialContext(context.Background(), remoteLBName, dopts...)
  273. if err != nil {
  274. grpclog.Fatalf("failed to dial: %v", err)
  275. }
  276. lb.ccRemoteLB = cc
  277. go lb.watchRemoteBalancer()
  278. }