stats_test.go 33 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255
  1. /*
  2. *
  3. * Copyright 2016 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package stats_test
  19. import (
  20. "context"
  21. "fmt"
  22. "io"
  23. "net"
  24. "reflect"
  25. "sync"
  26. "testing"
  27. "time"
  28. "github.com/golang/protobuf/proto"
  29. "google.golang.org/grpc"
  30. "google.golang.org/grpc/metadata"
  31. "google.golang.org/grpc/stats"
  32. testpb "google.golang.org/grpc/stats/grpc_testing"
  33. "google.golang.org/grpc/status"
  34. )
  35. func init() {
  36. grpc.EnableTracing = false
  37. }
  // Context key types used by statshandler to stash tag info in a context.
  type (
  	// connCtxKey is the key under which TagConn stores its *stats.ConnTagInfo.
  	connCtxKey struct{}
  	// rpcCtxKey is the key under which TagRPC stores its *stats.RPCTagInfo.
  	rpcCtxKey struct{}
  )
  40. var (
  41. // For headers:
  42. testMetadata = metadata.MD{
  43. "key1": []string{"value1"},
  44. "key2": []string{"value2"},
  45. }
  46. // For trailers:
  47. testTrailerMetadata = metadata.MD{
  48. "tkey1": []string{"trailerValue1"},
  49. "tkey2": []string{"trailerValue2"},
  50. }
  51. // The id for which the service handler should return error.
  52. errorID int32 = 32202
  53. )
  // testServer is the stateless TestService implementation used by these tests.
  type testServer struct{}
  55. func (s *testServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
  56. md, ok := metadata.FromIncomingContext(ctx)
  57. if ok {
  58. if err := grpc.SendHeader(ctx, md); err != nil {
  59. return nil, status.Errorf(status.Code(err), "grpc.SendHeader(_, %v) = %v, want <nil>", md, err)
  60. }
  61. if err := grpc.SetTrailer(ctx, testTrailerMetadata); err != nil {
  62. return nil, status.Errorf(status.Code(err), "grpc.SetTrailer(_, %v) = %v, want <nil>", testTrailerMetadata, err)
  63. }
  64. }
  65. if in.Id == errorID {
  66. return nil, fmt.Errorf("got error id: %v", in.Id)
  67. }
  68. return &testpb.SimpleResponse{Id: in.Id}, nil
  69. }
  70. func (s *testServer) FullDuplexCall(stream testpb.TestService_FullDuplexCallServer) error {
  71. md, ok := metadata.FromIncomingContext(stream.Context())
  72. if ok {
  73. if err := stream.SendHeader(md); err != nil {
  74. return status.Errorf(status.Code(err), "%v.SendHeader(%v) = %v, want %v", stream, md, err, nil)
  75. }
  76. stream.SetTrailer(testTrailerMetadata)
  77. }
  78. for {
  79. in, err := stream.Recv()
  80. if err == io.EOF {
  81. // read done.
  82. return nil
  83. }
  84. if err != nil {
  85. return err
  86. }
  87. if in.Id == errorID {
  88. return fmt.Errorf("got error id: %v", in.Id)
  89. }
  90. if err := stream.Send(&testpb.SimpleResponse{Id: in.Id}); err != nil {
  91. return err
  92. }
  93. }
  94. }
  95. func (s *testServer) ClientStreamCall(stream testpb.TestService_ClientStreamCallServer) error {
  96. md, ok := metadata.FromIncomingContext(stream.Context())
  97. if ok {
  98. if err := stream.SendHeader(md); err != nil {
  99. return status.Errorf(status.Code(err), "%v.SendHeader(%v) = %v, want %v", stream, md, err, nil)
  100. }
  101. stream.SetTrailer(testTrailerMetadata)
  102. }
  103. for {
  104. in, err := stream.Recv()
  105. if err == io.EOF {
  106. // read done.
  107. return stream.SendAndClose(&testpb.SimpleResponse{Id: int32(0)})
  108. }
  109. if err != nil {
  110. return err
  111. }
  112. if in.Id == errorID {
  113. return fmt.Errorf("got error id: %v", in.Id)
  114. }
  115. }
  116. }
  117. func (s *testServer) ServerStreamCall(in *testpb.SimpleRequest, stream testpb.TestService_ServerStreamCallServer) error {
  118. md, ok := metadata.FromIncomingContext(stream.Context())
  119. if ok {
  120. if err := stream.SendHeader(md); err != nil {
  121. return status.Errorf(status.Code(err), "%v.SendHeader(%v) = %v, want %v", stream, md, err, nil)
  122. }
  123. stream.SetTrailer(testTrailerMetadata)
  124. }
  125. if in.Id == errorID {
  126. return fmt.Errorf("got error id: %v", in.Id)
  127. }
  128. for i := 0; i < 5; i++ {
  129. if err := stream.Send(&testpb.SimpleResponse{Id: in.Id}); err != nil {
  130. return err
  131. }
  132. }
  133. return nil
  134. }
  135. // test is an end-to-end test. It should be created with the newTest
  136. // func, modified as needed, and then started with its startServer method.
  137. // It should be cleaned up with the tearDown method.
  138. type test struct {
  139. t *testing.T
  140. compress string
  141. clientStatsHandler stats.Handler
  142. serverStatsHandler stats.Handler
  143. testServer testpb.TestServiceServer // nil means none
  144. // srv and srvAddr are set once startServer is called.
  145. srv *grpc.Server
  146. srvAddr string
  147. cc *grpc.ClientConn // nil until requested via clientConn
  148. }
  149. func (te *test) tearDown() {
  150. if te.cc != nil {
  151. te.cc.Close()
  152. te.cc = nil
  153. }
  154. te.srv.Stop()
  155. }
  // testConfig carries the per-test settings handed to newTest.
  type testConfig struct {
  	// compress names the compression to enable; startServer and clientConn
  	// install gzip when it equals "gzip".
  	compress string
  }
  159. // newTest returns a new test using the provided testing.T and
  160. // environment. It is returned with default values. Tests should
  161. // modify it before calling its startServer and clientConn methods.
  162. func newTest(t *testing.T, tc *testConfig, ch stats.Handler, sh stats.Handler) *test {
  163. te := &test{
  164. t: t,
  165. compress: tc.compress,
  166. clientStatsHandler: ch,
  167. serverStatsHandler: sh,
  168. }
  169. return te
  170. }
  171. // startServer starts a gRPC server listening. Callers should defer a
  172. // call to te.tearDown to clean up.
  173. func (te *test) startServer(ts testpb.TestServiceServer) {
  174. te.testServer = ts
  175. lis, err := net.Listen("tcp", "localhost:0")
  176. if err != nil {
  177. te.t.Fatalf("Failed to listen: %v", err)
  178. }
  179. var opts []grpc.ServerOption
  180. if te.compress == "gzip" {
  181. opts = append(opts,
  182. grpc.RPCCompressor(grpc.NewGZIPCompressor()),
  183. grpc.RPCDecompressor(grpc.NewGZIPDecompressor()),
  184. )
  185. }
  186. if te.serverStatsHandler != nil {
  187. opts = append(opts, grpc.StatsHandler(te.serverStatsHandler))
  188. }
  189. s := grpc.NewServer(opts...)
  190. te.srv = s
  191. if te.testServer != nil {
  192. testpb.RegisterTestServiceServer(s, te.testServer)
  193. }
  194. go s.Serve(lis)
  195. te.srvAddr = lis.Addr().String()
  196. }
  197. func (te *test) clientConn() *grpc.ClientConn {
  198. if te.cc != nil {
  199. return te.cc
  200. }
  201. opts := []grpc.DialOption{grpc.WithInsecure(), grpc.WithBlock()}
  202. if te.compress == "gzip" {
  203. opts = append(opts,
  204. grpc.WithCompressor(grpc.NewGZIPCompressor()),
  205. grpc.WithDecompressor(grpc.NewGZIPDecompressor()),
  206. )
  207. }
  208. if te.clientStatsHandler != nil {
  209. opts = append(opts, grpc.WithStatsHandler(te.clientStatsHandler))
  210. }
  211. var err error
  212. te.cc, err = grpc.Dial(te.srvAddr, opts...)
  213. if err != nil {
  214. te.t.Fatalf("Dial(%q) = %v", te.srvAddr, err)
  215. }
  216. return te.cc
  217. }
  // rpcType selects which kind of RPC a test helper issues.
  type rpcType int

  // The RPC kinds, numbered from zero in declaration order.
  const (
  	unaryRPC            rpcType = iota // issued through doUnaryCall
  	clientStreamRPC                    // issued through doClientStreamCall
  	serverStreamRPC                    // issued through doServerStreamCall
  	fullDuplexStreamRPC                // issued through doFullDuplexCallRoundtrip
  )
  225. type rpcConfig struct {
  226. count int // Number of requests and responses for streaming RPCs.
  227. success bool // Whether the RPC should succeed or return error.
  228. failfast bool
  229. callType rpcType // Type of RPC.
  230. }
  231. func (te *test) doUnaryCall(c *rpcConfig) (*testpb.SimpleRequest, *testpb.SimpleResponse, error) {
  232. var (
  233. resp *testpb.SimpleResponse
  234. req *testpb.SimpleRequest
  235. err error
  236. )
  237. tc := testpb.NewTestServiceClient(te.clientConn())
  238. if c.success {
  239. req = &testpb.SimpleRequest{Id: errorID + 1}
  240. } else {
  241. req = &testpb.SimpleRequest{Id: errorID}
  242. }
  243. ctx := metadata.NewOutgoingContext(context.Background(), testMetadata)
  244. resp, err = tc.UnaryCall(ctx, req, grpc.FailFast(c.failfast))
  245. return req, resp, err
  246. }
  247. func (te *test) doFullDuplexCallRoundtrip(c *rpcConfig) ([]*testpb.SimpleRequest, []*testpb.SimpleResponse, error) {
  248. var (
  249. reqs []*testpb.SimpleRequest
  250. resps []*testpb.SimpleResponse
  251. err error
  252. )
  253. tc := testpb.NewTestServiceClient(te.clientConn())
  254. stream, err := tc.FullDuplexCall(metadata.NewOutgoingContext(context.Background(), testMetadata), grpc.FailFast(c.failfast))
  255. if err != nil {
  256. return reqs, resps, err
  257. }
  258. var startID int32
  259. if !c.success {
  260. startID = errorID
  261. }
  262. for i := 0; i < c.count; i++ {
  263. req := &testpb.SimpleRequest{
  264. Id: int32(i) + startID,
  265. }
  266. reqs = append(reqs, req)
  267. if err = stream.Send(req); err != nil {
  268. return reqs, resps, err
  269. }
  270. var resp *testpb.SimpleResponse
  271. if resp, err = stream.Recv(); err != nil {
  272. return reqs, resps, err
  273. }
  274. resps = append(resps, resp)
  275. }
  276. if err = stream.CloseSend(); err != nil && err != io.EOF {
  277. return reqs, resps, err
  278. }
  279. if _, err = stream.Recv(); err != io.EOF {
  280. return reqs, resps, err
  281. }
  282. return reqs, resps, nil
  283. }
  284. func (te *test) doClientStreamCall(c *rpcConfig) ([]*testpb.SimpleRequest, *testpb.SimpleResponse, error) {
  285. var (
  286. reqs []*testpb.SimpleRequest
  287. resp *testpb.SimpleResponse
  288. err error
  289. )
  290. tc := testpb.NewTestServiceClient(te.clientConn())
  291. stream, err := tc.ClientStreamCall(metadata.NewOutgoingContext(context.Background(), testMetadata), grpc.FailFast(c.failfast))
  292. if err != nil {
  293. return reqs, resp, err
  294. }
  295. var startID int32
  296. if !c.success {
  297. startID = errorID
  298. }
  299. for i := 0; i < c.count; i++ {
  300. req := &testpb.SimpleRequest{
  301. Id: int32(i) + startID,
  302. }
  303. reqs = append(reqs, req)
  304. if err = stream.Send(req); err != nil {
  305. return reqs, resp, err
  306. }
  307. }
  308. resp, err = stream.CloseAndRecv()
  309. return reqs, resp, err
  310. }
  311. func (te *test) doServerStreamCall(c *rpcConfig) (*testpb.SimpleRequest, []*testpb.SimpleResponse, error) {
  312. var (
  313. req *testpb.SimpleRequest
  314. resps []*testpb.SimpleResponse
  315. err error
  316. )
  317. tc := testpb.NewTestServiceClient(te.clientConn())
  318. var startID int32
  319. if !c.success {
  320. startID = errorID
  321. }
  322. req = &testpb.SimpleRequest{Id: startID}
  323. stream, err := tc.ServerStreamCall(metadata.NewOutgoingContext(context.Background(), testMetadata), req, grpc.FailFast(c.failfast))
  324. if err != nil {
  325. return req, resps, err
  326. }
  327. for {
  328. var resp *testpb.SimpleResponse
  329. resp, err := stream.Recv()
  330. if err == io.EOF {
  331. return req, resps, nil
  332. } else if err != nil {
  333. return req, resps, err
  334. }
  335. resps = append(resps, resp)
  336. }
  337. }
  338. type expectedData struct {
  339. method string
  340. serverAddr string
  341. compression string
  342. reqIdx int
  343. requests []*testpb.SimpleRequest
  344. respIdx int
  345. responses []*testpb.SimpleResponse
  346. err error
  347. failfast bool
  348. }
  // gotData is one stats event as recorded by statshandler.
  type gotData struct {
  	// ctx is the context the handler was invoked with.
  	ctx context.Context
  	// client holds the IsClient() result of the recorded stats value.
  	client bool
  	// s is the recorded value; it could be RPCStats or ConnStats.
  	s interface{}
  }
  // Identifiers for the kinds of stats events, numbered from zero in
  // declaration order.
  const (
  	begin int = iota
  	end

  	inPayload
  	inHeader
  	inTrailer

  	outPayload
  	outHeader
  	// TODO: test outTrailer ?

  	connbegin
  	connend
  )
  366. func checkBegin(t *testing.T, d *gotData, e *expectedData) {
  367. var (
  368. ok bool
  369. st *stats.Begin
  370. )
  371. if st, ok = d.s.(*stats.Begin); !ok {
  372. t.Fatalf("got %T, want Begin", d.s)
  373. }
  374. if d.ctx == nil {
  375. t.Fatalf("d.ctx = nil, want <non-nil>")
  376. }
  377. if st.BeginTime.IsZero() {
  378. t.Fatalf("st.BeginTime = %v, want <non-zero>", st.BeginTime)
  379. }
  380. if d.client {
  381. if st.FailFast != e.failfast {
  382. t.Fatalf("st.FailFast = %v, want %v", st.FailFast, e.failfast)
  383. }
  384. }
  385. }
  386. func checkInHeader(t *testing.T, d *gotData, e *expectedData) {
  387. var (
  388. ok bool
  389. st *stats.InHeader
  390. )
  391. if st, ok = d.s.(*stats.InHeader); !ok {
  392. t.Fatalf("got %T, want InHeader", d.s)
  393. }
  394. if d.ctx == nil {
  395. t.Fatalf("d.ctx = nil, want <non-nil>")
  396. }
  397. if !d.client {
  398. if st.FullMethod != e.method {
  399. t.Fatalf("st.FullMethod = %s, want %v", st.FullMethod, e.method)
  400. }
  401. if st.LocalAddr.String() != e.serverAddr {
  402. t.Fatalf("st.LocalAddr = %v, want %v", st.LocalAddr, e.serverAddr)
  403. }
  404. if st.Compression != e.compression {
  405. t.Fatalf("st.Compression = %v, want %v", st.Compression, e.compression)
  406. }
  407. if connInfo, ok := d.ctx.Value(connCtxKey{}).(*stats.ConnTagInfo); ok {
  408. if connInfo.RemoteAddr != st.RemoteAddr {
  409. t.Fatalf("connInfo.RemoteAddr = %v, want %v", connInfo.RemoteAddr, st.RemoteAddr)
  410. }
  411. if connInfo.LocalAddr != st.LocalAddr {
  412. t.Fatalf("connInfo.LocalAddr = %v, want %v", connInfo.LocalAddr, st.LocalAddr)
  413. }
  414. } else {
  415. t.Fatalf("got context %v, want one with connCtxKey", d.ctx)
  416. }
  417. if rpcInfo, ok := d.ctx.Value(rpcCtxKey{}).(*stats.RPCTagInfo); ok {
  418. if rpcInfo.FullMethodName != st.FullMethod {
  419. t.Fatalf("rpcInfo.FullMethod = %s, want %v", rpcInfo.FullMethodName, st.FullMethod)
  420. }
  421. } else {
  422. t.Fatalf("got context %v, want one with rpcCtxKey", d.ctx)
  423. }
  424. }
  425. }
  426. func checkInPayload(t *testing.T, d *gotData, e *expectedData) {
  427. var (
  428. ok bool
  429. st *stats.InPayload
  430. )
  431. if st, ok = d.s.(*stats.InPayload); !ok {
  432. t.Fatalf("got %T, want InPayload", d.s)
  433. }
  434. if d.ctx == nil {
  435. t.Fatalf("d.ctx = nil, want <non-nil>")
  436. }
  437. if d.client {
  438. b, err := proto.Marshal(e.responses[e.respIdx])
  439. if err != nil {
  440. t.Fatalf("failed to marshal message: %v", err)
  441. }
  442. if reflect.TypeOf(st.Payload) != reflect.TypeOf(e.responses[e.respIdx]) {
  443. t.Fatalf("st.Payload = %T, want %T", st.Payload, e.responses[e.respIdx])
  444. }
  445. e.respIdx++
  446. if string(st.Data) != string(b) {
  447. t.Fatalf("st.Data = %v, want %v", st.Data, b)
  448. }
  449. if st.Length != len(b) {
  450. t.Fatalf("st.Lenght = %v, want %v", st.Length, len(b))
  451. }
  452. } else {
  453. b, err := proto.Marshal(e.requests[e.reqIdx])
  454. if err != nil {
  455. t.Fatalf("failed to marshal message: %v", err)
  456. }
  457. if reflect.TypeOf(st.Payload) != reflect.TypeOf(e.requests[e.reqIdx]) {
  458. t.Fatalf("st.Payload = %T, want %T", st.Payload, e.requests[e.reqIdx])
  459. }
  460. e.reqIdx++
  461. if string(st.Data) != string(b) {
  462. t.Fatalf("st.Data = %v, want %v", st.Data, b)
  463. }
  464. if st.Length != len(b) {
  465. t.Fatalf("st.Lenght = %v, want %v", st.Length, len(b))
  466. }
  467. }
  468. // TODO check WireLength and ReceivedTime.
  469. if st.RecvTime.IsZero() {
  470. t.Fatalf("st.ReceivedTime = %v, want <non-zero>", st.RecvTime)
  471. }
  472. }
  473. func checkInTrailer(t *testing.T, d *gotData, e *expectedData) {
  474. var (
  475. ok bool
  476. )
  477. if _, ok = d.s.(*stats.InTrailer); !ok {
  478. t.Fatalf("got %T, want InTrailer", d.s)
  479. }
  480. if d.ctx == nil {
  481. t.Fatalf("d.ctx = nil, want <non-nil>")
  482. }
  483. }
  484. func checkOutHeader(t *testing.T, d *gotData, e *expectedData) {
  485. var (
  486. ok bool
  487. st *stats.OutHeader
  488. )
  489. if st, ok = d.s.(*stats.OutHeader); !ok {
  490. t.Fatalf("got %T, want OutHeader", d.s)
  491. }
  492. if d.ctx == nil {
  493. t.Fatalf("d.ctx = nil, want <non-nil>")
  494. }
  495. if d.client {
  496. if st.FullMethod != e.method {
  497. t.Fatalf("st.FullMethod = %s, want %v", st.FullMethod, e.method)
  498. }
  499. if st.RemoteAddr.String() != e.serverAddr {
  500. t.Fatalf("st.RemoteAddr = %v, want %v", st.RemoteAddr, e.serverAddr)
  501. }
  502. if st.Compression != e.compression {
  503. t.Fatalf("st.Compression = %v, want %v", st.Compression, e.compression)
  504. }
  505. if rpcInfo, ok := d.ctx.Value(rpcCtxKey{}).(*stats.RPCTagInfo); ok {
  506. if rpcInfo.FullMethodName != st.FullMethod {
  507. t.Fatalf("rpcInfo.FullMethod = %s, want %v", rpcInfo.FullMethodName, st.FullMethod)
  508. }
  509. } else {
  510. t.Fatalf("got context %v, want one with rpcCtxKey", d.ctx)
  511. }
  512. }
  513. }
  514. func checkOutPayload(t *testing.T, d *gotData, e *expectedData) {
  515. var (
  516. ok bool
  517. st *stats.OutPayload
  518. )
  519. if st, ok = d.s.(*stats.OutPayload); !ok {
  520. t.Fatalf("got %T, want OutPayload", d.s)
  521. }
  522. if d.ctx == nil {
  523. t.Fatalf("d.ctx = nil, want <non-nil>")
  524. }
  525. if d.client {
  526. b, err := proto.Marshal(e.requests[e.reqIdx])
  527. if err != nil {
  528. t.Fatalf("failed to marshal message: %v", err)
  529. }
  530. if reflect.TypeOf(st.Payload) != reflect.TypeOf(e.requests[e.reqIdx]) {
  531. t.Fatalf("st.Payload = %T, want %T", st.Payload, e.requests[e.reqIdx])
  532. }
  533. e.reqIdx++
  534. if string(st.Data) != string(b) {
  535. t.Fatalf("st.Data = %v, want %v", st.Data, b)
  536. }
  537. if st.Length != len(b) {
  538. t.Fatalf("st.Lenght = %v, want %v", st.Length, len(b))
  539. }
  540. } else {
  541. b, err := proto.Marshal(e.responses[e.respIdx])
  542. if err != nil {
  543. t.Fatalf("failed to marshal message: %v", err)
  544. }
  545. if reflect.TypeOf(st.Payload) != reflect.TypeOf(e.responses[e.respIdx]) {
  546. t.Fatalf("st.Payload = %T, want %T", st.Payload, e.responses[e.respIdx])
  547. }
  548. e.respIdx++
  549. if string(st.Data) != string(b) {
  550. t.Fatalf("st.Data = %v, want %v", st.Data, b)
  551. }
  552. if st.Length != len(b) {
  553. t.Fatalf("st.Lenght = %v, want %v", st.Length, len(b))
  554. }
  555. }
  556. // TODO check WireLength and ReceivedTime.
  557. if st.SentTime.IsZero() {
  558. t.Fatalf("st.SentTime = %v, want <non-zero>", st.SentTime)
  559. }
  560. }
  561. func checkOutTrailer(t *testing.T, d *gotData, e *expectedData) {
  562. var (
  563. ok bool
  564. st *stats.OutTrailer
  565. )
  566. if st, ok = d.s.(*stats.OutTrailer); !ok {
  567. t.Fatalf("got %T, want OutTrailer", d.s)
  568. }
  569. if d.ctx == nil {
  570. t.Fatalf("d.ctx = nil, want <non-nil>")
  571. }
  572. if st.Client {
  573. t.Fatalf("st IsClient = true, want false")
  574. }
  575. }
  576. func checkEnd(t *testing.T, d *gotData, e *expectedData) {
  577. var (
  578. ok bool
  579. st *stats.End
  580. )
  581. if st, ok = d.s.(*stats.End); !ok {
  582. t.Fatalf("got %T, want End", d.s)
  583. }
  584. if d.ctx == nil {
  585. t.Fatalf("d.ctx = nil, want <non-nil>")
  586. }
  587. if st.BeginTime.IsZero() {
  588. t.Fatalf("st.BeginTime = %v, want <non-zero>", st.BeginTime)
  589. }
  590. if st.EndTime.IsZero() {
  591. t.Fatalf("st.EndTime = %v, want <non-zero>", st.EndTime)
  592. }
  593. actual, ok := status.FromError(st.Error)
  594. if !ok {
  595. t.Fatalf("expected st.Error to be a statusError, got %v (type %T)", st.Error, st.Error)
  596. }
  597. expectedStatus, _ := status.FromError(e.err)
  598. if actual.Code() != expectedStatus.Code() || actual.Message() != expectedStatus.Message() {
  599. t.Fatalf("st.Error = %v, want %v", st.Error, e.err)
  600. }
  601. }
  602. func checkConnBegin(t *testing.T, d *gotData, e *expectedData) {
  603. var (
  604. ok bool
  605. st *stats.ConnBegin
  606. )
  607. if st, ok = d.s.(*stats.ConnBegin); !ok {
  608. t.Fatalf("got %T, want ConnBegin", d.s)
  609. }
  610. if d.ctx == nil {
  611. t.Fatalf("d.ctx = nil, want <non-nil>")
  612. }
  613. st.IsClient() // TODO remove this.
  614. }
  615. func checkConnEnd(t *testing.T, d *gotData, e *expectedData) {
  616. var (
  617. ok bool
  618. st *stats.ConnEnd
  619. )
  620. if st, ok = d.s.(*stats.ConnEnd); !ok {
  621. t.Fatalf("got %T, want ConnEnd", d.s)
  622. }
  623. if d.ctx == nil {
  624. t.Fatalf("d.ctx = nil, want <non-nil>")
  625. }
  626. st.IsClient() // TODO remove this.
  627. }
  628. type statshandler struct {
  629. mu sync.Mutex
  630. gotRPC []*gotData
  631. gotConn []*gotData
  632. }
  633. func (h *statshandler) TagConn(ctx context.Context, info *stats.ConnTagInfo) context.Context {
  634. return context.WithValue(ctx, connCtxKey{}, info)
  635. }
  636. func (h *statshandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
  637. return context.WithValue(ctx, rpcCtxKey{}, info)
  638. }
  639. func (h *statshandler) HandleConn(ctx context.Context, s stats.ConnStats) {
  640. h.mu.Lock()
  641. defer h.mu.Unlock()
  642. h.gotConn = append(h.gotConn, &gotData{ctx, s.IsClient(), s})
  643. }
  644. func (h *statshandler) HandleRPC(ctx context.Context, s stats.RPCStats) {
  645. h.mu.Lock()
  646. defer h.mu.Unlock()
  647. h.gotRPC = append(h.gotRPC, &gotData{ctx, s.IsClient(), s})
  648. }
  649. func checkConnStats(t *testing.T, got []*gotData) {
  650. if len(got) <= 0 || len(got)%2 != 0 {
  651. for i, g := range got {
  652. t.Errorf(" - %v, %T = %+v, ctx: %v", i, g.s, g.s, g.ctx)
  653. }
  654. t.Fatalf("got %v stats, want even positive number", len(got))
  655. }
  656. // The first conn stats must be a ConnBegin.
  657. checkConnBegin(t, got[0], nil)
  658. // The last conn stats must be a ConnEnd.
  659. checkConnEnd(t, got[len(got)-1], nil)
  660. }
  661. func checkServerStats(t *testing.T, got []*gotData, expect *expectedData, checkFuncs []func(t *testing.T, d *gotData, e *expectedData)) {
  662. if len(got) != len(checkFuncs) {
  663. for i, g := range got {
  664. t.Errorf(" - %v, %T", i, g.s)
  665. }
  666. t.Fatalf("got %v stats, want %v stats", len(got), len(checkFuncs))
  667. }
  668. var rpcctx context.Context
  669. for i := 0; i < len(got); i++ {
  670. if _, ok := got[i].s.(stats.RPCStats); ok {
  671. if rpcctx != nil && got[i].ctx != rpcctx {
  672. t.Fatalf("got different contexts with stats %T", got[i].s)
  673. }
  674. rpcctx = got[i].ctx
  675. }
  676. }
  677. for i, f := range checkFuncs {
  678. f(t, got[i], expect)
  679. }
  680. }
  681. func testServerStats(t *testing.T, tc *testConfig, cc *rpcConfig, checkFuncs []func(t *testing.T, d *gotData, e *expectedData)) {
  682. h := &statshandler{}
  683. te := newTest(t, tc, nil, h)
  684. te.startServer(&testServer{})
  685. defer te.tearDown()
  686. var (
  687. reqs []*testpb.SimpleRequest
  688. resps []*testpb.SimpleResponse
  689. err error
  690. method string
  691. req *testpb.SimpleRequest
  692. resp *testpb.SimpleResponse
  693. e error
  694. )
  695. switch cc.callType {
  696. case unaryRPC:
  697. method = "/grpc.testing.TestService/UnaryCall"
  698. req, resp, e = te.doUnaryCall(cc)
  699. reqs = []*testpb.SimpleRequest{req}
  700. resps = []*testpb.SimpleResponse{resp}
  701. err = e
  702. case clientStreamRPC:
  703. method = "/grpc.testing.TestService/ClientStreamCall"
  704. reqs, resp, e = te.doClientStreamCall(cc)
  705. resps = []*testpb.SimpleResponse{resp}
  706. err = e
  707. case serverStreamRPC:
  708. method = "/grpc.testing.TestService/ServerStreamCall"
  709. req, resps, e = te.doServerStreamCall(cc)
  710. reqs = []*testpb.SimpleRequest{req}
  711. err = e
  712. case fullDuplexStreamRPC:
  713. method = "/grpc.testing.TestService/FullDuplexCall"
  714. reqs, resps, err = te.doFullDuplexCallRoundtrip(cc)
  715. }
  716. if cc.success != (err == nil) {
  717. t.Fatalf("cc.success: %v, got error: %v", cc.success, err)
  718. }
  719. te.cc.Close()
  720. te.srv.GracefulStop() // Wait for the server to stop.
  721. for {
  722. h.mu.Lock()
  723. if len(h.gotRPC) >= len(checkFuncs) {
  724. h.mu.Unlock()
  725. break
  726. }
  727. h.mu.Unlock()
  728. time.Sleep(10 * time.Millisecond)
  729. }
  730. for {
  731. h.mu.Lock()
  732. if _, ok := h.gotConn[len(h.gotConn)-1].s.(*stats.ConnEnd); ok {
  733. h.mu.Unlock()
  734. break
  735. }
  736. h.mu.Unlock()
  737. time.Sleep(10 * time.Millisecond)
  738. }
  739. expect := &expectedData{
  740. serverAddr: te.srvAddr,
  741. compression: tc.compress,
  742. method: method,
  743. requests: reqs,
  744. responses: resps,
  745. err: err,
  746. }
  747. h.mu.Lock()
  748. checkConnStats(t, h.gotConn)
  749. h.mu.Unlock()
  750. checkServerStats(t, h.gotRPC, expect, checkFuncs)
  751. }
  752. func TestServerStatsUnaryRPC(t *testing.T) {
  753. testServerStats(t, &testConfig{compress: ""}, &rpcConfig{success: true, callType: unaryRPC}, []func(t *testing.T, d *gotData, e *expectedData){
  754. checkInHeader,
  755. checkBegin,
  756. checkInPayload,
  757. checkOutHeader,
  758. checkOutPayload,
  759. checkOutTrailer,
  760. checkEnd,
  761. })
  762. }
  763. func TestServerStatsUnaryRPCError(t *testing.T) {
  764. testServerStats(t, &testConfig{compress: ""}, &rpcConfig{success: false, callType: unaryRPC}, []func(t *testing.T, d *gotData, e *expectedData){
  765. checkInHeader,
  766. checkBegin,
  767. checkInPayload,
  768. checkOutHeader,
  769. checkOutTrailer,
  770. checkEnd,
  771. })
  772. }
  773. func TestServerStatsClientStreamRPC(t *testing.T) {
  774. count := 5
  775. checkFuncs := []func(t *testing.T, d *gotData, e *expectedData){
  776. checkInHeader,
  777. checkBegin,
  778. checkOutHeader,
  779. }
  780. ioPayFuncs := []func(t *testing.T, d *gotData, e *expectedData){
  781. checkInPayload,
  782. }
  783. for i := 0; i < count; i++ {
  784. checkFuncs = append(checkFuncs, ioPayFuncs...)
  785. }
  786. checkFuncs = append(checkFuncs,
  787. checkOutPayload,
  788. checkOutTrailer,
  789. checkEnd,
  790. )
  791. testServerStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: true, callType: clientStreamRPC}, checkFuncs)
  792. }
  793. func TestServerStatsClientStreamRPCError(t *testing.T) {
  794. count := 1
  795. testServerStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: false, callType: clientStreamRPC}, []func(t *testing.T, d *gotData, e *expectedData){
  796. checkInHeader,
  797. checkBegin,
  798. checkOutHeader,
  799. checkInPayload,
  800. checkOutTrailer,
  801. checkEnd,
  802. })
  803. }
  804. func TestServerStatsServerStreamRPC(t *testing.T) {
  805. count := 5
  806. checkFuncs := []func(t *testing.T, d *gotData, e *expectedData){
  807. checkInHeader,
  808. checkBegin,
  809. checkInPayload,
  810. checkOutHeader,
  811. }
  812. ioPayFuncs := []func(t *testing.T, d *gotData, e *expectedData){
  813. checkOutPayload,
  814. }
  815. for i := 0; i < count; i++ {
  816. checkFuncs = append(checkFuncs, ioPayFuncs...)
  817. }
  818. checkFuncs = append(checkFuncs,
  819. checkOutTrailer,
  820. checkEnd,
  821. )
  822. testServerStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: true, callType: serverStreamRPC}, checkFuncs)
  823. }
  824. func TestServerStatsServerStreamRPCError(t *testing.T) {
  825. count := 5
  826. testServerStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: false, callType: serverStreamRPC}, []func(t *testing.T, d *gotData, e *expectedData){
  827. checkInHeader,
  828. checkBegin,
  829. checkInPayload,
  830. checkOutHeader,
  831. checkOutTrailer,
  832. checkEnd,
  833. })
  834. }
  835. func TestServerStatsFullDuplexRPC(t *testing.T) {
  836. count := 5
  837. checkFuncs := []func(t *testing.T, d *gotData, e *expectedData){
  838. checkInHeader,
  839. checkBegin,
  840. checkOutHeader,
  841. }
  842. ioPayFuncs := []func(t *testing.T, d *gotData, e *expectedData){
  843. checkInPayload,
  844. checkOutPayload,
  845. }
  846. for i := 0; i < count; i++ {
  847. checkFuncs = append(checkFuncs, ioPayFuncs...)
  848. }
  849. checkFuncs = append(checkFuncs,
  850. checkOutTrailer,
  851. checkEnd,
  852. )
  853. testServerStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: true, callType: fullDuplexStreamRPC}, checkFuncs)
  854. }
  855. func TestServerStatsFullDuplexRPCError(t *testing.T) {
  856. count := 5
  857. testServerStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: false, callType: fullDuplexStreamRPC}, []func(t *testing.T, d *gotData, e *expectedData){
  858. checkInHeader,
  859. checkBegin,
  860. checkOutHeader,
  861. checkInPayload,
  862. checkOutTrailer,
  863. checkEnd,
  864. })
  865. }
  866. type checkFuncWithCount struct {
  867. f func(t *testing.T, d *gotData, e *expectedData)
  868. c int // expected count
  869. }
  870. func checkClientStats(t *testing.T, got []*gotData, expect *expectedData, checkFuncs map[int]*checkFuncWithCount) {
  871. var expectLen int
  872. for _, v := range checkFuncs {
  873. expectLen += v.c
  874. }
  875. if len(got) != expectLen {
  876. for i, g := range got {
  877. t.Errorf(" - %v, %T", i, g.s)
  878. }
  879. t.Fatalf("got %v stats, want %v stats", len(got), expectLen)
  880. }
  881. var tagInfoInCtx *stats.RPCTagInfo
  882. for i := 0; i < len(got); i++ {
  883. if _, ok := got[i].s.(stats.RPCStats); ok {
  884. tagInfoInCtxNew, _ := got[i].ctx.Value(rpcCtxKey{}).(*stats.RPCTagInfo)
  885. if tagInfoInCtx != nil && tagInfoInCtx != tagInfoInCtxNew {
  886. t.Fatalf("got context containing different tagInfo with stats %T", got[i].s)
  887. }
  888. tagInfoInCtx = tagInfoInCtxNew
  889. }
  890. }
  891. for _, s := range got {
  892. switch s.s.(type) {
  893. case *stats.Begin:
  894. if checkFuncs[begin].c <= 0 {
  895. t.Fatalf("unexpected stats: %T", s.s)
  896. }
  897. checkFuncs[begin].f(t, s, expect)
  898. checkFuncs[begin].c--
  899. case *stats.OutHeader:
  900. if checkFuncs[outHeader].c <= 0 {
  901. t.Fatalf("unexpected stats: %T", s.s)
  902. }
  903. checkFuncs[outHeader].f(t, s, expect)
  904. checkFuncs[outHeader].c--
  905. case *stats.OutPayload:
  906. if checkFuncs[outPayload].c <= 0 {
  907. t.Fatalf("unexpected stats: %T", s.s)
  908. }
  909. checkFuncs[outPayload].f(t, s, expect)
  910. checkFuncs[outPayload].c--
  911. case *stats.InHeader:
  912. if checkFuncs[inHeader].c <= 0 {
  913. t.Fatalf("unexpected stats: %T", s.s)
  914. }
  915. checkFuncs[inHeader].f(t, s, expect)
  916. checkFuncs[inHeader].c--
  917. case *stats.InPayload:
  918. if checkFuncs[inPayload].c <= 0 {
  919. t.Fatalf("unexpected stats: %T", s.s)
  920. }
  921. checkFuncs[inPayload].f(t, s, expect)
  922. checkFuncs[inPayload].c--
  923. case *stats.InTrailer:
  924. if checkFuncs[inTrailer].c <= 0 {
  925. t.Fatalf("unexpected stats: %T", s.s)
  926. }
  927. checkFuncs[inTrailer].f(t, s, expect)
  928. checkFuncs[inTrailer].c--
  929. case *stats.End:
  930. if checkFuncs[end].c <= 0 {
  931. t.Fatalf("unexpected stats: %T", s.s)
  932. }
  933. checkFuncs[end].f(t, s, expect)
  934. checkFuncs[end].c--
  935. case *stats.ConnBegin:
  936. if checkFuncs[connbegin].c <= 0 {
  937. t.Fatalf("unexpected stats: %T", s.s)
  938. }
  939. checkFuncs[connbegin].f(t, s, expect)
  940. checkFuncs[connbegin].c--
  941. case *stats.ConnEnd:
  942. if checkFuncs[connend].c <= 0 {
  943. t.Fatalf("unexpected stats: %T", s.s)
  944. }
  945. checkFuncs[connend].f(t, s, expect)
  946. checkFuncs[connend].c--
  947. default:
  948. t.Fatalf("unexpected stats: %T", s.s)
  949. }
  950. }
  951. }
  952. func testClientStats(t *testing.T, tc *testConfig, cc *rpcConfig, checkFuncs map[int]*checkFuncWithCount) {
  953. h := &statshandler{}
  954. te := newTest(t, tc, h, nil)
  955. te.startServer(&testServer{})
  956. defer te.tearDown()
  957. var (
  958. reqs []*testpb.SimpleRequest
  959. resps []*testpb.SimpleResponse
  960. method string
  961. err error
  962. req *testpb.SimpleRequest
  963. resp *testpb.SimpleResponse
  964. e error
  965. )
  966. switch cc.callType {
  967. case unaryRPC:
  968. method = "/grpc.testing.TestService/UnaryCall"
  969. req, resp, e = te.doUnaryCall(cc)
  970. reqs = []*testpb.SimpleRequest{req}
  971. resps = []*testpb.SimpleResponse{resp}
  972. err = e
  973. case clientStreamRPC:
  974. method = "/grpc.testing.TestService/ClientStreamCall"
  975. reqs, resp, e = te.doClientStreamCall(cc)
  976. resps = []*testpb.SimpleResponse{resp}
  977. err = e
  978. case serverStreamRPC:
  979. method = "/grpc.testing.TestService/ServerStreamCall"
  980. req, resps, e = te.doServerStreamCall(cc)
  981. reqs = []*testpb.SimpleRequest{req}
  982. err = e
  983. case fullDuplexStreamRPC:
  984. method = "/grpc.testing.TestService/FullDuplexCall"
  985. reqs, resps, err = te.doFullDuplexCallRoundtrip(cc)
  986. }
  987. if cc.success != (err == nil) {
  988. t.Fatalf("cc.success: %v, got error: %v", cc.success, err)
  989. }
  990. te.cc.Close()
  991. te.srv.GracefulStop() // Wait for the server to stop.
  992. lenRPCStats := 0
  993. for _, v := range checkFuncs {
  994. lenRPCStats += v.c
  995. }
  996. for {
  997. h.mu.Lock()
  998. if len(h.gotRPC) >= lenRPCStats {
  999. h.mu.Unlock()
  1000. break
  1001. }
  1002. h.mu.Unlock()
  1003. time.Sleep(10 * time.Millisecond)
  1004. }
  1005. for {
  1006. h.mu.Lock()
  1007. if _, ok := h.gotConn[len(h.gotConn)-1].s.(*stats.ConnEnd); ok {
  1008. h.mu.Unlock()
  1009. break
  1010. }
  1011. h.mu.Unlock()
  1012. time.Sleep(10 * time.Millisecond)
  1013. }
  1014. expect := &expectedData{
  1015. serverAddr: te.srvAddr,
  1016. compression: tc.compress,
  1017. method: method,
  1018. requests: reqs,
  1019. responses: resps,
  1020. failfast: cc.failfast,
  1021. err: err,
  1022. }
  1023. h.mu.Lock()
  1024. checkConnStats(t, h.gotConn)
  1025. h.mu.Unlock()
  1026. checkClientStats(t, h.gotRPC, expect, checkFuncs)
  1027. }
  1028. func TestClientStatsUnaryRPC(t *testing.T) {
  1029. testClientStats(t, &testConfig{compress: ""}, &rpcConfig{success: true, failfast: false, callType: unaryRPC}, map[int]*checkFuncWithCount{
  1030. begin: {checkBegin, 1},
  1031. outHeader: {checkOutHeader, 1},
  1032. outPayload: {checkOutPayload, 1},
  1033. inHeader: {checkInHeader, 1},
  1034. inPayload: {checkInPayload, 1},
  1035. inTrailer: {checkInTrailer, 1},
  1036. end: {checkEnd, 1},
  1037. })
  1038. }
  1039. func TestClientStatsUnaryRPCError(t *testing.T) {
  1040. testClientStats(t, &testConfig{compress: ""}, &rpcConfig{success: false, failfast: false, callType: unaryRPC}, map[int]*checkFuncWithCount{
  1041. begin: {checkBegin, 1},
  1042. outHeader: {checkOutHeader, 1},
  1043. outPayload: {checkOutPayload, 1},
  1044. inHeader: {checkInHeader, 1},
  1045. inTrailer: {checkInTrailer, 1},
  1046. end: {checkEnd, 1},
  1047. })
  1048. }
  1049. func TestClientStatsClientStreamRPC(t *testing.T) {
  1050. count := 5
  1051. testClientStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: true, failfast: false, callType: clientStreamRPC}, map[int]*checkFuncWithCount{
  1052. begin: {checkBegin, 1},
  1053. outHeader: {checkOutHeader, 1},
  1054. inHeader: {checkInHeader, 1},
  1055. outPayload: {checkOutPayload, count},
  1056. inTrailer: {checkInTrailer, 1},
  1057. inPayload: {checkInPayload, 1},
  1058. end: {checkEnd, 1},
  1059. })
  1060. }
  1061. func TestClientStatsClientStreamRPCError(t *testing.T) {
  1062. count := 1
  1063. testClientStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: false, failfast: false, callType: clientStreamRPC}, map[int]*checkFuncWithCount{
  1064. begin: {checkBegin, 1},
  1065. outHeader: {checkOutHeader, 1},
  1066. inHeader: {checkInHeader, 1},
  1067. outPayload: {checkOutPayload, 1},
  1068. inTrailer: {checkInTrailer, 1},
  1069. end: {checkEnd, 1},
  1070. })
  1071. }
  1072. func TestClientStatsServerStreamRPC(t *testing.T) {
  1073. count := 5
  1074. testClientStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: true, failfast: false, callType: serverStreamRPC}, map[int]*checkFuncWithCount{
  1075. begin: {checkBegin, 1},
  1076. outHeader: {checkOutHeader, 1},
  1077. outPayload: {checkOutPayload, 1},
  1078. inHeader: {checkInHeader, 1},
  1079. inPayload: {checkInPayload, count},
  1080. inTrailer: {checkInTrailer, 1},
  1081. end: {checkEnd, 1},
  1082. })
  1083. }
  1084. func TestClientStatsServerStreamRPCError(t *testing.T) {
  1085. count := 5
  1086. testClientStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: false, failfast: false, callType: serverStreamRPC}, map[int]*checkFuncWithCount{
  1087. begin: {checkBegin, 1},
  1088. outHeader: {checkOutHeader, 1},
  1089. outPayload: {checkOutPayload, 1},
  1090. inHeader: {checkInHeader, 1},
  1091. inTrailer: {checkInTrailer, 1},
  1092. end: {checkEnd, 1},
  1093. })
  1094. }
  1095. func TestClientStatsFullDuplexRPC(t *testing.T) {
  1096. count := 5
  1097. testClientStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: true, failfast: false, callType: fullDuplexStreamRPC}, map[int]*checkFuncWithCount{
  1098. begin: {checkBegin, 1},
  1099. outHeader: {checkOutHeader, 1},
  1100. outPayload: {checkOutPayload, count},
  1101. inHeader: {checkInHeader, 1},
  1102. inPayload: {checkInPayload, count},
  1103. inTrailer: {checkInTrailer, 1},
  1104. end: {checkEnd, 1},
  1105. })
  1106. }
  1107. func TestClientStatsFullDuplexRPCError(t *testing.T) {
  1108. count := 5
  1109. testClientStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: false, failfast: false, callType: fullDuplexStreamRPC}, map[int]*checkFuncWithCount{
  1110. begin: {checkBegin, 1},
  1111. outHeader: {checkOutHeader, 1},
  1112. outPayload: {checkOutPayload, 1},
  1113. inHeader: {checkInHeader, 1},
  1114. inTrailer: {checkInTrailer, 1},
  1115. end: {checkEnd, 1},
  1116. })
  1117. }
  1118. func TestTags(t *testing.T) {
  1119. b := []byte{5, 2, 4, 3, 1}
  1120. ctx := stats.SetTags(context.Background(), b)
  1121. if tg := stats.OutgoingTags(ctx); !reflect.DeepEqual(tg, b) {
  1122. t.Errorf("OutgoingTags(%v) = %v; want %v", ctx, tg, b)
  1123. }
  1124. if tg := stats.Tags(ctx); tg != nil {
  1125. t.Errorf("Tags(%v) = %v; want nil", ctx, tg)
  1126. }
  1127. ctx = stats.SetIncomingTags(context.Background(), b)
  1128. if tg := stats.Tags(ctx); !reflect.DeepEqual(tg, b) {
  1129. t.Errorf("Tags(%v) = %v; want %v", ctx, tg, b)
  1130. }
  1131. if tg := stats.OutgoingTags(ctx); tg != nil {
  1132. t.Errorf("OutgoingTags(%v) = %v; want nil", ctx, tg)
  1133. }
  1134. }
  1135. func TestTrace(t *testing.T) {
  1136. b := []byte{5, 2, 4, 3, 1}
  1137. ctx := stats.SetTrace(context.Background(), b)
  1138. if tr := stats.OutgoingTrace(ctx); !reflect.DeepEqual(tr, b) {
  1139. t.Errorf("OutgoingTrace(%v) = %v; want %v", ctx, tr, b)
  1140. }
  1141. if tr := stats.Trace(ctx); tr != nil {
  1142. t.Errorf("Trace(%v) = %v; want nil", ctx, tr)
  1143. }
  1144. ctx = stats.SetIncomingTrace(context.Background(), b)
  1145. if tr := stats.Trace(ctx); !reflect.DeepEqual(tr, b) {
  1146. t.Errorf("Trace(%v) = %v; want %v", ctx, tr, b)
  1147. }
  1148. if tr := stats.OutgoingTrace(ctx); tr != nil {
  1149. t.Errorf("OutgoingTrace(%v) = %v; want nil", ctx, tr)
  1150. }
  1151. }