test_utils.go 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766
  1. /*
  2. *
  3. * Copyright 2014 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. //go:generate protoc --go_out=plugins=grpc:. grpc_testing/test.proto
  19. package interop
  20. import (
  21. "context"
  22. "fmt"
  23. "io"
  24. "io/ioutil"
  25. "strings"
  26. "time"
  27. "github.com/golang/protobuf/proto"
  28. "golang.org/x/oauth2"
  29. "golang.org/x/oauth2/google"
  30. "google.golang.org/grpc"
  31. "google.golang.org/grpc/codes"
  32. "google.golang.org/grpc/grpclog"
  33. testpb "google.golang.org/grpc/interop/grpc_testing"
  34. "google.golang.org/grpc/metadata"
  35. "google.golang.org/grpc/status"
  36. )
  37. var (
  38. reqSizes = []int{27182, 8, 1828, 45904}
  39. respSizes = []int{31415, 9, 2653, 58979}
  40. largeReqSize = 271828
  41. largeRespSize = 314159
  42. initialMetadataKey = "x-grpc-test-echo-initial"
  43. trailingMetadataKey = "x-grpc-test-echo-trailing-bin"
  44. )
  45. // ClientNewPayload returns a payload of the given type and size.
  46. func ClientNewPayload(t testpb.PayloadType, size int) *testpb.Payload {
  47. if size < 0 {
  48. grpclog.Fatalf("Requested a response with invalid length %d", size)
  49. }
  50. body := make([]byte, size)
  51. switch t {
  52. case testpb.PayloadType_COMPRESSABLE:
  53. case testpb.PayloadType_UNCOMPRESSABLE:
  54. grpclog.Fatalf("PayloadType UNCOMPRESSABLE is not supported")
  55. default:
  56. grpclog.Fatalf("Unsupported payload type: %d", t)
  57. }
  58. return &testpb.Payload{
  59. Type: t,
  60. Body: body,
  61. }
  62. }
  63. // DoEmptyUnaryCall performs a unary RPC with empty request and response messages.
  64. func DoEmptyUnaryCall(tc testpb.TestServiceClient, args ...grpc.CallOption) {
  65. reply, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, args...)
  66. if err != nil {
  67. grpclog.Fatal("/TestService/EmptyCall RPC failed: ", err)
  68. }
  69. if !proto.Equal(&testpb.Empty{}, reply) {
  70. grpclog.Fatalf("/TestService/EmptyCall receives %v, want %v", reply, testpb.Empty{})
  71. }
  72. }
  73. // DoLargeUnaryCall performs a unary RPC with large payload in the request and response.
  74. func DoLargeUnaryCall(tc testpb.TestServiceClient, args ...grpc.CallOption) {
  75. pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, largeReqSize)
  76. req := &testpb.SimpleRequest{
  77. ResponseType: testpb.PayloadType_COMPRESSABLE,
  78. ResponseSize: int32(largeRespSize),
  79. Payload: pl,
  80. }
  81. reply, err := tc.UnaryCall(context.Background(), req, args...)
  82. if err != nil {
  83. grpclog.Fatal("/TestService/UnaryCall RPC failed: ", err)
  84. }
  85. t := reply.GetPayload().GetType()
  86. s := len(reply.GetPayload().GetBody())
  87. if t != testpb.PayloadType_COMPRESSABLE || s != largeRespSize {
  88. grpclog.Fatalf("Got the reply with type %d len %d; want %d, %d", t, s, testpb.PayloadType_COMPRESSABLE, largeRespSize)
  89. }
  90. }
  91. // DoClientStreaming performs a client streaming RPC.
  92. func DoClientStreaming(tc testpb.TestServiceClient, args ...grpc.CallOption) {
  93. stream, err := tc.StreamingInputCall(context.Background(), args...)
  94. if err != nil {
  95. grpclog.Fatalf("%v.StreamingInputCall(_) = _, %v", tc, err)
  96. }
  97. var sum int
  98. for _, s := range reqSizes {
  99. pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, s)
  100. req := &testpb.StreamingInputCallRequest{
  101. Payload: pl,
  102. }
  103. if err := stream.Send(req); err != nil {
  104. grpclog.Fatalf("%v has error %v while sending %v", stream, err, req)
  105. }
  106. sum += s
  107. }
  108. reply, err := stream.CloseAndRecv()
  109. if err != nil {
  110. grpclog.Fatalf("%v.CloseAndRecv() got error %v, want %v", stream, err, nil)
  111. }
  112. if reply.GetAggregatedPayloadSize() != int32(sum) {
  113. grpclog.Fatalf("%v.CloseAndRecv().GetAggregatePayloadSize() = %v; want %v", stream, reply.GetAggregatedPayloadSize(), sum)
  114. }
  115. }
  116. // DoServerStreaming performs a server streaming RPC.
  117. func DoServerStreaming(tc testpb.TestServiceClient, args ...grpc.CallOption) {
  118. respParam := make([]*testpb.ResponseParameters, len(respSizes))
  119. for i, s := range respSizes {
  120. respParam[i] = &testpb.ResponseParameters{
  121. Size: int32(s),
  122. }
  123. }
  124. req := &testpb.StreamingOutputCallRequest{
  125. ResponseType: testpb.PayloadType_COMPRESSABLE,
  126. ResponseParameters: respParam,
  127. }
  128. stream, err := tc.StreamingOutputCall(context.Background(), req, args...)
  129. if err != nil {
  130. grpclog.Fatalf("%v.StreamingOutputCall(_) = _, %v", tc, err)
  131. }
  132. var rpcStatus error
  133. var respCnt int
  134. var index int
  135. for {
  136. reply, err := stream.Recv()
  137. if err != nil {
  138. rpcStatus = err
  139. break
  140. }
  141. t := reply.GetPayload().GetType()
  142. if t != testpb.PayloadType_COMPRESSABLE {
  143. grpclog.Fatalf("Got the reply of type %d, want %d", t, testpb.PayloadType_COMPRESSABLE)
  144. }
  145. size := len(reply.GetPayload().GetBody())
  146. if size != respSizes[index] {
  147. grpclog.Fatalf("Got reply body of length %d, want %d", size, respSizes[index])
  148. }
  149. index++
  150. respCnt++
  151. }
  152. if rpcStatus != io.EOF {
  153. grpclog.Fatalf("Failed to finish the server streaming rpc: %v", rpcStatus)
  154. }
  155. if respCnt != len(respSizes) {
  156. grpclog.Fatalf("Got %d reply, want %d", len(respSizes), respCnt)
  157. }
  158. }
  159. // DoPingPong performs ping-pong style bi-directional streaming RPC.
  160. func DoPingPong(tc testpb.TestServiceClient, args ...grpc.CallOption) {
  161. stream, err := tc.FullDuplexCall(context.Background(), args...)
  162. if err != nil {
  163. grpclog.Fatalf("%v.FullDuplexCall(_) = _, %v", tc, err)
  164. }
  165. var index int
  166. for index < len(reqSizes) {
  167. respParam := []*testpb.ResponseParameters{
  168. {
  169. Size: int32(respSizes[index]),
  170. },
  171. }
  172. pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, reqSizes[index])
  173. req := &testpb.StreamingOutputCallRequest{
  174. ResponseType: testpb.PayloadType_COMPRESSABLE,
  175. ResponseParameters: respParam,
  176. Payload: pl,
  177. }
  178. if err := stream.Send(req); err != nil {
  179. grpclog.Fatalf("%v has error %v while sending %v", stream, err, req)
  180. }
  181. reply, err := stream.Recv()
  182. if err != nil {
  183. grpclog.Fatalf("%v.Recv() = %v", stream, err)
  184. }
  185. t := reply.GetPayload().GetType()
  186. if t != testpb.PayloadType_COMPRESSABLE {
  187. grpclog.Fatalf("Got the reply of type %d, want %d", t, testpb.PayloadType_COMPRESSABLE)
  188. }
  189. size := len(reply.GetPayload().GetBody())
  190. if size != respSizes[index] {
  191. grpclog.Fatalf("Got reply body of length %d, want %d", size, respSizes[index])
  192. }
  193. index++
  194. }
  195. if err := stream.CloseSend(); err != nil {
  196. grpclog.Fatalf("%v.CloseSend() got %v, want %v", stream, err, nil)
  197. }
  198. if _, err := stream.Recv(); err != io.EOF {
  199. grpclog.Fatalf("%v failed to complele the ping pong test: %v", stream, err)
  200. }
  201. }
  202. // DoEmptyStream sets up a bi-directional streaming with zero message.
  203. func DoEmptyStream(tc testpb.TestServiceClient, args ...grpc.CallOption) {
  204. stream, err := tc.FullDuplexCall(context.Background(), args...)
  205. if err != nil {
  206. grpclog.Fatalf("%v.FullDuplexCall(_) = _, %v", tc, err)
  207. }
  208. if err := stream.CloseSend(); err != nil {
  209. grpclog.Fatalf("%v.CloseSend() got %v, want %v", stream, err, nil)
  210. }
  211. if _, err := stream.Recv(); err != io.EOF {
  212. grpclog.Fatalf("%v failed to complete the empty stream test: %v", stream, err)
  213. }
  214. }
  215. // DoTimeoutOnSleepingServer performs an RPC on a sleep server which causes RPC timeout.
  216. func DoTimeoutOnSleepingServer(tc testpb.TestServiceClient, args ...grpc.CallOption) {
  217. ctx, cancel := context.WithTimeout(context.Background(), 1*time.Millisecond)
  218. defer cancel()
  219. stream, err := tc.FullDuplexCall(ctx, args...)
  220. if err != nil {
  221. if status.Code(err) == codes.DeadlineExceeded {
  222. return
  223. }
  224. grpclog.Fatalf("%v.FullDuplexCall(_) = _, %v", tc, err)
  225. }
  226. pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, 27182)
  227. req := &testpb.StreamingOutputCallRequest{
  228. ResponseType: testpb.PayloadType_COMPRESSABLE,
  229. Payload: pl,
  230. }
  231. if err := stream.Send(req); err != nil && err != io.EOF {
  232. grpclog.Fatalf("%v.Send(_) = %v", stream, err)
  233. }
  234. if _, err := stream.Recv(); status.Code(err) != codes.DeadlineExceeded {
  235. grpclog.Fatalf("%v.Recv() = _, %v, want error code %d", stream, err, codes.DeadlineExceeded)
  236. }
  237. }
  238. // DoComputeEngineCreds performs a unary RPC with compute engine auth.
  239. func DoComputeEngineCreds(tc testpb.TestServiceClient, serviceAccount, oauthScope string) {
  240. pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, largeReqSize)
  241. req := &testpb.SimpleRequest{
  242. ResponseType: testpb.PayloadType_COMPRESSABLE,
  243. ResponseSize: int32(largeRespSize),
  244. Payload: pl,
  245. FillUsername: true,
  246. FillOauthScope: true,
  247. }
  248. reply, err := tc.UnaryCall(context.Background(), req)
  249. if err != nil {
  250. grpclog.Fatal("/TestService/UnaryCall RPC failed: ", err)
  251. }
  252. user := reply.GetUsername()
  253. scope := reply.GetOauthScope()
  254. if user != serviceAccount {
  255. grpclog.Fatalf("Got user name %q, want %q.", user, serviceAccount)
  256. }
  257. if !strings.Contains(oauthScope, scope) {
  258. grpclog.Fatalf("Got OAuth scope %q which is NOT a substring of %q.", scope, oauthScope)
  259. }
  260. }
  261. func getServiceAccountJSONKey(keyFile string) []byte {
  262. jsonKey, err := ioutil.ReadFile(keyFile)
  263. if err != nil {
  264. grpclog.Fatalf("Failed to read the service account key file: %v", err)
  265. }
  266. return jsonKey
  267. }
  268. // DoServiceAccountCreds performs a unary RPC with service account auth.
  269. func DoServiceAccountCreds(tc testpb.TestServiceClient, serviceAccountKeyFile, oauthScope string) {
  270. pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, largeReqSize)
  271. req := &testpb.SimpleRequest{
  272. ResponseType: testpb.PayloadType_COMPRESSABLE,
  273. ResponseSize: int32(largeRespSize),
  274. Payload: pl,
  275. FillUsername: true,
  276. FillOauthScope: true,
  277. }
  278. reply, err := tc.UnaryCall(context.Background(), req)
  279. if err != nil {
  280. grpclog.Fatal("/TestService/UnaryCall RPC failed: ", err)
  281. }
  282. jsonKey := getServiceAccountJSONKey(serviceAccountKeyFile)
  283. user := reply.GetUsername()
  284. scope := reply.GetOauthScope()
  285. if !strings.Contains(string(jsonKey), user) {
  286. grpclog.Fatalf("Got user name %q which is NOT a substring of %q.", user, jsonKey)
  287. }
  288. if !strings.Contains(oauthScope, scope) {
  289. grpclog.Fatalf("Got OAuth scope %q which is NOT a substring of %q.", scope, oauthScope)
  290. }
  291. }
  292. // DoJWTTokenCreds performs a unary RPC with JWT token auth.
  293. func DoJWTTokenCreds(tc testpb.TestServiceClient, serviceAccountKeyFile string) {
  294. pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, largeReqSize)
  295. req := &testpb.SimpleRequest{
  296. ResponseType: testpb.PayloadType_COMPRESSABLE,
  297. ResponseSize: int32(largeRespSize),
  298. Payload: pl,
  299. FillUsername: true,
  300. }
  301. reply, err := tc.UnaryCall(context.Background(), req)
  302. if err != nil {
  303. grpclog.Fatal("/TestService/UnaryCall RPC failed: ", err)
  304. }
  305. jsonKey := getServiceAccountJSONKey(serviceAccountKeyFile)
  306. user := reply.GetUsername()
  307. if !strings.Contains(string(jsonKey), user) {
  308. grpclog.Fatalf("Got user name %q which is NOT a substring of %q.", user, jsonKey)
  309. }
  310. }
  311. // GetToken obtains an OAUTH token from the input.
  312. func GetToken(serviceAccountKeyFile string, oauthScope string) *oauth2.Token {
  313. jsonKey := getServiceAccountJSONKey(serviceAccountKeyFile)
  314. config, err := google.JWTConfigFromJSON(jsonKey, oauthScope)
  315. if err != nil {
  316. grpclog.Fatalf("Failed to get the config: %v", err)
  317. }
  318. token, err := config.TokenSource(context.Background()).Token()
  319. if err != nil {
  320. grpclog.Fatalf("Failed to get the token: %v", err)
  321. }
  322. return token
  323. }
  324. // DoOauth2TokenCreds performs a unary RPC with OAUTH2 token auth.
  325. func DoOauth2TokenCreds(tc testpb.TestServiceClient, serviceAccountKeyFile, oauthScope string) {
  326. pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, largeReqSize)
  327. req := &testpb.SimpleRequest{
  328. ResponseType: testpb.PayloadType_COMPRESSABLE,
  329. ResponseSize: int32(largeRespSize),
  330. Payload: pl,
  331. FillUsername: true,
  332. FillOauthScope: true,
  333. }
  334. reply, err := tc.UnaryCall(context.Background(), req)
  335. if err != nil {
  336. grpclog.Fatal("/TestService/UnaryCall RPC failed: ", err)
  337. }
  338. jsonKey := getServiceAccountJSONKey(serviceAccountKeyFile)
  339. user := reply.GetUsername()
  340. scope := reply.GetOauthScope()
  341. if !strings.Contains(string(jsonKey), user) {
  342. grpclog.Fatalf("Got user name %q which is NOT a substring of %q.", user, jsonKey)
  343. }
  344. if !strings.Contains(oauthScope, scope) {
  345. grpclog.Fatalf("Got OAuth scope %q which is NOT a substring of %q.", scope, oauthScope)
  346. }
  347. }
  348. // DoPerRPCCreds performs a unary RPC with per RPC OAUTH2 token.
  349. func DoPerRPCCreds(tc testpb.TestServiceClient, serviceAccountKeyFile, oauthScope string) {
  350. jsonKey := getServiceAccountJSONKey(serviceAccountKeyFile)
  351. pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, largeReqSize)
  352. req := &testpb.SimpleRequest{
  353. ResponseType: testpb.PayloadType_COMPRESSABLE,
  354. ResponseSize: int32(largeRespSize),
  355. Payload: pl,
  356. FillUsername: true,
  357. FillOauthScope: true,
  358. }
  359. token := GetToken(serviceAccountKeyFile, oauthScope)
  360. kv := map[string]string{"authorization": token.Type() + " " + token.AccessToken}
  361. ctx := metadata.NewOutgoingContext(context.Background(), metadata.MD{"authorization": []string{kv["authorization"]}})
  362. reply, err := tc.UnaryCall(ctx, req)
  363. if err != nil {
  364. grpclog.Fatal("/TestService/UnaryCall RPC failed: ", err)
  365. }
  366. user := reply.GetUsername()
  367. scope := reply.GetOauthScope()
  368. if !strings.Contains(string(jsonKey), user) {
  369. grpclog.Fatalf("Got user name %q which is NOT a substring of %q.", user, jsonKey)
  370. }
  371. if !strings.Contains(oauthScope, scope) {
  372. grpclog.Fatalf("Got OAuth scope %q which is NOT a substring of %q.", scope, oauthScope)
  373. }
  374. }
  375. var testMetadata = metadata.MD{
  376. "key1": []string{"value1"},
  377. "key2": []string{"value2"},
  378. }
  379. // DoCancelAfterBegin cancels the RPC after metadata has been sent but before payloads are sent.
  380. func DoCancelAfterBegin(tc testpb.TestServiceClient, args ...grpc.CallOption) {
  381. ctx, cancel := context.WithCancel(metadata.NewOutgoingContext(context.Background(), testMetadata))
  382. stream, err := tc.StreamingInputCall(ctx, args...)
  383. if err != nil {
  384. grpclog.Fatalf("%v.StreamingInputCall(_) = _, %v", tc, err)
  385. }
  386. cancel()
  387. _, err = stream.CloseAndRecv()
  388. if status.Code(err) != codes.Canceled {
  389. grpclog.Fatalf("%v.CloseAndRecv() got error code %d, want %d", stream, status.Code(err), codes.Canceled)
  390. }
  391. }
  392. // DoCancelAfterFirstResponse cancels the RPC after receiving the first message from the server.
  393. func DoCancelAfterFirstResponse(tc testpb.TestServiceClient, args ...grpc.CallOption) {
  394. ctx, cancel := context.WithCancel(context.Background())
  395. stream, err := tc.FullDuplexCall(ctx, args...)
  396. if err != nil {
  397. grpclog.Fatalf("%v.FullDuplexCall(_) = _, %v", tc, err)
  398. }
  399. respParam := []*testpb.ResponseParameters{
  400. {
  401. Size: 31415,
  402. },
  403. }
  404. pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, 27182)
  405. req := &testpb.StreamingOutputCallRequest{
  406. ResponseType: testpb.PayloadType_COMPRESSABLE,
  407. ResponseParameters: respParam,
  408. Payload: pl,
  409. }
  410. if err := stream.Send(req); err != nil {
  411. grpclog.Fatalf("%v has error %v while sending %v", stream, err, req)
  412. }
  413. if _, err := stream.Recv(); err != nil {
  414. grpclog.Fatalf("%v.Recv() = %v", stream, err)
  415. }
  416. cancel()
  417. if _, err := stream.Recv(); status.Code(err) != codes.Canceled {
  418. grpclog.Fatalf("%v compleled with error code %d, want %d", stream, status.Code(err), codes.Canceled)
  419. }
  420. }
  421. var (
  422. initialMetadataValue = "test_initial_metadata_value"
  423. trailingMetadataValue = "\x0a\x0b\x0a\x0b\x0a\x0b"
  424. customMetadata = metadata.Pairs(
  425. initialMetadataKey, initialMetadataValue,
  426. trailingMetadataKey, trailingMetadataValue,
  427. )
  428. )
  429. func validateMetadata(header, trailer metadata.MD) {
  430. if len(header[initialMetadataKey]) != 1 {
  431. grpclog.Fatalf("Expected exactly one header from server. Received %d", len(header[initialMetadataKey]))
  432. }
  433. if header[initialMetadataKey][0] != initialMetadataValue {
  434. grpclog.Fatalf("Got header %s; want %s", header[initialMetadataKey][0], initialMetadataValue)
  435. }
  436. if len(trailer[trailingMetadataKey]) != 1 {
  437. grpclog.Fatalf("Expected exactly one trailer from server. Received %d", len(trailer[trailingMetadataKey]))
  438. }
  439. if trailer[trailingMetadataKey][0] != trailingMetadataValue {
  440. grpclog.Fatalf("Got trailer %s; want %s", trailer[trailingMetadataKey][0], trailingMetadataValue)
  441. }
  442. }
  443. // DoCustomMetadata checks that metadata is echoed back to the client.
  444. func DoCustomMetadata(tc testpb.TestServiceClient, args ...grpc.CallOption) {
  445. // Testing with UnaryCall.
  446. pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, 1)
  447. req := &testpb.SimpleRequest{
  448. ResponseType: testpb.PayloadType_COMPRESSABLE,
  449. ResponseSize: int32(1),
  450. Payload: pl,
  451. }
  452. ctx := metadata.NewOutgoingContext(context.Background(), customMetadata)
  453. var header, trailer metadata.MD
  454. args = append(args, grpc.Header(&header), grpc.Trailer(&trailer))
  455. reply, err := tc.UnaryCall(
  456. ctx,
  457. req,
  458. args...,
  459. )
  460. if err != nil {
  461. grpclog.Fatal("/TestService/UnaryCall RPC failed: ", err)
  462. }
  463. t := reply.GetPayload().GetType()
  464. s := len(reply.GetPayload().GetBody())
  465. if t != testpb.PayloadType_COMPRESSABLE || s != 1 {
  466. grpclog.Fatalf("Got the reply with type %d len %d; want %d, %d", t, s, testpb.PayloadType_COMPRESSABLE, 1)
  467. }
  468. validateMetadata(header, trailer)
  469. // Testing with FullDuplex.
  470. stream, err := tc.FullDuplexCall(ctx, args...)
  471. if err != nil {
  472. grpclog.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
  473. }
  474. respParam := []*testpb.ResponseParameters{
  475. {
  476. Size: 1,
  477. },
  478. }
  479. streamReq := &testpb.StreamingOutputCallRequest{
  480. ResponseType: testpb.PayloadType_COMPRESSABLE,
  481. ResponseParameters: respParam,
  482. Payload: pl,
  483. }
  484. if err := stream.Send(streamReq); err != nil {
  485. grpclog.Fatalf("%v has error %v while sending %v", stream, err, streamReq)
  486. }
  487. streamHeader, err := stream.Header()
  488. if err != nil {
  489. grpclog.Fatalf("%v.Header() = %v", stream, err)
  490. }
  491. if _, err := stream.Recv(); err != nil {
  492. grpclog.Fatalf("%v.Recv() = %v", stream, err)
  493. }
  494. if err := stream.CloseSend(); err != nil {
  495. grpclog.Fatalf("%v.CloseSend() = %v, want <nil>", stream, err)
  496. }
  497. if _, err := stream.Recv(); err != io.EOF {
  498. grpclog.Fatalf("%v failed to complete the custom metadata test: %v", stream, err)
  499. }
  500. streamTrailer := stream.Trailer()
  501. validateMetadata(streamHeader, streamTrailer)
  502. }
  503. // DoStatusCodeAndMessage checks that the status code is propagated back to the client.
  504. func DoStatusCodeAndMessage(tc testpb.TestServiceClient, args ...grpc.CallOption) {
  505. var code int32 = 2
  506. msg := "test status message"
  507. expectedErr := status.Error(codes.Code(code), msg)
  508. respStatus := &testpb.EchoStatus{
  509. Code: code,
  510. Message: msg,
  511. }
  512. // Test UnaryCall.
  513. req := &testpb.SimpleRequest{
  514. ResponseStatus: respStatus,
  515. }
  516. if _, err := tc.UnaryCall(context.Background(), req, args...); err == nil || err.Error() != expectedErr.Error() {
  517. grpclog.Fatalf("%v.UnaryCall(_, %v) = _, %v, want _, %v", tc, req, err, expectedErr)
  518. }
  519. // Test FullDuplexCall.
  520. stream, err := tc.FullDuplexCall(context.Background(), args...)
  521. if err != nil {
  522. grpclog.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
  523. }
  524. streamReq := &testpb.StreamingOutputCallRequest{
  525. ResponseStatus: respStatus,
  526. }
  527. if err := stream.Send(streamReq); err != nil {
  528. grpclog.Fatalf("%v has error %v while sending %v, want <nil>", stream, err, streamReq)
  529. }
  530. if err := stream.CloseSend(); err != nil {
  531. grpclog.Fatalf("%v.CloseSend() = %v, want <nil>", stream, err)
  532. }
  533. if _, err = stream.Recv(); err.Error() != expectedErr.Error() {
  534. grpclog.Fatalf("%v.Recv() returned error %v, want %v", stream, err, expectedErr)
  535. }
  536. }
  537. // DoSpecialStatusMessage verifies Unicode and whitespace is correctly processed
  538. // in status message.
  539. func DoSpecialStatusMessage(tc testpb.TestServiceClient, args ...grpc.CallOption) {
  540. const (
  541. code int32 = 2
  542. msg string = "\t\ntest with whitespace\r\nand Unicode BMP ☺ and non-BMP 😈\t\n"
  543. )
  544. expectedErr := status.Error(codes.Code(code), msg)
  545. req := &testpb.SimpleRequest{
  546. ResponseStatus: &testpb.EchoStatus{
  547. Code: code,
  548. Message: msg,
  549. },
  550. }
  551. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
  552. defer cancel()
  553. if _, err := tc.UnaryCall(ctx, req, args...); err == nil || err.Error() != expectedErr.Error() {
  554. grpclog.Fatalf("%v.UnaryCall(_, %v) = _, %v, want _, %v", tc, req, err, expectedErr)
  555. }
  556. }
  557. // DoUnimplementedService attempts to call a method from an unimplemented service.
  558. func DoUnimplementedService(tc testpb.UnimplementedServiceClient) {
  559. _, err := tc.UnimplementedCall(context.Background(), &testpb.Empty{})
  560. if status.Code(err) != codes.Unimplemented {
  561. grpclog.Fatalf("%v.UnimplementedCall() = _, %v, want _, %v", tc, status.Code(err), codes.Unimplemented)
  562. }
  563. }
  564. // DoUnimplementedMethod attempts to call an unimplemented method.
  565. func DoUnimplementedMethod(cc *grpc.ClientConn) {
  566. var req, reply proto.Message
  567. if err := cc.Invoke(context.Background(), "/grpc.testing.TestService/UnimplementedCall", req, reply); err == nil || status.Code(err) != codes.Unimplemented {
  568. grpclog.Fatalf("ClientConn.Invoke(_, _, _, _, _) = %v, want error code %s", err, codes.Unimplemented)
  569. }
  570. }
  571. type testServer struct {
  572. }
  573. // NewTestServer creates a test server for test service.
  574. func NewTestServer() testpb.TestServiceServer {
  575. return &testServer{}
  576. }
  577. func (s *testServer) EmptyCall(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
  578. return new(testpb.Empty), nil
  579. }
  580. func serverNewPayload(t testpb.PayloadType, size int32) (*testpb.Payload, error) {
  581. if size < 0 {
  582. return nil, fmt.Errorf("requested a response with invalid length %d", size)
  583. }
  584. body := make([]byte, size)
  585. switch t {
  586. case testpb.PayloadType_COMPRESSABLE:
  587. case testpb.PayloadType_UNCOMPRESSABLE:
  588. return nil, fmt.Errorf("payloadType UNCOMPRESSABLE is not supported")
  589. default:
  590. return nil, fmt.Errorf("unsupported payload type: %d", t)
  591. }
  592. return &testpb.Payload{
  593. Type: t,
  594. Body: body,
  595. }, nil
  596. }
  597. func (s *testServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
  598. st := in.GetResponseStatus()
  599. if md, ok := metadata.FromIncomingContext(ctx); ok {
  600. if initialMetadata, ok := md[initialMetadataKey]; ok {
  601. header := metadata.Pairs(initialMetadataKey, initialMetadata[0])
  602. grpc.SendHeader(ctx, header)
  603. }
  604. if trailingMetadata, ok := md[trailingMetadataKey]; ok {
  605. trailer := metadata.Pairs(trailingMetadataKey, trailingMetadata[0])
  606. grpc.SetTrailer(ctx, trailer)
  607. }
  608. }
  609. if st != nil && st.Code != 0 {
  610. return nil, status.Error(codes.Code(st.Code), st.Message)
  611. }
  612. pl, err := serverNewPayload(in.GetResponseType(), in.GetResponseSize())
  613. if err != nil {
  614. return nil, err
  615. }
  616. return &testpb.SimpleResponse{
  617. Payload: pl,
  618. }, nil
  619. }
  620. func (s *testServer) StreamingOutputCall(args *testpb.StreamingOutputCallRequest, stream testpb.TestService_StreamingOutputCallServer) error {
  621. cs := args.GetResponseParameters()
  622. for _, c := range cs {
  623. if us := c.GetIntervalUs(); us > 0 {
  624. time.Sleep(time.Duration(us) * time.Microsecond)
  625. }
  626. pl, err := serverNewPayload(args.GetResponseType(), c.GetSize())
  627. if err != nil {
  628. return err
  629. }
  630. if err := stream.Send(&testpb.StreamingOutputCallResponse{
  631. Payload: pl,
  632. }); err != nil {
  633. return err
  634. }
  635. }
  636. return nil
  637. }
  638. func (s *testServer) StreamingInputCall(stream testpb.TestService_StreamingInputCallServer) error {
  639. var sum int
  640. for {
  641. in, err := stream.Recv()
  642. if err == io.EOF {
  643. return stream.SendAndClose(&testpb.StreamingInputCallResponse{
  644. AggregatedPayloadSize: int32(sum),
  645. })
  646. }
  647. if err != nil {
  648. return err
  649. }
  650. p := in.GetPayload().GetBody()
  651. sum += len(p)
  652. }
  653. }
  654. func (s *testServer) FullDuplexCall(stream testpb.TestService_FullDuplexCallServer) error {
  655. if md, ok := metadata.FromIncomingContext(stream.Context()); ok {
  656. if initialMetadata, ok := md[initialMetadataKey]; ok {
  657. header := metadata.Pairs(initialMetadataKey, initialMetadata[0])
  658. stream.SendHeader(header)
  659. }
  660. if trailingMetadata, ok := md[trailingMetadataKey]; ok {
  661. trailer := metadata.Pairs(trailingMetadataKey, trailingMetadata[0])
  662. stream.SetTrailer(trailer)
  663. }
  664. }
  665. for {
  666. in, err := stream.Recv()
  667. if err == io.EOF {
  668. // read done.
  669. return nil
  670. }
  671. if err != nil {
  672. return err
  673. }
  674. st := in.GetResponseStatus()
  675. if st != nil && st.Code != 0 {
  676. return status.Error(codes.Code(st.Code), st.Message)
  677. }
  678. cs := in.GetResponseParameters()
  679. for _, c := range cs {
  680. if us := c.GetIntervalUs(); us > 0 {
  681. time.Sleep(time.Duration(us) * time.Microsecond)
  682. }
  683. pl, err := serverNewPayload(in.GetResponseType(), c.GetSize())
  684. if err != nil {
  685. return err
  686. }
  687. if err := stream.Send(&testpb.StreamingOutputCallResponse{
  688. Payload: pl,
  689. }); err != nil {
  690. return err
  691. }
  692. }
  693. }
  694. }
  695. func (s *testServer) HalfDuplexCall(stream testpb.TestService_HalfDuplexCallServer) error {
  696. var msgBuf []*testpb.StreamingOutputCallRequest
  697. for {
  698. in, err := stream.Recv()
  699. if err == io.EOF {
  700. // read done.
  701. break
  702. }
  703. if err != nil {
  704. return err
  705. }
  706. msgBuf = append(msgBuf, in)
  707. }
  708. for _, m := range msgBuf {
  709. cs := m.GetResponseParameters()
  710. for _, c := range cs {
  711. if us := c.GetIntervalUs(); us > 0 {
  712. time.Sleep(time.Duration(us) * time.Microsecond)
  713. }
  714. pl, err := serverNewPayload(m.GetResponseType(), c.GetSize())
  715. if err != nil {
  716. return err
  717. }
  718. if err := stream.Send(&testpb.StreamingOutputCallResponse{
  719. Payload: pl,
  720. }); err != nil {
  721. return err
  722. }
  723. }
  724. }
  725. return nil
  726. }