server.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358
  1. package warden
  2. import (
  3. "context"
  4. "flag"
  5. "fmt"
  6. "math"
  7. "net"
  8. "os"
  9. "sync"
  10. "time"
  11. "go-common/library/conf/dsn"
  12. "go-common/library/log"
  13. nmd "go-common/library/net/metadata"
  14. "go-common/library/net/trace"
  15. xtime "go-common/library/time"
  16. //this package is for json format response
  17. _ "go-common/library/net/rpc/warden/encoding/json"
  18. "go-common/library/net/rpc/warden/status"
  19. "github.com/pkg/errors"
  20. "google.golang.org/grpc"
  21. "google.golang.org/grpc/keepalive"
  22. "google.golang.org/grpc/metadata"
  23. "google.golang.org/grpc/peer"
  24. "google.golang.org/grpc/reflection"
  25. )
  26. const (
  27. _family = "grpc"
  28. _noUser = "no_user"
  29. )
  30. var (
  31. _grpcDSN string
  32. _defaultSerConf = &ServerConfig{
  33. Network: "tcp",
  34. Addr: "0.0.0.0:9000",
  35. Timeout: xtime.Duration(time.Second),
  36. IdleTimeout: xtime.Duration(time.Second * 60),
  37. MaxLifeTime: xtime.Duration(time.Hour * 2),
  38. ForceCloseWait: xtime.Duration(time.Second * 20),
  39. KeepAliveInterval: xtime.Duration(time.Second * 60),
  40. KeepAliveTimeout: xtime.Duration(time.Second * 20),
  41. }
  42. _abortIndex int8 = math.MaxInt8 / 2
  43. )
  44. // ServerConfig is rpc server conf.
  45. type ServerConfig struct {
  46. // Network is grpc listen network,default value is tcp
  47. Network string `dsn:"network"`
  48. // Addr is grpc listen addr,default value is 0.0.0.0:9000
  49. Addr string `dsn:"address"`
  50. // Timeout is context timeout for per rpc call.
  51. Timeout xtime.Duration `dsn:"query.timeout"`
  52. // IdleTimeout is a duration for the amount of time after which an idle connection would be closed by sending a GoAway.
  53. // Idleness duration is defined since the most recent time the number of outstanding RPCs became zero or the connection establishment.
  54. IdleTimeout xtime.Duration `dsn:"query.idleTimeout"`
  55. // MaxLifeTime is a duration for the maximum amount of time a connection may exist before it will be closed by sending a GoAway.
  56. // A random jitter of +/-10% will be added to MaxConnectionAge to spread out connection storms.
  57. MaxLifeTime xtime.Duration `dsn:"query.maxLife"`
  58. // ForceCloseWait is an additive period after MaxLifeTime after which the connection will be forcibly closed.
  59. ForceCloseWait xtime.Duration `dsn:"query.closeWait"`
  60. // KeepAliveInterval is after a duration of this time if the server doesn't see any activity it pings the client to see if the transport is still alive.
  61. KeepAliveInterval xtime.Duration `dsn:"query.keepaliveInterval"`
  62. // KeepAliveTimeout is After having pinged for keepalive check, the server waits for a duration of Timeout and if no activity is seen even after that
  63. // the connection is closed.
  64. KeepAliveTimeout xtime.Duration `dsn:"query.keepaliveTimeout"`
  65. }
  66. // Server is the framework's server side instance, it contains the GrpcServer, interceptor and interceptors.
  67. // Create an instance of Server, by using NewServer().
  68. type Server struct {
  69. conf *ServerConfig
  70. mutex sync.RWMutex
  71. server *grpc.Server
  72. handlers []grpc.UnaryServerInterceptor
  73. }
  74. // handle return a new unary server interceptor for OpenTracing\Logging\LinkTimeout.
  75. func (s *Server) handle() grpc.UnaryServerInterceptor {
  76. return func(ctx context.Context, req interface{}, args *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
  77. var (
  78. cancel func()
  79. caller string
  80. addr string
  81. color string
  82. remote string
  83. remotePort string
  84. )
  85. s.mutex.RLock()
  86. conf := s.conf
  87. s.mutex.RUnlock()
  88. // get derived timeout from grpc context,
  89. // compare with the warden configured,
  90. // and use the minimum one
  91. timeout := time.Duration(conf.Timeout)
  92. if dl, ok := ctx.Deadline(); ok {
  93. ctimeout := time.Until(dl)
  94. if ctimeout-time.Millisecond*20 > 0 {
  95. ctimeout = ctimeout - time.Millisecond*20
  96. }
  97. if timeout > ctimeout {
  98. timeout = ctimeout
  99. }
  100. }
  101. ctx, cancel = context.WithTimeout(ctx, timeout)
  102. defer cancel()
  103. // get grpc metadata(trace & remote_ip & color)
  104. var t trace.Trace
  105. if gmd, ok := metadata.FromIncomingContext(ctx); ok {
  106. t, _ = trace.Extract(trace.GRPCFormat, gmd)
  107. if strs, ok := gmd[nmd.Color]; ok {
  108. color = strs[0]
  109. }
  110. if strs, ok := gmd[nmd.RemoteIP]; ok {
  111. remote = strs[0]
  112. }
  113. if callers, ok := gmd[nmd.Caller]; ok {
  114. caller = callers[0]
  115. }
  116. if remotePorts, ok := gmd[nmd.RemotePort]; ok {
  117. remotePort = remotePorts[0]
  118. }
  119. }
  120. if t == nil {
  121. t = trace.New(args.FullMethod)
  122. } else {
  123. t.SetTitle(args.FullMethod)
  124. }
  125. if pr, ok := peer.FromContext(ctx); ok {
  126. addr = pr.Addr.String()
  127. t.SetTag(trace.String(trace.TagAddress, addr))
  128. }
  129. defer t.Finish(&err)
  130. // use common meta data context instead of grpc context
  131. ctx = nmd.NewContext(ctx, nmd.MD{
  132. nmd.Trace: t,
  133. nmd.Color: color,
  134. nmd.RemoteIP: remote,
  135. nmd.Caller: caller,
  136. nmd.RemotePort: remotePort,
  137. })
  138. resp, err = handler(ctx, req)
  139. // monitor & logging
  140. if caller == "" {
  141. caller = _noUser
  142. }
  143. return resp, status.FromError(err).Err()
  144. }
  145. }
  146. func init() {
  147. addFlag(flag.CommandLine)
  148. }
  149. func addFlag(fs *flag.FlagSet) {
  150. v := os.Getenv("GRPC")
  151. if v == "" {
  152. v = "tcp://0.0.0.0:9000/?timeout=1s&idle_timeout=60s"
  153. }
  154. fs.StringVar(&_grpcDSN, "grpc", v, "listen grpc dsn, or use GRPC env variable.")
  155. fs.Var(&_grpcTarget, "grpc.target", "usage: -grpc.target=seq.service=127.0.0.1:9000 -grpc.target=fav.service=192.168.10.1:9000")
  156. }
  157. func parseDSN(rawdsn string) *ServerConfig {
  158. conf := new(ServerConfig)
  159. d, err := dsn.Parse(rawdsn)
  160. if err != nil {
  161. panic(errors.WithMessage(err, fmt.Sprintf("warden: invalid dsn: %s", rawdsn)))
  162. }
  163. if _, err = d.Bind(conf); err != nil {
  164. panic(errors.WithMessage(err, fmt.Sprintf("warden: invalid dsn: %s", rawdsn)))
  165. }
  166. return conf
  167. }
  168. // NewServer returns a new blank Server instance with a default server interceptor.
  169. func NewServer(conf *ServerConfig, opt ...grpc.ServerOption) (s *Server) {
  170. if conf == nil {
  171. if !flag.Parsed() {
  172. fmt.Fprint(os.Stderr, "[warden] please call flag.Parse() before Init warden server, some configure may not effect\n")
  173. }
  174. conf = parseDSN(_grpcDSN)
  175. } else {
  176. fmt.Fprintf(os.Stderr, "[warden] config is Deprecated, argument will be ignored. please use -grpc flag or GRPC env to configure warden server.\n")
  177. }
  178. s = new(Server)
  179. if err := s.SetConfig(conf); err != nil {
  180. panic(errors.Errorf("warden: set config failed!err: %s", err.Error()))
  181. }
  182. keepParam := grpc.KeepaliveParams(keepalive.ServerParameters{
  183. MaxConnectionIdle: time.Duration(s.conf.IdleTimeout),
  184. MaxConnectionAgeGrace: time.Duration(s.conf.ForceCloseWait),
  185. Time: time.Duration(s.conf.KeepAliveInterval),
  186. Timeout: time.Duration(s.conf.KeepAliveTimeout),
  187. MaxConnectionAge: time.Duration(s.conf.MaxLifeTime),
  188. })
  189. opt = append(opt, keepParam, grpc.UnaryInterceptor(s.interceptor))
  190. s.server = grpc.NewServer(opt...)
  191. s.Use(s.recovery(), s.handle(), serverLogging(), s.stats(), s.validate())
  192. return
  193. }
  194. // SetConfig hot reloads server config
  195. func (s *Server) SetConfig(conf *ServerConfig) (err error) {
  196. if conf == nil {
  197. conf = _defaultSerConf
  198. }
  199. if conf.Timeout <= 0 {
  200. conf.Timeout = xtime.Duration(time.Second)
  201. }
  202. if conf.IdleTimeout <= 0 {
  203. conf.IdleTimeout = xtime.Duration(time.Second * 60)
  204. }
  205. if conf.MaxLifeTime <= 0 {
  206. conf.MaxLifeTime = xtime.Duration(time.Hour * 2)
  207. }
  208. if conf.ForceCloseWait <= 0 {
  209. conf.ForceCloseWait = xtime.Duration(time.Second * 20)
  210. }
  211. if conf.KeepAliveInterval <= 0 {
  212. conf.KeepAliveInterval = xtime.Duration(time.Second * 60)
  213. }
  214. if conf.KeepAliveTimeout <= 0 {
  215. conf.KeepAliveTimeout = xtime.Duration(time.Second * 20)
  216. }
  217. if conf.Addr == "" {
  218. conf.Addr = "0.0.0.0:9000"
  219. }
  220. if conf.Network == "" {
  221. conf.Network = "tcp"
  222. }
  223. s.mutex.Lock()
  224. s.conf = conf
  225. s.mutex.Unlock()
  226. return nil
  227. }
  228. // interceptor is a single interceptor out of a chain of many interceptors.
  229. // Execution is done in left-to-right order, including passing of context.
  230. // For example ChainUnaryServer(one, two, three) will execute one before two before three, and three
  231. // will see context changes of one and two.
  232. func (s *Server) interceptor(ctx context.Context, req interface{}, args *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
  233. var (
  234. i int
  235. chain grpc.UnaryHandler
  236. )
  237. n := len(s.handlers)
  238. if n == 0 {
  239. return handler(ctx, req)
  240. }
  241. chain = func(ic context.Context, ir interface{}) (interface{}, error) {
  242. if i == n-1 {
  243. return handler(ic, ir)
  244. }
  245. i++
  246. return s.handlers[i](ic, ir, args, chain)
  247. }
  248. return s.handlers[0](ctx, req, args, chain)
  249. }
  250. // Server return the grpc server for registering service.
  251. func (s *Server) Server() *grpc.Server {
  252. return s.server
  253. }
  254. // Use attachs a global inteceptor to the server.
  255. // For example, this is the right place for a rate limiter or error management inteceptor.
  256. func (s *Server) Use(handlers ...grpc.UnaryServerInterceptor) *Server {
  257. finalSize := len(s.handlers) + len(handlers)
  258. if finalSize >= int(_abortIndex) {
  259. panic("warden: server use too many handlers")
  260. }
  261. mergedHandlers := make([]grpc.UnaryServerInterceptor, finalSize)
  262. copy(mergedHandlers, s.handlers)
  263. copy(mergedHandlers[len(s.handlers):], handlers)
  264. s.handlers = mergedHandlers
  265. return s
  266. }
  267. // Run create a tcp listener and start goroutine for serving each incoming request.
  268. // Run will return a non-nil error unless Stop or GracefulStop is called.
  269. func (s *Server) Run(addr string) error {
  270. lis, err := net.Listen("tcp", addr)
  271. if err != nil {
  272. err = errors.WithStack(err)
  273. log.Error("failed to listen: %v", err)
  274. return err
  275. }
  276. reflection.Register(s.server)
  277. return s.Serve(lis)
  278. }
  279. // RunUnix create a unix listener and start goroutine for serving each incoming request.
  280. // RunUnix will return a non-nil error unless Stop or GracefulStop is called.
  281. func (s *Server) RunUnix(file string) error {
  282. lis, err := net.Listen("unix", file)
  283. if err != nil {
  284. err = errors.WithStack(err)
  285. log.Error("failed to listen: %v", err)
  286. return err
  287. }
  288. reflection.Register(s.server)
  289. return s.Serve(lis)
  290. }
  291. // Start create a new goroutine run server with configured listen addr
  292. // will panic if any error happend
  293. // return server itself
  294. func (s *Server) Start() (*Server, error) {
  295. lis, err := net.Listen(s.conf.Network, s.conf.Addr)
  296. if err != nil {
  297. return nil, err
  298. }
  299. reflection.Register(s.server)
  300. go func() {
  301. if err := s.Serve(lis); err != nil {
  302. panic(err)
  303. }
  304. }()
  305. return s, nil
  306. }
  307. // Serve accepts incoming connections on the listener lis, creating a new
  308. // ServerTransport and service goroutine for each.
  309. // Serve will return a non-nil error unless Stop or GracefulStop is called.
  310. func (s *Server) Serve(lis net.Listener) error {
  311. return s.server.Serve(lis)
  312. }
  313. // Shutdown stops the server gracefully. It stops the server from
  314. // accepting new connections and RPCs and blocks until all the pending RPCs are
  315. // finished or the context deadline is reached.
  316. func (s *Server) Shutdown(ctx context.Context) (err error) {
  317. ch := make(chan struct{})
  318. go func() {
  319. s.server.GracefulStop()
  320. close(ch)
  321. }()
  322. select {
  323. case <-ctx.Done():
  324. s.server.Stop()
  325. err = ctx.Err()
  326. case <-ch:
  327. }
  328. return
  329. }