Skip to content

Commit

Permalink
WIP: Refactoring lib.VU and lib.Runner
Browse files Browse the repository at this point in the history
The complicated handling of context in RunOnce is not necessary. Each
of the new executors handles contexts and VUs very deliberately and
precisely. We can separate the context handling to interrupt goja
runtime and the actual running user's code.

This PR introduces two new interfaces, "InitializedVU" annd "ActiveVU".

InitializedVU is what Runner.NewVU will return, and is stored in
ExecutionState.vus buffer. Whenever a InitializedVU is pull from the
buffer, caller can call InitializedVU.Activate with VUActivationParams
argument. This will return an ActiveVU, which will actually run the
user's code.

The InitializedVU.Activate will spawn a goroutine to track the context
for interrupting script execution, and calling the callback function and
return the VU to the buffer.

Fixes #889
Fixes #1283
  • Loading branch information
cuonglm committed Jan 6, 2020
1 parent f6be35c commit f84725a
Show file tree
Hide file tree
Showing 23 changed files with 236 additions and 208 deletions.
17 changes: 7 additions & 10 deletions core/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,19 +133,16 @@ func (e *ExecutionScheduler) GetExecutionPlan() []lib.ExecutionStep {
// any unplanned VUs themselves.
//TODO: actually use the context...
func (e *ExecutionScheduler) initVU(
_ context.Context, logger *logrus.Entry, engineOut chan<- stats.SampleContainer,
) (lib.VU, error) {
vu, err := e.runner.NewVU(engineOut)
if err != nil {
return nil, fmt.Errorf("error while initializing a VU: '%s'", err)
}

ctx context.Context, logger *logrus.Entry, engineOut chan<- stats.SampleContainer,
) (lib.InitializedVU, error) {
// Get the VU ID here, so that the VUs are (mostly) ordered by their
// number in the channel buffer
vuID := e.state.GetUniqueVUIdentifier()
if err := vu.Reconfigure(int64(vuID)); err != nil {
return nil, fmt.Errorf("error while reconfiguring VU #%d: '%s'", vuID, err)
vu, err := e.runner.NewVU(ctx, int64(vuID), engineOut)
if err != nil {
return nil, fmt.Errorf("error while initializing a VU: '%s'", err)
}

logger.Debugf("Initialized VU #%d", vuID)
return vu, nil
}
Expand Down Expand Up @@ -237,7 +234,7 @@ func (e *ExecutionScheduler) Init(ctx context.Context, engineOut chan<- stats.Sa
}
}

e.state.SetInitVUFunc(func(ctx context.Context, logger *logrus.Entry) (lib.VU, error) {
e.state.SetInitVUFunc(func(ctx context.Context, logger *logrus.Entry) (lib.InitializedVU, error) {
return e.initVU(ctx, logger, engineOut)
})

Expand Down
5 changes: 4 additions & 1 deletion core/local/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,13 +305,16 @@ func TestExecutionSchedulerEndErrors(t *testing.T) {
}
logger, hook := logtest.NewNullLogger()
ctx, cancel, execScheduler, samples := newTestExecutionScheduler(t, runner, logger, lib.Options{})
defer cancel()

endTime, isFinal := lib.GetEndOffset(execScheduler.GetExecutionPlan())
assert.Equal(t, 1*time.Second, endTime) // because of the 0s gracefulStop
assert.True(t, isFinal)

startTime := time.Now()
go func() {
time.Sleep(endTime)
cancel()
}()
assert.NoError(t, execScheduler.Run(ctx, samples))
runTime := time.Since(startTime)
assert.True(t, runTime > 1*time.Second, "test did not take 1s")
Expand Down
8 changes: 4 additions & 4 deletions js/console_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,14 +123,14 @@ func TestConsole(t *testing.T) {
assert.NoError(t, err)

samples := make(chan stats.SampleContainer, 100)
vu, err := r.newVU(samples)
vu, err := r.newVU(0, samples)
assert.NoError(t, err)

logger, hook := logtest.NewNullLogger()
logger.Level = logrus.DebugLevel
vu.Console.Logger = logger

err = vu.RunOnce(context.Background())
err = vu.Activate(&lib.VUActivationParams{Ctx: context.Background()}).RunOnce()
assert.NoError(t, err)

entry := hook.LastEntry()
Expand Down Expand Up @@ -215,13 +215,13 @@ func TestFileConsole(t *testing.T) {
assert.NoError(t, err)

samples := make(chan stats.SampleContainer, 100)
vu, err := r.newVU(samples)
vu, err := r.newVU(0, samples)
assert.NoError(t, err)

vu.Console.Logger.Level = logrus.DebugLevel
hook := logtest.NewLocal(vu.Console.Logger)

err = vu.RunOnce(context.Background())
err = vu.Activate(&lib.VUActivationParams{Ctx: context.Background()}).RunOnce()
assert.NoError(t, err)

// Test if the file was created.
Expand Down
4 changes: 2 additions & 2 deletions js/http_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,13 @@ func BenchmarkHTTPRequests(b *testing.B) {
<-ch
}
}()
vu, err := r.NewVU(ch)
vu, err := r.NewVU(context.Background(), 0, ch)
if !assert.NoError(b, err) {
return
}
b.StartTimer()
for i := 0; i < b.N; i++ {
err = vu.RunOnce(context.Background())
err = vu.Activate(&lib.VUActivationParams{Ctx: context.Background()}).RunOnce()
assert.NoError(b, err)
}
}
36 changes: 18 additions & 18 deletions js/module_loading_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,9 @@ func TestLoadOnceGlobalVars(t *testing.T) {
t.Run(name, func(t *testing.T) {
ch := newDevNullSampleChannel()
defer close(ch)
vu, err := r.NewVU(ch)
vu, err := r.NewVU(context.Background(), 0, ch)
require.NoError(t, err)
err = vu.RunOnce(context.Background())
err = vu.Activate(&lib.VUActivationParams{Ctx: context.Background()}).RunOnce()
require.NoError(t, err)
})
}
Expand Down Expand Up @@ -156,9 +156,9 @@ func TestLoadExportsIsUsableInModule(t *testing.T) {
t.Run(name, func(t *testing.T) {
ch := newDevNullSampleChannel()
defer close(ch)
vu, err := r.NewVU(ch)
vu, err := r.NewVU(context.Background(), 0, ch)
require.NoError(t, err)
err = vu.RunOnce(context.Background())
err = vu.Activate(&lib.VUActivationParams{Ctx: context.Background()}).RunOnce()
require.NoError(t, err)
})
}
Expand Down Expand Up @@ -200,9 +200,9 @@ func TestLoadDoesntBreakHTTPGet(t *testing.T) {
t.Run(name, func(t *testing.T) {
ch := newDevNullSampleChannel()
defer close(ch)
vu, err := r.NewVU(ch)
vu, err := r.NewVU(context.Background(), 0, ch)
require.NoError(t, err)
err = vu.RunOnce(context.Background())
err = vu.Activate(&lib.VUActivationParams{Ctx: context.Background()}).RunOnce()
require.NoError(t, err)
})
}
Expand Down Expand Up @@ -241,15 +241,15 @@ func TestLoadGlobalVarsAreNotSharedBetweenVUs(t *testing.T) {
t.Run(name, func(t *testing.T) {
ch := newDevNullSampleChannel()
defer close(ch)
vu, err := r.NewVU(ch)
vu, err := r.NewVU(context.Background(), 0, ch)
require.NoError(t, err)
err = vu.RunOnce(context.Background())
err = vu.Activate(&lib.VUActivationParams{Ctx: context.Background()}).RunOnce()
require.NoError(t, err)

// run a second VU
vu, err = r.NewVU(ch)
vu, err = r.NewVU(context.Background(), 0, ch)
require.NoError(t, err)
err = vu.RunOnce(context.Background())
err = vu.Activate(&lib.VUActivationParams{Ctx: context.Background()}).RunOnce()
require.NoError(t, err)
})
}
Expand Down Expand Up @@ -298,9 +298,9 @@ func TestLoadCycle(t *testing.T) {
t.Run(name, func(t *testing.T) {
ch := newDevNullSampleChannel()
defer close(ch)
vu, err := r.NewVU(ch)
vu, err := r.NewVU(context.Background(), 0, ch)
require.NoError(t, err)
err = vu.RunOnce(context.Background())
err = vu.Activate(&lib.VUActivationParams{Ctx: context.Background()}).RunOnce()
require.NoError(t, err)
})
}
Expand Down Expand Up @@ -357,9 +357,9 @@ func TestLoadCycleBinding(t *testing.T) {
t.Run(name, func(t *testing.T) {
ch := newDevNullSampleChannel()
defer close(ch)
vu, err := r.NewVU(ch)
vu, err := r.NewVU(context.Background(), 0, ch)
require.NoError(t, err)
err = vu.RunOnce(context.Background())
err = vu.Activate(&lib.VUActivationParams{Ctx: context.Background()}).RunOnce()
require.NoError(t, err)
})
}
Expand Down Expand Up @@ -418,9 +418,9 @@ func TestBrowserified(t *testing.T) {
t.Run(name, func(t *testing.T) {
ch := make(chan stats.SampleContainer, 100)
defer close(ch)
vu, err := r.NewVU(ch)
vu, err := r.NewVU(context.Background(), 0, ch)
require.NoError(t, err)
err = vu.RunOnce(context.Background())
err = vu.Activate(&lib.VUActivationParams{Ctx: context.Background()}).RunOnce()
require.NoError(t, err)
})
}
Expand Down Expand Up @@ -456,9 +456,9 @@ func TestLoadingUnexistingModuleDoesntPanic(t *testing.T) {
t.Run(name, func(t *testing.T) {
ch := newDevNullSampleChannel()
defer close(ch)
vu, err := r.NewVU(ch)
vu, err := r.NewVU(context.Background(), 0, ch)
require.NoError(t, err)
err = vu.RunOnce(context.Background())
err = vu.Activate(&lib.VUActivationParams{Ctx: context.Background()}).RunOnce()
require.NoError(t, err)
})
}
Expand Down
4 changes: 2 additions & 2 deletions js/modules/k6/marshalling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,9 +134,9 @@ func TestSetupDataMarshalling(t *testing.T) {
if !assert.NoError(t, runner.Setup(context.Background(), samples)) {
return
}
vu, err := runner.NewVU(samples)
vu, err := runner.NewVU(context.Background(), 0, samples)
if assert.NoError(t, err) {
err := vu.RunOnce(context.Background())
err := vu.Activate(&lib.VUActivationParams{Ctx: context.Background()}).RunOnce()
assert.NoError(t, err)
}
}
79 changes: 30 additions & 49 deletions js/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,15 +114,15 @@ func (r *Runner) MakeArchive() *lib.Archive {
return r.Bundle.makeArchive()
}

func (r *Runner) NewVU(samplesOut chan<- stats.SampleContainer) (lib.VU, error) {
vu, err := r.newVU(samplesOut)
func (r *Runner) NewVU(ctx context.Context, id int64, samplesOut chan<- stats.SampleContainer) (lib.InitializedVU, error) {
vu, err := r.newVU(id, samplesOut)
if err != nil {
return nil, err
}
return lib.VU(vu), nil
return lib.InitializedVU(vu), nil
}

func (r *Runner) newVU(samplesOut chan<- stats.SampleContainer) (*VU, error) {
func (r *Runner) newVU(id int64, samplesOut chan<- stats.SampleContainer) (*VU, error) {
// Instantiate a new bundle, make a VU out of it.
bi, err := r.Bundle.Instantiate()
if err != nil {
Expand Down Expand Up @@ -185,6 +185,8 @@ func (r *Runner) newVU(samplesOut chan<- stats.SampleContainer) (*VU, error) {
}

vu := &VU{
ID: id,
Iteration: 0,
BundleInstance: *bi,
Runner: r,
Transport: transport,
Expand All @@ -203,10 +205,7 @@ func (r *Runner) newVU(samplesOut chan<- stats.SampleContainer) (*VU, error) {
},
})

// Give the VU an initial sense of identity.
if err := vu.Reconfigure(0); err != nil {
return nil, err
}
vu.Runtime.Set("__VU", vu.ID)

return vu, nil
}
Expand Down Expand Up @@ -261,6 +260,7 @@ func (r *Runner) Teardown(ctx context.Context, out chan<- stats.SampleContainer)
} else {
data = goja.Undefined()
}

_, err := r.runPart(teardownCtx, out, stageTeardown, data)
return err
}
Expand Down Expand Up @@ -298,7 +298,7 @@ func (r *Runner) SetOptions(opts lib.Options) error {
// Runs an exported function in its own temporary VU, optionally with an argument. Execution is
// interrupted if the context expires. No error is returned if the part does not exist.
func (r *Runner) runPart(ctx context.Context, out chan<- stats.SampleContainer, name string, arg interface{}) (goja.Value, error) {
vu, err := r.newVU(out)
vu, err := r.newVU(0, out)
if err != nil {
return goja.Undefined(), err
}
Expand All @@ -323,7 +323,7 @@ func (r *Runner) runPart(ctx context.Context, out chan<- stats.SampleContainer,
return goja.Undefined(), err
}

v, _, _, err := vu.runFn(ctx, group, false, fn, vu.Runtime.ToValue(arg))
v, _, _, err := vu.runFn(group, false, fn, vu.Runtime.ToValue(arg))

// deadline is reached so we have timeouted but this might've not been registered correctly
if deadline, ok := ctx.Deadline(); ok && time.Now().After(deadline) {
Expand Down Expand Up @@ -368,49 +368,32 @@ type VU struct {

setupData goja.Value

// A VU will track the last context it was called with for cancellation.
// Note that interruptTrackedCtx is the context that is currently being tracked, while
// interruptCancel cancels an unrelated context that terminates the tracking goroutine
// without triggering an interrupt (for if the context changes).
// There are cleaner ways of handling the interruption problem, but this is a hot path that
// needs to be called thousands of times per second, which rules out anything that spawns a
// goroutine per call.
interruptTrackedCtx context.Context
interruptCancel context.CancelFunc

m *sync.Mutex
}

// Verify that VU implements lib.VU
var _ lib.VU = &VU{}
var _ lib.InitializedVU = &VU{}
var _ lib.ActiveVU = &VU{}

func (u *VU) Reconfigure(id int64) error {
u.ID = id
func (u *VU) Activate(params *lib.VUActivationParams) lib.ActiveVU {
u.Iteration = 0
u.Runtime.Set("__VU", u.ID)
return nil
if params != nil {
if params.Ctx != nil {
go func() {
<-params.Ctx.Done()
u.Runtime.Interrupt(errInterrupt)
if params.DeactivateCallback != nil {
params.DeactivateCallback()
}
}()
}
}
return u
}

func (u *VU) RunOnce(ctx context.Context) error {
func (u *VU) RunOnce() error {
u.m.Lock()
defer u.m.Unlock()
// Track the context and interrupt JS execution if it's cancelled.
if u.interruptTrackedCtx != ctx {
interCtx, interCancel := context.WithCancel(context.Background())
if u.interruptCancel != nil {
u.interruptCancel()
}
u.interruptCancel = interCancel
u.interruptTrackedCtx = ctx
defer interCancel()
go func() {
select {
case <-interCtx.Done():
case <-ctx.Done():
u.Runtime.Interrupt(errInterrupt)
}
}()
}

// Unmarshall the setupData only the first time for each VU so that VUs are isolated but we
// still don't use too much CPU in the middle test
Expand All @@ -427,7 +410,7 @@ func (u *VU) RunOnce(ctx context.Context) error {
}

// Call the default function.
_, isFullIteration, totalTime, err := u.runFn(ctx, u.Runner.defaultGroup, true, u.Default, u.setupData)
_, isFullIteration, totalTime, err := u.runFn(u.Runner.defaultGroup, true, u.Default, u.setupData)

// If MinIterationDuration is specified and the iteration wasn't cancelled
// and was less than it, sleep for the remainder
Expand All @@ -441,9 +424,7 @@ func (u *VU) RunOnce(ctx context.Context) error {
return err
}

func (u *VU) runFn(
ctx context.Context, group *lib.Group, isDefault bool, fn goja.Callable, args ...goja.Value,
) (goja.Value, bool, time.Duration, error) {
func (u *VU) runFn(group *lib.Group, isDefault bool, fn goja.Callable, args ...goja.Value) (goja.Value, bool, time.Duration, error) {
cookieJar := u.CookieJar
if !u.Runner.Bundle.Options.NoCookiesReset.ValueOrZero() {
var err error
Expand All @@ -468,7 +449,7 @@ func (u *VU) runFn(
Iteration: u.Iteration,
}

newctx := common.WithRuntime(ctx, u.Runtime)
newctx := common.WithRuntime(context.Background(), u.Runtime)
newctx = lib.WithState(newctx, state)
*u.Context = newctx

Expand All @@ -482,7 +463,7 @@ func (u *VU) runFn(

var isFullIteration bool
select {
case <-ctx.Done():
case <-newctx.Done():
isFullIteration = false
default:
isFullIteration = true
Expand Down
Loading

0 comments on commit f84725a

Please sign in to comment.