Skip to content

Commit

Permalink
Experimentally implement shared workers
Browse files Browse the repository at this point in the history
  • Loading branch information
novemberborn committed Sep 20, 2020
1 parent 8e72354 commit 1e85865
Show file tree
Hide file tree
Showing 87 changed files with 1,656 additions and 43 deletions.
7 changes: 5 additions & 2 deletions ava.config.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
const skipTests = [];
if (process.versions.node < '12.14.0') {
skipTests.push('!test/configurable-module-format/module.js');
if (process.versions.node < '12.17.0') {
skipTests.push(
'!test/configurable-module-format/module.js',
'!test/shared-workers/!(requires-newish-node)/**'
);
}

export default {
Expand Down
9 changes: 8 additions & 1 deletion lib/api.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ const RunStatus = require('./run-status');
const fork = require('./fork');
const serializeError = require('./serialize-error');
const {getApplicableLineNumbers} = require('./line-numbers');
const sharedWorkers = require('./plugin-support/shared-workers');

function resolveModules(modules) {
return arrify(modules).map(name => {
Expand Down Expand Up @@ -206,6 +207,8 @@ class Api extends Emittery {
concurrency = 1;
}

const deregisteredSharedWorkers = [];

// Try and run each file, limited by `concurrency`.
await pMap(selectedFiles, async file => {
// No new files should be run once a test has timed out or failed,
Expand All @@ -231,15 +234,19 @@ class Api extends Emittery {

const worker = fork(file, options, apiOptions.nodeArguments);
runStatus.observeWorker(worker, file, {selectingLines: lineNumbers.length > 0});
deregisteredSharedWorkers.push(sharedWorkers.observeWorkerProcess(worker, runStatus));

pendingWorkers.add(worker);
worker.promise.then(() => {
pendingWorkers.delete(worker);
});
restartTimer();

return worker.promise;
await worker.promise;
}, {concurrency, stopOnError: false});

// Allow shared workers to clean up before the run ends.
await Promise.all(deregisteredSharedWorkers);
} catch (error) {
if (error && error.name === 'AggregateError') {
for (const err of error) {
Expand Down
64 changes: 64 additions & 0 deletions lib/fork.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,55 @@ if (fs.realpathSync(__filename) !== __filename) {
const AVA_PATH = path.resolve(__dirname, '..');
const WORKER_PATH = require.resolve('./worker/subprocess');

class SharedWorkerChannel extends Emittery {
constructor({channelId, filename, initialData}, sendToFork) {
super();

this.id = channelId;
this.filename = filename;
this.initialData = initialData;
this.sendToFork = sendToFork;
}

signalReady() {
this.sendToFork({
type: 'shared-worker-ready',
channelId: this.id
});
}

signalError() {
this.sendToFork({
type: 'shared-worker-error',
channelId: this.id
});
}

emitMessage({messageId, replyTo, serializedData}) {
this.emit('message', {
messageId,
replyTo,
serializedData
});
}

forwardMessageToFork({messageId, replyTo, serializedData}) {
this.sendToFork({
type: 'shared-worker-message',
channelId: this.id,
messageId,
replyTo,
serializedData
});
}
}

let forkCounter = 0;

module.exports = (file, options, execArgv = process.execArgv) => {
const forkId = `fork/${++forkCounter}`;
const sharedWorkerChannels = new Map();

let finished = false;

const emitter = new Emittery();
Expand All @@ -27,6 +75,7 @@ module.exports = (file, options, execArgv = process.execArgv) => {
options = {
baseDir: process.cwd(),
file,
forkId,
...options
};

Expand Down Expand Up @@ -69,6 +118,16 @@ module.exports = (file, options, execArgv = process.execArgv) => {
case 'ready-for-options':
send({type: 'options', options});
break;
case 'shared-worker-connect': {
const channel = new SharedWorkerChannel(message.ava, send);
sharedWorkerChannels.set(channel.id, channel);
emitter.emit('connectSharedWorker', channel);
break;
}

case 'shared-worker-message':
sharedWorkerChannels.get(message.ava.channelId).emitMessage(message.ava);
break;
case 'ping':
send({type: 'pong'});
break;
Expand Down Expand Up @@ -99,6 +158,7 @@ module.exports = (file, options, execArgv = process.execArgv) => {

return {
file,
forkId,
promise,

exit() {
Expand All @@ -110,6 +170,10 @@ module.exports = (file, options, execArgv = process.execArgv) => {
send({type: 'peer-failed'});
},

onConnectSharedWorker(listener) {
return emitter.on('connectSharedWorker', listener);
},

onStateChange(listener) {
return emitter.on('stateChange', listener);
}
Expand Down
8 changes: 7 additions & 1 deletion lib/load-config.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,13 @@ const pkgConf = require('pkg-conf');

const NO_SUCH_FILE = Symbol('no ava.config.js file');
const MISSING_DEFAULT_EXPORT = Symbol('missing default export');
const EXPERIMENTS = new Set(['configurableModuleFormat', 'disableNullExpectations', 'disableSnapshotsInHooks', 'reverseTeardowns']);
const EXPERIMENTS = new Set([
'configurableModuleFormat',
'disableNullExpectations',
'disableSnapshotsInHooks',
'reverseTeardowns',
'sharedWorkers'
]);

// *Very* rudimentary support for loading ava.config.js files containing an `export default` statement.
const evaluateJsConfig = configFile => {
Expand Down
Loading

0 comments on commit 1e85865

Please sign in to comment.