-
Notifications
You must be signed in to change notification settings - Fork 30.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
stream: added experimental support for for-await #17755
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,159 @@ | ||
'use strict'; | ||
|
||
const kLastResolve = Symbol('lastResolve'); | ||
const kLastReject = Symbol('lastReject'); | ||
const kError = Symbol('error'); | ||
const kEnded = Symbol('ended'); | ||
const kLastPromise = Symbol('lastPromise'); | ||
const kHandlePromise = Symbol('handlePromise'); | ||
const kStream = Symbol('stream'); | ||
|
||
const AsyncIteratorRecord = class AsyncIteratorRecord { | ||
constructor(value, done) { | ||
this.done = done; | ||
this.value = value; | ||
} | ||
}; | ||
|
||
function readAndResolve(iter) { | ||
const resolve = iter[kLastResolve]; | ||
if (resolve !== null) { | ||
const data = iter[kStream].read(); | ||
// we defer if data is null | ||
// we can be expecting either 'end' or | ||
// 'error' | ||
if (data !== null) { | ||
iter[kLastPromise] = null; | ||
iter[kLastResolve] = null; | ||
iter[kLastReject] = null; | ||
resolve(new AsyncIteratorRecord(data, false)); | ||
} | ||
} | ||
} | ||
|
||
function onReadable(iter) { | ||
// we wait for the next tick, because it might | ||
// emit an error with process.nextTick | ||
process.nextTick(readAndResolve, iter); | ||
} | ||
|
||
function onEnd(iter) { | ||
const resolve = iter[kLastResolve]; | ||
if (resolve !== null) { | ||
iter[kLastPromise] = null; | ||
iter[kLastResolve] = null; | ||
iter[kLastReject] = null; | ||
resolve(new AsyncIteratorRecord(null, true)); | ||
} | ||
iter[kEnded] = true; | ||
} | ||
|
||
function onError(iter, err) { | ||
const reject = iter[kLastReject]; | ||
// reject if we are waiting for data in the Promise | ||
// returned by next() and store the error | ||
if (reject !== null) { | ||
iter[kLastPromise] = null; | ||
iter[kLastResolve] = null; | ||
iter[kLastReject] = null; | ||
reject(err); | ||
} | ||
iter.error = err; | ||
} | ||
|
||
function wrapForNext(lastPromise, iter) { | ||
return function(resolve, reject) { | ||
lastPromise.then(function() { | ||
iter[kHandlePromise](resolve, reject); | ||
}, reject); | ||
}; | ||
} | ||
|
||
const ReadableAsyncIterator = class ReadableAsyncIterator { | ||
constructor(stream) { | ||
this[kStream] = stream; | ||
this[kLastResolve] = null; | ||
this[kLastReject] = null; | ||
this[kError] = null; | ||
this[kEnded] = false; | ||
this[kLastPromise] = null; | ||
|
||
stream.on('readable', onReadable.bind(null, this)); | ||
stream.on('end', onEnd.bind(null, this)); | ||
stream.on('error', onError.bind(null, this)); | ||
|
||
// the function passed to new Promise | ||
// is cached so we avoid allocating a new | ||
// closure at every run | ||
this[kHandlePromise] = (resolve, reject) => { | ||
const data = this[kStream].read(); | ||
if (data) { | ||
this[kLastPromise] = null; | ||
this[kLastResolve] = null; | ||
this[kLastReject] = null; | ||
resolve(new AsyncIteratorRecord(data, false)); | ||
} else { | ||
this[kLastResolve] = resolve; | ||
this[kLastReject] = reject; | ||
} | ||
}; | ||
} | ||
|
||
get stream() { | ||
return this[kStream]; | ||
} | ||
|
||
next() { | ||
// if we have detected an error in the meanwhile | ||
// reject straight away | ||
const error = this[kError]; | ||
if (error !== null) { | ||
return Promise.reject(error); | ||
} | ||
|
||
if (this[kEnded]) { | ||
return Promise.resolve(new AsyncIteratorRecord(null, true)); | ||
} | ||
|
||
// if we have multiple next() calls | ||
// we will wait for the previous Promise to finish | ||
// this logic is optimized to support for await loops, | ||
// where next() is only called once at a time | ||
const lastPromise = this[kLastPromise]; | ||
let promise; | ||
|
||
if (lastPromise) { | ||
promise = new Promise(wrapForNext(lastPromise, this)); | ||
} else { | ||
// fast path needed to support multiple this.push() | ||
// without triggering the next() queue | ||
const data = this[kStream].read(); | ||
if (data !== null) { | ||
return Promise.resolve(new AsyncIteratorRecord(data, false)); | ||
} | ||
|
||
promise = new Promise(this[kHandlePromise]); | ||
} | ||
|
||
this[kLastPromise] = promise; | ||
|
||
return promise; | ||
} | ||
|
||
return() { | ||
// destroy(err, cb) is a private API | ||
// we can guarantee we have that here, because we control the | ||
// Readable class this is attached to | ||
return new Promise((resolve, reject) => { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not sure that's faster than what you had before this change (with promisify) :D I'm sorry if I was confusing in the comment. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I actually think that this is faster because we would have to call |
||
this[kStream].destroy(null, (err) => { | ||
if (err) { | ||
reject(err); | ||
return; | ||
} | ||
resolve(new AsyncIteratorRecord(null, true)); | ||
}); | ||
}); | ||
} | ||
}; | ||
|
||
module.exports = ReadableAsyncIterator; |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,298 @@ | ||
'use strict'; | ||
|
||
const common = require('../common'); | ||
const { Readable } = require('stream'); | ||
const assert = require('assert'); | ||
|
||
common.crashOnUnhandledRejection(); | ||
|
||
async function tests() { | ||
await (async function() { | ||
console.log('read without for..await'); | ||
const max = 5; | ||
const readable = new Readable({ | ||
objectMode: true, | ||
read() {} | ||
}); | ||
|
||
const iter = readable[Symbol.asyncIterator](); | ||
assert.strictEqual(iter.stream, readable); | ||
const values = []; | ||
for (let i = 0; i < max; i++) { | ||
values.push(iter.next()); | ||
} | ||
Promise.all(values).then(common.mustCall((values) => { | ||
values.forEach(common.mustCall( | ||
(item, i) => assert.strictEqual(item.value, 'hello-' + i), 5)); | ||
})); | ||
|
||
readable.push('hello-0'); | ||
readable.push('hello-1'); | ||
readable.push('hello-2'); | ||
readable.push('hello-3'); | ||
readable.push('hello-4'); | ||
readable.push(null); | ||
|
||
const last = await iter.next(); | ||
assert.strictEqual(last.done, true); | ||
})(); | ||
|
||
await (async function() { | ||
console.log('read without for..await deferred'); | ||
const readable = new Readable({ | ||
objectMode: true, | ||
read() {} | ||
}); | ||
|
||
const iter = readable[Symbol.asyncIterator](); | ||
assert.strictEqual(iter.stream, readable); | ||
let values = []; | ||
for (let i = 0; i < 3; i++) { | ||
values.push(iter.next()); | ||
} | ||
|
||
readable.push('hello-0'); | ||
readable.push('hello-1'); | ||
readable.push('hello-2'); | ||
|
||
let k = 0; | ||
const results1 = await Promise.all(values); | ||
results1.forEach(common.mustCall( | ||
(item) => assert.strictEqual(item.value, 'hello-' + k++), 3)); | ||
|
||
values = []; | ||
for (let i = 0; i < 2; i++) { | ||
values.push(iter.next()); | ||
} | ||
|
||
readable.push('hello-3'); | ||
readable.push('hello-4'); | ||
readable.push(null); | ||
|
||
const results2 = await Promise.all(values); | ||
results2.forEach(common.mustCall( | ||
(item) => assert.strictEqual(item.value, 'hello-' + k++), 2)); | ||
|
||
const last = await iter.next(); | ||
assert.strictEqual(last.done, true); | ||
})(); | ||
|
||
await (async function() { | ||
console.log('read without for..await with errors'); | ||
const max = 3; | ||
const readable = new Readable({ | ||
objectMode: true, | ||
read() {} | ||
}); | ||
|
||
const iter = readable[Symbol.asyncIterator](); | ||
assert.strictEqual(iter.stream, readable); | ||
const values = []; | ||
const errors = []; | ||
let i; | ||
for (i = 0; i < max; i++) { | ||
values.push(iter.next()); | ||
} | ||
for (i = 0; i < 2; i++) { | ||
errors.push(iter.next()); | ||
} | ||
|
||
readable.push('hello-0'); | ||
readable.push('hello-1'); | ||
readable.push('hello-2'); | ||
|
||
const resolved = await Promise.all(values); | ||
|
||
resolved.forEach(common.mustCall( | ||
(item, i) => assert.strictEqual(item.value, 'hello-' + i), max)); | ||
|
||
errors.forEach((promise) => { | ||
promise.catch(common.mustCall((err) => { | ||
assert.strictEqual(err.message, 'kaboom'); | ||
})); | ||
}); | ||
|
||
readable.destroy(new Error('kaboom')); | ||
})(); | ||
|
||
await (async function() { | ||
console.log('read object mode'); | ||
const max = 42; | ||
let readed = 0; | ||
let received = 0; | ||
const readable = new Readable({ | ||
objectMode: true, | ||
read() { | ||
this.push('hello'); | ||
if (++readed === max) { | ||
this.push(null); | ||
} | ||
} | ||
}); | ||
|
||
for await (const k of readable) { | ||
received++; | ||
assert.strictEqual(k, 'hello'); | ||
} | ||
|
||
assert.strictEqual(readed, received); | ||
})(); | ||
|
||
await (async function() { | ||
console.log('destroy sync'); | ||
const readable = new Readable({ | ||
objectMode: true, | ||
read() { | ||
this.destroy(new Error('kaboom from read')); | ||
} | ||
}); | ||
|
||
let err; | ||
try { | ||
// eslint-disable-next-line no-unused-vars | ||
for await (const k of readable) {} | ||
} catch (e) { | ||
err = e; | ||
} | ||
assert.strictEqual(err.message, 'kaboom from read'); | ||
})(); | ||
|
||
await (async function() { | ||
console.log('destroy async'); | ||
const readable = new Readable({ | ||
objectMode: true, | ||
read() { | ||
if (!this.pushed) { | ||
this.push('hello'); | ||
this.pushed = true; | ||
|
||
setImmediate(() => { | ||
this.destroy(new Error('kaboom')); | ||
}); | ||
} | ||
} | ||
}); | ||
|
||
let received = 0; | ||
|
||
let err = null; | ||
try { | ||
// eslint-disable-next-line no-unused-vars | ||
for await (const k of readable) { | ||
received++; | ||
} | ||
} catch (e) { | ||
err = e; | ||
} | ||
|
||
assert.strictEqual(err.message, 'kaboom'); | ||
assert.strictEqual(received, 1); | ||
})(); | ||
|
||
await (async function() { | ||
console.log('destroyed by throw'); | ||
const readable = new Readable({ | ||
objectMode: true, | ||
read() { | ||
this.push('hello'); | ||
} | ||
}); | ||
|
||
let err = null; | ||
try { | ||
for await (const k of readable) { | ||
assert.strictEqual(k, 'hello'); | ||
throw new Error('kaboom'); | ||
} | ||
} catch (e) { | ||
err = e; | ||
} | ||
|
||
assert.strictEqual(err.message, 'kaboom'); | ||
assert.strictEqual(readable.destroyed, true); | ||
})(); | ||
|
||
await (async function() { | ||
console.log('destroyed sync after push'); | ||
const readable = new Readable({ | ||
objectMode: true, | ||
read() { | ||
this.push('hello'); | ||
this.destroy(new Error('kaboom')); | ||
} | ||
}); | ||
|
||
let received = 0; | ||
|
||
let err = null; | ||
try { | ||
for await (const k of readable) { | ||
assert.strictEqual(k, 'hello'); | ||
received++; | ||
} | ||
} catch (e) { | ||
err = e; | ||
} | ||
|
||
assert.strictEqual(err.message, 'kaboom'); | ||
assert.strictEqual(received, 1); | ||
})(); | ||
|
||
await (async function() { | ||
console.log('push async'); | ||
const max = 42; | ||
let readed = 0; | ||
let received = 0; | ||
const readable = new Readable({ | ||
objectMode: true, | ||
read() { | ||
setImmediate(() => { | ||
this.push('hello'); | ||
if (++readed === max) { | ||
this.push(null); | ||
} | ||
}); | ||
} | ||
}); | ||
|
||
for await (const k of readable) { | ||
received++; | ||
assert.strictEqual(k, 'hello'); | ||
} | ||
|
||
assert.strictEqual(readed, received); | ||
})(); | ||
|
||
await (async function() { | ||
console.log('push binary async'); | ||
const max = 42; | ||
let readed = 0; | ||
const readable = new Readable({ | ||
read() { | ||
setImmediate(() => { | ||
this.push('hello'); | ||
if (++readed === max) { | ||
this.push(null); | ||
} | ||
}); | ||
} | ||
}); | ||
|
||
let expected = ''; | ||
readable.setEncoding('utf8'); | ||
readable.pause(); | ||
readable.on('data', (chunk) => { | ||
expected += chunk; | ||
}); | ||
|
||
let data = ''; | ||
for await (const k of readable) { | ||
data += k; | ||
} | ||
|
||
assert.strictEqual(data, expected); | ||
})(); | ||
} | ||
|
||
// to avoid missing some tests if a promise does not resolve | ||
tests().then(common.mustCall()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[AsyncIterator][async-iterator]
->[AsyncIterator][stream-iterator]
? Or the bottom reference can be edited.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed in the bottom reference.