Skip to content

Commit

Permalink
BREAKING: Convert to ES class to drop inherits dep
Browse files Browse the repository at this point in the history
Replace `MultiStream()` with `new MultiStream()` now. Readme examples have always shown this usage, but now it is required.

For: brave/brave-browser#5490
  • Loading branch information
feross committed Aug 6, 2019
1 parent 738c904 commit 80088f9
Show file tree
Hide file tree
Showing 4 changed files with 117 additions and 125 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ new MultiStream(streams).pipe(process.stdout) // => 123
Alternatively, streams may be created by an asynchronous "factory" function:

```js
var count = 0;
var count = 0
function factory (cb) {
if (count > 3) return cb(null, null)
count++
Expand Down
225 changes: 109 additions & 116 deletions index.js
Original file line number Diff line number Diff line change
@@ -1,152 +1,145 @@
module.exports = MultiStream

var inherits = require('inherits')
var stream = require('readable-stream')

inherits(MultiStream, stream.Readable)

function MultiStream (streams, opts) {
var self = this
if (!(self instanceof MultiStream)) return new MultiStream(streams, opts)
stream.Readable.call(self, opts)
function toStreams2Obj (s) {
return toStreams2(s, { objectMode: true, highWaterMark: 16 })
}

self.destroyed = false
function toStreams2Buf (s) {
return toStreams2(s)
}

self._drained = false
self._forwarding = false
self._current = null
self._toStreams2 = (opts && opts.objectMode) ? toStreams2Obj : toStreams2Buf
function toStreams2 (s, opts) {
if (!s || typeof s === 'function' || s._readableState) return s

if (typeof streams === 'function') {
self._queue = streams
} else {
self._queue = streams.map(self._toStreams2)
self._queue.forEach(function (stream) {
if (typeof stream !== 'function') self._attachErrorListener(stream)
})
var wrap = new stream.Readable(opts).wrap(s)
if (s.destroy) {
wrap.destroy = s.destroy.bind(s)
}

self._next()
return wrap
}

MultiStream.obj = function (streams) {
return new MultiStream(streams, { objectMode: true, highWaterMark: 16 })
}
class MultiStream extends stream.Readable {
constructor (streams, opts) {
super(opts)

MultiStream.prototype._read = function () {
this._drained = true
this._forward()
}
this.destroyed = false

MultiStream.prototype._forward = function () {
if (this._forwarding || !this._drained || !this._current) return
this._forwarding = true
this._drained = false
this._forwarding = false
this._current = null
this._toStreams2 = (opts && opts.objectMode) ? toStreams2Obj : toStreams2Buf

var chunk
while ((chunk = this._current.read()) !== null && this._drained) {
this._drained = this.push(chunk)
}

this._forwarding = false
}
if (typeof streams === 'function') {
this._queue = streams
} else {
this._queue = streams.map(this._toStreams2)
this._queue.forEach(stream => {
if (typeof stream !== 'function') this._attachErrorListener(stream)
})
}

MultiStream.prototype.destroy = function (err) {
if (this.destroyed) return
this.destroyed = true
this._next()
}

if (this._current && this._current.destroy) this._current.destroy()
if (typeof this._queue !== 'function') {
this._queue.forEach(function (stream) {
if (stream.destroy) stream.destroy()
})
_read () {
this._drained = true
this._forward()
}

if (err) this.emit('error', err)
this.emit('close')
}
_forward () {
if (this._forwarding || !this._drained || !this._current) return
this._forwarding = true

MultiStream.prototype._next = function () {
var self = this
self._current = null

if (typeof self._queue === 'function') {
self._queue(function (err, stream) {
if (err) return self.destroy(err)
stream = self._toStreams2(stream)
self._attachErrorListener(stream)
self._gotNextStream(stream)
})
} else {
var stream = self._queue.shift()
if (typeof stream === 'function') {
stream = self._toStreams2(stream())
self._attachErrorListener(stream)
var chunk
while ((chunk = this._current.read()) !== null && this._drained) {
this._drained = this.push(chunk)
}
self._gotNextStream(stream)
}
}

MultiStream.prototype._gotNextStream = function (stream) {
var self = this

if (!stream) {
self.push(null)
self.destroy()
return
this._forwarding = false
}

self._current = stream
self._forward()
destroy (err) {
if (this.destroyed) return
this.destroyed = true

stream.on('readable', onReadable)
stream.once('end', onEnd)
stream.once('close', onClose)
if (this._current && this._current.destroy) this._current.destroy()
if (typeof this._queue !== 'function') {
this._queue.forEach(stream => {
if (stream.destroy) stream.destroy()
})
}

function onReadable () {
self._forward()
if (err) this.emit('error', err)
this.emit('close')
}

function onClose () {
if (!stream._readableState.ended) {
self.destroy()
_next () {
this._current = null

if (typeof this._queue === 'function') {
this._queue((err, stream) => {
if (err) return this.destroy(err)
stream = this._toStreams2(stream)
this._attachErrorListener(stream)
this._gotNextStream(stream)
})
} else {
var stream = this._queue.shift()
if (typeof stream === 'function') {
stream = this._toStreams2(stream())
this._attachErrorListener(stream)
}
this._gotNextStream(stream)
}
}

function onEnd () {
self._current = null
stream.removeListener('readable', onReadable)
stream.removeListener('end', onEnd)
stream.removeListener('close', onClose)
self._next()
}
}
_gotNextStream (stream) {
if (!stream) {
this.push(null)
this.destroy()
return
}

MultiStream.prototype._attachErrorListener = function (stream) {
var self = this
if (!stream) return
this._current = stream
this._forward()

stream.once('error', onError)
const onReadable = () => {
this._forward()
}

function onError (err) {
stream.removeListener('error', onError)
self.destroy(err)
}
}
const onClose = () => {
if (!stream._readableState.ended) {
this.destroy()
}
}

function toStreams2Obj (s) {
return toStreams2(s, { objectMode: true, highWaterMark: 16 })
}
const onEnd = () => {
this._current = null
stream.removeListener('readable', onReadable)
stream.removeListener('end', onEnd)
stream.removeListener('close', onClose)
this._next()
}

function toStreams2Buf (s) {
return toStreams2(s)
}
stream.on('readable', onReadable)
stream.once('end', onEnd)
stream.once('close', onClose)
}

function toStreams2 (s, opts) {
if (!s || typeof s === 'function' || s._readableState) return s
_attachErrorListener (stream) {
if (!stream) return

var wrap = new stream.Readable(opts).wrap(s)
if (s.destroy) {
wrap.destroy = s.destroy.bind(s)
const onError = (err) => {
stream.removeListener('error', onError)
this.destroy(err)
}

stream.once('error', onError)
}
return wrap
}

MultiStream.obj = streams => (
new MultiStream(streams, { objectMode: true, highWaterMark: 16 })
)

module.exports = MultiStream
1 change: 0 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
"url": "https://github.com/feross/multistream/issues"
},
"dependencies": {
"inherits": "^2.0.1",
"readable-stream": "^3.4.0"
},
"devDependencies": {
Expand Down
14 changes: 7 additions & 7 deletions test/basic.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ test('combine streams', function (t) {
str('3')
]

var stream = MultiStream(streams)
var stream = new MultiStream(streams)
.on('error', function (err) {
t.fail(err)
})
Expand All @@ -30,7 +30,7 @@ test('combine streams (classic)', function (t) {
through()
]

var stream = MultiStream(streams)
var stream = new MultiStream(streams)
.on('error', function (err) {
t.fail(err)
})
Expand All @@ -57,7 +57,7 @@ test('lazy stream creation', function (t) {
}
]

var stream = MultiStream(streams)
var stream = new MultiStream(streams)
.on('error', function (err) {
t.fail(err)
})
Expand All @@ -79,7 +79,7 @@ test('lazy stream via factory', function (t) {
}, 0)
}

var stream = MultiStream(factory)
var stream = new MultiStream(factory)
.on('error', function (err) {
t.fail(err)
})
Expand All @@ -102,7 +102,7 @@ test('lazy stream via factory (factory returns error)', function (t) {
}, 0)
}

MultiStream(factory)
new MultiStream(factory)
.on('error', function (err) {
t.pass('got error', err)
})
Expand All @@ -125,7 +125,7 @@ test('lazy stream via factory (classic)', function (t) {
cb(null, s)
}

var stream = MultiStream(factory)
var stream = new MultiStream(factory)
.on('error', function (err) {
t.fail(err)
})
Expand All @@ -145,7 +145,7 @@ test('throw immediate error', function (t) {
through() // will emit 'error'
]

MultiStream(streams).on('error', function (err) {
new MultiStream(streams).on('error', function (err) {
t.ok(err instanceof Error, 'got expected error')
})

Expand Down

0 comments on commit 80088f9

Please sign in to comment.