Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Do not evict tx objects from p2p tx pool immediately #7652

Merged
merged 1 commit into from
Jul 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions yarn-project/aztec/terraform/node/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,10 @@ resource "aws_ecs_task_definition" "aztec-node" {
name = "P2P_PEER_CHECK_INTERVAL_MS"
value = "2000"
},
{
name = "P2P_TX_POOL_KEEP_PROVEN_FOR",
value = tostring(var.P2P_TX_POOL_KEEP_PROVEN_FOR)
},
{
name = "PROVER_AGENTS"
value = "0"
Expand Down
5 changes: 5 additions & 0 deletions yarn-project/aztec/terraform/node/variables.tf
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,11 @@ variable "P2P_ENABLED" {
default = true
}

variable "P2P_TX_POOL_KEEP_PROVEN_FOR" {
type = number
default = 64
}

variable "PROVING_ENABLED" {
type = bool
default = false
Expand Down
2 changes: 1 addition & 1 deletion yarn-project/p2p/src/client/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,5 +63,5 @@ export const createP2PClient = async (
} else {
p2pService = new DummyP2PService();
}
return new P2PClient(store, l2BlockSource, txPool, p2pService);
return new P2PClient(store, l2BlockSource, txPool, p2pService, config.keepProvenTxsInPoolFor);
};
27 changes: 20 additions & 7 deletions yarn-project/p2p/src/client/mocks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,23 @@ export class MockBlockSource implements L2BlockSource {
private l2Blocks: L2Block[] = [];
private txEffects: TxEffect[] = [];

constructor(private numBlocks = 100) {
for (let i = 0; i < this.numBlocks; i++) {
const block = L2Block.random(i);
constructor(numBlocks = 100, private provenBlockNumber?: number) {
this.addBlocks(numBlocks);
}

public addBlocks(numBlocks: number) {
for (let i = 0; i < numBlocks; i++) {
const blockNum = this.l2Blocks.length;
const block = L2Block.random(blockNum);
this.l2Blocks.push(block);
this.txEffects.push(...block.body.txEffects);
}
}

public setProvenBlockNumber(provenBlockNumber: number) {
this.provenBlockNumber = provenBlockNumber;
}

/**
* Method to fetch the rollup contract address at the base-layer.
* @returns The rollup address.
Expand All @@ -40,8 +49,8 @@ export class MockBlockSource implements L2BlockSource {
return Promise.resolve(this.l2Blocks.length - 1);
}

public getProvenBlockNumber(): Promise<number> {
return this.getBlockNumber();
public async getProvenBlockNumber(): Promise<number> {
return this.provenBlockNumber ?? (await this.getBlockNumber());
}

/**
Expand All @@ -59,8 +68,12 @@ export class MockBlockSource implements L2BlockSource {
* @param limit - The maximum number of blocks to return.
* @returns The requested mocked L2 blocks.
*/
public getBlocks(from: number, limit: number) {
return Promise.resolve(this.l2Blocks.slice(from, from + limit));
public getBlocks(from: number, limit: number, proven?: boolean) {
return Promise.resolve(
this.l2Blocks
.slice(from, from + limit)
.filter(b => !proven || this.provenBlockNumber === undefined || b.number <= this.provenBlockNumber),
);
}

/**
Expand Down
41 changes: 37 additions & 4 deletions yarn-project/p2p/src/client/p2p_client.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { type L2BlockSource, mockTx } from '@aztec/circuit-types';
import { mockTx } from '@aztec/circuit-types';
import { retryUntil } from '@aztec/foundation/retry';
import { type AztecKVStore } from '@aztec/kv-store';
import { openTmpStore } from '@aztec/kv-store/utils';

Expand All @@ -18,7 +19,7 @@ type Mockify<T> = {

describe('In-Memory P2P Client', () => {
let txPool: Mockify<TxPool>;
let blockSource: L2BlockSource;
let blockSource: MockBlockSource;
let p2pService: Mockify<P2PService>;
let kvStore: AztecKVStore;
let client: P2PClient;
Expand All @@ -45,9 +46,14 @@ describe('In-Memory P2P Client', () => {
blockSource = new MockBlockSource();

kvStore = openTmpStore();
client = new P2PClient(kvStore, blockSource, txPool, p2pService);
client = new P2PClient(kvStore, blockSource, txPool, p2pService, 0);
});

const advanceToProvenBlock = async (provenBlockNum: number) => {
blockSource.setProvenBlockNumber(provenBlockNum);
await retryUntil(() => Promise.resolve(client.getSyncedProvenBlockNum() >= provenBlockNum), 'synced', 10, 0.1);
};

it('can start & stop', async () => {
expect(await client.isReady()).toEqual(false);

Expand Down Expand Up @@ -98,7 +104,34 @@ describe('In-Memory P2P Client', () => {
await client.start();
await client.stop();

const client2 = new P2PClient(kvStore, blockSource, txPool, p2pService);
const client2 = new P2PClient(kvStore, blockSource, txPool, p2pService, 0);
expect(client2.getSyncedLatestBlockNum()).toEqual(client.getSyncedLatestBlockNum());
});

it('deletes txs once block is proven', async () => {
blockSource.setProvenBlockNumber(0);
await client.start();
expect(txPool.deleteTxs).not.toHaveBeenCalled();

await advanceToProvenBlock(5);
expect(txPool.deleteTxs).toHaveBeenCalledTimes(5);
await client.stop();
});

it('deletes txs after waiting the set number of blocks', async () => {
client = new P2PClient(kvStore, blockSource, txPool, p2pService, 10);
blockSource.setProvenBlockNumber(0);
await client.start();
expect(txPool.deleteTxs).not.toHaveBeenCalled();

await advanceToProvenBlock(5);
expect(txPool.deleteTxs).not.toHaveBeenCalled();

await advanceToProvenBlock(12);
expect(txPool.deleteTxs).toHaveBeenCalledTimes(2);

await advanceToProvenBlock(20);
expect(txPool.deleteTxs).toHaveBeenCalledTimes(10);
await client.stop();
});
});
21 changes: 18 additions & 3 deletions yarn-project/p2p/src/client/p2p_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -123,13 +123,15 @@ export class P2PClient implements P2P {
* @param l2BlockSource - P2P client's source for fetching existing blocks.
* @param txPool - The client's instance of a transaction pool. Defaults to in-memory implementation.
* @param p2pService - The concrete instance of p2p networking to use.
* @param keepProvenTxsFor - How many blocks have to pass after a block is proven before its txs are deleted (zero to delete immediately once proven).
* @param log - A logger.
*/
constructor(
store: AztecKVStore,
private l2BlockSource: L2BlockSource,
private txPool: TxPool,
private p2pService: P2PService,
private keepProvenTxsFor: number,
private log = createDebugLogger('aztec:p2p'),
) {
const { p2pBlockCheckIntervalMS: checkInterval, p2pL2QueueSize } = getP2PConfigEnvVars();
Expand Down Expand Up @@ -340,6 +342,7 @@ export class P2PClient implements P2P {
* @returns Empty promise.
*/
private async deleteTxsFromBlocks(blocks: L2Block[]): Promise<void> {
this.log.debug(`Deleting txs from blocks ${blocks[0].number} to ${blocks[blocks.length - 1].number}`);
for (const block of blocks) {
const txHashes = block.body.txEffects.map(txEffect => txEffect.txHash);
await this.txPool.deleteTxs(txHashes);
Expand All @@ -363,16 +366,28 @@ export class P2PClient implements P2P {
}

/**
* Handles new proven blocks by deleting the txs in them.
* @param blocks - A list of existing blocks with txs that the P2P client needs to ensure the tx pool is reconciled with.
* Handles new proven blocks by deleting the txs in them, or by deleting the txs in blocks `keepProvenTxsFor` ago.
* @param blocks - A list of proven L2 blocks.
* @returns Empty promise.
*/
private async handleProvenL2Blocks(blocks: L2Block[]): Promise<void> {
if (!blocks.length) {
return Promise.resolve();
}
await this.deleteTxsFromBlocks(blocks);

const firstBlockNum = blocks[0].number;
const lastBlockNum = blocks[blocks.length - 1].number;

if (this.keepProvenTxsFor === 0) {
await this.deleteTxsFromBlocks(blocks);
} else if (lastBlockNum - this.keepProvenTxsFor >= INITIAL_L2_BLOCK_NUM) {
const fromBlock = Math.max(INITIAL_L2_BLOCK_NUM, firstBlockNum - this.keepProvenTxsFor);
const toBlock = lastBlockNum - this.keepProvenTxsFor;
const limit = toBlock - fromBlock + 1;
const blocksToDeleteTxsFrom = await this.l2BlockSource.getBlocks(fromBlock, limit, true);
await this.deleteTxsFromBlocks(blocksToDeleteTxsFrom);
}

await this.synchedProvenBlockNumber.set(lastBlockNum);
this.log.debug(`Synched to proven block ${lastBlockNum}`);
await this.startServiceIfSynched();
Expand Down
5 changes: 5 additions & 0 deletions yarn-project/p2p/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ export interface P2PConfig {
* If announceUdpAddress or announceTcpAddress are not provided, query for the IP address of the machine. Default is false.
*/
queryForIp: boolean;

/** How many blocks have to pass after a block is proven before its txs are deleted (zero to delete immediately once proven) */
keepProvenTxsInPoolFor: number;
}

/**
Expand All @@ -113,6 +116,7 @@ export function getP2PConfigEnvVars(): P2PConfig {
TX_GOSSIP_VERSION,
P2P_TX_PROTOCOL,
P2P_QUERY_FOR_IP,
P2P_TX_POOL_KEEP_PROVEN_FOR,
} = process.env;
// P2P listen & announce addresses passed in format: <IP_ADDRESS>:<PORT>
// P2P announce multiaddrs passed in format: /ip4/<IP_ADDRESS>/<protocol>/<PORT>
Expand All @@ -134,6 +138,7 @@ export function getP2PConfigEnvVars(): P2PConfig {
dataDirectory: DATA_DIRECTORY,
txGossipVersion: TX_GOSSIP_VERSION ? new SemVer(TX_GOSSIP_VERSION) : new SemVer('0.1.0'),
queryForIp: P2P_QUERY_FOR_IP === 'true',
keepProvenTxsInPoolFor: P2P_TX_POOL_KEEP_PROVEN_FOR ? +P2P_TX_POOL_KEEP_PROVEN_FOR : 0,
};
return envVars;
}
4 changes: 3 additions & 1 deletion yarn-project/p2p/src/service/discv5_service.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import type { PeerId } from '@libp2p/interface';
import { SemVer } from 'semver';

import { BootstrapNode } from '../bootstrap/bootstrap.js';
import { type P2PConfig } from '../config.js';
import { DiscV5Service } from './discV5_service.js';
import { createLibP2PPeerId } from './libp2p_service.js';
import { PeerDiscoveryState } from './service.js';
Expand Down Expand Up @@ -122,7 +123,7 @@ describe('Discv5Service', () => {
const createNode = async (port: number) => {
const bootnodeAddr = bootNode.getENR().encodeTxt();
const peerId = await createLibP2PPeerId();
const config = {
const config: P2PConfig = {
...baseConfig,
tcpListenAddress: `0.0.0.0:${port}`,
udpListenAddress: `0.0.0.0:${port}`,
Expand All @@ -135,6 +136,7 @@ describe('Discv5Service', () => {
p2pEnabled: true,
p2pL2QueueSize: 100,
txGossipVersion: new SemVer('0.1.0'),
keepProvenTxsInPoolFor: 0,
};
return new DiscV5Service(peerId, config);
};
Expand Down
Loading