12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808 |
- /*
- *
- * Copyright 2016 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
- package grpc
- import (
- "context"
- "fmt"
- "math"
- "strconv"
- "sync"
- "testing"
- "time"
- "google.golang.org/grpc/codes"
- _ "google.golang.org/grpc/grpclog/glogger"
- "google.golang.org/grpc/internal/leakcheck"
- "google.golang.org/grpc/naming"
- "google.golang.org/grpc/status"
- // V1 balancer tests use passthrough resolver instead of dns.
- // TODO(bar) remove this when removing v1 balancer entirely.
- _ "google.golang.org/grpc/resolver/passthrough"
- )
- func pickFirstBalancerV1(r naming.Resolver) Balancer {
- return &pickFirst{&roundRobin{r: r}}
- }
- type testWatcher struct {
- // the channel to receives name resolution updates
- update chan *naming.Update
- // the side channel to get to know how many updates in a batch
- side chan int
- // the channel to notify update injector that the update reading is done
- readDone chan int
- }
- func (w *testWatcher) Next() (updates []*naming.Update, err error) {
- n := <-w.side
- if n == 0 {
- return nil, fmt.Errorf("w.side is closed")
- }
- for i := 0; i < n; i++ {
- u := <-w.update
- if u != nil {
- updates = append(updates, u)
- }
- }
- w.readDone <- 0
- return
- }
- func (w *testWatcher) Close() {
- close(w.side)
- }
- // Inject naming resolution updates to the testWatcher.
- func (w *testWatcher) inject(updates []*naming.Update) {
- w.side <- len(updates)
- for _, u := range updates {
- w.update <- u
- }
- <-w.readDone
- }
- type testNameResolver struct {
- w *testWatcher
- addr string
- }
- func (r *testNameResolver) Resolve(target string) (naming.Watcher, error) {
- r.w = &testWatcher{
- update: make(chan *naming.Update, 1),
- side: make(chan int, 1),
- readDone: make(chan int),
- }
- r.w.side <- 1
- r.w.update <- &naming.Update{
- Op: naming.Add,
- Addr: r.addr,
- }
- go func() {
- <-r.w.readDone
- }()
- return r.w, nil
- }
- func startServers(t *testing.T, numServers int, maxStreams uint32) ([]*server, *testNameResolver, func()) {
- var servers []*server
- for i := 0; i < numServers; i++ {
- s := newTestServer()
- servers = append(servers, s)
- go s.start(t, 0, maxStreams)
- s.wait(t, 2*time.Second)
- }
- // Point to server[0]
- addr := "localhost:" + servers[0].port
- return servers, &testNameResolver{
- addr: addr,
- }, func() {
- for i := 0; i < numServers; i++ {
- servers[i].stop()
- }
- }
- }
- func TestNameDiscovery(t *testing.T) {
- defer leakcheck.Check(t)
- // Start 2 servers on 2 ports.
- numServers := 2
- servers, r, cleanup := startServers(t, numServers, math.MaxUint32)
- defer cleanup()
- cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
- if err != nil {
- t.Fatalf("Failed to create ClientConn: %v", err)
- }
- defer cc.Close()
- req := "port"
- var reply string
- if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[0].port {
- t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, want %s", err, servers[0].port)
- }
- // Inject the name resolution change to remove servers[0] and add servers[1].
- var updates []*naming.Update
- updates = append(updates, &naming.Update{
- Op: naming.Delete,
- Addr: "localhost:" + servers[0].port,
- })
- updates = append(updates, &naming.Update{
- Op: naming.Add,
- Addr: "localhost:" + servers[1].port,
- })
- r.w.inject(updates)
- // Loop until the rpcs in flight talks to servers[1].
- for {
- if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[1].port {
- break
- }
- time.Sleep(10 * time.Millisecond)
- }
- }
- func TestEmptyAddrs(t *testing.T) {
- defer leakcheck.Check(t)
- servers, r, cleanup := startServers(t, 1, math.MaxUint32)
- defer cleanup()
- cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
- if err != nil {
- t.Fatalf("Failed to create ClientConn: %v", err)
- }
- defer cc.Close()
- var reply string
- if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply); err != nil || reply != expectedResponse {
- t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, reply = %q, want %q, <nil>", err, reply, expectedResponse)
- }
- // Inject name resolution change to remove the server so that there is no address
- // available after that.
- u := &naming.Update{
- Op: naming.Delete,
- Addr: "localhost:" + servers[0].port,
- }
- r.w.inject([]*naming.Update{u})
- // Loop until the above updates apply.
- for {
- time.Sleep(10 * time.Millisecond)
- ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
- if err := cc.Invoke(ctx, "/foo/bar", &expectedRequest, &reply); err != nil {
- cancel()
- break
- }
- cancel()
- }
- }
- func TestRoundRobin(t *testing.T) {
- defer leakcheck.Check(t)
- // Start 3 servers on 3 ports.
- numServers := 3
- servers, r, cleanup := startServers(t, numServers, math.MaxUint32)
- defer cleanup()
- cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
- if err != nil {
- t.Fatalf("Failed to create ClientConn: %v", err)
- }
- defer cc.Close()
- // Add servers[1] to the service discovery.
- u := &naming.Update{
- Op: naming.Add,
- Addr: "localhost:" + servers[1].port,
- }
- r.w.inject([]*naming.Update{u})
- req := "port"
- var reply string
- // Loop until servers[1] is up
- for {
- if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[1].port {
- break
- }
- time.Sleep(10 * time.Millisecond)
- }
- // Add server2[2] to the service discovery.
- u = &naming.Update{
- Op: naming.Add,
- Addr: "localhost:" + servers[2].port,
- }
- r.w.inject([]*naming.Update{u})
- // Loop until both servers[2] are up.
- for {
- if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[2].port {
- break
- }
- time.Sleep(10 * time.Millisecond)
- }
- // Check the incoming RPCs served in a round-robin manner.
- for i := 0; i < 10; i++ {
- if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[i%numServers].port {
- t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", i, err, servers[i%numServers].port)
- }
- }
- }
- func TestCloseWithPendingRPC(t *testing.T) {
- defer leakcheck.Check(t)
- servers, r, cleanup := startServers(t, 1, math.MaxUint32)
- defer cleanup()
- cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
- if err != nil {
- t.Fatalf("Failed to create ClientConn: %v", err)
- }
- defer cc.Close()
- var reply string
- if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, FailFast(false)); err != nil {
- t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, want %s", err, servers[0].port)
- }
- // Remove the server.
- updates := []*naming.Update{{
- Op: naming.Delete,
- Addr: "localhost:" + servers[0].port,
- }}
- r.w.inject(updates)
- // Loop until the above update applies.
- for {
- ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
- if err := cc.Invoke(ctx, "/foo/bar", &expectedRequest, &reply, FailFast(false)); status.Code(err) == codes.DeadlineExceeded {
- cancel()
- break
- }
- time.Sleep(10 * time.Millisecond)
- cancel()
- }
- // Issue 2 RPCs which should be completed with error status once cc is closed.
- var wg sync.WaitGroup
- wg.Add(2)
- go func() {
- defer wg.Done()
- var reply string
- if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, FailFast(false)); err == nil {
- t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want not nil", err)
- }
- }()
- go func() {
- defer wg.Done()
- var reply string
- time.Sleep(5 * time.Millisecond)
- if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, FailFast(false)); err == nil {
- t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want not nil", err)
- }
- }()
- time.Sleep(5 * time.Millisecond)
- cc.Close()
- wg.Wait()
- }
- func TestGetOnWaitChannel(t *testing.T) {
- defer leakcheck.Check(t)
- servers, r, cleanup := startServers(t, 1, math.MaxUint32)
- defer cleanup()
- cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
- if err != nil {
- t.Fatalf("Failed to create ClientConn: %v", err)
- }
- defer cc.Close()
- // Remove all servers so that all upcoming RPCs will block on waitCh.
- updates := []*naming.Update{{
- Op: naming.Delete,
- Addr: "localhost:" + servers[0].port,
- }}
- r.w.inject(updates)
- for {
- var reply string
- ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
- if err := cc.Invoke(ctx, "/foo/bar", &expectedRequest, &reply, FailFast(false)); status.Code(err) == codes.DeadlineExceeded {
- cancel()
- break
- }
- cancel()
- time.Sleep(10 * time.Millisecond)
- }
- var wg sync.WaitGroup
- wg.Add(1)
- go func() {
- defer wg.Done()
- var reply string
- if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, FailFast(false)); err != nil {
- t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want <nil>", err)
- }
- }()
- // Add a connected server to get the above RPC through.
- updates = []*naming.Update{{
- Op: naming.Add,
- Addr: "localhost:" + servers[0].port,
- }}
- r.w.inject(updates)
- // Wait until the above RPC succeeds.
- wg.Wait()
- }
- func TestOneServerDown(t *testing.T) {
- defer leakcheck.Check(t)
- // Start 2 servers.
- numServers := 2
- servers, r, cleanup := startServers(t, numServers, math.MaxUint32)
- defer cleanup()
- cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}), WithWaitForHandshake())
- if err != nil {
- t.Fatalf("Failed to create ClientConn: %v", err)
- }
- defer cc.Close()
- // Add servers[1] to the service discovery.
- var updates []*naming.Update
- updates = append(updates, &naming.Update{
- Op: naming.Add,
- Addr: "localhost:" + servers[1].port,
- })
- r.w.inject(updates)
- req := "port"
- var reply string
- // Loop until servers[1] is up
- for {
- if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[1].port {
- break
- }
- time.Sleep(10 * time.Millisecond)
- }
- var wg sync.WaitGroup
- numRPC := 100
- sleepDuration := 10 * time.Millisecond
- wg.Add(1)
- go func() {
- time.Sleep(sleepDuration)
- // After sleepDuration, kill server[0].
- servers[0].stop()
- wg.Done()
- }()
- // All non-failfast RPCs should not block because there's at least one connection available.
- for i := 0; i < numRPC; i++ {
- wg.Add(1)
- go func() {
- time.Sleep(sleepDuration)
- // After sleepDuration, invoke RPC.
- // server[0] is killed around the same time to make it racy between balancer and gRPC internals.
- cc.Invoke(context.Background(), "/foo/bar", &req, &reply, FailFast(false))
- wg.Done()
- }()
- }
- wg.Wait()
- }
- func TestOneAddressRemoval(t *testing.T) {
- defer leakcheck.Check(t)
- // Start 2 servers.
- numServers := 2
- servers, r, cleanup := startServers(t, numServers, math.MaxUint32)
- defer cleanup()
- cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
- if err != nil {
- t.Fatalf("Failed to create ClientConn: %v", err)
- }
- defer cc.Close()
- // Add servers[1] to the service discovery.
- var updates []*naming.Update
- updates = append(updates, &naming.Update{
- Op: naming.Add,
- Addr: "localhost:" + servers[1].port,
- })
- r.w.inject(updates)
- req := "port"
- var reply string
- // Loop until servers[1] is up
- for {
- if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[1].port {
- break
- }
- time.Sleep(10 * time.Millisecond)
- }
- var wg sync.WaitGroup
- numRPC := 100
- sleepDuration := 10 * time.Millisecond
- wg.Add(1)
- go func() {
- time.Sleep(sleepDuration)
- // After sleepDuration, delete server[0].
- var updates []*naming.Update
- updates = append(updates, &naming.Update{
- Op: naming.Delete,
- Addr: "localhost:" + servers[0].port,
- })
- r.w.inject(updates)
- wg.Done()
- }()
- // All non-failfast RPCs should not fail because there's at least one connection available.
- for i := 0; i < numRPC; i++ {
- wg.Add(1)
- go func() {
- var reply string
- time.Sleep(sleepDuration)
- // After sleepDuration, invoke RPC.
- // server[0] is removed around the same time to make it racy between balancer and gRPC internals.
- if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, FailFast(false)); err != nil {
- t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want nil", err)
- }
- wg.Done()
- }()
- }
- wg.Wait()
- }
- func checkServerUp(t *testing.T, currentServer *server) {
- req := "port"
- port := currentServer.port
- cc, err := Dial("passthrough:///localhost:"+port, WithBlock(), WithInsecure(), WithCodec(testCodec{}))
- if err != nil {
- t.Fatalf("Failed to create ClientConn: %v", err)
- }
- defer cc.Close()
- var reply string
- for {
- if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == port {
- break
- }
- time.Sleep(10 * time.Millisecond)
- }
- }
- func TestPickFirstEmptyAddrs(t *testing.T) {
- defer leakcheck.Check(t)
- servers, r, cleanup := startServers(t, 1, math.MaxUint32)
- defer cleanup()
- cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(pickFirstBalancerV1(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
- if err != nil {
- t.Fatalf("Failed to create ClientConn: %v", err)
- }
- defer cc.Close()
- var reply string
- if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply); err != nil || reply != expectedResponse {
- t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, reply = %q, want %q, <nil>", err, reply, expectedResponse)
- }
- // Inject name resolution change to remove the server so that there is no address
- // available after that.
- u := &naming.Update{
- Op: naming.Delete,
- Addr: "localhost:" + servers[0].port,
- }
- r.w.inject([]*naming.Update{u})
- // Loop until the above updates apply.
- for {
- time.Sleep(10 * time.Millisecond)
- ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
- if err := cc.Invoke(ctx, "/foo/bar", &expectedRequest, &reply); err != nil {
- cancel()
- break
- }
- cancel()
- }
- }
- func TestPickFirstCloseWithPendingRPC(t *testing.T) {
- defer leakcheck.Check(t)
- servers, r, cleanup := startServers(t, 1, math.MaxUint32)
- defer cleanup()
- cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(pickFirstBalancerV1(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
- if err != nil {
- t.Fatalf("Failed to create ClientConn: %v", err)
- }
- defer cc.Close()
- var reply string
- if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, FailFast(false)); err != nil {
- t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, want %s", err, servers[0].port)
- }
- // Remove the server.
- updates := []*naming.Update{{
- Op: naming.Delete,
- Addr: "localhost:" + servers[0].port,
- }}
- r.w.inject(updates)
- // Loop until the above update applies.
- for {
- ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
- if err := cc.Invoke(ctx, "/foo/bar", &expectedRequest, &reply, FailFast(false)); status.Code(err) == codes.DeadlineExceeded {
- cancel()
- break
- }
- time.Sleep(10 * time.Millisecond)
- cancel()
- }
- // Issue 2 RPCs which should be completed with error status once cc is closed.
- var wg sync.WaitGroup
- wg.Add(2)
- go func() {
- defer wg.Done()
- var reply string
- if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, FailFast(false)); err == nil {
- t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want not nil", err)
- }
- }()
- go func() {
- defer wg.Done()
- var reply string
- time.Sleep(5 * time.Millisecond)
- if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, FailFast(false)); err == nil {
- t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want not nil", err)
- }
- }()
- time.Sleep(5 * time.Millisecond)
- cc.Close()
- wg.Wait()
- }
- func TestPickFirstOrderAllServerUp(t *testing.T) {
- defer leakcheck.Check(t)
- // Start 3 servers on 3 ports.
- numServers := 3
- servers, r, cleanup := startServers(t, numServers, math.MaxUint32)
- defer cleanup()
- cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(pickFirstBalancerV1(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
- if err != nil {
- t.Fatalf("Failed to create ClientConn: %v", err)
- }
- defer cc.Close()
- // Add servers[1] and [2] to the service discovery.
- u := &naming.Update{
- Op: naming.Add,
- Addr: "localhost:" + servers[1].port,
- }
- r.w.inject([]*naming.Update{u})
- u = &naming.Update{
- Op: naming.Add,
- Addr: "localhost:" + servers[2].port,
- }
- r.w.inject([]*naming.Update{u})
- // Loop until all 3 servers are up
- checkServerUp(t, servers[0])
- checkServerUp(t, servers[1])
- checkServerUp(t, servers[2])
- // Check the incoming RPCs served in server[0]
- req := "port"
- var reply string
- for i := 0; i < 20; i++ {
- if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[0].port {
- t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 0, err, servers[0].port)
- }
- time.Sleep(10 * time.Millisecond)
- }
- // Delete server[0] in the balancer, the incoming RPCs served in server[1]
- // For test addrconn, close server[0] instead
- u = &naming.Update{
- Op: naming.Delete,
- Addr: "localhost:" + servers[0].port,
- }
- r.w.inject([]*naming.Update{u})
- // Loop until it changes to server[1]
- for {
- if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[1].port {
- break
- }
- time.Sleep(10 * time.Millisecond)
- }
- for i := 0; i < 20; i++ {
- if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[1].port {
- t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 1, err, servers[1].port)
- }
- time.Sleep(10 * time.Millisecond)
- }
- // Add server[0] back to the balancer, the incoming RPCs served in server[1]
- // Add is append operation, the order of Notify now is {server[1].port server[2].port server[0].port}
- u = &naming.Update{
- Op: naming.Add,
- Addr: "localhost:" + servers[0].port,
- }
- r.w.inject([]*naming.Update{u})
- for i := 0; i < 20; i++ {
- if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[1].port {
- t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 1, err, servers[1].port)
- }
- time.Sleep(10 * time.Millisecond)
- }
- // Delete server[1] in the balancer, the incoming RPCs served in server[2]
- u = &naming.Update{
- Op: naming.Delete,
- Addr: "localhost:" + servers[1].port,
- }
- r.w.inject([]*naming.Update{u})
- for {
- if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[2].port {
- break
- }
- time.Sleep(1 * time.Second)
- }
- for i := 0; i < 20; i++ {
- if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[2].port {
- t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 2, err, servers[2].port)
- }
- time.Sleep(10 * time.Millisecond)
- }
- // Delete server[2] in the balancer, the incoming RPCs served in server[0]
- u = &naming.Update{
- Op: naming.Delete,
- Addr: "localhost:" + servers[2].port,
- }
- r.w.inject([]*naming.Update{u})
- for {
- if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[0].port {
- break
- }
- time.Sleep(1 * time.Second)
- }
- for i := 0; i < 20; i++ {
- if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[0].port {
- t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 0, err, servers[0].port)
- }
- time.Sleep(10 * time.Millisecond)
- }
- }
- func TestPickFirstOrderOneServerDown(t *testing.T) {
- defer leakcheck.Check(t)
- // Start 3 servers on 3 ports.
- numServers := 3
- servers, r, cleanup := startServers(t, numServers, math.MaxUint32)
- defer cleanup()
- cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(pickFirstBalancerV1(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}), WithWaitForHandshake())
- if err != nil {
- t.Fatalf("Failed to create ClientConn: %v", err)
- }
- defer cc.Close()
- // Add servers[1] and [2] to the service discovery.
- u := &naming.Update{
- Op: naming.Add,
- Addr: "localhost:" + servers[1].port,
- }
- r.w.inject([]*naming.Update{u})
- u = &naming.Update{
- Op: naming.Add,
- Addr: "localhost:" + servers[2].port,
- }
- r.w.inject([]*naming.Update{u})
- // Loop until all 3 servers are up
- checkServerUp(t, servers[0])
- checkServerUp(t, servers[1])
- checkServerUp(t, servers[2])
- // Check the incoming RPCs served in server[0]
- req := "port"
- var reply string
- for i := 0; i < 20; i++ {
- if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[0].port {
- t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 0, err, servers[0].port)
- }
- time.Sleep(10 * time.Millisecond)
- }
- // server[0] down, incoming RPCs served in server[1], but the order of Notify still remains
- // {server[0] server[1] server[2]}
- servers[0].stop()
- // Loop until it changes to server[1]
- for {
- if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[1].port {
- break
- }
- time.Sleep(10 * time.Millisecond)
- }
- for i := 0; i < 20; i++ {
- if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[1].port {
- t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 1, err, servers[1].port)
- }
- time.Sleep(10 * time.Millisecond)
- }
- // up the server[0] back, the incoming RPCs served in server[1]
- p, _ := strconv.Atoi(servers[0].port)
- servers[0] = newTestServer()
- go servers[0].start(t, p, math.MaxUint32)
- defer servers[0].stop()
- servers[0].wait(t, 2*time.Second)
- checkServerUp(t, servers[0])
- for i := 0; i < 20; i++ {
- if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[1].port {
- t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 1, err, servers[1].port)
- }
- time.Sleep(10 * time.Millisecond)
- }
- // Delete server[1] in the balancer, the incoming RPCs served in server[0]
- u = &naming.Update{
- Op: naming.Delete,
- Addr: "localhost:" + servers[1].port,
- }
- r.w.inject([]*naming.Update{u})
- for {
- if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[0].port {
- break
- }
- time.Sleep(1 * time.Second)
- }
- for i := 0; i < 20; i++ {
- if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[0].port {
- t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 0, err, servers[0].port)
- }
- time.Sleep(10 * time.Millisecond)
- }
- }
- func TestPickFirstOneAddressRemoval(t *testing.T) {
- defer leakcheck.Check(t)
- // Start 2 servers.
- numServers := 2
- servers, r, cleanup := startServers(t, numServers, math.MaxUint32)
- defer cleanup()
- cc, err := Dial("passthrough:///localhost:"+servers[0].port, WithBalancer(pickFirstBalancerV1(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
- if err != nil {
- t.Fatalf("Failed to create ClientConn: %v", err)
- }
- defer cc.Close()
- // Add servers[1] to the service discovery.
- var updates []*naming.Update
- updates = append(updates, &naming.Update{
- Op: naming.Add,
- Addr: "localhost:" + servers[1].port,
- })
- r.w.inject(updates)
- // Create a new cc to Loop until servers[1] is up
- checkServerUp(t, servers[0])
- checkServerUp(t, servers[1])
- var wg sync.WaitGroup
- numRPC := 100
- sleepDuration := 10 * time.Millisecond
- wg.Add(1)
- go func() {
- time.Sleep(sleepDuration)
- // After sleepDuration, delete server[0].
- var updates []*naming.Update
- updates = append(updates, &naming.Update{
- Op: naming.Delete,
- Addr: "localhost:" + servers[0].port,
- })
- r.w.inject(updates)
- wg.Done()
- }()
- // All non-failfast RPCs should not fail because there's at least one connection available.
- for i := 0; i < numRPC; i++ {
- wg.Add(1)
- go func() {
- var reply string
- time.Sleep(sleepDuration)
- // After sleepDuration, invoke RPC.
- // server[0] is removed around the same time to make it racy between balancer and gRPC internals.
- if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, FailFast(false)); err != nil {
- t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want nil", err)
- }
- wg.Done()
- }()
- }
- wg.Wait()
- }
|