Skip to content

Commit

Permalink
step and line
Browse files Browse the repository at this point in the history
  • Loading branch information
bojand committed Oct 21, 2020
1 parent 7fb2f0d commit a64ff50
Show file tree
Hide file tree
Showing 6 changed files with 128 additions and 78 deletions.
12 changes: 6 additions & 6 deletions cmd/ghz/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,21 +81,21 @@ var (
async = kingpin.Flag("async", "Make async requests.").
Default("false").IsSetByUser(&isAsyncSet).Bool()

isScheduleSet = false
schedule = kingpin.Flag("load-schedule", "Specifies the load schedule. Options are const, step, or line. Default is const.").
Default("const").IsSetByUser(&isScheduleSet).String()

isQSet = false
q = kingpin.Flag("qps", "Queries per second (QPS) rate limit for constant load. Default is no rate limit.").
Default("0").Short('q').IsSetByUser(&isQSet).Uint()

isScheduleSet = false
schedule = kingpin.Flag("load-schedule", "Specifies the load schedule. Options are const, step, or line. Default is const.").
Default("const").IsSetByUser(&isScheduleSet).String()

isLoadStartSet = false
loadStart = kingpin.Flag("load-start", "Specifies the load start value.").
Default("0").IsSetByUser(&isLoadStartSet).Uint()

isLoadStepSet = false
loadStep = kingpin.Flag("load-step", "Specifies the load step value or slope value.").
Default("0").IsSetByUser(&isLoadStepSet).Uint()
Default("0").IsSetByUser(&isLoadStepSet).Int()

isLoadEndSet = false
loadEnd = kingpin.Flag("load-end", "Specifies the load end value.").
Expand Down Expand Up @@ -128,7 +128,7 @@ var (

isCStepSet = false
cstep = kingpin.Flag("concurrency-step", "Concurrency step / slope value for step and line schedules.").
Default("1").IsSetByUser(&isCStepSet).Uint()
Default("1").IsSetByUser(&isCStepSet).Int()

isCStepDurSet = false
cStepDuration = kingpin.Flag("concurrency-step-duration", "Specifies the concurrency step duration value for step concurrency schedule.").
Expand Down
4 changes: 2 additions & 2 deletions runner/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ type Config struct {
CSchedule string `json:"concurrency-schedule" toml:"concurrency-schedule" yaml:"concurrency-schedule" default:"const"`
CMin uint `json:"concurrency-min" toml:"concurrency-min" yaml:"concurrency-min" default:"1"`
CMax uint `json:"concurrency-max" toml:"concurrency-max" yaml:"concurrency-max" default:"0"`
CStep uint `json:"concurrency-step" toml:"concurrency-step" yaml:"concurrency-step" default:"0"`
CStep int `json:"concurrency-step" toml:"concurrency-step" yaml:"concurrency-step" default:"0"`
CStepDuration Duration `json:"concurrency-step-duration" toml:"concurrency-step-duration" yaml:"concurrency-step-duration" default:"0"`
CMaxDuration Duration `json:"concurrency-max-duration" toml:"concurrency-max-duration" yaml:"concurrency-max-duration" default:"0"`
Connections uint `json:"connections" toml:"connections" yaml:"connections" default:"1"`
Expand Down Expand Up @@ -103,7 +103,7 @@ type Config struct {
LoadSchedule string `json:"load-schedule" toml:"load-schedule" yaml:"load-schedule" default:"const"`
LoadStart uint `json:"load-start" toml:"load-start" yaml:"load-start"`
LoadEnd uint `json:"load-end" toml:"load-end" yaml:"load-end"`
LoadStep uint `json:"load-step" toml:"load-step" yaml:"load-step"`
LoadStep int `json:"load-step" toml:"load-step" yaml:"load-step"`
LoadStepDuration Duration `json:"load-step-duration" toml:"load-step-duration" yaml:"load-step-duration"`
LoadMaxDuration Duration `json:"load-max-duration" toml:"load-max-duration" yaml:"load-max-duration"`
}
Expand Down
10 changes: 5 additions & 5 deletions runner/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ type RunConfig struct {
qps int
loadStart uint
loadEnd uint
loadStep uint
loadStep int
loadSchedule string
loadDuration time.Duration
loadStepDuration time.Duration
Expand All @@ -61,7 +61,7 @@ type RunConfig struct {
c int
cMin uint
cMax uint
cStep uint
cStep int
cSchedule string
cMaxDuration time.Duration
cStepDuration time.Duration
Expand Down Expand Up @@ -743,7 +743,7 @@ func WithLoadEnd(end uint) Option {

// WithLoadStep specifies the load step
// WithLoadStep(5)
func WithLoadStep(step uint) Option {
func WithLoadStep(step int) Option {
return func(o *RunConfig) error {
o.loadStep = step

Expand Down Expand Up @@ -813,7 +813,7 @@ func WithConcurrencyMax(max uint) Option {

// WithConcurrencyStep specifies the concurrency step value or slope
// WithConcurrencyStep(5)
func WithConcurrencyStep(step uint) Option {
func WithConcurrencyStep(step int) Option {
return func(o *RunConfig) error {
o.cStep = step

Expand Down Expand Up @@ -920,7 +920,7 @@ func fromConfig(cfg *Config) []Option {
WithLoadEnd(cfg.LoadEnd),
WithLoadDuration(time.Duration(cfg.LoadMaxDuration)),
WithAsync(cfg.Async),
WithConcurrencySchedule(cfg.LoadSchedule),
WithConcurrencySchedule(cfg.CSchedule),
WithConcurrencyMin(cfg.CMin),
WithConcurrencyMax(cfg.CMax),
WithConcurrencyStep(cfg.CStep),
Expand Down
72 changes: 64 additions & 8 deletions runner/requester.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,10 +175,11 @@ func (b *Requester) Run() (*Report, error) {
b.reporter.Run()
}()

wt := load.ConstWorkerTicker{N: uint(b.config.c), C: make(chan load.TickValue)}
pacer := load.ConstantPacer{Freq: uint64(b.config.qps), Max: uint64(b.config.n)}
wt := createWorkerTicker(b.config)

err = b.runWorkers(&wt, &pacer)
p := createPacer(b.config)

err = b.runWorkers(wt, p)

report := b.Finish()
b.closeClientConns()
Expand Down Expand Up @@ -342,7 +343,7 @@ func (b *Requester) runWorkers(wt load.WorkerTicker, p load.Pacer) error {

errC := make(chan error, b.config.c)
done := make(chan struct{})
ticks := make(chan struct{})
ticks := make(chan TickValue)
counter := Counter{}

go func() {
Expand All @@ -366,7 +367,6 @@ func (b *Requester) runWorkers(wt load.WorkerTicker, p load.Pacer) error {
w := Worker{
ticks: ticks,
active: true,
counter: &counter,
stub: b.stubs[n],
mtd: b.mtd,
config: b.config,
Expand Down Expand Up @@ -436,11 +436,13 @@ func (b *Requester) runWorkers(wt load.WorkerTicker, p load.Pacer) error {
return
}

time.Sleep(wait)
if wait > 0 {
time.Sleep(wait)
}

select {
case ticks <- struct{}{}:
counter.Inc()
case ticks <- TickValue{instant: time.Now(), reqNumber: counter.Inc() - 1}:
continue
case <-b.stopCh:
fmt.Println("stop. count:", counter.Get())
done <- struct{}{}
Expand All @@ -465,3 +467,57 @@ func min(a, b int) int {
}
return b
}

func createWorkerTicker(config *RunConfig) load.WorkerTicker {
var wt load.WorkerTicker
switch config.cSchedule {
case ScheduleLine:
wt = &load.LineWorkerTicker{
C: make(chan load.TickValue),
Start: config.cMin,
Slope: config.cStep,
Stop: config.cMax,
LoadDuration: config.cMaxDuration,
}
case ScheduleStep:
wt = &load.StepWorkerTicker{
C: make(chan load.TickValue),
Start: config.cMin,
Step: config.cStep,
Stop: config.cMax,
StepDuration: config.cStepDuration,
LoadDuration: config.cMaxDuration,
}
default:
wt = &load.ConstWorkerTicker{N: uint(config.c), C: make(chan load.TickValue)}
}

return wt
}

func createPacer(config *RunConfig) load.Pacer {
var p load.Pacer
switch config.loadSchedule {
case ScheduleLine:
p = &load.LinearPacer{
Start: load.ConstantPacer{Freq: uint64(config.loadStart), Max: uint64(config.n)},
Slope: int64(config.loadStep),
Stop: load.ConstantPacer{Freq: uint64(config.loadEnd), Max: uint64(config.n)},
LoadDuration: config.loadDuration,
Max: uint64(config.n),
}
case ScheduleStep:
p = &load.StepPacer{
Start: load.ConstantPacer{Freq: uint64(config.loadStart), Max: uint64(config.n)},
Step: int64(config.loadStep),
Stop: load.ConstantPacer{Freq: uint64(config.loadEnd), Max: uint64(config.n)},
LoadDuration: config.loadDuration,
StepDuration: config.loadStepDuration,
Max: uint64(config.n),
}
default:
p = &load.ConstantPacer{Freq: uint64(config.qps), Max: uint64(config.n)}
}

return p
}
81 changes: 33 additions & 48 deletions runner/run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package runner
import (
"fmt"
"strconv"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -247,60 +246,46 @@ func TestRunUnary(t *testing.T) {
})

t.Run("test QPS", func(t *testing.T) {
gs.ResetCounters()

var wg sync.WaitGroup
gs.ResetCounters()

wg.Add(1)
data := make(map[string]interface{})
data["name"] = "bob"

time.AfterFunc(time.Duration(1500*time.Millisecond), func() {
count := gs.GetCount(callType)
assert.Equal(t, 2, count)
})
report, err := Run(
"helloworld.Greeter.SayHello",
internal.TestLocalhost,
WithProtoFile("../testdata/greeter.proto", []string{}),
WithTotalRequests(10),
WithConcurrency(2),
WithQPS(1),
WithTimeout(time.Duration(20*time.Second)),
WithDialTimeout(time.Duration(20*time.Second)),
WithData(data),
WithInsecure(true),
)

go func() {
data := make(map[string]interface{})
data["name"] = "bob"
assert.NoError(t, err)

report, err := Run(
"helloworld.Greeter.SayHello",
internal.TestLocalhost,
WithProtoFile("../testdata/greeter.proto", []string{}),
WithTotalRequests(10),
WithConcurrency(2),
WithQPS(1),
WithTimeout(time.Duration(20*time.Second)),
WithDialTimeout(time.Duration(20*time.Second)),
WithData(data),
WithInsecure(true),
)
assert.NotNil(t, report)

assert.NoError(t, err)
assert.Equal(t, 10, int(report.Count))
assert.NotZero(t, report.Average)
assert.NotZero(t, report.Fastest)
assert.NotZero(t, report.Slowest)
assert.NotZero(t, report.Rps)
assert.Empty(t, report.Name)
assert.NotEmpty(t, report.Date)
assert.NotEmpty(t, report.Options)
assert.NotEmpty(t, report.Details)
assert.Equal(t, true, report.Options.Insecure)
assert.NotEmpty(t, report.LatencyDistribution)
assert.Equal(t, ReasonNormalEnd, report.EndReason)
assert.Empty(t, report.ErrorDist)

assert.NotNil(t, report)

assert.Equal(t, 10, int(report.Count))
assert.NotZero(t, report.Average)
assert.NotZero(t, report.Fastest)
assert.NotZero(t, report.Slowest)
assert.NotZero(t, report.Rps)
assert.Empty(t, report.Name)
assert.NotEmpty(t, report.Date)
assert.NotEmpty(t, report.Options)
assert.NotEmpty(t, report.Details)
assert.Equal(t, true, report.Options.Insecure)
assert.NotEmpty(t, report.LatencyDistribution)
assert.Equal(t, ReasonNormalEnd, report.EndReason)
assert.Empty(t, report.ErrorDist)

assert.NotEqual(t, report.Average, report.Slowest)
assert.NotEqual(t, report.Average, report.Fastest)
assert.NotEqual(t, report.Slowest, report.Fastest)

wg.Done()

}()
wg.Wait()
assert.NotEqual(t, report.Average, report.Slowest)
assert.NotEqual(t, report.Average, report.Fastest)
assert.NotEqual(t, report.Slowest, report.Fastest)
})

t.Run("test binary", func(t *testing.T) {
Expand Down
27 changes: 18 additions & 9 deletions runner/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@ import (
"google.golang.org/grpc/metadata"
)

// TickValue is the tick value
type TickValue struct {
instant time.Time
reqNumber uint64
}

// Worker is used for doing a single stream of requests in parallel
type Worker struct {
stub grpcdynamic.Stub
Expand All @@ -25,9 +31,8 @@ type Worker struct {
config *RunConfig
workerID string
active bool
counter RequestCounter
stopCh chan bool
ticks <-chan struct{}
ticks <-chan TickValue

// cached messages only for binary
cachedMessages []*dynamic.Message
Expand All @@ -45,14 +50,17 @@ func (w *Worker) runWorker() error {
case <-w.stopCh:
if w.config.async {
return g.Wait()
} else {
return err
}
case <-w.ticks:

return err

case tv := <-w.ticks:
if w.config.async {
g.Go(w.makeRequest)
g.Go(func() error {
return w.makeRequest(tv)
})
} else {
rErr := w.makeRequest()
rErr := w.makeRequest(tv)
err = multierr.Append(err, rErr)
}
}
Expand All @@ -69,8 +77,9 @@ func (w *Worker) Stop() {
w.stopCh <- true
}

func (w *Worker) makeRequest() error {
reqNum := int64(w.counter.Get())
func (w *Worker) makeRequest(tv TickValue) error {
// fmt.Println("wid:", w.workerID, tv.instant.String(), tv.reqNumber)
reqNum := int64(tv.reqNumber)

ctd := newCallTemplateData(w.mtd, w.config.funcs, w.workerID, reqNum)

Expand Down

0 comments on commit a64ff50

Please sign in to comment.