Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enhance(node-fetch): use pipeline from node:streams to pipe streams #1500

Merged
merged 3 commits into from
Jul 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/eighty-worms-accept.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@whatwg-node/node-fetch': patch
---

Use suggested \`pipeline\` from \`node:streams\` to pipe streams to the final Response object
25 changes: 22 additions & 3 deletions packages/node-fetch/src/Request.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { Agent } from 'http';
import { Agent as HTTPAgent, globalAgent as httpGlobalAgent } from 'http';
import { Agent as HTTPSAgent, globalAgent as httpsGlobalAgent } from 'https';
import { BodyPonyfillInit, PonyfillBody, PonyfillBodyOptions } from './Body.js';
import { isHeadersLike, PonyfillHeaders, PonyfillHeadersInit } from './Headers.js';

Expand All @@ -12,7 +13,7 @@ export type RequestPonyfillInit = PonyfillBodyOptions &
duplex?: 'half' | 'full';
headers?: PonyfillHeadersInit;
headersSerializer?: HeadersSerializer;
agent?: Agent;
agent?: HTTPAgent | HTTPSAgent | false;
};

type HeadersSerializer = (
Expand Down Expand Up @@ -95,6 +96,8 @@ export class PonyfillRequest<TJSON = any> extends PonyfillBody<TJSON> implements
this.contentLength = parseInt(contentLengthInHeaders, 10);
}
}

this._agent = requestInit?.agent;
}

headersSerializer?: HeadersSerializer;
Expand All @@ -111,9 +114,25 @@ export class PonyfillRequest<TJSON = any> extends PonyfillBody<TJSON> implements
referrer: string;
referrerPolicy: ReferrerPolicy;
url: string;
agent?: Agent;
duplex: 'half' | 'full';

private _agent: HTTPAgent | HTTPSAgent | false | undefined;

get agent() {
if (this._agent != null) {
return this._agent;
}
// Disable agent when running in jest
if (globalThis['libcurl'] || typeof jest === 'object') {
return false;
}
if (this.url.startsWith('http:')) {
return httpGlobalAgent;
} else if (this.url.startsWith('https:')) {
return httpsGlobalAgent;
}
}

private _signal: AbortSignal | undefined | null;

get signal() {
Expand Down
34 changes: 20 additions & 14 deletions packages/node-fetch/src/fetchCurl.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { PassThrough, Readable } from 'stream';
import { PassThrough, Readable, promises as streamPromises } from 'stream';
import { PonyfillRequest } from './Request.js';
import { PonyfillResponse } from './Response.js';
import { defaultHeadersSerializer, isNodeReadable } from './utils.js';
Expand Down Expand Up @@ -100,7 +100,19 @@ export function fetchCurl<TResponseJSON = any, TRequestJSON = any>(
curlHandle.once(
'stream',
function streamListener(stream: Readable, status: number, headersBuf: Buffer) {
const pipedStream = stream.pipe(new PassThrough());
const outputStream = new PassThrough();

streamPromises
.pipeline(stream, outputStream, {
end: true,
signal: fetchRequest['_signal'] ?? undefined,
})
.then(() => {
if (!stream.destroyed) {
stream.resume();
}
})
.catch(reject);
const headersFlat = headersBuf
.toString('utf8')
.split(/\r?\n|\r/g)
Expand All @@ -110,7 +122,10 @@ export function fetchCurl<TResponseJSON = any, TRequestJSON = any>(
fetchRequest.redirect === 'error' &&
(headerFilter.includes('location') || headerFilter.includes('Location'))
) {
pipedStream.destroy();
if (!stream.destroyed) {
stream.resume();
}
outputStream.destroy();
reject(new Error('redirect is not allowed'));
}
return true;
Expand All @@ -120,22 +135,13 @@ export function fetchCurl<TResponseJSON = any, TRequestJSON = any>(
const headersInit = headersFlat.map(
headerFlat => headerFlat.split(/:\s(.+)/).slice(0, 2) as [string, string],
);
pipedStream.on('pause', () => {
stream.pause();
});
pipedStream.on('resume', () => {
stream.resume();
});
pipedStream.on('close', () => {
stream.destroy();
});
const ponyfillResponse = new PonyfillResponse(pipedStream, {
const ponyfillResponse = new PonyfillResponse(outputStream, {
status,
headers: headersInit,
url: fetchRequest.url,
});
resolve(ponyfillResponse);
streamResolved = pipedStream;
streamResolved = outputStream;
},
);
curlHandle.perform();
Expand Down
43 changes: 19 additions & 24 deletions packages/node-fetch/src/fetchNodeHttp.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import { request as httpRequest } from 'http';
import { request as httpsRequest } from 'https';
import { PassThrough, Readable } from 'stream';
import { PassThrough, Readable, promises as streamPromises } from 'stream';
import { createBrotliDecompress, createGunzip, createInflate, createInflateRaw } from 'zlib';
import { PonyfillAbortError } from './AbortError.js';
import { PonyfillRequest } from './Request.js';
import { PonyfillResponse } from './Response.js';
import { PonyfillURL } from './URL.js';
Expand Down Expand Up @@ -46,24 +45,26 @@ export function fetchNodeHttp<TResponseJSON = any, TRequestJSON = any>(
});

nodeRequest.once('response', nodeResponse => {
let responseBody: Readable = nodeResponse;
let outputStream: PassThrough;
const contentEncoding = nodeResponse.headers['content-encoding'];
switch (contentEncoding) {
case 'x-gzip':
case 'gzip':
responseBody = nodeResponse.pipe(createGunzip());
outputStream = createGunzip();
break;
case 'x-deflate':
case 'deflate':
responseBody = nodeResponse.pipe(createInflate());
outputStream = createInflate();
break;
case 'x-deflate-raw':
case 'deflate-raw':
responseBody = nodeResponse.pipe(createInflateRaw());
outputStream = createInflateRaw();
break;
case 'br':
responseBody = nodeResponse.pipe(createBrotliDecompress());
outputStream = createBrotliDecompress();
break;
default:
outputStream = new PassThrough();
}
if (nodeResponse.headers.location) {
if (fetchRequest.redirect === 'error') {
Expand All @@ -87,25 +88,19 @@ export function fetchNodeHttp<TResponseJSON = any, TRequestJSON = any>(
return;
}
}
if (responseBody === nodeResponse) {
responseBody = nodeResponse.pipe(new PassThrough());
responseBody.on('pause', () => {
nodeResponse.pause();
});
responseBody.on('resume', () => {
nodeResponse.resume();
});
responseBody.on('close', () => {
nodeResponse.destroy();
});
fetchRequest['_signal']?.addEventListener('abort', () => {
streamPromises
.pipeline(nodeResponse, outputStream, {
signal: fetchRequest['_signal'] ?? undefined,
end: true,
})
.then(() => {
if (!nodeResponse.destroyed) {
responseBody.emit('error', new PonyfillAbortError());
nodeResponse.resume();
}
});
}
nodeResponse.once('error', reject);
const ponyfillResponse = new PonyfillResponse(responseBody, {
})
.catch(reject);

const ponyfillResponse = new PonyfillResponse(outputStream, {
status: nodeResponse.statusCode,
statusText: nodeResponse.statusMessage,
headers: nodeResponse.headers as Record<string, string>,
Expand Down
3 changes: 3 additions & 0 deletions packages/node-fetch/tests/fetch.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -162,18 +162,21 @@ describe('Node Fetch Ponyfill', () => {
});
it('should respect gzip', async () => {
const response = await fetchPonyfill(baseUrl + '/gzip');
expect(response.headers.get('content-encoding')).toBe('gzip');
expect(response.status).toBe(200);
const body = await response.json();
expect(body.gzipped).toBe(true);
});
it('should respect deflate', async () => {
const response = await fetchPonyfill(baseUrl + '/deflate');
expect(response.headers.get('content-encoding')).toBe('deflate');
expect(response.status).toBe(200);
const body = await response.json();
expect(body.deflated).toBe(true);
});
it('should respect brotli', async () => {
const response = await fetchPonyfill(baseUrl + '/brotli');
expect(response.headers.get('content-encoding')).toBe('br');
expect(response.status).toBe(200);
const body = await response.json();
expect(body.brotli).toBe(true);
Expand Down
16 changes: 9 additions & 7 deletions packages/server/test/test-fetch.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
/* eslint-disable n/no-callback-literal */
import { globalAgent as httpGlobalAgent } from 'http';
import { globalAgent as httpsGlobalAgent } from 'https';
import type { Dispatcher } from 'undici';
import { createFetch } from '@whatwg-node/fetch';
import { createServerAdapter } from '../src/createServerAdapter';
Expand Down Expand Up @@ -52,8 +50,6 @@ export function runTestsForEachFetchImpl(
});
afterAll(() => {
globalThis.libcurl = libcurl;
httpGlobalAgent.destroy();
httpsGlobalAgent.destroy();
});
const fetchAPI = createFetch({ skipPonyfill: false });
callback('node-http', {
Expand All @@ -66,9 +62,15 @@ export function runTestsForEachFetchImpl(
});
});
});
const nodeMajor = parseInt(process.versions.node.split('.')[0], 10);
// Node 18 is leaking memory with native fetch
if (!opts.noNativeFetch && process.env.LEAK_TEST && nodeMajor >= 22) {
let noNative = opts.noNativeFetch;
if (
process.env.LEAK_TEST &&
// @ts-expect-error - Only if global dispatcher is available
!globalThis[Symbol.for('undici.globalDispatcher.1')]
) {
noNative = true;
}
if (!noNative) {
describe('Native', () => {
const fetchAPI = createFetch({ skipPonyfill: true });
callback('native', {
Expand Down
Loading