main.go 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187
  1. /*
  2. *
  3. * Copyright 2017 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package main
  19. import (
  20. "context"
  21. "flag"
  22. "fmt"
  23. "os"
  24. "runtime"
  25. "runtime/pprof"
  26. "sync"
  27. "time"
  28. "google.golang.org/grpc"
  29. "google.golang.org/grpc/benchmark"
  30. testpb "google.golang.org/grpc/benchmark/grpc_testing"
  31. "google.golang.org/grpc/benchmark/stats"
  32. "google.golang.org/grpc/grpclog"
  33. "google.golang.org/grpc/internal/syscall"
  34. )
// Command-line flags configuring the benchmark run, plus shared state
// used by the worker goroutines.
var (
	port      = flag.String("port", "50051", "Localhost port to connect to.")
	numRPC    = flag.Int("r", 1, "The number of concurrent RPCs on each connection.")
	numConn   = flag.Int("c", 1, "The number of parallel connections.")
	warmupDur = flag.Int("w", 10, "Warm-up duration in seconds")
	duration  = flag.Int("d", 60, "Benchmark duration in seconds")
	rqSize    = flag.Int("req", 1, "Request message size in bytes.")
	rspSize   = flag.Int("resp", 1, "Response message size in bytes.")
	rpcType   = flag.String("rpc_type", "unary",
		`Configure different client rpc type. Valid options are:
unary;
streaming.`)
	testName = flag.String("test_name", "", "Name of the test used for creating profiles.")

	// wg waits for every per-RPC worker goroutine started by runWithConn.
	wg sync.WaitGroup
	// hopts shapes each worker's latency histogram; median() relies on the
	// same GrowthFactor when interpolating within a bucket.
	hopts = stats.HistogramOptions{
		NumBuckets:   2495,
		GrowthFactor: .01,
	}
	// mu guards hists, which collects one histogram per finished worker.
	mu    sync.Mutex
	hists []*stats.Histogram
)
  56. func main() {
  57. flag.Parse()
  58. if *testName == "" {
  59. grpclog.Fatalf("test_name not set")
  60. }
  61. req := &testpb.SimpleRequest{
  62. ResponseType: testpb.PayloadType_COMPRESSABLE,
  63. ResponseSize: int32(*rspSize),
  64. Payload: &testpb.Payload{
  65. Type: testpb.PayloadType_COMPRESSABLE,
  66. Body: make([]byte, *rqSize),
  67. },
  68. }
  69. connectCtx, connectCancel := context.WithDeadline(context.Background(), time.Now().Add(5*time.Second))
  70. defer connectCancel()
  71. ccs := buildConnections(connectCtx)
  72. warmDeadline := time.Now().Add(time.Duration(*warmupDur) * time.Second)
  73. endDeadline := warmDeadline.Add(time.Duration(*duration) * time.Second)
  74. cf, err := os.Create("/tmp/" + *testName + ".cpu")
  75. if err != nil {
  76. grpclog.Fatalf("Error creating file: %v", err)
  77. }
  78. defer cf.Close()
  79. pprof.StartCPUProfile(cf)
  80. cpuBeg := syscall.GetCPUTime()
  81. for _, cc := range ccs {
  82. runWithConn(cc, req, warmDeadline, endDeadline)
  83. }
  84. wg.Wait()
  85. cpu := time.Duration(syscall.GetCPUTime() - cpuBeg)
  86. pprof.StopCPUProfile()
  87. mf, err := os.Create("/tmp/" + *testName + ".mem")
  88. if err != nil {
  89. grpclog.Fatalf("Error creating file: %v", err)
  90. }
  91. defer mf.Close()
  92. runtime.GC() // materialize all statistics
  93. if err := pprof.WriteHeapProfile(mf); err != nil {
  94. grpclog.Fatalf("Error writing memory profile: %v", err)
  95. }
  96. hist := stats.NewHistogram(hopts)
  97. for _, h := range hists {
  98. hist.Merge(h)
  99. }
  100. parseHist(hist)
  101. fmt.Println("Client CPU utilization:", cpu)
  102. fmt.Println("Client CPU profile:", cf.Name())
  103. fmt.Println("Client Mem Profile:", mf.Name())
  104. }
  105. func buildConnections(ctx context.Context) []*grpc.ClientConn {
  106. ccs := make([]*grpc.ClientConn, *numConn)
  107. for i := range ccs {
  108. ccs[i] = benchmark.NewClientConnWithContext(ctx, "localhost:"+*port, grpc.WithInsecure(), grpc.WithBlock())
  109. }
  110. return ccs
  111. }
  112. func runWithConn(cc *grpc.ClientConn, req *testpb.SimpleRequest, warmDeadline, endDeadline time.Time) {
  113. for i := 0; i < *numRPC; i++ {
  114. wg.Add(1)
  115. go func() {
  116. defer wg.Done()
  117. caller := makeCaller(cc, req)
  118. hist := stats.NewHistogram(hopts)
  119. for {
  120. start := time.Now()
  121. if start.After(endDeadline) {
  122. mu.Lock()
  123. hists = append(hists, hist)
  124. mu.Unlock()
  125. return
  126. }
  127. caller()
  128. elapsed := time.Since(start)
  129. if start.After(warmDeadline) {
  130. hist.Add(elapsed.Nanoseconds())
  131. }
  132. }
  133. }()
  134. }
  135. }
  136. func makeCaller(cc *grpc.ClientConn, req *testpb.SimpleRequest) func() {
  137. client := testpb.NewBenchmarkServiceClient(cc)
  138. if *rpcType == "unary" {
  139. return func() {
  140. if _, err := client.UnaryCall(context.Background(), req); err != nil {
  141. grpclog.Fatalf("RPC failed: %v", err)
  142. }
  143. }
  144. }
  145. stream, err := client.StreamingCall(context.Background())
  146. if err != nil {
  147. grpclog.Fatalf("RPC failed: %v", err)
  148. }
  149. return func() {
  150. if err := stream.Send(req); err != nil {
  151. grpclog.Fatalf("Streaming RPC failed to send: %v", err)
  152. }
  153. if _, err := stream.Recv(); err != nil {
  154. grpclog.Fatalf("Streaming RPC failed to read: %v", err)
  155. }
  156. }
  157. }
  158. func parseHist(hist *stats.Histogram) {
  159. fmt.Println("qps:", float64(hist.Count)/float64(*duration))
  160. fmt.Printf("Latency: (50/90/99 %%ile): %v/%v/%v\n",
  161. time.Duration(median(.5, hist)),
  162. time.Duration(median(.9, hist)),
  163. time.Duration(median(.99, hist)))
  164. }
  165. func median(percentile float64, h *stats.Histogram) int64 {
  166. need := int64(float64(h.Count) * percentile)
  167. have := int64(0)
  168. for _, bucket := range h.Buckets {
  169. count := bucket.Count
  170. if have+count >= need {
  171. percent := float64(need-have) / float64(count)
  172. return int64((1.0-percent)*bucket.LowBound + percent*bucket.LowBound*(1.0+hopts.GrowthFactor))
  173. }
  174. have += bucket.Count
  175. }
  176. panic("should have found a bound")
  177. }