binarylog_end2end_test.go 28 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044
  1. /*
  2. *
  3. * Copyright 2018 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package binarylog_test
  19. import (
  20. "context"
  21. "fmt"
  22. "io"
  23. "net"
  24. "sort"
  25. "sync"
  26. "testing"
  27. "time"
  28. "github.com/golang/protobuf/proto"
  29. "google.golang.org/grpc"
  30. pb "google.golang.org/grpc/binarylog/grpc_binarylog_v1"
  31. "google.golang.org/grpc/grpclog"
  32. "google.golang.org/grpc/internal/binarylog"
  33. "google.golang.org/grpc/metadata"
  34. testpb "google.golang.org/grpc/stats/grpc_testing"
  35. "google.golang.org/grpc/status"
  36. )
  37. func init() {
  38. // Setting environment variable in tests doesn't work because of the init
  39. // orders. Set the loggers directly here.
  40. binarylog.SetLogger(binarylog.AllLogger)
  41. binarylog.SetDefaultSink(testSink)
  42. }
  43. var testSink = &testBinLogSink{}
  44. type testBinLogSink struct {
  45. mu sync.Mutex
  46. buf []*pb.GrpcLogEntry
  47. }
  48. func (s *testBinLogSink) Write(e *pb.GrpcLogEntry) error {
  49. s.mu.Lock()
  50. s.buf = append(s.buf, e)
  51. s.mu.Unlock()
  52. return nil
  53. }
  54. func (s *testBinLogSink) Close() error { return nil }
  55. // Returns all client entris if client is true, otherwise return all server
  56. // entries.
  57. func (s *testBinLogSink) logEntries(client bool) []*pb.GrpcLogEntry {
  58. logger := pb.GrpcLogEntry_LOGGER_SERVER
  59. if client {
  60. logger = pb.GrpcLogEntry_LOGGER_CLIENT
  61. }
  62. var ret []*pb.GrpcLogEntry
  63. s.mu.Lock()
  64. for _, e := range s.buf {
  65. if e.Logger == logger {
  66. ret = append(ret, e)
  67. }
  68. }
  69. s.mu.Unlock()
  70. return ret
  71. }
  72. func (s *testBinLogSink) clear() {
  73. s.mu.Lock()
  74. s.buf = nil
  75. s.mu.Unlock()
  76. }
  77. var (
  78. // For headers:
  79. testMetadata = metadata.MD{
  80. "key1": []string{"value1"},
  81. "key2": []string{"value2"},
  82. }
  83. // For trailers:
  84. testTrailerMetadata = metadata.MD{
  85. "tkey1": []string{"trailerValue1"},
  86. "tkey2": []string{"trailerValue2"},
  87. }
  88. // The id for which the service handler should return error.
  89. errorID int32 = 32202
  90. globalRPCID uint64 // RPC id starts with 1, but we do ++ at the beginning of each test.
  91. )
  92. type testServer struct {
  93. te *test
  94. }
  95. func (s *testServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
  96. md, ok := metadata.FromIncomingContext(ctx)
  97. if ok {
  98. if err := grpc.SendHeader(ctx, md); err != nil {
  99. return nil, status.Errorf(status.Code(err), "grpc.SendHeader(_, %v) = %v, want <nil>", md, err)
  100. }
  101. if err := grpc.SetTrailer(ctx, testTrailerMetadata); err != nil {
  102. return nil, status.Errorf(status.Code(err), "grpc.SetTrailer(_, %v) = %v, want <nil>", testTrailerMetadata, err)
  103. }
  104. }
  105. if in.Id == errorID {
  106. return nil, fmt.Errorf("got error id: %v", in.Id)
  107. }
  108. return &testpb.SimpleResponse{Id: in.Id}, nil
  109. }
  110. func (s *testServer) FullDuplexCall(stream testpb.TestService_FullDuplexCallServer) error {
  111. md, ok := metadata.FromIncomingContext(stream.Context())
  112. if ok {
  113. if err := stream.SendHeader(md); err != nil {
  114. return status.Errorf(status.Code(err), "stream.SendHeader(%v) = %v, want %v", md, err, nil)
  115. }
  116. stream.SetTrailer(testTrailerMetadata)
  117. }
  118. for {
  119. in, err := stream.Recv()
  120. if err == io.EOF {
  121. // read done.
  122. return nil
  123. }
  124. if err != nil {
  125. return err
  126. }
  127. if in.Id == errorID {
  128. return fmt.Errorf("got error id: %v", in.Id)
  129. }
  130. if err := stream.Send(&testpb.SimpleResponse{Id: in.Id}); err != nil {
  131. return err
  132. }
  133. }
  134. }
  135. func (s *testServer) ClientStreamCall(stream testpb.TestService_ClientStreamCallServer) error {
  136. md, ok := metadata.FromIncomingContext(stream.Context())
  137. if ok {
  138. if err := stream.SendHeader(md); err != nil {
  139. return status.Errorf(status.Code(err), "stream.SendHeader(%v) = %v, want %v", md, err, nil)
  140. }
  141. stream.SetTrailer(testTrailerMetadata)
  142. }
  143. for {
  144. in, err := stream.Recv()
  145. if err == io.EOF {
  146. // read done.
  147. return stream.SendAndClose(&testpb.SimpleResponse{Id: int32(0)})
  148. }
  149. if err != nil {
  150. return err
  151. }
  152. if in.Id == errorID {
  153. return fmt.Errorf("got error id: %v", in.Id)
  154. }
  155. }
  156. }
  157. func (s *testServer) ServerStreamCall(in *testpb.SimpleRequest, stream testpb.TestService_ServerStreamCallServer) error {
  158. md, ok := metadata.FromIncomingContext(stream.Context())
  159. if ok {
  160. if err := stream.SendHeader(md); err != nil {
  161. return status.Errorf(status.Code(err), "stream.SendHeader(%v) = %v, want %v", md, err, nil)
  162. }
  163. stream.SetTrailer(testTrailerMetadata)
  164. }
  165. if in.Id == errorID {
  166. return fmt.Errorf("got error id: %v", in.Id)
  167. }
  168. for i := 0; i < 5; i++ {
  169. if err := stream.Send(&testpb.SimpleResponse{Id: in.Id}); err != nil {
  170. return err
  171. }
  172. }
  173. return nil
  174. }
  175. // test is an end-to-end test. It should be created with the newTest
  176. // func, modified as needed, and then started with its startServer method.
  177. // It should be cleaned up with the tearDown method.
  178. type test struct {
  179. t *testing.T
  180. testServer testpb.TestServiceServer // nil means none
  181. // srv and srvAddr are set once startServer is called.
  182. srv *grpc.Server
  183. srvAddr string // Server IP without port.
  184. srvIP net.IP
  185. srvPort int
  186. cc *grpc.ClientConn // nil until requested via clientConn
  187. // Fields for client address. Set by the service handler.
  188. clientAddrMu sync.Mutex
  189. clientIP net.IP
  190. clientPort int
  191. }
  192. func (te *test) tearDown() {
  193. if te.cc != nil {
  194. te.cc.Close()
  195. te.cc = nil
  196. }
  197. te.srv.Stop()
  198. }
  199. type testConfig struct {
  200. }
  201. // newTest returns a new test using the provided testing.T and
  202. // environment. It is returned with default values. Tests should
  203. // modify it before calling its startServer and clientConn methods.
  204. func newTest(t *testing.T, tc *testConfig) *test {
  205. te := &test{
  206. t: t,
  207. }
  208. return te
  209. }
  210. type listenerWrapper struct {
  211. net.Listener
  212. te *test
  213. }
  214. func (lw *listenerWrapper) Accept() (net.Conn, error) {
  215. conn, err := lw.Listener.Accept()
  216. if err != nil {
  217. return nil, err
  218. }
  219. lw.te.clientAddrMu.Lock()
  220. lw.te.clientIP = conn.RemoteAddr().(*net.TCPAddr).IP
  221. lw.te.clientPort = conn.RemoteAddr().(*net.TCPAddr).Port
  222. lw.te.clientAddrMu.Unlock()
  223. return conn, nil
  224. }
  225. // startServer starts a gRPC server listening. Callers should defer a
  226. // call to te.tearDown to clean up.
  227. func (te *test) startServer(ts testpb.TestServiceServer) {
  228. te.testServer = ts
  229. lis, err := net.Listen("tcp", "localhost:0")
  230. lis = &listenerWrapper{
  231. Listener: lis,
  232. te: te,
  233. }
  234. if err != nil {
  235. te.t.Fatalf("Failed to listen: %v", err)
  236. }
  237. var opts []grpc.ServerOption
  238. s := grpc.NewServer(opts...)
  239. te.srv = s
  240. if te.testServer != nil {
  241. testpb.RegisterTestServiceServer(s, te.testServer)
  242. }
  243. go s.Serve(lis)
  244. te.srvAddr = lis.Addr().String()
  245. te.srvIP = lis.Addr().(*net.TCPAddr).IP
  246. te.srvPort = lis.Addr().(*net.TCPAddr).Port
  247. }
  248. func (te *test) clientConn() *grpc.ClientConn {
  249. if te.cc != nil {
  250. return te.cc
  251. }
  252. opts := []grpc.DialOption{grpc.WithInsecure(), grpc.WithBlock()}
  253. var err error
  254. te.cc, err = grpc.Dial(te.srvAddr, opts...)
  255. if err != nil {
  256. te.t.Fatalf("Dial(%q) = %v", te.srvAddr, err)
  257. }
  258. return te.cc
  259. }
  260. type rpcType int
  261. const (
  262. unaryRPC rpcType = iota
  263. clientStreamRPC
  264. serverStreamRPC
  265. fullDuplexStreamRPC
  266. cancelRPC
  267. )
  268. type rpcConfig struct {
  269. count int // Number of requests and responses for streaming RPCs.
  270. success bool // Whether the RPC should succeed or return error.
  271. callType rpcType // Type of RPC.
  272. }
  273. func (te *test) doUnaryCall(c *rpcConfig) (*testpb.SimpleRequest, *testpb.SimpleResponse, error) {
  274. var (
  275. resp *testpb.SimpleResponse
  276. req *testpb.SimpleRequest
  277. err error
  278. )
  279. tc := testpb.NewTestServiceClient(te.clientConn())
  280. if c.success {
  281. req = &testpb.SimpleRequest{Id: errorID + 1}
  282. } else {
  283. req = &testpb.SimpleRequest{Id: errorID}
  284. }
  285. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
  286. defer cancel()
  287. ctx = metadata.NewOutgoingContext(ctx, testMetadata)
  288. resp, err = tc.UnaryCall(ctx, req)
  289. return req, resp, err
  290. }
  291. func (te *test) doFullDuplexCallRoundtrip(c *rpcConfig) ([]*testpb.SimpleRequest, []*testpb.SimpleResponse, error) {
  292. var (
  293. reqs []*testpb.SimpleRequest
  294. resps []*testpb.SimpleResponse
  295. err error
  296. )
  297. tc := testpb.NewTestServiceClient(te.clientConn())
  298. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
  299. defer cancel()
  300. ctx = metadata.NewOutgoingContext(ctx, testMetadata)
  301. stream, err := tc.FullDuplexCall(ctx)
  302. if err != nil {
  303. return reqs, resps, err
  304. }
  305. if c.callType == cancelRPC {
  306. cancel()
  307. return reqs, resps, context.Canceled
  308. }
  309. var startID int32
  310. if !c.success {
  311. startID = errorID
  312. }
  313. for i := 0; i < c.count; i++ {
  314. req := &testpb.SimpleRequest{
  315. Id: int32(i) + startID,
  316. }
  317. reqs = append(reqs, req)
  318. if err = stream.Send(req); err != nil {
  319. return reqs, resps, err
  320. }
  321. var resp *testpb.SimpleResponse
  322. if resp, err = stream.Recv(); err != nil {
  323. return reqs, resps, err
  324. }
  325. resps = append(resps, resp)
  326. }
  327. if err = stream.CloseSend(); err != nil && err != io.EOF {
  328. return reqs, resps, err
  329. }
  330. if _, err = stream.Recv(); err != io.EOF {
  331. return reqs, resps, err
  332. }
  333. return reqs, resps, nil
  334. }
  335. func (te *test) doClientStreamCall(c *rpcConfig) ([]*testpb.SimpleRequest, *testpb.SimpleResponse, error) {
  336. var (
  337. reqs []*testpb.SimpleRequest
  338. resp *testpb.SimpleResponse
  339. err error
  340. )
  341. tc := testpb.NewTestServiceClient(te.clientConn())
  342. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
  343. defer cancel()
  344. ctx = metadata.NewOutgoingContext(ctx, testMetadata)
  345. stream, err := tc.ClientStreamCall(ctx)
  346. if err != nil {
  347. return reqs, resp, err
  348. }
  349. var startID int32
  350. if !c.success {
  351. startID = errorID
  352. }
  353. for i := 0; i < c.count; i++ {
  354. req := &testpb.SimpleRequest{
  355. Id: int32(i) + startID,
  356. }
  357. reqs = append(reqs, req)
  358. if err = stream.Send(req); err != nil {
  359. return reqs, resp, err
  360. }
  361. }
  362. resp, err = stream.CloseAndRecv()
  363. return reqs, resp, err
  364. }
  365. func (te *test) doServerStreamCall(c *rpcConfig) (*testpb.SimpleRequest, []*testpb.SimpleResponse, error) {
  366. var (
  367. req *testpb.SimpleRequest
  368. resps []*testpb.SimpleResponse
  369. err error
  370. )
  371. tc := testpb.NewTestServiceClient(te.clientConn())
  372. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
  373. defer cancel()
  374. ctx = metadata.NewOutgoingContext(ctx, testMetadata)
  375. var startID int32
  376. if !c.success {
  377. startID = errorID
  378. }
  379. req = &testpb.SimpleRequest{Id: startID}
  380. stream, err := tc.ServerStreamCall(ctx, req)
  381. if err != nil {
  382. return req, resps, err
  383. }
  384. for {
  385. var resp *testpb.SimpleResponse
  386. resp, err := stream.Recv()
  387. if err == io.EOF {
  388. return req, resps, nil
  389. } else if err != nil {
  390. return req, resps, err
  391. }
  392. resps = append(resps, resp)
  393. }
  394. }
  395. type expectedData struct {
  396. te *test
  397. cc *rpcConfig
  398. method string
  399. requests []*testpb.SimpleRequest
  400. responses []*testpb.SimpleResponse
  401. err error
  402. }
  403. func (ed *expectedData) newClientHeaderEntry(client bool, rpcID, inRPCID uint64) *pb.GrpcLogEntry {
  404. logger := pb.GrpcLogEntry_LOGGER_CLIENT
  405. var peer *pb.Address
  406. if !client {
  407. logger = pb.GrpcLogEntry_LOGGER_SERVER
  408. ed.te.clientAddrMu.Lock()
  409. peer = &pb.Address{
  410. Address: ed.te.clientIP.String(),
  411. IpPort: uint32(ed.te.clientPort),
  412. }
  413. if ed.te.clientIP.To4() != nil {
  414. peer.Type = pb.Address_TYPE_IPV4
  415. } else {
  416. peer.Type = pb.Address_TYPE_IPV6
  417. }
  418. ed.te.clientAddrMu.Unlock()
  419. }
  420. return &pb.GrpcLogEntry{
  421. Timestamp: nil,
  422. CallId: rpcID,
  423. SequenceIdWithinCall: inRPCID,
  424. Type: pb.GrpcLogEntry_EVENT_TYPE_CLIENT_HEADER,
  425. Logger: logger,
  426. Payload: &pb.GrpcLogEntry_ClientHeader{
  427. ClientHeader: &pb.ClientHeader{
  428. Metadata: binarylog.MdToMetadataProto(testMetadata),
  429. MethodName: ed.method,
  430. Authority: ed.te.srvAddr,
  431. },
  432. },
  433. Peer: peer,
  434. }
  435. }
  436. func (ed *expectedData) newServerHeaderEntry(client bool, rpcID, inRPCID uint64) *pb.GrpcLogEntry {
  437. logger := pb.GrpcLogEntry_LOGGER_SERVER
  438. var peer *pb.Address
  439. if client {
  440. logger = pb.GrpcLogEntry_LOGGER_CLIENT
  441. peer = &pb.Address{
  442. Address: ed.te.srvIP.String(),
  443. IpPort: uint32(ed.te.srvPort),
  444. }
  445. if ed.te.srvIP.To4() != nil {
  446. peer.Type = pb.Address_TYPE_IPV4
  447. } else {
  448. peer.Type = pb.Address_TYPE_IPV6
  449. }
  450. }
  451. return &pb.GrpcLogEntry{
  452. Timestamp: nil,
  453. CallId: rpcID,
  454. SequenceIdWithinCall: inRPCID,
  455. Type: pb.GrpcLogEntry_EVENT_TYPE_SERVER_HEADER,
  456. Logger: logger,
  457. Payload: &pb.GrpcLogEntry_ServerHeader{
  458. ServerHeader: &pb.ServerHeader{
  459. Metadata: binarylog.MdToMetadataProto(testMetadata),
  460. },
  461. },
  462. Peer: peer,
  463. }
  464. }
  465. func (ed *expectedData) newClientMessageEntry(client bool, rpcID, inRPCID uint64, msg *testpb.SimpleRequest) *pb.GrpcLogEntry {
  466. logger := pb.GrpcLogEntry_LOGGER_CLIENT
  467. if !client {
  468. logger = pb.GrpcLogEntry_LOGGER_SERVER
  469. }
  470. data, err := proto.Marshal(msg)
  471. if err != nil {
  472. grpclog.Infof("binarylogging_testing: failed to marshal proto message: %v", err)
  473. }
  474. return &pb.GrpcLogEntry{
  475. Timestamp: nil,
  476. CallId: rpcID,
  477. SequenceIdWithinCall: inRPCID,
  478. Type: pb.GrpcLogEntry_EVENT_TYPE_CLIENT_MESSAGE,
  479. Logger: logger,
  480. Payload: &pb.GrpcLogEntry_Message{
  481. Message: &pb.Message{
  482. Length: uint32(len(data)),
  483. Data: data,
  484. },
  485. },
  486. }
  487. }
  488. func (ed *expectedData) newServerMessageEntry(client bool, rpcID, inRPCID uint64, msg *testpb.SimpleResponse) *pb.GrpcLogEntry {
  489. logger := pb.GrpcLogEntry_LOGGER_CLIENT
  490. if !client {
  491. logger = pb.GrpcLogEntry_LOGGER_SERVER
  492. }
  493. data, err := proto.Marshal(msg)
  494. if err != nil {
  495. grpclog.Infof("binarylogging_testing: failed to marshal proto message: %v", err)
  496. }
  497. return &pb.GrpcLogEntry{
  498. Timestamp: nil,
  499. CallId: rpcID,
  500. SequenceIdWithinCall: inRPCID,
  501. Type: pb.GrpcLogEntry_EVENT_TYPE_SERVER_MESSAGE,
  502. Logger: logger,
  503. Payload: &pb.GrpcLogEntry_Message{
  504. Message: &pb.Message{
  505. Length: uint32(len(data)),
  506. Data: data,
  507. },
  508. },
  509. }
  510. }
  511. func (ed *expectedData) newHalfCloseEntry(client bool, rpcID, inRPCID uint64) *pb.GrpcLogEntry {
  512. logger := pb.GrpcLogEntry_LOGGER_CLIENT
  513. if !client {
  514. logger = pb.GrpcLogEntry_LOGGER_SERVER
  515. }
  516. return &pb.GrpcLogEntry{
  517. Timestamp: nil,
  518. CallId: rpcID,
  519. SequenceIdWithinCall: inRPCID,
  520. Type: pb.GrpcLogEntry_EVENT_TYPE_CLIENT_HALF_CLOSE,
  521. Payload: nil, // No payload here.
  522. Logger: logger,
  523. }
  524. }
  525. func (ed *expectedData) newServerTrailerEntry(client bool, rpcID, inRPCID uint64, stErr error) *pb.GrpcLogEntry {
  526. logger := pb.GrpcLogEntry_LOGGER_SERVER
  527. var peer *pb.Address
  528. if client {
  529. logger = pb.GrpcLogEntry_LOGGER_CLIENT
  530. peer = &pb.Address{
  531. Address: ed.te.srvIP.String(),
  532. IpPort: uint32(ed.te.srvPort),
  533. }
  534. if ed.te.srvIP.To4() != nil {
  535. peer.Type = pb.Address_TYPE_IPV4
  536. } else {
  537. peer.Type = pb.Address_TYPE_IPV6
  538. }
  539. }
  540. st, ok := status.FromError(stErr)
  541. if !ok {
  542. grpclog.Info("binarylogging: error in trailer is not a status error")
  543. }
  544. stProto := st.Proto()
  545. var (
  546. detailsBytes []byte
  547. err error
  548. )
  549. if stProto != nil && len(stProto.Details) != 0 {
  550. detailsBytes, err = proto.Marshal(stProto)
  551. if err != nil {
  552. grpclog.Infof("binarylogging: failed to marshal status proto: %v", err)
  553. }
  554. }
  555. return &pb.GrpcLogEntry{
  556. Timestamp: nil,
  557. CallId: rpcID,
  558. SequenceIdWithinCall: inRPCID,
  559. Type: pb.GrpcLogEntry_EVENT_TYPE_SERVER_TRAILER,
  560. Logger: logger,
  561. Payload: &pb.GrpcLogEntry_Trailer{
  562. Trailer: &pb.Trailer{
  563. Metadata: binarylog.MdToMetadataProto(testTrailerMetadata),
  564. // st will be nil if err was not a status error, but nil is ok.
  565. StatusCode: uint32(st.Code()),
  566. StatusMessage: st.Message(),
  567. StatusDetails: detailsBytes,
  568. },
  569. },
  570. Peer: peer,
  571. }
  572. }
  573. func (ed *expectedData) newCancelEntry(rpcID, inRPCID uint64) *pb.GrpcLogEntry {
  574. return &pb.GrpcLogEntry{
  575. Timestamp: nil,
  576. CallId: rpcID,
  577. SequenceIdWithinCall: inRPCID,
  578. Type: pb.GrpcLogEntry_EVENT_TYPE_CANCEL,
  579. Logger: pb.GrpcLogEntry_LOGGER_CLIENT,
  580. Payload: nil,
  581. }
  582. }
  583. func (ed *expectedData) toClientLogEntries() []*pb.GrpcLogEntry {
  584. var (
  585. ret []*pb.GrpcLogEntry
  586. idInRPC uint64 = 1
  587. )
  588. ret = append(ret, ed.newClientHeaderEntry(true, globalRPCID, idInRPC))
  589. idInRPC++
  590. switch ed.cc.callType {
  591. case unaryRPC, fullDuplexStreamRPC:
  592. for i := 0; i < len(ed.requests); i++ {
  593. ret = append(ret, ed.newClientMessageEntry(true, globalRPCID, idInRPC, ed.requests[i]))
  594. idInRPC++
  595. if i == 0 {
  596. // First message, append ServerHeader.
  597. ret = append(ret, ed.newServerHeaderEntry(true, globalRPCID, idInRPC))
  598. idInRPC++
  599. }
  600. if !ed.cc.success {
  601. // There is no response in the RPC error case.
  602. continue
  603. }
  604. ret = append(ret, ed.newServerMessageEntry(true, globalRPCID, idInRPC, ed.responses[i]))
  605. idInRPC++
  606. }
  607. if ed.cc.success && ed.cc.callType == fullDuplexStreamRPC {
  608. ret = append(ret, ed.newHalfCloseEntry(true, globalRPCID, idInRPC))
  609. idInRPC++
  610. }
  611. case clientStreamRPC, serverStreamRPC:
  612. for i := 0; i < len(ed.requests); i++ {
  613. ret = append(ret, ed.newClientMessageEntry(true, globalRPCID, idInRPC, ed.requests[i]))
  614. idInRPC++
  615. }
  616. if ed.cc.callType == clientStreamRPC {
  617. ret = append(ret, ed.newHalfCloseEntry(true, globalRPCID, idInRPC))
  618. idInRPC++
  619. }
  620. ret = append(ret, ed.newServerHeaderEntry(true, globalRPCID, idInRPC))
  621. idInRPC++
  622. if ed.cc.success {
  623. for i := 0; i < len(ed.responses); i++ {
  624. ret = append(ret, ed.newServerMessageEntry(true, globalRPCID, idInRPC, ed.responses[0]))
  625. idInRPC++
  626. }
  627. }
  628. }
  629. if ed.cc.callType == cancelRPC {
  630. ret = append(ret, ed.newCancelEntry(globalRPCID, idInRPC))
  631. idInRPC++
  632. } else {
  633. ret = append(ret, ed.newServerTrailerEntry(true, globalRPCID, idInRPC, ed.err))
  634. idInRPC++
  635. }
  636. return ret
  637. }
  638. func (ed *expectedData) toServerLogEntries() []*pb.GrpcLogEntry {
  639. var (
  640. ret []*pb.GrpcLogEntry
  641. idInRPC uint64 = 1
  642. )
  643. ret = append(ret, ed.newClientHeaderEntry(false, globalRPCID, idInRPC))
  644. idInRPC++
  645. switch ed.cc.callType {
  646. case unaryRPC:
  647. ret = append(ret, ed.newClientMessageEntry(false, globalRPCID, idInRPC, ed.requests[0]))
  648. idInRPC++
  649. ret = append(ret, ed.newServerHeaderEntry(false, globalRPCID, idInRPC))
  650. idInRPC++
  651. if ed.cc.success {
  652. ret = append(ret, ed.newServerMessageEntry(false, globalRPCID, idInRPC, ed.responses[0]))
  653. idInRPC++
  654. }
  655. case fullDuplexStreamRPC:
  656. ret = append(ret, ed.newServerHeaderEntry(false, globalRPCID, idInRPC))
  657. idInRPC++
  658. for i := 0; i < len(ed.requests); i++ {
  659. ret = append(ret, ed.newClientMessageEntry(false, globalRPCID, idInRPC, ed.requests[i]))
  660. idInRPC++
  661. if !ed.cc.success {
  662. // There is no response in the RPC error case.
  663. continue
  664. }
  665. ret = append(ret, ed.newServerMessageEntry(false, globalRPCID, idInRPC, ed.responses[i]))
  666. idInRPC++
  667. }
  668. if ed.cc.success && ed.cc.callType == fullDuplexStreamRPC {
  669. ret = append(ret, ed.newHalfCloseEntry(false, globalRPCID, idInRPC))
  670. idInRPC++
  671. }
  672. case clientStreamRPC:
  673. ret = append(ret, ed.newServerHeaderEntry(false, globalRPCID, idInRPC))
  674. idInRPC++
  675. for i := 0; i < len(ed.requests); i++ {
  676. ret = append(ret, ed.newClientMessageEntry(false, globalRPCID, idInRPC, ed.requests[i]))
  677. idInRPC++
  678. }
  679. if ed.cc.success {
  680. ret = append(ret, ed.newHalfCloseEntry(false, globalRPCID, idInRPC))
  681. idInRPC++
  682. ret = append(ret, ed.newServerMessageEntry(false, globalRPCID, idInRPC, ed.responses[0]))
  683. idInRPC++
  684. }
  685. case serverStreamRPC:
  686. ret = append(ret, ed.newClientMessageEntry(false, globalRPCID, idInRPC, ed.requests[0]))
  687. idInRPC++
  688. ret = append(ret, ed.newServerHeaderEntry(false, globalRPCID, idInRPC))
  689. idInRPC++
  690. for i := 0; i < len(ed.responses); i++ {
  691. ret = append(ret, ed.newServerMessageEntry(false, globalRPCID, idInRPC, ed.responses[0]))
  692. idInRPC++
  693. }
  694. }
  695. ret = append(ret, ed.newServerTrailerEntry(false, globalRPCID, idInRPC, ed.err))
  696. idInRPC++
  697. return ret
  698. }
  699. func runRPCs(t *testing.T, tc *testConfig, cc *rpcConfig) *expectedData {
  700. te := newTest(t, tc)
  701. te.startServer(&testServer{te: te})
  702. defer te.tearDown()
  703. expect := &expectedData{
  704. te: te,
  705. cc: cc,
  706. }
  707. switch cc.callType {
  708. case unaryRPC:
  709. expect.method = "/grpc.testing.TestService/UnaryCall"
  710. req, resp, err := te.doUnaryCall(cc)
  711. expect.requests = []*testpb.SimpleRequest{req}
  712. expect.responses = []*testpb.SimpleResponse{resp}
  713. expect.err = err
  714. case clientStreamRPC:
  715. expect.method = "/grpc.testing.TestService/ClientStreamCall"
  716. reqs, resp, err := te.doClientStreamCall(cc)
  717. expect.requests = reqs
  718. expect.responses = []*testpb.SimpleResponse{resp}
  719. expect.err = err
  720. case serverStreamRPC:
  721. expect.method = "/grpc.testing.TestService/ServerStreamCall"
  722. req, resps, err := te.doServerStreamCall(cc)
  723. expect.responses = resps
  724. expect.requests = []*testpb.SimpleRequest{req}
  725. expect.err = err
  726. case fullDuplexStreamRPC, cancelRPC:
  727. expect.method = "/grpc.testing.TestService/FullDuplexCall"
  728. expect.requests, expect.responses, expect.err = te.doFullDuplexCallRoundtrip(cc)
  729. }
  730. if cc.success != (expect.err == nil) {
  731. t.Fatalf("cc.success: %v, got error: %v", cc.success, expect.err)
  732. }
  733. te.cc.Close()
  734. te.srv.GracefulStop() // Wait for the server to stop.
  735. return expect
  736. }
  737. // equalLogEntry sorts the metadata entries by key (to compare metadata).
  738. //
  739. // This function is typically called with only two entries. It's written in this
  740. // way so the code can be put in a for loop instead of copied twice.
  741. func equalLogEntry(entries ...*pb.GrpcLogEntry) (equal bool) {
  742. for i, e := range entries {
  743. // Clear out some fields we don't compare.
  744. e.Timestamp = nil
  745. e.CallId = 0 // CallID is global to the binary, hard to compare.
  746. if h := e.GetClientHeader(); h != nil {
  747. h.Timeout = nil
  748. tmp := h.Metadata.Entry[:0]
  749. for _, e := range h.Metadata.Entry {
  750. tmp = append(tmp, e)
  751. }
  752. h.Metadata.Entry = tmp
  753. sort.Slice(h.Metadata.Entry, func(i, j int) bool { return h.Metadata.Entry[i].Key < h.Metadata.Entry[j].Key })
  754. }
  755. if h := e.GetServerHeader(); h != nil {
  756. tmp := h.Metadata.Entry[:0]
  757. for _, e := range h.Metadata.Entry {
  758. tmp = append(tmp, e)
  759. }
  760. h.Metadata.Entry = tmp
  761. sort.Slice(h.Metadata.Entry, func(i, j int) bool { return h.Metadata.Entry[i].Key < h.Metadata.Entry[j].Key })
  762. }
  763. if h := e.GetTrailer(); h != nil {
  764. sort.Slice(h.Metadata.Entry, func(i, j int) bool { return h.Metadata.Entry[i].Key < h.Metadata.Entry[j].Key })
  765. }
  766. if i > 0 && !proto.Equal(e, entries[i-1]) {
  767. return false
  768. }
  769. }
  770. return true
  771. }
  772. func testClientBinaryLog(t *testing.T, c *rpcConfig) error {
  773. defer testSink.clear()
  774. expect := runRPCs(t, &testConfig{}, c)
  775. want := expect.toClientLogEntries()
  776. var got []*pb.GrpcLogEntry
  777. // In racy cases, some entries are not logged when the RPC is finished (e.g.
  778. // context.Cancel).
  779. //
  780. // Check 10 times, with a sleep of 1/100 seconds between each check. Makes
  781. // it an 1-second wait in total.
  782. for i := 0; i < 10; i++ {
  783. got = testSink.logEntries(true) // all client entries.
  784. if len(want) == len(got) {
  785. break
  786. }
  787. time.Sleep(100 * time.Millisecond)
  788. }
  789. if len(want) != len(got) {
  790. for i, e := range want {
  791. t.Errorf("in want: %d, %s", i, e.GetType())
  792. }
  793. for i, e := range got {
  794. t.Errorf("in got: %d, %s", i, e.GetType())
  795. }
  796. return fmt.Errorf("didn't get same amount of log entries, want: %d, got: %d", len(want), len(got))
  797. }
  798. var errored bool
  799. for i := 0; i < len(got); i++ {
  800. if !equalLogEntry(want[i], got[i]) {
  801. t.Errorf("entry: %d, want %+v, got %+v", i, want[i], got[i])
  802. errored = true
  803. }
  804. }
  805. if errored {
  806. return fmt.Errorf("test failed")
  807. }
  808. return nil
  809. }
  810. func TestClientBinaryLogUnaryRPC(t *testing.T) {
  811. if err := testClientBinaryLog(t, &rpcConfig{success: true, callType: unaryRPC}); err != nil {
  812. t.Fatal(err)
  813. }
  814. }
  815. func TestClientBinaryLogUnaryRPCError(t *testing.T) {
  816. if err := testClientBinaryLog(t, &rpcConfig{success: false, callType: unaryRPC}); err != nil {
  817. t.Fatal(err)
  818. }
  819. }
  820. func TestClientBinaryLogClientStreamRPC(t *testing.T) {
  821. count := 5
  822. if err := testClientBinaryLog(t, &rpcConfig{count: count, success: true, callType: clientStreamRPC}); err != nil {
  823. t.Fatal(err)
  824. }
  825. }
  826. func TestClientBinaryLogClientStreamRPCError(t *testing.T) {
  827. count := 1
  828. if err := testClientBinaryLog(t, &rpcConfig{count: count, success: false, callType: clientStreamRPC}); err != nil {
  829. t.Fatal(err)
  830. }
  831. }
  832. func TestClientBinaryLogServerStreamRPC(t *testing.T) {
  833. count := 5
  834. if err := testClientBinaryLog(t, &rpcConfig{count: count, success: true, callType: serverStreamRPC}); err != nil {
  835. t.Fatal(err)
  836. }
  837. }
  838. func TestClientBinaryLogServerStreamRPCError(t *testing.T) {
  839. count := 5
  840. if err := testClientBinaryLog(t, &rpcConfig{count: count, success: false, callType: serverStreamRPC}); err != nil {
  841. t.Fatal(err)
  842. }
  843. }
  844. func TestClientBinaryLogFullDuplexRPC(t *testing.T) {
  845. count := 5
  846. if err := testClientBinaryLog(t, &rpcConfig{count: count, success: true, callType: fullDuplexStreamRPC}); err != nil {
  847. t.Fatal(err)
  848. }
  849. }
  850. func TestClientBinaryLogFullDuplexRPCError(t *testing.T) {
  851. count := 5
  852. if err := testClientBinaryLog(t, &rpcConfig{count: count, success: false, callType: fullDuplexStreamRPC}); err != nil {
  853. t.Fatal(err)
  854. }
  855. }
  856. func TestClientBinaryLogCancel(t *testing.T) {
  857. count := 5
  858. if err := testClientBinaryLog(t, &rpcConfig{count: count, success: false, callType: cancelRPC}); err != nil {
  859. t.Fatal(err)
  860. }
  861. }
  862. func testServerBinaryLog(t *testing.T, c *rpcConfig) error {
  863. defer testSink.clear()
  864. expect := runRPCs(t, &testConfig{}, c)
  865. want := expect.toServerLogEntries()
  866. var got []*pb.GrpcLogEntry
  867. // In racy cases, some entries are not logged when the RPC is finished (e.g.
  868. // context.Cancel). This is unlikely to happen on server side, but it does
  869. // no harm to retry.
  870. //
  871. // Check 10 times, with a sleep of 1/100 seconds between each check. Makes
  872. // it an 1-second wait in total.
  873. for i := 0; i < 10; i++ {
  874. got = testSink.logEntries(false) // all server entries.
  875. if len(want) == len(got) {
  876. break
  877. }
  878. time.Sleep(100 * time.Millisecond)
  879. }
  880. if len(want) != len(got) {
  881. for i, e := range want {
  882. t.Errorf("in want: %d, %s", i, e.GetType())
  883. }
  884. for i, e := range got {
  885. t.Errorf("in got: %d, %s", i, e.GetType())
  886. }
  887. return fmt.Errorf("didn't get same amount of log entries, want: %d, got: %d", len(want), len(got))
  888. }
  889. var errored bool
  890. for i := 0; i < len(got); i++ {
  891. if !equalLogEntry(want[i], got[i]) {
  892. t.Errorf("entry: %d, want %+v, got %+v", i, want[i], got[i])
  893. errored = true
  894. }
  895. }
  896. if errored {
  897. return fmt.Errorf("test failed")
  898. }
  899. return nil
  900. }
  901. func TestServerBinaryLogUnaryRPC(t *testing.T) {
  902. if err := testServerBinaryLog(t, &rpcConfig{success: true, callType: unaryRPC}); err != nil {
  903. t.Fatal(err)
  904. }
  905. }
  906. func TestServerBinaryLogUnaryRPCError(t *testing.T) {
  907. if err := testServerBinaryLog(t, &rpcConfig{success: false, callType: unaryRPC}); err != nil {
  908. t.Fatal(err)
  909. }
  910. }
  911. func TestServerBinaryLogClientStreamRPC(t *testing.T) {
  912. count := 5
  913. if err := testServerBinaryLog(t, &rpcConfig{count: count, success: true, callType: clientStreamRPC}); err != nil {
  914. t.Fatal(err)
  915. }
  916. }
  917. func TestServerBinaryLogClientStreamRPCError(t *testing.T) {
  918. count := 1
  919. if err := testServerBinaryLog(t, &rpcConfig{count: count, success: false, callType: clientStreamRPC}); err != nil {
  920. t.Fatal(err)
  921. }
  922. }
  923. func TestServerBinaryLogServerStreamRPC(t *testing.T) {
  924. count := 5
  925. if err := testServerBinaryLog(t, &rpcConfig{count: count, success: true, callType: serverStreamRPC}); err != nil {
  926. t.Fatal(err)
  927. }
  928. }
  929. func TestServerBinaryLogServerStreamRPCError(t *testing.T) {
  930. count := 5
  931. if err := testServerBinaryLog(t, &rpcConfig{count: count, success: false, callType: serverStreamRPC}); err != nil {
  932. t.Fatal(err)
  933. }
  934. }
  935. func TestServerBinaryLogFullDuplex(t *testing.T) {
  936. count := 5
  937. if err := testServerBinaryLog(t, &rpcConfig{count: count, success: true, callType: fullDuplexStreamRPC}); err != nil {
  938. t.Fatal(err)
  939. }
  940. }
  941. func TestServerBinaryLogFullDuplexError(t *testing.T) {
  942. count := 5
  943. if err := testServerBinaryLog(t, &rpcConfig{count: count, success: false, callType: fullDuplexStreamRPC}); err != nil {
  944. t.Fatal(err)
  945. }
  946. }