From 337e6051b71270bde7c1e1d38e19aa0e2fd9573f Mon Sep 17 00:00:00 2001 From: Arda TANRIKULU Date: Wed, 5 Feb 2025 19:05:24 +0300 Subject: [PATCH] enhance(server): use native AbortController and AbortSignal (#2031) --- .changeset/metal-moles-care.md | 7 ++ packages/node-fetch/src/Request.ts | 13 +--- packages/node-fetch/src/WritableStream.ts | 52 ++----------- packages/node-fetch/src/fetchCurl.ts | 16 ++-- packages/node-fetch/src/fetchNodeHttp.ts | 6 +- packages/server/src/createServerAdapter.ts | 25 +++--- packages/server/src/utils.ts | 90 ++++++---------------- packages/server/src/uwebsockets.ts | 45 +++++++---- 8 files changed, 89 insertions(+), 165 deletions(-) create mode 100644 .changeset/metal-moles-care.md diff --git a/.changeset/metal-moles-care.md b/.changeset/metal-moles-care.md new file mode 100644 index 00000000000..2dc57ec80a5 --- /dev/null +++ b/.changeset/metal-moles-care.md @@ -0,0 +1,7 @@ +--- +'@whatwg-node/server': patch +'@whatwg-node/node-fetch': patch +--- + +- Use native AbortSignal and AbortController for Request.signal +- Remove custom AbortSignal implementation (ServerAdapterAbortSignal) \ No newline at end of file diff --git a/packages/node-fetch/src/Request.ts b/packages/node-fetch/src/Request.ts index f731a425bb7..b32b7ace689 100644 --- a/packages/node-fetch/src/Request.ts +++ b/packages/node-fetch/src/Request.ts @@ -73,7 +73,7 @@ export class PonyfillRequest extends PonyfillBody implements this.redirect = requestInit?.redirect || 'follow'; this.referrer = requestInit?.referrer || 'about:client'; this.referrerPolicy = requestInit?.referrerPolicy || 'no-referrer'; - this._signal = requestInit?.signal; + this.signal = requestInit?.signal || new AbortController().signal; this.headersSerializer = requestInit?.headersSerializer; this.duplex = requestInit?.duplex || 'half'; @@ -137,16 +137,7 @@ export class PonyfillRequest extends PonyfillBody implements agent: HTTPAgent | HTTPSAgent | false | undefined; - private _signal: AbortSignal | undefined | null; - - get signal() { - // Create a new signal only if needed - // Because the creation of signal is expensive - if (!this._signal) { - this._signal = new AbortController().signal; - } - return this._signal!; - } + signal: AbortSignal; clone(): PonyfillRequest { return this; diff --git a/packages/node-fetch/src/WritableStream.ts b/packages/node-fetch/src/WritableStream.ts index 10e809f12c4..c74f2798438 100644 --- a/packages/node-fetch/src/WritableStream.ts +++ b/packages/node-fetch/src/WritableStream.ts @@ -44,55 +44,15 @@ export class PonyfillWritableStream implements WritableStream { }, }); this.writable = writable; - let onabort: EventListener | null; - let reason: any; + const abortCtrl = new AbortController(); const controller: WritableStreamDefaultController = { - signal: { - any(signals) { - return AbortSignal.any([...signals]); - }, - get reason() { - return reason; - }, - get aborted() { - return writable.destroyed; - }, - addEventListener: (_event: string, eventListener: EventListener) => { - writable.once('error', eventListener); - writable.once('close', eventListener); - }, - removeEventListener: (_event: string, eventListener: EventListener) => { - writable.off('error', eventListener); - writable.off('close', eventListener); - }, - dispatchEvent: (_event: Event) => { - return false; - }, - get onabort() { - return onabort; - }, - set onabort(value) { - if (onabort) { - this.removeEventListener('abort', onabort); - } - onabort = value; - if (onabort) { - this.addEventListener('abort', onabort); - } - }, - throwIfAborted() { - if (writable.destroyed) { - throw reason; - } - }, - }, - error: e => { - this.writable.destroy(e); + signal: abortCtrl.signal, + error(e) { + writable.destroy(e); }, }; - this.writable.once('error', err => { - reason = err; - }); + writable.once('error', err => abortCtrl.abort(err)); + writable.once('close', () => abortCtrl.abort()); } else { this.writable = new Writable(); } diff --git a/packages/node-fetch/src/fetchCurl.ts b/packages/node-fetch/src/fetchCurl.ts index 22c602fb25c..2c19e1a2035 100644 --- a/packages/node-fetch/src/fetchCurl.ts +++ b/packages/node-fetch/src/fetchCurl.ts @@ -30,11 +30,7 @@ export function fetchCurl( curlHandle.enable(CurlFeature.StreamResponse); curlHandle.setStreamProgressCallback(function () { - return fetchRequest['_signal']?.aborted - ? process.env.DEBUG - ? CurlProgressFunc.Continue - : 1 - : 0; + return fetchRequest.signal.aborted ? (process.env.DEBUG ? CurlProgressFunc.Continue : 1) : 0; }); if (fetchRequest['bodyType'] === 'String') { @@ -92,8 +88,8 @@ export function fetchCurl( } } } - if (fetchRequest['_signal']) { - fetchRequest['_signal'].addEventListener('abort', onAbort, { once: true }); + if (fetchRequest.signal) { + fetchRequest.signal.addEventListener('abort', onAbort, { once: true }); } curlHandle.once('end', function endListener() { try { @@ -101,8 +97,8 @@ export function fetchCurl( } catch (e) { deferredPromise.reject(e); } - if (fetchRequest['_signal']) { - fetchRequest['_signal'].removeEventListener('abort', onAbort); + if (fetchRequest.signal) { + fetchRequest.signal.removeEventListener('abort', onAbort); } }); curlHandle.once('error', function errorListener(error: any) { @@ -127,7 +123,7 @@ export function fetchCurl( pipeline(stream, outputStream, { end: true, - signal: fetchRequest['_signal'] ?? undefined, + signal: fetchRequest.signal, }) .then(() => { if (!stream.destroyed) { diff --git a/packages/node-fetch/src/fetchNodeHttp.ts b/packages/node-fetch/src/fetchNodeHttp.ts index e1f8f057543..8b692a29065 100644 --- a/packages/node-fetch/src/fetchNodeHttp.ts +++ b/packages/node-fetch/src/fetchNodeHttp.ts @@ -47,14 +47,14 @@ export function fetchNodeHttp( nodeRequest = requestFn(fetchRequest.parsedUrl, { method: fetchRequest.method, headers: nodeHeaders, - signal: fetchRequest['_signal'] ?? undefined, + signal: fetchRequest.signal, agent: fetchRequest.agent, }); } else { nodeRequest = requestFn(fetchRequest.url, { method: fetchRequest.method, headers: nodeHeaders, - signal: fetchRequest['_signal'] ?? undefined, + signal: fetchRequest.signal, agent: fetchRequest.agent, }); } @@ -107,7 +107,7 @@ export function fetchNodeHttp( } } pipeline(nodeResponse, outputStream, { - signal: fetchRequest['_signal'] ?? undefined, + signal: fetchRequest.signal, end: true, }) .then(() => { diff --git a/packages/server/src/createServerAdapter.ts b/packages/server/src/createServerAdapter.ts index fef9f223600..23c50edbfe3 100644 --- a/packages/server/src/createServerAdapter.ts +++ b/packages/server/src/createServerAdapter.ts @@ -32,7 +32,6 @@ import { NodeResponse, normalizeNodeRequest, sendNodeResponse, - ServerAdapterRequestAbortSignal, } from './utils.js'; import { fakePromise, @@ -319,7 +318,7 @@ function createServerAdapter< ? completeAssign(defaultServerContext, ...ctx) : defaultServerContext; - const signal = new ServerAdapterRequestAbortSignal(); + const controller = new AbortController(); const originalResEnd = res.end.bind(res); let resEnded = false; res.end = function (data: any) { @@ -328,16 +327,16 @@ function createServerAdapter< }; const originalOnAborted = res.onAborted.bind(res); originalOnAborted(function () { - signal.sendAbort(); + controller.abort(); }); res.onAborted = function (cb: () => void) { - signal.addEventListener('abort', cb); + controller.signal.addEventListener('abort', cb, { once: true }); }; const request = getRequestFromUWSRequest({ req, res, fetchAPI, - signal, + controller, }); let response$: Response | Promise | undefined; try { @@ -349,8 +348,8 @@ function createServerAdapter< return response$ .catch((e: any) => handleErrorFromRequestHandler(e, fetchAPI.Response)) .then(response => { - if (!signal.aborted && !resEnded) { - return sendResponseToUwsOpts(res, response, signal, fetchAPI); + if (!controller.signal.aborted && !resEnded) { + return sendResponseToUwsOpts(res, response, controller, fetchAPI); } }) .catch(err => { @@ -360,8 +359,8 @@ function createServerAdapter< }); } try { - if (!signal.aborted && !resEnded) { - return sendResponseToUwsOpts(res, response$, signal, fetchAPI); + if (!controller.signal.aborted && !resEnded) { + return sendResponseToUwsOpts(res, response$, controller, fetchAPI); } } catch (err: any) { console.error( @@ -406,13 +405,17 @@ function createServerAdapter< if (isRequestInit(initOrCtx)) { const request = new fetchAPI.Request(input, initOrCtx); const res$ = handleRequestWithWaitUntil(request, ...restOfCtx); - return handleAbortSignalAndPromiseResponse(res$, (initOrCtx as RequestInit)?.signal); + const signal = (initOrCtx as RequestInit).signal; + if (signal) { + return handleAbortSignalAndPromiseResponse(res$, signal); + } + return res$; } const request = new fetchAPI.Request(input); return handleRequestWithWaitUntil(request, ...maybeCtx); } const res$ = handleRequestWithWaitUntil(input, ...maybeCtx); - return handleAbortSignalAndPromiseResponse(res$, (input as any)._signal); + return handleAbortSignalAndPromiseResponse(res$, input.signal); }; const genericRequestHandler = ( diff --git a/packages/server/src/utils.ts b/packages/server/src/utils.ts index 6a5a61bbefb..4c1f1e2f78f 100644 --- a/packages/server/src/utils.ts +++ b/packages/server/src/utils.ts @@ -82,50 +82,11 @@ function isRequestBody(body: any): body is BodyInit { return false; } -export class ServerAdapterRequestAbortSignal extends EventTarget implements AbortSignal { - aborted = false; - private _onabort: ((this: AbortSignal, ev: Event) => any) | null = null; - reason: any; - - throwIfAborted(): void { - if (this.aborted) { - throw this.reason; - } - } - - sendAbort() { - this.reason = new DOMException('This operation was aborted', 'AbortError'); - this.aborted = true; - this.dispatchEvent(new Event('abort')); - } - - get onabort() { - return this._onabort; - } - - set onabort(value) { - this._onabort = value; - if (value) { - this.addEventListener('abort', value); - } else { - this.removeEventListener('abort', value); - } - } - - any(signals: Iterable): AbortSignal { - return AbortSignal.any([...signals]); - } -} - let bunNodeCompatModeWarned = false; export const nodeRequestResponseMap = new WeakMap(); -export function normalizeNodeRequest( - nodeRequest: NodeRequest, - fetchAPI: FetchAPI, - registerSignal?: (signal: ServerAdapterRequestAbortSignal) => void, -): Request { +export function normalizeNodeRequest(nodeRequest: NodeRequest, fetchAPI: FetchAPI): Request { const rawRequest = nodeRequest.raw || nodeRequest.req || nodeRequest; let fullUrl = buildFullUrl(rawRequest); if (nodeRequest.query) { @@ -136,8 +97,6 @@ export function normalizeNodeRequest( fullUrl = url.toString(); } - let signal: AbortSignal | undefined; - const nodeResponse = nodeRequestResponseMap.get(nodeRequest); nodeRequestResponseMap.delete(nodeRequest); let normalizedHeaders: Record = nodeRequest.headers; @@ -149,25 +108,12 @@ export function normalizeNodeRequest( } } } + const controller = new AbortController(); if (nodeResponse?.once) { - let sendAbortSignal: VoidFunction; - - // If ponyfilled - if (fetchAPI.Request !== globalThis.Request) { - const newSignal = new ServerAdapterRequestAbortSignal(); - registerSignal?.(newSignal); - signal = newSignal; - sendAbortSignal = () => (signal as ServerAdapterRequestAbortSignal).sendAbort(); - } else { - const controller = new AbortController(); - signal = controller.signal; - sendAbortSignal = () => controller.abort(); - } - const closeEventListener: EventListener = () => { - if (signal && !signal.aborted) { + if (!controller.signal.aborted) { Object.defineProperty(rawRequest, 'aborted', { value: true }); - sendAbortSignal(); + controller.abort(nodeResponse.errored ?? undefined); } }; @@ -183,7 +129,7 @@ export function normalizeNodeRequest( return new fetchAPI.Request(fullUrl, { method: nodeRequest.method, headers: normalizedHeaders, - signal: signal || null, + signal: controller.signal, }); } @@ -200,13 +146,13 @@ export function normalizeNodeRequest( method: nodeRequest.method || 'GET', headers: normalizedHeaders, body: maybeParsedBody, - signal: signal || null, + signal: controller.signal, }); } const request = new fetchAPI.Request(fullUrl, { method: nodeRequest.method || 'GET', headers: normalizedHeaders, - signal: signal || null, + signal: controller.signal, }); if (!request.headers.get('content-type')?.includes('json')) { request.headers.set('content-type', 'application/json; charset=utf-8'); @@ -254,7 +200,7 @@ It will affect your performance. Please check our Bun integration recipe, and av rawRequest.destroy(e); }, }), - signal, + signal: controller.signal, } as RequestInit); } @@ -262,10 +208,11 @@ It will affect your performance. Please check our Bun integration recipe, and av return new fetchAPI.Request(fullUrl, { method: nodeRequest.method, headers: normalizedHeaders, - body: rawRequest as any, + signal: controller.signal, + // @ts-expect-error - AsyncIterable is supported as body + body: rawRequest, duplex: 'half', - signal, - } as RequestInit); + }); } export function isReadable(stream: any): stream is Readable { @@ -585,19 +532,26 @@ export function createDeferredPromise(): DeferredPromise { export function handleAbortSignalAndPromiseResponse( response$: Promise | Response, - abortSignal?: AbortSignal | null, + abortSignal: AbortSignal, ) { + if (abortSignal?.aborted) { + throw abortSignal.reason; + } if (isPromise(response$) && abortSignal) { const deferred$ = createDeferredPromise(); - abortSignal.addEventListener('abort', function abortSignalFetchErrorHandler() { + function abortSignalFetchErrorHandler() { deferred$.reject(abortSignal.reason); - }); + } + abortSignal.addEventListener('abort', abortSignalFetchErrorHandler, { once: true }); response$ .then(function fetchSuccessHandler(res) { deferred$.resolve(res); }) .catch(function fetchErrorHandler(err) { deferred$.reject(err); + }) + .finally(() => { + abortSignal.removeEventListener('abort', abortSignalFetchErrorHandler); }); return deferred$.promise; } diff --git a/packages/server/src/uwebsockets.ts b/packages/server/src/uwebsockets.ts index cc07f8745b0..3ee1a8d3e0b 100644 --- a/packages/server/src/uwebsockets.ts +++ b/packages/server/src/uwebsockets.ts @@ -1,5 +1,5 @@ import type { FetchAPI } from './types.js'; -import { isPromise, ServerAdapterRequestAbortSignal } from './utils.js'; +import { isPromise } from './utils.js'; export interface UWSRequest { getMethod(): string; @@ -31,10 +31,15 @@ interface GetRequestFromUWSOpts { req: UWSRequest; res: UWSResponse; fetchAPI: FetchAPI; - signal: AbortSignal; + controller: AbortController; } -export function getRequestFromUWSRequest({ req, res, fetchAPI, signal }: GetRequestFromUWSOpts) { +export function getRequestFromUWSRequest({ + req, + res, + fetchAPI, + controller, +}: GetRequestFromUWSOpts) { const method = req.getMethod(); let duplex: 'half' | undefined; @@ -70,9 +75,13 @@ export function getRequestFromUWSRequest({ req, res, fetchAPI, signal }: GetRequ let getReadableStream: (() => ReadableStream) | undefined; if (method !== 'get' && method !== 'head') { duplex = 'half'; - signal.addEventListener('abort', () => { - stop(); - }); + controller.signal.addEventListener( + 'abort', + () => { + stop(); + }, + { once: true }, + ); let readableStream: ReadableStream; getReadableStream = () => { if (!readableStream) { @@ -124,7 +133,7 @@ export function getRequestFromUWSRequest({ req, res, fetchAPI, signal }: GetRequ get body() { return getBody(); }, - signal, + signal: controller.signal, // eslint-disable-next-line @typescript-eslint/ban-ts-comment // @ts-ignore - not in the TS types yet duplex, @@ -202,7 +211,7 @@ export function createWritableFromUWS(uwsResponse: UWSResponse, fetchAPI: FetchA export function sendResponseToUwsOpts( uwsResponse: UWSResponse, fetchResponse: Response, - signal: ServerAdapterRequestAbortSignal, + controller: AbortController, fetchAPI: FetchAPI, ) { if (!fetchResponse) { @@ -211,7 +220,7 @@ export function sendResponseToUwsOpts( return; } const bufferOfRes: Uint8Array = (fetchResponse as any)._buffer; - if (signal.aborted) { + if (controller.signal.aborted) { return; } uwsResponse.cork(() => { @@ -240,17 +249,21 @@ export function sendResponseToUwsOpts( if (bufferOfRes || !fetchResponse.body) { return; } - signal.addEventListener('abort', () => { - if (!fetchResponse.body?.locked) { - fetchResponse.body?.cancel(signal.reason); - } - }); + controller.signal.addEventListener( + 'abort', + () => { + if (!fetchResponse.body?.locked) { + fetchResponse.body?.cancel(controller.signal.reason); + } + }, + { once: true }, + ); return fetchResponse.body .pipeTo(createWritableFromUWS(uwsResponse, fetchAPI), { - signal, + signal: controller.signal, }) .catch(err => { - if (signal.aborted) { + if (controller.signal.aborted) { return; } throw err;