Skip to content

Commit

Permalink
Add getScenarioStats to k6/execution
Browse files Browse the repository at this point in the history
Part of #1320
  • Loading branch information
Ivan Mirić committed May 27, 2021
1 parent 304a27b commit 2480df8
Show file tree
Hide file tree
Showing 12 changed files with 180 additions and 60 deletions.
18 changes: 18 additions & 0 deletions js/modules/k6/execution/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,21 @@ func (e *Execution) GetVUStats(ctx context.Context) (map[string]interface{}, err

return out, nil
}

// GetScenarioStats returns information about the currently executing scenario.
func (e *Execution) GetScenarioStats(ctx context.Context) (map[string]interface{}, error) {
ss := lib.GetScenarioState(ctx)
if ss == nil {
return nil, errors.New("scenario information can only be returned from an exported function")
}

progress, _ := ss.ProgressFn()
out := map[string]interface{}{
"name": ss.Name,
"executor": ss.Executor,
"startTime": ss.StartTime,
"progress": progress,
}

return out, nil
}
32 changes: 31 additions & 1 deletion js/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2040,6 +2040,24 @@ func TestExecutionStats(t *testing.T) {
var exec = require('k6/execution');
exec.getVUStats();
`, "VU information can only be returned from an exported function"},
{"scenario_ok", `
var exec = require('k6/execution');
var sleep = require('k6').sleep;
exports.default = function() {
var ss = exec.getScenarioStats();
sleep(0.1);
// goja's Date handling is weird, see https://github.com/dop251/goja/issues/170
var startTime = new Date(JSON.parse(JSON.stringify(ss.startTime)));
if (ss.name !== 'default') throw new Error('unexpected scenario name: '+ss.name);
if (ss.executor !== 'test-exec') throw new Error('unexpected executor: '+ss.name);
if (startTime > new Date()) throw new Error('unexpected startTime: '+startTime);
if (ss.progress !== 0.1) throw new Error('unexpected progress: '+ss.progress);
}`, ""},
{"scenario_err", `
var exec = require('k6/execution');
exec.getScenarioStats();
`, "scenario information can only be returned from an exported function"},
}

for _, tc := range testCases {
Expand All @@ -2059,7 +2077,19 @@ func TestExecutionStats(t *testing.T) {

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
vu := initVU.Activate(&lib.VUActivationParams{RunContext: ctx})

ctx = lib.WithScenarioState(ctx, &lib.ScenarioState{
Name: "default",
Executor: "test-exec",
StartTime: time.Now(),
ProgressFn: func() (float64, []string) {
return 0.1, nil
},
})
vu := initVU.Activate(&lib.VUActivationParams{
RunContext: ctx,
Exec: "default",
})

err = vu.RunOnce()
assert.NoError(t, err)
Expand Down
15 changes: 15 additions & 0 deletions lib/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type ctxKey int

const (
ctxKeyState ctxKey = iota
ctxKeyScenario
)

func WithState(ctx context.Context, state *State) context.Context {
Expand All @@ -39,3 +40,17 @@ func GetState(ctx context.Context) *State {
}
return v.(*State)
}

// WithScenarioState embeds a ScenarioState in ctx.
func WithScenarioState(ctx context.Context, s *ScenarioState) context.Context {
return context.WithValue(ctx, ctxKeyScenario, s)
}

// GetScenarioState returns a ScenarioState from ctx.
func GetScenarioState(ctx context.Context) *ScenarioState {
v := ctx.Value(ctxKeyScenario)
if v == nil {
return nil
}
return v.(*ScenarioState)
}
56 changes: 31 additions & 25 deletions lib/executor/constant_arrival_rate.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,11 +243,41 @@ func (car ConstantArrivalRate) Run(parentCtx context.Context, out chan<- stats.S
}()
activeVUsCount := uint64(0)

vusFmt := pb.GetFixedLengthIntFormat(maxVUs)
progIters := fmt.Sprintf(
pb.GetFixedLengthFloatFormat(arrivalRatePerSec, 0)+" iters/s", arrivalRatePerSec)
progressFn := func() (float64, []string) {
spent := time.Since(startTime)
currActiveVUs := atomic.LoadUint64(&activeVUsCount)
progVUs := fmt.Sprintf(vusFmt+"/"+vusFmt+" VUs",
vusPool.Running(), currActiveVUs)

right := []string{progVUs, duration.String(), progIters}

if spent > duration {
return 1, right
}

spentDuration := pb.GetFixedLengthDuration(spent, duration)
progDur := fmt.Sprintf("%s/%s", spentDuration, duration)
right[1] = progDur

return math.Min(1, float64(spent)/float64(duration)), right
}
car.progress.Modify(pb.WithProgress(progressFn))
go trackProgress(parentCtx, maxDurationCtx, regDurationCtx, &car, progressFn)

maxDurationCtx = lib.WithScenarioState(maxDurationCtx, &lib.ScenarioState{
Name: car.config.Name,
Executor: car.config.Type,
StartTime: startTime,
ProgressFn: progressFn,
})

returnVU := func(u lib.InitializedVU) {
car.executionState.ReturnVU(u, true)
activeVUsWg.Done()
}
runIterationBasic := getIterationRunner(car.executionState, car.logger)
activateVU := func(initVU lib.InitializedVU) lib.ActiveVU {
activeVUsWg.Add(1)
activeVU := initVU.Activate(getVUActivationParams(maxDurationCtx, car.config.BaseConfig, returnVU))
Expand Down Expand Up @@ -284,30 +314,6 @@ func (car ConstantArrivalRate) Run(parentCtx context.Context, out chan<- stats.S
activateVU(initVU)
}

vusFmt := pb.GetFixedLengthIntFormat(maxVUs)
progIters := fmt.Sprintf(
pb.GetFixedLengthFloatFormat(arrivalRatePerSec, 0)+" iters/s", arrivalRatePerSec)
progressFn := func() (float64, []string) {
spent := time.Since(startTime)
currActiveVUs := atomic.LoadUint64(&activeVUsCount)
progVUs := fmt.Sprintf(vusFmt+"/"+vusFmt+" VUs",
vusPool.Running(), currActiveVUs)

right := []string{progVUs, duration.String(), progIters}

if spent > duration {
return 1, right
}

spentDuration := pb.GetFixedLengthDuration(spent, duration)
progDur := fmt.Sprintf("%s/%s", spentDuration, duration)
right[1] = progDur

return math.Min(1, float64(spent)/float64(duration)), right
}
car.progress.Modify(pb.WithProgress(progressFn))
go trackProgress(parentCtx, maxDurationCtx, regDurationCtx, &car, progressFn)

start, offsets, _ := car.et.GetStripedOffsets()
timer := time.NewTimer(time.Hour * 24)
// here the we need the not scaled one
Expand Down
8 changes: 8 additions & 0 deletions lib/executor/constant_vus.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,10 +176,18 @@ func (clv ConstantVUs) Run(parentCtx context.Context, out chan<- stats.SampleCon
regDurationDone := regDurationCtx.Done()
runIteration := getIterationRunner(clv.executionState, clv.logger)

maxDurationCtx = lib.WithScenarioState(maxDurationCtx, &lib.ScenarioState{
Name: clv.config.Name,
Executor: clv.config.Type,
StartTime: startTime,
ProgressFn: progressFn,
})

returnVU := func(u lib.InitializedVU) {
clv.executionState.ReturnVU(u, true)
activeVUs.Done()
}

handleVU := func(initVU lib.InitializedVU) {
ctx, cancel := context.WithCancel(maxDurationCtx)
defer cancel()
Expand Down
7 changes: 7 additions & 0 deletions lib/executor/externally_controlled.go
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,13 @@ func (mex *ExternallyControlled) Run(parentCtx context.Context, out chan<- stats
return err
}

ctx = lib.WithScenarioState(ctx, &lib.ScenarioState{
Name: mex.config.Name,
Executor: mex.config.Type,
StartTime: time.Now(),
ProgressFn: runState.progressFn,
})

mex.progress.Modify(pb.WithProgress(runState.progressFn)) // Keep track of the progress
go trackProgress(parentCtx, ctx, ctx, mex, runState.progressFn)

Expand Down
8 changes: 8 additions & 0 deletions lib/executor/per_vu_iterations.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,10 +200,18 @@ func (pvi PerVUIterations) Run(parentCtx context.Context, out chan<- stats.Sampl
regDurationDone := regDurationCtx.Done()
runIteration := getIterationRunner(pvi.executionState, pvi.logger)

maxDurationCtx = lib.WithScenarioState(maxDurationCtx, &lib.ScenarioState{
Name: pvi.config.Name,
Executor: pvi.config.Type,
StartTime: startTime,
ProgressFn: progressFn,
})

returnVU := func(u lib.InitializedVU) {
pvi.executionState.ReturnVU(u, true)
activeVUs.Done()
}

handleVU := func(initVU lib.InitializedVU) {
defer handleVUsWG.Done()
ctx, cancel := context.WithCancel(maxDurationCtx)
Expand Down
73 changes: 39 additions & 34 deletions lib/executor/ramping_arrival_rate.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,45 @@ func (varr RampingArrivalRate) Run(parentCtx context.Context, out chan<- stats.S
}()

activeVUsCount := uint64(0)
tickerPeriod := int64(startTickerPeriod.Duration)
vusFmt := pb.GetFixedLengthIntFormat(maxVUs)
itersFmt := pb.GetFixedLengthFloatFormat(maxArrivalRatePerSec, 0) + " iters/s"

progressFn := func() (float64, []string) {
currActiveVUs := atomic.LoadUint64(&activeVUsCount)
currentTickerPeriod := atomic.LoadInt64(&tickerPeriod)
progVUs := fmt.Sprintf(vusFmt+"/"+vusFmt+" VUs",
vusPool.Running(), currActiveVUs)

itersPerSec := 0.0
if currentTickerPeriod > 0 {
itersPerSec = float64(time.Second) / float64(currentTickerPeriod)
}
progIters := fmt.Sprintf(itersFmt, itersPerSec)

right := []string{progVUs, duration.String(), progIters}

spent := time.Since(startTime)
if spent > duration {
return 1, right
}

spentDuration := pb.GetFixedLengthDuration(spent, duration)
progDur := fmt.Sprintf("%s/%s", spentDuration, duration)
right[1] = progDur

return math.Min(1, float64(spent)/float64(duration)), right
}

varr.progress.Modify(pb.WithProgress(progressFn))
go trackProgress(parentCtx, maxDurationCtx, regDurationCtx, varr, progressFn)

maxDurationCtx = lib.WithScenarioState(maxDurationCtx, &lib.ScenarioState{
Name: varr.config.Name,
Executor: varr.config.Type,
StartTime: startTime,
ProgressFn: progressFn,
})

returnVU := func(u lib.InitializedVU) {
varr.executionState.ReturnVU(u, true)
Expand Down Expand Up @@ -391,40 +430,6 @@ func (varr RampingArrivalRate) Run(parentCtx context.Context, out chan<- stats.S
activateVU(initVU)
}

tickerPeriod := int64(startTickerPeriod.Duration)

vusFmt := pb.GetFixedLengthIntFormat(maxVUs)
itersFmt := pb.GetFixedLengthFloatFormat(maxArrivalRatePerSec, 0) + " iters/s"

progressFn := func() (float64, []string) {
currActiveVUs := atomic.LoadUint64(&activeVUsCount)
currentTickerPeriod := atomic.LoadInt64(&tickerPeriod)
progVUs := fmt.Sprintf(vusFmt+"/"+vusFmt+" VUs",
vusPool.Running(), currActiveVUs)

itersPerSec := 0.0
if currentTickerPeriod > 0 {
itersPerSec = float64(time.Second) / float64(currentTickerPeriod)
}
progIters := fmt.Sprintf(itersFmt, itersPerSec)

right := []string{progVUs, duration.String(), progIters}

spent := time.Since(startTime)
if spent > duration {
return 1, right
}

spentDuration := pb.GetFixedLengthDuration(spent, duration)
progDur := fmt.Sprintf("%s/%s", spentDuration, duration)
right[1] = progDur

return math.Min(1, float64(spent)/float64(duration)), right
}

varr.progress.Modify(pb.WithProgress(progressFn))
go trackProgress(parentCtx, maxDurationCtx, regDurationCtx, varr, progressFn)

regDurationDone := regDurationCtx.Done()
timer := time.NewTimer(time.Hour)
start := time.Now()
Expand Down
6 changes: 6 additions & 0 deletions lib/executor/ramping_vus.go
Original file line number Diff line number Diff line change
Expand Up @@ -628,6 +628,12 @@ func (vlv RampingVUs) Run(parentCtx context.Context, out chan<- stats.SampleCont
vlv.executionState.ModCurrentlyActiveVUsCount(-1)
}

maxDurationCtx = lib.WithScenarioState(maxDurationCtx, &lib.ScenarioState{
Name: vlv.config.Name,
Executor: vlv.config.Type,
StartTime: startTime,
ProgressFn: progressFn,
})
vuHandles := make([]*vuHandle, maxVUs)
for i := uint64(0); i < maxVUs; i++ {
vuHandle := newStoppedVUHandle(
Expand Down
8 changes: 8 additions & 0 deletions lib/executor/shared_iterations.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,10 +232,18 @@ func (si SharedIterations) Run(parentCtx context.Context, out chan<- stats.Sampl
regDurationDone := regDurationCtx.Done()
runIteration := getIterationRunner(si.executionState, si.logger)

maxDurationCtx = lib.WithScenarioState(maxDurationCtx, &lib.ScenarioState{
Name: si.config.Name,
Executor: si.config.Type,
StartTime: startTime,
ProgressFn: progressFn,
})

returnVU := func(u lib.InitializedVU) {
si.executionState.ReturnVU(u, true)
activeVUs.Done()
}

handleVU := func(initVU lib.InitializedVU) {
ctx, cancel := context.WithCancel(maxDurationCtx)
defer cancel()
Expand Down
1 change: 1 addition & 0 deletions lib/executor/vu_handle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.k6.io/k6/lib"
"go.k6.io/k6/lib/testutils"
"go.k6.io/k6/lib/testutils/minirunner"
Expand Down
8 changes: 8 additions & 0 deletions lib/executors.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,14 @@ type ExecutorConfig interface {
HasWork(*ExecutionTuple) bool
}

// ScenarioState holds runtime scenario information returned by the k6/execution
// JS module.
type ScenarioState struct {
Name, Executor string
StartTime time.Time
ProgressFn func() (float64, []string)
}

// InitVUFunc is just a shorthand so we don't have to type the function
// signature every time.
type InitVUFunc func(context.Context, *logrus.Entry) (InitializedVU, error)
Expand Down

0 comments on commit 2480df8

Please sign in to comment.