Skip to content

Commit

Permalink
feat: remove unnecessary conversion from Multiaddr to IP (#369)
Browse files Browse the repository at this point in the history
* Remove unnecessary conversion from Multiaddr to IP

* Uncomment un-implemented code

* chore: use proper change:multiaddrs event handler

* chore: add multiaddr as dependency

Co-authored-by: Cayman <[email protected]>
  • Loading branch information
dapplion and wemeetagain authored Nov 13, 2022
1 parent 053709e commit e37c7c2
Show file tree
Hide file tree
Showing 10 changed files with 150 additions and 142 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
"@libp2p/peer-record": "^4.0.1",
"@libp2p/pubsub": "^5.0.0",
"@libp2p/topology": "^3.0.0",
"@multiformats/multiaddr": "^11.0.0",
"abortable-iterator": "^4.0.2",
"denque": "^1.5.0",
"err-code": "^3.0.1",
Expand All @@ -98,7 +99,6 @@
"@libp2p/interface-pubsub-compliance-tests": "^4.0.0",
"@libp2p/peer-id-factory": "^1.0.18",
"@libp2p/peer-store": "^5.0.0",
"@multiformats/multiaddr": "^11.0.0",
"@types/node": "^17.0.21",
"@typescript-eslint/eslint-plugin": "^3.0.2",
"@typescript-eslint/parser": "^3.0.2",
Expand Down
53 changes: 48 additions & 5 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,9 @@ import { InboundStream, OutboundStream } from './stream.js'
import { Uint8ArrayList } from 'uint8arraylist'
import { decodeRpc, DecodeRPCLimits, defaultDecodeRpcLimits } from './message/decodeRpc.js'
import { ConnectionManager } from '@libp2p/interface-connection-manager'
import { PeerStore } from '@libp2p/interface-peer-store'
import { PeerMultiaddrsChangeData, PeerStore } from '@libp2p/interface-peer-store'
import { Multiaddr } from '@multiformats/multiaddr'
import { multiaddrToIPStr } from './utils/multiaddr.js'

type ConnectionDirection = 'inbound' | 'outbound'

Expand Down Expand Up @@ -495,7 +497,7 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements PubSub<G
/**
* libp2p
*/
this.score = new PeerScore(components, this.opts.scoreParams, this.metrics, {
this.score = new PeerScore(this.opts.scoreParams, this.metrics, {
scoreCacheValidityMs: opts.heartbeatInterval
})

Expand Down Expand Up @@ -588,6 +590,8 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements PubSub<G
const heartbeatTimeout = setTimeout(this.runHeartbeat, constants.GossipsubHeartbeatInitialDelay)
// Then, run heartbeat every `heartbeatInterval` offset by `GossipsubHeartbeatInitialDelay`

this.components.peerStore.addEventListener('change:multiaddrs', this.onPeerAddressChange)

this.status = {
code: GossipStatusCode.started,
registrarTopologyIds,
Expand Down Expand Up @@ -624,6 +628,8 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements PubSub<G
const { registrarTopologyIds } = this.status
this.status = { code: GossipStatusCode.stopped }

this.components.peerStore.removeEventListener('change:multiaddrs', this.onPeerAddressChange)

// unregister protocol and handlers
const registrar = this.components.registrar
registrarTopologyIds.forEach((id) => registrar.unregister(id))
Expand Down Expand Up @@ -684,7 +690,7 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements PubSub<G

const peerId = connection.remotePeer
// add peer to router
this.addPeer(peerId, connection.stat.direction)
this.addPeer(peerId, connection.stat.direction, connection.remoteAddr)
// create inbound stream
this.createInboundStream(peerId, stream)
// attempt to create outbound stream
Expand All @@ -699,7 +705,7 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements PubSub<G
return
}

this.addPeer(peerId, connection.stat.direction)
this.addPeer(peerId, connection.stat.direction, connection.remoteAddr)
this.outboundInflightQueue.push({ peerId, connection })
}

Expand Down Expand Up @@ -788,7 +794,7 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements PubSub<G
/**
* Add a peer to the router
*/
private addPeer(peerId: PeerId, direction: ConnectionDirection): void {
private addPeer(peerId: PeerId, direction: ConnectionDirection, addr: Multiaddr): void {
const id = peerId.toString()

if (!this.peers.has(id)) {
Expand All @@ -798,6 +804,13 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements PubSub<G

// Add to peer scoring
this.score.addPeer(id)
const currentIP = multiaddrToIPStr(addr)
if (currentIP !== null) {
this.score.addIP(id, currentIP)
} else {
this.log('Added peer has no IP in current address %s %s', id, addr.toString())
}

// track the connection direction. Don't allow to unset outbound
if (!this.outbound.has(id)) {
this.outbound.set(id, direction === 'outbound')
Expand Down Expand Up @@ -866,6 +879,36 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements PubSub<G
this.acceptFromWhitelist.delete(id)
}

private onPeerAddressChange(evt: CustomEvent<PeerMultiaddrsChangeData>): void {
const { peerId, multiaddrs, oldMultiaddrs } = evt.detail
const newIps = new Set<string>()
const oldIps = new Set<string>()
for (const mu of multiaddrs) {
const ipStr = multiaddrToIPStr(mu)
if (ipStr) {
newIps.add(ipStr)
}
}
for (const mu of oldMultiaddrs) {
const ipStr = multiaddrToIPStr(mu)
if (ipStr) {
// Remove multiaddrs that aren't new
if (newIps.has(ipStr)) {
newIps.delete(ipStr)
} else {
oldIps.add(ipStr)
}
}
}
const id = peerId.toString()
for (const ipStr of oldIps) {
this.score.removeIP(id, ipStr)
}
for (const ipStr of newIps) {
this.score.addIP(id, ipStr)
}
}

// API METHODS

get started(): boolean {
Expand Down
2 changes: 1 addition & 1 deletion src/score/compute-score.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ export function computeScore(
score += p5 * params.appSpecificWeight

// P6: IP colocation factor
pstats.ips.forEach((ip) => {
pstats.knownIPs.forEach((ip) => {
if (params.IPColocationFactorWhitelist.has(ip)) {
return
}
Expand Down
136 changes: 39 additions & 97 deletions src/score/peer-score.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@ import { MessageDeliveries, DeliveryRecordStatus } from './message-deliveries.js
import { logger } from '@libp2p/logger'
import { MsgIdStr, PeerIdStr, RejectReason, TopicStr, IPStr } from '../types.js'
import type { Metrics, ScorePenalty } from '../metrics.js'
import { ConnectionManager } from '@libp2p/interface-connection-manager'
import { peerIdFromString } from '@libp2p/peer-id'
import { MapDef } from '../utils/set.js'

const log = logger('libp2p:gossipsub:score')

Expand All @@ -28,10 +27,6 @@ interface ScoreCacheEntry {

export type PeerScoreStatsDump = Record<PeerIdStr, PeerStats>

export interface PeerScoreComponents {
connectionManager: ConnectionManager
}

export class PeerScore {
/**
* Per-peer stats for score calculation
Expand All @@ -40,7 +35,7 @@ export class PeerScore {
/**
* IP colocation tracking; maps IP => set of peers.
*/
readonly peerIPs = new Map<PeerIdStr, Set<IPStr>>()
readonly peerIPs = new MapDef<PeerIdStr, Set<IPStr>>(() => new Set())
/**
* Cache score up to decayInterval if topic stats are unchanged.
*/
Expand All @@ -53,17 +48,10 @@ export class PeerScore {
_backgroundInterval?: ReturnType<typeof setInterval>

private readonly scoreCacheValidityMs: number
private readonly components: PeerScoreComponents
private readonly computeScore: typeof computeScore

constructor(
components: PeerScoreComponents,
readonly params: PeerScoreParams,
private readonly metrics: Metrics | null,
opts: PeerScoreOpts
) {
constructor(readonly params: PeerScoreParams, private readonly metrics: Metrics | null, opts: PeerScoreOpts) {
validatePeerScoreParams(params)
this.components = components
this.scoreCacheValidityMs = opts.scoreCacheValidityMs
this.computeScore = opts.computeScore ?? computeScore
}
Expand Down Expand Up @@ -105,7 +93,6 @@ export class PeerScore {
*/
background(): void {
this.refreshScores()
this.updateIPs()
this.deliveryRecords.gc()
}

Expand All @@ -125,7 +112,7 @@ export class PeerScore {
// has the retention period expired?
if (now > pstats.expire) {
// yes, throw it away (but clean up the IP tracking first)
this.removeIPs(id, pstats.ips)
this.removeIPsForPeer(id, pstats.knownIPs)
this.peerStats.delete(id)
this.scoreCache.delete(id)
}
Expand Down Expand Up @@ -236,15 +223,36 @@ export class PeerScore {
connected: true,
expire: 0,
topics: {},
ips: [],
knownIPs: new Set(),
behaviourPenalty: 0
}
this.peerStats.set(id, pstats)
}

/** Adds a new IP to a peer, if the peer is not known the update is ignored */
addIP(id: PeerIdStr, ip: string): void {
const pstats = this.peerStats.get(id)
if (pstats) {
pstats.knownIPs.add(ip)
}

// get + update peer IPs
const ips = this.getIPs(id)
this.setIPs(id, ips, pstats.ips)
pstats.ips = ips
this.peerIPs.getOrDefault(ip).add(id)
}

/** Remove peer association with IP */
removeIP(id: PeerIdStr, ip: string): void {
const pstats = this.peerStats.get(id)
if (pstats) {
pstats.knownIPs.delete(ip)
}

const peersWithIP = this.peerIPs.get(ip)
if (peersWithIP) {
peersWithIP.delete(id)
if (peersWithIP.size === 0) {
this.peerIPs.delete(ip)
}
}
}

removePeer(id: PeerIdStr): void {
Expand All @@ -256,7 +264,7 @@ export class PeerScore {
// decide whether to retain the score; this currently only retains non-positive scores
// to dissuade attacks on the score function.
if (this.score(id) > 0) {
this.removeIPs(id, pstats.ips)
this.removeIPsForPeer(id, pstats.knownIPs)
this.peerStats.delete(id)
return
}
Expand Down Expand Up @@ -501,84 +509,18 @@ export class PeerScore {
}

/**
* Gets the current IPs for a peer.
*/
private getIPs(id: PeerIdStr): IPStr[] {
return this.components.connectionManager
.getConnections(peerIdFromString(id))
.map((c) => c.remoteAddr.toOptions().host)
}

/**
* Adds tracking for the new IPs in the list, and removes tracking from the obsolete IPs.
* Removes an IP list from the tracking list for a peer.
*/
public setIPs(id: PeerIdStr, newIPs: IPStr[], oldIPs: IPStr[]): void {
// add the new IPs to the tracking
// eslint-disable-next-line no-labels
addNewIPs: for (const ip of newIPs) {
// check if it is in the old ips list
for (const xip of oldIPs) {
if (ip === xip) {
// eslint-disable-next-line no-labels
continue addNewIPs
private removeIPsForPeer(id: PeerIdStr, ipsToRemove: Set<IPStr>): void {
for (const ipToRemove of ipsToRemove) {
const peerSet = this.peerIPs.get(ipToRemove)
if (peerSet) {
peerSet.delete(id)
if (peerSet.size === 0) {
this.peerIPs.delete(ipToRemove)
}
}
// no, it's a new one -- add it to the tracker
let peers = this.peerIPs.get(ip)
if (!peers) {
peers = new Set()
this.peerIPs.set(ip, peers)
}
peers.add(id)
}
// remove the obsolete old IPs from the tracking
// eslint-disable-next-line no-labels
removeOldIPs: for (const ip of oldIPs) {
// check if it is in the new ips list
for (const xip of newIPs) {
if (ip === xip) {
// eslint-disable-next-line no-labels
continue removeOldIPs
}
}
// no, its obselete -- remove it from the tracker
const peers = this.peerIPs.get(ip)
if (!peers) {
continue
}
peers.delete(id)
if (!peers.size) {
this.peerIPs.delete(ip)
}
}
}

/**
* Removes an IP list from the tracking list for a peer.
*/
public removeIPs(id: PeerIdStr, ips: IPStr[]): void {
ips.forEach((ip) => {
const peers = this.peerIPs.get(ip)
if (!peers) {
return
}

peers.delete(id)
if (!peers.size) {
this.peerIPs.delete(ip)
}
})
}

/**
* Update all peer IPs to currently open connections
*/
public updateIPs(): void {
this.peerStats.forEach((pstats, id) => {
const newIPs = this.getIPs(id)
this.setIPs(id, newIPs, pstats.ips)
pstats.ips = newIPs
})
}

/**
Expand Down
4 changes: 2 additions & 2 deletions src/score/peer-stats.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ export interface PeerStats {
expire: number
/** per topic stats */
topics: Record<TopicStr, TopicStats>
/** IP tracking; store as string for easy processing */
ips: string[]
/** IP tracking; store as set for easy processing */
knownIPs: Set<string>
/** behavioural pattern penalties (applied by the router) */
behaviourPenalty: number
}
Expand Down
2 changes: 1 addition & 1 deletion src/score/scoreMetrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ export function computeScoreWeights(
p5w += p5 * params.appSpecificWeight

// P6: IP colocation factor
pstats.ips.forEach((ip) => {
pstats.knownIPs.forEach((ip) => {
if (params.IPColocationFactorWhitelist.has(ip)) {
return
}
Expand Down
23 changes: 23 additions & 0 deletions src/utils/multiaddr.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import { Multiaddr } from '@multiformats/multiaddr'
import { convertToString } from '@multiformats/multiaddr/convert'

// Protocols https://github.com/multiformats/multiaddr/blob/master/protocols.csv
// code size name
// 4 32 ip4
// 41 128 ip6
enum Protocol {
ip4 = 4,
ip6 = 41
}

export function multiaddrToIPStr(multiaddr: Multiaddr): string | null {
for (const tuple of multiaddr.tuples()) {
switch (tuple[0]) {
case Protocol.ip4:
case Protocol.ip6:
return convertToString(tuple[0], tuple[1]!)
}
}

return null
}
Loading

0 comments on commit e37c7c2

Please sign in to comment.