From 8176af62f14d612a81bbd2a55660c6d1cc68b135 Mon Sep 17 00:00:00 2001 From: Santiago Palladino Date: Mon, 29 Jul 2024 12:38:10 -0300 Subject: [PATCH] feat: Do not evict tx objects from p2p tx pool immediately Tweaks the p2p client so it waits a configurable number of blocks before deleting proven txs from its pool. This can help with reorgs or troubleshooting, and it also allows slow provers to get the tx objects they need in a scenario where multiple provers can submit a proof for the same block. --- yarn-project/aztec/terraform/node/main.tf | 4 ++ .../aztec/terraform/node/variables.tf | 5 +++ yarn-project/p2p/src/client/index.ts | 2 +- yarn-project/p2p/src/client/mocks.ts | 27 ++++++++---- .../p2p/src/client/p2p_client.test.ts | 41 +++++++++++++++++-- yarn-project/p2p/src/client/p2p_client.ts | 21 ++++++++-- yarn-project/p2p/src/config.ts | 5 +++ .../p2p/src/service/discv5_service.test.ts | 4 +- 8 files changed, 93 insertions(+), 16 deletions(-) diff --git a/yarn-project/aztec/terraform/node/main.tf b/yarn-project/aztec/terraform/node/main.tf index 71963fd8074..ea9a4b8454a 100644 --- a/yarn-project/aztec/terraform/node/main.tf +++ b/yarn-project/aztec/terraform/node/main.tf @@ -324,6 +324,10 @@ resource "aws_ecs_task_definition" "aztec-node" { name = "P2P_PEER_CHECK_INTERVAL_MS" value = "2000" }, + { + name = "P2P_TX_POOL_KEEP_PROVEN_FOR", + value = tostring(var.P2P_TX_POOL_KEEP_PROVEN_FOR) + }, { name = "PROVER_AGENTS" value = "0" diff --git a/yarn-project/aztec/terraform/node/variables.tf b/yarn-project/aztec/terraform/node/variables.tf index a9a69fff3c5..620157e8c25 100644 --- a/yarn-project/aztec/terraform/node/variables.tf +++ b/yarn-project/aztec/terraform/node/variables.tf @@ -72,6 +72,11 @@ variable "P2P_ENABLED" { default = true } +variable "P2P_TX_POOL_KEEP_PROVEN_FOR" { + type = number + default = 64 +} + variable "PROVING_ENABLED" { type = bool default = false diff --git a/yarn-project/p2p/src/client/index.ts b/yarn-project/p2p/src/client/index.ts index 2bde5c4c1e7..86b5e2a8672 100644 --- a/yarn-project/p2p/src/client/index.ts +++ b/yarn-project/p2p/src/client/index.ts @@ -63,5 +63,5 @@ export const createP2PClient = async ( } else { p2pService = new DummyP2PService(); } - return new P2PClient(store, l2BlockSource, txPool, p2pService); + return new P2PClient(store, l2BlockSource, txPool, p2pService, config.keepProvenTxsInPoolFor); }; diff --git a/yarn-project/p2p/src/client/mocks.ts b/yarn-project/p2p/src/client/mocks.ts index 4b8508811d3..744fb7dd2d8 100644 --- a/yarn-project/p2p/src/client/mocks.ts +++ b/yarn-project/p2p/src/client/mocks.ts @@ -8,14 +8,23 @@ export class MockBlockSource implements L2BlockSource { private l2Blocks: L2Block[] = []; private txEffects: TxEffect[] = []; - constructor(private numBlocks = 100) { - for (let i = 0; i < this.numBlocks; i++) { - const block = L2Block.random(i); + constructor(numBlocks = 100, private provenBlockNumber?: number) { + this.addBlocks(numBlocks); + } + + public addBlocks(numBlocks: number) { + for (let i = 0; i < numBlocks; i++) { + const blockNum = this.l2Blocks.length; + const block = L2Block.random(blockNum); this.l2Blocks.push(block); this.txEffects.push(...block.body.txEffects); } } + public setProvenBlockNumber(provenBlockNumber: number) { + this.provenBlockNumber = provenBlockNumber; + } + /** * Method to fetch the rollup contract address at the base-layer. * @returns The rollup address. @@ -40,8 +49,8 @@ export class MockBlockSource implements L2BlockSource { return Promise.resolve(this.l2Blocks.length - 1); } - public getProvenBlockNumber(): Promise { - return this.getBlockNumber(); + public async getProvenBlockNumber(): Promise { + return this.provenBlockNumber ?? (await this.getBlockNumber()); } /** @@ -59,8 +68,12 @@ export class MockBlockSource implements L2BlockSource { * @param limit - The maximum number of blocks to return. * @returns The requested mocked L2 blocks. */ - public getBlocks(from: number, limit: number) { - return Promise.resolve(this.l2Blocks.slice(from, from + limit)); + public getBlocks(from: number, limit: number, proven?: boolean) { + return Promise.resolve( + this.l2Blocks + .slice(from, from + limit) + .filter(b => !proven || this.provenBlockNumber === undefined || b.number <= this.provenBlockNumber), + ); } /** diff --git a/yarn-project/p2p/src/client/p2p_client.test.ts b/yarn-project/p2p/src/client/p2p_client.test.ts index 0e51054cf03..d064264d4c3 100644 --- a/yarn-project/p2p/src/client/p2p_client.test.ts +++ b/yarn-project/p2p/src/client/p2p_client.test.ts @@ -1,4 +1,5 @@ -import { type L2BlockSource, mockTx } from '@aztec/circuit-types'; +import { mockTx } from '@aztec/circuit-types'; +import { retryUntil } from '@aztec/foundation/retry'; import { type AztecKVStore } from '@aztec/kv-store'; import { openTmpStore } from '@aztec/kv-store/utils'; @@ -18,7 +19,7 @@ type Mockify = { describe('In-Memory P2P Client', () => { let txPool: Mockify; - let blockSource: L2BlockSource; + let blockSource: MockBlockSource; let p2pService: Mockify; let kvStore: AztecKVStore; let client: P2PClient; @@ -45,9 +46,14 @@ describe('In-Memory P2P Client', () => { blockSource = new MockBlockSource(); kvStore = openTmpStore(); - client = new P2PClient(kvStore, blockSource, txPool, p2pService); + client = new P2PClient(kvStore, blockSource, txPool, p2pService, 0); }); + const advanceToProvenBlock = async (provenBlockNum: number) => { + blockSource.setProvenBlockNumber(provenBlockNum); + await retryUntil(() => Promise.resolve(client.getSyncedProvenBlockNum() >= provenBlockNum), 'synced', 10, 0.1); + }; + it('can start & stop', async () => { expect(await client.isReady()).toEqual(false); @@ -98,7 +104,34 @@ describe('In-Memory P2P Client', () => { await client.start(); await client.stop(); - const client2 = new P2PClient(kvStore, blockSource, txPool, p2pService); + const client2 = new P2PClient(kvStore, blockSource, txPool, p2pService, 0); expect(client2.getSyncedLatestBlockNum()).toEqual(client.getSyncedLatestBlockNum()); }); + + it('deletes txs once block is proven', async () => { + blockSource.setProvenBlockNumber(0); + await client.start(); + expect(txPool.deleteTxs).not.toHaveBeenCalled(); + + await advanceToProvenBlock(5); + expect(txPool.deleteTxs).toHaveBeenCalledTimes(5); + await client.stop(); + }); + + it('deletes txs after waiting the set number of blocks', async () => { + client = new P2PClient(kvStore, blockSource, txPool, p2pService, 10); + blockSource.setProvenBlockNumber(0); + await client.start(); + expect(txPool.deleteTxs).not.toHaveBeenCalled(); + + await advanceToProvenBlock(5); + expect(txPool.deleteTxs).not.toHaveBeenCalled(); + + await advanceToProvenBlock(12); + expect(txPool.deleteTxs).toHaveBeenCalledTimes(2); + + await advanceToProvenBlock(20); + expect(txPool.deleteTxs).toHaveBeenCalledTimes(10); + await client.stop(); + }); }); diff --git a/yarn-project/p2p/src/client/p2p_client.ts b/yarn-project/p2p/src/client/p2p_client.ts index d4b74da2eae..89fee2b4898 100644 --- a/yarn-project/p2p/src/client/p2p_client.ts +++ b/yarn-project/p2p/src/client/p2p_client.ts @@ -123,6 +123,7 @@ export class P2PClient implements P2P { * @param l2BlockSource - P2P client's source for fetching existing blocks. * @param txPool - The client's instance of a transaction pool. Defaults to in-memory implementation. * @param p2pService - The concrete instance of p2p networking to use. + * @param keepProvenTxsFor - How many blocks have to pass after a block is proven before its txs are deleted (zero to delete immediately once proven). * @param log - A logger. */ constructor( @@ -130,6 +131,7 @@ export class P2PClient implements P2P { private l2BlockSource: L2BlockSource, private txPool: TxPool, private p2pService: P2PService, + private keepProvenTxsFor: number, private log = createDebugLogger('aztec:p2p'), ) { const { p2pBlockCheckIntervalMS: checkInterval, p2pL2QueueSize } = getP2PConfigEnvVars(); @@ -340,6 +342,7 @@ export class P2PClient implements P2P { * @returns Empty promise. */ private async deleteTxsFromBlocks(blocks: L2Block[]): Promise { + this.log.debug(`Deleting txs from blocks ${blocks[0].number} to ${blocks[blocks.length - 1].number}`); for (const block of blocks) { const txHashes = block.body.txEffects.map(txEffect => txEffect.txHash); await this.txPool.deleteTxs(txHashes); @@ -363,16 +366,28 @@ export class P2PClient implements P2P { } /** - * Handles new proven blocks by deleting the txs in them. - * @param blocks - A list of existing blocks with txs that the P2P client needs to ensure the tx pool is reconciled with. + * Handles new proven blocks by deleting the txs in them, or by deleting the txs in blocks `keepProvenTxsFor` ago. + * @param blocks - A list of proven L2 blocks. * @returns Empty promise. */ private async handleProvenL2Blocks(blocks: L2Block[]): Promise { if (!blocks.length) { return Promise.resolve(); } - await this.deleteTxsFromBlocks(blocks); + + const firstBlockNum = blocks[0].number; const lastBlockNum = blocks[blocks.length - 1].number; + + if (this.keepProvenTxsFor === 0) { + await this.deleteTxsFromBlocks(blocks); + } else if (lastBlockNum - this.keepProvenTxsFor >= INITIAL_L2_BLOCK_NUM) { + const fromBlock = Math.max(INITIAL_L2_BLOCK_NUM, firstBlockNum - this.keepProvenTxsFor); + const toBlock = lastBlockNum - this.keepProvenTxsFor; + const limit = toBlock - fromBlock + 1; + const blocksToDeleteTxsFrom = await this.l2BlockSource.getBlocks(fromBlock, limit, true); + await this.deleteTxsFromBlocks(blocksToDeleteTxsFrom); + } + await this.synchedProvenBlockNumber.set(lastBlockNum); this.log.debug(`Synched to proven block ${lastBlockNum}`); await this.startServiceIfSynched(); diff --git a/yarn-project/p2p/src/config.ts b/yarn-project/p2p/src/config.ts index c8d6c5d5c54..0735cbd010b 100644 --- a/yarn-project/p2p/src/config.ts +++ b/yarn-project/p2p/src/config.ts @@ -88,6 +88,9 @@ export interface P2PConfig { * If announceUdpAddress or announceTcpAddress are not provided, query for the IP address of the machine. Default is false. */ queryForIp: boolean; + + /** How many blocks have to pass after a block is proven before its txs are deleted (zero to delete immediately once proven) */ + keepProvenTxsInPoolFor: number; } /** @@ -113,6 +116,7 @@ export function getP2PConfigEnvVars(): P2PConfig { TX_GOSSIP_VERSION, P2P_TX_PROTOCOL, P2P_QUERY_FOR_IP, + P2P_TX_POOL_KEEP_PROVEN_FOR, } = process.env; // P2P listen & announce addresses passed in format: : // P2P announce multiaddrs passed in format: /ip4/// @@ -134,6 +138,7 @@ export function getP2PConfigEnvVars(): P2PConfig { dataDirectory: DATA_DIRECTORY, txGossipVersion: TX_GOSSIP_VERSION ? new SemVer(TX_GOSSIP_VERSION) : new SemVer('0.1.0'), queryForIp: P2P_QUERY_FOR_IP === 'true', + keepProvenTxsInPoolFor: P2P_TX_POOL_KEEP_PROVEN_FOR ? +P2P_TX_POOL_KEEP_PROVEN_FOR : 0, }; return envVars; } diff --git a/yarn-project/p2p/src/service/discv5_service.test.ts b/yarn-project/p2p/src/service/discv5_service.test.ts index 67442f0a87e..4a278f37822 100644 --- a/yarn-project/p2p/src/service/discv5_service.test.ts +++ b/yarn-project/p2p/src/service/discv5_service.test.ts @@ -5,6 +5,7 @@ import type { PeerId } from '@libp2p/interface'; import { SemVer } from 'semver'; import { BootstrapNode } from '../bootstrap/bootstrap.js'; +import { type P2PConfig } from '../config.js'; import { DiscV5Service } from './discV5_service.js'; import { createLibP2PPeerId } from './libp2p_service.js'; import { PeerDiscoveryState } from './service.js'; @@ -122,7 +123,7 @@ describe('Discv5Service', () => { const createNode = async (port: number) => { const bootnodeAddr = bootNode.getENR().encodeTxt(); const peerId = await createLibP2PPeerId(); - const config = { + const config: P2PConfig = { ...baseConfig, tcpListenAddress: `0.0.0.0:${port}`, udpListenAddress: `0.0.0.0:${port}`, @@ -135,6 +136,7 @@ describe('Discv5Service', () => { p2pEnabled: true, p2pL2QueueSize: 100, txGossipVersion: new SemVer('0.1.0'), + keepProvenTxsInPoolFor: 0, }; return new DiscV5Service(peerId, config); };