Skip to content

Commit

Permalink
feat(fetch): async iterable body (#976)
Browse files Browse the repository at this point in the history
Refs: #975
  • Loading branch information
ronag authored Aug 17, 2021
1 parent d0becbc commit f068a04
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 70 deletions.
5 changes: 2 additions & 3 deletions lib/fetch/body.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ const { toWebReadable } = require('./util')
const { FormData } = require('./formdata')
const { kState } = require('./symbols')
const { Blob } = require('buffer')
const { Readable } = require('stream')
const { kBodyUsed } = require('../core/symbols')
const assert = require('assert')
const nodeUtil = require('util')
Expand Down Expand Up @@ -143,7 +142,7 @@ function extractBody (object, keepalive = false) {
if (object.type) {
contentType = object.type
}
} else if (object instanceof ReadableStream || object instanceof Readable) {
} else if (object instanceof ReadableStream || typeof object[Symbol.asyncIterator] === 'function') {
// ReadableStream

// If keepalive is true, then throw a TypeError.
Expand All @@ -158,7 +157,7 @@ function extractBody (object, keepalive = false) {
)
}

if (util.isStream(object)) {
if (typeof object[Symbol.asyncIterator] === 'function') {
stream = toWebReadable(object)
} else {
stream = object
Expand Down
86 changes: 19 additions & 67 deletions lib/fetch/util.js
Original file line number Diff line number Diff line change
@@ -1,13 +1,9 @@
'use strict'

const { redirectStatus } = require('./constants')
const { destroy, isDestroyed } = require('../../lib/core/util')
const { AbortError } = require('../../lib/core/errors')
const { finished } = require('stream')
const { performance } = require('perf_hooks')

let ReadableStream
let CountQueuingStrategy

// https://fetch.spec.whatwg.org/#block-bad-port
const badPorts = [
Expand Down Expand Up @@ -134,77 +130,33 @@ function isValidHTTPToken (characters) {
return true
}

function toWebReadable (streamReadable) {
function toWebReadable (iterable) {
if (!ReadableStream) {
ReadableStream = require('stream/web').ReadableStream
}
if (!CountQueuingStrategy) {
CountQueuingStrategy = require('stream/web').CountQueuingStrategy
}

if (isDestroyed(streamReadable)) {
const readable = new ReadableStream()
readable.cancel()
return readable
if (ReadableStream.from) {
// https://github.com/whatwg/streams/pull/1083
return ReadableStream.from(iterable)
}

const objectMode = streamReadable.readableObjectMode
const highWaterMark = streamReadable.readableHighWaterMark
const strategy = objectMode
? new CountQueuingStrategy({ highWaterMark })
: { highWaterMark }

let controller

function onData (chunk) {
// Copy the Buffer to detach it from the pool.
if (Buffer.isBuffer(chunk) && !objectMode) {
chunk = new Uint8Array(chunk)
}
controller.enqueue(chunk)
if (controller.desiredSize <= 0) {
streamReadable.pause()
}
}

streamReadable.pause()

finished(streamReadable, (err) => {
if (err && err.code === 'ERR_STREAM_PREMATURE_CLOSE') {
const er = new AbortError()
er.cause = er
err = er
}

if (err) {
controller.error(err)
} else {
queueMicrotask(() => {
// Must not use `process.nextTick()`.
// See https://github.com/nodejs/node/issues/39758
controller.close()
})
}
})

streamReadable.on('data', onData)

return new ReadableStream(
{
start (c) {
controller = c
},

pull () {
streamReadable.resume()
},

cancel (reason) {
destroy(streamReadable, reason)
const iterator = iterable[Symbol.asyncIterator]()

return new ReadableStream({
async pull (controller) {
const { done, value } = await iterator.next()
if (done) {
queueMicrotask(() => {
controller.close()
})
} else {
controller.enqueue(new Uint8Array(Buffer.from(value)))
}
},
strategy
)
async cancel (reason) {
await iterator.return()
}
}, 0)
}

// https://w3c.github.io/webappsec-referrer-policy/#set-requests-referrer-policy-on-redirect
Expand Down

0 comments on commit f068a04

Please sign in to comment.