Skip to content

Commit

Permalink
feat(p2p): reqresp spans
Browse files Browse the repository at this point in the history
  • Loading branch information
Maddiaa0 committed Jan 20, 2025
1 parent e8d51bb commit df0bc25
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ export class ConnectionSampler {

await stream?.close();
} catch (error) {
this.logger.error(`Failed to close connection to peer ${streamId}`, { error });
this.logger.warn(`Failed to close connection to peer with stream id ${streamId}`);
} finally {
this.streams.delete(streamId);
}
Expand Down
57 changes: 57 additions & 0 deletions yarn-project/p2p/src/services/reqresp/metrics.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Request response metrics
import { Attributes, Metrics, ValueType } from '@aztec/telemetry-client';
import { type TelemetryClient, type Tracer, type UpDownCounter } from '@aztec/telemetry-client';

export class ReqRespMetrics {
public readonly tracer: Tracer;

private readonly sentRequests: UpDownCounter;
private readonly receivedRequests: UpDownCounter;

private readonly failedOutboundRequests: UpDownCounter;
private readonly failedInboundRequests: UpDownCounter;

constructor(readonly telemetryClient: TelemetryClient, name = 'ReqResp') {
this.tracer = telemetryClient.getTracer(name);

const meter = telemetryClient.getMeter(name);
this.sentRequests = meter.createUpDownCounter(Metrics.P2P_REQ_RESP_SENT_REQUESTS, {
description: 'Number of requests sent to peers',
unit: 'requests',
valueType: ValueType.INT,
});
this.receivedRequests = meter.createUpDownCounter(Metrics.P2P_REQ_RESP_RECEIVED_REQUESTS, {
description: 'Number of requests received from peers',
unit: 'requests',
valueType: ValueType.INT,
});

this.failedOutboundRequests = meter.createUpDownCounter(Metrics.P2P_REQ_RESP_FAILED_OUTBOUND_REQUESTS, {
description: 'Number of failed outbound requests - nodes not getting valid responses',
unit: 'requests',
valueType: ValueType.INT,
});

this.failedInboundRequests = meter.createUpDownCounter(Metrics.P2P_REQ_RESP_FAILED_INBOUND_REQUESTS, {
description: 'Number of failed inbound requests - node failing to respond to requests',
unit: 'requests',
valueType: ValueType.INT,
});
}

public recordRequestSent(protocol: string) {
this.sentRequests.add(1, { [Attributes.P2P_REQ_RESP_PROTOCOL]: protocol });
}

public recordRequestReceived(protocol: string) {
this.receivedRequests.add(1, { [Attributes.P2P_REQ_RESP_PROTOCOL]: protocol });
}

public recordRequestError(protocol: string) {
this.failedOutboundRequests.add(1, { [Attributes.P2P_REQ_RESP_PROTOCOL]: protocol });
}

public recordResponseError(protocol: string) {
this.failedInboundRequests.add(1, { [Attributes.P2P_REQ_RESP_PROTOCOL]: protocol });
}
}
44 changes: 39 additions & 5 deletions yarn-project/p2p/src/services/reqresp/reqresp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import { PeerErrorSeverity } from '@aztec/circuit-types';
import { type Logger, createLogger } from '@aztec/foundation/log';
import { executeTimeout } from '@aztec/foundation/timer';
import { Attributes, type TelemetryClient, getTelemetryClient, trackSpan } from '@aztec/telemetry-client';

import { type IncomingStreamData, type PeerId, type Stream } from '@libp2p/interface';
import { pipe } from 'it-pipe';
Expand All @@ -27,6 +28,7 @@ import {
type SubProtocolMap,
subProtocolMap,
} from './interface.js';
import { ReqRespMetrics } from './metrics.js';
import { RequestResponseRateLimiter } from './rate-limiter/rate_limiter.js';

/**
Expand All @@ -53,13 +55,19 @@ export class ReqResp {
private subProtocolHandlers: ReqRespSubProtocolHandlers = DEFAULT_SUB_PROTOCOL_HANDLERS;
private subProtocolValidators: ReqRespSubProtocolValidators = DEFAULT_SUB_PROTOCOL_VALIDATORS;

private connectionSampler: ConnectionSampler;
private rateLimiter: RequestResponseRateLimiter;

private snappyTransform: SnappyTransform;

private connectionSampler: ConnectionSampler;
private metrics: ReqRespMetrics;

constructor(config: P2PReqRespConfig, private libp2p: Libp2p, private peerScoring: PeerScoring) {
constructor(
config: P2PReqRespConfig,
private libp2p: Libp2p,
private peerScoring: PeerScoring,
telemetryClient: TelemetryClient = getTelemetryClient(),
) {
this.logger = createLogger('p2p:reqresp');

this.overallRequestTimeoutMs = config.overallRequestTimeoutMs;
Expand All @@ -71,6 +79,11 @@ export class ReqResp {
this.connectionSampler = new ConnectionSampler(libp2p);

this.snappyTransform = new SnappyTransform();
this.metrics = new ReqRespMetrics(telemetryClient);
}

get tracer() {
return this.metrics.tracer;
}

/**
Expand All @@ -97,16 +110,16 @@ export class ReqResp {
}

// Close all active connections
await this.connectionSampler.stop();
this.logger.debug('ReqResp: Connection sampler stopped');

const closeStreamPromises = this.libp2p.getConnections().map(connection => connection.close());
await Promise.all(closeStreamPromises);
this.logger.debug('ReqResp: All active streams closed');

this.rateLimiter.stop();
this.logger.debug('ReqResp: Rate limiter stopped');

await this.connectionSampler.stop();
this.logger.debug('ReqResp: Connection sampler stopped');

// NOTE: We assume libp2p instance is managed by the caller
}

Expand Down Expand Up @@ -213,6 +226,13 @@ export class ReqResp {
*
* @throws {CollectiveReqRespTimeoutError} - If the request batch exceeds the specified timeout (`timeoutMs`).
*/
@trackSpan(
'ReqResp.sendBatchRequest',
(subProtocol: ReqRespSubProtocol, requests: InstanceType<SubProtocolMap[ReqRespSubProtocol]['request']>[]) => ({
[Attributes.P2P_REQ_RESP_PROTOCOL]: subProtocol,
[Attributes.P2P_REQ_RESP_BATCH_REQUESTS_COUNT]: requests.length,
}),
)
async sendBatchRequest<SubProtocol extends ReqRespSubProtocol>(
subProtocol: SubProtocol,
requests: InstanceType<SubProtocolMap[SubProtocol]['request']>[],
Expand Down Expand Up @@ -354,13 +374,19 @@ export class ReqResp {
* If the stream is not closed by the dialled peer, and a timeout occurs, then
* the stream is closed on the requester's end and sender (us) updates its peer score
*/
@trackSpan('ReqResp.sendRequestToPeer', (peerId: PeerId, subProtocol: ReqRespSubProtocol, _: Buffer) => ({
[Attributes.P2P_ID]: peerId.toString(),
[Attributes.P2P_REQ_RESP_PROTOCOL]: subProtocol,
}))
public async sendRequestToPeer(
peerId: PeerId,
subProtocol: ReqRespSubProtocol,
payload: Buffer,
): Promise<Buffer | undefined> {
let stream: Stream | undefined;
try {
this.metrics.recordRequestSent(subProtocol);

stream = await this.connectionSampler.dialProtocol(peerId, subProtocol);

// Open the stream with a timeout
Expand All @@ -372,6 +398,7 @@ export class ReqResp {

return result;
} catch (e: any) {
this.metrics.recordRequestError(subProtocol);
this.handleResponseError(e, peerId, subProtocol);
} finally {
// Only close the stream if we created it
Expand Down Expand Up @@ -479,7 +506,13 @@ export class ReqResp {
* We check rate limits for each peer, note the peer will be penalised within the rate limiter implementation
* if they exceed their peer specific limits.
*/
@trackSpan('ReqResp.streamHandler', (protocol: ReqRespSubProtocol, { connection }: IncomingStreamData) => ({
[Attributes.P2P_REQ_RESP_PROTOCOL]: protocol,
[Attributes.P2P_ID]: connection.remotePeer.toString(),
}))
private async streamHandler(protocol: ReqRespSubProtocol, { stream, connection }: IncomingStreamData) {
this.metrics.recordRequestReceived(protocol);

// Store a reference to from this for the async generator
if (!this.rateLimiter.allow(protocol, connection.remotePeer)) {
this.logger.warn(`Rate limit exceeded for ${protocol} from ${connection.remotePeer}`);
Expand All @@ -506,6 +539,7 @@ export class ReqResp {
);
} catch (e: any) {
this.logger.warn(e);
this.metrics.recordResponseError(protocol);
} finally {
await stream.close();
}
Expand Down
2 changes: 2 additions & 0 deletions yarn-project/telemetry-client/src/attributes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ export const ROLLUP_PROVER_ID = 'aztec.rollup.prover_id';
export const PROOF_TIMED_OUT = 'aztec.proof.timed_out';

export const P2P_ID = 'aztec.p2p.id';
export const P2P_REQ_RESP_PROTOCOL = 'aztec.p2p.req_resp.protocol';
export const P2P_REQ_RESP_BATCH_REQUESTS_COUNT = 'aztec.p2p.req_resp.batch_requests_count';
export const POOL_NAME = 'aztec.pool.name';

export const SEQUENCER_STATE = 'aztec.sequencer.state';
Expand Down
5 changes: 5 additions & 0 deletions yarn-project/telemetry-client/src/metrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,11 @@ export const L1_PUBLISHER_TX_BLOBDATA_GAS_COST = 'aztec.l1_publisher.tx_blobdata
export const PEER_MANAGER_GOODBYES_SENT = 'aztec.peer_manager.goodbyes_sent';
export const PEER_MANAGER_GOODBYES_RECEIVED = 'aztec.peer_manager.goodbyes_received';

export const P2P_REQ_RESP_SENT_REQUESTS = 'aztec.p2p.req_resp.sent_requests';
export const P2P_REQ_RESP_RECEIVED_REQUESTS = 'aztec.p2p.req_resp.received_requests';
export const P2P_REQ_RESP_FAILED_OUTBOUND_REQUESTS = 'aztec.p2p.req_resp.failed_outbound_requests';
export const P2P_REQ_RESP_FAILED_INBOUND_REQUESTS = 'aztec.p2p.req_resp.failed_inbound_requests';

export const PUBLIC_PROCESSOR_TX_DURATION = 'aztec.public_processor.tx_duration';
export const PUBLIC_PROCESSOR_TX_COUNT = 'aztec.public_processor.tx_count';
export const PUBLIC_PROCESSOR_TX_PHASE_COUNT = 'aztec.public_processor.tx_phase_count';
Expand Down

0 comments on commit df0bc25

Please sign in to comment.