diff --git a/lib/fetch/body.js b/lib/fetch/body.js index 1861bdbe99b..49bce6c6e12 100644 --- a/lib/fetch/body.js +++ b/lib/fetch/body.js @@ -5,7 +5,6 @@ const { toWebReadable } = require('./util') const { FormData } = require('./formdata') const { kState } = require('./symbols') const { Blob } = require('buffer') -const { Readable } = require('stream') const { kBodyUsed } = require('../core/symbols') const assert = require('assert') const nodeUtil = require('util') @@ -143,7 +142,7 @@ function extractBody (object, keepalive = false) { if (object.type) { contentType = object.type } - } else if (object instanceof ReadableStream || object instanceof Readable) { + } else if (object instanceof ReadableStream || typeof object[Symbol.asyncIterator] === 'function') { // ReadableStream // If keepalive is true, then throw a TypeError. @@ -158,7 +157,7 @@ function extractBody (object, keepalive = false) { ) } - if (util.isStream(object)) { + if (typeof object[Symbol.asyncIterator] === 'function') { stream = toWebReadable(object) } else { stream = object diff --git a/lib/fetch/util.js b/lib/fetch/util.js index bea352d61a2..9067308f185 100644 --- a/lib/fetch/util.js +++ b/lib/fetch/util.js @@ -1,13 +1,9 @@ 'use strict' const { redirectStatus } = require('./constants') -const { destroy, isDestroyed } = require('../../lib/core/util') -const { AbortError } = require('../../lib/core/errors') -const { finished } = require('stream') const { performance } = require('perf_hooks') let ReadableStream -let CountQueuingStrategy // https://fetch.spec.whatwg.org/#block-bad-port const badPorts = [ @@ -134,77 +130,33 @@ function isValidHTTPToken (characters) { return true } -function toWebReadable (streamReadable) { +function toWebReadable (iterable) { if (!ReadableStream) { ReadableStream = require('stream/web').ReadableStream } - if (!CountQueuingStrategy) { - CountQueuingStrategy = require('stream/web').CountQueuingStrategy - } - if (isDestroyed(streamReadable)) { - const readable = new ReadableStream() - readable.cancel() - return readable + if (ReadableStream.from) { + // https://github.com/whatwg/streams/pull/1083 + return ReadableStream.from(iterable) } - const objectMode = streamReadable.readableObjectMode - const highWaterMark = streamReadable.readableHighWaterMark - const strategy = objectMode - ? new CountQueuingStrategy({ highWaterMark }) - : { highWaterMark } - - let controller - - function onData (chunk) { - // Copy the Buffer to detach it from the pool. - if (Buffer.isBuffer(chunk) && !objectMode) { - chunk = new Uint8Array(chunk) - } - controller.enqueue(chunk) - if (controller.desiredSize <= 0) { - streamReadable.pause() - } - } - - streamReadable.pause() - - finished(streamReadable, (err) => { - if (err && err.code === 'ERR_STREAM_PREMATURE_CLOSE') { - const er = new AbortError() - er.cause = er - err = er - } - - if (err) { - controller.error(err) - } else { - queueMicrotask(() => { - // Must not use `process.nextTick()`. - // See https://github.com/nodejs/node/issues/39758 - controller.close() - }) - } - }) - - streamReadable.on('data', onData) - - return new ReadableStream( - { - start (c) { - controller = c - }, - - pull () { - streamReadable.resume() - }, - - cancel (reason) { - destroy(streamReadable, reason) + const iterator = iterable[Symbol.asyncIterator]() + + return new ReadableStream({ + async pull (controller) { + const { done, value } = await iterator.next() + if (done) { + queueMicrotask(() => { + controller.close() + }) + } else { + controller.enqueue(new Uint8Array(Buffer.from(value))) } }, - strategy - ) + async cancel (reason) { + await iterator.return() + } + }, 0) } // https://w3c.github.io/webappsec-referrer-policy/#set-requests-referrer-policy-on-redirect