Skip to content

Commit

Permalink
fix some race conditions in the worker (#590)
Browse files Browse the repository at this point in the history
  • Loading branch information
evanw committed Dec 12, 2020
1 parent 284062c commit b901055
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 73 deletions.
6 changes: 2 additions & 4 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,11 @@

In the future the output file name of a virtual module will likely be completely customizable with a plugin, so it will be possible to have different behavior for this if desired. But that isn't possible quite yet.

* Added an option to speed up the JavaScript `buildSync` and `transformSync` APIs ([#590](https://github.com/evanw/esbuild/issues/590))
* Speed up the JavaScript `buildSync` and `transformSync` APIs ([#590](https://github.com/evanw/esbuild/issues/590))

Previously the `buildSync` and `transformSync` API calls created a new child esbuild process on every call because communicating with a long-lived child process is asynchronous in node. However, there's a trick that can work around this limitation: esbuild can communicate with the long-lived child process from a child thread using node's [`worker_threads`](https://nodejs.org/api/worker_threads.html) module and block the main thread using JavaScript's new [Atomics API](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Atomics/wait). This was a tip from [@cspotcode](https://github.com/cspotcode).

This approach has now been implemented. A quick benchmark shows that these synchronous API calls are now 1.5x to 15x faster than they used to be. The speedup depends on the size of the input (smaller inputs get a bigger speedup). The worker thread and child process should automatically be terminated when there are no more event handlers registered on the main thread, so there is no explicit `stop()` call like there is with a service object.

This change is experimental. You have to opt-in to this change by passing `workerThread: true` to `buildSync` or `transformSync` to get the accelerated version. Be aware that there may be issues with this approach on Windows. One of my Windows CI runs hung during JavaScript API tests, although I have been unable to reproduce it myself. I'm currently looking for feedback on this new synchronous API approach.
This approach has now been implemented. A quick benchmark shows that `transformSync` is now 1.5x to 15x faster than it used to be. The speedup depends on the size of the input (smaller inputs get a bigger speedup). The worker thread and child process should automatically be terminated when there are no more event handlers registered on the main thread, so there is no explicit `stop()` call like there is with a service object.

## 0.8.21

Expand Down
1 change: 0 additions & 1 deletion lib/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,6 @@ function pushCommonFlags(flags: string[], options: CommonOptions, keys: OptionKe
let keepNames = getFlag(options, keys, 'keepNames', mustBeBoolean);
let banner = getFlag(options, keys, 'banner', mustBeString);
let footer = getFlag(options, keys, 'footer', mustBeString);
getFlag(options as any, keys, 'workerThread', mustBeBoolean);

if (target) {
if (Array.isArray(target)) flags.push(`--target=${Array.from(target).map(validateTarget).join(',')}`)
Expand Down
103 changes: 58 additions & 45 deletions lib/node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ export let transform: typeof types.transform = (input, options) => {

export let buildSync: typeof types.buildSync = (options: types.BuildOptions): any => {
// Try using a long-lived worker thread to avoid repeated start-up overhead
if (worker_threads && options.workerThread) {
if (worker_threads) {
if (!workerThreadService) workerThreadService = startWorkerThreadService(worker_threads);
return workerThreadService.buildSync(options);
}
Expand All @@ -117,7 +117,7 @@ export let buildSync: typeof types.buildSync = (options: types.BuildOptions): an

export let transformSync: typeof types.transformSync = (input, options) => {
// Try using a long-lived worker thread to avoid repeated start-up overhead
if (worker_threads && options && options.workerThread) {
if (worker_threads) {
if (!workerThreadService) workerThreadService = startWorkerThreadService(worker_threads);
return workerThreadService.transformSync(input, options);
}
Expand Down Expand Up @@ -248,9 +248,11 @@ let randomFileName = () => {
return path.join(os.tmpdir(), `esbuild-${crypto.randomBytes(32).toString('hex')}`);
};

interface WorkerData {
interface MainToWorkerMessage {
sharedBuffer: SharedArrayBuffer;
workerToMain: import('worker_threads').MessagePort;
id: number;
command: string;
args: any[];
}

interface WorkerThreadService {
Expand All @@ -261,13 +263,10 @@ interface WorkerThreadService {
let workerThreadService: WorkerThreadService | null = null;

let startWorkerThreadService = (worker_threads: typeof import('worker_threads')): WorkerThreadService => {
let { port1: mainToWorker, port2: workerToMain } = new worker_threads.MessageChannel();
let sharedBuffer = new SharedArrayBuffer(8);
let sharedBufferView = new Int32Array(sharedBuffer);
let workerData: WorkerData = { sharedBuffer, workerToMain };
let { port1: mainPort, port2: workerPort } = new worker_threads.MessageChannel();
let worker = new worker_threads.Worker(__filename, {
workerData,
transferList: [workerToMain],
workerData: workerPort,
transferList: [workerPort],
});
let nextID = 0;
let wasStopped = false;
Expand All @@ -293,18 +292,27 @@ let startWorkerThreadService = (worker_threads: typeof import('worker_threads'))
let runCallSync = (command: string, args: any[]): any => {
if (wasStopped) throw new Error('The service was stopped');
let id = nextID++;
worker.postMessage({ id, command, args });

// Time out after 10 minutes. This should be long enough that esbuild
// shouldn't exceed this time but short enough that CI should fail if
// something went wrong with waiting. I added this because I experienced
// a random hang on the Windows CI machine that was presumably caused
// by this.
let timeout = 10 * 60 * 1000;
let status = Atomics.wait(sharedBufferView, 0, 0, timeout);
if (status !== 'ok') throw new Error('Internal error: Atomics.wait() failed: ' + status);

let { message: { id: id2, resolve, reject, properties } } = worker_threads!.receiveMessageOnPort(mainToWorker)!;

// Make a fresh shared buffer for every request. That way we can't have a
// race where a notification from the previous call overlaps with this call.
let sharedBuffer = new SharedArrayBuffer(8);
let sharedBufferView = new Int32Array(sharedBuffer);

// Send the message to the worker. Note that the worker could potentially
// complete the request before this thread returns from this call.
let msg: MainToWorkerMessage = { sharedBuffer, id, command, args };
worker.postMessage(msg);

// If the value hasn't changed (i.e. the request hasn't been completed,
// wait until the worker thread notifies us that the request is complete).
//
// Otherwise, if the value has changed, the request has already been
// completed. Don't wait in that case because the notification may never
// arrive if it has already been sent.
let status = Atomics.wait(sharedBufferView, 0, 0);
if (status !== 'ok' && status !== 'not-equal') throw new Error('Internal error: Atomics.wait() failed: ' + status);

let { message: { id: id2, resolve, reject, properties } } = worker_threads!.receiveMessageOnPort(mainPort)!;
if (id !== id2) throw new Error(`Internal error: Expected id ${id} but got id ${id2}`);
if (reject) {
applyProperties(reject, properties);
Expand All @@ -331,8 +339,7 @@ let startWorkerThreadService = (worker_threads: typeof import('worker_threads'))
};

let startSyncServiceWorker = () => {
let { sharedBuffer, workerToMain } = worker_threads!.workerData as WorkerData;
let sharedBufferView = new Int32Array(sharedBuffer);
let workerPort: import('worker_threads').MessagePort = worker_threads!.workerData;
let parentPort = worker_threads!.parentPort!;
let servicePromise = startService();

Expand All @@ -349,29 +356,35 @@ let startSyncServiceWorker = () => {
return properties;
};

parentPort.on('message', ({ id, command, args }: { id: number, command: string, args: any[] }) => {
parentPort.on('message', (msg: MainToWorkerMessage) => {
servicePromise.then(async (service) => {
switch (command) {
case 'build':
try {
let resolve = await service.build(args[0])
workerToMain.postMessage({ id, resolve });
} catch (reject) {
workerToMain.postMessage({ id, reject, properties: extractProperties(reject) });
}
Atomics.notify(sharedBufferView, 0, Infinity);
break;

case 'transform':
try {
let resolve = await service.transform(args[0], args[1])
workerToMain.postMessage({ id, resolve });
} catch (reject) {
workerToMain.postMessage({ id, reject, properties: extractProperties(reject) });
}
Atomics.notify(sharedBufferView, 0, Infinity);
break;
let { sharedBuffer, id, command, args } = msg;
let sharedBufferView = new Int32Array(sharedBuffer);

try {
if (command === 'build') {
workerPort.postMessage({ id, resolve: await service.build(args[0]) });
} else if (command === 'transform') {
workerPort.postMessage({ id, resolve: await service.transform(args[0], args[1]) });
} else {
throw new Error(`Invalid command: ${command}`);
}
} catch (reject) {
workerPort.postMessage({ id, reject, properties: extractProperties(reject) });
}

// The message has already been posted by this point, so it should be
// safe to wake the main thread. The main thread should always get the
// message we sent above.

// First, change the shared value. That way if the main thread attempts
// to wait for us after this point, the wait will fail because the shared
// value has changed.
Atomics.add(sharedBufferView, 0, 1);

// Then, wake the main thread. This handles the case where the main
// thread was already waiting for us before the shared value was changed.
Atomics.notify(sharedBufferView, 0, Infinity);
});
});
};
Expand Down
6 changes: 0 additions & 6 deletions lib/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,6 @@ interface CommonOptions {
color?: boolean;
logLevel?: LogLevel;
errorLimit?: number;

// This option only applies to "buildSync" and "transformSync". Enable it for
// extra speed if you plan on calling these functions repeatedly. It creates
// a long-lived worker thread that is reused across calls and which should be
// automatically cleaned up when the process exits.
workerThread?: boolean;
}

export interface BuildOptions extends CommonOptions {
Expand Down
38 changes: 21 additions & 17 deletions scripts/js-api-tests.js
Original file line number Diff line number Diff line change
Expand Up @@ -2113,28 +2113,35 @@ let transformTests = {
asyncGenClassExprFn: ({ service }) => futureSyntax(service, '(class { async* foo() {} })', 'es2017', 'es2018'),
}

let syncTests = workerThread => ({
async ['buildSync-' + workerThread]({ esbuild, testDir }) {
let syncTests = {
async buildSync({ esbuild, testDir }) {
const input = path.join(testDir, 'in.js')
const output = path.join(testDir, 'out.js')
await writeFileAsync(input, 'export default 123')
esbuild.buildSync({ workerThread, entryPoints: [input], bundle: true, outfile: output, format: 'cjs' })
esbuild.buildSync({ entryPoints: [input], bundle: true, outfile: output, format: 'cjs' })
const result = require(output)
assert.strictEqual(result.default, 123)
assert.strictEqual(result.__esModule, true)
},

async ['transformSync-' + workerThread]({ esbuild }) {
const { code } = esbuild.transformSync(`console.log(1+2)`, { workerThread })
async transformSync({ esbuild }) {
const { code } = esbuild.transformSync(`console.log(1+2)`, {})
assert.strictEqual(code, `console.log(1 + 2);\n`)
},

async ['buildSyncThrow-' + workerThread]({ esbuild, testDir }) {
async transformSync100x({ esbuild }) {
for (let i = 0; i < 100; i++) {
const { code } = esbuild.transformSync(`console.log(1+${i})`, {})
assert.strictEqual(code, `console.log(1 + ${i});\n`)
}
},

async buildSyncThrow({ esbuild, testDir }) {
const input = path.join(testDir, 'in.js')
try {
const output = path.join(testDir, 'out.js')
await writeFileAsync(input, '1+')
esbuild.buildSync({ workerThread, entryPoints: [input], bundle: true, outfile: output, format: 'cjs', logLevel: 'silent' })
esbuild.buildSync({ entryPoints: [input], bundle: true, outfile: output, format: 'cjs', logLevel: 'silent' })
const result = require(output)
assert.strictEqual(result.default, 123)
assert.strictEqual(result.__esModule, true)
Expand All @@ -2148,12 +2155,12 @@ ${path.relative(process.cwd(), input).replace(/\\/g, '/')}:1:2: error: Unexpecte
}
},

async ['buildSyncIncrementalThrow-' + workerThread]({ esbuild, testDir }) {
async buildSyncIncrementalThrow({ esbuild, testDir }) {
try {
const input = path.join(testDir, 'in.js')
const output = path.join(testDir, 'out.js')
await writeFileAsync(input, '1+')
esbuild.buildSync({ workerThread, entryPoints: [input], bundle: true, outfile: output, format: 'cjs', logLevel: 'silent', incremental: true })
esbuild.buildSync({ entryPoints: [input], bundle: true, outfile: output, format: 'cjs', logLevel: 'silent', incremental: true })
const result = require(output)
assert.strictEqual(result.default, 123)
assert.strictEqual(result.__esModule, true)
Expand All @@ -2166,9 +2173,9 @@ ${path.relative(process.cwd(), input).replace(/\\/g, '/')}:1:2: error: Unexpecte
}
},

async ['transformThrow-' + workerThread]({ service }) {
async transformThrow({ service }) {
try {
await service.transform(`1+`, { workerThread })
await service.transform(`1+`, {})
throw new Error('Expected an error to be thrown');
} catch (error) {
assert(error instanceof Error, 'Must be an Error object');
Expand All @@ -2177,7 +2184,7 @@ ${path.relative(process.cwd(), input).replace(/\\/g, '/')}:1:2: error: Unexpecte
assert.strictEqual(error.warnings.length, 0);
}
},
})
}

async function assertSourceMap(jsSourceMap, source) {
const map = await new SourceMapConsumer(jsSourceMap)
Expand All @@ -2191,7 +2198,7 @@ async function main() {
const esbuild = installForTests(rootTestDir)

// Time out these tests after 5 minutes. This exists to help debug test hangs in CI.
let minutes = 0.5
let minutes = 5
let timeout = setTimeout(() => {
console.error(`❌ js api tests timed out after ${minutes} minutes, exiting...`)
process.exit(1)
Expand All @@ -2217,10 +2224,7 @@ async function main() {
...Object.entries(buildTests),
...Object.entries(serveTests),
...Object.entries(transformTests),

// Run sync tests both without and with the worker thread
...Object.entries(syncTests(false)),
...Object.entries(syncTests(true)),
...Object.entries(syncTests),
]
const allTestsPassed = (await Promise.all(tests.map(runTest))).every(success => success)

Expand Down

0 comments on commit b901055

Please sign in to comment.