Skip to content
This repository has been archived by the owner on Aug 23, 2019. It is now read-only.

Commit

Permalink
feat(transport): use parallel limited dialer (#195)
Browse files Browse the repository at this point in the history
* feat(transport): use parallel limited dialer
  • Loading branch information
dignifiedquire authored and dryajov committed Mar 29, 2017
1 parent 2cec4ce commit 2abd8d8
Show file tree
Hide file tree
Showing 4 changed files with 121 additions and 32 deletions.
16 changes: 8 additions & 8 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -40,36 +40,36 @@
"npm": ">=3.0.0"
},
"devDependencies": {
"aegir": "^11.0.0",
"aegir": "^11.0.1",
"buffer-loader": "0.0.1",
"chai": "^3.5.0",
"dirty-chai": "^1.2.2",
"gulp": "^3.9.1",
"libp2p-multiplex": "~0.4.3",
"libp2p-secio": "~0.6.8",
"libp2p-spdy": "~0.10.6",
"libp2p-tcp": "~0.9.4",
"libp2p-tcp": "~0.10.0",
"libp2p-webrtc-star": "~0.8.10",
"libp2p-websockets": "~0.9.4",
"libp2p-websockets": "~0.10.0",
"pre-commit": "^1.2.2",
"pull-goodbye": "0.0.1",
"pull-stream": "^3.5.0",
"webrtcsupport": "^2.2.0"
},
"dependencies": {
"async": "^2.1.5",
"async": "^2.2.0",
"browserify-zlib-next": "^1.0.1",
"debug": "^2.6.3",
"interface-connection": "~0.3.2",
"ip-address": "^5.8.6",
"libp2p-identify": "~0.3.3",
"libp2p-circuit": "~0.0.1",
"lodash.includes": "^4.3.0",
"multiaddr": "^2.2.2",
"multiaddr": "^2.2.3",
"multistream-select": "~0.13.5",
"once": "^1.4.0",
"peer-id": "~0.8.4",
"peer-info": "~0.8.4",
"peer-id": "~0.8.5",
"peer-info": "~0.8.5",
"protocol-buffers": "^3.2.1"
},
"contributors": [
Expand All @@ -87,4 +87,4 @@
"greenkeeper[bot] <greenkeeper[bot]@users.noreply.github.com>",
"ᴠɪᴄᴛᴏʀ ʙᴊᴇʟᴋʜᴏʟᴍ <[email protected]>"
]
}
}
110 changes: 86 additions & 24 deletions src/transport.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,23 @@

const Connection = require('interface-connection').Connection
const parallel = require('async/parallel')
const queue = require('async/queue')
const timeout = require('async/timeout')
const once = require('once')
const debug = require('debug')
const log = debug('libp2p:swarm:transport')

const protocolMuxer = require('./protocol-muxer')

// number of concurrent outbound dials to make per peer, same as go-libp2p-swarm
const defaultPerPeerRateLimit = 8

// the amount of time a single dial has to succeed
const dialTimeout = 10 * 1000

module.exports = function (swarm) {
const queues = new Map()

return {
add (key, transport, options, callback) {
if (typeof options === 'function') {
Expand Down Expand Up @@ -36,37 +46,80 @@ module.exports = function (swarm) {
multiaddrs = [multiaddrs]
}
log('dialing %s', key, multiaddrs.map((m) => m.toString()))
// a) filter the multiaddrs that are actually valid for this transport (use a func from the transport itself) (maybe even make the transport do that)
// filter the multiaddrs that are actually valid for this transport (use a func from the transport itself) (maybe even make the transport do that)
multiaddrs = dialables(t, multiaddrs)

// b) if multiaddrs.length = 1, return the conn from the
// transport, otherwise, create a passthrough
if (multiaddrs.length === 1) {
const conn = t.dial(multiaddrs.shift(), (err) => {
if (err) return callback(err)
callback(null, new Connection(conn))
})
return
}
// create dial queue if non exists
let q
if (queues.has(key)) {
log('reusing queue')
q = queues.get(key)
} else {
log('setting up new queue')
q = queue((multiaddr, cb) => {
dialWithTimeout(t, multiaddr, dialTimeout, (err, conn) => {
if (err) {
log('dial err', err)
return cb(err)
}

// c) multiaddrs should already be a filtered list
// specific for the transport we are using
const proxyConn = new Connection()
if (q.canceled) {
log('dial canceled: %s', multiaddr.toString())
// clean up already done dials
if (conn) {
conn.close()
}
return cb()
}

// one is enough
log('dial success: %s', multiaddr.toString())
q.kill()
q.canceled = true

q.finish(null, conn)
})
}, defaultPerPeerRateLimit)

next(multiaddrs.shift())
q.errors = []
q.finishCbs = []

// TODO improve in the future to make all the dials in paralell
function next (multiaddr) {
const conn = t.dial(multiaddr, (err) => {
if (err) {
log(err)
return next(multiaddrs.shift())
}
// handle finish
q.finish = (err, conn) => {
log('queue finish')
queues.delete(key)

proxyConn.setInnerConn(conn)
callback(null, proxyConn)
})
q.finishCbs.forEach((next) => {
if (err) {
return next(err)
}

const proxyConn = new Connection()
proxyConn.setInnerConn(conn)

next(null, proxyConn)
})
}

// collect errors
q.error = (err) => {
q.errors.push(err)
}

// no more addresses and all failed
q.drain = () => {
log('queue drain')
const err = new Error('Could not dial any address')
err.errors = q.errors
q.errors = []
q.finish(err)
}

queues.set(key, q)
}

q.push(multiaddrs)
q.finishCbs.push(callback)
},

listen (key, options, handler, callback) {
Expand Down Expand Up @@ -139,4 +192,13 @@ function dialables (tp, multiaddrs) {
return tp.filter(multiaddrs)
}

function dialWithTimeout (transport, multiaddr, maxTimeout, callback) {
timeout((cb) => {
const conn = transport.dial(multiaddr, (err) => {
log('dialed')
cb(err, conn)
})
}, maxTimeout)(callback)
}

function noop () {}
15 changes: 15 additions & 0 deletions test/01-transport-tcp.node.js
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ describe('transport - tcp', function () {
it('dial to set of multiaddr, only one is available', (done) => {
const conn = swarmA.transport.dial('tcp', [
multiaddr('/ip4/127.0.0.1/tcp/9910/ws'), // not valid on purpose
multiaddr('/ip4/127.0.0.1/tcp/9359'),
multiaddr('/ip4/127.0.0.1/tcp/9329'),
multiaddr('/ip4/127.0.0.1/tcp/9910'),
multiaddr('/ip4/127.0.0.1/tcp/9999'),
multiaddr('/ip4/127.0.0.1/tcp/9309')
Expand All @@ -114,6 +116,19 @@ describe('transport - tcp', function () {
)
})

it('dial to set of multiaddr, none is available', (done) => {
swarmA.transport.dial('tcp', [
multiaddr('/ip4/127.0.0.1/tcp/9910/ws'), // not valid on purpose
multiaddr('/ip4/127.0.0.1/tcp/9359'),
multiaddr('/ip4/127.0.0.1/tcp/9329')
], (err, conn) => {
expect(err).to.exist()
expect(err.errors).to.have.length(2)
expect(conn).to.not.exist()
done()
})
})

it('close', (done) => {
parallel([
(cb) => swarmA.transport.close('tcp', cb),
Expand Down
12 changes: 12 additions & 0 deletions test/03-transport-websockets.node.js
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,18 @@ describe('transport - websockets', function () {
})
})

it('dial to set of multiaddr, none is available', (done) => {
swarmA.transport.dial('ws', [
multiaddr('/ip4/127.0.0.1/tcp/9320/ws'),
multiaddr('/ip4/127.0.0.1/tcp/9359/ws')
], (err, conn) => {
expect(err).to.exist()
expect(err.errors).to.have.length(2)
expect(conn).to.not.exist()
done()
})
})

it('close', (done) => {
parallel([
(cb) => swarmA.transport.close('ws', cb),
Expand Down

0 comments on commit 2abd8d8

Please sign in to comment.