This repository has been archived by the owner on Jul 21, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 17
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: compatibility with go-libp2p-mdns (#80)
Adds an additional mdns poller to interop with go-libp2p until both implementations comply with the new spec, https://github.com/libp2p/specs/blob/4c5a459ae8fb9a250e5f87f0c64fadaa7997266a/discovery/mdns.md.
- Loading branch information
Showing
12 changed files
with
995 additions
and
18 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,6 +3,7 @@ docs | |
**/*.log | ||
test/repo-tests* | ||
**/bundle.js | ||
.nyc_output | ||
|
||
# Logs | ||
logs | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
'use strict' | ||
|
||
exports.SERVICE_TAG = '_ipfs-discovery._udp' | ||
exports.SERVICE_TAG_LOCAL = `${exports.SERVICE_TAG}.local` | ||
exports.MULTICAST_IP = '224.0.0.251' | ||
exports.MULTICAST_PORT = 5353 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,60 @@ | ||
'use strict' | ||
|
||
// Compatibility with Go libp2p MDNS | ||
|
||
const EE = require('events') | ||
const parallel = require('async/parallel') | ||
const Responder = require('./responder') | ||
const Querier = require('./querier') | ||
|
||
class GoMulticastDNS extends EE { | ||
constructor (peerInfo) { | ||
super() | ||
this._started = false | ||
this._peerInfo = peerInfo | ||
this._onPeer = this._onPeer.bind(this) | ||
} | ||
|
||
start (callback) { | ||
if (this._started) { | ||
return callback(new Error('MulticastDNS service is already started')) | ||
} | ||
|
||
this._started = true | ||
this._responder = new Responder(this._peerInfo) | ||
this._querier = new Querier(this._peerInfo.id) | ||
|
||
this._querier.on('peer', this._onPeer) | ||
|
||
parallel([ | ||
cb => this._responder.start(cb), | ||
cb => this._querier.start(cb) | ||
], callback) | ||
} | ||
|
||
_onPeer (peerInfo) { | ||
this.emit('peer', peerInfo) | ||
} | ||
|
||
stop (callback) { | ||
if (!this._started) { | ||
return callback(new Error('MulticastDNS service is not started')) | ||
} | ||
|
||
const responder = this._responder | ||
const querier = this._querier | ||
|
||
this._started = false | ||
this._responder = null | ||
this._querier = null | ||
|
||
querier.removeListener('peer', this._onPeer) | ||
|
||
parallel([ | ||
cb => responder.stop(cb), | ||
cb => querier.stop(cb) | ||
], callback) | ||
} | ||
} | ||
|
||
module.exports = GoMulticastDNS |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,176 @@ | ||
'use strict' | ||
|
||
const assert = require('assert') | ||
const EE = require('events') | ||
const MDNS = require('multicast-dns') | ||
const Multiaddr = require('multiaddr') | ||
const PeerInfo = require('peer-info') | ||
const PeerId = require('peer-id') | ||
const nextTick = require('async/nextTick') | ||
const log = require('debug')('libp2p:mdns:compat:querier') | ||
const { SERVICE_TAG_LOCAL, MULTICAST_IP, MULTICAST_PORT } = require('./constants') | ||
|
||
class Querier extends EE { | ||
constructor (peerId, options) { | ||
super() | ||
assert(peerId, 'missing peerId parameter') | ||
options = options || {} | ||
this._peerIdStr = peerId.toB58String() | ||
// Re-query every 60s, in leu of network change detection | ||
options.queryInterval = options.queryInterval || 60000 | ||
// Time for which the MDNS server will stay alive waiting for responses | ||
// Must be less than options.queryInterval! | ||
options.queryPeriod = Math.min( | ||
options.queryInterval, | ||
options.queryPeriod == null ? 5000 : options.queryPeriod | ||
) | ||
this._options = options | ||
this._onResponse = this._onResponse.bind(this) | ||
} | ||
|
||
start (callback) { | ||
this._handle = periodically(() => { | ||
// Create a querier that queries multicast but gets responses unicast | ||
const mdns = MDNS({ multicast: false, interface: '0.0.0.0', port: 0 }) | ||
|
||
mdns.on('response', this._onResponse) | ||
|
||
mdns.query({ | ||
id: nextId(), // id > 0 for unicast response | ||
questions: [{ name: SERVICE_TAG_LOCAL, type: 'PTR', class: 'IN' }] | ||
}, null, { | ||
address: MULTICAST_IP, | ||
port: MULTICAST_PORT | ||
}) | ||
|
||
return { | ||
stop: callback => { | ||
mdns.removeListener('response', this._onResponse) | ||
mdns.destroy(callback) | ||
} | ||
} | ||
}, { | ||
period: this._options.queryPeriod, | ||
interval: this._options.queryInterval | ||
}) | ||
|
||
nextTick(() => callback()) | ||
} | ||
|
||
_onResponse (event, info) { | ||
const answers = event.answers || [] | ||
const ptrRecord = answers.find(a => a.type === 'PTR' && a.name === SERVICE_TAG_LOCAL) | ||
|
||
// Only deal with responses for our service tag | ||
if (!ptrRecord) return | ||
|
||
log('got response', event, info) | ||
|
||
const txtRecord = answers.find(a => a.type === 'TXT') | ||
if (!txtRecord) return log('missing TXT record in response') | ||
|
||
let peerIdStr | ||
try { | ||
peerIdStr = txtRecord.data[0].toString() | ||
} catch (err) { | ||
return log('failed to extract peer ID from TXT record data', txtRecord, err) | ||
} | ||
|
||
if (this._peerIdStr === peerIdStr) { | ||
return log('ignoring reply to myself') | ||
} | ||
|
||
let peerId | ||
try { | ||
peerId = PeerId.createFromB58String(peerIdStr) | ||
} catch (err) { | ||
return log('failed to create peer ID from TXT record data', peerIdStr, err) | ||
} | ||
|
||
PeerInfo.create(peerId, (err, info) => { | ||
if (err) return log('failed to create peer info from peer ID', peerId, err) | ||
|
||
const srvRecord = answers.find(a => a.type === 'SRV') | ||
if (!srvRecord) return log('missing SRV record in response') | ||
|
||
log('peer found', peerIdStr) | ||
|
||
const { port } = srvRecord.data || {} | ||
const protos = { A: 'ip4', AAAA: 'ip6' } | ||
|
||
const multiaddrs = answers | ||
.filter(a => ['A', 'AAAA'].includes(a.type)) | ||
.reduce((addrs, a) => { | ||
const maStr = `/${protos[a.type]}/${a.data}/tcp/${port}` | ||
try { | ||
addrs.push(new Multiaddr(maStr)) | ||
log(maStr) | ||
} catch (err) { | ||
log(`failed to create multiaddr from ${a.type} record data`, maStr, port, err) | ||
} | ||
return addrs | ||
}, []) | ||
|
||
multiaddrs.forEach(addr => info.multiaddrs.add(addr)) | ||
this.emit('peer', info) | ||
}) | ||
} | ||
|
||
stop (callback) { | ||
this._handle.stop(callback) | ||
} | ||
} | ||
|
||
module.exports = Querier | ||
|
||
/** | ||
* Run `fn` for a certain period of time, and then wait for an interval before | ||
* running it again. `fn` must return an object with a stop function, which is | ||
* called when the period expires. | ||
* | ||
* @param {Function} fn function to run | ||
* @param {Object} [options] | ||
* @param {Object} [options.period] Period in ms to run the function for | ||
* @param {Object} [options.interval] Interval in ms between runs | ||
* @returns {Object} handle that can be used to stop execution | ||
*/ | ||
function periodically (fn, options) { | ||
let handle, timeoutId | ||
let stopped = false | ||
|
||
const reRun = () => { | ||
handle = fn() | ||
timeoutId = setTimeout(() => { | ||
handle.stop(err => { | ||
if (err) log(err) | ||
if (!stopped) { | ||
timeoutId = setTimeout(reRun, options.interval) | ||
} | ||
}) | ||
handle = null | ||
}, options.period) | ||
} | ||
|
||
reRun() | ||
|
||
return { | ||
stop (callback) { | ||
stopped = true | ||
clearTimeout(timeoutId) | ||
if (handle) { | ||
handle.stop(callback) | ||
} else { | ||
callback() | ||
} | ||
} | ||
} | ||
} | ||
|
||
const nextId = (() => { | ||
let id = 0 | ||
return () => { | ||
id++ | ||
if (id === Number.MAX_SAFE_INTEGER) id = 1 | ||
return id | ||
} | ||
})() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,97 @@ | ||
'use strict' | ||
|
||
const OS = require('os') | ||
const assert = require('assert') | ||
const MDNS = require('multicast-dns') | ||
const log = require('debug')('libp2p:mdns:compat:responder') | ||
const TCP = require('libp2p-tcp') | ||
const nextTick = require('async/nextTick') | ||
const { SERVICE_TAG_LOCAL } = require('./constants') | ||
|
||
const tcp = new TCP() | ||
|
||
class Responder { | ||
constructor (peerInfo) { | ||
assert(peerInfo, 'missing peerInfo parameter') | ||
this._peerInfo = peerInfo | ||
this._peerIdStr = peerInfo.id.toB58String() | ||
this._onQuery = this._onQuery.bind(this) | ||
} | ||
|
||
start (callback) { | ||
this._mdns = MDNS() | ||
this._mdns.on('query', this._onQuery) | ||
nextTick(() => callback()) | ||
} | ||
|
||
_onQuery (event, info) { | ||
const multiaddrs = tcp.filter(this._peerInfo.multiaddrs.toArray()) | ||
// Only announce TCP for now | ||
if (!multiaddrs.length) return | ||
|
||
const questions = event.questions || [] | ||
|
||
// Only respond to queries for our service tag | ||
if (!questions.some(q => q.name === SERVICE_TAG_LOCAL)) return | ||
|
||
log('got query', event, info) | ||
|
||
const answers = [] | ||
const peerServiceTagLocal = `${this._peerIdStr}.${SERVICE_TAG_LOCAL}` | ||
|
||
answers.push({ | ||
name: SERVICE_TAG_LOCAL, | ||
type: 'PTR', | ||
class: 'IN', | ||
ttl: 120, | ||
data: peerServiceTagLocal | ||
}) | ||
|
||
// Only announce TCP multiaddrs for now | ||
const port = multiaddrs[0].toString().split('/')[4] | ||
|
||
answers.push({ | ||
name: peerServiceTagLocal, | ||
type: 'SRV', | ||
class: 'IN', | ||
ttl: 120, | ||
data: { | ||
priority: 10, | ||
weight: 1, | ||
port, | ||
target: OS.hostname() | ||
} | ||
}) | ||
|
||
answers.push({ | ||
name: peerServiceTagLocal, | ||
type: 'TXT', | ||
class: 'IN', | ||
ttl: 120, | ||
data: [Buffer.from(this._peerIdStr)] | ||
}) | ||
|
||
multiaddrs.forEach((ma) => { | ||
const proto = ma.protoNames()[0] | ||
if (proto === 'ip4' || proto === 'ip6') { | ||
answers.push({ | ||
name: OS.hostname(), | ||
type: proto === 'ip4' ? 'A' : 'AAAA', | ||
class: 'IN', | ||
ttl: 120, | ||
data: ma.toString().split('/')[2] | ||
}) | ||
} | ||
}) | ||
|
||
log('responding to query', answers) | ||
this._mdns.respond(answers, info) | ||
} | ||
|
||
stop (callback) { | ||
this._mdns.removeListener('query', this._onQuery) | ||
this._mdns.destroy(callback) | ||
} | ||
} | ||
|
||
module.exports = Responder |
Oops, something went wrong.