Skip to content

Commit

Permalink
feat: Build blocks using txs with higher fee first (#11093)
Browse files Browse the repository at this point in the history
Updates the tx pool to store pending tx hashes sorted by fee. We use the
sum of the l2 and da priority fees for this.

Fixes #11084
  • Loading branch information
spalladino authored Jan 8, 2025
1 parent 8105d37 commit def7cd7
Show file tree
Hide file tree
Showing 6 changed files with 102 additions and 47 deletions.
5 changes: 5 additions & 0 deletions yarn-project/circuit-types/src/tx/tx.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import {
ClientIvcProof,
Fr,
type GasSettings,
PrivateKernelTailCircuitPublicInputs,
PrivateLog,
type PrivateToPublicAccumulatedData,
Expand Down Expand Up @@ -88,6 +89,10 @@ export class Tx extends Gossipable {
return this.publicTeardownFunctionCall.isEmpty() ? undefined : this.publicTeardownFunctionCall;
}

getGasSettings(): GasSettings {
return this.data.constants.txContext.gasSettings;
}

/**
* Deserializes the Tx object from a Buffer.
* @param buffer - Buffer or BufferReader object to deserialize.
Expand Down
101 changes: 57 additions & 44 deletions yarn-project/p2p/src/mem_pools/tx_pool/aztec_kv_tx_pool.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import { Tx, TxHash } from '@aztec/circuit-types';
import { type TxAddedToPoolStats } from '@aztec/circuit-types/stats';
import { type Logger, createLogger } from '@aztec/foundation/log';
import { type AztecKVStore, type AztecMap, type AztecSet } from '@aztec/kv-store';
import { type AztecKVStore, type AztecMap, type AztecMultiMap } from '@aztec/kv-store';
import { type TelemetryClient } from '@aztec/telemetry-client';

import { PoolInstrumentation, PoolName } from '../instrumentation.js';
import { getPendingTxPriority } from './priority.js';
import { type TxPool } from './tx_pool.js';

/**
Expand All @@ -16,10 +17,11 @@ export class AztecKVTxPool implements TxPool {
/** Our tx pool, stored as a Map, with K: tx hash and V: the transaction. */
#txs: AztecMap<string, Buffer>;

/** Index for pending txs. */
#pendingTxs: AztecSet<string>;
/** Index for mined txs. */
#minedTxs: AztecMap<string, number>;
/** Index from tx hash to the block number in which they were mined, filtered by mined txs. */
#minedTxHashToBlock: AztecMap<string, number>;

/** Index from tx priority (stored as hex) to its tx hash, filtered by pending txs. */
#pendingTxPriorityToHash: AztecMultiMap<string, string>;

#log: Logger;

Expand All @@ -32,27 +34,34 @@ export class AztecKVTxPool implements TxPool {
*/
constructor(store: AztecKVStore, telemetry: TelemetryClient, log = createLogger('p2p:tx_pool')) {
this.#txs = store.openMap('txs');
this.#minedTxs = store.openMap('minedTxs');
this.#pendingTxs = store.openSet('pendingTxs');
this.#minedTxHashToBlock = store.openMap('txHashToBlockMined');
this.#pendingTxPriorityToHash = store.openMultiMap('pendingTxFeeToHash');

this.#store = store;
this.#log = log;
this.#metrics = new PoolInstrumentation(telemetry, PoolName.TX_POOL, () => store.estimateSize());
}

public markAsMined(txHashes: TxHash[], blockNumber: number): Promise<void> {
if (txHashes.length === 0) {
return Promise.resolve();
}

let deletedPending = 0;
return this.#store.transaction(() => {
let deleted = 0;
for (const hash of txHashes) {
const key = hash.toString();
void this.#minedTxs.set(key, blockNumber);
if (this.#pendingTxs.has(key)) {
deleted++;
void this.#pendingTxs.delete(key);
void this.#minedTxHashToBlock.set(key, blockNumber);

const tx = this.getTxByHash(hash);
if (tx) {
deletedPending++;
const fee = getPendingTxPriority(tx);
void this.#pendingTxPriorityToHash.deleteValue(fee, key);
}
}
this.#metrics.recordRemovedObjects(deleted, 'pending');
this.#metrics.recordAddedObjects(txHashes.length, 'mined');
this.#metrics.recordRemovedObjects(deletedPending, 'pending');
});
}

Expand All @@ -61,44 +70,41 @@ export class AztecKVTxPool implements TxPool {
return Promise.resolve();
}

let markedAsPending = 0;
return this.#store.transaction(() => {
let deleted = 0;
let added = 0;
for (const hash of txHashes) {
const key = hash.toString();
if (this.#minedTxs.has(key)) {
deleted++;
void this.#minedTxs.delete(key);
}
void this.#minedTxHashToBlock.delete(key);

if (this.#txs.has(key)) {
added++;
void this.#pendingTxs.add(key);
const tx = this.getTxByHash(hash);
if (tx) {
void this.#pendingTxPriorityToHash.set(getPendingTxPriority(tx), key);
markedAsPending++;
}
}

this.#metrics.recordRemovedObjects(deleted, 'mined');
this.#metrics.recordAddedObjects(added, 'pending');
this.#metrics.recordAddedObjects(markedAsPending, 'pending');
this.#metrics.recordRemovedObjects(markedAsPending, 'mined');
});
}

public getPendingTxHashes(): TxHash[] {
return Array.from(this.#pendingTxs.entries()).map(x => TxHash.fromString(x));
return Array.from(this.#pendingTxPriorityToHash.values({ reverse: true })).map(x => TxHash.fromString(x));
}

public getMinedTxHashes(): [TxHash, number][] {
return Array.from(this.#minedTxs.entries()).map(([txHash, blockNumber]) => [
return Array.from(this.#minedTxHashToBlock.entries()).map(([txHash, blockNumber]) => [
TxHash.fromString(txHash),
blockNumber,
]);
}

public getTxStatus(txHash: TxHash): 'pending' | 'mined' | undefined {
const key = txHash.toString();
if (this.#pendingTxs.has(key)) {
return 'pending';
} else if (this.#minedTxs.has(key)) {
if (this.#minedTxHashToBlock.has(key)) {
return 'mined';
} else if (this.#txs.has(key)) {
return 'pending';
} else {
return undefined;
}
Expand All @@ -120,22 +126,22 @@ export class AztecKVTxPool implements TxPool {
* @returns Empty promise.
*/
public addTxs(txs: Tx[]): Promise<void> {
const txHashes = txs.map(tx => tx.getTxHash());
return this.#store.transaction(() => {
let pendingCount = 0;
for (const [i, tx] of txs.entries()) {
const txHash = txHashes[i];
for (const tx of txs) {
const txHash = tx.getTxHash();
this.#log.verbose(`Adding tx ${txHash.toString()} to pool`, {
eventName: 'tx-added-to-pool',
...tx.getStats(),
} satisfies TxAddedToPoolStats);

const key = txHash.toString();
void this.#txs.set(key, tx.toBuffer());
if (!this.#minedTxs.has(key)) {

if (!this.#minedTxHashToBlock.has(key)) {
pendingCount++;
// REFACTOR: Use an lmdb conditional write to avoid race conditions with this write tx
void this.#pendingTxs.add(key);
void this.#pendingTxPriorityToHash.set(getPendingTxPriority(tx), key);
this.#metrics.recordSize(tx);
}
}
Expand All @@ -150,20 +156,27 @@ export class AztecKVTxPool implements TxPool {
* @returns The number of transactions that was deleted from the pool.
*/
public deleteTxs(txHashes: TxHash[]): Promise<void> {
let pendingDeleted = 0;
let minedDeleted = 0;

return this.#store.transaction(() => {
let pendingDeleted = 0;
let minedDeleted = 0;
for (const hash of txHashes) {
const key = hash.toString();
void this.#txs.delete(key);
if (this.#pendingTxs.has(key)) {
pendingDeleted++;
void this.#pendingTxs.delete(key);
}
const tx = this.getTxByHash(hash);

if (tx) {
const fee = getPendingTxPriority(tx);
void this.#pendingTxPriorityToHash.deleteValue(fee, key);

const isMined = this.#minedTxHashToBlock.has(key);
if (isMined) {
minedDeleted++;
} else {
pendingDeleted++;
}

if (this.#minedTxs.has(key)) {
minedDeleted++;
void this.#minedTxs.delete(key);
void this.#txs.delete(key);
void this.#minedTxHashToBlock.delete(key);
}
}

Expand Down
6 changes: 5 additions & 1 deletion yarn-project/p2p/src/mem_pools/tx_pool/memory_tx_pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { createLogger } from '@aztec/foundation/log';
import { type TelemetryClient } from '@aztec/telemetry-client';

import { PoolInstrumentation, PoolName } from '../instrumentation.js';
import { getPendingTxPriority } from './priority.js';
import { type TxPool } from './tx_pool.js';

/**
Expand Down Expand Up @@ -68,7 +69,10 @@ export class InMemoryTxPool implements TxPool {
}

public getPendingTxHashes(): TxHash[] {
return Array.from(this.pendingTxs).map(x => TxHash.fromBigInt(x));
return this.getAllTxs()
.sort((tx1, tx2) => -getPendingTxPriority(tx1).localeCompare(getPendingTxPriority(tx2)))
.map(tx => tx.getTxHash())
.filter(txHash => this.pendingTxs.has(txHash.toBigInt()));
}

public getMinedTxHashes(): [TxHash, number][] {
Expand Down
13 changes: 13 additions & 0 deletions yarn-project/p2p/src/mem_pools/tx_pool/priority.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import { type Tx } from '@aztec/circuit-types';
import { Buffer32 } from '@aztec/foundation/buffer';

/**
* Returns a string representing the priority of a tx.
* Txs with a higher priority value are returned first when retrieving pending tx hashes.
* We currently use the sum of the priority fees for the tx for this value, represented as hex.
*/
export function getPendingTxPriority(tx: Tx): string {
const priorityFees = tx.getGasSettings().maxPriorityFeesPerGas;
const totalFees = priorityFees.feePerDaGas.toBigInt() + priorityFees.feePerL2Gas.toBigInt();
return Buffer32.fromBigInt(totalFees).toString();
}
2 changes: 1 addition & 1 deletion yarn-project/p2p/src/mem_pools/tx_pool/tx_pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ export interface TxPool {
getAllTxHashes(): TxHash[];

/**
* Gets the hashes of pending transactions currently in the tx pool.
* Gets the hashes of pending transactions currently in the tx pool sorted by priority (see getPendingTxPriority).
* @returns An array of pending transaction hashes found in the tx pool.
*/
getPendingTxHashes(): TxHash[];
Expand Down
22 changes: 21 additions & 1 deletion yarn-project/p2p/src/mem_pools/tx_pool/tx_pool_test_suite.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import { mockTx } from '@aztec/circuit-types';
import { type Tx, mockTx } from '@aztec/circuit-types';
import { GasFees } from '@aztec/circuits.js';
import { unfreeze } from '@aztec/foundation/types';

import { type TxPool } from './tx_pool.js';

Expand Down Expand Up @@ -101,4 +103,22 @@ export function describeTxPool(getTxPool: () => TxPool) {
expect(poolTxHashes).toHaveLength(3);
expect(poolTxHashes).toEqual(expect.arrayContaining([tx1.getTxHash(), tx2.getTxHash(), tx3.getTxHash()]));
});

it('Returns pending tx hashes sorted by priority', async () => {
const withPriorityFee = (tx: Tx, fee: number) => {
unfreeze(tx.data.constants.txContext.gasSettings).maxPriorityFeesPerGas = new GasFees(fee, fee);
return tx;
};

const tx1 = withPriorityFee(mockTx(0), 1000);
const tx2 = withPriorityFee(mockTx(1), 100);
const tx3 = withPriorityFee(mockTx(2), 200);
const tx4 = withPriorityFee(mockTx(3), 3000);

await pool.addTxs([tx1, tx2, tx3, tx4]);

const poolTxHashes = pool.getPendingTxHashes();
expect(poolTxHashes).toHaveLength(4);
expect(poolTxHashes).toEqual([tx4, tx1, tx3, tx2].map(tx => tx.getTxHash()));
});
}

0 comments on commit def7cd7

Please sign in to comment.