Skip to content

Commit

Permalink
Merge pull request #2 from diasdavid/fix/half-closed
Browse files Browse the repository at this point in the history
test: test half-closed
  • Loading branch information
daviddias authored Feb 15, 2017
2 parents e4cba9d + 46b355b commit d091f6b
Show file tree
Hide file tree
Showing 2 changed files with 160 additions and 27 deletions.
100 changes: 73 additions & 27 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,21 +34,32 @@ var Channel = function (name, plex, opts) {

this.once('end', function () {
this._read() // trigger drain

if (this.destroyed) return

ended = true
if (finished) this._finalize()
else if (!this.halfOpen) this.end()
if (finished) {
this._finalize()
} else if (!this.halfOpen) {
// this.end()
}
})

this.once('finish', function onfinish () {
if (this.destroyed) return
if (!this._opened) {
this.once('open', onfinish)
} else {
if (this._lazy && this.initiator) this._open()
this._multiplex._send(this.channel << 3 | (this.initiator ? 4 : 3), null)
if (this._lazy && this.initiator) {
this._open()
}
this._multiplex
._send(this.channel << 3 | (this.initiator ? 4 : 3), null)
finished = true
if (ended) this._finalize()

if (ended) {
this._finalize()
}
}
})
}
Expand Down Expand Up @@ -145,7 +156,9 @@ var Multiplex = function (opts, onchannel) {
this._channel = 0
this._missing = 0
this._message = null
this._buf = new Buffer(this.limit ? varint.encodingLength(this.limit) : 100)
this._buf = new Buffer(this.limit
? varint.encodingLength(this.limit)
: 100)
this._ptr = 0
this._awaitChannelDrains = 0
this._onwritedrain = null
Expand All @@ -159,9 +172,17 @@ inherits(Multiplex, stream.Duplex)

Multiplex.prototype.createStream = function (name, opts) {
if (this.destroyed) throw new Error('Multiplexer is destroyed')

var id = this._local.indexOf(null)

if (id === -1) id = this._local.push(null) - 1
var channel = new Channel(this._name(name || id.toString()), this, xtend(this._options, opts))

var channel = new Channel(
this._name(name || id.toString()),
this,
xtend(this._options, opts)
)

return this._addChannel(channel, id, this._local)
}

Expand Down Expand Up @@ -206,7 +227,9 @@ Multiplex.prototype._send = function (header, data) {
}

Multiplex.prototype._addChannel = function (channel, id, list) {
while (list.length <= id) list.push(null)
while (list.length <= id) {
list.push(null)
}
list[id] = channel
channel.on('finalize', function () {
list[id] = null
Expand All @@ -219,7 +242,9 @@ Multiplex.prototype._addChannel = function (channel, id, list) {

Multiplex.prototype._writeVarint = function (data, offset) {
for (offset; offset < data.length; offset++) {
if (this._ptr === this._buf.length) return this._lengthError(data)
if (this._ptr === this._buf.length) {
return this._lengthError(data)
}
this._buf[this._ptr++] = data[offset]
if (!(data[offset] & 0x80)) {
if (this._state === 0) {
Expand Down Expand Up @@ -288,7 +313,10 @@ Multiplex.prototype._push = function (data) {
if (this._type === 0) { // open
if (this.destroyed || this._finished) return

var name = this._binaryName ? data : (data.toString() || this._channel.toString())
var name = this._binaryName
? data
: (data.toString() || this._channel.toString())

var channel

if (this._receiving && this._receiving[name]) {
Expand All @@ -297,64 +325,82 @@ Multiplex.prototype._push = function (data) {
this._addChannel(channel, this._channel, this._list)
} else {
channel = new Channel(name, this, this._options)
this.emit('stream', this._addChannel(channel, this._channel, this._list), channel.name)
this.emit('stream', this._addChannel(
channel,
this._channel,
this._list), channel.name)
}
return
}

var stream = this._list[this._channel]
if (!stream) return
var stream = this._list[this._channel] ||
this._remote[this._channel] ||
this._local[this._channel]

if (!stream) { return }

switch (this._type) {
case 5: // local error
case 6: // remote error
stream._destroy(new Error(data.toString() || 'Channel destroyed'), false)
return
var error = new Error(data.toString() || 'Channel destroyed')
stream._destroy(error, false)
return

case 3: // local end
case 4: // remote end
stream.push(null)
return
stream.push(null)
return

case 1: // local packet
case 2: // remote packet
if (!stream.push(data)) {
this._awaitChannelDrains++
stream._awaitDrain++
}
return
if (!stream.push(data)) {
this._awaitChannelDrains++
stream._awaitDrain++
}
return
}
}

Multiplex.prototype._onchanneldrain = function (drained) {
this._awaitChannelDrains -= drained

if (this._awaitChannelDrains) return

var ondrain = this._onwritedrain
this._onwritedrain = null

if (ondrain) ondrain()
}

Multiplex.prototype._write = function (data, enc, cb) {
if (this._finished) return cb()

if (this._corked) return this._onuncork(this._write.bind(this, data, enc, cb))

if (data === SIGNAL_FLUSH) return this._finish(cb)

var offset = 0

while (offset < data.length) {
if (this._state === 2) offset = this._writeMessage(data, offset)
else offset = this._writeVarint(data, offset)
if (this._state === 2) {
offset = this._writeMessage(data, offset)
} else offset = this._writeVarint(data, offset)
}

if (this._state === 2 && !this._missing) this._push(empty)

if (this._awaitChannelDrains) this._onwritedrain = cb
else cb()
if (this._awaitChannelDrains) {
this._onwritedrain = cb
} else cb()
}

Multiplex.prototype._finish = function (cb) {
var self = this

this._onuncork(function () {
if (self._writableState.prefinished === false) self._writableState.prefinished = true
if (self._writableState.prefinished === false) {
self._writableState.prefinished = true
}
self.emit('prefinish')
self._onuncork(cb)
})
Expand Down
87 changes: 87 additions & 0 deletions test.js
Original file line number Diff line number Diff line change
Expand Up @@ -288,8 +288,95 @@ test('if onstream is not passed, stream is emitted', function (t) {
stream.on('data', function (data) {
t.same(data, new Buffer('hello world'))
stream.end()
setTimeout(() => t.end(), 1000)
})
})

test('half close a muxed stream', function (t) {
var plex1 = multiplex()
var plex2 = multiplex()

plex1.pipe(plex2)
.pipe(plex1)

plex2.on('stream', function (stream, id) {
t.ok(stream, 'received stream')
t.ok(id, 'has id2')

// let it flow
stream.on('data', function () {})

stream.on('end', function () {
t.end()
})

stream.on('error', function (err) {
t.notOk(err)
})

stream.write(new Buffer('hello world'))

stream.end()
})

var stream = plex1.createStream()

stream.on('data', function (data) {
t.same(data, new Buffer('hello world'))
})

stream.on('error', function (err) {
t.notOk(err)
})

stream.on('end', function () {
stream.end()
})
})

test('half close a half closed muxed stream', function (t) {
var plex1 = multiplex()
var plex2 = multiplex()

plex1.nameTag = 'plex1:'
plex2.nameTag = 'plex2:'

plex1.pipe(plex2)
.pipe(plex1)

plex2.on('stream', function (stream, id) {
t.ok(stream, 'received stream')
t.ok(id, 'has id2')

stream.on('data', function (data) {
t.same(data, new Buffer('some data'))
})

stream.on('end', function () {
stream.write(new Buffer('hello world'))
stream.end()
})

stream.on('error', function (err) { t.notOk(err) })
})

var stream = plex1.createStream()

stream.on('data', function (data) {
t.same(data, new Buffer('hello world'))
})

stream.on('error', function (err) {
t.notOk(err)
})

stream.on('end', function () {
t.end()
})

stream.write(new Buffer('some data'))

stream.end()
})

test('underlying error is propagated to muxed streams', function (t) {
Expand Down

0 comments on commit d091f6b

Please sign in to comment.