Skip to content

Commit

Permalink
refactor: keep track of blocks emitting/cancelling messages (AztecPro…
Browse files Browse the repository at this point in the history
…tocol#4028)

This PR refactors the archiver to keep track of which L1 block last
emitted messages or cancelled messages (similar to how it discovers L2
blocks) in order to make the synch process more efficient.
  • Loading branch information
alexghr authored Jan 17, 2024
1 parent b897798 commit 4a16cf7
Show file tree
Hide file tree
Showing 12 changed files with 269 additions and 342 deletions.
3 changes: 2 additions & 1 deletion yarn-project/archiver/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@
"^(\\.{1,2}/.*)\\.[cm]?js$": "$1"
},
"testRegex": "./src/.*\\.test\\.(js|mjs|ts)$",
"rootDir": "./src"
"rootDir": "./src",
"workerThreads": true
},
"dependencies": {
"@aztec/circuit-types": "workspace:^",
Expand Down
93 changes: 52 additions & 41 deletions yarn-project/archiver/src/archiver/archiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -129,18 +129,28 @@ export class Archiver implements L2BlockSource, L2LogsSource, ContractDataSource
* @param blockUntilSynced - If true, blocks until the archiver has fully synced.
*/
private async sync(blockUntilSynced: boolean) {
/**
* We keep track of three "pointers" to L1 blocks:
* 1. the last L1 block that published an L2 block
* 2. the last L1 block that added L1 to L2 messages
* 3. the last L1 block that cancelled L1 to L2 messages
*
* We do this to deal with L1 data providers that are eventually consistent (e.g. Infura).
* We guard against seeing block X with no data at one point, and later, the provider processes the block and it has data.
* The archiver will stay back, until there's data on L1 that will move the pointers forward.
*
* This code does not handle reorgs.
*/
const lastL1Blocks = await this.store.getL1BlockNumber();
const currentL1BlockNumber = await this.publicClient.getBlockNumber();
// this makes the archiver more resilient to eventually-consistent eth providers like Infura
// it _will_ process the same L1 blocks over and over again until the L2 chain advances
// one thing to handle now is that we will process the same L1 to L2 messages over and over again
// so the store needs to account for that.
const lastProcessedL1BlockNumber = await this.store.getL1BlockNumber();
if (currentL1BlockNumber <= lastProcessedL1BlockNumber) {
// reducing logs, otherwise this gets triggered on every loop (1s)
if (currentL1BlockNumber !== this.lastLoggedL1BlockNumber) {
this.log(`No new blocks to process, current block number: ${currentL1BlockNumber}`);
this.lastLoggedL1BlockNumber = currentL1BlockNumber;
}

if (
currentL1BlockNumber <= lastL1Blocks.addedBlock &&
currentL1BlockNumber <= lastL1Blocks.addedMessages &&
currentL1BlockNumber <= lastL1Blocks.cancelledMessages
) {
// chain hasn't moved forward
// or it's been rolled back
return;
}

Expand All @@ -152,69 +162,69 @@ export class Archiver implements L2BlockSource, L2LogsSource, ContractDataSource
* to ensure that data is read exactly once.
*
* The first is the problem of eventually consistent ETH service providers like Infura.
* We currently read from the last L1 block that we saw emit an L2 block. This could mean
* that the archiver ends up looking at the same L1 block multiple times (e.g. if we last saw
* an L2 block emitted at L1 block 10, we'd constantly ask for L1 blocks from 11 onwards until
* we see another L2 block). For this to work message and block processing need to be idempotent.
* We should re-visit this before mainnet launch.
* Each L1 read operation will query data from the last L1 block that it saw emit its kind of data.
* (so pending L1 to L2 messages will read from the last L1 block that emitted a message and so on)
* This will mean the archiver will lag behind L1 and will only advance when there's L2-relevant activity on the chain.
*
* The second is that in between the various calls to L1, the block number can move meaning some
* of the following calls will return data for blocks that were not present during earlier calls.
* It's possible that we actually received messages in block currentBlockNumber + 1 meaning the next time
* we do this sync we get the same message again. Additionally, the call to get cancelled L1 to L2 messages
* could read from a block not present when retrieving pending messages. If a message was added and cancelled
* in the same eth block then we could try and cancel a non-existent pending message.
*
* To combat this for the time being we simply ensure that all data retrieval methods only retrieve
* data up to the currentBlockNumber captured at the top of this function. We might want to improve on this
* in future but for the time being it should give us the guarantees that we need
*
*/

// ********** Events that are processed in between blocks **********
// ********** Events that are processed per L1 block **********

// Process l1ToL2Messages, these are consumed as time passes, not each block
const retrievedPendingL1ToL2Messages = await retrieveNewPendingL1ToL2Messages(
this.publicClient,
this.inboxAddress,
blockUntilSynced,
lastProcessedL1BlockNumber + 1n, // + 1 to prevent re-including messages from the last processed block
lastL1Blocks.addedMessages + 1n,
currentL1BlockNumber,
);
const retrievedCancelledL1ToL2Messages = await retrieveNewCancelledL1ToL2Messages(
this.publicClient,
this.inboxAddress,
blockUntilSynced,
lastProcessedL1BlockNumber + 1n,
lastL1Blocks.cancelledMessages + 1n,
currentL1BlockNumber,
);

// TODO (#717): optimize this - there could be messages in confirmed that are also in pending.
// Or messages in pending that are also cancelled in the same block. No need to modify storage for them.
// group pending messages and cancelled messages by their L1 block number
const messagesByBlock = new Map<bigint, [L1ToL2Message[], Fr[]]>();
for (const [message, blockNumber] of retrievedPendingL1ToL2Messages.retrievedData) {
const messages = messagesByBlock.get(blockNumber) || [[], []];
messages[0].push(message);
messagesByBlock.set(blockNumber, messages);
}

if (retrievedPendingL1ToL2Messages.retrievedData.length) {
// Store l1 to l2 messages
this.log(`Adding ${retrievedPendingL1ToL2Messages.retrievedData.length} pending l1 to l2 messages to store`);
await this.store.addPendingL1ToL2Messages(retrievedPendingL1ToL2Messages.retrievedData);
for (const [messageKey, blockNumber] of retrievedCancelledL1ToL2Messages.retrievedData) {
const messages = messagesByBlock.get(blockNumber) || [[], []];
messages[1].push(messageKey);
messagesByBlock.set(blockNumber, messages);
}

if (retrievedCancelledL1ToL2Messages.retrievedData.length) {
// remove cancelled messages from the pending message store:
// process messages from each L1 block in sequence
const l1BlocksWithMessages = Array.from(messagesByBlock.keys()).sort((a, b) => (a < b ? -1 : a === b ? 0 : 1));
for (const l1Block of l1BlocksWithMessages) {
const [newMessages, cancelledMessages] = messagesByBlock.get(l1Block)!;
this.log(
`Removing ${retrievedCancelledL1ToL2Messages.retrievedData.length} pending l1 to l2 messages from store where messages were cancelled`,
`Adding ${newMessages.length} new messages and ${cancelledMessages.length} cancelled messages in L1 block ${l1Block}`,
);
await this.store.cancelPendingL1ToL2Messages(retrievedCancelledL1ToL2Messages.retrievedData);
await this.store.addPendingL1ToL2Messages(newMessages, l1Block);
await this.store.cancelPendingL1ToL2Messages(cancelledMessages, l1Block);
}

// ********** Events that are processed per block **********
// ********** Events that are processed per L2 block **********

// Read all data from chain and then write to our stores at the end
const nextExpectedL2BlockNum = BigInt((await this.store.getBlockNumber()) + 1);
const retrievedBlocks = await retrieveBlocks(
this.publicClient,
this.rollupAddress,
blockUntilSynced,
lastProcessedL1BlockNumber + 1n,
lastL1Blocks.addedBlock + 1n,
currentL1BlockNumber,
nextExpectedL2BlockNum,
);
Expand All @@ -224,7 +234,7 @@ export class Archiver implements L2BlockSource, L2LogsSource, ContractDataSource
} else {
this.log(
`Retrieved ${retrievedBlocks.retrievedData.length} new L2 blocks between L1 blocks ${
lastProcessedL1BlockNumber + 1n
lastL1Blocks.addedBlock + 1n
} and ${currentL1BlockNumber}.`,
);
}
Expand All @@ -238,7 +248,7 @@ export class Archiver implements L2BlockSource, L2LogsSource, ContractDataSource
this.publicClient,
this.contractDeploymentEmitterAddress,
blockUntilSynced,
lastProcessedL1BlockNumber + 1n,
lastL1Blocks.addedBlock + 1n,
currentL1BlockNumber,
blockHashMapping,
);
Expand All @@ -264,9 +274,10 @@ export class Archiver implements L2BlockSource, L2LogsSource, ContractDataSource

// from retrieved L2Blocks, confirm L1 to L2 messages that have been published
// from each l2block fetch all messageKeys in a flattened array:
const messageKeysToRemove = retrievedBlocks.retrievedData.map(l2block => l2block.newL1ToL2Messages).flat();
this.log(`Confirming l1 to l2 messages in store`);
await this.store.confirmL1ToL2Messages(messageKeysToRemove);
for (const block of retrievedBlocks.retrievedData) {
await this.store.confirmL1ToL2Messages(block.newL1ToL2Messages);
}

// store retrieved L2 blocks after removing new logs information.
// remove logs to serve "lightweight" block information. Logs can be fetched separately if needed.
Expand Down
24 changes: 18 additions & 6 deletions yarn-project/archiver/src/archiver/archiver_store.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import {
CancelledL1ToL2Message,
ContractData,
ExtendedContractData,
GetUnencryptedLogsResponse,
Expand All @@ -9,12 +8,23 @@ import {
L2Tx,
LogFilter,
LogType,
PendingL1ToL2Message,
TxHash,
} from '@aztec/circuit-types';
import { Fr } from '@aztec/circuits.js';
import { AztecAddress } from '@aztec/foundation/aztec-address';

/**
* Represents the latest L1 block processed by the archiver for various objects in L2.
*/
export type ArchiverL1SynchPoint = {
/** The last L1 block that added a new L2 block. */
addedBlock: bigint;
/** The last L1 block that added pending messages */
addedMessages: bigint;
/** The last L1 block that cancelled messages */
cancelledMessages: bigint;
};

/**
* Interface describing a data store to be used by the archiver to store all its relevant data
* (blocks, encrypted logs, aztec contract data extended contract data).
Expand Down Expand Up @@ -58,16 +68,18 @@ export interface ArchiverDataStore {
/**
* Append new pending L1 to L2 messages to the store.
* @param messages - The L1 to L2 messages to be added to the store.
* @param l1BlockNumber - The block number of the L1 block that added the messages.
* @returns True if the operation is successful.
*/
addPendingL1ToL2Messages(messages: PendingL1ToL2Message[]): Promise<boolean>;
addPendingL1ToL2Messages(messages: L1ToL2Message[], l1BlockNumber: bigint): Promise<boolean>;

/**
* Remove pending L1 to L2 messages from the store (if they were cancelled).
* @param message - The message keys to be removed from the store.
* @param l1BlockNumber - The block number of the L1 block that cancelled the messages.
* @returns True if the operation is successful.
*/
cancelPendingL1ToL2Messages(message: CancelledL1ToL2Message[]): Promise<boolean>;
cancelPendingL1ToL2Messages(message: Fr[], l1BlockNumber: bigint): Promise<boolean>;

/**
* Messages that have been published in an L2 block are confirmed.
Expand Down Expand Up @@ -152,7 +164,7 @@ export interface ArchiverDataStore {
getBlockNumber(): Promise<number>;

/**
* Gets the number of the latest L1 block processed.
* Gets the last L1 block number processed by the archiver
*/
getL1BlockNumber(): Promise<bigint>;
getL1BlockNumber(): Promise<ArchiverL1SynchPoint>;
}
Loading

0 comments on commit 4a16cf7

Please sign in to comment.