server.go 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669
  1. package rpc
  2. import (
  3. "bufio"
  4. ctx "context"
  5. "encoding/gob"
  6. "errors"
  7. "flag"
  8. "fmt"
  9. "io"
  10. "log"
  11. "net"
  12. "os"
  13. "reflect"
  14. "runtime"
  15. "strings"
  16. "sync"
  17. "time"
  18. "unicode"
  19. "unicode/utf8"
  20. "go-common/library/conf/dsn"
  21. "go-common/library/conf/env"
  22. xlog "go-common/library/log"
  23. "go-common/library/net/metadata"
  24. "go-common/library/net/rpc/context"
  25. "go-common/library/net/rpc/interceptor"
  26. "go-common/library/net/trace"
  27. pkgerr "github.com/pkg/errors"
  28. )
  // _gorpcDSN holds the listen DSN of the go rpc server. addFlag binds it to the
  // -gorpc flag, and NewServer parses it when it is called with a nil config.
  29. var (
  30. _gorpcDSN string
  31. )
  32. func init() {
  33. addFlag(flag.CommandLine)
  34. }
  35. func addFlag(fs *flag.FlagSet) {
  36. v := os.Getenv("GORPC")
  37. if v == "" {
  38. if env.GORPCPort != "" {
  39. v = "tcp://0.0.0.0:" + env.GORPCPort
  40. } else {
  41. v = "tcp://0.0.0.0:8099"
  42. }
  43. }
  44. fs.StringVar(&_gorpcDSN, "gorpc", v, "listen go rpc dsn, or use GORPC env variable.")
  45. }
  46. func parseDSN(rawdsn string) *ServerConfig {
  47. conf := new(ServerConfig)
  48. d, err := dsn.Parse(rawdsn)
  49. if err != nil {
  50. panic(pkgerr.WithMessage(err, "net/rpc: invalid dsn"))
  51. }
  52. if _, err = d.Bind(conf); err != nil {
  53. panic(pkgerr.WithMessage(err, "net/rpc: invalid dsn"))
  54. }
  55. return conf
  56. }
  57. // ServerConfig holds the rpc server listen settings; parseDSN fills it from a DSN.
  58. type ServerConfig struct {
  59. Proto string `dsn:"network"` // network handed to net.Listen, bound from the dsn "network" field
  60. Addr string `dsn:"address"` // address handed to net.Listen, bound from the dsn "address" field
  61. }
  62. // NewServer new a rpc server.
  63. func NewServer(c *ServerConfig) *Server {
  64. if c == nil {
  65. if !flag.Parsed() {
  66. fmt.Fprint(os.Stderr, "[net/rpc] please call flag.Parse() before Init go rpc server, some configure may not effect.\n")
  67. }
  68. c = parseDSN(_gorpcDSN)
  69. } else {
  70. fmt.Fprintf(os.Stderr, "[net/rpc] config will be deprecated, argument will be ignored. please use -gorpc flag or GORPC env to configure go rpc server.\n")
  71. }
  72. s := newServer()
  73. s.Interceptor = interceptor.NewInterceptor("")
  74. go rpcListen(c, s)
  75. return s
  76. }
  77. // rpcListen start rpc listen.
  78. func rpcListen(c *ServerConfig, s *Server) {
  79. l, err := net.Listen(c.Proto, c.Addr)
  80. if err != nil {
  81. xlog.Error("net.Listen(rpcAddr:(%v)) error(%v)", c.Addr, err)
  82. panic(err)
  83. }
  84. // if process exit, then close the rpc bind
  85. defer func() {
  86. xlog.Info("rpc addr:(%s) close", c.Addr)
  87. if err := l.Close(); err != nil {
  88. xlog.Error("listener.Close() error(%v)", err)
  89. }
  90. }()
  91. xlog.Info("start rpc listen addr: %s", c.Addr)
  92. s.Accept(l)
  93. }
  94. var (
  95. // typeOfError is the precomputed reflect.Type of the error interface. It is
  96. // derived from a *error because reflect.TypeOf takes an empty interface value.
  97. typeOfError = reflect.TypeOf((*error)(nil)).Elem()
  // ctxType is the reflect.Type of the rpc context.Context interface;
  // suitableMethods requires a method's first argument to implement it.
  98. ctxType = reflect.TypeOf((*context.Context)(nil)).Elem()
  99. )
  100. // methodType describes one registered rpc method.
  101. type methodType struct {
  102. method reflect.Method // reflected method invoked by service.call
  103. ArgType reflect.Type // type of the request argument
  104. ReplyType reflect.Type // type of the reply argument (always a pointer type)
  105. }
  106. // service is one registered receiver together with its rpc methods.
  107. type service struct {
  108. name string // name of service
  109. rcvr reflect.Value // receiver of methods for the service
  110. typ reflect.Type // type of the receiver
  111. method map[string]*methodType // registered methods, keyed by method name
  112. }
  113. // Request is a header written before every RPC call. It is used internally
  114. // but documented here as an aid to debugging, such as when analyzing
  115. // network traffic.
  116. type Request struct {
  117. Color string // color, copied into the request metadata
  118. RemoteIP string // remote IP, copied into the request metadata
  119. Timeout time.Duration // timeout; not read by the server code in this file
  120. ServiceMethod string // format: "Service.Method"
  121. Seq uint64 // sequence number chosen by client
  122. Trace TraceInfo // trace info; also supplies the caller name when no auth user is set
  123. ctx context.Context // server-side request context; nil until a header has been decoded
  124. }
  125. // Auth is the argument of the "inner.Auth" handshake; serveCodec stores it on the connection's codec.
  126. type Auth struct {
  127. User string // user name; when non-empty it is used as the caller for later requests on the connection
  128. }
  129. // Response is a header written before every RPC return. It is used internally
  130. // but documented here as an aid to debugging, such as when analyzing
  131. // network traffic.
  132. type Response struct {
  133. ServiceMethod string // echoes that of the Request
  134. Seq uint64 // echoes that of the request
  135. Error string // error message, if any; empty on success
  136. }
  137. // Interceptor hooks into request handling on the server.
  138. type Interceptor interface {
  139. Rate(context.Context) error // called by service.call before the method runs; a non-nil error skips the method and is reported to the client
  140. Stat(context.Context, interface{}, error) // called by service.call after the response is sent, with the request argument and the call's error
  141. Auth(context.Context, net.Addr, string) error // ip, token; NOTE(review): not called anywhere in this file
  142. }
  143. // Server represents an RPC Server.
  144. type Server struct {
  145. lis net.Listener // listener closed by Close when non-nil
  146. serviceMap map[string]*service // registered services, keyed by service name
  147. Interceptor Interceptor // interceptor used for rate limiting and stats; nil disables both
  148. }
  149. // newServer returns a new Server.
  150. func newServer() *Server {
  151. return &Server{serviceMap: make(map[string]*service)}
  152. }
  153. // DefaultServer is the default instance of *Server, used by the package-level
  // Register, RegisterName, ServeConn and Accept functions.
  154. var DefaultServer = newServer()
  // isExported reports whether name begins with an upper-case letter, i.e.
  // whether it is an exported Go identifier. An empty name is not exported.
  func isExported(name string) bool {
  	first, _ := utf8.DecodeRuneInString(name)
  	return unicode.IsUpper(first)
  }
  160. // Is this type exported or a builtin?
  161. func isExportedOrBuiltinType(t reflect.Type) bool {
  162. for t.Kind() == reflect.Ptr {
  163. t = t.Elem()
  164. }
  165. // PkgPath will be non-empty even for an exported type,
  166. // so we need to check the type name as well.
  167. return isExported(t.Name()) || t.PkgPath() == ""
  168. }
  169. // Register publishes in the server the set of methods of the
  170. // receiver value that satisfy the following conditions:
  171. // - exported method of exported type
  172. // - two arguments, both of exported type
  173. // - the second argument is a pointer
  174. // - one return value, of type error
  175. // It returns an error if the receiver is not an exported type or has
  176. // no suitable methods. It also logs the error using package log.
  177. // The client accesses each method using a string of the form "Type.Method",
  178. // where Type is the receiver's concrete type.
  179. func (server *Server) Register(rcvr interface{}) (err error) {
  180. if err = server.register(rcvr, "", false); err != nil {
  181. return
  182. }
  183. return server.register(new(pinger), _service, true)
  184. }
  185. // RegisterName is like Register but uses the provided name for the type
  186. // instead of the receiver's concrete type.
  187. func (server *Server) RegisterName(name string, rcvr interface{}) (err error) {
  188. if err = server.register(rcvr, name, true); err != nil {
  189. return
  190. }
  191. return server.register(new(pinger), _service, true)
  192. }
  193. func (server *Server) register(rcvr interface{}, name string, useName bool) error {
  194. if server.serviceMap == nil {
  195. server.serviceMap = make(map[string]*service)
  196. }
  197. s := new(service)
  198. s.typ = reflect.TypeOf(rcvr)
  199. s.rcvr = reflect.ValueOf(rcvr)
  200. sname := reflect.Indirect(s.rcvr).Type().Name()
  201. if useName {
  202. sname = name
  203. }
  204. if sname == "" {
  205. s := "rpc.Register: no service name for type " + s.typ.String()
  206. log.Print(s)
  207. return errors.New(s)
  208. }
  209. if !isExported(sname) && !useName {
  210. s := "rpc.Register: type " + sname + " is not exported"
  211. log.Print(s)
  212. return errors.New(s)
  213. }
  214. if _, present := server.serviceMap[sname]; present {
  215. return errors.New("rpc: service already defined: " + sname)
  216. }
  217. s.name = sname
  218. // Install the methods
  219. s.method = suitableMethods(s.typ, true)
  220. if len(s.method) == 0 {
  221. str := ""
  222. // To help the user, see if a pointer receiver would work.
  223. method := suitableMethods(reflect.PtrTo(s.typ), false)
  224. if len(method) != 0 {
  225. str = "rpc.Register: type " + sname + " has no exported methods of suitable type (hint: pass a pointer to value of that type)"
  226. } else {
  227. str = "rpc.Register: type " + sname + " has no exported methods of suitable type"
  228. }
  229. log.Print(str)
  230. return errors.New(str)
  231. }
  232. server.serviceMap[s.name] = s
  233. return nil
  234. }
  235. // suitableMethods returns suitable Rpc methods of typ, it will report
  236. // error using log if reportErr is true.
  237. func suitableMethods(typ reflect.Type, reportErr bool) map[string]*methodType {
  238. methods := make(map[string]*methodType)
  239. for m := 0; m < typ.NumMethod(); m++ {
  240. method := typ.Method(m)
  241. mtype := method.Type
  242. mname := method.Name
  243. // Method must be exported.
  244. if method.PkgPath != "" {
  245. continue
  246. }
  247. // Method needs ins: receiver, context, *arg, *reply.
  248. if mtype.NumIn() != 4 {
  249. if reportErr {
  250. log.Println("method", mname, "has wrong number of ins:", mtype.NumIn())
  251. }
  252. continue
  253. }
  254. // First arg need not be a pointer.
  255. argType := mtype.In(1)
  256. if !argType.Implements(ctxType) {
  257. if reportErr {
  258. log.Println(mname, "argument type must implements:", ctxType)
  259. }
  260. continue
  261. }
  262. // Second arg need not be a pointer.
  263. argType = mtype.In(2)
  264. if !isExportedOrBuiltinType(argType) {
  265. if reportErr {
  266. log.Println(mname, "argument type not exported:", argType)
  267. }
  268. continue
  269. }
  270. // Thrid arg must be a pointer.
  271. replyType := mtype.In(3)
  272. if replyType.Kind() != reflect.Ptr {
  273. if reportErr {
  274. log.Println("method", mname, "reply type not a pointer:", replyType)
  275. }
  276. continue
  277. }
  278. // Reply type must be exported.
  279. if !isExportedOrBuiltinType(replyType) {
  280. if reportErr {
  281. log.Println("method", mname, "reply type not exported:", replyType)
  282. }
  283. continue
  284. }
  285. // Method needs one out.
  286. if mtype.NumOut() != 1 {
  287. if reportErr {
  288. log.Println("method", mname, "has wrong number of outs:", mtype.NumOut())
  289. }
  290. continue
  291. }
  292. // The return type of the method must be error.
  293. if returnType := mtype.Out(0); returnType != typeOfError {
  294. if reportErr {
  295. log.Println("method", mname, "returns", returnType.String(), "not error")
  296. }
  297. continue
  298. }
  299. methods[mname] = &methodType{method: method, ArgType: argType, ReplyType: replyType}
  300. }
  301. return methods
  302. }
  303. // invalidRequest is sent as a placeholder for the server's response value when
  304. // the server receives an invalid request, and as the body of the auth handshake
  305. // reply. sendResponse substitutes it for the reply whenever an error message is set.
  306. var invalidRequest = struct{}{}
  307. func (server *Server) sendResponse(c context.Context, codec *serverCodec, reply interface{}, errmsg string) {
  308. var (
  309. err error
  310. ts Response
  311. resp = &codec.resp
  312. )
  313. if errmsg != "" {
  314. reply = invalidRequest
  315. }
  316. ts.ServiceMethod = c.ServiceMethod()
  317. ts.Seq = c.Seq()
  318. ts.Error = errmsg
  319. codec.sending.Lock()
  320. // NOTE must keep resp goroutine safe
  321. *resp = ts
  322. // Encode the response header
  323. if err = codec.writeResponse(reply); err != nil {
  324. log.Println("rpc: writing response:", err)
  325. }
  326. codec.sending.Unlock()
  327. }
  328. func (s *service) call(c context.Context, server *Server, mtype *methodType, argv, replyv reflect.Value, codec *serverCodec) {
  329. var (
  330. err error
  331. errmsg string
  332. errInter interface{}
  333. cv reflect.Value
  334. returnValues []reflect.Value
  335. )
  336. t, _ := trace.FromContext(c)
  337. defer func() {
  338. if err1 := recover(); err1 != nil {
  339. err = err1.(error)
  340. errmsg = err.Error()
  341. const size = 64 << 10
  342. buf := make([]byte, size)
  343. buf = buf[:runtime.Stack(buf, false)]
  344. xlog.Error("rpc call panic: %v \n%s", err1, buf)
  345. server.sendResponse(c, codec, replyv.Interface(), errmsg)
  346. if server.Interceptor != nil {
  347. server.Interceptor.Stat(c, argv.Interface(), err)
  348. }
  349. if t != nil {
  350. t.Finish(&err)
  351. }
  352. }
  353. }()
  354. // rate limit
  355. if server.Interceptor != nil {
  356. if err = server.Interceptor.Rate(c); err != nil {
  357. errmsg = err.Error()
  358. }
  359. }
  360. if err == nil {
  361. // Invoke the method, providing a new value for the reply.
  362. cv = reflect.New(ctxType)
  363. *cv.Interface().(*context.Context) = c
  364. returnValues = mtype.method.Func.Call([]reflect.Value{s.rcvr, cv.Elem(), argv, replyv})
  365. // The return value for the method is an error.
  366. if errInter = returnValues[0].Interface(); errInter != nil {
  367. err = errInter.(error)
  368. errmsg = pkgerr.Cause(err).Error()
  369. }
  370. }
  371. server.sendResponse(c, codec, replyv.Interface(), errmsg)
  372. // stat
  373. if server.Interceptor != nil {
  374. server.Interceptor.Stat(c, argv.Interface(), err)
  375. }
  376. if t != nil {
  377. t.Finish(&err)
  378. }
  379. }
  // serverCodec holds the per-connection state used to decode requests and
  // encode responses in the gob wire format.
  380. type serverCodec struct {
  381. sending sync.Mutex // held by sendResponse while a response is written
  382. resp Response // response header buffer, written under sending
  383. req Request // request header buffer, reset before every request is read
  384. auth Auth // handshake data received through "inner.Auth"
  385. rwc io.ReadWriteCloser // underlying connection
  386. dec *gob.Decoder // decodes requests from the connection
  387. enc *gob.Encoder // encodes responses into encBuf
  388. encBuf *bufio.Writer // buffered writer over the connection, flushed by writeResponse
  389. addr net.Addr // remote address of the connection
  390. closed bool // set by close once rwc has been closed
  391. }
  392. func (c *serverCodec) readRequestHeader() error {
  393. return pkgerr.WithStack(c.dec.Decode(&c.req))
  394. }
  395. func (c *serverCodec) readRequestBody(body interface{}) error {
  396. return pkgerr.WithStack(c.dec.Decode(body))
  397. }
  398. func (c *serverCodec) writeResponse(body interface{}) (err error) {
  399. if err = c.enc.Encode(&c.resp); err != nil {
  400. err = pkgerr.WithStack(err)
  401. if c.encBuf.Flush() == nil {
  402. // Gob couldn't encode the header. Should not happen, so if it does,
  403. // shut down the connection to signal that the connection is broken.
  404. log.Println("rpc: gob error encoding response:", err)
  405. c.close()
  406. }
  407. return
  408. }
  409. if err = c.enc.Encode(body); err != nil {
  410. err = pkgerr.WithStack(err)
  411. if c.encBuf.Flush() == nil {
  412. // Was a gob problem encoding the body but the header has been written.
  413. // Shut down the connection to signal that the connection is broken.
  414. log.Println("rpc: gob error encoding body:", err)
  415. c.close()
  416. }
  417. return
  418. }
  419. return pkgerr.WithStack(c.encBuf.Flush())
  420. }
  421. func (c *serverCodec) close() error {
  422. if c.closed {
  423. // Only call c.rwc.Close once; otherwise the semantics are undefined.
  424. return nil
  425. }
  426. c.closed = true
  427. return c.rwc.Close()
  428. }
  429. // ServeConn runs the server on a single connection.
  430. // ServeConn blocks, serving the connection until the client hangs up.
  431. // The caller typically invokes ServeConn in a go statement.
  432. // ServeConn uses the gob wire format (see package gob) on the
  433. // connection. To use an alternate codec, use ServeCodec.
  434. func (server *Server) ServeConn(conn net.Conn) {
  435. buf := bufio.NewWriter(conn)
  436. srv := &serverCodec{
  437. rwc: conn,
  438. dec: gob.NewDecoder(conn),
  439. enc: gob.NewEncoder(buf),
  440. encBuf: buf,
  441. addr: conn.RemoteAddr(),
  442. }
  443. server.serveCodec(srv)
  444. }
  445. // serveCodec is like ServeConn but uses the specified codec to
  446. // decode requests and encode responses.
  447. func (server *Server) serveCodec(codec *serverCodec) {
  448. req := &codec.req
  449. for {
  450. // serve request
  451. service, mtype, argv, replyv, err := server.readRequest(codec)
  452. if err != nil {
  453. if err != io.EOF {
  454. log.Println("rpc:", err)
  455. }
  456. if req.ctx == nil {
  457. break
  458. }
  459. errmsg := err.Error()
  460. if req.ServiceMethod == _authServiceMethod {
  461. errmsg = ""
  462. }
  463. server.sendResponse(req.ctx, codec, invalidRequest, errmsg)
  464. continue
  465. }
  466. if req.ServiceMethod == _authServiceMethod {
  467. codec.auth = *(argv.Interface().(*Auth))
  468. req.ctx = context.NewContext(ctx.Background(), req.ServiceMethod, codec.auth.User, req.Seq)
  469. server.sendResponse(req.ctx, codec, invalidRequest, "")
  470. continue
  471. }
  472. go service.call(req.ctx, server, mtype, argv, replyv, codec)
  473. }
  474. codec.close()
  475. }
  476. func (server *Server) readRequest(codec *serverCodec) (service *service, mtype *methodType, argv, replyv reflect.Value, err error) {
  477. var req = &codec.req
  478. *req = Request{}
  479. if service, mtype, err = server.readRequestHeader(codec); err != nil {
  480. // keepreading
  481. if req.ctx == nil {
  482. return
  483. }
  484. // discard body
  485. codec.readRequestBody(nil)
  486. return
  487. }
  488. // Decode the argument value.
  489. argIsValue := false // if true, need to indirect before calling.
  490. if mtype.ArgType.Kind() == reflect.Ptr {
  491. argv = reflect.New(mtype.ArgType.Elem())
  492. } else {
  493. argv = reflect.New(mtype.ArgType)
  494. argIsValue = true
  495. }
  496. // argv guaranteed to be a pointer now.
  497. if err = codec.readRequestBody(argv.Interface()); err != nil {
  498. return
  499. }
  500. if argIsValue {
  501. argv = argv.Elem()
  502. }
  503. replyv = reflect.New(mtype.ReplyType.Elem())
  504. return
  505. }
  506. func (server *Server) readRequestHeader(codec *serverCodec) (service *service, mtype *methodType, err error) {
  507. var t trace.Trace
  508. req := &codec.req
  509. if err = codec.readRequestHeader(); err != nil {
  510. return
  511. }
  512. if t, _ = trace.Extract(nil, &req.Trace); t == nil {
  513. t = trace.New(req.ServiceMethod)
  514. }
  515. t.SetTitle(req.ServiceMethod)
  516. t.SetTag(trace.String(trace.TagAddress, codec.addr.String()))
  517. md := metadata.MD{
  518. metadata.Trace: t,
  519. metadata.Color: req.Color,
  520. metadata.RemoteIP: req.RemoteIP,
  521. metadata.Caller: req.Trace.Caller,
  522. }
  523. // FIXME(maojian) Timeout?
  524. c1 := metadata.NewContext(ctx.Background(), md)
  525. caller := codec.auth.User
  526. if caller == "" {
  527. caller = req.Trace.Caller
  528. }
  529. // NOTE ctx not nil then keepreading
  530. req.ctx = context.NewContext(c1, req.ServiceMethod, caller, req.Seq)
  531. // We read the header successfully. If we see an error now,
  532. // we can still recover and move on to the next request.
  533. dot := strings.LastIndex(req.ServiceMethod, ".")
  534. if dot < 0 {
  535. err = errors.New("rpc: service/method request ill-formed: " + req.ServiceMethod)
  536. return
  537. }
  538. serviceName := req.ServiceMethod[:dot]
  539. methodName := req.ServiceMethod[dot+1:]
  540. // Look up the request.
  541. service = server.serviceMap[serviceName]
  542. if service == nil {
  543. err = errors.New("rpc: can't find service " + req.ServiceMethod)
  544. return
  545. }
  546. mtype = service.method[methodName]
  547. if mtype == nil {
  548. err = errors.New("rpc: can't find method " + req.ServiceMethod)
  549. }
  550. return
  551. }
  552. // Accept accepts connections on the listener and serves requests
  553. // for each incoming connection. Accept blocks until the listener
  554. // returns a non-nil error. The caller typically invokes Accept in a
  555. // go statement.
  556. func (server *Server) Accept(lis net.Listener) {
  557. for {
  558. conn, err := lis.Accept()
  559. if err != nil {
  560. log.Print("rpc.Serve: accept:", err.Error())
  561. return
  562. }
  563. go server.ServeConn(conn)
  564. }
  565. }
  566. // Close stop the rpc server.
  567. func (server *Server) Close() error {
  568. if server.lis != nil {
  569. return server.lis.Close()
  570. }
  571. return nil
  572. }
  573. // Register publishes the receiver's methods in the DefaultServer.
  574. func Register(rcvr interface{}) error { return DefaultServer.Register(rcvr) }
  575. // RegisterName is like Register but uses the provided name for the type
  576. // instead of the receiver's concrete type.
  577. func RegisterName(name string, rcvr interface{}) error {
  578. return DefaultServer.RegisterName(name, rcvr)
  579. }
  580. // ServeConn runs the DefaultServer on a single connection.
  581. // ServeConn blocks, serving the connection until the client hangs up.
  582. // The caller typically invokes ServeConn in a go statement.
  583. // ServeConn uses the gob wire format (see package gob) on the
  584. // connection. To use an alternate codec, use ServeCodec.
  585. func ServeConn(conn net.Conn) {
  586. DefaultServer.ServeConn(conn)
  587. }
  588. // Accept accepts connections on the listener and serves requests
  589. // to DefaultServer for each incoming connection.
  590. // Accept blocks; the caller typically invokes it in a go statement.
  591. func Accept(lis net.Listener) { DefaultServer.Accept(lis) }
  // Names of the built-in service and its methods.
  592. const (
  593. _authServiceMethod = "inner.Auth" // auth handshake method, handled inline by serveCodec
  594. _pingServiceMethod = "inner.Ping" // ping method; not referenced elsewhere in this file
  595. _service = "inner" // name under which Register and RegisterName install pinger
  596. )
  597. var (
  // _pingArg is a shared empty-struct pointer; it is not referenced elsewhere
  // in this file. NOTE(review): presumably the argument for ping calls - confirm.
  598. _pingArg = &struct{}{}
  599. )
  600. // pinger is the built-in rpc service registered under the name "inner"; it
  // provides the Ping and Auth methods.
  601. type pinger struct {
  602. }
  603. // Ping rpc ping.
  604. func (p *pinger) Ping(c context.Context, arg *struct{}, reply *struct{}) error {
  605. return nil
  606. }
  607. // Auth
  608. func (p *pinger) Auth(c context.Context, arg *Auth, reply *struct{}) error {
  609. return nil
  610. }