discovery.go 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "net/url"
  7. "sort"
  8. "sync"
  9. "go-common/app/admin/main/apm/conf"
  10. "go-common/app/admin/main/apm/model/discovery"
  11. "go-common/library/conf/env"
  12. "go-common/library/ecode"
  13. "go-common/library/log"
  14. "go-common/library/naming"
  15. errgroup "go-common/library/sync/errgroup.v2"
  16. )
  17. // DiscoveryProxy discovery proxy.
  18. func (s *Service) DiscoveryProxy(c context.Context, method, loc string, params url.Values) (data interface{}, err error) {
  19. var bs json.RawMessage
  20. uri := conf.Conf.Host.APICo + "/discovery/" + loc
  21. if loc == "nodes" {
  22. api := conf.Conf.Discovery.API
  23. v1 := make(map[string]interface{})
  24. for _, v := range api {
  25. sData := make([]*discovery.Addr, 0, 3)
  26. uri = "http://" + v + "/discovery/" + loc
  27. bs, err = s.curl(c, method, uri, params)
  28. if err != nil {
  29. log.Error("apmSvc.DiscoveryProxy curl url:"+uri+" params:(%v) error(%v)", params.Encode(), err)
  30. return nil, err
  31. }
  32. if err = json.Unmarshal(bs, &sData); err != nil {
  33. return nil, err
  34. }
  35. Others := make([]*discovery.Addr, 0, len(sData))
  36. tmpData := &discovery.Status{}
  37. tmpData.Status = 0
  38. for _, vv := range sData {
  39. if vv.Addr == v {
  40. tmpData.Status = vv.Status
  41. continue
  42. }
  43. Others = append(Others, vv)
  44. }
  45. tmpData.Others = Others
  46. v1[v] = tmpData
  47. }
  48. data = v1
  49. } else if loc == "polling" {
  50. api := conf.Conf.Discovery.API
  51. v1 := make([]string, 0)
  52. tmp := make(map[string]struct{})
  53. for _, v := range api {
  54. sData := make([]string, 0)
  55. uri = "http://" + v + "/discovery/" + loc
  56. bs, err = s.curl(c, method, uri, params)
  57. if err != nil {
  58. log.Error("apmSvc.DiscoveryProxy curl url:"+uri+" params:(%v) error(%v)", params.Encode(), err)
  59. return nil, err
  60. }
  61. if err = json.Unmarshal(bs, &sData); err != nil {
  62. return nil, err
  63. }
  64. for _, vv := range sData {
  65. if _, ok := tmp[vv]; !ok {
  66. v1 = append(v1, vv)
  67. tmp[vv] = struct{}{}
  68. }
  69. }
  70. }
  71. sort.Strings(v1)
  72. data = v1
  73. } else {
  74. data, err = s.curl(c, method, uri, params)
  75. }
  76. return
  77. }
  78. func (s *Service) curl(c context.Context, method, uri string, params url.Values) (data json.RawMessage, err error) {
  79. var res struct {
  80. Code int `json:"code"`
  81. Data json.RawMessage `json:"data"`
  82. Message string `json:"message"`
  83. }
  84. if method == "GET" {
  85. if err = s.client.Get(c, uri, "", params, &res); err != nil {
  86. log.Error("apmSvc.DiscoveryProxy get url:"+uri+" params:(%v) error(%v)", params.Encode(), err)
  87. return
  88. }
  89. } else {
  90. if err = s.client.Post(c, uri, "", params, &res); err != nil {
  91. log.Error("apmSvc.DiscoveryProxy post url:"+uri+" params:(%v) error(%v)", params.Encode(), err)
  92. return
  93. }
  94. }
  95. if res.Code != 0 {
  96. err = ecode.Int(res.Code)
  97. log.Error("apmSvc.DiscoveryProxy url:"+uri+" params:(%v) ecode(%v)", params.Encode(), err)
  98. return
  99. }
  100. data = res.Data
  101. return
  102. }
  103. // DatabusConsumerAddrs databus consumer addrs.
  104. func (s *Service) DatabusConsumerAddrs(c context.Context, group string) (addrs []string, err error) {
  105. uri := conf.Conf.Host.APICo + "/discovery/fetch"
  106. params := url.Values{}
  107. params.Set("env", env.DeployEnv)
  108. params.Set("appid", "middleware.databus")
  109. params.Set("status", "1")
  110. data, err := s.curl(c, "GET", uri, params)
  111. if err != nil {
  112. log.Error("curl discovery fetch error:%v", err)
  113. return
  114. }
  115. var res struct {
  116. ZoneInstances map[string][]*naming.Instance `json:"zone_instances"`
  117. }
  118. if err = json.Unmarshal(data, &res); err != nil {
  119. log.Error("json unmarshal discovery data error:%v", err)
  120. return
  121. }
  122. var (
  123. wg errgroup.Group
  124. lock sync.Mutex
  125. )
  126. for _, instances := range res.ZoneInstances {
  127. for _, instance := range instances {
  128. for _, addr := range instance.Addrs {
  129. u, _ := url.Parse(addr)
  130. if u.Scheme == "http" {
  131. wg.Go(func(c context.Context) error {
  132. p := url.Values{}
  133. p.Set("group", group)
  134. var as struct {
  135. Code int `json:"code"`
  136. Data []string `json:"data"`
  137. }
  138. if e := s.client.Get(context.Background(), fmt.Sprintf("http://%s/databus/consumer/addrs", u.Host), "", p, &as); e != nil {
  139. log.Error("curl databus consumer addrs error:%v", e)
  140. return nil
  141. }
  142. lock.Lock()
  143. addrs = append(addrs, as.Data...)
  144. lock.Unlock()
  145. return nil
  146. })
  147. }
  148. }
  149. }
  150. }
  151. wg.Wait()
  152. return
  153. }