-
Notifications
You must be signed in to change notification settings - Fork 37
feat(transport): use parallel limited dialer #195
Changes from 2 commits
b04e954
5ee81ed
dfc9078
76c20d6
634ab78
4f555c8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -86,4 +86,4 @@ | |
"greenkeeper[bot] <greenkeeper[bot]@users.noreply.github.com>", | ||
"ᴠɪᴄᴛᴏʀ ʙᴊᴇʟᴋʜᴏʟᴍ <[email protected]>" | ||
] | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,13 +2,19 @@ | |
|
||
const Connection = require('interface-connection').Connection | ||
const parallel = require('async/parallel') | ||
const queue = require('async/queue') | ||
const once = require('once') | ||
const debug = require('debug') | ||
const log = debug('libp2p:swarm:transport') | ||
|
||
const protocolMuxer = require('./protocol-muxer') | ||
|
||
// number of concurrent outbound dials to make per peer, same as go-libp2p-swarm | ||
const defaultPerPeerRateLimit = 8 | ||
|
||
module.exports = function (swarm) { | ||
const queues = new Map() | ||
|
||
return { | ||
add (key, transport, options, callback) { | ||
if (typeof options === 'function') { | ||
|
@@ -36,32 +42,76 @@ module.exports = function (swarm) { | |
multiaddrs = [multiaddrs] | ||
} | ||
log('dialing %s', key, multiaddrs.map((m) => m.toString())) | ||
// a) filter the multiaddrs that are actually valid for this transport (use a func from the transport itself) (maybe even make the transport do that) | ||
// filter the multiaddrs that are actually valid for this transport (use a func from the transport itself) (maybe even make the transport do that) | ||
multiaddrs = dialables(t, multiaddrs) | ||
|
||
// b) if multiaddrs.length = 1, return the conn from the | ||
// transport, otherwise, create a passthrough | ||
if (multiaddrs.length === 1) { | ||
const conn = t.dial(multiaddrs.shift(), (err) => { | ||
if (err) return callback(err) | ||
callback(null, new Connection(conn)) | ||
}) | ||
return | ||
} | ||
// create dial queue if none exists | ||
let q | ||
if (queues.has(key)) { | ||
log('reusing queue') | ||
q = queues.get(key) | ||
} else { | ||
log('setting up new queue') | ||
q = queue((multiaddr, cb) => { | ||
const conn = t.dial(multiaddr, (err) => { | ||
if (err) { | ||
log('dial failed: %s', multiaddr.toString()) | ||
return cb(err) | ||
} | ||
if (q.canceled) { | ||
log('dial canceled: %s', multiaddr.toString()) | ||
// clean up already done dials | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Where is the conn being closed, if we already had dialed one? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. hahahaha I forgot to write it There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. fixed There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. probably should use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
|
||
return cb() | ||
} | ||
// one is enough | ||
log('dial success: %s', multiaddr.toString()) | ||
q.kill() | ||
q.canceled = true | ||
|
||
q.finish(null, conn) | ||
}) | ||
}, defaultPerPeerRateLimit) | ||
|
||
q.errors = [] | ||
q.finishCbs = [] | ||
|
||
// c) multiaddrs should already be a filtered list | ||
// specific for the transport we are using | ||
const proxyConn = new Connection() | ||
// handle finish | ||
q.finish = (err, conn) => { | ||
log('queue finish') | ||
queues.delete(key) | ||
|
||
next(multiaddrs.shift()) | ||
q.finishCbs.forEach((next) => { | ||
if (err) { | ||
next(err) | ||
} else { | ||
const proxyConn = new Connection() | ||
proxyConn.setInnerConn(conn) | ||
|
||
next(null, proxyConn) | ||
} | ||
}) | ||
} | ||
|
||
// TODO improve in the future to make all the dials in paralell | ||
function next (multiaddr) { | ||
const conn = t.dial(multiaddr, () => { | ||
proxyConn.setInnerConn(conn) | ||
callback(null, proxyConn) | ||
}) | ||
// collect errors | ||
q.error = (err) => { | ||
q.errors.push(err) | ||
} | ||
|
||
// no more addresses and all failed | ||
q.drain = () => { | ||
log('queue drain') | ||
const err = new Error('Could not dial any address') | ||
err.errors = q.errors | ||
q.errors = [] | ||
q.finish(err) | ||
} | ||
|
||
queues.set(key, q) | ||
} | ||
|
||
q.push(multiaddrs) | ||
q.finishCbs.push(callback) | ||
}, | ||
|
||
listen (key, options, handler, callback) { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does dial return an error to its callback? It doesn't seem to for the websocket transport.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixing this
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think I am finding a bug where the onConnect callback is being called even when the websocket 'open' event isn't emitted. Will file an issue. EDIT - actually scratch that, the problem is that the websocket transport isn't handling errors correctly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See libp2p/js-libp2p-websockets#60
After this PR, websockets will indeed return an error to its callback, so your code will work great 👍
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay, I got everything working, except that we need to upgrade both libp2p-tcp and libp2p-websockets to actually pass an error to the callback if one happened; otherwise the detection is horrific.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No there wasn't a pull-ws issue after all, just a libp2p-websockets error handling issue
and I've issued a PR to libp2p-websockets, see my comment above
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well, it is both, I am afraid. I actually can't detect the error in libp2p-websockets easily, as the connect callback is called even though an error occurred :(
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think onConnect is meant to be called with errors as the first argument. Why is that difficult to check for?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I opened then closed an issue on this as I realised that they sent errors to onConnect in several places. pull-stream/pull-ws#18
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just merged your PR, thank you. It works great with that :)