Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add basic event loop with some API to be used by modules #2228

Merged
merged 7 commits into from
Mar 2, 2022
183 changes: 183 additions & 0 deletions core/local/eventloop_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
package local

import (
"context"
"io/ioutil"
"net/url"
"testing"
"time"

"github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"
"go.k6.io/k6/js"
"go.k6.io/k6/lib"
"go.k6.io/k6/lib/metrics"
"go.k6.io/k6/lib/testutils"
"go.k6.io/k6/lib/types"
"go.k6.io/k6/loader"
)

func eventLoopTest(t *testing.T, script []byte, testHandle func(context.Context, lib.Runner, error, *testutils.SimpleLogrusHook)) {
logger := logrus.New()
logger.SetOutput(ioutil.Discard)
logHook := &testutils.SimpleLogrusHook{HookedLevels: []logrus.Level{logrus.InfoLevel, logrus.WarnLevel, logrus.ErrorLevel}}
logger.AddHook(logHook)

script = []byte(`import {setTimeout} from "k6/experimental";
` + string(script))
registry := metrics.NewRegistry()
builtinMetrics := metrics.RegisterBuiltinMetrics(registry)
runner, err := js.New(
logger,
&loader.SourceData{
URL: &url.URL{Path: "/script.js"},
Data: script,
},
nil,
lib.RuntimeOptions{},
builtinMetrics,
registry,
)
require.NoError(t, err)

ctx, cancel, execScheduler, samples := newTestExecutionScheduler(t, runner, logger,
lib.Options{
TeardownTimeout: types.NullDurationFrom(time.Second),
SetupTimeout: types.NullDurationFrom(time.Second),
})
defer cancel()

errCh := make(chan error, 1)
go func() { errCh <- execScheduler.Run(ctx, ctx, samples, builtinMetrics) }()

select {
case err := <-errCh:
testHandle(ctx, runner, err, logHook)
case <-time.After(10 * time.Second):
t.Fatal("timed out")
}
}

func TestEventLoop(t *testing.T) {
t.Parallel()
script := []byte(`
setTimeout(()=> {console.log("initcontext setTimeout")}, 200)
console.log("initcontext");
export default function() {
setTimeout(()=> {console.log("default setTimeout")}, 200)
console.log("default");
};
export function setup() {
setTimeout(()=> {console.log("setup setTimeout")}, 200)
console.log("setup");
};
export function teardown() {
setTimeout(()=> {console.log("teardown setTimeout")}, 200)
console.log("teardown");
};
export function handleSummary() {
setTimeout(()=> {console.log("handleSummary setTimeout")}, 200)
console.log("handleSummary");
};
`)
eventLoopTest(t, script, func(ctx context.Context, runner lib.Runner, err error, logHook *testutils.SimpleLogrusHook) {
require.NoError(t, err)
_, err = runner.HandleSummary(ctx, &lib.Summary{RootGroup: &lib.Group{}})
require.NoError(t, err)
entries := logHook.Drain()
msgs := make([]string, len(entries))
for i, entry := range entries {
msgs[i] = entry.Message
}
require.Equal(t, []string{
"initcontext", // first initialization
"initcontext setTimeout",
"initcontext", // for vu
"initcontext setTimeout",
"initcontext", // for setup
"initcontext setTimeout",
"setup", // setup
"setup setTimeout",
"default", // one iteration
"default setTimeout",
"initcontext", // for teardown
"initcontext setTimeout",
"teardown", // teardown
"teardown setTimeout",
"initcontext", // for handleSummary
"initcontext setTimeout",
"handleSummary", // handleSummary
"handleSummary setTimeout",
}, msgs)
})
}

func TestEventLoopCrossScenario(t *testing.T) {
t.Parallel()
script := []byte(`
import exec from "k6/execution"
export const options = {
scenarios: {
"first":{
executor: "shared-iterations",
maxDuration: "1s",
iterations: 1,
vus: 1,
gracefulStop:"1s",
},
"second": {
executor: "shared-iterations",
maxDuration: "1s",
iterations: 1,
vus: 1,
startTime: "3s",
}
}
}
export default function() {
let i = exec.scenario.name
setTimeout(()=> {console.log(i)}, 3000)
}
`)

eventLoopTest(t, script, func(_ context.Context, _ lib.Runner, err error, logHook *testutils.SimpleLogrusHook) {
require.NoError(t, err)
entries := logHook.Drain()
msgs := make([]string, len(entries))
for i, entry := range entries {
msgs[i] = entry.Message
}
require.Equal(t, []string{"second"}, msgs)
})
}

func TestEventLoopDoesntCrossIterations(t *testing.T) {
t.Parallel()
script := []byte(`
import { sleep } from "k6"
export const options = {
iterations: 2,
vus: 1,
}

export default function() {
let i = __ITER;
setTimeout(()=> { console.log(i) }, 1000)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you think is it an expected behaviour from a JS developer's point of view? I mean the fact that this function will print 1 and it goes across iterations.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This exactly what does not happen. __ITER is 0 based so it starts at 0 and goes to 1 in this case. And below we specifically error out immediately on the first/zeroth iteration so we can test that it doesn't cross iterations.

I guess the name is a bit confusing 🤔

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

changed the name in 4baf367, is this sufficient?

if (__ITER == 0) {
throw "just error"
} else {
sleep(1)
}
}
`)

eventLoopTest(t, script, func(_ context.Context, _ lib.Runner, err error, logHook *testutils.SimpleLogrusHook) {
require.NoError(t, err)
entries := logHook.Drain()
msgs := make([]string, len(entries))
for i, entry := range entries {
msgs[i] = entry.Message
}
require.Equal(t, []string{"just error\n\tat /script.js:13:4(15)\n\tat native\n", "1"}, msgs)
})
}
7 changes: 6 additions & 1 deletion js/bundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,12 @@ func (b *Bundle) instantiate(logger logrus.FieldLogger, rt *goja.Runtime, init *
"require": init.Require,
"open": init.Open,
})
if _, err := rt.RunProgram(b.Program); err != nil {
init.moduleVUImpl.eventLoop = newEventLoop(init.moduleVUImpl)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is the VU -> event loop -> VU cycle required?

The connection between the eventloop and the Runtime is the VU so maybe we should have the following methods directly on the VUImpl ? 🤔

vu.rt.SetPromiseRejectionTracker(e.promiseRejectionTracker)
vu.addSetTimeout()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I kind of prefer it, it makes the whole event loop implementation feel almost as a another js module, that just happens to be used by the js package.

Also the vu already has a ton of methods and functionality pin on it, so adding more doesn't really seem that much better.

It is very likely that whatever additional functionality is added to event loop and vu lifecycle will require quite a bit of rewriting of this either way, so this is likely to change a lot in the coming months.

err := init.moduleVUImpl.eventLoop.start(func() error {
_, err := rt.RunProgram(b.Program)
return err
})
if err != nil {
var exception *goja.Exception
if errors.As(err, &exception) {
err = &scriptException{inner: exception}
Expand Down
5 changes: 5 additions & 0 deletions js/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
/*
Package js is the JavaScript implementation of the lib.Runner and relative concepts for
executing concurrent-safe JavaScript code.
*/
package js
141 changes: 141 additions & 0 deletions js/eventloop.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
package js

import (
"fmt"
"sync"

"github.com/dop251/goja"
"go.k6.io/k6/js/modules"
)

// eventLoop implements an event with
// handling of unhandled rejected promises.
//
// A specific thing about this event loop is that it will wait to return
// not only until the queue is empty but until nothing is registered that it will run in the future.
// This is in contrast with more common behaviours where it only returns on
// a specific event/action or when the loop is empty.
// This is required as in k6 iterations (for which event loop will be primary used)
// are supposed to be independent and any work started in them needs to finish,
// but also they need to end when all the instructions are done.
// Additionally because of this on any error while the event loop will exit it's
// required to wait on the event loop to be empty before the execution can continue.
type eventLoop struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My proposal on name changes:

  • RunOnLoop(f) -> Add(f)
  • Reserve remains as is
  • Start -> Process

Rationale: eventLoop is basically a queue and Start is a blocking call that goes through that queue and it is allowed to call it more than once sequentially. If naming would reflect that more closely, we would have modules.VU interface free to use the names more fitting to the main purpose (like 'promise', 'loop', etc.) without creating duplicate-looking methods and with less confusion between different objects. IOW, eventLoop can be seen as a low-level queue while modules.VU is a higher level interface to work with that queue.

lock sync.Mutex
queue []func() error
wakeupCh chan struct{} // TODO: maybe use sync.Cond ?
registeredCallbacks int
vu modules.VU

// pendingPromiseRejections are rejected promises with no handler,
// if there is something in this map at an end of an event loop then it will exit with an error.
// It's similar to what Deno and Node do.
pendingPromiseRejections map[*goja.Promise]struct{}
}

// newEventLoop returns a new event loop with a few helpers attached to it:
// - reporting (and aborting on) unhandled promise rejections
func newEventLoop(vu modules.VU) *eventLoop {
e := &eventLoop{
wakeupCh: make(chan struct{}, 1),
pendingPromiseRejections: make(map[*goja.Promise]struct{}),
vu: vu,
}
vu.Runtime().SetPromiseRejectionTracker(e.promiseRejectionTracker)

return e
}

func (e *eventLoop) wakeup() {
select {
case e.wakeupCh <- struct{}{}:
default:
}
}

// registerCallback register that a callback will be invoked on the loop, preventing it from returning/finishing.
// The returned function, upon invocation, will queue its argument and wakeup the loop if needed.
// If the eventLoop has since stopped, it will not be executed.
// This function *must* be called from within running on the event loop, but its result can be called from anywhere.
func (e *eventLoop) registerCallback() func(func() error) {
e.lock.Lock()
e.registeredCallbacks++
e.lock.Unlock()

return func(f func() error) {
codebien marked this conversation as resolved.
Show resolved Hide resolved
e.lock.Lock()
e.queue = append(e.queue, f)
e.registeredCallbacks--
e.lock.Unlock()
e.wakeup()
imiric marked this conversation as resolved.
Show resolved Hide resolved
}
}

func (e *eventLoop) promiseRejectionTracker(p *goja.Promise, op goja.PromiseRejectionOperation) {
// No locking necessary here as the goja runtime will call this synchronously
// Read Notes on https://tc39.es/ecma262/#sec-host-promise-rejection-tracker
if op == goja.PromiseRejectionReject {
e.pendingPromiseRejections[p] = struct{}{}
} else { // goja.PromiseRejectionHandle so a promise that was previously rejected without handler now got one
delete(e.pendingPromiseRejections, p)
}
}

func (e *eventLoop) popAll() (queue []func() error, awaiting bool) {
e.lock.Lock()
queue = e.queue
e.queue = make([]func() error, 0, len(queue))
awaiting = e.registeredCallbacks != 0
e.lock.Unlock()
return
}

// start will run the event loop until it's empty and there are no uninvoked registered callbacks
// or a queued function returns an error. The provided firstCallback will be the first thing executed.
// After start returns the event loop can be reused as long as waitOnRegistered is called.
func (e *eventLoop) start(firstCallback func() error) error {
e.queue = []func() error{firstCallback}
for {
queue, awaiting := e.popAll()

if len(queue) == 0 {
if !awaiting {
return nil
}
<-e.wakeupCh
continue
}

for _, f := range queue {
if err := f(); err != nil {
return err
}
}

// This will get a random unhandled rejection instead of the first one, for example.
// But that seems to be the case in other tools as well so it seems to not be that big of a problem.
for promise := range e.pendingPromiseRejections {
// TODO maybe throw the whole promise up and get make a better message outside of the event loop
value := promise.Result()
if o := value.ToObject(e.vu.Runtime()); o != nil {
stack := o.Get("stack")
if stack != nil {
value = stack
}
}
// this is the de facto wording in both firefox and deno at least
return fmt.Errorf("Uncaught (in promise) %s", value) //nolint:stylecheck
codebien marked this conversation as resolved.
Show resolved Hide resolved
}
}
}

// Wait on all registered callbacks so we know nothing is still doing work.
func (e *eventLoop) waitOnRegistered() {
for {
_, awaiting := e.popAll()
if !awaiting {
return
}
<-e.wakeupCh
}
}
Loading