channelz_test.go 66 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
7177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915
  1. /*
  2. *
  3. * Copyright 2018 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package test
  19. import (
  20. "context"
  21. "crypto/tls"
  22. "fmt"
  23. "net"
  24. "reflect"
  25. "sync"
  26. "testing"
  27. "time"
  28. "golang.org/x/net/http2"
  29. "google.golang.org/grpc"
  30. _ "google.golang.org/grpc/balancer/grpclb"
  31. "google.golang.org/grpc/balancer/roundrobin"
  32. "google.golang.org/grpc/codes"
  33. "google.golang.org/grpc/connectivity"
  34. "google.golang.org/grpc/credentials"
  35. "google.golang.org/grpc/internal/channelz"
  36. "google.golang.org/grpc/internal/leakcheck"
  37. "google.golang.org/grpc/keepalive"
  38. "google.golang.org/grpc/resolver"
  39. "google.golang.org/grpc/resolver/manual"
  40. "google.golang.org/grpc/status"
  41. testpb "google.golang.org/grpc/test/grpc_testing"
  42. "google.golang.org/grpc/testdata"
  43. )
  44. func (te *test) startServers(ts testpb.TestServiceServer, num int) {
  45. for i := 0; i < num; i++ {
  46. te.startServer(ts)
  47. te.srvs = append(te.srvs, te.srv.(*grpc.Server))
  48. te.srvAddrs = append(te.srvAddrs, te.srvAddr)
  49. te.srv = nil
  50. te.srvAddr = ""
  51. }
  52. }
  // verifyResultWithDelay polls f until it reports success, giving up after
  // 1000 attempts spaced 10ms apart.
  //
  // It returns nil as soon as f's boolean result is true (whatever error f
  // returned alongside it is discarded). If every attempt reports false, the
  // error from the final attempt is returned.
  func verifyResultWithDelay(f func() (bool, error)) error {
  	const (
  		maxAttempts = 1000
  		pause       = 10 * time.Millisecond
  	)
  	var lastErr error
  	for attempt := 0; attempt < maxAttempts; attempt++ {
  		done, err := f()
  		if done {
  			return nil
  		}
  		lastErr = err
  		time.Sleep(pause)
  	}
  	return lastErr
  }
  64. func TestCZServerRegistrationAndDeletion(t *testing.T) {
  65. defer leakcheck.Check(t)
  66. testcases := []struct {
  67. total int
  68. start int64
  69. length int
  70. end bool
  71. }{
  72. {total: channelz.EntryPerPage, start: 0, length: channelz.EntryPerPage, end: true},
  73. {total: channelz.EntryPerPage - 1, start: 0, length: channelz.EntryPerPage - 1, end: true},
  74. {total: channelz.EntryPerPage + 1, start: 0, length: channelz.EntryPerPage, end: false},
  75. {total: channelz.EntryPerPage + 1, start: int64(2*(channelz.EntryPerPage+1) + 1), length: 0, end: true},
  76. }
  77. for _, c := range testcases {
  78. channelz.NewChannelzStorage()
  79. e := tcpClearRREnv
  80. te := newTest(t, e)
  81. te.startServers(&testServer{security: e.security}, c.total)
  82. ss, end := channelz.GetServers(c.start)
  83. if len(ss) != c.length || end != c.end {
  84. t.Fatalf("GetServers(%d) = %+v (len of which: %d), end: %+v, want len(GetServers(%d)) = %d, end: %+v", c.start, ss, len(ss), end, c.start, c.length, c.end)
  85. }
  86. te.tearDown()
  87. ss, end = channelz.GetServers(c.start)
  88. if len(ss) != 0 || !end {
  89. t.Fatalf("GetServers(0) = %+v (len of which: %d), end: %+v, want len(GetServers(0)) = 0, end: true", ss, len(ss), end)
  90. }
  91. }
  92. }
  93. func TestCZTopChannelRegistrationAndDeletion(t *testing.T) {
  94. defer leakcheck.Check(t)
  95. testcases := []struct {
  96. total int
  97. start int64
  98. length int
  99. end bool
  100. }{
  101. {total: channelz.EntryPerPage, start: 0, length: channelz.EntryPerPage, end: true},
  102. {total: channelz.EntryPerPage - 1, start: 0, length: channelz.EntryPerPage - 1, end: true},
  103. {total: channelz.EntryPerPage + 1, start: 0, length: channelz.EntryPerPage, end: false},
  104. {total: channelz.EntryPerPage + 1, start: int64(2*(channelz.EntryPerPage+1) + 1), length: 0, end: true},
  105. }
  106. for _, c := range testcases {
  107. channelz.NewChannelzStorage()
  108. e := tcpClearRREnv
  109. te := newTest(t, e)
  110. var ccs []*grpc.ClientConn
  111. for i := 0; i < c.total; i++ {
  112. cc := te.clientConn()
  113. te.cc = nil
  114. // avoid making next dial blocking
  115. te.srvAddr = ""
  116. ccs = append(ccs, cc)
  117. }
  118. if err := verifyResultWithDelay(func() (bool, error) {
  119. if tcs, end := channelz.GetTopChannels(c.start); len(tcs) != c.length || end != c.end {
  120. return false, fmt.Errorf("getTopChannels(%d) = %+v (len of which: %d), end: %+v, want len(GetTopChannels(%d)) = %d, end: %+v", c.start, tcs, len(tcs), end, c.start, c.length, c.end)
  121. }
  122. return true, nil
  123. }); err != nil {
  124. t.Fatal(err)
  125. }
  126. for _, cc := range ccs {
  127. cc.Close()
  128. }
  129. if err := verifyResultWithDelay(func() (bool, error) {
  130. if tcs, end := channelz.GetTopChannels(c.start); len(tcs) != 0 || !end {
  131. return false, fmt.Errorf("getTopChannels(0) = %+v (len of which: %d), end: %+v, want len(GetTopChannels(0)) = 0, end: true", tcs, len(tcs), end)
  132. }
  133. return true, nil
  134. }); err != nil {
  135. t.Fatal(err)
  136. }
  137. te.tearDown()
  138. }
  139. }
  140. func TestCZNestedChannelRegistrationAndDeletion(t *testing.T) {
  141. defer leakcheck.Check(t)
  142. channelz.NewChannelzStorage()
  143. e := tcpClearRREnv
  144. // avoid calling API to set balancer type, which will void service config's change of balancer.
  145. e.balancer = ""
  146. te := newTest(t, e)
  147. r, cleanup := manual.GenerateAndRegisterManualResolver()
  148. defer cleanup()
  149. resolvedAddrs := []resolver.Address{{Addr: "127.0.0.1:0", Type: resolver.GRPCLB, ServerName: "grpclb.server"}}
  150. r.InitialAddrs(resolvedAddrs)
  151. te.resolverScheme = r.Scheme()
  152. te.clientConn()
  153. defer te.tearDown()
  154. if err := verifyResultWithDelay(func() (bool, error) {
  155. tcs, _ := channelz.GetTopChannels(0)
  156. if len(tcs) != 1 {
  157. return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
  158. }
  159. if len(tcs[0].NestedChans) != 1 {
  160. return false, fmt.Errorf("there should be one nested channel from grpclb, not %d", len(tcs[0].NestedChans))
  161. }
  162. return true, nil
  163. }); err != nil {
  164. t.Fatal(err)
  165. }
  166. r.NewServiceConfig(`{"loadBalancingPolicy": "round_robin"}`)
  167. r.NewAddress([]resolver.Address{{Addr: "127.0.0.1:0"}})
  168. // wait for the shutdown of grpclb balancer
  169. if err := verifyResultWithDelay(func() (bool, error) {
  170. tcs, _ := channelz.GetTopChannels(0)
  171. if len(tcs) != 1 {
  172. return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
  173. }
  174. if len(tcs[0].NestedChans) != 0 {
  175. return false, fmt.Errorf("there should be 0 nested channel from grpclb, not %d", len(tcs[0].NestedChans))
  176. }
  177. return true, nil
  178. }); err != nil {
  179. t.Fatal(err)
  180. }
  181. }
  182. func TestCZClientSubChannelSocketRegistrationAndDeletion(t *testing.T) {
  183. defer leakcheck.Check(t)
  184. channelz.NewChannelzStorage()
  185. e := tcpClearRREnv
  186. num := 3 // number of backends
  187. te := newTest(t, e)
  188. var svrAddrs []resolver.Address
  189. te.startServers(&testServer{security: e.security}, num)
  190. r, cleanup := manual.GenerateAndRegisterManualResolver()
  191. defer cleanup()
  192. for _, a := range te.srvAddrs {
  193. svrAddrs = append(svrAddrs, resolver.Address{Addr: a})
  194. }
  195. r.InitialAddrs(svrAddrs)
  196. te.resolverScheme = r.Scheme()
  197. te.clientConn()
  198. defer te.tearDown()
  199. // Here, we just wait for all sockets to be up. In the future, if we implement
  200. // IDLE, we may need to make several rpc calls to create the sockets.
  201. if err := verifyResultWithDelay(func() (bool, error) {
  202. tcs, _ := channelz.GetTopChannels(0)
  203. if len(tcs) != 1 {
  204. return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
  205. }
  206. if len(tcs[0].SubChans) != num {
  207. return false, fmt.Errorf("there should be %d subchannel not %d", num, len(tcs[0].SubChans))
  208. }
  209. count := 0
  210. for k := range tcs[0].SubChans {
  211. sc := channelz.GetSubChannel(k)
  212. if sc == nil {
  213. return false, fmt.Errorf("got <nil> subchannel")
  214. }
  215. count += len(sc.Sockets)
  216. }
  217. if count != num {
  218. return false, fmt.Errorf("there should be %d sockets not %d", num, count)
  219. }
  220. return true, nil
  221. }); err != nil {
  222. t.Fatal(err)
  223. }
  224. r.NewAddress(svrAddrs[:len(svrAddrs)-1])
  225. if err := verifyResultWithDelay(func() (bool, error) {
  226. tcs, _ := channelz.GetTopChannels(0)
  227. if len(tcs) != 1 {
  228. return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
  229. }
  230. if len(tcs[0].SubChans) != num-1 {
  231. return false, fmt.Errorf("there should be %d subchannel not %d", num-1, len(tcs[0].SubChans))
  232. }
  233. count := 0
  234. for k := range tcs[0].SubChans {
  235. sc := channelz.GetSubChannel(k)
  236. if sc == nil {
  237. return false, fmt.Errorf("got <nil> subchannel")
  238. }
  239. count += len(sc.Sockets)
  240. }
  241. if count != num-1 {
  242. return false, fmt.Errorf("there should be %d sockets not %d", num-1, count)
  243. }
  244. return true, nil
  245. }); err != nil {
  246. t.Fatal(err)
  247. }
  248. }
  249. func TestCZServerSocketRegistrationAndDeletion(t *testing.T) {
  250. defer leakcheck.Check(t)
  251. channelz.NewChannelzStorage()
  252. e := tcpClearRREnv
  253. num := 3 // number of clients
  254. te := newTest(t, e)
  255. te.startServer(&testServer{security: e.security})
  256. defer te.tearDown()
  257. var ccs []*grpc.ClientConn
  258. for i := 0; i < num; i++ {
  259. cc := te.clientConn()
  260. te.cc = nil
  261. ccs = append(ccs, cc)
  262. }
  263. defer func() {
  264. for _, c := range ccs[:len(ccs)-1] {
  265. c.Close()
  266. }
  267. }()
  268. var svrID int64
  269. if err := verifyResultWithDelay(func() (bool, error) {
  270. ss, _ := channelz.GetServers(0)
  271. if len(ss) != 1 {
  272. return false, fmt.Errorf("there should only be one server, not %d", len(ss))
  273. }
  274. if len(ss[0].ListenSockets) != 1 {
  275. return false, fmt.Errorf("there should only be one server listen socket, not %d", len(ss[0].ListenSockets))
  276. }
  277. ns, _ := channelz.GetServerSockets(ss[0].ID, 0)
  278. if len(ns) != num {
  279. return false, fmt.Errorf("there should be %d normal sockets not %d", num, len(ns))
  280. }
  281. svrID = ss[0].ID
  282. return true, nil
  283. }); err != nil {
  284. t.Fatal(err)
  285. }
  286. ccs[len(ccs)-1].Close()
  287. if err := verifyResultWithDelay(func() (bool, error) {
  288. ns, _ := channelz.GetServerSockets(svrID, 0)
  289. if len(ns) != num-1 {
  290. return false, fmt.Errorf("there should be %d normal sockets not %d", num-1, len(ns))
  291. }
  292. return true, nil
  293. }); err != nil {
  294. t.Fatal(err)
  295. }
  296. }
  297. func TestCZServerListenSocketDeletion(t *testing.T) {
  298. defer leakcheck.Check(t)
  299. channelz.NewChannelzStorage()
  300. s := grpc.NewServer()
  301. lis, err := net.Listen("tcp", "localhost:0")
  302. if err != nil {
  303. t.Fatalf("failed to listen: %v", err)
  304. }
  305. go s.Serve(lis)
  306. if err := verifyResultWithDelay(func() (bool, error) {
  307. ss, _ := channelz.GetServers(0)
  308. if len(ss) != 1 {
  309. return false, fmt.Errorf("there should only be one server, not %d", len(ss))
  310. }
  311. if len(ss[0].ListenSockets) != 1 {
  312. return false, fmt.Errorf("there should only be one server listen socket, not %d", len(ss[0].ListenSockets))
  313. }
  314. return true, nil
  315. }); err != nil {
  316. t.Fatal(err)
  317. }
  318. lis.Close()
  319. if err := verifyResultWithDelay(func() (bool, error) {
  320. ss, _ := channelz.GetServers(0)
  321. if len(ss) != 1 {
  322. return false, fmt.Errorf("there should be 1 server, not %d", len(ss))
  323. }
  324. return true, nil
  325. }); err != nil {
  326. t.Fatal(err)
  327. }
  328. s.Stop()
  329. }
  330. type dummyChannel struct{}
  331. func (d *dummyChannel) ChannelzMetric() *channelz.ChannelInternalMetric {
  332. return &channelz.ChannelInternalMetric{}
  333. }
  334. type dummySocket struct{}
  335. func (d *dummySocket) ChannelzMetric() *channelz.SocketInternalMetric {
  336. return &channelz.SocketInternalMetric{}
  337. }
  338. func TestCZRecusivelyDeletionOfEntry(t *testing.T) {
  339. // +--+TopChan+---+
  340. // | |
  341. // v v
  342. // +-+SubChan1+--+ SubChan2
  343. // | |
  344. // v v
  345. // Socket1 Socket2
  346. channelz.NewChannelzStorage()
  347. topChanID := channelz.RegisterChannel(&dummyChannel{}, 0, "")
  348. subChanID1 := channelz.RegisterSubChannel(&dummyChannel{}, topChanID, "")
  349. subChanID2 := channelz.RegisterSubChannel(&dummyChannel{}, topChanID, "")
  350. sktID1 := channelz.RegisterNormalSocket(&dummySocket{}, subChanID1, "")
  351. sktID2 := channelz.RegisterNormalSocket(&dummySocket{}, subChanID1, "")
  352. tcs, _ := channelz.GetTopChannels(0)
  353. if tcs == nil || len(tcs) != 1 {
  354. t.Fatalf("There should be one TopChannel entry")
  355. }
  356. if len(tcs[0].SubChans) != 2 {
  357. t.Fatalf("There should be two SubChannel entries")
  358. }
  359. sc := channelz.GetSubChannel(subChanID1)
  360. if sc == nil || len(sc.Sockets) != 2 {
  361. t.Fatalf("There should be two Socket entries")
  362. }
  363. channelz.RemoveEntry(topChanID)
  364. tcs, _ = channelz.GetTopChannels(0)
  365. if tcs == nil || len(tcs) != 1 {
  366. t.Fatalf("There should be one TopChannel entry")
  367. }
  368. channelz.RemoveEntry(subChanID1)
  369. channelz.RemoveEntry(subChanID2)
  370. tcs, _ = channelz.GetTopChannels(0)
  371. if tcs == nil || len(tcs) != 1 {
  372. t.Fatalf("There should be one TopChannel entry")
  373. }
  374. if len(tcs[0].SubChans) != 1 {
  375. t.Fatalf("There should be one SubChannel entry")
  376. }
  377. channelz.RemoveEntry(sktID1)
  378. channelz.RemoveEntry(sktID2)
  379. tcs, _ = channelz.GetTopChannels(0)
  380. if tcs != nil {
  381. t.Fatalf("There should be no TopChannel entry")
  382. }
  383. }
  384. func TestCZChannelMetrics(t *testing.T) {
  385. defer leakcheck.Check(t)
  386. channelz.NewChannelzStorage()
  387. e := tcpClearRREnv
  388. num := 3 // number of backends
  389. te := newTest(t, e)
  390. te.maxClientSendMsgSize = newInt(8)
  391. var svrAddrs []resolver.Address
  392. te.startServers(&testServer{security: e.security}, num)
  393. r, cleanup := manual.GenerateAndRegisterManualResolver()
  394. defer cleanup()
  395. for _, a := range te.srvAddrs {
  396. svrAddrs = append(svrAddrs, resolver.Address{Addr: a})
  397. }
  398. r.InitialAddrs(svrAddrs)
  399. te.resolverScheme = r.Scheme()
  400. cc := te.clientConn()
  401. defer te.tearDown()
  402. tc := testpb.NewTestServiceClient(cc)
  403. if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
  404. t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
  405. }
  406. const smallSize = 1
  407. const largeSize = 8
  408. largePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, largeSize)
  409. if err != nil {
  410. t.Fatal(err)
  411. }
  412. req := &testpb.SimpleRequest{
  413. ResponseType: testpb.PayloadType_COMPRESSABLE,
  414. ResponseSize: int32(smallSize),
  415. Payload: largePayload,
  416. }
  417. if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted {
  418. t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
  419. }
  420. stream, err := tc.FullDuplexCall(context.Background())
  421. if err != nil {
  422. t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
  423. }
  424. defer stream.CloseSend()
  425. // Here, we just wait for all sockets to be up. In the future, if we implement
  426. // IDLE, we may need to make several rpc calls to create the sockets.
  427. if err := verifyResultWithDelay(func() (bool, error) {
  428. tcs, _ := channelz.GetTopChannels(0)
  429. if len(tcs) != 1 {
  430. return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
  431. }
  432. if len(tcs[0].SubChans) != num {
  433. return false, fmt.Errorf("there should be %d subchannel not %d", num, len(tcs[0].SubChans))
  434. }
  435. var cst, csu, cf int64
  436. for k := range tcs[0].SubChans {
  437. sc := channelz.GetSubChannel(k)
  438. if sc == nil {
  439. return false, fmt.Errorf("got <nil> subchannel")
  440. }
  441. cst += sc.ChannelData.CallsStarted
  442. csu += sc.ChannelData.CallsSucceeded
  443. cf += sc.ChannelData.CallsFailed
  444. }
  445. if cst != 3 {
  446. return false, fmt.Errorf("there should be 3 CallsStarted not %d", cst)
  447. }
  448. if csu != 1 {
  449. return false, fmt.Errorf("there should be 1 CallsSucceeded not %d", csu)
  450. }
  451. if cf != 1 {
  452. return false, fmt.Errorf("there should be 1 CallsFailed not %d", cf)
  453. }
  454. if tcs[0].ChannelData.CallsStarted != 3 {
  455. return false, fmt.Errorf("there should be 3 CallsStarted not %d", tcs[0].ChannelData.CallsStarted)
  456. }
  457. if tcs[0].ChannelData.CallsSucceeded != 1 {
  458. return false, fmt.Errorf("there should be 1 CallsSucceeded not %d", tcs[0].ChannelData.CallsSucceeded)
  459. }
  460. if tcs[0].ChannelData.CallsFailed != 1 {
  461. return false, fmt.Errorf("there should be 1 CallsFailed not %d", tcs[0].ChannelData.CallsFailed)
  462. }
  463. return true, nil
  464. }); err != nil {
  465. t.Fatal(err)
  466. }
  467. }
  468. func TestCZServerMetrics(t *testing.T) {
  469. defer leakcheck.Check(t)
  470. channelz.NewChannelzStorage()
  471. e := tcpClearRREnv
  472. te := newTest(t, e)
  473. te.maxServerReceiveMsgSize = newInt(8)
  474. te.startServer(&testServer{security: e.security})
  475. defer te.tearDown()
  476. cc := te.clientConn()
  477. tc := testpb.NewTestServiceClient(cc)
  478. if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
  479. t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
  480. }
  481. const smallSize = 1
  482. const largeSize = 8
  483. largePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, largeSize)
  484. if err != nil {
  485. t.Fatal(err)
  486. }
  487. req := &testpb.SimpleRequest{
  488. ResponseType: testpb.PayloadType_COMPRESSABLE,
  489. ResponseSize: int32(smallSize),
  490. Payload: largePayload,
  491. }
  492. if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted {
  493. t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
  494. }
  495. stream, err := tc.FullDuplexCall(context.Background())
  496. if err != nil {
  497. t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
  498. }
  499. defer stream.CloseSend()
  500. if err := verifyResultWithDelay(func() (bool, error) {
  501. ss, _ := channelz.GetServers(0)
  502. if len(ss) != 1 {
  503. return false, fmt.Errorf("there should only be one server, not %d", len(ss))
  504. }
  505. if ss[0].ServerData.CallsStarted != 3 {
  506. return false, fmt.Errorf("there should be 3 CallsStarted not %d", ss[0].ServerData.CallsStarted)
  507. }
  508. if ss[0].ServerData.CallsSucceeded != 1 {
  509. return false, fmt.Errorf("there should be 1 CallsSucceeded not %d", ss[0].ServerData.CallsSucceeded)
  510. }
  511. if ss[0].ServerData.CallsFailed != 1 {
  512. return false, fmt.Errorf("there should be 1 CallsFailed not %d", ss[0].ServerData.CallsFailed)
  513. }
  514. return true, nil
  515. }); err != nil {
  516. t.Fatal(err)
  517. }
  518. }
  519. type testServiceClientWrapper struct {
  520. testpb.TestServiceClient
  521. mu sync.RWMutex
  522. streamsCreated int
  523. }
  524. func (t *testServiceClientWrapper) getCurrentStreamID() uint32 {
  525. t.mu.RLock()
  526. defer t.mu.RUnlock()
  527. return uint32(2*t.streamsCreated - 1)
  528. }
  529. func (t *testServiceClientWrapper) EmptyCall(ctx context.Context, in *testpb.Empty, opts ...grpc.CallOption) (*testpb.Empty, error) {
  530. t.mu.Lock()
  531. defer t.mu.Unlock()
  532. t.streamsCreated++
  533. return t.TestServiceClient.EmptyCall(ctx, in, opts...)
  534. }
  535. func (t *testServiceClientWrapper) UnaryCall(ctx context.Context, in *testpb.SimpleRequest, opts ...grpc.CallOption) (*testpb.SimpleResponse, error) {
  536. t.mu.Lock()
  537. defer t.mu.Unlock()
  538. t.streamsCreated++
  539. return t.TestServiceClient.UnaryCall(ctx, in, opts...)
  540. }
  541. func (t *testServiceClientWrapper) StreamingOutputCall(ctx context.Context, in *testpb.StreamingOutputCallRequest, opts ...grpc.CallOption) (testpb.TestService_StreamingOutputCallClient, error) {
  542. t.mu.Lock()
  543. defer t.mu.Unlock()
  544. t.streamsCreated++
  545. return t.TestServiceClient.StreamingOutputCall(ctx, in, opts...)
  546. }
  547. func (t *testServiceClientWrapper) StreamingInputCall(ctx context.Context, opts ...grpc.CallOption) (testpb.TestService_StreamingInputCallClient, error) {
  548. t.mu.Lock()
  549. defer t.mu.Unlock()
  550. t.streamsCreated++
  551. return t.TestServiceClient.StreamingInputCall(ctx, opts...)
  552. }
  553. func (t *testServiceClientWrapper) FullDuplexCall(ctx context.Context, opts ...grpc.CallOption) (testpb.TestService_FullDuplexCallClient, error) {
  554. t.mu.Lock()
  555. defer t.mu.Unlock()
  556. t.streamsCreated++
  557. return t.TestServiceClient.FullDuplexCall(ctx, opts...)
  558. }
  559. func (t *testServiceClientWrapper) HalfDuplexCall(ctx context.Context, opts ...grpc.CallOption) (testpb.TestService_HalfDuplexCallClient, error) {
  560. t.mu.Lock()
  561. defer t.mu.Unlock()
  562. t.streamsCreated++
  563. return t.TestServiceClient.HalfDuplexCall(ctx, opts...)
  564. }
  565. func doSuccessfulUnaryCall(tc testpb.TestServiceClient, t *testing.T) {
  566. if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
  567. t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
  568. }
  569. }
  570. func doStreamingInputCallWithLargePayload(tc testpb.TestServiceClient, t *testing.T) {
  571. s, err := tc.StreamingInputCall(context.Background())
  572. if err != nil {
  573. t.Fatalf("TestService/StreamingInputCall(_) = _, %v, want <nil>", err)
  574. }
  575. payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, 10000)
  576. if err != nil {
  577. t.Fatal(err)
  578. }
  579. s.Send(&testpb.StreamingInputCallRequest{Payload: payload})
  580. }
  581. func doServerSideFailedUnaryCall(tc testpb.TestServiceClient, t *testing.T) {
  582. const smallSize = 1
  583. const largeSize = 2000
  584. largePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, largeSize)
  585. if err != nil {
  586. t.Fatal(err)
  587. }
  588. req := &testpb.SimpleRequest{
  589. ResponseType: testpb.PayloadType_COMPRESSABLE,
  590. ResponseSize: int32(smallSize),
  591. Payload: largePayload,
  592. }
  593. if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted {
  594. t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
  595. }
  596. }
  597. func doClientSideInitiatedFailedStream(tc testpb.TestServiceClient, t *testing.T) {
  598. ctx, cancel := context.WithCancel(context.Background())
  599. stream, err := tc.FullDuplexCall(ctx)
  600. if err != nil {
  601. t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want <nil>", err)
  602. }
  603. const smallSize = 1
  604. smallPayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, smallSize)
  605. if err != nil {
  606. t.Fatal(err)
  607. }
  608. sreq := &testpb.StreamingOutputCallRequest{
  609. ResponseType: testpb.PayloadType_COMPRESSABLE,
  610. ResponseParameters: []*testpb.ResponseParameters{
  611. {Size: smallSize},
  612. },
  613. Payload: smallPayload,
  614. }
  615. if err := stream.Send(sreq); err != nil {
  616. t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
  617. }
  618. if _, err := stream.Recv(); err != nil {
  619. t.Fatalf("%v.Recv() = %v, want <nil>", stream, err)
  620. }
  621. // By canceling the call, the client will send rst_stream to end the call, and
  622. // the stream will failed as a result.
  623. cancel()
  624. }
  625. // This func is to be used to test client side counting of failed streams.
  626. func doServerSideInitiatedFailedStreamWithRSTStream(tc testpb.TestServiceClient, t *testing.T, l *listenerWrapper) {
  627. stream, err := tc.FullDuplexCall(context.Background())
  628. if err != nil {
  629. t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want <nil>", err)
  630. }
  631. const smallSize = 1
  632. smallPayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, smallSize)
  633. if err != nil {
  634. t.Fatal(err)
  635. }
  636. sreq := &testpb.StreamingOutputCallRequest{
  637. ResponseType: testpb.PayloadType_COMPRESSABLE,
  638. ResponseParameters: []*testpb.ResponseParameters{
  639. {Size: smallSize},
  640. },
  641. Payload: smallPayload,
  642. }
  643. if err := stream.Send(sreq); err != nil {
  644. t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
  645. }
  646. if _, err := stream.Recv(); err != nil {
  647. t.Fatalf("%v.Recv() = %v, want <nil>", stream, err)
  648. }
  649. rcw := l.getLastConn()
  650. if rcw != nil {
  651. rcw.writeRSTStream(tc.(*testServiceClientWrapper).getCurrentStreamID(), http2.ErrCodeCancel)
  652. }
  653. if _, err := stream.Recv(); err == nil {
  654. t.Fatalf("%v.Recv() = %v, want <non-nil>", stream, err)
  655. }
  656. }
  657. // this func is to be used to test client side counting of failed streams.
  658. func doServerSideInitiatedFailedStreamWithGoAway(tc testpb.TestServiceClient, t *testing.T, l *listenerWrapper) {
  659. // This call is just to keep the transport from shutting down (socket will be deleted
  660. // in this case, and we will not be able to get metrics).
  661. s, err := tc.FullDuplexCall(context.Background())
  662. if err != nil {
  663. t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want <nil>", err)
  664. }
  665. if err := s.Send(&testpb.StreamingOutputCallRequest{ResponseParameters: []*testpb.ResponseParameters{
  666. {
  667. Size: 1,
  668. },
  669. }}); err != nil {
  670. t.Fatalf("s.Send() failed with error: %v", err)
  671. }
  672. if _, err := s.Recv(); err != nil {
  673. t.Fatalf("s.Recv() failed with error: %v", err)
  674. }
  675. s, err = tc.FullDuplexCall(context.Background())
  676. if err != nil {
  677. t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want <nil>", err)
  678. }
  679. if err := s.Send(&testpb.StreamingOutputCallRequest{ResponseParameters: []*testpb.ResponseParameters{
  680. {
  681. Size: 1,
  682. },
  683. }}); err != nil {
  684. t.Fatalf("s.Send() failed with error: %v", err)
  685. }
  686. if _, err := s.Recv(); err != nil {
  687. t.Fatalf("s.Recv() failed with error: %v", err)
  688. }
  689. rcw := l.getLastConn()
  690. if rcw != nil {
  691. rcw.writeGoAway(tc.(*testServiceClientWrapper).getCurrentStreamID()-2, http2.ErrCodeCancel, []byte{})
  692. }
  693. if _, err := s.Recv(); err == nil {
  694. t.Fatalf("%v.Recv() = %v, want <non-nil>", s, err)
  695. }
  696. }
  697. // this func is to be used to test client side counting of failed streams.
  698. func doServerSideInitiatedFailedStreamWithClientBreakFlowControl(tc testpb.TestServiceClient, t *testing.T, dw *dialerWrapper) {
  699. stream, err := tc.FullDuplexCall(context.Background())
  700. if err != nil {
  701. t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want <nil>", err)
  702. }
  703. // sleep here to make sure header frame being sent before the data frame we write directly below.
  704. time.Sleep(10 * time.Millisecond)
  705. payload := make([]byte, 65537, 65537)
  706. dw.getRawConnWrapper().writeRawFrame(http2.FrameData, 0, tc.(*testServiceClientWrapper).getCurrentStreamID(), payload)
  707. if _, err := stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted {
  708. t.Fatalf("%v.Recv() = %v, want error code: %v", stream, err, codes.ResourceExhausted)
  709. }
  710. }
  711. func doIdleCallToInvokeKeepAlive(tc testpb.TestServiceClient, t *testing.T) {
  712. ctx, cancel := context.WithCancel(context.Background())
  713. _, err := tc.FullDuplexCall(ctx)
  714. if err != nil {
  715. t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want <nil>", err)
  716. }
  717. // 2500ms allow for 2 keepalives (1000ms per round trip)
  718. time.Sleep(2500 * time.Millisecond)
  719. cancel()
  720. }
  721. func TestCZClientSocketMetricsStreamsAndMessagesCount(t *testing.T) {
  722. defer leakcheck.Check(t)
  723. channelz.NewChannelzStorage()
  724. e := tcpClearRREnv
  725. te := newTest(t, e)
  726. te.maxServerReceiveMsgSize = newInt(20)
  727. te.maxClientReceiveMsgSize = newInt(20)
  728. rcw := te.startServerWithConnControl(&testServer{security: e.security})
  729. defer te.tearDown()
  730. cc := te.clientConn()
  731. tc := &testServiceClientWrapper{TestServiceClient: testpb.NewTestServiceClient(cc)}
  732. doSuccessfulUnaryCall(tc, t)
  733. var scID, skID int64
  734. if err := verifyResultWithDelay(func() (bool, error) {
  735. tchan, _ := channelz.GetTopChannels(0)
  736. if len(tchan) != 1 {
  737. return false, fmt.Errorf("there should only be one top channel, not %d", len(tchan))
  738. }
  739. if len(tchan[0].SubChans) != 1 {
  740. return false, fmt.Errorf("there should only be one subchannel under top channel %d, not %d", tchan[0].ID, len(tchan[0].SubChans))
  741. }
  742. for scID = range tchan[0].SubChans {
  743. break
  744. }
  745. sc := channelz.GetSubChannel(scID)
  746. if sc == nil {
  747. return false, fmt.Errorf("there should only be one socket under subchannel %d, not 0", scID)
  748. }
  749. if len(sc.Sockets) != 1 {
  750. return false, fmt.Errorf("there should only be one socket under subchannel %d, not %d", sc.ID, len(sc.Sockets))
  751. }
  752. for skID = range sc.Sockets {
  753. break
  754. }
  755. skt := channelz.GetSocket(skID)
  756. sktData := skt.SocketData
  757. if sktData.StreamsStarted != 1 || sktData.StreamsSucceeded != 1 || sktData.MessagesSent != 1 || sktData.MessagesReceived != 1 {
  758. return false, fmt.Errorf("channelz.GetSocket(%d), want (StreamsStarted, StreamsSucceeded, MessagesSent, MessagesReceived) = (1, 1, 1, 1), got (%d, %d, %d, %d)", skt.ID, sktData.StreamsStarted, sktData.StreamsSucceeded, sktData.MessagesSent, sktData.MessagesReceived)
  759. }
  760. return true, nil
  761. }); err != nil {
  762. t.Fatal(err)
  763. }
  764. doServerSideFailedUnaryCall(tc, t)
  765. if err := verifyResultWithDelay(func() (bool, error) {
  766. skt := channelz.GetSocket(skID)
  767. sktData := skt.SocketData
  768. if sktData.StreamsStarted != 2 || sktData.StreamsSucceeded != 2 || sktData.MessagesSent != 2 || sktData.MessagesReceived != 1 {
  769. return false, fmt.Errorf("channelz.GetSocket(%d), want (StreamsStarted, StreamsSucceeded, MessagesSent, MessagesReceived) = (2, 2, 2, 1), got (%d, %d, %d, %d)", skt.ID, sktData.StreamsStarted, sktData.StreamsSucceeded, sktData.MessagesSent, sktData.MessagesReceived)
  770. }
  771. return true, nil
  772. }); err != nil {
  773. t.Fatal(err)
  774. }
  775. doClientSideInitiatedFailedStream(tc, t)
  776. if err := verifyResultWithDelay(func() (bool, error) {
  777. skt := channelz.GetSocket(skID)
  778. sktData := skt.SocketData
  779. if sktData.StreamsStarted != 3 || sktData.StreamsSucceeded != 2 || sktData.StreamsFailed != 1 || sktData.MessagesSent != 3 || sktData.MessagesReceived != 2 {
  780. return false, fmt.Errorf("channelz.GetSocket(%d), want (StreamsStarted, StreamsSucceeded, StreamsFailed, MessagesSent, MessagesReceived) = (3, 2, 1, 3, 2), got (%d, %d, %d, %d, %d)", skt.ID, sktData.StreamsStarted, sktData.StreamsSucceeded, sktData.StreamsFailed, sktData.MessagesSent, sktData.MessagesReceived)
  781. }
  782. return true, nil
  783. }); err != nil {
  784. t.Fatal(err)
  785. }
  786. doServerSideInitiatedFailedStreamWithRSTStream(tc, t, rcw)
  787. if err := verifyResultWithDelay(func() (bool, error) {
  788. skt := channelz.GetSocket(skID)
  789. sktData := skt.SocketData
  790. if sktData.StreamsStarted != 4 || sktData.StreamsSucceeded != 2 || sktData.StreamsFailed != 2 || sktData.MessagesSent != 4 || sktData.MessagesReceived != 3 {
  791. return false, fmt.Errorf("channelz.GetSocket(%d), want (StreamsStarted, StreamsSucceeded, StreamsFailed, MessagesSent, MessagesReceived) = (4, 2, 2, 4, 3), got (%d, %d, %d, %d, %d)", skt.ID, sktData.StreamsStarted, sktData.StreamsSucceeded, sktData.StreamsFailed, sktData.MessagesSent, sktData.MessagesReceived)
  792. }
  793. return true, nil
  794. }); err != nil {
  795. t.Fatal(err)
  796. }
  797. doServerSideInitiatedFailedStreamWithGoAway(tc, t, rcw)
  798. if err := verifyResultWithDelay(func() (bool, error) {
  799. skt := channelz.GetSocket(skID)
  800. sktData := skt.SocketData
  801. if sktData.StreamsStarted != 6 || sktData.StreamsSucceeded != 2 || sktData.StreamsFailed != 3 || sktData.MessagesSent != 6 || sktData.MessagesReceived != 5 {
  802. return false, fmt.Errorf("channelz.GetSocket(%d), want (StreamsStarted, StreamsSucceeded, StreamsFailed, MessagesSent, MessagesReceived) = (6, 2, 3, 6, 5), got (%d, %d, %d, %d, %d)", skt.ID, sktData.StreamsStarted, sktData.StreamsSucceeded, sktData.StreamsFailed, sktData.MessagesSent, sktData.MessagesReceived)
  803. }
  804. return true, nil
  805. }); err != nil {
  806. t.Fatal(err)
  807. }
  808. }
  809. // This test is to complete TestCZClientSocketMetricsStreamsAndMessagesCount and
  810. // TestCZServerSocketMetricsStreamsAndMessagesCount by adding the test case of
  811. // server sending RST_STREAM to client due to client side flow control violation.
  812. // It is separated from other cases due to setup incompatibly, i.e. max receive
  813. // size violation will mask flow control violation.
  814. func TestCZClientAndServerSocketMetricsStreamsCountFlowControlRSTStream(t *testing.T) {
  815. defer leakcheck.Check(t)
  816. channelz.NewChannelzStorage()
  817. e := tcpClearRREnv
  818. te := newTest(t, e)
  819. te.serverInitialWindowSize = 65536
  820. // Avoid overflowing connection level flow control window, which will lead to
  821. // transport being closed.
  822. te.serverInitialConnWindowSize = 65536 * 2
  823. te.startServer(&testServer{security: e.security})
  824. defer te.tearDown()
  825. cc, dw := te.clientConnWithConnControl()
  826. tc := &testServiceClientWrapper{TestServiceClient: testpb.NewTestServiceClient(cc)}
  827. doServerSideInitiatedFailedStreamWithClientBreakFlowControl(tc, t, dw)
  828. if err := verifyResultWithDelay(func() (bool, error) {
  829. tchan, _ := channelz.GetTopChannels(0)
  830. if len(tchan) != 1 {
  831. return false, fmt.Errorf("there should only be one top channel, not %d", len(tchan))
  832. }
  833. if len(tchan[0].SubChans) != 1 {
  834. return false, fmt.Errorf("there should only be one subchannel under top channel %d, not %d", tchan[0].ID, len(tchan[0].SubChans))
  835. }
  836. var id int64
  837. for id = range tchan[0].SubChans {
  838. break
  839. }
  840. sc := channelz.GetSubChannel(id)
  841. if sc == nil {
  842. return false, fmt.Errorf("there should only be one socket under subchannel %d, not 0", id)
  843. }
  844. if len(sc.Sockets) != 1 {
  845. return false, fmt.Errorf("there should only be one socket under subchannel %d, not %d", sc.ID, len(sc.Sockets))
  846. }
  847. for id = range sc.Sockets {
  848. break
  849. }
  850. skt := channelz.GetSocket(id)
  851. sktData := skt.SocketData
  852. if sktData.StreamsStarted != 1 || sktData.StreamsSucceeded != 0 || sktData.StreamsFailed != 1 {
  853. return false, fmt.Errorf("channelz.GetSocket(%d), want (StreamsStarted, StreamsSucceeded, StreamsFailed) = (1, 0, 1), got (%d, %d, %d)", skt.ID, sktData.StreamsStarted, sktData.StreamsSucceeded, sktData.StreamsFailed)
  854. }
  855. ss, _ := channelz.GetServers(0)
  856. if len(ss) != 1 {
  857. return false, fmt.Errorf("there should only be one server, not %d", len(ss))
  858. }
  859. ns, _ := channelz.GetServerSockets(ss[0].ID, 0)
  860. if len(ns) != 1 {
  861. return false, fmt.Errorf("there should be one server normal socket, not %d", len(ns))
  862. }
  863. sktData = ns[0].SocketData
  864. if sktData.StreamsStarted != 1 || sktData.StreamsSucceeded != 0 || sktData.StreamsFailed != 1 {
  865. return false, fmt.Errorf("server socket metric with ID %d, want (StreamsStarted, StreamsSucceeded, StreamsFailed) = (1, 0, 1), got (%d, %d, %d)", ns[0].ID, sktData.StreamsStarted, sktData.StreamsSucceeded, sktData.StreamsFailed)
  866. }
  867. return true, nil
  868. }); err != nil {
  869. t.Fatal(err)
  870. }
  871. }
  872. func TestCZClientAndServerSocketMetricsFlowControl(t *testing.T) {
  873. defer leakcheck.Check(t)
  874. channelz.NewChannelzStorage()
  875. e := tcpClearRREnv
  876. te := newTest(t, e)
  877. // disable BDP
  878. te.serverInitialWindowSize = 65536
  879. te.serverInitialConnWindowSize = 65536
  880. te.clientInitialWindowSize = 65536
  881. te.clientInitialConnWindowSize = 65536
  882. te.startServer(&testServer{security: e.security})
  883. defer te.tearDown()
  884. cc := te.clientConn()
  885. tc := testpb.NewTestServiceClient(cc)
  886. for i := 0; i < 10; i++ {
  887. doSuccessfulUnaryCall(tc, t)
  888. }
  889. var cliSktID, svrSktID int64
  890. if err := verifyResultWithDelay(func() (bool, error) {
  891. tchan, _ := channelz.GetTopChannels(0)
  892. if len(tchan) != 1 {
  893. return false, fmt.Errorf("there should only be one top channel, not %d", len(tchan))
  894. }
  895. if len(tchan[0].SubChans) != 1 {
  896. return false, fmt.Errorf("there should only be one subchannel under top channel %d, not %d", tchan[0].ID, len(tchan[0].SubChans))
  897. }
  898. var id int64
  899. for id = range tchan[0].SubChans {
  900. break
  901. }
  902. sc := channelz.GetSubChannel(id)
  903. if sc == nil {
  904. return false, fmt.Errorf("there should only be one socket under subchannel %d, not 0", id)
  905. }
  906. if len(sc.Sockets) != 1 {
  907. return false, fmt.Errorf("there should only be one socket under subchannel %d, not %d", sc.ID, len(sc.Sockets))
  908. }
  909. for id = range sc.Sockets {
  910. break
  911. }
  912. skt := channelz.GetSocket(id)
  913. sktData := skt.SocketData
  914. // 65536 - 5 (Length-Prefixed-Message size) * 10 = 65486
  915. if sktData.LocalFlowControlWindow != 65486 || sktData.RemoteFlowControlWindow != 65486 {
  916. return false, fmt.Errorf("client: (LocalFlowControlWindow, RemoteFlowControlWindow) size should be (65536, 65486), not (%d, %d)", sktData.LocalFlowControlWindow, sktData.RemoteFlowControlWindow)
  917. }
  918. ss, _ := channelz.GetServers(0)
  919. if len(ss) != 1 {
  920. return false, fmt.Errorf("there should only be one server, not %d", len(ss))
  921. }
  922. ns, _ := channelz.GetServerSockets(ss[0].ID, 0)
  923. sktData = ns[0].SocketData
  924. if sktData.LocalFlowControlWindow != 65486 || sktData.RemoteFlowControlWindow != 65486 {
  925. return false, fmt.Errorf("server: (LocalFlowControlWindow, RemoteFlowControlWindow) size should be (65536, 65486), not (%d, %d)", sktData.LocalFlowControlWindow, sktData.RemoteFlowControlWindow)
  926. }
  927. cliSktID, svrSktID = id, ss[0].ID
  928. return true, nil
  929. }); err != nil {
  930. t.Fatal(err)
  931. }
  932. doStreamingInputCallWithLargePayload(tc, t)
  933. if err := verifyResultWithDelay(func() (bool, error) {
  934. skt := channelz.GetSocket(cliSktID)
  935. sktData := skt.SocketData
  936. // Local: 65536 - 5 (Length-Prefixed-Message size) * 10 = 65486
  937. // Remote: 65536 - 5 (Length-Prefixed-Message size) * 10 - 10011 = 55475
  938. if sktData.LocalFlowControlWindow != 65486 || sktData.RemoteFlowControlWindow != 55475 {
  939. return false, fmt.Errorf("client: (LocalFlowControlWindow, RemoteFlowControlWindow) size should be (65486, 55475), not (%d, %d)", sktData.LocalFlowControlWindow, sktData.RemoteFlowControlWindow)
  940. }
  941. ss, _ := channelz.GetServers(0)
  942. if len(ss) != 1 {
  943. return false, fmt.Errorf("there should only be one server, not %d", len(ss))
  944. }
  945. ns, _ := channelz.GetServerSockets(svrSktID, 0)
  946. sktData = ns[0].SocketData
  947. if sktData.LocalFlowControlWindow != 55475 || sktData.RemoteFlowControlWindow != 65486 {
  948. return false, fmt.Errorf("server: (LocalFlowControlWindow, RemoteFlowControlWindow) size should be (55475, 65486), not (%d, %d)", sktData.LocalFlowControlWindow, sktData.RemoteFlowControlWindow)
  949. }
  950. return true, nil
  951. }); err != nil {
  952. t.Fatal(err)
  953. }
  954. // triggers transport flow control window update on server side, since unacked
  955. // bytes should be larger than limit now. i.e. 50 + 20022 > 65536/4.
  956. doStreamingInputCallWithLargePayload(tc, t)
  957. if err := verifyResultWithDelay(func() (bool, error) {
  958. skt := channelz.GetSocket(cliSktID)
  959. sktData := skt.SocketData
  960. // Local: 65536 - 5 (Length-Prefixed-Message size) * 10 = 65486
  961. // Remote: 65536
  962. if sktData.LocalFlowControlWindow != 65486 || sktData.RemoteFlowControlWindow != 65536 {
  963. return false, fmt.Errorf("client: (LocalFlowControlWindow, RemoteFlowControlWindow) size should be (65486, 65536), not (%d, %d)", sktData.LocalFlowControlWindow, sktData.RemoteFlowControlWindow)
  964. }
  965. ss, _ := channelz.GetServers(0)
  966. if len(ss) != 1 {
  967. return false, fmt.Errorf("there should only be one server, not %d", len(ss))
  968. }
  969. ns, _ := channelz.GetServerSockets(svrSktID, 0)
  970. sktData = ns[0].SocketData
  971. if sktData.LocalFlowControlWindow != 65536 || sktData.RemoteFlowControlWindow != 65486 {
  972. return false, fmt.Errorf("server: (LocalFlowControlWindow, RemoteFlowControlWindow) size should be (65536, 65486), not (%d, %d)", sktData.LocalFlowControlWindow, sktData.RemoteFlowControlWindow)
  973. }
  974. return true, nil
  975. }); err != nil {
  976. t.Fatal(err)
  977. }
  978. }
  979. func TestCZClientSocketMetricsKeepAlive(t *testing.T) {
  980. defer leakcheck.Check(t)
  981. channelz.NewChannelzStorage()
  982. e := tcpClearRREnv
  983. te := newTest(t, e)
  984. te.cliKeepAlive = &keepalive.ClientParameters{Time: 500 * time.Millisecond, Timeout: 500 * time.Millisecond}
  985. te.startServer(&testServer{security: e.security})
  986. defer te.tearDown()
  987. cc := te.clientConn()
  988. tc := testpb.NewTestServiceClient(cc)
  989. doIdleCallToInvokeKeepAlive(tc, t)
  990. if err := verifyResultWithDelay(func() (bool, error) {
  991. tchan, _ := channelz.GetTopChannels(0)
  992. if len(tchan) != 1 {
  993. return false, fmt.Errorf("there should only be one top channel, not %d", len(tchan))
  994. }
  995. if len(tchan[0].SubChans) != 1 {
  996. return false, fmt.Errorf("there should only be one subchannel under top channel %d, not %d", tchan[0].ID, len(tchan[0].SubChans))
  997. }
  998. var id int64
  999. for id = range tchan[0].SubChans {
  1000. break
  1001. }
  1002. sc := channelz.GetSubChannel(id)
  1003. if sc == nil {
  1004. return false, fmt.Errorf("there should only be one socket under subchannel %d, not 0", id)
  1005. }
  1006. if len(sc.Sockets) != 1 {
  1007. return false, fmt.Errorf("there should only be one socket under subchannel %d, not %d", sc.ID, len(sc.Sockets))
  1008. }
  1009. for id = range sc.Sockets {
  1010. break
  1011. }
  1012. skt := channelz.GetSocket(id)
  1013. if skt.SocketData.KeepAlivesSent != 2 { // doIdleCallToInvokeKeepAlive func is set up to send 2 KeepAlives.
  1014. return false, fmt.Errorf("there should be 2 KeepAlives sent, not %d", skt.SocketData.KeepAlivesSent)
  1015. }
  1016. return true, nil
  1017. }); err != nil {
  1018. t.Fatal(err)
  1019. }
  1020. }
  1021. func TestCZServerSocketMetricsStreamsAndMessagesCount(t *testing.T) {
  1022. defer leakcheck.Check(t)
  1023. channelz.NewChannelzStorage()
  1024. e := tcpClearRREnv
  1025. te := newTest(t, e)
  1026. te.maxServerReceiveMsgSize = newInt(20)
  1027. te.maxClientReceiveMsgSize = newInt(20)
  1028. te.startServer(&testServer{security: e.security})
  1029. defer te.tearDown()
  1030. cc, _ := te.clientConnWithConnControl()
  1031. tc := &testServiceClientWrapper{TestServiceClient: testpb.NewTestServiceClient(cc)}
  1032. var svrID int64
  1033. if err := verifyResultWithDelay(func() (bool, error) {
  1034. ss, _ := channelz.GetServers(0)
  1035. if len(ss) != 1 {
  1036. return false, fmt.Errorf("there should only be one server, not %d", len(ss))
  1037. }
  1038. svrID = ss[0].ID
  1039. return true, nil
  1040. }); err != nil {
  1041. t.Fatal(err)
  1042. }
  1043. doSuccessfulUnaryCall(tc, t)
  1044. if err := verifyResultWithDelay(func() (bool, error) {
  1045. ns, _ := channelz.GetServerSockets(svrID, 0)
  1046. sktData := ns[0].SocketData
  1047. if sktData.StreamsStarted != 1 || sktData.StreamsSucceeded != 1 || sktData.StreamsFailed != 0 || sktData.MessagesSent != 1 || sktData.MessagesReceived != 1 {
  1048. return false, fmt.Errorf("server socket metric with ID %d, want (StreamsStarted, StreamsSucceeded, MessagesSent, MessagesReceived) = (1, 1, 1, 1), got (%d, %d, %d, %d, %d)", ns[0].ID, sktData.StreamsStarted, sktData.StreamsSucceeded, sktData.StreamsFailed, sktData.MessagesSent, sktData.MessagesReceived)
  1049. }
  1050. return true, nil
  1051. }); err != nil {
  1052. t.Fatal(err)
  1053. }
  1054. doServerSideFailedUnaryCall(tc, t)
  1055. if err := verifyResultWithDelay(func() (bool, error) {
  1056. ns, _ := channelz.GetServerSockets(svrID, 0)
  1057. sktData := ns[0].SocketData
  1058. if sktData.StreamsStarted != 2 || sktData.StreamsSucceeded != 2 || sktData.StreamsFailed != 0 || sktData.MessagesSent != 1 || sktData.MessagesReceived != 1 {
  1059. return false, fmt.Errorf("server socket metric with ID %d, want (StreamsStarted, StreamsSucceeded, StreamsFailed, MessagesSent, MessagesReceived) = (2, 2, 0, 1, 1), got (%d, %d, %d, %d, %d)", ns[0].ID, sktData.StreamsStarted, sktData.StreamsSucceeded, sktData.StreamsFailed, sktData.MessagesSent, sktData.MessagesReceived)
  1060. }
  1061. return true, nil
  1062. }); err != nil {
  1063. t.Fatal(err)
  1064. }
  1065. doClientSideInitiatedFailedStream(tc, t)
  1066. if err := verifyResultWithDelay(func() (bool, error) {
  1067. ns, _ := channelz.GetServerSockets(svrID, 0)
  1068. sktData := ns[0].SocketData
  1069. if sktData.StreamsStarted != 3 || sktData.StreamsSucceeded != 2 || sktData.StreamsFailed != 1 || sktData.MessagesSent != 2 || sktData.MessagesReceived != 2 {
  1070. return false, fmt.Errorf("server socket metric with ID %d, want (StreamsStarted, StreamsSucceeded, StreamsFailed, MessagesSent, MessagesReceived) = (3, 2, 1, 2, 2), got (%d, %d, %d, %d, %d)", ns[0].ID, sktData.StreamsStarted, sktData.StreamsSucceeded, sktData.StreamsFailed, sktData.MessagesSent, sktData.MessagesReceived)
  1071. }
  1072. return true, nil
  1073. }); err != nil {
  1074. t.Fatal(err)
  1075. }
  1076. }
  1077. func TestCZServerSocketMetricsKeepAlive(t *testing.T) {
  1078. defer leakcheck.Check(t)
  1079. channelz.NewChannelzStorage()
  1080. e := tcpClearRREnv
  1081. te := newTest(t, e)
  1082. te.svrKeepAlive = &keepalive.ServerParameters{Time: 500 * time.Millisecond, Timeout: 500 * time.Millisecond}
  1083. te.startServer(&testServer{security: e.security})
  1084. defer te.tearDown()
  1085. cc := te.clientConn()
  1086. tc := testpb.NewTestServiceClient(cc)
  1087. doIdleCallToInvokeKeepAlive(tc, t)
  1088. if err := verifyResultWithDelay(func() (bool, error) {
  1089. ss, _ := channelz.GetServers(0)
  1090. if len(ss) != 1 {
  1091. return false, fmt.Errorf("there should be one server, not %d", len(ss))
  1092. }
  1093. ns, _ := channelz.GetServerSockets(ss[0].ID, 0)
  1094. if len(ns) != 1 {
  1095. return false, fmt.Errorf("there should be one server normal socket, not %d", len(ns))
  1096. }
  1097. if ns[0].SocketData.KeepAlivesSent != 2 { // doIdleCallToInvokeKeepAlive func is set up to send 2 KeepAlives.
  1098. return false, fmt.Errorf("there should be 2 KeepAlives sent, not %d", ns[0].SocketData.KeepAlivesSent)
  1099. }
  1100. return true, nil
  1101. }); err != nil {
  1102. t.Fatal(err)
  1103. }
  1104. }
  1105. var cipherSuites = []string{
  1106. "TLS_RSA_WITH_RC4_128_SHA",
  1107. "TLS_RSA_WITH_3DES_EDE_CBC_SHA",
  1108. "TLS_RSA_WITH_AES_128_CBC_SHA",
  1109. "TLS_RSA_WITH_AES_256_CBC_SHA",
  1110. "TLS_RSA_WITH_AES_128_GCM_SHA256",
  1111. "TLS_RSA_WITH_AES_256_GCM_SHA384",
  1112. "TLS_ECDHE_ECDSA_WITH_RC4_128_SHA",
  1113. "TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA",
  1114. "TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA",
  1115. "TLS_ECDHE_RSA_WITH_RC4_128_SHA",
  1116. "TLS_ECDHE_RSA_WITH_3DES_EDE_CBC_SHA",
  1117. "TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA",
  1118. "TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA",
  1119. "TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256",
  1120. "TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256",
  1121. "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384",
  1122. "TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384",
  1123. "TLS_FALLBACK_SCSV",
  1124. "TLS_RSA_WITH_AES_128_CBC_SHA256",
  1125. "TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA256",
  1126. "TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256",
  1127. "TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305",
  1128. "TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305",
  1129. }
  1130. func TestCZSocketGetSecurityValueTLS(t *testing.T) {
  1131. defer leakcheck.Check(t)
  1132. channelz.NewChannelzStorage()
  1133. e := tcpTLSRREnv
  1134. te := newTest(t, e)
  1135. te.startServer(&testServer{security: e.security})
  1136. defer te.tearDown()
  1137. te.clientConn()
  1138. if err := verifyResultWithDelay(func() (bool, error) {
  1139. tchan, _ := channelz.GetTopChannels(0)
  1140. if len(tchan) != 1 {
  1141. return false, fmt.Errorf("there should only be one top channel, not %d", len(tchan))
  1142. }
  1143. if len(tchan[0].SubChans) != 1 {
  1144. return false, fmt.Errorf("there should only be one subchannel under top channel %d, not %d", tchan[0].ID, len(tchan[0].SubChans))
  1145. }
  1146. var id int64
  1147. for id = range tchan[0].SubChans {
  1148. break
  1149. }
  1150. sc := channelz.GetSubChannel(id)
  1151. if sc == nil {
  1152. return false, fmt.Errorf("there should only be one socket under subchannel %d, not 0", id)
  1153. }
  1154. if len(sc.Sockets) != 1 {
  1155. return false, fmt.Errorf("there should only be one socket under subchannel %d, not %d", sc.ID, len(sc.Sockets))
  1156. }
  1157. for id = range sc.Sockets {
  1158. break
  1159. }
  1160. skt := channelz.GetSocket(id)
  1161. cert, _ := tls.LoadX509KeyPair(testdata.Path("server1.pem"), testdata.Path("server1.key"))
  1162. securityVal, ok := skt.SocketData.Security.(*credentials.TLSChannelzSecurityValue)
  1163. if !ok {
  1164. return false, fmt.Errorf("the SocketData.Security is of type: %T, want: *credentials.TLSChannelzSecurityValue", skt.SocketData.Security)
  1165. }
  1166. if !reflect.DeepEqual(securityVal.RemoteCertificate, cert.Certificate[0]) {
  1167. return false, fmt.Errorf("SocketData.Security.RemoteCertificate got: %v, want: %v", securityVal.RemoteCertificate, cert.Certificate[0])
  1168. }
  1169. for _, v := range cipherSuites {
  1170. if v == securityVal.StandardName {
  1171. return true, nil
  1172. }
  1173. }
  1174. return false, fmt.Errorf("SocketData.Security.StandardName got: %v, want it to be one of %v ", securityVal.StandardName, cipherSuites)
  1175. }); err != nil {
  1176. t.Fatal(err)
  1177. }
  1178. }
  1179. func TestCZChannelTraceCreationDeletion(t *testing.T) {
  1180. defer leakcheck.Check(t)
  1181. channelz.NewChannelzStorage()
  1182. e := tcpClearRREnv
  1183. // avoid calling API to set balancer type, which will void service config's change of balancer.
  1184. e.balancer = ""
  1185. te := newTest(t, e)
  1186. r, cleanup := manual.GenerateAndRegisterManualResolver()
  1187. defer cleanup()
  1188. resolvedAddrs := []resolver.Address{{Addr: "127.0.0.1:0", Type: resolver.GRPCLB, ServerName: "grpclb.server"}}
  1189. r.InitialAddrs(resolvedAddrs)
  1190. te.resolverScheme = r.Scheme()
  1191. te.clientConn()
  1192. defer te.tearDown()
  1193. var nestedConn int64
  1194. if err := verifyResultWithDelay(func() (bool, error) {
  1195. tcs, _ := channelz.GetTopChannels(0)
  1196. if len(tcs) != 1 {
  1197. return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
  1198. }
  1199. if len(tcs[0].NestedChans) != 1 {
  1200. return false, fmt.Errorf("there should be one nested channel from grpclb, not %d", len(tcs[0].NestedChans))
  1201. }
  1202. for k := range tcs[0].NestedChans {
  1203. nestedConn = k
  1204. }
  1205. for _, e := range tcs[0].Trace.Events {
  1206. if e.RefID == nestedConn && e.RefType != channelz.RefChannel {
  1207. return false, fmt.Errorf("nested channel trace event shoud have RefChannel as RefType")
  1208. }
  1209. }
  1210. ncm := channelz.GetChannel(nestedConn)
  1211. if ncm.Trace == nil {
  1212. return false, fmt.Errorf("trace for nested channel should not be empty")
  1213. }
  1214. if len(ncm.Trace.Events) == 0 {
  1215. return false, fmt.Errorf("there should be at least one trace event for nested channel not 0")
  1216. }
  1217. if ncm.Trace.Events[0].Desc != "Channel Created" {
  1218. return false, fmt.Errorf("the first trace event should be \"Channel Created\", not %q", ncm.Trace.Events[0].Desc)
  1219. }
  1220. return true, nil
  1221. }); err != nil {
  1222. t.Fatal(err)
  1223. }
  1224. r.NewServiceConfig(`{"loadBalancingPolicy": "round_robin"}`)
  1225. r.NewAddress([]resolver.Address{{Addr: "127.0.0.1:0"}})
  1226. // wait for the shutdown of grpclb balancer
  1227. if err := verifyResultWithDelay(func() (bool, error) {
  1228. tcs, _ := channelz.GetTopChannels(0)
  1229. if len(tcs) != 1 {
  1230. return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
  1231. }
  1232. if len(tcs[0].NestedChans) != 0 {
  1233. return false, fmt.Errorf("there should be 0 nested channel from grpclb, not %d", len(tcs[0].NestedChans))
  1234. }
  1235. ncm := channelz.GetChannel(nestedConn)
  1236. if ncm == nil {
  1237. return false, fmt.Errorf("nested channel should still exist due to parent's trace reference")
  1238. }
  1239. if ncm.Trace == nil {
  1240. return false, fmt.Errorf("trace for nested channel should not be empty")
  1241. }
  1242. if len(ncm.Trace.Events) == 0 {
  1243. return false, fmt.Errorf("there should be at least one trace event for nested channel not 0")
  1244. }
  1245. if ncm.Trace.Events[len(ncm.Trace.Events)-1].Desc != "Channel Deleted" {
  1246. return false, fmt.Errorf("the first trace event should be \"Channel Deleted\", not %q", ncm.Trace.Events[0].Desc)
  1247. }
  1248. return true, nil
  1249. }); err != nil {
  1250. t.Fatal(err)
  1251. }
  1252. }
  1253. func TestCZSubChannelTraceCreationDeletion(t *testing.T) {
  1254. defer leakcheck.Check(t)
  1255. channelz.NewChannelzStorage()
  1256. e := tcpClearRREnv
  1257. te := newTest(t, e)
  1258. te.startServer(&testServer{security: e.security})
  1259. r, cleanup := manual.GenerateAndRegisterManualResolver()
  1260. defer cleanup()
  1261. r.InitialAddrs([]resolver.Address{{Addr: te.srvAddr}})
  1262. te.resolverScheme = r.Scheme()
  1263. te.clientConn()
  1264. defer te.tearDown()
  1265. var subConn int64
  1266. // Here, we just wait for all sockets to be up. In the future, if we implement
  1267. // IDLE, we may need to make several rpc calls to create the sockets.
  1268. if err := verifyResultWithDelay(func() (bool, error) {
  1269. tcs, _ := channelz.GetTopChannels(0)
  1270. if len(tcs) != 1 {
  1271. return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
  1272. }
  1273. if len(tcs[0].SubChans) != 1 {
  1274. return false, fmt.Errorf("there should be 1 subchannel not %d", len(tcs[0].SubChans))
  1275. }
  1276. for k := range tcs[0].SubChans {
  1277. subConn = k
  1278. }
  1279. for _, e := range tcs[0].Trace.Events {
  1280. if e.RefID == subConn && e.RefType != channelz.RefSubChannel {
  1281. return false, fmt.Errorf("subchannel trace event shoud have RefType to be RefSubChannel")
  1282. }
  1283. }
  1284. scm := channelz.GetSubChannel(subConn)
  1285. if scm == nil {
  1286. return false, fmt.Errorf("subChannel does not exist")
  1287. }
  1288. if scm.Trace == nil {
  1289. return false, fmt.Errorf("trace for subChannel should not be empty")
  1290. }
  1291. if len(scm.Trace.Events) == 0 {
  1292. return false, fmt.Errorf("there should be at least one trace event for subChannel not 0")
  1293. }
  1294. if scm.Trace.Events[0].Desc != "Subchannel Created" {
  1295. return false, fmt.Errorf("the first trace event should be \"Subchannel Created\", not %q", scm.Trace.Events[0].Desc)
  1296. }
  1297. return true, nil
  1298. }); err != nil {
  1299. t.Fatal(err)
  1300. }
  1301. r.NewAddress([]resolver.Address{})
  1302. if err := verifyResultWithDelay(func() (bool, error) {
  1303. tcs, _ := channelz.GetTopChannels(0)
  1304. if len(tcs) != 1 {
  1305. return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
  1306. }
  1307. if len(tcs[0].SubChans) != 0 {
  1308. return false, fmt.Errorf("there should be 0 subchannel not %d", len(tcs[0].SubChans))
  1309. }
  1310. scm := channelz.GetSubChannel(subConn)
  1311. if scm == nil {
  1312. return false, fmt.Errorf("subChannel should still exist due to parent's trace reference")
  1313. }
  1314. if scm.Trace == nil {
  1315. return false, fmt.Errorf("trace for SubChannel should not be empty")
  1316. }
  1317. if len(scm.Trace.Events) == 0 {
  1318. return false, fmt.Errorf("there should be at least one trace event for subChannel not 0")
  1319. }
  1320. if got, want := scm.Trace.Events[len(scm.Trace.Events)-1].Desc, "Subchannel Deleted"; got != want {
  1321. return false, fmt.Errorf("the last trace event should be %q, not %q", want, got)
  1322. }
  1323. return true, nil
  1324. }); err != nil {
  1325. t.Fatal(err)
  1326. }
  1327. }
  1328. func TestCZChannelAddressResolutionChange(t *testing.T) {
  1329. defer leakcheck.Check(t)
  1330. channelz.NewChannelzStorage()
  1331. e := tcpClearRREnv
  1332. e.balancer = ""
  1333. te := newTest(t, e)
  1334. te.startServer(&testServer{security: e.security})
  1335. r, cleanup := manual.GenerateAndRegisterManualResolver()
  1336. defer cleanup()
  1337. r.InitialAddrs([]resolver.Address{{Addr: te.srvAddr}})
  1338. te.resolverScheme = r.Scheme()
  1339. te.clientConn()
  1340. defer te.tearDown()
  1341. var cid int64
  1342. // Here, we just wait for all sockets to be up. In the future, if we implement
  1343. // IDLE, we may need to make several rpc calls to create the sockets.
  1344. if err := verifyResultWithDelay(func() (bool, error) {
  1345. tcs, _ := channelz.GetTopChannels(0)
  1346. if len(tcs) != 1 {
  1347. return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
  1348. }
  1349. cid = tcs[0].ID
  1350. for i := len(tcs[0].Trace.Events) - 1; i >= 0; i-- {
  1351. if tcs[0].Trace.Events[i].Desc == fmt.Sprintf("Resolver returns a non-empty address list (previous one was empty) %q", te.srvAddr) {
  1352. break
  1353. }
  1354. if i == 0 {
  1355. return false, fmt.Errorf("events do not contain expected address resolution from empty address state")
  1356. }
  1357. }
  1358. return true, nil
  1359. }); err != nil {
  1360. t.Fatal(err)
  1361. }
  1362. r.NewServiceConfig(`{"loadBalancingPolicy": "round_robin"}`)
  1363. if err := verifyResultWithDelay(func() (bool, error) {
  1364. cm := channelz.GetChannel(cid)
  1365. for i := len(cm.Trace.Events) - 1; i >= 0; i-- {
  1366. if cm.Trace.Events[i].Desc == fmt.Sprintf("Channel switches to new LB policy %q", roundrobin.Name) {
  1367. break
  1368. }
  1369. if i == 0 {
  1370. return false, fmt.Errorf("events do not contain expected address resolution change of LB policy")
  1371. }
  1372. }
  1373. return true, nil
  1374. }); err != nil {
  1375. t.Fatal(err)
  1376. }
  1377. newSc := `{
  1378. "methodConfig": [
  1379. {
  1380. "name": [
  1381. {
  1382. "service": "grpc.testing.TestService",
  1383. "method": "EmptyCall"
  1384. },
  1385. ],
  1386. "waitForReady": false,
  1387. "timeout": ".001s"
  1388. }
  1389. ]
  1390. }`
  1391. r.NewServiceConfig(newSc)
  1392. if err := verifyResultWithDelay(func() (bool, error) {
  1393. cm := channelz.GetChannel(cid)
  1394. for i := len(cm.Trace.Events) - 1; i >= 0; i-- {
  1395. if cm.Trace.Events[i].Desc == fmt.Sprintf("Channel has a new service config \"%s\"", newSc) {
  1396. break
  1397. }
  1398. if i == 0 {
  1399. return false, fmt.Errorf("events do not contain expected address resolution of new service config")
  1400. }
  1401. }
  1402. return true, nil
  1403. }); err != nil {
  1404. t.Fatal(err)
  1405. }
  1406. r.NewAddress([]resolver.Address{})
  1407. if err := verifyResultWithDelay(func() (bool, error) {
  1408. cm := channelz.GetChannel(cid)
  1409. for i := len(cm.Trace.Events) - 1; i >= 0; i-- {
  1410. if cm.Trace.Events[i].Desc == "Resolver returns an empty address list" {
  1411. break
  1412. }
  1413. if i == 0 {
  1414. return false, fmt.Errorf("events do not contain expected address resolution of empty address")
  1415. }
  1416. }
  1417. return true, nil
  1418. }); err != nil {
  1419. t.Fatal(err)
  1420. }
  1421. }
  1422. func TestCZSubChannelPickedNewAddress(t *testing.T) {
  1423. defer leakcheck.Check(t)
  1424. channelz.NewChannelzStorage()
  1425. e := tcpClearRREnv
  1426. e.balancer = ""
  1427. te := newTest(t, e)
  1428. te.startServers(&testServer{security: e.security}, 3)
  1429. r, cleanup := manual.GenerateAndRegisterManualResolver()
  1430. defer cleanup()
  1431. var svrAddrs []resolver.Address
  1432. for _, a := range te.srvAddrs {
  1433. svrAddrs = append(svrAddrs, resolver.Address{Addr: a})
  1434. }
  1435. r.InitialAddrs(svrAddrs)
  1436. te.resolverScheme = r.Scheme()
  1437. cc := te.clientConn()
  1438. defer te.tearDown()
  1439. tc := testpb.NewTestServiceClient(cc)
  1440. // make sure the connection is up
  1441. ctx, cancel := context.WithTimeout(context.Background(), time.Second)
  1442. defer cancel()
  1443. if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil {
  1444. t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
  1445. }
  1446. te.srvs[0].Stop()
  1447. te.srvs[1].Stop()
  1448. // Here, we just wait for all sockets to be up. In the future, if we implement
  1449. // IDLE, we may need to make several rpc calls to create the sockets.
  1450. if err := verifyResultWithDelay(func() (bool, error) {
  1451. tcs, _ := channelz.GetTopChannels(0)
  1452. if len(tcs) != 1 {
  1453. return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
  1454. }
  1455. if len(tcs[0].SubChans) != 1 {
  1456. return false, fmt.Errorf("there should be 1 subchannel not %d", len(tcs[0].SubChans))
  1457. }
  1458. var subConn int64
  1459. for k := range tcs[0].SubChans {
  1460. subConn = k
  1461. }
  1462. scm := channelz.GetSubChannel(subConn)
  1463. if scm.Trace == nil {
  1464. return false, fmt.Errorf("trace for SubChannel should not be empty")
  1465. }
  1466. if len(scm.Trace.Events) == 0 {
  1467. return false, fmt.Errorf("there should be at least one trace event for subChannel not 0")
  1468. }
  1469. for i := len(scm.Trace.Events) - 1; i >= 0; i-- {
  1470. if scm.Trace.Events[i].Desc == fmt.Sprintf("Subchannel picks a new address %q to connect", te.srvAddrs[2]) {
  1471. break
  1472. }
  1473. if i == 0 {
  1474. return false, fmt.Errorf("events do not contain expected address resolution of subchannel picked new address")
  1475. }
  1476. }
  1477. return true, nil
  1478. }); err != nil {
  1479. t.Fatal(err)
  1480. }
  1481. }
  1482. func TestCZSubChannelConnectivityState(t *testing.T) {
  1483. defer leakcheck.Check(t)
  1484. channelz.NewChannelzStorage()
  1485. e := tcpClearRREnv
  1486. te := newTest(t, e)
  1487. te.startServer(&testServer{security: e.security})
  1488. r, cleanup := manual.GenerateAndRegisterManualResolver()
  1489. defer cleanup()
  1490. r.InitialAddrs([]resolver.Address{{Addr: te.srvAddr}})
  1491. te.resolverScheme = r.Scheme()
  1492. te.customDialOptions = []grpc.DialOption{grpc.WithWaitForHandshake()}
  1493. cc := te.clientConn()
  1494. defer te.tearDown()
  1495. tc := testpb.NewTestServiceClient(cc)
  1496. // make sure the connection is up
  1497. ctx, cancel := context.WithTimeout(context.Background(), time.Second)
  1498. defer cancel()
  1499. if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil {
  1500. t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
  1501. }
  1502. var subConn int64
  1503. te.srv.Stop()
  1504. if err := verifyResultWithDelay(func() (bool, error) {
  1505. // we need to obtain the SubChannel id before it gets deleted from Channel's children list (due
  1506. // to effect of r.NewAddress([]resolver.Address{}))
  1507. if subConn == 0 {
  1508. tcs, _ := channelz.GetTopChannels(0)
  1509. if len(tcs) != 1 {
  1510. return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
  1511. }
  1512. if len(tcs[0].SubChans) != 1 {
  1513. return false, fmt.Errorf("there should be 1 subchannel not %d", len(tcs[0].SubChans))
  1514. }
  1515. for k := range tcs[0].SubChans {
  1516. // get the SubChannel id for further trace inquiry.
  1517. subConn = k
  1518. }
  1519. }
  1520. scm := channelz.GetSubChannel(subConn)
  1521. if scm == nil {
  1522. return false, fmt.Errorf("subChannel should still exist due to parent's trace reference")
  1523. }
  1524. if scm.Trace == nil {
  1525. return false, fmt.Errorf("trace for SubChannel should not be empty")
  1526. }
  1527. if len(scm.Trace.Events) == 0 {
  1528. return false, fmt.Errorf("there should be at least one trace event for subChannel not 0")
  1529. }
  1530. var ready, connecting, transient, shutdown int
  1531. for _, e := range scm.Trace.Events {
  1532. if e.Desc == fmt.Sprintf("Subchannel Connectivity change to %v", connectivity.TransientFailure) {
  1533. transient++
  1534. }
  1535. }
  1536. // Make sure the SubChannel has already seen transient failure before shutting it down through
  1537. // r.NewAddress([]resolver.Address{}).
  1538. if transient == 0 {
  1539. return false, fmt.Errorf("transient failure has not happened on SubChannel yet")
  1540. }
  1541. transient = 0
  1542. r.NewAddress([]resolver.Address{})
  1543. for _, e := range scm.Trace.Events {
  1544. if e.Desc == fmt.Sprintf("Subchannel Connectivity change to %v", connectivity.Ready) {
  1545. ready++
  1546. }
  1547. if e.Desc == fmt.Sprintf("Subchannel Connectivity change to %v", connectivity.Connecting) {
  1548. connecting++
  1549. }
  1550. if e.Desc == fmt.Sprintf("Subchannel Connectivity change to %v", connectivity.TransientFailure) {
  1551. transient++
  1552. }
  1553. if e.Desc == fmt.Sprintf("Subchannel Connectivity change to %v", connectivity.Shutdown) {
  1554. shutdown++
  1555. }
  1556. }
  1557. // example:
  1558. // Subchannel Created
  1559. // Subchannel's connectivity state changed to CONNECTING
  1560. // Subchannel picked a new address: "localhost:36011"
  1561. // Subchannel's connectivity state changed to READY
  1562. // Subchannel's connectivity state changed to TRANSIENT_FAILURE
  1563. // Subchannel's connectivity state changed to CONNECTING
  1564. // Subchannel picked a new address: "localhost:36011"
  1565. // Subchannel's connectivity state changed to SHUTDOWN
  1566. // Subchannel Deleted
  1567. if ready != 1 || connecting < 1 || transient < 1 || shutdown != 1 {
  1568. return false, fmt.Errorf("got: ready = %d, connecting = %d, transient = %d, shutdown = %d, want: 1, >=1, >=1, 1", ready, connecting, transient, shutdown)
  1569. }
  1570. return true, nil
  1571. }); err != nil {
  1572. t.Fatal(err)
  1573. }
  1574. }
  1575. func TestCZChannelConnectivityState(t *testing.T) {
  1576. defer leakcheck.Check(t)
  1577. channelz.NewChannelzStorage()
  1578. e := tcpClearRREnv
  1579. te := newTest(t, e)
  1580. te.startServer(&testServer{security: e.security})
  1581. r, cleanup := manual.GenerateAndRegisterManualResolver()
  1582. defer cleanup()
  1583. r.InitialAddrs([]resolver.Address{{Addr: te.srvAddr}})
  1584. te.resolverScheme = r.Scheme()
  1585. te.customDialOptions = []grpc.DialOption{grpc.WithWaitForHandshake()}
  1586. cc := te.clientConn()
  1587. defer te.tearDown()
  1588. tc := testpb.NewTestServiceClient(cc)
  1589. // make sure the connection is up
  1590. ctx, cancel := context.WithTimeout(context.Background(), time.Second)
  1591. defer cancel()
  1592. if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil {
  1593. t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
  1594. }
  1595. te.srv.Stop()
  1596. if err := verifyResultWithDelay(func() (bool, error) {
  1597. tcs, _ := channelz.GetTopChannels(0)
  1598. if len(tcs) != 1 {
  1599. return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
  1600. }
  1601. var ready, connecting, transient int
  1602. for _, e := range tcs[0].Trace.Events {
  1603. if e.Desc == fmt.Sprintf("Channel Connectivity change to %v", connectivity.Ready) {
  1604. ready++
  1605. }
  1606. if e.Desc == fmt.Sprintf("Channel Connectivity change to %v", connectivity.Connecting) {
  1607. connecting++
  1608. }
  1609. if e.Desc == fmt.Sprintf("Channel Connectivity change to %v", connectivity.TransientFailure) {
  1610. transient++
  1611. }
  1612. }
  1613. // example:
  1614. // Channel Created
  1615. // Adressses resolved (from empty address state): "localhost:40467"
  1616. // SubChannel (id: 4[]) Created
  1617. // Channel's connectivity state changed to CONNECTING
  1618. // Channel's connectivity state changed to READY
  1619. // Channel's connectivity state changed to TRANSIENT_FAILURE
  1620. // Channel's connectivity state changed to CONNECTING
  1621. // Channel's connectivity state changed to TRANSIENT_FAILURE
  1622. if ready != 1 || connecting < 1 || transient < 1 {
  1623. return false, fmt.Errorf("got: ready = %d, connecting = %d, transient = %d, want: 1, >=1, >=1", ready, connecting, transient)
  1624. }
  1625. return true, nil
  1626. }); err != nil {
  1627. t.Fatal(err)
  1628. }
  1629. }
  1630. func TestCZTraceOverwriteChannelDeletion(t *testing.T) {
  1631. defer leakcheck.Check(t)
  1632. channelz.NewChannelzStorage()
  1633. e := tcpClearRREnv
  1634. // avoid calling API to set balancer type, which will void service config's change of balancer.
  1635. e.balancer = ""
  1636. te := newTest(t, e)
  1637. channelz.SetMaxTraceEntry(1)
  1638. defer channelz.ResetMaxTraceEntryToDefault()
  1639. r, cleanup := manual.GenerateAndRegisterManualResolver()
  1640. defer cleanup()
  1641. resolvedAddrs := []resolver.Address{{Addr: "127.0.0.1:0", Type: resolver.GRPCLB, ServerName: "grpclb.server"}}
  1642. r.InitialAddrs(resolvedAddrs)
  1643. te.resolverScheme = r.Scheme()
  1644. te.clientConn()
  1645. defer te.tearDown()
  1646. var nestedConn int64
  1647. if err := verifyResultWithDelay(func() (bool, error) {
  1648. tcs, _ := channelz.GetTopChannels(0)
  1649. if len(tcs) != 1 {
  1650. return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
  1651. }
  1652. if len(tcs[0].NestedChans) != 1 {
  1653. return false, fmt.Errorf("there should be one nested channel from grpclb, not %d", len(tcs[0].NestedChans))
  1654. }
  1655. for k := range tcs[0].NestedChans {
  1656. nestedConn = k
  1657. }
  1658. return true, nil
  1659. }); err != nil {
  1660. t.Fatal(err)
  1661. }
  1662. r.NewServiceConfig(`{"loadBalancingPolicy": "round_robin"}`)
  1663. r.NewAddress([]resolver.Address{{Addr: "127.0.0.1:0"}})
  1664. // wait for the shutdown of grpclb balancer
  1665. if err := verifyResultWithDelay(func() (bool, error) {
  1666. tcs, _ := channelz.GetTopChannels(0)
  1667. if len(tcs) != 1 {
  1668. return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
  1669. }
  1670. if len(tcs[0].NestedChans) != 0 {
  1671. return false, fmt.Errorf("there should be 0 nested channel from grpclb, not %d", len(tcs[0].NestedChans))
  1672. }
  1673. return true, nil
  1674. }); err != nil {
  1675. t.Fatal(err)
  1676. }
  1677. // verify that the nested channel no longer exist due to trace referencing it got overwritten.
  1678. if err := verifyResultWithDelay(func() (bool, error) {
  1679. cm := channelz.GetChannel(nestedConn)
  1680. if cm != nil {
  1681. return false, fmt.Errorf("nested channel should have been deleted since its parent's trace should not contain any reference to it anymore")
  1682. }
  1683. return true, nil
  1684. }); err != nil {
  1685. t.Fatal(err)
  1686. }
  1687. }
  1688. func TestCZTraceOverwriteSubChannelDeletion(t *testing.T) {
  1689. defer leakcheck.Check(t)
  1690. channelz.NewChannelzStorage()
  1691. e := tcpClearRREnv
  1692. te := newTest(t, e)
  1693. channelz.SetMaxTraceEntry(1)
  1694. defer channelz.ResetMaxTraceEntryToDefault()
  1695. te.startServer(&testServer{security: e.security})
  1696. r, cleanup := manual.GenerateAndRegisterManualResolver()
  1697. defer cleanup()
  1698. r.InitialAddrs([]resolver.Address{{Addr: te.srvAddr}})
  1699. te.resolverScheme = r.Scheme()
  1700. te.clientConn()
  1701. defer te.tearDown()
  1702. var subConn int64
  1703. // Here, we just wait for all sockets to be up. In the future, if we implement
  1704. // IDLE, we may need to make several rpc calls to create the sockets.
  1705. if err := verifyResultWithDelay(func() (bool, error) {
  1706. tcs, _ := channelz.GetTopChannels(0)
  1707. if len(tcs) != 1 {
  1708. return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
  1709. }
  1710. if len(tcs[0].SubChans) != 1 {
  1711. return false, fmt.Errorf("there should be 1 subchannel not %d", len(tcs[0].SubChans))
  1712. }
  1713. for k := range tcs[0].SubChans {
  1714. subConn = k
  1715. }
  1716. return true, nil
  1717. }); err != nil {
  1718. t.Fatal(err)
  1719. }
  1720. r.NewAddress([]resolver.Address{})
  1721. if err := verifyResultWithDelay(func() (bool, error) {
  1722. tcs, _ := channelz.GetTopChannels(0)
  1723. if len(tcs) != 1 {
  1724. return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
  1725. }
  1726. if len(tcs[0].SubChans) != 0 {
  1727. return false, fmt.Errorf("there should be 0 subchannel not %d", len(tcs[0].SubChans))
  1728. }
  1729. return true, nil
  1730. }); err != nil {
  1731. t.Fatal(err)
  1732. }
  1733. // verify that the subchannel no longer exist due to trace referencing it got overwritten.
  1734. if err := verifyResultWithDelay(func() (bool, error) {
  1735. cm := channelz.GetChannel(subConn)
  1736. if cm != nil {
  1737. return false, fmt.Errorf("subchannel should have been deleted since its parent's trace should not contain any reference to it anymore")
  1738. }
  1739. return true, nil
  1740. }); err != nil {
  1741. t.Fatal(err)
  1742. }
  1743. }
  1744. func TestCZTraceTopChannelDeletionTraceClear(t *testing.T) {
  1745. defer leakcheck.Check(t)
  1746. channelz.NewChannelzStorage()
  1747. e := tcpClearRREnv
  1748. te := newTest(t, e)
  1749. te.startServer(&testServer{security: e.security})
  1750. r, cleanup := manual.GenerateAndRegisterManualResolver()
  1751. defer cleanup()
  1752. r.InitialAddrs([]resolver.Address{{Addr: te.srvAddr}})
  1753. te.resolverScheme = r.Scheme()
  1754. te.clientConn()
  1755. var subConn int64
  1756. // Here, we just wait for all sockets to be up. In the future, if we implement
  1757. // IDLE, we may need to make several rpc calls to create the sockets.
  1758. if err := verifyResultWithDelay(func() (bool, error) {
  1759. tcs, _ := channelz.GetTopChannels(0)
  1760. if len(tcs) != 1 {
  1761. return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
  1762. }
  1763. if len(tcs[0].SubChans) != 1 {
  1764. return false, fmt.Errorf("there should be 1 subchannel not %d", len(tcs[0].SubChans))
  1765. }
  1766. for k := range tcs[0].SubChans {
  1767. subConn = k
  1768. }
  1769. return true, nil
  1770. }); err != nil {
  1771. t.Fatal(err)
  1772. }
  1773. te.tearDown()
  1774. // verify that the subchannel no longer exist due to parent channel got deleted and its trace cleared.
  1775. if err := verifyResultWithDelay(func() (bool, error) {
  1776. cm := channelz.GetChannel(subConn)
  1777. if cm != nil {
  1778. return false, fmt.Errorf("subchannel should have been deleted since its parent's trace should not contain any reference to it anymore")
  1779. }
  1780. return true, nil
  1781. }); err != nil {
  1782. t.Fatal(err)
  1783. }
  1784. }