diff --git a/ava.config.js b/ava.config.js index 363e4279f1..813350fe01 100644 --- a/ava.config.js +++ b/ava.config.js @@ -1,4 +1,9 @@ +const skipTests = []; +if (process.versions.node < '12.17.0') { + skipTests.push('!test/shared-workers/!(requires-newish-node)/**'); +} + export default { - files: ['test/**', '!test/**/{fixtures,helpers}/**'], + files: ['test/**', '!test/**/{fixtures,helpers}/**', ...skipTests], ignoredByWatcher: ['{coverage,docs,media,test-d,test-tap}/**'] }; diff --git a/lib/api.js b/lib/api.js index 92a9b9162b..7c2cc0fc28 100644 --- a/lib/api.js +++ b/lib/api.js @@ -17,6 +17,7 @@ const RunStatus = require('./run-status'); const fork = require('./fork'); const serializeError = require('./serialize-error'); const {getApplicableLineNumbers} = require('./line-numbers'); +const sharedWorkers = require('./shared-workers'); function resolveModules(modules) { return arrify(modules).map(name => { @@ -231,6 +232,7 @@ class Api extends Emittery { const worker = fork(file, options, apiOptions.nodeArguments); runStatus.observeWorker(worker, file, {selectingLines: lineNumbers.length > 0}); + sharedWorkers.observeWorkerProcess(worker, runStatus); pendingWorkers.add(worker); worker.promise.then(() => { diff --git a/lib/fork.js b/lib/fork.js index cd4b6b9089..c591e69048 100644 --- a/lib/fork.js +++ b/lib/fork.js @@ -19,7 +19,55 @@ const serializeOptions = useAdvanced ? options => JSON.parse(JSON.stringify(options)) : // Use JSON serialization to remove non-clonable values. options => options; +class SharedWorkerChannel extends Emittery { + constructor({channelId, filename, initialData}, sendToFork) { + super(); + + this.id = channelId; + this.filename = filename; + this.initialData = initialData; + this.sendToFork = sendToFork; + } + + signalReady() { + this.sendToFork({ + type: 'shared-worker-ready', + channelId: this.id + }); + } + + signalError() { + this.sendToFork({ + type: 'shared-worker-error', + channelId: this.id + }); + } + + emitMessage({messageId, replyTo, data}) { + this.emit('message', { + messageId, + replyTo, + data + }); + } + + forwardMessageToFork({messageId, replyTo, data}) { + this.sendToFork({ + type: 'shared-worker-message', + channelId: this.id, + messageId, + replyTo, + data + }); + } +} + +let forkCounter = 0; + module.exports = (file, options, execArgv = process.execArgv) => { + const forkId = `fork/${++forkCounter}`; + const sharedWorkerChannels = new Map(); + let finished = false; const emitter = new Emittery(); @@ -32,6 +80,7 @@ module.exports = (file, options, execArgv = process.execArgv) => { options = { baseDir: process.cwd(), file, + forkId, ...options }; @@ -75,6 +124,16 @@ module.exports = (file, options, execArgv = process.execArgv) => { case 'ready-for-options': send({type: 'options', options: serializeOptions(options)}); break; + case 'shared-worker-connect': { + const channel = new SharedWorkerChannel(message.ava, send); + sharedWorkerChannels.set(channel.id, channel); + emitter.emit('connectSharedWorker', channel); + break; + } + + case 'shared-worker-message': + sharedWorkerChannels.get(message.ava.channelId).emitMessage(message.ava); + break; case 'ping': send({type: 'pong'}); break; @@ -105,6 +164,7 @@ module.exports = (file, options, execArgv = process.execArgv) => { return { file, + forkId, promise, exit() { @@ -116,6 +176,10 @@ module.exports = (file, options, execArgv = process.execArgv) => { send({type: 'peer-failed'}); }, + onConnectSharedWorker(listener) { + return emitter.on('connectSharedWorker', listener); + }, + onStateChange(listener) { return emitter.on('stateChange', listener); } diff --git a/lib/load-config.js b/lib/load-config.js index f84983b4b6..531cfe5b8f 100644 --- a/lib/load-config.js +++ b/lib/load-config.js @@ -7,7 +7,7 @@ const pkgConf = require('pkg-conf'); const NO_SUCH_FILE = Symbol('no ava.config.js file'); const MISSING_DEFAULT_EXPORT = Symbol('missing default export'); -const EXPERIMENTS = new Set(['disableSnapshotsInHooks', 'likeAssertion', 'reverseTeardowns']); +const EXPERIMENTS = new Set(['disableSnapshotsInHooks', 'likeAssertion', 'reverseTeardowns', 'sharedWorkers']); // *Very* rudimentary support for loading ava.config.js files containing an `export default` statement. const evaluateJsConfig = configFile => { diff --git a/lib/reporters/default.js b/lib/reporters/default.js index 7dbfcaeec5..5789d475a3 100644 --- a/lib/reporters/default.js +++ b/lib/reporters/default.js @@ -151,6 +151,7 @@ class Reporter { this.internalErrors = []; this.knownFailures = []; this.lineNumberErrors = []; + this.sharedWorkerErrors = []; this.uncaughtExceptions = []; this.unhandledRejections = []; this.unsavedSnapshots = []; @@ -296,6 +297,19 @@ class Reporter { break; } + case 'shared-worker-error': { + this.sharedWorkerErrors.push(event); + + if (this.verbose) { + this.lineWriter.ensureEmptyLine(); + this.lineWriter.writeLine(colors.error(`${figures.cross} Error in shared worker`)); + this.lineWriter.writeLine(); + this.writeErr(event); + } + + break; + } + case 'snapshot-error': this.unsavedSnapshots.push(event); break; @@ -670,7 +684,7 @@ class Reporter { } if (this.failures.length > 0) { - const writeTrailingLines = this.internalErrors.length > 0 || this.uncaughtExceptions.length > 0 || this.unhandledRejections.length > 0; + const writeTrailingLines = this.internalErrors.length > 0 || this.sharedWorkerErrors.length > 0 || this.uncaughtExceptions.length > 0 || this.unhandledRejections.length > 0; const lastFailure = this.failures[this.failures.length - 1]; for (const event of this.failures) { @@ -694,7 +708,7 @@ class Reporter { if (!this.verbose) { if (this.internalErrors.length > 0) { - const writeTrailingLines = this.uncaughtExceptions.length > 0 || this.unhandledRejections.length > 0; + const writeTrailingLines = this.sharedWorkerErrors.length > 0 || this.uncaughtExceptions.length > 0 || this.unhandledRejections.length > 0; const last = this.internalErrors[this.internalErrors.length - 1]; for (const event of this.internalErrors) { @@ -716,6 +730,23 @@ class Reporter { } } + if (this.sharedWorkerErrors.length > 0) { + const writeTrailingLines = this.uncaughtExceptions.length > 0 || this.unhandledRejections.length > 0; + + const last = this.sharedWorkerErrors[this.sharedWorkerErrors.length - 1]; + for (const evt of this.sharedWorkerErrors) { + this.lineWriter.writeLine(colors.error(`${figures.cross} Error in shared worker`)); + this.lineWriter.writeLine(); + this.writeErr(evt.err); + if (evt !== last || writeTrailingLines) { + this.lineWriter.writeLine(); + this.lineWriter.writeLine(); + } + + wroteSomething = true; + } + } + if (this.uncaughtExceptions.length > 0) { const writeTrailingLines = this.unhandledRejections.length > 0; diff --git a/lib/runner.js b/lib/runner.js index dbb076b930..37c7211abe 100644 --- a/lib/runner.js +++ b/lib/runner.js @@ -42,6 +42,7 @@ class Runner extends Emittery { serial: [], todo: [] }; + this.waitForReady = []; const uniqueTestTitles = new Set(); this.registerUniqueTitle = title => { @@ -444,6 +445,8 @@ class Runner extends Emittery { }); } + await Promise.all(this.waitForReady); + if (concurrentTests.length === 0 && serialTests.length === 0) { this.emit('finish'); // Don't run any hooks if there are no tests to run. diff --git a/lib/shared-worker-launcher.js b/lib/shared-worker-launcher.js new file mode 100644 index 0000000000..40e4365100 --- /dev/null +++ b/lib/shared-worker-launcher.js @@ -0,0 +1,227 @@ +const {EventEmitter, on} = require('events'); +const {workerData, parentPort} = require('worker_threads'); // eslint-disable-line node/no-unsupported-features/node-builtins +const nowAndTimers = require('./now-and-timers'); +const pkg = require('../package.json'); + +// Map of active test workers, used in receiveMessages() to get a reference to +// the TestWorker instance, and relevant release functions. +const activeTestWorkers = new Map(); + +class TestWorker { + constructor(id, file) { + this.id = id; + this.file = file; + } + + defer(fn) { + let released = false; + const release = async () => { + if (released) { + return; + } + + released = true; + if (activeTestWorkers.has(this.id)) { + activeTestWorkers.get(this.id).releaseFns.delete(release); + } + + await fn(); + }; + + activeTestWorkers.get(this.id).releaseFns.add(release); + + return release; + } + + publish(data) { + return publishMessage(this, data); + } + + async * subscribe() { + yield * receiveMessages(this); + } +} + +class ReceivedMessage { + constructor(testWorker, id, data) { + this.testWorker = testWorker; + this.id = id; + this.data = data; + } + + reply(data) { + return publishMessage(this.testWorker, data, this.id); + } +} + +// Ensure that, no matter how often it's received, we have a stable message +// object. +const messageCache = new WeakMap(); + +// Allow micro tasks to finish processing the previous message. +const turn = () => new Promise(resolve => { + nowAndTimers.setImmediate(resolve); +}); + +async function * receiveMessages(fromTestWorker, replyTo) { + for await (const [evt] of on(parentPort, 'message')) { + if (fromTestWorker !== undefined) { + if (evt.type === 'deregister-test-worker' && evt.id === fromTestWorker.id) { + return; + } + + if (evt.type === 'message' && evt.testWorkerId !== fromTestWorker.id) { + continue; + } + } + + if (evt.type !== 'message') { + continue; + } + + if (replyTo === undefined && evt.replyTo !== undefined) { + continue; + } + + if (replyTo !== undefined && evt.replyTo !== replyTo) { + continue; + } + + await turn(); + + const active = activeTestWorkers.get(evt.testWorkerId); + if (active === undefined) { + return; + } + + let message = messageCache.get(evt); + if (message === undefined) { + message = new ReceivedMessage(active.instance, evt.messageId, evt.data); + messageCache.set(evt, message); + } + + yield message; + } +} + +let messageCounter = 0; +const messageIdPrefix = `${workerData.id}/message`; +const nextMessageId = () => `${messageIdPrefix}/${++messageCounter}`; + +function publishMessage(testWorker, data, replyTo) { + const id = nextMessageId(); + parentPort.postMessage({ + type: 'message', + messageId: id, + testWorkerId: testWorker.id, + data, + replyTo + }); + + return { + id, + async * replies() { + yield * receiveMessages(testWorker, id); + } + }; +} + +function broadcastMessage(data) { + const id = nextMessageId(); + parentPort.postMessage({ + type: 'broadcast', + messageId: id, + data + }); + + return { + id, + async * replies() { + yield * receiveMessages(undefined, id); + } + }; +} + +const loadFactory = async () => { + try { + const mod = require(workerData.filename); + if (typeof mod === 'function') { + return mod; + } + + return mod.default; + } catch (error) { + if (error && error.code === 'ERR_REQUIRE_ESM') { + return import(workerData.filename); // eslint-disable-line node/no-unsupported-features/es-syntax + } + + throw error; + } +}; + +let fatal; +loadFactory(workerData.filename).then(factory => factory({ + negotiateProtocol(supported) { + if (!supported.includes('experimental')) { + fatal = new Error(`This version of AVA (${pkg.version}) is not compatible with shared worker plugin at ${workerData.filename}`); + throw fatal; + } + + const events = new EventEmitter(); + const produceTestWorker = instance => events.emit('testWorker', instance); + + parentPort.on('message', async message => { + if (message.type === 'register-test-worker') { + const {id, file} = message; + const instance = new TestWorker(id, file); + + activeTestWorkers.set(id, {instance, releaseFns: new Set()}); + + produceTestWorker(instance); + } + + if (message.type === 'deregister-test-worker') { + const {id} = message; + const {releaseFns} = activeTestWorkers.get(id); + activeTestWorkers.delete(id); + + // Run possibly asynchronous release functions serially, in reverse + // order. Any error will crash the worker. + for await (const fn of [...releaseFns].reverse()) { + await fn(); + } + } + }); + + return { + initialData: workerData.initialData, + protocol: 'experimental', + + broadcast(data) { + return broadcastMessage(data); + }, + + async * subscribe() { + yield * receiveMessages(); + }, + + async * testWorkers() { + for await (const [worker] of on(events, 'testWorker')) { + yield worker; + } + } + }; + } +})).catch(error => { + if (fatal === undefined) { + process.nextTick(() => { + throw error; + }); + } +}); + +if (fatal !== undefined) { + throw fatal; +} + +parentPort.postMessage({type: 'available'}); diff --git a/lib/shared-workers.js b/lib/shared-workers.js new file mode 100644 index 0000000000..da69bdc91a --- /dev/null +++ b/lib/shared-workers.js @@ -0,0 +1,116 @@ +const events = require('events'); +const serializeError = require('./serialize-error'); + +let Worker; +try { + ({Worker} = require('worker_threads')); // eslint-disable-line node/no-unsupported-features/node-builtins +} catch {} + +const LAUNCHER = require.resolve('./shared-worker-launcher'); + +let sharedWorkerCounter = 0; +const launchedWorkers = new Map(); + +const waitForAvailable = async worker => { + try { + for await (const [{type}] of events.on(worker, 'message')) { + if (type === 'available') { + return; + } + } + } catch { + // Remain pending if errors occur. + await new Promise(() => {}); + } +}; + +function launchWorker({filename, initialData}) { + if (launchedWorkers.has(filename)) { + return launchedWorkers.get(filename); + } + + const id = `shared-worker/${++sharedWorkerCounter}`; + const worker = new Worker(LAUNCHER, { + // Ensure the worker crashes for unhandled rejections, rather than allowing undefined behavior. + execArgv: ['--unhandled-rejections=strict'], + workerData: { + filename, + id, + initialData + } + }); + const launched = { + statePromises: { + available: waitForAvailable(worker), + error: events.once(worker, 'error') + }, + exited: false, + worker + }; + + launchedWorkers.set(filename, launched); + worker.once('exit', () => { + launched.exited = true; + }); + + return launched; +} + +function observeWorkerProcess(fork, runStatus) { + fork.onConnectSharedWorker(async channel => { + const launched = launchWorker(channel); + + const handleChannelMessage = ({messageId, replyTo, data}) => { + launched.worker.postMessage({ + type: 'message', + testWorkerId: fork.forkId, + messageId, + replyTo, + data + }); + }; + + const handleWorkerMessage = async message => { + if (message.type === 'broadcast' || (message.type === 'message' && message.testWorkerId === fork.forkId)) { + const {messageId, replyTo, data} = message; + channel.forwardMessageToFork({messageId, replyTo, data}); + } + }; + + launched.statePromises.error.then(error => { // eslint-disable-line promise/prefer-await-to-then + runStatus.emitStateChange({type: 'shared-worker-error', err: serializeError('Shared worker error', false, error)}); + channel.signalError(); + }); + + await launched.statePromises.available; + if (launched.exited) { + return; + } + + launched.worker.postMessage({ + type: 'register-test-worker', + id: fork.forkId, + file: fork.file + }); + + fork.promise.finally(() => { + launched.worker.postMessage({ + type: 'deregister-test-worker', + id: fork.forkId + }); + + channel.off('message', handleChannelMessage); + launched.worker.off('message', handleWorkerMessage); + }); + + launched.worker.on('message', handleWorkerMessage); + channel.on('message', handleChannelMessage); + channel.signalReady(); + + // Attaching the listener has the side-effect of referencing the worker. + // Explicitly unreference it now. + launched.worker.unref(); + }); +} + +exports.observeWorkerProcess = observeWorkerProcess; diff --git a/lib/worker/ipc.js b/lib/worker/ipc.js index 2eb5bda861..f8dcecd173 100644 --- a/lib/worker/ipc.js +++ b/lib/worker/ipc.js @@ -1,5 +1,6 @@ 'use strict'; const Emittery = require('emittery'); +const {get: getOptions} = require('./options'); const emitter = new Emittery(); process.on('message', message => { @@ -17,6 +18,19 @@ process.on('message', message => { case 'pong': emitter.emit('pong'); break; + case 'shared-worker-message': + // Only emit the message in the next turn of the event loop, so that code + // has had a chance to process the previous message. + setImmediate(() => { + emitter.emit('shared-worker-message', message.ava); + }); + break; + case 'shared-worker-ready': + emitter.emit('shared-worker-ready', message.ava); + break; + case 'shared-worker-error': + emitter.emit('shared-worker-error', message.ava); + break; default: break; } @@ -33,15 +47,24 @@ function send(evt) { exports.send = send; +let refs = 1; +function ref() { + if (++refs === 1) { + process.channel.ref(); + } +} + function unref() { - process.channel.unref(); + if (refs > 0 && --refs === 0) { + process.channel.unref(); + } } exports.unref = unref; let pendingPings = Promise.resolve(); async function flush() { - process.channel.ref(); + ref(); const promise = pendingPings.then(async () => { // eslint-disable-line promise/prefer-await-to-then send({type: 'ping'}); await emitter.once('pong'); @@ -54,3 +77,124 @@ async function flush() { } exports.flush = flush; + +let channelCounter = 0; +let messageCounter = 0; + +function registerSharedWorker(filename, initialData) { + const channelId = `${getOptions().forkId}/channel/${++channelCounter}`; + + let forcedUnref = false; + let refs = 0; + const forceUnref = () => { + if (forcedUnref) { + return; + } + + forcedUnref = true; + if (refs > 0) { + unref(); + } + }; + + const refChannel = () => { + if (!forcedUnref && ++refs === 1) { + ref(); + } + }; + + const unrefChannel = () => { + if (!forcedUnref && refs > 0 && --refs === 0) { + unref(); + } + }; + + send({ + type: 'shared-worker-connect', + channelId, + filename, + initialData + }); + + let currentlyAvailable = false; + + let error = null; + const disposeError = emitter.on('shared-worker-error', evt => { + if (evt.channelId === channelId) { + error = new Error('The shared worker is no longer available'); + currentlyAvailable = false; + disposeError(); + forceUnref(); + } + }); + + refChannel(); + const ready = new Promise(resolve => { + const disposeReady = emitter.on('shared-worker-ready', evt => { + if (evt.channelId !== channelId) { + return; + } + + currentlyAvailable = error === null; + disposeReady(); + resolve(); + }); + }).finally(unrefChannel); + + return { + forceUnref, + ready, + channel: { + available: ready, + + get currentlyAvailable() { + return currentlyAvailable; + }, + + async * receive() { + if (error !== null) { + throw error; + } + + refChannel(); + try { + for await (const evt of emitter.events(['shared-worker-error', 'shared-worker-message'])) { + if (error !== null) { + throw error; + } + + if (evt.channelId === channelId) { + yield evt; + } + } + } finally { + unrefChannel(); + } + }, + + post(data, replyTo) { + if (error !== null) { + throw error; + } + + if (!currentlyAvailable) { + throw new Error('Shared worker is not yet available'); + } + + const messageId = `${channelId}/message/${++messageCounter}`; + send({ + type: 'shared-worker-message', + channelId, + messageId, + replyTo, + data + }); + + return messageId; + } + } + }; +} + +exports.registerSharedWorker = registerSharedWorker; + diff --git a/lib/worker/plugin.js b/lib/worker/plugin.js new file mode 100644 index 0000000000..75a57fce0b --- /dev/null +++ b/lib/worker/plugin.js @@ -0,0 +1,120 @@ +const pkg = require('../../package.json'); +const subprocess = require('./subprocess'); +const options = require('./options'); + +const workers = new Map(); +const workerTeardownFns = new WeakMap(); + +function createSharedWorker(filename, initialData, teardown) { + const channel = subprocess.registerSharedWorker(filename, initialData, teardown); + + class ReceivedMessage { + constructor(id, data) { + this.id = id; + this.data = data; + } + + reply(data) { + return publishMessage(data, this.id); + } + } + + // Ensure that, no matter how often it's received, we have a stable message + // object. + const messageCache = new WeakMap(); + async function * receiveMessages(replyTo) { + for await (const evt of channel.receive()) { + if (replyTo === undefined && evt.replyTo !== undefined) { + continue; + } + + if (replyTo !== undefined && evt.replyTo !== replyTo) { + continue; + } + + let message = messageCache.get(evt); + if (message === undefined) { + message = new ReceivedMessage(evt.messageId, evt.data); + messageCache.set(evt, message); + } + + yield message; + } + } + + function publishMessage(data, replyTo) { + const id = channel.post(data, replyTo); + + return { + id, + async * replies() { + yield * receiveMessages(id); + } + }; + } + + return { + available: channel.available, + protocol: 'experimental', + + get currentlyAvailable() { + return channel.currentlyAvailable; + }, + + publish(data) { + return publishMessage(data); + }, + + async * subscribe() { + yield * receiveMessages(); + } + }; +} + +const supportsSharedWorkers = process.versions.node >= '12.17.0'; + +function registerSharedWorker({ + filename, + initialData, + supportedProtocols, + teardown +}) { + if (!options.get().experiments.sharedWorkers) { + throw new Error('Shared workers are experimental. Opt in to them in your AVA configuration'); + } + + if (!supportsSharedWorkers) { + throw new Error('Shared workers require Node.js 12.17 or newer'); + } + + if (!supportedProtocols.includes('experimental')) { + throw new Error(`This version of AVA (${pkg.version}) does not support any of desired shared worker protocols: ${supportedProtocols.join()}`); + } + + let worker = workers.get(filename); + if (worker === undefined) { + worker = createSharedWorker(filename, initialData, async () => { + // Run possibly asynchronous teardown functions serially, in reverse + // order. Any error will crash the worker. + const teardownFns = workerTeardownFns.get(worker); + if (teardownFns !== undefined) { + for await (const fn of [...teardownFns].reverse()) { + await fn(); + } + } + }); + workers.set(filename, worker); + } + + if (teardown !== undefined) { + if (workerTeardownFns.has(worker)) { + workerTeardownFns.get(worker).push(teardown); + } else { + workerTeardownFns.set(worker, [teardown]); + } + } + + return worker; +} + +exports.registerSharedWorker = registerSharedWorker; diff --git a/lib/worker/subprocess.js b/lib/worker/subprocess.js index 116861bca5..9ba5b63405 100644 --- a/lib/worker/subprocess.js +++ b/lib/worker/subprocess.js @@ -32,6 +32,8 @@ ipc.options.then(async options => { const dependencyTracking = require('./dependency-tracker'); const lineNumberSelection = require('./line-numbers'); + const sharedWorkerTeardowns = []; + async function exit(code) { if (!process.exitCode) { process.exitCode = code; @@ -89,7 +91,7 @@ ipc.options.then(async options => { exit(1); }); - runner.on('finish', () => { + runner.on('finish', async () => { try { const {cannotSave, touchedFiles} = runner.saveSnapshotState(); if (cannotSave) { @@ -103,6 +105,14 @@ ipc.options.then(async options => { return; } + try { + await Promise.all(sharedWorkerTeardowns.map(fn => fn())); + } catch (error) { + ipc.send({type: 'uncaught-exception', err: serializeError('Shared worker teardown error', false, error, runner.file)}); + exit(1); + return; + } + nowAndTimers.setImmediate(() => { currentlyUnhandled() .filter(rejection => !attributedRejections.has(rejection.promise)) @@ -129,6 +139,19 @@ ipc.options.then(async options => { return runner; }; + exports.registerSharedWorker = (filename, initialData, teardown) => { + const {channel, forceUnref, ready} = ipc.registerSharedWorker(filename, initialData); + runner.waitForReady.push(ready); + sharedWorkerTeardowns.push(async () => { + try { + await teardown(); + } finally { + forceUnref(); + } + }); + return channel; + }; + // Store value to prevent required modules from modifying it. const testPath = options.file; diff --git a/package-lock.json b/package-lock.json index f2eb12eb97..831fe9846b 100644 --- a/package-lock.json +++ b/package-lock.json @@ -4819,6 +4819,12 @@ "istanbul-lib-report": "^3.0.0" } }, + "it-first": { + "version": "1.0.2", + "resolved": "https://registry.npmjs.org/it-first/-/it-first-1.0.2.tgz", + "integrity": "sha512-hU5ObR14987PR7l0J7dfWAgKYiWoKbXcoXKqhQDGgHSZML6UPmHSS9ILBGucZkoA2B152kEqEOllS4tVQq11fg==", + "dev": true + }, "jackspeak": { "version": "1.4.0", "resolved": "https://registry.npmjs.org/jackspeak/-/jackspeak-1.4.0.tgz", diff --git a/package.json b/package.json index 2979aad93e..e814ad5568 100644 --- a/package.json +++ b/package.json @@ -16,7 +16,8 @@ "lib", "*.js", "!*.config.js", - "index.d.ts" + "index.d.ts", + "*.d.ts" ], "keywords": [ "🦄", @@ -122,6 +123,7 @@ "esm": "^3.2.25", "execa": "^4.0.2", "get-stream": "^5.1.0", + "it-first": "^1.0.2", "p-event": "^4.2.0", "proxyquire": "^2.1.3", "react": "^16.13.1", diff --git a/plugin.d.ts b/plugin.d.ts new file mode 100644 index 0000000000..318296b419 --- /dev/null +++ b/plugin.d.ts @@ -0,0 +1,72 @@ +export namespace SharedWorker { + export const enum ProtocolIdentifier { + Experimental = 'experimental' + } + + export type TestWorker = { + readonly id: string; + readonly file: string; + defer: (fn: ReleaseFn) => ReleaseFn; + publish: (data: Data) => PublishedMessage; + subscribe: () => AsyncIterableIterator>; + }; + + export type Factory = (options: { + readonly negotiateProtocol: (supported: readonly ProtocolIdentifier[]) => Protocol; + }) => void; + + export type Protocol = { + readonly initialData: Data; + readonly protocol: ProtocolIdentifier.Experimental; + broadcast: (data: Data) => BroadcastMessage; + subscribe: () => AsyncIterableIterator>; + testWorkers: () => AsyncIterableIterator>; + }; + + export type BroadcastMessage = { + readonly id: string; + replies: () => AsyncIterableIterator>; + }; + + export type PublishedMessage = { + readonly id: string; + replies: () => AsyncIterableIterator>; + }; + + export type ReceivedMessage = { + readonly data: Data; + readonly id: string; + readonly testWorker: TestWorker; + reply: (data: Data) => PublishedMessage; + }; + + export namespace Plugin { + export type PublishedMessage = { + readonly id: string; + replies: () => AsyncIterableIterator>; + }; + + export type ReceivedMessage = { + readonly data: Data; + readonly id: string; + reply: (data: Data) => PublishedMessage; + }; + + export type Protocol = { + readonly available: Promise; + readonly currentlyAvailable: boolean; + readonly protocol: ProtocolIdentifier.Experimental; + publish: (data: Data) => PublishedMessage; + subscribe: () => AsyncIterableIterator>; + }; + } +} + +export type SharedWorkerRegistrationOptions = { + readonly filename: string; + readonly initialData?: Data; + readonly supportedProtocols: readonly SharedWorker.ProtocolIdentifier[]; + readonly teardown?: (protocol: SharedWorker.Plugin.Protocol) => void; +}; + +export function registerSharedWorker (options: SharedWorkerRegistrationOptions): SharedWorker.Plugin.Protocol; diff --git a/plugin.js b/plugin.js new file mode 100644 index 0000000000..f8f4c10493 --- /dev/null +++ b/plugin.js @@ -0,0 +1,9 @@ +'use strict'; +const path = require('path'); + +// Ensure the same AVA install is loaded by the test file as by the test worker +if (process.env.AVA_PATH && process.env.AVA_PATH !== __dirname) { + module.exports = require(path.join(process.env.AVA_PATH, 'plugin')); +} else { + module.exports = require('./lib/worker/plugin'); +} diff --git a/test/shared-workers/is-an-experiment/fixtures/package.json b/test/shared-workers/is-an-experiment/fixtures/package.json new file mode 100644 index 0000000000..0967ef424b --- /dev/null +++ b/test/shared-workers/is-an-experiment/fixtures/package.json @@ -0,0 +1 @@ +{} diff --git a/test/shared-workers/is-an-experiment/fixtures/test.js b/test/shared-workers/is-an-experiment/fixtures/test.js new file mode 100644 index 0000000000..9f6cd0dd53 --- /dev/null +++ b/test/shared-workers/is-an-experiment/fixtures/test.js @@ -0,0 +1,4 @@ +const plugin = require('ava/plugin'); +plugin.registerSharedWorker({ + supportedProtocols: ['experimental'] +}); diff --git a/test/shared-workers/is-an-experiment/snapshots/test.js.md b/test/shared-workers/is-an-experiment/snapshots/test.js.md new file mode 100644 index 0000000000..e264b32e4d --- /dev/null +++ b/test/shared-workers/is-an-experiment/snapshots/test.js.md @@ -0,0 +1,11 @@ +# Snapshot report for `test/shared-workers/is-an-experiment/test.js` + +The actual snapshot is saved in `test.js.snap`. + +Generated by [AVA](https://avajs.dev). + +## opt-in is required + +> Snapshot 1 + + 'Shared workers are experimental. Opt in to them in your AVA configuration' diff --git a/test/shared-workers/is-an-experiment/snapshots/test.js.snap b/test/shared-workers/is-an-experiment/snapshots/test.js.snap new file mode 100644 index 0000000000..265ada9490 Binary files /dev/null and b/test/shared-workers/is-an-experiment/snapshots/test.js.snap differ diff --git a/test/shared-workers/is-an-experiment/test.js b/test/shared-workers/is-an-experiment/test.js new file mode 100644 index 0000000000..9e66cb783b --- /dev/null +++ b/test/shared-workers/is-an-experiment/test.js @@ -0,0 +1,10 @@ +const test = require('@ava/test'); +const exec = require('../../helpers/exec'); + +test('opt-in is required', async t => { + const result = await t.throwsAsync(exec.fixture()); + t.is(result.exitCode, 1); + t.is(result.stats.uncaughtExceptions.length, 1); + t.snapshot(result.stats.uncaughtExceptions[0].message); +}); + diff --git a/test/shared-workers/proof-of-concept/fixtures/_plugin.js b/test/shared-workers/proof-of-concept/fixtures/_plugin.js new file mode 100644 index 0000000000..1ce2573269 --- /dev/null +++ b/test/shared-workers/proof-of-concept/fixtures/_plugin.js @@ -0,0 +1,26 @@ +const plugin = require('ava/plugin'); +const itFirst = require('it-first'); + +const worker = plugin.registerSharedWorker({ + filename: require.resolve('./_worker'), + supportedProtocols: ['experimental'] +}); + +exports.store = async value => { + const status = worker.publish({type: 'store', value}); + await itFirst(status.replies()); +}; + +exports.retrieve = async () => { + const status = worker.publish({type: 'retrieve'}); + const {data: value} = await itFirst(status.replies()); + return value; +}; + +exports.subscribe = async function * () { + for await (const {data} of worker.subscribe()) { + if (data.type === 'change') { + yield data.value; + } + } +}; diff --git a/test/shared-workers/proof-of-concept/fixtures/_worker.js b/test/shared-workers/proof-of-concept/fixtures/_worker.js new file mode 100644 index 0000000000..781e322262 --- /dev/null +++ b/test/shared-workers/proof-of-concept/fixtures/_worker.js @@ -0,0 +1,14 @@ +module.exports = async ({negotiateProtocol}) => { + const protocol = negotiateProtocol(['experimental']); + + let stored; + for await (const message of protocol.subscribe()) { + if (message.data.type === 'store') { + stored = message.data.value; + message.reply(); + protocol.broadcast({type: 'change', value: stored}); + } else if (message.data.type === 'retrieve') { + message.reply(stored); + } + } +}; diff --git a/test/shared-workers/proof-of-concept/fixtures/package.json b/test/shared-workers/proof-of-concept/fixtures/package.json new file mode 100644 index 0000000000..e677ba9479 --- /dev/null +++ b/test/shared-workers/proof-of-concept/fixtures/package.json @@ -0,0 +1,7 @@ +{ + "ava": { + "nonSemVerExperiments": { + "sharedWorkers": true + } + } +} diff --git a/test/shared-workers/proof-of-concept/fixtures/test.js b/test/shared-workers/proof-of-concept/fixtures/test.js new file mode 100644 index 0000000000..e72e44cfb7 --- /dev/null +++ b/test/shared-workers/proof-of-concept/fixtures/test.js @@ -0,0 +1,34 @@ +const crypto = require('crypto'); +const test = require('ava'); +const storeAndRetrieve = require('./_plugin'); + +const random = crypto.randomBytes(16); + +test.serial('store', async t => { + await t.notThrowsAsync(storeAndRetrieve.store(random)); +}); + +test.serial('retrieve', async t => { + t.deepEqual(Buffer.from(await storeAndRetrieve.retrieve()), random); +}); + +test.serial('subscribe', async t => { + await storeAndRetrieve.store(1); + + const changes = []; + for await (const change of storeAndRetrieve.subscribe()) { + changes.push(change); + if (change === 1) { + await storeAndRetrieve.store(2); + } + + if (change === 2) { + await storeAndRetrieve.store(3); + } + + if (change === 3) { + t.deepEqual(changes, [1, 2, 3]); + break; + } + } +}); diff --git a/test/shared-workers/proof-of-concept/snapshots/test.js.md b/test/shared-workers/proof-of-concept/snapshots/test.js.md new file mode 100644 index 0000000000..0bf18d3daf --- /dev/null +++ b/test/shared-workers/proof-of-concept/snapshots/test.js.md @@ -0,0 +1,24 @@ +# Snapshot report for `test/shared-workers/proof-of-concept/test.js` + +The actual snapshot is saved in `test.js.snap`. + +Generated by [AVA](https://avajs.dev). + +## shared worker plugins work + +> Snapshot 1 + + [ + { + file: 'test.js', + title: 'retrieve', + }, + { + file: 'test.js', + title: 'store', + }, + { + file: 'test.js', + title: 'subscribe', + }, + ] diff --git a/test/shared-workers/proof-of-concept/snapshots/test.js.snap b/test/shared-workers/proof-of-concept/snapshots/test.js.snap new file mode 100644 index 0000000000..254cb205e4 Binary files /dev/null and b/test/shared-workers/proof-of-concept/snapshots/test.js.snap differ diff --git a/test/shared-workers/proof-of-concept/test.js b/test/shared-workers/proof-of-concept/test.js new file mode 100644 index 0000000000..47b0fc7d38 --- /dev/null +++ b/test/shared-workers/proof-of-concept/test.js @@ -0,0 +1,7 @@ +const test = require('@ava/test'); +const exec = require('../../helpers/exec'); + +test('shared worker plugins work', async t => { + const result = await exec.fixture(); + t.snapshot(result.stats.passed); +}); diff --git a/test/shared-workers/requires-newish-node/fixtures/_worker.js b/test/shared-workers/requires-newish-node/fixtures/_worker.js new file mode 100644 index 0000000000..cc40a4649c --- /dev/null +++ b/test/shared-workers/requires-newish-node/fixtures/_worker.js @@ -0,0 +1 @@ +module.exports = () => {}; diff --git a/test/shared-workers/requires-newish-node/fixtures/package.json b/test/shared-workers/requires-newish-node/fixtures/package.json new file mode 100644 index 0000000000..e677ba9479 --- /dev/null +++ b/test/shared-workers/requires-newish-node/fixtures/package.json @@ -0,0 +1,7 @@ +{ + "ava": { + "nonSemVerExperiments": { + "sharedWorkers": true + } + } +} diff --git a/test/shared-workers/requires-newish-node/fixtures/test.js b/test/shared-workers/requires-newish-node/fixtures/test.js new file mode 100644 index 0000000000..a4859f3ee9 --- /dev/null +++ b/test/shared-workers/requires-newish-node/fixtures/test.js @@ -0,0 +1,10 @@ +const test = require('ava'); +const plugin = require('ava/plugin'); +const {available} = plugin.registerSharedWorker({ + supportedProtocols: ['experimental'], + filename: require.resolve('./_worker') +}); + +test('worker becomes available', async t => { + await t.notThrowsAsync(available); +}); diff --git a/test/shared-workers/requires-newish-node/test.js b/test/shared-workers/requires-newish-node/test.js new file mode 100644 index 0000000000..2663dbb773 --- /dev/null +++ b/test/shared-workers/requires-newish-node/test.js @@ -0,0 +1,17 @@ +const test = require('@ava/test'); +const exec = require('../../helpers/exec'); + +test('requires node.js >= 12.17', async t => { + const result = await exec.fixture().catch(error => error); + + t.log(result.stdout); + + if (process.versions.node >= '12.17.0') { + t.is(result.exitCode, 0); + } else { + t.is(result.exitCode, 1); + t.is(result.stats.uncaughtExceptions.length, 1); + // Don't snapshot since it can't easily be updated anyway. + t.is(result.stats.uncaughtExceptions[0].message, 'Shared workers require Node.js 12.17 or newer'); + } +}); diff --git a/xo.config.js b/xo.config.js index 75eb391c7e..6895f1b1f9 100644 --- a/xo.config.js +++ b/xo.config.js @@ -15,7 +15,7 @@ module.exports = { }, overrides: [ { - files: '*.d.ts', + files: 'index.d.ts', rules: { '@typescript-eslint/member-ordering': 'off', '@typescript-eslint/method-signature-style': 'off',