123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355 |
- package discovery
- import (
- "context"
- "encoding/json"
- "fmt"
- "math/rand"
- "net"
- "net/http"
- "net/url"
- "os"
- "strings"
- "sync"
- "time"
- "go-common/library/log"
- "go-common/library/naming"
- "go-common/library/stat/prom"
- "go-common/app/service/main/bns/agent/backend"
- )
- func init() {
- backend.Registry("discovery", New)
- }
// NodeStatus is the status of a discovery server node.
type NodeStatus int

// Known node statuses; the numeric values follow declaration order (0, 1).
const (
	// NodeStatusUP marks a node that is ready to receive registrations.
	NodeStatusUP NodeStatus = iota
	// NodeStatusLost marks a node that has lost contact with the others.
	NodeStatusLost
)

// defaultCacheExpireIn is the cache lifetime, in seconds, used when the
// config does not provide "cacheExpireIn".
const defaultCacheExpireIn int64 = 10
- // ServerNode backend servier node status
- type ServerNode struct {
- Addr string `json:"addr"`
- Zone string `json:"zone"`
- Status NodeStatus `json:"status"`
- }
- // InstanceList discovery instance list
- type InstanceList struct {
- Instances []naming.Instance `json:"instances"`
- LatestTimestamp int64 `json:"latest_timestamp"`
- }
// InstanceMetadata is the metadata attached to a discovery instance.
type InstanceMetadata struct {
	// Provider carries the untyped "provider" value.
	Provider interface{} `json:"provider"`
}
- type discoveryResp struct {
- Code int64 `json:"code"`
- Message string `json:"message"`
- }
- type discoveryFetchResp struct {
- discoveryResp `json:",inline"`
- Data *InstanceList `json:"data"`
- }
- type discoveryNodeResp struct {
- discoveryResp `json:",inline"`
- Data []ServerNode `json:"data"`
- }
- var urlParse = url.Parse
- // New discovery backend
- func New(config map[string]interface{}) (backend.Backend, error) {
- if config == nil {
- return nil, fmt.Errorf("discovery require url, secret, appkey")
- }
- var url, secret, appKey string
- var ok bool
- discoveryURL := os.Getenv("DISCOVERY_URL")
- if url, ok = config["url"].(string); !ok && discoveryURL == "" {
- return nil, fmt.Errorf("discovery require `url`")
- }
- // use env DISCOVERY_URL overwrite config
- if discoveryURL != "" {
- url = discoveryURL
- }
- if !strings.HasPrefix(url, "http://") && !strings.HasPrefix(url, "https://") {
- url = "http://" + url
- }
- refreshInterval := time.Minute
- if second, ok := config["refresh_interval"].(int); ok {
- refreshInterval = time.Duration(second) * time.Second
- }
- cacheExpireIn := defaultCacheExpireIn
- if second, ok := config["cacheExpireIn"].(int); ok {
- cacheExpireIn = int64(second)
- }
- u, err := urlParse(url)
- if err != nil {
- return nil, err
- }
- dis := &discovery{
- client: http.DefaultClient,
- scheme: u.Scheme,
- host: u.Host,
- discoveryHosts: []string{u.Host},
- secret: secret,
- appKey: appKey,
- refreshTick: time.NewTicker(refreshInterval),
- cacheMap: make(map[string]*cacheNode),
- cacheExpireIn: cacheExpireIn,
- }
- go dis.daemon()
- return dis, nil
- }
- type cacheNode struct {
- expired int64
- data []*backend.Instance
- }
- func (c *cacheNode) IsExpired() bool {
- return time.Now().Unix() > c.expired
- }
- func newCacheNode(data []*backend.Instance, expireIn int64) *cacheNode {
- return &cacheNode{expired: time.Now().Unix() + expireIn, data: data}
- }
- type discovery struct {
- client *http.Client
- secret string
- appKey string
- host string
- scheme string
- discoveryHosts []string
- rmx sync.RWMutex
- cachermx sync.RWMutex
- cacheMap map[string]*cacheNode
- cacheExpireIn int64
- refreshTick *time.Ticker
- }
- var _ backend.Backend = &discovery{}
- func (d *discovery) Ping(ctx context.Context) error {
- _, err := d.Nodes(ctx)
- return err
- }
- func (d *discovery) Close(ctx context.Context) error {
- d.refreshTick.Stop()
- return nil
- }
- func (d *discovery) daemon() {
- for range d.refreshTick.C {
- log.V(10).Info("refresh discovery nodes ...")
- nodes, err := d.Nodes(context.Background())
- if err != nil {
- log.Error("refresh discovery nodes error %s", err)
- continue
- }
- hosts := make([]string, 0, len(nodes))
- for i := range nodes {
- if nodes[i].Status == NodeStatusUP {
- hosts = append(hosts, nodes[i].Addr)
- }
- }
- d.rmx.Lock()
- d.discoveryHosts = hosts
- d.rmx.Unlock()
- log.V(10).Info("new discovery nodes list %v", hosts)
- }
- }
- func (d *discovery) Nodes(ctx context.Context) ([]ServerNode, error) {
- req, err := http.NewRequest(http.MethodGet, "/discovery/nodes", nil)
- if err != nil {
- return nil, err
- }
- resp, err := d.do(req)
- if err != nil {
- return nil, err
- }
- defer resp.Body.Close()
- nodesResp := &discoveryNodeResp{}
- if err := json.NewDecoder(resp.Body).Decode(nodesResp); err != nil {
- return nil, err
- }
- if nodesResp.Code < 0 || nodesResp.Data == nil || len(nodesResp.Data) == 0 {
- return nil, fmt.Errorf("no data found, err: %s", nodesResp.Message)
- }
- log.V(10).Info("responsed data: %v", nodesResp.Data)
- return nodesResp.Data, nil
- }
- func (d *discovery) do(req *http.Request) (resp *http.Response, err error) {
- req.URL.Scheme = d.scheme
- req.Host = d.host
- req.URL.Scheme = d.scheme
- req.Host = d.host
- d.rmx.RLock()
- hosts := d.discoveryHosts
- d.rmx.RUnlock()
- for _, host := range shuffle(hosts) {
- req.URL.Host = host
- resp, err = d.client.Do(req)
- log.V(5).Info("request discovery.. request: %s", req.URL)
- if err == nil {
- return
- }
- log.Error("request discovery %s err %s, try next", host, err)
- }
- return
- }
- // Query appid
- func (d *discovery) Query(ctx context.Context, target backend.Target, sel backend.Selector, md backend.Metadata) ([]*backend.Instance, error) {
- // TODO: parallel query
- key := target.Name + sel.String()
- if data, ok := d.fromCache(key); ok {
- prom.CacheHit.Incr("bns:discovery_mem_cache_hit")
- return data, nil
- }
- prom.CacheMiss.Incr("bns:discovery_mem_cache_miss")
- instanceList, err := d.fetch(ctx, target.Name, sel, md)
- if err != nil {
- return nil, err
- }
- data := copyInstance(instanceList)
- if len(data) != 0 {
- d.setCache(key, data)
- }
- return data, nil
- }
- func (d *discovery) fromCache(key string) ([]*backend.Instance, bool) {
- d.cachermx.RLock()
- defer d.cachermx.RUnlock()
- node, ok := d.cacheMap[key]
- if !ok || node.IsExpired() {
- return nil, false
- }
- return node.data, true
- }
- func (d *discovery) setCache(key string, data []*backend.Instance) {
- d.cachermx.Lock()
- defer d.cachermx.Unlock()
- d.cacheMap[key] = newCacheNode(data, d.cacheExpireIn)
- }
- func (d *discovery) fetch(ctx context.Context, discoveryID string, sel backend.Selector, md backend.Metadata) (*InstanceList, error) {
- params := url.Values{}
- params.Add("appid", discoveryID)
- params.Add("env", sel.Env)
- params.Add("region", sel.Region)
- params.Add("hostname", md.ClientHost)
- params.Add("zone", sel.Zone)
- params.Add("status", "1")
- if md.LatestTimestamps != "" {
- params.Add("latest_timestamp", md.LatestTimestamps)
- } else {
- params.Add("latest_timestamp", "0")
- }
- payload := params.Encode()
- req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("/discovery/fetch?%s", payload), nil)
- if err != nil {
- return nil, err
- }
- resp, err := d.do(req)
- if err != nil {
- return nil, err
- }
- defer resp.Body.Close()
- fetchResp := &discoveryFetchResp{}
- if err := json.NewDecoder(resp.Body).Decode(fetchResp); err != nil {
- return nil, err
- }
- if fetchResp.Code < 0 || fetchResp.Data == nil || fetchResp.Data.Instances == nil || len(fetchResp.Data.Instances) == 0 {
- return nil, fmt.Errorf("no data found, err: %s", fetchResp.Message)
- }
- log.V(10).Info("fetchResponsed data: %v", fetchResp.Data)
- return fetchResp.Data, nil
- }
// shuffle randomly reorders hosts in place and returns the same slice. Each
// position i is swapped with a position picked at random from i to the end.
func shuffle(hosts []string) []string {
	n := len(hosts)
	for i := range hosts {
		pick := i + rand.Intn(n-i)
		hosts[i], hosts[pick] = hosts[pick], hosts[i]
	}
	return hosts
}
- func copyInstance(il *InstanceList) (inss []*backend.Instance) {
- copyloop:
- for _, in := range il.Instances {
- out := &backend.Instance{
- DiscoveryID: in.AppID,
- Env: in.Env,
- Hostname: in.Hostname,
- Zone: in.Zone,
- }
- for _, addr := range in.Addrs {
- ip, err := ipFromURI(addr)
- if err == nil {
- out.IPAddr = ip
- inss = append(inss, out)
- continue copyloop
- }
- log.Error("extract ip from addr %s error: %s", addr, err)
- }
- log.Error("can't found any ip for discoveryID: %s", in.AppID)
- }
- return
- }
// ipFromURI extracts the IP address from uri.
//
// Accepted forms are a URI with a scheme ("grpc://10.0.0.1:9000"), a bare
// "host:port" pair, and a bare IP; IPv6 literals may be bracketed. An error
// is returned when the host part is not an IP literal, so callers never
// receive a nil IP together with a nil error.
func ipFromURI(uri string) (net.IP, error) {
	ip := net.ParseIP(hostFromURI(uri))
	if ip == nil {
		return nil, fmt.Errorf("no valid ip in addr %q", uri)
	}
	return ip, nil
}

// hostFromURI returns the host part of uri without port or IPv6 brackets.
func hostFromURI(uri string) string {
	// A parsed URL only has a Host when uri carries a scheme and authority;
	// "1.2.3.4" parses as a path and "1.2.3.4:80" fails to parse.
	if u, err := url.Parse(uri); err == nil && u.Host != "" {
		return u.Hostname()
	}
	if host, _, err := net.SplitHostPort(uri); err == nil {
		return host
	}
	// No port present: treat the whole string as the host.
	return strings.TrimSuffix(strings.TrimPrefix(uri, "["), "]")
}
|