123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159 |
- package service
- import (
- "context"
- "encoding/json"
- "fmt"
- "net/url"
- "sort"
- "sync"
- "go-common/app/admin/main/apm/conf"
- "go-common/app/admin/main/apm/model/discovery"
- "go-common/library/conf/env"
- "go-common/library/ecode"
- "go-common/library/log"
- "go-common/library/naming"
- errgroup "go-common/library/sync/errgroup.v2"
- )
- // DiscoveryProxy discovery proxy.
- func (s *Service) DiscoveryProxy(c context.Context, method, loc string, params url.Values) (data interface{}, err error) {
- var bs json.RawMessage
- uri := conf.Conf.Host.APICo + "/discovery/" + loc
- if loc == "nodes" {
- api := conf.Conf.Discovery.API
- v1 := make(map[string]interface{})
- for _, v := range api {
- sData := make([]*discovery.Addr, 0, 3)
- uri = "http://" + v + "/discovery/" + loc
- bs, err = s.curl(c, method, uri, params)
- if err != nil {
- log.Error("apmSvc.DiscoveryProxy curl url:"+uri+" params:(%v) error(%v)", params.Encode(), err)
- return nil, err
- }
- if err = json.Unmarshal(bs, &sData); err != nil {
- return nil, err
- }
- Others := make([]*discovery.Addr, 0, len(sData))
- tmpData := &discovery.Status{}
- tmpData.Status = 0
- for _, vv := range sData {
- if vv.Addr == v {
- tmpData.Status = vv.Status
- continue
- }
- Others = append(Others, vv)
- }
- tmpData.Others = Others
- v1[v] = tmpData
- }
- data = v1
- } else if loc == "polling" {
- api := conf.Conf.Discovery.API
- v1 := make([]string, 0)
- tmp := make(map[string]struct{})
- for _, v := range api {
- sData := make([]string, 0)
- uri = "http://" + v + "/discovery/" + loc
- bs, err = s.curl(c, method, uri, params)
- if err != nil {
- log.Error("apmSvc.DiscoveryProxy curl url:"+uri+" params:(%v) error(%v)", params.Encode(), err)
- return nil, err
- }
- if err = json.Unmarshal(bs, &sData); err != nil {
- return nil, err
- }
- for _, vv := range sData {
- if _, ok := tmp[vv]; !ok {
- v1 = append(v1, vv)
- tmp[vv] = struct{}{}
- }
- }
- }
- sort.Strings(v1)
- data = v1
- } else {
- data, err = s.curl(c, method, uri, params)
- }
- return
- }
- func (s *Service) curl(c context.Context, method, uri string, params url.Values) (data json.RawMessage, err error) {
- var res struct {
- Code int `json:"code"`
- Data json.RawMessage `json:"data"`
- Message string `json:"message"`
- }
- if method == "GET" {
- if err = s.client.Get(c, uri, "", params, &res); err != nil {
- log.Error("apmSvc.DiscoveryProxy get url:"+uri+" params:(%v) error(%v)", params.Encode(), err)
- return
- }
- } else {
- if err = s.client.Post(c, uri, "", params, &res); err != nil {
- log.Error("apmSvc.DiscoveryProxy post url:"+uri+" params:(%v) error(%v)", params.Encode(), err)
- return
- }
- }
- if res.Code != 0 {
- err = ecode.Int(res.Code)
- log.Error("apmSvc.DiscoveryProxy url:"+uri+" params:(%v) ecode(%v)", params.Encode(), err)
- return
- }
- data = res.Data
- return
- }
- // DatabusConsumerAddrs databus consumer addrs.
- func (s *Service) DatabusConsumerAddrs(c context.Context, group string) (addrs []string, err error) {
- uri := conf.Conf.Host.APICo + "/discovery/fetch"
- params := url.Values{}
- params.Set("env", env.DeployEnv)
- params.Set("appid", "middleware.databus")
- params.Set("status", "1")
- data, err := s.curl(c, "GET", uri, params)
- if err != nil {
- log.Error("curl discovery fetch error:%v", err)
- return
- }
- var res struct {
- ZoneInstances map[string][]*naming.Instance `json:"zone_instances"`
- }
- if err = json.Unmarshal(data, &res); err != nil {
- log.Error("json unmarshal discovery data error:%v", err)
- return
- }
- var (
- wg errgroup.Group
- lock sync.Mutex
- )
- for _, instances := range res.ZoneInstances {
- for _, instance := range instances {
- for _, addr := range instance.Addrs {
- u, _ := url.Parse(addr)
- if u.Scheme == "http" {
- wg.Go(func(c context.Context) error {
- p := url.Values{}
- p.Set("group", group)
- var as struct {
- Code int `json:"code"`
- Data []string `json:"data"`
- }
- if e := s.client.Get(context.Background(), fmt.Sprintf("http://%s/databus/consumer/addrs", u.Host), "", p, &as); e != nil {
- log.Error("curl databus consumer addrs error:%v", e)
- return nil
- }
- lock.Lock()
- addrs = append(addrs, as.Data...)
- lock.Unlock()
- return nil
- })
- }
- }
- }
- }
- wg.Wait()
- return
- }
|