resolver_conn_wrapper.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155
  1. /*
  2. *
  3. * Copyright 2017 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package grpc
  19. import (
  20. "fmt"
  21. "strings"
  22. "google.golang.org/grpc/grpclog"
  23. "google.golang.org/grpc/internal/channelz"
  24. "google.golang.org/grpc/resolver"
  25. )
  26. // ccResolverWrapper is a wrapper on top of cc for resolvers.
  27. // It implements resolver.ClientConnection interface.
  28. type ccResolverWrapper struct {
  29. cc *ClientConn
  30. resolver resolver.Resolver
  31. addrCh chan []resolver.Address
  32. scCh chan string
  33. done chan struct{}
  34. lastAddressesCount int
  35. }
  36. // split2 returns the values from strings.SplitN(s, sep, 2).
  37. // If sep is not found, it returns ("", s, false) instead.
  38. func split2(s, sep string) (string, string, bool) {
  39. spl := strings.SplitN(s, sep, 2)
  40. if len(spl) < 2 {
  41. return "", "", false
  42. }
  43. return spl[0], spl[1], true
  44. }
  45. // parseTarget splits target into a struct containing scheme, authority and
  46. // endpoint.
  47. //
  48. // If target is not a valid scheme://authority/endpoint, it returns {Endpoint:
  49. // target}.
  50. func parseTarget(target string) (ret resolver.Target) {
  51. var ok bool
  52. ret.Scheme, ret.Endpoint, ok = split2(target, "://")
  53. if !ok {
  54. return resolver.Target{Endpoint: target}
  55. }
  56. ret.Authority, ret.Endpoint, ok = split2(ret.Endpoint, "/")
  57. if !ok {
  58. return resolver.Target{Endpoint: target}
  59. }
  60. return ret
  61. }
  62. // newCCResolverWrapper parses cc.target for scheme and gets the resolver
  63. // builder for this scheme and builds the resolver. The monitoring goroutine
  64. // for it is not started yet and can be created by calling start().
  65. //
  66. // If withResolverBuilder dial option is set, the specified resolver will be
  67. // used instead.
  68. func newCCResolverWrapper(cc *ClientConn) (*ccResolverWrapper, error) {
  69. rb := cc.dopts.resolverBuilder
  70. if rb == nil {
  71. return nil, fmt.Errorf("could not get resolver for scheme: %q", cc.parsedTarget.Scheme)
  72. }
  73. ccr := &ccResolverWrapper{
  74. cc: cc,
  75. addrCh: make(chan []resolver.Address, 1),
  76. scCh: make(chan string, 1),
  77. done: make(chan struct{}),
  78. }
  79. var err error
  80. ccr.resolver, err = rb.Build(cc.parsedTarget, ccr, resolver.BuildOption{DisableServiceConfig: cc.dopts.disableServiceConfig})
  81. if err != nil {
  82. return nil, err
  83. }
  84. return ccr, nil
  85. }
  86. func (ccr *ccResolverWrapper) resolveNow(o resolver.ResolveNowOption) {
  87. ccr.resolver.ResolveNow(o)
  88. }
  89. func (ccr *ccResolverWrapper) close() {
  90. ccr.resolver.Close()
  91. close(ccr.done)
  92. }
  93. // NewAddress is called by the resolver implemenetion to send addresses to gRPC.
  94. func (ccr *ccResolverWrapper) NewAddress(addrs []resolver.Address) {
  95. select {
  96. case <-ccr.done:
  97. return
  98. default:
  99. }
  100. grpclog.Infof("ccResolverWrapper: sending new addresses to cc: %v", addrs)
  101. if channelz.IsOn() {
  102. ccr.addChannelzTraceEvent(addrs)
  103. }
  104. ccr.cc.handleResolvedAddrs(addrs, nil)
  105. }
  106. // NewServiceConfig is called by the resolver implemenetion to send service
  107. // configs to gRPC.
  108. func (ccr *ccResolverWrapper) NewServiceConfig(sc string) {
  109. select {
  110. case <-ccr.done:
  111. return
  112. default:
  113. }
  114. grpclog.Infof("ccResolverWrapper: got new service config: %v", sc)
  115. ccr.cc.handleServiceConfig(sc)
  116. }
  117. func (ccr *ccResolverWrapper) addChannelzTraceEvent(addrs []resolver.Address) {
  118. if len(addrs) == 0 && ccr.lastAddressesCount != 0 {
  119. channelz.AddTraceEvent(ccr.cc.channelzID, &channelz.TraceEventDesc{
  120. Desc: "Resolver returns an empty address list",
  121. Severity: channelz.CtWarning,
  122. })
  123. } else if len(addrs) != 0 && ccr.lastAddressesCount == 0 {
  124. var s string
  125. for i, a := range addrs {
  126. if a.ServerName != "" {
  127. s += a.Addr + "(" + a.ServerName + ")"
  128. } else {
  129. s += a.Addr
  130. }
  131. if i != len(addrs)-1 {
  132. s += " "
  133. }
  134. }
  135. channelz.AddTraceEvent(ccr.cc.channelzID, &channelz.TraceEventDesc{
  136. Desc: fmt.Sprintf("Resolver returns a non-empty address list (previous one was empty) %q", s),
  137. Severity: channelz.CtINFO,
  138. })
  139. }
  140. ccr.lastAddressesCount = len(addrs)
  141. }