123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284 |
- package agent
- import (
- "context"
- "fmt"
- "net"
- "net/http"
- "os"
- "strings"
- "sync"
- "time"
- "go-common/app/service/main/bns/agent/backend"
- "go-common/app/service/main/bns/conf"
- "go-common/library/conf/env"
- "go-common/library/log"
- "go-common/library/stat/prom"
- )
- // Agent easyns agent
- type Agent struct {
- // plugable easyns server backend
- backend backend.Backend
- // agent id, now it is os hostname
- agentID string
- // agent cfg
- cfg *conf.Config
- // agent local cache, distributed by region, zone, env
- //caches cache.LocalCaches
- // httpAddrs are the addresses per protocol the HTTP server binds to
- httpAddrs []conf.ProtoAddr
- // httpServers provides the HTTP API on various endpoints
- httpServers []*HTTPServer
- // dnsAddr is the address the DNS server binds to
- dnsAddrs []conf.ProtoAddr
- // dnsServer provides the DNS API
- dnsServers []*DNSServer
- // wgServers is the wait group for all HTTP servers
- wgServers sync.WaitGroup
- }
- // New agent
- func New(c *conf.Config) (*Agent, error) {
- hostname, err := os.Hostname()
- if err != nil {
- return nil, fmt.Errorf("get hostname as agent id failed: %s", err)
- }
- httpAddrs, err := c.HTTPAddrs()
- if err != nil {
- return nil, fmt.Errorf("invalid HTTP bind address: %s", err)
- }
- dnsAddrs, err := c.DNSAddrs()
- if err != nil {
- return nil, fmt.Errorf("invalid DNS bind address: %s", err)
- }
- // var caches cache.LocalCaches
- a := &Agent{
- cfg: c,
- agentID: hostname,
- httpAddrs: httpAddrs,
- dnsAddrs: dnsAddrs,
- }
- return a, nil
- }
- // Start agent
- func (a *Agent) Start() error {
- // start DNS servers
- if err := a.listenAndServeDNS(); err != nil {
- return err
- }
- // listen HTTP
- httpln, err := a.listenHTTP(a.httpAddrs)
- if err != nil {
- return err
- }
- // initial backend
- a.backend, err = backend.New(a.cfg.Backend.Backend, a.cfg.Backend.Config)
- if err != nil {
- return err
- }
- ctx, cancel := context.WithTimeout(context.Background(), time.Second)
- defer cancel()
- // make sure backend is available
- if err = a.backend.Ping(ctx); err != nil {
- return err
- }
- // start serving http servers
- for _, l := range httpln {
- srv := NewHTTPServer(l.Addr().String(), a)
- if err := a.serveHTTP(l, srv); err != nil {
- return err
- }
- a.httpServers = append(a.httpServers, srv)
- }
- return nil
- }
- // Query name
- func (a *Agent) Query(name string) ([]*backend.Instance, error) {
- target, sel, err := backend.ParseName(name, a.DefaultSel())
- if err != nil {
- log.Error("dns: parse name failed! name: %s, err: %s", name, err)
- return nil, err
- }
- // TODO
- inss, err := a.backend.Query(context.Background(), target, sel, backend.Metadata{})
- if err != nil {
- prom.BusinessErrCount.Incr("bns:query")
- }
- return inss, err
- }
- func (a *Agent) listenAndServeDNS() error {
- notif := make(chan conf.ProtoAddr, len(a.dnsAddrs))
- for _, p := range a.dnsAddrs {
- p := p // capture loop var
- // create server
- svr, err := NewDNSServer(a, a.cfg.DNS)
- if err != nil {
- return err
- }
- a.dnsServers = append(a.dnsServers, svr)
- // start server
- a.wgServers.Add(1)
- go func() {
- defer a.wgServers.Done()
- err := svr.ListenAndServe(p.Net, p.Addr, func() { notif <- p })
- if err != nil && !strings.Contains(err.Error(), "accept") {
- log.Error("agent: Error starting DNS server %s (%s): %v", p.Addr, p.Net, err)
- }
- }()
- }
- // wait for servers to be up
- timeout := time.After(time.Second)
- for range a.dnsAddrs {
- select {
- case p := <-notif:
- log.Info("agent: Started DNS server %s (%s)", p.Addr, p.Net)
- continue
- case <-timeout:
- return fmt.Errorf("agent: timeout starting DNS servers")
- }
- }
- return nil
- }
- func (a *Agent) listenHTTP(addrs []conf.ProtoAddr) ([]net.Listener, error) {
- var ln []net.Listener
- for _, p := range addrs {
- var l net.Listener
- var err error
- switch {
- case p.Net == "unix":
- l, err = a.listenSocket(p.Addr)
- case p.Net == "tcp" && p.Proto == "http":
- l, err = net.Listen("tcp", p.Addr)
- default:
- return nil, fmt.Errorf("%s:%s listener not supported", p.Net, p.Proto)
- }
- if err != nil {
- for _, l := range ln {
- l.Close()
- }
- return nil, err
- }
- if tcpl, ok := l.(*net.TCPListener); ok {
- l = &tcpKeepAliveListener{tcpl}
- }
- ln = append(ln, l)
- }
- return ln, nil
- }
- // tcpKeepAliveListener sets TCP keep-alive timeouts on accepted
- // connections. It's used by NewHttpServer so dead TCP connections
- // eventually go away.
- type tcpKeepAliveListener struct {
- *net.TCPListener
- }
- func (ln tcpKeepAliveListener) Accept() (c net.Conn, err error) {
- tc, err := ln.AcceptTCP()
- if err != nil {
- return
- }
- tc.SetKeepAlive(true)
- tc.SetKeepAlivePeriod(30 * time.Second)
- return tc, nil
- }
- func (a *Agent) listenSocket(path string) (net.Listener, error) {
- if _, err := os.Stat(path); !os.IsNotExist(err) {
- log.Warn("agent: Replacing socket %q\n", path)
- }
- if err := os.Remove(path); err != nil && !os.IsNotExist(err) {
- return nil, fmt.Errorf("error removing socket file: %s", err)
- }
- l, err := net.Listen("unix", path)
- if err != nil {
- return nil, err
- }
- // TODO: set file permissions.
- return l, nil
- }
- func (a *Agent) serveHTTP(l net.Listener, srv *HTTPServer) error {
- srv.proto = "http"
- notif := make(chan string)
- a.wgServers.Add(1)
- go func() {
- defer a.wgServers.Done()
- notif <- srv.Addr
- err := srv.Serve(l)
- if err != nil && err != http.ErrServerClosed {
- log.Error("agent: Error starting http service: %s\n", err.Error())
- }
- }()
- select {
- case addr := <-notif:
- log.Info("agent: Started HTTP Server on %s\n", addr)
- return nil
- case <-time.After(time.Second):
- return fmt.Errorf("agent: timeout starting HTTP servers")
- }
- }
- // ShutDown agent
- func (a *Agent) ShutDown(ctx context.Context) error {
- errmsg := make([]string, 0)
- for _, hsrv := range a.httpServers {
- if err := hsrv.Shutdown(ctx); err != nil {
- errmsg = append(errmsg, err.Error())
- }
- }
- for _, dsrv := range a.dnsServers {
- if err := dsrv.Shutdown(); err != nil {
- errmsg = append(errmsg, err.Error())
- }
- }
- err := a.backend.Close(ctx)
- if err != nil {
- errmsg = append(errmsg, err.Error())
- }
- if len(errmsg) > 0 {
- return fmt.Errorf("%s", strings.Join(errmsg, "\n"))
- }
- return nil
- }
- // DefaultSel default selector from config
- func (a *Agent) DefaultSel() backend.Selector {
- return backend.Selector{
- Env: env.DeployEnv,
- Region: env.Region,
- Zone: env.Zone,
- }
- }
|