/*
 *
 * Copyright 2017 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package roundrobin_test

import (
	"context"
	"fmt"
	"net"
	"sync"
	"testing"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/balancer/roundrobin"
	"google.golang.org/grpc/codes"
	_ "google.golang.org/grpc/grpclog/glogger"
	"google.golang.org/grpc/internal/leakcheck"
	"google.golang.org/grpc/peer"
	"google.golang.org/grpc/resolver"
	"google.golang.org/grpc/resolver/manual"
	"google.golang.org/grpc/status"
	testpb "google.golang.org/grpc/test/grpc_testing"
)
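
// testServer is a trivial TestServiceServer implementation. Its handlers do
// no real work; the tests only need backends that answer RPCs so the peer
// address of each call can be inspected.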
type testServer struct {
	testpb.TestServiceServer
}

func (s *testServer) EmptyCall(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
	return &testpb.Empty{}, nil
}

func (s *testServer) FullDuplexCall(stream testpb.TestService_FullDuplexCallServer) error {
	return nil
}
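
// test bundles the servers started for one test together with their
// listening addresses so they can be torn down as a group.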
type test struct {
	servers   []*grpc.Server
	addresses []string
}

func (t *test) cleanup() {
	for _, s := range t.servers {
		s.Stop()
	}
}
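
// startTestServers starts count TestService backends on ephemeral localhost
// ports and returns them along with their addresses. On error, any servers
// that were already started are stopped before returning.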
func startTestServers(count int) (_ *test, err error) {
	t := &test{}
	defer func() {
		if err != nil {
			for _, s := range t.servers {
				s.Stop()
			}
		}
	}()
	for i := 0; i < count; i++ {
		lis, err := net.Listen("tcp", "localhost:0")
		if err != nil {
			return nil, fmt.Errorf("failed to listen: %v", err)
		}
		s := grpc.NewServer()
		testpb.RegisterTestServiceServer(s, &testServer{})
		t.servers = append(t.servers, s)
		t.addresses = append(t.addresses, lis.Addr().String())
		go func(s *grpc.Server, l net.Listener) {
			s.Serve(l)
		}(s, lis)
	}
	return t, nil
}
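
// TestOneBackend verifies that with the round_robin balancer, RPCs fail while
// the resolver has produced no addresses and succeed once a single backend
// address is pushed through the manual resolver.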
func TestOneBackend(t *testing.T) {
	defer leakcheck.Check(t)
	r, cleanup := manual.GenerateAndRegisterManualResolver()
	defer cleanup()

	test, err := startTestServers(1)
	if err != nil {
		t.Fatalf("failed to start servers: %v", err)
	}
	defer test.cleanup()

	cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name))
	if err != nil {
		t.Fatalf("failed to dial: %v", err)
	}
	defer cc.Close()
	testc := testpb.NewTestServiceClient(cc)
	// The first RPC should fail because there's no address.
	ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
	defer cancel()
	if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || status.Code(err) != codes.DeadlineExceeded {
		t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
	}

	r.NewAddress([]resolver.Address{{Addr: test.addresses[0]}})
	// The second RPC should succeed.
	if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
		t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err)
	}
}
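
// TestBackendsRoundRobin starts several backends, waits until a connection to
// each of them is ready, and then checks that consecutive RPCs cycle through
// the backends in round-robin order.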
func TestBackendsRoundRobin(t *testing.T) {
	defer leakcheck.Check(t)
	r, cleanup := manual.GenerateAndRegisterManualResolver()
	defer cleanup()

	backendCount := 5
	test, err := startTestServers(backendCount)
	if err != nil {
		t.Fatalf("failed to start servers: %v", err)
	}
	defer test.cleanup()

	cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name))
	if err != nil {
		t.Fatalf("failed to dial: %v", err)
	}
	defer cc.Close()
	testc := testpb.NewTestServiceClient(cc)
	// The first RPC should fail because there's no address.
	ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
	defer cancel()
	if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || status.Code(err) != codes.DeadlineExceeded {
		t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
	}

	var resolvedAddrs []resolver.Address
	for i := 0; i < backendCount; i++ {
		resolvedAddrs = append(resolvedAddrs, resolver.Address{Addr: test.addresses[i]})
	}
	r.NewAddress(resolvedAddrs)
	var p peer.Peer
	// Make sure connections to all servers are up.
	for si := 0; si < backendCount; si++ {
		var connected bool
		for i := 0; i < 1000; i++ {
			if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil {
				t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err)
			}
			if p.Addr.String() == test.addresses[si] {
				connected = true
				break
			}
			time.Sleep(time.Millisecond)
		}
		if !connected {
			t.Fatalf("Connection to %v was not up after more than 1 second", test.addresses[si])
		}
	}

	for i := 0; i < 3*backendCount; i++ {
		if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil {
			t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err)
		}
		if p.Addr.String() != test.addresses[i%backendCount] {
			t.Fatalf("Index %d: want peer %v, got peer %v", i, test.addresses[i%backendCount], p.Addr.String())
		}
	}
}
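
// TestAddressesRemoved verifies that once the resolver reports an empty
// address list, wait-for-ready RPCs eventually fail with DeadlineExceeded
// because there is no backend left to pick.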
func TestAddressesRemoved(t *testing.T) {
	defer leakcheck.Check(t)
	r, cleanup := manual.GenerateAndRegisterManualResolver()
	defer cleanup()

	test, err := startTestServers(1)
	if err != nil {
		t.Fatalf("failed to start servers: %v", err)
	}
	defer test.cleanup()

	cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name))
	if err != nil {
		t.Fatalf("failed to dial: %v", err)
	}
	defer cc.Close()
	testc := testpb.NewTestServiceClient(cc)
	// The first RPC should fail because there's no address.
	ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
	defer cancel()
	if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || status.Code(err) != codes.DeadlineExceeded {
		t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
	}

	r.NewAddress([]resolver.Address{{Addr: test.addresses[0]}})
	// The second RPC should succeed.
	if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
		t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err)
	}

	r.NewAddress([]resolver.Address{})
	for i := 0; i < 1000; i++ {
		// Cancel each context as soon as its RPC returns rather than deferring,
		// so contexts don't pile up across loop iterations.
		ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
		_, err := testc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false))
		cancel()
		if status.Code(err) == codes.DeadlineExceeded {
			return
		}
		time.Sleep(time.Millisecond)
	}
	t.Fatalf("No RPC failed after removing all addresses; want an RPC to fail with DeadlineExceeded")
}
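
// TestCloseWithPendingRPC closes the ClientConn while RPCs are still blocked
// waiting for a usable address, and expects those RPCs to fail because the
// connection is closing rather than time out.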
func TestCloseWithPendingRPC(t *testing.T) {
	defer leakcheck.Check(t)
	r, cleanup := manual.GenerateAndRegisterManualResolver()
	defer cleanup()

	test, err := startTestServers(1)
	if err != nil {
		t.Fatalf("failed to start servers: %v", err)
	}
	defer test.cleanup()

	cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name))
	if err != nil {
		t.Fatalf("failed to dial: %v", err)
	}
	testc := testpb.NewTestServiceClient(cc)
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// This RPC blocks until cc is closed.
			ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
			if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) == codes.DeadlineExceeded {
				t.Errorf("RPC failed with DeadlineExceeded after cc was closed; want an error indicating that the client connection is closing")
			}
			cancel()
		}()
	}
	cc.Close()
	wg.Wait()
}
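
// TestNewAddressWhileBlocking removes all addresses so that new RPCs block,
// then pushes an address again and verifies that the blocked RPCs are
// unblocked and complete.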
func TestNewAddressWhileBlocking(t *testing.T) {
	defer leakcheck.Check(t)
	r, cleanup := manual.GenerateAndRegisterManualResolver()
	defer cleanup()

	test, err := startTestServers(1)
	if err != nil {
		t.Fatalf("failed to start servers: %v", err)
	}
	defer test.cleanup()

	cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name))
	if err != nil {
		t.Fatalf("failed to dial: %v", err)
	}
	defer cc.Close()
	testc := testpb.NewTestServiceClient(cc)
	// The first RPC should fail because there's no address.
	ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
	defer cancel()
	if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || status.Code(err) != codes.DeadlineExceeded {
		t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
	}

	r.NewAddress([]resolver.Address{{Addr: test.addresses[0]}})
	// The second RPC should succeed.
	ctx, cancel = context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()
	if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err != nil {
		t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err)
	}

	r.NewAddress([]resolver.Address{})
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// This RPC blocks until NewAddress is called.
			testc.EmptyCall(context.Background(), &testpb.Empty{})
		}()
	}
	time.Sleep(50 * time.Millisecond)
	r.NewAddress([]resolver.Address{{Addr: test.addresses[0]}})
	wg.Wait()
}
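
// TestOneServerDown stops one of the backends after round-robin is
// established and verifies that RPCs keep round-robinning over the remaining
// backends only.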
func TestOneServerDown(t *testing.T) {
	defer leakcheck.Check(t)
	r, cleanup := manual.GenerateAndRegisterManualResolver()
	defer cleanup()

	backendCount := 3
	test, err := startTestServers(backendCount)
	if err != nil {
		t.Fatalf("failed to start servers: %v", err)
	}
	defer test.cleanup()

	cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name), grpc.WithWaitForHandshake())
	if err != nil {
		t.Fatalf("failed to dial: %v", err)
	}
	defer cc.Close()
	testc := testpb.NewTestServiceClient(cc)
	// The first RPC should fail because there's no address.
	ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
	defer cancel()
	if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || status.Code(err) != codes.DeadlineExceeded {
		t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
	}

	var resolvedAddrs []resolver.Address
	for i := 0; i < backendCount; i++ {
		resolvedAddrs = append(resolvedAddrs, resolver.Address{Addr: test.addresses[i]})
	}
	r.NewAddress(resolvedAddrs)
	var p peer.Peer
	// Make sure connections to all servers are up.
	for si := 0; si < backendCount; si++ {
		var connected bool
		for i := 0; i < 1000; i++ {
			if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil {
				t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err)
			}
			if p.Addr.String() == test.addresses[si] {
				connected = true
				break
			}
			time.Sleep(time.Millisecond)
		}
		if !connected {
			t.Fatalf("Connection to %v was not up after more than 1 second", test.addresses[si])
		}
	}
	for i := 0; i < 3*backendCount; i++ {
		if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil {
			t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err)
		}
		if p.Addr.String() != test.addresses[i%backendCount] {
			t.Fatalf("Index %d: want peer %v, got peer %v", i, test.addresses[i%backendCount], p.Addr.String())
		}
	}

	// Stop one server; RPCs should round-robin among the remaining servers.
	backendCount--
	test.servers[backendCount].Stop()
	// Loop until server[backendCount-1] is seen twice without server[backendCount]
	// being seen in between.
	var targetSeen int
	for i := 0; i < 1000; i++ {
		if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil {
			targetSeen = 0
			t.Logf("EmptyCall() = _, %v, want _, <nil>", err)
			// Due to a race, this RPC could possibly get the connection that
			// was closing, and this RPC may fail. Keep trying when this
			// happens.
			continue
		}
		switch p.Addr.String() {
		case test.addresses[backendCount-1]:
			targetSeen++
		case test.addresses[backendCount]:
			// Reset targetSeen if peer is server[backendCount].
			targetSeen = 0
		}
		// Break to make sure the last picked address is server[backendCount-1],
		// so the following for loop won't be flaky.
		if targetSeen >= 2 {
			break
		}
	}
	if targetSeen != 2 {
		t.Fatal("Failed to see server[backendCount-1] twice without seeing server[backendCount]")
	}
	for i := 0; i < 3*backendCount; i++ {
		if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil {
			t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err)
		}
		if p.Addr.String() != test.addresses[i%backendCount] {
			t.Errorf("Index %d: want peer %v, got peer %v", i, test.addresses[i%backendCount], p.Addr.String())
		}
	}
}
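
// TestAllServersDown stops every backend and verifies that fail-fast RPCs
// start failing with Unavailable once the balancer has no ready connection
// left to pick.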
func TestAllServersDown(t *testing.T) {
	defer leakcheck.Check(t)
	r, cleanup := manual.GenerateAndRegisterManualResolver()
	defer cleanup()

	backendCount := 3
	test, err := startTestServers(backendCount)
	if err != nil {
		t.Fatalf("failed to start servers: %v", err)
	}
	defer test.cleanup()

	cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name), grpc.WithWaitForHandshake())
	if err != nil {
		t.Fatalf("failed to dial: %v", err)
	}
	defer cc.Close()
	testc := testpb.NewTestServiceClient(cc)
	// The first RPC should fail because there's no address.
	ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
	defer cancel()
	if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || status.Code(err) != codes.DeadlineExceeded {
		t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
	}

	var resolvedAddrs []resolver.Address
	for i := 0; i < backendCount; i++ {
		resolvedAddrs = append(resolvedAddrs, resolver.Address{Addr: test.addresses[i]})
	}
	r.NewAddress(resolvedAddrs)
	var p peer.Peer
	// Make sure connections to all servers are up.
	for si := 0; si < backendCount; si++ {
		var connected bool
		for i := 0; i < 1000; i++ {
			if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil {
				t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err)
			}
			if p.Addr.String() == test.addresses[si] {
				connected = true
				break
			}
			time.Sleep(time.Millisecond)
		}
		if !connected {
			t.Fatalf("Connection to %v was not up after more than 1 second", test.addresses[si])
		}
	}
	for i := 0; i < 3*backendCount; i++ {
		if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil {
			t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err)
		}
		if p.Addr.String() != test.addresses[i%backendCount] {
			t.Fatalf("Index %d: want peer %v, got peer %v", i, test.addresses[i%backendCount], p.Addr.String())
		}
	}

	// Stop all servers; fail-fast RPCs should then fail with Unavailable.
	for i := 0; i < backendCount; i++ {
		test.servers[i].Stop()
	}
	time.Sleep(100 * time.Millisecond)
	for i := 0; i < 1000; i++ {
		if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}); status.Code(err) == codes.Unavailable {
			return
		}
		time.Sleep(time.Millisecond)
	}
	t.Fatalf("Fail-fast RPCs didn't fail with Unavailable after all servers were stopped")
}