Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a new dropped_iterations metric #1529

Merged
merged 5 commits into from
Jul 3, 2020
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions js/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,11 @@ type ActiveVU struct {
busy chan struct{}
}

// GetID returns the unique VU ID.
func (u *VU) GetID() int64 {
return u.ID
}

// Activate the VU so it will be able to run code.
func (u *VU) Activate(params *lib.VUActivationParams) lib.ActiveVU {
u.Runtime.ClearInterrupt()
Expand Down
15 changes: 15 additions & 0 deletions lib/executor/base_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@ package executor

import (
"context"
"strconv"

"github.com/sirupsen/logrus"

"github.com/loadimpact/k6/lib"
"github.com/loadimpact/k6/stats"
"github.com/loadimpact/k6/ui/pb"
)

Expand Down Expand Up @@ -73,3 +75,16 @@ func (bs BaseExecutor) GetLogger() *logrus.Entry {
func (bs BaseExecutor) GetProgress() *pb.ProgressBar {
return bs.progress
}

// getMetricTags returns a tag set that can be used to emit metrics by the
// executor. The VU ID is optional.
func (bs BaseExecutor) getMetricTags(vuID *int64) *stats.SampleTags {
tags := bs.executionState.Options.RunTags.CloneTags()
if bs.executionState.Options.SystemTags.Has(stats.TagScenario) {
tags["scenario"] = bs.config.GetName()
}
if vuID != nil && bs.executionState.Options.SystemTags.Has(stats.TagVU) {
tags["vu"] = strconv.FormatInt(*vuID, 10)
}
return stats.IntoSampleTags(&tags)
}
7 changes: 6 additions & 1 deletion lib/executor/constant_arrival_rate.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"gopkg.in/guregu/null.v3"

"github.com/loadimpact/k6/lib"
"github.com/loadimpact/k6/lib/metrics"
"github.com/loadimpact/k6/lib/types"
"github.com/loadimpact/k6/stats"
"github.com/loadimpact/k6/ui/pb"
Expand Down Expand Up @@ -328,6 +329,7 @@ func (car ConstantArrivalRate) Run(ctx context.Context, out chan<- stats.SampleC
)).Duration)

shownWarning := false
metricTags := car.getMetricTags(nil)
for li, gi := 0, start; ; li, gi = li+1, gi+offsets[li%len(offsets)] {
t := notScaledTickerPeriod*time.Duration(gi) - time.Since(startTime)
timer.Reset(t)
Expand All @@ -343,7 +345,10 @@ func (car ConstantArrivalRate) Run(ctx context.Context, out chan<- stats.SampleC
// Since there aren't any free VUs available, consider this iteration
// dropped - we aren't going to try to recover it, but

// TODO: emit a dropped_iterations metric
stats.PushIfNotDone(ctx, out, stats.Sample{
Value: 1, Metric: metrics.DroppedIterations,
Tags: metricTags, Time: time.Now(),
})

// We'll try to start allocating another VU in the background,
// non-blockingly, if we have remainingUnplannedVUs...
Expand Down
36 changes: 36 additions & 0 deletions lib/executor/constant_arrival_rate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"gopkg.in/guregu/null.v3"

"github.com/loadimpact/k6/lib"
"github.com/loadimpact/k6/lib/metrics"
"github.com/loadimpact/k6/lib/types"
"github.com/loadimpact/k6/stats"
)
Expand Down Expand Up @@ -296,3 +297,38 @@ func TestArrivalRateCancel(t *testing.T) {
})
}
}

func TestConstantArrivalRateDroppedIterations(t *testing.T) {
t.Parallel()
var count int64
et, err := lib.NewExecutionTuple(nil, nil)
require.NoError(t, err)

config := &ConstantArrivalRateConfig{
BaseConfig: BaseConfig{GracefulStop: types.NullDurationFrom(0 * time.Second)},
TimeUnit: types.NullDurationFrom(time.Second),
Rate: null.IntFrom(20),
Duration: types.NullDurationFrom(950 * time.Millisecond),
PreAllocatedVUs: null.IntFrom(10),
MaxVUs: null.IntFrom(10),
}

es := lib.NewExecutionState(lib.Options{}, et, 10, 50)
ctx, cancel, executor, logHook := setupExecutor(
t, config, es,
simpleRunner(func(ctx context.Context) error {
atomic.AddInt64(&count, 1)
<-ctx.Done()
return nil
}),
)
defer cancel()
engineOut := make(chan stats.SampleContainer, 1000)
err = executor.Run(ctx, engineOut)
require.NoError(t, err)
logs := logHook.Drain()
require.Len(t, logs, 1)
assert.Contains(t, logs[0].Message, "cannot initialize more")
assert.Equal(t, int64(10), count)
assert.Equal(t, float64(10), sumMetricValues(engineOut, metrics.DroppedIterations.Name))
}
35 changes: 35 additions & 0 deletions lib/executor/helpers_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
*
* k6 - a next-generation load testing tool
* Copyright (C) 2020 Load Impact
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/

package executor

import "github.com/loadimpact/k6/stats"

func sumMetricValues(samples chan stats.SampleContainer, metricName string) (sum float64) {
for _, sc := range stats.GetBufferedSamples(samples) {
samples := sc.GetSamples()
for _, s := range samples {
if s.Metric.Name == metricName {
sum += s.Value
}
}
}
return sum
}
12 changes: 9 additions & 3 deletions lib/executor/per_vu_iterations.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"gopkg.in/guregu/null.v3"

"github.com/loadimpact/k6/lib"
"github.com/loadimpact/k6/lib/metrics"
"github.com/loadimpact/k6/lib/types"
"github.com/loadimpact/k6/stats"
"github.com/loadimpact/k6/ui/pb"
Expand Down Expand Up @@ -151,13 +152,13 @@ var _ lib.Executor = &PerVUIterations{}

// Run executes a specific number of iterations with each configured VU.
// nolint:funlen
func (pvi PerVUIterations) Run(ctx context.Context, out chan<- stats.SampleContainer) (err error) {
func (pvi PerVUIterations) Run(parentCtx context.Context, out chan<- stats.SampleContainer) (err error) {
na-- marked this conversation as resolved.
Show resolved Hide resolved
numVUs := pvi.config.GetVUs(pvi.executionState.ExecutionTuple)
iterations := pvi.config.GetIterations()
duration := time.Duration(pvi.config.MaxDuration.Duration)
gracefulStop := pvi.config.GetGracefulStop()

startTime, maxDurationCtx, regDurationCtx, cancel := getDurationContexts(ctx, duration, gracefulStop)
startTime, maxDurationCtx, regDurationCtx, cancel := getDurationContexts(parentCtx, duration, gracefulStop)
defer cancel()

// Make sure the log and the progress bar have accurate information
Expand Down Expand Up @@ -188,7 +189,7 @@ func (pvi PerVUIterations) Run(ctx context.Context, out chan<- stats.SampleConta
return float64(currentDoneIters) / float64(totalIters), right
}
pvi.progress.Modify(pb.WithProgress(progresFn))
go trackProgress(ctx, maxDurationCtx, regDurationCtx, pvi, progresFn)
go trackProgress(parentCtx, maxDurationCtx, regDurationCtx, pvi, progresFn)

// Actually schedule the VUs and iterations...
activeVUs := &sync.WaitGroup{}
Expand All @@ -209,11 +210,16 @@ func (pvi PerVUIterations) Run(ctx context.Context, out chan<- stats.SampleConta
newParams := *activationParams
newParams.RunContext = ctx

vuID := initVU.GetID()
activeVU := initVU.Activate(&newParams)

for i := int64(0); i < iterations; i++ {
select {
case <-regDurationDone:
stats.PushIfNotDone(parentCtx, out, stats.Sample{
Value: float64(iterations - i), Metric: metrics.DroppedIterations,
Tags: pvi.getMetricTags(&vuID), Time: time.Now(),
})
return // don't make more iterations
default:
// continue looping
Expand Down
32 changes: 32 additions & 0 deletions lib/executor/per_vu_iterations_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ import (
"gopkg.in/guregu/null.v3"

"github.com/loadimpact/k6/lib"
"github.com/loadimpact/k6/lib/metrics"
"github.com/loadimpact/k6/lib/types"
"github.com/loadimpact/k6/stats"
)

func getTestPerVUIterationsConfig() PerVUIterationsConfig {
Expand Down Expand Up @@ -124,3 +126,33 @@ func TestPerVUIterationsRunVariableVU(t *testing.T) {
assert.Equal(t, uint64(16), val)
assert.Equal(t, uint64(916), totalIters)
}

func TestPerVuIterationsEmitDroppedIterations(t *testing.T) {
t.Parallel()
var count int64
et, err := lib.NewExecutionTuple(nil, nil)
require.NoError(t, err)

config := PerVUIterationsConfig{
VUs: null.IntFrom(5),
Iterations: null.IntFrom(20),
MaxDuration: types.NullDurationFrom(1 * time.Second),
}

es := lib.NewExecutionState(lib.Options{}, et, 10, 50)
ctx, cancel, executor, logHook := setupExecutor(
t, config, es,
simpleRunner(func(ctx context.Context) error {
atomic.AddInt64(&count, 1)
<-ctx.Done()
return nil
}),
)
defer cancel()
engineOut := make(chan stats.SampleContainer, 1000)
err = executor.Run(ctx, engineOut)
require.NoError(t, err)
assert.Empty(t, logHook.Drain())
assert.Equal(t, int64(5), count)
assert.Equal(t, float64(95), sumMetricValues(engineOut, metrics.DroppedIterations.Name))
}
7 changes: 6 additions & 1 deletion lib/executor/ramping_arrival_rate.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"gopkg.in/guregu/null.v3"

"github.com/loadimpact/k6/lib"
"github.com/loadimpact/k6/lib/metrics"
"github.com/loadimpact/k6/lib/types"
"github.com/loadimpact/k6/stats"
"github.com/loadimpact/k6/ui/pb"
Expand Down Expand Up @@ -419,6 +420,7 @@ func (varr RampingArrivalRate) Run(ctx context.Context, out chan<- stats.SampleC
ch := make(chan time.Duration, 10) // buffer 10 iteration times ahead
var prevTime time.Duration
shownWarning := false
metricTags := varr.getMetricTags(nil)
go varr.config.cal(varr.executionState.ExecutionTuple, ch)
for nextTime := range ch {
select {
Expand Down Expand Up @@ -447,7 +449,10 @@ func (varr RampingArrivalRate) Run(ctx context.Context, out chan<- stats.SampleC
// Since there aren't any free VUs available, consider this iteration
// dropped - we aren't going to try to recover it, but

// TODO: emit a dropped_iterations metric
stats.PushIfNotDone(ctx, out, stats.Sample{
Value: 1, Metric: metrics.DroppedIterations,
Tags: metricTags, Time: time.Now(),
})

// We'll try to start allocating another VU in the background,
// non-blockingly, if we have remainingUnplannedVUs...
Expand Down
6 changes: 4 additions & 2 deletions lib/executor/ramping_arrival_rate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"gopkg.in/guregu/null.v3"

"github.com/loadimpact/k6/lib"
"github.com/loadimpact/k6/lib/metrics"
"github.com/loadimpact/k6/lib/types"
"github.com/loadimpact/k6/stats"
)
Expand Down Expand Up @@ -188,8 +189,9 @@ func TestRampingArrivalRateRunUnplannedVUs(t *testing.T) {
err = executor.Run(ctx, engineOut)
assert.NoError(t, err)
assert.Empty(t, logHook.Drain())
//TODO: test that the sum of dropped_iteartions and count is 9
// assert.Equal(t, count, int64(9))

droppedIters := sumMetricValues(engineOut, metrics.DroppedIterations.Name)
assert.Equal(t, count+int64(droppedIters), int64(9))
}

func TestRampingArrivalRateRunCorrectRateWithSlowRate(t *testing.T) {
Expand Down
23 changes: 16 additions & 7 deletions lib/executor/shared_iterations.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"gopkg.in/guregu/null.v3"

"github.com/loadimpact/k6/lib"
"github.com/loadimpact/k6/lib/metrics"
"github.com/loadimpact/k6/lib/types"
"github.com/loadimpact/k6/stats"
"github.com/loadimpact/k6/ui/pb"
Expand Down Expand Up @@ -181,13 +182,13 @@ func (si *SharedIterations) Init(ctx context.Context) error {
// Run executes a specific total number of iterations, which are all shared by
// the configured VUs.
// nolint:funlen
func (si SharedIterations) Run(ctx context.Context, out chan<- stats.SampleContainer) (err error) {
func (si SharedIterations) Run(parentCtx context.Context, out chan<- stats.SampleContainer) (err error) {
numVUs := si.config.GetVUs(si.executionState.ExecutionTuple)
iterations := si.et.ScaleInt64(si.config.Iterations.Int64)
duration := time.Duration(si.config.MaxDuration.Duration)
gracefulStop := si.config.GetGracefulStop()

startTime, maxDurationCtx, regDurationCtx, cancel := getDurationContexts(ctx, duration, gracefulStop)
startTime, maxDurationCtx, regDurationCtx, cancel := getDurationContexts(parentCtx, duration, gracefulStop)
defer cancel()

// Make sure the log and the progress bar have accurate information
Expand All @@ -212,17 +213,25 @@ func (si SharedIterations) Run(ctx context.Context, out chan<- stats.SampleConta
return float64(currentDoneIters) / float64(totalIters), right
}
si.progress.Modify(pb.WithProgress(progresFn))
go trackProgress(ctx, maxDurationCtx, regDurationCtx, &si, progresFn)
go trackProgress(parentCtx, maxDurationCtx, regDurationCtx, &si, progresFn)

var attemptedIters uint64

// Actually schedule the VUs and iterations...
activeVUs := &sync.WaitGroup{}
defer activeVUs.Wait()
defer func() {
activeVUs.Wait()
if attemptedIters < totalIters {
stats.PushIfNotDone(parentCtx, out, stats.Sample{
Value: float64(totalIters - attemptedIters), Metric: metrics.DroppedIterations,
Tags: si.getMetricTags(nil), Time: time.Now(),
})
imiric marked this conversation as resolved.
Show resolved Hide resolved
}
}()

regDurationDone := regDurationCtx.Done()
runIteration := getIterationRunner(si.executionState, si.logger)

attemptedIters := new(uint64)

activationParams := getVUActivationParams(maxDurationCtx, si.config.BaseConfig,
func(u lib.InitializedVU) {
si.executionState.ReturnVU(u, true)
Expand All @@ -245,7 +254,7 @@ func (si SharedIterations) Run(ctx context.Context, out chan<- stats.SampleConta
// continue looping
}

attemptedIterNumber := atomic.AddUint64(attemptedIters, 1)
attemptedIterNumber := atomic.AddUint64(&attemptedIters, 1)
if attemptedIterNumber > totalIters {
return
}
Expand Down
Loading