Skip to content

Commit

Permalink
docs and code cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
bojand committed Oct 25, 2020
1 parent a61944b commit 54efa0a
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 74 deletions.
54 changes: 24 additions & 30 deletions load/pacer.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
// nano is the const for number of nanoseconds in a second
const nano = 1e9

// A Pacer defines the control interface to control the rate of hit.
// Pacer defines the interface to control the rate of hit.
type Pacer interface {
// Pace returns the duration the attacker should wait until
// making next hit, given an already elapsed duration and
Expand All @@ -23,11 +23,7 @@ type Pacer interface {
Rate(elapsed time.Duration) float64
}

// A PacerFunc is a function adapter type that implements
// the Pacer interface.
// type PacerFunc func(time.Duration, uint64) (time.Duration, bool)

// A ConstantPacer defines a constant rate of hits for the target.
// A ConstantPacer defines a constant rate of hits.
type ConstantPacer struct {
Freq uint64 // Frequency of hits per second
Max uint64 // Optional maximum allowed hits
Expand Down Expand Up @@ -74,21 +70,20 @@ func (cp *ConstantPacer) Rate(elapsed time.Duration) float64 {
return cp.hitsPerNs() * 1e9
}

// hitsPerNs returns the attack rate this ConstantPacer represents, in
// fractional hits per nanosecond.
// hitsPerNs returns the rate in fractional hits per nanosecond.
func (cp *ConstantPacer) hitsPerNs() float64 {
return float64(cp.Freq) / nano
}

// StepPacer paces an attack by starting at a given request rate
// and increasing with steps at a given time interval and duration.
// and increasing or decreasing with steps at a given step interval and duration.
type StepPacer struct {
Start ConstantPacer
Step int64
StepDuration time.Duration
Stop ConstantPacer
LoadDuration time.Duration
Max uint64
Start ConstantPacer // Constant start rate
Step int64 // Step value
StepDuration time.Duration // Step duration
Stop ConstantPacer // Optional constant stop value
LoadDuration time.Duration // Optional maximum load duration
Max uint64 // Optional maximum allowed hits

once sync.Once
init bool // TOOO improve this
Expand Down Expand Up @@ -198,7 +193,7 @@ func (p *StepPacer) Pace(elapsed time.Duration, hits uint64) (time.Duration, boo
}

// Rate returns a StepPacer's instantaneous hit rate (i.e. requests per second)
// at the given elapsed duration of an attack.
// at the given elapsed duration.
func (p *StepPacer) Rate(elapsed time.Duration) float64 {
p.initialize()

Expand All @@ -219,9 +214,7 @@ func (p *StepPacer) Rate(elapsed time.Duration) float64 {
return rate
}

// hits returns the number of hits that have been sent during an attack
// lasting t nanoseconds. It returns a float so we can tell exactly how
// much we've missed our target by when solving numerically in Pace.
// hits returns the number of hits that have been sent at elapsed duration t.
func (p *StepPacer) hits(t time.Duration) float64 {
if t < 0 {
return 0
Expand Down Expand Up @@ -258,24 +251,25 @@ func (p *StepPacer) hits(t time.Duration) float64 {
}

// String returns a pretty-printed description of the StepPacer's behaviour:
// StepPacer{Step: 1, StepDuration: 5s} => Step{Step:1 hits/5s}
// StepPacer{Step: 1, StepDuration: 5s} => Step{Step:1 hits / 5s}
func (p *StepPacer) String() string {
return fmt.Sprintf("Step{Step: %d hits / %s}", p.Step, p.StepDuration.String())
}

// LinearPacer paces an attack by starting at a given request rate
// and increasing linearly with the given slope.
// LinearPacer paces the hit rate by starting at a given request rate
// and increasing linearly with the given slope at 1s interval.
type LinearPacer struct {
Start ConstantPacer
Slope int64
Stop ConstantPacer
LoadDuration time.Duration
Max uint64
Start ConstantPacer // Constant start rate
Slope int64 // Slope value to change the rate
Stop ConstantPacer // Constant stop rate
LoadDuration time.Duration // Total maximum load duration
Max uint64 // Maximum number of hits

once sync.Once
sp StepPacer
}

// initializes the wrapped step pacer
func (p *LinearPacer) initialize() {
if p.Start.Freq == 0 {
panic("LinearPacer.Start cannot be 0")
Expand Down Expand Up @@ -310,7 +304,7 @@ func (p *LinearPacer) Pace(elapsed time.Duration, hits uint64) (time.Duration, b
}

// Rate returns a LinearPacer's instantaneous hit rate (i.e. requests per second)
// at the given elapsed duration of an attack.
// at the given elapsed duration.
func (p *LinearPacer) Rate(elapsed time.Duration) float64 {

p.initialize()
Expand All @@ -319,7 +313,7 @@ func (p *LinearPacer) Rate(elapsed time.Duration) float64 {
}

// String returns a pretty-printed description of the LinearPacer's behaviour:
// LinearPacer{Slope: 1} => Linear{1 hits/1s}
// LinearPacer{Slope: 1} => Linear{1 hits / 1s}
func (p *LinearPacer) String() string {
return fmt.Sprintf("Linear{%d hits/1s}", p.Slope)
return fmt.Sprintf("Linear{%d hits / 1s}", p.Slope)
}
75 changes: 42 additions & 33 deletions load/worker_ticker.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,57 +4,65 @@ import (
"time"
)

// WorkerTicker is the interface for worker ticker which controls worker parallelism
// WorkerTicker is the interface controlling worker parallelism.
type WorkerTicker interface {
// Ticker returns a channel which sends TickValues
// When a value is received the number of workers should be appropriately
// increased or decreased given by the delta property.
Ticker() <-chan TickValue

// Run starts the worker ticker
Run()

// Finish closes the channel
Finish()
}

// TickValue is the delta value
// TickValue is the tick value sent over the ticker channel.
type TickValue struct {
Delta int
Done bool
Delta int // Delta value representing worker increase or decrease
Done bool // A flag representing whether the ticker is done running. Once true no more values should be received over the ticker channel.
}

// ConstWorkerTicker is the const worker
// ConstWorkerTicker represents a constant number of workers.
// It would send one value for initial number of workers to start.
type ConstWorkerTicker struct {
C chan TickValue
N uint
C chan TickValue // The tick value channel
N uint // The number of workers
}

// Ticker returns the ticker channer
// Ticker returns the ticker channel.
func (c *ConstWorkerTicker) Ticker() <-chan TickValue {
return c.C
}

// Run runs the ticker
// Run runs the ticker.
func (c *ConstWorkerTicker) Run() {
c.C <- TickValue{Delta: int(c.N)}
c.C <- TickValue{Delta: int(c.N), Done: true}
}

// Finish stops
// Finish closes the channel.
func (c *ConstWorkerTicker) Finish() {
close(c.C)
}

// StepWorkerTicker is the worker ticker that implements step adjustments to concurrency
// StepWorkerTicker is the worker ticker that implements step adjustments to worker concurrency.
type StepWorkerTicker struct {
C chan TickValue
C chan TickValue // The tick value channel

Start uint
Step int
StepDuration time.Duration
Stop uint
LoadDuration time.Duration
Start uint // Starting number of workers
Step int // Step change
StepDuration time.Duration // Duration to apply the step change
Stop uint // Final number of workers
MaxDuration time.Duration // Maximum duration
}

// Ticker returns the ticker channer
// Ticker returns the ticker channel.
func (c *StepWorkerTicker) Ticker() <-chan TickValue {
return c.C
}

// Run runs the ticker
// Run runs the ticker.
func (c *StepWorkerTicker) Run() {

stepUp := c.Step > 0
Expand All @@ -71,7 +79,7 @@ func (c *StepWorkerTicker) Run() {
go func() {
for range ticker.C {
// we have load duration and we eclipsed it
if c.LoadDuration > 0 && time.Since(begin) >= c.LoadDuration {
if c.MaxDuration > 0 && time.Since(begin) >= c.MaxDuration {
if stepUp && c.Stop > 0 && c.Stop >= uint(wc) {
// if we have step up and stop value is > current count
// send the final diff
Expand All @@ -87,7 +95,7 @@ func (c *StepWorkerTicker) Run() {

done <- true
return
} else if (c.LoadDuration == 0) && ((c.Stop > 0 && stepUp && wc >= int(c.Stop)) ||
} else if (c.MaxDuration == 0) && ((c.Stop > 0 && stepUp && wc >= int(c.Stop)) ||
(!stepUp && wc <= int(c.Stop))) {
// we do not have load duration
// if we have stop and are step up and current count >= stop
Expand All @@ -107,29 +115,30 @@ func (c *StepWorkerTicker) Run() {
<-done
}

// Finish stops
// Finish closes the channel.
func (c *StepWorkerTicker) Finish() {
close(c.C)
}

// LineWorkerTicker is the worker ticker that implements line adjustments to concurrency
// LineWorkerTicker is the worker ticker that implements line adjustments to concurrency.
// Essentially this is same as step worker with 1s step duration.
type LineWorkerTicker struct {
C chan TickValue
C chan TickValue // The tick value channel

Start uint
Slope int
Stop uint
LoadDuration time.Duration
Start uint // Starting number of workers
Slope int // Slope value to adjust the number of workers
Stop uint // Final number of workers
MaxDuration time.Duration // Maximum adjustment duration

stepTicker StepWorkerTicker
}

// Ticker returns the ticker channer
// Ticker returns the ticker channel.
func (c *LineWorkerTicker) Ticker() <-chan TickValue {
return c.C
}

// Run runs the ticker
// Run runs the ticker.
func (c *LineWorkerTicker) Run() {

c.stepTicker = StepWorkerTicker{
Expand All @@ -138,13 +147,13 @@ func (c *LineWorkerTicker) Run() {
Step: c.Slope,
StepDuration: 1 * time.Second,
Stop: c.Stop,
LoadDuration: c.LoadDuration,
MaxDuration: c.MaxDuration,
}

c.stepTicker.Run()
}

// Finish stops
// Finish closes the internal tick value channel.
func (c *LineWorkerTicker) Finish() {
c.stepTicker.Finish()
}
10 changes: 5 additions & 5 deletions load/worker_ticker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func TestStepWorkerTicker(t *testing.T) {
Step: 2,
Stop: 0,
StepDuration: 2 * time.Second,
LoadDuration: 5 * time.Second,
MaxDuration: 5 * time.Second,
}

defer wt.Finish()
Expand Down Expand Up @@ -81,7 +81,7 @@ func TestStepWorkerTicker(t *testing.T) {
Step: 2,
Stop: 15,
StepDuration: 2 * time.Second,
LoadDuration: 5 * time.Second,
MaxDuration: 5 * time.Second,
}

defer wt.Finish()
Expand Down Expand Up @@ -129,7 +129,7 @@ func TestStepWorkerTicker(t *testing.T) {
Step: -2,
Stop: 0,
StepDuration: 2 * time.Second,
LoadDuration: 5 * time.Second,
MaxDuration: 5 * time.Second,
}

defer wt.Finish()
Expand Down Expand Up @@ -175,7 +175,7 @@ func TestStepWorkerTicker(t *testing.T) {
Step: -2,
Stop: 4,
StepDuration: 2 * time.Second,
LoadDuration: 0,
MaxDuration: 0,
}

defer wt.Finish()
Expand Down Expand Up @@ -230,7 +230,7 @@ func TestStepWorkerTicker(t *testing.T) {
Step: -2,
Stop: 3,
StepDuration: 2 * time.Second,
LoadDuration: 5 * time.Second,
MaxDuration: 5 * time.Second,
}

defer wt.Finish()
Expand Down
12 changes: 6 additions & 6 deletions runner/requester.go
Original file line number Diff line number Diff line change
Expand Up @@ -479,11 +479,11 @@ func createWorkerTicker(config *RunConfig) load.WorkerTicker {
switch config.cSchedule {
case ScheduleLine:
wt = &load.LineWorkerTicker{
C: make(chan load.TickValue),
Start: config.cStart,
Slope: config.cStep,
Stop: config.cEnd,
LoadDuration: config.cMaxDuration,
C: make(chan load.TickValue),
Start: config.cStart,
Slope: config.cStep,
Stop: config.cEnd,
MaxDuration: config.cMaxDuration,
}
case ScheduleStep:
wt = &load.StepWorkerTicker{
Expand All @@ -492,7 +492,7 @@ func createWorkerTicker(config *RunConfig) load.WorkerTicker {
Step: config.cStep,
Stop: config.cEnd,
StepDuration: config.cStepDuration,
LoadDuration: config.cMaxDuration,
MaxDuration: config.cMaxDuration,
}
default:
wt = &load.ConstWorkerTicker{N: uint(config.c), C: make(chan load.TickValue)}
Expand Down

0 comments on commit 54efa0a

Please sign in to comment.