From 9953f95fa29d867a97973173c17fc8948bd07b5b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maciej=20Kr=C3=BCger?= Date: Sun, 17 Jun 2018 17:12:42 +0200 Subject: [PATCH] feat: fix stream closing behaviour --- src/listener.js | 63 +++++++++++++++++++++++++++++++++++++++-------- test/dial.spec.js | 17 +++++++++++++ 2 files changed, 70 insertions(+), 10 deletions(-) diff --git a/src/listener.js b/src/listener.js index a84449e..679187b 100644 --- a/src/listener.js +++ b/src/listener.js @@ -14,6 +14,7 @@ const setImmediate = require('async/setImmediate') const utils = require('./utils') const cleanUrlSIO = utils.cleanUrlSIO const crypto = require('libp2p-crypto') +const pull = require('pull-stream') const noop = once(() => {}) @@ -38,6 +39,8 @@ class Listener extends EE { this._handler = options.handler || noop this.listeners_list = options.listeners || {} this.flag = options.flag + this.conns = [] + this.connected = false } // "private" functions @@ -209,6 +212,11 @@ class Listener extends EE { this.listeners_list[this.server] = this callback = callback ? once(callback) : noop + if (this.connected) { // listener was .close()'d yet not all conns disconnected. we're still connected, so don't do anything + this.closing = false + return setImmediate(() => callback()) + } + series([ (cb) => this._up(cb), (cb) => this._crypto(cb) @@ -220,7 +228,10 @@ class Listener extends EE { this.emit('error', err) this.emit('close') return callback(err) - } else this.log('success') + } + + this.log('success') + this.connected = true this.io.on('reconnect', () => { // force to get a new signature @@ -247,14 +258,52 @@ class Listener extends EE { setImmediate(() => callback(null, this.ma ? [this.ma] : [])) } + get activeConnections () { + this.conns = this.conns.filter(c => c.sink || c.source) + return Boolean(this.conns.length) + } + + maybeClose () { + if (!this.activeConnections && this.closing) { + this.connected = false + this.closing = false + this.log('no more connections and listener is offline - closing') + this._down() + } + } + close (callback) { callback = callback ? once(callback) : noop - this._down() + this.closing = true // will close once the last connection quits + this.maybeClose() callback() } + stateWatch (sink, source) { + let cstate = {sink: true, source: true} + const watch = (name) => pull.through(v => v, e => { + cstate[name] = false + if (!cstate.sink && !cstate.source) { + this.maybeClose() + } + }) + + this.conns.push(cstate) + + return { + sink: pull( + watch('sink'), + sink + ), + source: pull( + source, + watch('source') + ) + } + } + // called from transport /** * Dials a peer @@ -293,14 +342,8 @@ class Listener extends EE { if (err) return callback(err instanceof Error ? err : new Error(err)) dlog(err ? 'error: ' + err.toString() : 'success') const source = io.createSource(dialId + '.listener') - conn.setInnerConn( - { - sink: sink, - source: source - }, { - getObservedAddrs: (cb) => cb(null, [_ma]) - } - ) + + conn.setInnerConn(this.stateWatch(sink, source), { getObservedAddrs: (cb) => cb(null, [_ma]) }) callback(null, conn) }) diff --git a/test/dial.spec.js b/test/dial.spec.js index 4adb684..1f86fa4 100644 --- a/test/dial.spec.js +++ b/test/dial.spec.js @@ -96,6 +96,23 @@ describe('dial', () => { }) }) + it('dial on IPv4, close listener, prevent end, re-start listener', (done) => { + ws1.dial(ma2, (err, conn) => { + expect(err).to.not.exist() + + pull( + (end, cb) => {}, + conn, + pull.drain(() => { + expect('Stream should never end').to.not.exist() + }) + ) + + listeners[0].close(() => {}) + listeners[0].listen(ma1, done) + }) + }) + it('dial offline / non-exist()ent node on IPv4, check callback', (done) => { const maOffline = multiaddr('/ip4/127.0.0.1/tcp/40404/ws/p2p-websocket-star/ipfs/ABCD')