123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515 |
- /*
- *
- * Copyright 2018 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
- package grpc
- import (
- "context"
- "net"
- "sync"
- "sync/atomic"
- "testing"
- "time"
- "golang.org/x/net/http2"
- "google.golang.org/grpc/balancer"
- "google.golang.org/grpc/connectivity"
- "google.golang.org/grpc/internal/leakcheck"
- "google.golang.org/grpc/internal/testutils"
- "google.golang.org/grpc/resolver"
- "google.golang.org/grpc/resolver/manual"
- )
- const stateRecordingBalancerName = "state_recoding_balancer"
- var testBalancer = &stateRecordingBalancer{}
- func init() {
- balancer.Register(testBalancer)
- }
- // These tests use a pipeListener. This listener is similar to net.Listener except that it is unbuffered, so each read
- // and write will wait for the other side's corresponding write or read.
- func TestStateTransitions_SingleAddress(t *testing.T) {
- defer leakcheck.Check(t)
- mctBkp := getMinConnectTimeout()
- defer func() {
- atomic.StoreInt64((*int64)(&mutableMinConnectTimeout), int64(mctBkp))
- }()
- atomic.StoreInt64((*int64)(&mutableMinConnectTimeout), int64(time.Millisecond)*100)
- for _, test := range []struct {
- desc string
- want []connectivity.State
- server func(net.Listener) net.Conn
- }{
- {
- desc: "When the server returns server preface, the client enters READY.",
- want: []connectivity.State{
- connectivity.Connecting,
- connectivity.Ready,
- },
- server: func(lis net.Listener) net.Conn {
- conn, err := lis.Accept()
- if err != nil {
- t.Error(err)
- return nil
- }
- go keepReading(conn)
- framer := http2.NewFramer(conn, conn)
- if err := framer.WriteSettings(http2.Setting{}); err != nil {
- t.Errorf("Error while writing settings frame. %v", err)
- return nil
- }
- return conn
- },
- },
- {
- desc: "When the connection is closed, the client enters TRANSIENT FAILURE.",
- want: []connectivity.State{
- connectivity.Connecting,
- connectivity.TransientFailure,
- },
- server: func(lis net.Listener) net.Conn {
- conn, err := lis.Accept()
- if err != nil {
- t.Error(err)
- return nil
- }
- conn.Close()
- return nil
- },
- },
- {
- desc: `When the server sends its connection preface, but the connection dies before the client can write its
- connection preface, the client enters TRANSIENT FAILURE.`,
- want: []connectivity.State{
- connectivity.Connecting,
- connectivity.TransientFailure,
- },
- server: func(lis net.Listener) net.Conn {
- conn, err := lis.Accept()
- if err != nil {
- t.Error(err)
- return nil
- }
- framer := http2.NewFramer(conn, conn)
- if err := framer.WriteSettings(http2.Setting{}); err != nil {
- t.Errorf("Error while writing settings frame. %v", err)
- return nil
- }
- conn.Close()
- return nil
- },
- },
- {
- desc: `When the server reads the client connection preface but does not send its connection preface, the
- client enters TRANSIENT FAILURE.`,
- want: []connectivity.State{
- connectivity.Connecting,
- connectivity.TransientFailure,
- },
- server: func(lis net.Listener) net.Conn {
- conn, err := lis.Accept()
- if err != nil {
- t.Error(err)
- return nil
- }
- go keepReading(conn)
- return conn
- },
- },
- } {
- t.Log(test.desc)
- testStateTransitionSingleAddress(t, test.want, test.server)
- }
- }
- func testStateTransitionSingleAddress(t *testing.T, want []connectivity.State, server func(net.Listener) net.Conn) {
- defer leakcheck.Check(t)
- stateNotifications := make(chan connectivity.State, len(want))
- testBalancer.ResetNotifier(stateNotifications)
- ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
- defer cancel()
- pl := testutils.NewPipeListener()
- defer pl.Close()
- // Launch the server.
- var conn net.Conn
- var connMu sync.Mutex
- go func() {
- connMu.Lock()
- conn = server(pl)
- connMu.Unlock()
- }()
- client, err := DialContext(ctx, "", WithWaitForHandshake(), WithInsecure(),
- WithBalancerName(stateRecordingBalancerName), WithDialer(pl.Dialer()), withBackoff(noBackoff{}))
- if err != nil {
- t.Fatal(err)
- }
- defer client.Close()
- timeout := time.After(5 * time.Second)
- for i := 0; i < len(want); i++ {
- select {
- case <-timeout:
- t.Fatalf("timed out waiting for state %d (%v) in flow %v", i, want[i], want)
- case seen := <-stateNotifications:
- if seen != want[i] {
- t.Fatalf("expected to see %v at position %d in flow %v, got %v", want[i], i, want, seen)
- }
- }
- }
- connMu.Lock()
- defer connMu.Unlock()
- if conn != nil {
- err = conn.Close()
- if err != nil {
- t.Fatal(err)
- }
- }
- }
- // When a READY connection is closed, the client enters TRANSIENT FAILURE before CONNECTING.
- func TestStateTransition_ReadyToTransientFailure(t *testing.T) {
- defer leakcheck.Check(t)
- want := []connectivity.State{
- connectivity.Connecting,
- connectivity.Ready,
- connectivity.TransientFailure,
- connectivity.Connecting,
- }
- stateNotifications := make(chan connectivity.State, len(want))
- testBalancer.ResetNotifier(stateNotifications)
- ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
- defer cancel()
- lis, err := net.Listen("tcp", "localhost:0")
- if err != nil {
- t.Fatalf("Error while listening. Err: %v", err)
- }
- defer lis.Close()
- sawReady := make(chan struct{})
- // Launch the server.
- go func() {
- conn, err := lis.Accept()
- if err != nil {
- t.Error(err)
- return
- }
- go keepReading(conn)
- framer := http2.NewFramer(conn, conn)
- if err := framer.WriteSettings(http2.Setting{}); err != nil {
- t.Errorf("Error while writing settings frame. %v", err)
- return
- }
- // Prevents race between onPrefaceReceipt and onClose.
- <-sawReady
- conn.Close()
- }()
- client, err := DialContext(ctx, lis.Addr().String(), WithWaitForHandshake(), WithInsecure(), WithBalancerName(stateRecordingBalancerName))
- if err != nil {
- t.Fatal(err)
- }
- defer client.Close()
- timeout := time.After(5 * time.Second)
- for i := 0; i < len(want); i++ {
- select {
- case <-timeout:
- t.Fatalf("timed out waiting for state %d (%v) in flow %v", i, want[i], want)
- case seen := <-stateNotifications:
- if seen == connectivity.Ready {
- close(sawReady)
- }
- if seen != want[i] {
- t.Fatalf("expected to see %v at position %d in flow %v, got %v", want[i], i, want, seen)
- }
- }
- }
- }
- // When the first connection is closed, the client enters stays in CONNECTING until it tries the second
- // address (which succeeds, and then it enters READY).
- func TestStateTransitions_TriesAllAddrsBeforeTransientFailure(t *testing.T) {
- defer leakcheck.Check(t)
- want := []connectivity.State{
- connectivity.Connecting,
- connectivity.Ready,
- }
- stateNotifications := make(chan connectivity.State, len(want))
- testBalancer.ResetNotifier(stateNotifications)
- ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
- defer cancel()
- lis1, err := net.Listen("tcp", "localhost:0")
- if err != nil {
- t.Fatalf("Error while listening. Err: %v", err)
- }
- defer lis1.Close()
- lis2, err := net.Listen("tcp", "localhost:0")
- if err != nil {
- t.Fatalf("Error while listening. Err: %v", err)
- }
- defer lis2.Close()
- server1Done := make(chan struct{})
- server2Done := make(chan struct{})
- // Launch server 1.
- go func() {
- conn, err := lis1.Accept()
- if err != nil {
- t.Error(err)
- return
- }
- conn.Close()
- close(server1Done)
- }()
- // Launch server 2.
- go func() {
- conn, err := lis2.Accept()
- if err != nil {
- t.Error(err)
- return
- }
- go keepReading(conn)
- framer := http2.NewFramer(conn, conn)
- if err := framer.WriteSettings(http2.Setting{}); err != nil {
- t.Errorf("Error while writing settings frame. %v", err)
- return
- }
- close(server2Done)
- }()
- rb := manual.NewBuilderWithScheme("whatever")
- rb.InitialAddrs([]resolver.Address{
- {Addr: lis1.Addr().String()},
- {Addr: lis2.Addr().String()},
- })
- client, err := DialContext(ctx, "this-gets-overwritten", WithInsecure(), WithWaitForHandshake(), WithBalancerName(stateRecordingBalancerName), withResolverBuilder(rb))
- if err != nil {
- t.Fatal(err)
- }
- defer client.Close()
- timeout := time.After(5 * time.Second)
- for i := 0; i < len(want); i++ {
- select {
- case <-timeout:
- t.Fatalf("timed out waiting for state %d (%v) in flow %v", i, want[i], want)
- case seen := <-stateNotifications:
- if seen != want[i] {
- t.Fatalf("expected to see %v at position %d in flow %v, got %v", want[i], i, want, seen)
- }
- }
- }
- select {
- case <-timeout:
- t.Fatal("saw the correct state transitions, but timed out waiting for client to finish interactions with server 1")
- case <-server1Done:
- }
- select {
- case <-timeout:
- t.Fatal("saw the correct state transitions, but timed out waiting for client to finish interactions with server 2")
- case <-server2Done:
- }
- }
- // When there are multiple addresses, and we enter READY on one of them, a later closure should cause
- // the client to enter TRANSIENT FAILURE before it re-enters CONNECTING.
- func TestStateTransitions_MultipleAddrsEntersReady(t *testing.T) {
- defer leakcheck.Check(t)
- want := []connectivity.State{
- connectivity.Connecting,
- connectivity.Ready,
- connectivity.TransientFailure,
- connectivity.Connecting,
- }
- stateNotifications := make(chan connectivity.State, len(want))
- testBalancer.ResetNotifier(stateNotifications)
- ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
- defer cancel()
- lis1, err := net.Listen("tcp", "localhost:0")
- if err != nil {
- t.Fatalf("Error while listening. Err: %v", err)
- }
- defer lis1.Close()
- // Never actually gets used; we just want it to be alive so that the resolver has two addresses to target.
- lis2, err := net.Listen("tcp", "localhost:0")
- if err != nil {
- t.Fatalf("Error while listening. Err: %v", err)
- }
- defer lis2.Close()
- server1Done := make(chan struct{})
- sawReady := make(chan struct{})
- // Launch server 1.
- go func() {
- conn, err := lis1.Accept()
- if err != nil {
- t.Error(err)
- return
- }
- go keepReading(conn)
- framer := http2.NewFramer(conn, conn)
- if err := framer.WriteSettings(http2.Setting{}); err != nil {
- t.Errorf("Error while writing settings frame. %v", err)
- return
- }
- <-sawReady
- conn.Close()
- _, err = lis1.Accept()
- if err != nil {
- t.Error(err)
- return
- }
- close(server1Done)
- }()
- rb := manual.NewBuilderWithScheme("whatever")
- rb.InitialAddrs([]resolver.Address{
- {Addr: lis1.Addr().String()},
- {Addr: lis2.Addr().String()},
- })
- client, err := DialContext(ctx, "this-gets-overwritten", WithInsecure(), WithWaitForHandshake(), WithBalancerName(stateRecordingBalancerName), withResolverBuilder(rb))
- if err != nil {
- t.Fatal(err)
- }
- defer client.Close()
- timeout := time.After(2 * time.Second)
- for i := 0; i < len(want); i++ {
- select {
- case <-timeout:
- t.Fatalf("timed out waiting for state %d (%v) in flow %v", i, want[i], want)
- case seen := <-stateNotifications:
- if seen == connectivity.Ready {
- close(sawReady)
- }
- if seen != want[i] {
- t.Fatalf("expected to see %v at position %d in flow %v, got %v", want[i], i, want, seen)
- }
- }
- }
- select {
- case <-timeout:
- t.Fatal("saw the correct state transitions, but timed out waiting for client to finish interactions with server 1")
- case <-server1Done:
- }
- }
- type stateRecordingBalancer struct {
- mu sync.Mutex
- notifier chan<- connectivity.State
- balancer.Balancer
- }
- func (b *stateRecordingBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
- b.mu.Lock()
- b.notifier <- s
- b.mu.Unlock()
- b.Balancer.HandleSubConnStateChange(sc, s)
- }
- func (b *stateRecordingBalancer) ResetNotifier(r chan<- connectivity.State) {
- b.mu.Lock()
- defer b.mu.Unlock()
- b.notifier = r
- }
- func (b *stateRecordingBalancer) Close() {
- b.mu.Lock()
- u := b.Balancer
- b.mu.Unlock()
- u.Close()
- }
- func (b *stateRecordingBalancer) Name() string {
- return stateRecordingBalancerName
- }
- func (b *stateRecordingBalancer) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
- b.mu.Lock()
- b.Balancer = balancer.Get(PickFirstBalancerName).Build(cc, opts)
- b.mu.Unlock()
- return b
- }
// noBackoff is a backoff strategy that never waits between attempts.
type noBackoff struct{}

// Backoff returns a zero delay regardless of the retry count it is given.
func (noBackoff) Backoff(int) time.Duration {
	return 0
}
// keepReading drains conn, discarding whatever arrives, and returns as soon as a
// read fails (EOF, server closed, etc). Useful as a tool for mindlessly keeping
// the connection healthy, since the client will error if things like client
// prefaces are not accepted in a timely fashion.
func keepReading(conn net.Conn) {
	scratch := make([]byte, 1024)
	for {
		if _, err := conn.Read(scratch); err != nil {
			return
		}
	}
}
|