diff --git a/yarn-project/aztec-node/src/aztec-node/server.ts b/yarn-project/aztec-node/src/aztec-node/server.ts index d0a5271640d..2865e1f8649 100644 --- a/yarn-project/aztec-node/src/aztec-node/server.ts +++ b/yarn-project/aztec-node/src/aztec-node/server.ts @@ -218,6 +218,10 @@ export class AztecNodeService implements AztecNode { return this.blockSource; } + public getP2P(): P2P { + return this.p2pClient; + } + /** * Method to return the currently deployed L1 contract addresses. * @returns - The currently deployed L1 contract addresses. @@ -427,11 +431,12 @@ export class AztecNodeService implements AztecNode { * @returns - The pending txs. */ public getPendingTxs() { - return Promise.resolve(this.p2pClient!.getTxs('pending')); + return this.p2pClient!.getPendingTxs(); } - public getPendingTxCount() { - return Promise.resolve(this.p2pClient!.getTxs('pending').length); + public async getPendingTxCount() { + const pendingTxs = await this.getPendingTxs(); + return pendingTxs.length; } /** diff --git a/yarn-project/aztec/src/cli/cli.ts b/yarn-project/aztec/src/cli/cli.ts index 173a9ff2f46..9655f5169a7 100644 --- a/yarn-project/aztec/src/cli/cli.ts +++ b/yarn-project/aztec/src/cli/cli.ts @@ -95,7 +95,7 @@ export function injectAztecCommands(program: Command, userLog: LogFn, debugLogge await startArchiver(options, signalHandlers, services); } else if (options.p2pBootstrap) { const { startP2PBootstrap } = await import('./cmds/start_p2p_bootstrap.js'); - await startP2PBootstrap(options, userLog, debugLogger); + await startP2PBootstrap(options, signalHandlers, services, userLog); } else if (options.proverAgent) { const { startProverAgent } = await import('./cmds/start_prover_agent.js'); await startProverAgent(options, signalHandlers, services, userLog); diff --git a/yarn-project/aztec/src/cli/cmds/start_node.ts b/yarn-project/aztec/src/cli/cmds/start_node.ts index 939776c345b..2459533aa9f 100644 --- a/yarn-project/aztec/src/cli/cmds/start_node.ts +++ b/yarn-project/aztec/src/cli/cmds/start_node.ts @@ -1,5 +1,5 @@ import { aztecNodeConfigMappings } from '@aztec/aztec-node'; -import { AztecNodeApiSchema, type PXE } from '@aztec/circuit-types'; +import { AztecNodeApiSchema, P2PApiSchema, type PXE } from '@aztec/circuit-types'; import { NULL_KEY } from '@aztec/ethereum'; import { type NamespacedApiHandlers } from '@aztec/foundation/json-rpc/server'; import { type LogFn } from '@aztec/foundation/log'; @@ -93,8 +93,9 @@ export async function startNode( // Create and start Aztec Node const node = await createAztecNode(nodeConfig, telemetryClient); - // Add node to services list + // Add node and p2p to services list services.node = [node, AztecNodeApiSchema]; + services.p2p = [node.getP2P(), P2PApiSchema]; // Add node stop function to signal handlers signalHandlers.push(node.stop.bind(node)); diff --git a/yarn-project/aztec/src/cli/cmds/start_p2p_bootstrap.ts b/yarn-project/aztec/src/cli/cmds/start_p2p_bootstrap.ts index 4d4f7618d80..975f339aef6 100644 --- a/yarn-project/aztec/src/cli/cmds/start_p2p_bootstrap.ts +++ b/yarn-project/aztec/src/cli/cmds/start_p2p_bootstrap.ts @@ -1,7 +1,8 @@ -import { type Logger } from '@aztec/aztec.js'; -import { type LogFn } from '@aztec/foundation/log'; -import { type BootnodeConfig, bootnodeConfigMappings } from '@aztec/p2p'; -import runBootstrapNode from '@aztec/p2p-bootstrap'; +import { P2PBootstrapApiSchema } from '@aztec/circuit-types'; +import { type NamespacedApiHandlers } from '@aztec/foundation/json-rpc/server'; +import { type LogFn, createLogger } from '@aztec/foundation/log'; +import { createStore } from '@aztec/kv-store/lmdb'; +import { type BootnodeConfig, BootstrapNode, bootnodeConfigMappings } from '@aztec/p2p'; import { createAndStartTelemetryClient, getConfigEnvVars as getTelemetryClientConfig, @@ -9,11 +10,19 @@ import { import { extractRelevantOptions } from '../util.js'; -export const startP2PBootstrap = async (options: any, userLog: LogFn, debugLogger: Logger) => { +export async function startP2PBootstrap( + options: any, + signalHandlers: (() => Promise)[], + services: NamespacedApiHandlers, + userLog: LogFn, +) { // Start a P2P bootstrap node. const config = extractRelevantOptions(options, bootnodeConfigMappings, 'p2p'); const telemetryClient = await createAndStartTelemetryClient(getTelemetryClientConfig()); - - await runBootstrapNode(config, telemetryClient, debugLogger); + const store = await createStore('p2p-bootstrap', config, createLogger('p2p:bootstrap:store')); + const node = new BootstrapNode(store, telemetryClient); + await node.start(config); + signalHandlers.push(() => node.stop()); + services.bootstrap = [node, P2PBootstrapApiSchema]; userLog(`P2P bootstrap node started on ${config.udpListenAddress}`); -}; +} diff --git a/yarn-project/aztec/src/cli/cmds/start_prover_node.ts b/yarn-project/aztec/src/cli/cmds/start_prover_node.ts index 0d6fa266edc..ceb4cc00ff5 100644 --- a/yarn-project/aztec/src/cli/cmds/start_prover_node.ts +++ b/yarn-project/aztec/src/cli/cmds/start_prover_node.ts @@ -1,4 +1,4 @@ -import { ProverNodeApiSchema, type ProvingJobBroker, createAztecNodeClient } from '@aztec/circuit-types'; +import { P2PApiSchema, ProverNodeApiSchema, type ProvingJobBroker, createAztecNodeClient } from '@aztec/circuit-types'; import { NULL_KEY } from '@aztec/ethereum'; import { type NamespacedApiHandlers } from '@aztec/foundation/json-rpc/server'; import { type LogFn } from '@aztec/foundation/log'; @@ -81,12 +81,16 @@ export async function startProverNode( const proverNode = await createProverNode(proverConfig, { telemetry, broker }); services.proverNode = [proverNode, ProverNodeApiSchema]; + const p2p = proverNode.getP2P(); + if (p2p) { + services.p2p = [proverNode.getP2P(), P2PApiSchema]; + } + if (!proverConfig.proverBrokerUrl) { services.provingJobSource = [proverNode.getProver().getProvingJobSource(), ProvingJobConsumerSchema]; } signalHandlers.push(proverNode.stop.bind(proverNode)); - // Automatically start proving unproven blocks await proverNode.start(); } diff --git a/yarn-project/circuit-types/src/interfaces/index.ts b/yarn-project/circuit-types/src/interfaces/index.ts index c717ceae649..3f05c960e1b 100644 --- a/yarn-project/circuit-types/src/interfaces/index.ts +++ b/yarn-project/circuit-types/src/interfaces/index.ts @@ -21,3 +21,5 @@ export * from './service.js'; export * from './sync-status.js'; export * from './world_state.js'; export * from './prover-broker.js'; +export * from './p2p.js'; +export * from './p2p-bootstrap.js'; diff --git a/yarn-project/circuit-types/src/interfaces/p2p-bootstrap.ts b/yarn-project/circuit-types/src/interfaces/p2p-bootstrap.ts new file mode 100644 index 00000000000..8283f4da841 --- /dev/null +++ b/yarn-project/circuit-types/src/interfaces/p2p-bootstrap.ts @@ -0,0 +1,21 @@ +import { type ApiSchemaFor } from '@aztec/foundation/schemas'; + +import { z } from 'zod'; + +/** Exposed API to the P2P bootstrap node. */ +export interface P2PBootstrapApi { + /** + * Returns the ENR for this node. + */ + getEncodedEnr(): Promise; + + /** + * Returns ENRs for all nodes in the routing table. + */ + getRoutingTable(): Promise; +} + +export const P2PBootstrapApiSchema: ApiSchemaFor = { + getEncodedEnr: z.function().returns(z.string()), + getRoutingTable: z.function().returns(z.array(z.string())), +}; diff --git a/yarn-project/circuit-types/src/interfaces/p2p.test.ts b/yarn-project/circuit-types/src/interfaces/p2p.test.ts new file mode 100644 index 00000000000..d9eb0cc654a --- /dev/null +++ b/yarn-project/circuit-types/src/interfaces/p2p.test.ts @@ -0,0 +1,88 @@ +import { type JsonRpcTestContext, createJsonRpcTestSetup } from '@aztec/foundation/json-rpc/test'; + +import { BlockAttestation } from '../p2p/block_attestation.js'; +import { EpochProofQuote } from '../prover_coordination/epoch_proof_quote.js'; +import { Tx } from '../tx/tx.js'; +import { type P2PApi, P2PApiSchema, type PeerInfo } from './p2p.js'; + +describe('P2PApiSchema', () => { + let handler: MockP2P; + let context: JsonRpcTestContext; + + const tested = new Set(); + + beforeEach(async () => { + handler = new MockP2P(); + context = await createJsonRpcTestSetup(handler, P2PApiSchema); + }); + + afterEach(() => { + tested.add(/^P2PApiSchema\s+([^(]+)/.exec(expect.getState().currentTestName!)![1]); + context.httpServer.close(); + }); + + afterAll(() => { + const all = Object.keys(P2PApiSchema); + expect([...tested].sort()).toEqual(all.sort()); + }); + + it('getAttestationsForSlot', async () => { + const attestations = await context.client.getAttestationsForSlot(BigInt(1), 'proposalId'); + expect(attestations).toEqual([BlockAttestation.empty()]); + expect(attestations[0]).toBeInstanceOf(BlockAttestation); + }); + + it('getEpochProofQuotes', async () => { + const quotes = await context.client.getEpochProofQuotes(BigInt(1)); + expect(quotes).toEqual([EpochProofQuote.empty()]); + expect(quotes[0]).toBeInstanceOf(EpochProofQuote); + }); + + it('getPendingTxs', async () => { + const txs = await context.client.getPendingTxs(); + expect(txs[0]).toBeInstanceOf(Tx); + }); + + it('getEncodedEnr', async () => { + const enr = await context.client.getEncodedEnr(); + expect(enr).toEqual('enr'); + }); + + it('getPeers', async () => { + const peers = await context.client.getPeers(); + expect(peers).toEqual(peers); + }); + + it('getPeers(true)', async () => { + const peers = await context.client.getPeers(true); + expect(peers).toEqual(peers); + }); +}); + +const peers: PeerInfo[] = [ + { status: 'connected', score: 1, id: 'id' }, + { status: 'dialing', dialStatus: 'dialStatus', id: 'id', addresses: ['address'] }, + { status: 'cached', id: 'id', addresses: ['address'], enr: 'enr', dialAttempts: 1 }, +]; + +class MockP2P implements P2PApi { + getAttestationsForSlot(slot: bigint, proposalId?: string | undefined): Promise { + expect(slot).toEqual(1n); + expect(proposalId).toEqual('proposalId'); + return Promise.resolve([BlockAttestation.empty()]); + } + getEpochProofQuotes(epoch: bigint): Promise { + expect(epoch).toEqual(1n); + return Promise.resolve([EpochProofQuote.empty()]); + } + getPendingTxs(): Promise { + return Promise.resolve([Tx.random()]); + } + getEncodedEnr(): Promise { + return Promise.resolve('enr'); + } + getPeers(includePending?: boolean): Promise { + expect(includePending === undefined || includePending === true).toBeTruthy(); + return Promise.resolve(peers); + } +} diff --git a/yarn-project/circuit-types/src/interfaces/p2p.ts b/yarn-project/circuit-types/src/interfaces/p2p.ts new file mode 100644 index 00000000000..00fa526899d --- /dev/null +++ b/yarn-project/circuit-types/src/interfaces/p2p.ts @@ -0,0 +1,71 @@ +import { type ApiSchemaFor, optional, schemas } from '@aztec/foundation/schemas'; + +import { z } from 'zod'; + +import { BlockAttestation } from '../p2p/block_attestation.js'; +import { EpochProofQuote } from '../prover_coordination/epoch_proof_quote.js'; +import { Tx } from '../tx/tx.js'; + +export type PeerInfo = + | { status: 'connected'; score: number; id: string } + | { status: 'dialing'; dialStatus: string; id: string; addresses: string[] } + | { status: 'cached'; id: string; addresses: string[]; enr: string; dialAttempts: number }; + +const PeerInfoSchema = z.discriminatedUnion('status', [ + z.object({ status: z.literal('connected'), score: z.number(), id: z.string() }), + z.object({ status: z.literal('dialing'), dialStatus: z.string(), id: z.string(), addresses: z.array(z.string()) }), + z.object({ + status: z.literal('cached'), + id: z.string(), + addresses: z.array(z.string()), + enr: z.string(), + dialAttempts: z.number(), + }), +]); + +/** Exposed API to the P2P module. */ +export interface P2PApi { + /** + * Queries the Attestation pool for attestations for the given slot + * + * @param slot - the slot to query + * @param proposalId - the proposal id to query, or undefined to query all proposals for the slot + * @returns BlockAttestations + */ + getAttestationsForSlot(slot: bigint, proposalId?: string): Promise; + + /** + * Queries the EpochProofQuote pool for quotes for the given epoch + * + * @param epoch - the epoch to query + * @returns EpochProofQuotes + */ + getEpochProofQuotes(epoch: bigint): Promise; + + /** + * Returns all pending transactions in the transaction pool. + * @returns An array of Txs. + */ + getPendingTxs(): Promise; + + /** + * Returns the ENR for this node, if any. + */ + getEncodedEnr(): Promise; + + /** + * Returns info for all connected, dialing, and cached peers. + */ + getPeers(includePending?: boolean): Promise; +} + +export const P2PApiSchema: ApiSchemaFor = { + getAttestationsForSlot: z + .function() + .args(schemas.BigInt, optional(z.string())) + .returns(z.array(BlockAttestation.schema)), + getEpochProofQuotes: z.function().args(schemas.BigInt).returns(z.array(EpochProofQuote.schema)), + getPendingTxs: z.function().returns(z.array(Tx.schema)), + getEncodedEnr: z.function().returns(z.string().optional()), + getPeers: z.function().args(optional(z.boolean())).returns(z.array(PeerInfoSchema)), +}; diff --git a/yarn-project/circuit-types/src/p2p/block_attestation.ts b/yarn-project/circuit-types/src/p2p/block_attestation.ts index 04ccfdf4d52..4ac0babe690 100644 --- a/yarn-project/circuit-types/src/p2p/block_attestation.ts +++ b/yarn-project/circuit-types/src/p2p/block_attestation.ts @@ -3,8 +3,11 @@ import { keccak256, recoverAddress } from '@aztec/foundation/crypto'; import { type EthAddress } from '@aztec/foundation/eth-address'; import { Signature } from '@aztec/foundation/eth-signature'; import { type Fr } from '@aztec/foundation/fields'; +import { type ZodFor } from '@aztec/foundation/schemas'; import { BufferReader, serializeToBuffer } from '@aztec/foundation/serialize'; +import { z } from 'zod'; + import { ConsensusPayload } from './consensus_payload.js'; import { Gossipable } from './gossipable.js'; import { SignatureDomainSeperator, getHashedSignaturePayloadEthSignedMessage } from './signature_utils.js'; @@ -37,6 +40,15 @@ export class BlockAttestation extends Gossipable { super(); } + static get schema(): ZodFor { + return z + .object({ + payload: ConsensusPayload.schema, + signature: Signature.schema, + }) + .transform(obj => new BlockAttestation(obj.payload, obj.signature)); + } + override p2pMessageIdentifier(): Buffer32 { return new BlockAttestationHash(keccak256(this.signature.toBuffer())); } diff --git a/yarn-project/circuit-types/src/p2p/consensus_payload.ts b/yarn-project/circuit-types/src/p2p/consensus_payload.ts index a043a8d2010..37fedc508da 100644 --- a/yarn-project/circuit-types/src/p2p/consensus_payload.ts +++ b/yarn-project/circuit-types/src/p2p/consensus_payload.ts @@ -5,6 +5,7 @@ import { hexToBuffer } from '@aztec/foundation/string'; import { type FieldsOf } from '@aztec/foundation/types'; import { encodeAbiParameters, parseAbiParameters } from 'viem'; +import { z } from 'zod'; import { TxHash } from '../tx/tx_hash.js'; import { type Signable, type SignatureDomainSeperator } from './signature_utils.js'; @@ -21,6 +22,16 @@ export class ConsensusPayload implements Signable { public readonly txHashes: TxHash[], ) {} + static get schema() { + return z + .object({ + header: BlockHeader.schema, + archive: Fr.schema, + txHashes: z.array(TxHash.schema), + }) + .transform(obj => new ConsensusPayload(obj.header, obj.archive, obj.txHashes)); + } + static getFields(fields: FieldsOf) { return [fields.header, fields.archive, fields.txHashes] as const; } diff --git a/yarn-project/foundation/src/collection/array.ts b/yarn-project/foundation/src/collection/array.ts index 9f37779727e..b2ac6bec873 100644 --- a/yarn-project/foundation/src/collection/array.ts +++ b/yarn-project/foundation/src/collection/array.ts @@ -145,3 +145,13 @@ export function areArraysEqual(a: T[], b: T[], eq: (a: T, b: T) => boolean = export function maxBy(arr: T[], fn: (x: T) => number): T | undefined { return arr.reduce((max, x) => (fn(x) > fn(max) ? x : max), arr[0]); } + +/** Computes the median of a numeric array. Returns undefined if array is empty. */ +export function median(arr: number[]) { + if (arr.length === 0) { + return undefined; + } + const sorted = [...arr].sort((a, b) => a - b); + const mid = Math.floor(sorted.length / 2); + return sorted.length % 2 !== 0 ? sorted[mid] : (sorted[mid - 1] + sorted[mid]) / 2; +} diff --git a/yarn-project/foundation/src/config/env_var.ts b/yarn-project/foundation/src/config/env_var.ts index 41e2d61b49f..8f6bd085fd3 100644 --- a/yarn-project/foundation/src/config/env_var.ts +++ b/yarn-project/foundation/src/config/env_var.ts @@ -56,6 +56,7 @@ export type EnvVar = | 'L2_QUEUE_SIZE' | 'LOG_ELAPSED_TIME' | 'LOG_JSON' + | 'LOG_MULTILINE' | 'LOG_LEVEL' | 'MNEMONIC' | 'NETWORK_NAME' diff --git a/yarn-project/foundation/src/json-rpc/client/safe_json_rpc_client.ts b/yarn-project/foundation/src/json-rpc/client/safe_json_rpc_client.ts index 8949ca3e2ea..10604259299 100644 --- a/yarn-project/foundation/src/json-rpc/client/safe_json_rpc_client.ts +++ b/yarn-project/foundation/src/json-rpc/client/safe_json_rpc_client.ts @@ -44,18 +44,10 @@ export function createSafeJsonRpcClient( return (schema as ApiSchema)[methodName].returnType().parse(res.result); }; - // Intercept any RPC methods with a proxy - const proxy = new Proxy( - {}, - { - get: (target, method: string) => { - if (['then', 'catch'].includes(method)) { - return Reflect.get(target, method); - } - return (...params: any[]) => request(method, params); - }, - }, - ) as T; + const proxy: any = {}; + for (const method of Object.keys(schema)) { + proxy[method] = (...params: any[]) => request(method, params); + } - return proxy; + return proxy as T; } diff --git a/yarn-project/foundation/src/json-rpc/test/integration.test.ts b/yarn-project/foundation/src/json-rpc/test/integration.test.ts index b48d0f8d7a5..d1611d0acde 100644 --- a/yarn-project/foundation/src/json-rpc/test/integration.test.ts +++ b/yarn-project/foundation/src/json-rpc/test/integration.test.ts @@ -118,10 +118,8 @@ describe('JsonRpc integration', () => { await expect(() => client.fail()).rejects.toThrow('Test state failed'); }); - it('fails if calls non-existing method in handler', async () => { - await expect(() => (client as TestState).forceClear()).rejects.toThrow( - 'Unspecified method forceClear in client schema', - ); + it('fails if calls non-existing method in handler', () => { + expect(() => (client as TestState).forceClear()).toThrow(/not a function/i); }); }); diff --git a/yarn-project/foundation/src/log/pino-logger.ts b/yarn-project/foundation/src/log/pino-logger.ts index 10b19086939..db1b3907584 100644 --- a/yarn-project/foundation/src/log/pino-logger.ts +++ b/yarn-project/foundation/src/log/pino-logger.ts @@ -77,7 +77,9 @@ const pinoPrettyOpts = { customLevels: 'fatal:60,error:50,warn:40,info:30,verbose:25,debug:20,trace:10', customColors: 'fatal:bgRed,error:red,warn:yellow,info:green,verbose:magenta,debug:blue,trace:gray', minimumLevel: 'trace' as const, + singleLine: !['1', 'true'].includes(process.env.LOG_MULTILINE ?? ''), }; + const prettyTransport: pino.TransportSingleOptions = { target: 'pino-pretty', options: pinoPrettyOpts, diff --git a/yarn-project/p2p/src/bootstrap/bootstrap.ts b/yarn-project/p2p/src/bootstrap/bootstrap.ts index d0d459be642..ab4f84b6b79 100644 --- a/yarn-project/p2p/src/bootstrap/bootstrap.ts +++ b/yarn-project/p2p/src/bootstrap/bootstrap.ts @@ -1,3 +1,4 @@ +import { type P2PBootstrapApi } from '@aztec/circuit-types/interfaces'; import { createLogger } from '@aztec/foundation/log'; import { type AztecKVStore } from '@aztec/kv-store'; import { OtelMetricsAdapter, type TelemetryClient } from '@aztec/telemetry-client'; @@ -14,7 +15,7 @@ import { convertToMultiaddr, createLibP2PPeerIdFromPrivateKey, getPeerIdPrivateK /** * Encapsulates a 'Bootstrap' node, used for the purpose of assisting new joiners in acquiring peers. */ -export class BootstrapNode { +export class BootstrapNode implements P2PBootstrapApi { private node?: Discv5 = undefined; private peerId?: PeerId; @@ -47,7 +48,7 @@ export class BootstrapNode { enr.setLocationMultiaddr(publicAddr); enr.set(AZTEC_ENR_KEY, Uint8Array.from([AZTEC_NET])); - this.logger.info(`Starting bootstrap node ${peerId}, listening on ${listenAddrUdp.toString()}`); + this.logger.debug(`Starting bootstrap node ${peerId} listening on ${listenAddrUdp.toString()}`); const metricsRegistry = new OtelMetricsAdapter(this.telemetry); this.node = Discv5.create({ enr, @@ -65,17 +66,15 @@ export class BootstrapNode { }); (this.node as Discv5EventEmitter).on('discovered', async (enr: SignableENR) => { const addr = await enr.getFullMultiaddr('udp'); - this.logger.verbose(`Discovered new peer, enr: ${enr.encodeTxt()}, addr: ${addr?.toString()}`); + this.logger.verbose(`Discovered new peer`, { enr: enr.encodeTxt(), addr: addr?.toString() }); }); try { await this.node.start(); - this.logger.info('Discv5 started'); + this.logger.info('Bootstrap node started', { peerId, enr: enr.encodeTxt(), addr: listenAddrUdp.toString() }); } catch (e) { this.logger.error('Error starting Discv5', e); } - - this.logger.info(`ENR: ${this.node?.enr.encodeTxt()}`); } /** @@ -84,8 +83,9 @@ export class BootstrapNode { */ public async stop() { // stop libp2p + this.logger.debug('Stopping bootstrap node'); await this.node?.stop(); - this.logger.debug('Discv5 has stopped'); + this.logger.info('Bootstrap node stopped'); } /** @@ -105,4 +105,18 @@ export class BootstrapNode { } return this.node?.enr.toENR(); } + + public getEncodedEnr() { + if (!this.node) { + throw new Error('Node not started'); + } + return Promise.resolve(this.node.enr.encodeTxt()); + } + + public getRoutingTable() { + if (!this.node) { + throw new Error('Node not started'); + } + return Promise.resolve(this.node.kadValues().map(enr => enr.encodeTxt())); + } } diff --git a/yarn-project/p2p/src/client/index.ts b/yarn-project/p2p/src/client/index.ts index 9340b9a2574..509e5d51614 100644 --- a/yarn-project/p2p/src/client/index.ts +++ b/yarn-project/p2p/src/client/index.ts @@ -35,6 +35,7 @@ export const createP2PClient = async ( } = {}, ) => { let config = { ..._config }; + const logger = createLogger('p2p'); const store = deps.store ?? (await createStore('p2p', config, createLogger('p2p:lmdb'))); const mempools: MemPools = { @@ -46,6 +47,7 @@ export const createP2PClient = async ( let p2pService; if (_config.p2pEnabled) { + logger.verbose('P2P is enabled. Using LibP2P service.'); config = await configureP2PClientAddresses(_config); // Create peer discovery service @@ -65,6 +67,7 @@ export const createP2PClient = async ( telemetry, ); } else { + logger.verbose('P2P is disabled. Using dummy P2P service'); p2pService = new DummyP2PService(); } return new P2PClient(store, l2BlockSource, mempools, p2pService, config.keepProvenTxsInPoolFor, telemetry); diff --git a/yarn-project/p2p/src/client/p2p_client.test.ts b/yarn-project/p2p/src/client/p2p_client.test.ts index 24a112ae839..5d926dec481 100644 --- a/yarn-project/p2p/src/client/p2p_client.test.ts +++ b/yarn-project/p2p/src/client/p2p_client.test.ts @@ -6,7 +6,8 @@ import { sleep } from '@aztec/foundation/sleep'; import { type AztecKVStore } from '@aztec/kv-store'; import { openTmpStore } from '@aztec/kv-store/lmdb'; -import { expect, jest } from '@jest/globals'; +import { expect } from '@jest/globals'; +import { type MockProxy, mock } from 'jest-mock-extended'; import { type EpochProofQuotePool, type P2PService } from '../index.js'; import { type AttestationPool } from '../mem_pools/attestation_pool/attestation_pool.js'; @@ -14,59 +15,29 @@ import { type MemPools } from '../mem_pools/interface.js'; import { type TxPool } from '../mem_pools/tx_pool/index.js'; import { P2PClient } from './p2p_client.js'; -/** - * Mockify helper for testing purposes. - */ -type Mockify = { - [P in keyof T]: ReturnType; -}; - describe('In-Memory P2P Client', () => { - let txPool: Mockify; - let attestationPool: Mockify; - let epochProofQuotePool: Mockify; + let txPool: MockProxy; + let attestationPool: MockProxy; + let epochProofQuotePool: MockProxy; let mempools: MemPools; let blockSource: MockL2BlockSource; - let p2pService: Mockify; + let p2pService: MockProxy; let kvStore: AztecKVStore; let client: P2PClient; beforeEach(() => { - txPool = { - addTxs: jest.fn(), - getTxByHash: jest.fn().mockReturnValue(undefined), - deleteTxs: jest.fn(), - getAllTxs: jest.fn().mockReturnValue([]), - getAllTxHashes: jest.fn().mockReturnValue([]), - getMinedTxHashes: jest.fn().mockReturnValue([]), - getPendingTxHashes: jest.fn().mockReturnValue([]), - getTxStatus: jest.fn().mockReturnValue(undefined), - markAsMined: jest.fn(), - markMinedAsPending: jest.fn(), - }; + txPool = mock(); + txPool.getAllTxs.mockReturnValue([]); + txPool.getPendingTxHashes.mockReturnValue([]); + txPool.getMinedTxHashes.mockReturnValue([]); + txPool.getAllTxHashes.mockReturnValue([]); - p2pService = { - start: jest.fn(), - stop: jest.fn(), - propagate: jest.fn(), - registerBlockReceivedCallback: jest.fn(), - sendRequest: jest.fn(), - getEnr: jest.fn(), - }; + p2pService = mock(); - attestationPool = { - addAttestations: jest.fn(), - deleteAttestations: jest.fn(), - deleteAttestationsForSlot: jest.fn(), - deleteAttestationsOlderThan: jest.fn(), - getAttestationsForSlot: jest.fn().mockReturnValue(undefined), - }; + attestationPool = mock(); - epochProofQuotePool = { - addQuote: jest.fn(), - getQuotes: jest.fn().mockReturnValue([]), - deleteQuotesToEpoch: jest.fn(), - }; + epochProofQuotePool = mock(); + epochProofQuotePool.getQuotes.mockReturnValue([]); blockSource = new MockL2BlockSource(); blockSource.createBlocks(100); diff --git a/yarn-project/p2p/src/client/p2p_client.ts b/yarn-project/p2p/src/client/p2p_client.ts index fdb5f5b23a5..9884d4b4e3f 100644 --- a/yarn-project/p2p/src/client/p2p_client.ts +++ b/yarn-project/p2p/src/client/p2p_client.ts @@ -8,6 +8,8 @@ import { L2BlockStream, type L2BlockStreamEvent, type L2Tips, + type P2PApi, + type PeerInfo, type Tx, type TxHash, } from '@aztec/circuit-types'; @@ -54,7 +56,7 @@ export interface P2PSyncState { /** * Interface of a P2P client. **/ -export interface P2P { +export interface P2P extends P2PApi { /** * Broadcasts a block proposal to other peers. * @@ -62,15 +64,6 @@ export interface P2P { */ broadcastProposal(proposal: BlockProposal): void; - /** - * Queries the Attestation pool for attestations for the given slot - * - * @param slot - the slot to query - * @param proposalId - the proposal id to query - * @returns BlockAttestations - */ - getAttestationsForSlot(slot: bigint, proposalId: string): Promise; - /** * Queries the EpochProofQuote pool for quotes for the given epoch * @@ -122,12 +115,6 @@ export interface P2P { **/ deleteTxs(txHashes: TxHash[]): Promise; - /** - * Returns all transactions in the transaction pool. - * @returns An array of Txs. - */ - getTxs(filter: 'all' | 'pending' | 'mined'): Tx[]; - /** * Returns a transaction in the transaction pool by its hash. * @param txHash - Hash of tx to return. @@ -173,9 +160,12 @@ export interface P2P { getStatus(): Promise; /** - * Returns the ENR for this node, if any. + * Returns the ENR of this node, if any. */ getEnr(): ENR | undefined; + + /** Identifies a p2p client. */ + isP2PClient(): true; } /** @@ -245,6 +235,14 @@ export class P2PClient extends WithTracer implements P2P { this.epochProofQuotePool = mempools.epochProofQuotePool; } + public isP2PClient(): true { + return true; + } + + public getPeers(includePending?: boolean): Promise { + return Promise.resolve(this.p2pService.getPeers(includePending)); + } + public getL2BlockHash(number: number): Promise { return Promise.resolve(this.synchedBlockHashes.get(number)); } @@ -444,6 +442,10 @@ export class P2PClient extends WithTracer implements P2P { return tx; } + public getPendingTxs(): Promise { + return Promise.resolve(this.getTxs('pending')); + } + /** * Returns all transactions in the transaction pool. * @returns An array of Txs. @@ -514,6 +516,10 @@ export class P2PClient extends WithTracer implements P2P { return this.p2pService.getEnr(); } + public getEncodedEnr(): Promise { + return Promise.resolve(this.p2pService.getEnr()?.encodeTxt()); + } + /** * Deletes the 'txs' from the pool. * NOT used if we use sendTx as reconcileTxPool will handle this. @@ -709,8 +715,9 @@ export class P2PClient extends WithTracer implements P2P { * @param newState - New state value. */ private setCurrentState(newState: P2PClientState) { + const oldState = this.currentState; this.currentState = newState; - this.log.debug(`Moved to state ${P2PClientState[this.currentState]}`); + this.log.debug(`Moved from state ${P2PClientState[oldState]} to ${P2PClientState[this.currentState]}`); } private async publishStoredTxs() { diff --git a/yarn-project/p2p/src/service/discV5_service.ts b/yarn-project/p2p/src/service/discV5_service.ts index a39d58725e8..5b2ceffcd4c 100644 --- a/yarn-project/p2p/src/service/discV5_service.ts +++ b/yarn-project/p2p/src/service/discV5_service.ts @@ -5,7 +5,7 @@ import { OtelMetricsAdapter, type TelemetryClient } from '@aztec/telemetry-clien import { Discv5, type Discv5EventEmitter } from '@chainsafe/discv5'; import { ENR, SignableENR } from '@chainsafe/enr'; import type { PeerId } from '@libp2p/interface'; -import { multiaddr } from '@multiformats/multiaddr'; +import { type Multiaddr, multiaddr } from '@multiformats/multiaddr'; import EventEmitter from 'events'; import type { P2PConfig } from '../config.js'; @@ -35,6 +35,9 @@ export class DiscV5Service extends EventEmitter implements PeerDiscoveryService /** This instance's ENR */ private enr: SignableENR; + /** UDP listen addr */ + private listenMultiAddrUdp: Multiaddr; + private currentState = PeerDiscoveryState.STOPPED; private bootstrapNodes: string[]; @@ -66,7 +69,7 @@ export class DiscV5Service extends EventEmitter implements PeerDiscoveryService `${convertToMultiaddr(udpAnnounceAddress || tcpAnnounceAddress, 'udp')}/p2p/${peerId.toString()}`, ); - const listenMultiAddrUdp = multiaddr(convertToMultiaddr(udpListenAddress, 'udp')); + this.listenMultiAddrUdp = multiaddr(convertToMultiaddr(udpListenAddress, 'udp')); // set location multiaddr in ENR record this.enr.setLocationMultiaddr(multiAddrUdp); @@ -76,7 +79,7 @@ export class DiscV5Service extends EventEmitter implements PeerDiscoveryService this.discv5 = Discv5.create({ enr: this.enr, peerId, - bindAddrs: { ip4: listenMultiAddrUdp }, + bindAddrs: { ip4: this.listenMultiAddrUdp }, config: { lookupTimeout: 2000, requestTimeout: 2000, @@ -85,14 +88,11 @@ export class DiscV5Service extends EventEmitter implements PeerDiscoveryService metricsRegistry, }); - this.logger.info(`ENR NodeId: ${this.enr.nodeId}`); - this.logger.info(`ENR UDP: ${multiAddrUdp.toString()}`); - (this.discv5 as Discv5EventEmitter).on('discovered', (enr: ENR) => this.onDiscovered(enr)); (this.discv5 as Discv5EventEmitter).on('enrAdded', async (enr: ENR) => { const multiAddrTcp = await enr.getFullMultiaddr('tcp'); const multiAddrUdp = await enr.getFullMultiaddr('udp'); - this.logger.debug(`ENR multiaddr: ${multiAddrTcp?.toString()}, ${multiAddrUdp?.toString()}`); + this.logger.debug(`Added ENR ${enr.encodeTxt()}`, { multiAddrTcp, multiAddrUdp, nodeId: enr.nodeId }); this.onDiscovered(enr); }); } @@ -101,18 +101,23 @@ export class DiscV5Service extends EventEmitter implements PeerDiscoveryService if (this.currentState === PeerDiscoveryState.RUNNING) { throw new Error('DiscV5Service already started'); } - this.logger.info('Starting DiscV5'); + this.logger.debug('Starting DiscV5'); await this.discv5.start(); this.startTime = Date.now(); - this.logger.info('DiscV5 started'); + this.logger.info(`DiscV5 service started`, { + nodeId: this.enr.nodeId, + peerId: this.peerId, + enrUdp: await this.enr.getFullMultiaddr('udp'), + enrTcp: await this.enr.getFullMultiaddr('tcp'), + }); this.currentState = PeerDiscoveryState.RUNNING; // Add bootnode ENR if provided if (this.bootstrapNodes?.length) { // Do this conversion once since it involves an async function call this.bootstrapNodePeerIds = await Promise.all(this.bootstrapNodes.map(enr => ENR.decodeTxt(enr).peerId())); - this.logger.info(`Adding bootstrap ENRs: ${this.bootstrapNodes.join(', ')}`); + this.logger.info(`Adding bootstrap nodes ENRs: ${this.bootstrapNodes.join(', ')}`); try { this.bootstrapNodes.forEach(enr => { this.discv5.addEnr(enr); diff --git a/yarn-project/p2p/src/service/dummy_service.ts b/yarn-project/p2p/src/service/dummy_service.ts index be5b82d9c8e..e108cd47ac6 100644 --- a/yarn-project/p2p/src/service/dummy_service.ts +++ b/yarn-project/p2p/src/service/dummy_service.ts @@ -1,4 +1,4 @@ -import type { BlockAttestation, BlockProposal, Gossipable, TxHash } from '@aztec/circuit-types'; +import type { BlockAttestation, BlockProposal, Gossipable, PeerInfo, TxHash } from '@aztec/circuit-types'; import type { PeerId } from '@libp2p/interface'; import EventEmitter from 'events'; @@ -10,6 +10,11 @@ import { type P2PService, type PeerDiscoveryService, PeerDiscoveryState } from ' * A dummy implementation of the P2P Service. */ export class DummyP2PService implements P2PService { + /** Returns an empty array for peers. */ + getPeers(): PeerInfo[] { + return []; + } + /** * Starts the dummy implementation. * @returns A resolved promise. diff --git a/yarn-project/p2p/src/service/libp2p_service.ts b/yarn-project/p2p/src/service/libp2p_service.ts index 8ee0e1789cb..b0f10aec8bf 100644 --- a/yarn-project/p2p/src/service/libp2p_service.ts +++ b/yarn-project/p2p/src/service/libp2p_service.ts @@ -6,6 +6,7 @@ import { type Gossipable, type L2BlockSource, MerkleTreeId, + type PeerInfo, type RawGossipMessage, TopicType, TopicTypeMap, @@ -117,20 +118,17 @@ export class LibP2PService extends WithTracer implements P2PService { throw new Error('P2P service already started'); } - // Log listen & announce addresses + // Get listen & announce addresses for logging const { tcpListenAddress, tcpAnnounceAddress } = this.config; - this.logger.info(`Starting P2P node on ${tcpListenAddress}`); if (!tcpAnnounceAddress) { throw new Error('Announce address not provided.'); } const announceTcpMultiaddr = convertToMultiaddr(tcpAnnounceAddress, 'tcp'); - this.logger.info(`Announcing at ${announceTcpMultiaddr}`); // Start job queue, peer discovery service and libp2p node this.jobQueue.start(); await this.peerDiscoveryService.start(); await this.node.start(); - this.logger.info(`Started P2P client with Peer ID ${this.node.peerId.toString()}`); // Subscribe to standard GossipSub topics by default for (const topic in TopicType) { @@ -157,6 +155,11 @@ export class LibP2PService extends WithTracer implements P2PService { [TX_REQ_PROTOCOL]: this.validateRequestedTx.bind(this), }; await this.reqresp.start(this.requestResponseHandlers, reqrespSubProtocolValidators); + this.logger.info(`Started P2P service`, { + listen: tcpListenAddress, + announce: announceTcpMultiaddr, + peerId: this.node.peerId.toString(), + }); } /** @@ -175,7 +178,6 @@ export class LibP2PService extends WithTracer implements P2PService { this.logger.debug('Stopping LibP2P...'); await this.stopLibP2P(); this.logger.info('LibP2P service stopped'); - this.logger.debug('Stopping request response service...'); } /** @@ -309,6 +311,10 @@ export class LibP2PService extends WithTracer implements P2PService { ); } + public getPeers(includePending?: boolean): PeerInfo[] { + return this.peerManager.getPeers(includePending); + } + /** * Send Request via the ReqResp service * The subprotocol defined will determine the request and response types @@ -583,10 +589,10 @@ export class LibP2PService extends WithTracer implements P2PService { const parent = message.constructor as typeof Gossipable; const identifier = message.p2pMessageIdentifier().toString(); - this.logger.verbose(`[${identifier}] sending`); + this.logger.trace(`Sending message ${identifier}`); const recipientsNum = await this.publishToTopic(parent.p2pTopic, message.toBuffer()); - this.logger.verbose(`[${identifier}] sent to ${recipientsNum} peers`); + this.logger.debug(`Sent message ${identifier} to ${recipientsNum} peers`); } // Libp2p seems to hang sometimes if new peers are initiating connections. @@ -597,7 +603,7 @@ export class LibP2PService extends WithTracer implements P2PService { }); try { await Promise.race([this.node.stop(), timeout]); - this.logger.debug('Libp2p stopped'); + this.logger.debug('LibP2P stopped'); } catch (error) { this.logger.error('Error during stop or timeout:', error); } diff --git a/yarn-project/p2p/src/service/peer_manager.ts b/yarn-project/p2p/src/service/peer_manager.ts index b413db5d59c..7f12a4679bc 100644 --- a/yarn-project/p2p/src/service/peer_manager.ts +++ b/yarn-project/p2p/src/service/peer_manager.ts @@ -1,8 +1,10 @@ +import { type PeerInfo } from '@aztec/circuit-types'; import { createLogger } from '@aztec/foundation/log'; import { type ENR } from '@chainsafe/enr'; import { type PeerId } from '@libp2p/interface'; import { type Multiaddr } from '@multiformats/multiaddr'; +import { inspect } from 'util'; import { type P2PConfig } from '../config.js'; import { type PubSubLibp2p } from '../util.js'; @@ -22,21 +24,22 @@ type CachedPeer = { export class PeerManager { private cachedPeers: Map = new Map(); private peerScoring: PeerScoring; + private heartbeatCounter: number = 0; constructor( private libP2PNode: PubSubLibp2p, private peerDiscoveryService: PeerDiscoveryService, private config: P2PConfig, - private logger = createLogger('p2p:peer_manager'), + private logger = createLogger('p2p:peer-manager'), ) { this.peerScoring = new PeerScoring(config); // Handle new established connections this.libP2PNode.addEventListener('peer:connect', evt => { const peerId = evt.detail; if (this.peerDiscoveryService.isBootstrapPeer(peerId)) { - this.logger.debug(`Connected to bootstrap peer ${peerId.toString()}`); + this.logger.verbose(`Connected to bootstrap peer ${peerId.toString()}`); } else { - this.logger.debug(`Connected to transaction peer ${peerId.toString()}`); + this.logger.verbose(`Connected to transaction peer ${peerId.toString()}`); } }); @@ -44,9 +47,9 @@ export class PeerManager { this.libP2PNode.addEventListener('peer:disconnect', evt => { const peerId = evt.detail; if (this.peerDiscoveryService.isBootstrapPeer(peerId)) { - this.logger.debug(`Disconnected from bootstrap peer ${peerId.toString()}`); + this.logger.verbose(`Disconnected from bootstrap peer ${peerId.toString()}`); } else { - this.logger.debug(`Disconnected from transaction peer ${peerId.toString()}`); + this.logger.verbose(`Disconnected from transaction peer ${peerId.toString()}`); } }); @@ -57,6 +60,7 @@ export class PeerManager { } public heartbeat() { + this.heartbeatCounter++; this.discover(); this.peerScoring.decayAllScores(); } @@ -64,13 +68,47 @@ export class PeerManager { public penalizePeer(peerId: PeerId, penalty: PeerErrorSeverity) { const id = peerId.toString(); const penaltyValue = this.peerScoring.peerPenalties[penalty]; - this.peerScoring.updateScore(id, -penaltyValue); + const newScore = this.peerScoring.updateScore(id, -penaltyValue); + this.logger.verbose(`Penalizing peer ${id} with ${penalty} (new score is ${newScore})`); } public getPeerScore(peerId: string): number { return this.peerScoring.getScore(peerId); } + public getPeers(includePending = false): PeerInfo[] { + const connected = this.libP2PNode + .getPeers() + .map(peer => ({ id: peer.toString(), score: this.getPeerScore(peer.toString()), status: 'connected' as const })); + + if (!includePending) { + return connected; + } + + const dialQueue = this.libP2PNode + .getDialQueue() + .filter(peer => !!peer.peerId) + .map(peer => ({ + id: peer.peerId!.toString(), + status: 'dialing' as const, + dialStatus: peer.status, + addresses: peer.multiaddrs.map(m => m.toString()), + })); + + const cachedPeers = Array.from(this.cachedPeers.values()) + .filter(peer => !dialQueue.some(dialPeer => dialPeer.id && peer.peerId.toString() === dialPeer.id.toString())) + .filter(peer => !connected.some(connPeer => connPeer.id.toString() === peer.peerId.toString())) + .map(peer => ({ + status: 'cached' as const, + id: peer.peerId.toString(), + addresses: [peer.multiaddrTcp.toString()], + dialAttempts: peer.dialAttempts, + enr: peer.enr.encodeTxt(), + })); + + return [...connected, ...dialQueue, ...cachedPeers]; + } + /** * Discovers peers. */ @@ -81,9 +119,13 @@ export class PeerManager { // Calculate how many connections we're looking to make const peersToConnect = this.config.maxPeerCount - connections.length; - this.logger.debug( - `Connections: ${connections.length}, Peers to connect: ${peersToConnect}, maxPeerCount: ${this.config.maxPeerCount}, cachedPeers: ${this.cachedPeers.size}`, - ); + const logLevel = this.heartbeatCounter % 60 === 0 ? 'info' : 'debug'; + this.logger[logLevel](`Connected to ${connections.length} peers`, { + connections: connections.length, + maxPeerCount: this.config.maxPeerCount, + cachedPeers: this.cachedPeers.size, + ...this.peerScoring.getStats(), + }); // Exit if no peers to connect if (peersToConnect <= 0) { @@ -119,7 +161,7 @@ export class PeerManager { // if we need more peers, start randomNodesQuery if (peersToConnect > 0) { - this.logger.debug('Running random nodes query'); + this.logger.trace(`Running random nodes query to connect to ${peersToConnect} peers`); void this.peerDiscoveryService.runRandomNodesQuery(); } } @@ -134,23 +176,25 @@ export class PeerManager { // check if peer is already connected const [peerId, multiaddrTcp] = await Promise.all([enr.peerId(), enr.getFullMultiaddr('tcp')]); - this.logger.debug(`Handling discovered peer ${peerId.toString()}, ${multiaddrTcp?.toString()}`); + this.logger.trace( + `Handling discovered peer ${peerId.toString()} at ${multiaddrTcp?.toString() ?? 'undefined address'}`, + ); // throw if no tcp addr in multiaddr if (!multiaddrTcp) { - this.logger.debug(`No TCP address in discovered node's multiaddr: ${enr.toString()}`); + this.logger.debug(`No TCP address in discovered node's multiaddr ${enr.encodeTxt()}`); return; } const connections = this.libP2PNode.getConnections(); if (connections.some(conn => conn.remotePeer.equals(peerId))) { - this.logger.debug(`Already connected to peer ${peerId.toString()}`); + this.logger.trace(`Already connected to peer ${peerId.toString()}`); return; } // check if peer is already in cache const id = peerId.toString(); if (this.cachedPeers.has(id)) { - this.logger.debug(`Already in cache ${id}`); + this.logger.trace(`Peer already in cache ${id}`); return; } @@ -164,10 +208,9 @@ export class PeerManager { // Determine if we should dial immediately or not if (this.shouldDialPeer()) { - this.logger.debug(`Dialing peer ${id}`); void this.dialPeer(cachedPeer); } else { - this.logger.debug(`Caching peer ${id}`); + this.logger.trace(`Caching peer ${id}`); this.cachedPeers.set(id, cachedPeer); // Prune set of cached peers this.pruneCachedPeers(); @@ -181,12 +224,13 @@ export class PeerManager { this.logger.debug(`Dialing peer ${id}`); try { await this.libP2PNode.dial(peer.multiaddrTcp); - } catch { - this.logger.debug(`Failed to dial peer ${id}`); + } catch (error) { peer.dialAttempts++; if (peer.dialAttempts < MAX_DIAL_ATTEMPTS) { + this.logger.trace(`Failed to dial peer ${id} (attempt ${peer.dialAttempts})`, { error: inspect(error) }); this.cachedPeers.set(id, peer); } else { + this.logger.debug(`Failed to dial peer ${id} (dropping)`, { error: inspect(error) }); this.cachedPeers.delete(id); } } @@ -194,9 +238,10 @@ export class PeerManager { private shouldDialPeer(): boolean { const connections = this.libP2PNode.getConnections().length; - this.logger.debug(`Connections: ${connections}, maxPeerCount: ${this.config.maxPeerCount}`); if (connections >= this.config.maxPeerCount) { - this.logger.debug('Not dialing peer, maxPeerCount reached'); + this.logger.trace( + `Not dialing peer due to max peer count of ${this.config.maxPeerCount} reached (${connections} current connections)`, + ); return false; } return true; @@ -211,6 +256,7 @@ export class PeerManager { // Remove the oldest peers for (const key of this.cachedPeers.keys()) { this.cachedPeers.delete(key); + this.logger.trace(`Pruning peer ${key} from cache`); peersToDelete--; if (peersToDelete <= 0) { break; diff --git a/yarn-project/p2p/src/service/peer_scoring.ts b/yarn-project/p2p/src/service/peer_scoring.ts index d59cb10b182..896eb0a69b6 100644 --- a/yarn-project/p2p/src/service/peer_scoring.ts +++ b/yarn-project/p2p/src/service/peer_scoring.ts @@ -1,3 +1,5 @@ +import { median } from '@aztec/foundation/collection'; + import { type P2PConfig } from '../config.js'; export enum PeerErrorSeverity { @@ -43,7 +45,7 @@ export class PeerScoring { }; } - updateScore(peerId: string, scoreDelta: number): void { + updateScore(peerId: string, scoreDelta: number): number { const currentTime = Date.now(); const lastUpdate = this.lastUpdateTime.get(peerId) || currentTime; const timePassed = currentTime - lastUpdate; @@ -59,6 +61,7 @@ export class PeerScoring { this.scores.set(peerId, currentScore); this.lastUpdateTime.set(peerId, currentTime); + return currentScore; } decayAllScores(): void { @@ -78,4 +81,8 @@ export class PeerScoring { getScore(peerId: string): number { return this.scores.get(peerId) || 0; } + + getStats(): { median: number } { + return { median: median(Array.from(this.scores.values())) ?? 0 }; + } } diff --git a/yarn-project/p2p/src/service/reqresp/reqresp.integration.test.ts b/yarn-project/p2p/src/service/reqresp/reqresp.integration.test.ts index 67436bad987..739da0d5a09 100644 --- a/yarn-project/p2p/src/service/reqresp/reqresp.integration.test.ts +++ b/yarn-project/p2p/src/service/reqresp/reqresp.integration.test.ts @@ -223,7 +223,7 @@ describe('Req Resp p2p client integration', () => { // We want to create a set of nodes and request transaction from them const clients = await createClients(NUMBER_OF_PEERS, /*valid proofs*/ false); const [client1, client2] = clients; - const client2PeerId = (await client2.getEnr()?.peerId())!; + const client2PeerId = await client2.getEnr()!.peerId(); // Give the nodes time to discover each other await sleep(6000); diff --git a/yarn-project/p2p/src/service/reqresp/reqresp.test.ts b/yarn-project/p2p/src/service/reqresp/reqresp.test.ts index 349b3a8f6b5..f7091c2d5eb 100644 --- a/yarn-project/p2p/src/service/reqresp/reqresp.test.ts +++ b/yarn-project/p2p/src/service/reqresp/reqresp.test.ts @@ -4,7 +4,7 @@ import { sleep } from '@aztec/foundation/sleep'; import { describe, expect, it, jest } from '@jest/globals'; import { type MockProxy, mock } from 'jest-mock-extended'; -import { CollectiveReqRespTimeoutError, IndiviualReqRespTimeoutError } from '../../errors/reqresp.error.js'; +import { CollectiveReqRespTimeoutError } from '../../errors/reqresp.error.js'; import { MOCK_SUB_PROTOCOL_HANDLERS, MOCK_SUB_PROTOCOL_VALIDATORS, @@ -114,7 +114,7 @@ describe('ReqResp', () => { expect(loggerSpy).toHaveBeenCalledWith(errorMessage); }); - describe('TX REQ PROTOCOL', () => { + describe('Tx req protocol', () => { it('Can request a Tx from TxHash', async () => { const tx = mockTx(); const txHash = tx.getTxHash(); @@ -181,10 +181,12 @@ describe('ReqResp', () => { expect(res).toBeUndefined(); // Make sure the error message is logged - const errorMessage = `${ - new IndiviualReqRespTimeoutError().message - } | peerId: ${nodes[1].p2p.peerId.toString()} | subProtocol: ${TX_REQ_PROTOCOL}`; - expect(loggerSpy).toHaveBeenCalledWith(errorMessage); + const peerId = nodes[1].p2p.peerId.toString(); + expect(loggerSpy).toHaveBeenCalledWith( + expect.stringMatching(/Error sending request to peer/i), + expect.any(Error), + { peerId, subProtocol: '/aztec/req/tx/0.1.0' }, + ); // Expect the peer to be penalized for timing out expect(peerManager.penalizePeer).toHaveBeenCalledWith( diff --git a/yarn-project/p2p/src/service/reqresp/reqresp.ts b/yarn-project/p2p/src/service/reqresp/reqresp.ts index 0512c048f83..86aea3ee781 100644 --- a/yarn-project/p2p/src/service/reqresp/reqresp.ts +++ b/yarn-project/p2p/src/service/reqresp/reqresp.ts @@ -210,7 +210,7 @@ export class ReqResp { return result; } catch (e: any) { - this.logger.error(`${e.message} | peerId: ${peerId.toString()} | subProtocol: ${subProtocol}`); + this.logger.error(`Error sending request to peer`, e, { peerId: peerId.toString(), subProtocol }); this.peerManager.penalizePeer(peerId, PeerErrorSeverity.HighToleranceError); } finally { if (stream) { diff --git a/yarn-project/p2p/src/service/service.ts b/yarn-project/p2p/src/service/service.ts index ce486e0b2bb..d668d6ca441 100644 --- a/yarn-project/p2p/src/service/service.ts +++ b/yarn-project/p2p/src/service/service.ts @@ -1,4 +1,4 @@ -import type { BlockAttestation, BlockProposal, Gossipable } from '@aztec/circuit-types'; +import type { BlockAttestation, BlockProposal, Gossipable, PeerInfo } from '@aztec/circuit-types'; import type { ENR } from '@chainsafe/enr'; import type { PeerId } from '@libp2p/interface'; @@ -49,6 +49,8 @@ export interface P2PService { registerBlockReceivedCallback(callback: (block: BlockProposal) => Promise): void; getEnr(): ENR | undefined; + + getPeers(includePending?: boolean): PeerInfo[]; } /** diff --git a/yarn-project/prover-node/src/prover-node.ts b/yarn-project/prover-node/src/prover-node.ts index 51cad0f1227..b02b1d45fd1 100644 --- a/yarn-project/prover-node/src/prover-node.ts +++ b/yarn-project/prover-node/src/prover-node.ts @@ -18,6 +18,7 @@ import { compact } from '@aztec/foundation/collection'; import { sha256 } from '@aztec/foundation/crypto'; import { createLogger } from '@aztec/foundation/log'; import { type Maybe } from '@aztec/foundation/types'; +import { type P2P } from '@aztec/p2p'; import { type L1Publisher } from '@aztec/sequencer-client'; import { PublicProcessorFactory } from '@aztec/simulator'; import { type TelemetryClient } from '@aztec/telemetry-client'; @@ -78,6 +79,14 @@ export class ProverNode implements ClaimsMonitorHandler, EpochMonitorHandler, Pr this.metrics = new ProverNodeMetrics(telemetryClient, 'ProverNode'); } + public getP2P() { + const asP2PClient = this.coordination as P2P; + if (typeof asP2PClient.isP2PClient === 'function' && asP2PClient.isP2PClient()) { + return asP2PClient; + } + return undefined; + } + async handleClaim(proofClaim: EpochProofClaim): Promise { if (proofClaim.epochToProve === this.latestEpochWeAreProving) { this.log.verbose(`Already proving claim for epoch ${proofClaim.epochToProve}`); diff --git a/yarn-project/sequencer-client/src/sequencer/sequencer.test.ts b/yarn-project/sequencer-client/src/sequencer/sequencer.test.ts index 5acbbd261f6..aeb73d79725 100644 --- a/yarn-project/sequencer-client/src/sequencer/sequencer.test.ts +++ b/yarn-project/sequencer-client/src/sequencer/sequencer.test.ts @@ -208,7 +208,7 @@ describe('sequencer', () => { tx.data.constants.txContext.chainId = chainId; const txHash = tx.getTxHash(); - p2p.getTxs.mockReturnValueOnce([tx]); + p2p.getPendingTxs.mockResolvedValueOnce([tx]); blockBuilder.setBlockCompleted.mockResolvedValue(block); publisher.proposeL2Block.mockResolvedValueOnce(true); @@ -240,7 +240,7 @@ describe('sequencer', () => { const tx = mockTxForRollup(); tx.data.constants.txContext.chainId = chainId; - p2p.getTxs.mockReturnValueOnce([tx]); + p2p.getPendingTxs.mockResolvedValueOnce([tx]); blockBuilder.setBlockCompleted.mockResolvedValue(block); publisher.proposeL2Block.mockResolvedValueOnce(true); @@ -262,7 +262,7 @@ describe('sequencer', () => { tx.data.constants.txContext.chainId = chainId; const txHash = tx.getTxHash(); - p2p.getTxs.mockReturnValue([tx]); + p2p.getPendingTxs.mockResolvedValue([tx]); blockBuilder.setBlockCompleted.mockResolvedValue(block); publisher.proposeL2Block.mockResolvedValueOnce(true); @@ -307,7 +307,7 @@ describe('sequencer', () => { const doubleSpendTx = txs[doubleSpendTxIndex]; - p2p.getTxs.mockReturnValueOnce(txs); + p2p.getPendingTxs.mockResolvedValueOnce(txs); blockBuilder.setBlockCompleted.mockResolvedValue(block); publisher.proposeL2Block.mockResolvedValueOnce(true); @@ -341,7 +341,7 @@ describe('sequencer', () => { const invalidChainTx = txs[invalidChainTxIndex]; const validTxHashes = txs.filter((_, i) => i !== invalidChainTxIndex).map(tx => tx.getTxHash()); - p2p.getTxs.mockReturnValueOnce(txs); + p2p.getPendingTxs.mockResolvedValueOnce(txs); blockBuilder.setBlockCompleted.mockResolvedValue(block); publisher.proposeL2Block.mockResolvedValueOnce(true); @@ -370,7 +370,7 @@ describe('sequencer', () => { }); const validTxHashes = txs.filter((_, i) => i !== invalidTransactionIndex).map(tx => tx.getTxHash()); - p2p.getTxs.mockReturnValueOnce(txs); + p2p.getPendingTxs.mockResolvedValueOnce(txs); blockBuilder.setBlockCompleted.mockResolvedValue(block); publisher.proposeL2Block.mockResolvedValueOnce(true); @@ -407,19 +407,19 @@ describe('sequencer', () => { sequencer.updateConfig({ minTxsPerBlock: 4 }); // block is not built with 0 txs - p2p.getTxs.mockReturnValueOnce([]); - //p2p.getTxs.mockReturnValueOnce(txs.slice(0, 4)); + p2p.getPendingTxs.mockResolvedValueOnce([]); + //p2p.getPendingTxs.mockResolvedValueOnce(txs.slice(0, 4)); await sequencer.doRealWork(); expect(blockBuilder.startNewBlock).toHaveBeenCalledTimes(0); // block is not built with 3 txs - p2p.getTxs.mockReturnValueOnce(txs.slice(0, 3)); + p2p.getPendingTxs.mockResolvedValueOnce(txs.slice(0, 3)); await sequencer.doRealWork(); expect(blockBuilder.startNewBlock).toHaveBeenCalledTimes(0); // block is built with 4 txs - p2p.getTxs.mockReturnValueOnce(txs.slice(0, 4)); + p2p.getPendingTxs.mockResolvedValueOnce(txs.slice(0, 4)); const txHashes = txs.slice(0, 4).map(tx => tx.getTxHash()); await sequencer.doRealWork(); @@ -448,12 +448,12 @@ describe('sequencer', () => { sequencer.updateConfig({ minTxsPerBlock: 4 }); // block is not built with 0 txs - p2p.getTxs.mockReturnValueOnce([]); + p2p.getPendingTxs.mockResolvedValueOnce([]); await sequencer.doRealWork(); expect(blockBuilder.startNewBlock).toHaveBeenCalledTimes(0); // block is not built with 3 txs - p2p.getTxs.mockReturnValueOnce(txs.slice(0, 3)); + p2p.getPendingTxs.mockResolvedValueOnce(txs.slice(0, 3)); await sequencer.doRealWork(); expect(blockBuilder.startNewBlock).toHaveBeenCalledTimes(0); @@ -461,7 +461,7 @@ describe('sequencer', () => { sequencer.flush(); // block is built with 0 txs - p2p.getTxs.mockReturnValueOnce([]); + p2p.getPendingTxs.mockResolvedValueOnce([]); await sequencer.doRealWork(); expect(blockBuilder.startNewBlock).toHaveBeenCalledTimes(1); expect(blockBuilder.startNewBlock).toHaveBeenCalledWith( @@ -489,12 +489,12 @@ describe('sequencer', () => { sequencer.updateConfig({ minTxsPerBlock: 4 }); // block is not built with 0 txs - p2p.getTxs.mockReturnValueOnce([]); + p2p.getPendingTxs.mockResolvedValueOnce([]); await sequencer.doRealWork(); expect(blockBuilder.startNewBlock).toHaveBeenCalledTimes(0); // block is not built with 3 txs - p2p.getTxs.mockReturnValueOnce(txs.slice(0, 3)); + p2p.getPendingTxs.mockResolvedValueOnce(txs.slice(0, 3)); await sequencer.doRealWork(); expect(blockBuilder.startNewBlock).toHaveBeenCalledTimes(0); @@ -503,7 +503,7 @@ describe('sequencer', () => { // block is built with 3 txs const postFlushTxs = txs.slice(0, 3); - p2p.getTxs.mockReturnValueOnce(postFlushTxs); + p2p.getPendingTxs.mockResolvedValueOnce(postFlushTxs); const postFlushTxHashes = postFlushTxs.map(tx => tx.getTxHash()); await sequencer.doRealWork(); expect(blockBuilder.startNewBlock).toHaveBeenCalledTimes(1); @@ -521,7 +521,7 @@ describe('sequencer', () => { const tx = mockTxForRollup(); tx.data.constants.txContext.chainId = chainId; - p2p.getTxs.mockReturnValueOnce([tx]); + p2p.getPendingTxs.mockResolvedValueOnce([tx]); blockBuilder.setBlockCompleted.mockResolvedValue(block); publisher.proposeL2Block.mockResolvedValueOnce(true); @@ -597,7 +597,7 @@ describe('sequencer', () => { tx.data.constants.txContext.chainId = chainId; txHash = tx.getTxHash(); - p2p.getTxs.mockReturnValue([tx]); + p2p.getPendingTxs.mockResolvedValue([tx]); blockBuilder.setBlockCompleted.mockResolvedValue(block); }; diff --git a/yarn-project/sequencer-client/src/sequencer/sequencer.ts b/yarn-project/sequencer-client/src/sequencer/sequencer.ts index 618984a69ce..f1df58e18b0 100644 --- a/yarn-project/sequencer-client/src/sequencer/sequencer.ts +++ b/yarn-project/sequencer-client/src/sequencer/sequencer.ts @@ -289,7 +289,7 @@ export class Sequencer { this.setState(SequencerState.WAITING_FOR_TXS, slot); // Get txs to build the new block. - const pendingTxs = this.p2pClient.getTxs('pending'); + const pendingTxs = await this.p2pClient.getPendingTxs(); if (!this.shouldProposeBlock(historicalHeader, { pendingTxsCount: pendingTxs.length })) { return;