From 4a16cf78558b5cf705b5d560699318c1cc63a9c0 Mon Sep 17 00:00:00 2001 From: Alex Gherghisan Date: Wed, 17 Jan 2024 09:48:34 +0000 Subject: [PATCH] refactor: keep track of blocks emitting/cancelling messages (#4028) This PR refactors the archiver to keep track of which L1 block last emitted messages or cancelled messages (similar to how it discovers L2 blocks) in order to make the synch process more efficient. --- yarn-project/archiver/package.json | 3 +- .../archiver/src/archiver/archiver.ts | 93 ++++++----- .../archiver/src/archiver/archiver_store.ts | 24 ++- .../src/archiver/archiver_store_test_suite.ts | 155 ++++++++---------- .../archiver/src/archiver/data_retrieval.ts | 11 +- .../archiver/src/archiver/eth_log_handlers.ts | 41 ++--- .../src/archiver/lmdb_archiver_store.test.ts | 24 +-- .../src/archiver/lmdb_archiver_store.ts | 121 ++++++-------- .../l1_to_l2_message_store.test.ts | 38 ++--- .../l1_to_l2_message_store.ts | 25 +-- .../memory_archiver_store.ts | 48 ++++-- .../circuit-types/src/l1_to_l2_message.ts | 28 ---- 12 files changed, 269 insertions(+), 342 deletions(-) diff --git a/yarn-project/archiver/package.json b/yarn-project/archiver/package.json index 4ebbd12dfdb..3dba1489b60 100644 --- a/yarn-project/archiver/package.json +++ b/yarn-project/archiver/package.json @@ -31,7 +31,8 @@ "^(\\.{1,2}/.*)\\.[cm]?js$": "$1" }, "testRegex": "./src/.*\\.test\\.(js|mjs|ts)$", - "rootDir": "./src" + "rootDir": "./src", + "workerThreads": true }, "dependencies": { "@aztec/circuit-types": "workspace:^", diff --git a/yarn-project/archiver/src/archiver/archiver.ts b/yarn-project/archiver/src/archiver/archiver.ts index 55bc7f09580..26eaaa71eeb 100644 --- a/yarn-project/archiver/src/archiver/archiver.ts +++ b/yarn-project/archiver/src/archiver/archiver.ts @@ -129,18 +129,28 @@ export class Archiver implements L2BlockSource, L2LogsSource, ContractDataSource * @param blockUntilSynced - If true, blocks until the archiver has fully synced. */ private async sync(blockUntilSynced: boolean) { + /** + * We keep track of three "pointers" to L1 blocks: + * 1. the last L1 block that published an L2 block + * 2. the last L1 block that added L1 to L2 messages + * 3. the last L1 block that cancelled L1 to L2 messages + * + * We do this to deal with L1 data providers that are eventually consistent (e.g. Infura). + * We guard against seeing block X with no data at one point, and later, the provider processes the block and it has data. + * The archiver will stay back, until there's data on L1 that will move the pointers forward. + * + * This code does not handle reorgs. + */ + const lastL1Blocks = await this.store.getL1BlockNumber(); const currentL1BlockNumber = await this.publicClient.getBlockNumber(); - // this makes the archiver more resilient to eventually-consistent eth providers like Infura - // it _will_ process the same L1 blocks over and over again until the L2 chain advances - // one thing to handle now is that we will process the same L1 to L2 messages over and over again - // so the store needs to account for that. - const lastProcessedL1BlockNumber = await this.store.getL1BlockNumber(); - if (currentL1BlockNumber <= lastProcessedL1BlockNumber) { - // reducing logs, otherwise this gets triggered on every loop (1s) - if (currentL1BlockNumber !== this.lastLoggedL1BlockNumber) { - this.log(`No new blocks to process, current block number: ${currentL1BlockNumber}`); - this.lastLoggedL1BlockNumber = currentL1BlockNumber; - } + + if ( + currentL1BlockNumber <= lastL1Blocks.addedBlock && + currentL1BlockNumber <= lastL1Blocks.addedMessages && + currentL1BlockNumber <= lastL1Blocks.cancelledMessages + ) { + // chain hasn't moved forward + // or it's been rolled back return; } @@ -152,61 +162,61 @@ export class Archiver implements L2BlockSource, L2LogsSource, ContractDataSource * to ensure that data is read exactly once. * * The first is the problem of eventually consistent ETH service providers like Infura. - * We currently read from the last L1 block that we saw emit an L2 block. This could mean - * that the archiver ends up looking at the same L1 block multiple times (e.g. if we last saw - * an L2 block emitted at L1 block 10, we'd constantly ask for L1 blocks from 11 onwards until - * we see another L2 block). For this to work message and block processing need to be idempotent. - * We should re-visit this before mainnet launch. + * Each L1 read operation will query data from the last L1 block that it saw emit its kind of data. + * (so pending L1 to L2 messages will read from the last L1 block that emitted a message and so on) + * This will mean the archiver will lag behind L1 and will only advance when there's L2-relevant activity on the chain. * * The second is that in between the various calls to L1, the block number can move meaning some * of the following calls will return data for blocks that were not present during earlier calls. - * It's possible that we actually received messages in block currentBlockNumber + 1 meaning the next time - * we do this sync we get the same message again. Additionally, the call to get cancelled L1 to L2 messages - * could read from a block not present when retrieving pending messages. If a message was added and cancelled - * in the same eth block then we could try and cancel a non-existent pending message. - * * To combat this for the time being we simply ensure that all data retrieval methods only retrieve * data up to the currentBlockNumber captured at the top of this function. We might want to improve on this * in future but for the time being it should give us the guarantees that we need - * */ - // ********** Events that are processed in between blocks ********** + // ********** Events that are processed per L1 block ********** // Process l1ToL2Messages, these are consumed as time passes, not each block const retrievedPendingL1ToL2Messages = await retrieveNewPendingL1ToL2Messages( this.publicClient, this.inboxAddress, blockUntilSynced, - lastProcessedL1BlockNumber + 1n, // + 1 to prevent re-including messages from the last processed block + lastL1Blocks.addedMessages + 1n, currentL1BlockNumber, ); const retrievedCancelledL1ToL2Messages = await retrieveNewCancelledL1ToL2Messages( this.publicClient, this.inboxAddress, blockUntilSynced, - lastProcessedL1BlockNumber + 1n, + lastL1Blocks.cancelledMessages + 1n, currentL1BlockNumber, ); - // TODO (#717): optimize this - there could be messages in confirmed that are also in pending. - // Or messages in pending that are also cancelled in the same block. No need to modify storage for them. + // group pending messages and cancelled messages by their L1 block number + const messagesByBlock = new Map(); + for (const [message, blockNumber] of retrievedPendingL1ToL2Messages.retrievedData) { + const messages = messagesByBlock.get(blockNumber) || [[], []]; + messages[0].push(message); + messagesByBlock.set(blockNumber, messages); + } - if (retrievedPendingL1ToL2Messages.retrievedData.length) { - // Store l1 to l2 messages - this.log(`Adding ${retrievedPendingL1ToL2Messages.retrievedData.length} pending l1 to l2 messages to store`); - await this.store.addPendingL1ToL2Messages(retrievedPendingL1ToL2Messages.retrievedData); + for (const [messageKey, blockNumber] of retrievedCancelledL1ToL2Messages.retrievedData) { + const messages = messagesByBlock.get(blockNumber) || [[], []]; + messages[1].push(messageKey); + messagesByBlock.set(blockNumber, messages); } - if (retrievedCancelledL1ToL2Messages.retrievedData.length) { - // remove cancelled messages from the pending message store: + // process messages from each L1 block in sequence + const l1BlocksWithMessages = Array.from(messagesByBlock.keys()).sort((a, b) => (a < b ? -1 : a === b ? 0 : 1)); + for (const l1Block of l1BlocksWithMessages) { + const [newMessages, cancelledMessages] = messagesByBlock.get(l1Block)!; this.log( - `Removing ${retrievedCancelledL1ToL2Messages.retrievedData.length} pending l1 to l2 messages from store where messages were cancelled`, + `Adding ${newMessages.length} new messages and ${cancelledMessages.length} cancelled messages in L1 block ${l1Block}`, ); - await this.store.cancelPendingL1ToL2Messages(retrievedCancelledL1ToL2Messages.retrievedData); + await this.store.addPendingL1ToL2Messages(newMessages, l1Block); + await this.store.cancelPendingL1ToL2Messages(cancelledMessages, l1Block); } - // ********** Events that are processed per block ********** + // ********** Events that are processed per L2 block ********** // Read all data from chain and then write to our stores at the end const nextExpectedL2BlockNum = BigInt((await this.store.getBlockNumber()) + 1); @@ -214,7 +224,7 @@ export class Archiver implements L2BlockSource, L2LogsSource, ContractDataSource this.publicClient, this.rollupAddress, blockUntilSynced, - lastProcessedL1BlockNumber + 1n, + lastL1Blocks.addedBlock + 1n, currentL1BlockNumber, nextExpectedL2BlockNum, ); @@ -224,7 +234,7 @@ export class Archiver implements L2BlockSource, L2LogsSource, ContractDataSource } else { this.log( `Retrieved ${retrievedBlocks.retrievedData.length} new L2 blocks between L1 blocks ${ - lastProcessedL1BlockNumber + 1n + lastL1Blocks.addedBlock + 1n } and ${currentL1BlockNumber}.`, ); } @@ -238,7 +248,7 @@ export class Archiver implements L2BlockSource, L2LogsSource, ContractDataSource this.publicClient, this.contractDeploymentEmitterAddress, blockUntilSynced, - lastProcessedL1BlockNumber + 1n, + lastL1Blocks.addedBlock + 1n, currentL1BlockNumber, blockHashMapping, ); @@ -264,9 +274,10 @@ export class Archiver implements L2BlockSource, L2LogsSource, ContractDataSource // from retrieved L2Blocks, confirm L1 to L2 messages that have been published // from each l2block fetch all messageKeys in a flattened array: - const messageKeysToRemove = retrievedBlocks.retrievedData.map(l2block => l2block.newL1ToL2Messages).flat(); this.log(`Confirming l1 to l2 messages in store`); - await this.store.confirmL1ToL2Messages(messageKeysToRemove); + for (const block of retrievedBlocks.retrievedData) { + await this.store.confirmL1ToL2Messages(block.newL1ToL2Messages); + } // store retrieved L2 blocks after removing new logs information. // remove logs to serve "lightweight" block information. Logs can be fetched separately if needed. diff --git a/yarn-project/archiver/src/archiver/archiver_store.ts b/yarn-project/archiver/src/archiver/archiver_store.ts index 44fe30cf9e5..0d4c1f13c26 100644 --- a/yarn-project/archiver/src/archiver/archiver_store.ts +++ b/yarn-project/archiver/src/archiver/archiver_store.ts @@ -1,5 +1,4 @@ import { - CancelledL1ToL2Message, ContractData, ExtendedContractData, GetUnencryptedLogsResponse, @@ -9,12 +8,23 @@ import { L2Tx, LogFilter, LogType, - PendingL1ToL2Message, TxHash, } from '@aztec/circuit-types'; import { Fr } from '@aztec/circuits.js'; import { AztecAddress } from '@aztec/foundation/aztec-address'; +/** + * Represents the latest L1 block processed by the archiver for various objects in L2. + */ +export type ArchiverL1SynchPoint = { + /** The last L1 block that added a new L2 block. */ + addedBlock: bigint; + /** The last L1 block that added pending messages */ + addedMessages: bigint; + /** The last L1 block that cancelled messages */ + cancelledMessages: bigint; +}; + /** * Interface describing a data store to be used by the archiver to store all its relevant data * (blocks, encrypted logs, aztec contract data extended contract data). @@ -58,16 +68,18 @@ export interface ArchiverDataStore { /** * Append new pending L1 to L2 messages to the store. * @param messages - The L1 to L2 messages to be added to the store. + * @param l1BlockNumber - The block number of the L1 block that added the messages. * @returns True if the operation is successful. */ - addPendingL1ToL2Messages(messages: PendingL1ToL2Message[]): Promise; + addPendingL1ToL2Messages(messages: L1ToL2Message[], l1BlockNumber: bigint): Promise; /** * Remove pending L1 to L2 messages from the store (if they were cancelled). * @param message - The message keys to be removed from the store. + * @param l1BlockNumber - The block number of the L1 block that cancelled the messages. * @returns True if the operation is successful. */ - cancelPendingL1ToL2Messages(message: CancelledL1ToL2Message[]): Promise; + cancelPendingL1ToL2Messages(message: Fr[], l1BlockNumber: bigint): Promise; /** * Messages that have been published in an L2 block are confirmed. @@ -152,7 +164,7 @@ export interface ArchiverDataStore { getBlockNumber(): Promise; /** - * Gets the number of the latest L1 block processed. + * Gets the last L1 block number processed by the archiver */ - getL1BlockNumber(): Promise; + getL1BlockNumber(): Promise; } diff --git a/yarn-project/archiver/src/archiver/archiver_store_test_suite.ts b/yarn-project/archiver/src/archiver/archiver_store_test_suite.ts index a044d6afcbb..788c0894d4e 100644 --- a/yarn-project/archiver/src/archiver/archiver_store_test_suite.ts +++ b/yarn-project/archiver/src/archiver/archiver_store_test_suite.ts @@ -1,5 +1,4 @@ import { - CancelledL1ToL2Message, ExtendedContractData, INITIAL_L2_BLOCK_NUM, L1ToL2Message, @@ -7,7 +6,6 @@ import { L2BlockContext, LogId, LogType, - PendingL1ToL2Message, TxHash, UnencryptedL2Log, } from '@aztec/circuit-types'; @@ -88,12 +86,39 @@ export function describeArchiverDataStore(testName: string, getStore: () => Arch describe('getL1BlockNumber', () => { it('returns 0n if no blocks have been added', async () => { - await expect(store.getL1BlockNumber()).resolves.toEqual(0n); + await expect(store.getL1BlockNumber()).resolves.toEqual({ + addedBlock: 0n, + addedMessages: 0n, + cancelledMessages: 0n, + }); }); it('returns the L1 block number in which the most recent L2 block was published', async () => { await store.addBlocks(blocks); - await expect(store.getL1BlockNumber()).resolves.toEqual(blocks.at(-1)!.getL1BlockNumber()); + await expect(store.getL1BlockNumber()).resolves.toEqual({ + addedBlock: blocks.at(-1)!.getL1BlockNumber(), + addedMessages: 0n, + cancelledMessages: 0n, + }); + }); + + it('returns the L1 block number that most recently added pending messages', async () => { + await store.addPendingL1ToL2Messages([L1ToL2Message.random(Fr.random())], 1n); + await expect(store.getL1BlockNumber()).resolves.toEqual({ + addedBlock: 0n, + addedMessages: 1n, + cancelledMessages: 0n, + }); + }); + it('returns the L1 block number that most recently cancelled pending messages', async () => { + const message = L1ToL2Message.random(Fr.random()); + await store.addPendingL1ToL2Messages([message], 1n); + await store.cancelPendingL1ToL2Messages([message.entryKey!], 2n); + await expect(store.getL1BlockNumber()).resolves.toEqual({ + addedBlock: 0n, + addedMessages: 1n, + cancelledMessages: 2n, + }); }); }); @@ -151,66 +176,49 @@ export function describeArchiverDataStore(testName: string, getStore: () => Arch describe('addPendingL1ToL2Messages', () => { it('stores pending L1 to L2 messages', async () => { - await expect( - store.addPendingL1ToL2Messages([new PendingL1ToL2Message(L1ToL2Message.random(Fr.random()), 1n, 0)]), - ).resolves.toEqual(true); + await expect(store.addPendingL1ToL2Messages([L1ToL2Message.random(Fr.random())], 1n)).resolves.toEqual(true); }); it('allows duplicate pending messages in different positions in the same block', async () => { const message = L1ToL2Message.random(Fr.random()); - await expect( - store.addPendingL1ToL2Messages([ - new PendingL1ToL2Message(message, 1n, 0), - new PendingL1ToL2Message(message, 1n, 1), - ]), - ).resolves.toEqual(true); + await expect(store.addPendingL1ToL2Messages([message, message], 1n)).resolves.toEqual(true); await expect(store.getPendingL1ToL2MessageKeys(2)).resolves.toEqual([message.entryKey!, message.entryKey!]); }); it('allows duplicate pending messages in different blocks', async () => { const message = L1ToL2Message.random(Fr.random()); - await expect( - store.addPendingL1ToL2Messages([ - new PendingL1ToL2Message(message, 1n, 0), - new PendingL1ToL2Message(message, 2n, 0), - ]), - ).resolves.toEqual(true); + await expect(store.addPendingL1ToL2Messages([message], 1n)).resolves.toEqual(true); + await expect(store.addPendingL1ToL2Messages([message], 2n)).resolves.toEqual(true); await expect(store.getPendingL1ToL2MessageKeys(2)).resolves.toEqual([message.entryKey!, message.entryKey!]); }); it('is idempotent', async () => { const message = L1ToL2Message.random(Fr.random()); - await expect( - store.addPendingL1ToL2Messages([ - new PendingL1ToL2Message(message, 1n, 0), - new PendingL1ToL2Message(message, 1n, 0), - ]), - ).resolves.toEqual(true); + await expect(store.addPendingL1ToL2Messages([message], 1n)).resolves.toEqual(true); + await expect(store.addPendingL1ToL2Messages([message], 1n)).resolves.toEqual(false); await expect(store.getPendingL1ToL2MessageKeys(2)).resolves.toEqual([message.entryKey!]); }); }); describe('getPendingL1ToL2Messages', () => { it('returns previously stored pending L1 to L2 messages', async () => { - const messageCtx = new PendingL1ToL2Message(L1ToL2Message.random(Fr.random()), 1n, 0); - await store.addPendingL1ToL2Messages([messageCtx]); - await expect(store.getPendingL1ToL2MessageKeys(1)).resolves.toEqual([messageCtx.message.entryKey!]); + const message = L1ToL2Message.random(Fr.random()); + await store.addPendingL1ToL2Messages([message], 1n); + await expect(store.getPendingL1ToL2MessageKeys(1)).resolves.toEqual([message.entryKey!]); }); it('returns messages ordered by fee', async () => { - const messageCtxs = Array.from({ length: 3 }).map( - (_, i) => new PendingL1ToL2Message(L1ToL2Message.random(Fr.random()), 1n, i), - ); + const messages = Array.from({ length: 3 }, () => L1ToL2Message.random(Fr.random())); // add a duplicate message - messageCtxs.push(new PendingL1ToL2Message(messageCtxs[0].message, 1n, 3)); + messages.push(messages[0]); - await store.addPendingL1ToL2Messages(messageCtxs); + await store.addPendingL1ToL2Messages(messages, 1n); - messageCtxs.sort((a, b) => b.message.fee - a.message.fee); - await expect(store.getPendingL1ToL2MessageKeys(messageCtxs.length)).resolves.toEqual( - messageCtxs.map(({ message }) => message.entryKey!), + messages.sort((a, b) => b.fee - a.fee); + await expect(store.getPendingL1ToL2MessageKeys(messages.length)).resolves.toEqual( + messages.map(message => message.entryKey!), ); }); @@ -221,33 +229,29 @@ export function describeArchiverDataStore(testName: string, getStore: () => Arch describe('confirmL1ToL2Messages', () => { it('updates a message from pending to confirmed', async () => { - const messageCtx = new PendingL1ToL2Message(L1ToL2Message.random(Fr.random()), 1n, 0); - await store.addPendingL1ToL2Messages([messageCtx]); - await expect(store.confirmL1ToL2Messages([messageCtx.message.entryKey!])).resolves.toEqual(true); + const message = L1ToL2Message.random(Fr.random()); + await store.addPendingL1ToL2Messages([message], 1n); + await expect(store.confirmL1ToL2Messages([message.entryKey!])).resolves.toEqual(true); }); it('once confirmed, a message is no longer pending', async () => { - const pendingMessage = new PendingL1ToL2Message(L1ToL2Message.random(Fr.random()), 1n, 0); - await store.addPendingL1ToL2Messages([pendingMessage]); - await store.confirmL1ToL2Messages([pendingMessage.message.entryKey!]); + const message = L1ToL2Message.random(Fr.random()); + await store.addPendingL1ToL2Messages([message], 1n); + await store.confirmL1ToL2Messages([message.entryKey!]); await expect(store.getPendingL1ToL2MessageKeys(1)).resolves.toEqual([]); }); it('once confirmed a message can also be pending if added again', async () => { const message = L1ToL2Message.random(Fr.random()); - await store.addPendingL1ToL2Messages([new PendingL1ToL2Message(message, 1n, 0)]); + await store.addPendingL1ToL2Messages([message], 1n); await store.confirmL1ToL2Messages([message.entryKey!]); - await store.addPendingL1ToL2Messages([new PendingL1ToL2Message(message, 2n, 0)]); + await store.addPendingL1ToL2Messages([message], 2n); await expect(store.getPendingL1ToL2MessageKeys(2)).resolves.toEqual([message.entryKey!]); }); it('once confirmed a message can remain pending if more of it were pending', async () => { const message = L1ToL2Message.random(Fr.random()); - await store.addPendingL1ToL2Messages([ - new PendingL1ToL2Message(message, 1n, 0), - new PendingL1ToL2Message(message, 1n, 1), - ]); - + await store.addPendingL1ToL2Messages([message, message], 1n); await store.confirmL1ToL2Messages([message.entryKey!]); await expect(store.getPendingL1ToL2MessageKeys(1)).resolves.toEqual([message.entryKey!]); }); @@ -256,80 +260,61 @@ export function describeArchiverDataStore(testName: string, getStore: () => Arch describe('cancelL1ToL2Messages', () => { it('cancels a pending message', async () => { const message = L1ToL2Message.random(Fr.random()); - await store.addPendingL1ToL2Messages([new PendingL1ToL2Message(message, 1n, 0)]); - await store.cancelPendingL1ToL2Messages([new CancelledL1ToL2Message(message.entryKey!, 1n, 0)]); + await store.addPendingL1ToL2Messages([message], 1n); + await store.cancelPendingL1ToL2Messages([message.entryKey!], 1n); await expect(store.getPendingL1ToL2MessageKeys(1)).resolves.toEqual([]); }); it('cancels only one of the pending messages if duplicates exist', async () => { const message = L1ToL2Message.random(Fr.random()); - await store.addPendingL1ToL2Messages([ - new PendingL1ToL2Message(message, 1n, 0), - new PendingL1ToL2Message(message, 1n, 1), - ]); - await store.cancelPendingL1ToL2Messages([new CancelledL1ToL2Message(message.entryKey!, 2n, 0)]); + await store.addPendingL1ToL2Messages([message, message], 1n); + await store.cancelPendingL1ToL2Messages([message.entryKey!], 1n); await expect(store.getPendingL1ToL2MessageKeys(2)).resolves.toEqual([message.entryKey]); }); it('once canceled a message can also be pending if added again', async () => { const message = L1ToL2Message.random(Fr.random()); - await store.addPendingL1ToL2Messages([new PendingL1ToL2Message(message, 1n, 0)]); + await store.addPendingL1ToL2Messages([message], 1n); - await store.cancelPendingL1ToL2Messages([new CancelledL1ToL2Message(message.entryKey!, 1n, 0)]); + await store.cancelPendingL1ToL2Messages([message.entryKey!], 1n); await expect(store.getPendingL1ToL2MessageKeys(1)).resolves.toEqual([]); - await store.addPendingL1ToL2Messages([new PendingL1ToL2Message(message, 2n, 0)]); + await store.addPendingL1ToL2Messages([message], 2n); await expect(store.getPendingL1ToL2MessageKeys(1)).resolves.toEqual([message.entryKey!]); }); it('allows adding and cancelling in the same block', async () => { const message = L1ToL2Message.random(Fr.random()); - await store.addPendingL1ToL2Messages([new PendingL1ToL2Message(message, 1n, 0)]); - await store.cancelPendingL1ToL2Messages([new CancelledL1ToL2Message(message.entryKey!, 1n, 0)]); + await store.addPendingL1ToL2Messages([message], 1n); + await store.cancelPendingL1ToL2Messages([message.entryKey!], 1n); await expect(store.getPendingL1ToL2MessageKeys(1)).resolves.toEqual([]); }); it('allows duplicates cancellations in different positions in the same block', async () => { const message = L1ToL2Message.random(Fr.random()); - await store.addPendingL1ToL2Messages([ - new PendingL1ToL2Message(message, 1n, 0), - new PendingL1ToL2Message(message, 1n, 1), - ]); + await store.addPendingL1ToL2Messages([message, message], 1n); - await store.cancelPendingL1ToL2Messages([ - new CancelledL1ToL2Message(message.entryKey!, 2n, 0), - new CancelledL1ToL2Message(message.entryKey!, 2n, 1), - ]); + await store.cancelPendingL1ToL2Messages([message.entryKey!, message.entryKey!], 1n); await expect(store.getPendingL1ToL2MessageKeys(2)).resolves.toEqual([]); }); it('allows duplicates cancellations in different blocks', async () => { const message = L1ToL2Message.random(Fr.random()); - await store.addPendingL1ToL2Messages([ - new PendingL1ToL2Message(message, 1n, 0), - new PendingL1ToL2Message(message, 1n, 1), - ]); + await store.addPendingL1ToL2Messages([message, message], 1n); - await store.cancelPendingL1ToL2Messages([ - new CancelledL1ToL2Message(message.entryKey!, 2n, 0), - new CancelledL1ToL2Message(message.entryKey!, 3n, 0), - ]); + await store.cancelPendingL1ToL2Messages([message.entryKey!], 2n); + await store.cancelPendingL1ToL2Messages([message.entryKey!], 3n); await expect(store.getPendingL1ToL2MessageKeys(2)).resolves.toEqual([]); }); it('is idempotent', async () => { const message = L1ToL2Message.random(Fr.random()); - await store.addPendingL1ToL2Messages([ - new PendingL1ToL2Message(message, 1n, 0), - new PendingL1ToL2Message(message, 1n, 1), - ]); + await store.addPendingL1ToL2Messages([message, message], 1n); - await store.cancelPendingL1ToL2Messages([ - new CancelledL1ToL2Message(message.entryKey!, 2n, 0), - new CancelledL1ToL2Message(message.entryKey!, 2n, 0), - ]); + await store.cancelPendingL1ToL2Messages([message.entryKey!], 2n); + await store.cancelPendingL1ToL2Messages([message.entryKey!], 2n); await expect(store.getPendingL1ToL2MessageKeys(2)).resolves.toEqual([message.entryKey!]); }); diff --git a/yarn-project/archiver/src/archiver/data_retrieval.ts b/yarn-project/archiver/src/archiver/data_retrieval.ts index 81ec341e160..b0673fd9cc8 100644 --- a/yarn-project/archiver/src/archiver/data_retrieval.ts +++ b/yarn-project/archiver/src/archiver/data_retrieval.ts @@ -1,4 +1,5 @@ -import { CancelledL1ToL2Message, ExtendedContractData, L2Block, PendingL1ToL2Message } from '@aztec/circuit-types'; +import { ExtendedContractData, L1ToL2Message, L2Block } from '@aztec/circuit-types'; +import { Fr } from '@aztec/circuits.js'; import { EthAddress } from '@aztec/foundation/eth-address'; import { PublicClient } from 'viem'; @@ -123,8 +124,8 @@ export async function retrieveNewPendingL1ToL2Messages( blockUntilSynced: boolean, searchStartBlock: bigint, searchEndBlock: bigint, -): Promise> { - const retrievedNewL1ToL2Messages: PendingL1ToL2Message[] = []; +): Promise> { + const retrievedNewL1ToL2Messages: [L1ToL2Message, bigint][] = []; do { if (searchStartBlock > searchEndBlock) { break; @@ -161,8 +162,8 @@ export async function retrieveNewCancelledL1ToL2Messages( blockUntilSynced: boolean, searchStartBlock: bigint, searchEndBlock: bigint, -): Promise> { - const retrievedNewCancelledL1ToL2Messages: CancelledL1ToL2Message[] = []; +): Promise> { + const retrievedNewCancelledL1ToL2Messages: [Fr, bigint][] = []; do { if (searchStartBlock > searchEndBlock) { break; diff --git a/yarn-project/archiver/src/archiver/eth_log_handlers.ts b/yarn-project/archiver/src/archiver/eth_log_handlers.ts index ebdd62a3e98..52133e86d17 100644 --- a/yarn-project/archiver/src/archiver/eth_log_handlers.ts +++ b/yarn-project/archiver/src/archiver/eth_log_handlers.ts @@ -1,5 +1,4 @@ import { - CancelledL1ToL2Message, ContractData, EncodedContractFunction, ExtendedContractData, @@ -7,7 +6,6 @@ import { L1ToL2Message, L2Actor, L2Block, - PendingL1ToL2Message, } from '@aztec/circuit-types'; import { AztecAddress } from '@aztec/foundation/aztec-address'; import { EthAddress } from '@aztec/foundation/eth-address'; @@ -24,26 +22,23 @@ import { Hex, Log, PublicClient, decodeFunctionData, getAbiItem, getAddress, hex */ export function processPendingL1ToL2MessageAddedLogs( logs: Log[], -): PendingL1ToL2Message[] { - const l1ToL2Messages: PendingL1ToL2Message[] = []; - for (const [index, log] of logs.entries()) { +): [L1ToL2Message, bigint][] { + const l1ToL2Messages: [L1ToL2Message, bigint][] = []; + for (const log of logs) { const { sender, senderChainId, recipient, recipientVersion, content, secretHash, deadline, fee, entryKey } = log.args; - l1ToL2Messages.push( - new PendingL1ToL2Message( - new L1ToL2Message( - new L1Actor(EthAddress.fromString(sender), Number(senderChainId)), - new L2Actor(AztecAddress.fromString(recipient), Number(recipientVersion)), - Fr.fromString(content), - Fr.fromString(secretHash), - deadline, - Number(fee), - Fr.fromString(entryKey), - ), - log.blockNumber!, - index, + l1ToL2Messages.push([ + new L1ToL2Message( + new L1Actor(EthAddress.fromString(sender), Number(senderChainId)), + new L2Actor(AztecAddress.fromString(recipient), Number(recipientVersion)), + Fr.fromString(content), + Fr.fromString(secretHash), + deadline, + Number(fee), + Fr.fromString(entryKey), ), - ); + log.blockNumber!, + ]); } return l1ToL2Messages; } @@ -55,10 +50,10 @@ export function processPendingL1ToL2MessageAddedLogs( */ export function processCancelledL1ToL2MessagesLogs( logs: Log[], -): CancelledL1ToL2Message[] { - const cancelledL1ToL2Messages: CancelledL1ToL2Message[] = []; - for (const [index, log] of logs.entries()) { - cancelledL1ToL2Messages.push(new CancelledL1ToL2Message(Fr.fromString(log.args.entryKey), log.blockNumber!, index)); +): [Fr, bigint][] { + const cancelledL1ToL2Messages: [Fr, bigint][] = []; + for (const log of logs) { + cancelledL1ToL2Messages.push([Fr.fromString(log.args.entryKey), log.blockNumber!]); } return cancelledL1ToL2Messages; } diff --git a/yarn-project/archiver/src/archiver/lmdb_archiver_store.test.ts b/yarn-project/archiver/src/archiver/lmdb_archiver_store.test.ts index 63014846b50..7a1ceacb4f1 100644 --- a/yarn-project/archiver/src/archiver/lmdb_archiver_store.test.ts +++ b/yarn-project/archiver/src/archiver/lmdb_archiver_store.test.ts @@ -1,33 +1,13 @@ -import { mkdtemp, rm } from 'fs/promises'; -import { RootDatabase, open } from 'lmdb'; -import { tmpdir } from 'os'; -import { join } from 'path'; +import { open } from 'lmdb'; import { describeArchiverDataStore } from './archiver_store_test_suite.js'; import { LMDBArchiverStore } from './lmdb_archiver_store.js'; describe('LMDB Memory Store', () => { let archiverStore: LMDBArchiverStore; - let tmpDbLocation: string; - let tmpDb: RootDatabase; - - beforeAll(async () => { - tmpDbLocation = await mkdtemp(join(tmpdir(), 'archiver-store-test-')); - tmpDb = open(tmpDbLocation, {}); - }); - - afterAll(async () => { - await tmpDb.close(); - await rm(tmpDbLocation, { recursive: true }); - }); beforeEach(() => { - archiverStore = new LMDBArchiverStore(tmpDb); - }); - - afterEach(async () => { - await archiverStore?.close(); - await tmpDb.clearAsync(); + archiverStore = new LMDBArchiverStore(open({} as any)); }); describeArchiverDataStore('LMDBArchiverStore', () => archiverStore); diff --git a/yarn-project/archiver/src/archiver/lmdb_archiver_store.ts b/yarn-project/archiver/src/archiver/lmdb_archiver_store.ts index e4f66c8fca1..6343c7dd0d9 100644 --- a/yarn-project/archiver/src/archiver/lmdb_archiver_store.ts +++ b/yarn-project/archiver/src/archiver/lmdb_archiver_store.ts @@ -1,5 +1,4 @@ import { - CancelledL1ToL2Message, ContractData, ExtendedContractData, ExtendedUnencryptedL2Log, @@ -12,18 +11,16 @@ import { LogFilter, LogId, LogType, - PendingL1ToL2Message, TxHash, UnencryptedL2Log, } from '@aztec/circuit-types'; import { Fr } from '@aztec/circuits.js'; import { AztecAddress } from '@aztec/foundation/aztec-address'; -import { toBigIntBE, toBufferBE } from '@aztec/foundation/bigint-buffer'; import { createDebugLogger } from '@aztec/foundation/log'; import { Database, RangeOptions, RootDatabase } from 'lmdb'; -import { ArchiverDataStore } from './archiver_store.js'; +import { ArchiverDataStore, ArchiverL1SynchPoint } from './archiver_store.js'; /* eslint-disable */ type L1ToL2MessageAndCount = { @@ -32,26 +29,19 @@ type L1ToL2MessageAndCount = { confirmedCount: number; }; -type L1ToL2MessageBlockKey = `${string}:${'newMessage' | 'cancelledMessage'}:${number}`; - -function l1ToL2MessageBlockKey( - l1BlockNumber: bigint, - key: 'newMessage' | 'cancelledMessage', - indexInBlock: number, -): L1ToL2MessageBlockKey { - return `${toBufferBE(l1BlockNumber, 32).toString('hex')}:${key}:${indexInBlock}`; -} - type BlockIndexValue = [blockNumber: number, index: number]; type BlockContext = { block?: Uint8Array; blockHash?: Uint8Array; - l1BlockNumber?: Uint8Array; + l1BlockNumber?: bigint; encryptedLogs?: Uint8Array; unencryptedLogs?: Uint8Array; extendedContractData?: Array; }; + +const L1_BLOCK_ADDED_PENDING_MESSAGE = 'l1BlockAddedPendingMessage'; +const L1_BLOCK_CANCELLED_MESSAGE = 'l1BlockCancelledMessage'; /* eslint-enable */ /** @@ -68,7 +58,7 @@ export class LMDBArchiverStore implements ArchiverDataStore { /** L1 to L2 messages */ l1ToL2Messages: Database; /** Which blocks emitted which messages */ - l1ToL2MessagesByBlock: Database; + l1ToL2MessagesByBlock: Database; /** Pending L1 to L2 messages sorted by their fee, in buckets (dupSort=true) */ pendingMessagesByFee: Database; }; @@ -97,7 +87,7 @@ export class LMDBArchiverStore implements ArchiverDataStore { }), l1ToL2MessagesByBlock: db.openDB('l1_to_l2_message_nonces', { keyEncoding: 'ordered-binary', - encoding: 'binary', + encoding: 'msgpack', }), pendingMessagesByFee: db.openDB('pending_messages_by_fee', { keyEncoding: 'ordered-binary', @@ -125,7 +115,7 @@ export class LMDBArchiverStore implements ArchiverDataStore { for (const block of blocks) { const blockCtx = this.#tables.blocks.get(block.number) ?? {}; blockCtx.block = block.toBuffer(); - blockCtx.l1BlockNumber = toBufferBE(block.getL1BlockNumber(), 32); + blockCtx.l1BlockNumber = block.getL1BlockNumber(); blockCtx.blockHash = block.getBlockHash(); // no need to await, all writes are enqueued in the transaction @@ -225,76 +215,61 @@ export class LMDBArchiverStore implements ArchiverDataStore { /** * Append new pending L1 to L2 messages to the store. * @param messages - The L1 to L2 messages to be added to the store. + * @param l1BlockNumber - The L1 block number for which to add the messages. * @returns True if the operation is successful. */ - addPendingL1ToL2Messages(messages: PendingL1ToL2Message[]): Promise { + addPendingL1ToL2Messages(messages: L1ToL2Message[], l1BlockNumber: bigint): Promise { return this.#tables.l1ToL2Messages.transaction(() => { - for (const { message, blockNumber, indexInBlock } of messages) { + if ((this.#tables.l1ToL2MessagesByBlock.get(L1_BLOCK_ADDED_PENDING_MESSAGE) ?? 0n) >= l1BlockNumber) { + return false; + } + // ensure we don't add the same messages twice + void this.#tables.l1ToL2MessagesByBlock.put(L1_BLOCK_ADDED_PENDING_MESSAGE, l1BlockNumber); + + for (const message of messages) { const messageKey = message.entryKey?.toBuffer(); if (!messageKey) { throw new Error('Message does not have an entry key'); } - const dupeKey = l1ToL2MessageBlockKey(blockNumber, 'newMessage', indexInBlock); - const messageInBlock = this.#tables.l1ToL2MessagesByBlock.get(dupeKey); - - if (messageInBlock?.equals(messageKey)) { - continue; - } else { - if (messageInBlock) { - this.#log( - `Previously add pending message ${messageInBlock.toString( - 'hex', - )} at ${dupeKey.toString()}, now got ${messageKey.toString('hex')}`, - ); - } - - void this.#tables.l1ToL2MessagesByBlock.put(dupeKey, messageKey); - } - - let messageWithCount = this.#tables.l1ToL2Messages.get(messageKey); - if (!messageWithCount) { - messageWithCount = { + let messageCtx = this.#tables.l1ToL2Messages.get(messageKey); + if (!messageCtx) { + messageCtx = { message: message.toBuffer(), pendingCount: 0, confirmedCount: 0, }; - void this.#tables.l1ToL2Messages.put(messageKey, messageWithCount); + void this.#tables.l1ToL2Messages.put(messageKey, messageCtx); } this.#updateMessageCountInTx(messageKey, message, 1, 0); } + return true; }); } /** * Remove pending L1 to L2 messages from the store (if they were cancelled). - * @param messages - The message keys to be removed from the store. + * @param cancelledMessages - The message keys to be removed from the store. + * @param l1BlockNumber - The L1 block number for which to remove the messages. * @returns True if the operation is successful. */ - cancelPendingL1ToL2Messages(messages: CancelledL1ToL2Message[]): Promise { + cancelPendingL1ToL2Messages(cancelledMessages: Fr[], l1BlockNumber: bigint): Promise { return this.#tables.l1ToL2Messages.transaction(() => { - for (const { blockNumber, indexInBlock, entryKey } of messages) { - const messageKey = entryKey.toBuffer(); - const dupeKey = l1ToL2MessageBlockKey(blockNumber, 'cancelledMessage', indexInBlock); - const messageInBlock = this.#tables.l1ToL2MessagesByBlock.get(dupeKey); - if (messageInBlock?.equals(messageKey)) { + if ((this.#tables.l1ToL2MessagesByBlock.get(L1_BLOCK_CANCELLED_MESSAGE) ?? 0n) >= l1BlockNumber) { + return false; + } + void this.#tables.l1ToL2MessagesByBlock.put(L1_BLOCK_CANCELLED_MESSAGE, l1BlockNumber); + + for (const messageKey of cancelledMessages) { + const message = this.#getL1ToL2Message(messageKey.toBuffer()); + if (!message) { continue; - } else { - if (messageInBlock) { - this.#log( - `Previously add pending message ${messageInBlock.toString( - 'hex', - )} at ${dupeKey.toString()}, now got ${messageKey.toString('hex')}`, - ); - } - void this.#tables.l1ToL2MessagesByBlock.put(dupeKey, messageKey); } - - const message = this.#getL1ToL2Message(messageKey); - this.#updateMessageCountInTx(messageKey, message, -1, 0); + this.#updateMessageCountInTx(messageKey.toBuffer(), message, -1, 0); } + return true; }); } @@ -600,19 +575,21 @@ export class LMDBArchiverStore implements ArchiverDataStore { return Promise.resolve(typeof lastBlockNumber === 'number' ? lastBlockNumber : INITIAL_L2_BLOCK_NUM - 1); } - getL1BlockNumber(): Promise { + /** + * Gets the last L1 block number processed by the archiver + */ + getL1BlockNumber(): Promise { // inverse range with no start/end will return the last value - const [lastBlock] = this.#tables.blocks.getRange({ reverse: true, limit: 1 }).asArray; - if (!lastBlock) { - return Promise.resolve(0n); - } else { - const blockCtx = lastBlock.value; - if (!blockCtx.l1BlockNumber) { - return Promise.reject(new Error('L1 block number not found')); - } else { - return Promise.resolve(toBigIntBE(asBuffer(blockCtx.l1BlockNumber))); - } - } + const [lastL2Block] = this.#tables.blocks.getRange({ reverse: true, limit: 1 }).asArray; + const addedBlock = lastL2Block?.value?.l1BlockNumber ?? 0n; + const addedMessages = this.#tables.l1ToL2MessagesByBlock.get(L1_BLOCK_ADDED_PENDING_MESSAGE) ?? 0n; + const cancelledMessages = this.#tables.l1ToL2MessagesByBlock.get(L1_BLOCK_CANCELLED_MESSAGE) ?? 0n; + + return Promise.resolve({ + addedBlock, + addedMessages, + cancelledMessages, + }); } #getBlock(blockNumber: number, withLogs = false): L2Block | undefined { diff --git a/yarn-project/archiver/src/archiver/memory_archiver_store/l1_to_l2_message_store.test.ts b/yarn-project/archiver/src/archiver/memory_archiver_store/l1_to_l2_message_store.test.ts index 1c910e2d6b5..3a60f300409 100644 --- a/yarn-project/archiver/src/archiver/memory_archiver_store/l1_to_l2_message_store.test.ts +++ b/yarn-project/archiver/src/archiver/memory_archiver_store/l1_to_l2_message_store.test.ts @@ -16,21 +16,15 @@ describe('l1_to_l2_message_store', () => { }); it('addMessage adds a message', () => { - store.addMessage(entryKey, msg, 1n, 0); + store.addMessage(entryKey, msg); expect(store.getMessage(entryKey)).toEqual(msg); }); it('addMessage increments the count if the message is already in the store', () => { - store.addMessage(entryKey, msg, 1n, 0); - store.addMessage(entryKey, msg, 1n, 1); + store.addMessage(entryKey, msg); + store.addMessage(entryKey, msg); expect(store.getMessageAndCount(entryKey)).toEqual({ message: msg, count: 2 }); }); - - it('addMessage does not increment the count if the message is already in the store at the same position', () => { - store.addMessage(entryKey, msg, 1n, 0); - store.addMessage(entryKey, msg, 1n, 0); - expect(store.getMessageAndCount(entryKey)).toEqual({ message: msg, count: 1 }); - }); }); describe('pending_l1_to_l2_message_store', () => { @@ -46,22 +40,22 @@ describe('pending_l1_to_l2_message_store', () => { }); it('removeMessage removes the message if the count is 1', () => { - store.addMessage(entryKey, msg, 1n, 0); - store.removeMessage(entryKey, 2n, 0); + store.addMessage(entryKey, msg); + store.removeMessage(entryKey); expect(store.getMessage(entryKey)).toBeUndefined(); }); it("handles case when removing a message that doesn't exist", () => { - expect(() => store.removeMessage(new Fr(0), 1n, 0)).not.toThrow(); + expect(() => store.removeMessage(new Fr(0))).not.toThrow(); const one = new Fr(1); - expect(() => store.removeMessage(one, 1n, 0)).toThrow(`Message with key ${one.value} not found in store`); + expect(() => store.removeMessage(one)).toThrow(`Message with key ${one.value} not found in store`); }); it('removeMessage decrements the count if the message is already in the store', () => { - store.addMessage(entryKey, msg, 1n, 0); - store.addMessage(entryKey, msg, 1n, 1); - store.addMessage(entryKey, msg, 1n, 2); - store.removeMessage(entryKey, 2n, 0); + store.addMessage(entryKey, msg); + store.addMessage(entryKey, msg); + store.addMessage(entryKey, msg); + store.removeMessage(entryKey); expect(store.getMessageAndCount(entryKey)).toEqual({ message: msg, count: 2 }); }); @@ -70,21 +64,21 @@ describe('pending_l1_to_l2_message_store', () => { }); it('getMessageKeys returns an empty array if limit is 0', () => { - store.addMessage(entryKey, msg, 1n, 0); + store.addMessage(entryKey, msg); expect(store.getMessageKeys(0)).toEqual([]); }); it('get messages for a non-empty store when limit > number of messages in store', () => { const entryKeys = [1, 2, 3, 4, 5].map(x => new Fr(x)); - entryKeys.forEach((entryKey, i) => { - store.addMessage(entryKey, L1ToL2Message.random(), 1n, i); + entryKeys.forEach(entryKey => { + store.addMessage(entryKey, L1ToL2Message.random()); }); expect(store.getMessageKeys(10).length).toEqual(5); }); it('get messages returns messages sorted by fees and also includes multiple of the same message', () => { const entryKeys = [1, 2, 3, 3, 3, 4].map(x => new Fr(x)); - entryKeys.forEach((entryKey, i) => { + entryKeys.forEach(entryKey => { // set msg.fee to entryKey to test the sort. const msg = new L1ToL2Message( L1Actor.random(), @@ -95,7 +89,7 @@ describe('pending_l1_to_l2_message_store', () => { Number(entryKey.value), entryKey, ); - store.addMessage(entryKey, msg, 1n, i); + store.addMessage(entryKey, msg); }); const expectedMessageFees = [4n, 3n, 3n, 3n]; // the top 4. const receivedMessageFees = store.getMessageKeys(4).map(key => key.value); diff --git a/yarn-project/archiver/src/archiver/memory_archiver_store/l1_to_l2_message_store.ts b/yarn-project/archiver/src/archiver/memory_archiver_store/l1_to_l2_message_store.ts index e5a7ced4bfc..ae6c6988957 100644 --- a/yarn-project/archiver/src/archiver/memory_archiver_store/l1_to_l2_message_store.ts +++ b/yarn-project/archiver/src/archiver/memory_archiver_store/l1_to_l2_message_store.ts @@ -11,21 +11,11 @@ export class L1ToL2MessageStore { * messages (and the number of times the message has been seen). */ protected store: Map = new Map(); - private messagesByBlock = new Set(); constructor() {} - addMessage(messageKey: Fr, message: L1ToL2Message, l1BlocKNumber: bigint, messageIndex: number) { - if (this.messagesByBlock.has(`${l1BlocKNumber}-${messageIndex}`)) { - return; - } - this.messagesByBlock.add(`${l1BlocKNumber}-${messageIndex}`); - - this.addMessageUnsafe(messageKey, message); - } - - addMessageUnsafe(messageKey: Fr, message: L1ToL2Message) { - const messageKeyBigInt = messageKey.value; + addMessage(messageKey: Fr, message: L1ToL2Message) { + const messageKeyBigInt = messageKey.toBigInt(); const msgAndCount = this.store.get(messageKeyBigInt); if (msgAndCount) { msgAndCount.count++; @@ -48,7 +38,6 @@ export class L1ToL2MessageStore { * for removing messages or fetching multiple messages. */ export class PendingL1ToL2MessageStore extends L1ToL2MessageStore { - private cancelledMessagesByBlock = new Set(); getMessageKeys(limit: number): Fr[] { if (limit < 1) { return []; @@ -68,20 +57,12 @@ export class PendingL1ToL2MessageStore extends L1ToL2MessageStore { return messages; } - removeMessage(messageKey: Fr, l1BlockNumber: bigint, messageIndex: number) { + removeMessage(messageKey: Fr) { // ignore 0 - messageKey is a hash, so a 0 can probabilistically never occur. It is best to skip it. if (messageKey.equals(Fr.ZERO)) { return; } - if (this.cancelledMessagesByBlock.has(`${l1BlockNumber}-${messageIndex}`)) { - return; - } - this.cancelledMessagesByBlock.add(`${l1BlockNumber}-${messageIndex}`); - this.removeMessageUnsafe(messageKey); - } - - removeMessageUnsafe(messageKey: Fr) { const messageKeyBigInt = messageKey.value; const msgAndCount = this.store.get(messageKeyBigInt); if (!msgAndCount) { diff --git a/yarn-project/archiver/src/archiver/memory_archiver_store/memory_archiver_store.ts b/yarn-project/archiver/src/archiver/memory_archiver_store/memory_archiver_store.ts index 1c2c2fd5369..62824ae1fe3 100644 --- a/yarn-project/archiver/src/archiver/memory_archiver_store/memory_archiver_store.ts +++ b/yarn-project/archiver/src/archiver/memory_archiver_store/memory_archiver_store.ts @@ -1,5 +1,4 @@ import { - CancelledL1ToL2Message, ContractData, ExtendedContractData, ExtendedUnencryptedL2Log, @@ -13,7 +12,6 @@ import { LogFilter, LogId, LogType, - PendingL1ToL2Message, TxHash, UnencryptedL2Log, } from '@aztec/circuit-types'; @@ -70,6 +68,9 @@ export class MemoryArchiverStore implements ArchiverDataStore { */ private pendingL1ToL2Messages: PendingL1ToL2MessageStore = new PendingL1ToL2MessageStore(); + private lastL1BlockAddedMessages: bigint = 0n; + private lastL1BlockCancelledMessages: bigint = 0n; + constructor( /** The max number of logs that can be obtained in 1 "getUnencryptedLogs" call. */ public readonly maxLogs: number, @@ -108,11 +109,17 @@ export class MemoryArchiverStore implements ArchiverDataStore { /** * Append new pending L1 to L2 messages to the store. * @param messages - The L1 to L2 messages to be added to the store. + * @param l1BlockNumber - The L1 block number for which to add the messages. * @returns True if the operation is successful (always in this implementation). */ - public addPendingL1ToL2Messages(messages: PendingL1ToL2Message[]): Promise { - for (const { message, blockNumber, indexInBlock } of messages) { - this.pendingL1ToL2Messages.addMessage(message.entryKey!, message, blockNumber, indexInBlock); + public addPendingL1ToL2Messages(messages: L1ToL2Message[], l1BlockNumber: bigint): Promise { + if (l1BlockNumber <= this.lastL1BlockAddedMessages) { + return Promise.resolve(false); + } + + this.lastL1BlockAddedMessages = l1BlockNumber; + for (const message of messages) { + this.pendingL1ToL2Messages.addMessage(message.entryKey!, message); } return Promise.resolve(true); } @@ -120,11 +127,17 @@ export class MemoryArchiverStore implements ArchiverDataStore { /** * Remove pending L1 to L2 messages from the store (if they were cancelled). * @param messages - The message keys to be removed from the store. + * @param l1BlockNumber - The L1 block number for which to remove the messages. * @returns True if the operation is successful (always in this implementation). */ - public cancelPendingL1ToL2Messages(messages: CancelledL1ToL2Message[]): Promise { - messages.forEach(({ entryKey, blockNumber, indexInBlock }) => { - this.pendingL1ToL2Messages.removeMessage(entryKey, blockNumber, indexInBlock); + public cancelPendingL1ToL2Messages(messages: Fr[], l1BlockNumber: bigint): Promise { + if (l1BlockNumber <= this.lastL1BlockCancelledMessages) { + return Promise.resolve(false); + } + + this.lastL1BlockCancelledMessages = l1BlockNumber; + messages.forEach(entryKey => { + this.pendingL1ToL2Messages.removeMessage(entryKey); }); return Promise.resolve(true); } @@ -137,8 +150,8 @@ export class MemoryArchiverStore implements ArchiverDataStore { */ public confirmL1ToL2Messages(messageKeys: Fr[]): Promise { messageKeys.forEach(messageKey => { - this.confirmedL1ToL2Messages.addMessageUnsafe(messageKey, this.pendingL1ToL2Messages.getMessage(messageKey)!); - this.pendingL1ToL2Messages.removeMessageUnsafe(messageKey); + this.confirmedL1ToL2Messages.addMessage(messageKey, this.pendingL1ToL2Messages.getMessage(messageKey)!); + this.pendingL1ToL2Messages.removeMessage(messageKey); }); return Promise.resolve(true); } @@ -390,10 +403,15 @@ export class MemoryArchiverStore implements ArchiverDataStore { return Promise.resolve(this.l2BlockContexts[this.l2BlockContexts.length - 1].block.number); } - public getL1BlockNumber(): Promise { - if (this.l2BlockContexts.length === 0) { - return Promise.resolve(0n); - } - return Promise.resolve(this.l2BlockContexts[this.l2BlockContexts.length - 1].block.getL1BlockNumber()); + public getL1BlockNumber() { + const addedBlock = this.l2BlockContexts[this.l2BlockContexts.length - 1]?.block?.getL1BlockNumber() ?? 0n; + const addedMessages = this.lastL1BlockAddedMessages; + const cancelledMessages = this.lastL1BlockCancelledMessages; + + return Promise.resolve({ + addedBlock, + addedMessages, + cancelledMessages, + }); } } diff --git a/yarn-project/circuit-types/src/l1_to_l2_message.ts b/yarn-project/circuit-types/src/l1_to_l2_message.ts index e6697384c6b..a072e6c270b 100644 --- a/yarn-project/circuit-types/src/l1_to_l2_message.ts +++ b/yarn-project/circuit-types/src/l1_to_l2_message.ts @@ -62,34 +62,6 @@ export class L1ToL2MessageAndIndex { } } -/** - * An L1 to L2 message emitted in a particular L1 block. - */ -export class PendingL1ToL2Message { - constructor( - /** the message */ - public readonly message: L1ToL2Message, - /** the L1 block this message was emitted in */ - public readonly blockNumber: bigint, - /** at which index in the L1 block this message was emitted */ - public readonly indexInBlock: number, - ) {} -} - -/** - * An L1 to L2 message that was cancelled. - */ -export class CancelledL1ToL2Message { - constructor( - /** the message */ - public readonly entryKey: Fr, - /** the L1 block this message was emitted in */ - public readonly blockNumber: bigint, - /** at which index in the L1 block this message was emitted */ - public readonly indexInBlock: number, - ) {} -} - /** * The format of an L1 to L2 Message. */