Skip to content

Commit

Permalink
chore: reqresp + goodbye tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Maddiaa0 committed Jan 19, 2025
1 parent ee93473 commit b755ab5
Show file tree
Hide file tree
Showing 7 changed files with 162 additions and 28 deletions.
20 changes: 15 additions & 5 deletions yarn-project/p2p/src/services/peer-manager/metrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,32 @@ import {
import { type GoodByeReason, prettyGoodbyeReason } from '../reqresp/protocols/index.js';

export class PeerManagerMetrics {
private disconnectedPeers: UpDownCounter;
private sentGoodbyes: UpDownCounter;
private receivedGoodbyes: UpDownCounter;

public readonly tracer: Tracer;

constructor(public readonly telemetryClient: TelemetryClient, name = 'PeerManager') {
this.tracer = telemetryClient.getTracer(name);

const meter = telemetryClient.getMeter(name);
this.disconnectedPeers = meter.createUpDownCounter(Metrics.PEER_MANAGER_DISCONNECTED_PEERS, {
description: 'Number of disconnected peers',
this.sentGoodbyes = meter.createUpDownCounter(Metrics.PEER_MANAGER_GOODBYES_SENT, {
description: 'Number of goodbyes sent to peers',
unit: 'peers',
valueType: ValueType.INT,
});
this.receivedGoodbyes = meter.createUpDownCounter(Metrics.PEER_MANAGER_GOODBYES_RECEIVED, {
description: 'Number of goodbyes received from peers',
unit: 'peers',
valueType: ValueType.INT,
});
}

public recordGoodbyeSent(reason: GoodByeReason) {
this.sentGoodbyes.add(1, { [Attributes.P2P_GOODBYE_REASON]: prettyGoodbyeReason(reason) });
}

public recordDisconnectedPeer(reason: GoodByeReason) {
this.disconnectedPeers.add(1, { [Attributes.P2P_GOODBYE_REASON]: prettyGoodbyeReason(reason) });
public recordGoodbyeReceived(reason: GoodByeReason) {
this.receivedGoodbyes.add(1, { [Attributes.P2P_GOODBYE_REASON]: prettyGoodbyeReason(reason) });
}
}
61 changes: 60 additions & 1 deletion yarn-project/p2p/src/services/peer-manager/peer_manager.test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { PeerErrorSeverity } from '@aztec/circuit-types';
import { createLogger } from '@aztec/foundation/log';
import { sleep } from '@aztec/foundation/sleep';
import { getTelemetryClient } from '@aztec/telemetry-client';
import { Attributes, getTelemetryClient } from '@aztec/telemetry-client';

import { type ENR, SignableENR } from '@chainsafe/enr';
import { jest } from '@jest/globals';
Expand Down Expand Up @@ -132,6 +132,20 @@ describe('PeerManager', () => {
// Verify that discover was called
expect(mockPeerDiscoveryService.runRandomNodesQuery).toHaveBeenCalled();
});

it('should send goodbye to peers on shutdown', async () => {
const peerId = await createSecp256k1PeerId();
const peerId2 = await createSecp256k1PeerId();
mockLibP2PNode.getPeers.mockReturnValue([peerId, peerId2]);

const goodbyeAndDisconnectPeerSpy = jest.spyOn(peerManager as any, 'goodbyeAndDisconnectPeer');

await peerManager.stop();

// Both peers were sent goodbyes on shutdown
expect(goodbyeAndDisconnectPeerSpy).toHaveBeenCalledWith(peerId, GoodByeReason.SHUTDOWN);
expect(goodbyeAndDisconnectPeerSpy).toHaveBeenCalledWith(peerId2, GoodByeReason.SHUTDOWN);
});
});

describe('peer timeout functionality', () => {
Expand Down Expand Up @@ -337,4 +351,49 @@ describe('PeerManager', () => {
expect(mockReqResp.sendRequestToPeer).toHaveBeenCalledTimes(2);
});
});

describe('goodbye metrics', () => {
it('should record metrics when receiving goodbye messages', async () => {
const peerId = await createSecp256k1PeerId();

// Get reference to the counter's add function
const goodbyeReceivedMetric = jest.spyOn((peerManager as any).metrics.receivedGoodbyes, 'add');

// Test receiving goodbye for different reasons
peerManager.goodbyeReceived(peerId, GoodByeReason.BANNED);
expect(goodbyeReceivedMetric).toHaveBeenCalledWith(1, { [Attributes.P2P_GOODBYE_REASON]: 'banned' });

peerManager.goodbyeReceived(peerId, GoodByeReason.DISCONNECTED);
expect(goodbyeReceivedMetric).toHaveBeenCalledWith(1, { [Attributes.P2P_GOODBYE_REASON]: 'disconnected' });

peerManager.goodbyeReceived(peerId, GoodByeReason.SHUTDOWN);
expect(goodbyeReceivedMetric).toHaveBeenCalledWith(1, { [Attributes.P2P_GOODBYE_REASON]: 'shutdown' });
});

it('should record metrics when sending goodbye messages', async () => {
const peerId = await createSecp256k1PeerId();

// Get reference to the counter's add function
const goodbyeSentMetric = jest.spyOn((peerManager as any).metrics.sentGoodbyes, 'add');

// Mock connections to include our test peer
mockLibP2PNode.getConnections.mockReturnValue([{ remotePeer: peerId }]);

// Test sending goodbye for different scenarios

// Test banned scenario
peerManager.penalizePeer(peerId, PeerErrorSeverity.LowToleranceError); // Set score below -100
peerManager.penalizePeer(peerId, PeerErrorSeverity.LowToleranceError);
peerManager.penalizePeer(peerId, PeerErrorSeverity.HighToleranceError);
peerManager.heartbeat();
expect(goodbyeSentMetric).toHaveBeenCalledWith(1, { [Attributes.P2P_GOODBYE_REASON]: 'banned' });

// Reset mocks
mockLibP2PNode.getPeers.mockReturnValue([{ remotePeer: peerId }]);

// Test shutdown scenario
await peerManager.stop();
expect(goodbyeSentMetric).toHaveBeenCalledWith(1, { [Attributes.P2P_GOODBYE_REASON]: 'shutdown' });
});
});
});
25 changes: 17 additions & 8 deletions yarn-project/p2p/src/services/peer-manager/peer_manager.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { type PeerErrorSeverity, type PeerInfo } from '@aztec/circuit-types';
import { createLogger } from '@aztec/foundation/log';
import { type TelemetryClient, WithTracer, trackSpan } from '@aztec/telemetry-client';
import { type TelemetryClient, trackSpan } from '@aztec/telemetry-client';

import { type ENR } from '@chainsafe/enr';
import { type Connection, type PeerId } from '@libp2p/interface';
Expand Down Expand Up @@ -123,9 +123,17 @@ export class PeerManager {
}
}

// TODO: include reason here and add to metrics, but this is fine for now
public goodbyeReceived(peerId: PeerId) {
this.logger.debug(`Goodbye received from peer ${peerId.toString()}`);
/**
* Handles a goodbye received from a peer.
*
* Used as the reqresp handler when a peer sends us goodbye message.
* @param peerId - The peer ID.
* @param reason - The reason for the goodbye.
*/
public goodbyeReceived(peerId: PeerId, reason: GoodByeReason) {
this.logger.debug(`Goodbye received from peer ${peerId.toString()} with reason ${prettyGoodbyeReason(reason)}`);

this.metrics.recordGoodbyeReceived(reason);

void this.disconnectPeer(peerId);
}
Expand Down Expand Up @@ -258,7 +266,7 @@ export class PeerManager {
private async goodbyeAndDisconnectPeer(peer: PeerId, reason: GoodByeReason) {
this.logger.debug(`Disconnecting peer ${peer.toString()} with reason ${prettyGoodbyeReason(reason)}`);

this.metrics.recordDisconnectedPeer(reason);
this.metrics.recordGoodbyeSent(reason);

try {
await this.reqresp.sendRequestToPeer(peer, ReqRespSubProtocol.GOODBYE, Buffer.from([reason]));
Expand Down Expand Up @@ -403,14 +411,15 @@ export class PeerManager {
* Removing all event listeners.
*/
public async stop() {
this.libP2PNode.removeEventListener(PeerEvent.CONNECTED, this.handleConnectedPeerEvent);
this.libP2PNode.removeEventListener(PeerEvent.DISCONNECTED, this.handleDisconnectedPeerEvent);
this.peerDiscoveryService.off(PeerEvent.DISCOVERED, this.handleDiscoveredPeer);

// Send goodbyes to all peers
await Promise.all(
this.libP2PNode.getPeers().map(peer => this.goodbyeAndDisconnectPeer(peer, GoodByeReason.DISCONNECTED)),
this.libP2PNode.getPeers().map(peer => this.goodbyeAndDisconnectPeer(peer, GoodByeReason.SHUTDOWN)),
);

this.libP2PNode.removeEventListener(PeerEvent.CONNECTED, this.handleConnectedPeerEvent);
this.libP2PNode.removeEventListener(PeerEvent.DISCONNECTED, this.handleDisconnectedPeerEvent);
}
}

Expand Down
15 changes: 15 additions & 0 deletions yarn-project/p2p/src/services/reqresp/protocols/goodbye.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import { GoodByeReason, decodeGoodbyeReason, encodeGoodbyeReason } from './goodbye.js';

describe('goodbye', () => {
it('should encode and decode goodbye reason', () => {
const reason = GoodByeReason.SHUTDOWN;
const encoded = encodeGoodbyeReason(reason);
const decoded = decodeGoodbyeReason(encoded);
expect(decoded).toBe(reason);
});

it('should return unknown if the goodbye reason buffer length is invalid', () => {
const invalidBuffer = Buffer.from([0x1, 0x2]);
expect(decodeGoodbyeReason(invalidBuffer)).toBe(GoodByeReason.UNKNOWN);
});
});
38 changes: 30 additions & 8 deletions yarn-project/p2p/src/services/reqresp/protocols/goodbye.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,37 @@ import { type PeerManager } from '../../peer-manager/peer_manager.js';
import { ReqRespSubProtocol, type ReqRespSubProtocolHandler } from '../interface.js';
import { type ReqResp } from '../reqresp.js';

// TODO: implement fully

/**
* Enum defining the possible reasons for a goodbye message.
*/
export enum GoodByeReason {
/** The peer has shutdown, will be received whenever a peer's node is routinely stopped */
SHUTDOWN = 0x1,
// TOOD(md): what is the correct values to put in here - read other specs to see reasons
// what is even the point of the reason
/** Whenever the peer must disconnect due to maintaining max peers */
DISCONNECTED = 0x2,
/** The peer has a low score, will be received whenever a peer's score is low */
LOW_SCORE = 0x3,
/** The peer has been banned, will be received whenever a peer is banned */
BANNED = 0x4,
/** Wrong network / fork */
WRONG_NETWORK = 0x5,
/** Unknown reason */
UNKNOWN = 0x6,
}

export function encodeGoodbyeReason(reason: GoodByeReason): Buffer {
return Buffer.from([reason]);
}

export function decodeGoodbyeReason(buffer: Buffer): GoodByeReason {
try {
if (buffer.length !== 1) {
throw new Error('Invalid goodbye reason buffer length');
}
return buffer[0] as GoodByeReason;
} catch (error) {
return GoodByeReason.UNKNOWN;
}
}

/**
Expand All @@ -36,9 +51,14 @@ export function prettyGoodbyeReason(reason: GoodByeReason): string {
case GoodByeReason.DISCONNECTED:
return 'disconnected';
case GoodByeReason.LOW_SCORE:
return 'low score';
return 'low_score';
case GoodByeReason.BANNED:
return 'banned';
// TODO(https://github.com/AztecProtocol/aztec-packages/issues/11328): implement
case GoodByeReason.WRONG_NETWORK:
return 'wrong_network';
case GoodByeReason.UNKNOWN:
return 'unknown';
}
}

Expand Down Expand Up @@ -67,9 +87,11 @@ export class GoodbyeProtocolHandler {
*/
export function reqGoodbyeHandler(peerManager: PeerManager): ReqRespSubProtocolHandler {
return (peerId: PeerId, _msg: Buffer) => {
peerManager.goodbyeReceived(peerId);
const reason = decodeGoodbyeReason(_msg);

peerManager.goodbyeReceived(peerId, reason);

// TODO(md): they want to receive some kind of response, but we don't have a response here
return Promise.resolve(Buffer.from(''));
// Return a buffer of length 1 as an acknowledgement
return Promise.resolve(Buffer.from([0x0]));
};
}
28 changes: 23 additions & 5 deletions yarn-project/p2p/src/services/reqresp/reqresp.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,23 @@ import {
startNodes,
stopNodes,
} from '../../mocks/index.js';
import { PeerManager } from '../peer-manager/peer_manager.js';
import { type PeerScoring } from '../peer-manager/peer_scoring.js';
import { ReqRespSubProtocol, RequestableBuffer } from './interface.js';
import { GoodByeReason } from './protocols/goodbye.js';
import { GoodByeReason, reqGoodbyeHandler } from './protocols/goodbye.js';

const PING_REQUEST = RequestableBuffer.fromBuffer(Buffer.from('ping'));

// The Req Resp protocol should allow nodes to dial specific peers
// and ask for specific data that they missed via the traditional gossip protocol.
describe('ReqResp', () => {
let peerManager: MockProxy<PeerManager>;
let peerScoring: MockProxy<PeerScoring>;
let nodes: ReqRespNode[];

beforeEach(() => {
peerScoring = mock<PeerScoring>();
peerManager = mock<PeerManager>();
});

afterEach(async () => {
Expand Down Expand Up @@ -314,19 +317,34 @@ describe('ReqResp', () => {
});

describe('Goodbye protocol', () => {
it('Should send a goodbye message to a peer', async () => {
const nodes = await createNodes(peerScoring, 2);
it('should send a goodbye message to a peer', async () => {
nodes = await createNodes(peerScoring, 2);

await startNodes(nodes);
const protocolHandlers = MOCK_SUB_PROTOCOL_HANDLERS;
// Req Goodbye Handler is defined in the reqresp.ts file
protocolHandlers[ReqRespSubProtocol.GOODBYE] = reqGoodbyeHandler(peerManager);

await startNodes(nodes, protocolHandlers);
await sleep(500);
await connectToPeers(nodes);
await sleep(500);

await nodes[0].req.sendRequestToPeer(
const response = await nodes[0].req.sendRequestToPeer(
nodes[1].p2p.peerId,
ReqRespSubProtocol.GOODBYE,
Buffer.from([GoodByeReason.SHUTDOWN]),
);

// Node 1 Peer manager receives the goodbye from the sending node
expect(peerManager.goodbyeReceived).toHaveBeenCalledWith(
expect.objectContaining({
publicKey: nodes[0].p2p.peerId.publicKey,
}),
GoodByeReason.SHUTDOWN,
);

// Expect the response to be a buffer of length 1
expect(response).toEqual(Buffer.from([0x0]));
});
});
});
3 changes: 2 additions & 1 deletion yarn-project/telemetry-client/src/metrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ export const L1_PUBLISHER_TX_CALLDATA_GAS = 'aztec.l1_publisher.tx_calldata_gas'
export const L1_PUBLISHER_TX_BLOBDATA_GAS_USED = 'aztec.l1_publisher.tx_blobdata_gas_used';
export const L1_PUBLISHER_TX_BLOBDATA_GAS_COST = 'aztec.l1_publisher.tx_blobdata_gas_cost';

export const PEER_MANAGER_DISCONNECTED_PEERS = 'aztec.peer_manager.disconnected_peers';
export const PEER_MANAGER_GOODBYES_SENT = 'aztec.peer_manager.goodbyes_sent';
export const PEER_MANAGER_GOODBYES_RECEIVED = 'aztec.peer_manager.goodbyes_received';

export const PUBLIC_PROCESSOR_TX_DURATION = 'aztec.public_processor.tx_duration';
export const PUBLIC_PROCESSOR_TX_COUNT = 'aztec.public_processor.tx_count';
Expand Down

0 comments on commit b755ab5

Please sign in to comment.