diff --git a/docker-compose.provernet.yml b/docker-compose.provernet.yml index 58bb54087c4..57d7d374f35 100644 --- a/docker-compose.provernet.yml +++ b/docker-compose.provernet.yml @@ -136,6 +136,8 @@ services: BOT_PUBLIC_TRANSFERS_PER_TX: 0 BOT_NO_WAIT_FOR_TRANSFERS: true BOT_NO_START: false + BOT_MAX_CONSECUTIVE_ERRORS: 3 + BOT_STOP_WHEN_UNHEALTHY: true PXE_PROVER_ENABLED: "${PROVER_REAL_PROOFS:-false}" PROVER_REAL_PROOFS: "${PROVER_REAL_PROOFS:-false}" BB_SKIP_CLEANUP: "${BB_SKIP_CLEANUP:-0}" # Persist tmp dirs for debugging @@ -150,7 +152,7 @@ services: test: [ "CMD", "curl", "-fSs", "http://127.0.0.1:80/status" ] interval: 3s timeout: 30s - start_period: 10s + start_period: 90s restart: on-failure:5 command: [ "start", "--bot", "--pxe" ] diff --git a/yarn-project/archiver/src/archiver/archiver.ts b/yarn-project/archiver/src/archiver/archiver.ts index 8e6523c3283..fab3955bf14 100644 --- a/yarn-project/archiver/src/archiver/archiver.ts +++ b/yarn-project/archiver/src/archiver/archiver.ts @@ -343,7 +343,7 @@ export class Archiver implements ArchiveSource { localBlockForDestinationProvenBlockNumber && provenArchive === localBlockForDestinationProvenBlockNumber.archive.root.toString() ) { - this.log.info(`Updating the proven block number to ${provenBlockNumber} and epoch to ${provenEpochNumber}`); + this.log.verbose(`Updating the proven block number to ${provenBlockNumber} and epoch to ${provenEpochNumber}`); await this.store.setProvenL2BlockNumber(Number(provenBlockNumber)); // if we are here then we must have a valid proven epoch number await this.store.setProvenL2EpochNumber(Number(provenEpochNumber)); diff --git a/yarn-project/archiver/src/test/mock_l2_block_source.ts b/yarn-project/archiver/src/test/mock_l2_block_source.ts index 5921831cb2d..dc85bdfdc66 100644 --- a/yarn-project/archiver/src/test/mock_l2_block_source.ts +++ b/yarn-project/archiver/src/test/mock_l2_block_source.ts @@ -67,8 +67,8 @@ export class MockL2BlockSource implements L2BlockSource { return Promise.resolve(this.l2Blocks.length); } - public async getProvenBlockNumber(): Promise { - return this.provenBlockNumber ?? (await this.getBlockNumber()); + public getProvenBlockNumber(): Promise { + return Promise.resolve(this.provenBlockNumber); } public getProvenL2EpochNumber(): Promise { diff --git a/yarn-project/aztec/src/cli/cli.ts b/yarn-project/aztec/src/cli/cli.ts index 7dd198383ec..24251b0165e 100644 --- a/yarn-project/aztec/src/cli/cli.ts +++ b/yarn-project/aztec/src/cli/cli.ts @@ -117,7 +117,7 @@ export function injectAztecCommands(program: Command, userLog: LogFn, debugLogge const app = rpcServer.getApp(options.apiPrefix); // add status route - const statusRouter = createStatusRouter(options.apiPrefix); + const statusRouter = createStatusRouter(() => rpcServer.isHealthy(), options.apiPrefix); app.use(statusRouter.routes()).use(statusRouter.allowedMethods()); const httpServer = http.createServer(app.callback()); diff --git a/yarn-project/aztec/src/cli/cmds/start_txe.ts b/yarn-project/aztec/src/cli/cmds/start_txe.ts index e1acbfde924..64198b580a4 100644 --- a/yarn-project/aztec/src/cli/cmds/start_txe.ts +++ b/yarn-project/aztec/src/cli/cmds/start_txe.ts @@ -9,7 +9,7 @@ export const startTXE = (options: any, debugLogger: DebugLogger) => { const txeServer = createTXERpcServer(debugLogger); const app = txeServer.getApp(); // add status route - const statusRouter = createStatusRouter(); + const statusRouter = createStatusRouter(() => txeServer.isHealthy()); app.use(statusRouter.routes()).use(statusRouter.allowedMethods()); const httpServer = http.createServer(app.callback()); diff --git a/yarn-project/bot/src/config.ts b/yarn-project/bot/src/config.ts index c143a851307..a4f36f0ab8a 100644 --- a/yarn-project/bot/src/config.ts +++ b/yarn-project/bot/src/config.ts @@ -53,6 +53,10 @@ export type BotConfig = { daGasLimit: number | undefined; /** Token contract to use */ contract: SupportedTokenContracts; + /** The maximum number of consecutive errors before the bot shuts down */ + maxConsecutiveErrors: number; + /** Stops the bot if service becomes unhealthy */ + stopWhenUnhealthy: boolean; }; export const botConfigMappings: ConfigMappingsType = { @@ -164,6 +168,16 @@ export const botConfigMappings: ConfigMappingsType = { return val as SupportedTokenContracts; }, }, + maxConsecutiveErrors: { + env: 'BOT_MAX_CONSECUTIVE_ERRORS', + description: 'The maximum number of consecutive errors before the bot shuts down', + ...numberConfigHelper(0), + }, + stopWhenUnhealthy: { + env: 'BOT_STOP_WHEN_UNHEALTHY', + description: 'Stops the bot if service becomes unhealthy', + ...booleanConfigHelper(false), + }, }; export function getBotConfigFromEnv(): BotConfig { diff --git a/yarn-project/bot/src/rpc.ts b/yarn-project/bot/src/rpc.ts index 32487d667c1..5c2cd6c5634 100644 --- a/yarn-project/bot/src/rpc.ts +++ b/yarn-project/bot/src/rpc.ts @@ -12,5 +12,5 @@ import { type BotRunner } from './runner.js'; * @returns An JSON-RPC HTTP server */ export function createBotRunnerRpcServer(botRunner: BotRunner) { - return new JsonRpcServer(botRunner, { AztecAddress, EthAddress, Fr, TxHash }, {}, []); + return new JsonRpcServer(botRunner, { AztecAddress, EthAddress, Fr, TxHash }, {}, [], () => botRunner.isHealthy()); } diff --git a/yarn-project/bot/src/runner.ts b/yarn-project/bot/src/runner.ts index 7810a76837f..54b0e853b1e 100644 --- a/yarn-project/bot/src/runner.ts +++ b/yarn-project/bot/src/runner.ts @@ -10,6 +10,8 @@ export class BotRunner { private pxe?: PXE; private node: AztecNode; private runningPromise: RunningPromise; + private consecutiveErrors = 0; + private healthy = true; public constructor(private config: BotConfig, dependencies: { pxe?: PXE; node?: AztecNode }) { this.pxe = dependencies.pxe; @@ -52,6 +54,10 @@ export class BotRunner { this.log.info(`Stopped bot`); } + public isHealthy() { + return this.runningPromise.isRunning() && this.healthy; + } + /** Returns whether the bot is running. */ public isRunning() { return this.runningPromise.isRunning(); @@ -96,8 +102,10 @@ export class BotRunner { try { await bot.run(); + this.consecutiveErrors = 0; } catch (err) { - this.log.error(`Error running bot: ${err}`); + this.consecutiveErrors += 1; + this.log.error(`Error running bot consecutiveCount=${this.consecutiveErrors}: ${err}`); throw err; } } @@ -130,6 +138,15 @@ export class BotRunner { await this.run(); } catch (err) { // Already logged in run() + if (this.config.maxConsecutiveErrors > 0 && this.consecutiveErrors >= this.config.maxConsecutiveErrors) { + this.log.error(`Too many errors bot is unhealthy`); + this.healthy = false; + } + } + + if (!this.healthy && this.config.stopWhenUnhealthy) { + this.log.error(`Stopping bot due to errors`); + process.exit(1); // workaround docker not restarting the container if its unhealthy. We have to exit instead } } } diff --git a/yarn-project/circuit-types/src/l2_block_downloader/l2_block_stream.ts b/yarn-project/circuit-types/src/l2_block_downloader/l2_block_stream.ts index e8ceac709c5..7a1d179dd16 100644 --- a/yarn-project/circuit-types/src/l2_block_downloader/l2_block_stream.ts +++ b/yarn-project/circuit-types/src/l2_block_downloader/l2_block_stream.ts @@ -74,7 +74,7 @@ export class L2BlockStream { while (latestBlockNumber < sourceTips.latest.number) { const from = latestBlockNumber + 1; const limit = Math.min(this.opts.batchSize ?? 20, sourceTips.latest.number - from + 1); - this.log.debug(`Requesting blocks from ${from} limit ${limit}`); + this.log.debug(`Requesting blocks from ${from} limit ${limit} proven=${this.opts.proven}`); const blocks = await this.l2BlockSource.getBlocks(from, limit, this.opts.proven); if (blocks.length === 0) { break; diff --git a/yarn-project/end-to-end/src/e2e_block_building.test.ts b/yarn-project/end-to-end/src/e2e_block_building.test.ts index 8fd6fc85953..9b372ce0af6 100644 --- a/yarn-project/end-to-end/src/e2e_block_building.test.ts +++ b/yarn-project/end-to-end/src/e2e_block_building.test.ts @@ -11,6 +11,7 @@ import { Fr, L1NotePayload, type PXE, + TxStatus, type Wallet, deriveKeys, retryUntil, @@ -442,7 +443,7 @@ describe('e2e_block_building', () => { expect(tx1.blockNumber).toEqual(initialBlockNumber + 1); expect(await contract.methods.get_public_value(ownerAddress).simulate()).toEqual(20n); - // Now move to a new epoch and past the proof claim window + // Now move to a new epoch and past the proof claim window to cause a reorg logger.info('Advancing past the proof claim window'); await cheatCodes.rollup.advanceToNextEpoch(); await cheatCodes.rollup.advanceSlots(AZTEC_EPOCH_PROOF_CLAIM_WINDOW_IN_L2_SLOTS + 1); // off-by-one? @@ -450,13 +451,25 @@ describe('e2e_block_building', () => { // Wait a bit before spawning a new pxe await sleep(2000); + // tx1 is valid because it was build against a proven block number + // the sequencer will bring it back on chain + await retryUntil( + async () => (await aztecNode.getTxReceipt(tx1.txHash)).status === TxStatus.SUCCESS, + 'wait for re-inclusion', + 60, + 1, + ); + + const newTx1Receipt = await aztecNode.getTxReceipt(tx1.txHash); + expect(newTx1Receipt.blockNumber).toEqual(tx1.blockNumber); + expect(newTx1Receipt.blockHash).not.toEqual(tx1.blockHash); + // Send another tx which should be mined a block that is built on the reorg'd chain // We need to send it from a new pxe since pxe doesn't detect reorgs (yet) logger.info(`Creating new PXE service`); const pxeServiceConfig = { ...getPXEServiceConfig() }; const newPxe = await createPXEService(aztecNode, pxeServiceConfig); const newWallet = await createAccount(newPxe); - expect(await pxe.getBlockNumber()).toEqual(initialBlockNumber + 1); // TODO: Contract.at should automatically register the instance in the pxe logger.info(`Registering contract at ${contract.address} in new pxe`); @@ -465,8 +478,8 @@ describe('e2e_block_building', () => { logger.info('Sending new tx on reorgd chain'); const tx2 = await contractFromNewPxe.methods.increment_public_value(ownerAddress, 10).send().wait(); - expect(await contractFromNewPxe.methods.get_public_value(ownerAddress).simulate()).toEqual(10n); - expect(tx2.blockNumber).toEqual(initialBlockNumber + 2); + expect(await contractFromNewPxe.methods.get_public_value(ownerAddress).simulate()).toEqual(30n); + expect(tx2.blockNumber).toEqual(initialBlockNumber + 3); }); }); }); diff --git a/yarn-project/end-to-end/src/prover-coordination/e2e_prover_coordination.test.ts b/yarn-project/end-to-end/src/prover-coordination/e2e_prover_coordination.test.ts index 11a2dc5c075..f5b15cfb4e5 100644 --- a/yarn-project/end-to-end/src/prover-coordination/e2e_prover_coordination.test.ts +++ b/yarn-project/end-to-end/src/prover-coordination/e2e_prover_coordination.test.ts @@ -5,6 +5,7 @@ import { type DebugLogger, EpochProofQuote, EpochProofQuotePayload, + TxStatus, createDebugLogger, sleep, } from '@aztec/aztec.js'; @@ -387,12 +388,12 @@ describe('e2e_prover_coordination', () => { // Progress epochs with a block in each until we hit a reorg // Note tips are block numbers, not slots await expectTips({ pending: 3n, proven: 3n }); - await contract.methods.create_note(recipient, recipient, 10).send().wait(); + const tx2BeforeReorg = await contract.methods.create_note(recipient, recipient, 10).send().wait(); await expectTips({ pending: 4n, proven: 3n }); // Go to epoch 3 await advanceToNextEpoch(); - await contract.methods.create_note(recipient, recipient, 10).send().wait(); + const tx3BeforeReorg = await contract.methods.create_note(recipient, recipient, 10).send().wait(); await expectTips({ pending: 5n, proven: 3n }); // Go to epoch 4 !!! REORG !!! ay caramba !!! @@ -401,15 +402,32 @@ describe('e2e_prover_coordination', () => { // Wait a bit for the sequencer / node to notice a re-org await sleep(2000); + // the sequencer will add valid txs again but in a new block + const tx2AfterReorg = await ctx.aztecNode.getTxReceipt(tx2BeforeReorg.txHash); + const tx3AfterReorg = await ctx.aztecNode.getTxReceipt(tx3BeforeReorg.txHash); + + // the tx from epoch 2 is still valid since it references a proven block + // this will be added back onto the chain + expect(tx2AfterReorg.status).toEqual(TxStatus.SUCCESS); + expect(tx2AfterReorg.blockNumber).toEqual(tx2BeforeReorg.blockNumber); + expect(tx2AfterReorg.blockHash).not.toEqual(tx2BeforeReorg.blockHash); + + // the tx from epoch 3 is not valid anymore, since it was built against a reorged block + // should be dropped + expect(tx3AfterReorg.status).toEqual(TxStatus.DROPPED); + // new pxe, as it does not support reorgs const pxeServiceConfig = { ...getPXEServiceConfig() }; const newPxe = await createPXEService(ctx.aztecNode, pxeServiceConfig); const newWallet = await createAccount(newPxe); const newWalletAddress = newWallet.getAddress(); - // The chain will re-org back to block 3, but creating a new account will produce a block, so we expect - // 4 blocks in the pending chain here! - await expectTips({ pending: 4n, proven: 3n }); + // The chain will prune back to block 3 + // then include the txs from the pruned epochs that are still valid + // bringing us back to block 4 (same number, different hash) + // creating a new account will produce another block + // so we expect 5 blocks in the pending chain here! + await expectTips({ pending: 5n, proven: 3n }); // Submit proof claim for the new epoch const quoteForEpoch4 = await makeEpochProofQuote({ @@ -427,7 +445,7 @@ describe('e2e_prover_coordination', () => { logger.info('Sending new tx on reorged chain'); await contractFromNewPxe.methods.create_note(newWalletAddress, newWalletAddress, 10).send().wait(); - await expectTips({ pending: 5n, proven: 3n }); + await expectTips({ pending: 6n, proven: 3n }); // Expect the proof claim to be accepted for the chain after the reorg await expectProofClaimOnL1({ ...quoteForEpoch4.payload, proposer: publisherAddress }); diff --git a/yarn-project/foundation/src/config/env_var.ts b/yarn-project/foundation/src/config/env_var.ts index 06a7745487b..6913e02ec13 100644 --- a/yarn-project/foundation/src/config/env_var.ts +++ b/yarn-project/foundation/src/config/env_var.ts @@ -33,6 +33,8 @@ export type EnvVar = | 'BOT_TOKEN_SALT' | 'BOT_TX_INTERVAL_SECONDS' | 'BOT_TX_MINED_WAIT_SECONDS' + | 'BOT_MAX_CONSECUTIVE_ERRORS' + | 'BOT_STOP_WHEN_UNHEALTHY' | 'COINBASE' | 'DATA_DIRECTORY' | 'DEBUG' @@ -61,6 +63,7 @@ export type EnvVar = | 'OTEL_SERVICE_NAME' | 'OUTBOX_CONTRACT_ADDRESS' | 'P2P_BLOCK_CHECK_INTERVAL_MS' + | 'P2P_BLOCK_REQUEST_BATCH_SIZE' | 'P2P_ENABLED' | 'P2P_GOSSIPSUB_D' | 'P2P_GOSSIPSUB_DHI' diff --git a/yarn-project/foundation/src/json-rpc/server/json_rpc_server.ts b/yarn-project/foundation/src/json-rpc/server/json_rpc_server.ts index 178d1c5bdc3..6d6833e00bd 100644 --- a/yarn-project/foundation/src/json-rpc/server/json_rpc_server.ts +++ b/yarn-project/foundation/src/json-rpc/server/json_rpc_server.ts @@ -33,11 +33,16 @@ export class JsonRpcServer { private objectClassMap: JsonClassConverterInput, /** List of methods to disallow from calling remotely */ public readonly disallowedMethods: string[] = [], + private healthCheck: StatusCheckFn = () => true, private log = createDebugLogger('json-rpc:server'), ) { this.proxy = new JsonProxy(handler, stringClassMap, objectClassMap); } + public isHealthy(): boolean | Promise { + return this.healthCheck(); + } + /** * Get an express app object. * @param prefix - Our server prefix. @@ -205,15 +210,25 @@ export class JsonRpcServer { } } +export type StatusCheckFn = () => boolean | Promise; + /** * Creates a router for handling a plain status request that will return 200 status when running. + * @param getCurrentStatus - List of health check functions to run. * @param apiPrefix - The prefix to use for all api requests * @returns - The router for handling status requests. */ -export function createStatusRouter(apiPrefix = '') { +export function createStatusRouter(getCurrentStatus: StatusCheckFn, apiPrefix = '') { const router = new Router({ prefix: `${apiPrefix}` }); - router.get('/status', (ctx: Koa.Context) => { - ctx.status = 200; + router.get('/status', async (ctx: Koa.Context) => { + let ok: boolean; + try { + ok = (await getCurrentStatus()) === true; + } catch (err) { + ok = false; + } + + ctx.status = ok ? 200 : 500; }); return router; } @@ -296,5 +311,22 @@ export function createNamespacedJsonRpcServer( { stringClassMap: {}, objectClassMap: {} } as ClassMaps, ); - return new JsonRpcServer(Object.create(handler), classMaps.stringClassMap, classMaps.objectClassMap, [], log); + const aggregateHealthCheck = async () => { + const statuses = await Promise.allSettled( + servers.flatMap(services => + Object.entries(services).map(async ([name, service]) => ({ name, healthy: await service.isHealthy() })), + ), + ); + const allHealthy = statuses.every(result => result.status === 'fulfilled' && result.value.healthy); + return allHealthy; + }; + + return new JsonRpcServer( + Object.create(handler), + classMaps.stringClassMap, + classMaps.objectClassMap, + [], + aggregateHealthCheck, + log, + ); } diff --git a/yarn-project/p2p/src/client/p2p_client.test.ts b/yarn-project/p2p/src/client/p2p_client.test.ts index 8c9cd44e0d6..c19fd464dcc 100644 --- a/yarn-project/p2p/src/client/p2p_client.test.ts +++ b/yarn-project/p2p/src/client/p2p_client.test.ts @@ -1,6 +1,8 @@ import { MockL2BlockSource } from '@aztec/archiver/test'; -import { mockEpochProofQuote, mockTx } from '@aztec/circuit-types'; +import { L2Block, mockEpochProofQuote, mockTx } from '@aztec/circuit-types'; +import { Fr } from '@aztec/circuits.js'; import { retryUntil } from '@aztec/foundation/retry'; +import { sleep } from '@aztec/foundation/sleep'; import { type AztecKVStore } from '@aztec/kv-store'; import { openTmpStore } from '@aztec/kv-store/utils'; import { type TelemetryClient } from '@aztec/telemetry-client'; @@ -43,6 +45,7 @@ describe('In-Memory P2P Client', () => { getPendingTxHashes: jest.fn().mockReturnValue([]), getTxStatus: jest.fn().mockReturnValue(undefined), markAsMined: jest.fn(), + markMinedAsPending: jest.fn(), }; p2pService = { @@ -236,5 +239,95 @@ describe('In-Memory P2P Client', () => { expect(epochProofQuotePool.deleteQuotesToEpoch).toBeCalledWith(3n); }); + describe('Chain prunes', () => { + it('moves the tips on a chain reorg', async () => { + blockSource.setProvenBlockNumber(0); + await client.start(); + + await advanceToProvenBlock(90); + + await expect(client.getL2Tips()).resolves.toEqual({ + latest: { number: 100, hash: expect.any(String) }, + proven: { number: 90, hash: expect.any(String) }, + finalized: { number: 90, hash: expect.any(String) }, + }); + + blockSource.removeBlocks(10); + + // give the client a chance to react to the reorg + await sleep(100); + + await expect(client.getL2Tips()).resolves.toEqual({ + latest: { number: 90, hash: expect.any(String) }, + proven: { number: 90, hash: expect.any(String) }, + finalized: { number: 90, hash: expect.any(String) }, + }); + + blockSource.addBlocks([L2Block.random(91), L2Block.random(92)]); + + // give the client a chance to react to the new blocks + await sleep(100); + + await expect(client.getL2Tips()).resolves.toEqual({ + latest: { number: 92, hash: expect.any(String) }, + proven: { number: 90, hash: expect.any(String) }, + finalized: { number: 90, hash: expect.any(String) }, + }); + }); + + it('deletes txs created from a pruned block', async () => { + client = new P2PClient(kvStore, blockSource, mempools, p2pService, 10, telemetryClient); + blockSource.setProvenBlockNumber(0); + await client.start(); + + // add two txs to the pool. One build against block 90, one against block 95 + // then prune the chain back to block 90 + // only one tx should be deleted + const goodTx = mockTx(); + goodTx.data.constants.historicalHeader.globalVariables.blockNumber = new Fr(90); + + const badTx = mockTx(); + badTx.data.constants.historicalHeader.globalVariables.blockNumber = new Fr(95); + + txPool.getAllTxs.mockReturnValue([goodTx, badTx]); + + blockSource.removeBlocks(10); + await sleep(150); + expect(txPool.deleteTxs).toHaveBeenCalledWith([badTx.getTxHash()]); + await client.stop(); + }); + + it('moves mined and valid txs back to the pending set', async () => { + client = new P2PClient(kvStore, blockSource, mempools, p2pService, 10, telemetryClient); + blockSource.setProvenBlockNumber(0); + await client.start(); + + // add three txs to the pool built against different blocks + // then prune the chain back to block 90 + // only one tx should be deleted + const goodButOldTx = mockTx(); + goodButOldTx.data.constants.historicalHeader.globalVariables.blockNumber = new Fr(89); + + const goodTx = mockTx(); + goodTx.data.constants.historicalHeader.globalVariables.blockNumber = new Fr(90); + + const badTx = mockTx(); + badTx.data.constants.historicalHeader.globalVariables.blockNumber = new Fr(95); + + txPool.getAllTxs.mockReturnValue([goodButOldTx, goodTx, badTx]); + txPool.getMinedTxHashes.mockReturnValue([ + [goodButOldTx.getTxHash(), 90], + [goodTx.getTxHash(), 91], + ]); + + blockSource.removeBlocks(10); + await sleep(150); + expect(txPool.deleteTxs).toHaveBeenCalledWith([badTx.getTxHash()]); + await sleep(150); + expect(txPool.markMinedAsPending).toHaveBeenCalledWith([goodTx.getTxHash()]); + await client.stop(); + }); + }); + // TODO(https://github.com/AztecProtocol/aztec-packages/issues/7971): tests for attestation pool pruning }); diff --git a/yarn-project/p2p/src/client/p2p_client.ts b/yarn-project/p2p/src/client/p2p_client.ts index f47d6e36140..a8bd5954eb4 100644 --- a/yarn-project/p2p/src/client/p2p_client.ts +++ b/yarn-project/p2p/src/client/p2p_client.ts @@ -3,15 +3,17 @@ import { type BlockProposal, type EpochProofQuote, type L2Block, - L2BlockDownloader, type L2BlockId, type L2BlockSource, + L2BlockStream, + type L2BlockStreamEvent, + type L2Tips, type Tx, type TxHash, } from '@aztec/circuit-types'; import { INITIAL_L2_BLOCK_NUM } from '@aztec/circuits.js/constants'; import { createDebugLogger } from '@aztec/foundation/log'; -import { type AztecKVStore, type AztecSingleton } from '@aztec/kv-store'; +import { type AztecKVStore, type AztecMap, type AztecSingleton } from '@aztec/kv-store'; import { Attributes, type TelemetryClient, WithTracer, trackSpan } from '@aztec/telemetry-client'; import { type ENR } from '@chainsafe/enr'; @@ -179,12 +181,6 @@ export interface P2P { * The P2P client implementation. */ export class P2PClient extends WithTracer implements P2P { - /** L2 block download to stay in sync with latest blocks. */ - private latestBlockDownloader: L2BlockDownloader; - - /** L2 block download to stay in sync with proven blocks. */ - private provenBlockDownloader: L2BlockDownloader; - /** Property that indicates whether the client is running. */ private stopping = false; @@ -197,6 +193,7 @@ export class P2PClient extends WithTracer implements P2P { private latestBlockNumberAtStart = -1; private provenBlockNumberAtStart = -1; + private synchedBlockHashes: AztecMap; private synchedLatestBlockNumber: AztecSingleton; private synchedProvenBlockNumber: AztecSingleton; @@ -204,6 +201,8 @@ export class P2PClient extends WithTracer implements P2P { private attestationPool: AttestationPool; private epochProofQuotePool: EpochProofQuotePool; + private blockStream; + /** * In-memory P2P client constructor. * @param store - The client's instance of the KV store. @@ -224,14 +223,14 @@ export class P2PClient extends WithTracer implements P2P { ) { super(telemetryClient, 'P2PClient'); - const { blockCheckIntervalMS: checkInterval, l2QueueSize: p2pL2QueueSize } = getP2PConfigFromEnv(); - const l2DownloaderOpts = { maxQueueSize: p2pL2QueueSize, pollIntervalMS: checkInterval }; - // TODO(palla/prover-node): This effectively downloads blocks twice from the archiver, which is an issue - // if the archiver is remote. We should refactor this so the downloader keeps a single queue and handles - // latest/proven metadata, as well as block reorgs. - this.latestBlockDownloader = new L2BlockDownloader(l2BlockSource, l2DownloaderOpts); - this.provenBlockDownloader = new L2BlockDownloader(l2BlockSource, { ...l2DownloaderOpts, proven: true }); + const { blockCheckIntervalMS, blockRequestBatchSize } = getP2PConfigFromEnv(); + this.blockStream = new L2BlockStream(l2BlockSource, this, this, { + batchSize: blockRequestBatchSize, + pollIntervalMS: blockCheckIntervalMS, + }); + + this.synchedBlockHashes = store.openMap('p2p_pool_block_hashes'); this.synchedLatestBlockNumber = store.openSingleton('p2p_pool_last_l2_block'); this.synchedProvenBlockNumber = store.openSingleton('p2p_pool_last_proven_l2_block'); @@ -240,6 +239,64 @@ export class P2PClient extends WithTracer implements P2P { this.epochProofQuotePool = mempools.epochProofQuotePool; } + public getL2BlockHash(number: number): Promise { + return Promise.resolve(this.synchedBlockHashes.get(number)); + } + + public getL2Tips(): Promise { + const latestBlockNumber = this.getSyncedLatestBlockNum(); + let latestBlockHash: string | undefined; + const provenBlockNumber = this.getSyncedProvenBlockNum(); + let provenBlockHash: string | undefined; + + if (latestBlockNumber > 0) { + latestBlockHash = this.synchedBlockHashes.get(latestBlockNumber); + if (typeof latestBlockHash === 'undefined') { + this.log.warn(`Block hash for latest block ${latestBlockNumber} not found`); + throw new Error(); + } + } + + if (provenBlockNumber > 0) { + provenBlockHash = this.synchedBlockHashes.get(provenBlockNumber); + if (typeof provenBlockHash === 'undefined') { + this.log.warn(`Block hash for proven block ${provenBlockNumber} not found`); + throw new Error(); + } + } + + return Promise.resolve({ + latest: { hash: latestBlockHash!, number: latestBlockNumber }, + proven: { hash: provenBlockHash!, number: provenBlockNumber }, + finalized: { hash: provenBlockHash!, number: provenBlockNumber }, + }); + } + + public async handleBlockStreamEvent(event: L2BlockStreamEvent): Promise { + this.log.debug(`Handling block stream event ${event.type}`); + switch (event.type) { + case 'blocks-added': + await this.handleLatestL2Blocks(event.blocks); + break; + case 'chain-finalized': + // TODO (alexg): I think we can prune the block hashes map here + break; + case 'chain-proven': { + const from = this.getSyncedProvenBlockNum() + 1; + const limit = event.blockNumber - from + 1; + await this.handleProvenL2Blocks(await this.l2BlockSource.getBlocks(from, limit)); + break; + } + case 'chain-pruned': + await this.handlePruneL2Blocks(event.blockNumber); + break; + default: { + const _: never = event; + break; + } + } + } + #assertIsReady() { // this.log.info('Checking if p2p client is ready, current state: ', this.currentState); if (!this.isReady()) { @@ -304,21 +361,7 @@ export class P2PClient extends WithTracer implements P2P { // publish any txs in TxPool after its doing initial sync this.syncPromise = this.syncPromise.then(() => this.publishStoredTxs()); - // start looking for further blocks - const processLatest = async () => { - while (!this.stopping) { - await this.latestBlockDownloader.getBlocks(1).then(this.handleLatestL2Blocks.bind(this)); - } - }; - const processProven = async () => { - while (!this.stopping) { - await this.provenBlockDownloader.getBlocks(1).then(this.handleProvenL2Blocks.bind(this)); - } - }; - - this.runningPromise = Promise.all([processLatest(), processProven()]).then(() => {}); - this.latestBlockDownloader.start(syncedLatestBlock); - this.provenBlockDownloader.start(syncedLatestBlock); + this.blockStream.start(); this.log.verbose(`Started block downloader from block ${syncedLatestBlock}`); return this.syncPromise; @@ -333,8 +376,7 @@ export class P2PClient extends WithTracer implements P2P { this.stopping = true; await this.p2pService.stop(); this.log.debug('Stopped p2p service'); - await this.latestBlockDownloader.stop(); - await this.provenBlockDownloader.stop(); + await this.blockStream.stop(); this.log.debug('Stopped block downloader'); await this.runningPromise; this.setCurrentState(P2PClientState.STOPPED); @@ -406,7 +448,7 @@ export class P2PClient extends WithTracer implements P2P { } else if (filter === 'mined') { return this.txPool .getMinedTxHashes() - .map(txHash => this.txPool.getTxByHash(txHash)) + .map(([txHash]) => this.txPool.getTxByHash(txHash)) .filter((tx): tx is Tx => !!tx); } else if (filter === 'pending') { return this.txPool @@ -525,7 +567,7 @@ export class P2PClient extends WithTracer implements P2P { private async markTxsAsMinedFromBlocks(blocks: L2Block[]): Promise { for (const block of blocks) { const txHashes = block.body.txEffects.map(txEffect => txEffect.txHash); - await this.txPool.markAsMined(txHashes); + await this.txPool.markAsMined(txHashes, block.number); } } @@ -551,8 +593,10 @@ export class P2PClient extends WithTracer implements P2P { if (!blocks.length) { return Promise.resolve(); } + await this.markTxsAsMinedFromBlocks(blocks); const lastBlockNum = blocks[blocks.length - 1].number; + await Promise.all(blocks.map(block => this.synchedBlockHashes.set(block.number, block.hash().toString()))); await this.synchedLatestBlockNumber.set(lastBlockNum); this.log.debug(`Synched to latest block ${lastBlockNum}`); await this.startServiceIfSynched(); @@ -590,6 +634,46 @@ export class P2PClient extends WithTracer implements P2P { await this.startServiceIfSynched(); } + /** + * Updates the tx pool after a chain prune. + * @param latestBlock - The block number the chain was pruned to. + */ + private async handlePruneL2Blocks(latestBlock: number): Promise { + const txsToDelete: TxHash[] = []; + for (const tx of this.txPool.getAllTxs()) { + // every tx that's been generated against a block that has now been pruned is no longer valid + if (tx.data.constants.historicalHeader.globalVariables.blockNumber.toNumber() > latestBlock) { + txsToDelete.push(tx.getTxHash()); + } + } + + this.log.info( + `Detected chain prune. Removing invalid txs count=${ + txsToDelete.length + } newLatestBlock=${latestBlock} previousLatestBlock=${this.getSyncedLatestBlockNum()}`, + ); + + // delete invalid txs (both pending and mined) + await this.txPool.deleteTxs(txsToDelete); + + // everything left in the mined set was built against a block on the proven chain so its still valid + // move back to pending the txs that were reorged out of the chain + // NOTE: we can't move _all_ txs back to pending because the tx pool could keep hold of mined txs for longer + // (see this.keepProvenTxsFor) + const txsToMoveToPending: TxHash[] = []; + for (const [txHash, blockNumber] of this.txPool.getMinedTxHashes()) { + if (blockNumber > latestBlock) { + txsToMoveToPending.push(txHash); + } + } + + this.log.info(`Moving ${txsToMoveToPending.length} mined txs back to pending`); + await this.txPool.markMinedAsPending(txsToMoveToPending); + + await this.synchedLatestBlockNumber.set(latestBlock); + // no need to update block hashes, as they will be updated as new blocks are added + } + private async startServiceIfSynched() { if ( this.currentState === P2PClientState.SYNCHING && diff --git a/yarn-project/p2p/src/config.ts b/yarn-project/p2p/src/config.ts index 351e28de8b0..c98bc9d741a 100644 --- a/yarn-project/p2p/src/config.ts +++ b/yarn-project/p2p/src/config.ts @@ -23,6 +23,11 @@ export interface P2PConfig extends P2PReqRespConfig { */ blockCheckIntervalMS: number; + /** + * The number of blocks to fetch in a single batch. + */ + blockRequestBatchSize: number; + /** * The frequency in which to check for new peers. */ @@ -295,6 +300,11 @@ export const p2pConfigMappings: ConfigMappingsType = { description: 'The chain id of the L1 chain.', ...numberConfigHelper(31337), }, + blockRequestBatchSize: { + env: 'P2P_BLOCK_REQUEST_BATCH_SIZE', + description: 'The number of blocks to fetch in a single batch.', + ...numberConfigHelper(20), + }, ...p2pReqRespConfigMappings, }; diff --git a/yarn-project/p2p/src/mem_pools/tx_pool/aztec_kv_tx_pool.ts b/yarn-project/p2p/src/mem_pools/tx_pool/aztec_kv_tx_pool.ts index 432eefd012a..04d931c4240 100644 --- a/yarn-project/p2p/src/mem_pools/tx_pool/aztec_kv_tx_pool.ts +++ b/yarn-project/p2p/src/mem_pools/tx_pool/aztec_kv_tx_pool.ts @@ -19,7 +19,7 @@ export class AztecKVTxPool implements TxPool { /** Index for pending txs. */ #pendingTxs: AztecSet; /** Index for mined txs. */ - #minedTxs: AztecSet; + #minedTxs: AztecMap; #log: Logger; @@ -32,7 +32,7 @@ export class AztecKVTxPool implements TxPool { */ constructor(store: AztecKVStore, telemetry: TelemetryClient, log = createDebugLogger('aztec:tx_pool')) { this.#txs = store.openMap('txs'); - this.#minedTxs = store.openSet('minedTxs'); + this.#minedTxs = store.openMap('minedTxs'); this.#pendingTxs = store.openSet('pendingTxs'); this.#store = store; @@ -40,12 +40,12 @@ export class AztecKVTxPool implements TxPool { this.#metrics = new PoolInstrumentation(telemetry, 'AztecKVTxPool'); } - public markAsMined(txHashes: TxHash[]): Promise { + public markAsMined(txHashes: TxHash[], blockNumber: number): Promise { return this.#store.transaction(() => { let deleted = 0; for (const hash of txHashes) { const key = hash.toString(); - void this.#minedTxs.add(key); + void this.#minedTxs.set(key, blockNumber); if (this.#pendingTxs.has(key)) { deleted++; void this.#pendingTxs.delete(key); @@ -56,12 +56,41 @@ export class AztecKVTxPool implements TxPool { }); } + public markMinedAsPending(txHashes: TxHash[]): Promise { + if (txHashes.length === 0) { + return Promise.resolve(); + } + + return this.#store.transaction(() => { + let deleted = 0; + let added = 0; + for (const hash of txHashes) { + const key = hash.toString(); + if (this.#minedTxs.has(key)) { + deleted++; + void this.#minedTxs.delete(key); + } + + if (this.#txs.has(key)) { + added++; + void this.#pendingTxs.add(key); + } + } + + this.#metrics.recordRemovedObjects(deleted, 'mined'); + this.#metrics.recordAddedObjects(added, 'pending'); + }); + } + public getPendingTxHashes(): TxHash[] { return Array.from(this.#pendingTxs.entries()).map(x => TxHash.fromString(x)); } - public getMinedTxHashes(): TxHash[] { - return Array.from(this.#minedTxs.entries()).map(x => TxHash.fromString(x)); + public getMinedTxHashes(): [TxHash, number][] { + return Array.from(this.#minedTxs.entries()).map(([txHash, blockNumber]) => [ + TxHash.fromString(txHash), + blockNumber, + ]); } public getTxStatus(txHash: TxHash): 'pending' | 'mined' | undefined { diff --git a/yarn-project/p2p/src/mem_pools/tx_pool/memory_tx_pool.ts b/yarn-project/p2p/src/mem_pools/tx_pool/memory_tx_pool.ts index 9e6d72ea5a4..f7d6b59fea4 100644 --- a/yarn-project/p2p/src/mem_pools/tx_pool/memory_tx_pool.ts +++ b/yarn-project/p2p/src/mem_pools/tx_pool/memory_tx_pool.ts @@ -14,7 +14,7 @@ export class InMemoryTxPool implements TxPool { * Our tx pool, stored as a Map in-memory, with K: tx hash and V: the transaction. */ private txs: Map; - private minedTxs: Set; + private minedTxs: Map; private pendingTxs: Set; private metrics: PoolInstrumentation; @@ -25,15 +25,15 @@ export class InMemoryTxPool implements TxPool { */ constructor(telemetry: TelemetryClient, private log = createDebugLogger('aztec:tx_pool')) { this.txs = new Map(); - this.minedTxs = new Set(); + this.minedTxs = new Map(); this.pendingTxs = new Set(); this.metrics = new PoolInstrumentation(telemetry, 'InMemoryTxPool'); } - public markAsMined(txHashes: TxHash[]): Promise { + public markAsMined(txHashes: TxHash[], blockNumber: number): Promise { const keys = txHashes.map(x => x.toBigInt()); for (const key of keys) { - this.minedTxs.add(key); + this.minedTxs.set(key, blockNumber); this.pendingTxs.delete(key); } this.metrics.recordRemovedObjects(txHashes.length, 'pending'); @@ -41,12 +41,38 @@ export class InMemoryTxPool implements TxPool { return Promise.resolve(); } + public markMinedAsPending(txHashes: TxHash[]): Promise { + if (txHashes.length === 0) { + return Promise.resolve(); + } + + const keys = txHashes.map(x => x.toBigInt()); + let deleted = 0; + let added = 0; + for (const key of keys) { + if (this.minedTxs.delete(key)) { + deleted++; + } + + // only add back to the pending set if we have the tx object + if (this.txs.has(key)) { + added++; + this.pendingTxs.add(key); + } + } + + this.metrics.recordRemovedObjects(deleted, 'mined'); + this.metrics.recordAddedObjects(added, 'pending'); + + return Promise.resolve(); + } + public getPendingTxHashes(): TxHash[] { return Array.from(this.pendingTxs).map(x => TxHash.fromBigInt(x)); } - public getMinedTxHashes(): TxHash[] { - return Array.from(this.minedTxs).map(x => TxHash.fromBigInt(x)); + public getMinedTxHashes(): [TxHash, number][] { + return Array.from(this.minedTxs.entries()).map(([txHash, blockNumber]) => [TxHash.fromBigInt(txHash), blockNumber]); } public getTxStatus(txHash: TxHash): 'pending' | 'mined' | undefined { diff --git a/yarn-project/p2p/src/mem_pools/tx_pool/tx_pool.ts b/yarn-project/p2p/src/mem_pools/tx_pool/tx_pool.ts index 4cf434bb3e0..01511951f8a 100644 --- a/yarn-project/p2p/src/mem_pools/tx_pool/tx_pool.ts +++ b/yarn-project/p2p/src/mem_pools/tx_pool/tx_pool.ts @@ -21,7 +21,14 @@ export interface TxPool { * Marks the set of txs as mined, as opposed to pending. * @param txHashes - Hashes of the txs to flag as mined. */ - markAsMined(txHashes: TxHash[]): Promise; + markAsMined(txHashes: TxHash[], blockNumber: number): Promise; + + /** + * Moves mined txs back to the pending set in the case of a reorg. + * Note: txs not known by this peer will be ignored. + * @param txHashes - Hashes of the txs to flag as pending. + */ + markMinedAsPending(txHashes: TxHash[]): Promise; /** * Deletes transactions from the pool. Tx hashes that are not present are ignored. @@ -51,7 +58,7 @@ export interface TxPool { * Gets the hashes of mined transactions currently in the tx pool. * @returns An array of mined transaction hashes found in the tx pool. */ - getMinedTxHashes(): TxHash[]; + getMinedTxHashes(): [tx: TxHash, blockNumber: number][]; /** * Returns whether the given tx hash is flagged as pending or mined. diff --git a/yarn-project/p2p/src/mem_pools/tx_pool/tx_pool_test_suite.ts b/yarn-project/p2p/src/mem_pools/tx_pool/tx_pool_test_suite.ts index 2c72c1afa80..35af12fbd68 100644 --- a/yarn-project/p2p/src/mem_pools/tx_pool/tx_pool_test_suite.ts +++ b/yarn-project/p2p/src/mem_pools/tx_pool/tx_pool_test_suite.ts @@ -38,14 +38,46 @@ export function describeTxPool(getTxPool: () => TxPool) { const tx2 = mockTx(2); await pool.addTxs([tx1, tx2]); - await pool.markAsMined([tx1.getTxHash()]); + await pool.markAsMined([tx1.getTxHash()], 1); expect(pool.getTxByHash(tx1.getTxHash())).toEqual(tx1); expect(pool.getTxStatus(tx1.getTxHash())).toEqual('mined'); - expect(pool.getMinedTxHashes()).toEqual([tx1.getTxHash()]); + expect(pool.getMinedTxHashes()).toEqual([[tx1.getTxHash(), 1]]); expect(pool.getPendingTxHashes()).toEqual([tx2.getTxHash()]); }); + it('Marks txs as pending after being mined', async () => { + const tx1 = mockTx(1); + const tx2 = mockTx(2); + + await pool.addTxs([tx1, tx2]); + await pool.markAsMined([tx1.getTxHash()], 1); + + await pool.markMinedAsPending([tx1.getTxHash()]); + expect(pool.getMinedTxHashes()).toEqual([]); + const pending = pool.getPendingTxHashes(); + expect(pending).toHaveLength(2); + expect(pending).toEqual(expect.arrayContaining([tx1.getTxHash(), tx2.getTxHash()])); + }); + + it('Only marks txs as pending if they are known', async () => { + const tx1 = mockTx(1); + // simulate a situation where not all peers have all the txs + const someTxHashThatThisPeerDidNotSee = mockTx(2).getTxHash(); + await pool.addTxs([tx1]); + // this peer knows that tx2 was mined, but it does not have the tx object + await pool.markAsMined([tx1.getTxHash(), someTxHashThatThisPeerDidNotSee], 1); + expect(pool.getMinedTxHashes()).toEqual([ + [tx1.getTxHash(), 1], + [someTxHashThatThisPeerDidNotSee, 1], + ]); + + // reorg: both txs should now become available again + await pool.markMinedAsPending([tx1.getTxHash(), someTxHashThatThisPeerDidNotSee]); + expect(pool.getMinedTxHashes()).toEqual([]); + expect(pool.getPendingTxHashes()).toEqual([tx1.getTxHash()]); // tx2 is not in the pool + }); + it('Returns all transactions in the pool', async () => { const tx1 = mockTx(1); const tx2 = mockTx(2); diff --git a/yarn-project/p2p/src/service/reqresp/reqresp.integration.test.ts b/yarn-project/p2p/src/service/reqresp/reqresp.integration.test.ts index cb88b9a4ac5..3a1dc6f502e 100644 --- a/yarn-project/p2p/src/service/reqresp/reqresp.integration.test.ts +++ b/yarn-project/p2p/src/service/reqresp/reqresp.integration.test.ts @@ -57,6 +57,7 @@ const makeMockPools = () => { getPendingTxHashes: jest.fn().mockReturnValue([]), getTxStatus: jest.fn().mockReturnValue(undefined), markAsMined: jest.fn(), + markMinedAsPending: jest.fn(), }, attestationPool: { addAttestations: jest.fn(), diff --git a/yarn-project/package.json b/yarn-project/package.json index a0f0c56a374..a1547a4ccda 100644 --- a/yarn-project/package.json +++ b/yarn-project/package.json @@ -12,7 +12,7 @@ "format": "yarn prettier --cache -w .", "test": "FORCE_COLOR=true yarn workspaces foreach --exclude @aztec/aztec3-packages --exclude @aztec/end-to-end --exclude @aztec/prover-client -p -v run test && yarn workspaces foreach --include @aztec/end-to-end -p -v run test:unit", "build": "FORCE_COLOR=true yarn workspaces foreach --parallel --topological-dev --verbose --exclude @aztec/aztec3-packages --exclude @aztec/docs run build", - "build:fast": "cd foundation && yarn build && cd ../circuits.js && yarn build && cd ../l1-artifacts && yarn generate && cd .. && yarn generate && tsc -b", + "build:fast": "cd foundation && yarn build && cd ../l1-artifacts && yarn build && cd ../circuits.js && yarn build && cd .. && yarn generate && tsc -b", "build:dev": "./watch.sh", "generate": "FORCE_COLOR=true yarn workspaces foreach --parallel --topological-dev --verbose run generate", "clean": "yarn workspaces foreach -p -v run clean" diff --git a/yarn-project/sequencer-client/src/sequencer/sequencer.ts b/yarn-project/sequencer-client/src/sequencer/sequencer.ts index 8e3890b0699..21ae7179c6b 100644 --- a/yarn-project/sequencer-client/src/sequencer/sequencer.ts +++ b/yarn-project/sequencer-client/src/sequencer/sequencer.ts @@ -414,7 +414,11 @@ export class Sequencer { this.metrics.recordNewBlock(newGlobalVariables.blockNumber.toNumber(), validTxs.length); const workTimer = new Timer(); this.state = SequencerState.CREATING_BLOCK; - this.log.info(`Building block ${newGlobalVariables.blockNumber.toNumber()} with ${validTxs.length} transactions`); + this.log.info( + `Building blockNumber=${newGlobalVariables.blockNumber.toNumber()} txCount=${ + validTxs.length + } slotNumber=${newGlobalVariables.slotNumber.toNumber()}`, + ); // Get l1 to l2 messages from the contract this.log.debug('Requesting L1 to L2 messages from contract'); diff --git a/yarn-project/txe/src/bin/index.ts b/yarn-project/txe/src/bin/index.ts index 98187dcfb81..a2fba6f3777 100644 --- a/yarn-project/txe/src/bin/index.ts +++ b/yarn-project/txe/src/bin/index.ts @@ -18,7 +18,7 @@ function main() { const txeServer = createTXERpcServer(logger); const app = txeServer.getApp(); // add status route - const statusRouter = createStatusRouter(); + const statusRouter = createStatusRouter(() => txeServer.isHealthy()); app.use(statusRouter.routes()).use(statusRouter.allowedMethods()); const httpServer = http.createServer(app.callback());