agent.go 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284
  1. package agent
  2. import (
  3. "context"
  4. "fmt"
  5. "net"
  6. "net/http"
  7. "os"
  8. "strings"
  9. "sync"
  10. "time"
  11. "go-common/app/service/main/bns/agent/backend"
  12. "go-common/app/service/main/bns/conf"
  13. "go-common/library/conf/env"
  14. "go-common/library/log"
  15. "go-common/library/stat/prom"
  16. )
  17. // Agent easyns agent
  18. type Agent struct {
  19. // plugable easyns server backend
  20. backend backend.Backend
  21. // agent id, now it is os hostname
  22. agentID string
  23. // agent cfg
  24. cfg *conf.Config
  25. // agent local cache, distributed by region, zone, env
  26. //caches cache.LocalCaches
  27. // httpAddrs are the addresses per protocol the HTTP server binds to
  28. httpAddrs []conf.ProtoAddr
  29. // httpServers provides the HTTP API on various endpoints
  30. httpServers []*HTTPServer
  31. // dnsAddr is the address the DNS server binds to
  32. dnsAddrs []conf.ProtoAddr
  33. // dnsServer provides the DNS API
  34. dnsServers []*DNSServer
  35. // wgServers is the wait group for all HTTP servers
  36. wgServers sync.WaitGroup
  37. }
  38. // New agent
  39. func New(c *conf.Config) (*Agent, error) {
  40. hostname, err := os.Hostname()
  41. if err != nil {
  42. return nil, fmt.Errorf("get hostname as agent id failed: %s", err)
  43. }
  44. httpAddrs, err := c.HTTPAddrs()
  45. if err != nil {
  46. return nil, fmt.Errorf("invalid HTTP bind address: %s", err)
  47. }
  48. dnsAddrs, err := c.DNSAddrs()
  49. if err != nil {
  50. return nil, fmt.Errorf("invalid DNS bind address: %s", err)
  51. }
  52. // var caches cache.LocalCaches
  53. a := &Agent{
  54. cfg: c,
  55. agentID: hostname,
  56. httpAddrs: httpAddrs,
  57. dnsAddrs: dnsAddrs,
  58. }
  59. return a, nil
  60. }
  61. // Start agent
  62. func (a *Agent) Start() error {
  63. // start DNS servers
  64. if err := a.listenAndServeDNS(); err != nil {
  65. return err
  66. }
  67. // listen HTTP
  68. httpln, err := a.listenHTTP(a.httpAddrs)
  69. if err != nil {
  70. return err
  71. }
  72. // initial backend
  73. a.backend, err = backend.New(a.cfg.Backend.Backend, a.cfg.Backend.Config)
  74. if err != nil {
  75. return err
  76. }
  77. ctx, cancel := context.WithTimeout(context.Background(), time.Second)
  78. defer cancel()
  79. // make sure backend is available
  80. if err = a.backend.Ping(ctx); err != nil {
  81. return err
  82. }
  83. // start serving http servers
  84. for _, l := range httpln {
  85. srv := NewHTTPServer(l.Addr().String(), a)
  86. if err := a.serveHTTP(l, srv); err != nil {
  87. return err
  88. }
  89. a.httpServers = append(a.httpServers, srv)
  90. }
  91. return nil
  92. }
  93. // Query name
  94. func (a *Agent) Query(name string) ([]*backend.Instance, error) {
  95. target, sel, err := backend.ParseName(name, a.DefaultSel())
  96. if err != nil {
  97. log.Error("dns: parse name failed! name: %s, err: %s", name, err)
  98. return nil, err
  99. }
  100. // TODO
  101. inss, err := a.backend.Query(context.Background(), target, sel, backend.Metadata{})
  102. if err != nil {
  103. prom.BusinessErrCount.Incr("bns:query")
  104. }
  105. return inss, err
  106. }
  107. func (a *Agent) listenAndServeDNS() error {
  108. notif := make(chan conf.ProtoAddr, len(a.dnsAddrs))
  109. for _, p := range a.dnsAddrs {
  110. p := p // capture loop var
  111. // create server
  112. svr, err := NewDNSServer(a, a.cfg.DNS)
  113. if err != nil {
  114. return err
  115. }
  116. a.dnsServers = append(a.dnsServers, svr)
  117. // start server
  118. a.wgServers.Add(1)
  119. go func() {
  120. defer a.wgServers.Done()
  121. err := svr.ListenAndServe(p.Net, p.Addr, func() { notif <- p })
  122. if err != nil && !strings.Contains(err.Error(), "accept") {
  123. log.Error("agent: Error starting DNS server %s (%s): %v", p.Addr, p.Net, err)
  124. }
  125. }()
  126. }
  127. // wait for servers to be up
  128. timeout := time.After(time.Second)
  129. for range a.dnsAddrs {
  130. select {
  131. case p := <-notif:
  132. log.Info("agent: Started DNS server %s (%s)", p.Addr, p.Net)
  133. continue
  134. case <-timeout:
  135. return fmt.Errorf("agent: timeout starting DNS servers")
  136. }
  137. }
  138. return nil
  139. }
  140. func (a *Agent) listenHTTP(addrs []conf.ProtoAddr) ([]net.Listener, error) {
  141. var ln []net.Listener
  142. for _, p := range addrs {
  143. var l net.Listener
  144. var err error
  145. switch {
  146. case p.Net == "unix":
  147. l, err = a.listenSocket(p.Addr)
  148. case p.Net == "tcp" && p.Proto == "http":
  149. l, err = net.Listen("tcp", p.Addr)
  150. default:
  151. return nil, fmt.Errorf("%s:%s listener not supported", p.Net, p.Proto)
  152. }
  153. if err != nil {
  154. for _, l := range ln {
  155. l.Close()
  156. }
  157. return nil, err
  158. }
  159. if tcpl, ok := l.(*net.TCPListener); ok {
  160. l = &tcpKeepAliveListener{tcpl}
  161. }
  162. ln = append(ln, l)
  163. }
  164. return ln, nil
  165. }
  166. // tcpKeepAliveListener sets TCP keep-alive timeouts on accepted
  167. // connections. It's used by NewHttpServer so dead TCP connections
  168. // eventually go away.
  169. type tcpKeepAliveListener struct {
  170. *net.TCPListener
  171. }
  172. func (ln tcpKeepAliveListener) Accept() (c net.Conn, err error) {
  173. tc, err := ln.AcceptTCP()
  174. if err != nil {
  175. return
  176. }
  177. tc.SetKeepAlive(true)
  178. tc.SetKeepAlivePeriod(30 * time.Second)
  179. return tc, nil
  180. }
  181. func (a *Agent) listenSocket(path string) (net.Listener, error) {
  182. if _, err := os.Stat(path); !os.IsNotExist(err) {
  183. log.Warn("agent: Replacing socket %q\n", path)
  184. }
  185. if err := os.Remove(path); err != nil && !os.IsNotExist(err) {
  186. return nil, fmt.Errorf("error removing socket file: %s", err)
  187. }
  188. l, err := net.Listen("unix", path)
  189. if err != nil {
  190. return nil, err
  191. }
  192. // TODO: set file permissions.
  193. return l, nil
  194. }
  195. func (a *Agent) serveHTTP(l net.Listener, srv *HTTPServer) error {
  196. srv.proto = "http"
  197. notif := make(chan string)
  198. a.wgServers.Add(1)
  199. go func() {
  200. defer a.wgServers.Done()
  201. notif <- srv.Addr
  202. err := srv.Serve(l)
  203. if err != nil && err != http.ErrServerClosed {
  204. log.Error("agent: Error starting http service: %s\n", err.Error())
  205. }
  206. }()
  207. select {
  208. case addr := <-notif:
  209. log.Info("agent: Started HTTP Server on %s\n", addr)
  210. return nil
  211. case <-time.After(time.Second):
  212. return fmt.Errorf("agent: timeout starting HTTP servers")
  213. }
  214. }
  215. // ShutDown agent
  216. func (a *Agent) ShutDown(ctx context.Context) error {
  217. errmsg := make([]string, 0)
  218. for _, hsrv := range a.httpServers {
  219. if err := hsrv.Shutdown(ctx); err != nil {
  220. errmsg = append(errmsg, err.Error())
  221. }
  222. }
  223. for _, dsrv := range a.dnsServers {
  224. if err := dsrv.Shutdown(); err != nil {
  225. errmsg = append(errmsg, err.Error())
  226. }
  227. }
  228. err := a.backend.Close(ctx)
  229. if err != nil {
  230. errmsg = append(errmsg, err.Error())
  231. }
  232. if len(errmsg) > 0 {
  233. return fmt.Errorf("%s", strings.Join(errmsg, "\n"))
  234. }
  235. return nil
  236. }
  237. // DefaultSel default selector from config
  238. func (a *Agent) DefaultSel() backend.Selector {
  239. return backend.Selector{
  240. Env: env.DeployEnv,
  241. Region: env.Region,
  242. Zone: env.Zone,
  243. }
  244. }