Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Expose P2P service API and clean up logs #10552

Merged
merged 11 commits into from
Dec 10, 2024
11 changes: 8 additions & 3 deletions yarn-project/aztec-node/src/aztec-node/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,10 @@ export class AztecNodeService implements AztecNode {
return this.blockSource;
}

public getP2P(): P2P {
return this.p2pClient;
}

/**
* Method to return the currently deployed L1 contract addresses.
* @returns - The currently deployed L1 contract addresses.
Expand Down Expand Up @@ -427,11 +431,12 @@ export class AztecNodeService implements AztecNode {
* @returns - The pending txs.
*/
public getPendingTxs() {
return Promise.resolve(this.p2pClient!.getTxs('pending'));
return this.p2pClient!.getPendingTxs();
}

public getPendingTxCount() {
return Promise.resolve(this.p2pClient!.getTxs('pending').length);
public async getPendingTxCount() {
const pendingTxs = await this.getPendingTxs();
return pendingTxs.length;
}

/**
Expand Down
2 changes: 1 addition & 1 deletion yarn-project/aztec/src/cli/cli.ts
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ export function injectAztecCommands(program: Command, userLog: LogFn, debugLogge
await startArchiver(options, signalHandlers, services);
} else if (options.p2pBootstrap) {
const { startP2PBootstrap } = await import('./cmds/start_p2p_bootstrap.js');
await startP2PBootstrap(options, userLog, debugLogger);
await startP2PBootstrap(options, signalHandlers, services, userLog);
} else if (options.proverAgent) {
const { startProverAgent } = await import('./cmds/start_prover_agent.js');
await startProverAgent(options, signalHandlers, services, userLog);
Expand Down
5 changes: 3 additions & 2 deletions yarn-project/aztec/src/cli/cmds/start_node.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { aztecNodeConfigMappings } from '@aztec/aztec-node';
import { AztecNodeApiSchema, type PXE } from '@aztec/circuit-types';
import { AztecNodeApiSchema, P2PApiSchema, type PXE } from '@aztec/circuit-types';
import { NULL_KEY } from '@aztec/ethereum';
import { type NamespacedApiHandlers } from '@aztec/foundation/json-rpc/server';
import { type LogFn } from '@aztec/foundation/log';
Expand Down Expand Up @@ -93,8 +93,9 @@ export async function startNode(
// Create and start Aztec Node
const node = await createAztecNode(nodeConfig, telemetryClient);

// Add node to services list
// Add node and p2p to services list
services.node = [node, AztecNodeApiSchema];
services.p2p = [node.getP2P(), P2PApiSchema];

// Add node stop function to signal handlers
signalHandlers.push(node.stop.bind(node));
Expand Down
25 changes: 17 additions & 8 deletions yarn-project/aztec/src/cli/cmds/start_p2p_bootstrap.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,28 @@
import { type Logger } from '@aztec/aztec.js';
import { type LogFn } from '@aztec/foundation/log';
import { type BootnodeConfig, bootnodeConfigMappings } from '@aztec/p2p';
import runBootstrapNode from '@aztec/p2p-bootstrap';
import { P2PBootstrapApiSchema } from '@aztec/circuit-types';
import { type NamespacedApiHandlers } from '@aztec/foundation/json-rpc/server';
import { type LogFn, createLogger } from '@aztec/foundation/log';
import { createStore } from '@aztec/kv-store/lmdb';
import { type BootnodeConfig, BootstrapNode, bootnodeConfigMappings } from '@aztec/p2p';
import {
createAndStartTelemetryClient,
getConfigEnvVars as getTelemetryClientConfig,
} from '@aztec/telemetry-client/start';

import { extractRelevantOptions } from '../util.js';

export const startP2PBootstrap = async (options: any, userLog: LogFn, debugLogger: Logger) => {
export async function startP2PBootstrap(
options: any,
signalHandlers: (() => Promise<void>)[],
services: NamespacedApiHandlers,
userLog: LogFn,
) {
// Start a P2P bootstrap node.
const config = extractRelevantOptions<BootnodeConfig>(options, bootnodeConfigMappings, 'p2p');
const telemetryClient = await createAndStartTelemetryClient(getTelemetryClientConfig());

await runBootstrapNode(config, telemetryClient, debugLogger);
const store = await createStore('p2p-bootstrap', config, createLogger('p2p:bootstrap:store'));
const node = new BootstrapNode(store, telemetryClient);
await node.start(config);
signalHandlers.push(() => node.stop());
services.bootstrap = [node, P2PBootstrapApiSchema];
userLog(`P2P bootstrap node started on ${config.udpListenAddress}`);
};
}
8 changes: 6 additions & 2 deletions yarn-project/aztec/src/cli/cmds/start_prover_node.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { ProverNodeApiSchema, type ProvingJobBroker, createAztecNodeClient } from '@aztec/circuit-types';
import { P2PApiSchema, ProverNodeApiSchema, type ProvingJobBroker, createAztecNodeClient } from '@aztec/circuit-types';
import { NULL_KEY } from '@aztec/ethereum';
import { type NamespacedApiHandlers } from '@aztec/foundation/json-rpc/server';
import { type LogFn } from '@aztec/foundation/log';
Expand Down Expand Up @@ -81,12 +81,16 @@ export async function startProverNode(
const proverNode = await createProverNode(proverConfig, { telemetry, broker });
services.proverNode = [proverNode, ProverNodeApiSchema];

const p2p = proverNode.getP2P();
if (p2p) {
services.p2p = [proverNode.getP2P(), P2PApiSchema];
}

if (!proverConfig.proverBrokerUrl) {
services.provingJobSource = [proverNode.getProver().getProvingJobSource(), ProvingJobConsumerSchema];
}

signalHandlers.push(proverNode.stop.bind(proverNode));

// Automatically start proving unproven blocks
await proverNode.start();
}
2 changes: 2 additions & 0 deletions yarn-project/circuit-types/src/interfaces/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,5 @@ export * from './service.js';
export * from './sync-status.js';
export * from './world_state.js';
export * from './prover-broker.js';
export * from './p2p.js';
export * from './p2p-bootstrap.js';
21 changes: 21 additions & 0 deletions yarn-project/circuit-types/src/interfaces/p2p-bootstrap.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import { type ApiSchemaFor } from '@aztec/foundation/schemas';

import { z } from 'zod';

/** Exposed API to the P2P bootstrap node. */
export interface P2PBootstrapApi {
/**
* Returns the ENR for this node.
*/
getEncodedEnr(): Promise<string>;

/**
* Returns ENRs for all nodes in the routing table.
*/
getRoutingTable(): Promise<string[]>;
}

export const P2PBootstrapApiSchema: ApiSchemaFor<P2PBootstrapApi> = {
getEncodedEnr: z.function().returns(z.string()),
getRoutingTable: z.function().returns(z.array(z.string())),
};
82 changes: 82 additions & 0 deletions yarn-project/circuit-types/src/interfaces/p2p.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import { type JsonRpcTestContext, createJsonRpcTestSetup } from '@aztec/foundation/json-rpc/test';

import { BlockAttestation } from '../p2p/block_attestation.js';
import { EpochProofQuote } from '../prover_coordination/epoch_proof_quote.js';
import { Tx } from '../tx/tx.js';
import { type P2PApi, P2PApiSchema, type PeerInfo } from './p2p.js';

describe('P2PApiSchema', () => {
let handler: MockP2P;
let context: JsonRpcTestContext<P2PApi>;

const tested = new Set<string>();

beforeEach(async () => {
handler = new MockP2P();
context = await createJsonRpcTestSetup<P2PApi>(handler, P2PApiSchema);
});

afterEach(() => {
tested.add(/^P2PApiSchema\s+([^(]+)/.exec(expect.getState().currentTestName!)![1]);
context.httpServer.close();
});

afterAll(() => {
const all = Object.keys(P2PApiSchema);
expect([...tested].sort()).toEqual(all.sort());
});

it('getAttestationsForSlot', async () => {
const attestations = await context.client.getAttestationsForSlot(BigInt(1), 'proposalId');
expect(attestations).toEqual([BlockAttestation.empty()]);
expect(attestations[0]).toBeInstanceOf(BlockAttestation);
});

it('getEpochProofQuotes', async () => {
const quotes = await context.client.getEpochProofQuotes(BigInt(1));
expect(quotes).toEqual([EpochProofQuote.empty()]);
expect(quotes[0]).toBeInstanceOf(EpochProofQuote);
});

it('getPendingTxs', async () => {
const txs = await context.client.getPendingTxs();
expect(txs[0]).toBeInstanceOf(Tx);
});

it('getEncodedEnr', async () => {
const enr = await context.client.getEncodedEnr();
expect(enr).toEqual('enr');
});

it('getPeers', async () => {
const peers = await context.client.getPeers();
expect(peers).toEqual(peers);
});
});

const peers: PeerInfo[] = [
{ status: 'connected', score: 1, id: 'id' },
{ status: 'dialing', dialStatus: 'dialStatus', id: 'id', addresses: ['address'] },
{ status: 'cached', id: 'id', addresses: ['address'], enr: 'enr', dialAttempts: 1 },
];

class MockP2P implements P2PApi {
getAttestationsForSlot(slot: bigint, proposalId?: string | undefined): Promise<BlockAttestation[]> {
expect(slot).toEqual(1n);
expect(proposalId).toEqual('proposalId');
return Promise.resolve([BlockAttestation.empty()]);
}
getEpochProofQuotes(epoch: bigint): Promise<EpochProofQuote[]> {
expect(epoch).toEqual(1n);
return Promise.resolve([EpochProofQuote.empty()]);
}
getPendingTxs(): Promise<Tx[]> {
return Promise.resolve([Tx.random()]);
}
getEncodedEnr(): Promise<string | undefined> {
return Promise.resolve('enr');
}
getPeers(): Promise<PeerInfo[]> {
return Promise.resolve(peers);
}
}
71 changes: 71 additions & 0 deletions yarn-project/circuit-types/src/interfaces/p2p.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import { type ApiSchemaFor, optional, schemas } from '@aztec/foundation/schemas';

import { z } from 'zod';

import { BlockAttestation } from '../p2p/block_attestation.js';
import { EpochProofQuote } from '../prover_coordination/epoch_proof_quote.js';
import { Tx } from '../tx/tx.js';

export type PeerInfo =
| { status: 'connected'; score: number; id: string }
| { status: 'dialing'; dialStatus: string; id: string; addresses: string[] }
| { status: 'cached'; id: string; addresses: string[]; enr: string; dialAttempts: number };

const PeerInfoSchema = z.discriminatedUnion('status', [
z.object({ status: z.literal('connected'), score: z.number(), id: z.string() }),
z.object({ status: z.literal('dialing'), dialStatus: z.string(), id: z.string(), addresses: z.array(z.string()) }),
z.object({
status: z.literal('cached'),
id: z.string(),
addresses: z.array(z.string()),
enr: z.string(),
dialAttempts: z.number(),
}),
]);

/** Exposed API to the P2P module. */
export interface P2PApi {
/**
* Queries the Attestation pool for attestations for the given slot
*
* @param slot - the slot to query
* @param proposalId - the proposal id to query, or undefined to query all proposals for the slot
* @returns BlockAttestations
*/
getAttestationsForSlot(slot: bigint, proposalId?: string): Promise<BlockAttestation[]>;

/**
* Queries the EpochProofQuote pool for quotes for the given epoch
*
* @param epoch - the epoch to query
* @returns EpochProofQuotes
*/
getEpochProofQuotes(epoch: bigint): Promise<EpochProofQuote[]>;

/**
* Returns all pending transactions in the transaction pool.
* @returns An array of Txs.
*/
getPendingTxs(): Promise<Tx[]>;

/**
* Returns the ENR for this node, if any.
*/
getEncodedEnr(): Promise<string | undefined>;

/**
* Returns info for all connected, dialing, and cached peers.
*/
getPeers(): Promise<PeerInfo[]>;
}

export const P2PApiSchema: ApiSchemaFor<P2PApi> = {
getAttestationsForSlot: z
.function()
.args(schemas.BigInt, optional(z.string()))
.returns(z.array(BlockAttestation.schema)),
getEpochProofQuotes: z.function().args(schemas.BigInt).returns(z.array(EpochProofQuote.schema)),
getPendingTxs: z.function().returns(z.array(Tx.schema)),
getEncodedEnr: z.function().returns(z.string().optional()),
getPeers: z.function().returns(z.array(PeerInfoSchema)),
};
12 changes: 12 additions & 0 deletions yarn-project/circuit-types/src/p2p/block_attestation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@ import { keccak256, recoverAddress } from '@aztec/foundation/crypto';
import { type EthAddress } from '@aztec/foundation/eth-address';
import { Signature } from '@aztec/foundation/eth-signature';
import { type Fr } from '@aztec/foundation/fields';
import { type ZodFor } from '@aztec/foundation/schemas';
import { BufferReader, serializeToBuffer } from '@aztec/foundation/serialize';

import { z } from 'zod';

import { ConsensusPayload } from './consensus_payload.js';
import { Gossipable } from './gossipable.js';
import { SignatureDomainSeperator, getHashedSignaturePayloadEthSignedMessage } from './signature_utils.js';
Expand Down Expand Up @@ -37,6 +40,15 @@ export class BlockAttestation extends Gossipable {
super();
}

static get schema(): ZodFor<BlockAttestation> {
return z
.object({
payload: ConsensusPayload.schema,
signature: Signature.schema,
})
.transform(obj => new BlockAttestation(obj.payload, obj.signature));
}

override p2pMessageIdentifier(): Buffer32 {
return new BlockAttestationHash(keccak256(this.signature.toBuffer()));
}
Expand Down
11 changes: 11 additions & 0 deletions yarn-project/circuit-types/src/p2p/consensus_payload.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { hexToBuffer } from '@aztec/foundation/string';
import { type FieldsOf } from '@aztec/foundation/types';

import { encodeAbiParameters, parseAbiParameters } from 'viem';
import { z } from 'zod';

import { TxHash } from '../tx/tx_hash.js';
import { type Signable, type SignatureDomainSeperator } from './signature_utils.js';
Expand All @@ -21,6 +22,16 @@ export class ConsensusPayload implements Signable {
public readonly txHashes: TxHash[],
) {}

static get schema() {
return z
.object({
header: BlockHeader.schema,
archive: Fr.schema,
txHashes: z.array(TxHash.schema),
})
.transform(obj => new ConsensusPayload(obj.header, obj.archive, obj.txHashes));
}

static getFields(fields: FieldsOf<ConsensusPayload>) {
return [fields.header, fields.archive, fields.txHashes] as const;
}
Expand Down
10 changes: 10 additions & 0 deletions yarn-project/foundation/src/collection/array.ts
Original file line number Diff line number Diff line change
Expand Up @@ -145,3 +145,13 @@ export function areArraysEqual<T>(a: T[], b: T[], eq: (a: T, b: T) => boolean =
export function maxBy<T>(arr: T[], fn: (x: T) => number): T | undefined {
return arr.reduce((max, x) => (fn(x) > fn(max) ? x : max), arr[0]);
}

/** Computes the median of a numeric array. Returns undefined if array is empty. */
export function median(arr: number[]) {
if (arr.length === 0) {
return undefined;
}
const sorted = [...arr].sort((a, b) => a - b);
const mid = Math.floor(sorted.length / 2);
return sorted.length % 2 !== 0 ? sorted[mid] : (sorted[mid - 1] + sorted[mid]) / 2;
}
1 change: 1 addition & 0 deletions yarn-project/foundation/src/config/env_var.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ export type EnvVar =
| 'L2_QUEUE_SIZE'
| 'LOG_ELAPSED_TIME'
| 'LOG_JSON'
| 'LOG_MULTILINE'
| 'LOG_LEVEL'
| 'MNEMONIC'
| 'NETWORK_NAME'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,18 +44,10 @@ export function createSafeJsonRpcClient<T extends object>(
return (schema as ApiSchema)[methodName].returnType().parse(res.result);
};

// Intercept any RPC methods with a proxy
const proxy = new Proxy(
{},
{
get: (target, method: string) => {
if (['then', 'catch'].includes(method)) {
return Reflect.get(target, method);
}
return (...params: any[]) => request(method, params);
},
},
) as T;
const proxy: any = {};
for (const method of Object.keys(schema)) {
proxy[method] = (...params: any[]) => request(method, params);
}

return proxy;
return proxy as T;
}
Loading
Loading