diff --git a/yarn-project/aztec/terraform/node/main.tf b/yarn-project/aztec/terraform/node/main.tf index 71963fd8074..ea9a4b8454a 100644 --- a/yarn-project/aztec/terraform/node/main.tf +++ b/yarn-project/aztec/terraform/node/main.tf @@ -324,6 +324,10 @@ resource "aws_ecs_task_definition" "aztec-node" { name = "P2P_PEER_CHECK_INTERVAL_MS" value = "2000" }, + { + name = "P2P_TX_POOL_KEEP_PROVEN_FOR", + value = tostring(var.P2P_TX_POOL_KEEP_PROVEN_FOR) + }, { name = "PROVER_AGENTS" value = "0" diff --git a/yarn-project/aztec/terraform/node/variables.tf b/yarn-project/aztec/terraform/node/variables.tf index a9a69fff3c5..620157e8c25 100644 --- a/yarn-project/aztec/terraform/node/variables.tf +++ b/yarn-project/aztec/terraform/node/variables.tf @@ -72,6 +72,11 @@ variable "P2P_ENABLED" { default = true } +variable "P2P_TX_POOL_KEEP_PROVEN_FOR" { + type = number + default = 64 +} + variable "PROVING_ENABLED" { type = bool default = false diff --git a/yarn-project/p2p/src/client/index.ts b/yarn-project/p2p/src/client/index.ts index 2bde5c4c1e7..86b5e2a8672 100644 --- a/yarn-project/p2p/src/client/index.ts +++ b/yarn-project/p2p/src/client/index.ts @@ -63,5 +63,5 @@ export const createP2PClient = async ( } else { p2pService = new DummyP2PService(); } - return new P2PClient(store, l2BlockSource, txPool, p2pService); + return new P2PClient(store, l2BlockSource, txPool, p2pService, config.keepProvenTxsInPoolFor); }; diff --git a/yarn-project/p2p/src/client/mocks.ts b/yarn-project/p2p/src/client/mocks.ts index 4b8508811d3..744fb7dd2d8 100644 --- a/yarn-project/p2p/src/client/mocks.ts +++ b/yarn-project/p2p/src/client/mocks.ts @@ -8,14 +8,23 @@ export class MockBlockSource implements L2BlockSource { private l2Blocks: L2Block[] = []; private txEffects: TxEffect[] = []; - constructor(private numBlocks = 100) { - for (let i = 0; i < this.numBlocks; i++) { - const block = L2Block.random(i); + constructor(numBlocks = 100, private provenBlockNumber?: number) { + this.addBlocks(numBlocks); + } + + public addBlocks(numBlocks: number) { + for (let i = 0; i < numBlocks; i++) { + const blockNum = this.l2Blocks.length; + const block = L2Block.random(blockNum); this.l2Blocks.push(block); this.txEffects.push(...block.body.txEffects); } } + public setProvenBlockNumber(provenBlockNumber: number) { + this.provenBlockNumber = provenBlockNumber; + } + /** * Method to fetch the rollup contract address at the base-layer. * @returns The rollup address. @@ -40,8 +49,8 @@ export class MockBlockSource implements L2BlockSource { return Promise.resolve(this.l2Blocks.length - 1); } - public getProvenBlockNumber(): Promise { - return this.getBlockNumber(); + public async getProvenBlockNumber(): Promise { + return this.provenBlockNumber ?? (await this.getBlockNumber()); } /** @@ -59,8 +68,12 @@ export class MockBlockSource implements L2BlockSource { * @param limit - The maximum number of blocks to return. * @returns The requested mocked L2 blocks. */ - public getBlocks(from: number, limit: number) { - return Promise.resolve(this.l2Blocks.slice(from, from + limit)); + public getBlocks(from: number, limit: number, proven?: boolean) { + return Promise.resolve( + this.l2Blocks + .slice(from, from + limit) + .filter(b => !proven || this.provenBlockNumber === undefined || b.number <= this.provenBlockNumber), + ); } /** diff --git a/yarn-project/p2p/src/client/p2p_client.test.ts b/yarn-project/p2p/src/client/p2p_client.test.ts index 0e51054cf03..d064264d4c3 100644 --- a/yarn-project/p2p/src/client/p2p_client.test.ts +++ b/yarn-project/p2p/src/client/p2p_client.test.ts @@ -1,4 +1,5 @@ -import { type L2BlockSource, mockTx } from '@aztec/circuit-types'; +import { mockTx } from '@aztec/circuit-types'; +import { retryUntil } from '@aztec/foundation/retry'; import { type AztecKVStore } from '@aztec/kv-store'; import { openTmpStore } from '@aztec/kv-store/utils'; @@ -18,7 +19,7 @@ type Mockify = { describe('In-Memory P2P Client', () => { let txPool: Mockify; - let blockSource: L2BlockSource; + let blockSource: MockBlockSource; let p2pService: Mockify; let kvStore: AztecKVStore; let client: P2PClient; @@ -45,9 +46,14 @@ describe('In-Memory P2P Client', () => { blockSource = new MockBlockSource(); kvStore = openTmpStore(); - client = new P2PClient(kvStore, blockSource, txPool, p2pService); + client = new P2PClient(kvStore, blockSource, txPool, p2pService, 0); }); + const advanceToProvenBlock = async (provenBlockNum: number) => { + blockSource.setProvenBlockNumber(provenBlockNum); + await retryUntil(() => Promise.resolve(client.getSyncedProvenBlockNum() >= provenBlockNum), 'synced', 10, 0.1); + }; + it('can start & stop', async () => { expect(await client.isReady()).toEqual(false); @@ -98,7 +104,34 @@ describe('In-Memory P2P Client', () => { await client.start(); await client.stop(); - const client2 = new P2PClient(kvStore, blockSource, txPool, p2pService); + const client2 = new P2PClient(kvStore, blockSource, txPool, p2pService, 0); expect(client2.getSyncedLatestBlockNum()).toEqual(client.getSyncedLatestBlockNum()); }); + + it('deletes txs once block is proven', async () => { + blockSource.setProvenBlockNumber(0); + await client.start(); + expect(txPool.deleteTxs).not.toHaveBeenCalled(); + + await advanceToProvenBlock(5); + expect(txPool.deleteTxs).toHaveBeenCalledTimes(5); + await client.stop(); + }); + + it('deletes txs after waiting the set number of blocks', async () => { + client = new P2PClient(kvStore, blockSource, txPool, p2pService, 10); + blockSource.setProvenBlockNumber(0); + await client.start(); + expect(txPool.deleteTxs).not.toHaveBeenCalled(); + + await advanceToProvenBlock(5); + expect(txPool.deleteTxs).not.toHaveBeenCalled(); + + await advanceToProvenBlock(12); + expect(txPool.deleteTxs).toHaveBeenCalledTimes(2); + + await advanceToProvenBlock(20); + expect(txPool.deleteTxs).toHaveBeenCalledTimes(10); + await client.stop(); + }); }); diff --git a/yarn-project/p2p/src/client/p2p_client.ts b/yarn-project/p2p/src/client/p2p_client.ts index d4b74da2eae..89fee2b4898 100644 --- a/yarn-project/p2p/src/client/p2p_client.ts +++ b/yarn-project/p2p/src/client/p2p_client.ts @@ -123,6 +123,7 @@ export class P2PClient implements P2P { * @param l2BlockSource - P2P client's source for fetching existing blocks. * @param txPool - The client's instance of a transaction pool. Defaults to in-memory implementation. * @param p2pService - The concrete instance of p2p networking to use. + * @param keepProvenTxsFor - How many blocks have to pass after a block is proven before its txs are deleted (zero to delete immediately once proven). * @param log - A logger. */ constructor( @@ -130,6 +131,7 @@ export class P2PClient implements P2P { private l2BlockSource: L2BlockSource, private txPool: TxPool, private p2pService: P2PService, + private keepProvenTxsFor: number, private log = createDebugLogger('aztec:p2p'), ) { const { p2pBlockCheckIntervalMS: checkInterval, p2pL2QueueSize } = getP2PConfigEnvVars(); @@ -340,6 +342,7 @@ export class P2PClient implements P2P { * @returns Empty promise. */ private async deleteTxsFromBlocks(blocks: L2Block[]): Promise { + this.log.debug(`Deleting txs from blocks ${blocks[0].number} to ${blocks[blocks.length - 1].number}`); for (const block of blocks) { const txHashes = block.body.txEffects.map(txEffect => txEffect.txHash); await this.txPool.deleteTxs(txHashes); @@ -363,16 +366,28 @@ export class P2PClient implements P2P { } /** - * Handles new proven blocks by deleting the txs in them. - * @param blocks - A list of existing blocks with txs that the P2P client needs to ensure the tx pool is reconciled with. + * Handles new proven blocks by deleting the txs in them, or by deleting the txs in blocks `keepProvenTxsFor` ago. + * @param blocks - A list of proven L2 blocks. * @returns Empty promise. */ private async handleProvenL2Blocks(blocks: L2Block[]): Promise { if (!blocks.length) { return Promise.resolve(); } - await this.deleteTxsFromBlocks(blocks); + + const firstBlockNum = blocks[0].number; const lastBlockNum = blocks[blocks.length - 1].number; + + if (this.keepProvenTxsFor === 0) { + await this.deleteTxsFromBlocks(blocks); + } else if (lastBlockNum - this.keepProvenTxsFor >= INITIAL_L2_BLOCK_NUM) { + const fromBlock = Math.max(INITIAL_L2_BLOCK_NUM, firstBlockNum - this.keepProvenTxsFor); + const toBlock = lastBlockNum - this.keepProvenTxsFor; + const limit = toBlock - fromBlock + 1; + const blocksToDeleteTxsFrom = await this.l2BlockSource.getBlocks(fromBlock, limit, true); + await this.deleteTxsFromBlocks(blocksToDeleteTxsFrom); + } + await this.synchedProvenBlockNumber.set(lastBlockNum); this.log.debug(`Synched to proven block ${lastBlockNum}`); await this.startServiceIfSynched(); diff --git a/yarn-project/p2p/src/config.ts b/yarn-project/p2p/src/config.ts index c8d6c5d5c54..0735cbd010b 100644 --- a/yarn-project/p2p/src/config.ts +++ b/yarn-project/p2p/src/config.ts @@ -88,6 +88,9 @@ export interface P2PConfig { * If announceUdpAddress or announceTcpAddress are not provided, query for the IP address of the machine. Default is false. */ queryForIp: boolean; + + /** How many blocks have to pass after a block is proven before its txs are deleted (zero to delete immediately once proven) */ + keepProvenTxsInPoolFor: number; } /** @@ -113,6 +116,7 @@ export function getP2PConfigEnvVars(): P2PConfig { TX_GOSSIP_VERSION, P2P_TX_PROTOCOL, P2P_QUERY_FOR_IP, + P2P_TX_POOL_KEEP_PROVEN_FOR, } = process.env; // P2P listen & announce addresses passed in format: : // P2P announce multiaddrs passed in format: /ip4/// @@ -134,6 +138,7 @@ export function getP2PConfigEnvVars(): P2PConfig { dataDirectory: DATA_DIRECTORY, txGossipVersion: TX_GOSSIP_VERSION ? new SemVer(TX_GOSSIP_VERSION) : new SemVer('0.1.0'), queryForIp: P2P_QUERY_FOR_IP === 'true', + keepProvenTxsInPoolFor: P2P_TX_POOL_KEEP_PROVEN_FOR ? +P2P_TX_POOL_KEEP_PROVEN_FOR : 0, }; return envVars; } diff --git a/yarn-project/p2p/src/service/discv5_service.test.ts b/yarn-project/p2p/src/service/discv5_service.test.ts index 67442f0a87e..4a278f37822 100644 --- a/yarn-project/p2p/src/service/discv5_service.test.ts +++ b/yarn-project/p2p/src/service/discv5_service.test.ts @@ -5,6 +5,7 @@ import type { PeerId } from '@libp2p/interface'; import { SemVer } from 'semver'; import { BootstrapNode } from '../bootstrap/bootstrap.js'; +import { type P2PConfig } from '../config.js'; import { DiscV5Service } from './discV5_service.js'; import { createLibP2PPeerId } from './libp2p_service.js'; import { PeerDiscoveryState } from './service.js'; @@ -122,7 +123,7 @@ describe('Discv5Service', () => { const createNode = async (port: number) => { const bootnodeAddr = bootNode.getENR().encodeTxt(); const peerId = await createLibP2PPeerId(); - const config = { + const config: P2PConfig = { ...baseConfig, tcpListenAddress: `0.0.0.0:${port}`, udpListenAddress: `0.0.0.0:${port}`, @@ -135,6 +136,7 @@ describe('Discv5Service', () => { p2pEnabled: true, p2pL2QueueSize: 100, txGossipVersion: new SemVer('0.1.0'), + keepProvenTxsInPoolFor: 0, }; return new DiscV5Service(peerId, config); };