123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160 |
- /*
- *
- * Copyright 2017 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
- package grpc
- import (
- "context"
- "fmt"
- "sync/atomic"
- "testing"
- "time"
- "google.golang.org/grpc/balancer"
- "google.golang.org/grpc/connectivity"
- _ "google.golang.org/grpc/grpclog/glogger"
- "google.golang.org/grpc/internal/leakcheck"
- "google.golang.org/grpc/internal/transport"
- )
- const goroutineCount = 5
- var (
- testT = &testTransport{}
- testSC = &acBalancerWrapper{ac: &addrConn{
- state: connectivity.Ready,
- transport: testT,
- }}
- testSCNotReady = &acBalancerWrapper{ac: &addrConn{
- state: connectivity.TransientFailure,
- }}
- )
- type testTransport struct {
- transport.ClientTransport
- }
- type testingPicker struct {
- err error
- sc balancer.SubConn
- maxCalled int64
- }
- func (p *testingPicker) Pick(ctx context.Context, opts balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) {
- if atomic.AddInt64(&p.maxCalled, -1) < 0 {
- return nil, nil, fmt.Errorf("Pick called to many times (> goroutineCount)")
- }
- if p.err != nil {
- return nil, nil, p.err
- }
- return p.sc, nil, nil
- }
- func TestBlockingPickTimeout(t *testing.T) {
- defer leakcheck.Check(t)
- bp := newPickerWrapper()
- ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
- defer cancel()
- if _, _, err := bp.pick(ctx, true, balancer.PickOptions{}); err != context.DeadlineExceeded {
- t.Errorf("bp.pick returned error %v, want DeadlineExceeded", err)
- }
- }
- func TestBlockingPick(t *testing.T) {
- defer leakcheck.Check(t)
- bp := newPickerWrapper()
- // All goroutines should block because picker is nil in bp.
- var finishedCount uint64
- for i := goroutineCount; i > 0; i-- {
- go func() {
- if tr, _, err := bp.pick(context.Background(), true, balancer.PickOptions{}); err != nil || tr != testT {
- t.Errorf("bp.pick returned non-nil error: %v", err)
- }
- atomic.AddUint64(&finishedCount, 1)
- }()
- }
- time.Sleep(50 * time.Millisecond)
- if c := atomic.LoadUint64(&finishedCount); c != 0 {
- t.Errorf("finished goroutines count: %v, want 0", c)
- }
- bp.updatePicker(&testingPicker{sc: testSC, maxCalled: goroutineCount})
- }
- func TestBlockingPickNoSubAvailable(t *testing.T) {
- defer leakcheck.Check(t)
- bp := newPickerWrapper()
- var finishedCount uint64
- bp.updatePicker(&testingPicker{err: balancer.ErrNoSubConnAvailable, maxCalled: goroutineCount})
- // All goroutines should block because picker returns no sc available.
- for i := goroutineCount; i > 0; i-- {
- go func() {
- if tr, _, err := bp.pick(context.Background(), true, balancer.PickOptions{}); err != nil || tr != testT {
- t.Errorf("bp.pick returned non-nil error: %v", err)
- }
- atomic.AddUint64(&finishedCount, 1)
- }()
- }
- time.Sleep(50 * time.Millisecond)
- if c := atomic.LoadUint64(&finishedCount); c != 0 {
- t.Errorf("finished goroutines count: %v, want 0", c)
- }
- bp.updatePicker(&testingPicker{sc: testSC, maxCalled: goroutineCount})
- }
- func TestBlockingPickTransientWaitforready(t *testing.T) {
- defer leakcheck.Check(t)
- bp := newPickerWrapper()
- bp.updatePicker(&testingPicker{err: balancer.ErrTransientFailure, maxCalled: goroutineCount})
- var finishedCount uint64
- // All goroutines should block because picker returns transientFailure and
- // picks are not failfast.
- for i := goroutineCount; i > 0; i-- {
- go func() {
- if tr, _, err := bp.pick(context.Background(), false, balancer.PickOptions{}); err != nil || tr != testT {
- t.Errorf("bp.pick returned non-nil error: %v", err)
- }
- atomic.AddUint64(&finishedCount, 1)
- }()
- }
- time.Sleep(time.Millisecond)
- if c := atomic.LoadUint64(&finishedCount); c != 0 {
- t.Errorf("finished goroutines count: %v, want 0", c)
- }
- bp.updatePicker(&testingPicker{sc: testSC, maxCalled: goroutineCount})
- }
- func TestBlockingPickSCNotReady(t *testing.T) {
- defer leakcheck.Check(t)
- bp := newPickerWrapper()
- bp.updatePicker(&testingPicker{sc: testSCNotReady, maxCalled: goroutineCount})
- var finishedCount uint64
- // All goroutines should block because sc is not ready.
- for i := goroutineCount; i > 0; i-- {
- go func() {
- if tr, _, err := bp.pick(context.Background(), true, balancer.PickOptions{}); err != nil || tr != testT {
- t.Errorf("bp.pick returned non-nil error: %v", err)
- }
- atomic.AddUint64(&finishedCount, 1)
- }()
- }
- time.Sleep(time.Millisecond)
- if c := atomic.LoadUint64(&finishedCount); c != 0 {
- t.Errorf("finished goroutines count: %v, want 0", c)
- }
- bp.updatePicker(&testingPicker{sc: testSC, maxCalled: goroutineCount})
- }
|