From a64ff50c76df1520684246655c9efe988360247d Mon Sep 17 00:00:00 2001 From: Bojan Date: Tue, 20 Oct 2020 22:51:40 -0300 Subject: [PATCH] step and line --- cmd/ghz/main.go | 12 +++---- runner/config.go | 4 +-- runner/options.go | 10 +++--- runner/requester.go | 72 +++++++++++++++++++++++++++++++++++----- runner/run_test.go | 81 ++++++++++++++++++--------------------------- runner/worker.go | 27 ++++++++++----- 6 files changed, 128 insertions(+), 78 deletions(-) diff --git a/cmd/ghz/main.go b/cmd/ghz/main.go index b75276bf..2459d706 100644 --- a/cmd/ghz/main.go +++ b/cmd/ghz/main.go @@ -81,21 +81,21 @@ var ( async = kingpin.Flag("async", "Make async requests."). Default("false").IsSetByUser(&isAsyncSet).Bool() - isScheduleSet = false - schedule = kingpin.Flag("load-schedule", "Specifies the load schedule. Options are const, step, or line. Default is const."). - Default("const").IsSetByUser(&isScheduleSet).String() - isQSet = false q = kingpin.Flag("qps", "Queries per second (QPS) rate limit for constant load. Default is no rate limit."). Default("0").Short('q').IsSetByUser(&isQSet).Uint() + isScheduleSet = false + schedule = kingpin.Flag("load-schedule", "Specifies the load schedule. Options are const, step, or line. Default is const."). + Default("const").IsSetByUser(&isScheduleSet).String() + isLoadStartSet = false loadStart = kingpin.Flag("load-start", "Specifies the load start value."). Default("0").IsSetByUser(&isLoadStartSet).Uint() isLoadStepSet = false loadStep = kingpin.Flag("load-step", "Specifies the load step value or slope value."). - Default("0").IsSetByUser(&isLoadStepSet).Uint() + Default("0").IsSetByUser(&isLoadStepSet).Int() isLoadEndSet = false loadEnd = kingpin.Flag("load-end", "Specifies the load end value."). @@ -128,7 +128,7 @@ var ( isCStepSet = false cstep = kingpin.Flag("concurrency-step", "Concurrency step / slope value for step and line schedules."). - Default("1").IsSetByUser(&isCStepSet).Uint() + Default("1").IsSetByUser(&isCStepSet).Int() isCStepDurSet = false cStepDuration = kingpin.Flag("concurrency-step-duration", "Specifies the concurrency step duration value for step concurrency schedule."). diff --git a/runner/config.go b/runner/config.go index 8a25901a..2a9f9b88 100644 --- a/runner/config.go +++ b/runner/config.go @@ -72,7 +72,7 @@ type Config struct { CSchedule string `json:"concurrency-schedule" toml:"concurrency-schedule" yaml:"concurrency-schedule" default:"const"` CMin uint `json:"concurrency-min" toml:"concurrency-min" yaml:"concurrency-min" default:"1"` CMax uint `json:"concurrency-max" toml:"concurrency-max" yaml:"concurrency-max" default:"0"` - CStep uint `json:"concurrency-step" toml:"concurrency-step" yaml:"concurrency-step" default:"0"` + CStep int `json:"concurrency-step" toml:"concurrency-step" yaml:"concurrency-step" default:"0"` CStepDuration Duration `json:"concurrency-step-duration" toml:"concurrency-step-duration" yaml:"concurrency-step-duration" default:"0"` CMaxDuration Duration `json:"concurrency-max-duration" toml:"concurrency-max-duration" yaml:"concurrency-max-duration" default:"0"` Connections uint `json:"connections" toml:"connections" yaml:"connections" default:"1"` @@ -103,7 +103,7 @@ type Config struct { LoadSchedule string `json:"load-schedule" toml:"load-schedule" yaml:"load-schedule" default:"const"` LoadStart uint `json:"load-start" toml:"load-start" yaml:"load-start"` LoadEnd uint `json:"load-end" toml:"load-end" yaml:"load-end"` - LoadStep uint `json:"load-step" toml:"load-step" yaml:"load-step"` + LoadStep int `json:"load-step" toml:"load-step" yaml:"load-step"` LoadStepDuration Duration `json:"load-step-duration" toml:"load-step-duration" yaml:"load-step-duration"` LoadMaxDuration Duration `json:"load-max-duration" toml:"load-max-duration" yaml:"load-max-duration"` } diff --git a/runner/options.go b/runner/options.go index 4bdb9d7c..29a54984 100644 --- a/runner/options.go +++ b/runner/options.go @@ -52,7 +52,7 @@ type RunConfig struct { qps int loadStart uint loadEnd uint - loadStep uint + loadStep int loadSchedule string loadDuration time.Duration loadStepDuration time.Duration @@ -61,7 +61,7 @@ type RunConfig struct { c int cMin uint cMax uint - cStep uint + cStep int cSchedule string cMaxDuration time.Duration cStepDuration time.Duration @@ -743,7 +743,7 @@ func WithLoadEnd(end uint) Option { // WithLoadStep specifies the load step // WithLoadStep(5) -func WithLoadStep(step uint) Option { +func WithLoadStep(step int) Option { return func(o *RunConfig) error { o.loadStep = step @@ -813,7 +813,7 @@ func WithConcurrencyMax(max uint) Option { // WithConcurrencyStep specifies the concurrency step value or slope // WithConcurrencyStep(5) -func WithConcurrencyStep(step uint) Option { +func WithConcurrencyStep(step int) Option { return func(o *RunConfig) error { o.cStep = step @@ -920,7 +920,7 @@ func fromConfig(cfg *Config) []Option { WithLoadEnd(cfg.LoadEnd), WithLoadDuration(time.Duration(cfg.LoadMaxDuration)), WithAsync(cfg.Async), - WithConcurrencySchedule(cfg.LoadSchedule), + WithConcurrencySchedule(cfg.CSchedule), WithConcurrencyMin(cfg.CMin), WithConcurrencyMax(cfg.CMax), WithConcurrencyStep(cfg.CStep), diff --git a/runner/requester.go b/runner/requester.go index 2dbbbd28..5b40c6da 100644 --- a/runner/requester.go +++ b/runner/requester.go @@ -175,10 +175,11 @@ func (b *Requester) Run() (*Report, error) { b.reporter.Run() }() - wt := load.ConstWorkerTicker{N: uint(b.config.c), C: make(chan load.TickValue)} - pacer := load.ConstantPacer{Freq: uint64(b.config.qps), Max: uint64(b.config.n)} + wt := createWorkerTicker(b.config) - err = b.runWorkers(&wt, &pacer) + p := createPacer(b.config) + + err = b.runWorkers(wt, p) report := b.Finish() b.closeClientConns() @@ -342,7 +343,7 @@ func (b *Requester) runWorkers(wt load.WorkerTicker, p load.Pacer) error { errC := make(chan error, b.config.c) done := make(chan struct{}) - ticks := make(chan struct{}) + ticks := make(chan TickValue) counter := Counter{} go func() { @@ -366,7 +367,6 @@ func (b *Requester) runWorkers(wt load.WorkerTicker, p load.Pacer) error { w := Worker{ ticks: ticks, active: true, - counter: &counter, stub: b.stubs[n], mtd: b.mtd, config: b.config, @@ -436,11 +436,13 @@ func (b *Requester) runWorkers(wt load.WorkerTicker, p load.Pacer) error { return } - time.Sleep(wait) + if wait > 0 { + time.Sleep(wait) + } select { - case ticks <- struct{}{}: - counter.Inc() + case ticks <- TickValue{instant: time.Now(), reqNumber: counter.Inc() - 1}: + continue case <-b.stopCh: fmt.Println("stop. count:", counter.Get()) done <- struct{}{} @@ -465,3 +467,57 @@ func min(a, b int) int { } return b } + +func createWorkerTicker(config *RunConfig) load.WorkerTicker { + var wt load.WorkerTicker + switch config.cSchedule { + case ScheduleLine: + wt = &load.LineWorkerTicker{ + C: make(chan load.TickValue), + Start: config.cMin, + Slope: config.cStep, + Stop: config.cMax, + LoadDuration: config.cMaxDuration, + } + case ScheduleStep: + wt = &load.StepWorkerTicker{ + C: make(chan load.TickValue), + Start: config.cMin, + Step: config.cStep, + Stop: config.cMax, + StepDuration: config.cStepDuration, + LoadDuration: config.cMaxDuration, + } + default: + wt = &load.ConstWorkerTicker{N: uint(config.c), C: make(chan load.TickValue)} + } + + return wt +} + +func createPacer(config *RunConfig) load.Pacer { + var p load.Pacer + switch config.loadSchedule { + case ScheduleLine: + p = &load.LinearPacer{ + Start: load.ConstantPacer{Freq: uint64(config.loadStart), Max: uint64(config.n)}, + Slope: int64(config.loadStep), + Stop: load.ConstantPacer{Freq: uint64(config.loadEnd), Max: uint64(config.n)}, + LoadDuration: config.loadDuration, + Max: uint64(config.n), + } + case ScheduleStep: + p = &load.StepPacer{ + Start: load.ConstantPacer{Freq: uint64(config.loadStart), Max: uint64(config.n)}, + Step: int64(config.loadStep), + Stop: load.ConstantPacer{Freq: uint64(config.loadEnd), Max: uint64(config.n)}, + LoadDuration: config.loadDuration, + StepDuration: config.loadStepDuration, + Max: uint64(config.n), + } + default: + p = &load.ConstantPacer{Freq: uint64(config.qps), Max: uint64(config.n)} + } + + return p +} diff --git a/runner/run_test.go b/runner/run_test.go index dfca98c9..f0d587ea 100644 --- a/runner/run_test.go +++ b/runner/run_test.go @@ -3,7 +3,6 @@ package runner import ( "fmt" "strconv" - "sync" "testing" "time" @@ -247,60 +246,46 @@ func TestRunUnary(t *testing.T) { }) t.Run("test QPS", func(t *testing.T) { - gs.ResetCounters() - var wg sync.WaitGroup + gs.ResetCounters() - wg.Add(1) + data := make(map[string]interface{}) + data["name"] = "bob" - time.AfterFunc(time.Duration(1500*time.Millisecond), func() { - count := gs.GetCount(callType) - assert.Equal(t, 2, count) - }) + report, err := Run( + "helloworld.Greeter.SayHello", + internal.TestLocalhost, + WithProtoFile("../testdata/greeter.proto", []string{}), + WithTotalRequests(10), + WithConcurrency(2), + WithQPS(1), + WithTimeout(time.Duration(20*time.Second)), + WithDialTimeout(time.Duration(20*time.Second)), + WithData(data), + WithInsecure(true), + ) - go func() { - data := make(map[string]interface{}) - data["name"] = "bob" + assert.NoError(t, err) - report, err := Run( - "helloworld.Greeter.SayHello", - internal.TestLocalhost, - WithProtoFile("../testdata/greeter.proto", []string{}), - WithTotalRequests(10), - WithConcurrency(2), - WithQPS(1), - WithTimeout(time.Duration(20*time.Second)), - WithDialTimeout(time.Duration(20*time.Second)), - WithData(data), - WithInsecure(true), - ) + assert.NotNil(t, report) - assert.NoError(t, err) + assert.Equal(t, 10, int(report.Count)) + assert.NotZero(t, report.Average) + assert.NotZero(t, report.Fastest) + assert.NotZero(t, report.Slowest) + assert.NotZero(t, report.Rps) + assert.Empty(t, report.Name) + assert.NotEmpty(t, report.Date) + assert.NotEmpty(t, report.Options) + assert.NotEmpty(t, report.Details) + assert.Equal(t, true, report.Options.Insecure) + assert.NotEmpty(t, report.LatencyDistribution) + assert.Equal(t, ReasonNormalEnd, report.EndReason) + assert.Empty(t, report.ErrorDist) - assert.NotNil(t, report) - - assert.Equal(t, 10, int(report.Count)) - assert.NotZero(t, report.Average) - assert.NotZero(t, report.Fastest) - assert.NotZero(t, report.Slowest) - assert.NotZero(t, report.Rps) - assert.Empty(t, report.Name) - assert.NotEmpty(t, report.Date) - assert.NotEmpty(t, report.Options) - assert.NotEmpty(t, report.Details) - assert.Equal(t, true, report.Options.Insecure) - assert.NotEmpty(t, report.LatencyDistribution) - assert.Equal(t, ReasonNormalEnd, report.EndReason) - assert.Empty(t, report.ErrorDist) - - assert.NotEqual(t, report.Average, report.Slowest) - assert.NotEqual(t, report.Average, report.Fastest) - assert.NotEqual(t, report.Slowest, report.Fastest) - - wg.Done() - - }() - wg.Wait() + assert.NotEqual(t, report.Average, report.Slowest) + assert.NotEqual(t, report.Average, report.Fastest) + assert.NotEqual(t, report.Slowest, report.Fastest) }) t.Run("test binary", func(t *testing.T) { diff --git a/runner/worker.go b/runner/worker.go index 90fffa65..0a011fe2 100644 --- a/runner/worker.go +++ b/runner/worker.go @@ -17,6 +17,12 @@ import ( "google.golang.org/grpc/metadata" ) +// TickValue is the tick value +type TickValue struct { + instant time.Time + reqNumber uint64 +} + // Worker is used for doing a single stream of requests in parallel type Worker struct { stub grpcdynamic.Stub @@ -25,9 +31,8 @@ type Worker struct { config *RunConfig workerID string active bool - counter RequestCounter stopCh chan bool - ticks <-chan struct{} + ticks <-chan TickValue // cached messages only for binary cachedMessages []*dynamic.Message @@ -45,14 +50,17 @@ func (w *Worker) runWorker() error { case <-w.stopCh: if w.config.async { return g.Wait() - } else { - return err } - case <-w.ticks: + + return err + + case tv := <-w.ticks: if w.config.async { - g.Go(w.makeRequest) + g.Go(func() error { + return w.makeRequest(tv) + }) } else { - rErr := w.makeRequest() + rErr := w.makeRequest(tv) err = multierr.Append(err, rErr) } } @@ -69,8 +77,9 @@ func (w *Worker) Stop() { w.stopCh <- true } -func (w *Worker) makeRequest() error { - reqNum := int64(w.counter.Get()) +func (w *Worker) makeRequest(tv TickValue) error { + // fmt.Println("wid:", w.workerID, tv.instant.String(), tv.reqNumber) + reqNum := int64(tv.reqNumber) ctd := newCallTemplateData(w.mtd, w.config.funcs, w.workerID, reqNum)