resolver.go 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201
  1. package resolver
  2. import (
  3. "context"
  4. "math/rand"
  5. "net/url"
  6. "sort"
  7. "strconv"
  8. "strings"
  9. "sync"
  10. "go-common/library/conf/env"
  11. "go-common/library/log"
  12. "go-common/library/naming"
  13. wmeta "go-common/library/net/rpc/warden/metadata"
  14. "github.com/dgryski/go-farm"
  15. "github.com/pkg/errors"
  16. "google.golang.org/grpc/resolver"
  17. )
  18. const (
  19. // Scheme is the scheme of discovery address
  20. Scheme = "grpc"
  21. )
  22. var (
  23. _ resolver.Resolver = &Resolver{}
  24. _ resolver.Builder = &Builder{}
  25. mu sync.Mutex
  26. )
  27. // Register register resolver builder if nil.
  28. func Register(b naming.Builder) {
  29. mu.Lock()
  30. defer mu.Unlock()
  31. if resolver.Get(b.Scheme()) == nil {
  32. resolver.Register(&Builder{b})
  33. }
  34. }
  35. // Set override any registered builder
  36. func Set(b naming.Builder) {
  37. mu.Lock()
  38. defer mu.Unlock()
  39. resolver.Register(&Builder{b})
  40. }
// Builder adapts a naming.Builder to the gRPC resolver.Builder interface.
// Its Build method creates a new Resolver on every call, backed by a
// naming.Resolver obtained from the embedded naming.Builder.
type Builder struct {
	naming.Builder
}
  46. // Build returns itself for Resolver, because it's both a builder and a resolver.
  47. func (b *Builder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOption) (resolver.Resolver, error) {
  48. var zone = env.Zone
  49. ss := int64(50)
  50. clusters := map[string]struct{}{}
  51. str := strings.SplitN(target.Endpoint, "?", 2)
  52. if len(str) == 0 {
  53. return nil, errors.Errorf("warden resolver: parse target.Endpoint(%s) failed!err:=endpoint is empty", target.Endpoint)
  54. } else if len(str) == 2 {
  55. m, err := url.ParseQuery(str[1])
  56. if err == nil {
  57. for _, c := range m[naming.MetaCluster] {
  58. clusters[c] = struct{}{}
  59. }
  60. zones := m[naming.MetaZone]
  61. if len(zones) > 0 {
  62. zone = zones[0]
  63. }
  64. if sub, ok := m["subset"]; ok {
  65. if t, err := strconv.ParseInt(sub[0], 10, 64); err == nil {
  66. ss = t
  67. }
  68. }
  69. }
  70. }
  71. r := &Resolver{
  72. nr: b.Builder.Build(str[0]),
  73. cc: cc,
  74. quit: make(chan struct{}, 1),
  75. clusters: clusters,
  76. zone: zone,
  77. subsetSize: ss,
  78. }
  79. go r.updateproc()
  80. return r, nil
  81. }
// Resolver watches a naming.Resolver for change events and pushes the
// resulting backend address list to a gRPC ClientConn.
type Resolver struct {
	// nr supplies the change events (Watch) and the instance data (Fetch).
	nr naming.Resolver
	// cc receives the address list through NewAddress.
	cc resolver.ClientConn
	// quit is signalled by Close to stop updateproc.
	quit chan struct{}
	// clusters holds the accepted cluster names; when empty, no cluster filtering is applied.
	clusters map[string]struct{}
	// zone is the preferred zone; instances of all zones are used when it is absent from the fetched data.
	zone string
	// subsetSize is the number of backends to keep; values <= 0 disable subsetting.
	subsetSize int64
}
// Close signals updateproc to stop and calls Close on the naming resolver.
// The signal is sent without blocking: when the quit channel cannot accept
// it (its buffer is already full), Close does nothing.
func (r *Resolver) Close() {
	select {
	case r.quit <- struct{}{}:
		r.nr.Close()
	default:
		// A stop signal is already pending; nothing more to do.
	}
}
// ResolveNow is a no-op. Address updates are pushed from updateproc whenever
// the naming resolver reports an event, so there is nothing to trigger here.
func (r *Resolver) ResolveNow(o resolver.ResolveNowOption) {
}
  103. func (r *Resolver) updateproc() {
  104. event := r.nr.Watch()
  105. for {
  106. select {
  107. case <-r.quit:
  108. return
  109. case _, ok := <-event:
  110. if !ok {
  111. return
  112. }
  113. }
  114. if insMap, ok := r.nr.Fetch(context.Background()); ok {
  115. instances, ok := insMap[r.zone]
  116. if !ok {
  117. for _, value := range insMap {
  118. instances = append(instances, value...)
  119. }
  120. }
  121. if r.subsetSize > 0 && len(instances) > 0 {
  122. instances = r.subset(instances, env.Hostname, r.subsetSize)
  123. }
  124. r.newAddress(instances)
  125. }
  126. }
  127. }
  128. func (r *Resolver) subset(backends []*naming.Instance, clientID string, size int64) []*naming.Instance {
  129. if len(backends) <= int(size) {
  130. return backends
  131. }
  132. sort.Slice(backends, func(i, j int) bool {
  133. return backends[i].Hostname < backends[j].Hostname
  134. })
  135. count := int64(len(backends)) / size
  136. id := farm.Fingerprint64([]byte(clientID))
  137. round := int64(id / uint64(count))
  138. s := rand.NewSource(round)
  139. ra := rand.New(s)
  140. ra.Shuffle(len(backends), func(i, j int) {
  141. backends[i], backends[j] = backends[j], backends[i]
  142. })
  143. start := (id % uint64(count)) * uint64(size)
  144. return backends[int(start) : int(start)+int(size)]
  145. }
  146. func (r *Resolver) newAddress(instances []*naming.Instance) {
  147. if len(instances) <= 0 {
  148. return
  149. }
  150. addrs := make([]resolver.Address, 0, len(instances))
  151. for _, ins := range instances {
  152. if len(r.clusters) > 0 {
  153. if _, ok := r.clusters[ins.Metadata[naming.MetaCluster]]; !ok {
  154. continue
  155. }
  156. }
  157. var weight int64
  158. if weight, _ = strconv.ParseInt(ins.Metadata[naming.MetaWeight], 10, 64); weight <= 0 {
  159. weight = 10
  160. }
  161. var rpc string
  162. for _, a := range ins.Addrs {
  163. u, err := url.Parse(a)
  164. if err == nil && u.Scheme == Scheme {
  165. rpc = u.Host
  166. }
  167. }
  168. if rpc == "" {
  169. log.Warn("warden/resolver: invalid rpc address(%s,%s,%v) found!", ins.AppID, ins.Hostname, ins.Addrs)
  170. continue
  171. }
  172. addr := resolver.Address{
  173. Addr: rpc,
  174. Type: resolver.Backend,
  175. ServerName: ins.AppID,
  176. Metadata: wmeta.MD{Weight: weight, Color: ins.Metadata[naming.MetaColor]},
  177. }
  178. addrs = append(addrs, addr)
  179. }
  180. r.cc.NewAddress(addrs)
  181. }