Skip to content

Commit

Permalink
Experimentally implement shared workers
Browse files Browse the repository at this point in the history
  • Loading branch information
novemberborn committed Jul 4, 2020
1 parent 47514d8 commit 5bde5aa
Show file tree
Hide file tree
Showing 32 changed files with 1,006 additions and 9 deletions.
7 changes: 6 additions & 1 deletion ava.config.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
const skipTests = [];
if (process.versions.node < '12.17.0') {
skipTests.push('!test/shared-workers/!(requires-newish-node)/**');
}

export default {
files: ['test/**', '!test/**/{fixtures,helpers}/**'],
files: ['test/**', '!test/**/{fixtures,helpers}/**', ...skipTests],
ignoredByWatcher: ['{coverage,docs,media,test-d,test-tap}/**']
};
2 changes: 2 additions & 0 deletions lib/api.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ const RunStatus = require('./run-status');
const fork = require('./fork');
const serializeError = require('./serialize-error');
const {getApplicableLineNumbers} = require('./line-numbers');
const sharedWorkers = require('./shared-workers');

function resolveModules(modules) {
return arrify(modules).map(name => {
Expand Down Expand Up @@ -231,6 +232,7 @@ class Api extends Emittery {

const worker = fork(file, options, apiOptions.nodeArguments);
runStatus.observeWorker(worker, file, {selectingLines: lineNumbers.length > 0});
sharedWorkers.observeWorkerProcess(worker, runStatus);

pendingWorkers.add(worker);
worker.promise.then(() => {
Expand Down
64 changes: 64 additions & 0 deletions lib/fork.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,55 @@ const serializeOptions = useAdvanced ?
options => JSON.parse(JSON.stringify(options)) : // Use JSON serialization to remove non-clonable values.
options => options;

class SharedWorkerChannel extends Emittery {
constructor({channelId, filename, initialData}, sendToFork) {
super();

this.id = channelId;
this.filename = filename;
this.initialData = initialData;
this.sendToFork = sendToFork;
}

signalReady() {
this.sendToFork({
type: 'shared-worker-ready',
channelId: this.id
});
}

signalError() {
this.sendToFork({
type: 'shared-worker-error',
channelId: this.id
});
}

emitMessage({messageId, replyTo, data}) {
this.emit('message', {
messageId,
replyTo,
data
});
}

forwardMessageToFork({messageId, replyTo, data}) {
this.sendToFork({
type: 'shared-worker-message',
channelId: this.id,
messageId,
replyTo,
data
});
}
}

let forkCounter = 0;

module.exports = (file, options, execArgv = process.execArgv) => {
const forkId = `fork/${++forkCounter}`;
const sharedWorkerChannels = new Map();

let finished = false;

const emitter = new Emittery();
Expand All @@ -32,6 +80,7 @@ module.exports = (file, options, execArgv = process.execArgv) => {
options = {
baseDir: process.cwd(),
file,
forkId,
...options
};

Expand Down Expand Up @@ -75,6 +124,16 @@ module.exports = (file, options, execArgv = process.execArgv) => {
case 'ready-for-options':
send({type: 'options', options: serializeOptions(options)});
break;
case 'shared-worker-connect': {
const channel = new SharedWorkerChannel(message.ava, send);
sharedWorkerChannels.set(channel.id, channel);
emitter.emit('connectSharedWorker', channel);
break;
}

case 'shared-worker-message':
sharedWorkerChannels.get(message.ava.channelId).emitMessage(message.ava);
break;
case 'ping':
send({type: 'pong'});
break;
Expand Down Expand Up @@ -105,6 +164,7 @@ module.exports = (file, options, execArgv = process.execArgv) => {

return {
file,
forkId,
promise,

exit() {
Expand All @@ -116,6 +176,10 @@ module.exports = (file, options, execArgv = process.execArgv) => {
send({type: 'peer-failed'});
},

onConnectSharedWorker(listener) {
return emitter.on('connectSharedWorker', listener);
},

onStateChange(listener) {
return emitter.on('stateChange', listener);
}
Expand Down
2 changes: 1 addition & 1 deletion lib/load-config.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ const pkgConf = require('pkg-conf');

const NO_SUCH_FILE = Symbol('no ava.config.js file');
const MISSING_DEFAULT_EXPORT = Symbol('missing default export');
const EXPERIMENTS = new Set(['disableSnapshotsInHooks', 'likeAssertion', 'reverseTeardowns']);
const EXPERIMENTS = new Set(['disableSnapshotsInHooks', 'likeAssertion', 'reverseTeardowns', 'sharedWorkers']);

// *Very* rudimentary support for loading ava.config.js files containing an `export default` statement.
const evaluateJsConfig = configFile => {
Expand Down
35 changes: 33 additions & 2 deletions lib/reporters/default.js
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ class Reporter {
this.internalErrors = [];
this.knownFailures = [];
this.lineNumberErrors = [];
this.sharedWorkerErrors = [];
this.uncaughtExceptions = [];
this.unhandledRejections = [];
this.unsavedSnapshots = [];
Expand Down Expand Up @@ -296,6 +297,19 @@ class Reporter {
break;
}

case 'shared-worker-error': {
this.sharedWorkerErrors.push(event);

if (this.verbose) {
this.lineWriter.ensureEmptyLine();
this.lineWriter.writeLine(colors.error(`${figures.cross} Error in shared worker`));
this.lineWriter.writeLine();
this.writeErr(event);
}

break;
}

case 'snapshot-error':
this.unsavedSnapshots.push(event);
break;
Expand Down Expand Up @@ -670,7 +684,7 @@ class Reporter {
}

if (this.failures.length > 0) {
const writeTrailingLines = this.internalErrors.length > 0 || this.uncaughtExceptions.length > 0 || this.unhandledRejections.length > 0;
const writeTrailingLines = this.internalErrors.length > 0 || this.sharedWorkerErrors.length > 0 || this.uncaughtExceptions.length > 0 || this.unhandledRejections.length > 0;

const lastFailure = this.failures[this.failures.length - 1];
for (const event of this.failures) {
Expand All @@ -694,7 +708,7 @@ class Reporter {

if (!this.verbose) {
if (this.internalErrors.length > 0) {
const writeTrailingLines = this.uncaughtExceptions.length > 0 || this.unhandledRejections.length > 0;
const writeTrailingLines = this.sharedWorkerErrors.length > 0 || this.uncaughtExceptions.length > 0 || this.unhandledRejections.length > 0;

const last = this.internalErrors[this.internalErrors.length - 1];
for (const event of this.internalErrors) {
Expand All @@ -716,6 +730,23 @@ class Reporter {
}
}

if (this.sharedWorkerErrors.length > 0) {
const writeTrailingLines = this.uncaughtExceptions.length > 0 || this.unhandledRejections.length > 0;

const last = this.sharedWorkerErrors[this.sharedWorkerErrors.length - 1];
for (const evt of this.sharedWorkerErrors) {
this.lineWriter.writeLine(colors.error(`${figures.cross} Error in shared worker`));
this.lineWriter.writeLine();
this.writeErr(evt.err);
if (evt !== last || writeTrailingLines) {
this.lineWriter.writeLine();
this.lineWriter.writeLine();
}

wroteSomething = true;
}
}

if (this.uncaughtExceptions.length > 0) {
const writeTrailingLines = this.unhandledRejections.length > 0;

Expand Down
3 changes: 3 additions & 0 deletions lib/runner.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ class Runner extends Emittery {
serial: [],
todo: []
};
this.waitForReady = [];

const uniqueTestTitles = new Set();
this.registerUniqueTitle = title => {
Expand Down Expand Up @@ -444,6 +445,8 @@ class Runner extends Emittery {
});
}

await Promise.all(this.waitForReady);

if (concurrentTests.length === 0 && serialTests.length === 0) {
this.emit('finish');
// Don't run any hooks if there are no tests to run.
Expand Down
Loading

0 comments on commit 5bde5aa

Please sign in to comment.