Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve and Cleanup #3

Open
wants to merge 47 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
2244bb9
WIP
victorb May 17, 2018
463a3ac
Expire peers +_protect namespace if already exists when creating
victorb May 17, 2018
2a93a8b
Store to handle global namespace
victorb May 17, 2018
8573cff
WIP
victorb May 19, 2018
935f925
Make the store tests pass
victorb May 21, 2018
2556b06
feat: New state-based system
mkg20001 May 19, 2018
b5f2197
Fixes for state system
mkg20001 May 19, 2018
be3ab16
Fix a race cond in dial
mkg20001 May 19, 2018
56a6549
Add discover function
mkg20001 May 19, 2018
840a3a6
misc: lint
mkg20001 May 19, 2018
6fe96e1
test: Fix browser tests
mkg20001 May 19, 2018
6e4df35
chore: upgrade deps
mkg20001 Jun 8, 2018
050c78a
Add clearEmptyNamespaces function to store
mkg20001 Jun 8, 2018
8815ae0
Split utils in store
mkg20001 Jun 8, 2018
ddbe9aa
Begin rewriting server
mkg20001 Jun 8, 2018
8540852
Add missing handlers for discover/unregister
mkg20001 Jun 8, 2018
5325c92
Tweaks
mkg20001 Jun 8, 2018
55eae94
Finish register
mkg20001 Jun 8, 2018
dc5d317
Tweaks
mkg20001 Jun 8, 2018
baa64b2
Rm duplicate code
mkg20001 Jun 8, 2018
152d91a
Finish discover on server
mkg20001 Jun 9, 2018
f7ea0e0
WIP client rewrite
mkg20001 Jun 9, 2018
d97dca5
WIP
mkg20001 Jun 9, 2018
aa0ca21
fix
mkg20001 Jun 9, 2018
118c6cd
Client sync function
mkg20001 Jun 9, 2018
1d3f6d5
Clean
mkg20001 Jun 9, 2018
c3586f3
Work on tests
mkg20001 Jun 9, 2018
6c5b371
Clean
mkg20001 Jun 9, 2018
9c4a7f4
More work on tests
mkg20001 Jun 9, 2018
ca335b2
Rename files
mkg20001 Jun 9, 2018
8bf0ca8
Work on tests
mkg20001 Jun 9, 2018
0c3b76b
More work on tests
mkg20001 Jun 9, 2018
e14743a
fix
mkg20001 Jun 9, 2018
b754102
WIP
mkg20001 Jun 10, 2018
91e2e9a
WIP tests
mkg20001 Jun 11, 2018
d05c234
More fix
mkg20001 Jun 11, 2018
056db10
Fix tests
mkg20001 Jun 11, 2018
4981c54
Cleanup
mkg20001 Jun 11, 2018
ed735ed
Rm yarn.lock
mkg20001 Jun 11, 2018
f78da18
More cleanup
mkg20001 Jun 11, 2018
9482c06
Cleanup
mkg20001 Jun 11, 2018
0c23e11
More fixes - Libp2p discover interface
mkg20001 Jun 11, 2018
055b0f0
Better cookie
mkg20001 Jun 11, 2018
bd70e66
Sync on disconnect
mkg20001 Jun 11, 2018
5590e63
Tiny fix
mkg20001 Jun 11, 2018
9985dbb
Enable circuit - Dial test
mkg20001 Jun 12, 2018
10f6880
Merge branch 'master' into feat/improve-and-cleanup
jacobheun Feb 1, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions .aegir.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
'use strict'

const Utils = require('./test/utils.peer')

let Server

async function pre (done) {
Server = await Utils.createServer(require('./test/server.id.json'))
done()
}

function post (done) {
Server.stop()
Server.swarm.stop(done)
}

module.exports = {
hooks: {
pre,
post
}
}
22 changes: 16 additions & 6 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,15 @@
"version": "0.0.0",
"description": "A javascript implementation of the rendezvous protocol for libp2p",
"leadMaintainer": "Vasco Santos <[email protected]>",
"main": "index.js",
"main": "src/index.js",
"scripts": {
"test": "aegir test"
"test": "aegir test",
"test:node": "aegir test -t node",
"test:browser": "aegir test -t browser",
"test:webworker": "aegir test -t webworker"
},
"browser": {
"test/utils.js": "./test/utils.browser.js"
},
"keywords": [
"libp2p",
Expand All @@ -18,16 +24,20 @@
"dependencies": {
"chai": "^4.1.2",
"dirty-chai": "^2.0.1",
"immutable": "4.0.0-rc.9",
"promisify-es6": "^1.0.3",
"protons": "^1.0.1",
"pull-protocol-buffers": "^0.1.2"
"pull-protocol-buffers": "^0.1.2",
"pull-through": "^1.0.18"
},
"devDependencies": {
"aegir": "^13.1.0",
"libp2p": "^0.20.2",
"aegir": "^14.0.0",
"libp2p": "^0.20.4",
"libp2p-mplex": "^0.7.0",
"libp2p-secio": "^0.10.0",
"libp2p-spdy": "^0.12.1",
"libp2p-tcp": "^0.12.0"
"libp2p-tcp": "^0.12.0",
"libp2p-websockets": "^0.12.0"
},
"repository": {
"type": "git",
Expand Down
204 changes: 204 additions & 0 deletions src/client.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
'use strict'

const Sync = require('./sync')
const RPC = require('./rpc')
const pull = require('pull-stream')

const debug = require('debug')
const log = debug('libp2p:rendezvous:client')
const {parallel, map} = require('async')

class Client {
constructor (swarm) {
this.swarm = swarm
this.store = Sync.create()
this._dialLock = {}
this._failedCache = {}
}

dial (peer) {
const id = peer.id.toB58String()
this.sync()

// check if we need to dial
if (this._failedCache[id]) return log('not dialing %s because dial previously failed', id)
if (this._dialLock[id]) return log('not dialing %s because dial is already in progress', id)
if (Sync.getPoint(this.store, id)) return log('not dialing %s because peer is already connected', id)

this._dialLock[id] = true // prevent race
log('dialing %s', id)

const cb = (err) => {
delete this._dialLock[id]

if (err) {
log('dialing %s failed: %s', err)
this._failedCache[id] = true
return
}

log('dialing %s succeeded', id)
this.sync()
}

// do the actual dialing
this.swarm.dialProtocol(peer, '/p2p/rendezvous/1.0.0', (err, conn) => {
if (err) return cb(err)

conn.getPeerInfo((err, pi) => {
if (err) return cb(err)

let rpc = RPC(pi, this)

pull(
conn,
rpc,
conn
)

this.store = Sync.addPoint(this.store, id, rpc.rpc)
cb()
})
})
}

sync () {
if (this._syncLock) {
this._needResync = true
return
}
this._syncLock = true
log('syncing')
this.store = Sync.clearPoints(this.store)
let actions = [] // async rpc calls

// adds register / unregsiter actions to "actions" array
/*
pseudo-code:

for all store.points as point:
for all point.registrations as pReg:
if store.registrations does not contain pReg:
actions push "unregister pReg @ point"
delete point.registrations[pReg]
for all store.registrations as reg:
if point.registrations does not contain reg:
actions push "register reg @ point"
set point.registrations[reg]
*/

let points = this.store.get('points')
let registrations = this.store.get('registrations')

this.store = this.store.set('points', points.reduce((points, point, id) => {
let regs = point.get('registrations')

regs = regs.reduce((regs, pReg, pRegId) => {
if (!registrations.get(pRegId)) {
log('sync: unregister@%s: %s', id, pRegId)
actions.push(cb => point.toJS().rpc().unregister(pRegId, pReg.peer.id.toBytes(), cb))
return regs.delete(pRegId)
}

return regs
}, regs)

regs = registrations.reduce((regs, reg, regId) => {
if (!regs.get(regId)) {
log('sync: register@%s: %s', id, regId)
actions.push(cb => point.toJS().rpc().register(regId, reg.peer, reg.ttl, cb))
return regs.set(regId, reg)
}

return regs
}, regs)

return points.set(id, point.set('registrations', regs))
}, points))

log('do sync')
parallel(actions, (err) => {
log('done sync')
delete this._syncLock

if (err) {
log(err) // ???
}

if (this._needResync) {
delete this._needResync
this.sync()
}
})
}

register (ns, peer, ttl, cb) {
if (typeof ttl === 'function') {
cb = ttl
ttl = 0
}
if (typeof peer === 'function') {
ttl = 0
cb = peer
peer = this.swarm.peerInfo
}

this.store = this.store.set('registrations', this.store.get('registrations').set(ns, {peer, ttl}))
this.sync()
}

_discover (peerID, ns, limit, cb) {
if (typeof limit === 'function') {
cb = limit
limit = 0
}
if (typeof ns === 'function') {
limit = 0
cb = ns
ns = null
}

log('discover@%s: %s limit=%s', peerID, ns || '<GLOBAL>', limit)

let point = this.store.get('points').get(peerID)
if (!point || !point.get('rpc')().online()) {
return cb(new Error('Point not connected!'))
}

point.get('rpc')().discover(ns, limit, point.get('cookies').get(ns) || Buffer.from(''), (err, res) => {
if (err) return cb(err)
this.store.set('points',
this.store.get('points').set(peerID,
this.store.get('points').get(peerID).set('cookies',
this.store.get('points').get(peerID).get('cookies').set(ns, res.cookie))))
return cb(null, res.peers)
})
}

discover (ns, cb) {
if (typeof ns === 'function') {
cb = ns
}

let ids = this.store.get('points').toArray().map(p => p[0])

map(ids,
(peerID, cb) => this._discover(peerID, ns, 0, (err, res) => err ? cb(null, []) : cb(null, res)),
(err, res) => err ? cb(err) : cb(null, res.reduce((a, b) => a.concat(b), [])))
}

unregister (ns, id) {
if (!ns) {
id = this.swarm.peerInfo.id.toBytes()
ns = null
}
if (!id) {
id = this.swarm.peerInfo.id.toBytes()
}

this.store = this.store.set('registrations', this.store.get('registrations').delete(ns))
this.sync()
}
}

module.exports = Client
109 changes: 48 additions & 61 deletions src/index.js
Original file line number Diff line number Diff line change
@@ -1,77 +1,64 @@
'use strict'

const RPC = require('./rpc')
const debug = require('debug')
const log = debug('libp2p:rendezvous')
const noop = () => {}

class RendezvousDiscovery {
constructor (swarm) {
this.swarm = swarm
this.peers = []
}

_dial (pi, cb) {
if (!cb) cb = noop
this.swarm.dialProtocol(pi, '/rendezvous/1.0.0', (err, conn) => {
if (err) return cb(err)
const rpc = new RPC()
rpc.setup(conn, err => {
if (err) return cb(err)
this.peers.push(rpc)
cb()
})
})
}

_rpc (cmd, ...a) { // TODO: add. round-robin / multicast / anycast?
this.peers[0][cmd](...a)
}
const Client = require('./client')
const EE = require('events').EventEmitter

register (ns, peer, cb) {
this._rpc('register', ns, peer, 0, cb) // TODO: interface does not expose ttl option?!
class RendezvousDiscovery extends EE {
constructor (swarm, opt) {
super()
this._client = new Client(swarm, opt)
this._discover = {}
this.swarm = swarm
this.tag = 'rendezvous'
}

discover (ns, limit, cookie, cb) {
if (typeof cookie === 'function') {
cb = cookie
cookie = Buffer.from('')
}
if (typeof limit === 'function') {
cookie = Buffer.from('')
cb = limit
limit = 0
start (cb) {
log('start')
this._loop = setInterval(this._discoverLoop.bind(this), 10 * 1000)
this.swarm.on('peer:connect', (peer) => this._client.dial(peer))
this.swarm.on('peer:disconnect', () => this._client.sync())
if (cb) {
cb()
}
if (typeof ns === 'function') {
cookie = Buffer.from('')
limit = 0
cb = ns
ns = null
}
stop (cb) {
log('stop')
clearInterval(this._loop)
this._client.stop()
if (cb) {
cb()
}

this._rpc('discover', ns, limit, cookie, cb)
}

unregister (ns, id) {
// TODO: https://github.com/libp2p/specs/issues/47
register (ns) {
if (!ns) {
id = this.swarm.peerInfo.id.toBytes()
ns = null
}
if (!id) {
id = this.swarm.peerInfo.id.toBytes()
ns = null // need cannonical form of "empty"
}

this._rpc('unregister', ns, id)
log('register', ns)
this._discover[ns] = true
this._client.register(ns, noop)
}

start (cb) {
this.swarm.on('peer:connect', peer => {
this._dial(peer)
})
cb()
unregister (ns) {
if (!ns) {
ns = null // need cannonical form of "empty"
}
log('unregister', ns)
delete this._discover[ns]
this._client.unregister(ns, noop)
}

stop (cb) {
// TODO: shutdown all conns
cb()
_discoverLoop() {
log('discover loop')
for (const ns in this._discover) {
this._client.discover(ns, (err, peers) => {
peers.forEach(peer => {
this.emit('peer', peer)
this.emit(ns ? 'ns:' + ns : 'global', peer)
})
})
}
}
}

Expand Down
Loading