Skip to content

Commit

Permalink
refactor: convert dht API to async/await (ipfs#1156)
Browse files Browse the repository at this point in the history
  • Loading branch information
Alan Shaw authored Nov 14, 2019
1 parent 621973c commit 5110423
Show file tree
Hide file tree
Showing 12 changed files with 238 additions and 285 deletions.
37 changes: 37 additions & 0 deletions src/dht/find-peer.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
'use strict'

const PeerId = require('peer-id')
const PeerInfo = require('peer-info')
const multiaddr = require('multiaddr')
const ndjson = require('iterable-ndjson')
const configure = require('../lib/configure')
const toIterable = require('../lib/stream-to-iterable')

module.exports = configure(({ ky }) => {
return (peerId, options) => (async function * () {
options = options || {}

const searchParams = new URLSearchParams(options.searchParams)
searchParams.set('arg', `${peerId}`)
if (options.verbose != null) searchParams.set('verbose', options.verbose)

const res = await ky.get('dht/findpeer', {
timeout: options.timeout,
signal: options.signal,
headers: options.headers,
searchParams
})

for await (const message of ndjson(toIterable(res.body))) {
// 2 = FinalPeer
// https://github.com/libp2p/go-libp2p-core/blob/6e566d10f4a5447317a66d64c7459954b969bdab/routing/query.go#L18
if (message.Type === 2 && message.Responses) {
for (const { ID, Addrs } of message.Responses) {
const peerInfo = new PeerInfo(PeerId.createFromB58String(ID))
if (Addrs) Addrs.forEach(a => peerInfo.multiaddrs.add(multiaddr(a)))
yield peerInfo
}
}
}
})()
})
38 changes: 38 additions & 0 deletions src/dht/find-provs.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
'use strict'

const PeerId = require('peer-id')
const PeerInfo = require('peer-info')
const multiaddr = require('multiaddr')
const ndjson = require('iterable-ndjson')
const configure = require('../lib/configure')
const toIterable = require('../lib/stream-to-iterable')

module.exports = configure(({ ky }) => {
return (cid, options) => (async function * () {
options = options || {}

const searchParams = new URLSearchParams(options.searchParams)
searchParams.set('arg', `${cid}`)
if (options.numProviders) searchParams.set('num-providers', options.numProviders)
if (options.verbose != null) searchParams.set('verbose', options.verbose)

const res = await ky.get('dht/findprovs', {
timeout: options.timeout,
signal: options.signal,
headers: options.headers,
searchParams
})

for await (const message of ndjson(toIterable(res.body))) {
// 4 = Provider
// https://github.com/libp2p/go-libp2p-core/blob/6e566d10f4a5447317a66d64c7459954b969bdab/routing/query.go#L20
if (message.Type === 4 && message.Responses) {
for (const { ID, Addrs } of message.Responses) {
const peerInfo = new PeerInfo(PeerId.createFromB58String(ID))
if (Addrs) Addrs.forEach(a => peerInfo.multiaddrs.add(multiaddr(a)))
yield peerInfo
}
}
}
})()
})
63 changes: 0 additions & 63 deletions src/dht/findpeer.js

This file was deleted.

63 changes: 0 additions & 63 deletions src/dht/findprovs.js

This file was deleted.

62 changes: 22 additions & 40 deletions src/dht/get.js
Original file line number Diff line number Diff line change
@@ -1,48 +1,30 @@
'use strict'

const promisify = require('promisify-es6')
const ndjson = require('iterable-ndjson')
const configure = require('../lib/configure')
const toIterable = require('../lib/stream-to-iterable')

module.exports = (send) => {
return promisify((key, opts, callback) => {
if (typeof opts === 'function' && !callback) {
callback = opts
opts = {}
}

// opts is the real callback --
// 'callback' is being injected by promisify
if (typeof opts === 'function' && typeof callback === 'function') {
callback = opts
opts = {}
}
module.exports = configure(({ ky }) => {
return (key, options) => (async function * () {
options = options || {}

function handleResult (done, err, res) {
if (err) {
return done(err)
}
if (!res) {
return done(new Error('empty response'))
}
if (res.length === 0) {
return done(new Error('no value returned for key'))
}
const searchParams = new URLSearchParams(options.searchParams)
searchParams.set('arg', `${key}`)
if (options.verbose != null) searchParams.set('verbose', options.verbose)

// Inconsistent return values in the browser vs node
if (Array.isArray(res)) {
res = res[0]
}
const res = await ky.get('dht/get', {
timeout: options.timeout,
signal: options.signal,
headers: options.headers,
searchParams
})

if (res.Type === 5) {
done(null, res.Extra)
} else {
done(new Error('key was not found (type 6)'))
for await (const message of ndjson(toIterable(res.body))) {
// 5 = Value
// https://github.com/libp2p/go-libp2p-core/blob/6e566d10f4a5447317a66d64c7459954b969bdab/routing/query.go#L21
if (message.Type === 5) {
yield message.Extra
}
}

send({
path: 'dht/get',
args: key,
qs: opts
}, handleResult.bind(null, callback))
})
}
})()
})
31 changes: 22 additions & 9 deletions src/dht/index.js
Original file line number Diff line number Diff line change
@@ -1,17 +1,30 @@
'use strict'

const moduleConfig = require('../utils/module-config')
const callbackify = require('callbackify')
const errCode = require('err-code')
const { collectify } = require('../lib/converters')

module.exports = (arg) => {
const send = moduleConfig(arg)
module.exports = config => {
const get = require('./get')(config)
const findPeer = require('./find-peer')(config)

return {
get: require('./get')(send),
put: require('./put')(send),
findProvs: require('./findprovs')(send),
findPeer: require('./findpeer')(send),
provide: require('./provide')(send),
get: callbackify.variadic(async (key, options) => {
for await (const value of get(key, options)) {
return value
}
throw errCode(new Error('value not found'), 'ERR_TYPE_5_NOT_FOUND')
}),
put: callbackify.variadic(collectify(require('./put')(config))),
findProvs: callbackify.variadic(collectify(require('./find-provs')(config))),
findPeer: callbackify.variadic(async (peerId, options) => {
for await (const peerInfo of findPeer(peerId, options)) {
return peerInfo
}
throw errCode(new Error('final peer not found'), 'ERR_TYPE_2_NOT_FOUND')
}),
provide: callbackify.variadic(collectify(require('./provide')(config))),
// find closest peerId to given peerId
query: require('./query')(send)
query: callbackify.variadic(collectify(require('./query')(config)))
}
}
63 changes: 33 additions & 30 deletions src/dht/provide.js
Original file line number Diff line number Diff line change
@@ -1,37 +1,40 @@
'use strict'

const promisify = require('promisify-es6')
const CID = require('cids')
const PeerId = require('peer-id')
const PeerInfo = require('peer-info')
const multiaddr = require('multiaddr')
const ndjson = require('iterable-ndjson')
const configure = require('../lib/configure')
const toIterable = require('../lib/stream-to-iterable')
const toCamel = require('../lib/object-to-camel')

module.exports = (send) => {
return promisify((cids, opts, callback) => {
if (typeof opts === 'function' && !callback) {
callback = opts
opts = {}
}
module.exports = configure(({ ky }) => {
return (cids, options) => (async function * () {
cids = Array.isArray(cids) ? cids : [cids]
options = options || {}

// opts is the real callback --
// 'callback' is being injected by promisify
if (typeof opts === 'function' && typeof callback === 'function') {
callback = opts
opts = {}
}
const searchParams = new URLSearchParams(options.searchParams)
cids.forEach(cid => searchParams.append('arg', `${cid}`))
if (options.recursive != null) searchParams.set('recursive', options.recursive)
if (options.verbose != null) searchParams.set('verbose', options.verbose)

if (!Array.isArray(cids)) {
cids = [cids]
}
const res = await ky.get('dht/provide', {
timeout: options.timeout,
signal: options.signal,
headers: options.headers,
searchParams
})

// Validate CID(s) and serialize
try {
cids = cids.map(cid => new CID(cid).toBaseEncodedString('base58btc'))
} catch (err) {
return callback(err)
for await (let message of ndjson(toIterable(res.body))) {
message = toCamel(message)
if (message.responses) {
message.responses = message.responses.map(({ ID, Addrs }) => {
const peerInfo = new PeerInfo(PeerId.createFromB58String(ID))
if (Addrs) Addrs.forEach(a => peerInfo.multiaddrs.add(multiaddr(a)))
return peerInfo
})
}
yield message
}

send({
path: 'dht/provide',
args: cids,
qs: opts
}, callback)
})
}
})()
})
Loading

0 comments on commit 5110423

Please sign in to comment.