Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: remove unnecessary conversion from Multiaddr to IP #369

Merged
merged 5 commits into from
Nov 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
"@libp2p/peer-record": "^4.0.1",
"@libp2p/pubsub": "^5.0.0",
"@libp2p/topology": "^3.0.0",
"@multiformats/multiaddr": "^11.0.0",
"abortable-iterator": "^4.0.2",
"denque": "^1.5.0",
"err-code": "^3.0.1",
Expand All @@ -98,7 +99,6 @@
"@libp2p/interface-pubsub-compliance-tests": "^4.0.0",
"@libp2p/peer-id-factory": "^1.0.18",
"@libp2p/peer-store": "^5.0.0",
"@multiformats/multiaddr": "^11.0.0",
"@types/node": "^17.0.21",
"@typescript-eslint/eslint-plugin": "^3.0.2",
"@typescript-eslint/parser": "^3.0.2",
Expand Down
53 changes: 48 additions & 5 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,9 @@ import { InboundStream, OutboundStream } from './stream.js'
import { Uint8ArrayList } from 'uint8arraylist'
import { decodeRpc, DecodeRPCLimits, defaultDecodeRpcLimits } from './message/decodeRpc.js'
import { ConnectionManager } from '@libp2p/interface-connection-manager'
import { PeerStore } from '@libp2p/interface-peer-store'
import { PeerMultiaddrsChangeData, PeerStore } from '@libp2p/interface-peer-store'
import { Multiaddr } from '@multiformats/multiaddr'
import { multiaddrToIPStr } from './utils/multiaddr.js'

type ConnectionDirection = 'inbound' | 'outbound'

Expand Down Expand Up @@ -495,7 +497,7 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements PubSub<G
/**
* libp2p
*/
this.score = new PeerScore(components, this.opts.scoreParams, this.metrics, {
this.score = new PeerScore(this.opts.scoreParams, this.metrics, {
scoreCacheValidityMs: opts.heartbeatInterval
})

Expand Down Expand Up @@ -588,6 +590,8 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements PubSub<G
const heartbeatTimeout = setTimeout(this.runHeartbeat, constants.GossipsubHeartbeatInitialDelay)
// Then, run heartbeat every `heartbeatInterval` offset by `GossipsubHeartbeatInitialDelay`

this.components.peerStore.addEventListener('change:multiaddrs', this.onPeerAddressChange)

this.status = {
code: GossipStatusCode.started,
registrarTopologyIds,
Expand Down Expand Up @@ -624,6 +628,8 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements PubSub<G
const { registrarTopologyIds } = this.status
this.status = { code: GossipStatusCode.stopped }

this.components.peerStore.removeEventListener('change:multiaddrs', this.onPeerAddressChange)

// unregister protocol and handlers
const registrar = this.components.registrar
registrarTopologyIds.forEach((id) => registrar.unregister(id))
Expand Down Expand Up @@ -684,7 +690,7 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements PubSub<G

const peerId = connection.remotePeer
// add peer to router
this.addPeer(peerId, connection.stat.direction)
this.addPeer(peerId, connection.stat.direction, connection.remoteAddr)
// create inbound stream
this.createInboundStream(peerId, stream)
// attempt to create outbound stream
Expand All @@ -699,7 +705,7 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements PubSub<G
return
}

this.addPeer(peerId, connection.stat.direction)
this.addPeer(peerId, connection.stat.direction, connection.remoteAddr)
this.outboundInflightQueue.push({ peerId, connection })
}

Expand Down Expand Up @@ -788,7 +794,7 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements PubSub<G
/**
* Add a peer to the router
*/
private addPeer(peerId: PeerId, direction: ConnectionDirection): void {
private addPeer(peerId: PeerId, direction: ConnectionDirection, addr: Multiaddr): void {
const id = peerId.toString()

if (!this.peers.has(id)) {
Expand All @@ -798,6 +804,13 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements PubSub<G

// Add to peer scoring
this.score.addPeer(id)
const currentIP = multiaddrToIPStr(addr)
if (currentIP !== null) {
this.score.addIP(id, currentIP)
} else {
this.log('Added peer has no IP in current address %s %s', id, addr.toString())
}

// track the connection direction. Don't allow to unset outbound
if (!this.outbound.has(id)) {
this.outbound.set(id, direction === 'outbound')
Expand Down Expand Up @@ -866,6 +879,36 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements PubSub<G
this.acceptFromWhitelist.delete(id)
}

private onPeerAddressChange(evt: CustomEvent<PeerMultiaddrsChangeData>): void {
const { peerId, multiaddrs, oldMultiaddrs } = evt.detail
const newIps = new Set<string>()
const oldIps = new Set<string>()
for (const mu of multiaddrs) {
const ipStr = multiaddrToIPStr(mu)
if (ipStr) {
newIps.add(ipStr)
}
}
for (const mu of oldMultiaddrs) {
const ipStr = multiaddrToIPStr(mu)
if (ipStr) {
// Remove multiaddrs that aren't new
if (newIps.has(ipStr)) {
newIps.delete(ipStr)
} else {
oldIps.add(ipStr)
}
}
}
const id = peerId.toString()
for (const ipStr of oldIps) {
this.score.removeIP(id, ipStr)
}
for (const ipStr of newIps) {
this.score.addIP(id, ipStr)
}
}

// API METHODS

get started(): boolean {
Expand Down
2 changes: 1 addition & 1 deletion src/score/compute-score.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ export function computeScore(
score += p5 * params.appSpecificWeight

// P6: IP colocation factor
pstats.ips.forEach((ip) => {
pstats.knownIPs.forEach((ip) => {
if (params.IPColocationFactorWhitelist.has(ip)) {
return
}
Expand Down
136 changes: 39 additions & 97 deletions src/score/peer-score.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@ import { MessageDeliveries, DeliveryRecordStatus } from './message-deliveries.js
import { logger } from '@libp2p/logger'
import { MsgIdStr, PeerIdStr, RejectReason, TopicStr, IPStr } from '../types.js'
import type { Metrics, ScorePenalty } from '../metrics.js'
import { ConnectionManager } from '@libp2p/interface-connection-manager'
import { peerIdFromString } from '@libp2p/peer-id'
import { MapDef } from '../utils/set.js'

const log = logger('libp2p:gossipsub:score')

Expand All @@ -28,10 +27,6 @@ interface ScoreCacheEntry {

export type PeerScoreStatsDump = Record<PeerIdStr, PeerStats>

export interface PeerScoreComponents {
connectionManager: ConnectionManager
}

export class PeerScore {
/**
* Per-peer stats for score calculation
Expand All @@ -40,7 +35,7 @@ export class PeerScore {
/**
* IP colocation tracking; maps IP => set of peers.
*/
readonly peerIPs = new Map<PeerIdStr, Set<IPStr>>()
readonly peerIPs = new MapDef<PeerIdStr, Set<IPStr>>(() => new Set())
/**
* Cache score up to decayInterval if topic stats are unchanged.
*/
Expand All @@ -53,17 +48,10 @@ export class PeerScore {
_backgroundInterval?: ReturnType<typeof setInterval>

private readonly scoreCacheValidityMs: number
private readonly components: PeerScoreComponents
private readonly computeScore: typeof computeScore

constructor(
components: PeerScoreComponents,
readonly params: PeerScoreParams,
private readonly metrics: Metrics | null,
opts: PeerScoreOpts
) {
constructor(readonly params: PeerScoreParams, private readonly metrics: Metrics | null, opts: PeerScoreOpts) {
validatePeerScoreParams(params)
this.components = components
this.scoreCacheValidityMs = opts.scoreCacheValidityMs
this.computeScore = opts.computeScore ?? computeScore
}
Expand Down Expand Up @@ -105,7 +93,6 @@ export class PeerScore {
*/
background(): void {
this.refreshScores()
this.updateIPs()
this.deliveryRecords.gc()
}

Expand All @@ -125,7 +112,7 @@ export class PeerScore {
// has the retention period expired?
if (now > pstats.expire) {
// yes, throw it away (but clean up the IP tracking first)
this.removeIPs(id, pstats.ips)
this.removeIPsForPeer(id, pstats.knownIPs)
this.peerStats.delete(id)
this.scoreCache.delete(id)
}
Expand Down Expand Up @@ -236,15 +223,36 @@ export class PeerScore {
connected: true,
expire: 0,
topics: {},
ips: [],
knownIPs: new Set(),
behaviourPenalty: 0
}
this.peerStats.set(id, pstats)
}

/** Adds a new IP to a peer, if the peer is not known the update is ignored */
addIP(id: PeerIdStr, ip: string): void {
const pstats = this.peerStats.get(id)
if (pstats) {
pstats.knownIPs.add(ip)
}

// get + update peer IPs
const ips = this.getIPs(id)
this.setIPs(id, ips, pstats.ips)
pstats.ips = ips
this.peerIPs.getOrDefault(ip).add(id)
}

/** Remove peer association with IP */
removeIP(id: PeerIdStr, ip: string): void {
const pstats = this.peerStats.get(id)
if (pstats) {
pstats.knownIPs.delete(ip)
}

const peersWithIP = this.peerIPs.get(ip)
if (peersWithIP) {
peersWithIP.delete(id)
if (peersWithIP.size === 0) {
this.peerIPs.delete(ip)
}
}
}

removePeer(id: PeerIdStr): void {
Expand All @@ -256,7 +264,7 @@ export class PeerScore {
// decide whether to retain the score; this currently only retains non-positive scores
// to dissuade attacks on the score function.
if (this.score(id) > 0) {
this.removeIPs(id, pstats.ips)
this.removeIPsForPeer(id, pstats.knownIPs)
this.peerStats.delete(id)
return
}
Expand Down Expand Up @@ -501,84 +509,18 @@ export class PeerScore {
}

/**
* Gets the current IPs for a peer.
*/
private getIPs(id: PeerIdStr): IPStr[] {
return this.components.connectionManager
.getConnections(peerIdFromString(id))
.map((c) => c.remoteAddr.toOptions().host)
}

/**
* Adds tracking for the new IPs in the list, and removes tracking from the obsolete IPs.
* Removes an IP list from the tracking list for a peer.
*/
public setIPs(id: PeerIdStr, newIPs: IPStr[], oldIPs: IPStr[]): void {
// add the new IPs to the tracking
// eslint-disable-next-line no-labels
addNewIPs: for (const ip of newIPs) {
// check if it is in the old ips list
for (const xip of oldIPs) {
if (ip === xip) {
// eslint-disable-next-line no-labels
continue addNewIPs
private removeIPsForPeer(id: PeerIdStr, ipsToRemove: Set<IPStr>): void {
for (const ipToRemove of ipsToRemove) {
const peerSet = this.peerIPs.get(ipToRemove)
if (peerSet) {
peerSet.delete(id)
if (peerSet.size === 0) {
this.peerIPs.delete(ipToRemove)
}
}
// no, it's a new one -- add it to the tracker
let peers = this.peerIPs.get(ip)
if (!peers) {
peers = new Set()
this.peerIPs.set(ip, peers)
}
peers.add(id)
}
// remove the obsolete old IPs from the tracking
// eslint-disable-next-line no-labels
removeOldIPs: for (const ip of oldIPs) {
// check if it is in the new ips list
for (const xip of newIPs) {
if (ip === xip) {
// eslint-disable-next-line no-labels
continue removeOldIPs
}
}
// no, its obselete -- remove it from the tracker
const peers = this.peerIPs.get(ip)
if (!peers) {
continue
}
peers.delete(id)
if (!peers.size) {
this.peerIPs.delete(ip)
}
}
}

/**
* Removes an IP list from the tracking list for a peer.
*/
public removeIPs(id: PeerIdStr, ips: IPStr[]): void {
ips.forEach((ip) => {
const peers = this.peerIPs.get(ip)
if (!peers) {
return
}

peers.delete(id)
if (!peers.size) {
this.peerIPs.delete(ip)
}
})
}

/**
* Update all peer IPs to currently open connections
*/
public updateIPs(): void {
this.peerStats.forEach((pstats, id) => {
const newIPs = this.getIPs(id)
this.setIPs(id, newIPs, pstats.ips)
pstats.ips = newIPs
})
}

/**
Expand Down
4 changes: 2 additions & 2 deletions src/score/peer-stats.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ export interface PeerStats {
expire: number
/** per topic stats */
topics: Record<TopicStr, TopicStats>
/** IP tracking; store as string for easy processing */
ips: string[]
/** IP tracking; store as set for easy processing */
knownIPs: Set<string>
/** behavioural pattern penalties (applied by the router) */
behaviourPenalty: number
}
Expand Down
2 changes: 1 addition & 1 deletion src/score/scoreMetrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ export function computeScoreWeights(
p5w += p5 * params.appSpecificWeight

// P6: IP colocation factor
pstats.ips.forEach((ip) => {
pstats.knownIPs.forEach((ip) => {
if (params.IPColocationFactorWhitelist.has(ip)) {
return
}
Expand Down
23 changes: 23 additions & 0 deletions src/utils/multiaddr.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import { Multiaddr } from '@multiformats/multiaddr'
import { convertToString } from '@multiformats/multiaddr/convert'

// Protocols https://github.com/multiformats/multiaddr/blob/master/protocols.csv
// code size name
// 4 32 ip4
// 41 128 ip6
enum Protocol {
ip4 = 4,
ip6 = 41
}

export function multiaddrToIPStr(multiaddr: Multiaddr): string | null {
for (const tuple of multiaddr.tuples()) {
switch (tuple[0]) {
case Protocol.ip4:
case Protocol.ip6:
return convertToString(tuple[0], tuple[1]!)
}
}

return null
}
Loading