main.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108
  1. package main
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "os"
  7. "os/signal"
  8. "syscall"
  9. "time"
  10. "go-common/library/ecode"
  11. epb "go-common/library/ecode/pb"
  12. "go-common/library/log"
  13. "go-common/library/net/rpc/warden"
  14. pb "go-common/library/net/rpc/warden/proto/testproto"
  15. xtime "go-common/library/time"
  16. "github.com/golang/protobuf/ptypes"
  17. "google.golang.org/grpc"
  18. )
  19. type helloServer struct {
  20. addr string
  21. }
  22. func (s *helloServer) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
  23. if in.Name == "err_detail_test" {
  24. any, _ := ptypes.MarshalAny(&pb.HelloReply{Success: true, Message: "this is test detail"})
  25. err := epb.From(ecode.AccessDenied)
  26. err.ErrDetail = any
  27. return nil, err
  28. }
  29. return &pb.HelloReply{Message: fmt.Sprintf("hello %s from %s", in.Name, s.addr)}, nil
  30. }
  31. func (s *helloServer) StreamHello(ss pb.Greeter_StreamHelloServer) error {
  32. for i := 0; i < 3; i++ {
  33. in, err := ss.Recv()
  34. if err == io.EOF {
  35. return nil
  36. }
  37. if err != nil {
  38. return err
  39. }
  40. ret := &pb.HelloReply{Message: "Hello " + in.Name, Success: true}
  41. err = ss.Send(ret)
  42. if err != nil {
  43. return err
  44. }
  45. }
  46. return nil
  47. }
  48. func runServer(addr string) *warden.Server {
  49. server := warden.NewServer(&warden.ServerConfig{
  50. //服务端每个请求的默认超时时间
  51. Timeout: xtime.Duration(time.Second),
  52. })
  53. server.Use(middleware())
  54. pb.RegisterGreeterServer(server.Server(), &helloServer{addr: addr})
  55. go func() {
  56. err := server.Run(addr)
  57. if err != nil {
  58. panic("run server failed!" + err.Error())
  59. }
  60. }()
  61. return server
  62. }
  63. func main() {
  64. log.Init(&log.Config{Stdout: true})
  65. server := runServer("0.0.0.0:8080")
  66. signalHandler(server)
  67. }
  68. //类似于中间件
  69. func middleware() grpc.UnaryServerInterceptor {
  70. return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
  71. //记录调用方法
  72. log.Info("method:%s", info.FullMethod)
  73. //call chain
  74. resp, err = handler(ctx, req)
  75. return
  76. }
  77. }
  78. func signalHandler(s *warden.Server) {
  79. var (
  80. ch = make(chan os.Signal, 1)
  81. )
  82. signal.Notify(ch, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT)
  83. for {
  84. si := <-ch
  85. switch si {
  86. case syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT:
  87. log.Info("get a signal %s, stop the consume process", si.String())
  88. ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
  89. defer cancel()
  90. //gracefully shutdown with timeout
  91. s.Shutdown(ctx)
  92. return
  93. case syscall.SIGHUP:
  94. default:
  95. return
  96. }
  97. }
  98. }