Skip to content

Commit

Permalink
refactor: reduce redundant event fetching (#8628)
Browse files Browse the repository at this point in the history
Fixes #8565. 

- Adds a `status` function to the rollup contract
- Archiver will update `lastSynchedL1Block` for blocks and skip looking
for events if the rollup contract state tells it that there will be no
events.

I was running into some oddities with the timing of updates to the
`lastSynchedL1Block` for the blockstore. Namely, seems like it was only
updated if events were found, leading to a large number of "re-fetch" as
we keep looking over the same ranges until an event swing by.
  • Loading branch information
LHerskind authored Sep 19, 2024
1 parent 5aca3f3 commit 6903291
Show file tree
Hide file tree
Showing 11 changed files with 213 additions and 89 deletions.
49 changes: 35 additions & 14 deletions l1-contracts/src/core/Rollup.sol
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,27 @@ contract Rollup is Leonidas, IRollup, ITestRollup {
setupEpoch();
}

function status(uint256 myHeaderBlockNumber)
external
view
override(IRollup)
returns (
uint256 provenBlockNumber,
bytes32 provenArchive,
uint256 pendingBlockNumber,
bytes32 pendingArchive,
bytes32 archiveOfMyBlock
)
{
return (
tips.provenBlockNumber,
blocks[tips.provenBlockNumber].archive,
tips.pendingBlockNumber,
blocks[tips.pendingBlockNumber].archive,
archiveAt(myHeaderBlockNumber)
);
}

/**
* @notice Prune the pending chain up to the last proven block
*
Expand Down Expand Up @@ -382,20 +403,6 @@ contract Rollup is Leonidas, IRollup, ITestRollup {
emit L2ProofVerified(header.globalVariables.blockNumber, _proverId);
}

/**
* @notice Get the archive root of a specific block
*
* @param _blockNumber - The block number to get the archive root of
*
* @return bytes32 - The archive root of the block
*/
function archiveAt(uint256 _blockNumber) external view override(IRollup) returns (bytes32) {
if (_blockNumber <= tips.pendingBlockNumber) {
return blocks[_blockNumber].archive;
}
return bytes32(0);
}

/**
* @notice Check if msg.sender can propose at a given time
*
Expand Down Expand Up @@ -482,6 +489,20 @@ contract Rollup is Leonidas, IRollup, ITestRollup {
return tips.pendingBlockNumber;
}

/**
* @notice Get the archive root of a specific block
*
* @param _blockNumber - The block number to get the archive root of
*
* @return bytes32 - The archive root of the block
*/
function archiveAt(uint256 _blockNumber) public view override(IRollup) returns (bytes32) {
if (_blockNumber <= tips.pendingBlockNumber) {
return blocks[_blockNumber].archive;
}
return bytes32(0);
}

/**
* @notice Validates the header for submission
*
Expand Down
11 changes: 11 additions & 0 deletions l1-contracts/src/core/interfaces/IRollup.sol
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,17 @@ interface IRollup {

function L1_BLOCK_AT_GENESIS() external view returns (uint256);

function status(uint256 myHeaderBlockNumber)
external
view
returns (
uint256 provenBlockCount,
bytes32 provenArchive,
uint256 pendingBlockCount,
bytes32 pendingArchive,
bytes32 archiveOfMyBlock
);

// TODO(#7346): Integrate batch rollups
// function submitRootProof(
// bytes32 _previousArchive,
Expand Down
61 changes: 43 additions & 18 deletions yarn-project/archiver/src/archiver/archiver.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {
LogType,
UnencryptedL2BlockL2Logs,
} from '@aztec/circuit-types';
import { GENESIS_ARCHIVE_ROOT } from '@aztec/circuits.js';
import { EthAddress } from '@aztec/foundation/eth-address';
import { Fr } from '@aztec/foundation/fields';
import { sleep } from '@aztec/foundation/sleep';
Expand All @@ -30,6 +31,7 @@ import { MemoryArchiverStore } from './memory_archiver_store/memory_archiver_sto
interface MockRollupContractRead {
archiveAt: (args: readonly [bigint]) => Promise<`0x${string}`>;
getProvenBlockNumber: () => Promise<bigint>;
status: (args: readonly [bigint]) => Promise<[bigint, `0x${string}`, bigint, `0x${string}`, `0x${string}`]>;
}

describe('Archiver', () => {
Expand All @@ -47,6 +49,8 @@ describe('Archiver', () => {
let archiver: Archiver;
let blocks: L2Block[];

const GENESIS_ROOT = new Fr(GENESIS_ARCHIVE_ROOT).toString();

beforeEach(() => {
now = +new Date();
publicClient = mock<PublicClient<HttpTransport, Chain>>({
Expand Down Expand Up @@ -90,6 +94,10 @@ describe('Archiver', () => {

publicClient.getBlockNumber.mockResolvedValueOnce(2500n).mockResolvedValueOnce(2600n).mockResolvedValueOnce(2700n);

rollupRead.status
.mockResolvedValueOnce([0n, GENESIS_ROOT, 1n, blocks[0].archive.root.toString(), GENESIS_ROOT])
.mockResolvedValue([0n, GENESIS_ROOT, 3n, blocks[2].archive.root.toString(), blocks[0].archive.root.toString()]);

mockGetLogs({
messageSent: [makeMessageSentEvent(98n, 1n, 0n), makeMessageSentEvent(99n, 1n, 1n)],
L2BlockProposed: [makeL2BlockProposedEvent(101n, 1n, blocks[0].archive.root.toString())],
Expand Down Expand Up @@ -180,7 +188,9 @@ describe('Archiver', () => {
expect((await archiver.getBlocks(1, 100, true)).map(b => b.number)).toEqual([1]);
}, 10_000);

it('does not sync past current block number', async () => {
it('ignores block 3 because it have been pruned (simulate pruning)', async () => {
const loggerSpy = jest.spyOn((archiver as any).log, 'warn');

let latestBlockNum = await archiver.getBlockNumber();
expect(latestBlockNum).toEqual(0);

Expand All @@ -191,17 +201,20 @@ describe('Archiver', () => {
// Here we set the current L1 block number to 102. L1 to L2 messages after this should not be read.
publicClient.getBlockNumber.mockResolvedValue(102n);

const badArchive = Fr.random().toString();

rollupRead.status.mockResolvedValue([0n, GENESIS_ROOT, 2n, blocks[1].archive.root.toString(), GENESIS_ROOT]);

mockGetLogs({
messageSent: [makeMessageSentEvent(66n, 1n, 0n), makeMessageSentEvent(68n, 1n, 1n)],
L2BlockProposed: [
makeL2BlockProposedEvent(70n, 1n, blocks[0].archive.root.toString()),
makeL2BlockProposedEvent(80n, 2n, blocks[1].archive.root.toString()),
makeL2BlockProposedEvent(90n, 3n, badArchive),
],
});

mockGetLogs({});

rollupTxs.slice(0, numL2BlocksInTest).forEach(tx => publicClient.getTransaction.mockResolvedValueOnce(tx));
rollupTxs.forEach(tx => publicClient.getTransaction.mockResolvedValueOnce(tx));

await archiver.start(false);

Expand All @@ -211,10 +224,12 @@ describe('Archiver', () => {

latestBlockNum = await archiver.getBlockNumber();
expect(latestBlockNum).toEqual(numL2BlocksInTest);
const errorMessage = `Archive mismatch matching, ignoring block ${3} with archive: ${badArchive}, expected ${blocks[2].archive.root.toString()}`;
expect(loggerSpy).toHaveBeenCalledWith(errorMessage);
}, 10_000);

it('ignores block 3 because it have been pruned (simulate pruning)', async () => {
const loggerSpy = jest.spyOn((archiver as any).log, 'warn');
it('skip event search if not blocks found', async () => {
const loggerSpy = jest.spyOn((archiver as any).log, 'verbose');

let latestBlockNum = await archiver.getBlockNumber();
expect(latestBlockNum).toEqual(0);
Expand All @@ -223,22 +238,24 @@ describe('Archiver', () => {

const rollupTxs = blocks.map(makeRollupTx);

// Here we set the current L1 block number to 102. L1 to L2 messages after this should not be read.
publicClient.getBlockNumber.mockResolvedValue(102n);

const badArchive = Fr.random().toString();
publicClient.getBlockNumber.mockResolvedValueOnce(50n).mockResolvedValueOnce(100n);
rollupRead.status
.mockResolvedValueOnce([0n, GENESIS_ROOT, 0n, GENESIS_ROOT, GENESIS_ROOT])
.mockResolvedValueOnce([0n, GENESIS_ROOT, 2n, blocks[1].archive.root.toString(), GENESIS_ROOT]);

// This can look slightly odd, but we will need to do an empty request for the messages, and will entirely skip
// a call to the proposed blocks because of changes with status.
mockGetLogs({
messageSent: [],
});
mockGetLogs({
messageSent: [makeMessageSentEvent(66n, 1n, 0n), makeMessageSentEvent(68n, 1n, 1n)],
L2BlockProposed: [
makeL2BlockProposedEvent(70n, 1n, blocks[0].archive.root.toString()),
makeL2BlockProposedEvent(80n, 2n, blocks[1].archive.root.toString()),
makeL2BlockProposedEvent(90n, 3n, badArchive),
],
});

mockGetLogs({});

rollupTxs.forEach(tx => publicClient.getTransaction.mockResolvedValueOnce(tx));

await archiver.start(false);
Expand All @@ -249,18 +266,26 @@ describe('Archiver', () => {

latestBlockNum = await archiver.getBlockNumber();
expect(latestBlockNum).toEqual(numL2BlocksInTest);
const errorMessage = `Archive mismatch matching, ignoring block ${3} with archive: ${badArchive}, expected ${blocks[2].archive.root.toString()}`;
expect(loggerSpy).toHaveBeenCalledWith(errorMessage);

// For some reason, this is 1-indexed.
expect(loggerSpy).toHaveBeenNthCalledWith(
1,
`Retrieved no new L1 -> L2 messages between L1 blocks ${1n} and ${50}.`,
);
expect(loggerSpy).toHaveBeenNthCalledWith(2, `No blocks to retrieve from ${1n} to ${50n}`);
}, 10_000);

// logs should be created in order of how archiver syncs.
const mockGetLogs = (logs: {
messageSent?: ReturnType<typeof makeMessageSentEvent>[];
L2BlockProposed?: ReturnType<typeof makeL2BlockProposedEvent>[];
}) => {
publicClient.getLogs
.mockResolvedValueOnce(logs.messageSent ?? [])
.mockResolvedValueOnce(logs.L2BlockProposed ?? []);
if (logs.messageSent) {
publicClient.getLogs.mockResolvedValueOnce(logs.messageSent);
}
if (logs.L2BlockProposed) {
publicClient.getLogs.mockResolvedValueOnce(logs.L2BlockProposed);
}
};
});

Expand Down
119 changes: 73 additions & 46 deletions yarn-project/archiver/src/archiver/archiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -251,15 +251,21 @@ export class Archiver implements ArchiveSource {
currentL1BlockNumber,
);

if (retrievedL1ToL2Messages.retrievedData.length !== 0) {
await this.store.addL1ToL2Messages(retrievedL1ToL2Messages);

if (retrievedL1ToL2Messages.retrievedData.length === 0) {
await this.store.setMessageSynchedL1BlockNumber(currentL1BlockNumber);
this.log.verbose(
`Retrieved ${retrievedL1ToL2Messages.retrievedData.length} new L1 -> L2 messages between L1 blocks ${
messagesSynchedTo + 1n
} and ${currentL1BlockNumber}.`,
`Retrieved no new L1 -> L2 messages between L1 blocks ${messagesSynchedTo + 1n} and ${currentL1BlockNumber}.`,
);
return;
}

await this.store.addL1ToL2Messages(retrievedL1ToL2Messages);

this.log.verbose(
`Retrieved ${retrievedL1ToL2Messages.retrievedData.length} new L1 -> L2 messages between L1 blocks ${
messagesSynchedTo + 1n
} and ${currentL1BlockNumber}.`,
);
}

private async updateLastProvenL2Block(provenSynchedTo: bigint, currentL1BlockNumber: bigint) {
Expand All @@ -281,6 +287,29 @@ export class Archiver implements ArchiveSource {
return;
}

const lastBlock = await this.getBlock(-1);

const [, , pendingBlockNumber, pendingArchive, archiveOfMyBlock] = await this.rollup.read.status([
BigInt(lastBlock?.number ?? 0),
]);

const noBlocksButInitial = lastBlock === undefined && pendingBlockNumber == 0n;
const noBlockSinceLast =
lastBlock &&
pendingBlockNumber === BigInt(lastBlock.number) &&
pendingArchive === lastBlock.archive.root.toString();

if (noBlocksButInitial || noBlockSinceLast) {
await this.store.setBlockSynchedL1BlockNumber(currentL1BlockNumber);
this.log.verbose(`No blocks to retrieve from ${blocksSynchedTo + 1n} to ${currentL1BlockNumber}`);
return;
}

if (lastBlock && archiveOfMyBlock !== lastBlock.archive.root.toString()) {
// @todo Either `prune` have been called, or L1 have re-orged deep enough to remove a block.
// Issue#8620 and Issue#8621
}

this.log.debug(`Retrieving blocks from ${blocksSynchedTo + 1n} to ${currentL1BlockNumber}`);
const retrievedBlocks = await retrieveBlockFromRollup(
this.rollup,
Expand All @@ -291,59 +320,57 @@ export class Archiver implements ArchiveSource {
this.log,
);

(retrievedBlocks.length ? this.log.verbose : this.log.debug)(
`Retrieved ${retrievedBlocks.length || 'no'} new L2 blocks between L1 blocks ${
if (retrievedBlocks.length === 0) {
await this.store.setBlockSynchedL1BlockNumber(currentL1BlockNumber);
this.log.verbose(`Retrieved no new blocks from ${blocksSynchedTo + 1n} to ${currentL1BlockNumber}`);
return;
}

this.log.debug(
`Retrieved ${retrievedBlocks.length} new L2 blocks between L1 blocks ${
blocksSynchedTo + 1n
} and ${currentL1BlockNumber}.`,
);

const lastProcessedL1BlockNumber =
retrievedBlocks.length > 0 ? retrievedBlocks[retrievedBlocks.length - 1].l1.blockNumber : blocksSynchedTo;
const lastProcessedL1BlockNumber = retrievedBlocks[retrievedBlocks.length - 1].l1.blockNumber;

this.log.debug(
`Processing retrieved blocks ${retrievedBlocks
.map(b => b.data.number)
.join(',')} with last processed L1 block ${lastProcessedL1BlockNumber}`,
);

// If we actually received something, we will use it.
if (retrievedBlocks.length > 0) {
await Promise.all(
retrievedBlocks.map(block => {
return this.store.addLogs(
block.data.body.noteEncryptedLogs,
block.data.body.encryptedLogs,
block.data.body.unencryptedLogs,
block.data.number,
);
}),
);

// Unroll all logs emitted during the retrieved blocks and extract any contract classes and instances from them
await Promise.all(
retrievedBlocks.map(async block => {
const blockLogs = block.data.body.txEffects
.flatMap(txEffect => (txEffect ? [txEffect.unencryptedLogs] : []))
.flatMap(txLog => txLog.unrollLogs());
await this.storeRegisteredContractClasses(blockLogs, block.data.number);
await this.storeDeployedContractInstances(blockLogs, block.data.number);
await this.storeBroadcastedIndividualFunctions(blockLogs, block.data.number);
}),
);
await Promise.all(
retrievedBlocks.map(block => {
return this.store.addLogs(
block.data.body.noteEncryptedLogs,
block.data.body.encryptedLogs,
block.data.body.unencryptedLogs,
block.data.number,
);
}),
);

const timer = new Timer();
await this.store.addBlocks(retrievedBlocks);
this.instrumentation.processNewBlocks(
timer.ms() / retrievedBlocks.length,
retrievedBlocks.map(b => b.data),
);
const lastL2BlockNumber = retrievedBlocks[retrievedBlocks.length - 1].data.number;
this.log.verbose(`Processed ${retrievedBlocks.length} new L2 blocks up to ${lastL2BlockNumber}`);
}
// Unroll all logs emitted during the retrieved blocks and extract any contract classes and instances from them
await Promise.all(
retrievedBlocks.map(async block => {
const blockLogs = block.data.body.txEffects
.flatMap(txEffect => (txEffect ? [txEffect.unencryptedLogs] : []))
.flatMap(txLog => txLog.unrollLogs());
await this.storeRegisteredContractClasses(blockLogs, block.data.number);
await this.storeDeployedContractInstances(blockLogs, block.data.number);
await this.storeBroadcastedIndividualFunctions(blockLogs, block.data.number);
}),
);

if (retrievedBlocks.length > 0 || blockUntilSynced) {
(blockUntilSynced ? this.log.info : this.log.verbose)(`Synced to L1 block ${currentL1BlockNumber}`);
}
const timer = new Timer();
await this.store.addBlocks(retrievedBlocks);
this.instrumentation.processNewBlocks(
timer.ms() / retrievedBlocks.length,
retrievedBlocks.map(b => b.data),
);
const lastL2BlockNumber = retrievedBlocks[retrievedBlocks.length - 1].data.number;
this.log.verbose(`Processed ${retrievedBlocks.length} new L2 blocks up to ${lastL2BlockNumber}`);
}

/**
Expand Down
Loading

0 comments on commit 6903291

Please sign in to comment.