Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(p2p): send goodbye messages on disconnecting to peers #10920

Merged
merged 11 commits into from
Jan 20, 2025
15 changes: 8 additions & 7 deletions yarn-project/p2p/src/mocks/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,15 @@ import { type BootnodeConfig, type P2PConfig } from '../config.js';
import { type MemPools } from '../mem_pools/interface.js';
import { DiscV5Service } from '../services/discv5/discV5_service.js';
import { LibP2PService } from '../services/libp2p/libp2p_service.js';
import { type PeerManager } from '../services/peer_manager.js';
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A lot of diffs as i moved peer-scoring and peer-manager into the peer-manager service folder

import { type PeerScoring } from '../services/peer-manager/peer_scoring.js';
import { type P2PReqRespConfig } from '../services/reqresp/config.js';
import {
ReqRespSubProtocol,
type ReqRespSubProtocolHandlers,
type ReqRespSubProtocolValidators,
noopValidator,
} from '../services/reqresp/interface.js';
import { pingHandler } from '../services/reqresp/protocols/ping.js';
import { statusHandler } from '../services/reqresp/protocols/status.js';
import { pingHandler, statusHandler } from '../services/reqresp/protocols/index.js';
import { ReqResp } from '../services/reqresp/reqresp.js';
import { type PubSubLibp2p } from '../util.js';

Expand Down Expand Up @@ -153,6 +152,7 @@ export const MOCK_SUB_PROTOCOL_HANDLERS: ReqRespSubProtocolHandlers = {
[ReqRespSubProtocol.PING]: pingHandler,
[ReqRespSubProtocol.STATUS]: statusHandler,
[ReqRespSubProtocol.TX]: (_msg: any) => Promise.resolve(Buffer.from('tx')),
[ReqRespSubProtocol.GOODBYE]: (_msg: any) => Promise.resolve(Buffer.from('goodbye')),
};

// By default, all requests are valid
Expand All @@ -161,14 +161,15 @@ export const MOCK_SUB_PROTOCOL_VALIDATORS: ReqRespSubProtocolValidators = {
[ReqRespSubProtocol.PING]: noopValidator,
[ReqRespSubProtocol.STATUS]: noopValidator,
[ReqRespSubProtocol.TX]: noopValidator,
[ReqRespSubProtocol.GOODBYE]: noopValidator,
};

/**
* @param numberOfNodes - the number of nodes to create
* @returns An array of the created nodes
*/
export const createNodes = async (peerManager: PeerManager, numberOfNodes: number): Promise<ReqRespNode[]> => {
return await Promise.all(Array.from({ length: numberOfNodes }, () => createReqResp(peerManager)));
export const createNodes = async (peerScoring: PeerScoring, numberOfNodes: number): Promise<ReqRespNode[]> => {
return await Promise.all(Array.from({ length: numberOfNodes }, () => createReqResp(peerScoring)));
};

export const startNodes = async (
Expand All @@ -191,13 +192,13 @@ export const stopNodes = async (nodes: ReqRespNode[]): Promise<void> => {
};

// Create a req resp node, exposing the underlying p2p node
export const createReqResp = async (peerManager: PeerManager): Promise<ReqRespNode> => {
export const createReqResp = async (peerScoring: PeerScoring): Promise<ReqRespNode> => {
const p2p = await createLibp2pNode();
const config: P2PReqRespConfig = {
overallRequestTimeoutMs: 4000,
individualRequestTimeoutMs: 2000,
};
const req = new ReqResp(config, p2p, peerManager);
const req = new ReqResp(config, p2p, peerScoring);
return {
p2p,
req,
Expand Down
232 changes: 120 additions & 112 deletions yarn-project/p2p/src/services/libp2p/libp2p_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,10 @@ import {
import { type PubSubLibp2p, convertToMultiaddr } from '../../util.js';
import { AztecDatastore } from '../data_store.js';
import { SnappyTransform, fastMsgIdFn, getMsgIdFn, msgIdToStrFn } from '../encoding.js';
import { PeerManager } from '../peer_manager.js';
import {
DEFAULT_SUB_PROTOCOL_HANDLERS,
DEFAULT_SUB_PROTOCOL_VALIDATORS,
ReqRespSubProtocol,
type ReqRespSubProtocolHandlers,
type SubProtocolMap,
} from '../reqresp/interface.js';
import { PeerManager } from '../peer-manager/peer_manager.js';
import { PeerScoring } from '../peer-manager/peer_scoring.js';
import { DEFAULT_SUB_PROTOCOL_VALIDATORS, ReqRespSubProtocol, type SubProtocolMap } from '../reqresp/interface.js';
import { reqGoodbyeHandler } from '../reqresp/protocols/goodbye.js';
import { pingHandler, statusHandler } from '../reqresp/protocols/index.js';
import { reqRespTxHandler } from '../reqresp/protocols/tx.js';
import { ReqResp } from '../reqresp/reqresp.js';
Expand Down Expand Up @@ -115,21 +111,32 @@ export class LibP2PService<T extends P2PClientType> extends WithTracer implement
private peerDiscoveryService: PeerDiscoveryService,
private mempools: MemPools<T>,
private l2BlockSource: L2BlockSource,
private epochCache: EpochCache,
epochCache: EpochCache,
private proofVerifier: ClientProtocolCircuitVerifier,
private worldStateSynchronizer: WorldStateSynchronizer,
private telemetry: TelemetryClient,
private requestResponseHandlers: ReqRespSubProtocolHandlers = DEFAULT_SUB_PROTOCOL_HANDLERS,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No idea why i included these as a dependency, they are now just constructed in the start function, alongside the validators

telemetry: TelemetryClient,
private logger = createLogger('p2p:libp2p_service'),
) {
super(telemetry, 'LibP2PService');

this.peerManager = new PeerManager(node, peerDiscoveryService, config, telemetry, logger);
const peerScoring = new PeerScoring(config);
this.reqresp = new ReqResp(config, node, peerScoring);

this.peerManager = new PeerManager(
node,
peerDiscoveryService,
config,
telemetry,
logger,
peerScoring,
this.reqresp,
);

// Update gossipsub score params
this.node.services.pubsub.score.params.appSpecificScore = (peerId: string) => {
return this.peerManager.getPeerScore(peerId);
};
this.node.services.pubsub.score.params.appSpecificWeight = 10;
this.reqresp = new ReqResp(config, node, this.peerManager);

this.attestationValidator = new AttestationValidator(epochCache);
this.blockProposalValidator = new BlockProposalValidator(epochCache);
Expand All @@ -143,95 +150,6 @@ export class LibP2PService<T extends P2PClientType> extends WithTracer implement
};
}

/**
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just moved start and stop below the static initialiser just so that it was closer to the constructor

* Starts the LibP2P service.
* @returns An empty promise.
*/
public async start() {
// Check if service is already started
if (this.node.status === 'started') {
throw new Error('P2P service already started');
}

// Get listen & announce addresses for logging
const { tcpListenAddress, tcpAnnounceAddress } = this.config;
if (!tcpAnnounceAddress) {
throw new Error('Announce address not provided.');
}
const announceTcpMultiaddr = convertToMultiaddr(tcpAnnounceAddress, 'tcp');

// Start job queue, peer discovery service and libp2p node
this.jobQueue.start();
await this.peerDiscoveryService.start();
await this.node.start();

// Subscribe to standard GossipSub topics by default
for (const topic of getTopicTypeForClientType(this.clientType)) {
this.subscribeToTopic(TopicTypeMap[topic].p2pTopic);
}

// Add p2p topic validators
// As they are stored within a kv pair, there is no need to register them conditionally
// based on the client type
const topicValidators = {
[Tx.p2pTopic]: this.validatePropagatedTxFromMessage.bind(this),
[BlockAttestation.p2pTopic]: this.validatePropagatedAttestationFromMessage.bind(this),
[BlockProposal.p2pTopic]: this.validatePropagatedBlockFromMessage.bind(this),
[EpochProofQuote.p2pTopic]: this.validatePropagatedEpochProofQuoteFromMessage.bind(this),
};
for (const [topic, validator] of Object.entries(topicValidators)) {
this.node.services.pubsub.topicValidators.set(topic, validator);
}

// add GossipSub listener
this.node.services.pubsub.addEventListener(GossipSubEvent.MESSAGE, this.handleGossipSubEvent.bind(this));

// Start running promise for peer discovery
this.discoveryRunningPromise = new RunningPromise(
() => this.peerManager.heartbeat(),
this.logger,
this.config.peerCheckIntervalMS,
);
this.discoveryRunningPromise.start();

// Define the sub protocol validators - This is done within this start() method to gain a callback to the existing validateTx function
const reqrespSubProtocolValidators = {
...DEFAULT_SUB_PROTOCOL_VALIDATORS,
[ReqRespSubProtocol.TX]: this.validateRequestedTx.bind(this),
};
await this.reqresp.start(this.requestResponseHandlers, reqrespSubProtocolValidators);
this.logger.info(`Started P2P service`, {
listen: tcpListenAddress,
announce: announceTcpMultiaddr,
peerId: this.node.peerId.toString(),
});
}

/**
* Stops the LibP2P service.
* @returns An empty promise.
*/
public async stop() {
// Remove gossip sub listener
this.node.services.pubsub.removeEventListener(GossipSubEvent.MESSAGE, this.handleGossipSubEvent.bind(this));

// Stop peer manager
this.logger.debug('Stopping peer manager...');
this.peerManager.stop();

this.logger.debug('Stopping job queue...');
await this.jobQueue.end();
this.logger.debug('Stopping running promise...');
await this.discoveryRunningPromise?.stop();
this.logger.debug('Stopping peer discovery service...');
await this.peerDiscoveryService.stop();
this.logger.debug('Request response service stopped...');
await this.reqresp.stop();
this.logger.debug('Stopping LibP2P...');
await this.stopLibP2P();
this.logger.info('LibP2P service stopped');
}

/**
* Creates an instance of the LibP2P service.
* @param config - The configuration to use when creating the service.
Expand Down Expand Up @@ -334,15 +252,6 @@ export class LibP2PService<T extends P2PClientType> extends WithTracer implement
},
});

// Create request response protocol handlers
const txHandler = reqRespTxHandler(mempools);

const requestResponseHandlers = {
[ReqRespSubProtocol.PING]: pingHandler,
[ReqRespSubProtocol.STATUS]: statusHandler,
[ReqRespSubProtocol.TX]: txHandler,
};

return new LibP2PService(
clientType,
config,
Expand All @@ -354,10 +263,109 @@ export class LibP2PService<T extends P2PClientType> extends WithTracer implement
proofVerifier,
worldStateSynchronizer,
telemetry,
requestResponseHandlers,
);
}

/**
* Starts the LibP2P service.
* @returns An empty promise.
*/
public async start() {
// Check if service is already started
if (this.node.status === 'started') {
throw new Error('P2P service already started');
}

// Get listen & announce addresses for logging
const { tcpListenAddress, tcpAnnounceAddress } = this.config;
if (!tcpAnnounceAddress) {
throw new Error('Announce address not provided.');
}
const announceTcpMultiaddr = convertToMultiaddr(tcpAnnounceAddress, 'tcp');

// Start job queue, peer discovery service and libp2p node
this.jobQueue.start();
await this.peerDiscoveryService.start();
await this.node.start();

// Subscribe to standard GossipSub topics by default
for (const topic of getTopicTypeForClientType(this.clientType)) {
this.subscribeToTopic(TopicTypeMap[topic].p2pTopic);
}

// Create request response protocol handlers
const txHandler = reqRespTxHandler(this.mempools);
const goodbyeHandler = reqGoodbyeHandler(this.peerManager);

const requestResponseHandlers = {
[ReqRespSubProtocol.PING]: pingHandler,
[ReqRespSubProtocol.STATUS]: statusHandler,
[ReqRespSubProtocol.TX]: txHandler.bind(this),
[ReqRespSubProtocol.GOODBYE]: goodbyeHandler.bind(this),
};

// Add p2p topic validators
// As they are stored within a kv pair, there is no need to register them conditionally
// based on the client type
const topicValidators = {
[Tx.p2pTopic]: this.validatePropagatedTxFromMessage.bind(this),
[BlockAttestation.p2pTopic]: this.validatePropagatedAttestationFromMessage.bind(this),
[BlockProposal.p2pTopic]: this.validatePropagatedBlockFromMessage.bind(this),
[EpochProofQuote.p2pTopic]: this.validatePropagatedEpochProofQuoteFromMessage.bind(this),
};
for (const [topic, validator] of Object.entries(topicValidators)) {
this.node.services.pubsub.topicValidators.set(topic, validator);
}

// add GossipSub listener
this.node.services.pubsub.addEventListener(GossipSubEvent.MESSAGE, this.handleGossipSubEvent.bind(this));

// Start running promise for peer discovery
this.discoveryRunningPromise = new RunningPromise(
() => this.peerManager.heartbeat(),
this.logger,
this.config.peerCheckIntervalMS,
);
this.discoveryRunningPromise.start();

// Define the sub protocol validators - This is done within this start() method to gain a callback to the existing validateTx function
const reqrespSubProtocolValidators = {
...DEFAULT_SUB_PROTOCOL_VALIDATORS,
[ReqRespSubProtocol.TX]: this.validateRequestedTx.bind(this),
};
await this.reqresp.start(requestResponseHandlers, reqrespSubProtocolValidators);
this.logger.info(`Started P2P service`, {
listen: tcpListenAddress,
announce: announceTcpMultiaddr,
peerId: this.node.peerId.toString(),
});
}

/**
* Stops the LibP2P service.
* @returns An empty promise.
*/
public async stop() {
// Remove gossip sub listener
this.node.services.pubsub.removeEventListener(GossipSubEvent.MESSAGE, this.handleGossipSubEvent.bind(this));

// Stop peer manager
this.logger.debug('Stopping peer manager...');
await this.peerManager.stop();

this.logger.debug('Stopping job queue...');
await this.jobQueue.end();
this.logger.debug('Stopping running promise...');
await this.discoveryRunningPromise?.stop();
this.logger.debug('Stopping peer discovery service...');
await this.peerDiscoveryService.stop();
this.logger.debug('Request response service stopped...');
await this.reqresp.stop();
this.logger.debug('Stopping LibP2P...');
await this.stopLibP2P();
this.logger.info('LibP2P service stopped');
}

public getPeers(includePending?: boolean): PeerInfo[] {
return this.peerManager.getPeers(includePending);
}
Expand Down
41 changes: 41 additions & 0 deletions yarn-project/p2p/src/services/peer-manager/metrics.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import {
Attributes,
Metrics,
type TelemetryClient,
type Tracer,
type UpDownCounter,
ValueType,
} from '@aztec/telemetry-client';

import { type GoodByeReason, prettyGoodbyeReason } from '../reqresp/protocols/index.js';

export class PeerManagerMetrics {
private sentGoodbyes: UpDownCounter;
private receivedGoodbyes: UpDownCounter;

public readonly tracer: Tracer;

constructor(public readonly telemetryClient: TelemetryClient, name = 'PeerManager') {
this.tracer = telemetryClient.getTracer(name);

const meter = telemetryClient.getMeter(name);
this.sentGoodbyes = meter.createUpDownCounter(Metrics.PEER_MANAGER_GOODBYES_SENT, {
description: 'Number of goodbyes sent to peers',
unit: 'peers',
valueType: ValueType.INT,
});
this.receivedGoodbyes = meter.createUpDownCounter(Metrics.PEER_MANAGER_GOODBYES_RECEIVED, {
description: 'Number of goodbyes received from peers',
unit: 'peers',
valueType: ValueType.INT,
});
}

public recordGoodbyeSent(reason: GoodByeReason) {
this.sentGoodbyes.add(1, { [Attributes.P2P_GOODBYE_REASON]: prettyGoodbyeReason(reason) });
}

public recordGoodbyeReceived(reason: GoodByeReason) {
this.receivedGoodbyes.add(1, { [Attributes.P2P_GOODBYE_REASON]: prettyGoodbyeReason(reason) });
}
}
Loading
Loading