refactor: process l1-l2 messages per block
alexghr committed Jan 15, 2024
1 parent 208991a commit f6e66e9
Showing 12 changed files with 257 additions and 339 deletions.
3 changes: 2 additions & 1 deletion yarn-project/archiver/package.json
@@ -31,7 +31,8 @@
"^(\\.{1,2}/.*)\\.[cm]?js$": "$1"
},
"testRegex": "./src/.*\\.test\\.(js|mjs|ts)$",
"rootDir": "./src"
"rootDir": "./src",
"workerThreads": true
},
"dependencies": {
"@aztec/circuit-types": "workspace:^",
93 changes: 52 additions & 41 deletions yarn-project/archiver/src/archiver/archiver.ts
@@ -129,18 +129,28 @@ export class Archiver implements L2BlockSource, L2LogsSource, ContractDataSource
* @param blockUntilSynced - If true, blocks until the archiver has fully synced.
*/
private async sync(blockUntilSynced: boolean) {
/**
* We keep track of three "pointers" to L1 blocks:
* 1. the last L1 block that published an L2 block
* 2. the last L1 block that added L1 to L2 messages
* 3. the last L1 block that cancelled L1 to L2 messages
*
* We do this to deal with L1 data providers that are eventually consistent (e.g. Infura).
* We guard against the case where we see block X with no data at one point and, later, the provider has processed the block and it has data.
* The archiver will stay behind until there's data on L1 that moves the pointers forward.
*
* This code does not handle reorgs.
*/
const lastL1Blocks = await this.store.getL1BlockNumber();
const currentL1BlockNumber = await this.publicClient.getBlockNumber();
// this makes the archiver more resilient to eventually-consistent eth providers like Infura
// it _will_ process the same L1 blocks over and over again until the L2 chain advances
// one thing to handle now is that we will process the same L1 to L2 messages over and over again
// so the store needs to account for that.
const lastProcessedL1BlockNumber = await this.store.getL1BlockNumber();
if (currentL1BlockNumber <= lastProcessedL1BlockNumber) {
// reducing logs, otherwise this gets triggered on every loop (1s)
if (currentL1BlockNumber !== this.lastLoggedL1BlockNumber) {
this.log(`No new blocks to process, current block number: ${currentL1BlockNumber}`);
this.lastLoggedL1BlockNumber = currentL1BlockNumber;
}

if (
currentL1BlockNumber <= lastL1Blocks.addedBlock &&
currentL1BlockNumber <= lastL1Blocks.addedMessages &&
currentL1BlockNumber <= lastL1Blocks.cancelledMessages
) {
// chain hasn't moved forward
// or it's been rolled back
return;
}

@@ -152,69 +162,69 @@ export class Archiver implements L2BlockSource, L2LogsSource, ContractDataSource
* to ensure that data is read exactly once.
*
* The first is the problem of eventually consistent ETH service providers like Infura.
* We currently read from the last L1 block that we saw emit an L2 block. This could mean
* that the archiver ends up looking at the same L1 block multiple times (e.g. if we last saw
* an L2 block emitted at L1 block 10, we'd constantly ask for L1 blocks from 11 onwards until
* we see another L2 block). For this to work message and block processing need to be idempotent.
* We should re-visit this before mainnet launch.
* Each L1 read operation will query data from the last L1 block that it saw emit its kind of data.
* (so pending L1 to L2 messages will read from the last L1 block that emitted a message and so on)
* This means the archiver will lag behind L1 and only advance when there's L2-relevant activity on the chain.
*
* The second is that in between the various calls to L1, the block number can move, meaning some
* of the following calls will return data for blocks that were not present during earlier calls.
* It's possible that we actually received messages in block currentBlockNumber + 1, meaning the next time
* we do this sync we get the same message again. Additionally, the call to get cancelled L1 to L2 messages
* could read from a block not present when retrieving pending messages. If a message was added and cancelled
* in the same eth block then we could try and cancel a non-existent pending message.
*
* To combat this, for the time being we simply ensure that all data retrieval methods only retrieve
* data up to the currentBlockNumber captured at the top of this function. We might want to improve on this
* in the future, but for now it should give us the guarantees that we need.
*
*/

// ********** Events that are processed in between blocks **********
// ********** Events that are processed per L1 block **********

// Process l1ToL2Messages, these are consumed as time passes, not each block
const retrievedPendingL1ToL2Messages = await retrieveNewPendingL1ToL2Messages(
this.publicClient,
this.inboxAddress,
blockUntilSynced,
lastProcessedL1BlockNumber + 1n, // + 1 to prevent re-including messages from the last processed block
lastL1Blocks.addedMessages + 1n,
currentL1BlockNumber,
);
const retrievedCancelledL1ToL2Messages = await retrieveNewCancelledL1ToL2Messages(
this.publicClient,
this.inboxAddress,
blockUntilSynced,
lastProcessedL1BlockNumber + 1n,
lastL1Blocks.cancelledMessages + 1n,
currentL1BlockNumber,
);

// TODO (#717): optimize this - there could be messages in confirmed that are also in pending.
// Or messages in pending that are also cancelled in the same block. No need to modify storage for them.
// group pending messages and cancelled messages by their L1 block number
const messagesByBlock = new Map<bigint, [L1ToL2Message[], Fr[]]>();
for (const [message, blockNumber] of retrievedPendingL1ToL2Messages.retrievedData) {
const messages = messagesByBlock.get(blockNumber) || [[], []];
messages[0].push(message);
messagesByBlock.set(blockNumber, messages);
}

if (retrievedPendingL1ToL2Messages.retrievedData.length) {
// Store l1 to l2 messages
this.log(`Adding ${retrievedPendingL1ToL2Messages.retrievedData.length} pending l1 to l2 messages to store`);
await this.store.addPendingL1ToL2Messages(retrievedPendingL1ToL2Messages.retrievedData);
for (const [messageKey, blockNumber] of retrievedCancelledL1ToL2Messages.retrievedData) {
const messages = messagesByBlock.get(blockNumber) || [[], []];
messages[1].push(messageKey);
messagesByBlock.set(blockNumber, messages);
}

if (retrievedCancelledL1ToL2Messages.retrievedData.length) {
// remove cancelled messages from the pending message store:
// process messages from each L1 block in sequence
const l1BlocksWithMessages = Array.from(messagesByBlock.keys()).sort((a, b) => (a < b ? -1 : a === b ? 0 : 1));
for (const l1Block of l1BlocksWithMessages) {
const [newMessages, cancelledMessages] = messagesByBlock.get(l1Block)!;
this.log(
`Removing ${retrievedCancelledL1ToL2Messages.retrievedData.length} pending l1 to l2 messages from store where messages were cancelled`,
`Adding ${newMessages.length} new messages and ${cancelledMessages.length} cancelled messages in L1 block ${l1Block}`,
);
await this.store.cancelPendingL1ToL2Messages(retrievedCancelledL1ToL2Messages.retrievedData);
await this.store.addPendingL1ToL2Messages(newMessages, l1Block);
await this.store.cancelPendingL1ToL2Messages(cancelledMessages, l1Block);
}

// ********** Events that are processed per block **********
// ********** Events that are processed per L2 block **********

// Read all data from chain and then write to our stores at the end
const nextExpectedL2BlockNum = BigInt((await this.store.getBlockNumber()) + 1);
const retrievedBlocks = await retrieveBlocks(
this.publicClient,
this.rollupAddress,
blockUntilSynced,
lastProcessedL1BlockNumber + 1n,
lastL1Blocks.addedBlock + 1n,
currentL1BlockNumber,
nextExpectedL2BlockNum,
);
@@ -224,7 +234,7 @@ export class Archiver implements L2BlockSource, L2LogsSource, ContractDataSource
} else {
this.log(
`Retrieved ${retrievedBlocks.retrievedData.length} new L2 blocks between L1 blocks ${
lastProcessedL1BlockNumber + 1n
lastL1Blocks.addedBlock + 1n
} and ${currentL1BlockNumber}.`,
);
}
@@ -238,7 +248,7 @@ export class Archiver implements L2BlockSource, L2LogsSource, ContractDataSource
this.publicClient,
this.contractDeploymentEmitterAddress,
blockUntilSynced,
lastProcessedL1BlockNumber + 1n,
lastL1Blocks.addedBlock + 1n,
currentL1BlockNumber,
blockHashMapping,
);
@@ -264,9 +274,10 @@ export class Archiver implements L2BlockSource, L2LogsSource, ContractDataSource

// from retrieved L2Blocks, confirm L1 to L2 messages that have been published
// from each l2block fetch all messageKeys in a flattened array:
const messageKeysToRemove = retrievedBlocks.retrievedData.map(l2block => l2block.newL1ToL2Messages).flat();
this.log(`Confirming l1 to l2 messages in store`);
await this.store.confirmL1ToL2Messages(messageKeysToRemove);
for (const block of retrievedBlocks.retrievedData) {
await this.store.confirmL1ToL2Messages(block.newL1ToL2Messages);
}

// store retrieved L2 blocks after removing new logs information.
// remove logs to serve "lightweight" block information. Logs can be fetched separately if needed.
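The loop above applies each L1 block's additions before its cancellations. Below is a standalone sketch of that grouping-and-ordering pattern, and of why the order matters when a message is added and cancelled in the same L1 block. The types here are hypothetical stand-ins for the commit's own L1ToL2Message and Fr, not code from this change.

// Standalone sketch; Message and MessageKey are hypothetical stand-ins
// for L1ToL2Message and Fr from @aztec/circuit-types / @aztec/circuits.js.
type MessageKey = string;
interface Message {
  key: MessageKey;
}

// Group retrieved events per L1 block: [messages added, message keys cancelled].
const messagesByBlock = new Map<bigint, [Message[], MessageKey[]]>();

function recordAdded(l1Block: bigint, message: Message) {
  const entry = messagesByBlock.get(l1Block) ?? [[], []];
  entry[0].push(message);
  messagesByBlock.set(l1Block, entry);
}

function recordCancelled(l1Block: bigint, key: MessageKey) {
  const entry = messagesByBlock.get(l1Block) ?? [[], []];
  entry[1].push(key);
  messagesByBlock.set(l1Block, entry);
}

// Apply blocks in ascending L1 order; within a block, additions go first,
// so a message added and cancelled in the same L1 block is first inserted
// and then removed, rather than cancelling a non-existent pending entry.
const pending = new Map<MessageKey, Message>();
const sortedBlocks = [...messagesByBlock.keys()].sort((a, b) => (a < b ? -1 : a > b ? 1 : 0));
for (const l1Block of sortedBlocks) {
  const [added, cancelled] = messagesByBlock.get(l1Block)!;
  for (const message of added) {
    pending.set(message.key, message);
  }
  for (const key of cancelled) {
    pending.delete(key);
  }
}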
17 changes: 12 additions & 5 deletions yarn-project/archiver/src/archiver/archiver_store.ts
@@ -1,5 +1,4 @@
import {
CancelledL1ToL2Message,
ContractData,
ExtendedContractData,
GetUnencryptedLogsResponse,
@@ -9,7 +8,6 @@ import {
L2Tx,
LogFilter,
LogType,
PendingL1ToL2Message,
TxHash,
} from '@aztec/circuit-types';
import { Fr } from '@aztec/circuits.js';
@@ -58,16 +56,18 @@ export interface ArchiverDataStore {
/**
* Append new pending L1 to L2 messages to the store.
* @param messages - The L1 to L2 messages to be added to the store.
* @param l1BlockNumber - The block number of the L1 block that added the messages.
* @returns True if the operation is successful.
*/
addPendingL1ToL2Messages(messages: PendingL1ToL2Message[]): Promise<boolean>;
addPendingL1ToL2Messages(messages: L1ToL2Message[], l1BlockNumber: bigint): Promise<boolean>;

/**
* Remove pending L1 to L2 messages from the store (if they were cancelled).
* @param message - The message keys to be removed from the store.
* @param l1BlockNumber - The block number of the L1 block that cancelled the messages.
* @returns True if the operation is successful.
*/
cancelPendingL1ToL2Messages(message: CancelledL1ToL2Message[]): Promise<boolean>;
cancelPendingL1ToL2Messages(message: Fr[], l1BlockNumber: bigint): Promise<boolean>;

/**
* Messages that have been published in an L2 block are confirmed.
@@ -154,5 +154,12 @@ export interface ArchiverDataStore {
/**
* Gets the number of the latest L1 block processed.
*/
getL1BlockNumber(): Promise<bigint>;
getL1BlockNumber(): Promise<{
/** The last L1 block that added a new L2 block. */
addedBlock: bigint;
/** The last L1 block that added pending messages. */
addedMessages: bigint;
/** The last L1 block that cancelled messages. */
cancelledMessages: bigint;
}>;
}
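For illustration, here is a minimal in-memory sketch of a store honouring this revised contract: the per-call L1 block number is compared against a stored pointer, so re-delivering an already-processed block (which the archiver's re-scanning of eventually-consistent providers makes routine) is a no-op. The class and its internals are hypothetical; only the method shapes come from the interface above, and Fr / L1ToL2Message are simplified stand-ins for the real types.

// Hypothetical stand-ins for Fr and L1ToL2Message; the real types live in
// @aztec/circuits.js and @aztec/circuit-types.
type Fr = string;
interface L1ToL2Message {
  entryKey: Fr;
}

class InMemoryArchiverStore {
  private pendingMessages = new Map<Fr, L1ToL2Message>();
  private lastL1Blocks = { addedBlock: 0n, addedMessages: 0n, cancelledMessages: 0n };

  public addPendingL1ToL2Messages(messages: L1ToL2Message[], l1BlockNumber: bigint): Promise<boolean> {
    // Re-scans can deliver an L1 block we've already processed; skip it.
    if (l1BlockNumber <= this.lastL1Blocks.addedMessages) {
      return Promise.resolve(false);
    }
    this.lastL1Blocks.addedMessages = l1BlockNumber;
    for (const message of messages) {
      this.pendingMessages.set(message.entryKey, message);
    }
    return Promise.resolve(true);
  }

  public cancelPendingL1ToL2Messages(messageKeys: Fr[], l1BlockNumber: bigint): Promise<boolean> {
    if (l1BlockNumber <= this.lastL1Blocks.cancelledMessages) {
      return Promise.resolve(false);
    }
    this.lastL1Blocks.cancelledMessages = l1BlockNumber;
    for (const key of messageKeys) {
      this.pendingMessages.delete(key);
    }
    return Promise.resolve(true);
  }

  public getL1BlockNumber() {
    // Three independent pointers, one per kind of L1 event.
    return Promise.resolve({ ...this.lastL1Blocks });
  }
}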