Skip to content

Commit

Permalink
chore: Batch archiver requests
Browse files Browse the repository at this point in the history
Instead of downloading every block and keeping it in memory, download in
batches of (configurable) 100 blocks and store them progressively.
  • Loading branch information
spalladino committed Dec 6, 2024
1 parent 4fcbc59 commit da5bed8
Show file tree
Hide file tree
Showing 7 changed files with 131 additions and 185 deletions.
4 changes: 1 addition & 3 deletions yarn-project/archiver/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@
"formatting": "run -T prettier --check ./src && run -T eslint ./src",
"formatting:fix": "run -T eslint --fix ./src && run -T prettier -w ./src",
"test": "NODE_NO_WARNINGS=1 node --experimental-vm-modules ../node_modules/.bin/jest --passWithNoTests",
"start": "node ./dest",
"start:dev": "tsc-watch -p tsconfig.json --onSuccess 'yarn start'",
"test:integration": "concurrently -k -s first -c reset,dim -n test,anvil \"yarn test:integration:run\" \"anvil\"",
"test:integration:run": "NODE_NO_WARNINGS=1 node --experimental-vm-modules $(yarn bin jest) --no-cache --config jest.integration.config.json"
},
Expand Down Expand Up @@ -107,4 +105,4 @@
"engines": {
"node": ">=18"
}
}
}
14 changes: 10 additions & 4 deletions yarn-project/archiver/src/archiver/archiver.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,17 @@ describe('Archiver', () => {

archiver = new Archiver(
publicClient,
rollupAddress,
inboxAddress,
registryAddress,
{ rollupAddress, inboxAddress, registryAddress },
archiverStore,
1000,
{ pollingIntervalMs: 1000, batchSize: 1000 },
instrumentation,
{
l1GenesisTime: BigInt(Math.ceil(Date.now() / 1000)),
l1StartBlock: 1n,
epochDuration: 4,
slotDuration: 24,
ethereumSlotDuration: 12,
},
);

blocks = blockNumbers.map(x => L2Block.random(x, txsPerBlock, x + 1, 2));
Expand Down Expand Up @@ -376,6 +381,7 @@ describe('Archiver', () => {
xit('handles an upcoming L2 prune', () => {});

// logs should be created in order of how archiver syncs.
// TODO(palla/log): This is brittle. We should instead fake the getLogs impl so it returns the logs we have predefined.
const mockGetLogs = (logs: {
messageSent?: ReturnType<typeof makeMessageSentEventWithIndexInL2BlockSubtree>[];
L2BlockProposed?: ReturnType<typeof makeL2BlockProposedEvent>[];
Expand Down
189 changes: 95 additions & 94 deletions yarn-project/archiver/src/archiver/archiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import { Fr } from '@aztec/foundation/fields';
import { type DebugLogger, createDebugLogger } from '@aztec/foundation/log';
import { RunningPromise } from '@aztec/foundation/running-promise';
import { count } from '@aztec/foundation/string';
import { Timer } from '@aztec/foundation/timer';
import { elapsed } from '@aztec/foundation/timer';
import { InboxAbi, RollupAbi } from '@aztec/l1-artifacts';
import {
ContractClassRegisteredEvent,
Expand All @@ -61,7 +61,7 @@ import {

import { type ArchiverDataStore, type ArchiverL1SynchPoint } from './archiver_store.js';
import { type ArchiverConfig } from './config.js';
import { retrieveBlockFromRollup, retrieveL1ToL2Messages } from './data_retrieval.js';
import { retrieveBlocksFromRollup, retrieveL1ToL2Messages } from './data_retrieval.js';
import {
getEpochNumberAtTimestamp,
getSlotAtTimestamp,
Expand Down Expand Up @@ -112,25 +112,23 @@ export class Archiver implements ArchiveSource {
*/
constructor(
private readonly publicClient: PublicClient<HttpTransport, Chain>,
private readonly rollupAddress: EthAddress,
readonly inboxAddress: EthAddress,
private readonly registryAddress: EthAddress,
private readonly l1Addresses: { rollupAddress: EthAddress; inboxAddress: EthAddress; registryAddress: EthAddress },
readonly dataStore: ArchiverDataStore,
private readonly pollingIntervalMs: number,
private readonly config: { pollingIntervalMs: number; batchSize: number },
private readonly instrumentation: ArchiverInstrumentation,
private readonly l1constants: L1RollupConstants = EmptyL1RollupConstants,
private readonly l1constants: L1RollupConstants,
private readonly log: DebugLogger = createDebugLogger('aztec:archiver'),
) {
this.store = new ArchiverStoreHelper(dataStore);

this.rollup = getContract({
address: rollupAddress.toString(),
address: l1Addresses.rollupAddress.toString(),
abi: RollupAbi,
client: publicClient,
});

this.inbox = getContract({
address: inboxAddress.toString(),
address: l1Addresses.inboxAddress.toString(),
abi: InboxAbi,
client: publicClient,
});
Expand Down Expand Up @@ -171,11 +169,12 @@ export class Archiver implements ArchiveSource {

const archiver = new Archiver(
publicClient,
config.l1Contracts.rollupAddress,
config.l1Contracts.inboxAddress,
config.l1Contracts.registryAddress,
config.l1Contracts,
archiverStore,
config.archiverPollingIntervalMS ?? 10_000,
{
pollingIntervalMs: config.archiverPollingIntervalMS ?? 10_000,
batchSize: config.archiverBatchSize ?? 100,
},
new ArchiverInstrumentation(telemetry, () => archiverStore.estimateSize()),
{ l1StartBlock, l1GenesisTime, epochDuration, slotDuration, ethereumSlotDuration },
);
Expand All @@ -196,7 +195,7 @@ export class Archiver implements ArchiveSource {
await this.sync(blockUntilSynced);
}

this.runningPromise = new RunningPromise(() => this.safeSync(), this.pollingIntervalMs);
this.runningPromise = new RunningPromise(() => this.safeSync(), this.config.pollingIntervalMs);
this.runningPromise.start();
}

Expand All @@ -213,9 +212,8 @@ export class Archiver implements ArchiveSource {

/**
* Fetches logs from L1 contracts and processes them.
* @param blockUntilSynced - If true, blocks until the archiver has fully synced.
*/
private async sync(blockUntilSynced: boolean) {
private async sync(initialRun: boolean) {
/**
* We keep track of three "pointers" to L1 blocks:
* 1. the last L1 block that published an L2 block
Expand All @@ -232,9 +230,9 @@ export class Archiver implements ArchiveSource {
const { blocksSynchedTo = l1StartBlock, messagesSynchedTo = l1StartBlock } = await this.store.getSynchPoint();
const currentL1BlockNumber = await this.publicClient.getBlockNumber();

if (blockUntilSynced) {
if (initialRun) {
this.log.info(
`Starting archiver sync to rollup contract ${this.rollupAddress.toString()} from L1 block ${Math.min(
`Starting archiver sync to rollup contract ${this.l1Addresses.rollupAddress.toString()} from L1 block ${Math.min(
Number(blocksSynchedTo),
Number(messagesSynchedTo),
)} to current L1 block ${currentL1BlockNumber}`,
Expand All @@ -261,7 +259,7 @@ export class Archiver implements ArchiveSource {
*/

// ********** Events that are processed per L1 block **********
await this.handleL1ToL2Messages(blockUntilSynced, messagesSynchedTo, currentL1BlockNumber);
await this.handleL1ToL2Messages(messagesSynchedTo, currentL1BlockNumber);

// Store latest l1 block number and timestamp seen. Used for epoch and slots calculations.
if (!this.l1BlockNumber || this.l1BlockNumber < currentL1BlockNumber) {
Expand All @@ -272,7 +270,7 @@ export class Archiver implements ArchiveSource {
// ********** Events that are processed per L2 block **********
if (currentL1BlockNumber > blocksSynchedTo) {
// First we retrieve new L2 blocks
const { provenBlockNumber } = await this.handleL2blocks(blockUntilSynced, blocksSynchedTo, currentL1BlockNumber);
const { provenBlockNumber } = await this.handleL2blocks(blocksSynchedTo, currentL1BlockNumber);
// And then we prune the current epoch if it'd reorg on next submission.
// Note that we don't do this before retrieving L2 blocks because we may need to retrieve
// blocks from more than 2 epochs ago, so we want to make sure we have the latest view of
Expand All @@ -281,7 +279,7 @@ export class Archiver implements ArchiveSource {
await this.handleEpochPrune(provenBlockNumber, currentL1BlockNumber);
}

if (blockUntilSynced) {
if (initialRun) {
this.log.info(`Initial archiver sync to L1 block ${currentL1BlockNumber} complete.`);
}
}
Expand Down Expand Up @@ -311,11 +309,18 @@ export class Archiver implements ArchiveSource {
}
}

private async handleL1ToL2Messages(
blockUntilSynced: boolean,
messagesSynchedTo: bigint,
currentL1BlockNumber: bigint,
) {
/**
 * Computes the next L1 block range to scan, starting right after `end` and capped at `limit`.
 * The range width is sized so one batch is estimated to accommodate up to `config.batchSize`
 * L2 blocks, converted to L1 blocks via the ratio of L2 slot duration to L1 slot duration.
 * @param end - Last L1 block already processed (range starts at `end + 1`).
 * @param limit - Upper bound (inclusive) for the range, typically the current L1 block number.
 * @returns A `[start, end]` tuple of L1 block numbers, with `end` clamped to `limit`.
 */
private nextRange(end: bigint, limit: bigint): [bigint, bigint] {
  // Floor the estimate: slot durations need not divide evenly, and BigInt() throws a
  // RangeError when handed a fractional number.
  const batchSize = Math.floor(
    (this.config.batchSize * this.l1constants.slotDuration) / this.l1constants.ethereumSlotDuration,
  );
  const nextStart = end + 1n;
  const nextEnd = nextStart + BigInt(batchSize);
  // Clamp to the caller-supplied limit so we never scan past the current L1 head.
  return nextEnd > limit ? [nextStart, limit] : [nextStart, nextEnd];
}

private async handleL1ToL2Messages(messagesSynchedTo: bigint, currentL1BlockNumber: bigint) {
this.log.trace(`Handling L1 to L2 messages from ${messagesSynchedTo} to ${currentL1BlockNumber}.`);
if (currentL1BlockNumber <= messagesSynchedTo) {
return;
}
Expand All @@ -325,30 +330,30 @@ export class Archiver implements ArchiveSource {

if (localTotalMessageCount === destinationTotalMessageCount) {
await this.store.setMessageSynchedL1BlockNumber(currentL1BlockNumber);
this.log.debug(
this.log.trace(
`Retrieved no new L1 to L2 messages between L1 blocks ${messagesSynchedTo + 1n} and ${currentL1BlockNumber}.`,
);
return;
}

const retrievedL1ToL2Messages = await retrieveL1ToL2Messages(
this.inbox,
blockUntilSynced,
messagesSynchedTo + 1n,
currentL1BlockNumber,
);

await this.store.addL1ToL2Messages(retrievedL1ToL2Messages);

this.log.verbose(
`Retrieved ${retrievedL1ToL2Messages.retrievedData.length} new L1 to L2 messages between L1 blocks ${
messagesSynchedTo + 1n
} and ${currentL1BlockNumber}.`,
);
// Retrieve messages in batches. Each batch is estimated to accommodate up to 'batchSize' L2 blocks,
// computed using the L2 block time vs the L1 block time.
let searchStartBlock: bigint = messagesSynchedTo;
let searchEndBlock: bigint = messagesSynchedTo;
do {
[searchStartBlock, searchEndBlock] = this.nextRange(searchEndBlock, currentL1BlockNumber);
this.log.trace(`Retrieving L1 to L2 messages between L1 blocks ${searchStartBlock} and ${searchEndBlock}.`);
const retrievedL1ToL2Messages = await retrieveL1ToL2Messages(this.inbox, searchStartBlock, searchEndBlock);
this.log.verbose(
`Retrieved ${retrievedL1ToL2Messages.retrievedData.length} new L1 to L2 messages between L1 blocks ${searchStartBlock} and ${searchEndBlock}.`,
);
await this.store.addL1ToL2Messages(retrievedL1ToL2Messages);
for (const msg of retrievedL1ToL2Messages.retrievedData) {
this.log.debug(`Downloaded L1 to L2 message`, { index: msg.index, leaf: msg.leaf.toString() });
}
} while (searchEndBlock < currentL1BlockNumber);
}

private async handleL2blocks(
blockUntilSynced: boolean,
blocksSynchedTo: bigint,
currentL1BlockNumber: bigint,
): Promise<{ provenBlockNumber: bigint }> {
Expand Down Expand Up @@ -436,56 +441,60 @@ export class Archiver implements ArchiveSource {
}
}

// TODO(palla/log) Downgrade to trace
this.log.debug(`Retrieving L2 blocks from L1 block ${blocksSynchedTo + 1n} to ${currentL1BlockNumber}`);
const retrievedBlocks = await retrieveBlockFromRollup(
this.rollup,
this.publicClient,
blockUntilSynced,
blocksSynchedTo + 1n, // TODO(palla/reorg): If the L2 reorg was due to an L1 reorg, we need to start search earlier
currentL1BlockNumber,
this.log,
);
// Retrieve L2 blocks in batches. Each batch is estimated to accommodate up to 'batchSize' L2 blocks,
// computed using the L2 block time vs the L1 block time.
let searchStartBlock: bigint = blocksSynchedTo;
let searchEndBlock: bigint = blocksSynchedTo;

do {
[searchStartBlock, searchEndBlock] = this.nextRange(searchEndBlock, currentL1BlockNumber);

this.log.trace(`Retrieving L2 blocks from L1 block ${searchStartBlock} to ${searchEndBlock}`);
const retrievedBlocks = await retrieveBlocksFromRollup(
this.rollup,
this.publicClient,
searchStartBlock, // TODO(palla/reorg): If the L2 reorg was due to an L1 reorg, we need to start search earlier
searchEndBlock,
this.log,
);

if (retrievedBlocks.length === 0) {
// We are not calling `setBlockSynchedL1BlockNumber` because it may cause sync issues if based off infura.
// See further details in earlier comments.
// TODO(palla/log) Downgrade to trace
this.log.debug(`Retrieved no new L2 blocks from L1 block ${blocksSynchedTo + 1n} to ${currentL1BlockNumber}`);
return { provenBlockNumber };
}
if (retrievedBlocks.length === 0) {
// We are not calling `setBlockSynchedL1BlockNumber` because it may cause sync issues if based off infura.
// See further details in earlier comments.
this.log.trace(`Retrieved no new L2 blocks from L1 block ${searchStartBlock} to ${searchEndBlock}`);
continue;
}

const lastProcessedL1BlockNumber = retrievedBlocks[retrievedBlocks.length - 1].l1.blockNumber;
this.log.debug(
`Retrieved ${retrievedBlocks.length} new L2 blocks between L1 blocks ${
blocksSynchedTo + 1n
} and ${currentL1BlockNumber} with last processed L1 block ${lastProcessedL1BlockNumber}.`,
);
const lastProcessedL1BlockNumber = retrievedBlocks[retrievedBlocks.length - 1].l1.blockNumber;
this.log.debug(
`Retrieved ${retrievedBlocks.length} new L2 blocks between L1 blocks ${searchStartBlock} and ${searchEndBlock} with last processed L1 block ${lastProcessedL1BlockNumber}.`,
);

for (const block of retrievedBlocks) {
this.log.debug(`Ingesting new L2 block ${block.data.number}`, {
...block.data.header.globalVariables.toInspect(),
blockHash: block.data.hash,
l1BlockNumber: block.l1.blockNumber,
});
}
for (const block of retrievedBlocks) {
this.log.debug(`Ingesting new L2 block ${block.data.number}`, {
...block.data.header.globalVariables.toInspect(),
blockHash: block.data.hash,
l1BlockNumber: block.l1.blockNumber,
});
}

const timer = new Timer();
await this.store.addBlocks(retrievedBlocks);
const [processDuration] = await elapsed(() => this.store.addBlocks(retrievedBlocks));
await this.store.addBlocks(retrievedBlocks);
this.instrumentation.processNewBlocks(
processDuration / retrievedBlocks.length,
retrievedBlocks.map(b => b.data),
);

for (const block of retrievedBlocks) {
this.log.info(`Downloaded L2 block ${block.data.number}`, {
blockHash: block.data.hash(),
blockNumber: block.data.number,
});
}
for (const block of retrievedBlocks) {
this.log.info(`Downloaded L2 block ${block.data.number}`, {
blockHash: block.data.hash(),
blockNumber: block.data.number,
});
}
} while (searchEndBlock < currentL1BlockNumber);

// Important that we update AFTER inserting the blocks.
await updateProvenBlock();
this.instrumentation.processNewBlocks(
timer.ms() / retrievedBlocks.length,
retrievedBlocks.map(b => b.data),
);

return { provenBlockNumber };
}
Expand All @@ -503,11 +512,11 @@ export class Archiver implements ArchiveSource {
}

public getRollupAddress(): Promise<EthAddress> {
return Promise.resolve(this.rollupAddress);
return Promise.resolve(this.l1Addresses.rollupAddress);
}

public getRegistryAddress(): Promise<EthAddress> {
return Promise.resolve(this.registryAddress);
return Promise.resolve(this.l1Addresses.registryAddress);
}

public getL1BlockNumber(): bigint {
Expand Down Expand Up @@ -1096,11 +1105,3 @@ type L1RollupConstants = {
epochDuration: number;
ethereumSlotDuration: number;
};

const EmptyL1RollupConstants: L1RollupConstants = {
l1StartBlock: 0n,
l1GenesisTime: 0n,
epochDuration: 0,
slotDuration: 0,
ethereumSlotDuration: 0,
};
Loading

0 comments on commit da5bed8

Please sign in to comment.