Skip to content
This repository has been archived by the owner on Aug 23, 2019. It is now read-only.

Commit

Permalink
first passing secio tests
Browse files Browse the repository at this point in the history
  • Loading branch information
dignifiedquire committed Aug 9, 2016
1 parent d1e337e commit 5ab7db2
Show file tree
Hide file tree
Showing 6 changed files with 51 additions and 72 deletions.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
"peer-id": "^0.7.0",
"peer-info": "^0.7.0",
"protocol-buffers": "^3.1.6",
"pull-stream": "^3.4.3",
"run-parallel": "^1.1.6"
},
"contributors": [
Expand Down
23 changes: 15 additions & 8 deletions src/connection.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
'use strict'

const protocolMuxer = require('./protocol-muxer')
const identify = require('libp2p-identify')
const multistream = require('multistream-select')
const pull = require('pull-stream')

const protocolMuxer = require('./protocol-muxer')

module.exports = function connection (swarm) {
return {
Expand All @@ -14,7 +16,7 @@ module.exports = function connection (swarm) {

// for listening
swarm.handle(muxer.multicodec, (conn) => {
const muxedConn = muxer(conn, true)
const muxedConn = muxer.listen(conn)

muxedConn.on('stream', (conn) => {
protocolMuxer(swarm.protocols, conn)
Expand All @@ -35,7 +37,7 @@ module.exports = function connection (swarm) {
ms.select(identify.multicodec, (err, conn) => {
if (err) { return cb(err) }

identify.exec(conn, (err, peerInfo, observedAddrs) => {
identify.listen(conn, (err, peerInfo, observedAddrs) => {
if (err) { return cb(err) }

observedAddrs.forEach((oa) => {
Expand All @@ -57,18 +59,23 @@ module.exports = function connection (swarm) {
}

swarm.emit('peer-mux-established', peerInfo)
muxedConn.on('close', () => {
delete swarm.muxedConns[peerInfo.id.toB58String()]
swarm.emit('peer-mux-closed', peerInfo)
})
pull(
muxedConn,
pull.onEnd(() => {
delete swarm.muxedConns[peerInfo.id.toB58String()]
swarm.emit('peer-mux-closed', peerInfo)
})
)
})
}
})
},

reuse () {
swarm.identify = true
swarm.handle(identify.multicodec, identify.handler(swarm._peerInfo))
swarm.handle(identify.multicodec, (conn) => {
identify.dial(conn, swarm._peerInfo)
})
}
}
}
11 changes: 8 additions & 3 deletions src/dial.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

const multistream = require('multistream-select')
const Connection = require('interface-connection').Connection
const debug = require('debug')
const log = debug('libp2p:swarm:dial')

const protocolMuxer = require('./protocol-muxer')
const secio = require('./secio')
Expand All @@ -21,6 +23,7 @@ module.exports = function dial (swarm) {
const proxyConn = new Connection()

const b58Id = pi.id.toB58String()
log('dialing %s', b58Id)

if (!swarm.muxedConns[b58Id]) {
if (!swarm.conns[b58Id]) {
Expand All @@ -45,8 +48,10 @@ module.exports = function dial (swarm) {
return proxyConn

function gotWarmedUpConn (conn) {
if (!conn.setPeerInfo) {
conn = new Connection(conn)
}
conn.setPeerInfo(pi)

attemptMuxerUpgrade(conn, (err, muxer) => {
if (!protocol) {
if (err) {
Expand Down Expand Up @@ -142,6 +147,7 @@ module.exports = function dial (swarm) {
if (err) {
return callback(new Error('multistream not supported'))
}
log('selecting %s', key)
ms.select(key, (err, conn) => {
if (err) {
if (muxers.length === 0) {
Expand All @@ -152,7 +158,7 @@ module.exports = function dial (swarm) {
return
}

const muxedConn = swarm.muxers[key](conn, false)
const muxedConn = swarm.muxers[key].dial(conn)
swarm.muxedConns[b58Id] = {}
swarm.muxedConns[b58Id].muxer = muxedConn
// should not be needed anymore - swarm.muxedConns[b58Id].conn = conn
Expand All @@ -161,7 +167,6 @@ module.exports = function dial (swarm) {

muxedConn.once('close', () => {
delete swarm.muxedConns[pi.id.toB58String()]
conn.end()
swarm.emit('peer-mux-closed', pi)
})

Expand Down
2 changes: 1 addition & 1 deletion src/secio.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,5 @@ exports = module.exports

exports.create = (local, insecure) => {
const session = new SecureSession(local, local.privKey, insecure)
return session.secureStream()
return session.secure
}
64 changes: 18 additions & 46 deletions src/transport.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

const Connection = require('interface-connection').Connection
const parallel = require('run-parallel')
const pull = require('pull-stream')
const debug = require('debug')
const log = debug('libp2p:swarm:transport')

const protocolMuxer = require('./protocol-muxer')

Expand All @@ -14,7 +17,7 @@ module.exports = function (swarm) {
}

if (!callback) { callback = noop }

log('adding %s', key)
if (swarm.transports[key]) {
throw new Error('There is already a transport with this key')
}
Expand All @@ -32,26 +35,16 @@ module.exports = function (swarm) {
if (!Array.isArray(multiaddrs)) {
multiaddrs = [multiaddrs]
}

log('dialing %s', key, multiaddrs.map((m) => m.toString()))
// a) filter the multiaddrs that are actually valid for this transport (use a func from the transport itself) (maybe even make the transport do that)
multiaddrs = dialables(t, multiaddrs)

// b) if multiaddrs.length = 1, return the conn from the
// transport, otherwise, create a passthrough
if (multiaddrs.length === 1) {
const conn = t.dial(multiaddrs.shift())

conn.once('error', connectError)

conn.once('connect', () => {
conn.removeListener('error', connectError)
callback(null, conn)
})

return conn
}
function connectError () {
callback(new Error('failed to connect to every multiaddr'))
callback(null, new Connection(conn))
return
}

// c) multiaddrs should already be a filtered list
Expand All @@ -60,23 +53,9 @@ module.exports = function (swarm) {

next(multiaddrs.shift())

return proxyConn

// TODO improve in the future to make all the dials in paralell
function next (multiaddr) {
const conn = t.dial(multiaddr)

conn.once('error', connectError)

function connectError () {
if (multiaddrs.length === 0) {
return callback(new Error('failed to connect to every multiaddr'))
}
next(multiaddrs.shift())
}

conn.once('connect', () => {
conn.removeListener('error', connectError)
const conn = t.dial(multiaddr, () => {
proxyConn.setInnerConn(conn)
callback(null, proxyConn)
})
Expand All @@ -102,15 +81,14 @@ module.exports = function (swarm) {
const createListeners = multiaddrs.map((ma) => {
return (cb) => {
const listener = transport.createListener(handler)
listener.listen(ma, () => {
listener.getAddrs((err, addrs) => {
if (err) {
return cb(err)
}
freshMultiaddrs = freshMultiaddrs.concat(addrs)
transport.listeners.push(listener)
cb()
})
listener.listen(ma)
listener.getAddrs((err, addrs) => {
if (err) {
return cb(err)
}
freshMultiaddrs = freshMultiaddrs.concat(addrs)
transport.listeners.push(listener)
cb()
})
}
})
Expand Down Expand Up @@ -139,13 +117,7 @@ module.exports = function (swarm) {
}

function dialables (tp, multiaddrs) {
return tp.filter(multiaddrs.map((addr) => {
// webrtc-star needs the /ipfs/QmHash
if (addr.toString().indexOf('webrtc-star') > 0) {
return addr
}

return addr
}))
return tp.filter(multiaddrs)
}

function noop () {}
22 changes: 8 additions & 14 deletions test/06-conn-upgrade-secio.node.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@ const multiaddr = require('multiaddr')
const Peer = require('peer-info')
const TCP = require('libp2p-tcp')
const multiplex = require('libp2p-spdy')
const pull = require('pull-stream')

const Swarm = require('../src')

describe.skip('secio conn upgrade (on TCP)', function () {
describe.only('secio conn upgrade (on TCP)', function () {
this.timeout(60 * 1000)

var swarmA
Expand Down Expand Up @@ -73,20 +74,17 @@ describe.skip('secio conn upgrade (on TCP)', function () {

it('handle + dial on protocol', (done) => {
swarmB.handle('/abacaxi/1.0.0', (conn) => {
conn.pipe(conn)
pull(conn, conn)
})

swarmA.dial(peerB, '/abacaxi/1.0.0', (err, conn) => {
expect(err).to.not.exist
expect(Object.keys(swarmA.muxedConns).length).to.equal(1)
conn.end()

conn.on('data', () => {}) // let it flow.. let it flooooow
conn.on('end', done)
pull(pull.empty(), conn, pull.onEnd(done))
})
})

it.skip('dial to warm conn', (done) => {
it('dial to warm conn', (done) => {
swarmB.dial(peerA, (err) => {
expect(err).to.not.exist
expect(Object.keys(swarmB.conns).length).to.equal(0)
Expand All @@ -95,20 +93,16 @@ describe.skip('secio conn upgrade (on TCP)', function () {
})
})

it.skip('dial on protocol, reuse warmed conn', (done) => {
it('dial on protocol, reuse warmed conn', (done) => {
swarmA.handle('/papaia/1.0.0', (conn) => {
conn.pipe(conn)
conn.on('error', (err) => { throw err })
pull(conn, conn)
})

swarmB.dial(peerA, '/papaia/1.0.0', (err, conn) => {
expect(err).to.not.exist
expect(Object.keys(swarmB.conns).length).to.equal(0)
expect(Object.keys(swarmB.muxedConns).length).to.equal(1)
conn.end()
conn.on('error', (err) => { throw err })
conn.on('data', () => {}) // let it flow.. let it flooooow
conn.on('end', done)
pull(pull.empty(), conn, pull.onEnd(done))
})
})

Expand Down

0 comments on commit 5ab7db2

Please sign in to comment.