Skip to content

Commit

Permalink
feat: persistent archiver store (#3410)
Browse files Browse the repository at this point in the history
This PR adds a new implementation for an archiver store that's backed by
LMDB. See #3361 for details.

Fix #3361
  • Loading branch information
alexghr authored and Maddiaa0 committed Nov 28, 2023
1 parent 5bd2fb9 commit 9b7f47c
Show file tree
Hide file tree
Showing 35 changed files with 2,433 additions and 867 deletions.
7 changes: 3 additions & 4 deletions cspell.json
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,8 @@
"auditability",
"hardfork",
"composablity",
"counterparty"
"counterparty",
"lmdb"
],
"ignorePaths": [
"node_modules/",
Expand All @@ -253,7 +254,5 @@
"lib",
"*.cmake"
],
"flagWords": [
"anonymous"
]
"flagWords": ["anonymous"]
}
1 change: 1 addition & 0 deletions yarn-project/archiver/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
"@aztec/types": "workspace:^",
"@types/lodash.omit": "^4.5.7",
"debug": "^4.3.4",
"lmdb": "^2.9.1",
"lodash.omit": "^4.5.0",
"tsc-watch": "^6.0.0",
"tslib": "^2.5.0",
Expand Down
6 changes: 2 additions & 4 deletions yarn-project/archiver/src/archiver/archiver.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ import times from 'lodash.times';
import { Chain, HttpTransport, Log, PublicClient, Transaction, encodeFunctionData, toHex } from 'viem';

import { Archiver } from './archiver.js';
import { ArchiverDataStore, MemoryArchiverStore } from './archiver_store.js';
import { ArchiverDataStore } from './archiver_store.js';
import { MemoryArchiverStore } from './memory_archiver_store/memory_archiver_store.js';

describe('Archiver', () => {
const rollupAddress = EthAddress.ZERO.toString();
Expand All @@ -34,7 +35,6 @@ describe('Archiver', () => {
EthAddress.fromString(inboxAddress),
EthAddress.fromString(registryAddress),
EthAddress.fromString(contractDeploymentEmitterAddress),
0,
archiverStore,
1000,
);
Expand Down Expand Up @@ -138,7 +138,6 @@ describe('Archiver', () => {
EthAddress.fromString(inboxAddress),
EthAddress.fromString(registryAddress),
EthAddress.fromString(contractDeploymentEmitterAddress),
0,
archiverStore,
1000,
);
Expand Down Expand Up @@ -216,7 +215,6 @@ describe('Archiver', () => {
EthAddress.fromString(inboxAddress),
EthAddress.fromString(registryAddress),
EthAddress.fromString(contractDeploymentEmitterAddress),
0,
archiverStore,
1000,
);
Expand Down
93 changes: 40 additions & 53 deletions yarn-project/archiver/src/archiver/archiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import { EthAddress } from '@aztec/foundation/eth-address';
import { Fr } from '@aztec/foundation/fields';
import { DebugLogger, createDebugLogger } from '@aztec/foundation/log';
import { RunningPromise } from '@aztec/foundation/running-promise';
import { RegistryAbi } from '@aztec/l1-artifacts';
import {
ContractData,
ContractDataSource,
Expand All @@ -26,9 +25,9 @@ import {
} from '@aztec/types';

import omit from 'lodash.omit';
import { Chain, HttpTransport, PublicClient, createPublicClient, getContract, http } from 'viem';
import { Chain, HttpTransport, PublicClient, createPublicClient, http } from 'viem';

import { ArchiverDataStore, MemoryArchiverStore } from './archiver_store.js';
import { ArchiverDataStore } from './archiver_store.js';
import { ArchiverConfig } from './config.js';
import {
retrieveBlocks,
Expand All @@ -53,11 +52,6 @@ export class Archiver implements L2BlockSource, L2LogsSource, ContractDataSource
*/
private nextL2BlockFromL1Block = 0n;

/**
* Last Processed Block Number
*/
private lastProcessedL1BlockNumber = 0n;

/**
* Use this to track logged block in order to avoid repeating the same message.
*/
Expand All @@ -81,46 +75,36 @@ export class Archiver implements L2BlockSource, L2LogsSource, ContractDataSource
private readonly inboxAddress: EthAddress,
private readonly registryAddress: EthAddress,
private readonly contractDeploymentEmitterAddress: EthAddress,
searchStartBlock: number,
private readonly store: ArchiverDataStore,
private readonly pollingIntervalMs = 10_000,
private readonly log: DebugLogger = createDebugLogger('aztec:archiver'),
) {
this.nextL2BlockFromL1Block = BigInt(searchStartBlock);
this.lastProcessedL1BlockNumber = BigInt(searchStartBlock);
}
) {}

/**
* Creates a new instance of the Archiver and, if requested, blocks until it has synced from the chain.
* @param config - The archiver's desired configuration.
* @param archiverStore - The backing store for the archiver.
* @param blockUntilSynced - If true, blocks until the archiver has fully synced.
* @returns - An instance of the archiver.
*/
public static async createAndSync(config: ArchiverConfig, blockUntilSynced = true): Promise<Archiver> {
public static async createAndSync(
config: ArchiverConfig,
archiverStore: ArchiverDataStore,
blockUntilSynced = true,
): Promise<Archiver> {
const chain = createEthereumChain(config.rpcUrl, config.apiKey);
const publicClient = createPublicClient({
chain: chain.chainInfo,
transport: http(chain.rpcUrl),
pollingInterval: config.viemPollingIntervalMS,
});

// ask the registry for the block number when the rollup was deployed
// this is the block from which archiver has to search from
const registryContract = getContract({
address: config.l1Contracts.registryAddress.toString(),
abi: RegistryAbi,
publicClient,
});
const searchStartBlock = Number((await registryContract.read.getCurrentSnapshot()).blockNumber);

const archiverStore = new MemoryArchiverStore(config.maxLogs ?? 1000);
const archiver = new Archiver(
publicClient,
config.l1Contracts.rollupAddress,
config.l1Contracts.inboxAddress,
config.l1Contracts.registryAddress,
config.l1Contracts.contractDeploymentEmitterAddress,
searchStartBlock,
archiverStore,
config.archiverPollingIntervalMS,
);
Expand Down Expand Up @@ -152,7 +136,12 @@ export class Archiver implements L2BlockSource, L2LogsSource, ContractDataSource
*/
private async sync(blockUntilSynced: boolean) {
const currentL1BlockNumber = await this.publicClient.getBlockNumber();
if (currentL1BlockNumber <= this.lastProcessedL1BlockNumber) {
// This makes the archiver more resilient to eventually-consistent eth providers like Infura.
// It _will_ process the same L1 blocks over and over again until the L2 chain advances.
// One thing to handle now is that we will process the same L1 to L2 messages over and over again,
// so the store needs to account for that.
const lastProcessedL1BlockNumber = await this.store.getL1BlockNumber();
if (currentL1BlockNumber <= lastProcessedL1BlockNumber) {
// reducing logs, otherwise this gets triggered on every loop (1s)
if (currentL1BlockNumber !== this.lastLoggedL1BlockNumber) {
this.log(`No new blocks to process, current block number: ${currentL1BlockNumber}`);
Expand All @@ -169,14 +158,14 @@ export class Archiver implements L2BlockSource, L2LogsSource, ContractDataSource
* to ensure that data is read exactly once.
*
* The first is the problem of eventually consistent ETH service providers like Infura.
* We are not currently handling this correctly in the case of L1 to L2 messages and we will
* want to re-visit L2 Block and contract data retrieval at a later stage. This is not
* currently a problem but will need to be addressed before a mainnet release.
* We currently read from the last L1 block that we saw emit an L2 block. This could mean
* that the archiver ends up looking at the same L1 block multiple times (e.g. if we last saw
* an L2 block emitted at L1 block 10, we'd constantly ask for L1 blocks from 11 onwards until
* we see another L2 block). For this to work, message and block processing need to be idempotent.
* We should re-visit this before mainnet launch.
*
* The second is that in between the various calls to L1, the block number can move meaning some
* of the following calls will return data for blocks that were not present during earlier calls.
* This is a problem for example when setting the last block number marker for L1 to L2 messages -
* this.lastProcessedBlockNumber = currentBlockNumber;
* It's possible that we actually received messages in block currentBlockNumber + 1 meaning the next time
* we do this sync we get the same message again. Additionally, the call to get cancelled L1 to L2 messages
* could read from a block not present when retrieving pending messages. If a message was added and cancelled
Expand All @@ -195,14 +184,14 @@ export class Archiver implements L2BlockSource, L2LogsSource, ContractDataSource
this.publicClient,
this.inboxAddress,
blockUntilSynced,
this.lastProcessedL1BlockNumber + 1n, // + 1 to prevent re-including messages from the last processed block
lastProcessedL1BlockNumber + 1n, // + 1 to prevent re-including messages from the last processed block
currentL1BlockNumber,
);
const retrievedCancelledL1ToL2Messages = await retrieveNewCancelledL1ToL2Messages(
this.publicClient,
this.inboxAddress,
blockUntilSynced,
this.lastProcessedL1BlockNumber + 1n,
lastProcessedL1BlockNumber + 1n,
currentL1BlockNumber,
);

Expand All @@ -215,8 +204,6 @@ export class Archiver implements L2BlockSource, L2LogsSource, ContractDataSource
this.log('Removing pending l1 to l2 messages from store where messages were cancelled');
await this.store.cancelPendingL1ToL2Messages(retrievedCancelledL1ToL2Messages.retrievedData);

this.lastProcessedL1BlockNumber = currentL1BlockNumber;

// ********** Events that are processed per block **********

// Read all data from chain and then write to our stores at the end
Expand Down Expand Up @@ -252,26 +239,22 @@ export class Archiver implements L2BlockSource, L2LogsSource, ContractDataSource

this.log(`Retrieved ${retrievedBlocks.retrievedData.length} block(s) from chain`);

// store encrypted logs from L2 Blocks that we have retrieved
const encryptedLogs = retrievedBlocks.retrievedData.map(block => {
return block.newEncryptedLogs!;
});
await this.store.addLogs(encryptedLogs, LogType.ENCRYPTED);

// store unencrypted logs from L2 Blocks that we have retrieved
const unencryptedLogs = retrievedBlocks.retrievedData.map(block => {
return block.newUnencryptedLogs!;
});
await this.store.addLogs(unencryptedLogs, LogType.UNENCRYPTED);
await Promise.all(
retrievedBlocks.retrievedData.map(block =>
this.store.addLogs(block.newEncryptedLogs, block.newUnencryptedLogs, block.number),
),
);

// store contracts for which we have retrieved L2 blocks
const lastKnownL2BlockNum = retrievedBlocks.retrievedData[retrievedBlocks.retrievedData.length - 1].number;
retrievedContracts.retrievedData.forEach(async ([contracts, l2BlockNum], index) => {
this.log(`Retrieved extended contract data for l2 block number: ${index}`);
if (l2BlockNum <= lastKnownL2BlockNum) {
await this.store.addExtendedContractData(contracts, l2BlockNum);
}
});
await Promise.all(
retrievedContracts.retrievedData.map(async ([contracts, l2BlockNum]) => {
this.log(`Retrieved extended contract data for l2 block number: ${l2BlockNum}`);
if (l2BlockNum <= lastKnownL2BlockNum) {
await this.store.addExtendedContractData(contracts, l2BlockNum);
}
}),
);

// from retrieved L2Blocks, confirm L1 to L2 messages that have been published
// from each l2block fetch all messageKeys in a flattened array:
Expand All @@ -285,7 +268,11 @@ export class Archiver implements L2BlockSource, L2LogsSource, ContractDataSource
retrievedBlocks.retrievedData.map(block => {
// Ensure we pad the L1 to L2 message array to the full size before storing.
block.newL1ToL2Messages = padArrayEnd(block.newL1ToL2Messages, Fr.ZERO, NUMBER_OF_L1_L2_MESSAGES_PER_ROLLUP);
return L2Block.fromFields(omit(block, ['newEncryptedLogs', 'newUnencryptedLogs']), block.getBlockHash());
return L2Block.fromFields(
omit(block, ['newEncryptedLogs', 'newUnencryptedLogs']),
block.getBlockHash(),
block.getL1BlockNumber(),
);
}),
);

Expand Down
Loading

0 comments on commit 9b7f47c

Please sign in to comment.