benchmark_client.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386
  1. /*
  2. *
  3. * Copyright 2016 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package main
  19. import (
  20. "context"
  21. "flag"
  22. "math"
  23. "runtime"
  24. "sync"
  25. "time"
  26. "google.golang.org/grpc"
  27. "google.golang.org/grpc/benchmark"
  28. testpb "google.golang.org/grpc/benchmark/grpc_testing"
  29. "google.golang.org/grpc/benchmark/stats"
  30. "google.golang.org/grpc/codes"
  31. "google.golang.org/grpc/credentials"
  32. "google.golang.org/grpc/grpclog"
  33. "google.golang.org/grpc/internal/syscall"
  34. "google.golang.org/grpc/status"
  35. "google.golang.org/grpc/testdata"
  36. )
  37. var caFile = flag.String("ca_file", "", "The file containing the CA root cert file")
  38. type lockingHistogram struct {
  39. mu sync.Mutex
  40. histogram *stats.Histogram
  41. }
  42. func (h *lockingHistogram) add(value int64) {
  43. h.mu.Lock()
  44. defer h.mu.Unlock()
  45. h.histogram.Add(value)
  46. }
  47. // swap sets h.histogram to o and returns its old value.
  48. func (h *lockingHistogram) swap(o *stats.Histogram) *stats.Histogram {
  49. h.mu.Lock()
  50. defer h.mu.Unlock()
  51. old := h.histogram
  52. h.histogram = o
  53. return old
  54. }
  55. func (h *lockingHistogram) mergeInto(merged *stats.Histogram) {
  56. h.mu.Lock()
  57. defer h.mu.Unlock()
  58. merged.Merge(h.histogram)
  59. }
  60. type benchmarkClient struct {
  61. closeConns func()
  62. stop chan bool
  63. lastResetTime time.Time
  64. histogramOptions stats.HistogramOptions
  65. lockingHistograms []lockingHistogram
  66. rusageLastReset *syscall.Rusage
  67. }
  68. func printClientConfig(config *testpb.ClientConfig) {
  69. // Some config options are ignored:
  70. // - client type:
  71. // will always create sync client
  72. // - async client threads.
  73. // - core list
  74. grpclog.Infof(" * client type: %v (ignored, always creates sync client)", config.ClientType)
  75. grpclog.Infof(" * async client threads: %v (ignored)", config.AsyncClientThreads)
  76. // TODO: use cores specified by CoreList when setting list of cores is supported in go.
  77. grpclog.Infof(" * core list: %v (ignored)", config.CoreList)
  78. grpclog.Infof(" - security params: %v", config.SecurityParams)
  79. grpclog.Infof(" - core limit: %v", config.CoreLimit)
  80. grpclog.Infof(" - payload config: %v", config.PayloadConfig)
  81. grpclog.Infof(" - rpcs per chann: %v", config.OutstandingRpcsPerChannel)
  82. grpclog.Infof(" - channel number: %v", config.ClientChannels)
  83. grpclog.Infof(" - load params: %v", config.LoadParams)
  84. grpclog.Infof(" - rpc type: %v", config.RpcType)
  85. grpclog.Infof(" - histogram params: %v", config.HistogramParams)
  86. grpclog.Infof(" - server targets: %v", config.ServerTargets)
  87. }
  88. func setupClientEnv(config *testpb.ClientConfig) {
  89. // Use all cpu cores available on machine by default.
  90. // TODO: Revisit this for the optimal default setup.
  91. if config.CoreLimit > 0 {
  92. runtime.GOMAXPROCS(int(config.CoreLimit))
  93. } else {
  94. runtime.GOMAXPROCS(runtime.NumCPU())
  95. }
  96. }
  97. // createConns creates connections according to given config.
  98. // It returns the connections and corresponding function to close them.
  99. // It returns non-nil error if there is anything wrong.
  100. func createConns(config *testpb.ClientConfig) ([]*grpc.ClientConn, func(), error) {
  101. var opts []grpc.DialOption
  102. // Sanity check for client type.
  103. switch config.ClientType {
  104. case testpb.ClientType_SYNC_CLIENT:
  105. case testpb.ClientType_ASYNC_CLIENT:
  106. default:
  107. return nil, nil, status.Errorf(codes.InvalidArgument, "unknown client type: %v", config.ClientType)
  108. }
  109. // Check and set security options.
  110. if config.SecurityParams != nil {
  111. if *caFile == "" {
  112. *caFile = testdata.Path("ca.pem")
  113. }
  114. creds, err := credentials.NewClientTLSFromFile(*caFile, config.SecurityParams.ServerHostOverride)
  115. if err != nil {
  116. return nil, nil, status.Errorf(codes.InvalidArgument, "failed to create TLS credentials %v", err)
  117. }
  118. opts = append(opts, grpc.WithTransportCredentials(creds))
  119. } else {
  120. opts = append(opts, grpc.WithInsecure())
  121. }
  122. // Use byteBufCodec if it is required.
  123. if config.PayloadConfig != nil {
  124. switch config.PayloadConfig.Payload.(type) {
  125. case *testpb.PayloadConfig_BytebufParams:
  126. opts = append(opts, grpc.WithDefaultCallOptions(grpc.CallCustomCodec(byteBufCodec{})))
  127. case *testpb.PayloadConfig_SimpleParams:
  128. default:
  129. return nil, nil, status.Errorf(codes.InvalidArgument, "unknown payload config: %v", config.PayloadConfig)
  130. }
  131. }
  132. // Create connections.
  133. connCount := int(config.ClientChannels)
  134. conns := make([]*grpc.ClientConn, connCount)
  135. for connIndex := 0; connIndex < connCount; connIndex++ {
  136. conns[connIndex] = benchmark.NewClientConn(config.ServerTargets[connIndex%len(config.ServerTargets)], opts...)
  137. }
  138. return conns, func() {
  139. for _, conn := range conns {
  140. conn.Close()
  141. }
  142. }, nil
  143. }
  144. func performRPCs(config *testpb.ClientConfig, conns []*grpc.ClientConn, bc *benchmarkClient) error {
  145. // Read payload size and type from config.
  146. var (
  147. payloadReqSize, payloadRespSize int
  148. payloadType string
  149. )
  150. if config.PayloadConfig != nil {
  151. switch c := config.PayloadConfig.Payload.(type) {
  152. case *testpb.PayloadConfig_BytebufParams:
  153. payloadReqSize = int(c.BytebufParams.ReqSize)
  154. payloadRespSize = int(c.BytebufParams.RespSize)
  155. payloadType = "bytebuf"
  156. case *testpb.PayloadConfig_SimpleParams:
  157. payloadReqSize = int(c.SimpleParams.ReqSize)
  158. payloadRespSize = int(c.SimpleParams.RespSize)
  159. payloadType = "protobuf"
  160. default:
  161. return status.Errorf(codes.InvalidArgument, "unknown payload config: %v", config.PayloadConfig)
  162. }
  163. }
  164. // TODO add open loop distribution.
  165. switch config.LoadParams.Load.(type) {
  166. case *testpb.LoadParams_ClosedLoop:
  167. case *testpb.LoadParams_Poisson:
  168. return status.Errorf(codes.Unimplemented, "unsupported load params: %v", config.LoadParams)
  169. default:
  170. return status.Errorf(codes.InvalidArgument, "unknown load params: %v", config.LoadParams)
  171. }
  172. rpcCountPerConn := int(config.OutstandingRpcsPerChannel)
  173. switch config.RpcType {
  174. case testpb.RpcType_UNARY:
  175. bc.doCloseLoopUnary(conns, rpcCountPerConn, payloadReqSize, payloadRespSize)
  176. // TODO open loop.
  177. case testpb.RpcType_STREAMING:
  178. bc.doCloseLoopStreaming(conns, rpcCountPerConn, payloadReqSize, payloadRespSize, payloadType)
  179. // TODO open loop.
  180. default:
  181. return status.Errorf(codes.InvalidArgument, "unknown rpc type: %v", config.RpcType)
  182. }
  183. return nil
  184. }
  185. func startBenchmarkClient(config *testpb.ClientConfig) (*benchmarkClient, error) {
  186. printClientConfig(config)
  187. // Set running environment like how many cores to use.
  188. setupClientEnv(config)
  189. conns, closeConns, err := createConns(config)
  190. if err != nil {
  191. return nil, err
  192. }
  193. rpcCountPerConn := int(config.OutstandingRpcsPerChannel)
  194. bc := &benchmarkClient{
  195. histogramOptions: stats.HistogramOptions{
  196. NumBuckets: int(math.Log(config.HistogramParams.MaxPossible)/math.Log(1+config.HistogramParams.Resolution)) + 1,
  197. GrowthFactor: config.HistogramParams.Resolution,
  198. BaseBucketSize: (1 + config.HistogramParams.Resolution),
  199. MinValue: 0,
  200. },
  201. lockingHistograms: make([]lockingHistogram, rpcCountPerConn*len(conns)),
  202. stop: make(chan bool),
  203. lastResetTime: time.Now(),
  204. closeConns: closeConns,
  205. rusageLastReset: syscall.GetRusage(),
  206. }
  207. if err = performRPCs(config, conns, bc); err != nil {
  208. // Close all connections if performRPCs failed.
  209. closeConns()
  210. return nil, err
  211. }
  212. return bc, nil
  213. }
  214. func (bc *benchmarkClient) doCloseLoopUnary(conns []*grpc.ClientConn, rpcCountPerConn int, reqSize int, respSize int) {
  215. for ic, conn := range conns {
  216. client := testpb.NewBenchmarkServiceClient(conn)
  217. // For each connection, create rpcCountPerConn goroutines to do rpc.
  218. for j := 0; j < rpcCountPerConn; j++ {
  219. // Create histogram for each goroutine.
  220. idx := ic*rpcCountPerConn + j
  221. bc.lockingHistograms[idx].histogram = stats.NewHistogram(bc.histogramOptions)
  222. // Start goroutine on the created mutex and histogram.
  223. go func(idx int) {
  224. // TODO: do warm up if necessary.
  225. // Now relying on worker client to reserve time to do warm up.
  226. // The worker client needs to wait for some time after client is created,
  227. // before starting benchmark.
  228. done := make(chan bool)
  229. for {
  230. go func() {
  231. start := time.Now()
  232. if err := benchmark.DoUnaryCall(client, reqSize, respSize); err != nil {
  233. select {
  234. case <-bc.stop:
  235. case done <- false:
  236. }
  237. return
  238. }
  239. elapse := time.Since(start)
  240. bc.lockingHistograms[idx].add(int64(elapse))
  241. select {
  242. case <-bc.stop:
  243. case done <- true:
  244. }
  245. }()
  246. select {
  247. case <-bc.stop:
  248. return
  249. case <-done:
  250. }
  251. }
  252. }(idx)
  253. }
  254. }
  255. }
  256. func (bc *benchmarkClient) doCloseLoopStreaming(conns []*grpc.ClientConn, rpcCountPerConn int, reqSize int, respSize int, payloadType string) {
  257. var doRPC func(testpb.BenchmarkService_StreamingCallClient, int, int) error
  258. if payloadType == "bytebuf" {
  259. doRPC = benchmark.DoByteBufStreamingRoundTrip
  260. } else {
  261. doRPC = benchmark.DoStreamingRoundTrip
  262. }
  263. for ic, conn := range conns {
  264. // For each connection, create rpcCountPerConn goroutines to do rpc.
  265. for j := 0; j < rpcCountPerConn; j++ {
  266. c := testpb.NewBenchmarkServiceClient(conn)
  267. stream, err := c.StreamingCall(context.Background())
  268. if err != nil {
  269. grpclog.Fatalf("%v.StreamingCall(_) = _, %v", c, err)
  270. }
  271. // Create histogram for each goroutine.
  272. idx := ic*rpcCountPerConn + j
  273. bc.lockingHistograms[idx].histogram = stats.NewHistogram(bc.histogramOptions)
  274. // Start goroutine on the created mutex and histogram.
  275. go func(idx int) {
  276. // TODO: do warm up if necessary.
  277. // Now relying on worker client to reserve time to do warm up.
  278. // The worker client needs to wait for some time after client is created,
  279. // before starting benchmark.
  280. for {
  281. start := time.Now()
  282. if err := doRPC(stream, reqSize, respSize); err != nil {
  283. return
  284. }
  285. elapse := time.Since(start)
  286. bc.lockingHistograms[idx].add(int64(elapse))
  287. select {
  288. case <-bc.stop:
  289. return
  290. default:
  291. }
  292. }
  293. }(idx)
  294. }
  295. }
  296. }
  297. // getStats returns the stats for benchmark client.
  298. // It resets lastResetTime and all histograms if argument reset is true.
  299. func (bc *benchmarkClient) getStats(reset bool) *testpb.ClientStats {
  300. var wallTimeElapsed, uTimeElapsed, sTimeElapsed float64
  301. mergedHistogram := stats.NewHistogram(bc.histogramOptions)
  302. if reset {
  303. // Merging histogram may take some time.
  304. // Put all histograms aside and merge later.
  305. toMerge := make([]*stats.Histogram, len(bc.lockingHistograms))
  306. for i := range bc.lockingHistograms {
  307. toMerge[i] = bc.lockingHistograms[i].swap(stats.NewHistogram(bc.histogramOptions))
  308. }
  309. for i := 0; i < len(toMerge); i++ {
  310. mergedHistogram.Merge(toMerge[i])
  311. }
  312. wallTimeElapsed = time.Since(bc.lastResetTime).Seconds()
  313. latestRusage := syscall.GetRusage()
  314. uTimeElapsed, sTimeElapsed = syscall.CPUTimeDiff(bc.rusageLastReset, latestRusage)
  315. bc.rusageLastReset = latestRusage
  316. bc.lastResetTime = time.Now()
  317. } else {
  318. // Merge only, not reset.
  319. for i := range bc.lockingHistograms {
  320. bc.lockingHistograms[i].mergeInto(mergedHistogram)
  321. }
  322. wallTimeElapsed = time.Since(bc.lastResetTime).Seconds()
  323. uTimeElapsed, sTimeElapsed = syscall.CPUTimeDiff(bc.rusageLastReset, syscall.GetRusage())
  324. }
  325. b := make([]uint32, len(mergedHistogram.Buckets))
  326. for i, v := range mergedHistogram.Buckets {
  327. b[i] = uint32(v.Count)
  328. }
  329. return &testpb.ClientStats{
  330. Latencies: &testpb.HistogramData{
  331. Bucket: b,
  332. MinSeen: float64(mergedHistogram.Min),
  333. MaxSeen: float64(mergedHistogram.Max),
  334. Sum: float64(mergedHistogram.Sum),
  335. SumOfSquares: float64(mergedHistogram.SumOfSquares),
  336. Count: float64(mergedHistogram.Count),
  337. },
  338. TimeElapsed: wallTimeElapsed,
  339. TimeUser: uTimeElapsed,
  340. TimeSystem: sTimeElapsed,
  341. }
  342. }
  343. func (bc *benchmarkClient) shutdown() {
  344. close(bc.stop)
  345. bc.closeConns()
  346. }