Skip to content

Commit

Permalink
Consistently use parentCtx in the executors' Run() methods
Browse files Browse the repository at this point in the history
  • Loading branch information
na-- committed Jul 3, 2020
1 parent 2153af8 commit e2bbb8d
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 15 deletions.
8 changes: 4 additions & 4 deletions lib/executor/constant_arrival_rate.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ func (car *ConstantArrivalRate) Init(ctx context.Context) error {
// time should iteration X begin) different, but keep everything else the same.
// This will allow us to implement https://github.com/loadimpact/k6/issues/1386
// and things like all of the TODOs below in one place only.
func (car ConstantArrivalRate) Run(ctx context.Context, out chan<- stats.SampleContainer) (err error) { //nolint:funlen
func (car ConstantArrivalRate) Run(parentCtx context.Context, out chan<- stats.SampleContainer) (err error) { //nolint:funlen
gracefulStop := car.config.GetGracefulStop()
duration := time.Duration(car.config.Duration.Duration)
preAllocatedVUs := car.config.GetPreAllocatedVUs(car.executionState.ExecutionTuple)
Expand All @@ -228,7 +228,7 @@ func (car ConstantArrivalRate) Run(ctx context.Context, out chan<- stats.SampleC
activeVUsWg := &sync.WaitGroup{}

returnedVUs := make(chan struct{})
startTime, maxDurationCtx, regDurationCtx, cancel := getDurationContexts(ctx, duration, gracefulStop)
startTime, maxDurationCtx, regDurationCtx, cancel := getDurationContexts(parentCtx, duration, gracefulStop)

defer func() {
// Make sure all VUs aren't executing iterations anymore, for the cancel()
Expand Down Expand Up @@ -310,7 +310,7 @@ func (car ConstantArrivalRate) Run(ctx context.Context, out chan<- stats.SampleC
return math.Min(1, float64(spent)/float64(duration)), right
}
car.progress.Modify(pb.WithProgress(progresFn))
go trackProgress(ctx, maxDurationCtx, regDurationCtx, &car, progresFn)
go trackProgress(parentCtx, maxDurationCtx, regDurationCtx, &car, progresFn)

runIterationBasic := getIterationRunner(car.executionState, car.logger)
runIteration := func(vu lib.ActiveVU) {
Expand Down Expand Up @@ -345,7 +345,7 @@ func (car ConstantArrivalRate) Run(ctx context.Context, out chan<- stats.SampleC
// Since there aren't any free VUs available, consider this iteration
// dropped - we aren't going to try to recover it, but

stats.PushIfNotDone(ctx, out, stats.Sample{
stats.PushIfNotDone(parentCtx, out, stats.Sample{
Value: 1, Metric: metrics.DroppedIterations,
Tags: metricTags, Time: time.Now(),
})
Expand Down
6 changes: 3 additions & 3 deletions lib/executor/constant_vus.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,12 +142,12 @@ var _ lib.Executor = &ConstantVUs{}

// Run constantly loops through as many iterations as possible on a fixed number
// of VUs for the specified duration.
func (clv ConstantVUs) Run(ctx context.Context, out chan<- stats.SampleContainer) (err error) {
func (clv ConstantVUs) Run(parentCtx context.Context, out chan<- stats.SampleContainer) (err error) {
numVUs := clv.config.GetVUs(clv.executionState.ExecutionTuple)
duration := time.Duration(clv.config.Duration.Duration)
gracefulStop := clv.config.GetGracefulStop()

startTime, maxDurationCtx, regDurationCtx, cancel := getDurationContexts(ctx, duration, gracefulStop)
startTime, maxDurationCtx, regDurationCtx, cancel := getDurationContexts(parentCtx, duration, gracefulStop)
defer cancel()

// Make sure the log and the progress bar have accurate information
Expand All @@ -167,7 +167,7 @@ func (clv ConstantVUs) Run(ctx context.Context, out chan<- stats.SampleContainer
return float64(spent) / float64(duration), right
}
clv.progress.Modify(pb.WithProgress(progresFn))
go trackProgress(ctx, maxDurationCtx, regDurationCtx, clv, progresFn)
go trackProgress(parentCtx, maxDurationCtx, regDurationCtx, clv, progresFn)

// Actually schedule the VUs and iterations...
activeVUs := &sync.WaitGroup{}
Expand Down
8 changes: 4 additions & 4 deletions lib/executor/ramping_arrival_rate.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ func (varc RampingArrivalRateConfig) cal(et *lib.ExecutionTuple, ch chan<- time.
// This will allow us to implement https://github.com/loadimpact/k6/issues/1386
// and things like all of the TODOs below in one place only.
//nolint:funlen,gocognit
func (varr RampingArrivalRate) Run(ctx context.Context, out chan<- stats.SampleContainer) (err error) {
func (varr RampingArrivalRate) Run(parentCtx context.Context, out chan<- stats.SampleContainer) (err error) {
segment := varr.executionState.ExecutionTuple.Segment
gracefulStop := varr.config.GetGracefulStop()
duration := sumStagesDuration(varr.config.Stages)
Expand All @@ -314,7 +314,7 @@ func (varr RampingArrivalRate) Run(ctx context.Context, out chan<- stats.SampleC
activeVUsWg := &sync.WaitGroup{}

returnedVUs := make(chan struct{})
startTime, maxDurationCtx, regDurationCtx, cancel := getDurationContexts(ctx, duration, gracefulStop)
startTime, maxDurationCtx, regDurationCtx, cancel := getDurationContexts(parentCtx, duration, gracefulStop)

defer func() {
// Make sure all VUs aren't executing iterations anymore, for the cancel()
Expand Down Expand Up @@ -406,7 +406,7 @@ func (varr RampingArrivalRate) Run(ctx context.Context, out chan<- stats.SampleC
}

varr.progress.Modify(pb.WithProgress(progresFn))
go trackProgress(ctx, maxDurationCtx, regDurationCtx, varr, progresFn)
go trackProgress(parentCtx, maxDurationCtx, regDurationCtx, varr, progresFn)

regDurationDone := regDurationCtx.Done()
runIterationBasic := getIterationRunner(varr.executionState, varr.logger)
Expand Down Expand Up @@ -449,7 +449,7 @@ func (varr RampingArrivalRate) Run(ctx context.Context, out chan<- stats.SampleC
// Since there aren't any free VUs available, consider this iteration
// dropped - we aren't going to try to recover it, but

stats.PushIfNotDone(ctx, out, stats.Sample{
stats.PushIfNotDone(parentCtx, out, stats.Sample{
Value: 1, Metric: metrics.DroppedIterations,
Tags: metricTags, Time: time.Now(),
})
Expand Down
8 changes: 4 additions & 4 deletions lib/executor/ramping_vus.go
Original file line number Diff line number Diff line change
Expand Up @@ -533,7 +533,7 @@ var _ lib.Executor = &RampingVUs{}
// of a less complex way to implement it (besides the old "increment by 100ms
// and see what happens)... :/ so maybe see how it can be split?
// nolint:funlen,gocognit
func (vlv RampingVUs) Run(ctx context.Context, out chan<- stats.SampleContainer) (err error) {
func (vlv RampingVUs) Run(parentCtx context.Context, out chan<- stats.SampleContainer) (err error) {
rawExecutionSteps := vlv.config.getRawExecutionSteps(vlv.executionState.ExecutionTuple, true)
regularDuration, isFinal := lib.GetEndOffset(rawExecutionSteps)
if !isFinal {
Expand All @@ -548,7 +548,7 @@ func (vlv RampingVUs) Run(ctx context.Context, out chan<- stats.SampleContainer)
maxVUs := lib.GetMaxPlannedVUs(gracefulExecutionSteps)
gracefulStop := maxDuration - regularDuration

startTime, maxDurationCtx, regDurationCtx, cancel := getDurationContexts(ctx, regularDuration, gracefulStop)
startTime, maxDurationCtx, regDurationCtx, cancel := getDurationContexts(parentCtx, regularDuration, gracefulStop)
defer cancel()

activeVUs := &sync.WaitGroup{}
Expand All @@ -575,7 +575,7 @@ func (vlv RampingVUs) Run(ctx context.Context, out chan<- stats.SampleContainer)
return float64(spent) / float64(regularDuration), []string{progVUs, progDur}
}
vlv.progress.Modify(pb.WithProgress(progresFn))
go trackProgress(ctx, maxDurationCtx, regDurationCtx, vlv, progresFn)
go trackProgress(parentCtx, maxDurationCtx, regDurationCtx, vlv, progresFn)

// Actually schedule the VUs and iterations, likely the most complicated
// executor among all of them...
Expand Down Expand Up @@ -633,7 +633,7 @@ func (vlv RampingVUs) Run(ctx context.Context, out chan<- stats.SampleContainer)
currentMaxAllowedVUs = newMaxAllowedVUs
}

wait := waiter(ctx, startTime)
wait := waiter(parentCtx, startTime)
// iterate over rawExecutionSteps and gracefulExecutionSteps in order by TimeOffset
// giving rawExecutionSteps precedence.
// we stop iterating once rawExecutionSteps are over as we need to run the remaining
Expand Down

0 comments on commit e2bbb8d

Please sign in to comment.