From 34492a4b007c02e697c6f7a18da8558211234815 Mon Sep 17 00:00:00 2001 From: Chris Dickinson Date: Thu, 11 Jan 2024 14:25:56 -0800 Subject: [PATCH] fix: address race condition when copying blocks between threads on node MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Oh my god, Atomics. The linked issue explains the behavior we're fixing in more detail. But the specific behavior we were running into (I _think_) goes a little something like this: Imagine we have two threads. ``` worker thread host thread | | | | o postMessage ---------> | - calls hostFunc, waits for completion x wait for flag to != 4 x - wait for flag to == 4; writes 1 | ↖︎_____________________o scratchbuffer's worth of info, sets flag to "N" bytes | x - wait for flag to == 4 | reads N bytes _____↗︎| | sets flag to 4 / | o _______________/ | x wait for flag to != 4 | - all done, set flag to N, wait for | ↖︎______________________o flag to == 4 again | __↗︎| | all done / | | set flag to 4 / | | return to wasm / | | / | o _______________/ | ↓ ↓ ``` We had a couple of problems: 1. In the first postMessage, we didn't wait for the flag to == 4 before writing data back. 2. We implemented waits as a single `Atomics.wait{,Async}` with a MAX_WAIT timeout. 3. We trusted the value that came out of `Atomics.load` directly after the `Atomics.wait`. The first problem was pretty straightforward to fix. This merely makes the two threads agree that the shared array buffer is in a certain state rather than relying on it implicitly being in the correct state. (Which is an assumption I slipped into: if the main thread is executing, what other value could the flag have? After all, we set the flag before we called `postMessage`! --this turns out to be a _class_ of bug.) 
The latter two problems were more surprising: looking into other semaphore implementations I was surprised to see that they combined `wait` with a loop, and further ensured that the value that they loaded directly after the `wait` had actually changed. This was the proximate cause of the bug: we had a single wait, sure, but it was possible for the observed value loaded after the wait to not change. This meant skipping an entire flush of the buffer, which would permanently misalign the two threads. This has an interesting effect on performance: Bun, browsers, and Node appear to perform just as well as they did before, minus the errors we saw before. Deno, on the other hand, hits a hideous slowdown -- the test jumps from taking 3 seconds on other platforms to 18-20 seconds. I'm investigating what's going on there, but I'm surprised to see how differently two V8-backed JS platforms perform in practice. I've left the `runInWorker` flag defaulted to "off" in the meantime while I dig into this. Fixes #46. 
--- justfile | 16 +-- src/background-plugin.ts | 267 ++++++++++++++++++++------------------- src/interfaces.ts | 10 ++ src/mod.test.ts | 18 +-- src/mod.ts | 2 + src/worker.ts | 100 ++++++++------- 6 files changed, 220 insertions(+), 193 deletions(-) diff --git a/justfile b/justfile index 6fa62bf..1f8acc3 100644 --- a/justfile +++ b/justfile @@ -294,7 +294,7 @@ _build_browser_tests out='tests/browser' args='[]': build: prepare build_worker_node build_worker_browser build_browser build_node_esm build_node_cjs build_bun _build_browser_tests _build_node_tests _build_bun_tests npm pack --pack-destination dist/ -_test: +_test filter='.*': #!/bin/bash set -eou pipefail just serve 8124 false & @@ -305,16 +305,16 @@ _test: trap cleanup ERR sleep 0.1 - deno test -A src/mod.test.ts - node --no-warnings --test --experimental-global-webcrypto dist/tests/cjs/*.test.js - node --no-warnings --test --experimental-global-webcrypto dist/tests/esm/*.test.js - if &>/dev/null which bun; then bun run dist/tests/bun/*.test.js; fi - playwright test --browser all tests/playwright.test.js --trace retain-on-failure + if [[ "deno" =~ {{ filter }} ]]; then deno test -A src/mod.test.ts; fi + if [[ "node-cjs" =~ {{ filter }} ]]; then node --no-warnings --test --experimental-global-webcrypto dist/tests/cjs/*.test.js; fi + if [[ "node-esm" =~ {{ filter }} ]]; then node --no-warnings --test --experimental-global-webcrypto dist/tests/esm/*.test.js; fi + if [[ "bun" =~ {{ filter }} ]]; then if &>/dev/null which bun; then bun run dist/tests/bun/*.test.js; fi; fi + if [[ "browsers" =~ {{ filter }} ]]; then playwright test --browser all tests/playwright.test.js --trace retain-on-failure; fi test: build && _test test-artifacts -bake: - while just _test; do true; done +bake filter='.*': + while just _test '{{ filter }}'; do true; done test-artifacts: #!/bin/bash diff --git a/src/background-plugin.ts b/src/background-plugin.ts index 524c7bd..2f403af 100644 --- a/src/background-plugin.ts +++ 
b/src/background-plugin.ts @@ -1,21 +1,11 @@ import { CallContext, RESET, IMPORT_STATE, EXPORT_STATE, STORE, GET_BLOCK } from './call-context.ts'; -import { PluginOutput, type InternalConfig } from './interfaces.ts'; +import { PluginOutput, SAB_BASE_OFFSET, SharedArrayBufferSection, type InternalConfig } from './interfaces.ts'; import { WORKER_URL } from './worker-url.ts'; import { Worker } from 'node:worker_threads'; import { CAPABILITIES } from './polyfills/deno-capabilities.ts'; import { EXTISM_ENV } from './foreground-plugin.ts'; import { matches } from './polyfills/deno-minimatch.ts'; -const MAX_WAIT = 5000; - -enum SharedArrayBufferSection { - End = 0, - RetI64 = 1, - RetF64 = 2, - RetVoid = 3, - Block = 4, -} - // Firefox has not yet implemented Atomics.waitAsync, but we can polyfill // it using a worker as a one-off. // @@ -62,7 +52,7 @@ class BackgroundPlugin { this.opts = opts; this.#context = context; - this.hostFlag[0] = RingBufferWriter.SAB_BASE_OFFSET; + this.hostFlag[0] = SAB_BASE_OFFSET; this.worker.on('message', (ev) => this.#handleMessage(ev)); } @@ -192,16 +182,45 @@ class BackgroundPlugin { return data; } + async getExports(name?: string): Promise { + return await this.#invoke('getExports', name ?? '0'); + } + + async getImports(name?: string): Promise { + return await this.#invoke('getImports', name ?? '0'); + } + + async getInstance(): Promise { + throw new Error('todo'); + } + + async close(): Promise { + if (this.worker) { + this.worker.terminate(); + this.worker = null as any; + } + } + // guest -> host invoke() async #handleInvoke(ev: any) { const writer = new RingBufferWriter(this.sharedData); const namespace = this.opts.functions[ev.namespace]; const func = (namespace ?? {})[ev.func]; + // XXX(chrisdickinson): this is cürsëd code. Add a setTimeout because some platforms + // don't spin their event loops if the only pending item is a Promise generated by Atomics.waitAsync. 
+ // + // - https://github.com/nodejs/node/pull/44409 + // - https://github.com/denoland/deno/issues/14786 + const timer = setInterval(() => {}, 0); try { if (!func) { throw Error(`Plugin error: host function "${ev.namespace}" "${ev.func}" does not exist`); } + // Fill the shared array buffer with an expected garbage value to make debugging + // errors more straightforward + new Uint8Array(this.sharedData).subarray(8).fill(0xfe); + this.#context[IMPORT_STATE](ev.state, true); const data = await func(this.#context, ...ev.args); @@ -272,27 +291,122 @@ class BackgroundPlugin { const [, reject] = this.#request as any[]; this.#request = null; return reject(err); + } finally { + clearInterval(timer); } } +} - async getExports(name?: string): Promise { - return await this.#invoke('getExports', name ?? '0'); +// Return control to the waiting promise. Anecdotally, this appears to help +// with a race condition in Bun. +const MAX_WAIT = 500; +class RingBufferWriter { + output: SharedArrayBuffer; + scratch: ArrayBuffer; + scratchView: DataView; + outputOffset: number; + flag: Int32Array; + + static SAB_IDX = 0; + + constructor(output: SharedArrayBuffer) { + this.scratch = new ArrayBuffer(8); + this.scratchView = new DataView(this.scratch); + this.output = output; + this.outputOffset = SAB_BASE_OFFSET; + this.flag = new Int32Array(this.output); + this.wait(0); } - async getImports(name?: string): Promise { - return await this.#invoke('getImports', name ?? 
'0'); + async wait(lastKnownValue: number) { + // if the flag == SAB_BASE_OFFSET, that means "we have ownership", every other value means "the thread has ownership" + let value = 0; + do { + value = Atomics.load(this.flag, 0); + if (value === lastKnownValue) { + const { value: result, async } = AtomicsWaitAsync(this.flag, 0, lastKnownValue, MAX_WAIT); + if (async) { + if ((await result) === 'timed-out') { + continue; + } + } + } + } while (value === lastKnownValue); } - async getInstance(): Promise { - throw new Error('todo'); + signal() { + let old = Atomics.load(this.flag, 0); + while (Atomics.compareExchange(this.flag, 0, old, this.outputOffset) === old) { + } + Atomics.notify(this.flag, 0, 1); } - async close(): Promise { - if (this.worker) { - this.worker.terminate(); - this.worker = null as any; + async flush() { + if (this.outputOffset === SAB_BASE_OFFSET) { + // no need to flush -- we haven't written anything! + return; + } + + const workerId = this.outputOffset; + this.signal(); + this.outputOffset = SAB_BASE_OFFSET; + await this.wait(workerId); + } + + async spanningWrite(input: Uint8Array) { + let inputOffset = 0; + let toWrite = this.output.byteLength - this.outputOffset; + let flushedWriteCount = + 1 + Math.floor((input.byteLength - toWrite) / (this.output.byteLength - SAB_BASE_OFFSET)); + const finalWrite = (input.byteLength - toWrite) % (this.output.byteLength - SAB_BASE_OFFSET); + + do { + new Uint8Array(this.output).set(input.subarray(inputOffset, inputOffset + toWrite), this.outputOffset); + + // increment the offset so we know we've written _something_ (and can bypass the "did we not write anything" check in `flush()`) + this.outputOffset += toWrite; + inputOffset += toWrite; + await this.flush(); + + // reset toWrite to the maximum available length. (So we may write 29 bytes the first time, but 4096 the next N times. 
+ toWrite = this.output.byteLength - SAB_BASE_OFFSET; + --flushedWriteCount; + } while (flushedWriteCount != 0); + + if (finalWrite) { + this.write(input.subarray(inputOffset, inputOffset + finalWrite)); } } + + write(bytes: ArrayBufferLike): void | Promise { + if (bytes.byteLength + this.outputOffset < this.output.byteLength) { + new Uint8Array(this.output).set(new Uint8Array(bytes), this.outputOffset); + this.outputOffset += bytes.byteLength; + return; + } + + return this.spanningWrite(new Uint8Array(bytes)); + } + + writeUint8(value: number): void | Promise { + this.scratchView.setUint8(0, value); + return this.write(this.scratch.slice(0, 1)); + } + + writeUint32(value: number): void | Promise { + this.scratchView.setUint32(0, value, true); + return this.write(this.scratch.slice(0, 4)); + } + + writeUint64(value: bigint): void | Promise { + this.scratchView.setBigUint64(0, value, true); + return this.write(this.scratch.slice(0, 8)); + } + + writeFloat64(value: number): void | Promise { + this.scratchView.setFloat64(0, value, true); + return this.write(this.scratch.slice(0, 8)); + } } class HttpContext { @@ -373,6 +487,8 @@ export async function createBackgroundPlugin( // and webkit do. 
const sharedData = new (SharedArrayBuffer as any)(opts.sharedArrayBufferSize); + new Uint8Array(sharedData).subarray(8).fill(0xfe); + const { fetch: _, logger: __, ...rest } = opts; const message = { ...rest, @@ -399,110 +515,3 @@ export async function createBackgroundPlugin( return new BackgroundPlugin(worker, sharedData, opts, context); } - -class RingBufferWriter { - output: SharedArrayBuffer; - scratch: ArrayBuffer; - scratchView: DataView; - outputOffset: number; - flag: Int32Array; - - static SAB_IDX = 0; - static SAB_BASE_OFFSET = 4; - - constructor(output: SharedArrayBuffer) { - this.scratch = new ArrayBuffer(8); - this.scratchView = new DataView(this.scratch); - this.output = output; - this.outputOffset = RingBufferWriter.SAB_BASE_OFFSET; - this.flag = new Int32Array(this.output); - } - - async flush() { - if (this.outputOffset === RingBufferWriter.SAB_BASE_OFFSET) { - // no need to flush -- we haven't written anything! - return; - } - - const targetOffset = this.outputOffset; - this.outputOffset = RingBufferWriter.SAB_BASE_OFFSET; - - while ( - Atomics.compareExchange(this.flag, RingBufferWriter.SAB_IDX, RingBufferWriter.SAB_BASE_OFFSET, targetOffset) !== - targetOffset - ) {} // eslint-disable-line no-empty - Atomics.notify(this.flag, RingBufferWriter.SAB_IDX, 1); - - // wait for the thread to read the data out... - const result = AtomicsWaitAsync(this.flag, RingBufferWriter.SAB_IDX, targetOffset, MAX_WAIT); - - // XXX(chrisdickinson): this is cürsëd code. Add a setTimeout because some platforms - // don't spin their event loops if the only pending item is a Promise generated by Atomics.waitAsync. 
- // - // - https://github.com/nodejs/node/pull/44409 - // - https://github.com/denoland/deno/issues/14786 - let timer; - try { - timer = setInterval(() => {}, 0); - if (result.async) { - result.value = (await result.value) as any; - } - } finally { - clearInterval(timer); - } - - if (result.value === 'timed-out') { - throw new Error(`encountered timeout while flushing host function to worker memory ${this.flag[0]}`); - } - } - - async spanningWrite(input: Uint8Array) { - let inputOffset = 0; - let toWrite = this.output.byteLength - this.outputOffset; - let flushedWriteCount = - 1 + Math.floor((input.byteLength - toWrite) / (this.output.byteLength - RingBufferWriter.SAB_BASE_OFFSET)); - const finalWrite = (input.byteLength - toWrite) % (this.output.byteLength - RingBufferWriter.SAB_BASE_OFFSET); - do { - new Uint8Array(this.output).set(input.subarray(inputOffset, inputOffset + toWrite), this.outputOffset); - this.outputOffset += toWrite; - inputOffset += toWrite; - await this.flush(); - toWrite = this.output.byteLength - RingBufferWriter.SAB_BASE_OFFSET; - --flushedWriteCount; - } while (flushedWriteCount != 0); - - if (finalWrite) { - this.write(input.subarray(inputOffset, inputOffset + finalWrite)); - } - } - - write(bytes: ArrayBufferLike): void | Promise { - if (bytes.byteLength + this.outputOffset < this.output.byteLength) { - new Uint8Array(this.output).set(new Uint8Array(bytes), this.outputOffset); - this.outputOffset += bytes.byteLength; - return; - } - - return this.spanningWrite(new Uint8Array(bytes)); - } - - writeUint8(value: number): void | Promise { - this.scratchView.setUint8(0, value); - return this.write(this.scratch.slice(0, 1)); - } - - writeUint32(value: number): void | Promise { - this.scratchView.setUint32(0, value, true); - return this.write(this.scratch.slice(0, 4)); - } - - writeUint64(value: bigint): void | Promise { - this.scratchView.setBigUint64(0, value, true); - return this.write(this.scratch.slice(0, 8)); - } - - writeFloat64(value: 
number): void | Promise { - this.scratchView.setFloat64(0, value, true); - return this.write(this.scratch.slice(0, 8)); - } -} diff --git a/src/interfaces.ts b/src/interfaces.ts index 88d70c4..1fc32c7 100644 --- a/src/interfaces.ts +++ b/src/interfaces.ts @@ -394,3 +394,13 @@ export interface Capabilities { */ extismStdoutEnvVarSet: boolean; } + +export const SAB_BASE_OFFSET = 4; + +export enum SharedArrayBufferSection { + End = 0xff, + RetI64 = 1, + RetF64 = 2, + RetVoid = 3, + Block = 4, +} diff --git a/src/mod.test.ts b/src/mod.test.ts index 32665f6..cbef140 100644 --- a/src/mod.test.ts +++ b/src/mod.test.ts @@ -363,15 +363,14 @@ if (typeof WebAssembly === 'undefined') { }); test('test writes that span multiple blocks (w/small buffer)', async () => { - const res = await fetch('http://localhost:8124/src/mod.test.ts'); - const result = await res.text(); + const value = '9:;<=>?@ABCDEFGHIJKLMNOPQRSTUVWXYZ'.repeat(18428 / 34); const functions = { 'extism:host/user': { async hello_world(context: CallContext, _off: bigint) { context.setVariable('hmmm okay storing a variable', 'hello world hello.'); - const res = await fetch('http://localhost:8124/src/mod.test.ts'); - const result = await res.text(); - return context.store(result); + const result = new TextEncoder().encode(value); + const ret = context.store(result); + return ret; }, }, }; @@ -381,12 +380,15 @@ if (typeof WebAssembly === 'undefined') { { useWasi: true, functions, runInWorker: true, sharedArrayBufferSize: 1 << 6 }, ); + let i = 0; try { - const output = await plugin.call('count_vowels', 'hello world'); - assert.equal(output?.string(), result); + for (; i < 10; ++i) { + const output = await plugin.call('count_vowels', 'hello world'); + assert.equal(output?.string(), value); + } const again = await plugin.call('count_vowels', 'hello world'); - assert.equal(again?.string(), result); + assert.equal(again?.string(), value); } finally { await plugin.close(); } diff --git a/src/mod.ts b/src/mod.ts index 
fb5dc86..075cb38 100644 --- a/src/mod.ts +++ b/src/mod.ts @@ -21,6 +21,8 @@ export type { ManifestWasm, Manifest, Plugin, + PluginConfig, + PluginConfigLike, PluginOutput, } from './interfaces.ts'; diff --git a/src/worker.ts b/src/worker.ts index d4e28eb..2659524 100644 --- a/src/worker.ts +++ b/src/worker.ts @@ -2,10 +2,7 @@ import { parentPort } from 'node:worker_threads'; import { ForegroundPlugin, createForegroundPlugin as _createForegroundPlugin } from './foreground-plugin.ts'; import { CallContext, EXPORT_STATE, CallState, IMPORT_STATE } from './call-context.ts'; -import { type InternalConfig } from './interfaces.ts'; - -// TODO: make this configurable -const MAX_WAIT = 5000; +import { SharedArrayBufferSection, SAB_BASE_OFFSET, type InternalConfig } from './interfaces.ts'; class Reactor { hostFlag: Int32Array | null; @@ -173,8 +170,9 @@ class Reactor { if (!this.hostFlag) { throw new Error('attempted to call host before receiving shared array buffer'); } - const state = context[EXPORT_STATE](); + Atomics.store(this.hostFlag, 0, SAB_BASE_OFFSET); + const state = context[EXPORT_STATE](); this.port.postMessage({ type: 'invoke', namespace, @@ -191,30 +189,25 @@ class Reactor { const sectionType = reader.readUint8(); switch (sectionType) { // end - case 0: + case SharedArrayBufferSection.End: state.blocks = blocks; context[IMPORT_STATE](state); reader.close(); - this.hostFlag[0] = RingBufferReader.SAB_BASE_OFFSET; return retval; - // ret i64 - case 1: + case SharedArrayBufferSection.RetI64: retval = reader.readUint64(); break; - // ret f64 - case 2: + case SharedArrayBufferSection.RetF64: retval = reader.readFloat64(); break; - // ret void - case 3: + case SharedArrayBufferSection.RetVoid: retval = undefined; break; - // block - case 4: + case SharedArrayBufferSection.Block: { const index = reader.readUint32(); const len = reader.readUint32(); @@ -228,11 +221,13 @@ class Reactor { } break; + // a common invalid state: + // case 0: + // console.log({retval, input: 
reader.input, reader }) default: throw new Error( - `invalid section type="${sectionType}"; please open an issue (https://github.com/extism/js-sdk/issues/new?title=shared+array+buffer+bad+section+type+${sectionType}&labels=bug)`, + `invalid section type="${sectionType}" at position ${reader.position}; please open an issue (https://github.com/extism/js-sdk/issues/new?title=shared+array+buffer+bad+section+type+${sectionType}&labels=bug)`, ); - break; } } while (1); } @@ -240,61 +235,70 @@ class Reactor { new Reactor(parentPort); +// This controls how frequently we "release" control from the Atomic; anecdotally +// this appears to help with stalled wait() on Bun. +const MAX_WAIT = 500; + class RingBufferReader { input: SharedArrayBuffer; flag: Int32Array; inputOffset: number; scratch: ArrayBuffer; scratchView: DataView; - expected: number; + position: number; + #available: number; static SAB_IDX = 0; - static SAB_BASE_OFFSET = 4; constructor(input: SharedArrayBuffer) { this.input = input; - this.inputOffset = RingBufferReader.SAB_BASE_OFFSET; + this.inputOffset = SAB_BASE_OFFSET; this.flag = new Int32Array(this.input); this.scratch = new ArrayBuffer(8); this.scratchView = new DataView(this.scratch); - this.expected = 0; - this.pull(false); + this.position = 0; + this.#available = 0; + this.wait(); + } + + close() { + this.signal(); + Atomics.store(this.flag, 0, SAB_BASE_OFFSET); + } + + wait() { + let value = SAB_BASE_OFFSET; + do { + value = Atomics.load(this.flag, 0); + if (value === SAB_BASE_OFFSET) { + const result = Atomics.wait(this.flag, 0, SAB_BASE_OFFSET, MAX_WAIT); + if (result === 'timed-out') { + continue; + } + } + } while (value <= SAB_BASE_OFFSET); + + this.#available = Atomics.load(this.flag, 0); + + this.inputOffset = SAB_BASE_OFFSET; } get available() { - return this.flag[0] - this.inputOffset; + return this.#available - this.inputOffset; } - close() { - while ( - Atomics.compareExchange(this.flag, RingBufferReader.SAB_IDX, this.expected, 
RingBufferReader.SAB_BASE_OFFSET) !== - RingBufferReader.SAB_BASE_OFFSET - ) {} // eslint-disable-line no-empty - Atomics.notify(this.flag, RingBufferReader.SAB_IDX, MAX_WAIT); + signal() { + Atomics.store(this.flag, 0, SAB_BASE_OFFSET); + Atomics.notify(this.flag, 0, 1); } - pull(reset: boolean = true) { - if (reset) { - while ( - Atomics.compareExchange( - this.flag, - RingBufferReader.SAB_IDX, - this.expected, - RingBufferReader.SAB_BASE_OFFSET, - ) !== RingBufferReader.SAB_BASE_OFFSET - ) {} // eslint-disable-line no-empty - Atomics.notify(this.flag, RingBufferReader.SAB_IDX, MAX_WAIT); - } - // host now copies out, once it's done it writes the available bytes to the flag. - const v = Atomics.wait(this.flag, 0, RingBufferReader.SAB_BASE_OFFSET, MAX_WAIT); - this.expected = Atomics.load(this.flag, 0); - if (v === 'timed-out') { - throw new Error(`Worker timed out waiting for response from host after ${MAX_WAIT}ms ${this.flag[0]}`); - } - this.inputOffset = RingBufferReader.SAB_BASE_OFFSET; + pull() { + this.signal(); + this.wait(); } read(output: Uint8Array) { + this.position += output.byteLength; if (output.byteLength < this.available) { output.set(new Uint8Array(this.input).subarray(this.inputOffset, this.inputOffset + output.byteLength)); this.inputOffset += output.byteLength;