diff --git a/packages/kad-dht/src/kad-dht.ts b/packages/kad-dht/src/kad-dht.ts index 5f076d2c5c..f3812ae95d 100644 --- a/packages/kad-dht/src/kad-dht.ts +++ b/packages/kad-dht/src/kad-dht.ts @@ -1,7 +1,5 @@ import { CodeError, CustomEvent, TypedEventEmitter, contentRoutingSymbol, peerDiscoverySymbol, peerRoutingSymbol } from '@libp2p/interface' import drain from 'it-drain' -import map from 'it-map' -import parallel from 'it-parallel' import pDefer from 'p-defer' import { PROTOCOL } from './constants.js' import { ContentFetching } from './content-fetching/index.js' @@ -22,42 +20,17 @@ import { removePrivateAddressesMapper } from './utils.js' import type { KadDHTComponents, KadDHTInit, Validators, Selectors, KadDHT as KadDHTInterface, QueryEvent, PeerInfoMapper } from './index.js' -import type { AbortOptions, ContentRouting, Logger, PeerDiscovery, PeerDiscoveryEvents, PeerId, PeerInfo, PeerRouting, RoutingOptions, Startable } from '@libp2p/interface' +import type { ContentRouting, Logger, PeerDiscovery, PeerDiscoveryEvents, PeerId, PeerInfo, PeerRouting, RoutingOptions, Startable } from '@libp2p/interface' import type { CID } from 'multiformats/cid' -async function * ensurePeerInfoHasMultiaddrs (source: AsyncGenerator, peerRouting: PeerRouting, log: Logger, options: AbortOptions = {}): AsyncGenerator<() => Promise, void, undefined> { - yield * map(source, prov => { - return async () => { - if (prov.multiaddrs.length > 0) { - return prov - } - - try { - return await peerRouting.findPeer(prov.id, { - ...options, - useCache: false - }) - } catch (err) { - log.error('could not find peer', err) - } - } - }) -} - /** * Wrapper class to convert events into returned values */ class DHTContentRouting implements ContentRouting { private readonly dht: KadDHTInterface - private readonly peerInfoMapper: PeerInfoMapper - private readonly peerRouting: PeerRouting - private readonly log: Logger - constructor (dht: KadDHTInterface, peerInfoMapper: PeerInfoMapper, peerRouting: PeerRouting, log: Logger) { + constructor (dht: KadDHTInterface) { this.dht = dht - this.peerInfoMapper = peerInfoMapper - this.peerRouting = peerRouting - this.log = log } async provide (cid: CID, options: RoutingOptions = {}): Promise { @@ -65,27 +38,11 @@ class DHTContentRouting implements ContentRouting { } async * findProviders (cid: CID, options: RoutingOptions = {}): AsyncGenerator { - const self = this - const source = async function * (): AsyncGenerator { - for await (const event of self.dht.findProviders(cid, options)) { - if (event.name === 'PROVIDER') { - yield * event.providers - } + for await (const event of this.dht.findProviders(cid, options)) { + if (event.name === 'PROVIDER') { + yield * event.providers } } - for await (let peerInfo of parallel(ensurePeerInfoHasMultiaddrs(source(), this.peerRouting, this.log, options))) { - if (peerInfo == null) { - continue - } - - peerInfo = this.peerInfoMapper(peerInfo) - - if (peerInfo.multiaddrs.length === 0) { - continue - } - - yield peerInfo - } } async put (key: Uint8Array, value: Uint8Array, options?: RoutingOptions): Promise { @@ -108,23 +65,15 @@ class DHTContentRouting implements ContentRouting { */ class DHTPeerRouting implements PeerRouting { private readonly dht: KadDHTInterface - private readonly peerInfoMapper: PeerInfoMapper - private readonly log: Logger - constructor (dht: KadDHTInterface, peerInfoMapper: PeerInfoMapper, log: Logger) { + constructor (dht: KadDHTInterface) { this.dht = dht - this.peerInfoMapper = peerInfoMapper - this.log = log } async findPeer (peerId: PeerId, options: RoutingOptions = {}): Promise { for await (const event of this.dht.findPeer(peerId, options)) { if (event.name === 'FINAL_PEER') { - const peer = this.peerInfoMapper(event.peer) - - if (peer.multiaddrs.length > 0) { - return event.peer - } + return event.peer } } @@ -132,27 +81,10 @@ class DHTPeerRouting implements PeerRouting { } async * getClosestPeers (key: Uint8Array, options: RoutingOptions = {}): AsyncIterable { - const self = this - const source = async function * (): AsyncGenerator { - for await (const event of self.dht.getClosestPeers(key, options)) { - if (event.name === 'FINAL_PEER') { - yield event.peer - } - } - } - - for await (let peerInfo of parallel(ensurePeerInfoHasMultiaddrs(source(), this, this.log, options))) { - if (peerInfo == null) { - continue - } - - peerInfo = this.peerInfoMapper(peerInfo) - - if (peerInfo.multiaddrs.length === 0) { - continue + for await (const event of this.dht.getClosestPeers(key, options)) { + if (event.name === 'FINAL_PEER') { + yield event.peer } - - yield peerInfo } } } @@ -347,8 +279,8 @@ export class KadDHT extends TypedEventEmitter implements Ka }) }) - this.dhtPeerRouting = new DHTPeerRouting(this, this.peerInfoMapper, this.log) - this.dhtContentRouting = new DHTContentRouting(this, this.peerInfoMapper, this.dhtPeerRouting, this.log) + this.dhtPeerRouting = new DHTPeerRouting(this) + this.dhtContentRouting = new DHTContentRouting(this) // if client mode has not been explicitly specified, auto-switch to server // mode when the node's peer data is updated with publicly dialable diff --git a/packages/kad-dht/test/libp2p-routing.spec.ts b/packages/kad-dht/test/libp2p-routing.spec.ts index 021e207435..c4ce230e92 100644 --- a/packages/kad-dht/test/libp2p-routing.spec.ts +++ b/packages/kad-dht/test/libp2p-routing.spec.ts @@ -195,215 +195,6 @@ describe('content routing', () => { multiaddrs: providerPeer.addresses.map(({ multiaddr }) => multiaddr.toString()) }]) }) - - it('should not block on finding providers without multiaddrs', async () => { - const receivedFirstProvider = pDefer() - const remotePeerInteractionsComplete = pDefer() - const providerPeerInteractionsComplete = pDefer() - - const remotePeer = createPeer(peers[3]) - const providerPeerWithoutAddresses = createPeer(peers[2]) - const providerPeer = createPeer(peers[1]) - - components.peerStore.get.withArgs(matchPeerId(remotePeer.id)).resolves(remotePeer) - - const { - connection, - incomingStream - } = createStreams(remotePeer.id, components) - - const { - incomingStream: providerPeerIncomingStream - } = createStreams(providerPeer.id, components) - - // a peer has connected - const topology = components.registrar.register.getCall(0).args[1] - topology.onConnect?.(remotePeer.id, connection) - - // remotePeer stream - void Promise.resolve().then(async () => { - const pb = pbStream(incomingStream) - - // read GET_PROVIDERS message - const getProvidersRequest = await pb.read(Message) - - expect(getProvidersRequest.type).to.equal(MessageType.GET_PROVIDERS) - expect(getProvidersRequest.key).to.equalBytes(key.multihash.bytes) - - // reply with the provider node - await pb.write({ - type: MessageType.GET_PROVIDERS, - providers: [{ - id: providerPeerWithoutAddresses.id.toBytes(), - multiaddrs: [] - }, { - id: providerPeer.id.toBytes(), - multiaddrs: providerPeer.addresses.map(({ multiaddr }) => multiaddr.bytes) - }] - }, Message) - - // read FIND_NODE message - const findNodeRequest = await pb.read(Message) - expect(findNodeRequest.type).to.equal(MessageType.FIND_NODE) - expect(findNodeRequest.key).to.equalBytes(providerPeerWithoutAddresses.id.toBytes()) - - // delay sending the response until providerPeer has been received - await receivedFirstProvider.promise - - // return details of providerPeerWithoutAddresses - await pb.write({ - type: MessageType.FIND_NODE, - closer: [{ - id: providerPeerWithoutAddresses.id.toBytes(), - multiaddrs: providerPeerWithoutAddresses.addresses.map(({ multiaddr }) => multiaddr.bytes) - }] - }, Message) - - remotePeerInteractionsComplete.resolve() - }) - - // providerPeer stream - void Promise.resolve().then(async () => { - const pb = pbStream(providerPeerIncomingStream) - - // read FIND_NODE message - const findNodeRequest = await pb.read(Message) - expect(findNodeRequest.type).to.equal(MessageType.FIND_NODE) - expect(findNodeRequest.key).to.equalBytes(providerPeerWithoutAddresses.id.toBytes()) - - // don't know providerPeerWithoutAddresses - await pb.write({ - type: MessageType.FIND_NODE - }, Message) - - providerPeerInteractionsComplete.resolve() - }) - - const provs: Array<{ id: string, multiaddrs: string[] }> = [] - - for await (const prov of contentRouting.findProviders(key)) { - provs.push({ - id: prov.id.toString(), - multiaddrs: prov.multiaddrs.map(ma => ma.toString()) - }) - - receivedFirstProvider.resolve() - } - - // should have received the provider - expect(provs).to.deep.equal([{ - id: providerPeer.id.toString(), - multiaddrs: providerPeer.addresses.map(({ multiaddr }) => multiaddr.toString()) - }, { - id: providerPeerWithoutAddresses.id.toString(), - multiaddrs: providerPeerWithoutAddresses.addresses.map(({ multiaddr }) => multiaddr.toString()) - }]) - - await expect(remotePeerInteractionsComplete.promise).to.eventually.be.undefined() - await expect(providerPeerInteractionsComplete.promise).to.eventually.be.undefined() - }) - - it('should ignore providers without multiaddrs', async () => { - const receivedFirstProvider = pDefer() - const remotePeerInteractionsComplete = pDefer() - const providerPeerInteractionsComplete = pDefer() - - const remotePeer = createPeer(peers[3]) - const providerPeerWithoutAddresses = createPeer(peers[2], { - addresses: [] - }) - const providerPeer = createPeer(peers[1]) - - components.peerStore.get.withArgs(matchPeerId(remotePeer.id)).resolves(remotePeer) - - const { - connection, - incomingStream - } = createStreams(remotePeer.id, components) - - const { - incomingStream: providerPeerIncomingStream - } = createStreams(providerPeer.id, components) - - // a peer has connected - const topology = components.registrar.register.getCall(0).args[1] - topology.onConnect?.(remotePeer.id, connection) - - // remotePeer stream - void Promise.resolve().then(async () => { - const pb = pbStream(incomingStream) - - // read GET_PROVIDERS message - const getProvidersRequest = await pb.read(Message) - - expect(getProvidersRequest.type).to.equal(MessageType.GET_PROVIDERS) - expect(getProvidersRequest.key).to.equalBytes(key.multihash.bytes) - - // reply with the provider node - await pb.write({ - type: MessageType.GET_PROVIDERS, - providers: [{ - id: providerPeerWithoutAddresses.id.toBytes(), - multiaddrs: [] - }, { - id: providerPeer.id.toBytes(), - multiaddrs: providerPeer.addresses.map(({ multiaddr }) => multiaddr.bytes) - }] - }, Message) - - // read FIND_NODE message - const findNodeRequest = await pb.read(Message) - expect(findNodeRequest.type).to.equal(MessageType.FIND_NODE) - expect(findNodeRequest.key).to.equalBytes(providerPeerWithoutAddresses.id.toBytes()) - - // delay sending the response until providerPeer has been received - await receivedFirstProvider.promise - - // don't know providerPeerWithoutAddresses - await pb.write({ - type: MessageType.FIND_NODE - }, Message) - - remotePeerInteractionsComplete.resolve() - }) - - // providerPeer stream - void Promise.resolve().then(async () => { - const pb = pbStream(providerPeerIncomingStream) - - // read FIND_NODE message - const findNodeRequest = await pb.read(Message) - expect(findNodeRequest.type).to.equal(MessageType.FIND_NODE) - expect(findNodeRequest.key).to.equalBytes(providerPeerWithoutAddresses.id.toBytes()) - - // don't know providerPeerWithoutAddresses - await pb.write({ - type: MessageType.FIND_NODE - }, Message) - - providerPeerInteractionsComplete.resolve() - }) - - const provs: Array<{ id: string, multiaddrs: string[] }> = [] - - for await (const prov of contentRouting.findProviders(key)) { - provs.push({ - id: prov.id.toString(), - multiaddrs: prov.multiaddrs.map(ma => ma.toString()) - }) - - receivedFirstProvider.resolve() - } - - // should have received the provider - expect(provs).to.deep.equal([{ - id: providerPeer.id.toString(), - multiaddrs: providerPeer.addresses.map(({ multiaddr }) => multiaddr.toString()) - }]) - - await expect(remotePeerInteractionsComplete.promise).to.eventually.be.undefined() - await expect(providerPeerInteractionsComplete.promise).to.eventually.be.undefined() - }) }) describe('peer routing', () => { @@ -566,236 +357,4 @@ describe('peer routing', () => { await expect(remotePeerInteractionsComplete.promise).to.eventually.be.undefined() await expect(closestPeerInteractionsComplete.promise).to.eventually.be.undefined() }) - - it('should not block on finding closest peers without multiaddrs', async () => { - const receivedFirstClosest = pDefer() - const remotePeerInteractionsComplete = pDefer() - const closestPeerInteractionsComplete = pDefer() - - const remotePeer = createPeer(peers[3]) - const closestPeerWithoutAddresses = createPeer(peers[2]) - const closestPeer = createPeer(peers[1]) - - components.peerStore.get.withArgs(matchPeerId(remotePeer.id)).resolves(remotePeer) - components.peerStore.get.withArgs(matchPeerId(closestPeer.id)).resolves(closestPeer) - components.peerStore.get.withArgs(matchPeerId(closestPeerWithoutAddresses.id)).resolves({ - ...closestPeerWithoutAddresses, - addresses: [] - }) - - const { - connection, - incomingStream - } = createStreams(remotePeer.id, components) - - const { - incomingStream: closestPeerIncomingStream - } = createStreams(closestPeer.id, components) - - // a peer has connected - const topology = components.registrar.register.getCall(0).args[1] - topology.onConnect?.(remotePeer.id, connection) - - // remotePeer stream - void Promise.resolve().then(async () => { - const pb = pbStream(incomingStream) - - // read FIND_NODE message - const getProvidersRequest = await pb.read(Message) - - expect(getProvidersRequest.type).to.equal(MessageType.FIND_NODE) - expect(getProvidersRequest.key).to.equalBytes(key.multihash.bytes) - - // reply with the closer nodes - await pb.write({ - type: MessageType.FIND_NODE, - closer: [{ - id: closestPeerWithoutAddresses.id.toBytes(), - multiaddrs: [] - }, { - id: closestPeer.id.toBytes(), - multiaddrs: closestPeer.addresses.map(({ multiaddr }) => multiaddr.bytes) - }] - }, Message) - - // read FIND_NODE message - const findNodeRequest = await pb.read(Message) - - expect(findNodeRequest.type).to.equal(MessageType.FIND_NODE) - expect(findNodeRequest.key).to.equalBytes(closestPeerWithoutAddresses.id.toBytes()) - - // delay sending the response until closestPeer has been received - await receivedFirstClosest.promise - - // return details of closestPeerWithoutAddresses - await pb.write({ - type: MessageType.FIND_NODE, - closer: [{ - id: closestPeerWithoutAddresses.id.toBytes(), - multiaddrs: closestPeerWithoutAddresses.addresses.map(({ multiaddr }) => multiaddr.bytes) - }] - }, Message) - - remotePeerInteractionsComplete.resolve() - }) - - // closestPeer stream - void Promise.resolve().then(async () => { - const pb = pbStream(closestPeerIncomingStream) - - // read FIND_NODE message - const findNodeRequest = await pb.read(Message) - expect(findNodeRequest.type).to.equal(MessageType.FIND_NODE) - expect(findNodeRequest.key).to.equalBytes(key.multihash.bytes) - - await pb.write({ - type: MessageType.FIND_NODE, - closer: [] - }, Message) - - const secondFindNodeRequest = await pb.read(Message) - expect(secondFindNodeRequest.type).to.equal(MessageType.FIND_NODE) - expect(secondFindNodeRequest.key).to.equalBytes(closestPeerWithoutAddresses.id.toBytes()) - - // don't know closestPeerWithoutAddresses - await pb.write({ - type: MessageType.FIND_NODE - }, Message) - - closestPeerInteractionsComplete.resolve() - }) - - const closest: Array<{ id: string, multiaddrs: string[] }> = [] - - for await (const closer of peerRouting.getClosestPeers(key.multihash.bytes)) { - closest.push({ - id: closer.id.toString(), - multiaddrs: closer.multiaddrs.map(ma => ma.toString()) - }) - - receivedFirstClosest.resolve() - } - - // should have received the closest peers - expect(closest).to.deep.equal([{ - id: closestPeer.id.toString(), - multiaddrs: closestPeer.addresses.map(({ multiaddr }) => multiaddr.toString()) - }, { - id: closestPeerWithoutAddresses.id.toString(), - multiaddrs: closestPeerWithoutAddresses.addresses.map(({ multiaddr }) => multiaddr.toString()) - }]) - - await expect(remotePeerInteractionsComplete.promise).to.eventually.be.undefined() - await expect(closestPeerInteractionsComplete.promise).to.eventually.be.undefined() - }) - - it('should ignore closest peers without multiaddrs', async () => { - const receivedFirstClosest = pDefer() - const remotePeerInteractionsComplete = pDefer() - const closestPeerInteractionsComplete = pDefer() - - const remotePeer = createPeer(peers[3]) - const closestPeerWithoutAddresses = createPeer(peers[2]) - const closestPeer = createPeer(peers[1]) - - components.peerStore.get.withArgs(matchPeerId(remotePeer.id)).resolves(remotePeer) - components.peerStore.get.withArgs(matchPeerId(closestPeer.id)).resolves(closestPeer) - components.peerStore.get.withArgs(matchPeerId(closestPeerWithoutAddresses.id)).resolves({ - ...closestPeerWithoutAddresses, - addresses: [] - }) - - const { - connection, - incomingStream - } = createStreams(remotePeer.id, components) - - const { - incomingStream: closestPeerIncomingStream - } = createStreams(closestPeer.id, components) - - // a peer has connected - const topology = components.registrar.register.getCall(0).args[1] - topology.onConnect?.(remotePeer.id, connection) - - // remotePeer stream - void Promise.resolve().then(async () => { - const pb = pbStream(incomingStream) - - // read FIND_NODE message - const getProvidersRequest = await pb.read(Message) - - expect(getProvidersRequest.type).to.equal(MessageType.FIND_NODE) - expect(getProvidersRequest.key).to.equalBytes(key.multihash.bytes) - - // reply with the closer nodes - await pb.write({ - type: MessageType.FIND_NODE, - closer: [{ - id: closestPeerWithoutAddresses.id.toBytes(), - multiaddrs: [] - }, { - id: closestPeer.id.toBytes(), - multiaddrs: closestPeer.addresses.map(({ multiaddr }) => multiaddr.bytes) - }] - }, Message) - - const secondFindNodeRequest = await pb.read(Message) - expect(secondFindNodeRequest.type).to.equal(MessageType.FIND_NODE) - expect(secondFindNodeRequest.key).to.equalBytes(closestPeerWithoutAddresses.id.toBytes()) - - // don't know closestPeerWithoutAddresses - await pb.write({ - type: MessageType.FIND_NODE - }, Message) - - remotePeerInteractionsComplete.resolve() - }) - - // closestPeer stream - void Promise.resolve().then(async () => { - const pb = pbStream(closestPeerIncomingStream) - - // read FIND_NODE message - const findNodeRequest = await pb.read(Message) - expect(findNodeRequest.type).to.equal(MessageType.FIND_NODE) - expect(findNodeRequest.key).to.equalBytes(key.multihash.bytes) - - await pb.write({ - type: MessageType.FIND_NODE, - closer: [] - }, Message) - - const secondFindNodeRequest = await pb.read(Message) - expect(secondFindNodeRequest.type).to.equal(MessageType.FIND_NODE) - expect(secondFindNodeRequest.key).to.equalBytes(closestPeerWithoutAddresses.id.toBytes()) - - // don't know closestPeerWithoutAddresses - await pb.write({ - type: MessageType.FIND_NODE - }, Message) - - closestPeerInteractionsComplete.resolve() - }) - - const closest: Array<{ id: string, multiaddrs: string[] }> = [] - - for await (const closer of peerRouting.getClosestPeers(key.multihash.bytes)) { - closest.push({ - id: closer.id.toString(), - multiaddrs: closer.multiaddrs.map(ma => ma.toString()) - }) - - receivedFirstClosest.resolve() - } - - // should have received the closest peers - expect(closest).to.deep.equal([{ - id: closestPeer.id.toString(), - multiaddrs: closestPeer.addresses.map(({ multiaddr }) => multiaddr.toString()) - }]) - - await expect(remotePeerInteractionsComplete.promise).to.eventually.be.undefined() - await expect(closestPeerInteractionsComplete.promise).to.eventually.be.undefined() - }) })