Skip to content

Commit

Permalink
enhance(server): use native AbortController and AbortSignal (#2031)
Browse files Browse the repository at this point in the history
  • Loading branch information
ardatan committed Feb 5, 2025
1 parent 1953c16 commit 337e605
Show file tree
Hide file tree
Showing 8 changed files with 89 additions and 165 deletions.
7 changes: 7 additions & 0 deletions .changeset/metal-moles-care.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
'@whatwg-node/server': patch
'@whatwg-node/node-fetch': patch
---

- Use native AbortSignal and AbortController for Request.signal
- Remove custom AbortSignal implementation (ServerAdapterAbortSignal)
13 changes: 2 additions & 11 deletions packages/node-fetch/src/Request.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ export class PonyfillRequest<TJSON = any> extends PonyfillBody<TJSON> implements
this.redirect = requestInit?.redirect || 'follow';
this.referrer = requestInit?.referrer || 'about:client';
this.referrerPolicy = requestInit?.referrerPolicy || 'no-referrer';
this._signal = requestInit?.signal;
this.signal = requestInit?.signal || new AbortController().signal;
this.headersSerializer = requestInit?.headersSerializer;
this.duplex = requestInit?.duplex || 'half';

Expand Down Expand Up @@ -137,16 +137,7 @@ export class PonyfillRequest<TJSON = any> extends PonyfillBody<TJSON> implements

agent: HTTPAgent | HTTPSAgent | false | undefined;

private _signal: AbortSignal | undefined | null;

get signal() {
// Create a new signal only if needed
// Because the creation of signal is expensive
if (!this._signal) {
this._signal = new AbortController().signal;
}
return this._signal!;
}
signal: AbortSignal;

clone(): PonyfillRequest<TJSON> {
return this;
Expand Down
52 changes: 6 additions & 46 deletions packages/node-fetch/src/WritableStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,55 +44,15 @@ export class PonyfillWritableStream<W = any> implements WritableStream<W> {
},
});
this.writable = writable;
let onabort: EventListener | null;
let reason: any;
const abortCtrl = new AbortController();
const controller: WritableStreamDefaultController = {
signal: {
any(signals) {
return AbortSignal.any([...signals]);
},
get reason() {
return reason;
},
get aborted() {
return writable.destroyed;
},
addEventListener: (_event: string, eventListener: EventListener) => {
writable.once('error', eventListener);
writable.once('close', eventListener);
},
removeEventListener: (_event: string, eventListener: EventListener) => {
writable.off('error', eventListener);
writable.off('close', eventListener);
},
dispatchEvent: (_event: Event) => {
return false;
},
get onabort() {
return onabort;
},
set onabort(value) {
if (onabort) {
this.removeEventListener('abort', onabort);
}
onabort = value;
if (onabort) {
this.addEventListener('abort', onabort);
}
},
throwIfAborted() {
if (writable.destroyed) {
throw reason;
}
},
},
error: e => {
this.writable.destroy(e);
signal: abortCtrl.signal,
error(e) {
writable.destroy(e);
},
};
this.writable.once('error', err => {
reason = err;
});
writable.once('error', err => abortCtrl.abort(err));
writable.once('close', () => abortCtrl.abort());
} else {
this.writable = new Writable();
}
Expand Down
16 changes: 6 additions & 10 deletions packages/node-fetch/src/fetchCurl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,7 @@ export function fetchCurl<TResponseJSON = any, TRequestJSON = any>(
curlHandle.enable(CurlFeature.StreamResponse);

curlHandle.setStreamProgressCallback(function () {
return fetchRequest['_signal']?.aborted
? process.env.DEBUG
? CurlProgressFunc.Continue
: 1
: 0;
return fetchRequest.signal.aborted ? (process.env.DEBUG ? CurlProgressFunc.Continue : 1) : 0;
});

if (fetchRequest['bodyType'] === 'String') {
Expand Down Expand Up @@ -92,17 +88,17 @@ export function fetchCurl<TResponseJSON = any, TRequestJSON = any>(
}
}
}
if (fetchRequest['_signal']) {
fetchRequest['_signal'].addEventListener('abort', onAbort, { once: true });
if (fetchRequest.signal) {
fetchRequest.signal.addEventListener('abort', onAbort, { once: true });
}
curlHandle.once('end', function endListener() {
try {
curlHandle.close();
} catch (e) {
deferredPromise.reject(e);
}
if (fetchRequest['_signal']) {
fetchRequest['_signal'].removeEventListener('abort', onAbort);
if (fetchRequest.signal) {
fetchRequest.signal.removeEventListener('abort', onAbort);
}
});
curlHandle.once('error', function errorListener(error: any) {
Expand All @@ -127,7 +123,7 @@ export function fetchCurl<TResponseJSON = any, TRequestJSON = any>(

pipeline(stream, outputStream, {
end: true,
signal: fetchRequest['_signal'] ?? undefined,
signal: fetchRequest.signal,
})
.then(() => {
if (!stream.destroyed) {
Expand Down
6 changes: 3 additions & 3 deletions packages/node-fetch/src/fetchNodeHttp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,14 @@ export function fetchNodeHttp<TResponseJSON = any, TRequestJSON = any>(
nodeRequest = requestFn(fetchRequest.parsedUrl, {
method: fetchRequest.method,
headers: nodeHeaders,
signal: fetchRequest['_signal'] ?? undefined,
signal: fetchRequest.signal,
agent: fetchRequest.agent,
});
} else {
nodeRequest = requestFn(fetchRequest.url, {
method: fetchRequest.method,
headers: nodeHeaders,
signal: fetchRequest['_signal'] ?? undefined,
signal: fetchRequest.signal,
agent: fetchRequest.agent,
});
}
Expand Down Expand Up @@ -107,7 +107,7 @@ export function fetchNodeHttp<TResponseJSON = any, TRequestJSON = any>(
}
}
pipeline(nodeResponse, outputStream, {
signal: fetchRequest['_signal'] ?? undefined,
signal: fetchRequest.signal,
end: true,
})
.then(() => {
Expand Down
25 changes: 14 additions & 11 deletions packages/server/src/createServerAdapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import {
NodeResponse,
normalizeNodeRequest,
sendNodeResponse,
ServerAdapterRequestAbortSignal,
} from './utils.js';
import {
fakePromise,
Expand Down Expand Up @@ -319,7 +318,7 @@ function createServerAdapter<
? completeAssign(defaultServerContext, ...ctx)
: defaultServerContext;

const signal = new ServerAdapterRequestAbortSignal();
const controller = new AbortController();
const originalResEnd = res.end.bind(res);
let resEnded = false;
res.end = function (data: any) {
Expand All @@ -328,16 +327,16 @@ function createServerAdapter<
};
const originalOnAborted = res.onAborted.bind(res);
originalOnAborted(function () {
signal.sendAbort();
controller.abort();
});
res.onAborted = function (cb: () => void) {
signal.addEventListener('abort', cb);
controller.signal.addEventListener('abort', cb, { once: true });
};
const request = getRequestFromUWSRequest({
req,
res,
fetchAPI,
signal,
controller,
});
let response$: Response | Promise<Response> | undefined;
try {
Expand All @@ -349,8 +348,8 @@ function createServerAdapter<
return response$
.catch((e: any) => handleErrorFromRequestHandler(e, fetchAPI.Response))
.then(response => {
if (!signal.aborted && !resEnded) {
return sendResponseToUwsOpts(res, response, signal, fetchAPI);
if (!controller.signal.aborted && !resEnded) {
return sendResponseToUwsOpts(res, response, controller, fetchAPI);
}
})
.catch(err => {
Expand All @@ -360,8 +359,8 @@ function createServerAdapter<
});
}
try {
if (!signal.aborted && !resEnded) {
return sendResponseToUwsOpts(res, response$, signal, fetchAPI);
if (!controller.signal.aborted && !resEnded) {
return sendResponseToUwsOpts(res, response$, controller, fetchAPI);
}
} catch (err: any) {
console.error(
Expand Down Expand Up @@ -406,13 +405,17 @@ function createServerAdapter<
if (isRequestInit(initOrCtx)) {
const request = new fetchAPI.Request(input, initOrCtx);
const res$ = handleRequestWithWaitUntil(request, ...restOfCtx);
return handleAbortSignalAndPromiseResponse(res$, (initOrCtx as RequestInit)?.signal);
const signal = (initOrCtx as RequestInit).signal;
if (signal) {
return handleAbortSignalAndPromiseResponse(res$, signal);
}
return res$;
}
const request = new fetchAPI.Request(input);
return handleRequestWithWaitUntil(request, ...maybeCtx);
}
const res$ = handleRequestWithWaitUntil(input, ...maybeCtx);
return handleAbortSignalAndPromiseResponse(res$, (input as any)._signal);
return handleAbortSignalAndPromiseResponse(res$, input.signal);
};

const genericRequestHandler = (
Expand Down
Loading

0 comments on commit 337e605

Please sign in to comment.