diff --git a/lib/executor/constant_arrival_rate.go b/lib/executor/constant_arrival_rate.go index c2df4554df6..751b424c39d 100644 --- a/lib/executor/constant_arrival_rate.go +++ b/lib/executor/constant_arrival_rate.go @@ -209,7 +209,8 @@ func (car *ConstantArrivalRate) Init(ctx context.Context) error { // time should iteration X begin) different, but keep everything else the same. // This will allow us to implement https://github.com/loadimpact/k6/issues/1386 // and things like all of the TODOs below in one place only. -func (car ConstantArrivalRate) Run(ctx context.Context, out chan<- stats.SampleContainer) (err error) { //nolint:funlen +//nolint:funlen +func (car ConstantArrivalRate) Run(parentCtx context.Context, out chan<- stats.SampleContainer) (err error) { gracefulStop := car.config.GetGracefulStop() duration := time.Duration(car.config.Duration.Duration) preAllocatedVUs := car.config.GetPreAllocatedVUs(car.executionState.ExecutionTuple) @@ -228,7 +229,7 @@ func (car ConstantArrivalRate) Run(ctx context.Context, out chan<- stats.SampleC activeVUsWg := &sync.WaitGroup{} returnedVUs := make(chan struct{}) - startTime, maxDurationCtx, regDurationCtx, cancel := getDurationContexts(ctx, duration, gracefulStop) + startTime, maxDurationCtx, regDurationCtx, cancel := getDurationContexts(parentCtx, duration, gracefulStop) defer func() { // Make sure all VUs aren't executing iterations anymore, for the cancel() @@ -310,7 +311,7 @@ func (car ConstantArrivalRate) Run(ctx context.Context, out chan<- stats.SampleC return math.Min(1, float64(spent)/float64(duration)), right } car.progress.Modify(pb.WithProgress(progresFn)) - go trackProgress(ctx, maxDurationCtx, regDurationCtx, &car, progresFn) + go trackProgress(parentCtx, maxDurationCtx, regDurationCtx, &car, progresFn) runIterationBasic := getIterationRunner(car.executionState, car.logger) runIteration := func(vu lib.ActiveVU) { @@ -345,7 +346,7 @@ func (car ConstantArrivalRate) Run(ctx context.Context, out chan<- stats.SampleC // Since there aren't any free VUs available, consider this iteration // dropped - we aren't going to try to recover it, but - stats.PushIfNotDone(ctx, out, stats.Sample{ + stats.PushIfNotDone(parentCtx, out, stats.Sample{ Value: 1, Metric: metrics.DroppedIterations, Tags: metricTags, Time: time.Now(), }) diff --git a/lib/executor/constant_vus.go b/lib/executor/constant_vus.go index ba4249ea982..be21030b285 100644 --- a/lib/executor/constant_vus.go +++ b/lib/executor/constant_vus.go @@ -142,12 +142,12 @@ var _ lib.Executor = &ConstantVUs{} // Run constantly loops through as many iterations as possible on a fixed number // of VUs for the specified duration. -func (clv ConstantVUs) Run(ctx context.Context, out chan<- stats.SampleContainer) (err error) { +func (clv ConstantVUs) Run(parentCtx context.Context, out chan<- stats.SampleContainer) (err error) { numVUs := clv.config.GetVUs(clv.executionState.ExecutionTuple) duration := time.Duration(clv.config.Duration.Duration) gracefulStop := clv.config.GetGracefulStop() - startTime, maxDurationCtx, regDurationCtx, cancel := getDurationContexts(ctx, duration, gracefulStop) + startTime, maxDurationCtx, regDurationCtx, cancel := getDurationContexts(parentCtx, duration, gracefulStop) defer cancel() // Make sure the log and the progress bar have accurate information @@ -167,7 +167,7 @@ func (clv ConstantVUs) Run(ctx context.Context, out chan<- stats.SampleContainer return float64(spent) / float64(duration), right } clv.progress.Modify(pb.WithProgress(progresFn)) - go trackProgress(ctx, maxDurationCtx, regDurationCtx, clv, progresFn) + go trackProgress(parentCtx, maxDurationCtx, regDurationCtx, clv, progresFn) // Actually schedule the VUs and iterations... activeVUs := &sync.WaitGroup{} diff --git a/lib/executor/ramping_arrival_rate.go b/lib/executor/ramping_arrival_rate.go index e72c8861b1e..98503ce944e 100644 --- a/lib/executor/ramping_arrival_rate.go +++ b/lib/executor/ramping_arrival_rate.go @@ -291,7 +291,7 @@ func (varc RampingArrivalRateConfig) cal(et *lib.ExecutionTuple, ch chan<- time. // This will allow us to implement https://github.com/loadimpact/k6/issues/1386 // and things like all of the TODOs below in one place only. //nolint:funlen,gocognit -func (varr RampingArrivalRate) Run(ctx context.Context, out chan<- stats.SampleContainer) (err error) { +func (varr RampingArrivalRate) Run(parentCtx context.Context, out chan<- stats.SampleContainer) (err error) { segment := varr.executionState.ExecutionTuple.Segment gracefulStop := varr.config.GetGracefulStop() duration := sumStagesDuration(varr.config.Stages) @@ -314,7 +314,7 @@ func (varr RampingArrivalRate) Run(ctx context.Context, out chan<- stats.SampleC activeVUsWg := &sync.WaitGroup{} returnedVUs := make(chan struct{}) - startTime, maxDurationCtx, regDurationCtx, cancel := getDurationContexts(ctx, duration, gracefulStop) + startTime, maxDurationCtx, regDurationCtx, cancel := getDurationContexts(parentCtx, duration, gracefulStop) defer func() { // Make sure all VUs aren't executing iterations anymore, for the cancel() @@ -406,7 +406,7 @@ func (varr RampingArrivalRate) Run(ctx context.Context, out chan<- stats.SampleC } varr.progress.Modify(pb.WithProgress(progresFn)) - go trackProgress(ctx, maxDurationCtx, regDurationCtx, varr, progresFn) + go trackProgress(parentCtx, maxDurationCtx, regDurationCtx, varr, progresFn) regDurationDone := regDurationCtx.Done() runIterationBasic := getIterationRunner(varr.executionState, varr.logger) @@ -449,7 +449,7 @@ func (varr RampingArrivalRate) Run(ctx context.Context, out chan<- stats.SampleC // Since there aren't any free VUs available, consider this iteration // dropped - we aren't going to try to recover it, but - stats.PushIfNotDone(ctx, out, stats.Sample{ + stats.PushIfNotDone(parentCtx, out, stats.Sample{ Value: 1, Metric: metrics.DroppedIterations, Tags: metricTags, Time: time.Now(), }) diff --git a/lib/executor/ramping_vus.go b/lib/executor/ramping_vus.go index fe4e0be4390..3320b2095c6 100644 --- a/lib/executor/ramping_vus.go +++ b/lib/executor/ramping_vus.go @@ -533,7 +533,7 @@ var _ lib.Executor = &RampingVUs{} // of a less complex way to implement it (besides the old "increment by 100ms // and see what happens)... :/ so maybe see how it can be split? // nolint:funlen,gocognit -func (vlv RampingVUs) Run(ctx context.Context, out chan<- stats.SampleContainer) (err error) { +func (vlv RampingVUs) Run(parentCtx context.Context, out chan<- stats.SampleContainer) (err error) { rawExecutionSteps := vlv.config.getRawExecutionSteps(vlv.executionState.ExecutionTuple, true) regularDuration, isFinal := lib.GetEndOffset(rawExecutionSteps) if !isFinal { @@ -548,7 +548,7 @@ func (vlv RampingVUs) Run(ctx context.Context, out chan<- stats.SampleContainer) maxVUs := lib.GetMaxPlannedVUs(gracefulExecutionSteps) gracefulStop := maxDuration - regularDuration - startTime, maxDurationCtx, regDurationCtx, cancel := getDurationContexts(ctx, regularDuration, gracefulStop) + startTime, maxDurationCtx, regDurationCtx, cancel := getDurationContexts(parentCtx, regularDuration, gracefulStop) defer cancel() activeVUs := &sync.WaitGroup{} @@ -575,7 +575,7 @@ func (vlv RampingVUs) Run(ctx context.Context, out chan<- stats.SampleContainer) return float64(spent) / float64(regularDuration), []string{progVUs, progDur} } vlv.progress.Modify(pb.WithProgress(progresFn)) - go trackProgress(ctx, maxDurationCtx, regDurationCtx, vlv, progresFn) + go trackProgress(parentCtx, maxDurationCtx, regDurationCtx, vlv, progresFn) // Actually schedule the VUs and iterations, likely the most complicated // executor among all of them... @@ -633,7 +633,7 @@ func (vlv RampingVUs) Run(ctx context.Context, out chan<- stats.SampleContainer) currentMaxAllowedVUs = newMaxAllowedVUs } - wait := waiter(ctx, startTime) + wait := waiter(parentCtx, startTime) // iterate over rawExecutionSteps and gracefulExecutionSteps in order by TimeOffset // giving rawExecutionSteps precedence. // we stop iterating once rawExecutionSteps are over as we need to run the remaining