benchmark.go 11 KB


  1. /*
  2. *
  3. * Copyright 2014 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. //go:generate protoc -I grpc_testing --go_out=plugins=grpc:grpc_testing grpc_testing/control.proto grpc_testing/messages.proto grpc_testing/payloads.proto grpc_testing/services.proto grpc_testing/stats.proto
/*
Package benchmark implements the building blocks to set up end-to-end gRPC benchmarks.
*/
  22. package benchmark
  23. import (
  24. "context"
  25. "fmt"
  26. "io"
  27. "net"
  28. "sync"
  29. "testing"
  30. "time"
  31. "google.golang.org/grpc"
  32. testpb "google.golang.org/grpc/benchmark/grpc_testing"
  33. "google.golang.org/grpc/benchmark/latency"
  34. "google.golang.org/grpc/benchmark/stats"
  35. "google.golang.org/grpc/grpclog"
  36. )
  37. // AddOne add 1 to the features slice
  38. func AddOne(features []int, featuresMaxPosition []int) {
  39. for i := len(features) - 1; i >= 0; i-- {
  40. features[i] = (features[i] + 1)
  41. if features[i]/featuresMaxPosition[i] == 0 {
  42. break
  43. }
  44. features[i] = features[i] % featuresMaxPosition[i]
  45. }
  46. }
  47. // Allows reuse of the same testpb.Payload object.
  48. func setPayload(p *testpb.Payload, t testpb.PayloadType, size int) {
  49. if size < 0 {
  50. grpclog.Fatalf("Requested a response with invalid length %d", size)
  51. }
  52. body := make([]byte, size)
  53. switch t {
  54. case testpb.PayloadType_COMPRESSABLE:
  55. case testpb.PayloadType_UNCOMPRESSABLE:
  56. grpclog.Fatalf("PayloadType UNCOMPRESSABLE is not supported")
  57. default:
  58. grpclog.Fatalf("Unsupported payload type: %d", t)
  59. }
  60. p.Type = t
  61. p.Body = body
  62. }
  63. func newPayload(t testpb.PayloadType, size int) *testpb.Payload {
  64. p := new(testpb.Payload)
  65. setPayload(p, t, size)
  66. return p
  67. }
  68. type testServer struct {
  69. }
  70. func (s *testServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
  71. return &testpb.SimpleResponse{
  72. Payload: newPayload(in.ResponseType, int(in.ResponseSize)),
  73. }, nil
  74. }
  75. func (s *testServer) StreamingCall(stream testpb.BenchmarkService_StreamingCallServer) error {
  76. response := &testpb.SimpleResponse{
  77. Payload: new(testpb.Payload),
  78. }
  79. in := new(testpb.SimpleRequest)
  80. for {
  81. // use ServerStream directly to reuse the same testpb.SimpleRequest object
  82. err := stream.(grpc.ServerStream).RecvMsg(in)
  83. if err == io.EOF {
  84. // read done.
  85. return nil
  86. }
  87. if err != nil {
  88. return err
  89. }
  90. setPayload(response.Payload, in.ResponseType, int(in.ResponseSize))
  91. if err := stream.Send(response); err != nil {
  92. return err
  93. }
  94. }
  95. }
// byteBufServer is a gRPC server that sends and receives raw byte buffers.
// The purpose is to benchmark the gRPC performance without protobuf serialization/deserialization overhead.
type byteBufServer struct {
	// respSize is the length, in bytes, of the buffer that StreamingCall
	// sends back for every message it receives.
	respSize int32
}
  101. // UnaryCall is an empty function and is not used for benchmark.
  102. // If bytebuf UnaryCall benchmark is needed later, the function body needs to be updated.
  103. func (s *byteBufServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
  104. return &testpb.SimpleResponse{}, nil
  105. }
  106. func (s *byteBufServer) StreamingCall(stream testpb.BenchmarkService_StreamingCallServer) error {
  107. for {
  108. var in []byte
  109. err := stream.(grpc.ServerStream).RecvMsg(&in)
  110. if err == io.EOF {
  111. return nil
  112. }
  113. if err != nil {
  114. return err
  115. }
  116. out := make([]byte, s.respSize)
  117. if err := stream.(grpc.ServerStream).SendMsg(&out); err != nil {
  118. return err
  119. }
  120. }
  121. }
// ServerInfo contains the information to create a gRPC benchmark server.
type ServerInfo struct {
	// Type is the type of the server.
	// It should be "protobuf" or "bytebuf"; StartServer reports any other
	// value through grpclog.Fatalf.
	Type string
	// Metadata is an optional configuration.
	// For "protobuf", it's ignored.
	// For "bytebuf", it must be an int32 holding the response size;
	// StartServer type-asserts it to int32 and reports any other dynamic
	// type (including plain int) through grpclog.Fatalf.
	Metadata interface{}
	// Listener is the network listener for the server to use.
	Listener net.Listener
}
  134. // StartServer starts a gRPC server serving a benchmark service according to info.
  135. // It returns a function to stop the server.
  136. func StartServer(info ServerInfo, opts ...grpc.ServerOption) func() {
  137. opts = append(opts, grpc.WriteBufferSize(128*1024))
  138. opts = append(opts, grpc.ReadBufferSize(128*1024))
  139. s := grpc.NewServer(opts...)
  140. switch info.Type {
  141. case "protobuf":
  142. testpb.RegisterBenchmarkServiceServer(s, &testServer{})
  143. case "bytebuf":
  144. respSize, ok := info.Metadata.(int32)
  145. if !ok {
  146. grpclog.Fatalf("failed to StartServer, invalid metadata: %v, for Type: %v", info.Metadata, info.Type)
  147. }
  148. testpb.RegisterBenchmarkServiceServer(s, &byteBufServer{respSize: respSize})
  149. default:
  150. grpclog.Fatalf("failed to StartServer, unknown Type: %v", info.Type)
  151. }
  152. go s.Serve(info.Listener)
  153. return func() {
  154. s.Stop()
  155. }
  156. }
  157. // DoUnaryCall performs an unary RPC with given stub and request and response sizes.
  158. func DoUnaryCall(tc testpb.BenchmarkServiceClient, reqSize, respSize int) error {
  159. pl := newPayload(testpb.PayloadType_COMPRESSABLE, reqSize)
  160. req := &testpb.SimpleRequest{
  161. ResponseType: pl.Type,
  162. ResponseSize: int32(respSize),
  163. Payload: pl,
  164. }
  165. if _, err := tc.UnaryCall(context.Background(), req); err != nil {
  166. return fmt.Errorf("/BenchmarkService/UnaryCall(_, _) = _, %v, want _, <nil>", err)
  167. }
  168. return nil
  169. }
  170. // DoStreamingRoundTrip performs a round trip for a single streaming rpc.
  171. func DoStreamingRoundTrip(stream testpb.BenchmarkService_StreamingCallClient, reqSize, respSize int) error {
  172. pl := newPayload(testpb.PayloadType_COMPRESSABLE, reqSize)
  173. req := &testpb.SimpleRequest{
  174. ResponseType: pl.Type,
  175. ResponseSize: int32(respSize),
  176. Payload: pl,
  177. }
  178. if err := stream.Send(req); err != nil {
  179. return fmt.Errorf("/BenchmarkService/StreamingCall.Send(_) = %v, want <nil>", err)
  180. }
  181. if _, err := stream.Recv(); err != nil {
  182. // EOF is a valid error here.
  183. if err == io.EOF {
  184. return nil
  185. }
  186. return fmt.Errorf("/BenchmarkService/StreamingCall.Recv(_) = %v, want <nil>", err)
  187. }
  188. return nil
  189. }
  190. // DoByteBufStreamingRoundTrip performs a round trip for a single streaming rpc, using a custom codec for byte buffer.
  191. func DoByteBufStreamingRoundTrip(stream testpb.BenchmarkService_StreamingCallClient, reqSize, respSize int) error {
  192. out := make([]byte, reqSize)
  193. if err := stream.(grpc.ClientStream).SendMsg(&out); err != nil {
  194. return fmt.Errorf("/BenchmarkService/StreamingCall.(ClientStream).SendMsg(_) = %v, want <nil>", err)
  195. }
  196. var in []byte
  197. if err := stream.(grpc.ClientStream).RecvMsg(&in); err != nil {
  198. // EOF is a valid error here.
  199. if err == io.EOF {
  200. return nil
  201. }
  202. return fmt.Errorf("/BenchmarkService/StreamingCall.(ClientStream).RecvMsg(_) = %v, want <nil>", err)
  203. }
  204. return nil
  205. }
  206. // NewClientConn creates a gRPC client connection to addr.
  207. func NewClientConn(addr string, opts ...grpc.DialOption) *grpc.ClientConn {
  208. return NewClientConnWithContext(context.Background(), addr, opts...)
  209. }
  210. // NewClientConnWithContext creates a gRPC client connection to addr using ctx.
  211. func NewClientConnWithContext(ctx context.Context, addr string, opts ...grpc.DialOption) *grpc.ClientConn {
  212. opts = append(opts, grpc.WithWriteBufferSize(128*1024))
  213. opts = append(opts, grpc.WithReadBufferSize(128*1024))
  214. conn, err := grpc.DialContext(ctx, addr, opts...)
  215. if err != nil {
  216. grpclog.Fatalf("NewClientConn(%q) failed to create a ClientConn %v", addr, err)
  217. }
  218. return conn
  219. }
  220. func runUnary(b *testing.B, benchFeatures stats.Features) {
  221. s := stats.AddStats(b, 38)
  222. nw := &latency.Network{Kbps: benchFeatures.Kbps, Latency: benchFeatures.Latency, MTU: benchFeatures.Mtu}
  223. lis, err := net.Listen("tcp", "localhost:0")
  224. if err != nil {
  225. grpclog.Fatalf("Failed to listen: %v", err)
  226. }
  227. target := lis.Addr().String()
  228. lis = nw.Listener(lis)
  229. stopper := StartServer(ServerInfo{Type: "protobuf", Listener: lis}, grpc.MaxConcurrentStreams(uint32(benchFeatures.MaxConcurrentCalls+1)))
  230. defer stopper()
  231. conn := NewClientConn(
  232. target, grpc.WithInsecure(),
  233. grpc.WithDialer(func(address string, timeout time.Duration) (net.Conn, error) {
  234. return nw.TimeoutDialer(net.DialTimeout)("tcp", address, timeout)
  235. }),
  236. )
  237. tc := testpb.NewBenchmarkServiceClient(conn)
  238. // Warm up connection.
  239. for i := 0; i < 10; i++ {
  240. unaryCaller(tc, benchFeatures.ReqSizeBytes, benchFeatures.RespSizeBytes)
  241. }
  242. ch := make(chan int, benchFeatures.MaxConcurrentCalls*4)
  243. var (
  244. mu sync.Mutex
  245. wg sync.WaitGroup
  246. )
  247. wg.Add(benchFeatures.MaxConcurrentCalls)
  248. // Distribute the b.N calls over maxConcurrentCalls workers.
  249. for i := 0; i < benchFeatures.MaxConcurrentCalls; i++ {
  250. go func() {
  251. for range ch {
  252. start := time.Now()
  253. unaryCaller(tc, benchFeatures.ReqSizeBytes, benchFeatures.RespSizeBytes)
  254. elapse := time.Since(start)
  255. mu.Lock()
  256. s.Add(elapse)
  257. mu.Unlock()
  258. }
  259. wg.Done()
  260. }()
  261. }
  262. b.ResetTimer()
  263. for i := 0; i < b.N; i++ {
  264. ch <- i
  265. }
  266. close(ch)
  267. wg.Wait()
  268. b.StopTimer()
  269. conn.Close()
  270. }
  271. func runStream(b *testing.B, benchFeatures stats.Features) {
  272. s := stats.AddStats(b, 38)
  273. nw := &latency.Network{Kbps: benchFeatures.Kbps, Latency: benchFeatures.Latency, MTU: benchFeatures.Mtu}
  274. lis, err := net.Listen("tcp", "localhost:0")
  275. if err != nil {
  276. grpclog.Fatalf("Failed to listen: %v", err)
  277. }
  278. target := lis.Addr().String()
  279. lis = nw.Listener(lis)
  280. stopper := StartServer(ServerInfo{Type: "protobuf", Listener: lis}, grpc.MaxConcurrentStreams(uint32(benchFeatures.MaxConcurrentCalls+1)))
  281. defer stopper()
  282. conn := NewClientConn(
  283. target, grpc.WithInsecure(),
  284. grpc.WithDialer(func(address string, timeout time.Duration) (net.Conn, error) {
  285. return nw.TimeoutDialer(net.DialTimeout)("tcp", address, timeout)
  286. }),
  287. )
  288. tc := testpb.NewBenchmarkServiceClient(conn)
  289. // Warm up connection.
  290. stream, err := tc.StreamingCall(context.Background())
  291. if err != nil {
  292. b.Fatalf("%v.StreamingCall(_) = _, %v", tc, err)
  293. }
  294. for i := 0; i < 10; i++ {
  295. streamCaller(stream, benchFeatures.ReqSizeBytes, benchFeatures.RespSizeBytes)
  296. }
  297. ch := make(chan struct{}, benchFeatures.MaxConcurrentCalls*4)
  298. var (
  299. mu sync.Mutex
  300. wg sync.WaitGroup
  301. )
  302. wg.Add(benchFeatures.MaxConcurrentCalls)
  303. // Distribute the b.N calls over maxConcurrentCalls workers.
  304. for i := 0; i < benchFeatures.MaxConcurrentCalls; i++ {
  305. stream, err := tc.StreamingCall(context.Background())
  306. if err != nil {
  307. b.Fatalf("%v.StreamingCall(_) = _, %v", tc, err)
  308. }
  309. go func() {
  310. for range ch {
  311. start := time.Now()
  312. streamCaller(stream, benchFeatures.ReqSizeBytes, benchFeatures.RespSizeBytes)
  313. elapse := time.Since(start)
  314. mu.Lock()
  315. s.Add(elapse)
  316. mu.Unlock()
  317. }
  318. wg.Done()
  319. }()
  320. }
  321. b.ResetTimer()
  322. for i := 0; i < b.N; i++ {
  323. ch <- struct{}{}
  324. }
  325. close(ch)
  326. wg.Wait()
  327. b.StopTimer()
  328. conn.Close()
  329. }
  330. func unaryCaller(client testpb.BenchmarkServiceClient, reqSize, respSize int) {
  331. if err := DoUnaryCall(client, reqSize, respSize); err != nil {
  332. grpclog.Fatalf("DoUnaryCall failed: %v", err)
  333. }
  334. }
  335. func streamCaller(stream testpb.BenchmarkService_StreamingCallClient, reqSize, respSize int) {
  336. if err := DoStreamingRoundTrip(stream, reqSize, respSize); err != nil {
  337. grpclog.Fatalf("DoStreamingRoundTrip failed: %v", err)
  338. }
  339. }