server.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445
  1. package blademaster
  2. import (
  3. "context"
  4. "flag"
  5. "fmt"
  6. "net"
  7. "net/http"
  8. "os"
  9. "regexp"
  10. "strings"
  11. "sync"
  12. "sync/atomic"
  13. "time"
  14. "go-common/library/conf/dsn"
  15. "go-common/library/log"
  16. "go-common/library/net/ip"
  17. "go-common/library/net/metadata"
  18. "go-common/library/stat"
  19. xtime "go-common/library/time"
  20. "github.com/pkg/errors"
  21. )
  22. const (
  23. defaultMaxMemory = 32 << 20 // 32 MB
  24. )
  25. var (
  26. _ IRouter = &Engine{}
  27. stats = stat.HTTPServer
  28. _httpDSN string
  29. )
  30. func init() {
  31. addFlag(flag.CommandLine)
  32. }
  33. func addFlag(fs *flag.FlagSet) {
  34. v := os.Getenv("HTTP")
  35. if v == "" {
  36. v = "tcp://0.0.0.0:8000/?timeout=1s"
  37. }
  38. fs.StringVar(&_httpDSN, "http", v, "listen http dsn, or use HTTP env variable.")
  39. }
  40. func parseDSN(rawdsn string) *ServerConfig {
  41. conf := new(ServerConfig)
  42. d, err := dsn.Parse(rawdsn)
  43. if err != nil {
  44. panic(errors.Wrapf(err, "blademaster: invalid dsn: %s", rawdsn))
  45. }
  46. if _, err = d.Bind(conf); err != nil {
  47. panic(errors.Wrapf(err, "blademaster: invalid dsn: %s", rawdsn))
  48. }
  49. return conf
  50. }
  51. // Handler responds to an HTTP request.
  52. type Handler interface {
  53. ServeHTTP(c *Context)
  54. }
  55. // HandlerFunc http request handler function.
  56. type HandlerFunc func(*Context)
  57. // ServeHTTP calls f(ctx).
  58. func (f HandlerFunc) ServeHTTP(c *Context) {
  59. f(c)
  60. }
  61. // ServerConfig is the bm server config model
  62. type ServerConfig struct {
  63. Network string `dsn:"network"`
  64. // FIXME: rename to Address
  65. Addr string `dsn:"address"`
  66. Timeout xtime.Duration `dsn:"query.timeout"`
  67. ReadTimeout xtime.Duration `dsn:"query.readTimeout"`
  68. WriteTimeout xtime.Duration `dsn:"query.writeTimeout"`
  69. }
  70. // MethodConfig is
  71. type MethodConfig struct {
  72. Timeout xtime.Duration
  73. }
  74. // Start listen and serve bm engine by given DSN.
  75. func (engine *Engine) Start() error {
  76. conf := engine.conf
  77. l, err := net.Listen(conf.Network, conf.Addr)
  78. if err != nil {
  79. errors.Wrapf(err, "blademaster: listen tcp: %s", conf.Addr)
  80. return err
  81. }
  82. log.Info("blademaster: start http listen addr: %s", conf.Addr)
  83. server := &http.Server{
  84. ReadTimeout: time.Duration(conf.ReadTimeout),
  85. WriteTimeout: time.Duration(conf.WriteTimeout),
  86. }
  87. go func() {
  88. if err := engine.RunServer(server, l); err != nil {
  89. if errors.Cause(err) == http.ErrServerClosed {
  90. log.Info("blademaster: server closed")
  91. return
  92. }
  93. panic(errors.Wrapf(err, "blademaster: engine.ListenServer(%+v, %+v)", server, l))
  94. }
  95. }()
  96. return nil
  97. }
  98. // Engine is the framework's instance, it contains the muxer, middleware and configuration settings.
  99. // Create an instance of Engine, by using New() or Default()
  100. type Engine struct {
  101. RouterGroup
  102. lock sync.RWMutex
  103. conf *ServerConfig
  104. address string
  105. mux *http.ServeMux // http mux router
  106. server atomic.Value // store *http.Server
  107. metastore map[string]map[string]interface{} // metastore is the path as key and the metadata of this path as value, it export via /metadata
  108. pcLock sync.RWMutex
  109. methodConfigs map[string]*MethodConfig
  110. injections []injection
  111. }
  112. type injection struct {
  113. pattern *regexp.Regexp
  114. handlers []HandlerFunc
  115. }
  116. // New returns a new blank Engine instance without any middleware attached.
  117. //
  118. // Deprecated: please use NewServer.
  119. func New() *Engine {
  120. engine := &Engine{
  121. RouterGroup: RouterGroup{
  122. Handlers: nil,
  123. basePath: "/",
  124. root: true,
  125. },
  126. address: ip.InternalIP(),
  127. conf: &ServerConfig{
  128. Timeout: xtime.Duration(time.Second),
  129. },
  130. mux: http.NewServeMux(),
  131. metastore: make(map[string]map[string]interface{}),
  132. methodConfigs: make(map[string]*MethodConfig),
  133. injections: make([]injection, 0),
  134. }
  135. engine.RouterGroup.engine = engine
  136. // NOTE add prometheus monitor location
  137. engine.addRoute("GET", "/metrics", monitor())
  138. engine.addRoute("GET", "/metadata", engine.metadata())
  139. startPerf()
  140. return engine
  141. }
  142. // NewServer returns a new blank Engine instance without any middleware attached.
  143. func NewServer(conf *ServerConfig) *Engine {
  144. if conf == nil {
  145. if !flag.Parsed() {
  146. fmt.Fprint(os.Stderr, "[blademaster] please call flag.Parse() before Init warden server, some configure may not effect.\n")
  147. }
  148. conf = parseDSN(_httpDSN)
  149. } else {
  150. fmt.Fprintf(os.Stderr, "[blademaster] config will be deprecated, argument will be ignored. please use -http flag or HTTP env to configure http server.\n")
  151. }
  152. engine := &Engine{
  153. RouterGroup: RouterGroup{
  154. Handlers: nil,
  155. basePath: "/",
  156. root: true,
  157. },
  158. address: ip.InternalIP(),
  159. mux: http.NewServeMux(),
  160. metastore: make(map[string]map[string]interface{}),
  161. methodConfigs: make(map[string]*MethodConfig),
  162. }
  163. if err := engine.SetConfig(conf); err != nil {
  164. panic(err)
  165. }
  166. engine.RouterGroup.engine = engine
  167. // NOTE add prometheus monitor location
  168. engine.addRoute("GET", "/metrics", monitor())
  169. engine.addRoute("GET", "/metadata", engine.metadata())
  170. startPerf()
  171. return engine
  172. }
  173. // SetMethodConfig is used to set config on specified path
  174. func (engine *Engine) SetMethodConfig(path string, mc *MethodConfig) {
  175. engine.pcLock.Lock()
  176. engine.methodConfigs[path] = mc
  177. engine.pcLock.Unlock()
  178. }
  179. // DefaultServer returns an Engine instance with the Recovery, Logger and CSRF middleware already attached.
  180. func DefaultServer(conf *ServerConfig) *Engine {
  181. engine := NewServer(conf)
  182. engine.Use(Recovery(), Trace(), Logger(), CSRF(), Mobile())
  183. return engine
  184. }
  185. // Default returns an Engine instance with the Recovery, Logger and CSRF middleware already attached.
  186. //
  187. // Deprecated: please use DefaultServer.
  188. func Default() *Engine {
  189. engine := New()
  190. engine.Use(Recovery(), Trace(), Logger(), CSRF(), Mobile())
  191. return engine
  192. }
  193. func (engine *Engine) addRoute(method, path string, handlers ...HandlerFunc) {
  194. if path[0] != '/' {
  195. panic("blademaster: path must begin with '/'")
  196. }
  197. if method == "" {
  198. panic("blademaster: HTTP method can not be empty")
  199. }
  200. if len(handlers) == 0 {
  201. panic("blademaster: there must be at least one handler")
  202. }
  203. if _, ok := engine.metastore[path]; !ok {
  204. engine.metastore[path] = make(map[string]interface{})
  205. }
  206. engine.metastore[path]["method"] = method
  207. engine.mux.HandleFunc(path, func(w http.ResponseWriter, req *http.Request) {
  208. c := &Context{
  209. Context: nil,
  210. engine: engine,
  211. index: -1,
  212. handlers: nil,
  213. Keys: nil,
  214. method: "",
  215. Error: nil,
  216. }
  217. c.Request = req
  218. c.Writer = w
  219. c.handlers = handlers
  220. c.method = method
  221. engine.handleContext(c)
  222. })
  223. }
  224. // SetConfig is used to set the engine configuration.
  225. // Only the valid config will be loaded.
  226. func (engine *Engine) SetConfig(conf *ServerConfig) (err error) {
  227. if conf.Timeout <= 0 {
  228. return errors.New("blademaster: config timeout must greater than 0")
  229. }
  230. if conf.Network == "" {
  231. conf.Network = "tcp"
  232. }
  233. engine.lock.Lock()
  234. engine.conf = conf
  235. engine.lock.Unlock()
  236. return
  237. }
  238. func (engine *Engine) methodConfig(path string) *MethodConfig {
  239. engine.pcLock.RLock()
  240. mc := engine.methodConfigs[path]
  241. engine.pcLock.RUnlock()
  242. return mc
  243. }
  244. func (engine *Engine) handleContext(c *Context) {
  245. var cancel func()
  246. req := c.Request
  247. ctype := req.Header.Get("Content-Type")
  248. switch {
  249. case strings.Contains(ctype, "multipart/form-data"):
  250. req.ParseMultipartForm(defaultMaxMemory)
  251. default:
  252. req.ParseForm()
  253. }
  254. // get derived timeout from http request header,
  255. // compare with the engine configured,
  256. // and use the minimum one
  257. engine.lock.RLock()
  258. tm := time.Duration(engine.conf.Timeout)
  259. engine.lock.RUnlock()
  260. // the method config is preferred
  261. if pc := engine.methodConfig(c.Request.URL.Path); pc != nil {
  262. tm = time.Duration(pc.Timeout)
  263. }
  264. if ctm := timeout(req); ctm > 0 && tm > ctm {
  265. tm = ctm
  266. }
  267. md := metadata.MD{
  268. metadata.Color: color(req),
  269. metadata.RemoteIP: remoteIP(req),
  270. metadata.RemotePort: remotePort(req),
  271. metadata.Caller: caller(req),
  272. metadata.Mirror: mirror(req),
  273. }
  274. ctx := metadata.NewContext(context.Background(), md)
  275. if tm > 0 {
  276. c.Context, cancel = context.WithTimeout(ctx, tm)
  277. } else {
  278. c.Context, cancel = context.WithCancel(ctx)
  279. }
  280. defer cancel()
  281. c.Next()
  282. }
  283. // Router return a http.Handler for using http.ListenAndServe() directly.
  284. func (engine *Engine) Router() http.Handler {
  285. return engine.mux
  286. }
  287. // Server is used to load stored http server.
  288. func (engine *Engine) Server() *http.Server {
  289. s, ok := engine.server.Load().(*http.Server)
  290. if !ok {
  291. return nil
  292. }
  293. return s
  294. }
  295. // Shutdown the http server without interrupting active connections.
  296. func (engine *Engine) Shutdown(ctx context.Context) error {
  297. server := engine.Server()
  298. if server == nil {
  299. return errors.New("blademaster: no server")
  300. }
  301. return errors.WithStack(server.Shutdown(ctx))
  302. }
  303. // UseFunc attachs a global middleware to the router. ie. the middleware attached though UseFunc() will be
  304. // included in the handlers chain for every single request. Even 404, 405, static files...
  305. // For example, this is the right place for a logger or error management middleware.
  306. func (engine *Engine) UseFunc(middleware ...HandlerFunc) IRoutes {
  307. engine.RouterGroup.UseFunc(middleware...)
  308. return engine
  309. }
  310. // Use attachs a global middleware to the router. ie. the middleware attached though Use() will be
  311. // included in the handlers chain for every single request. Even 404, 405, static files...
  312. // For example, this is the right place for a logger or error management middleware.
  313. func (engine *Engine) Use(middleware ...Handler) IRoutes {
  314. engine.RouterGroup.Use(middleware...)
  315. return engine
  316. }
  317. // Ping is used to set the general HTTP ping handler.
  318. func (engine *Engine) Ping(handler HandlerFunc) {
  319. engine.GET("/monitor/ping", handler)
  320. }
  321. // Register is used to export metadata to discovery.
  322. func (engine *Engine) Register(handler HandlerFunc) {
  323. engine.GET("/register", handler)
  324. }
  325. // Run attaches the router to a http.Server and starts listening and serving HTTP requests.
  326. // It is a shortcut for http.ListenAndServe(addr, router)
  327. // Note: this method will block the calling goroutine indefinitely unless an error happens.
  328. func (engine *Engine) Run(addr ...string) (err error) {
  329. address := resolveAddress(addr)
  330. server := &http.Server{
  331. Addr: address,
  332. Handler: engine.mux,
  333. }
  334. engine.server.Store(server)
  335. if err = server.ListenAndServe(); err != nil {
  336. err = errors.Wrapf(err, "addrs: %v", addr)
  337. }
  338. return
  339. }
  340. // RunTLS attaches the router to a http.Server and starts listening and serving HTTPS (secure) requests.
  341. // It is a shortcut for http.ListenAndServeTLS(addr, certFile, keyFile, router)
  342. // Note: this method will block the calling goroutine indefinitely unless an error happens.
  343. func (engine *Engine) RunTLS(addr, certFile, keyFile string) (err error) {
  344. server := &http.Server{
  345. Addr: addr,
  346. Handler: engine.mux,
  347. }
  348. engine.server.Store(server)
  349. if err = server.ListenAndServeTLS(certFile, keyFile); err != nil {
  350. err = errors.Wrapf(err, "tls: %s/%s:%s", addr, certFile, keyFile)
  351. }
  352. return
  353. }
  354. // RunUnix attaches the router to a http.Server and starts listening and serving HTTP requests
  355. // through the specified unix socket (ie. a file).
  356. // Note: this method will block the calling goroutine indefinitely unless an error happens.
  357. func (engine *Engine) RunUnix(file string) (err error) {
  358. os.Remove(file)
  359. listener, err := net.Listen("unix", file)
  360. if err != nil {
  361. err = errors.Wrapf(err, "unix: %s", file)
  362. return
  363. }
  364. defer listener.Close()
  365. server := &http.Server{
  366. Handler: engine.mux,
  367. }
  368. engine.server.Store(server)
  369. if err = server.Serve(listener); err != nil {
  370. err = errors.Wrapf(err, "unix: %s", file)
  371. }
  372. return
  373. }
  374. // RunServer will serve and start listening HTTP requests by given server and listener.
  375. // Note: this method will block the calling goroutine indefinitely unless an error happens.
  376. func (engine *Engine) RunServer(server *http.Server, l net.Listener) (err error) {
  377. server.Handler = engine.mux
  378. engine.server.Store(server)
  379. if err = server.Serve(l); err != nil {
  380. err = errors.Wrapf(err, "listen server: %+v/%+v", server, l)
  381. return
  382. }
  383. return
  384. }
  385. func (engine *Engine) metadata() HandlerFunc {
  386. return func(c *Context) {
  387. c.JSON(engine.metastore, nil)
  388. }
  389. }
  390. // Inject is
  391. func (engine *Engine) Inject(pattern string, handlers ...HandlerFunc) {
  392. engine.injections = append(engine.injections, injection{
  393. pattern: regexp.MustCompile(pattern),
  394. handlers: handlers,
  395. })
  396. }