chore: archiver cleanup (#8599)
Cleanup in the `archiver`.

- Deletes the `block_body_store`: since blocks and bodies are published at the same time, we don't need the separation.
- Stops syncing proven events; we don't need them to figure out the current proven block number.
- Splits the sync into sub-functions to more easily encapsulate the logic (see the sketch after this list).
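
For orientation, here is a minimal standalone sketch of the restructured sync loop this change moves towards. The helper names, the early-exit guards, and the `getProvenBlockNumber` read mirror the diff below; the `Rollup`/`Store` interfaces and the `SyncSketch` class are illustrative stand-ins for the real archiver types, not the actual implementation.

```typescript
// Illustrative sketch only: each sync concern gets its own helper with an early-exit
// guard, and the proven block number is read directly from the rollup contract
// instead of being reconstructed from L2ProofVerified events.

interface Rollup {
  read: { getProvenBlockNumber(): Promise<bigint> };
}

interface Store {
  getSynchPoint(): Promise<{
    blocksSynchedTo?: bigint;
    messagesSynchedTo?: bigint;
    provenLogsSynchedTo?: bigint;
  }>;
  setProvenL2BlockNumber(v: { retrievedData: number; lastProcessedL1BlockNumber: bigint }): Promise<void>;
}

class SyncSketch {
  constructor(private rollup: Rollup, private store: Store, private l1StartBlock: bigint) {}

  async sync(currentL1BlockNumber: bigint) {
    const {
      blocksSynchedTo = this.l1StartBlock,
      messagesSynchedTo = this.l1StartBlock,
      provenLogsSynchedTo = this.l1StartBlock,
    } = await this.store.getSynchPoint();

    // The proven tip comes straight from the contract; no event scanning needed.
    await this.updateLastProvenL2Block(provenLogsSynchedTo, currentL1BlockNumber);
    await this.handleL1ToL2Messages(messagesSynchedTo, currentL1BlockNumber);
    await this.handleL2blocks(blocksSynchedTo, currentL1BlockNumber);
  }

  private async updateLastProvenL2Block(provenSynchedTo: bigint, currentL1BlockNumber: bigint) {
    if (currentL1BlockNumber <= provenSynchedTo) {
      return; // Nothing new on L1 since the last update.
    }
    const provenBlockNumber = await this.rollup.read.getProvenBlockNumber();
    if (provenBlockNumber) {
      await this.store.setProvenL2BlockNumber({
        retrievedData: Number(provenBlockNumber),
        lastProcessedL1BlockNumber: currentL1BlockNumber,
      });
    }
  }

  private async handleL1ToL2Messages(messagesSynchedTo: bigint, currentL1BlockNumber: bigint) {
    if (currentL1BlockNumber <= messagesSynchedTo) {
      return;
    }
    // ...retrieve and store Inbox messages for (messagesSynchedTo, currentL1BlockNumber]...
  }

  private async handleL2blocks(blocksSynchedTo: bigint, currentL1BlockNumber: bigint) {
    if (currentL1BlockNumber <= blocksSynchedTo) {
      return;
    }
    // ...retrieve blocks (with their bodies) and store them together; no separate body store...
  }
}
```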
LHerskind authored Sep 18, 2024
1 parent 1785737 commit 184cc88
Showing 9 changed files with 67 additions and 354 deletions.
30 changes: 8 additions & 22 deletions yarn-project/archiver/src/archiver/archiver.test.ts
@@ -29,6 +29,7 @@ import { MemoryArchiverStore } from './memory_archiver_store/memory_archiver_sto

interface MockRollupContractRead {
archiveAt: (args: readonly [bigint]) => Promise<`0x${string}`>;
getProvenBlockNumber: () => Promise<bigint>;
}

describe('Archiver', () => {
@@ -40,9 +41,9 @@ describe('Archiver', () => {
let publicClient: MockProxy<PublicClient<HttpTransport, Chain>>;
let instrumentation: MockProxy<ArchiverInstrumentation>;
let archiverStore: ArchiverDataStore;
let proverId: Fr;
let now: number;

let rollupRead: MockProxy<MockRollupContractRead>;
let archiver: Archiver;
let blocks: L2Block[];

@@ -56,7 +57,6 @@ describe('Archiver', () => {

instrumentation = mock({ isEnabled: () => true });
archiverStore = new MemoryArchiverStore(1000);
proverId = Fr.random();

archiver = new Archiver(
publicClient,
@@ -70,9 +70,11 @@

blocks = blockNumbers.map(x => L2Block.random(x, 4, x, x + 1, 2, 2));

((archiver as any).rollup as any).read = mock<MockRollupContractRead>({
rollupRead = mock<MockRollupContractRead>({
archiveAt: (args: readonly [bigint]) => Promise.resolve(blocks[Number(args[0] - 1n)].archive.root.toString()),
});

((archiver as any).rollup as any).read = rollupRead;
});

afterEach(async () => {
@@ -91,9 +93,10 @@
mockGetLogs({
messageSent: [makeMessageSentEvent(98n, 1n, 0n), makeMessageSentEvent(99n, 1n, 1n)],
L2BlockProposed: [makeL2BlockProposedEvent(101n, 1n, blocks[0].archive.root.toString())],
proofVerified: [makeProofVerifiedEvent(102n, 1n, proverId)],
});

rollupRead.getProvenBlockNumber.mockResolvedValueOnce(1n);

mockGetLogs({
messageSent: [
makeMessageSentEvent(2504n, 2n, 0n),
@@ -175,11 +178,6 @@ describe('Archiver', () => {
// Check getting only proven blocks
expect((await archiver.getBlocks(1, 100)).map(b => b.number)).toEqual([1, 2, 3]);
expect((await archiver.getBlocks(1, 100, true)).map(b => b.number)).toEqual([1]);

// Check instrumentation of proven blocks
expect(instrumentation.processProofsVerified).toHaveBeenCalledWith([
{ delay: 1000n, l1BlockNumber: 102n, l2BlockNumber: 1n, proverId: proverId.toString() },
]);
}, 10_000);

it('does not sync past current block number', async () => {
@@ -259,12 +257,10 @@
const mockGetLogs = (logs: {
messageSent?: ReturnType<typeof makeMessageSentEvent>[];
L2BlockProposed?: ReturnType<typeof makeL2BlockProposedEvent>[];
proofVerified?: ReturnType<typeof makeProofVerifiedEvent>[];
}) => {
publicClient.getLogs
.mockResolvedValueOnce(logs.messageSent ?? [])
.mockResolvedValueOnce(logs.L2BlockProposed ?? [])
.mockResolvedValueOnce(logs.proofVerified ?? []);
.mockResolvedValueOnce(logs.L2BlockProposed ?? []);
};
});

@@ -300,16 +296,6 @@ function makeMessageSentEvent(l1BlockNum: bigint, l2BlockNumber: bigint, index:
} as Log<bigint, number, false, undefined, true, typeof InboxAbi, 'MessageSent'>;
}

function makeProofVerifiedEvent(l1BlockNum: bigint, l2BlockNumber: bigint, proverId: Fr) {
return {
blockNumber: l1BlockNum,
args: {
blockNumber: l2BlockNumber,
proverId: proverId.toString(),
},
} as Log<bigint, number, false, undefined, true, typeof RollupAbi, 'L2ProofVerified'>;
}

/**
* Makes a fake rollup tx for testing purposes.
* @param block - The L2Block.
163 changes: 44 additions & 119 deletions yarn-project/archiver/src/archiver/archiver.ts
@@ -24,7 +24,6 @@ import {
import { createEthereumChain } from '@aztec/ethereum';
import { type ContractArtifact } from '@aztec/foundation/abi';
import { type AztecAddress } from '@aztec/foundation/aztec-address';
import { compactArray, unique } from '@aztec/foundation/collection';
import { type EthAddress } from '@aztec/foundation/eth-address';
import { Fr } from '@aztec/foundation/fields';
import { type DebugLogger, createDebugLogger } from '@aztec/foundation/log';
@@ -55,12 +54,7 @@ import {

import { type ArchiverDataStore } from './archiver_store.js';
import { type ArchiverConfig } from './config.js';
import {
getL1BlockTime,
retrieveBlockFromRollup,
retrieveL1ToL2Messages,
retrieveL2ProofVerifiedEvents,
} from './data_retrieval.js';
import { retrieveBlockFromRollup, retrieveL1ToL2Messages } from './data_retrieval.js';
import { ArchiverInstrumentation } from './instrumentation.js';
import { type SingletonDataRetrieval } from './structs/data_retrieval.js';

@@ -206,31 +200,12 @@ export class Archiver implements ArchiveSource {
* This code does not handle reorgs.
*/
const {
blockBodiesSynchedTo = this.l1StartBlock,
blocksSynchedTo = this.l1StartBlock,
messagesSynchedTo = this.l1StartBlock,
provenLogsSynchedTo = this.l1StartBlock,
} = await this.store.getSynchPoint();
const currentL1BlockNumber = await this.publicClient.getBlockNumber();

if (
currentL1BlockNumber <= blocksSynchedTo &&
currentL1BlockNumber <= messagesSynchedTo &&
currentL1BlockNumber <= blockBodiesSynchedTo &&
currentL1BlockNumber <= provenLogsSynchedTo
) {
// chain hasn't moved forward
// or it's been rolled back
this.log.debug(`Nothing to sync`, {
currentL1BlockNumber,
blocksSynchedTo,
messagesSynchedTo,
provenLogsSynchedTo,
blockBodiesSynchedTo,
});
return;
}

// ********** Ensuring Consistency of data pulled from L1 **********

/**
@@ -250,9 +225,24 @@
* in future but for the time being it should give us the guarantees that we need
*/

await this.updateLastProvenL2Block(provenLogsSynchedTo, currentL1BlockNumber);

// ********** Events that are processed per L1 block **********

await this.handleL1ToL2Messages(blockUntilSynced, messagesSynchedTo, currentL1BlockNumber);

// ********** Events that are processed per L2 block **********
await this.handleL2blocks(blockUntilSynced, blocksSynchedTo, currentL1BlockNumber);
}

private async handleL1ToL2Messages(
blockUntilSynced: boolean,
messagesSynchedTo: bigint,
currentL1BlockNumber: bigint,
) {
if (currentL1BlockNumber <= messagesSynchedTo) {
return;
}

const retrievedL1ToL2Messages = await retrieveL1ToL2Messages(
this.inbox,
@@ -262,14 +252,34 @@
);

if (retrievedL1ToL2Messages.retrievedData.length !== 0) {
await this.store.addL1ToL2Messages(retrievedL1ToL2Messages);

this.log.verbose(
`Retrieved ${retrievedL1ToL2Messages.retrievedData.length} new L1 -> L2 messages between L1 blocks ${
messagesSynchedTo + 1n
} and ${currentL1BlockNumber}.`,
);
}
}

await this.store.addL1ToL2Messages(retrievedL1ToL2Messages);
private async updateLastProvenL2Block(provenSynchedTo: bigint, currentL1BlockNumber: bigint) {
if (currentL1BlockNumber <= provenSynchedTo) {
return;
}

const provenBlockNumber = await this.rollup.read.getProvenBlockNumber();
if (provenBlockNumber) {
await this.store.setProvenL2BlockNumber({
retrievedData: Number(provenBlockNumber),
lastProcessedL1BlockNumber: currentL1BlockNumber,
});
}
}

private async handleL2blocks(blockUntilSynced: boolean, blocksSynchedTo: bigint, currentL1BlockNumber: bigint) {
if (currentL1BlockNumber <= blocksSynchedTo) {
return;
}

this.log.debug(`Retrieving blocks from ${blocksSynchedTo + 1n} to ${currentL1BlockNumber}`);
const retrievedBlocks = await retrieveBlockFromRollup(
@@ -281,8 +291,6 @@
this.log,
);

// Add the body

(retrievedBlocks.length ? this.log.verbose : this.log.debug)(
`Retrieved ${retrievedBlocks.length || 'no'} new L2 blocks between L1 blocks ${
blocksSynchedTo + 1n
@@ -298,13 +306,16 @@
.join(',')} with last processed L1 block ${lastProcessedL1BlockNumber}`,
);

// If we actually received something, we will use it.
if (retrievedBlocks.length > 0) {
await Promise.all(
retrievedBlocks.map(block => {
const noteEncryptedLogs = block.data.body.noteEncryptedLogs;
const encryptedLogs = block.data.body.encryptedLogs;
const unencryptedLogs = block.data.body.unencryptedLogs;
return this.store.addLogs(noteEncryptedLogs, encryptedLogs, unencryptedLogs, block.data.number);
return this.store.addLogs(
block.data.body.noteEncryptedLogs,
block.data.body.encryptedLogs,
block.data.body.unencryptedLogs,
block.data.number,
);
}),
);

@@ -321,10 +332,6 @@
);

const timer = new Timer();
await this.store.addBlockBodies({
lastProcessedL1BlockNumber: lastProcessedL1BlockNumber,
retrievedData: retrievedBlocks.map(b => b.data.body),
});
await this.store.addBlocks(retrievedBlocks);
this.instrumentation.processNewBlocks(
timer.ms() / retrievedBlocks.length,
@@ -334,93 +341,11 @@
this.log.verbose(`Processed ${retrievedBlocks.length} new L2 blocks up to ${lastL2BlockNumber}`);
}

// Fetch the logs for proven blocks in the block range and update the last proven block number.
if (currentL1BlockNumber > provenLogsSynchedTo) {
await this.updateLastProvenL2Block(provenLogsSynchedTo + 1n, currentL1BlockNumber);
}

if (retrievedBlocks.length > 0 || blockUntilSynced) {
(blockUntilSynced ? this.log.info : this.log.verbose)(`Synced to L1 block ${currentL1BlockNumber}`);
}
}

private async updateLastProvenL2Block(fromBlock: bigint, toBlock: bigint) {
const logs = await retrieveL2ProofVerifiedEvents(this.publicClient, this.rollupAddress, fromBlock, toBlock);
const lastLog = logs[logs.length - 1];
if (!lastLog) {
return;
}

const provenBlockNumber = lastLog.l2BlockNumber;
if (!provenBlockNumber) {
throw new Error(`Missing argument blockNumber from L2ProofVerified event`);
}

await this.emitProofVerifiedMetrics(logs);

const currentProvenBlockNumber = await this.store.getProvenL2BlockNumber();
if (provenBlockNumber > currentProvenBlockNumber) {
// Update the last proven block number
this.log.verbose(`Updated last proven block number from ${currentProvenBlockNumber} to ${provenBlockNumber}`);
await this.store.setProvenL2BlockNumber({
retrievedData: Number(provenBlockNumber),
lastProcessedL1BlockNumber: lastLog.l1BlockNumber,
});
this.instrumentation.updateLastProvenBlock(Number(provenBlockNumber));
} else {
// We set the last processed L1 block number to the last L1 block number in the range to avoid duplicate processing
await this.store.setProvenL2BlockNumber({
retrievedData: Number(currentProvenBlockNumber),
lastProcessedL1BlockNumber: lastLog.l1BlockNumber,
});
}
}

/**
* Emits as metrics the block number proven, who proved it, and how much time passed since it was submitted.
* @param logs - The ProofVerified logs to emit metrics for, as collected from `retrieveL2ProofVerifiedEvents`.
**/
private async emitProofVerifiedMetrics(logs: { l1BlockNumber: bigint; l2BlockNumber: bigint; proverId: Fr }[]) {
if (!logs.length || !this.instrumentation.isEnabled()) {
return;
}

const l1BlockTimes = new Map(
await Promise.all(
unique(logs.map(log => log.l1BlockNumber)).map(
async blockNumber => [blockNumber, await getL1BlockTime(this.publicClient, blockNumber)] as const,
),
),
);

// Collect L2 block times for all the blocks verified, this is the time in which the block proven was
// originally submitted to L1, using the L1 timestamp of the transaction.
const getL2BlockTime = async (blockNumber: bigint) =>
(await this.store.getBlocks(Number(blockNumber), 1))[0]?.l1.timestamp;

const l2BlockTimes = new Map(
await Promise.all(
unique(logs.map(log => log.l2BlockNumber)).map(
async blockNumber => [blockNumber, await getL2BlockTime(blockNumber)] as const,
),
),
);

// Emit the prover id and the time difference between block submission and proof.
this.instrumentation.processProofsVerified(
compactArray(
logs.map(log => {
const l1BlockTime = l1BlockTimes.get(log.l1BlockNumber)!;
const l2BlockTime = l2BlockTimes.get(log.l2BlockNumber);
if (!l2BlockTime) {
return undefined;
}
return { ...log, delay: l1BlockTime - l2BlockTime, proverId: log.proverId.toString() };
}),
),
);
}

/**
* Extracts and stores contract classes out of ContractClassRegistered events emitted by the class registerer contract.
* @param allLogs - All logs emitted in a bunch of blocks.
18 changes: 0 additions & 18 deletions yarn-project/archiver/src/archiver/archiver_store.ts
@@ -1,5 +1,4 @@
import {
type Body,
type EncryptedL2BlockL2Logs,
type EncryptedNoteL2BlockL2Logs,
type FromLogType,
@@ -33,8 +32,6 @@ import { type L1Published } from './structs/published.js';
export type ArchiverL1SynchPoint = {
/** Number of the last L1 block that added a new L2 block metadata. */
blocksSynchedTo?: bigint;
/** Number of the last L1 block that added a new L2 block body. */
blockBodiesSynchedTo?: bigint;
/** Number of the last L1 block that added L1 -> L2 messages from the Inbox. */
messagesSynchedTo?: bigint;
/** Number of the last L1 block that added a new proven block. */
@@ -53,21 +50,6 @@ export interface ArchiverDataStore {
*/
addBlocks(blocks: L1Published<L2Block>[]): Promise<boolean>;

/**
* Append new block bodies to the store's list.
* @param blockBodies - The L2 block bodies to be added to the store.
* @returns True if the operation is successful.
*/
addBlockBodies(blockBodies: DataRetrieval<Body>): Promise<boolean>;

/**
* Gets block bodies that have the same txsEffectsHashes as we supply.
*
* @param txsEffectsHashes - A list of txsEffectsHashes.
* @returns The requested L2 block bodies
*/
getBlockBodies(txsEffectsHashes: Buffer[]): Promise<(Body | undefined)[]>;

/**
* Gets up to `limit` amount of L2 blocks starting from `from`.
* @param from - Number of the first block to return (inclusive).