Skip to content

Commit

Permalink
fix: include DHT client in FIND_NODE response if exact match (#2835)
Browse files Browse the repository at this point in the history
If we receive a query for a peer id, and that peer is in the peer
store, include that peer in the response whether or not it is a
DHT server.

This is not mentioned in the spec but is in the go implementation.
  • Loading branch information
achingbrain authored Nov 21, 2024
1 parent b248eef commit 98f3c77
Show file tree
Hide file tree
Showing 5 changed files with 258 additions and 21 deletions.
50 changes: 41 additions & 9 deletions packages/kad-dht/src/peer-routing/index.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import { publicKeyFromProtobuf } from '@libp2p/crypto/keys'
import { InvalidPublicKeyError, NotFoundError } from '@libp2p/interface'
import { peerIdFromPublicKey } from '@libp2p/peer-id'
import { peerIdFromPublicKey, peerIdFromMultihash } from '@libp2p/peer-id'
import { Libp2pRecord } from '@libp2p/record'
import * as Digest from 'multiformats/hashes/digest'
import { toString as uint8ArrayToString } from 'uint8arrays/to-string'
import { xor as uint8ArrayXor } from 'uint8arrays/xor'
import { xorCompare as uint8ArrayXorCompare } from 'uint8arrays/xor-compare'
import { QueryError, InvalidRecordError } from '../errors.js'
import { MessageType } from '../message/dht.js'
import { PeerDistanceList } from '../peer-distance-list.js'
Expand All @@ -12,14 +15,20 @@ import {
valueEvent
} from '../query/events.js'
import { verifyRecord } from '../record/validators.js'
import { convertBuffer, keyForPublicKey } from '../utils.js'
import type { KadDHTComponents, DHTRecord, FinalPeerEvent, QueryEvent, Validators } from '../index.js'
import { convertBuffer, convertPeerId, keyForPublicKey } from '../utils.js'
import type { DHTRecord, FinalPeerEvent, QueryEvent, Validators } from '../index.js'
import type { Message } from '../message/dht.js'
import type { Network } from '../network.js'
import type { QueryManager, QueryOptions } from '../query/manager.js'
import type { QueryFunc } from '../query/types.js'
import type { RoutingTable } from '../routing-table/index.js'
import type { Logger, PeerId, PeerInfo, PeerStore, RoutingOptions } from '@libp2p/interface'
import type { ComponentLogger, Logger, PeerId, PeerInfo, PeerStore, RoutingOptions } from '@libp2p/interface'

export interface PeerRoutingComponents {
peerId: PeerId
peerStore: PeerStore
logger: ComponentLogger
}

export interface PeerRoutingInit {
routingTable: RoutingTable
Expand All @@ -38,7 +47,7 @@ export class PeerRouting {
private readonly peerStore: PeerStore
private readonly peerId: PeerId

constructor (components: KadDHTComponents, init: PeerRoutingInit) {
constructor (components: PeerRoutingComponents, init: PeerRoutingInit) {
this.routingTable = init.routingTable
this.network = init.network
this.validators = init.validators
Expand Down Expand Up @@ -283,19 +292,42 @@ export class PeerRouting {
}

/**
* Get the nearest peers to the given query, but if closer
* than self
* Get the nearest peers to the given query, but if closer than self
*/
async getCloserPeersOffline (key: Uint8Array, closerThan: PeerId): Promise<PeerInfo[]> {
const id = await convertBuffer(key)
const ids = this.routingTable.closestPeers(id)
const output: PeerInfo[] = []

// try getting the peer directly
try {
const multihash = Digest.decode(key)
const targetPeerId = peerIdFromMultihash(multihash)

const peer = await this.peerStore.get(targetPeerId)

output.push({
id: peer.id,
multiaddrs: peer.addresses.map(({ multiaddr }) => multiaddr)
})
} catch {}

const keyKadId = await convertBuffer(key)
const ids = this.routingTable.closestPeers(keyKadId)
const closerThanKadId = await convertPeerId(closerThan)
const requesterXor = uint8ArrayXor(closerThanKadId, keyKadId)

for (const peerId of ids) {
if (peerId.equals(closerThan)) {
continue
}

const peerKadId = await convertPeerId(peerId)
const peerXor = uint8ArrayXor(peerKadId, keyKadId)

// only include if peer isy closer than requester
if (uint8ArrayXorCompare(peerXor, requesterXor) !== -1) {
continue
}

try {
const peer = await this.peerStore.get(peerId)

Expand Down
6 changes: 3 additions & 3 deletions packages/kad-dht/test/kad-dht.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import * as kadUtils from '../src/utils.js'
import { createPeerIdsWithPrivateKey } from './utils/create-peer-id.js'
import { createValues } from './utils/create-values.js'
import { countDiffPeers } from './utils/index.js'
import { sortClosestPeers } from './utils/sort-closest-peers.js'
import { sortClosestPeers, sortDHTs } from './utils/sort-closest-peers.js'
import { TestDHT } from './utils/test-dht.js'
import type { PeerIdWithPrivateKey } from './utils/create-peer-id.js'
import type { FinalPeerEvent, QueryEvent, ValueEvent } from '../src/index.js'
Expand Down Expand Up @@ -371,12 +371,12 @@ describe('KadDHT', () => {
const key = uint8ArrayFromString('/v/hello')
const value = uint8ArrayFromString('world')

const dhts = await Promise.all([
const dhts = await sortDHTs(await Promise.all([
tdht.spawn(),
tdht.spawn(),
tdht.spawn(),
tdht.spawn()
])
]), await kadUtils.convertBuffer(key))

// Connect all
await Promise.all([
Expand Down
26 changes: 17 additions & 9 deletions packages/kad-dht/test/multiple-nodes.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import { expect } from 'aegir/chai'
import drain from 'it-drain'
import last from 'it-last'
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import { convertBuffer } from '../src/utils.js'
import { sortDHTs } from './utils/sort-closest-peers.js'
import { TestDHT } from './utils/test-dht.js'
import type { KadDHT } from '../src/kad-dht.js'

Expand Down Expand Up @@ -37,6 +39,8 @@ describe('multiple nodes', function () {
const key = uint8ArrayFromString('/v/hello0')
const value = uint8ArrayFromString('world')

dhts = await sortDHTs(dhts, await convertBuffer(key))

await drain(dhts[7].put(key, value))

const res = await Promise.all([
Expand All @@ -62,6 +66,8 @@ describe('multiple nodes', function () {
const key = uint8ArrayFromString('/v/hello1')
const value = uint8ArrayFromString('world')

dhts = await sortDHTs(dhts, await convertBuffer(key))

await drain(dhts[1].put(key, value))

const res = await Promise.all([
Expand All @@ -87,17 +93,19 @@ describe('multiple nodes', function () {
const key = uint8ArrayFromString('/v/hallo')
const result = uint8ArrayFromString('world4')

await drain(dhts[0].put(key, uint8ArrayFromString('world0')))
await drain(dhts[1].put(key, uint8ArrayFromString('world1')))
await drain(dhts[2].put(key, uint8ArrayFromString('world2')))
await drain(dhts[3].put(key, uint8ArrayFromString('world3')))
await drain(dhts[4].put(key, uint8ArrayFromString('world4')))
dhts = await sortDHTs(dhts, await convertBuffer(key))

await drain(dhts[3].put(key, uint8ArrayFromString('world0')))
await drain(dhts[4].put(key, uint8ArrayFromString('world1')))
await drain(dhts[5].put(key, uint8ArrayFromString('world2')))
await drain(dhts[6].put(key, uint8ArrayFromString('world3')))
await drain(dhts[7].put(key, uint8ArrayFromString('world4')))

const res = await Promise.all([
last(dhts[4].get(key)),
last(dhts[5].get(key)),
last(dhts[6].get(key)),
last(dhts[7].get(key))
last(dhts[0].get(key)),
last(dhts[1].get(key)),
last(dhts[2].get(key)),
last(dhts[3].get(key))
])

expect(res[0]).have.property('value').that.equalBytes(result)
Expand Down
176 changes: 176 additions & 0 deletions packages/kad-dht/test/peer-routing.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
import { generateKeyPair } from '@libp2p/crypto/keys'
import { defaultLogger } from '@libp2p/logger'
import { peerIdFromPrivateKey } from '@libp2p/peer-id'
import { multiaddr } from '@multiformats/multiaddr'
import { expect } from 'aegir/chai'
import { stubInterface, type StubbedInstance } from 'sinon-ts'
import { PeerRouting } from '../src/peer-routing/index.js'
import { convertBuffer } from '../src/utils.js'
import { sortClosestPeers } from './utils/sort-closest-peers.js'
import type { Validators } from '../src/index.js'
import type { Network } from '../src/network.js'
import type { QueryManager } from '../src/query/manager.js'
import type { RoutingTable } from '../src/routing-table/index.js'
import type { Peer, ComponentLogger, PeerId, PeerStore } from '@libp2p/interface'

interface StubbedPeerRoutingComponents {
peerId: PeerId
peerStore: StubbedInstance<PeerStore>
logger: ComponentLogger
}

interface StubbedPeerRoutingInit {
routingTable: StubbedInstance<RoutingTable>
network: StubbedInstance<Network>
validators: Validators
queryManager: StubbedInstance<QueryManager>
logPrefix: string
}

describe('peer-routing', () => {
let peerRouting: PeerRouting
let components: StubbedPeerRoutingComponents
let init: StubbedPeerRoutingInit

beforeEach(async () => {
const privateKey = await generateKeyPair('Ed25519')
const peerId = peerIdFromPrivateKey(privateKey)
components = {
peerId,
peerStore: stubInterface(),
logger: defaultLogger()
}

init = {
routingTable: stubInterface(),
network: stubInterface(),
validators: {},
queryManager: stubInterface(),
logPrefix: 'libp2p:test-dht'
}

peerRouting = new PeerRouting(components, init)
})

describe('getCloserPeersOffline', () => {
it('should only return DHT servers', async () => {
const key = Uint8Array.from([0, 1, 2, 3, 4])
const [
clientPeerId,
serverPeerId,
requester
] = await getSortedPeers(key)

const clientPeer: Peer = stubInterface<Peer>({
id: clientPeerId,
addresses: [{
isCertified: true,
multiaddr: multiaddr('/ip4/127.0.0.1/tcp/4001')
}]
})
const serverPeer: Peer = stubInterface<Peer>({
id: serverPeerId,
addresses: [{
isCertified: true,
multiaddr: multiaddr('/ip4/127.0.0.1/tcp/4002')
}]
})

init.routingTable.closestPeers.returns([
serverPeer.id
])

components.peerStore.get.withArgs(serverPeer.id).resolves(serverPeer)
components.peerStore.get.withArgs(clientPeer.id).resolves(clientPeer)

const closer = await peerRouting.getCloserPeersOffline(key, requester)

expect(closer).to.have.lengthOf(1)
expect(closer[0].id).to.equal(serverPeer.id)
})

it('should include the target peer if known, even if the peer is not a DHT server', async () => {
const clientPeerId = peerIdFromPrivateKey(await generateKeyPair('Ed25519'))
const key = clientPeerId.toMultihash().bytes
const [
serverPeerId,
requester
] = await getSortedPeers(key)

const clientPeer: Peer = stubInterface<Peer>({
id: clientPeerId,
addresses: [{
isCertified: true,
multiaddr: multiaddr('/ip4/127.0.0.1/tcp/4001')
}]
})
const serverPeer: Peer = stubInterface<Peer>({
id: serverPeerId,
addresses: [{
isCertified: true,
multiaddr: multiaddr('/ip4/127.0.0.1/tcp/4002')
}]
})

init.routingTable.closestPeers.returns([
serverPeer.id
])

components.peerStore.get.withArgs(serverPeer.id).resolves(serverPeer)
components.peerStore.get.withArgs(clientPeer.id).resolves(clientPeer)

const closer = await peerRouting.getCloserPeersOffline(key, requester)

expect(closer).to.have.lengthOf(2)
expect(closer[0].id).to.equal(clientPeer.id)
expect(closer[1].id).to.equal(serverPeer.id)
})

it('should only include peers closer than the requesting peer', async () => {
const key = Uint8Array.from([0, 1, 2, 3, 4])
const [
closerPeerId,
requester,
furtherPeerId
] = await getSortedPeers(key)

const closerPeer: Peer = stubInterface<Peer>({
id: closerPeerId,
addresses: [{
isCertified: true,
multiaddr: multiaddr('/ip4/127.0.0.1/tcp/4001')
}]
})
const furtherPeer: Peer = stubInterface<Peer>({
id: furtherPeerId,
addresses: [{
isCertified: true,
multiaddr: multiaddr('/ip4/127.0.0.1/tcp/4002')
}]
})

init.routingTable.closestPeers.returns([
closerPeer.id,
furtherPeer.id
])

components.peerStore.get.withArgs(closerPeer.id).resolves(closerPeer)
components.peerStore.get.withArgs(furtherPeer.id).resolves(furtherPeer)

const closer = await peerRouting.getCloserPeersOffline(key, requester)

expect(closer).to.have.lengthOf(1)
expect(closer[0].id).to.equal(closerPeer.id)
})
})
})

async function getSortedPeers (key: Uint8Array, count = 3): Promise<PeerId[]> {
const keyKadId = await convertBuffer(key)

const peers = await Promise.all(
new Array(count).fill(0).map(async () => peerIdFromPrivateKey(await generateKeyPair('Ed25519')))
)

return sortClosestPeers(peers, keyKadId)
}
21 changes: 21 additions & 0 deletions packages/kad-dht/test/utils/sort-closest-peers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import map from 'it-map'
import { xor as uint8ArrayXor } from 'uint8arrays/xor'
import { xorCompare as uint8ArrayXorCompare } from 'uint8arrays/xor-compare'
import { convertPeerId } from '../../src/utils.js'
import type { KadDHT } from '../../src/kad-dht.js'
import type { PeerId } from '@libp2p/interface'

/**
Expand All @@ -26,3 +27,23 @@ export async function sortClosestPeers <T extends PeerId = PeerId> (peers: T[],
})
.map((d) => d.peer)
}

export async function sortDHTs <T extends KadDHT[]> (dhts: T, kadId: Uint8Array): Promise<T> {
const distances = await all(
map(dhts, async (dht) => {
const id = await convertPeerId(dht.components.peerId)

return {
dht,
distance: uint8ArrayXor(id, kadId)
}
})
)

// @ts-expect-error KadDHT may not be T
return distances
.sort((a, b) => {
return uint8ArrayXorCompare(a.distance, b.distance)
})
.map((d) => d.dht)
}

0 comments on commit 98f3c77

Please sign in to comment.