Skip to content

Commit

Permalink
Experimentally implement shared workers
Browse files Browse the repository at this point in the history
  • Loading branch information
novemberborn committed Aug 22, 2020
1 parent b93cce1 commit aba2021
Show file tree
Hide file tree
Showing 87 changed files with 1,645 additions and 42 deletions.
7 changes: 6 additions & 1 deletion ava.config.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
const skipTests = [];
if (process.versions.node < '12.17.0') {
skipTests.push('!test/shared-workers/!(requires-newish-node)/**');
}

export default {
files: ['test/**', '!test/**/{fixtures,helpers}/**'],
files: ['test/**', '!test/**/{fixtures,helpers}/**', ...skipTests],
ignoredByWatcher: ['{coverage,docs,media,test-d,test-tap}/**']
};
5 changes: 4 additions & 1 deletion lib/api.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ const RunStatus = require('./run-status');
const fork = require('./fork');
const serializeError = require('./serialize-error');
const {getApplicableLineNumbers} = require('./line-numbers');
const sharedWorkers = require('./plugin-support/shared-workers');

function resolveModules(modules) {
return arrify(modules).map(name => {
Expand Down Expand Up @@ -231,14 +232,16 @@ class Api extends Emittery {

const worker = fork(file, options, apiOptions.nodeArguments);
runStatus.observeWorker(worker, file, {selectingLines: lineNumbers.length > 0});
const deregistered = sharedWorkers.observeWorkerProcess(worker, runStatus);

pendingWorkers.add(worker);
worker.promise.then(() => {
pendingWorkers.delete(worker);
});
restartTimer();

return worker.promise;
await worker.promise;
await deregistered;
}, {concurrency, stopOnError: false});
} catch (error) {
if (error && error.name === 'AggregateError') {
Expand Down
64 changes: 64 additions & 0 deletions lib/fork.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,55 @@ const serializeOptions = useAdvanced ?
options => JSON.parse(JSON.stringify(options)) : // Use JSON serialization to remove non-clonable values.
options => options;

class SharedWorkerChannel extends Emittery {
constructor({channelId, filename, initialData}, sendToFork) {
super();

this.id = channelId;
this.filename = filename;
this.initialData = initialData;
this.sendToFork = sendToFork;
}

signalReady() {
this.sendToFork({
type: 'shared-worker-ready',
channelId: this.id
});
}

signalError() {
this.sendToFork({
type: 'shared-worker-error',
channelId: this.id
});
}

emitMessage({messageId, replyTo, data}) {
this.emit('message', {
messageId,
replyTo,
data
});
}

forwardMessageToFork({messageId, replyTo, data}) {
this.sendToFork({
type: 'shared-worker-message',
channelId: this.id,
messageId,
replyTo,
data
});
}
}

let forkCounter = 0;

module.exports = (file, options, execArgv = process.execArgv) => {
const forkId = `fork/${++forkCounter}`;
const sharedWorkerChannels = new Map();

let finished = false;

const emitter = new Emittery();
Expand All @@ -33,6 +81,7 @@ module.exports = (file, options, execArgv = process.execArgv) => {
options = {
baseDir: process.cwd(),
file,
forkId,
...options
};

Expand Down Expand Up @@ -76,6 +125,16 @@ module.exports = (file, options, execArgv = process.execArgv) => {
case 'ready-for-options':
send({type: 'options', options: serializeOptions(options)});
break;
case 'shared-worker-connect': {
const channel = new SharedWorkerChannel(message.ava, send);
sharedWorkerChannels.set(channel.id, channel);
emitter.emit('connectSharedWorker', channel);
break;
}

case 'shared-worker-message':
sharedWorkerChannels.get(message.ava.channelId).emitMessage(message.ava);
break;
case 'ping':
send({type: 'pong'});
break;
Expand Down Expand Up @@ -106,6 +165,7 @@ module.exports = (file, options, execArgv = process.execArgv) => {

return {
file,
forkId,
promise,

exit() {
Expand All @@ -117,6 +177,10 @@ module.exports = (file, options, execArgv = process.execArgv) => {
send({type: 'peer-failed'});
},

onConnectSharedWorker(listener) {
return emitter.on('connectSharedWorker', listener);
},

onStateChange(listener) {
return emitter.on('stateChange', listener);
}
Expand Down
2 changes: 1 addition & 1 deletion lib/load-config.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ const pkgConf = require('pkg-conf');

const NO_SUCH_FILE = Symbol('no ava.config.js file');
const MISSING_DEFAULT_EXPORT = Symbol('missing default export');
const EXPERIMENTS = new Set(['disableSnapshotsInHooks', 'reverseTeardowns']);
const EXPERIMENTS = new Set(['disableSnapshotsInHooks', 'reverseTeardowns', 'sharedWorkers']);

// *Very* rudimentary support for loading ava.config.js files containing an `export default` statement.
const evaluateJsConfig = configFile => {
Expand Down
251 changes: 251 additions & 0 deletions lib/plugin-support/shared-worker-loader.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,251 @@
const {EventEmitter, on} = require('events');
const {workerData, parentPort} = require('worker_threads'); // eslint-disable-line node/no-unsupported-features/node-builtins
const pkg = require('../../package.json');

// Used to forward messages received over the `parentPort`. Every subscription
// adds a listener, so do not enforce any maximums.
const events = new EventEmitter().setMaxListeners(0);

// Map of active test workers, used in receiveMessages() to get a reference to
// the TestWorker instance, and relevant release functions.
const activeTestWorkers = new Map();

class TestWorker {
constructor(id, file) {
this.id = id;
this.file = file;
}

defer(fn) {
let released = false;
const release = async () => {
if (released) {
return;
}

released = true;
if (activeTestWorkers.has(this.id)) {
activeTestWorkers.get(this.id).releaseFns.delete(release);
}

await fn();
};

activeTestWorkers.get(this.id).releaseFns.add(release);

return release;
}

publish(data) {
return publishMessage(this, data);
}

async * subscribe() {
yield * receiveMessages(this);
}
}

class ReceivedMessage {
constructor(testWorker, id, data) {
this.testWorker = testWorker;
this.id = id;
this.data = data;
}

reply(data) {
return publishMessage(this.testWorker, data, this.id);
}
}

// Ensure that, no matter how often it's received, we have a stable message
// object.
const messageCache = new WeakMap();

async function * receiveMessages(fromTestWorker, replyTo) {
for await (const [message] of on(events, 'message')) {
if (fromTestWorker !== undefined) {
if (message.type === 'deregister-test-worker' && message.id === fromTestWorker.id) {
return;
}

if (message.type === 'message' && message.testWorkerId !== fromTestWorker.id) {
continue;
}
}

if (message.type !== 'message') {
continue;
}

if (replyTo === undefined && message.replyTo !== undefined) {
continue;
}

if (replyTo !== undefined && message.replyTo !== replyTo) {
continue;
}

const active = activeTestWorkers.get(message.testWorkerId);
// It is possible for a message to have been buffering for so long — perhaps
// due to the caller waiting before iterating to the next message — that the
// test worker has been deregistered. Ignore such messages.
//
// (This is really hard to write a test for, however!)
if (active === undefined) {
continue;
}

let received = messageCache.get(message);
if (received === undefined) {
received = new ReceivedMessage(active.instance, message.messageId, message.data);
messageCache.set(message, received);
}

yield received;
}
}

let messageCounter = 0;
const messageIdPrefix = `${workerData.id}/message`;
const nextMessageId = () => `${messageIdPrefix}/${++messageCounter}`;

function publishMessage(testWorker, data, replyTo) {
const id = nextMessageId();
parentPort.postMessage({
type: 'message',
messageId: id,
testWorkerId: testWorker.id,
data,
replyTo
});

return {
id,
async * replies() {
yield * receiveMessages(testWorker, id);
}
};
}

function broadcastMessage(data) {
const id = nextMessageId();
parentPort.postMessage({
type: 'broadcast',
messageId: id,
data
});

return {
id,
async * replies() {
yield * receiveMessages(undefined, id);
}
};
}

async function loadFactory() {
try {
const mod = require(workerData.filename);
if (typeof mod === 'function') {
return mod;
}

return mod.default;
} catch (error) {
if (error && (error.code === 'ERR_REQUIRE_ESM' || (error.code === 'MODULE_NOT_FOUND' && workerData.filename.startsWith('file://')))) {
const {default: factory} = await import(workerData.filename); // eslint-disable-line node/no-unsupported-features/es-syntax
return factory;
}

throw error;
}
}

let signalAvailable = () => {
parentPort.postMessage({type: 'available'});
signalAvailable = () => {};
};

let fatal;
loadFactory(workerData.filename).then(factory => {
if (typeof factory !== 'function') {
throw new TypeError(`Missing default factory function export for shared worker plugin at ${workerData.filename}`);
}

factory({
negotiateProtocol(supported) {
if (!supported.includes('experimental')) {
fatal = new Error(`This version of AVA (${pkg.version}) is not compatible with shared worker plugin at ${workerData.filename}`);
throw fatal;
}

const produceTestWorker = instance => events.emit('testWorker', instance);

parentPort.on('message', async message => {
if (message.type === 'register-test-worker') {
const {id, file} = message;
const instance = new TestWorker(id, file);

activeTestWorkers.set(id, {instance, releaseFns: new Set()});

produceTestWorker(instance);
}

if (message.type === 'deregister-test-worker') {
const {id} = message;
const {releaseFns} = activeTestWorkers.get(id);
activeTestWorkers.delete(id);

// Run possibly asynchronous release functions serially, in reverse
// order. Any error will crash the worker.
for await (const fn of [...releaseFns].reverse()) {
await fn();
}

parentPort.postMessage({
type: 'deregistered-test-worker',
id
});
}

// Wait for a turn of the event loop, to allow new subscriptions to be
// set up in response to the previous message.
setImmediate(() => events.emit('message', message));
});

return {
initialData: workerData.initialData,
protocol: 'experimental',

ready() {
signalAvailable();
return this;
},

broadcast(data) {
return broadcastMessage(data);
},

async * subscribe() {
yield * receiveMessages();
},

async * testWorkers() {
for await (const [worker] of on(events, 'testWorker')) {
yield worker;
}
}
};
}
});
}).catch(error => {
if (fatal === undefined) {
fatal = error;
}
}).finally(() => {
if (fatal !== undefined) {
process.nextTick(() => {
throw fatal;
});
}
});
Loading

0 comments on commit aba2021

Please sign in to comment.