diff --git a/justfile b/justfile index 6fa62bf..1f8acc3 100644 --- a/justfile +++ b/justfile @@ -294,7 +294,7 @@ _build_browser_tests out='tests/browser' args='[]': build: prepare build_worker_node build_worker_browser build_browser build_node_esm build_node_cjs build_bun _build_browser_tests _build_node_tests _build_bun_tests npm pack --pack-destination dist/ -_test: +_test filter='.*': #!/bin/bash set -eou pipefail just serve 8124 false & @@ -305,16 +305,16 @@ _test: trap cleanup ERR sleep 0.1 - deno test -A src/mod.test.ts - node --no-warnings --test --experimental-global-webcrypto dist/tests/cjs/*.test.js - node --no-warnings --test --experimental-global-webcrypto dist/tests/esm/*.test.js - if &>/dev/null which bun; then bun run dist/tests/bun/*.test.js; fi - playwright test --browser all tests/playwright.test.js --trace retain-on-failure + if [[ "deno" =~ {{ filter }} ]]; then deno test -A src/mod.test.ts; fi + if [[ "node-cjs" =~ {{ filter }} ]]; then node --no-warnings --test --experimental-global-webcrypto dist/tests/cjs/*.test.js; fi + if [[ "node-esm" =~ {{ filter }} ]]; then node --no-warnings --test --experimental-global-webcrypto dist/tests/esm/*.test.js; fi + if [[ "bun" =~ {{ filter }} ]]; then if &>/dev/null which bun; then bun run dist/tests/bun/*.test.js; fi; fi + if [[ "browsers" =~ {{ filter }} ]]; then playwright test --browser all tests/playwright.test.js --trace retain-on-failure; fi test: build && _test test-artifacts -bake: - while just _test; do true; done +bake filter='.*': + while just _test '{{ filter }}'; do true; done test-artifacts: #!/bin/bash diff --git a/src/background-plugin.ts b/src/background-plugin.ts index 524c7bd..2f403af 100644 --- a/src/background-plugin.ts +++ b/src/background-plugin.ts @@ -1,21 +1,11 @@ import { CallContext, RESET, IMPORT_STATE, EXPORT_STATE, STORE, GET_BLOCK } from './call-context.ts'; -import { PluginOutput, type InternalConfig } from './interfaces.ts'; +import { PluginOutput, SAB_BASE_OFFSET, SharedArrayBufferSection, type InternalConfig } from './interfaces.ts'; import { WORKER_URL } from './worker-url.ts'; import { Worker } from 'node:worker_threads'; import { CAPABILITIES } from './polyfills/deno-capabilities.ts'; import { EXTISM_ENV } from './foreground-plugin.ts'; import { matches } from './polyfills/deno-minimatch.ts'; -const MAX_WAIT = 5000; - -enum SharedArrayBufferSection { - End = 0, - RetI64 = 1, - RetF64 = 2, - RetVoid = 3, - Block = 4, -} - // Firefox has not yet implemented Atomics.waitAsync, but we can polyfill // it using a worker as a one-off. // @@ -62,7 +52,7 @@ class BackgroundPlugin { this.opts = opts; this.#context = context; - this.hostFlag[0] = RingBufferWriter.SAB_BASE_OFFSET; + this.hostFlag[0] = SAB_BASE_OFFSET; this.worker.on('message', (ev) => this.#handleMessage(ev)); } @@ -192,16 +182,45 @@ class BackgroundPlugin { return data; } + async getExports(name?: string): Promise { + return await this.#invoke('getExports', name ?? '0'); + } + + async getImports(name?: string): Promise { + return await this.#invoke('getImports', name ?? '0'); + } + + async getInstance(): Promise { + throw new Error('todo'); + } + + async close(): Promise { + if (this.worker) { + this.worker.terminate(); + this.worker = null as any; + } + } + // guest -> host invoke() async #handleInvoke(ev: any) { const writer = new RingBufferWriter(this.sharedData); const namespace = this.opts.functions[ev.namespace]; const func = (namespace ?? {})[ev.func]; + // XXX(chrisdickinson): this is cürsëd code. Add a setTimeout because some platforms + // don't spin their event loops if the only pending item is a Promise generated by Atomics.waitAsync. + // + // - https://github.com/nodejs/node/pull/44409 + // - https://github.com/denoland/deno/issues/14786 + const timer = setInterval(() => {}, 0); try { if (!func) { throw Error(`Plugin error: host function "${ev.namespace}" "${ev.func}" does not exist`); } + // Fill the shared array buffer with an expected garbage value to make debugging + // errors more straightforward + new Uint8Array(this.sharedData).subarray(8).fill(0xfe); + this.#context[IMPORT_STATE](ev.state, true); const data = await func(this.#context, ...ev.args); @@ -272,27 +291,122 @@ class BackgroundPlugin { const [, reject] = this.#request as any[]; this.#request = null; return reject(err); + } finally { + clearInterval(timer); } } +} - async getExports(name?: string): Promise { - return await this.#invoke('getExports', name ?? '0'); +// Return control to the waiting promise. Anecdotally, this appears to help +// with a race condition in Bun. +const MAX_WAIT = 500; +class RingBufferWriter { + output: SharedArrayBuffer; + scratch: ArrayBuffer; + scratchView: DataView; + outputOffset: number; + flag: Int32Array; + + static SAB_IDX = 0; + + constructor(output: SharedArrayBuffer) { + this.scratch = new ArrayBuffer(8); + this.scratchView = new DataView(this.scratch); + this.output = output; + this.outputOffset = SAB_BASE_OFFSET; + this.flag = new Int32Array(this.output); + this.wait(0); } - async getImports(name?: string): Promise { - return await this.#invoke('getImports', name ?? '0'); + async wait(lastKnownValue: number) { + // if the flag == SAB_BASE_OFFSET, that means "we have ownership", every other value means "the thread has ownership" + let value = 0; + do { + value = Atomics.load(this.flag, 0); + if (value === lastKnownValue) { + const { value: result, async } = AtomicsWaitAsync(this.flag, 0, lastKnownValue, MAX_WAIT); + if (async) { + if ((await result) === 'timed-out') { + continue; + } + } + } + } while (value === lastKnownValue); } - async getInstance(): Promise { - throw new Error('todo'); + signal() { + let old = Atomics.load(this.flag, 0); + while (Atomics.compareExchange(this.flag, 0, old, this.outputOffset) === old) { + } + Atomics.notify(this.flag, 0, 1); } - async close(): Promise { - if (this.worker) { - this.worker.terminate(); - this.worker = null as any; + async flush() { + if (this.outputOffset === SAB_BASE_OFFSET) { + // no need to flush -- we haven't written anything! + return; + } + + const workerId = this.outputOffset; + this.signal(); + this.outputOffset = SAB_BASE_OFFSET; + await this.wait(workerId); + } + + async spanningWrite(input: Uint8Array) { + let inputOffset = 0; + let toWrite = this.output.byteLength - this.outputOffset; + let flushedWriteCount = + 1 + Math.floor((input.byteLength - toWrite) / (this.output.byteLength - SAB_BASE_OFFSET)); + const finalWrite = (input.byteLength - toWrite) % (this.output.byteLength - SAB_BASE_OFFSET); + + do { + new Uint8Array(this.output).set(input.subarray(inputOffset, inputOffset + toWrite), this.outputOffset); + + // increment the offset so we know we've written _something_ (and can bypass the "did we not write anything" check in `flush()`) + this.outputOffset += toWrite; + inputOffset += toWrite; + await this.flush(); + + // reset toWrite to the maximum available length. (So we may write 29 bytes the first time, but 4096 the next N times. + toWrite = this.output.byteLength - SAB_BASE_OFFSET; + --flushedWriteCount; + } while (flushedWriteCount != 0); + + if (finalWrite) { + this.write(input.subarray(inputOffset, inputOffset + finalWrite)); } } + + write(bytes: ArrayBufferLike): void | Promise { + if (bytes.byteLength + this.outputOffset < this.output.byteLength) { + new Uint8Array(this.output).set(new Uint8Array(bytes), this.outputOffset); + this.outputOffset += bytes.byteLength; + return; + } + + return this.spanningWrite(new Uint8Array(bytes)); + } + + writeUint8(value: number): void | Promise { + this.scratchView.setUint8(0, value); + return this.write(this.scratch.slice(0, 1)); + } + + writeUint32(value: number): void | Promise { + this.scratchView.setUint32(0, value, true); + return this.write(this.scratch.slice(0, 4)); + } + + writeUint64(value: bigint): void | Promise { + this.scratchView.setBigUint64(0, value, true); + return this.write(this.scratch.slice(0, 8)); + } + + writeFloat64(value: number): void | Promise { + this.scratchView.setFloat64(0, value, true); + return this.write(this.scratch.slice(0, 8)); + } } class HttpContext { @@ -373,6 +487,8 @@ export async function createBackgroundPlugin( // and webkit do. const sharedData = new (SharedArrayBuffer as any)(opts.sharedArrayBufferSize); + new Uint8Array(sharedData).subarray(8).fill(0xfe); + const { fetch: _, logger: __, ...rest } = opts; const message = { ...rest, @@ -399,110 +515,3 @@ export async function createBackgroundPlugin( return new BackgroundPlugin(worker, sharedData, opts, context); } - -class RingBufferWriter { - output: SharedArrayBuffer; - scratch: ArrayBuffer; - scratchView: DataView; - outputOffset: number; - flag: Int32Array; - - static SAB_IDX = 0; - static SAB_BASE_OFFSET = 4; - - constructor(output: SharedArrayBuffer) { - this.scratch = new ArrayBuffer(8); - this.scratchView = new DataView(this.scratch); - this.output = output; - this.outputOffset = RingBufferWriter.SAB_BASE_OFFSET; - this.flag = new Int32Array(this.output); - } - - async flush() { - if (this.outputOffset === RingBufferWriter.SAB_BASE_OFFSET) { - // no need to flush -- we haven't written anything! - return; - } - - const targetOffset = this.outputOffset; - this.outputOffset = RingBufferWriter.SAB_BASE_OFFSET; - - while ( - Atomics.compareExchange(this.flag, RingBufferWriter.SAB_IDX, RingBufferWriter.SAB_BASE_OFFSET, targetOffset) !== - targetOffset - ) {} // eslint-disable-line no-empty - Atomics.notify(this.flag, RingBufferWriter.SAB_IDX, 1); - - // wait for the thread to read the data out... - const result = AtomicsWaitAsync(this.flag, RingBufferWriter.SAB_IDX, targetOffset, MAX_WAIT); - - // XXX(chrisdickinson): this is cürsëd code. Add a setTimeout because some platforms - // don't spin their event loops if the only pending item is a Promise generated by Atomics.waitAsync. - // - // - https://github.com/nodejs/node/pull/44409 - // - https://github.com/denoland/deno/issues/14786 - let timer; - try { - timer = setInterval(() => {}, 0); - if (result.async) { - result.value = (await result.value) as any; - } - } finally { - clearInterval(timer); - } - - if (result.value === 'timed-out') { - throw new Error(`encountered timeout while flushing host function to worker memory ${this.flag[0]}`); - } - } - - async spanningWrite(input: Uint8Array) { - let inputOffset = 0; - let toWrite = this.output.byteLength - this.outputOffset; - let flushedWriteCount = - 1 + Math.floor((input.byteLength - toWrite) / (this.output.byteLength - RingBufferWriter.SAB_BASE_OFFSET)); - const finalWrite = (input.byteLength - toWrite) % (this.output.byteLength - RingBufferWriter.SAB_BASE_OFFSET); - do { - new Uint8Array(this.output).set(input.subarray(inputOffset, inputOffset + toWrite), this.outputOffset); - this.outputOffset += toWrite; - inputOffset += toWrite; - await this.flush(); - toWrite = this.output.byteLength - RingBufferWriter.SAB_BASE_OFFSET; - --flushedWriteCount; - } while (flushedWriteCount != 0); - - if (finalWrite) { - this.write(input.subarray(inputOffset, inputOffset + finalWrite)); - } - } - - write(bytes: ArrayBufferLike): void | Promise { - if (bytes.byteLength + this.outputOffset < this.output.byteLength) { - new Uint8Array(this.output).set(new Uint8Array(bytes), this.outputOffset); - this.outputOffset += bytes.byteLength; - return; - } - - return this.spanningWrite(new Uint8Array(bytes)); - } - - writeUint8(value: number): void | Promise { - this.scratchView.setUint8(0, value); - return this.write(this.scratch.slice(0, 1)); - } - - writeUint32(value: number): void | Promise { - this.scratchView.setUint32(0, value, true); - return this.write(this.scratch.slice(0, 4)); - } - - writeUint64(value: bigint): void | Promise { - this.scratchView.setBigUint64(0, value, true); - return this.write(this.scratch.slice(0, 8)); - } - - writeFloat64(value: number): void | Promise { - this.scratchView.setFloat64(0, value, true); - return this.write(this.scratch.slice(0, 8)); - } -} diff --git a/src/interfaces.ts b/src/interfaces.ts index 88d70c4..1fc32c7 100644 --- a/src/interfaces.ts +++ b/src/interfaces.ts @@ -394,3 +394,13 @@ export interface Capabilities { */ extismStdoutEnvVarSet: boolean; } + +export const SAB_BASE_OFFSET = 4; + +export enum SharedArrayBufferSection { + End = 0xff, + RetI64 = 1, + RetF64 = 2, + RetVoid = 3, + Block = 4, +} diff --git a/src/mod.test.ts b/src/mod.test.ts index 32665f6..cbef140 100644 --- a/src/mod.test.ts +++ b/src/mod.test.ts @@ -363,15 +363,14 @@ if (typeof WebAssembly === 'undefined') { }); test('test writes that span multiple blocks (w/small buffer)', async () => { - const res = await fetch('http://localhost:8124/src/mod.test.ts'); - const result = await res.text(); + const value = '9:;<=>?@ABCDEFGHIJKLMNOPQRSTUVWXYZ'.repeat(18428 / 34); const functions = { 'extism:host/user': { async hello_world(context: CallContext, _off: bigint) { context.setVariable('hmmm okay storing a variable', 'hello world hello.'); - const res = await fetch('http://localhost:8124/src/mod.test.ts'); - const result = await res.text(); - return context.store(result); + const result = new TextEncoder().encode(value); + const ret = context.store(result); + return ret; }, }, }; @@ -381,12 +380,15 @@ if (typeof WebAssembly === 'undefined') { { useWasi: true, functions, runInWorker: true, sharedArrayBufferSize: 1 << 6 }, ); + let i = 0; try { - const output = await plugin.call('count_vowels', 'hello world'); - assert.equal(output?.string(), result); + for (; i < 10; ++i) { + const output = await plugin.call('count_vowels', 'hello world'); + assert.equal(output?.string(), value); + } const again = await plugin.call('count_vowels', 'hello world'); - assert.equal(again?.string(), result); + assert.equal(again?.string(), value); } finally { await plugin.close(); } diff --git a/src/mod.ts b/src/mod.ts index fb5dc86..075cb38 100644 --- a/src/mod.ts +++ b/src/mod.ts @@ -21,6 +21,8 @@ export type { ManifestWasm, Manifest, Plugin, + PluginConfig, + PluginConfigLike, PluginOutput, } from './interfaces.ts'; diff --git a/src/worker.ts b/src/worker.ts index d4e28eb..2659524 100644 --- a/src/worker.ts +++ b/src/worker.ts @@ -2,10 +2,7 @@ import { parentPort } from 'node:worker_threads'; import { ForegroundPlugin, createForegroundPlugin as _createForegroundPlugin } from './foreground-plugin.ts'; import { CallContext, EXPORT_STATE, CallState, IMPORT_STATE } from './call-context.ts'; -import { type InternalConfig } from './interfaces.ts'; - -// TODO: make this configurable -const MAX_WAIT = 5000; +import { SharedArrayBufferSection, SAB_BASE_OFFSET, type InternalConfig } from './interfaces.ts'; class Reactor { hostFlag: Int32Array | null; @@ -173,8 +170,9 @@ class Reactor { if (!this.hostFlag) { throw new Error('attempted to call host before receiving shared array buffer'); } - const state = context[EXPORT_STATE](); + Atomics.store(this.hostFlag, 0, SAB_BASE_OFFSET); + const state = context[EXPORT_STATE](); this.port.postMessage({ type: 'invoke', namespace, @@ -191,30 +189,25 @@ class Reactor { const sectionType = reader.readUint8(); switch (sectionType) { // end - case 0: + case SharedArrayBufferSection.End: state.blocks = blocks; context[IMPORT_STATE](state); reader.close(); - this.hostFlag[0] = RingBufferReader.SAB_BASE_OFFSET; return retval; - // ret i64 - case 1: + case SharedArrayBufferSection.RetI64: retval = reader.readUint64(); break; - // ret f64 - case 2: + case SharedArrayBufferSection.RetF64: retval = reader.readFloat64(); break; - // ret void - case 3: + case SharedArrayBufferSection.RetVoid: retval = undefined; break; - // block - case 4: + case SharedArrayBufferSection.Block: { const index = reader.readUint32(); const len = reader.readUint32(); @@ -228,11 +221,13 @@ class Reactor { } break; + // a common invalid state: + // case 0: + // console.log({retval, input: reader.input, reader }) default: throw new Error( - `invalid section type="${sectionType}"; please open an issue (https://github.com/extism/js-sdk/issues/new?title=shared+array+buffer+bad+section+type+${sectionType}&labels=bug)`, + `invalid section type="${sectionType}" at position ${reader.position}; please open an issue (https://github.com/extism/js-sdk/issues/new?title=shared+array+buffer+bad+section+type+${sectionType}&labels=bug)`, ); - break; } } while (1); } @@ -240,61 +235,70 @@ class Reactor { new Reactor(parentPort); +// This controls how frequently we "release" control from the Atomic; anecdotally +// this appears to help with stalled wait() on Bun. +const MAX_WAIT = 500; + class RingBufferReader { input: SharedArrayBuffer; flag: Int32Array; inputOffset: number; scratch: ArrayBuffer; scratchView: DataView; - expected: number; + position: number; + #available: number; static SAB_IDX = 0; - static SAB_BASE_OFFSET = 4; constructor(input: SharedArrayBuffer) { this.input = input; - this.inputOffset = RingBufferReader.SAB_BASE_OFFSET; + this.inputOffset = SAB_BASE_OFFSET; this.flag = new Int32Array(this.input); this.scratch = new ArrayBuffer(8); this.scratchView = new DataView(this.scratch); - this.expected = 0; - this.pull(false); + this.position = 0; + this.#available = 0; + this.wait(); + } + + close() { + this.signal(); + Atomics.store(this.flag, 0, SAB_BASE_OFFSET); + } + + wait() { + let value = SAB_BASE_OFFSET; + do { + value = Atomics.load(this.flag, 0); + if (value === SAB_BASE_OFFSET) { + const result = Atomics.wait(this.flag, 0, SAB_BASE_OFFSET, MAX_WAIT); + if (result === 'timed-out') { + continue; + } + } + } while (value <= SAB_BASE_OFFSET); + + this.#available = Atomics.load(this.flag, 0); + + this.inputOffset = SAB_BASE_OFFSET; } get available() { - return this.flag[0] - this.inputOffset; + return this.#available - this.inputOffset; } - close() { - while ( - Atomics.compareExchange(this.flag, RingBufferReader.SAB_IDX, this.expected, RingBufferReader.SAB_BASE_OFFSET) !== - RingBufferReader.SAB_BASE_OFFSET - ) {} // eslint-disable-line no-empty - Atomics.notify(this.flag, RingBufferReader.SAB_IDX, MAX_WAIT); + signal() { + Atomics.store(this.flag, 0, SAB_BASE_OFFSET); + Atomics.notify(this.flag, 0, 1); } - pull(reset: boolean = true) { - if (reset) { - while ( - Atomics.compareExchange( - this.flag, - RingBufferReader.SAB_IDX, - this.expected, - RingBufferReader.SAB_BASE_OFFSET, - ) !== RingBufferReader.SAB_BASE_OFFSET - ) {} // eslint-disable-line no-empty - Atomics.notify(this.flag, RingBufferReader.SAB_IDX, MAX_WAIT); - } - // host now copies out, once it's done it writes the available bytes to the flag. - const v = Atomics.wait(this.flag, 0, RingBufferReader.SAB_BASE_OFFSET, MAX_WAIT); - this.expected = Atomics.load(this.flag, 0); - if (v === 'timed-out') { - throw new Error(`Worker timed out waiting for response from host after ${MAX_WAIT}ms ${this.flag[0]}`); - } - this.inputOffset = RingBufferReader.SAB_BASE_OFFSET; + pull() { + this.signal(); + this.wait(); } read(output: Uint8Array) { + this.position += output.byteLength; if (output.byteLength < this.available) { output.set(new Uint8Array(this.input).subarray(this.inputOffset, this.inputOffset + output.byteLength)); this.inputOffset += output.byteLength;