-
Notifications
You must be signed in to change notification settings - Fork 29.8k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
stream: added experimental support for for-await
Adds support for Symbol.asyncIterator into the Readable class. The stream is destroyed when the loop terminates with break or throw. Fixes: #15709
- Loading branch information
Showing
5 changed files
with
492 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,159 @@ | ||
'use strict'; | ||
|
||
const kLastResolve = Symbol('lastResolve'); | ||
const kLastReject = Symbol('lastReject'); | ||
const kError = Symbol('error'); | ||
const kEnded = Symbol('ended'); | ||
const kLastPromise = Symbol('lastPromise'); | ||
const kHandlePromise = Symbol('handlePromise'); | ||
const kStream = Symbol('stream'); | ||
|
||
const AsyncIteratorRecord = class AsyncIteratorRecord { | ||
constructor(value, done) { | ||
this.done = done; | ||
this.value = value; | ||
} | ||
}; | ||
|
||
function readAndResolve(iter) { | ||
const resolve = iter[kLastResolve]; | ||
if (resolve !== null) { | ||
const data = iter[kStream].read(); | ||
// we defer if data is null | ||
// we can be expecting either 'end' or | ||
// 'error' | ||
if (data !== null) { | ||
iter[kLastPromise] = null; | ||
iter[kLastResolve] = null; | ||
iter[kLastReject] = null; | ||
resolve(new AsyncIteratorRecord(data, false)); | ||
} | ||
} | ||
} | ||
|
||
function onReadable(iter) { | ||
// we wait for the next tick, because it might | ||
// emit an error with process.nextTick | ||
process.nextTick(readAndResolve, iter); | ||
} | ||
|
||
function onEnd(iter) { | ||
const resolve = iter[kLastResolve]; | ||
if (resolve !== null) { | ||
iter[kLastPromise] = null; | ||
iter[kLastResolve] = null; | ||
iter[kLastReject] = null; | ||
resolve(new AsyncIteratorRecord(null, true)); | ||
} | ||
iter[kEnded] = true; | ||
} | ||
|
||
function onError(iter, err) { | ||
const reject = iter[kLastReject]; | ||
// reject if we are waiting for data in the Promise | ||
// returned by next() and store the error | ||
if (reject !== null) { | ||
iter[kLastPromise] = null; | ||
iter[kLastResolve] = null; | ||
iter[kLastReject] = null; | ||
reject(err); | ||
} | ||
iter.error = err; | ||
} | ||
|
||
function wrapForNext(lastPromise, iter) { | ||
return function(resolve, reject) { | ||
lastPromise.then(function() { | ||
iter[kHandlePromise](resolve, reject); | ||
}, reject); | ||
}; | ||
} | ||
|
||
const ReadableAsyncIterator = class ReadableAsyncIterator { | ||
constructor(stream) { | ||
this[kStream] = stream; | ||
this[kLastResolve] = null; | ||
this[kLastReject] = null; | ||
this[kError] = null; | ||
this[kEnded] = false; | ||
this[kLastPromise] = null; | ||
|
||
stream.on('readable', onReadable.bind(null, this)); | ||
stream.on('end', onEnd.bind(null, this)); | ||
stream.on('error', onError.bind(null, this)); | ||
|
||
// the function passed to new Promise | ||
// is cached so we avoid allocating a new | ||
// closure at every run | ||
this[kHandlePromise] = (resolve, reject) => { | ||
const data = this[kStream].read(); | ||
if (data) { | ||
this[kLastPromise] = null; | ||
this[kLastResolve] = null; | ||
this[kLastReject] = null; | ||
resolve(new AsyncIteratorRecord(data, false)); | ||
} else { | ||
this[kLastResolve] = resolve; | ||
this[kLastReject] = reject; | ||
} | ||
}; | ||
} | ||
|
||
get stream() { | ||
return this[kStream]; | ||
} | ||
|
||
next() { | ||
// if we have detected an error in the meanwhile | ||
// reject straight away | ||
const error = this[kError]; | ||
if (error !== null) { | ||
return Promise.reject(error); | ||
} | ||
|
||
if (this[kEnded]) { | ||
return Promise.resolve(new AsyncIteratorRecord(null, true)); | ||
} | ||
|
||
// if we have multiple next() calls | ||
// we will wait for the previous Promise to finish | ||
// this logic is optimized to support for await loops, | ||
// where next() is only called once at a time | ||
const lastPromise = this[kLastPromise]; | ||
let promise; | ||
|
||
if (lastPromise) { | ||
promise = new Promise(wrapForNext(lastPromise, this)); | ||
} else { | ||
// fast path needed to support multiple this.push() | ||
// without triggering the next() queue | ||
const data = this[kStream].read(); | ||
if (data !== null) { | ||
return Promise.resolve(new AsyncIteratorRecord(data, false)); | ||
} | ||
|
||
promise = new Promise(this[kHandlePromise]); | ||
} | ||
|
||
this[kLastPromise] = promise; | ||
|
||
return promise; | ||
} | ||
|
||
return() { | ||
// destroy(err, cb) is a private API | ||
// we can guarantee we have that here, because we control the | ||
// Readable class this is attached to | ||
return new Promise((resolve, reject) => { | ||
this[kStream].destroy(null, (err) => { | ||
if (err) { | ||
reject(err); | ||
return; | ||
} | ||
resolve(new AsyncIteratorRecord(null, true)); | ||
}); | ||
}); | ||
} | ||
}; | ||
|
||
module.exports = ReadableAsyncIterator; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.