Skip to content

Commit

Permalink
fix: address race condition when copying blocks between threads on node
Browse files Browse the repository at this point in the history
Oh my god, Atomics.

The linked issue explains the behavior we're fixing in more detail. But
the specific behavior we were running into (I _think_) goes a little something
like this: Imagine we have two threads.

```
  worker thread              host thread
        |                        |
	|                        |
	o postMessage ---------> | - calls hostFunc, waits for completion
        x wait for flag to != 4  x - wait for flag to == 4; writes 1
        |  ↖︎_____________________o   scratchbuffer's worth of info, sets flag to "N" bytes
        |                        x - wait for flag to == 4
        | reads N bytes    _____↗︎|
	| sets flag to 4  /      |
	o _______________/       |
	x wait for flag to != 4  | - all done, set flag to N, wait for
	| ↖︎______________________o   flag to == 4 again
        |                     __↗︎|
        | all done           /   |
	| set flag to 4     /    |
	| return to wasm   /	 |
        |                 / 	 |
        o _______________/	 |
        ↓			 ↓

```

We had a couple of problems:

1. In the first postMessage, we didn't wait for the flag to == 4 before writing
   data back.
2. We implemented waits as a single `Atomics.wait{,Async}` with a MAX_WAIT
   timeout.
3. We trusted the value that came out of `Atomics.load` directly after the
   `Atomics.wait`.

The first problem was pretty straightforward to fix. This merely makes the two
threads agree that the shared array buffer is in a certain state rather than
relying on it implicitly being in the correct state. (Which is an assumption I
slipped into: if the main thread is executing, what other value could the flag
have? After all, we set the flag before we called `postMessage`! --this turns
out to be a _class_ of bug.)

The second two problems were more surprising: looking into other semaphore
implementations I was surprised to see that they combined `wait` with a loop,
and further ensured that the value that they loaded directly after the `wait`
had actually changed. This was the proximate cause of the bug: we had a single
wait, sure, but it was possible for the observed value loaded after the wait to
not change.  This meant skipping an entire flush of the buffer, which would
permanently misalign the two threads.

This has an interesting effect on performance: Bun, browsers, and Node appear
to perform just as well as they did before, minus the errors we saw before.
Deno, on the other hand, hits a hideous slowdown -- the test jumps from taking
3 seconds on other platforms to 18-20 seconds. I'm investigating what's going
on there, but I'm surprised to see how different two V8-backed JS platforms
perform in practice. I've left the `runInWorker` flag defaulted to "off" in the
meantime while I dig into this.

Fixes #46.
  • Loading branch information
chrisdickinson committed Jan 12, 2024
1 parent 702d794 commit d14493d
Show file tree
Hide file tree
Showing 6 changed files with 223 additions and 204 deletions.
16 changes: 8 additions & 8 deletions justfile
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ _build_browser_tests out='tests/browser' args='[]':
build: prepare build_worker_node build_worker_browser build_browser build_node_esm build_node_cjs build_bun _build_browser_tests _build_node_tests _build_bun_tests
npm pack --pack-destination dist/

_test:
_test filter='.*':
#!/bin/bash
set -eou pipefail
just serve 8124 false &
Expand All @@ -305,16 +305,16 @@ _test:
trap cleanup ERR

sleep 0.1
deno test -A src/mod.test.ts
node --no-warnings --test --experimental-global-webcrypto dist/tests/cjs/*.test.js
node --no-warnings --test --experimental-global-webcrypto dist/tests/esm/*.test.js
if &>/dev/null which bun; then bun run dist/tests/bun/*.test.js; fi
playwright test --browser all tests/playwright.test.js --trace retain-on-failure
if [[ "deno" =~ {{ filter }} ]]; then deno test -A src/mod.test.ts; fi
if [[ "node-cjs" =~ {{ filter }} ]]; then node --no-warnings --test --experimental-global-webcrypto dist/tests/cjs/*.test.js; fi
if [[ "node-esm" =~ {{ filter }} ]]; then node --no-warnings --test --experimental-global-webcrypto dist/tests/esm/*.test.js; fi
if [[ "bun" =~ {{ filter }} ]]; then if &>/dev/null which bun; then bun run dist/tests/bun/*.test.js; fi; fi
if [[ "browsers" =~ {{ filter }} ]]; then playwright test --browser all tests/playwright.test.js --trace retain-on-failure; fi

test: build && _test test-artifacts

bake:
while just _test; do true; done
bake filter='.*':
while just _test '{{ filter }}'; do true; done

test-artifacts:
#!/bin/bash
Expand Down
270 changes: 141 additions & 129 deletions src/background-plugin.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,11 @@
import { CallContext, RESET, IMPORT_STATE, EXPORT_STATE, STORE, GET_BLOCK } from './call-context.ts';
import { PluginOutput, type InternalConfig } from './interfaces.ts';
import { PluginOutput, SAB_BASE_OFFSET, SharedArrayBufferSection, type InternalConfig } from './interfaces.ts';
import { WORKER_URL } from './worker-url.ts';
import { Worker } from 'node:worker_threads';
import { CAPABILITIES } from './polyfills/deno-capabilities.ts';
import { EXTISM_ENV } from './foreground-plugin.ts';
import { matches } from './polyfills/deno-minimatch.ts';

const MAX_WAIT = 5000;

enum SharedArrayBufferSection {
End = 0,
RetI64 = 1,
RetF64 = 2,
RetVoid = 3,
Block = 4,
}

// Firefox has not yet implemented Atomics.waitAsync, but we can polyfill
// it using a worker as a one-off.
//
Expand Down Expand Up @@ -62,7 +52,7 @@ class BackgroundPlugin {
this.opts = opts;
this.#context = context;

this.hostFlag[0] = RingBufferWriter.SAB_BASE_OFFSET;
this.hostFlag[0] = SAB_BASE_OFFSET;

this.worker.on('message', (ev) => this.#handleMessage(ev));
}
Expand Down Expand Up @@ -192,16 +182,45 @@ class BackgroundPlugin {
return data;
}

async getExports(name?: string): Promise<WebAssembly.ModuleExportDescriptor[]> {
return await this.#invoke('getExports', name ?? '0');
}

async getImports(name?: string): Promise<WebAssembly.ModuleImportDescriptor[]> {
return await this.#invoke('getImports', name ?? '0');
}

async getInstance(): Promise<WebAssembly.Instance> {
throw new Error('todo');
}

async close(): Promise<void> {
if (this.worker) {
this.worker.terminate();
this.worker = null as any;
}
}

// guest -> host invoke()
async #handleInvoke(ev: any) {
const writer = new RingBufferWriter(this.sharedData);
const namespace = this.opts.functions[ev.namespace];
const func = (namespace ?? {})[ev.func];
// XXX(chrisdickinson): this is cürsëd code. Add a setTimeout because some platforms
// don't spin their event loops if the only pending item is a Promise generated by Atomics.waitAsync.
//
// - https://github.com/nodejs/node/pull/44409
// - https://github.com/denoland/deno/issues/14786
const timer = setInterval(() => {}, 0);
try {
if (!func) {
throw Error(`Plugin error: host function "${ev.namespace}" "${ev.func}" does not exist`);
}

// Fill the shared array buffer with an expected garbage value to make debugging
// errors more straightforward
new Uint8Array(this.sharedData).subarray(8).fill(0xfe);

this.#context[IMPORT_STATE](ev.state, true);

const data = await func(this.#context, ...ev.args);
Expand Down Expand Up @@ -272,27 +291,125 @@ class BackgroundPlugin {
const [, reject] = this.#request as any[];
this.#request = null;
return reject(err);
} finally {
clearInterval(timer);
}
}
}

async getExports(name?: string): Promise<WebAssembly.ModuleExportDescriptor[]> {
return await this.#invoke('getExports', name ?? '0');
// Return control to the waiting promise. Anecdotally, this appears to help
// with a race condition in Bun.
const MAX_WAIT = 500;
class RingBufferWriter {
output: SharedArrayBuffer;
scratch: ArrayBuffer;
scratchView: DataView;
outputOffset: number;
flag: Int32Array;
written: number;

static SAB_IDX = 0;

constructor(output: SharedArrayBuffer) {
this.scratch = new ArrayBuffer(8);
this.scratchView = new DataView(this.scratch);
this.output = output;
this.outputOffset = SAB_BASE_OFFSET;
this.flag = new Int32Array(this.output);
this.written = 0;
this.wait(0);
}

async getImports(name?: string): Promise<WebAssembly.ModuleImportDescriptor[]> {
return await this.#invoke('getImports', name ?? '0');
async wait(WORKER_THREAD_ID: number) {
// if the flag == SAB_BASE_OFFSET, that means "we have ownership", every other value means "the thread has ownership"
let value = 0;
do {
value = Atomics.load(this.flag, 0);
if (value === WORKER_THREAD_ID) {
const { value: result, async } = AtomicsWaitAsync(this.flag, 0, WORKER_THREAD_ID, MAX_WAIT);
if (async) {
if ((await result) === 'timed-out') {
continue;
}
}
}
} while (value === WORKER_THREAD_ID);
}

async getInstance(): Promise<WebAssembly.Instance> {
throw new Error('todo');
signal() {
let old = Atomics.load(this.flag, 0);
while (Atomics.compareExchange(this.flag, 0, old, this.outputOffset) === old) {
}
Atomics.notify(this.flag, 0, 1);
}

async close(): Promise<void> {
if (this.worker) {
this.worker.terminate();
this.worker = null as any;
async flush() {
if (this.outputOffset === SAB_BASE_OFFSET) {
// no need to flush -- we haven't written anything!
return;
}

const workerId = this.outputOffset;
this.signal();
this.outputOffset = SAB_BASE_OFFSET;
await this.wait(workerId);
}

async spanningWrite(input: Uint8Array) {
let inputOffset = 0;
let toWrite = this.output.byteLength - this.outputOffset;
let flushedWriteCount =
1 + Math.floor((input.byteLength - toWrite) / (this.output.byteLength - SAB_BASE_OFFSET));
const finalWrite = (input.byteLength - toWrite) % (this.output.byteLength - SAB_BASE_OFFSET);

do {
new Uint8Array(this.output).set(input.subarray(inputOffset, inputOffset + toWrite), this.outputOffset);

// increment the offset so we know we've written _something_ (and can bypass the "did we not write anything" check in `flush()`)
this.outputOffset += toWrite;
inputOffset += toWrite;
await this.flush();

// reset toWrite to the maximum available length. (So we may write 29 bytes the first time, but 4096 the next N times.
toWrite = this.output.byteLength - SAB_BASE_OFFSET;
--flushedWriteCount;
} while (flushedWriteCount != 0);

if (finalWrite) {
this.write(input.subarray(inputOffset, inputOffset + finalWrite));
}
}

write(bytes: ArrayBufferLike): void | Promise<void> {
this.written += bytes.byteLength;
if (bytes.byteLength + this.outputOffset < this.output.byteLength) {
new Uint8Array(this.output).set(new Uint8Array(bytes), this.outputOffset);
this.outputOffset += bytes.byteLength;
return;
}

return this.spanningWrite(new Uint8Array(bytes));
}

writeUint8(value: number): void | Promise<void> {
this.scratchView.setUint8(0, value);
return this.write(this.scratch.slice(0, 1));
}

writeUint32(value: number): void | Promise<void> {
this.scratchView.setUint32(0, value, true);
return this.write(this.scratch.slice(0, 4));
}

writeUint64(value: bigint): void | Promise<void> {
this.scratchView.setBigUint64(0, value, true);
return this.write(this.scratch.slice(0, 8));
}

writeFloat64(value: number): void | Promise<void> {
this.scratchView.setFloat64(0, value, true);
return this.write(this.scratch.slice(0, 8));
}
}

class HttpContext {
Expand Down Expand Up @@ -373,6 +490,8 @@ export async function createBackgroundPlugin(
// and webkit do.
const sharedData = new (SharedArrayBuffer as any)(opts.sharedArrayBufferSize);

new Uint8Array(sharedData).subarray(8).fill(0xfe);

const { fetch: _, logger: __, ...rest } = opts;
const message = {
...rest,
Expand All @@ -399,110 +518,3 @@ export async function createBackgroundPlugin(

return new BackgroundPlugin(worker, sharedData, opts, context);
}

class RingBufferWriter {
output: SharedArrayBuffer;
scratch: ArrayBuffer;
scratchView: DataView;
outputOffset: number;
flag: Int32Array;

static SAB_IDX = 0;
static SAB_BASE_OFFSET = 4;

constructor(output: SharedArrayBuffer) {
this.scratch = new ArrayBuffer(8);
this.scratchView = new DataView(this.scratch);
this.output = output;
this.outputOffset = RingBufferWriter.SAB_BASE_OFFSET;
this.flag = new Int32Array(this.output);
}

async flush() {
if (this.outputOffset === RingBufferWriter.SAB_BASE_OFFSET) {
// no need to flush -- we haven't written anything!
return;
}

const targetOffset = this.outputOffset;
this.outputOffset = RingBufferWriter.SAB_BASE_OFFSET;

while (
Atomics.compareExchange(this.flag, RingBufferWriter.SAB_IDX, RingBufferWriter.SAB_BASE_OFFSET, targetOffset) !==
targetOffset
) {} // eslint-disable-line no-empty
Atomics.notify(this.flag, RingBufferWriter.SAB_IDX, 1);

// wait for the thread to read the data out...
const result = AtomicsWaitAsync(this.flag, RingBufferWriter.SAB_IDX, targetOffset, MAX_WAIT);

// XXX(chrisdickinson): this is cürsëd code. Add a setTimeout because some platforms
// don't spin their event loops if the only pending item is a Promise generated by Atomics.waitAsync.
//
// - https://github.com/nodejs/node/pull/44409
// - https://github.com/denoland/deno/issues/14786
let timer;
try {
timer = setInterval(() => {}, 0);
if (result.async) {
result.value = (await result.value) as any;
}
} finally {
clearInterval(timer);
}

if (result.value === 'timed-out') {
throw new Error(`encountered timeout while flushing host function to worker memory ${this.flag[0]}`);
}
}

async spanningWrite(input: Uint8Array) {
let inputOffset = 0;
let toWrite = this.output.byteLength - this.outputOffset;
let flushedWriteCount =
1 + Math.floor((input.byteLength - toWrite) / (this.output.byteLength - RingBufferWriter.SAB_BASE_OFFSET));
const finalWrite = (input.byteLength - toWrite) % (this.output.byteLength - RingBufferWriter.SAB_BASE_OFFSET);
do {
new Uint8Array(this.output).set(input.subarray(inputOffset, inputOffset + toWrite), this.outputOffset);
this.outputOffset += toWrite;
inputOffset += toWrite;
await this.flush();
toWrite = this.output.byteLength - RingBufferWriter.SAB_BASE_OFFSET;
--flushedWriteCount;
} while (flushedWriteCount != 0);

if (finalWrite) {
this.write(input.subarray(inputOffset, inputOffset + finalWrite));
}
}

write(bytes: ArrayBufferLike): void | Promise<void> {
if (bytes.byteLength + this.outputOffset < this.output.byteLength) {
new Uint8Array(this.output).set(new Uint8Array(bytes), this.outputOffset);
this.outputOffset += bytes.byteLength;
return;
}

return this.spanningWrite(new Uint8Array(bytes));
}

writeUint8(value: number): void | Promise<void> {
this.scratchView.setUint8(0, value);
return this.write(this.scratch.slice(0, 1));
}

writeUint32(value: number): void | Promise<void> {
this.scratchView.setUint32(0, value, true);
return this.write(this.scratch.slice(0, 4));
}

writeUint64(value: bigint): void | Promise<void> {
this.scratchView.setBigUint64(0, value, true);
return this.write(this.scratch.slice(0, 8));
}

writeFloat64(value: number): void | Promise<void> {
this.scratchView.setFloat64(0, value, true);
return this.write(this.scratch.slice(0, 8));
}
}
10 changes: 10 additions & 0 deletions src/interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -394,3 +394,13 @@ export interface Capabilities {
*/
extismStdoutEnvVarSet: boolean;
}

export const SAB_BASE_OFFSET = 4;

export enum SharedArrayBufferSection {
End = 0xff,
RetI64 = 1,
RetF64 = 2,
RetVoid = 3,
Block = 4,
}
Loading

0 comments on commit d14493d

Please sign in to comment.