Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: address race condition when copying blocks between threads on node #52

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions justfile
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ _build_browser_tests out='tests/browser' args='[]':
build: prepare build_worker_node build_worker_browser build_browser build_node_esm build_node_cjs build_bun _build_browser_tests _build_node_tests _build_bun_tests
npm pack --pack-destination dist/

_test:
_test filter='.*':
#!/bin/bash
set -eou pipefail
just serve 8124 false &
Expand All @@ -305,16 +305,16 @@ _test:
trap cleanup ERR

sleep 0.1
deno test -A src/mod.test.ts
node --no-warnings --test --experimental-global-webcrypto dist/tests/cjs/*.test.js
node --no-warnings --test --experimental-global-webcrypto dist/tests/esm/*.test.js
if &>/dev/null which bun; then bun run dist/tests/bun/*.test.js; fi
playwright test --browser all tests/playwright.test.js --trace retain-on-failure
if [[ "deno" =~ {{ filter }} ]]; then deno test -A src/mod.test.ts; fi
if [[ "node-cjs" =~ {{ filter }} ]]; then node --no-warnings --test --experimental-global-webcrypto dist/tests/cjs/*.test.js; fi
if [[ "node-esm" =~ {{ filter }} ]]; then node --no-warnings --test --experimental-global-webcrypto dist/tests/esm/*.test.js; fi
if [[ "bun" =~ {{ filter }} ]]; then if &>/dev/null which bun; then bun run dist/tests/bun/*.test.js; fi; fi
if [[ "browsers" =~ {{ filter }} ]]; then playwright test --browser all tests/playwright.test.js --trace retain-on-failure; fi

test: build && _test test-artifacts

bake:
while just _test; do true; done
bake filter='.*':
while just _test '{{ filter }}'; do true; done
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change made it possible to run the tests in a tighter loop, surfacing the bug more quickly.


test-artifacts:
#!/bin/bash
Expand Down
267 changes: 138 additions & 129 deletions src/background-plugin.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,11 @@
import { CallContext, RESET, IMPORT_STATE, EXPORT_STATE, STORE, GET_BLOCK } from './call-context.ts';
import { PluginOutput, type InternalConfig } from './interfaces.ts';
import { PluginOutput, SAB_BASE_OFFSET, SharedArrayBufferSection, type InternalConfig } from './interfaces.ts';
import { WORKER_URL } from './worker-url.ts';
import { Worker } from 'node:worker_threads';
import { CAPABILITIES } from './polyfills/deno-capabilities.ts';
import { EXTISM_ENV } from './foreground-plugin.ts';
import { matches } from './polyfills/deno-minimatch.ts';

const MAX_WAIT = 5000;

enum SharedArrayBufferSection {
End = 0,
RetI64 = 1,
RetF64 = 2,
RetVoid = 3,
Block = 4,
}

// Firefox has not yet implemented Atomics.waitAsync, but we can polyfill
// it using a worker as a one-off.
//
Expand Down Expand Up @@ -62,7 +52,7 @@ class BackgroundPlugin {
this.opts = opts;
this.#context = context;

this.hostFlag[0] = RingBufferWriter.SAB_BASE_OFFSET;
this.hostFlag[0] = SAB_BASE_OFFSET;

this.worker.on('message', (ev) => this.#handleMessage(ev));
}
Expand Down Expand Up @@ -192,16 +182,45 @@ class BackgroundPlugin {
return data;
}

async getExports(name?: string): Promise<WebAssembly.ModuleExportDescriptor[]> {
return await this.#invoke('getExports', name ?? '0');
}

async getImports(name?: string): Promise<WebAssembly.ModuleImportDescriptor[]> {
return await this.#invoke('getImports', name ?? '0');
}

async getInstance(): Promise<WebAssembly.Instance> {
throw new Error('todo');
}

async close(): Promise<void> {
if (this.worker) {
this.worker.terminate();
this.worker = null as any;
}
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These functions moved up to put the RingBufferWriter closer to #handleInvoke.


// guest -> host invoke()
async #handleInvoke(ev: any) {
const writer = new RingBufferWriter(this.sharedData);
const namespace = this.opts.functions[ev.namespace];
const func = (namespace ?? {})[ev.func];
// XXX(chrisdickinson): this is cürsëd code. Add a setTimeout because some platforms
// don't spin their event loops if the only pending item is a Promise generated by Atomics.waitAsync.
//
// - https://github.com/nodejs/node/pull/44409
// - https://github.com/denoland/deno/issues/14786
const timer = setInterval(() => {}, 0);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code moved – the timer used to be created per AtomicsWaitAsync, now we create it once at the start of the invocation and run it until we're done with the invocation.

try {
if (!func) {
throw Error(`Plugin error: host function "${ev.namespace}" "${ev.func}" does not exist`);
}

// Fill the shared array buffer with an expected garbage value to make debugging
// errors more straightforward
new Uint8Array(this.sharedData).subarray(8).fill(0xfe);

this.#context[IMPORT_STATE](ev.state, true);

const data = await func(this.#context, ...ev.args);
Expand Down Expand Up @@ -272,27 +291,122 @@ class BackgroundPlugin {
const [, reject] = this.#request as any[];
this.#request = null;
return reject(err);
} finally {
clearInterval(timer);
}
}
}

async getExports(name?: string): Promise<WebAssembly.ModuleExportDescriptor[]> {
return await this.#invoke('getExports', name ?? '0');
// Return control to the waiting promise. Anecdotally, this appears to help
// with a race condition in Bun.
const MAX_WAIT = 500;
class RingBufferWriter {
output: SharedArrayBuffer;
scratch: ArrayBuffer;
scratchView: DataView;
outputOffset: number;
flag: Int32Array;

static SAB_IDX = 0;

constructor(output: SharedArrayBuffer) {
this.scratch = new ArrayBuffer(8);
this.scratchView = new DataView(this.scratch);
this.output = output;
this.outputOffset = SAB_BASE_OFFSET;
this.flag = new Int32Array(this.output);
this.wait(0);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the "first bug" I talked about – now we .wait(0) before attempting to write anything.

}

async getImports(name?: string): Promise<WebAssembly.ModuleImportDescriptor[]> {
return await this.#invoke('getImports', name ?? '0');
async wait(lastKnownValue: number) {
// if the flag == SAB_BASE_OFFSET, that means "we have ownership", every other value means "the thread has ownership"
let value = 0;
do {
value = Atomics.load(this.flag, 0);
if (value === lastKnownValue) {
const { value: result, async } = AtomicsWaitAsync(this.flag, 0, lastKnownValue, MAX_WAIT);
if (async) {
if ((await result) === 'timed-out') {
continue;
}
}
}
} while (value === lastKnownValue);
}

async getInstance(): Promise<WebAssembly.Instance> {
throw new Error('todo');
signal() {
let old = Atomics.load(this.flag, 0);
while (Atomics.compareExchange(this.flag, 0, old, this.outputOffset) === old) {
}
Atomics.notify(this.flag, 0, 1);
}

async close(): Promise<void> {
if (this.worker) {
this.worker.terminate();
this.worker = null as any;
async flush() {
if (this.outputOffset === SAB_BASE_OFFSET) {
// no need to flush -- we haven't written anything!
return;
}

const workerId = this.outputOffset;
this.signal();
this.outputOffset = SAB_BASE_OFFSET;
await this.wait(workerId);
}

async spanningWrite(input: Uint8Array) {
let inputOffset = 0;
let toWrite = this.output.byteLength - this.outputOffset;
let flushedWriteCount =
1 + Math.floor((input.byteLength - toWrite) / (this.output.byteLength - SAB_BASE_OFFSET));
const finalWrite = (input.byteLength - toWrite) % (this.output.byteLength - SAB_BASE_OFFSET);

do {
new Uint8Array(this.output).set(input.subarray(inputOffset, inputOffset + toWrite), this.outputOffset);

// increment the offset so we know we've written _something_ (and can bypass the "did we not write anything" check in `flush()`)
this.outputOffset += toWrite;
inputOffset += toWrite;
await this.flush();

// reset toWrite to the maximum available length. (So we may write 29 bytes the first time, but 4096 the next N times.
toWrite = this.output.byteLength - SAB_BASE_OFFSET;
--flushedWriteCount;
} while (flushedWriteCount != 0);

if (finalWrite) {
this.write(input.subarray(inputOffset, inputOffset + finalWrite));
}
}

write(bytes: ArrayBufferLike): void | Promise<void> {
if (bytes.byteLength + this.outputOffset < this.output.byteLength) {
new Uint8Array(this.output).set(new Uint8Array(bytes), this.outputOffset);
this.outputOffset += bytes.byteLength;
return;
}

return this.spanningWrite(new Uint8Array(bytes));
}

writeUint8(value: number): void | Promise<void> {
this.scratchView.setUint8(0, value);
return this.write(this.scratch.slice(0, 1));
}

writeUint32(value: number): void | Promise<void> {
this.scratchView.setUint32(0, value, true);
return this.write(this.scratch.slice(0, 4));
}

writeUint64(value: bigint): void | Promise<void> {
this.scratchView.setBigUint64(0, value, true);
return this.write(this.scratch.slice(0, 8));
}

writeFloat64(value: number): void | Promise<void> {
this.scratchView.setFloat64(0, value, true);
return this.write(this.scratch.slice(0, 8));
}
}

class HttpContext {
Expand Down Expand Up @@ -373,6 +487,8 @@ export async function createBackgroundPlugin(
// and webkit do.
const sharedData = new (SharedArrayBuffer as any)(opts.sharedArrayBufferSize);

new Uint8Array(sharedData).subarray(8).fill(0xfe);

const { fetch: _, logger: __, ...rest } = opts;
const message = {
...rest,
Expand All @@ -399,110 +515,3 @@ export async function createBackgroundPlugin(

return new BackgroundPlugin(worker, sharedData, opts, context);
}

class RingBufferWriter {
output: SharedArrayBuffer;
scratch: ArrayBuffer;
scratchView: DataView;
outputOffset: number;
flag: Int32Array;

static SAB_IDX = 0;
static SAB_BASE_OFFSET = 4;

constructor(output: SharedArrayBuffer) {
this.scratch = new ArrayBuffer(8);
this.scratchView = new DataView(this.scratch);
this.output = output;
this.outputOffset = RingBufferWriter.SAB_BASE_OFFSET;
this.flag = new Int32Array(this.output);
}

async flush() {
if (this.outputOffset === RingBufferWriter.SAB_BASE_OFFSET) {
// no need to flush -- we haven't written anything!
return;
}

const targetOffset = this.outputOffset;
this.outputOffset = RingBufferWriter.SAB_BASE_OFFSET;

while (
Atomics.compareExchange(this.flag, RingBufferWriter.SAB_IDX, RingBufferWriter.SAB_BASE_OFFSET, targetOffset) !==
targetOffset
) {} // eslint-disable-line no-empty
Atomics.notify(this.flag, RingBufferWriter.SAB_IDX, 1);

// wait for the thread to read the data out...
const result = AtomicsWaitAsync(this.flag, RingBufferWriter.SAB_IDX, targetOffset, MAX_WAIT);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that we only wait once in this version of the code.


// XXX(chrisdickinson): this is cürsëd code. Add a setTimeout because some platforms
// don't spin their event loops if the only pending item is a Promise generated by Atomics.waitAsync.
//
// - https://github.com/nodejs/node/pull/44409
// - https://github.com/denoland/deno/issues/14786
let timer;
try {
timer = setInterval(() => {}, 0);
if (result.async) {
result.value = (await result.value) as any;
}
} finally {
clearInterval(timer);
}

if (result.value === 'timed-out') {
throw new Error(`encountered timeout while flushing host function to worker memory ${this.flag[0]}`);
}
}

async spanningWrite(input: Uint8Array) {
let inputOffset = 0;
let toWrite = this.output.byteLength - this.outputOffset;
let flushedWriteCount =
1 + Math.floor((input.byteLength - toWrite) / (this.output.byteLength - RingBufferWriter.SAB_BASE_OFFSET));
const finalWrite = (input.byteLength - toWrite) % (this.output.byteLength - RingBufferWriter.SAB_BASE_OFFSET);
do {
new Uint8Array(this.output).set(input.subarray(inputOffset, inputOffset + toWrite), this.outputOffset);
this.outputOffset += toWrite;
inputOffset += toWrite;
await this.flush();
toWrite = this.output.byteLength - RingBufferWriter.SAB_BASE_OFFSET;
--flushedWriteCount;
} while (flushedWriteCount != 0);

if (finalWrite) {
this.write(input.subarray(inputOffset, inputOffset + finalWrite));
}
}

write(bytes: ArrayBufferLike): void | Promise<void> {
if (bytes.byteLength + this.outputOffset < this.output.byteLength) {
new Uint8Array(this.output).set(new Uint8Array(bytes), this.outputOffset);
this.outputOffset += bytes.byteLength;
return;
}

return this.spanningWrite(new Uint8Array(bytes));
}

writeUint8(value: number): void | Promise<void> {
this.scratchView.setUint8(0, value);
return this.write(this.scratch.slice(0, 1));
}

writeUint32(value: number): void | Promise<void> {
this.scratchView.setUint32(0, value, true);
return this.write(this.scratch.slice(0, 4));
}

writeUint64(value: bigint): void | Promise<void> {
this.scratchView.setBigUint64(0, value, true);
return this.write(this.scratch.slice(0, 8));
}

writeFloat64(value: number): void | Promise<void> {
this.scratchView.setFloat64(0, value, true);
return this.write(this.scratch.slice(0, 8));
}
}
10 changes: 10 additions & 0 deletions src/interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -394,3 +394,13 @@ export interface Capabilities {
*/
extismStdoutEnvVarSet: boolean;
}

export const SAB_BASE_OFFSET = 4;

export enum SharedArrayBufferSection {
End = 0xff,
RetI64 = 1,
RetF64 = 2,
RetVoid = 3,
Block = 4,
}
Loading