Skip to content

Commit

Permalink
feat: Do not evict tx objects from p2p tx pool immediately
Browse files Browse the repository at this point in the history
Tweaks the p2p client so it waits a configurable number of blocks before
deleting proven txs from its pool. This can help with reorgs or
troubleshooting, and it also allows slow provers to get the tx objects
they need in a scenario where multiple provers can submit a proof for
the same block.
  • Loading branch information
spalladino committed Jul 29, 2024
1 parent f2029be commit 8176af6
Show file tree
Hide file tree
Showing 8 changed files with 93 additions and 16 deletions.
4 changes: 4 additions & 0 deletions yarn-project/aztec/terraform/node/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,10 @@ resource "aws_ecs_task_definition" "aztec-node" {
name = "P2P_PEER_CHECK_INTERVAL_MS"
value = "2000"
},
{
name = "P2P_TX_POOL_KEEP_PROVEN_FOR",
value = tostring(var.P2P_TX_POOL_KEEP_PROVEN_FOR)
},
{
name = "PROVER_AGENTS"
value = "0"
Expand Down
5 changes: 5 additions & 0 deletions yarn-project/aztec/terraform/node/variables.tf
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,11 @@ variable "P2P_ENABLED" {
default = true
}

variable "P2P_TX_POOL_KEEP_PROVEN_FOR" {
type = number
default = 64
}

variable "PROVING_ENABLED" {
type = bool
default = false
Expand Down
2 changes: 1 addition & 1 deletion yarn-project/p2p/src/client/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,5 +63,5 @@ export const createP2PClient = async (
} else {
p2pService = new DummyP2PService();
}
return new P2PClient(store, l2BlockSource, txPool, p2pService);
return new P2PClient(store, l2BlockSource, txPool, p2pService, config.keepProvenTxsInPoolFor);
};
27 changes: 20 additions & 7 deletions yarn-project/p2p/src/client/mocks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,23 @@ export class MockBlockSource implements L2BlockSource {
private l2Blocks: L2Block[] = [];
private txEffects: TxEffect[] = [];

constructor(private numBlocks = 100) {
for (let i = 0; i < this.numBlocks; i++) {
const block = L2Block.random(i);
constructor(numBlocks = 100, private provenBlockNumber?: number) {
this.addBlocks(numBlocks);
}

public addBlocks(numBlocks: number) {
for (let i = 0; i < numBlocks; i++) {
const blockNum = this.l2Blocks.length;
const block = L2Block.random(blockNum);
this.l2Blocks.push(block);
this.txEffects.push(...block.body.txEffects);
}
}

public setProvenBlockNumber(provenBlockNumber: number) {
this.provenBlockNumber = provenBlockNumber;
}

/**
* Method to fetch the rollup contract address at the base-layer.
* @returns The rollup address.
Expand All @@ -40,8 +49,8 @@ export class MockBlockSource implements L2BlockSource {
return Promise.resolve(this.l2Blocks.length - 1);
}

public getProvenBlockNumber(): Promise<number> {
return this.getBlockNumber();
public async getProvenBlockNumber(): Promise<number> {
return this.provenBlockNumber ?? (await this.getBlockNumber());
}

/**
Expand All @@ -59,8 +68,12 @@ export class MockBlockSource implements L2BlockSource {
* @param limit - The maximum number of blocks to return.
* @returns The requested mocked L2 blocks.
*/
public getBlocks(from: number, limit: number) {
return Promise.resolve(this.l2Blocks.slice(from, from + limit));
public getBlocks(from: number, limit: number, proven?: boolean) {
return Promise.resolve(
this.l2Blocks
.slice(from, from + limit)
.filter(b => !proven || this.provenBlockNumber === undefined || b.number <= this.provenBlockNumber),
);
}

/**
Expand Down
41 changes: 37 additions & 4 deletions yarn-project/p2p/src/client/p2p_client.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { type L2BlockSource, mockTx } from '@aztec/circuit-types';
import { mockTx } from '@aztec/circuit-types';
import { retryUntil } from '@aztec/foundation/retry';
import { type AztecKVStore } from '@aztec/kv-store';
import { openTmpStore } from '@aztec/kv-store/utils';

Expand All @@ -18,7 +19,7 @@ type Mockify<T> = {

describe('In-Memory P2P Client', () => {
let txPool: Mockify<TxPool>;
let blockSource: L2BlockSource;
let blockSource: MockBlockSource;
let p2pService: Mockify<P2PService>;
let kvStore: AztecKVStore;
let client: P2PClient;
Expand All @@ -45,9 +46,14 @@ describe('In-Memory P2P Client', () => {
blockSource = new MockBlockSource();

kvStore = openTmpStore();
client = new P2PClient(kvStore, blockSource, txPool, p2pService);
client = new P2PClient(kvStore, blockSource, txPool, p2pService, 0);
});

const advanceToProvenBlock = async (provenBlockNum: number) => {
blockSource.setProvenBlockNumber(provenBlockNum);
await retryUntil(() => Promise.resolve(client.getSyncedProvenBlockNum() >= provenBlockNum), 'synced', 10, 0.1);
};

it('can start & stop', async () => {
expect(await client.isReady()).toEqual(false);

Expand Down Expand Up @@ -98,7 +104,34 @@ describe('In-Memory P2P Client', () => {
await client.start();
await client.stop();

const client2 = new P2PClient(kvStore, blockSource, txPool, p2pService);
const client2 = new P2PClient(kvStore, blockSource, txPool, p2pService, 0);
expect(client2.getSyncedLatestBlockNum()).toEqual(client.getSyncedLatestBlockNum());
});

it('deletes txs once block is proven', async () => {
blockSource.setProvenBlockNumber(0);
await client.start();
expect(txPool.deleteTxs).not.toHaveBeenCalled();

await advanceToProvenBlock(5);
expect(txPool.deleteTxs).toHaveBeenCalledTimes(5);
await client.stop();
});

it('deletes txs after waiting the set number of blocks', async () => {
client = new P2PClient(kvStore, blockSource, txPool, p2pService, 10);
blockSource.setProvenBlockNumber(0);
await client.start();
expect(txPool.deleteTxs).not.toHaveBeenCalled();

await advanceToProvenBlock(5);
expect(txPool.deleteTxs).not.toHaveBeenCalled();

await advanceToProvenBlock(12);
expect(txPool.deleteTxs).toHaveBeenCalledTimes(2);

await advanceToProvenBlock(20);
expect(txPool.deleteTxs).toHaveBeenCalledTimes(10);
await client.stop();
});
});
21 changes: 18 additions & 3 deletions yarn-project/p2p/src/client/p2p_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -123,13 +123,15 @@ export class P2PClient implements P2P {
* @param l2BlockSource - P2P client's source for fetching existing blocks.
* @param txPool - The client's instance of a transaction pool. Defaults to in-memory implementation.
* @param p2pService - The concrete instance of p2p networking to use.
* @param keepProvenTxsFor - How many blocks have to pass after a block is proven before its txs are deleted (zero to delete immediately once proven).
* @param log - A logger.
*/
constructor(
store: AztecKVStore,
private l2BlockSource: L2BlockSource,
private txPool: TxPool,
private p2pService: P2PService,
private keepProvenTxsFor: number,
private log = createDebugLogger('aztec:p2p'),
) {
const { p2pBlockCheckIntervalMS: checkInterval, p2pL2QueueSize } = getP2PConfigEnvVars();
Expand Down Expand Up @@ -340,6 +342,7 @@ export class P2PClient implements P2P {
* @returns Empty promise.
*/
private async deleteTxsFromBlocks(blocks: L2Block[]): Promise<void> {
this.log.debug(`Deleting txs from blocks ${blocks[0].number} to ${blocks[blocks.length - 1].number}`);
for (const block of blocks) {
const txHashes = block.body.txEffects.map(txEffect => txEffect.txHash);
await this.txPool.deleteTxs(txHashes);
Expand All @@ -363,16 +366,28 @@ export class P2PClient implements P2P {
}

/**
* Handles new proven blocks by deleting the txs in them.
* @param blocks - A list of existing blocks with txs that the P2P client needs to ensure the tx pool is reconciled with.
* Handles new proven blocks by deleting the txs in them, or by deleting the txs in blocks `keepProvenTxsFor` ago.
* @param blocks - A list of proven L2 blocks.
* @returns Empty promise.
*/
private async handleProvenL2Blocks(blocks: L2Block[]): Promise<void> {
if (!blocks.length) {
return Promise.resolve();
}
await this.deleteTxsFromBlocks(blocks);

const firstBlockNum = blocks[0].number;
const lastBlockNum = blocks[blocks.length - 1].number;

if (this.keepProvenTxsFor === 0) {
await this.deleteTxsFromBlocks(blocks);
} else if (lastBlockNum - this.keepProvenTxsFor >= INITIAL_L2_BLOCK_NUM) {
const fromBlock = Math.max(INITIAL_L2_BLOCK_NUM, firstBlockNum - this.keepProvenTxsFor);
const toBlock = lastBlockNum - this.keepProvenTxsFor;
const limit = toBlock - fromBlock + 1;
const blocksToDeleteTxsFrom = await this.l2BlockSource.getBlocks(fromBlock, limit, true);
await this.deleteTxsFromBlocks(blocksToDeleteTxsFrom);
}

await this.synchedProvenBlockNumber.set(lastBlockNum);
this.log.debug(`Synched to proven block ${lastBlockNum}`);
await this.startServiceIfSynched();
Expand Down
5 changes: 5 additions & 0 deletions yarn-project/p2p/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ export interface P2PConfig {
* If announceUdpAddress or announceTcpAddress are not provided, query for the IP address of the machine. Default is false.
*/
queryForIp: boolean;

/** How many blocks have to pass after a block is proven before its txs are deleted (zero to delete immediately once proven) */
keepProvenTxsInPoolFor: number;
}

/**
Expand All @@ -113,6 +116,7 @@ export function getP2PConfigEnvVars(): P2PConfig {
TX_GOSSIP_VERSION,
P2P_TX_PROTOCOL,
P2P_QUERY_FOR_IP,
P2P_TX_POOL_KEEP_PROVEN_FOR,
} = process.env;
// P2P listen & announce addresses passed in format: <IP_ADDRESS>:<PORT>
// P2P announce multiaddrs passed in format: /ip4/<IP_ADDRESS>/<protocol>/<PORT>
Expand All @@ -134,6 +138,7 @@ export function getP2PConfigEnvVars(): P2PConfig {
dataDirectory: DATA_DIRECTORY,
txGossipVersion: TX_GOSSIP_VERSION ? new SemVer(TX_GOSSIP_VERSION) : new SemVer('0.1.0'),
queryForIp: P2P_QUERY_FOR_IP === 'true',
keepProvenTxsInPoolFor: P2P_TX_POOL_KEEP_PROVEN_FOR ? +P2P_TX_POOL_KEEP_PROVEN_FOR : 0,
};
return envVars;
}
4 changes: 3 additions & 1 deletion yarn-project/p2p/src/service/discv5_service.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import type { PeerId } from '@libp2p/interface';
import { SemVer } from 'semver';

import { BootstrapNode } from '../bootstrap/bootstrap.js';
import { type P2PConfig } from '../config.js';
import { DiscV5Service } from './discV5_service.js';
import { createLibP2PPeerId } from './libp2p_service.js';
import { PeerDiscoveryState } from './service.js';
Expand Down Expand Up @@ -122,7 +123,7 @@ describe('Discv5Service', () => {
const createNode = async (port: number) => {
const bootnodeAddr = bootNode.getENR().encodeTxt();
const peerId = await createLibP2PPeerId();
const config = {
const config: P2PConfig = {
...baseConfig,
tcpListenAddress: `0.0.0.0:${port}`,
udpListenAddress: `0.0.0.0:${port}`,
Expand All @@ -135,6 +136,7 @@ describe('Discv5Service', () => {
p2pEnabled: true,
p2pL2QueueSize: 100,
txGossipVersion: new SemVer('0.1.0'),
keepProvenTxsInPoolFor: 0,
};
return new DiscV5Service(peerId, config);
};
Expand Down

0 comments on commit 8176af6

Please sign in to comment.