Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Event loop testing #2399

Merged
merged 4 commits into from
Mar 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package local
package tests

import (
"context"
Expand All @@ -9,12 +9,19 @@ import (

"github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"
"go.k6.io/k6/cmd/integration_tests/testmodules/events"
"go.k6.io/k6/core/local"
"go.k6.io/k6/js"
"go.k6.io/k6/js/modules"
"go.k6.io/k6/lib"
"go.k6.io/k6/lib/executor"
"go.k6.io/k6/lib/metrics"
"go.k6.io/k6/lib/testutils"
"go.k6.io/k6/lib/testutils/minirunner"
"go.k6.io/k6/lib/types"
"go.k6.io/k6/loader"
"go.k6.io/k6/stats"
"gopkg.in/guregu/null.v3"
)

func eventLoopTest(t *testing.T, script []byte, testHandle func(context.Context, lib.Runner, error, *testutils.SimpleLogrusHook)) {
Expand All @@ -23,7 +30,7 @@ func eventLoopTest(t *testing.T, script []byte, testHandle func(context.Context,
logHook := &testutils.SimpleLogrusHook{HookedLevels: []logrus.Level{logrus.InfoLevel, logrus.WarnLevel, logrus.ErrorLevel}}
logger.AddHook(logHook)

script = []byte(`import {setTimeout} from "k6/experimental";
script = []byte(`import {setTimeout} from "k6/x/events";
` + string(script))
registry := metrics.NewRegistry()
builtinMetrics := metrics.RegisterBuiltinMetrics(registry)
na-- marked this conversation as resolved.
Show resolved Hide resolved
Expand Down Expand Up @@ -58,6 +65,10 @@ func eventLoopTest(t *testing.T, script []byte, testHandle func(context.Context,
}
}

func init() {
modules.Register("k6/x/events", events.New())
}

func TestEventLoop(t *testing.T) {
t.Parallel()
script := []byte(`
Expand Down Expand Up @@ -147,7 +158,10 @@ export default function() {
for i, entry := range entries {
msgs[i] = entry.Message
}
require.Equal(t, []string{"second"}, msgs)
require.Equal(t, []string{
"setTimeout 1 was stopped because the VU iteration was interrupted",
"second",
}, msgs)
})
}

Expand Down Expand Up @@ -178,6 +192,48 @@ export default function() {
for i, entry := range entries {
msgs[i] = entry.Message
}
require.Equal(t, []string{"just error\n\tat /script.js:13:4(15)\n\tat native\n", "1"}, msgs)
require.Equal(t, []string{
"setTimeout 1 was stopped because the VU iteration was interrupted",
"just error\n\tat /script.js:13:4(15)\n\tat native\n", "1",
}, msgs)
})
}

func newTestExecutionScheduler(
t *testing.T, runner lib.Runner, logger *logrus.Logger, opts lib.Options,
) (ctx context.Context, cancel func(), execScheduler *local.ExecutionScheduler, samples chan stats.SampleContainer) {
if runner == nil {
runner = &minirunner.MiniRunner{}
}
ctx, cancel = context.WithCancel(context.Background())
newOpts, err := executor.DeriveScenariosFromShortcuts(lib.Options{
MetricSamplesBufferSize: null.NewInt(200, false),
}.Apply(runner.GetOptions()).Apply(opts), nil)
require.NoError(t, err)
require.Empty(t, newOpts.Validate())

require.NoError(t, runner.SetOptions(newOpts))

if logger == nil {
logger = logrus.New()
logger.SetOutput(testutils.NewTestOutput(t))
}

execScheduler, err = local.NewExecutionScheduler(runner, logger)
require.NoError(t, err)

samples = make(chan stats.SampleContainer, newOpts.MetricSamplesBufferSize.Int64)
go func() {
for {
select {
case <-samples:
case <-ctx.Done():
return
}
}
}()

require.NoError(t, execScheduler.Init(ctx, samples))

return ctx, cancel, execScheduler, samples
}
157 changes: 157 additions & 0 deletions cmd/integration_tests/testmodules/events/events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
// Package events implements setInterval, setTimeout and co. Not to be used, mostly for testing purposes
package events

import (
"sync"
"sync/atomic"
"time"

"github.com/dop251/goja"
"go.k6.io/k6/js/modules"
)

// RootModule is the global module instance that will create module
// instances for each VU.
type RootModule struct{}

// Events represents an instance of the events module.
type Events struct {
vu modules.VU

timerStopCounter uint32
timerStopsLock sync.Mutex
timerStops map[uint32]chan struct{}
}

var (
_ modules.Module = &RootModule{}
_ modules.Instance = &Events{}
)

// New returns a pointer to a new RootModule instance.
func New() *RootModule {
return &RootModule{}
}

// NewModuleInstance implements the modules.Module interface to return
// a new instance for each VU.
func (*RootModule) NewModuleInstance(vu modules.VU) modules.Instance {
return &Events{
vu: vu,
timerStops: make(map[uint32]chan struct{}),
}
}

// Exports returns the exports of the k6 module.
func (e *Events) Exports() modules.Exports {
return modules.Exports{
Named: map[string]interface{}{
"setTimeout": e.setTimeout,
"clearTimeout": e.clearTimeout,
"setInterval": e.setInterval,
"clearInterval": e.clearInterval,
},
}
}

func noop() error { return nil }

func (e *Events) getTimerStopCh() (uint32, chan struct{}) {
id := atomic.AddUint32(&e.timerStopCounter, 1)
ch := make(chan struct{})
e.timerStopsLock.Lock()
e.timerStops[id] = ch
e.timerStopsLock.Unlock()
return id, ch
}

func (e *Events) stopTimerCh(id uint32) bool { //nolint:unparam
e.timerStopsLock.Lock()
defer e.timerStopsLock.Unlock()
ch, ok := e.timerStops[id]
if !ok {
return false
}
delete(e.timerStops, id)
close(ch)
return true
}

func (e *Events) call(callback goja.Callable, args []goja.Value) error {
// TODO: investigate, not sure GlobalObject() is always the correct value for `this`?
_, err := callback(e.vu.Runtime().GlobalObject(), args...)
return err
}

func (e *Events) setTimeout(callback goja.Callable, delay float64, args ...goja.Value) uint32 {
runOnLoop := e.vu.RegisterCallback()
id, stopCh := e.getTimerStopCh()

if delay < 0 {
delay = 0
}

go func() {
timer := time.NewTimer(time.Duration(delay * float64(time.Millisecond)))
defer func() {
e.stopTimerCh(id)
if !timer.Stop() {
<-timer.C
}
}()

select {
case <-timer.C:
runOnLoop(func() error {
return e.call(callback, args)
})
case <-stopCh:
runOnLoop(noop)
case <-e.vu.Context().Done():
e.vu.State().Logger.Warnf("setTimeout %d was stopped because the VU iteration was interrupted", id)
runOnLoop(noop)
}
}()

return id
}

func (e *Events) clearTimeout(id uint32) {
e.stopTimerCh(id)
}

func (e *Events) setInterval(callback goja.Callable, delay float64, args ...goja.Value) uint32 {
runOnLoop := e.vu.RegisterCallback()
id, stopCh := e.getTimerStopCh()

go func() {
ticker := time.NewTicker(time.Duration(delay * float64(time.Millisecond)))
defer func() {
e.stopTimerCh(id)
ticker.Stop()
}()

for {
select {
case <-ticker.C:
runOnLoop(func() error {
runOnLoop = e.vu.RegisterCallback()
return e.call(callback, args)
})
case <-stopCh:
runOnLoop(noop)
return
case <-e.vu.Context().Done():
e.vu.State().Logger.Warnf("setInterval %d was stopped because the VU iteration was interrupted", id)
runOnLoop(noop)
return
}
}
}()

return id
}

func (e *Events) clearInterval(id uint32) {
e.stopTimerCh(id)
}
49 changes: 49 additions & 0 deletions cmd/integration_tests/testmodules/events/events.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import exec from 'k6/execution';
import { setTimeout, clearTimeout, setInterval, clearInterval } from 'k6/events'


export let options = {
scenarios: {
'foo': {
executor: 'constant-vus',
vus: 1,
duration: '3.8s',
gracefulStop: '0s',
}
}
};

function debug(arg) {
let t = String((new Date()) - exec.scenario.startTime).padStart(6, ' ')
console.log(`[${t}ms, iter=${exec.scenario.iterationInTest}] ${arg}`);
}

export default function () {
debug('default start');

let tickCount = 1;
let f0 = (arg) => {
debug(`${arg} ${tickCount++}`);
}
let t0 = setInterval(f0, 500, 'tick')

let f1 = (arg) => {
debug(arg);
clearInterval(t0);
}
let t1 = setTimeout(f1, 2000, 'third');

let t2 = setTimeout(debug, 1500, 'never happening');

let f3 = (arg) => {
debug(arg);
clearTimeout(t2);
setTimeout(debug, 600, 'second');
}
let t3 = setTimeout(f3, 1000, 'first');

debug('default end');
if (exec.scenario.iterationInTest == 1) {
debug(`expected last iter, the interval ID is ${t0}, we also expect timer ${t1} to be interrupted`)
}
}
43 changes: 43 additions & 0 deletions cmd/integration_tests/testmodules/events/events_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package events

import (
"context"
"testing"

"github.com/dop251/goja"
"github.com/stretchr/testify/require"
"go.k6.io/k6/js/common"
"go.k6.io/k6/js/eventloop"
"go.k6.io/k6/js/modulestest"
)

func TestSetTimeout(t *testing.T) {
t.Parallel()
rt := goja.New()
vu := &modulestest.VU{
RuntimeField: rt,
InitEnvField: &common.InitEnvironment{},
CtxField: context.Background(),
StateField: nil,
}

m, ok := New().NewModuleInstance(vu).(*Events)
require.True(t, ok)
var log []string
require.NoError(t, rt.Set("events", m.Exports().Named))
require.NoError(t, rt.Set("print", func(s string) { log = append(log, s) }))
loop := eventloop.New(vu)
vu.RegisterCallbackField = loop.RegisterCallback

err := loop.Start(func() error {
_, err := vu.Runtime().RunString(`
events.setTimeout(()=> {
print("in setTimeout")
})
print("outside setTimeout")
`)
return err
})
require.NoError(t, err)
require.Equal(t, []string{"outside setTimeout", "in setTimeout"}, log)
}
5 changes: 3 additions & 2 deletions js/bundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (

"go.k6.io/k6/js/common"
"go.k6.io/k6/js/compiler"
"go.k6.io/k6/js/eventloop"
"go.k6.io/k6/lib"
"go.k6.io/k6/lib/consts"
"go.k6.io/k6/lib/metrics"
Expand Down Expand Up @@ -331,8 +332,8 @@ func (b *Bundle) instantiate(logger logrus.FieldLogger, rt *goja.Runtime, init *
init.moduleVUImpl.initEnv = initenv
init.moduleVUImpl.ctx = context.Background()
unbindInit := b.setInitGlobals(rt, init)
init.moduleVUImpl.eventLoop = newEventLoop(init.moduleVUImpl)
err := init.moduleVUImpl.eventLoop.start(func() error {
init.moduleVUImpl.eventLoop = eventloop.New(init.moduleVUImpl)
err := init.moduleVUImpl.eventLoop.Start(func() error {
_, err := rt.RunProgram(b.Program)
return err
})
Expand Down
Loading