diff --git a/index.js b/index.js index eeee2dc..0c80ce4 100644 --- a/index.js +++ b/index.js @@ -34,10 +34,15 @@ var Channel = function (name, plex, opts) { this.once('end', function () { this._read() // trigger drain + if (this.destroyed) return + ended = true - if (finished) this._finalize() - else if (!this.halfOpen) this.end() + if (finished) { + this._finalize() + } else if (!this.halfOpen) { + // this.end() + } }) this.once('finish', function onfinish () { @@ -45,10 +50,16 @@ var Channel = function (name, plex, opts) { if (!this._opened) { this.once('open', onfinish) } else { - if (this._lazy && this.initiator) this._open() - this._multiplex._send(this.channel << 3 | (this.initiator ? 4 : 3), null) + if (this._lazy && this.initiator) { + this._open() + } + this._multiplex + ._send(this.channel << 3 | (this.initiator ? 4 : 3), null) finished = true - if (ended) this._finalize() + + if (ended) { + this._finalize() + } } }) } @@ -145,7 +156,9 @@ var Multiplex = function (opts, onchannel) { this._channel = 0 this._missing = 0 this._message = null - this._buf = new Buffer(this.limit ? varint.encodingLength(this.limit) : 100) + this._buf = new Buffer(this.limit + ? varint.encodingLength(this.limit) + : 100) this._ptr = 0 this._awaitChannelDrains = 0 this._onwritedrain = null @@ -159,9 +172,17 @@ inherits(Multiplex, stream.Duplex) Multiplex.prototype.createStream = function (name, opts) { if (this.destroyed) throw new Error('Multiplexer is destroyed') + var id = this._local.indexOf(null) + if (id === -1) id = this._local.push(null) - 1 - var channel = new Channel(this._name(name || id.toString()), this, xtend(this._options, opts)) + + var channel = new Channel( + this._name(name || id.toString()), + this, + xtend(this._options, opts) + ) + return this._addChannel(channel, id, this._local) } @@ -206,7 +227,9 @@ Multiplex.prototype._send = function (header, data) { } Multiplex.prototype._addChannel = function (channel, id, list) { - while (list.length <= id) list.push(null) + while (list.length <= id) { + list.push(null) + } list[id] = channel channel.on('finalize', function () { list[id] = null @@ -219,7 +242,9 @@ Multiplex.prototype._addChannel = function (channel, id, list) { Multiplex.prototype._writeVarint = function (data, offset) { for (offset; offset < data.length; offset++) { - if (this._ptr === this._buf.length) return this._lengthError(data) + if (this._ptr === this._buf.length) { + return this._lengthError(data) + } this._buf[this._ptr++] = data[offset] if (!(data[offset] & 0x80)) { if (this._state === 0) { @@ -288,7 +313,10 @@ Multiplex.prototype._push = function (data) { if (this._type === 0) { // open if (this.destroyed || this._finished) return - var name = this._binaryName ? data : (data.toString() || this._channel.toString()) + var name = this._binaryName + ? data + : (data.toString() || this._channel.toString()) + var channel if (this._receiving && this._receiving[name]) { @@ -297,64 +325,82 @@ Multiplex.prototype._push = function (data) { this._addChannel(channel, this._channel, this._list) } else { channel = new Channel(name, this, this._options) - this.emit('stream', this._addChannel(channel, this._channel, this._list), channel.name) + this.emit('stream', this._addChannel( + channel, + this._channel, + this._list), channel.name) } return } - var stream = this._list[this._channel] - if (!stream) return + var stream = this._list[this._channel] || + this._remote[this._channel] || + this._local[this._channel] + + if (!stream) { return } switch (this._type) { case 5: // local error case 6: // remote error - stream._destroy(new Error(data.toString() || 'Channel destroyed'), false) - return + var error = new Error(data.toString() || 'Channel destroyed') + stream._destroy(error, false) + return case 3: // local end case 4: // remote end - stream.push(null) - return + stream.push(null) + return case 1: // local packet case 2: // remote packet - if (!stream.push(data)) { - this._awaitChannelDrains++ - stream._awaitDrain++ - } - return + if (!stream.push(data)) { + this._awaitChannelDrains++ + stream._awaitDrain++ + } + return } } Multiplex.prototype._onchanneldrain = function (drained) { this._awaitChannelDrains -= drained + if (this._awaitChannelDrains) return + var ondrain = this._onwritedrain this._onwritedrain = null + if (ondrain) ondrain() } Multiplex.prototype._write = function (data, enc, cb) { if (this._finished) return cb() + if (this._corked) return this._onuncork(this._write.bind(this, data, enc, cb)) + if (data === SIGNAL_FLUSH) return this._finish(cb) var offset = 0 while (offset < data.length) { - if (this._state === 2) offset = this._writeMessage(data, offset) - else offset = this._writeVarint(data, offset) + if (this._state === 2) { + offset = this._writeMessage(data, offset) + } else offset = this._writeVarint(data, offset) } + if (this._state === 2 && !this._missing) this._push(empty) - if (this._awaitChannelDrains) this._onwritedrain = cb - else cb() + if (this._awaitChannelDrains) { + this._onwritedrain = cb + } else cb() } Multiplex.prototype._finish = function (cb) { var self = this + this._onuncork(function () { - if (self._writableState.prefinished === false) self._writableState.prefinished = true + if (self._writableState.prefinished === false) { + self._writableState.prefinished = true + } self.emit('prefinish') self._onuncork(cb) }) diff --git a/test.js b/test.js index f222550..b4b85a2 100644 --- a/test.js +++ b/test.js @@ -288,8 +288,95 @@ test('if onstream is not passed, stream is emitted', function (t) { stream.on('data', function (data) { t.same(data, new Buffer('hello world')) stream.end() + setTimeout(() => t.end(), 1000) + }) +}) + +test('half close a muxed stream', function (t) { + var plex1 = multiplex() + var plex2 = multiplex() + + plex1.pipe(plex2) + .pipe(plex1) + + plex2.on('stream', function (stream, id) { + t.ok(stream, 'received stream') + t.ok(id, 'has id2') + + // let it flow + stream.on('data', function () {}) + + stream.on('end', function () { + t.end() + }) + + stream.on('error', function (err) { + t.notOk(err) + }) + + stream.write(new Buffer('hello world')) + + stream.end() + }) + + var stream = plex1.createStream() + + stream.on('data', function (data) { + t.same(data, new Buffer('hello world')) + }) + + stream.on('error', function (err) { + t.notOk(err) + }) + + stream.on('end', function () { + stream.end() + }) +}) + +test('half close a half closed muxed stream', function (t) { + var plex1 = multiplex() + var plex2 = multiplex() + + plex1.nameTag = 'plex1:' + plex2.nameTag = 'plex2:' + + plex1.pipe(plex2) + .pipe(plex1) + + plex2.on('stream', function (stream, id) { + t.ok(stream, 'received stream') + t.ok(id, 'has id2') + + stream.on('data', function (data) { + t.same(data, new Buffer('some data')) + }) + + stream.on('end', function () { + stream.write(new Buffer('hello world')) + stream.end() + }) + + stream.on('error', function (err) { t.notOk(err) }) + }) + + var stream = plex1.createStream() + + stream.on('data', function (data) { + t.same(data, new Buffer('hello world')) + }) + + stream.on('error', function (err) { + t.notOk(err) + }) + + stream.on('end', function () { t.end() }) + + stream.write(new Buffer('some data')) + + stream.end() }) test('underlying error is propagated to muxed streams', function (t) {