diff --git a/asynciterator.ts b/asynciterator.ts index 38f5872..4da632e 100644 --- a/asynciterator.ts +++ b/asynciterator.ts @@ -1717,7 +1717,7 @@ export class MultiTransformIterator extends TransformIterator { @extends module:asynciterator.BufferedIterator */ export class UnionIterator extends BufferedIterator { - private _sources : InternalSource[] = []; + private _sources : { requiresRead: boolean, source: InternalSource }[] = []; private _pending? : { loading: boolean, sources?: AsyncIterator>> }; private _currentSource = -1; protected _destroySources: boolean; @@ -1784,10 +1784,14 @@ export class UnionIterator extends BufferedIterator { if (isPromise(source)) source = wrap(source) as any as InternalSource; if (!source.done) { - this._sources.push(source); + const sourceObj = { requiresRead: true, source }; + this._sources.push(sourceObj); source[DESTINATION] = this; source.on('error', destinationEmitError); - source.on('readable', destinationFillBuffer); + source.on('readable', () => { + sourceObj.requiresRead = true; + destinationFillBuffer.bind(>sourceObj.source)(); + }); source.on('end', destinationRemoveEmptySources); } } @@ -1796,9 +1800,9 @@ export class UnionIterator extends BufferedIterator { protected _removeEmptySources() { this._sources = this._sources.filter((source, index) => { // Adjust the index of the current source if needed - if (source.done && index <= this._currentSource) + if (source.source.done && index <= this._currentSource) this._currentSource--; - return !source.done; + return !source.source.done; }); this._fillBuffer(); } @@ -1817,8 +1821,11 @@ export class UnionIterator extends BufferedIterator { // Pick the next source this._currentSource = (this._currentSource + 1) % this._sources.length; const source = this._sources[this._currentSource]; + if (!source.source.readable && !source.requiresRead) + continue; + source.requiresRead = false; // Attempt to read an item from that source - if ((item = source.read()) !== null) { + if ((item = source.source.read()) !== null) { count--; this._push(item); } @@ -1837,7 +1844,7 @@ export class UnionIterator extends BufferedIterator { // Destroy all sources that are still readable if (this._destroySources) { for (const source of this._sources) - source.destroy(); + source.source.destroy(); // Also close the sources stream if applicable if (this._pending) {