diff --git a/lib/api.js b/lib/api.js index 92a9b9162..7c2cc0fc2 100644 --- a/lib/api.js +++ b/lib/api.js @@ -17,6 +17,7 @@ const RunStatus = require('./run-status'); const fork = require('./fork'); const serializeError = require('./serialize-error'); const {getApplicableLineNumbers} = require('./line-numbers'); +const sharedWorkers = require('./shared-workers'); function resolveModules(modules) { return arrify(modules).map(name => { @@ -231,6 +232,7 @@ class Api extends Emittery { const worker = fork(file, options, apiOptions.nodeArguments); runStatus.observeWorker(worker, file, {selectingLines: lineNumbers.length > 0}); + sharedWorkers.observeWorkerProcess(worker, runStatus); pendingWorkers.add(worker); worker.promise.then(() => { diff --git a/lib/fork.js b/lib/fork.js index f393011e5..1414cef4b 100644 --- a/lib/fork.js +++ b/lib/fork.js @@ -11,10 +11,57 @@ if (fs.realpathSync(__filename) !== __filename) { // In case the test file imports a different AVA install, // the presence of this variable allows it to require this one instead const AVA_PATH = path.resolve(__dirname, '..'); +const WORKER_PATH = require.resolve('./worker/subprocess'); + +class SharedWorkerChannel extends Emittery { + constructor({channelId, filename, initialData}, sendToFork) { + super(); + + this.id = channelId; + this.filename = filename; + this.initialData = initialData; + this.sendToFork = sendToFork; + } + + signalReady() { + this.sendToFork({ + type: 'shared-worker-ready', + channelId: this.id + }); + } + + signalError() { + this.sendToFork({ + type: 'shared-worker-error', + channelId: this.id + }); + } -const workerPath = require.resolve('./worker/subprocess'); + emitMessage({messageId, replyTo, data}) { + this.emit('message', { + messageId, + replyTo, + data + }); + } + + forwardMessageToFork({messageId, replyTo, data}) { + this.sendToFork({ + type: 'shared-worker-message', + channelId: this.id, + messageId, + replyTo, + data + }); + } +} + +let forkCounter = 0; module.exports = (file, options, execArgv = process.execArgv) => { + const forkId = `fork/${++forkCounter}`; + const sharedWorkerChannels = new Map(); + let finished = false; const emitter = new Emittery(); @@ -25,16 +72,18 @@ module.exports = (file, options, execArgv = process.execArgv) => { }; options = { - file, baseDir: process.cwd(), + file, + forkId, ...options }; - const subprocess = childProcess.fork(workerPath, options.workerArgv, { + const subprocess = childProcess.fork(WORKER_PATH, options.workerArgv, { cwd: options.projectDir, silent: true, env: {NODE_ENV: 'test', ...process.env, ...options.environmentVariables, AVA_PATH}, - execArgv + execArgv, + serialization: process.versions.node >= '12.16.0' ? 'advanced' : 'json' }); subprocess.stdout.on('data', chunk => { @@ -65,15 +114,25 @@ module.exports = (file, options, execArgv = process.execArgv) => { return; } - if (message.ava.type === 'ready-for-options') { - send({type: 'options', options}); - return; - } - - if (message.ava.type === 'ping') { - send({type: 'pong'}); - } else { - emitStateChange(message.ava); + switch (message.ava.type) { + case 'ready-for-options': + send({type: 'options', options}); + break; + case 'shared-worker-connect': { + const channel = new SharedWorkerChannel(message.ava, send); + sharedWorkerChannels.set(channel.id, channel); + emitter.emit('connectSharedWorker', channel); + break; + } + + case 'shared-worker-message': + sharedWorkerChannels.get(message.ava.channelId).emitMessage(message.ava); + break; + case 'ping': + send({type: 'pong'}); + break; + default: + emitStateChange(message.ava); } }); @@ -98,6 +157,10 @@ module.exports = (file, options, execArgv = process.execArgv) => { }); return { + file, + forkId, + promise, + exit() { forcedExit = true; subprocess.kill(); @@ -107,11 +170,12 @@ module.exports = (file, options, execArgv = process.execArgv) => { send({type: 'peer-failed'}); }, - onStateChange(listener) { - return emitter.on('stateChange', listener); + onConnectSharedWorker(listener) { + return emitter.on('connectSharedWorker', listener); }, - file, - promise + onStateChange(listener) { + return emitter.on('stateChange', listener); + } }; }; diff --git a/lib/reporters/mini.js b/lib/reporters/mini.js index 4e221140e..754a89359 100644 --- a/lib/reporters/mini.js +++ b/lib/reporters/mini.js @@ -117,6 +117,7 @@ class MiniReporter { this.prefixTitle = (testFile, title) => title; this.previousFailures = 0; this.removePreviousListener = null; + this.sharedWorkerErrors = []; this.stats = null; this.uncaughtExceptions = []; this.unhandledRejections = []; @@ -180,6 +181,9 @@ class MiniReporter { case 'selected-test': // Ignore break; + case 'shared-worker-error': + this.sharedWorkerErrors.push(evt); + break; case 'stats': this.stats = evt.stats; break; @@ -511,7 +515,7 @@ class MiniReporter { const shouldWriteFailFastDisclaimer = this.failFastEnabled && (this.stats.remainingTests > 0 || this.stats.files > this.stats.finishedWorkers); if (this.failures.length > 0) { - const writeTrailingLines = shouldWriteFailFastDisclaimer || this.internalErrors.length > 0 || this.uncaughtExceptions.length > 0 || this.unhandledRejections.length > 0; + const writeTrailingLines = shouldWriteFailFastDisclaimer || this.internalErrors.length > 0 || this.sharedWorkerErrors.length > 0 || this.uncaughtExceptions.length > 0 || this.unhandledRejections.length > 0; this.lineWriter.writeLine(); const last = this.failures[this.failures.length - 1]; @@ -527,7 +531,7 @@ class MiniReporter { if (this.internalErrors.length > 0) { const writeLeadingLine = this.failures.length === 0; - const writeTrailingLines = shouldWriteFailFastDisclaimer || this.uncaughtExceptions.length > 0 || this.unhandledRejections.length > 0; + const writeTrailingLines = shouldWriteFailFastDisclaimer || this.sharedWorkerErrors.length > 0 || this.uncaughtExceptions.length > 0 || this.unhandledRejections.length > 0; if (writeLeadingLine) { this.lineWriter.writeLine(); @@ -551,8 +555,30 @@ class MiniReporter { } } - if (this.uncaughtExceptions.length > 0) { + if (this.sharedWorkerErrors.length > 0) { const writeLeadingLine = this.failures.length === 0 && this.internalErrors.length === 0; + const writeTrailingLines = shouldWriteFailFastDisclaimer || this.uncaughtExceptions.length > 0 || this.unhandledRejections.length > 0; + + if (writeLeadingLine) { + this.lineWriter.writeLine(); + } + + const last = this.sharedWorkerErrors[this.sharedWorkerErrors.length - 1]; + for (const evt of this.sharedWorkerErrors) { + this.lineWriter.writeLine(colors.error(`${figures.cross} Error in shared worker`)); + + this.lineWriter.writeLine(colors.stack(evt.err.summary)); + this.lineWriter.writeLine(colors.errorStack(evt.err.stack)); + if (evt !== last || writeTrailingLines) { + this.lineWriter.writeLine(); + this.lineWriter.writeLine(); + this.lineWriter.writeLine(); + } + } + } + + if (this.uncaughtExceptions.length > 0) { + const writeLeadingLine = this.failures.length === 0 && this.internalErrors.length === 0 && this.sharedWorkerErrors.length === 0; const writeTrailingLines = shouldWriteFailFastDisclaimer || this.unhandledRejections.length > 0; if (writeLeadingLine) { @@ -573,7 +599,7 @@ class MiniReporter { } if (this.unhandledRejections.length > 0) { - const writeLeadingLine = this.failures.length === 0 && this.internalErrors.length === 0 && this.uncaughtExceptions.length === 0; + const writeLeadingLine = this.failures.length === 0 && this.internalErrors.length === 0 && this.sharedWorkerErrors.length === 0 && this.uncaughtExceptions.length === 0; const writeTrailingLines = shouldWriteFailFastDisclaimer; if (writeLeadingLine) { diff --git a/lib/reporters/verbose.js b/lib/reporters/verbose.js index 0d393a3a9..fa51f97b2 100644 --- a/lib/reporters/verbose.js +++ b/lib/reporters/verbose.js @@ -150,6 +150,14 @@ class VerboseReporter { this.lineWriter.writeLine(colors.todo(`- ${this.prefixTitle(evt.testFile, evt.title)}`)); } + break; + case 'shared-worker-error': + this.lineWriter.writeLine(colors.error(`${figures.cross} Error in shared worker`)); + + this.lineWriter.writeLine(colors.stack(evt.err.summary)); + this.lineWriter.writeLine(colors.errorStack(evt.err.stack)); + this.lineWriter.writeLine(); + this.lineWriter.writeLine(); break; case 'stats': this.stats = evt.stats; diff --git a/lib/runner.js b/lib/runner.js index 0aec70a1f..2e78103af 100644 --- a/lib/runner.js +++ b/lib/runner.js @@ -41,6 +41,7 @@ class Runner extends Emittery { serial: [], todo: [] }; + this.waitForReady = []; const uniqueTestTitles = new Set(); this.registerUniqueTitle = title => { @@ -435,6 +436,8 @@ class Runner extends Emittery { }); } + await Promise.all(this.waitForReady); + if (concurrentTests.length === 0 && serialTests.length === 0) { this.emit('finish'); // Don't run any hooks if there are no tests to run. diff --git a/lib/shared-worker-provider.js b/lib/shared-worker-provider.js new file mode 100644 index 000000000..3ee6a14c9 --- /dev/null +++ b/lib/shared-worker-provider.js @@ -0,0 +1,170 @@ +const {EventEmitter} = require('events'); +const {workerData, parentPort} = require('worker_threads'); // eslint-disable-line node/no-unsupported-features/node-builtins +const itFirst = require('it-first'); +const pDefer = require('p-defer'); +const pEvent = require('p-event'); +const pkg = require('../package.json'); + +// Map of active test workers, used in receiveMessages() to get a reference to +// the TestWorker instance. +const activeTestWorkers = new Map(); + +class TestWorker { + constructor(id, file, finished) { + this.id = id; + this.file = file; + this.finished = finished; + } + + publish(data) { + return publishMessage(this, data); + } + + subscribe() { + return receiveMessages(this); + } +} + +class ReceivedMessage { + constructor(testWorker, id, data) { + this.testWorker = testWorker; + this.id = id; + this.data = data; + } + + reply(data) { + return publishMessage(this.testWorker, data, this.id); + } +} + +// Ensure that, no matter how often it's received, we have a stable message +// object. +const messageCache = new WeakMap(); + +async function * receiveMessages(testWorker, replyTo) { + for await (const evt of pEvent.iterator(parentPort, 'message')) { + if (evt.type !== 'message') { + continue; + } + + if (testWorker !== undefined && evt.testWorkerId !== testWorker.id) { + continue; + } + + if (replyTo === undefined && evt.replyTo !== undefined) { + continue; + } + + if (replyTo !== undefined && evt.replyTo !== replyTo) { + continue; + } + + let message = messageCache.get(evt); + if (message === undefined) { + message = new ReceivedMessage(activeTestWorkers.get(evt.testWorkerId).instance, evt.messageId, evt.data); + messageCache.set(evt, message); + } + + yield message; + } +} + +let messageCounter = 0; +const messageIdPrefix = `${workerData.id}/message`; +const nextMessageId = () => `${messageIdPrefix}/${++messageCounter}`; + +function publishMessage(testWorker, data, replyTo) { + const id = nextMessageId(); + parentPort.postMessage({ + type: 'message', + messageId: id, + testWorkerId: testWorker.id, + data, + replyTo + }); + + return { + id, + + get reply() { + return itFirst(receiveMessages(testWorker, id)); + } + }; +} + +function broadcastMessage(data) { + const id = nextMessageId(); + parentPort.postMessage({ + type: 'broadcast', + messageId: id, + data + }); + + return { + id, + + get replies() { + return receiveMessages(undefined, id); + } + }; +} + +let fatal; +Promise.resolve(require(workerData.filename)({ + negotiateProtocol(supported) { + if (!supported.includes('experimental')) { + fatal = new Error(`This version of AVA (${pkg.version}) is not compatible with shared worker plugin at ${workerData.filename}`); + throw fatal; + } + + const events = new EventEmitter(); + const testWorkers = pEvent.iterator(events, 'testWorker'); + const produceTestWorker = instance => events.emit('testWorker', instance); + + parentPort.on('message', message => { + if (message.type === 'register-test-worker') { + const {id, file} = message; + const finished = pDefer(); + const instance = new TestWorker(id, file, finished.promise); + + activeTestWorkers.set(id, { + finish: finished.resolve, + instance + }); + + produceTestWorker(instance); + } + + if (message.type === 'deregister-test-worker') { + const {id} = message; + activeTestWorkers.get(id).finish(); + activeTestWorkers.delete(id); + } + }); + + return { + protocol: 'experimental', + testWorkers, + + broadcast(data) { + return broadcastMessage(data); + }, + + subscribe() { + return receiveMessages(); + } + }; + } +})).catch(error => { + if (fatal === undefined) { + process.nextTick(() => { + throw error; + }); + } +}); + +if (fatal !== undefined) { + throw fatal; +} + +parentPort.postMessage({type: 'available'}); diff --git a/lib/shared-workers.js b/lib/shared-workers.js new file mode 100644 index 000000000..8f92b6c55 --- /dev/null +++ b/lib/shared-workers.js @@ -0,0 +1,87 @@ +const {Worker} = require('worker_threads'); // eslint-disable-line node/no-unsupported-features/node-builtins +const pEvent = require('p-event'); +const serializeError = require('./serialize-error'); +const provider = require.resolve('./shared-worker-provider'); + +let sharedWorkerCounter = 0; +const launchedWorkers = new Map(); + +function launchWorker({filename, initialData}) { + if (launchedWorkers.has(filename)) { + return launchedWorkers.get(filename); + } + + const id = `shared-worker/${++sharedWorkerCounter}`; + const worker = new Worker(provider, { + workerData: { + filename, + id, + initialData + } + }); + const launched = { + statePromises: { + available: pEvent(worker, 'message', ({type}) => type === 'available', {rejectionEvents: []}), + error: pEvent(worker, 'error', {rejectionEvents: []}) + }, + exited: false, + worker + }; + + launchedWorkers.set(filename, launched); + worker.once('exit', () => { + launched.exited = true; + }); + + return launched; +} + +function observeWorkerProcess(fork, runStatus) { + fork.onConnectSharedWorker(async channel => { + const launched = launchWorker(channel); + + launched.statePromises.error.then(error => { // eslint-disable-line promise/prefer-await-to-then + runStatus.emitStateChange({type: 'shared-worker-error', err: serializeError('Shared worker error', false, error)}); + channel.signalError(); + }); + + await launched.statePromises.available; + if (launched.exited) { + return; + } + + launched.worker.postMessage({ + type: 'register-test-worker', + id: fork.forkId, + file: fork.file + }); + + fork.promise.finally(() => { + launched.worker.postMessage({ + type: 'deregister-test-worker', + id: fork.forkId + }); + }); + + launched.worker.on('message', async message => { + if (message.type === 'broadcast' || (message.type === 'message' && message.testWorkerId === fork.forkId)) { + const {messageId, replyTo, data} = message; + channel.forwardMessageToFork({messageId, replyTo, data}); + } + }); + + channel.on('message', ({messageId, replyTo, data}) => { + launched.worker.postMessage({ + type: 'message', + testWorkerId: fork.forkId, + messageId, + replyTo, + data + }); + }); + + channel.signalReady(); + }); +} + +exports.observeWorkerProcess = observeWorkerProcess; diff --git a/lib/worker/ipc.js b/lib/worker/ipc.js index 2eb5bda86..e513daf01 100644 --- a/lib/worker/ipc.js +++ b/lib/worker/ipc.js @@ -1,5 +1,6 @@ 'use strict'; const Emittery = require('emittery'); +const {get: getOptions} = require('./options'); const emitter = new Emittery(); process.on('message', message => { @@ -17,6 +18,15 @@ process.on('message', message => { case 'pong': emitter.emit('pong'); break; + case 'shared-worker-message': + emitter.emit('shared-worker-message', message.ava); + break; + case 'shared-worker-ready': + emitter.emit('shared-worker-ready', message.ava); + break; + case 'shared-worker-error': + emitter.emit('shared-worker-error', message.ava); + break; default: break; } @@ -33,15 +43,24 @@ function send(evt) { exports.send = send; +let refs = 1; +function ref() { + if (++refs === 1) { + process.channel.ref(); + } +} + function unref() { - process.channel.unref(); + if (refs > 0 && --refs === 0) { + process.channel.unref(); + } } exports.unref = unref; let pendingPings = Promise.resolve(); async function flush() { - process.channel.ref(); + ref(); const promise = pendingPings.then(async () => { // eslint-disable-line promise/prefer-await-to-then send({type: 'ping'}); await emitter.once('pong'); @@ -54,3 +73,133 @@ async function flush() { } exports.flush = flush; + +let channelCounter = 0; +let messageCounter = 0; + +function registerSharedWorker(filename, initialData) { + const channelId = `${getOptions().forkId}/channel/${++channelCounter}`; + + let forcedUnref = false; + let refs = 0; + const forceUnref = () => { + if (forcedUnref) { + return; + } + + forcedUnref = true; + if (refs > 0) { + unref(); + } + }; + + const refManagement = { + ref() { + if (!forcedUnref && ++refs === 1) { + ref(); + } + }, + + unref() { + if (!forcedUnref && refs > 0 && --refs === 0) { + unref(); + } + } + }; + + refManagement.ref(); + send({ + type: 'shared-worker-connect', + channelId, + filename, + initialData + }); + + let currentlyAvailable = false; + + let error = null; + const disposeError = emitter.on('shared-worker-error', evt => { + if (evt.channelId === channelId) { + error = new Error('The shared worker is no longer available'); + currentlyAvailable = false; + disposeError(); + forceUnref(); + } + }); + + const ready = new Promise(resolve => { + const disposeReady = emitter.on('shared-worker-ready', evt => { + if (evt.channelId !== channelId) { + return; + } + + currentlyAvailable = error === null; + refManagement.unref(); + + disposeReady(); + resolve(); + }); + }); + + return { + ready, + + forceUnref, + + channel: { + available: ready, + + get currentlyAvailable() { + return currentlyAvailable; + }, + + ...refManagement, + + async * receive() { + if (error !== null) { + throw error; + } + + for await (const evt of emitter.events('shared-worker-message')) { + if (evt.channelId !== channelId) { + continue; + } + + if (error !== null) { + throw error; + } + + yield evt; + + if (error !== null) { + throw error; + } + } + }, + + post(data, replyTo) { + if (error !== null) { + throw error; + } + + if (!currentlyAvailable) { + throw new Error('Shared worker is not yet available'); + } + + const messageId = `${channelId}/message/${++messageCounter}`; + send({ + type: 'shared-worker-message', + channelId, + messageId, + replyTo, + data + }); + + return messageId; + } + } + }; +} + +exports.registerSharedWorker = registerSharedWorker; + diff --git a/lib/worker/main.js b/lib/worker/main.js index 80f0497c7..84555ed12 100644 --- a/lib/worker/main.js +++ b/lib/worker/main.js @@ -1,5 +1,5 @@ 'use strict'; -const runner = require('./subprocess').getRunner(); +const runner = require('./subprocess').accessRunner(); const makeCjsExport = () => { function test(...args) { diff --git a/lib/worker/plugin.js b/lib/worker/plugin.js new file mode 100644 index 000000000..1ad864d4c --- /dev/null +++ b/lib/worker/plugin.js @@ -0,0 +1,114 @@ +const itFirst = require('it-first'); +const pkg = require('../../package.json'); +const subprocess = require('./subprocess'); + +const workers = new Map(); +const workerTeardownFns = new WeakMap(); + +function createSharedWorker(filename, initialData, teardown) { + const channel = subprocess.registerSharedWorker(filename, initialData, teardown); + + class ReceivedMessage { + constructor(id, data) { + this.id = id; + this.data = data; + } + + reply(data) { + return publishMessage(data, this.id); + } + } + + // Ensure that, no matter how often it's received, we have a stable message + // object. + const messageCache = new WeakMap(); + async function * receiveMessages(replyTo) { + for await (const evt of channel.receive()) { + if (replyTo === undefined && evt.replyTo !== undefined) { + continue; + } + + if (replyTo !== undefined && evt.replyTo !== replyTo) { + continue; + } + + let message = messageCache.get(evt); + if (message === undefined) { + message = new ReceivedMessage(evt.messageId, evt.data); + messageCache.set(evt, message); + } + + yield message; + } + } + + function publishMessage(data, replyTo) { + const id = channel.post(data, replyTo); + + return { + id, + + get reply() { + channel.ref(); + return itFirst(receiveMessages(id)).finally(() => { + channel.unref(); + }); + } + }; + } + + return { + protocol: 'experimental', + + ref: channel.ref, + unref: channel.unref, + + publish(data) { + return publishMessage(data); + }, + + async * subscribe() { + channel.ref(); + try { + for await (const message of receiveMessages()) { + yield message; + } + } finally { + channel.unref(); + } + } + }; +} + +function registerSharedWorker({ + filename, + initialData, + supportedProtocols, + teardown +}) { + if (!supportedProtocols.includes('experimental')) { + throw new Error(`This version of AVA (${pkg.version}) does not support any of desired shared worker protocols: ${supportedProtocols.join()}`); + } + + let worker = workers.get(filename); + if (worker === undefined) { + worker = createSharedWorker(filename, initialData, async () => { + if (workerTeardownFns.has(worker)) { + await Promise.all(workerTeardownFns.get(worker).map(fn => fn())); + } + }); + workers.set(filename, worker); + } + + if (teardown !== undefined) { + if (workerTeardownFns.has(worker)) { + workerTeardownFns.get(worker).push(teardown); + } else { + workerTeardownFns.set(worker, [teardown]); + } + } + + return worker; +} + +exports.registerSharedWorker = registerSharedWorker; diff --git a/lib/worker/subprocess.js b/lib/worker/subprocess.js index 2b3ae3bf6..f88777632 100644 --- a/lib/worker/subprocess.js +++ b/lib/worker/subprocess.js @@ -32,6 +32,8 @@ ipc.options.then(async options => { const dependencyTracking = require('./dependency-tracker'); const lineNumberSelection = require('./line-numbers'); + const sharedWorkerTeardowns = []; + async function exit(code) { if (!process.exitCode) { process.exitCode = code; @@ -89,7 +91,7 @@ ipc.options.then(async options => { exit(1); }); - runner.on('finish', () => { + runner.on('finish', async () => { try { const touchedFiles = runner.saveSnapshotState(); if (touchedFiles) { @@ -101,6 +103,14 @@ ipc.options.then(async options => { return; } + try { + await Promise.all(sharedWorkerTeardowns.map(fn => fn())); + } catch (error) { + ipc.send({type: 'uncaught-exception', err: serializeError('Shared worker teardown error', false, error, runner.file)}); + exit(1); + return; + } + nowAndTimers.setImmediate(() => { currentlyUnhandled() .filter(rejection => !attributedRejections.has(rejection.promise)) @@ -122,11 +132,24 @@ ipc.options.then(async options => { }); let accessedRunner = false; - exports.getRunner = () => { + exports.accessRunner = () => { accessedRunner = true; return runner; }; + exports.registerSharedWorker = (filename, initialData, teardown) => { + const {channel, forceUnref, ready} = ipc.registerSharedWorker(filename, initialData); + runner.waitForReady.push(ready); + sharedWorkerTeardowns.push(async () => { + try { + await teardown(); + } finally { + forceUnref(); + } + }); + return channel; + }; + // Store value to prevent required modules from modifying it. const testPath = options.file; diff --git a/package-lock.json b/package-lock.json index 9349b6678..8bfd6ebf9 100644 --- a/package-lock.json +++ b/package-lock.json @@ -5069,6 +5069,11 @@ "istanbul-lib-report": "^3.0.0" } }, + "it-first": { + "version": "1.0.2", + "resolved": "https://registry.npmjs.org/it-first/-/it-first-1.0.2.tgz", + "integrity": "sha512-hU5ObR14987PR7l0J7dfWAgKYiWoKbXcoXKqhQDGgHSZML6UPmHSS9ILBGucZkoA2B152kEqEOllS4tVQq11fg==" + }, "jackspeak": { "version": "1.4.0", "resolved": "https://registry.npmjs.org/jackspeak/-/jackspeak-1.4.0.tgz", @@ -5347,6 +5352,13 @@ "pify": "^4.0.1", "strip-bom": "^3.0.0", "type-fest": "^0.3.0" + }, + "dependencies": { + "type-fest": { + "version": "0.3.1", + "resolved": "https://registry.npmjs.org/type-fest/-/type-fest-0.3.1.tgz", + "integrity": "sha512-cUGJnCdr4STbePCgqNFbpVNCepa+kAVohJs1sLhxzdH+gnEoOd8VhbYa7pD3zZYGiURWM2xzEII3fQcRizDkYQ==" + } } }, "locate-path": { @@ -5515,6 +5527,13 @@ "integrity": "sha512-bJzx6nMoP6PDLPBFmg7+xRKeFZvFboMrGlxmNj9ClvX53KrmvM5bXFXEWjbz4cz1AFn+jWJ9z/DJSz7hrs0w3w==", "requires": { "p-defer": "^1.0.0" + }, + "dependencies": { + "p-defer": { + "version": "1.0.0", + "resolved": "https://registry.npmjs.org/p-defer/-/p-defer-1.0.0.tgz", + "integrity": "sha1-n26xgvbJqozXQwBKfU+WsZaw+ww=" + } } }, "map-cache": { @@ -6619,9 +6638,9 @@ "integrity": "sha512-s73XxOZ4zpt1edZYZzvhqFa6uvQc1vwUa0K0BdtIZgQMAJj9IbebH+JkgKZc9h+B05PKHLOTl4ajG1BmNrVZlw==" }, "p-defer": { - "version": "1.0.0", - "resolved": "https://registry.npmjs.org/p-defer/-/p-defer-1.0.0.tgz", - "integrity": "sha1-n26xgvbJqozXQwBKfU+WsZaw+ww=" + "version": "3.0.0", + "resolved": "https://registry.npmjs.org/p-defer/-/p-defer-3.0.0.tgz", + "integrity": "sha512-ugZxsxmtTln604yeYd29EGrNhazN2lywetzpKhfmQjW/VJmhpDmWbiX+h0zL8V91R0UXkhb3KtPmyq9PZw3aYw==" }, "p-event": { "version": "4.1.0", @@ -10429,11 +10448,6 @@ "integrity": "sha512-0fr/mIH1dlO+x7TlcMy+bIDqKPsw/70tVyeHW787goQjhmqaZe10uwLujubK9q9Lg6Fiho1KUKDYz0Z7k7g5/g==", "dev": true }, - "type-fest": { - "version": "0.3.1", - "resolved": "https://registry.npmjs.org/type-fest/-/type-fest-0.3.1.tgz", - "integrity": "sha512-cUGJnCdr4STbePCgqNFbpVNCepa+kAVohJs1sLhxzdH+gnEoOd8VhbYa7pD3zZYGiURWM2xzEII3fQcRizDkYQ==" - }, "typedarray-to-buffer": { "version": "3.1.5", "resolved": "https://registry.npmjs.org/typedarray-to-buffer/-/typedarray-to-buffer-3.1.5.tgz", diff --git a/package.json b/package.json index f06d25788..6c0279227 100644 --- a/package.json +++ b/package.json @@ -87,12 +87,14 @@ "is-error": "^2.2.2", "is-plain-object": "^3.0.0", "is-promise": "^4.0.0", + "it-first": "^1.0.2", "lodash": "^4.17.15", "matcher": "^3.0.0", "md5-hex": "^3.0.1", "mem": "^6.1.0", "ms": "^2.1.2", "ora": "^4.0.4", + "p-defer": "^3.0.0", "p-map": "^4.0.0", "picomatch": "^2.2.2", "pkg-conf": "^3.1.0", diff --git a/plugin.d.ts b/plugin.d.ts new file mode 100644 index 000000000..066964fea --- /dev/null +++ b/plugin.d.ts @@ -0,0 +1,72 @@ +export namespace SharedWorker { + export const enum ProtocolIdentifier { + Experimental = 'experimental' + } + + export type TestWorker = { + readonly file: string; + readonly finished: Promise; + publish: (data: Data) => PublishedMessage; + subscribe: () => AsyncIterableIterator>; + }; + + export type Factory = (options: { + readonly negotiateProtocol: (supported: readonly ProtocolIdentifier[]) => Protocol; + }) => void; + + export type Protocol = { + readonly protocol: ProtocolIdentifier.Experimental; + readonly testWorkers: AsyncIterableIterator>; + broadcast: (data: Data) => BroadcastMessage; + subscribe: () => AsyncIterableIterator>; + }; + + export type BroadcastMessage = { + readonly id: string; + readonly replies: AsyncIterableIterator>; + }; + + export type PublishedMessage = { + readonly id: string; + readonly reply: Promise>; + }; + + export type ReceivedMessage = { + readonly data: Data; + readonly id: string; + readonly testWorker: TestWorker; + reply: (data: Data) => PublishedMessage; + }; + + export namespace Plugin { + export type PublishedMessage = { + readonly id: string; + readonly reply: Promise>; + }; + + export type ReceivedMessage = { + readonly data: Data; + readonly id: string; + reply: (data: Data) => PublishedMessage; + }; + + export type Protocol = { + readonly currentlyAvailable: boolean; + readonly available: Promise; + readonly protocol: ProtocolIdentifier.Experimental; + publish: (data: Data) => PublishedMessage; + ref: () => void; + subscribe: () => AsyncIterableIterator>; + unref: () => void; + }; + } +} + +export type SharedWorkerRegistrationOptions = { + readonly filename: string; + readonly initialData?: Data; + readonly supportedProtocols: readonly SharedWorker.ProtocolIdentifier[]; + readonly teardown: (worker: SharedWorker) => void; +}; + +export function registerSharedWorker (options: SharedWorkerRegistrationOptions): SharedWorker.Plugin.Protocol; diff --git a/plugin.js b/plugin.js new file mode 100644 index 000000000..f8f4c1049 --- /dev/null +++ b/plugin.js @@ -0,0 +1,9 @@ +'use strict'; +const path = require('path'); + +// Ensure the same AVA install is loaded by the test file as by the test worker +if (process.env.AVA_PATH && process.env.AVA_PATH !== __dirname) { + module.exports = require(path.join(process.env.AVA_PATH, 'plugin')); +} else { + module.exports = require('./lib/worker/plugin'); +} diff --git a/test/shared-worker-plugins/fixtures/helper.js b/test/shared-worker-plugins/fixtures/helper.js new file mode 100644 index 000000000..0d4e71f0d --- /dev/null +++ b/test/shared-worker-plugins/fixtures/helper.js @@ -0,0 +1,25 @@ +const plugin = require('ava/plugin'); + +const worker = plugin.registerSharedWorker({ + filename: require.resolve('./worker'), + supportedProtocols: ['experimental'] +}); + +exports.store = async value => { + const status = worker.publish({type: 'store', value}); + await status.reply; +}; + +exports.retrieve = async () => { + const status = worker.publish({type: 'retrieve'}); + const {data: value} = await status.reply; + return value; +}; + +exports.subscribe = async function * () { + for await (const {data} of worker.subscribe()) { + if (data.type === 'change') { + yield data.value; + } + } +}; diff --git a/test/shared-worker-plugins/fixtures/package.json b/test/shared-worker-plugins/fixtures/package.json new file mode 100644 index 000000000..078d5e5bd --- /dev/null +++ b/test/shared-worker-plugins/fixtures/package.json @@ -0,0 +1,5 @@ +{ + "dependencies": { + "ava": "file:../../.." + } +} diff --git a/test/shared-worker-plugins/fixtures/test.js b/test/shared-worker-plugins/fixtures/test.js new file mode 100644 index 000000000..7de75d34d --- /dev/null +++ b/test/shared-worker-plugins/fixtures/test.js @@ -0,0 +1,34 @@ +const crypto = require('crypto'); +const test = require('ava'); +const storeAndRetrieve = require('./helper'); + +const random = crypto.randomBytes(16); + +test.serial('store', async t => { + await t.notThrowsAsync(storeAndRetrieve.store(random)); +}); + +test.serial('retrieve', async t => { + t.deepEqual(Buffer.from(await storeAndRetrieve.retrieve()), random); +}); + +test.serial('subscribe', async t => { + storeAndRetrieve.store(1); + + const changes = []; + for await (const change of storeAndRetrieve.subscribe()) { + changes.push(change); + if (change === 1) { + await storeAndRetrieve.store(2); + } + + if (change === 2) { + await storeAndRetrieve.store(3); + } + + if (change === 3) { + t.deepEqual(changes, [1, 2, 3]); + break; + } + } +}); diff --git a/test/shared-worker-plugins/fixtures/worker.js b/test/shared-worker-plugins/fixtures/worker.js new file mode 100644 index 000000000..781e32226 --- /dev/null +++ b/test/shared-worker-plugins/fixtures/worker.js @@ -0,0 +1,14 @@ +module.exports = async ({negotiateProtocol}) => { + const protocol = negotiateProtocol(['experimental']); + + let stored; + for await (const message of protocol.subscribe()) { + if (message.data.type === 'store') { + stored = message.data.value; + message.reply(); + protocol.broadcast({type: 'change', value: stored}); + } else if (message.data.type === 'retrieve') { + message.reply(stored); + } + } +}; diff --git a/test/shared-worker-plugins/snapshots/test.js.md b/test/shared-worker-plugins/snapshots/test.js.md new file mode 100644 index 000000000..b829b1d59 --- /dev/null +++ b/test/shared-worker-plugins/snapshots/test.js.md @@ -0,0 +1,22 @@ +# Snapshot report for `test/shared-worker-plugins/test.js` + +The actual snapshot is saved in `test.js.snap`. + +Generated by [AVA](https://avajs.dev). + +## shared worker plugins work + +> Snapshot 1 + + { + passed: [ + { + file: 'test.js', + title: 'retrieve', + }, + { + file: 'test.js', + title: 'store', + }, + ], + } diff --git a/test/shared-worker-plugins/snapshots/test.js.snap b/test/shared-worker-plugins/snapshots/test.js.snap new file mode 100644 index 000000000..f97dd5beb Binary files /dev/null and b/test/shared-worker-plugins/snapshots/test.js.snap differ diff --git a/test/shared-worker-plugins/test.js b/test/shared-worker-plugins/test.js new file mode 100644 index 000000000..57a14761e --- /dev/null +++ b/test/shared-worker-plugins/test.js @@ -0,0 +1,7 @@ +const test = require('@ava/test'); +const exec = require('../helpers/exec'); + +test('shared worker plugins work', async t => { + const result = await exec.fixture(); + t.snapshot(result.stats.passed); +}); diff --git a/xo.config.js b/xo.config.js index 5f53bb4fc..2aaaaa9e8 100644 --- a/xo.config.js +++ b/xo.config.js @@ -14,7 +14,7 @@ module.exports = { }, overrides: [ { - files: '*.d.ts', + files: 'index.d.ts', rules: { '@typescript-eslint/member-ordering': 'off', '@typescript-eslint/method-signature-style': 'off',