From f0731b4c42092487cadb2b20d17afb22444cf7b1 Mon Sep 17 00:00:00 2001 From: Nedyalko Andreev Date: Thu, 2 Jul 2020 15:37:17 +0300 Subject: [PATCH 1/5] Add a new dropped_iterations metric --- js/runner.go | 5 +++++ lib/executor/base_executor.go | 15 +++++++++++++++ lib/executor/constant_arrival_rate.go | 7 ++++++- lib/executor/per_vu_iterations.go | 6 ++++++ lib/executor/ramping_arrival_rate.go | 7 ++++++- lib/executor/shared_iterations.go | 17 +++++++++++++---- lib/metrics/metrics.go | 1 + lib/runner.go | 3 +++ lib/testutils/minirunner/minirunner.go | 5 +++++ 9 files changed, 60 insertions(+), 6 deletions(-) diff --git a/js/runner.go b/js/runner.go index ce6c80d4060..a25ce877c69 100644 --- a/js/runner.go +++ b/js/runner.go @@ -413,6 +413,11 @@ type ActiveVU struct { busy chan struct{} } +// GetID returns the unique VU ID. +func (u *VU) GetID() int64 { + return u.ID +} + // Activate the VU so it will be able to run code. func (u *VU) Activate(params *lib.VUActivationParams) lib.ActiveVU { u.Runtime.ClearInterrupt() diff --git a/lib/executor/base_executor.go b/lib/executor/base_executor.go index 95a7b04b286..f43377983e6 100644 --- a/lib/executor/base_executor.go +++ b/lib/executor/base_executor.go @@ -22,10 +22,12 @@ package executor import ( "context" + "strconv" "github.com/sirupsen/logrus" "github.com/loadimpact/k6/lib" + "github.com/loadimpact/k6/stats" "github.com/loadimpact/k6/ui/pb" ) @@ -73,3 +75,16 @@ func (bs BaseExecutor) GetLogger() *logrus.Entry { func (bs BaseExecutor) GetProgress() *pb.ProgressBar { return bs.progress } + +// getMetricTags returns a tag set that can be used to emit metrics by the +// executor. The VU ID is optional. +func (bs BaseExecutor) getMetricTags(vuID *int64) *stats.SampleTags { + tags := bs.executionState.Options.RunTags.CloneTags() + if bs.executionState.Options.SystemTags.Has(stats.TagScenario) { + tags["scenario"] = bs.config.GetName() + } + if vuID != nil && bs.executionState.Options.SystemTags.Has(stats.TagVU) { + tags["vu"] = strconv.FormatInt(*vuID, 10) + } + return stats.IntoSampleTags(&tags) +} diff --git a/lib/executor/constant_arrival_rate.go b/lib/executor/constant_arrival_rate.go index b8f44c86777..c2df4554df6 100644 --- a/lib/executor/constant_arrival_rate.go +++ b/lib/executor/constant_arrival_rate.go @@ -33,6 +33,7 @@ import ( "gopkg.in/guregu/null.v3" "github.com/loadimpact/k6/lib" + "github.com/loadimpact/k6/lib/metrics" "github.com/loadimpact/k6/lib/types" "github.com/loadimpact/k6/stats" "github.com/loadimpact/k6/ui/pb" @@ -328,6 +329,7 @@ func (car ConstantArrivalRate) Run(ctx context.Context, out chan<- stats.SampleC )).Duration) shownWarning := false + metricTags := car.getMetricTags(nil) for li, gi := 0, start; ; li, gi = li+1, gi+offsets[li%len(offsets)] { t := notScaledTickerPeriod*time.Duration(gi) - time.Since(startTime) timer.Reset(t) @@ -343,7 +345,10 @@ func (car ConstantArrivalRate) Run(ctx context.Context, out chan<- stats.SampleC // Since there aren't any free VUs available, consider this iteration // dropped - we aren't going to try to recover it, but - // TODO: emit a dropped_iterations metric + stats.PushIfNotDone(ctx, out, stats.Sample{ + Value: 1, Metric: metrics.DroppedIterations, + Tags: metricTags, Time: time.Now(), + }) // We'll try to start allocating another VU in the background, // non-blockingly, if we have remainingUnplannedVUs... diff --git a/lib/executor/per_vu_iterations.go b/lib/executor/per_vu_iterations.go index 6a06167d3f3..58167fb2e36 100644 --- a/lib/executor/per_vu_iterations.go +++ b/lib/executor/per_vu_iterations.go @@ -31,6 +31,7 @@ import ( "gopkg.in/guregu/null.v3" "github.com/loadimpact/k6/lib" + "github.com/loadimpact/k6/lib/metrics" "github.com/loadimpact/k6/lib/types" "github.com/loadimpact/k6/stats" "github.com/loadimpact/k6/ui/pb" @@ -209,11 +210,16 @@ func (pvi PerVUIterations) Run(ctx context.Context, out chan<- stats.SampleConta newParams := *activationParams newParams.RunContext = ctx + vuID := initVU.GetID() activeVU := initVU.Activate(&newParams) for i := int64(0); i < iterations; i++ { select { case <-regDurationDone: + stats.PushIfNotDone(ctx, out, stats.Sample{ + Value: float64(iterations - i), Metric: metrics.DroppedIterations, + Tags: pvi.getMetricTags(&vuID), Time: time.Now(), + }) return // don't make more iterations default: // continue looping diff --git a/lib/executor/ramping_arrival_rate.go b/lib/executor/ramping_arrival_rate.go index 321ca87da06..e72c8861b1e 100644 --- a/lib/executor/ramping_arrival_rate.go +++ b/lib/executor/ramping_arrival_rate.go @@ -32,6 +32,7 @@ import ( "gopkg.in/guregu/null.v3" "github.com/loadimpact/k6/lib" + "github.com/loadimpact/k6/lib/metrics" "github.com/loadimpact/k6/lib/types" "github.com/loadimpact/k6/stats" "github.com/loadimpact/k6/ui/pb" @@ -419,6 +420,7 @@ func (varr RampingArrivalRate) Run(ctx context.Context, out chan<- stats.SampleC ch := make(chan time.Duration, 10) // buffer 10 iteration times ahead var prevTime time.Duration shownWarning := false + metricTags := varr.getMetricTags(nil) go varr.config.cal(varr.executionState.ExecutionTuple, ch) for nextTime := range ch { select { @@ -447,7 +449,10 @@ func (varr RampingArrivalRate) Run(ctx context.Context, out chan<- stats.SampleC // Since there aren't any free VUs available, consider this iteration // dropped - we aren't going to try to recover it, but - // TODO: emit a dropped_iterations metric + stats.PushIfNotDone(ctx, out, stats.Sample{ + Value: 1, Metric: metrics.DroppedIterations, + Tags: metricTags, Time: time.Now(), + }) // We'll try to start allocating another VU in the background, // non-blockingly, if we have remainingUnplannedVUs... diff --git a/lib/executor/shared_iterations.go b/lib/executor/shared_iterations.go index 92ea27f5955..0ba54fcbfaa 100644 --- a/lib/executor/shared_iterations.go +++ b/lib/executor/shared_iterations.go @@ -31,6 +31,7 @@ import ( "gopkg.in/guregu/null.v3" "github.com/loadimpact/k6/lib" + "github.com/loadimpact/k6/lib/metrics" "github.com/loadimpact/k6/lib/types" "github.com/loadimpact/k6/stats" "github.com/loadimpact/k6/ui/pb" @@ -214,15 +215,23 @@ func (si SharedIterations) Run(ctx context.Context, out chan<- stats.SampleConta si.progress.Modify(pb.WithProgress(progresFn)) go trackProgress(ctx, maxDurationCtx, regDurationCtx, &si, progresFn) + var attemptedIters uint64 + // Actually schedule the VUs and iterations... activeVUs := &sync.WaitGroup{} - defer activeVUs.Wait() + defer func() { + activeVUs.Wait() + if attemptedIters < totalIters { + stats.PushIfNotDone(ctx, out, stats.Sample{ + Value: float64(totalIters - attemptedIters), Metric: metrics.DroppedIterations, + Tags: si.getMetricTags(nil), Time: time.Now(), + }) + } + }() regDurationDone := regDurationCtx.Done() runIteration := getIterationRunner(si.executionState, si.logger) - attemptedIters := new(uint64) - activationParams := getVUActivationParams(maxDurationCtx, si.config.BaseConfig, func(u lib.InitializedVU) { si.executionState.ReturnVU(u, true) @@ -245,7 +254,7 @@ func (si SharedIterations) Run(ctx context.Context, out chan<- stats.SampleConta // continue looping } - attemptedIterNumber := atomic.AddUint64(attemptedIters, 1) + attemptedIterNumber := atomic.AddUint64(&attemptedIters, 1) if attemptedIterNumber > totalIters { return } diff --git a/lib/metrics/metrics.go b/lib/metrics/metrics.go index 33e1f579eff..4c0d025ca53 100644 --- a/lib/metrics/metrics.go +++ b/lib/metrics/metrics.go @@ -33,6 +33,7 @@ var ( VUsMax = stats.New("vus_max", stats.Gauge) Iterations = stats.New("iterations", stats.Counter) IterationDuration = stats.New("iteration_duration", stats.Trend, stats.Time) + DroppedIterations = stats.New("dropped_iterations", stats.Counter) Errors = stats.New("errors", stats.Counter) // Runner-emitted. diff --git a/lib/runner.go b/lib/runner.go index e444734d5a2..b874f3ca1c8 100644 --- a/lib/runner.go +++ b/lib/runner.go @@ -41,6 +41,9 @@ type ActiveVU interface { type InitializedVU interface { // Fully activate the VU so it will be able to run code Activate(*VUActivationParams) ActiveVU + + // GetID returns the unique VU ID + GetID() int64 } // VUActivationParams are supplied by each executor when it retrieves a VU from diff --git a/lib/testutils/minirunner/minirunner.go b/lib/testutils/minirunner/minirunner.go index 63dcce0fbed..6ca17adc406 100644 --- a/lib/testutils/minirunner/minirunner.go +++ b/lib/testutils/minirunner/minirunner.go @@ -127,6 +127,11 @@ type ActiveVU struct { busy chan struct{} } +// GetID returns the unique VU ID. +func (vu *VU) GetID() int64 { + return vu.ID +} + // Activate the VU so it will be able to run code. func (vu *VU) Activate(params *lib.VUActivationParams) lib.ActiveVU { avu := &ActiveVU{ From 2ad16fa29d53c99e185ccdcb61c759a21ad4a3c1 Mon Sep 17 00:00:00 2001 From: Nedyalko Andreev Date: Fri, 3 Jul 2020 09:25:24 +0300 Subject: [PATCH 2/5] Fix a context bug and add tests --- lib/executor/constant_arrival_rate_test.go | 36 ++++++++++++++++++++++ lib/executor/helpers_test.go | 36 ++++++++++++++++++++++ lib/executor/per_vu_iterations.go | 8 ++--- lib/executor/per_vu_iterations_test.go | 32 +++++++++++++++++++ lib/executor/ramping_arrival_rate_test.go | 6 ++-- lib/executor/shared_iterations.go | 8 ++--- lib/executor/shared_iterations_test.go | 32 +++++++++++++++++++ 7 files changed, 148 insertions(+), 10 deletions(-) create mode 100644 lib/executor/helpers_test.go diff --git a/lib/executor/constant_arrival_rate_test.go b/lib/executor/constant_arrival_rate_test.go index 36eca62e41a..7f20ec26844 100644 --- a/lib/executor/constant_arrival_rate_test.go +++ b/lib/executor/constant_arrival_rate_test.go @@ -34,6 +34,7 @@ import ( "gopkg.in/guregu/null.v3" "github.com/loadimpact/k6/lib" + "github.com/loadimpact/k6/lib/metrics" "github.com/loadimpact/k6/lib/types" "github.com/loadimpact/k6/stats" ) @@ -296,3 +297,38 @@ func TestArrivalRateCancel(t *testing.T) { }) } } + +func TestConstantArrivalRateDroppedIterations(t *testing.T) { + t.Parallel() + var count int64 + et, err := lib.NewExecutionTuple(nil, nil) + require.NoError(t, err) + + config := &ConstantArrivalRateConfig{ + BaseConfig: BaseConfig{GracefulStop: types.NullDurationFrom(0 * time.Second)}, + TimeUnit: types.NullDurationFrom(time.Second), + Rate: null.IntFrom(20), + Duration: types.NullDurationFrom(1 * time.Second), + PreAllocatedVUs: null.IntFrom(10), + MaxVUs: null.IntFrom(10), + } + + es := lib.NewExecutionState(lib.Options{}, et, 10, 50) + ctx, cancel, executor, logHook := setupExecutor( + t, config, es, + simpleRunner(func(ctx context.Context) error { + atomic.AddInt64(&count, 1) + <-ctx.Done() + return nil + }), + ) + defer cancel() + engineOut := make(chan stats.SampleContainer, 1000) + err = executor.Run(ctx, engineOut) + require.NoError(t, err) + logs := logHook.Drain() + require.Len(t, logs, 1) + assert.Contains(t, logs[0].Message, "cannot initialize more") + assert.Equal(t, int64(10), count) + assert.Equal(t, float64(10), sumMetricValues(engineOut, metrics.DroppedIterations.Name)) +} diff --git a/lib/executor/helpers_test.go b/lib/executor/helpers_test.go new file mode 100644 index 00000000000..a377a116798 --- /dev/null +++ b/lib/executor/helpers_test.go @@ -0,0 +1,36 @@ +/* + * + * k6 - a next-generation load testing tool + * Copyright (C) 2020 Load Impact + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +package executor + +import "github.com/loadimpact/k6/stats" + +func sumMetricValues(samples chan stats.SampleContainer, metricName string) (sum float64) { + bufferedSmaples := stats.GetBufferedSamples(samples) + for _, sc := range bufferedSmaples { + samples := sc.GetSamples() + for _, s := range samples { + if s.Metric.Name == metricName { + sum += s.Value + } + } + } + return sum +} diff --git a/lib/executor/per_vu_iterations.go b/lib/executor/per_vu_iterations.go index 58167fb2e36..15419fad6b6 100644 --- a/lib/executor/per_vu_iterations.go +++ b/lib/executor/per_vu_iterations.go @@ -152,13 +152,13 @@ var _ lib.Executor = &PerVUIterations{} // Run executes a specific number of iterations with each configured VU. // nolint:funlen -func (pvi PerVUIterations) Run(ctx context.Context, out chan<- stats.SampleContainer) (err error) { +func (pvi PerVUIterations) Run(parentCtx context.Context, out chan<- stats.SampleContainer) (err error) { numVUs := pvi.config.GetVUs(pvi.executionState.ExecutionTuple) iterations := pvi.config.GetIterations() duration := time.Duration(pvi.config.MaxDuration.Duration) gracefulStop := pvi.config.GetGracefulStop() - startTime, maxDurationCtx, regDurationCtx, cancel := getDurationContexts(ctx, duration, gracefulStop) + startTime, maxDurationCtx, regDurationCtx, cancel := getDurationContexts(parentCtx, duration, gracefulStop) defer cancel() // Make sure the log and the progress bar have accurate information @@ -189,7 +189,7 @@ func (pvi PerVUIterations) Run(ctx context.Context, out chan<- stats.SampleConta return float64(currentDoneIters) / float64(totalIters), right } pvi.progress.Modify(pb.WithProgress(progresFn)) - go trackProgress(ctx, maxDurationCtx, regDurationCtx, pvi, progresFn) + go trackProgress(parentCtx, maxDurationCtx, regDurationCtx, pvi, progresFn) // Actually schedule the VUs and iterations... activeVUs := &sync.WaitGroup{} @@ -216,7 +216,7 @@ func (pvi PerVUIterations) Run(ctx context.Context, out chan<- stats.SampleConta for i := int64(0); i < iterations; i++ { select { case <-regDurationDone: - stats.PushIfNotDone(ctx, out, stats.Sample{ + stats.PushIfNotDone(parentCtx, out, stats.Sample{ Value: float64(iterations - i), Metric: metrics.DroppedIterations, Tags: pvi.getMetricTags(&vuID), Time: time.Now(), }) diff --git a/lib/executor/per_vu_iterations_test.go b/lib/executor/per_vu_iterations_test.go index 7811a811ee6..4eb57303390 100644 --- a/lib/executor/per_vu_iterations_test.go +++ b/lib/executor/per_vu_iterations_test.go @@ -32,7 +32,9 @@ import ( "gopkg.in/guregu/null.v3" "github.com/loadimpact/k6/lib" + "github.com/loadimpact/k6/lib/metrics" "github.com/loadimpact/k6/lib/types" + "github.com/loadimpact/k6/stats" ) func getTestPerVUIterationsConfig() PerVUIterationsConfig { @@ -124,3 +126,33 @@ func TestPerVUIterationsRunVariableVU(t *testing.T) { assert.Equal(t, uint64(16), val) assert.Equal(t, uint64(916), totalIters) } + +func TestPerVuIterationsEmitDroppedIterations(t *testing.T) { + t.Parallel() + var count int64 + et, err := lib.NewExecutionTuple(nil, nil) + require.NoError(t, err) + + config := PerVUIterationsConfig{ + VUs: null.IntFrom(5), + Iterations: null.IntFrom(20), + MaxDuration: types.NullDurationFrom(1 * time.Second), + } + + es := lib.NewExecutionState(lib.Options{}, et, 10, 50) + ctx, cancel, executor, logHook := setupExecutor( + t, config, es, + simpleRunner(func(ctx context.Context) error { + atomic.AddInt64(&count, 1) + <-ctx.Done() + return nil + }), + ) + defer cancel() + engineOut := make(chan stats.SampleContainer, 1000) + err = executor.Run(ctx, engineOut) + require.NoError(t, err) + assert.Empty(t, logHook.Drain()) + assert.Equal(t, int64(5), count) + assert.Equal(t, float64(95), sumMetricValues(engineOut, metrics.DroppedIterations.Name)) +} diff --git a/lib/executor/ramping_arrival_rate_test.go b/lib/executor/ramping_arrival_rate_test.go index 7499d058023..0b830ef62a1 100644 --- a/lib/executor/ramping_arrival_rate_test.go +++ b/lib/executor/ramping_arrival_rate_test.go @@ -35,6 +35,7 @@ import ( "gopkg.in/guregu/null.v3" "github.com/loadimpact/k6/lib" + "github.com/loadimpact/k6/lib/metrics" "github.com/loadimpact/k6/lib/types" "github.com/loadimpact/k6/stats" ) @@ -188,8 +189,9 @@ func TestRampingArrivalRateRunUnplannedVUs(t *testing.T) { err = executor.Run(ctx, engineOut) assert.NoError(t, err) assert.Empty(t, logHook.Drain()) - //TODO: test that the sum of dropped_iteartions and count is 9 - // assert.Equal(t, count, int64(9)) + + droppedIters := sumMetricValues(engineOut, metrics.DroppedIterations.Name) + assert.Equal(t, count+int64(droppedIters), int64(9)) } func TestRampingArrivalRateRunCorrectRateWithSlowRate(t *testing.T) { diff --git a/lib/executor/shared_iterations.go b/lib/executor/shared_iterations.go index 0ba54fcbfaa..10cfa164861 100644 --- a/lib/executor/shared_iterations.go +++ b/lib/executor/shared_iterations.go @@ -182,13 +182,13 @@ func (si *SharedIterations) Init(ctx context.Context) error { // Run executes a specific total number of iterations, which are all shared by // the configured VUs. // nolint:funlen -func (si SharedIterations) Run(ctx context.Context, out chan<- stats.SampleContainer) (err error) { +func (si SharedIterations) Run(parentCtx context.Context, out chan<- stats.SampleContainer) (err error) { numVUs := si.config.GetVUs(si.executionState.ExecutionTuple) iterations := si.et.ScaleInt64(si.config.Iterations.Int64) duration := time.Duration(si.config.MaxDuration.Duration) gracefulStop := si.config.GetGracefulStop() - startTime, maxDurationCtx, regDurationCtx, cancel := getDurationContexts(ctx, duration, gracefulStop) + startTime, maxDurationCtx, regDurationCtx, cancel := getDurationContexts(parentCtx, duration, gracefulStop) defer cancel() // Make sure the log and the progress bar have accurate information @@ -213,7 +213,7 @@ func (si SharedIterations) Run(ctx context.Context, out chan<- stats.SampleConta return float64(currentDoneIters) / float64(totalIters), right } si.progress.Modify(pb.WithProgress(progresFn)) - go trackProgress(ctx, maxDurationCtx, regDurationCtx, &si, progresFn) + go trackProgress(parentCtx, maxDurationCtx, regDurationCtx, &si, progresFn) var attemptedIters uint64 @@ -222,7 +222,7 @@ func (si SharedIterations) Run(ctx context.Context, out chan<- stats.SampleConta defer func() { activeVUs.Wait() if attemptedIters < totalIters { - stats.PushIfNotDone(ctx, out, stats.Sample{ + stats.PushIfNotDone(parentCtx, out, stats.Sample{ Value: float64(totalIters - attemptedIters), Metric: metrics.DroppedIterations, Tags: si.getMetricTags(nil), Time: time.Now(), }) diff --git a/lib/executor/shared_iterations_test.go b/lib/executor/shared_iterations_test.go index f0b7cd12fcd..65f0d73bc1c 100644 --- a/lib/executor/shared_iterations_test.go +++ b/lib/executor/shared_iterations_test.go @@ -32,7 +32,9 @@ import ( "gopkg.in/guregu/null.v3" "github.com/loadimpact/k6/lib" + "github.com/loadimpact/k6/lib/metrics" "github.com/loadimpact/k6/lib/types" + "github.com/loadimpact/k6/stats" ) func getTestSharedIterationsConfig() SharedIterationsConfig { @@ -109,3 +111,33 @@ func TestSharedIterationsRunVariableVU(t *testing.T) { assert.Equal(t, uint64(2), val) assert.Equal(t, uint64(100), totalIters) } + +func TestSharedIterationsEmitDroppedIterations(t *testing.T) { + t.Parallel() + var count int64 + et, err := lib.NewExecutionTuple(nil, nil) + require.NoError(t, err) + + config := &SharedIterationsConfig{ + VUs: null.IntFrom(5), + Iterations: null.IntFrom(100), + MaxDuration: types.NullDurationFrom(1 * time.Second), + } + + es := lib.NewExecutionState(lib.Options{}, et, 10, 50) + ctx, cancel, executor, logHook := setupExecutor( + t, config, es, + simpleRunner(func(ctx context.Context) error { + atomic.AddInt64(&count, 1) + <-ctx.Done() + return nil + }), + ) + defer cancel() + engineOut := make(chan stats.SampleContainer, 1000) + err = executor.Run(ctx, engineOut) + require.NoError(t, err) + assert.Empty(t, logHook.Drain()) + assert.Equal(t, int64(5), count) + assert.Equal(t, float64(95), sumMetricValues(engineOut, metrics.DroppedIterations.Name)) +} From f53738baa2960dd5e68e2202da8449652e268244 Mon Sep 17 00:00:00 2001 From: Nedyalko Andreev Date: Fri, 3 Jul 2020 10:11:55 +0300 Subject: [PATCH 3/5] Fix a flaky test --- lib/executor/constant_arrival_rate_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/executor/constant_arrival_rate_test.go b/lib/executor/constant_arrival_rate_test.go index 7f20ec26844..050d1b430d0 100644 --- a/lib/executor/constant_arrival_rate_test.go +++ b/lib/executor/constant_arrival_rate_test.go @@ -308,7 +308,7 @@ func TestConstantArrivalRateDroppedIterations(t *testing.T) { BaseConfig: BaseConfig{GracefulStop: types.NullDurationFrom(0 * time.Second)}, TimeUnit: types.NullDurationFrom(time.Second), Rate: null.IntFrom(20), - Duration: types.NullDurationFrom(1 * time.Second), + Duration: types.NullDurationFrom(950 * time.Millisecond), PreAllocatedVUs: null.IntFrom(10), MaxVUs: null.IntFrom(10), } From 2153af88c7414d8b47342f555b3aa886d5de0e40 Mon Sep 17 00:00:00 2001 From: Nedyalko Andreev Date: Fri, 3 Jul 2020 10:49:59 +0300 Subject: [PATCH 4/5] Fix a typo by simplifying the code --- lib/executor/helpers_test.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/lib/executor/helpers_test.go b/lib/executor/helpers_test.go index a377a116798..81dd825b8d3 100644 --- a/lib/executor/helpers_test.go +++ b/lib/executor/helpers_test.go @@ -23,8 +23,7 @@ package executor import "github.com/loadimpact/k6/stats" func sumMetricValues(samples chan stats.SampleContainer, metricName string) (sum float64) { - bufferedSmaples := stats.GetBufferedSamples(samples) - for _, sc := range bufferedSmaples { + for _, sc := range stats.GetBufferedSamples(samples) { samples := sc.GetSamples() for _, s := range samples { if s.Metric.Name == metricName { From 966510ca52aa4ec054f0d5ab3d50dff6e2783967 Mon Sep 17 00:00:00 2001 From: Nedyalko Andreev Date: Fri, 3 Jul 2020 16:16:37 +0300 Subject: [PATCH 5/5] Consistently use parentCtx in the executors' Run() methods --- lib/executor/constant_arrival_rate.go | 9 +++++---- lib/executor/constant_vus.go | 6 +++--- lib/executor/ramping_arrival_rate.go | 8 ++++---- lib/executor/ramping_vus.go | 8 ++++---- 4 files changed, 16 insertions(+), 15 deletions(-) diff --git a/lib/executor/constant_arrival_rate.go b/lib/executor/constant_arrival_rate.go index c2df4554df6..751b424c39d 100644 --- a/lib/executor/constant_arrival_rate.go +++ b/lib/executor/constant_arrival_rate.go @@ -209,7 +209,8 @@ func (car *ConstantArrivalRate) Init(ctx context.Context) error { // time should iteration X begin) different, but keep everything else the same. // This will allow us to implement https://github.com/loadimpact/k6/issues/1386 // and things like all of the TODOs below in one place only. -func (car ConstantArrivalRate) Run(ctx context.Context, out chan<- stats.SampleContainer) (err error) { //nolint:funlen +//nolint:funlen +func (car ConstantArrivalRate) Run(parentCtx context.Context, out chan<- stats.SampleContainer) (err error) { gracefulStop := car.config.GetGracefulStop() duration := time.Duration(car.config.Duration.Duration) preAllocatedVUs := car.config.GetPreAllocatedVUs(car.executionState.ExecutionTuple) @@ -228,7 +229,7 @@ func (car ConstantArrivalRate) Run(ctx context.Context, out chan<- stats.SampleC activeVUsWg := &sync.WaitGroup{} returnedVUs := make(chan struct{}) - startTime, maxDurationCtx, regDurationCtx, cancel := getDurationContexts(ctx, duration, gracefulStop) + startTime, maxDurationCtx, regDurationCtx, cancel := getDurationContexts(parentCtx, duration, gracefulStop) defer func() { // Make sure all VUs aren't executing iterations anymore, for the cancel() @@ -310,7 +311,7 @@ func (car ConstantArrivalRate) Run(ctx context.Context, out chan<- stats.SampleC return math.Min(1, float64(spent)/float64(duration)), right } car.progress.Modify(pb.WithProgress(progresFn)) - go trackProgress(ctx, maxDurationCtx, regDurationCtx, &car, progresFn) + go trackProgress(parentCtx, maxDurationCtx, regDurationCtx, &car, progresFn) runIterationBasic := getIterationRunner(car.executionState, car.logger) runIteration := func(vu lib.ActiveVU) { @@ -345,7 +346,7 @@ func (car ConstantArrivalRate) Run(ctx context.Context, out chan<- stats.SampleC // Since there aren't any free VUs available, consider this iteration // dropped - we aren't going to try to recover it, but - stats.PushIfNotDone(ctx, out, stats.Sample{ + stats.PushIfNotDone(parentCtx, out, stats.Sample{ Value: 1, Metric: metrics.DroppedIterations, Tags: metricTags, Time: time.Now(), }) diff --git a/lib/executor/constant_vus.go b/lib/executor/constant_vus.go index ba4249ea982..be21030b285 100644 --- a/lib/executor/constant_vus.go +++ b/lib/executor/constant_vus.go @@ -142,12 +142,12 @@ var _ lib.Executor = &ConstantVUs{} // Run constantly loops through as many iterations as possible on a fixed number // of VUs for the specified duration. -func (clv ConstantVUs) Run(ctx context.Context, out chan<- stats.SampleContainer) (err error) { +func (clv ConstantVUs) Run(parentCtx context.Context, out chan<- stats.SampleContainer) (err error) { numVUs := clv.config.GetVUs(clv.executionState.ExecutionTuple) duration := time.Duration(clv.config.Duration.Duration) gracefulStop := clv.config.GetGracefulStop() - startTime, maxDurationCtx, regDurationCtx, cancel := getDurationContexts(ctx, duration, gracefulStop) + startTime, maxDurationCtx, regDurationCtx, cancel := getDurationContexts(parentCtx, duration, gracefulStop) defer cancel() // Make sure the log and the progress bar have accurate information @@ -167,7 +167,7 @@ func (clv ConstantVUs) Run(ctx context.Context, out chan<- stats.SampleContainer return float64(spent) / float64(duration), right } clv.progress.Modify(pb.WithProgress(progresFn)) - go trackProgress(ctx, maxDurationCtx, regDurationCtx, clv, progresFn) + go trackProgress(parentCtx, maxDurationCtx, regDurationCtx, clv, progresFn) // Actually schedule the VUs and iterations... activeVUs := &sync.WaitGroup{} diff --git a/lib/executor/ramping_arrival_rate.go b/lib/executor/ramping_arrival_rate.go index e72c8861b1e..98503ce944e 100644 --- a/lib/executor/ramping_arrival_rate.go +++ b/lib/executor/ramping_arrival_rate.go @@ -291,7 +291,7 @@ func (varc RampingArrivalRateConfig) cal(et *lib.ExecutionTuple, ch chan<- time. // This will allow us to implement https://github.com/loadimpact/k6/issues/1386 // and things like all of the TODOs below in one place only. //nolint:funlen,gocognit -func (varr RampingArrivalRate) Run(ctx context.Context, out chan<- stats.SampleContainer) (err error) { +func (varr RampingArrivalRate) Run(parentCtx context.Context, out chan<- stats.SampleContainer) (err error) { segment := varr.executionState.ExecutionTuple.Segment gracefulStop := varr.config.GetGracefulStop() duration := sumStagesDuration(varr.config.Stages) @@ -314,7 +314,7 @@ func (varr RampingArrivalRate) Run(ctx context.Context, out chan<- stats.SampleC activeVUsWg := &sync.WaitGroup{} returnedVUs := make(chan struct{}) - startTime, maxDurationCtx, regDurationCtx, cancel := getDurationContexts(ctx, duration, gracefulStop) + startTime, maxDurationCtx, regDurationCtx, cancel := getDurationContexts(parentCtx, duration, gracefulStop) defer func() { // Make sure all VUs aren't executing iterations anymore, for the cancel() @@ -406,7 +406,7 @@ func (varr RampingArrivalRate) Run(ctx context.Context, out chan<- stats.SampleC } varr.progress.Modify(pb.WithProgress(progresFn)) - go trackProgress(ctx, maxDurationCtx, regDurationCtx, varr, progresFn) + go trackProgress(parentCtx, maxDurationCtx, regDurationCtx, varr, progresFn) regDurationDone := regDurationCtx.Done() runIterationBasic := getIterationRunner(varr.executionState, varr.logger) @@ -449,7 +449,7 @@ func (varr RampingArrivalRate) Run(ctx context.Context, out chan<- stats.SampleC // Since there aren't any free VUs available, consider this iteration // dropped - we aren't going to try to recover it, but - stats.PushIfNotDone(ctx, out, stats.Sample{ + stats.PushIfNotDone(parentCtx, out, stats.Sample{ Value: 1, Metric: metrics.DroppedIterations, Tags: metricTags, Time: time.Now(), }) diff --git a/lib/executor/ramping_vus.go b/lib/executor/ramping_vus.go index fe4e0be4390..3320b2095c6 100644 --- a/lib/executor/ramping_vus.go +++ b/lib/executor/ramping_vus.go @@ -533,7 +533,7 @@ var _ lib.Executor = &RampingVUs{} // of a less complex way to implement it (besides the old "increment by 100ms // and see what happens)... :/ so maybe see how it can be split? // nolint:funlen,gocognit -func (vlv RampingVUs) Run(ctx context.Context, out chan<- stats.SampleContainer) (err error) { +func (vlv RampingVUs) Run(parentCtx context.Context, out chan<- stats.SampleContainer) (err error) { rawExecutionSteps := vlv.config.getRawExecutionSteps(vlv.executionState.ExecutionTuple, true) regularDuration, isFinal := lib.GetEndOffset(rawExecutionSteps) if !isFinal { @@ -548,7 +548,7 @@ func (vlv RampingVUs) Run(ctx context.Context, out chan<- stats.SampleContainer) maxVUs := lib.GetMaxPlannedVUs(gracefulExecutionSteps) gracefulStop := maxDuration - regularDuration - startTime, maxDurationCtx, regDurationCtx, cancel := getDurationContexts(ctx, regularDuration, gracefulStop) + startTime, maxDurationCtx, regDurationCtx, cancel := getDurationContexts(parentCtx, regularDuration, gracefulStop) defer cancel() activeVUs := &sync.WaitGroup{} @@ -575,7 +575,7 @@ func (vlv RampingVUs) Run(ctx context.Context, out chan<- stats.SampleContainer) return float64(spent) / float64(regularDuration), []string{progVUs, progDur} } vlv.progress.Modify(pb.WithProgress(progresFn)) - go trackProgress(ctx, maxDurationCtx, regDurationCtx, vlv, progresFn) + go trackProgress(parentCtx, maxDurationCtx, regDurationCtx, vlv, progresFn) // Actually schedule the VUs and iterations, likely the most complicated // executor among all of them... @@ -633,7 +633,7 @@ func (vlv RampingVUs) Run(ctx context.Context, out chan<- stats.SampleContainer) currentMaxAllowedVUs = newMaxAllowedVUs } - wait := waiter(ctx, startTime) + wait := waiter(parentCtx, startTime) // iterate over rawExecutionSteps and gracefulExecutionSteps in order by TimeOffset // giving rawExecutionSteps precedence. // we stop iterating once rawExecutionSteps are over as we need to run the remaining