123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187 |
- /*
- *
- * Copyright 2017 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
- package main
- import (
- "context"
- "flag"
- "fmt"
- "os"
- "runtime"
- "runtime/pprof"
- "sync"
- "time"
- "google.golang.org/grpc"
- "google.golang.org/grpc/benchmark"
- testpb "google.golang.org/grpc/benchmark/grpc_testing"
- "google.golang.org/grpc/benchmark/stats"
- "google.golang.org/grpc/grpclog"
- "google.golang.org/grpc/internal/syscall"
- )
- var (
- port = flag.String("port", "50051", "Localhost port to connect to.")
- numRPC = flag.Int("r", 1, "The number of concurrent RPCs on each connection.")
- numConn = flag.Int("c", 1, "The number of parallel connections.")
- warmupDur = flag.Int("w", 10, "Warm-up duration in seconds")
- duration = flag.Int("d", 60, "Benchmark duration in seconds")
- rqSize = flag.Int("req", 1, "Request message size in bytes.")
- rspSize = flag.Int("resp", 1, "Response message size in bytes.")
- rpcType = flag.String("rpc_type", "unary",
- `Configure different client rpc type. Valid options are:
- unary;
- streaming.`)
- testName = flag.String("test_name", "", "Name of the test used for creating profiles.")
- wg sync.WaitGroup
- hopts = stats.HistogramOptions{
- NumBuckets: 2495,
- GrowthFactor: .01,
- }
- mu sync.Mutex
- hists []*stats.Histogram
- )
- func main() {
- flag.Parse()
- if *testName == "" {
- grpclog.Fatalf("test_name not set")
- }
- req := &testpb.SimpleRequest{
- ResponseType: testpb.PayloadType_COMPRESSABLE,
- ResponseSize: int32(*rspSize),
- Payload: &testpb.Payload{
- Type: testpb.PayloadType_COMPRESSABLE,
- Body: make([]byte, *rqSize),
- },
- }
- connectCtx, connectCancel := context.WithDeadline(context.Background(), time.Now().Add(5*time.Second))
- defer connectCancel()
- ccs := buildConnections(connectCtx)
- warmDeadline := time.Now().Add(time.Duration(*warmupDur) * time.Second)
- endDeadline := warmDeadline.Add(time.Duration(*duration) * time.Second)
- cf, err := os.Create("/tmp/" + *testName + ".cpu")
- if err != nil {
- grpclog.Fatalf("Error creating file: %v", err)
- }
- defer cf.Close()
- pprof.StartCPUProfile(cf)
- cpuBeg := syscall.GetCPUTime()
- for _, cc := range ccs {
- runWithConn(cc, req, warmDeadline, endDeadline)
- }
- wg.Wait()
- cpu := time.Duration(syscall.GetCPUTime() - cpuBeg)
- pprof.StopCPUProfile()
- mf, err := os.Create("/tmp/" + *testName + ".mem")
- if err != nil {
- grpclog.Fatalf("Error creating file: %v", err)
- }
- defer mf.Close()
- runtime.GC() // materialize all statistics
- if err := pprof.WriteHeapProfile(mf); err != nil {
- grpclog.Fatalf("Error writing memory profile: %v", err)
- }
- hist := stats.NewHistogram(hopts)
- for _, h := range hists {
- hist.Merge(h)
- }
- parseHist(hist)
- fmt.Println("Client CPU utilization:", cpu)
- fmt.Println("Client CPU profile:", cf.Name())
- fmt.Println("Client Mem Profile:", mf.Name())
- }
- func buildConnections(ctx context.Context) []*grpc.ClientConn {
- ccs := make([]*grpc.ClientConn, *numConn)
- for i := range ccs {
- ccs[i] = benchmark.NewClientConnWithContext(ctx, "localhost:"+*port, grpc.WithInsecure(), grpc.WithBlock())
- }
- return ccs
- }
- func runWithConn(cc *grpc.ClientConn, req *testpb.SimpleRequest, warmDeadline, endDeadline time.Time) {
- for i := 0; i < *numRPC; i++ {
- wg.Add(1)
- go func() {
- defer wg.Done()
- caller := makeCaller(cc, req)
- hist := stats.NewHistogram(hopts)
- for {
- start := time.Now()
- if start.After(endDeadline) {
- mu.Lock()
- hists = append(hists, hist)
- mu.Unlock()
- return
- }
- caller()
- elapsed := time.Since(start)
- if start.After(warmDeadline) {
- hist.Add(elapsed.Nanoseconds())
- }
- }
- }()
- }
- }
- func makeCaller(cc *grpc.ClientConn, req *testpb.SimpleRequest) func() {
- client := testpb.NewBenchmarkServiceClient(cc)
- if *rpcType == "unary" {
- return func() {
- if _, err := client.UnaryCall(context.Background(), req); err != nil {
- grpclog.Fatalf("RPC failed: %v", err)
- }
- }
- }
- stream, err := client.StreamingCall(context.Background())
- if err != nil {
- grpclog.Fatalf("RPC failed: %v", err)
- }
- return func() {
- if err := stream.Send(req); err != nil {
- grpclog.Fatalf("Streaming RPC failed to send: %v", err)
- }
- if _, err := stream.Recv(); err != nil {
- grpclog.Fatalf("Streaming RPC failed to read: %v", err)
- }
- }
- }
- func parseHist(hist *stats.Histogram) {
- fmt.Println("qps:", float64(hist.Count)/float64(*duration))
- fmt.Printf("Latency: (50/90/99 %%ile): %v/%v/%v\n",
- time.Duration(median(.5, hist)),
- time.Duration(median(.9, hist)),
- time.Duration(median(.99, hist)))
- }
- func median(percentile float64, h *stats.Histogram) int64 {
- need := int64(float64(h.Count) * percentile)
- have := int64(0)
- for _, bucket := range h.Buckets {
- count := bucket.Count
- if have+count >= need {
- percent := float64(need-have) / float64(count)
- return int64((1.0-percent)*bucket.LowBound + percent*bucket.LowBound*(1.0+hopts.GrowthFactor))
- }
- have += bucket.Count
- }
- panic("should have found a bound")
- }
|