12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
7177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915 |
- /*
- *
- * Copyright 2018 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
- package test
- import (
- "context"
- "crypto/tls"
- "fmt"
- "net"
- "reflect"
- "sync"
- "testing"
- "time"
- "golang.org/x/net/http2"
- "google.golang.org/grpc"
- _ "google.golang.org/grpc/balancer/grpclb"
- "google.golang.org/grpc/balancer/roundrobin"
- "google.golang.org/grpc/codes"
- "google.golang.org/grpc/connectivity"
- "google.golang.org/grpc/credentials"
- "google.golang.org/grpc/internal/channelz"
- "google.golang.org/grpc/internal/leakcheck"
- "google.golang.org/grpc/keepalive"
- "google.golang.org/grpc/resolver"
- "google.golang.org/grpc/resolver/manual"
- "google.golang.org/grpc/status"
- testpb "google.golang.org/grpc/test/grpc_testing"
- "google.golang.org/grpc/testdata"
- )
- func (te *test) startServers(ts testpb.TestServiceServer, num int) {
- for i := 0; i < num; i++ {
- te.startServer(ts)
- te.srvs = append(te.srvs, te.srv.(*grpc.Server))
- te.srvAddrs = append(te.srvAddrs, te.srvAddr)
- te.srv = nil
- te.srvAddr = ""
- }
- }
// verifyResultWithDelay polls f up to 1000 times, sleeping 10ms after every
// unsuccessful attempt. It returns nil as soon as f reports true (whatever
// error accompanied it); otherwise it returns the error from the last attempt.
func verifyResultWithDelay(f func() (bool, error)) error {
	const (
		maxAttempts = 1000
		interval    = 10 * time.Millisecond
	)
	var lastErr error
	for attempt := 0; attempt < maxAttempts; attempt++ {
		done, err := f()
		if done {
			return nil
		}
		lastErr = err
		time.Sleep(interval)
	}
	return lastErr
}
- func TestCZServerRegistrationAndDeletion(t *testing.T) {
- defer leakcheck.Check(t)
- testcases := []struct {
- total int
- start int64
- length int
- end bool
- }{
- {total: channelz.EntryPerPage, start: 0, length: channelz.EntryPerPage, end: true},
- {total: channelz.EntryPerPage - 1, start: 0, length: channelz.EntryPerPage - 1, end: true},
- {total: channelz.EntryPerPage + 1, start: 0, length: channelz.EntryPerPage, end: false},
- {total: channelz.EntryPerPage + 1, start: int64(2*(channelz.EntryPerPage+1) + 1), length: 0, end: true},
- }
- for _, c := range testcases {
- channelz.NewChannelzStorage()
- e := tcpClearRREnv
- te := newTest(t, e)
- te.startServers(&testServer{security: e.security}, c.total)
- ss, end := channelz.GetServers(c.start)
- if len(ss) != c.length || end != c.end {
- t.Fatalf("GetServers(%d) = %+v (len of which: %d), end: %+v, want len(GetServers(%d)) = %d, end: %+v", c.start, ss, len(ss), end, c.start, c.length, c.end)
- }
- te.tearDown()
- ss, end = channelz.GetServers(c.start)
- if len(ss) != 0 || !end {
- t.Fatalf("GetServers(0) = %+v (len of which: %d), end: %+v, want len(GetServers(0)) = 0, end: true", ss, len(ss), end)
- }
- }
- }
- func TestCZTopChannelRegistrationAndDeletion(t *testing.T) {
- defer leakcheck.Check(t)
- testcases := []struct {
- total int
- start int64
- length int
- end bool
- }{
- {total: channelz.EntryPerPage, start: 0, length: channelz.EntryPerPage, end: true},
- {total: channelz.EntryPerPage - 1, start: 0, length: channelz.EntryPerPage - 1, end: true},
- {total: channelz.EntryPerPage + 1, start: 0, length: channelz.EntryPerPage, end: false},
- {total: channelz.EntryPerPage + 1, start: int64(2*(channelz.EntryPerPage+1) + 1), length: 0, end: true},
- }
- for _, c := range testcases {
- channelz.NewChannelzStorage()
- e := tcpClearRREnv
- te := newTest(t, e)
- var ccs []*grpc.ClientConn
- for i := 0; i < c.total; i++ {
- cc := te.clientConn()
- te.cc = nil
- // avoid making next dial blocking
- te.srvAddr = ""
- ccs = append(ccs, cc)
- }
- if err := verifyResultWithDelay(func() (bool, error) {
- if tcs, end := channelz.GetTopChannels(c.start); len(tcs) != c.length || end != c.end {
- return false, fmt.Errorf("getTopChannels(%d) = %+v (len of which: %d), end: %+v, want len(GetTopChannels(%d)) = %d, end: %+v", c.start, tcs, len(tcs), end, c.start, c.length, c.end)
- }
- return true, nil
- }); err != nil {
- t.Fatal(err)
- }
- for _, cc := range ccs {
- cc.Close()
- }
- if err := verifyResultWithDelay(func() (bool, error) {
- if tcs, end := channelz.GetTopChannels(c.start); len(tcs) != 0 || !end {
- return false, fmt.Errorf("getTopChannels(0) = %+v (len of which: %d), end: %+v, want len(GetTopChannels(0)) = 0, end: true", tcs, len(tcs), end)
- }
- return true, nil
- }); err != nil {
- t.Fatal(err)
- }
- te.tearDown()
- }
- }
- func TestCZNestedChannelRegistrationAndDeletion(t *testing.T) {
- defer leakcheck.Check(t)
- channelz.NewChannelzStorage()
- e := tcpClearRREnv
- // avoid calling API to set balancer type, which will void service config's change of balancer.
- e.balancer = ""
- te := newTest(t, e)
- r, cleanup := manual.GenerateAndRegisterManualResolver()
- defer cleanup()
- resolvedAddrs := []resolver.Address{{Addr: "127.0.0.1:0", Type: resolver.GRPCLB, ServerName: "grpclb.server"}}
- r.InitialAddrs(resolvedAddrs)
- te.resolverScheme = r.Scheme()
- te.clientConn()
- defer te.tearDown()
- if err := verifyResultWithDelay(func() (bool, error) {
- tcs, _ := channelz.GetTopChannels(0)
- if len(tcs) != 1 {
- return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
- }
- if len(tcs[0].NestedChans) != 1 {
- return false, fmt.Errorf("there should be one nested channel from grpclb, not %d", len(tcs[0].NestedChans))
- }
- return true, nil
- }); err != nil {
- t.Fatal(err)
- }
- r.NewServiceConfig(`{"loadBalancingPolicy": "round_robin"}`)
- r.NewAddress([]resolver.Address{{Addr: "127.0.0.1:0"}})
- // wait for the shutdown of grpclb balancer
- if err := verifyResultWithDelay(func() (bool, error) {
- tcs, _ := channelz.GetTopChannels(0)
- if len(tcs) != 1 {
- return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
- }
- if len(tcs[0].NestedChans) != 0 {
- return false, fmt.Errorf("there should be 0 nested channel from grpclb, not %d", len(tcs[0].NestedChans))
- }
- return true, nil
- }); err != nil {
- t.Fatal(err)
- }
- }
- func TestCZClientSubChannelSocketRegistrationAndDeletion(t *testing.T) {
- defer leakcheck.Check(t)
- channelz.NewChannelzStorage()
- e := tcpClearRREnv
- num := 3 // number of backends
- te := newTest(t, e)
- var svrAddrs []resolver.Address
- te.startServers(&testServer{security: e.security}, num)
- r, cleanup := manual.GenerateAndRegisterManualResolver()
- defer cleanup()
- for _, a := range te.srvAddrs {
- svrAddrs = append(svrAddrs, resolver.Address{Addr: a})
- }
- r.InitialAddrs(svrAddrs)
- te.resolverScheme = r.Scheme()
- te.clientConn()
- defer te.tearDown()
- // Here, we just wait for all sockets to be up. In the future, if we implement
- // IDLE, we may need to make several rpc calls to create the sockets.
- if err := verifyResultWithDelay(func() (bool, error) {
- tcs, _ := channelz.GetTopChannels(0)
- if len(tcs) != 1 {
- return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
- }
- if len(tcs[0].SubChans) != num {
- return false, fmt.Errorf("there should be %d subchannel not %d", num, len(tcs[0].SubChans))
- }
- count := 0
- for k := range tcs[0].SubChans {
- sc := channelz.GetSubChannel(k)
- if sc == nil {
- return false, fmt.Errorf("got <nil> subchannel")
- }
- count += len(sc.Sockets)
- }
- if count != num {
- return false, fmt.Errorf("there should be %d sockets not %d", num, count)
- }
- return true, nil
- }); err != nil {
- t.Fatal(err)
- }
- r.NewAddress(svrAddrs[:len(svrAddrs)-1])
- if err := verifyResultWithDelay(func() (bool, error) {
- tcs, _ := channelz.GetTopChannels(0)
- if len(tcs) != 1 {
- return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
- }
- if len(tcs[0].SubChans) != num-1 {
- return false, fmt.Errorf("there should be %d subchannel not %d", num-1, len(tcs[0].SubChans))
- }
- count := 0
- for k := range tcs[0].SubChans {
- sc := channelz.GetSubChannel(k)
- if sc == nil {
- return false, fmt.Errorf("got <nil> subchannel")
- }
- count += len(sc.Sockets)
- }
- if count != num-1 {
- return false, fmt.Errorf("there should be %d sockets not %d", num-1, count)
- }
- return true, nil
- }); err != nil {
- t.Fatal(err)
- }
- }
- func TestCZServerSocketRegistrationAndDeletion(t *testing.T) {
- defer leakcheck.Check(t)
- channelz.NewChannelzStorage()
- e := tcpClearRREnv
- num := 3 // number of clients
- te := newTest(t, e)
- te.startServer(&testServer{security: e.security})
- defer te.tearDown()
- var ccs []*grpc.ClientConn
- for i := 0; i < num; i++ {
- cc := te.clientConn()
- te.cc = nil
- ccs = append(ccs, cc)
- }
- defer func() {
- for _, c := range ccs[:len(ccs)-1] {
- c.Close()
- }
- }()
- var svrID int64
- if err := verifyResultWithDelay(func() (bool, error) {
- ss, _ := channelz.GetServers(0)
- if len(ss) != 1 {
- return false, fmt.Errorf("there should only be one server, not %d", len(ss))
- }
- if len(ss[0].ListenSockets) != 1 {
- return false, fmt.Errorf("there should only be one server listen socket, not %d", len(ss[0].ListenSockets))
- }
- ns, _ := channelz.GetServerSockets(ss[0].ID, 0)
- if len(ns) != num {
- return false, fmt.Errorf("there should be %d normal sockets not %d", num, len(ns))
- }
- svrID = ss[0].ID
- return true, nil
- }); err != nil {
- t.Fatal(err)
- }
- ccs[len(ccs)-1].Close()
- if err := verifyResultWithDelay(func() (bool, error) {
- ns, _ := channelz.GetServerSockets(svrID, 0)
- if len(ns) != num-1 {
- return false, fmt.Errorf("there should be %d normal sockets not %d", num-1, len(ns))
- }
- return true, nil
- }); err != nil {
- t.Fatal(err)
- }
- }
- func TestCZServerListenSocketDeletion(t *testing.T) {
- defer leakcheck.Check(t)
- channelz.NewChannelzStorage()
- s := grpc.NewServer()
- lis, err := net.Listen("tcp", "localhost:0")
- if err != nil {
- t.Fatalf("failed to listen: %v", err)
- }
- go s.Serve(lis)
- if err := verifyResultWithDelay(func() (bool, error) {
- ss, _ := channelz.GetServers(0)
- if len(ss) != 1 {
- return false, fmt.Errorf("there should only be one server, not %d", len(ss))
- }
- if len(ss[0].ListenSockets) != 1 {
- return false, fmt.Errorf("there should only be one server listen socket, not %d", len(ss[0].ListenSockets))
- }
- return true, nil
- }); err != nil {
- t.Fatal(err)
- }
- lis.Close()
- if err := verifyResultWithDelay(func() (bool, error) {
- ss, _ := channelz.GetServers(0)
- if len(ss) != 1 {
- return false, fmt.Errorf("there should be 1 server, not %d", len(ss))
- }
- return true, nil
- }); err != nil {
- t.Fatal(err)
- }
- s.Stop()
- }
- type dummyChannel struct{}
- func (d *dummyChannel) ChannelzMetric() *channelz.ChannelInternalMetric {
- return &channelz.ChannelInternalMetric{}
- }
- type dummySocket struct{}
- func (d *dummySocket) ChannelzMetric() *channelz.SocketInternalMetric {
- return &channelz.SocketInternalMetric{}
- }
- func TestCZRecusivelyDeletionOfEntry(t *testing.T) {
- // +--+TopChan+---+
- // | |
- // v v
- // +-+SubChan1+--+ SubChan2
- // | |
- // v v
- // Socket1 Socket2
- channelz.NewChannelzStorage()
- topChanID := channelz.RegisterChannel(&dummyChannel{}, 0, "")
- subChanID1 := channelz.RegisterSubChannel(&dummyChannel{}, topChanID, "")
- subChanID2 := channelz.RegisterSubChannel(&dummyChannel{}, topChanID, "")
- sktID1 := channelz.RegisterNormalSocket(&dummySocket{}, subChanID1, "")
- sktID2 := channelz.RegisterNormalSocket(&dummySocket{}, subChanID1, "")
- tcs, _ := channelz.GetTopChannels(0)
- if tcs == nil || len(tcs) != 1 {
- t.Fatalf("There should be one TopChannel entry")
- }
- if len(tcs[0].SubChans) != 2 {
- t.Fatalf("There should be two SubChannel entries")
- }
- sc := channelz.GetSubChannel(subChanID1)
- if sc == nil || len(sc.Sockets) != 2 {
- t.Fatalf("There should be two Socket entries")
- }
- channelz.RemoveEntry(topChanID)
- tcs, _ = channelz.GetTopChannels(0)
- if tcs == nil || len(tcs) != 1 {
- t.Fatalf("There should be one TopChannel entry")
- }
- channelz.RemoveEntry(subChanID1)
- channelz.RemoveEntry(subChanID2)
- tcs, _ = channelz.GetTopChannels(0)
- if tcs == nil || len(tcs) != 1 {
- t.Fatalf("There should be one TopChannel entry")
- }
- if len(tcs[0].SubChans) != 1 {
- t.Fatalf("There should be one SubChannel entry")
- }
- channelz.RemoveEntry(sktID1)
- channelz.RemoveEntry(sktID2)
- tcs, _ = channelz.GetTopChannels(0)
- if tcs != nil {
- t.Fatalf("There should be no TopChannel entry")
- }
- }
- func TestCZChannelMetrics(t *testing.T) {
- defer leakcheck.Check(t)
- channelz.NewChannelzStorage()
- e := tcpClearRREnv
- num := 3 // number of backends
- te := newTest(t, e)
- te.maxClientSendMsgSize = newInt(8)
- var svrAddrs []resolver.Address
- te.startServers(&testServer{security: e.security}, num)
- r, cleanup := manual.GenerateAndRegisterManualResolver()
- defer cleanup()
- for _, a := range te.srvAddrs {
- svrAddrs = append(svrAddrs, resolver.Address{Addr: a})
- }
- r.InitialAddrs(svrAddrs)
- te.resolverScheme = r.Scheme()
- cc := te.clientConn()
- defer te.tearDown()
- tc := testpb.NewTestServiceClient(cc)
- if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
- t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
- }
- const smallSize = 1
- const largeSize = 8
- largePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, largeSize)
- if err != nil {
- t.Fatal(err)
- }
- req := &testpb.SimpleRequest{
- ResponseType: testpb.PayloadType_COMPRESSABLE,
- ResponseSize: int32(smallSize),
- Payload: largePayload,
- }
- if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted {
- t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
- }
- stream, err := tc.FullDuplexCall(context.Background())
- if err != nil {
- t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
- }
- defer stream.CloseSend()
- // Here, we just wait for all sockets to be up. In the future, if we implement
- // IDLE, we may need to make several rpc calls to create the sockets.
- if err := verifyResultWithDelay(func() (bool, error) {
- tcs, _ := channelz.GetTopChannels(0)
- if len(tcs) != 1 {
- return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
- }
- if len(tcs[0].SubChans) != num {
- return false, fmt.Errorf("there should be %d subchannel not %d", num, len(tcs[0].SubChans))
- }
- var cst, csu, cf int64
- for k := range tcs[0].SubChans {
- sc := channelz.GetSubChannel(k)
- if sc == nil {
- return false, fmt.Errorf("got <nil> subchannel")
- }
- cst += sc.ChannelData.CallsStarted
- csu += sc.ChannelData.CallsSucceeded
- cf += sc.ChannelData.CallsFailed
- }
- if cst != 3 {
- return false, fmt.Errorf("there should be 3 CallsStarted not %d", cst)
- }
- if csu != 1 {
- return false, fmt.Errorf("there should be 1 CallsSucceeded not %d", csu)
- }
- if cf != 1 {
- return false, fmt.Errorf("there should be 1 CallsFailed not %d", cf)
- }
- if tcs[0].ChannelData.CallsStarted != 3 {
- return false, fmt.Errorf("there should be 3 CallsStarted not %d", tcs[0].ChannelData.CallsStarted)
- }
- if tcs[0].ChannelData.CallsSucceeded != 1 {
- return false, fmt.Errorf("there should be 1 CallsSucceeded not %d", tcs[0].ChannelData.CallsSucceeded)
- }
- if tcs[0].ChannelData.CallsFailed != 1 {
- return false, fmt.Errorf("there should be 1 CallsFailed not %d", tcs[0].ChannelData.CallsFailed)
- }
- return true, nil
- }); err != nil {
- t.Fatal(err)
- }
- }
- func TestCZServerMetrics(t *testing.T) {
- defer leakcheck.Check(t)
- channelz.NewChannelzStorage()
- e := tcpClearRREnv
- te := newTest(t, e)
- te.maxServerReceiveMsgSize = newInt(8)
- te.startServer(&testServer{security: e.security})
- defer te.tearDown()
- cc := te.clientConn()
- tc := testpb.NewTestServiceClient(cc)
- if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
- t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
- }
- const smallSize = 1
- const largeSize = 8
- largePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, largeSize)
- if err != nil {
- t.Fatal(err)
- }
- req := &testpb.SimpleRequest{
- ResponseType: testpb.PayloadType_COMPRESSABLE,
- ResponseSize: int32(smallSize),
- Payload: largePayload,
- }
- if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted {
- t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
- }
- stream, err := tc.FullDuplexCall(context.Background())
- if err != nil {
- t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
- }
- defer stream.CloseSend()
- if err := verifyResultWithDelay(func() (bool, error) {
- ss, _ := channelz.GetServers(0)
- if len(ss) != 1 {
- return false, fmt.Errorf("there should only be one server, not %d", len(ss))
- }
- if ss[0].ServerData.CallsStarted != 3 {
- return false, fmt.Errorf("there should be 3 CallsStarted not %d", ss[0].ServerData.CallsStarted)
- }
- if ss[0].ServerData.CallsSucceeded != 1 {
- return false, fmt.Errorf("there should be 1 CallsSucceeded not %d", ss[0].ServerData.CallsSucceeded)
- }
- if ss[0].ServerData.CallsFailed != 1 {
- return false, fmt.Errorf("there should be 1 CallsFailed not %d", ss[0].ServerData.CallsFailed)
- }
- return true, nil
- }); err != nil {
- t.Fatal(err)
- }
- }
- type testServiceClientWrapper struct {
- testpb.TestServiceClient
- mu sync.RWMutex
- streamsCreated int
- }
- func (t *testServiceClientWrapper) getCurrentStreamID() uint32 {
- t.mu.RLock()
- defer t.mu.RUnlock()
- return uint32(2*t.streamsCreated - 1)
- }
- func (t *testServiceClientWrapper) EmptyCall(ctx context.Context, in *testpb.Empty, opts ...grpc.CallOption) (*testpb.Empty, error) {
- t.mu.Lock()
- defer t.mu.Unlock()
- t.streamsCreated++
- return t.TestServiceClient.EmptyCall(ctx, in, opts...)
- }
- func (t *testServiceClientWrapper) UnaryCall(ctx context.Context, in *testpb.SimpleRequest, opts ...grpc.CallOption) (*testpb.SimpleResponse, error) {
- t.mu.Lock()
- defer t.mu.Unlock()
- t.streamsCreated++
- return t.TestServiceClient.UnaryCall(ctx, in, opts...)
- }
- func (t *testServiceClientWrapper) StreamingOutputCall(ctx context.Context, in *testpb.StreamingOutputCallRequest, opts ...grpc.CallOption) (testpb.TestService_StreamingOutputCallClient, error) {
- t.mu.Lock()
- defer t.mu.Unlock()
- t.streamsCreated++
- return t.TestServiceClient.StreamingOutputCall(ctx, in, opts...)
- }
- func (t *testServiceClientWrapper) StreamingInputCall(ctx context.Context, opts ...grpc.CallOption) (testpb.TestService_StreamingInputCallClient, error) {
- t.mu.Lock()
- defer t.mu.Unlock()
- t.streamsCreated++
- return t.TestServiceClient.StreamingInputCall(ctx, opts...)
- }
- func (t *testServiceClientWrapper) FullDuplexCall(ctx context.Context, opts ...grpc.CallOption) (testpb.TestService_FullDuplexCallClient, error) {
- t.mu.Lock()
- defer t.mu.Unlock()
- t.streamsCreated++
- return t.TestServiceClient.FullDuplexCall(ctx, opts...)
- }
- func (t *testServiceClientWrapper) HalfDuplexCall(ctx context.Context, opts ...grpc.CallOption) (testpb.TestService_HalfDuplexCallClient, error) {
- t.mu.Lock()
- defer t.mu.Unlock()
- t.streamsCreated++
- return t.TestServiceClient.HalfDuplexCall(ctx, opts...)
- }
- func doSuccessfulUnaryCall(tc testpb.TestServiceClient, t *testing.T) {
- if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
- t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
- }
- }
- func doStreamingInputCallWithLargePayload(tc testpb.TestServiceClient, t *testing.T) {
- s, err := tc.StreamingInputCall(context.Background())
- if err != nil {
- t.Fatalf("TestService/StreamingInputCall(_) = _, %v, want <nil>", err)
- }
- payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, 10000)
- if err != nil {
- t.Fatal(err)
- }
- s.Send(&testpb.StreamingInputCallRequest{Payload: payload})
- }
- func doServerSideFailedUnaryCall(tc testpb.TestServiceClient, t *testing.T) {
- const smallSize = 1
- const largeSize = 2000
- largePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, largeSize)
- if err != nil {
- t.Fatal(err)
- }
- req := &testpb.SimpleRequest{
- ResponseType: testpb.PayloadType_COMPRESSABLE,
- ResponseSize: int32(smallSize),
- Payload: largePayload,
- }
- if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted {
- t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
- }
- }
- func doClientSideInitiatedFailedStream(tc testpb.TestServiceClient, t *testing.T) {
- ctx, cancel := context.WithCancel(context.Background())
- stream, err := tc.FullDuplexCall(ctx)
- if err != nil {
- t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want <nil>", err)
- }
- const smallSize = 1
- smallPayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, smallSize)
- if err != nil {
- t.Fatal(err)
- }
- sreq := &testpb.StreamingOutputCallRequest{
- ResponseType: testpb.PayloadType_COMPRESSABLE,
- ResponseParameters: []*testpb.ResponseParameters{
- {Size: smallSize},
- },
- Payload: smallPayload,
- }
- if err := stream.Send(sreq); err != nil {
- t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
- }
- if _, err := stream.Recv(); err != nil {
- t.Fatalf("%v.Recv() = %v, want <nil>", stream, err)
- }
- // By canceling the call, the client will send rst_stream to end the call, and
- // the stream will failed as a result.
- cancel()
- }
- // This func is to be used to test client side counting of failed streams.
- func doServerSideInitiatedFailedStreamWithRSTStream(tc testpb.TestServiceClient, t *testing.T, l *listenerWrapper) {
- stream, err := tc.FullDuplexCall(context.Background())
- if err != nil {
- t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want <nil>", err)
- }
- const smallSize = 1
- smallPayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, smallSize)
- if err != nil {
- t.Fatal(err)
- }
- sreq := &testpb.StreamingOutputCallRequest{
- ResponseType: testpb.PayloadType_COMPRESSABLE,
- ResponseParameters: []*testpb.ResponseParameters{
- {Size: smallSize},
- },
- Payload: smallPayload,
- }
- if err := stream.Send(sreq); err != nil {
- t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
- }
- if _, err := stream.Recv(); err != nil {
- t.Fatalf("%v.Recv() = %v, want <nil>", stream, err)
- }
- rcw := l.getLastConn()
- if rcw != nil {
- rcw.writeRSTStream(tc.(*testServiceClientWrapper).getCurrentStreamID(), http2.ErrCodeCancel)
- }
- if _, err := stream.Recv(); err == nil {
- t.Fatalf("%v.Recv() = %v, want <non-nil>", stream, err)
- }
- }
- // this func is to be used to test client side counting of failed streams.
- func doServerSideInitiatedFailedStreamWithGoAway(tc testpb.TestServiceClient, t *testing.T, l *listenerWrapper) {
- // This call is just to keep the transport from shutting down (socket will be deleted
- // in this case, and we will not be able to get metrics).
- s, err := tc.FullDuplexCall(context.Background())
- if err != nil {
- t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want <nil>", err)
- }
- if err := s.Send(&testpb.StreamingOutputCallRequest{ResponseParameters: []*testpb.ResponseParameters{
- {
- Size: 1,
- },
- }}); err != nil {
- t.Fatalf("s.Send() failed with error: %v", err)
- }
- if _, err := s.Recv(); err != nil {
- t.Fatalf("s.Recv() failed with error: %v", err)
- }
- s, err = tc.FullDuplexCall(context.Background())
- if err != nil {
- t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want <nil>", err)
- }
- if err := s.Send(&testpb.StreamingOutputCallRequest{ResponseParameters: []*testpb.ResponseParameters{
- {
- Size: 1,
- },
- }}); err != nil {
- t.Fatalf("s.Send() failed with error: %v", err)
- }
- if _, err := s.Recv(); err != nil {
- t.Fatalf("s.Recv() failed with error: %v", err)
- }
- rcw := l.getLastConn()
- if rcw != nil {
- rcw.writeGoAway(tc.(*testServiceClientWrapper).getCurrentStreamID()-2, http2.ErrCodeCancel, []byte{})
- }
- if _, err := s.Recv(); err == nil {
- t.Fatalf("%v.Recv() = %v, want <non-nil>", s, err)
- }
- }
- // this func is to be used to test client side counting of failed streams.
- func doServerSideInitiatedFailedStreamWithClientBreakFlowControl(tc testpb.TestServiceClient, t *testing.T, dw *dialerWrapper) {
- stream, err := tc.FullDuplexCall(context.Background())
- if err != nil {
- t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want <nil>", err)
- }
- // sleep here to make sure header frame being sent before the data frame we write directly below.
- time.Sleep(10 * time.Millisecond)
- payload := make([]byte, 65537, 65537)
- dw.getRawConnWrapper().writeRawFrame(http2.FrameData, 0, tc.(*testServiceClientWrapper).getCurrentStreamID(), payload)
- if _, err := stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted {
- t.Fatalf("%v.Recv() = %v, want error code: %v", stream, err, codes.ResourceExhausted)
- }
- }
- func doIdleCallToInvokeKeepAlive(tc testpb.TestServiceClient, t *testing.T) {
- ctx, cancel := context.WithCancel(context.Background())
- _, err := tc.FullDuplexCall(ctx)
- if err != nil {
- t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want <nil>", err)
- }
- // 2500ms allow for 2 keepalives (1000ms per round trip)
- time.Sleep(2500 * time.Millisecond)
- cancel()
- }
- func TestCZClientSocketMetricsStreamsAndMessagesCount(t *testing.T) {
- defer leakcheck.Check(t)
- channelz.NewChannelzStorage()
- e := tcpClearRREnv
- te := newTest(t, e)
- te.maxServerReceiveMsgSize = newInt(20)
- te.maxClientReceiveMsgSize = newInt(20)
- rcw := te.startServerWithConnControl(&testServer{security: e.security})
- defer te.tearDown()
- cc := te.clientConn()
- tc := &testServiceClientWrapper{TestServiceClient: testpb.NewTestServiceClient(cc)}
- doSuccessfulUnaryCall(tc, t)
- var scID, skID int64
- if err := verifyResultWithDelay(func() (bool, error) {
- tchan, _ := channelz.GetTopChannels(0)
- if len(tchan) != 1 {
- return false, fmt.Errorf("there should only be one top channel, not %d", len(tchan))
- }
- if len(tchan[0].SubChans) != 1 {
- return false, fmt.Errorf("there should only be one subchannel under top channel %d, not %d", tchan[0].ID, len(tchan[0].SubChans))
- }
- for scID = range tchan[0].SubChans {
- break
- }
- sc := channelz.GetSubChannel(scID)
- if sc == nil {
- return false, fmt.Errorf("there should only be one socket under subchannel %d, not 0", scID)
- }
- if len(sc.Sockets) != 1 {
- return false, fmt.Errorf("there should only be one socket under subchannel %d, not %d", sc.ID, len(sc.Sockets))
- }
- for skID = range sc.Sockets {
- break
- }
- skt := channelz.GetSocket(skID)
- sktData := skt.SocketData
- if sktData.StreamsStarted != 1 || sktData.StreamsSucceeded != 1 || sktData.MessagesSent != 1 || sktData.MessagesReceived != 1 {
- return false, fmt.Errorf("channelz.GetSocket(%d), want (StreamsStarted, StreamsSucceeded, MessagesSent, MessagesReceived) = (1, 1, 1, 1), got (%d, %d, %d, %d)", skt.ID, sktData.StreamsStarted, sktData.StreamsSucceeded, sktData.MessagesSent, sktData.MessagesReceived)
- }
- return true, nil
- }); err != nil {
- t.Fatal(err)
- }
- doServerSideFailedUnaryCall(tc, t)
- if err := verifyResultWithDelay(func() (bool, error) {
- skt := channelz.GetSocket(skID)
- sktData := skt.SocketData
- if sktData.StreamsStarted != 2 || sktData.StreamsSucceeded != 2 || sktData.MessagesSent != 2 || sktData.MessagesReceived != 1 {
- return false, fmt.Errorf("channelz.GetSocket(%d), want (StreamsStarted, StreamsSucceeded, MessagesSent, MessagesReceived) = (2, 2, 2, 1), got (%d, %d, %d, %d)", skt.ID, sktData.StreamsStarted, sktData.StreamsSucceeded, sktData.MessagesSent, sktData.MessagesReceived)
- }
- return true, nil
- }); err != nil {
- t.Fatal(err)
- }
- doClientSideInitiatedFailedStream(tc, t)
- if err := verifyResultWithDelay(func() (bool, error) {
- skt := channelz.GetSocket(skID)
- sktData := skt.SocketData
- if sktData.StreamsStarted != 3 || sktData.StreamsSucceeded != 2 || sktData.StreamsFailed != 1 || sktData.MessagesSent != 3 || sktData.MessagesReceived != 2 {
- return false, fmt.Errorf("channelz.GetSocket(%d), want (StreamsStarted, StreamsSucceeded, StreamsFailed, MessagesSent, MessagesReceived) = (3, 2, 1, 3, 2), got (%d, %d, %d, %d, %d)", skt.ID, sktData.StreamsStarted, sktData.StreamsSucceeded, sktData.StreamsFailed, sktData.MessagesSent, sktData.MessagesReceived)
- }
- return true, nil
- }); err != nil {
- t.Fatal(err)
- }
- doServerSideInitiatedFailedStreamWithRSTStream(tc, t, rcw)
- if err := verifyResultWithDelay(func() (bool, error) {
- skt := channelz.GetSocket(skID)
- sktData := skt.SocketData
- if sktData.StreamsStarted != 4 || sktData.StreamsSucceeded != 2 || sktData.StreamsFailed != 2 || sktData.MessagesSent != 4 || sktData.MessagesReceived != 3 {
- return false, fmt.Errorf("channelz.GetSocket(%d), want (StreamsStarted, StreamsSucceeded, StreamsFailed, MessagesSent, MessagesReceived) = (4, 2, 2, 4, 3), got (%d, %d, %d, %d, %d)", skt.ID, sktData.StreamsStarted, sktData.StreamsSucceeded, sktData.StreamsFailed, sktData.MessagesSent, sktData.MessagesReceived)
- }
- return true, nil
- }); err != nil {
- t.Fatal(err)
- }
- doServerSideInitiatedFailedStreamWithGoAway(tc, t, rcw)
- if err := verifyResultWithDelay(func() (bool, error) {
- skt := channelz.GetSocket(skID)
- sktData := skt.SocketData
- if sktData.StreamsStarted != 6 || sktData.StreamsSucceeded != 2 || sktData.StreamsFailed != 3 || sktData.MessagesSent != 6 || sktData.MessagesReceived != 5 {
- return false, fmt.Errorf("channelz.GetSocket(%d), want (StreamsStarted, StreamsSucceeded, StreamsFailed, MessagesSent, MessagesReceived) = (6, 2, 3, 6, 5), got (%d, %d, %d, %d, %d)", skt.ID, sktData.StreamsStarted, sktData.StreamsSucceeded, sktData.StreamsFailed, sktData.MessagesSent, sktData.MessagesReceived)
- }
- return true, nil
- }); err != nil {
- t.Fatal(err)
- }
- }
- // This test is to complete TestCZClientSocketMetricsStreamsAndMessagesCount and
- // TestCZServerSocketMetricsStreamsAndMessagesCount by adding the test case of
- // server sending RST_STREAM to client due to client side flow control violation.
- // It is separated from other cases due to setup incompatibly, i.e. max receive
- // size violation will mask flow control violation.
- func TestCZClientAndServerSocketMetricsStreamsCountFlowControlRSTStream(t *testing.T) {
- defer leakcheck.Check(t)
- channelz.NewChannelzStorage()
- e := tcpClearRREnv
- te := newTest(t, e)
- te.serverInitialWindowSize = 65536
- // Avoid overflowing connection level flow control window, which will lead to
- // transport being closed.
- te.serverInitialConnWindowSize = 65536 * 2
- te.startServer(&testServer{security: e.security})
- defer te.tearDown()
- cc, dw := te.clientConnWithConnControl()
- tc := &testServiceClientWrapper{TestServiceClient: testpb.NewTestServiceClient(cc)}
- doServerSideInitiatedFailedStreamWithClientBreakFlowControl(tc, t, dw)
- if err := verifyResultWithDelay(func() (bool, error) {
- tchan, _ := channelz.GetTopChannels(0)
- if len(tchan) != 1 {
- return false, fmt.Errorf("there should only be one top channel, not %d", len(tchan))
- }
- if len(tchan[0].SubChans) != 1 {
- return false, fmt.Errorf("there should only be one subchannel under top channel %d, not %d", tchan[0].ID, len(tchan[0].SubChans))
- }
- var id int64
- for id = range tchan[0].SubChans {
- break
- }
- sc := channelz.GetSubChannel(id)
- if sc == nil {
- return false, fmt.Errorf("there should only be one socket under subchannel %d, not 0", id)
- }
- if len(sc.Sockets) != 1 {
- return false, fmt.Errorf("there should only be one socket under subchannel %d, not %d", sc.ID, len(sc.Sockets))
- }
- for id = range sc.Sockets {
- break
- }
- skt := channelz.GetSocket(id)
- sktData := skt.SocketData
- if sktData.StreamsStarted != 1 || sktData.StreamsSucceeded != 0 || sktData.StreamsFailed != 1 {
- return false, fmt.Errorf("channelz.GetSocket(%d), want (StreamsStarted, StreamsSucceeded, StreamsFailed) = (1, 0, 1), got (%d, %d, %d)", skt.ID, sktData.StreamsStarted, sktData.StreamsSucceeded, sktData.StreamsFailed)
- }
- ss, _ := channelz.GetServers(0)
- if len(ss) != 1 {
- return false, fmt.Errorf("there should only be one server, not %d", len(ss))
- }
- ns, _ := channelz.GetServerSockets(ss[0].ID, 0)
- if len(ns) != 1 {
- return false, fmt.Errorf("there should be one server normal socket, not %d", len(ns))
- }
- sktData = ns[0].SocketData
- if sktData.StreamsStarted != 1 || sktData.StreamsSucceeded != 0 || sktData.StreamsFailed != 1 {
- return false, fmt.Errorf("server socket metric with ID %d, want (StreamsStarted, StreamsSucceeded, StreamsFailed) = (1, 0, 1), got (%d, %d, %d)", ns[0].ID, sktData.StreamsStarted, sktData.StreamsSucceeded, sktData.StreamsFailed)
- }
- return true, nil
- }); err != nil {
- t.Fatal(err)
- }
- }
- func TestCZClientAndServerSocketMetricsFlowControl(t *testing.T) {
- defer leakcheck.Check(t)
- channelz.NewChannelzStorage()
- e := tcpClearRREnv
- te := newTest(t, e)
- // disable BDP
- te.serverInitialWindowSize = 65536
- te.serverInitialConnWindowSize = 65536
- te.clientInitialWindowSize = 65536
- te.clientInitialConnWindowSize = 65536
- te.startServer(&testServer{security: e.security})
- defer te.tearDown()
- cc := te.clientConn()
- tc := testpb.NewTestServiceClient(cc)
- for i := 0; i < 10; i++ {
- doSuccessfulUnaryCall(tc, t)
- }
- var cliSktID, svrSktID int64
- if err := verifyResultWithDelay(func() (bool, error) {
- tchan, _ := channelz.GetTopChannels(0)
- if len(tchan) != 1 {
- return false, fmt.Errorf("there should only be one top channel, not %d", len(tchan))
- }
- if len(tchan[0].SubChans) != 1 {
- return false, fmt.Errorf("there should only be one subchannel under top channel %d, not %d", tchan[0].ID, len(tchan[0].SubChans))
- }
- var id int64
- for id = range tchan[0].SubChans {
- break
- }
- sc := channelz.GetSubChannel(id)
- if sc == nil {
- return false, fmt.Errorf("there should only be one socket under subchannel %d, not 0", id)
- }
- if len(sc.Sockets) != 1 {
- return false, fmt.Errorf("there should only be one socket under subchannel %d, not %d", sc.ID, len(sc.Sockets))
- }
- for id = range sc.Sockets {
- break
- }
- skt := channelz.GetSocket(id)
- sktData := skt.SocketData
- // 65536 - 5 (Length-Prefixed-Message size) * 10 = 65486
- if sktData.LocalFlowControlWindow != 65486 || sktData.RemoteFlowControlWindow != 65486 {
- return false, fmt.Errorf("client: (LocalFlowControlWindow, RemoteFlowControlWindow) size should be (65536, 65486), not (%d, %d)", sktData.LocalFlowControlWindow, sktData.RemoteFlowControlWindow)
- }
- ss, _ := channelz.GetServers(0)
- if len(ss) != 1 {
- return false, fmt.Errorf("there should only be one server, not %d", len(ss))
- }
- ns, _ := channelz.GetServerSockets(ss[0].ID, 0)
- sktData = ns[0].SocketData
- if sktData.LocalFlowControlWindow != 65486 || sktData.RemoteFlowControlWindow != 65486 {
- return false, fmt.Errorf("server: (LocalFlowControlWindow, RemoteFlowControlWindow) size should be (65536, 65486), not (%d, %d)", sktData.LocalFlowControlWindow, sktData.RemoteFlowControlWindow)
- }
- cliSktID, svrSktID = id, ss[0].ID
- return true, nil
- }); err != nil {
- t.Fatal(err)
- }
- doStreamingInputCallWithLargePayload(tc, t)
- if err := verifyResultWithDelay(func() (bool, error) {
- skt := channelz.GetSocket(cliSktID)
- sktData := skt.SocketData
- // Local: 65536 - 5 (Length-Prefixed-Message size) * 10 = 65486
- // Remote: 65536 - 5 (Length-Prefixed-Message size) * 10 - 10011 = 55475
- if sktData.LocalFlowControlWindow != 65486 || sktData.RemoteFlowControlWindow != 55475 {
- return false, fmt.Errorf("client: (LocalFlowControlWindow, RemoteFlowControlWindow) size should be (65486, 55475), not (%d, %d)", sktData.LocalFlowControlWindow, sktData.RemoteFlowControlWindow)
- }
- ss, _ := channelz.GetServers(0)
- if len(ss) != 1 {
- return false, fmt.Errorf("there should only be one server, not %d", len(ss))
- }
- ns, _ := channelz.GetServerSockets(svrSktID, 0)
- sktData = ns[0].SocketData
- if sktData.LocalFlowControlWindow != 55475 || sktData.RemoteFlowControlWindow != 65486 {
- return false, fmt.Errorf("server: (LocalFlowControlWindow, RemoteFlowControlWindow) size should be (55475, 65486), not (%d, %d)", sktData.LocalFlowControlWindow, sktData.RemoteFlowControlWindow)
- }
- return true, nil
- }); err != nil {
- t.Fatal(err)
- }
- // triggers transport flow control window update on server side, since unacked
- // bytes should be larger than limit now. i.e. 50 + 20022 > 65536/4.
- doStreamingInputCallWithLargePayload(tc, t)
- if err := verifyResultWithDelay(func() (bool, error) {
- skt := channelz.GetSocket(cliSktID)
- sktData := skt.SocketData
- // Local: 65536 - 5 (Length-Prefixed-Message size) * 10 = 65486
- // Remote: 65536
- if sktData.LocalFlowControlWindow != 65486 || sktData.RemoteFlowControlWindow != 65536 {
- return false, fmt.Errorf("client: (LocalFlowControlWindow, RemoteFlowControlWindow) size should be (65486, 65536), not (%d, %d)", sktData.LocalFlowControlWindow, sktData.RemoteFlowControlWindow)
- }
- ss, _ := channelz.GetServers(0)
- if len(ss) != 1 {
- return false, fmt.Errorf("there should only be one server, not %d", len(ss))
- }
- ns, _ := channelz.GetServerSockets(svrSktID, 0)
- sktData = ns[0].SocketData
- if sktData.LocalFlowControlWindow != 65536 || sktData.RemoteFlowControlWindow != 65486 {
- return false, fmt.Errorf("server: (LocalFlowControlWindow, RemoteFlowControlWindow) size should be (65536, 65486), not (%d, %d)", sktData.LocalFlowControlWindow, sktData.RemoteFlowControlWindow)
- }
- return true, nil
- }); err != nil {
- t.Fatal(err)
- }
- }
- func TestCZClientSocketMetricsKeepAlive(t *testing.T) {
- defer leakcheck.Check(t)
- channelz.NewChannelzStorage()
- e := tcpClearRREnv
- te := newTest(t, e)
- te.cliKeepAlive = &keepalive.ClientParameters{Time: 500 * time.Millisecond, Timeout: 500 * time.Millisecond}
- te.startServer(&testServer{security: e.security})
- defer te.tearDown()
- cc := te.clientConn()
- tc := testpb.NewTestServiceClient(cc)
- doIdleCallToInvokeKeepAlive(tc, t)
- if err := verifyResultWithDelay(func() (bool, error) {
- tchan, _ := channelz.GetTopChannels(0)
- if len(tchan) != 1 {
- return false, fmt.Errorf("there should only be one top channel, not %d", len(tchan))
- }
- if len(tchan[0].SubChans) != 1 {
- return false, fmt.Errorf("there should only be one subchannel under top channel %d, not %d", tchan[0].ID, len(tchan[0].SubChans))
- }
- var id int64
- for id = range tchan[0].SubChans {
- break
- }
- sc := channelz.GetSubChannel(id)
- if sc == nil {
- return false, fmt.Errorf("there should only be one socket under subchannel %d, not 0", id)
- }
- if len(sc.Sockets) != 1 {
- return false, fmt.Errorf("there should only be one socket under subchannel %d, not %d", sc.ID, len(sc.Sockets))
- }
- for id = range sc.Sockets {
- break
- }
- skt := channelz.GetSocket(id)
- if skt.SocketData.KeepAlivesSent != 2 { // doIdleCallToInvokeKeepAlive func is set up to send 2 KeepAlives.
- return false, fmt.Errorf("there should be 2 KeepAlives sent, not %d", skt.SocketData.KeepAlivesSent)
- }
- return true, nil
- }); err != nil {
- t.Fatal(err)
- }
- }
- func TestCZServerSocketMetricsStreamsAndMessagesCount(t *testing.T) {
- defer leakcheck.Check(t)
- channelz.NewChannelzStorage()
- e := tcpClearRREnv
- te := newTest(t, e)
- te.maxServerReceiveMsgSize = newInt(20)
- te.maxClientReceiveMsgSize = newInt(20)
- te.startServer(&testServer{security: e.security})
- defer te.tearDown()
- cc, _ := te.clientConnWithConnControl()
- tc := &testServiceClientWrapper{TestServiceClient: testpb.NewTestServiceClient(cc)}
- var svrID int64
- if err := verifyResultWithDelay(func() (bool, error) {
- ss, _ := channelz.GetServers(0)
- if len(ss) != 1 {
- return false, fmt.Errorf("there should only be one server, not %d", len(ss))
- }
- svrID = ss[0].ID
- return true, nil
- }); err != nil {
- t.Fatal(err)
- }
- doSuccessfulUnaryCall(tc, t)
- if err := verifyResultWithDelay(func() (bool, error) {
- ns, _ := channelz.GetServerSockets(svrID, 0)
- sktData := ns[0].SocketData
- if sktData.StreamsStarted != 1 || sktData.StreamsSucceeded != 1 || sktData.StreamsFailed != 0 || sktData.MessagesSent != 1 || sktData.MessagesReceived != 1 {
- return false, fmt.Errorf("server socket metric with ID %d, want (StreamsStarted, StreamsSucceeded, MessagesSent, MessagesReceived) = (1, 1, 1, 1), got (%d, %d, %d, %d, %d)", ns[0].ID, sktData.StreamsStarted, sktData.StreamsSucceeded, sktData.StreamsFailed, sktData.MessagesSent, sktData.MessagesReceived)
- }
- return true, nil
- }); err != nil {
- t.Fatal(err)
- }
- doServerSideFailedUnaryCall(tc, t)
- if err := verifyResultWithDelay(func() (bool, error) {
- ns, _ := channelz.GetServerSockets(svrID, 0)
- sktData := ns[0].SocketData
- if sktData.StreamsStarted != 2 || sktData.StreamsSucceeded != 2 || sktData.StreamsFailed != 0 || sktData.MessagesSent != 1 || sktData.MessagesReceived != 1 {
- return false, fmt.Errorf("server socket metric with ID %d, want (StreamsStarted, StreamsSucceeded, StreamsFailed, MessagesSent, MessagesReceived) = (2, 2, 0, 1, 1), got (%d, %d, %d, %d, %d)", ns[0].ID, sktData.StreamsStarted, sktData.StreamsSucceeded, sktData.StreamsFailed, sktData.MessagesSent, sktData.MessagesReceived)
- }
- return true, nil
- }); err != nil {
- t.Fatal(err)
- }
- doClientSideInitiatedFailedStream(tc, t)
- if err := verifyResultWithDelay(func() (bool, error) {
- ns, _ := channelz.GetServerSockets(svrID, 0)
- sktData := ns[0].SocketData
- if sktData.StreamsStarted != 3 || sktData.StreamsSucceeded != 2 || sktData.StreamsFailed != 1 || sktData.MessagesSent != 2 || sktData.MessagesReceived != 2 {
- return false, fmt.Errorf("server socket metric with ID %d, want (StreamsStarted, StreamsSucceeded, StreamsFailed, MessagesSent, MessagesReceived) = (3, 2, 1, 2, 2), got (%d, %d, %d, %d, %d)", ns[0].ID, sktData.StreamsStarted, sktData.StreamsSucceeded, sktData.StreamsFailed, sktData.MessagesSent, sktData.MessagesReceived)
- }
- return true, nil
- }); err != nil {
- t.Fatal(err)
- }
- }
- func TestCZServerSocketMetricsKeepAlive(t *testing.T) {
- defer leakcheck.Check(t)
- channelz.NewChannelzStorage()
- e := tcpClearRREnv
- te := newTest(t, e)
- te.svrKeepAlive = &keepalive.ServerParameters{Time: 500 * time.Millisecond, Timeout: 500 * time.Millisecond}
- te.startServer(&testServer{security: e.security})
- defer te.tearDown()
- cc := te.clientConn()
- tc := testpb.NewTestServiceClient(cc)
- doIdleCallToInvokeKeepAlive(tc, t)
- if err := verifyResultWithDelay(func() (bool, error) {
- ss, _ := channelz.GetServers(0)
- if len(ss) != 1 {
- return false, fmt.Errorf("there should be one server, not %d", len(ss))
- }
- ns, _ := channelz.GetServerSockets(ss[0].ID, 0)
- if len(ns) != 1 {
- return false, fmt.Errorf("there should be one server normal socket, not %d", len(ns))
- }
- if ns[0].SocketData.KeepAlivesSent != 2 { // doIdleCallToInvokeKeepAlive func is set up to send 2 KeepAlives.
- return false, fmt.Errorf("there should be 2 KeepAlives sent, not %d", ns[0].SocketData.KeepAlivesSent)
- }
- return true, nil
- }); err != nil {
- t.Fatal(err)
- }
- }
// cipherSuites lists the cipher suite names that
// TestCZSocketGetSecurityValueTLS accepts as the StandardName reported in a
// socket's TLS security value. The test passes when the reported name equals
// any entry, so the list may safely contain more names than any one handshake
// can produce.
var cipherSuites = []string{
	"TLS_RSA_WITH_RC4_128_SHA",
	"TLS_RSA_WITH_3DES_EDE_CBC_SHA",
	"TLS_RSA_WITH_AES_128_CBC_SHA",
	"TLS_RSA_WITH_AES_256_CBC_SHA",
	"TLS_RSA_WITH_AES_128_GCM_SHA256",
	"TLS_RSA_WITH_AES_256_GCM_SHA384",
	"TLS_ECDHE_ECDSA_WITH_RC4_128_SHA",
	"TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA",
	"TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA",
	"TLS_ECDHE_RSA_WITH_RC4_128_SHA",
	"TLS_ECDHE_RSA_WITH_3DES_EDE_CBC_SHA",
	"TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA",
	"TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA",
	"TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256",
	"TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256",
	"TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384",
	"TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384",
	"TLS_FALLBACK_SCSV",
	"TLS_RSA_WITH_AES_128_CBC_SHA256",
	"TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA256",
	"TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256",
	"TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305",
	"TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305",
	// TLS 1.3 cipher suites, so that a TLS 1.3 handshake is also accepted.
	"TLS_AES_128_GCM_SHA256",
	"TLS_AES_256_GCM_SHA384",
	"TLS_CHACHA20_POLY1305_SHA256",
}
- func TestCZSocketGetSecurityValueTLS(t *testing.T) {
- defer leakcheck.Check(t)
- channelz.NewChannelzStorage()
- e := tcpTLSRREnv
- te := newTest(t, e)
- te.startServer(&testServer{security: e.security})
- defer te.tearDown()
- te.clientConn()
- if err := verifyResultWithDelay(func() (bool, error) {
- tchan, _ := channelz.GetTopChannels(0)
- if len(tchan) != 1 {
- return false, fmt.Errorf("there should only be one top channel, not %d", len(tchan))
- }
- if len(tchan[0].SubChans) != 1 {
- return false, fmt.Errorf("there should only be one subchannel under top channel %d, not %d", tchan[0].ID, len(tchan[0].SubChans))
- }
- var id int64
- for id = range tchan[0].SubChans {
- break
- }
- sc := channelz.GetSubChannel(id)
- if sc == nil {
- return false, fmt.Errorf("there should only be one socket under subchannel %d, not 0", id)
- }
- if len(sc.Sockets) != 1 {
- return false, fmt.Errorf("there should only be one socket under subchannel %d, not %d", sc.ID, len(sc.Sockets))
- }
- for id = range sc.Sockets {
- break
- }
- skt := channelz.GetSocket(id)
- cert, _ := tls.LoadX509KeyPair(testdata.Path("server1.pem"), testdata.Path("server1.key"))
- securityVal, ok := skt.SocketData.Security.(*credentials.TLSChannelzSecurityValue)
- if !ok {
- return false, fmt.Errorf("the SocketData.Security is of type: %T, want: *credentials.TLSChannelzSecurityValue", skt.SocketData.Security)
- }
- if !reflect.DeepEqual(securityVal.RemoteCertificate, cert.Certificate[0]) {
- return false, fmt.Errorf("SocketData.Security.RemoteCertificate got: %v, want: %v", securityVal.RemoteCertificate, cert.Certificate[0])
- }
- for _, v := range cipherSuites {
- if v == securityVal.StandardName {
- return true, nil
- }
- }
- return false, fmt.Errorf("SocketData.Security.StandardName got: %v, want it to be one of %v ", securityVal.StandardName, cipherSuites)
- }); err != nil {
- t.Fatal(err)
- }
- }
- func TestCZChannelTraceCreationDeletion(t *testing.T) {
- defer leakcheck.Check(t)
- channelz.NewChannelzStorage()
- e := tcpClearRREnv
- // avoid calling API to set balancer type, which will void service config's change of balancer.
- e.balancer = ""
- te := newTest(t, e)
- r, cleanup := manual.GenerateAndRegisterManualResolver()
- defer cleanup()
- resolvedAddrs := []resolver.Address{{Addr: "127.0.0.1:0", Type: resolver.GRPCLB, ServerName: "grpclb.server"}}
- r.InitialAddrs(resolvedAddrs)
- te.resolverScheme = r.Scheme()
- te.clientConn()
- defer te.tearDown()
- var nestedConn int64
- if err := verifyResultWithDelay(func() (bool, error) {
- tcs, _ := channelz.GetTopChannels(0)
- if len(tcs) != 1 {
- return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
- }
- if len(tcs[0].NestedChans) != 1 {
- return false, fmt.Errorf("there should be one nested channel from grpclb, not %d", len(tcs[0].NestedChans))
- }
- for k := range tcs[0].NestedChans {
- nestedConn = k
- }
- for _, e := range tcs[0].Trace.Events {
- if e.RefID == nestedConn && e.RefType != channelz.RefChannel {
- return false, fmt.Errorf("nested channel trace event shoud have RefChannel as RefType")
- }
- }
- ncm := channelz.GetChannel(nestedConn)
- if ncm.Trace == nil {
- return false, fmt.Errorf("trace for nested channel should not be empty")
- }
- if len(ncm.Trace.Events) == 0 {
- return false, fmt.Errorf("there should be at least one trace event for nested channel not 0")
- }
- if ncm.Trace.Events[0].Desc != "Channel Created" {
- return false, fmt.Errorf("the first trace event should be \"Channel Created\", not %q", ncm.Trace.Events[0].Desc)
- }
- return true, nil
- }); err != nil {
- t.Fatal(err)
- }
- r.NewServiceConfig(`{"loadBalancingPolicy": "round_robin"}`)
- r.NewAddress([]resolver.Address{{Addr: "127.0.0.1:0"}})
- // wait for the shutdown of grpclb balancer
- if err := verifyResultWithDelay(func() (bool, error) {
- tcs, _ := channelz.GetTopChannels(0)
- if len(tcs) != 1 {
- return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
- }
- if len(tcs[0].NestedChans) != 0 {
- return false, fmt.Errorf("there should be 0 nested channel from grpclb, not %d", len(tcs[0].NestedChans))
- }
- ncm := channelz.GetChannel(nestedConn)
- if ncm == nil {
- return false, fmt.Errorf("nested channel should still exist due to parent's trace reference")
- }
- if ncm.Trace == nil {
- return false, fmt.Errorf("trace for nested channel should not be empty")
- }
- if len(ncm.Trace.Events) == 0 {
- return false, fmt.Errorf("there should be at least one trace event for nested channel not 0")
- }
- if ncm.Trace.Events[len(ncm.Trace.Events)-1].Desc != "Channel Deleted" {
- return false, fmt.Errorf("the first trace event should be \"Channel Deleted\", not %q", ncm.Trace.Events[0].Desc)
- }
- return true, nil
- }); err != nil {
- t.Fatal(err)
- }
- }
- func TestCZSubChannelTraceCreationDeletion(t *testing.T) {
- defer leakcheck.Check(t)
- channelz.NewChannelzStorage()
- e := tcpClearRREnv
- te := newTest(t, e)
- te.startServer(&testServer{security: e.security})
- r, cleanup := manual.GenerateAndRegisterManualResolver()
- defer cleanup()
- r.InitialAddrs([]resolver.Address{{Addr: te.srvAddr}})
- te.resolverScheme = r.Scheme()
- te.clientConn()
- defer te.tearDown()
- var subConn int64
- // Here, we just wait for all sockets to be up. In the future, if we implement
- // IDLE, we may need to make several rpc calls to create the sockets.
- if err := verifyResultWithDelay(func() (bool, error) {
- tcs, _ := channelz.GetTopChannels(0)
- if len(tcs) != 1 {
- return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
- }
- if len(tcs[0].SubChans) != 1 {
- return false, fmt.Errorf("there should be 1 subchannel not %d", len(tcs[0].SubChans))
- }
- for k := range tcs[0].SubChans {
- subConn = k
- }
- for _, e := range tcs[0].Trace.Events {
- if e.RefID == subConn && e.RefType != channelz.RefSubChannel {
- return false, fmt.Errorf("subchannel trace event shoud have RefType to be RefSubChannel")
- }
- }
- scm := channelz.GetSubChannel(subConn)
- if scm == nil {
- return false, fmt.Errorf("subChannel does not exist")
- }
- if scm.Trace == nil {
- return false, fmt.Errorf("trace for subChannel should not be empty")
- }
- if len(scm.Trace.Events) == 0 {
- return false, fmt.Errorf("there should be at least one trace event for subChannel not 0")
- }
- if scm.Trace.Events[0].Desc != "Subchannel Created" {
- return false, fmt.Errorf("the first trace event should be \"Subchannel Created\", not %q", scm.Trace.Events[0].Desc)
- }
- return true, nil
- }); err != nil {
- t.Fatal(err)
- }
- r.NewAddress([]resolver.Address{})
- if err := verifyResultWithDelay(func() (bool, error) {
- tcs, _ := channelz.GetTopChannels(0)
- if len(tcs) != 1 {
- return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
- }
- if len(tcs[0].SubChans) != 0 {
- return false, fmt.Errorf("there should be 0 subchannel not %d", len(tcs[0].SubChans))
- }
- scm := channelz.GetSubChannel(subConn)
- if scm == nil {
- return false, fmt.Errorf("subChannel should still exist due to parent's trace reference")
- }
- if scm.Trace == nil {
- return false, fmt.Errorf("trace for SubChannel should not be empty")
- }
- if len(scm.Trace.Events) == 0 {
- return false, fmt.Errorf("there should be at least one trace event for subChannel not 0")
- }
- if got, want := scm.Trace.Events[len(scm.Trace.Events)-1].Desc, "Subchannel Deleted"; got != want {
- return false, fmt.Errorf("the last trace event should be %q, not %q", want, got)
- }
- return true, nil
- }); err != nil {
- t.Fatal(err)
- }
- }
- func TestCZChannelAddressResolutionChange(t *testing.T) {
- defer leakcheck.Check(t)
- channelz.NewChannelzStorage()
- e := tcpClearRREnv
- e.balancer = ""
- te := newTest(t, e)
- te.startServer(&testServer{security: e.security})
- r, cleanup := manual.GenerateAndRegisterManualResolver()
- defer cleanup()
- r.InitialAddrs([]resolver.Address{{Addr: te.srvAddr}})
- te.resolverScheme = r.Scheme()
- te.clientConn()
- defer te.tearDown()
- var cid int64
- // Here, we just wait for all sockets to be up. In the future, if we implement
- // IDLE, we may need to make several rpc calls to create the sockets.
- if err := verifyResultWithDelay(func() (bool, error) {
- tcs, _ := channelz.GetTopChannels(0)
- if len(tcs) != 1 {
- return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
- }
- cid = tcs[0].ID
- for i := len(tcs[0].Trace.Events) - 1; i >= 0; i-- {
- if tcs[0].Trace.Events[i].Desc == fmt.Sprintf("Resolver returns a non-empty address list (previous one was empty) %q", te.srvAddr) {
- break
- }
- if i == 0 {
- return false, fmt.Errorf("events do not contain expected address resolution from empty address state")
- }
- }
- return true, nil
- }); err != nil {
- t.Fatal(err)
- }
- r.NewServiceConfig(`{"loadBalancingPolicy": "round_robin"}`)
- if err := verifyResultWithDelay(func() (bool, error) {
- cm := channelz.GetChannel(cid)
- for i := len(cm.Trace.Events) - 1; i >= 0; i-- {
- if cm.Trace.Events[i].Desc == fmt.Sprintf("Channel switches to new LB policy %q", roundrobin.Name) {
- break
- }
- if i == 0 {
- return false, fmt.Errorf("events do not contain expected address resolution change of LB policy")
- }
- }
- return true, nil
- }); err != nil {
- t.Fatal(err)
- }
- newSc := `{
- "methodConfig": [
- {
- "name": [
- {
- "service": "grpc.testing.TestService",
- "method": "EmptyCall"
- },
- ],
- "waitForReady": false,
- "timeout": ".001s"
- }
- ]
- }`
- r.NewServiceConfig(newSc)
- if err := verifyResultWithDelay(func() (bool, error) {
- cm := channelz.GetChannel(cid)
- for i := len(cm.Trace.Events) - 1; i >= 0; i-- {
- if cm.Trace.Events[i].Desc == fmt.Sprintf("Channel has a new service config \"%s\"", newSc) {
- break
- }
- if i == 0 {
- return false, fmt.Errorf("events do not contain expected address resolution of new service config")
- }
- }
- return true, nil
- }); err != nil {
- t.Fatal(err)
- }
- r.NewAddress([]resolver.Address{})
- if err := verifyResultWithDelay(func() (bool, error) {
- cm := channelz.GetChannel(cid)
- for i := len(cm.Trace.Events) - 1; i >= 0; i-- {
- if cm.Trace.Events[i].Desc == "Resolver returns an empty address list" {
- break
- }
- if i == 0 {
- return false, fmt.Errorf("events do not contain expected address resolution of empty address")
- }
- }
- return true, nil
- }); err != nil {
- t.Fatal(err)
- }
- }
- func TestCZSubChannelPickedNewAddress(t *testing.T) {
- defer leakcheck.Check(t)
- channelz.NewChannelzStorage()
- e := tcpClearRREnv
- e.balancer = ""
- te := newTest(t, e)
- te.startServers(&testServer{security: e.security}, 3)
- r, cleanup := manual.GenerateAndRegisterManualResolver()
- defer cleanup()
- var svrAddrs []resolver.Address
- for _, a := range te.srvAddrs {
- svrAddrs = append(svrAddrs, resolver.Address{Addr: a})
- }
- r.InitialAddrs(svrAddrs)
- te.resolverScheme = r.Scheme()
- cc := te.clientConn()
- defer te.tearDown()
- tc := testpb.NewTestServiceClient(cc)
- // make sure the connection is up
- ctx, cancel := context.WithTimeout(context.Background(), time.Second)
- defer cancel()
- if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil {
- t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
- }
- te.srvs[0].Stop()
- te.srvs[1].Stop()
- // Here, we just wait for all sockets to be up. In the future, if we implement
- // IDLE, we may need to make several rpc calls to create the sockets.
- if err := verifyResultWithDelay(func() (bool, error) {
- tcs, _ := channelz.GetTopChannels(0)
- if len(tcs) != 1 {
- return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
- }
- if len(tcs[0].SubChans) != 1 {
- return false, fmt.Errorf("there should be 1 subchannel not %d", len(tcs[0].SubChans))
- }
- var subConn int64
- for k := range tcs[0].SubChans {
- subConn = k
- }
- scm := channelz.GetSubChannel(subConn)
- if scm.Trace == nil {
- return false, fmt.Errorf("trace for SubChannel should not be empty")
- }
- if len(scm.Trace.Events) == 0 {
- return false, fmt.Errorf("there should be at least one trace event for subChannel not 0")
- }
- for i := len(scm.Trace.Events) - 1; i >= 0; i-- {
- if scm.Trace.Events[i].Desc == fmt.Sprintf("Subchannel picks a new address %q to connect", te.srvAddrs[2]) {
- break
- }
- if i == 0 {
- return false, fmt.Errorf("events do not contain expected address resolution of subchannel picked new address")
- }
- }
- return true, nil
- }); err != nil {
- t.Fatal(err)
- }
- }
- func TestCZSubChannelConnectivityState(t *testing.T) {
- defer leakcheck.Check(t)
- channelz.NewChannelzStorage()
- e := tcpClearRREnv
- te := newTest(t, e)
- te.startServer(&testServer{security: e.security})
- r, cleanup := manual.GenerateAndRegisterManualResolver()
- defer cleanup()
- r.InitialAddrs([]resolver.Address{{Addr: te.srvAddr}})
- te.resolverScheme = r.Scheme()
- te.customDialOptions = []grpc.DialOption{grpc.WithWaitForHandshake()}
- cc := te.clientConn()
- defer te.tearDown()
- tc := testpb.NewTestServiceClient(cc)
- // make sure the connection is up
- ctx, cancel := context.WithTimeout(context.Background(), time.Second)
- defer cancel()
- if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil {
- t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
- }
- var subConn int64
- te.srv.Stop()
- if err := verifyResultWithDelay(func() (bool, error) {
- // we need to obtain the SubChannel id before it gets deleted from Channel's children list (due
- // to effect of r.NewAddress([]resolver.Address{}))
- if subConn == 0 {
- tcs, _ := channelz.GetTopChannels(0)
- if len(tcs) != 1 {
- return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
- }
- if len(tcs[0].SubChans) != 1 {
- return false, fmt.Errorf("there should be 1 subchannel not %d", len(tcs[0].SubChans))
- }
- for k := range tcs[0].SubChans {
- // get the SubChannel id for further trace inquiry.
- subConn = k
- }
- }
- scm := channelz.GetSubChannel(subConn)
- if scm == nil {
- return false, fmt.Errorf("subChannel should still exist due to parent's trace reference")
- }
- if scm.Trace == nil {
- return false, fmt.Errorf("trace for SubChannel should not be empty")
- }
- if len(scm.Trace.Events) == 0 {
- return false, fmt.Errorf("there should be at least one trace event for subChannel not 0")
- }
- var ready, connecting, transient, shutdown int
- for _, e := range scm.Trace.Events {
- if e.Desc == fmt.Sprintf("Subchannel Connectivity change to %v", connectivity.TransientFailure) {
- transient++
- }
- }
- // Make sure the SubChannel has already seen transient failure before shutting it down through
- // r.NewAddress([]resolver.Address{}).
- if transient == 0 {
- return false, fmt.Errorf("transient failure has not happened on SubChannel yet")
- }
- transient = 0
- r.NewAddress([]resolver.Address{})
- for _, e := range scm.Trace.Events {
- if e.Desc == fmt.Sprintf("Subchannel Connectivity change to %v", connectivity.Ready) {
- ready++
- }
- if e.Desc == fmt.Sprintf("Subchannel Connectivity change to %v", connectivity.Connecting) {
- connecting++
- }
- if e.Desc == fmt.Sprintf("Subchannel Connectivity change to %v", connectivity.TransientFailure) {
- transient++
- }
- if e.Desc == fmt.Sprintf("Subchannel Connectivity change to %v", connectivity.Shutdown) {
- shutdown++
- }
- }
- // example:
- // Subchannel Created
- // Subchannel's connectivity state changed to CONNECTING
- // Subchannel picked a new address: "localhost:36011"
- // Subchannel's connectivity state changed to READY
- // Subchannel's connectivity state changed to TRANSIENT_FAILURE
- // Subchannel's connectivity state changed to CONNECTING
- // Subchannel picked a new address: "localhost:36011"
- // Subchannel's connectivity state changed to SHUTDOWN
- // Subchannel Deleted
- if ready != 1 || connecting < 1 || transient < 1 || shutdown != 1 {
- return false, fmt.Errorf("got: ready = %d, connecting = %d, transient = %d, shutdown = %d, want: 1, >=1, >=1, 1", ready, connecting, transient, shutdown)
- }
- return true, nil
- }); err != nil {
- t.Fatal(err)
- }
- }
- func TestCZChannelConnectivityState(t *testing.T) {
- defer leakcheck.Check(t)
- channelz.NewChannelzStorage()
- e := tcpClearRREnv
- te := newTest(t, e)
- te.startServer(&testServer{security: e.security})
- r, cleanup := manual.GenerateAndRegisterManualResolver()
- defer cleanup()
- r.InitialAddrs([]resolver.Address{{Addr: te.srvAddr}})
- te.resolverScheme = r.Scheme()
- te.customDialOptions = []grpc.DialOption{grpc.WithWaitForHandshake()}
- cc := te.clientConn()
- defer te.tearDown()
- tc := testpb.NewTestServiceClient(cc)
- // make sure the connection is up
- ctx, cancel := context.WithTimeout(context.Background(), time.Second)
- defer cancel()
- if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil {
- t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
- }
- te.srv.Stop()
- if err := verifyResultWithDelay(func() (bool, error) {
- tcs, _ := channelz.GetTopChannels(0)
- if len(tcs) != 1 {
- return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
- }
- var ready, connecting, transient int
- for _, e := range tcs[0].Trace.Events {
- if e.Desc == fmt.Sprintf("Channel Connectivity change to %v", connectivity.Ready) {
- ready++
- }
- if e.Desc == fmt.Sprintf("Channel Connectivity change to %v", connectivity.Connecting) {
- connecting++
- }
- if e.Desc == fmt.Sprintf("Channel Connectivity change to %v", connectivity.TransientFailure) {
- transient++
- }
- }
- // example:
- // Channel Created
- // Adressses resolved (from empty address state): "localhost:40467"
- // SubChannel (id: 4[]) Created
- // Channel's connectivity state changed to CONNECTING
- // Channel's connectivity state changed to READY
- // Channel's connectivity state changed to TRANSIENT_FAILURE
- // Channel's connectivity state changed to CONNECTING
- // Channel's connectivity state changed to TRANSIENT_FAILURE
- if ready != 1 || connecting < 1 || transient < 1 {
- return false, fmt.Errorf("got: ready = %d, connecting = %d, transient = %d, want: 1, >=1, >=1", ready, connecting, transient)
- }
- return true, nil
- }); err != nil {
- t.Fatal(err)
- }
- }
- func TestCZTraceOverwriteChannelDeletion(t *testing.T) {
- defer leakcheck.Check(t)
- channelz.NewChannelzStorage()
- e := tcpClearRREnv
- // avoid calling API to set balancer type, which will void service config's change of balancer.
- e.balancer = ""
- te := newTest(t, e)
- channelz.SetMaxTraceEntry(1)
- defer channelz.ResetMaxTraceEntryToDefault()
- r, cleanup := manual.GenerateAndRegisterManualResolver()
- defer cleanup()
- resolvedAddrs := []resolver.Address{{Addr: "127.0.0.1:0", Type: resolver.GRPCLB, ServerName: "grpclb.server"}}
- r.InitialAddrs(resolvedAddrs)
- te.resolverScheme = r.Scheme()
- te.clientConn()
- defer te.tearDown()
- var nestedConn int64
- if err := verifyResultWithDelay(func() (bool, error) {
- tcs, _ := channelz.GetTopChannels(0)
- if len(tcs) != 1 {
- return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
- }
- if len(tcs[0].NestedChans) != 1 {
- return false, fmt.Errorf("there should be one nested channel from grpclb, not %d", len(tcs[0].NestedChans))
- }
- for k := range tcs[0].NestedChans {
- nestedConn = k
- }
- return true, nil
- }); err != nil {
- t.Fatal(err)
- }
- r.NewServiceConfig(`{"loadBalancingPolicy": "round_robin"}`)
- r.NewAddress([]resolver.Address{{Addr: "127.0.0.1:0"}})
- // wait for the shutdown of grpclb balancer
- if err := verifyResultWithDelay(func() (bool, error) {
- tcs, _ := channelz.GetTopChannels(0)
- if len(tcs) != 1 {
- return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
- }
- if len(tcs[0].NestedChans) != 0 {
- return false, fmt.Errorf("there should be 0 nested channel from grpclb, not %d", len(tcs[0].NestedChans))
- }
- return true, nil
- }); err != nil {
- t.Fatal(err)
- }
- // verify that the nested channel no longer exist due to trace referencing it got overwritten.
- if err := verifyResultWithDelay(func() (bool, error) {
- cm := channelz.GetChannel(nestedConn)
- if cm != nil {
- return false, fmt.Errorf("nested channel should have been deleted since its parent's trace should not contain any reference to it anymore")
- }
- return true, nil
- }); err != nil {
- t.Fatal(err)
- }
- }
- func TestCZTraceOverwriteSubChannelDeletion(t *testing.T) {
- defer leakcheck.Check(t)
- channelz.NewChannelzStorage()
- e := tcpClearRREnv
- te := newTest(t, e)
- channelz.SetMaxTraceEntry(1)
- defer channelz.ResetMaxTraceEntryToDefault()
- te.startServer(&testServer{security: e.security})
- r, cleanup := manual.GenerateAndRegisterManualResolver()
- defer cleanup()
- r.InitialAddrs([]resolver.Address{{Addr: te.srvAddr}})
- te.resolverScheme = r.Scheme()
- te.clientConn()
- defer te.tearDown()
- var subConn int64
- // Here, we just wait for all sockets to be up. In the future, if we implement
- // IDLE, we may need to make several rpc calls to create the sockets.
- if err := verifyResultWithDelay(func() (bool, error) {
- tcs, _ := channelz.GetTopChannels(0)
- if len(tcs) != 1 {
- return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
- }
- if len(tcs[0].SubChans) != 1 {
- return false, fmt.Errorf("there should be 1 subchannel not %d", len(tcs[0].SubChans))
- }
- for k := range tcs[0].SubChans {
- subConn = k
- }
- return true, nil
- }); err != nil {
- t.Fatal(err)
- }
- r.NewAddress([]resolver.Address{})
- if err := verifyResultWithDelay(func() (bool, error) {
- tcs, _ := channelz.GetTopChannels(0)
- if len(tcs) != 1 {
- return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
- }
- if len(tcs[0].SubChans) != 0 {
- return false, fmt.Errorf("there should be 0 subchannel not %d", len(tcs[0].SubChans))
- }
- return true, nil
- }); err != nil {
- t.Fatal(err)
- }
- // verify that the subchannel no longer exist due to trace referencing it got overwritten.
- if err := verifyResultWithDelay(func() (bool, error) {
- cm := channelz.GetChannel(subConn)
- if cm != nil {
- return false, fmt.Errorf("subchannel should have been deleted since its parent's trace should not contain any reference to it anymore")
- }
- return true, nil
- }); err != nil {
- t.Fatal(err)
- }
- }
- func TestCZTraceTopChannelDeletionTraceClear(t *testing.T) {
- defer leakcheck.Check(t)
- channelz.NewChannelzStorage()
- e := tcpClearRREnv
- te := newTest(t, e)
- te.startServer(&testServer{security: e.security})
- r, cleanup := manual.GenerateAndRegisterManualResolver()
- defer cleanup()
- r.InitialAddrs([]resolver.Address{{Addr: te.srvAddr}})
- te.resolverScheme = r.Scheme()
- te.clientConn()
- var subConn int64
- // Here, we just wait for all sockets to be up. In the future, if we implement
- // IDLE, we may need to make several rpc calls to create the sockets.
- if err := verifyResultWithDelay(func() (bool, error) {
- tcs, _ := channelz.GetTopChannels(0)
- if len(tcs) != 1 {
- return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
- }
- if len(tcs[0].SubChans) != 1 {
- return false, fmt.Errorf("there should be 1 subchannel not %d", len(tcs[0].SubChans))
- }
- for k := range tcs[0].SubChans {
- subConn = k
- }
- return true, nil
- }); err != nil {
- t.Fatal(err)
- }
- te.tearDown()
- // verify that the subchannel no longer exist due to parent channel got deleted and its trace cleared.
- if err := verifyResultWithDelay(func() (bool, error) {
- cm := channelz.GetChannel(subConn)
- if cm != nil {
- return false, fmt.Errorf("subchannel should have been deleted since its parent's trace should not contain any reference to it anymore")
- }
- return true, nil
- }); err != nil {
- t.Fatal(err)
- }
- }
|