consul_resolver.go 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130
  1. package consul
  2. import (
  3. "errors"
  4. "fmt"
  5. "regexp"
  6. "sync"
  7. "github.com/hashicorp/consul/api"
  8. "google.golang.org/grpc/resolver"
  9. )
  10. const (
  11. defaultPort = "8500"
  12. )
  13. var (
  14. errMissingAddr = errors.New("consul resolver: missing address")
  15. errAddrMisMatch = errors.New("consul resolver: invalied uri")
  16. errEndsWithColon = errors.New("consul resolver: missing port after port-separator colon")
  17. regexConsul, _ = regexp.Compile("^([A-z0-9.]+)(:[0-9]{1,5})?/([A-z_]+)$")
  18. )
  19. func Init() {
  20. fmt.Printf("calling consul init\n")
  21. resolver.Register(NewBuilder())
  22. }
  23. type consulBuilder struct {
  24. }
  25. type consulResolver struct {
  26. address string
  27. wg sync.WaitGroup
  28. cc resolver.ClientConn
  29. name string
  30. disableServiceConfig bool
  31. lastIndex uint64
  32. }
  33. func NewBuilder() resolver.Builder {
  34. return &consulBuilder{}
  35. }
  36. func (cb *consulBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOption) (resolver.Resolver, error) {
  37. fmt.Printf("calling consul build\n")
  38. fmt.Printf("target: %v\n", target)
  39. host, port, name, err := parseTarget(fmt.Sprintf("%s/%s", target.Authority, target.Endpoint))
  40. if err != nil {
  41. return nil, err
  42. }
  43. cr := &consulResolver{
  44. address: fmt.Sprintf("%s%s", host, port),
  45. name: name,
  46. cc: cc,
  47. disableServiceConfig: opts.DisableServiceConfig,
  48. lastIndex: 0,
  49. }
  50. cr.wg.Add(1)
  51. go cr.watcher()
  52. return cr, nil
  53. }
  54. func (cr *consulResolver) watcher() {
  55. fmt.Printf("calling consul watcher\n")
  56. config := api.DefaultConfig()
  57. config.Address = cr.address
  58. client, err := api.NewClient(config)
  59. if err != nil {
  60. fmt.Printf("error create consul client: %v\n", err)
  61. return
  62. }
  63. for {
  64. services, metainfo, err := client.Health().Service(cr.name, cr.name, true, &api.QueryOptions{WaitIndex: cr.lastIndex})
  65. if err != nil {
  66. fmt.Printf("error retrieving instances from Consul: %v", err)
  67. }
  68. cr.lastIndex = metainfo.LastIndex
  69. var newAddrs []resolver.Address
  70. for _, service := range services {
  71. addr := fmt.Sprintf("%v:%v", service.Service.Address, service.Service.Port)
  72. newAddrs = append(newAddrs, resolver.Address{Addr: addr})
  73. }
  74. fmt.Printf("adding service addrs\n")
  75. fmt.Printf("newAddrs: %v\n", newAddrs)
  76. cr.cc.NewAddress(newAddrs)
  77. cr.cc.NewServiceConfig(cr.name)
  78. }
  79. }
  80. func (cb *consulBuilder) Scheme() string {
  81. return "consul"
  82. }
  83. func (cr *consulResolver) ResolveNow(opt resolver.ResolveNowOption) {
  84. }
  85. func (cr *consulResolver) Close() {
  86. }
  87. func parseTarget(target string) (host, port, name string, err error) {
  88. fmt.Printf("target uri: %v\n", target)
  89. if target == "" {
  90. return "", "", "", errMissingAddr
  91. }
  92. if !regexConsul.MatchString(target) {
  93. return "", "", "", errAddrMisMatch
  94. }
  95. groups := regexConsul.FindStringSubmatch(target)
  96. host = groups[1]
  97. port = groups[2]
  98. name = groups[3]
  99. if port == "" {
  100. port = defaultPort
  101. }
  102. return host, port, name, nil
  103. }