server_test.go 16 KB


  1. package warden
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "math/rand"
  7. "net"
  8. "os"
  9. "strconv"
  10. "strings"
  11. "sync"
  12. "testing"
  13. "time"
  14. "go-common/library/ecode"
  15. errpb "go-common/library/ecode/pb"
  16. "go-common/library/ecode/tip"
  17. nmd "go-common/library/net/metadata"
  18. "go-common/library/net/netutil/breaker"
  19. pb "go-common/library/net/rpc/warden/proto/testproto"
  20. xtrace "go-common/library/net/trace"
  21. xtime "go-common/library/time"
  22. "github.com/golang/protobuf/ptypes"
  23. "github.com/pkg/errors"
  24. "github.com/stretchr/testify/assert"
  25. "google.golang.org/grpc"
  26. "google.golang.org/grpc/codes"
  27. "google.golang.org/grpc/status"
  28. )
  29. func init() {
  30. tip.Init(nil)
  31. }
  // _separator is the delimiter testTrace uses to split each UDP payload it
  // reads into fields.
  const _separator = "\001"
  35. var (
  36. outPut []string
  37. _testOnce sync.Once
  38. server *Server
  39. clientConfig = ClientConfig{
  40. Dial: xtime.Duration(time.Second * 10),
  41. Timeout: xtime.Duration(time.Second * 10),
  42. Breaker: &breaker.Config{
  43. Window: xtime.Duration(3 * time.Second),
  44. Sleep: xtime.Duration(3 * time.Second),
  45. Bucket: 10,
  46. Ratio: 0.3,
  47. Request: 20,
  48. },
  49. }
  50. clientConfig2 = ClientConfig{
  51. Dial: xtime.Duration(time.Second * 10),
  52. Timeout: xtime.Duration(time.Second * 10),
  53. Breaker: &breaker.Config{
  54. Window: xtime.Duration(3 * time.Second),
  55. Sleep: xtime.Duration(3 * time.Second),
  56. Bucket: 10,
  57. Ratio: 0.3,
  58. Request: 20,
  59. },
  60. Method: map[string]*ClientConfig{`/testproto.Greeter/SayHello`: {Timeout: xtime.Duration(time.Millisecond * 200)}},
  61. }
  62. )
  // helloServer is the Greeter service implementation the tests in this file
  // register on the server. It keeps the *testing.T it was built with so that
  // request handlers can report failures.
  type helloServer struct {
  	t *testing.T
  }
  66. func (s *helloServer) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
  67. if in.Name == "trace_test" {
  68. t, isok := xtrace.FromContext(ctx)
  69. if !isok {
  70. t = xtrace.New("test title")
  71. s.t.Fatalf("no trace extracted from server context")
  72. }
  73. newCtx := xtrace.NewContext(ctx, t)
  74. if in.Age == 0 {
  75. runClient(newCtx, &clientConfig, s.t, "trace_test", 1)
  76. }
  77. } else if in.Name == "recovery_test" {
  78. panic("test recovery")
  79. } else if in.Name == "graceful_shutdown" {
  80. time.Sleep(time.Second * 3)
  81. } else if in.Name == "timeout_test" {
  82. if in.Age > 10 {
  83. s.t.Fatalf("can not deliver requests over 10 times because of link timeout")
  84. return &pb.HelloReply{Message: "Hello " + in.Name, Success: true}, nil
  85. }
  86. time.Sleep(time.Millisecond * 10)
  87. _, err := runClient(ctx, &clientConfig, s.t, "timeout_test", in.Age+1)
  88. return &pb.HelloReply{Message: "Hello " + in.Name, Success: true}, err
  89. } else if in.Name == "timeout_test2" {
  90. if in.Age > 10 {
  91. s.t.Fatalf("can not deliver requests over 10 times because of link timeout")
  92. return &pb.HelloReply{Message: "Hello " + in.Name, Success: true}, nil
  93. }
  94. time.Sleep(time.Millisecond * 10)
  95. _, err := runClient(ctx, &clientConfig2, s.t, "timeout_test2", in.Age+1)
  96. return &pb.HelloReply{Message: "Hello " + in.Name, Success: true}, err
  97. } else if in.Name == "color_test" {
  98. if in.Age == 0 {
  99. resp, err := runClient(ctx, &clientConfig, s.t, "color_test", in.Age+1)
  100. return resp, err
  101. }
  102. color := nmd.String(ctx, nmd.Color)
  103. return &pb.HelloReply{Message: "Hello " + color, Success: true}, nil
  104. } else if in.Name == "breaker_test" {
  105. if rand.Intn(100) <= 50 {
  106. return nil, status.Errorf(codes.ResourceExhausted, "test")
  107. }
  108. return &pb.HelloReply{Message: "Hello " + in.Name, Success: true}, nil
  109. } else if in.Name == "error_detail" {
  110. any, _ := ptypes.MarshalAny(&pb.HelloReply{Success: true})
  111. return nil, &errpb.Error{
  112. ErrCode: 123456,
  113. ErrMessage: "test_error_detail",
  114. ErrDetail: any,
  115. }
  116. } else if in.Name == "ecode_status" {
  117. reply := &pb.HelloReply{Message: "status", Success: true}
  118. st, _ := ecode.Error(ecode.RequestErr, "RequestErr").WithDetails(reply)
  119. return nil, st
  120. } else if in.Name == "general_error" {
  121. return nil, fmt.Errorf("haha is error")
  122. } else if in.Name == "ecode_code_error" {
  123. return nil, ecode.CreativeArticleTagErr
  124. } else if in.Name == "pb_error_error" {
  125. return nil, &errpb.Error{ErrCode: 11122, ErrMessage: "haha"}
  126. } else if in.Name == "ecode_status_error" {
  127. return nil, ecode.Error(ecode.RequestErr, "RequestErr")
  128. } else if in.Name == "test_remote_port" {
  129. if strconv.Itoa(int(in.Age)) != nmd.String(ctx, nmd.RemotePort) {
  130. return nil, fmt.Errorf("error port %d", in.Age)
  131. }
  132. reply := &pb.HelloReply{Message: "status", Success: true}
  133. return reply, nil
  134. }
  135. return &pb.HelloReply{Message: "Hello " + in.Name, Success: true}, nil
  136. }
  137. func (s *helloServer) StreamHello(ss pb.Greeter_StreamHelloServer) error {
  138. for i := 0; i < 3; i++ {
  139. in, err := ss.Recv()
  140. if err == io.EOF {
  141. return nil
  142. }
  143. if err != nil {
  144. return err
  145. }
  146. ret := &pb.HelloReply{Message: "Hello " + in.Name, Success: true}
  147. err = ss.Send(ret)
  148. if err != nil {
  149. return err
  150. }
  151. }
  152. return nil
  153. }
  154. func runServer(t *testing.T, interceptors ...grpc.UnaryServerInterceptor) func() {
  155. return func() {
  156. server = NewServer(&ServerConfig{Addr: "127.0.0.1:8080", Timeout: xtime.Duration(time.Second)})
  157. pb.RegisterGreeterServer(server.Server(), &helloServer{t})
  158. server.Use(
  159. func(ctx context.Context, req interface{}, args *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
  160. outPut = append(outPut, "1")
  161. resp, err := handler(ctx, req)
  162. outPut = append(outPut, "2")
  163. return resp, err
  164. },
  165. func(ctx context.Context, req interface{}, args *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
  166. outPut = append(outPut, "3")
  167. resp, err := handler(ctx, req)
  168. outPut = append(outPut, "4")
  169. return resp, err
  170. })
  171. if _, err := server.Start(); err != nil {
  172. t.Fatal(err)
  173. }
  174. }
  175. }
  176. func runClient(ctx context.Context, cc *ClientConfig, t *testing.T, name string, age int32, interceptors ...grpc.UnaryClientInterceptor) (resp *pb.HelloReply, err error) {
  177. client := NewClient(cc)
  178. client.Use(interceptors...)
  179. conn, err := client.Dial(context.Background(), "127.0.0.1:8080")
  180. if err != nil {
  181. panic(fmt.Errorf("did not connect: %v,req: %v %v", err, name, age))
  182. }
  183. defer conn.Close()
  184. c := pb.NewGreeterClient(conn)
  185. resp, err = c.SayHello(ctx, &pb.HelloRequest{Name: name, Age: age})
  186. return
  187. }
  188. func Test_Warden(t *testing.T) {
  189. go func() {
  190. time.Sleep(time.Second * 10)
  191. panic("run test warden timeout,exit now!")
  192. }()
  193. xtrace.Init(&xtrace.Config{Addr: "127.0.0.1:9982", Proto: "udp", Timeout: xtime.Duration(time.Second * 3)})
  194. go _testOnce.Do(runServer(t))
  195. go runClient(context.Background(), &clientConfig, t, "trace_test", 0)
  196. testTrace(t, 9982, false)
  197. testInterceptorChain(t)
  198. testValidation(t)
  199. testServerRecovery(t)
  200. testClientRecovery(t)
  201. testErrorDetail(t)
  202. testECodeStatus(t)
  203. testColorPass(t)
  204. testRemotePort(t)
  205. testLinkTimeout(t)
  206. testClientConfig(t)
  207. testBreaker(t)
  208. testAllErrorCase(t)
  209. testGracefulShutDown(t)
  210. }
  211. func testValidation(t *testing.T) {
  212. _, err := runClient(context.Background(), &clientConfig, t, "", 0)
  213. if !ecode.RequestErr.Equal(err) {
  214. t.Fatalf("testValidation should return ecode.RequestErr,but is %v", err)
  215. }
  216. }
  217. func testAllErrorCase(t *testing.T) {
  218. // } else if in.Name == "general_error" {
  219. // return nil, fmt.Errorf("haha is error")
  220. // } else if in.Name == "ecode_code_error" {
  221. // return nil, ecode.CreativeArticleTagErr
  222. // } else if in.Name == "pb_error_error" {
  223. // return nil, &errpb.Error{ErrCode: 11122, ErrMessage: "haha"}
  224. // } else if in.Name == "ecode_status_error" {
  225. // return nil, ecode.Error(ecode.RequestErr, "RequestErr")
  226. // }
  227. ctx := context.Background()
  228. t.Run("general_error", func(t *testing.T) {
  229. _, err := runClient(ctx, &clientConfig, t, "general_error", 0)
  230. assert.Contains(t, err.Error(), "haha")
  231. ec := ecode.Cause(err)
  232. assert.Equal(t, -500, ec.Code())
  233. // remove this assert in future
  234. assert.Equal(t, "服务器错误", ec.Message())
  235. })
  236. t.Run("ecode_code_error", func(t *testing.T) {
  237. _, err := runClient(ctx, &clientConfig, t, "ecode_code_error", 0)
  238. ec := ecode.Cause(err)
  239. assert.Equal(t, ecode.CreativeArticleTagErr.Code(), ec.Code())
  240. // remove this assert in future
  241. assert.Equal(t, "标签错误", ec.Message())
  242. })
  243. t.Run("pb_error_error", func(t *testing.T) {
  244. _, err := runClient(ctx, &clientConfig, t, "pb_error_error", 0)
  245. ec := ecode.Cause(err)
  246. assert.Equal(t, 11122, ec.Code())
  247. assert.Equal(t, "haha", ec.Message())
  248. })
  249. t.Run("ecode_status_error", func(t *testing.T) {
  250. _, err := runClient(ctx, &clientConfig, t, "ecode_status_error", 0)
  251. ec := ecode.Cause(err)
  252. assert.Equal(t, ecode.RequestErr.Code(), ec.Code())
  253. assert.Equal(t, "RequestErr", ec.Message())
  254. })
  255. }
  256. func testBreaker(t *testing.T) {
  257. client := NewClient(&clientConfig)
  258. conn, err := client.Dial(context.Background(), "127.0.0.1:8080")
  259. if err != nil {
  260. t.Fatalf("did not connect: %v", err)
  261. }
  262. defer conn.Close()
  263. c := pb.NewGreeterClient(conn)
  264. for i := 0; i < 35; i++ {
  265. _, err := c.SayHello(context.Background(), &pb.HelloRequest{Name: "breaker_test"})
  266. if err != nil {
  267. if ecode.ServiceUnavailable.Equal(err) {
  268. return
  269. }
  270. }
  271. }
  272. t.Fatalf("testBreaker failed!No breaker was triggered")
  273. }
  274. func testColorPass(t *testing.T) {
  275. ctx := nmd.NewContext(context.Background(), nmd.MD{
  276. nmd.Color: "red",
  277. })
  278. resp, err := runClient(ctx, &clientConfig, t, "color_test", 0)
  279. if err != nil {
  280. t.Fatalf("testColorPass return error %v", err)
  281. }
  282. if resp == nil || resp.Message != "Hello red" {
  283. t.Fatalf("testColorPass resp.Message must be red,%v", *resp)
  284. }
  285. }
  286. func testRemotePort(t *testing.T) {
  287. ctx := nmd.NewContext(context.Background(), nmd.MD{
  288. nmd.RemotePort: "8000",
  289. })
  290. _, err := runClient(ctx, &clientConfig, t, "test_remote_port", 8000)
  291. if err != nil {
  292. t.Fatalf("testRemotePort return error %v", err)
  293. }
  294. }
  295. func testLinkTimeout(t *testing.T) {
  296. ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*200)
  297. defer cancel()
  298. _, err := runClient(ctx, &clientConfig, t, "timeout_test", 0)
  299. if err == nil {
  300. t.Fatalf("testLinkTimeout must return error")
  301. }
  302. if !ecode.Deadline.Equal(err) {
  303. t.Fatalf("testLinkTimeout must return error RPCDeadline,err:%v", err)
  304. }
  305. }
  306. func testClientConfig(t *testing.T) {
  307. _, err := runClient(context.Background(), &clientConfig2, t, "timeout_test2", 0)
  308. if err == nil {
  309. t.Fatalf("testLinkTimeout must return error")
  310. }
  311. if !ecode.Deadline.Equal(err) {
  312. t.Fatalf("testLinkTimeout must return error RPCDeadline,err:%v", err)
  313. }
  314. }
  315. func testGracefulShutDown(t *testing.T) {
  316. wg := sync.WaitGroup{}
  317. for i := 0; i < 10; i++ {
  318. wg.Add(1)
  319. go func() {
  320. defer wg.Done()
  321. resp, err := runClient(context.Background(), &clientConfig, t, "graceful_shutdown", 0)
  322. if err != nil {
  323. panic(fmt.Errorf("run graceful_shutdown client return(%v)", err))
  324. }
  325. if !resp.Success || resp.Message != "Hello graceful_shutdown" {
  326. panic(fmt.Errorf("run graceful_shutdown client return(%v,%v)", err, *resp))
  327. }
  328. }()
  329. }
  330. go func() {
  331. time.Sleep(time.Second)
  332. ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
  333. defer cancel()
  334. server.Shutdown(ctx)
  335. }()
  336. wg.Wait()
  337. }
  338. func testClientRecovery(t *testing.T) {
  339. ctx := context.Background()
  340. client := NewClient(&clientConfig)
  341. client.Use(func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) (ret error) {
  342. invoker(ctx, method, req, reply, cc, opts...)
  343. panic("client recovery test")
  344. })
  345. conn, err := client.Dial(ctx, "127.0.0.1:8080")
  346. if err != nil {
  347. t.Fatalf("did not connect: %v", err)
  348. }
  349. defer conn.Close()
  350. c := pb.NewGreeterClient(conn)
  351. _, err = c.SayHello(ctx, &pb.HelloRequest{Name: "other_test", Age: 0})
  352. if err == nil {
  353. t.Fatalf("recovery must return error")
  354. }
  355. e, ok := errors.Cause(err).(ecode.Codes)
  356. if !ok {
  357. t.Fatalf("recovery must return ecode error")
  358. }
  359. if !ecode.ServerErr.Equal(e) {
  360. t.Fatalf("recovery must return ecode.RPCClientErr")
  361. }
  362. }
  363. func testServerRecovery(t *testing.T) {
  364. ctx := context.Background()
  365. client := NewClient(&clientConfig)
  366. conn, err := client.Dial(ctx, "127.0.0.1:8080")
  367. if err != nil {
  368. t.Fatalf("did not connect: %v", err)
  369. }
  370. defer conn.Close()
  371. c := pb.NewGreeterClient(conn)
  372. _, err = c.SayHello(ctx, &pb.HelloRequest{Name: "recovery_test", Age: 0})
  373. if err == nil {
  374. t.Fatalf("recovery must return error")
  375. }
  376. e, ok := errors.Cause(err).(ecode.Codes)
  377. if !ok {
  378. t.Fatalf("recovery must return ecode error")
  379. }
  380. if e.Code() != ecode.ServerErr.Code() {
  381. t.Fatalf("recovery must return ecode.ServerErr")
  382. }
  383. }
  384. func testInterceptorChain(t *testing.T) {
  385. // NOTE: don't delete this sleep
  386. time.Sleep(time.Millisecond)
  387. if outPut[0] != "1" || outPut[1] != "3" || outPut[2] != "1" || outPut[3] != "3" || outPut[4] != "4" || outPut[5] != "2" || outPut[6] != "4" || outPut[7] != "2" {
  388. t.Fatalf("outPut shoud be [1 3 1 3 4 2 4 2]!")
  389. }
  390. }
  391. func testErrorDetail(t *testing.T) {
  392. _, err := runClient(context.Background(), &clientConfig2, t, "error_detail", 0)
  393. if err == nil {
  394. t.Fatalf("testErrorDetail must return error")
  395. }
  396. if ec, ok := errors.Cause(err).(ecode.Codes); !ok {
  397. t.Fatalf("testErrorDetail must return ecode error")
  398. } else if ec.Code() != 123456 || ec.Message() != "test_error_detail" || len(ec.Details()) == 0 {
  399. t.Fatalf("testErrorDetail must return code:123456 and message:test_error_detail, code: %d, message: %s, details length: %d", ec.Code(), ec.Message(), len(ec.Details()))
  400. } else if _, ok := ec.Details()[0].(*pb.HelloReply); !ok {
  401. t.Fatalf("expect get pb.HelloReply")
  402. }
  403. }
  404. func testECodeStatus(t *testing.T) {
  405. _, err := runClient(context.Background(), &clientConfig2, t, "ecode_status", 0)
  406. if err == nil {
  407. t.Fatalf("testECodeStatus must return error")
  408. }
  409. st, ok := errors.Cause(err).(*ecode.Status)
  410. if !ok {
  411. t.Fatalf("testECodeStatus must return *ecode.Status")
  412. }
  413. if st.Code() != int(ecode.RequestErr) && st.Message() != "RequestErr" {
  414. t.Fatalf("testECodeStatus must return code: -400, message: RequestErr get: code: %d, message: %s", st.Code(), st.Message())
  415. }
  416. detail := st.Details()[0].(*pb.HelloReply)
  417. if !detail.Success || detail.Message != "status" {
  418. t.Fatalf("wrong detail")
  419. }
  420. }
  421. func testTrace(t *testing.T, port int, isStream bool) {
  422. listener, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: port})
  423. if err != nil {
  424. t.Fatalf("listent udp failed, %v", err)
  425. return
  426. }
  427. data := make([]byte, 1024)
  428. strs := make([][]string, 0)
  429. for {
  430. var n int
  431. n, _, err = listener.ReadFromUDP(data)
  432. if err != nil {
  433. t.Fatalf("read from udp faild, %v", err)
  434. }
  435. str := strings.Split(string(data[:n]), _separator)
  436. strs = append(strs, str)
  437. if len(strs) == 2 {
  438. break
  439. }
  440. }
  441. if len(strs[0]) == 0 || len(strs[1]) == 0 {
  442. t.Fatalf("trace str's length must be greater than 0")
  443. }
  444. }
  445. func BenchmarkServer(b *testing.B) {
  446. server := NewServer(&ServerConfig{Addr: "127.0.0.1:8080", Timeout: xtime.Duration(time.Second)})
  447. go func() {
  448. pb.RegisterGreeterServer(server.Server(), &helloServer{})
  449. if _, err := server.Start(); err != nil {
  450. os.Exit(0)
  451. return
  452. }
  453. }()
  454. defer func() {
  455. server.Server().Stop()
  456. }()
  457. client := NewClient(&clientConfig)
  458. conn, err := client.Dial(context.Background(), "127.0.0.1:8080")
  459. if err != nil {
  460. conn.Close()
  461. b.Fatalf("did not connect: %v", err)
  462. }
  463. b.ResetTimer()
  464. b.RunParallel(func(parab *testing.PB) {
  465. for parab.Next() {
  466. c := pb.NewGreeterClient(conn)
  467. resp, err := c.SayHello(context.Background(), &pb.HelloRequest{Name: "benchmark_test", Age: 1})
  468. if err != nil {
  469. conn.Close()
  470. b.Fatalf("c.SayHello failed: %v,req: %v %v", err, "benchmark", 1)
  471. }
  472. if !resp.Success {
  473. b.Error("repsonse not success!")
  474. }
  475. }
  476. })
  477. conn.Close()
  478. }
  479. func TestParseDSN(t *testing.T) {
  480. dsn := "tcp://0.0.0.0:80/?timeout=100ms&idleTimeout=120s&keepaliveInterval=120s&keepaliveTimeout=20s&maxLife=4h&closeWait=3s"
  481. config := parseDSN(dsn)
  482. if config.Network != "tcp" || config.Addr != "0.0.0.0:80" || time.Duration(config.Timeout) != time.Millisecond*100 ||
  483. time.Duration(config.IdleTimeout) != time.Second*120 || time.Duration(config.KeepAliveInterval) != time.Second*120 ||
  484. time.Duration(config.MaxLifeTime) != time.Hour*4 || time.Duration(config.ForceCloseWait) != time.Second*3 || time.Duration(config.KeepAliveTimeout) != time.Second*20 {
  485. t.Fatalf("parseDSN(%s) not compare config result(%+v)", dsn, config)
  486. }
  487. dsn = "unix:///temp/warden.sock?timeout=300ms"
  488. config = parseDSN(dsn)
  489. if config.Network != "unix" || config.Addr != "/temp/warden.sock" || time.Duration(config.Timeout) != time.Millisecond*300 {
  490. t.Fatalf("parseDSN(%s) not compare config result(%+v)", dsn, config)
  491. }
  492. }