Skip to content

Commit

Permalink
Add basic event loop with an API to be used by modules
Browse files Browse the repository at this point in the history
As well as cut down setTimeout implementation.

A recent update to goja introduced support for ECMAScript Promise.

The catch here is that Promise's then will only be called when goja
exits executing js code and it has already been resolved.
Also resolving and rejecting Promises needs to happen while no other
js code is being executed as it will otherwise lead to a data race.

This more or less necessitates adding an event loop. Additionally
because a call to a k6 modules such as `k6/http` might make a promise to
signal when an http request is made, but if (no changes were made) the
iteration then finishes before the request completes, nothing would've
stopped the start of a *new* iteration. That new iteration would then
probably just again ask k6/http to make a new request with a Promise ...

This might be a desirable behaviour for some cases but arguably will be
very confusing so this commit also adds a way to RegisterCallback that
will return a function to actually queue the callback on the event loop,
but prevent the event loop from ending before the callback is queued and
possible executed, once RegisterCallback is called.

Additionally to that, some additional code was needed so there is an
event loop for all special functions calls (setup, teardown,
handleSummary, default) and the init context.

This also adds handling of rejected promise which don't have a reject
handler similar to what deno does.

It also adds a per iteration context that gets canceled on the end of
each iteration letting other code know that it needs to stop. This is
particularly needed here as if an iteration gets aborted by a syntax
error (or unhandled promise rejection), a new iteration will start right
after that. But this means that any in-flight asynchronous operation (an
http requests for example) will *not* get stopped. With a context that
gets canceled every time module code can notice that and abort any
operation. For this same reason the event loop needs wait to be
*empty* before the iteration ends.

This did lead to some ... not very nice code, but a whole package needs
a big refactor which will likely happen once common.Bind and co gets
removed.

And finally, a basic setTimeout implementation was added.
There is no way to currently cancel the setTimeout - no clearTimeout.

This likely needs to be extended but this can definitely wait. Or we
might decide to actually drop setTimeout altogether as it isn't
particularly useful currently without any async APIs, it just makes
testing the event loop functionality possible.

fixes #882
  • Loading branch information
mstoykov committed Feb 22, 2022
1 parent 9c58b15 commit b000817
Show file tree
Hide file tree
Showing 9 changed files with 562 additions and 26 deletions.
181 changes: 181 additions & 0 deletions core/local/eventloop_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
package local

import (
"context"
"io/ioutil"
"net/url"
"testing"
"time"

"github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"
"go.k6.io/k6/js"
"go.k6.io/k6/lib"
"go.k6.io/k6/lib/metrics"
"go.k6.io/k6/lib/testutils"
"go.k6.io/k6/lib/types"
"go.k6.io/k6/loader"
)

func eventLoopTest(t *testing.T, script []byte, testHandle func(context.Context, lib.Runner, error, *testutils.SimpleLogrusHook)) {
logger := logrus.New()
logger.SetOutput(ioutil.Discard)
logHook := &testutils.SimpleLogrusHook{HookedLevels: []logrus.Level{logrus.InfoLevel, logrus.WarnLevel, logrus.ErrorLevel}}
logger.AddHook(logHook)

registry := metrics.NewRegistry()
builtinMetrics := metrics.RegisterBuiltinMetrics(registry)
runner, err := js.New(
logger,
&loader.SourceData{
URL: &url.URL{Path: "/script.js"},
Data: script,
},
nil,
lib.RuntimeOptions{},
builtinMetrics,
registry,
)
require.NoError(t, err)

ctx, cancel, execScheduler, samples := newTestExecutionScheduler(t, runner, logger,
lib.Options{
TeardownTimeout: types.NullDurationFrom(time.Second),
SetupTimeout: types.NullDurationFrom(time.Second),
})
defer cancel()

errCh := make(chan error, 1)
go func() { errCh <- execScheduler.Run(ctx, ctx, samples, builtinMetrics) }()

select {
case err := <-errCh:
testHandle(ctx, runner, err, logHook)
case <-time.After(10 * time.Second):
t.Fatal("timed out")
}
}

func TestEventLoop(t *testing.T) {
t.Parallel()
script := []byte(`
setTimeout(()=> {console.log("initcontext setTimeout")}, 200)
console.log("initcontext");
export default function() {
setTimeout(()=> {console.log("default setTimeout")}, 200)
console.log("default");
};
export function setup() {
setTimeout(()=> {console.log("setup setTimeout")}, 200)
console.log("setup");
};
export function teardown() {
setTimeout(()=> {console.log("teardown setTimeout")}, 200)
console.log("teardown");
};
export function handleSummary() {
setTimeout(()=> {console.log("handleSummary setTimeout")}, 200)
console.log("handleSummary");
};
`)
eventLoopTest(t, script, func(ctx context.Context, runner lib.Runner, err error, logHook *testutils.SimpleLogrusHook) {
require.NoError(t, err)
_, err = runner.HandleSummary(ctx, &lib.Summary{RootGroup: &lib.Group{}})
require.NoError(t, err)
entries := logHook.Drain()
msgs := make([]string, len(entries))
for i, entry := range entries {
msgs[i] = entry.Message
}
require.Equal(t, []string{
"initcontext", // first initialization
"initcontext setTimeout",
"initcontext", // for vu
"initcontext setTimeout",
"initcontext", // for setup
"initcontext setTimeout",
"setup", // setup
"setup setTimeout",
"default", // one iteration
"default setTimeout",
"initcontext", // for teardown
"initcontext setTimeout",
"teardown", // teardown
"teardown setTimeout",
"initcontext", // for handleSummary
"initcontext setTimeout",
"handleSummary", // handleSummary
"handleSummary setTimeout",
}, msgs)
})
}

func TestEventLoopCrossScenario(t *testing.T) {
t.Parallel()
script := []byte(`
import exec from "k6/execution"
export const options = {
scenarios: {
"first":{
executor: "shared-iterations",
maxDuration: "1s",
iterations: 1,
vus: 1,
gracefulStop:"1s",
},
"second": {
executor: "shared-iterations",
maxDuration: "1s",
iterations: 1,
vus: 1,
startTime: "3s",
}
}
}
export default function() {
let i = exec.scenario.name
setTimeout(()=> {console.log(i)}, 3000)
}
`)

eventLoopTest(t, script, func(_ context.Context, _ lib.Runner, err error, logHook *testutils.SimpleLogrusHook) {
require.NoError(t, err)
entries := logHook.Drain()
msgs := make([]string, len(entries))
for i, entry := range entries {
msgs[i] = entry.Message
}
require.Equal(t, []string{"second"}, msgs)
})
}

func TestEventLoopDoesntCrossIterations(t *testing.T) {
t.Parallel()
script := []byte(`
import { sleep } from "k6"
export const options = {
iterations: 2,
vus: 1,
}
export default function() {
let i = __ITER;
setTimeout(()=> { console.log(i) }, 1000)
if (__ITER == 0) {
throw "just error"
} else {
sleep(1)
}
}
`)

eventLoopTest(t, script, func(_ context.Context, _ lib.Runner, err error, logHook *testutils.SimpleLogrusHook) {
require.NoError(t, err)
entries := logHook.Drain()
msgs := make([]string, len(entries))
for i, entry := range entries {
msgs[i] = entry.Message
}
require.Equal(t, []string{"just error\n\tat /script.js:12:4(13)\n\tat native\n", "1"}, msgs)
})
}
7 changes: 6 additions & 1 deletion js/bundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,12 @@ func (b *Bundle) instantiate(logger logrus.FieldLogger, rt *goja.Runtime, init *
"require": init.Require,
"open": init.Open,
})
if _, err := rt.RunProgram(b.Program); err != nil {
init.moduleVUImpl.eventLoop = newEventLoop(init.moduleVUImpl)
err := init.moduleVUImpl.eventLoop.start(func() error {
_, err := rt.RunProgram(b.Program)
return err
})
if err != nil {
var exception *goja.Exception
if errors.As(err, &exception) {
err = &scriptException{inner: exception}
Expand Down
5 changes: 5 additions & 0 deletions js/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
/*
Package js is the JavaScript implementation of the lib.Runner and relative concepts for
executing concurrent-safe JavaScript code.
*/
package js
167 changes: 167 additions & 0 deletions js/eventloop.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
package js

import (
"fmt"
"sync"
"time"

"github.com/dop251/goja"
"go.k6.io/k6/js/modules"
)

// eventLoop implements an event loop with a cut-down setTimeout and
// handling of unhandled rejected promises.
//
// A specific thing about this event loop is that it will wait to return
// not only until the queue is empty but until nothing is registered that it will run in the future.
// This is in contrast with more common behaviours where it only returns on
// a specific event/action or when the loop is empty.
// This is required as in k6 iterations (for which event loop will be primary used)
// are supposed to be independent and any work started in them needs to finish,
// but also they need to end when all the instructions are done.
// Additionally because of this on any error while the event loop will exit it's
// required to wait on the event loop to be empty before the execution can continue.
type eventLoop struct {
lock sync.Mutex
queue []func() error
wakeupCh chan struct{} // TODO: maybe use sync.Cond ?
registeredCallbacks int
vu modules.VU

// pendingPromiseRejections are rejected promises with no handler,
// if there is something in this map at an end of an event loop then it will exit with an error.
// It's similar to what Deno and Node do.
pendingPromiseRejections map[*goja.Promise]struct{}
}

// newEventLoop returns a new event loop with a few helpers attached to it:
// - adding setTimeout javascript implementation
// - reporting (and aborting on) unhandled promise rejections
func newEventLoop(vu modules.VU) *eventLoop {
e := &eventLoop{
wakeupCh: make(chan struct{}, 1),
pendingPromiseRejections: make(map[*goja.Promise]struct{}),
vu: vu,
}
vu.Runtime().SetPromiseRejectionTracker(e.promiseRejectionTracker)
e.addSetTimeout()

return e
}

func (e *eventLoop) wakeup() {
select {
case e.wakeupCh <- struct{}{}:
default:
}
}

// registerCallback register that a callback will be invoked on the loop, preventing it from returning/finishing.
// The returned function, upon invocation, will queue its argument and wakeup the loop if needed.
// If the eventLoop has since stopped, it will not be executed.
// This function *must* be called from within running on the event loop, but its result can be called from anywhere.
func (e *eventLoop) registerCallback() func(func() error) {
e.lock.Lock()
e.registeredCallbacks++
e.lock.Unlock()

return func(f func() error) {
e.lock.Lock()
e.queue = append(e.queue, f)
e.registeredCallbacks--
e.lock.Unlock()
e.wakeup()
}
}

func (e *eventLoop) promiseRejectionTracker(p *goja.Promise, op goja.PromiseRejectionOperation) {
// No locking necessary here as the goja runtime will call this synchronously
// Read Notes on https://tc39.es/ecma262/#sec-host-promise-rejection-tracker
if op == goja.PromiseRejectionReject {
e.pendingPromiseRejections[p] = struct{}{}
} else { // goja.PromiseRejectionHandle so a promise that was previously rejected without handler now got one
delete(e.pendingPromiseRejections, p)
}
}

func (e *eventLoop) popAll() (queue []func() error, awaiting bool) {
e.lock.Lock()
queue = e.queue
e.queue = make([]func() error, 0, len(queue))
awaiting = e.registeredCallbacks != 0
e.lock.Unlock()
return
}

// start will run the event loop until it's empty and there are no uninvoked registered callbacks
// or a queued function returns an error. The provided firstCallback will be the first thing executed.
// After start returns the event loop can be reused as long as waitOnRegistered is called.
func (e *eventLoop) start(firstCallback func() error) error {
e.queue = []func() error{firstCallback}
for {
queue, awaiting := e.popAll()

if len(queue) == 0 {
if !awaiting {
return nil
}
<-e.wakeupCh
continue
}

for _, f := range queue {
if err := f(); err != nil {
return err
}
}

// This will get a random unhandled rejection instead of the first one, for example.
// But that seems to be the case in other tools as well so it seems to not be that big of a problem.
for promise := range e.pendingPromiseRejections {
// TODO maybe throw the whole promise up and get make a better message outside of the event loop
value := promise.Result()
if o := value.ToObject(e.vu.Runtime()); o != nil {
stack := o.Get("stack")
if stack != nil {
value = stack
}
}
// this is the de facto wording in both firefox and deno at least
return fmt.Errorf("Uncaught (in promise) %s", value) //nolint:stylecheck
}
}
}

// Wait on all registered callbacks so we know nothing is still doing work.
func (e *eventLoop) waitOnRegistered() {
for {
_, awaiting := e.popAll()
if !awaiting {
return
}
<-e.wakeupCh
}
}

func (e *eventLoop) addSetTimeout() {
_ = e.vu.Runtime().Set("setTimeout", func(f goja.Callable, t float64) {
// TODO maybe really return something to use with `clearTimeout
// TODO support arguments ... maybe
runOnLoop := e.registerCallback()
go func() {
timer := time.NewTimer(time.Duration(t * float64(time.Millisecond)))
select {
case <-timer.C:
runOnLoop(func() error {
_, err := f(goja.Undefined())
return err
})
case <-e.vu.Context().Done():
// TODO log something?

timer.Stop()
runOnLoop(func() error { return nil })
}
}()
})
}
Loading

0 comments on commit b000817

Please sign in to comment.