type benchmarkClient struct {
closeConns func()
- stop chan bool
lastResetTime time.Time
histogramOptions stats.HistogramOptions
lockingHistograms []lockingHistogram
}, nil
}
-func performRPCs(config *testpb.ClientConfig, conns []*grpc.ClientConn, bc *benchmarkClient) error {
+func performRPCs(ctx context.Context, config *testpb.ClientConfig, conns []*grpc.ClientConn, bc *benchmarkClient) error {
// Read payload size and type from config.
var (
payloadReqSize, payloadRespSize int
switch config.RpcType {
case testpb.RpcType_UNARY:
- bc.unaryLoop(conns, rpcCountPerConn, payloadReqSize, payloadRespSize, poissonLambda)
+ bc.unaryLoop(ctx, conns, rpcCountPerConn, payloadReqSize, payloadRespSize, poissonLambda)
case testpb.RpcType_STREAMING:
- bc.streamingLoop(conns, rpcCountPerConn, payloadReqSize, payloadRespSize, payloadType, poissonLambda)
+ bc.streamingLoop(ctx, conns, rpcCountPerConn, payloadReqSize, payloadRespSize, payloadType, poissonLambda)
default:
return status.Errorf(codes.InvalidArgument, "unknown rpc type: %v", config.RpcType)
}
return nil
}
-func startBenchmarkClient(config *testpb.ClientConfig) (*benchmarkClient, error) {
+func startBenchmarkClient(ctx context.Context, config *testpb.ClientConfig) (*benchmarkClient, error) {
printClientConfig(config)
// Set running environment like how many cores to use.
},
lockingHistograms: make([]lockingHistogram, rpcCountPerConn*len(conns)),
- stop: make(chan bool),
lastResetTime: time.Now(),
closeConns: closeConns,
rusageLastReset: syscall.GetRusage(),
}
- if err = performRPCs(config, conns, bc); err != nil {
+ if err = performRPCs(ctx, config, conns, bc); err != nil {
// Close all connections if performRPCs failed.
closeConns()
return nil, err
return bc, nil
}
-func (bc *benchmarkClient) unaryLoop(conns []*grpc.ClientConn, rpcCountPerConn int, reqSize int, respSize int, poissonLambda *float64) {
+func (bc *benchmarkClient) unaryLoop(ctx context.Context, conns []*grpc.ClientConn, rpcCountPerConn int, reqSize int, respSize int, poissonLambda *float64) {
for ic, conn := range conns {
client := testgrpc.NewBenchmarkServiceClient(conn)
// For each connection, create rpcCountPerConn goroutines to do rpc.
// before starting benchmark.
if poissonLambda == nil { // Closed loop.
for {
- select {
- case <-bc.stop:
- return
- default:
+ if ctx.Err() != nil {
+ break
}
start := time.Now()
if err := benchmark.DoUnaryCall(client, reqSize, respSize); err != nil {
bc.poissonUnary(client, idx, reqSize, respSize, *poissonLambda)
})
}
-
}(idx)
}
}
}
-func (bc *benchmarkClient) streamingLoop(conns []*grpc.ClientConn, rpcCountPerConn int, reqSize int, respSize int, payloadType string, poissonLambda *float64) {
+func (bc *benchmarkClient) streamingLoop(ctx context.Context, conns []*grpc.ClientConn, rpcCountPerConn int, reqSize int, respSize int, payloadType string, poissonLambda *float64) {
var doRPC func(testgrpc.BenchmarkService_StreamingCallClient, int, int) error
if payloadType == "bytebuf" {
doRPC = benchmark.DoByteBufStreamingRoundTrip
}
elapse := time.Since(start)
bc.lockingHistograms[idx].add(int64(elapse))
- select {
- case <-bc.stop:
+ if ctx.Err() != nil {
return
- default:
}
}
}(idx)
func (bc *benchmarkClient) poissonStreaming(stream testgrpc.BenchmarkService_StreamingCallClient, idx int, reqSize int, respSize int, lambda float64, doRPC func(testgrpc.BenchmarkService_StreamingCallClient, int, int) error) {
go func() {
start := time.Now()
+
if err := doRPC(stream, reqSize, respSize); err != nil {
return
}
}
func (bc *benchmarkClient) shutdown() {
- close(bc.stop)
bc.closeConns()
}