Skip to content

Commit

Permalink
fix: perform find peer during dial if peer has no multiaddrs (#2345)
Browse files Browse the repository at this point in the history
A recent change to go-libp2p started yielding peers with no multiaddrs
during DHT queries.

These peer ids have addresses that can be found during a find peer
query but performing this query for every peer is expensive, so do the
lookup only when dialing the peer.
  • Loading branch information
achingbrain authored Jan 6, 2024
1 parent 53e83ee commit 444d837
Show file tree
Hide file tree
Showing 10 changed files with 128 additions and 86 deletions.
56 changes: 37 additions & 19 deletions packages/libp2p/src/connection-manager/dial-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import {
LAST_DIAL_FAILURE_KEY
} from './constants.js'
import { resolveMultiaddrs } from './utils.js'
import type { AddressSorter, AbortOptions, ComponentLogger, Logger, Connection, ConnectionGater, Metrics, PeerId, Address, PeerStore } from '@libp2p/interface'
import type { AddressSorter, AbortOptions, ComponentLogger, Logger, Connection, ConnectionGater, Metrics, PeerId, Address, PeerStore, PeerRouting } from '@libp2p/interface'
import type { TransportManager } from '@libp2p/interface-internal'

export interface PendingDialTarget {
Expand Down Expand Up @@ -57,17 +57,15 @@ interface DialQueueComponents {
peerId: PeerId
metrics?: Metrics
peerStore: PeerStore
peerRouting: PeerRouting
transportManager: TransportManager
connectionGater: ConnectionGater
logger: ComponentLogger
}

export class DialQueue {
public queue: Queue<Connection, DialQueueJobOptions>
private readonly peerId: PeerId
private readonly peerStore: PeerStore
private readonly connectionGater: ConnectionGater
private readonly transportManager: TransportManager
private readonly components: DialQueueComponents
private readonly addressSorter: AddressSorter
private readonly maxPeerAddrsToDial: number
private readonly dialTimeout: number
Expand All @@ -82,10 +80,7 @@ export class DialQueue {
this.connections = init.connections ?? new PeerMap()
this.log = components.logger.forComponent('libp2p:connection-manager:dial-queue')

this.peerId = components.peerId
this.peerStore = components.peerStore
this.connectionGater = components.connectionGater
this.transportManager = components.transportManager
this.components = components
this.shutDownController = new AbortController()

setMaxListeners(Infinity, this.shutDownController.signal)
Expand Down Expand Up @@ -218,15 +213,15 @@ export class DialQueue {

for (const address of addrsToDial) {
if (dialed === this.maxPeerAddrsToDial) {
this.log('dialed %d addresses for %p, not trying any others', dialed, peerId)
this.log('dialed maxPeerAddrsToDial (%d) addresses for %p, not trying any others', dialed, peerId)

throw new CodeError('Peer had more than maxPeerAddrsToDial', codes.ERR_TOO_MANY_ADDRESSES)
}

dialed++

try {
const conn = await this.transportManager.dial(address.multiaddr, {
const conn = await this.components.transportManager.dial(address.multiaddr, {
...options,
signal
})
Expand All @@ -240,7 +235,7 @@ export class DialQueue {
if (peerId != null) {
// record the failed dial
try {
await this.peerStore.patch(peerId, {
await this.components.peerStore.patch(peerId, {
metadata: {
[LAST_DIAL_FAILURE_KEY]: uint8ArrayFromString(Date.now().toString())
}
Expand Down Expand Up @@ -299,19 +294,20 @@ export class DialQueue {

// if a peer id or multiaddr(s) with a peer id, make sure it isn't our peer id and that we are allowed to dial it
if (peerId != null) {
if (this.peerId.equals(peerId)) {
if (this.components.peerId.equals(peerId)) {
throw new CodeError('Tried to dial self', codes.ERR_DIALED_SELF)
}

if ((await this.connectionGater.denyDialPeer?.(peerId)) === true) {
if ((await this.components.connectionGater.denyDialPeer?.(peerId)) === true) {
throw new CodeError('The dial request is blocked by gater.allowDialPeer', codes.ERR_PEER_DIAL_INTERCEPTED)
}

// if just a peer id was passed, load available multiaddrs for this peer from the address book
// if just a peer id was passed, load available multiaddrs for this peer
// from the peer store
if (addrs.length === 0) {
this.log('loading multiaddrs for %p', peerId)
try {
const peer = await this.peerStore.get(peerId)
const peer = await this.components.peerStore.get(peerId)
addrs.push(...peer.addresses)
this.log('loaded multiaddrs for %p', peerId, addrs.map(({ multiaddr }) => multiaddr.toString()))
} catch (err: any) {
Expand All @@ -320,9 +316,31 @@ export class DialQueue {
}
}
}

// if we still don't have any addresses for this peer, try a lookup
// using the peer routing
if (addrs.length === 0) {
this.log('looking up multiaddrs for %p in the peer routing', peerId)

try {
const peerInfo = await this.components.peerRouting.findPeer(peerId)

this.log('found multiaddrs for %p in the peer routing', peerId, addrs.map(({ multiaddr }) => multiaddr.toString()))

addrs.push(...peerInfo.multiaddrs.map(multiaddr => ({
multiaddr,
isCertified: false
})))
} catch (err: any) {
if (err.code !== codes.ERR_NO_ROUTERS_AVAILABLE) {
this.log.error('looking up multiaddrs for %p in the peer routing failed', peerId, err)
}
}
}
}

// resolve addresses - this can result in a one-to-many translation when dnsaddrs are resolved
// resolve addresses - this can result in a one-to-many translation when
// dnsaddrs are resolved
let resolvedAddresses = (await Promise.all(
addrs.map(async addr => {
const result = await resolveMultiaddrs(addr.multiaddr, {
Expand Down Expand Up @@ -367,7 +385,7 @@ export class DialQueue {

const filteredAddrs = resolvedAddresses.filter(addr => {
// filter out any multiaddrs that we do not have transports for
if (this.transportManager.transportForMultiaddr(addr.multiaddr) == null) {
if (this.components.transportManager.transportForMultiaddr(addr.multiaddr) == null) {
return false
}

Expand Down Expand Up @@ -407,7 +425,7 @@ export class DialQueue {
const gatedAdrs: Address[] = []

for (const addr of dedupedMultiaddrs) {
if (this.connectionGater.denyDialMultiaddr != null && await this.connectionGater.denyDialMultiaddr(addr.multiaddr)) {
if (this.components.connectionGater.denyDialMultiaddr != null && await this.components.connectionGater.denyDialMultiaddr(addr.multiaddr)) {
continue
}

Expand Down
17 changes: 6 additions & 11 deletions packages/libp2p/src/connection-manager/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import { AutoDial } from './auto-dial.js'
import { ConnectionPruner } from './connection-pruner.js'
import { AUTO_DIAL_CONCURRENCY, AUTO_DIAL_MAX_QUEUE_LENGTH, AUTO_DIAL_PRIORITY, DIAL_TIMEOUT, INBOUND_CONNECTION_THRESHOLD, MAX_CONNECTIONS, MAX_INCOMING_PENDING_CONNECTIONS, MAX_PARALLEL_DIALS, MAX_PEER_ADDRS_TO_DIAL, MIN_CONNECTIONS } from './constants.js'
import { DialQueue } from './dial-queue.js'
import type { PendingDial, AddressSorter, Libp2pEvents, AbortOptions, ComponentLogger, Logger, Connection, MultiaddrConnection, ConnectionGater, TypedEventTarget, Metrics, PeerId, Peer, PeerStore, Startable, PendingDialStatus } from '@libp2p/interface'
import type { PendingDial, AddressSorter, Libp2pEvents, AbortOptions, ComponentLogger, Logger, Connection, MultiaddrConnection, ConnectionGater, TypedEventTarget, Metrics, PeerId, Peer, PeerStore, Startable, PendingDialStatus, PeerRouting } from '@libp2p/interface'
import type { ConnectionManager, OpenConnectionOptions, TransportManager } from '@libp2p/interface-internal'
import type { JobStatus } from '@libp2p/utils/queue'

Expand Down Expand Up @@ -82,8 +82,9 @@ export interface ConnectionManagerInit {
maxParallelDials?: number

/**
* Maximum number of addresses allowed for a given peer - if a peer has more
* addresses than this then the dial will fail. (default: 25)
* Maximum number of addresses allowed for a given peer before giving up
*
* @default 25
*/
maxPeerAddrsToDial?: number

Expand Down Expand Up @@ -144,6 +145,7 @@ export interface DefaultConnectionManagerComponents {
peerId: PeerId
metrics?: Metrics
peerStore: PeerStore
peerRouting: PeerRouting
transportManager: TransportManager
connectionGater: ConnectionGater
events: TypedEventTarget<Libp2pEvents>
Expand Down Expand Up @@ -233,14 +235,7 @@ export class DefaultConnectionManager implements ConnectionManager, Startable {
allow: this.allow
})

this.dialQueue = new DialQueue({
peerId: components.peerId,
metrics: components.metrics,
peerStore: components.peerStore,
transportManager: components.transportManager,
connectionGater: components.connectionGater,
logger: components.logger
}, {
this.dialQueue = new DialQueue(components, {
addressSorter: init.addressSorter ?? defaultAddressSort,
maxParallelDials: init.maxParallelDials ?? MAX_PARALLEL_DIALS,
maxPeerAddrsToDial: init.maxPeerAddrsToDial ?? MAX_PEER_ADDRS_TO_DIAL,
Expand Down
45 changes: 8 additions & 37 deletions packages/libp2p/src/content-routing.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import { CodeError } from '@libp2p/interface'
import { PeerSet } from '@libp2p/peer-collections'
import merge from 'it-merge'
import parallel from 'it-parallel'
import { codes, messages } from './errors.js'
import type { AbortOptions, ComponentLogger, ContentRouting, Logger, PeerInfo, PeerRouting, PeerStore, RoutingOptions, Startable } from '@libp2p/interface'
import type { AbortOptions, ComponentLogger, ContentRouting, PeerInfo, PeerRouting, PeerStore, RoutingOptions, Startable } from '@libp2p/interface'
import type { CID } from 'multiformats/cid'

export interface CompoundContentRoutingInit {
Expand All @@ -17,13 +16,11 @@ export interface CompoundContentRoutingComponents {
}

export class CompoundContentRouting implements ContentRouting, Startable {
private readonly log: Logger
private readonly routers: ContentRouting[]
private started: boolean
private readonly components: CompoundContentRoutingComponents

constructor (components: CompoundContentRoutingComponents, init: CompoundContentRoutingInit) {
this.log = components.logger.forComponent('libp2p:content-routing')
this.routers = init.routers ?? []
this.started = false
this.components = components
Expand Down Expand Up @@ -52,48 +49,22 @@ export class CompoundContentRouting implements ContentRouting, Startable {
const self = this
const seen = new PeerSet()

for await (const peer of parallel(
async function * () {
const source = merge(
...self.routers.map(router => router.findProviders(key, options))
)

for await (let peer of source) {
yield async () => {
// find multiaddrs if they are missing
if (peer.multiaddrs.length === 0) {
try {
peer = await self.components.peerRouting.findPeer(peer.id, {
...options,
useCache: false
})
} catch (err) {
self.log.error('could not find peer multiaddrs', err)
return
}
}

return peer
}
}
}()
for await (const peer of merge(
...self.routers.map(router => router.findProviders(key, options))
)) {
// the peer was yielded by a content router without multiaddrs and we
// failed to load them
if (peer == null) {
continue
}

// skip peers without addresses
if (peer.multiaddrs.length === 0) {
continue
// store the addresses for the peer if found
if (peer.multiaddrs.length > 0) {
await this.components.peerStore.merge(peer.id, {
multiaddrs: peer.multiaddrs
})
}

// ensure we have the addresses for a given peer
await this.components.peerStore.merge(peer.id, {
multiaddrs: peer.multiaddrs
})

// deduplicate peers
if (seen.has(peer.id)) {
continue
Expand Down
27 changes: 12 additions & 15 deletions packages/libp2p/src/peer-routing.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ export class DefaultPeerRouting implements PeerRouting {
private readonly peerStore: PeerStore
private readonly routers: PeerRouting[]

constructor (components: DefaultPeerRoutingComponents, init: PeerRoutingInit) {
constructor (components: DefaultPeerRoutingComponents, init: PeerRoutingInit = {}) {
this.log = components.logger.forComponent('libp2p:peer-routing')
this.peerId = components.peerId
this.peerStore = components.peerStore
Expand Down Expand Up @@ -57,10 +57,12 @@ export class DefaultPeerRouting implements PeerRouting {
continue
}

// ensure we have the addresses for a given peer
await this.peerStore.merge(peer.id, {
multiaddrs: peer.multiaddrs
})
// store the addresses for the peer if found
if (peer.multiaddrs.length > 0) {
await this.peerStore.merge(peer.id, {
multiaddrs: peer.multiaddrs
})
}

return peer
}
Expand Down Expand Up @@ -105,22 +107,17 @@ export class DefaultPeerRouting implements PeerRouting {
}
}()
)) {
// the peer was yielded by a content router without multiaddrs and we
// failed to load them
if (peer == null) {
continue
}

// skip peers without addresses
if (peer.multiaddrs.length === 0) {
continue
// store the addresses for the peer if found
if (peer.multiaddrs.length > 0) {
await this.peerStore.merge(peer.id, {
multiaddrs: peer.multiaddrs
})
}

// ensure we have the addresses for a given peer
await this.peerStore.merge(peer.id, {
multiaddrs: peer.multiaddrs
})

// deduplicate peers
if (seen.has(peer.id)) {
continue
Expand Down
2 changes: 1 addition & 1 deletion packages/libp2p/test/connection-manager/auto-dial.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import { defaultComponents } from '../../src/components.js'
import { AutoDial } from '../../src/connection-manager/auto-dial.js'
import { LAST_DIAL_FAILURE_KEY } from '../../src/connection-manager/constants.js'
import { matchPeerId } from '../fixtures/match-peer-id.js'
import { matchPeerId } from '../fixtures/matchers.js'
import type { ConnectionManager } from '@libp2p/interface-internal'

describe('auto-dial', () => {
Expand Down
Loading

0 comments on commit 444d837

Please sign in to comment.