diff --git a/cmd/ghz/main.go b/cmd/ghz/main.go index b5445a16..b75276bf 100644 --- a/cmd/ghz/main.go +++ b/cmd/ghz/main.go @@ -77,18 +77,72 @@ var ( PlaceHolder(" ").IsSetByUser(&isAuthSet).String() // Run + isAsyncSet = false + async = kingpin.Flag("async", "Make async requests."). + Default("false").IsSetByUser(&isAsyncSet).Bool() + + isScheduleSet = false + schedule = kingpin.Flag("load-schedule", "Specifies the load schedule. Options are const, step, or line. Default is const."). + Default("const").IsSetByUser(&isScheduleSet).String() + + isQSet = false + q = kingpin.Flag("qps", "Queries per second (QPS) rate limit for constant load. Default is no rate limit."). + Default("0").Short('q').IsSetByUser(&isQSet).Uint() + + isLoadStartSet = false + loadStart = kingpin.Flag("load-start", "Specifies the load start value."). + Default("0").IsSetByUser(&isLoadStartSet).Uint() + + isLoadStepSet = false + loadStep = kingpin.Flag("load-step", "Specifies the load step value or slope value."). + Default("0").IsSetByUser(&isLoadStepSet).Uint() + + isLoadEndSet = false + loadEnd = kingpin.Flag("load-end", "Specifies the load end value."). + Default("0").IsSetByUser(&isLoadEndSet).Uint() + + isLoadStepDurSet = false + loadStepDuration = kingpin.Flag("load-step-duration", "Specifies the load step duration value for step load schedule."). + Default("0").IsSetByUser(&isLoadStepDurSet).Duration() + + isLoadMaxDurSet = false + loadMaxDuration = kingpin.Flag("load-max-duration", "Specifies the max load duration value for step or line."). + Default("0").IsSetByUser(&isLoadMaxDurSet).Duration() + + // Concurrency isCSet = false - c = kingpin.Flag("concurrency", "Number of requests to run concurrently. Total number of requests cannot be smaller than the concurrency level. Default is 50."). + c = kingpin.Flag("concurrency", "Number of request workers to run concurrently for const concurrency schedule. Default is 50."). Short('c').Default("50").IsSetByUser(&isCSet).Uint() + isCScheduleSet = false + cschdule = kingpin.Flag("concurrency-schedule", "Concurrency change schedule. Options are const, step, or line. Default is const."). + Default("const").IsSetByUser(&isCScheduleSet).String() + + isCMinSet = false + cmin = kingpin.Flag("concurrency-min", "Concurrency minimum / start value for step and line schedules."). + Default("0").IsSetByUser(&isCMinSet).Uint() + + isCMaxSet = false + cmax = kingpin.Flag("concurrency-max", "Concurrency maximum / end value for step and line schedules."). + Default("0").IsSetByUser(&isCMaxSet).Uint() + + isCStepSet = false + cstep = kingpin.Flag("concurrency-step", "Concurrency step / slope value for step and line schedules."). + Default("1").IsSetByUser(&isCStepSet).Uint() + + isCStepDurSet = false + cStepDuration = kingpin.Flag("concurrency-step-duration", "Specifies the concurrency step duration value for step concurrency schedule."). + Default("0").IsSetByUser(&isCStepDurSet).Duration() + + isCMaxDurSet = false + cMaxDuration = kingpin.Flag("concurrency-max-duration", "Specifies the max concurrency adjustment duration value for step or line concurrency schedule."). + Default("0").IsSetByUser(&isCMaxDurSet).Duration() + + // Other isNSet = false n = kingpin.Flag("total", "Number of requests to run. Default is 200."). Short('n').Default("200").IsSetByUser(&isNSet).Uint() - isQSet = false - q = kingpin.Flag("qps", "Rate limit, in queries per second (QPS). Default is no rate limit."). - Default("0").Short('q').IsSetByUser(&isQSet).Uint() - isTSet = false t = kingpin.Flag("timeout", "Timeout for each request. Default is 20s, use 0 for infinite."). Default("20s").Short('t').IsSetByUser(&isTSet).Duration() @@ -102,7 +156,7 @@ var ( Short('x').Default("0").IsSetByUser(&isXSet).Duration() isZStopSet = false - zstop = kingpin.Flag("duration-stop", "Specifies how duration stop is reported. Options are close, wait or ignore."). + zstop = kingpin.Flag("duration-stop", "Specifies how duration stop is reported. Options are close, wait or ignore. Default is close."). Default("close").IsSetByUser(&isZStopSet).String() // Data @@ -384,6 +438,19 @@ func createConfigFromArgs(cfg *runner.Config) error { cfg.ReflectMetadata = rmdMap cfg.Debug = *debug cfg.EnableCompression = *enableCompression + cfg.LoadSchedule = *schedule + cfg.LoadStart = *loadStart + cfg.LoadStep = *loadStep + cfg.LoadEnd = *loadEnd + cfg.LoadStepDuration = runner.Duration(*loadStepDuration) + cfg.LoadMaxDuration = runner.Duration(*loadMaxDuration) + cfg.Async = *async + cfg.CSchedule = *cschdule + cfg.CMin = *cmin + cfg.CStep = *cstep + cfg.CMax = *cmax + cfg.CStepDuration = runner.Duration(*cStepDuration) + cfg.CMaxDuration = runner.Duration(*cMaxDuration) return nil } @@ -393,6 +460,8 @@ func mergeConfig(dest *runner.Config, src *runner.Config) error { return errors.New("config cannot be nil") } + // proto + if isProtoSet { dest.Proto = src.Proto } @@ -405,6 +474,8 @@ func mergeConfig(dest *runner.Config, src *runner.Config) error { dest.Call = src.Call } + // security + if isCACertSet { dest.RootCert = src.RootCert } @@ -437,18 +508,12 @@ func mergeConfig(dest *runner.Config, src *runner.Config) error { dest.CName = src.CName } + // run + if isNSet { dest.N = src.N } - if isCSet { - dest.C = src.C - } - - if isQSet { - dest.QPS = src.QPS - } - if isZSet { dest.Z = src.Z } @@ -465,6 +530,8 @@ func mergeConfig(dest *runner.Config, src *runner.Config) error { dest.ZStop = src.ZStop } + // data + if isDataSet { dest.Data = src.Data } @@ -489,6 +556,8 @@ func mergeConfig(dest *runner.Config, src *runner.Config) error { dest.MetadataPath = src.MetadataPath } + // other + if isSISet { dest.SI = src.SI } @@ -541,6 +610,70 @@ func mergeConfig(dest *runner.Config, src *runner.Config) error { dest.Host = src.Host } + // load + + if isAsyncSet { + dest.Async = src.Async + } + + if isQSet { + dest.QPS = src.QPS + } + + if isScheduleSet { + dest.LoadSchedule = src.LoadSchedule + } + + if isLoadStartSet { + dest.LoadStart = src.LoadStart + } + + if isLoadStepSet { + dest.LoadStep = src.LoadStep + } + + if isLoadEndSet { + dest.LoadEnd = src.LoadEnd + } + + if isLoadStepDurSet { + dest.LoadStepDuration = src.LoadStepDuration + } + + if isLoadMaxDurSet { + dest.LoadMaxDuration = src.LoadMaxDuration + } + + // concurrency + + if isCSet { + dest.C = src.C + } + + if isCScheduleSet { + dest.CSchedule = src.CSchedule + } + + if isCMinSet { + dest.CMin = src.CMin + } + + if isCStepSet { + dest.CStep = src.CStep + } + + if isCMaxSet { + dest.CMax = src.CMax + } + + if isCStepDurSet { + dest.CStepDuration = src.CStepDuration + } + + if isCMaxDurSet { + dest.CMaxDuration = src.CMaxDuration + } + return nil } diff --git a/runner/config.go b/runner/config.go index 8de2f981..8a25901a 100644 --- a/runner/config.go +++ b/runner/config.go @@ -34,7 +34,6 @@ func (d Duration) String() string { // UnmarshalJSON is our custom unmarshaller with JSON support func (d *Duration) UnmarshalJSON(text []byte) error { - // strValue := string(text) first := text[0] last := text[len(text)-1] if first == '"' && last == '"' { @@ -68,7 +67,14 @@ type Config struct { Authority string `json:"authority" toml:"authority" yaml:"authority"` Insecure bool `json:"insecure,omitempty" toml:"insecure,omitempty" yaml:"insecure,omitempty"` N uint `json:"total" toml:"total" yaml:"total" default:"200"` + Async bool `json:"async,omitempty" toml:"async,omitempty" yaml:"async,omitempty"` C uint `json:"concurrency" toml:"concurrency" yaml:"concurrency" default:"50"` + CSchedule string `json:"concurrency-schedule" toml:"concurrency-schedule" yaml:"concurrency-schedule" default:"const"` + CMin uint `json:"concurrency-min" toml:"concurrency-min" yaml:"concurrency-min" default:"1"` + CMax uint `json:"concurrency-max" toml:"concurrency-max" yaml:"concurrency-max" default:"0"` + CStep uint `json:"concurrency-step" toml:"concurrency-step" yaml:"concurrency-step" default:"0"` + CStepDuration Duration `json:"concurrency-step-duration" toml:"concurrency-step-duration" yaml:"concurrency-step-duration" default:"0"` + CMaxDuration Duration `json:"concurrency-max-duration" toml:"concurrency-max-duration" yaml:"concurrency-max-duration" default:"0"` Connections uint `json:"connections" toml:"connections" yaml:"connections" default:"1"` QPS uint `json:"qps" toml:"qps" yaml:"qps"` Z Duration `json:"duration" toml:"duration" yaml:"duration"` @@ -94,6 +100,12 @@ type Config struct { Debug string `json:"debug,omitempty" toml:"debug,omitempty" yaml:"debug,omitempty"` Host string `json:"host" toml:"host" yaml:"host"` EnableCompression bool `json:"enable-compression,omitempty" toml:"enable-compression,omitempty" yaml:"enable-compression,omitempty"` + LoadSchedule string `json:"load-schedule" toml:"load-schedule" yaml:"load-schedule" default:"const"` + LoadStart uint `json:"load-start" toml:"load-start" yaml:"load-start"` + LoadEnd uint `json:"load-end" toml:"load-end" yaml:"load-end"` + LoadStep uint `json:"load-step" toml:"load-step" yaml:"load-step"` + LoadStepDuration Duration `json:"load-step-duration" toml:"load-step-duration" yaml:"load-step-duration"` + LoadMaxDuration Duration `json:"load-max-duration" toml:"load-max-duration" yaml:"load-max-duration"` } func checkData(data interface{}) error { diff --git a/runner/config_test.go b/runner/config_test.go index 6b24351f..f9e62e00 100644 --- a/runner/config_test.go +++ b/runner/config_test.go @@ -58,8 +58,11 @@ func TestConfig_Load(t *testing.T) { Data: map[string]interface{}{ "f_strings": []interface{}{"123", "456"}, }, - Format: "summary", - DialTimeout: Duration(10 * time.Second), + Format: "summary", + DialTimeout: Duration(10 * time.Second), + LoadSchedule: "const", + CSchedule: "const", + CMin: 1, }, true, }, diff --git a/runner/options.go b/runner/options.go index 45706b40..4bdb9d7c 100644 --- a/runner/options.go +++ b/runner/options.go @@ -19,6 +19,15 @@ import ( "google.golang.org/grpc/credentials" ) +// ScheduleConst is a constant load schedule +const ScheduleConst = "const" + +// ScheduleStep is the step load schedule +const ScheduleStep = "step" + +// ScheduleLine is the line load schedule +const ScheduleLine = "line" + // RunConfig represents the request Configs type RunConfig struct { // call settings @@ -39,10 +48,27 @@ type RunConfig struct { insecure bool authority string + // load + qps int + loadStart uint + loadEnd uint + loadStep uint + loadSchedule string + loadDuration time.Duration + loadStepDuration time.Duration + + // concurrency + c int + cMin uint + cMax uint + cStep uint + cSchedule string + cMaxDuration time.Duration + cStepDuration time.Duration + // test - n int - c int - qps int + n int + async bool // number of connections nConns int @@ -573,15 +599,16 @@ func NewConfig(call, host string, options ...Option) (*RunConfig, error) { // init with defaults c := &RunConfig{ - call: call, - host: host, - n: 200, - c: 50, - nConns: 1, - timeout: time.Duration(20 * time.Second), - dialTimeout: time.Duration(10 * time.Second), - cpus: runtime.GOMAXPROCS(-1), - zstop: "close", + call: call, + host: host, + n: 200, + c: 50, + nConns: 1, + timeout: time.Duration(20 * time.Second), + dialTimeout: time.Duration(10 * time.Second), + cpus: runtime.GOMAXPROCS(-1), + zstop: "close", + loadSchedule: ScheduleConst, } // apply options @@ -600,20 +627,60 @@ func NewConfig(call, host string, options ...Option) (*RunConfig, error) { // checks if c.nConns > c.c { - return nil, errors.New("Number of connections cannot be greater than concurrency") + return nil, errors.New("number of connections cannot be greater than concurrency") } if c.call == "" { - return nil, errors.New("Call required") + return nil, errors.New("call required") } if c.host == "" { - return nil, errors.New("Host required") + return nil, errors.New("host required") + } + + if c.loadSchedule != ScheduleConst && + c.loadSchedule != ScheduleStep && + c.loadSchedule != ScheduleLine { + return nil, fmt.Errorf(`schedule much be "%s", "%s", or "%s"`, + ScheduleConst, ScheduleStep, ScheduleLine) + } + + if c.loadSchedule == ScheduleStep || c.loadSchedule == ScheduleLine { + if c.loadStart == c.loadEnd { + return nil, errors.New("load start cannot equal load end") + } + + // step value for step schedule or + // slope for line schedule + if c.loadStep == 0 { + return nil, errors.New("invalid load step") + } + } + + if c.cSchedule == ScheduleStep || c.cSchedule == ScheduleLine { + if c.cMin == c.cMax { + return nil, errors.New("concurrency min start cannot equal concurrency max") + } + + // step value for step schedule or + // slope for line schedule + if c.cStep == 0 { + return nil, errors.New("invalid concurrency step") + } + } + + if c.loadSchedule == ScheduleLine { + c.loadStepDuration = time.Second + } + + if c.cSchedule == ScheduleLine { + c.cStepDuration = time.Second } if c.skipFirst > 0 && int(c.skipFirst) > c.n { - return nil, errors.New("You cannot skip more requests than those run") + return nil, errors.New("you cannot skip more requests than those run") } + creds, err := createClientTransportCredentials( c.skipVerify, c.cacert, @@ -641,6 +708,137 @@ func WithEnableCompression(enableCompression bool) Option { } } +// WithLoadSchedule specifies the load schedule +// WithLoadSchedule("const") +func WithLoadSchedule(schedule string) Option { + return func(o *RunConfig) error { + s := strings.TrimSpace(schedule) + if len(s) > 0 { + o.loadSchedule = strings.ToLower(s) + } + + return nil + } +} + +// WithLoadStart specifies the load start +// WithLoadStart(5) +func WithLoadStart(start uint) Option { + return func(o *RunConfig) error { + o.loadStart = start + + return nil + } +} + +// WithLoadEnd specifies the load end +// WithLoadEnd(25) +func WithLoadEnd(end uint) Option { + return func(o *RunConfig) error { + o.loadEnd = end + + return nil + } +} + +// WithLoadStep specifies the load step +// WithLoadStep(5) +func WithLoadStep(step uint) Option { + return func(o *RunConfig) error { + o.loadStep = step + + return nil + } +} + +// WithLoadStepDuration specifies the load step duration for step schedule +func WithLoadStepDuration(duration time.Duration) Option { + return func(o *RunConfig) error { + o.loadStepDuration = duration + + return nil + } +} + +// WithLoadDuration specifies the load duration +func WithLoadDuration(duration time.Duration) Option { + return func(o *RunConfig) error { + o.loadDuration = duration + + return nil + } +} + +// WithAsync specifies the async option +func WithAsync(async bool) Option { + return func(o *RunConfig) error { + o.async = async + + return nil + } +} + +// WithConcurrencySchedule specifies the concurrency adjustment schedule +// WithConcurrencySchedule("const") +func WithConcurrencySchedule(schedule string) Option { + return func(o *RunConfig) error { + s := strings.TrimSpace(schedule) + if len(s) > 0 { + o.cSchedule = strings.ToLower(s) + } + + return nil + } +} + +// WithConcurrencyMin specifies the concurrency minimum for line or step schedule +// WithConcurrencyMin(5) +func WithConcurrencyMin(min uint) Option { + return func(o *RunConfig) error { + o.cMin = min + + return nil + } +} + +// WithConcurrencyMax specifies the concurrency maximum value for line or step schedule +// WithConcurrencyMax(25) +func WithConcurrencyMax(max uint) Option { + return func(o *RunConfig) error { + o.cMax = max + + return nil + } +} + +// WithConcurrencyStep specifies the concurrency step value or slope +// WithConcurrencyStep(5) +func WithConcurrencyStep(step uint) Option { + return func(o *RunConfig) error { + o.cStep = step + + return nil + } +} + +// WithConcurrencyStepDuration specifies the concurrency step duration for step schedule +func WithConcurrencyStepDuration(duration time.Duration) Option { + return func(o *RunConfig) error { + o.cStepDuration = duration + + return nil + } +} + +// WithConcurrencyDuration specifies the total concurrency adjustment duration +func WithConcurrencyDuration(duration time.Duration) Option { + return func(o *RunConfig) error { + o.cMaxDuration = duration + + return nil + } +} + func createClientTransportCredentials(skipVerify bool, cacertFile, clientCertFile, clientKeyFile, cname string) (credentials.TransportCredentials, error) { var tlsConf tls.Config @@ -715,6 +913,19 @@ func fromConfig(cfg *Config) []Option { WithConnections(cfg.Connections), WithEnableCompression(cfg.EnableCompression), WithDurationStopAction(cfg.ZStop), + WithLoadSchedule(cfg.LoadSchedule), + WithLoadStart(cfg.LoadStart), + WithLoadStep(cfg.LoadStep), + WithLoadStepDuration(time.Duration(cfg.LoadStepDuration)), + WithLoadEnd(cfg.LoadEnd), + WithLoadDuration(time.Duration(cfg.LoadMaxDuration)), + WithAsync(cfg.Async), + WithConcurrencySchedule(cfg.LoadSchedule), + WithConcurrencyMin(cfg.CMin), + WithConcurrencyMax(cfg.CMax), + WithConcurrencyStep(cfg.CStep), + WithConcurrencyStepDuration(time.Duration(cfg.CStepDuration)), + WithConcurrencyDuration(time.Duration(cfg.CMaxDuration)), func(o *RunConfig) error { o.call = cfg.Call return nil diff --git a/runner/options_test.go b/runner/options_test.go index 844e581b..ee348803 100644 --- a/runner/options_test.go +++ b/runner/options_test.go @@ -65,6 +65,15 @@ func TestRunConfig_newRunConfig(t *testing.T) { assert.Equal(t, c.enableCompression, false) }) + t.Run("skipFirst > n", func(t *testing.T) { + _, err := NewConfig(" call ", " localhost:50050 ", + WithProtoFile("testdata/data.proto", []string{}), + WithSkipFirst(1000), + ) + + assert.Error(t, err) + }) + t.Run("with options", func(t *testing.T) { c, err := NewConfig( "call", "localhost:50050", @@ -462,4 +471,153 @@ func TestRunConfig_newRunConfig(t *testing.T) { assert.Equal(t, true, c.insecure) }) }) + + t.Run("invalid schedule", func(t *testing.T) { + _, err := NewConfig(" call ", " localhost:50050 ", + WithProtoFile("testdata/data.proto", []string{}), + WithLoadSchedule("foo"), + ) + + assert.Error(t, err) + }) + + t.Run("with load step", func(t *testing.T) { + t.Run("no step", func(t *testing.T) { + _, err := NewConfig(" call ", " localhost:50050 ", + WithProtoFile("testdata/data.proto", []string{}), + WithLoadSchedule(ScheduleStep), + WithLoadStart(10), + WithLoadDuration(20*time.Second), + WithLoadEnd(20), + ) + + assert.Error(t, err) + }) + + t.Run("no step duration", func(t *testing.T) { + _, err := NewConfig(" call ", " localhost:50050 ", + WithProtoFile("testdata/data.proto", []string{}), + WithLoadSchedule(ScheduleStep), + WithLoadStep(5), + ) + + assert.Error(t, err) + }) + + t.Run("with all load settings", func(t *testing.T) { + c, err := NewConfig(" call ", " localhost:50050 ", + WithProtoFile("testdata/data.proto", []string{}), + WithLoadSchedule(ScheduleStep), + WithLoadStep(5), + WithLoadStart(10), + WithLoadDuration(20*time.Second), + WithLoadStepDuration(5*time.Second), + WithLoadEnd(20), + ) + + assert.NoError(t, err) + + assert.Equal(t, ScheduleStep, c.loadSchedule) + assert.Equal(t, uint(10), c.loadStart) + assert.Equal(t, uint(20), c.loadEnd) + assert.Equal(t, 20*time.Second, c.loadDuration) + assert.Equal(t, uint(5), c.loadStep) + assert.Equal(t, 5*time.Second, c.loadStepDuration) + }) + }) + + t.Run("with load line", func(t *testing.T) { + t.Run("no step", func(t *testing.T) { + _, err := NewConfig(" call ", " localhost:50050 ", + WithProtoFile("testdata/data.proto", []string{}), + WithLoadSchedule(ScheduleLine), + ) + + assert.Error(t, err) + }) + + t.Run("with all setting", func(t *testing.T) { + c, err := NewConfig(" call ", " localhost:50050 ", + WithProtoFile("testdata/data.proto", []string{}), + WithLoadSchedule(ScheduleLine), + WithLoadDuration(20*time.Second), + WithLoadStepDuration(5*time.Second), // overwritten + WithLoadEnd(20), + WithLoadStep(2), + ) + + assert.NoError(t, err) + + assert.Equal(t, ScheduleLine, c.loadSchedule) + assert.Equal(t, uint(0), c.loadStart) + assert.Equal(t, uint(20), c.loadEnd) + assert.Equal(t, 20*time.Second, c.loadDuration) + assert.Equal(t, uint(2), c.loadStep) + assert.Equal(t, 1*time.Second, c.loadStepDuration) + }) + }) + + t.Run("with concurrency step", func(t *testing.T) { + t.Run("no step", func(t *testing.T) { + _, err := NewConfig(" call ", " localhost:50050 ", + WithProtoFile("testdata/data.proto", []string{}), + WithConcurrencySchedule(ScheduleStep), + ) + + assert.Error(t, err) + }) + + t.Run("with all concurrency settings", func(t *testing.T) { + c, err := NewConfig(" call ", " localhost:50050 ", + WithProtoFile("testdata/data.proto", []string{}), + WithConcurrencySchedule(ScheduleStep), + WithConcurrencyStep(5), + WithConcurrencyMin(10), + WithConcurrencyDuration(20*time.Second), + WithConcurrencyStepDuration(5*time.Second), + WithConcurrencyMax(20), + ) + + assert.NoError(t, err) + + assert.Equal(t, ScheduleStep, c.cSchedule) + assert.Equal(t, uint(10), c.cMin) + assert.Equal(t, uint(20), c.cMax) + assert.Equal(t, 20*time.Second, c.cMaxDuration) + assert.Equal(t, uint(5), c.cStep) + assert.Equal(t, 5*time.Second, c.cStepDuration) + }) + }) + + t.Run("with concurrency line", func(t *testing.T) { + t.Run("no step", func(t *testing.T) { + _, err := NewConfig(" call ", " localhost:50050 ", + WithProtoFile("testdata/data.proto", []string{}), + WithConcurrencySchedule(ScheduleLine), + ) + + assert.Error(t, err) + }) + + t.Run("with all concurrency settings", func(t *testing.T) { + c, err := NewConfig(" call ", " localhost:50050 ", + WithProtoFile("testdata/data.proto", []string{}), + WithConcurrencySchedule(ScheduleLine), + WithConcurrencyStep(2), + WithConcurrencyMin(5), + WithConcurrencyDuration(20*time.Second), + WithConcurrencyStepDuration(5*time.Second), // overwritten + WithConcurrencyMax(20), + ) + + assert.NoError(t, err) + + assert.Equal(t, ScheduleLine, c.cSchedule) + assert.Equal(t, uint(5), c.cMin) + assert.Equal(t, uint(20), c.cMax) + assert.Equal(t, 20*time.Second, c.cMaxDuration) + assert.Equal(t, uint(2), c.cStep) + assert.Equal(t, 1*time.Second, c.cStepDuration) + }) + }) } diff --git a/runner/run.go b/runner/run.go index 5c5eef98..92463d72 100644 --- a/runner/run.go +++ b/runner/run.go @@ -36,6 +36,7 @@ func Run(call, host string, options ...Option) (*Report, error) { cancel := make(chan os.Signal, 1) signal.Notify(cancel, os.Interrupt) + go func() { <-cancel reqr.Stop(ReasonCancel) diff --git a/runner/worker.go b/runner/worker.go index f5a8fb6c..f3825b8c 100644 --- a/runner/worker.go +++ b/runner/worker.go @@ -130,38 +130,22 @@ func (w *Worker) makeRequest() error { "input", inputs, "metadata", reqMD) } + inputsLen := len(inputs) + if inputsLen == 0 { + return fmt.Errorf("no data provided for request") + } + inputIdx := int((reqNum - 1) % int64(inputsLen)) // we want to start from inputs[0] so dec reqNum + unaryInput := inputs[inputIdx] + // RPC errors are handled via stats handler if w.mtd.IsClientStreaming() && w.mtd.IsServerStreaming() { _ = w.makeBidiRequest(&ctx, inputs) } else if w.mtd.IsClientStreaming() { _ = w.makeClientStreamingRequest(&ctx, inputs) + } else if w.mtd.IsServerStreaming() { + _ = w.makeServerStreamingRequest(&ctx, unaryInput) } else { - - inputsLen := len(inputs) - if inputsLen == 0 { - return fmt.Errorf("no data provided for request") - } - inputIdx := int((reqNum - 1) % int64(inputsLen)) // we want to start from inputs[0] so dec reqNum - - if w.mtd.IsServerStreaming() { - _ = w.makeServerStreamingRequest(&ctx, inputs[inputIdx]) - } else { - var res proto.Message - var resErr error - var callOptions = []grpc.CallOption{} - if w.config.enableCompression { - callOptions = append(callOptions, grpc.UseCompressor(gzip.Name)) - } - - res, resErr = w.stub.InvokeRpc(ctx, w.mtd, inputs[inputIdx], callOptions...) - - if w.config.hasLog { - w.config.log.Debugw("Received response", "workerID", w.workerID, "call type", callType, - "call", w.mtd.GetFullyQualifiedName(), - "input", inputs, "metadata", reqMD, - "response", res, "error", resErr) - } - } + _ = w.makeUnaryRequest(&ctx, reqMD, unaryInput) } return err @@ -197,6 +181,26 @@ func (w *Worker) getMessages(ctd *callTemplateData, inputData []byte) ([]*dynami return inputs, nil } +func (w *Worker) makeUnaryRequest(ctx *context.Context, reqMD *metadata.MD, input *dynamic.Message) error { + var res proto.Message + var resErr error + var callOptions = []grpc.CallOption{} + if w.config.enableCompression { + callOptions = append(callOptions, grpc.UseCompressor(gzip.Name)) + } + + res, resErr = w.stub.InvokeRpc(*ctx, w.mtd, input, callOptions...) + + if w.config.hasLog { + w.config.log.Debugw("Received response", "workerID", w.workerID, "call type", "unary", + "call", w.mtd.GetFullyQualifiedName(), + "input", input, "metadata", reqMD, + "response", res, "error", resErr) + } + + return resErr +} + func (w *Worker) makeClientStreamingRequest(ctx *context.Context, input []*dynamic.Message) error { var str *grpcdynamic.ClientStream var err error