Skip to content

Commit

Permalink
feat: Configure world-state to follow the proven chain only
Browse files Browse the repository at this point in the history
Related to #7346
  • Loading branch information
spalladino committed Jul 10, 2024
1 parent 2645eab commit b0be9f5
Show file tree
Hide file tree
Showing 14 changed files with 182 additions and 44 deletions.
76 changes: 58 additions & 18 deletions yarn-project/archiver/src/archiver/archiver.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ describe('Archiver', () => {
const registryAddress = EthAddress.ZERO;
const availabilityOracleAddress = EthAddress.ZERO;
const blockNumbers = [1, 2, 3];

let publicClient: MockProxy<PublicClient<HttpTransport, Chain>>;
let archiverStore: ArchiverDataStore;

Expand Down Expand Up @@ -61,23 +62,28 @@ describe('Archiver', () => {
const rollupTxs = blocks.map(makeRollupTx);

publicClient.getBlockNumber.mockResolvedValueOnce(2500n).mockResolvedValueOnce(2600n).mockResolvedValueOnce(2700n);
// logs should be created in order of how archiver syncs.
publicClient.getLogs
.mockResolvedValueOnce([makeMessageSentEvent(98n, 1n, 0n), makeMessageSentEvent(99n, 1n, 1n)])
.mockResolvedValueOnce([makeTxsPublishedEvent(101n, blocks[0].body.getTxsEffectsHash())])
.mockResolvedValueOnce([makeL2BlockProcessedEvent(101n, 1n)])
.mockResolvedValueOnce([

mockGetLogs({
messageSent: [makeMessageSentEvent(98n, 1n, 0n), makeMessageSentEvent(99n, 1n, 1n)],
txPublished: [makeTxsPublishedEvent(101n, blocks[0].body.getTxsEffectsHash())],
l2BlockProcessed: [makeL2BlockProcessedEvent(101n, 1n)],
proofVerified: [makeProofVerifiedEvent(102n, 1n)],
});

mockGetLogs({
messageSent: [
makeMessageSentEvent(2504n, 2n, 0n),
makeMessageSentEvent(2505n, 2n, 1n),
makeMessageSentEvent(2505n, 2n, 2n),
makeMessageSentEvent(2506n, 3n, 1n),
])
.mockResolvedValueOnce([
],
txPublished: [
makeTxsPublishedEvent(2510n, blocks[1].body.getTxsEffectsHash()),
makeTxsPublishedEvent(2520n, blocks[2].body.getTxsEffectsHash()),
])
.mockResolvedValueOnce([makeL2BlockProcessedEvent(2510n, 2n), makeL2BlockProcessedEvent(2520n, 3n)])
.mockResolvedValue([]);
],
l2BlockProcessed: [makeL2BlockProcessedEvent(2510n, 2n), makeL2BlockProcessedEvent(2520n, 3n)],
});

publicClient.getTransaction.mockResolvedValueOnce(publishTxs[0]);
publicClient.getTransaction.mockResolvedValueOnce(rollupTxs[0]);

Expand Down Expand Up @@ -141,6 +147,14 @@ describe('Archiver', () => {
expect(totalNumUnencryptedLogs).toEqual(expectedTotalNumUnencryptedLogs);
});

// Check last proven block number
const provenBlockNumber = await archiver.getProvenBlockNumber();
expect(provenBlockNumber).toEqual(1);

// Check getting only proven blocks
expect((await archiver.getBlocks(1, 100)).map(b => b.number)).toEqual([1, 2, 3]);
expect((await archiver.getBlocks(1, 100, true)).map(b => b.number)).toEqual([1]);

await archiver.stop();
}, 10_000);

Expand All @@ -167,15 +181,18 @@ describe('Archiver', () => {

// Here we set the current L1 block number to 102. L1 to L2 messages after this should not be read.
publicClient.getBlockNumber.mockResolvedValue(102n);
// add all of the L1 to L2 messages to the mock
publicClient.getLogs
.mockResolvedValueOnce([makeMessageSentEvent(66n, 1n, 0n), makeMessageSentEvent(68n, 1n, 1n)])
.mockResolvedValueOnce([

mockGetLogs({
messageSent: [makeMessageSentEvent(66n, 1n, 0n), makeMessageSentEvent(68n, 1n, 1n)],
txPublished: [
makeTxsPublishedEvent(70n, blocks[0].body.getTxsEffectsHash()),
makeTxsPublishedEvent(80n, blocks[1].body.getTxsEffectsHash()),
])
.mockResolvedValueOnce([makeL2BlockProcessedEvent(70n, 1n), makeL2BlockProcessedEvent(80n, 2n)])
.mockResolvedValue([]);
],
l2BlockProcessed: [makeL2BlockProcessedEvent(70n, 1n), makeL2BlockProcessedEvent(80n, 2n)],
});

mockGetLogs({});

publishTxs.slice(0, numL2BlocksInTest).forEach(tx => publicClient.getTransaction.mockResolvedValueOnce(tx));
rollupTxs.slice(0, numL2BlocksInTest).forEach(tx => publicClient.getTransaction.mockResolvedValueOnce(tx));

Expand All @@ -191,6 +208,20 @@ describe('Archiver', () => {

await archiver.stop();
}, 10_000);

// logs should be created in order of how archiver syncs.
const mockGetLogs = (logs: {
messageSent?: ReturnType<typeof makeMessageSentEvent>[];
txPublished?: ReturnType<typeof makeTxsPublishedEvent>[];
l2BlockProcessed?: ReturnType<typeof makeL2BlockProcessedEvent>[];
proofVerified?: ReturnType<typeof makeProofVerifiedEvent>[];
}) => {
publicClient.getLogs
.mockResolvedValueOnce(logs.messageSent ?? [])
.mockResolvedValueOnce(logs.txPublished ?? [])
.mockResolvedValueOnce(logs.l2BlockProcessed ?? [])
.mockResolvedValueOnce(logs.proofVerified ?? []);
};
});

/**
Expand Down Expand Up @@ -240,6 +271,15 @@ function makeMessageSentEvent(l1BlockNum: bigint, l2BlockNumber: bigint, index:
} as Log<bigint, number, false, undefined, true, typeof InboxAbi, 'MessageSent'>;
}

function makeProofVerifiedEvent(l1BlockNum: bigint, l2BlockNumber: bigint) {
return {
blockNumber: l1BlockNum,
args: {
blockNumber: l2BlockNumber,
},
} as Log<bigint, number, false, undefined, true, typeof RollupAbi, 'L2ProofVerified'>;
}

/**
* Makes a fake rollup tx for testing purposes.
* @param block - The L2Block.
Expand Down
37 changes: 34 additions & 3 deletions yarn-project/archiver/src/archiver/archiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import { type EthAddress } from '@aztec/foundation/eth-address';
import { Fr } from '@aztec/foundation/fields';
import { type DebugLogger, createDebugLogger } from '@aztec/foundation/log';
import { RunningPromise } from '@aztec/foundation/running-promise';
import { RollupAbi } from '@aztec/l1-artifacts';
import { ClassRegistererAddress } from '@aztec/protocol-contracts/class-registerer';
import { type TelemetryClient } from '@aztec/telemetry-client';
import {
Expand All @@ -40,7 +41,7 @@ import {
} from '@aztec/types/contracts';

import groupBy from 'lodash.groupby';
import { type Chain, type HttpTransport, type PublicClient, createPublicClient, http } from 'viem';
import { type Chain, type HttpTransport, type PublicClient, createPublicClient, getAbiItem, http } from 'viem';

import { type ArchiverDataStore } from './archiver_store.js';
import { type ArchiverConfig } from './config.js';
Expand Down Expand Up @@ -297,6 +298,28 @@ export class Archiver implements ArchiveSource {

await this.store.addBlocks(retrievedBlocks);
this.instrumentation.processNewBlocks(retrievedBlocks.retrievedData);

// Fetch the logs for proven blocks in the block range and update the last proven block number.
// Note it's ok to read repeated data here, since we're just using the largest number we see on the logs.
await this.updateLastProvenL2Block(l1SynchPoint.blocksSynchedTo, currentL1BlockNumber);
}

private async updateLastProvenL2Block(fromBlock: bigint, toBlock: bigint) {
const logs = await this.publicClient.getLogs({
address: this.rollupAddress.toString(),
fromBlock,
toBlock,
strict: true,
event: getAbiItem({ abi: RollupAbi, name: 'L2ProofVerified' }),
});

const lastLog = logs[logs.length - 1];
if (!lastLog) {
return;
}

const provenBlockNumber = lastLog.args.blockNumber;
await this.store.setProvenL2BlockNumber(Number(provenBlockNumber));
}

/**
Expand Down Expand Up @@ -390,10 +413,14 @@ export class Archiver implements ArchiveSource {
* Gets up to `limit` amount of L2 blocks starting from `from`.
* @param from - Number of the first block to return (inclusive).
* @param limit - The number of blocks to return.
* @param proven - If true, only return blocks that have been proven.
* @returns The requested L2 blocks.
*/
public getBlocks(from: number, limit: number): Promise<L2Block[]> {
return this.store.getBlocks(from, limit);
public async getBlocks(from: number, limit: number, proven?: boolean): Promise<L2Block[]> {
const limitWithProven = proven
? Math.min(limit, Math.max((await this.store.getProvenL2BlockNumber()) - from + 1, 0))
: limit;
return limitWithProven === 0 ? [] : this.store.getBlocks(from, limitWithProven);
}

/**
Expand Down Expand Up @@ -471,6 +498,10 @@ export class Archiver implements ArchiveSource {
return this.store.getSynchedL2BlockNumber();
}

public getProvenBlockNumber(): Promise<number> {
return this.store.getProvenL2BlockNumber();
}

public getContractClass(id: Fr): Promise<ContractClassPublic | undefined> {
return this.store.getContractClass(id);
}
Expand Down
12 changes: 12 additions & 0 deletions yarn-project/archiver/src/archiver/archiver_store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,18 @@ export interface ArchiverDataStore {
*/
getSynchedL2BlockNumber(): Promise<number>;

/**
* Gets the number of the latest proven L2 block processed.
* @returns The number of the latest proven L2 block processed.
*/
getProvenL2BlockNumber(): Promise<number>;

/**
* Stores the number of the latest proven L2 block processed.
* @param l2BlockNumber - The number of the latest proven L2 block processed.
*/
setProvenL2BlockNumber(l2BlockNumber: number): Promise<void>;

/**
* Gets the synch point of the archiver
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ export class BlockStore {
/** Stores L1 block number in which the last processed L2 block was included */
#lastSynchedL1Block: AztecSingleton<bigint>;

/** Stores last proven L2 block number */
#lastProvenL2Block: AztecSingleton<number>;

/** Index mapping transaction hash (as a string) to its location in a block */
#txIndex: AztecMap<string, BlockIndexValue>;

Expand All @@ -39,6 +42,7 @@ export class BlockStore {
this.#txIndex = db.openMap('archiver_tx_index');
this.#contractIndex = db.openMap('archiver_contract_index');
this.#lastSynchedL1Block = db.openSingleton('archiver_last_synched_l1_block');
this.#lastProvenL2Block = db.openSingleton('archiver_last_proven_l2_block');
}

/**
Expand Down Expand Up @@ -181,6 +185,14 @@ export class BlockStore {
return this.#lastSynchedL1Block.get() ?? 0n;
}

getProvenL2BlockNumber(): number {
return this.#lastProvenL2Block.get() ?? 0;
}

async setProvenL2BlockNumber(blockNumber: number) {
await this.#lastProvenL2Block.set(blockNumber);
}

#computeBlockRange(start: number, limit: number): Required<Pick<Range<number>, 'start' | 'end'>> {
if (limit < 1) {
throw new Error(`Invalid limit: ${limit}`);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,14 @@ export class KVArchiverDataStore implements ArchiverDataStore {
return Promise.resolve(this.#blockStore.getSynchedL2BlockNumber());
}

getProvenL2BlockNumber(): Promise<number> {
return Promise.resolve(this.#blockStore.getProvenL2BlockNumber());
}

async setProvenL2BlockNumber(blockNumber: number) {
await this.#blockStore.setProvenL2BlockNumber(blockNumber);
}

/**
* Gets the last L1 block number processed by the archiver
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ export class MemoryArchiverStore implements ArchiverDataStore {

private lastL1BlockNewBlocks: bigint = 0n;
private lastL1BlockNewMessages: bigint = 0n;
private lastProvenL2BlockNumber: number = 0;

constructor(
/** The max number of logs that can be obtained in 1 "getUnencryptedLogs" call. */
Expand Down Expand Up @@ -433,6 +434,15 @@ export class MemoryArchiverStore implements ArchiverDataStore {
return Promise.resolve(this.l2Blocks[this.l2Blocks.length - 1].number);
}

public getProvenL2BlockNumber(): Promise<number> {
return Promise.resolve(this.lastProvenL2BlockNumber);
}

public setProvenL2BlockNumber(l2BlockNumber: number): Promise<void> {
this.lastProvenL2BlockNumber = l2BlockNumber;
return Promise.resolve();
}

public getSynchPoint(): Promise<ArchiverL1SynchPoint> {
return Promise.resolve({
blocksSynchedTo: this.lastL1BlockNewBlocks,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,23 @@ export class L2BlockDownloader {
private running = false;
private from = 0;
private interruptibleSleep = new InterruptibleSleep();
private semaphore: Semaphore;
private jobQueue = new SerialQueue();
private blockQueue = new MemoryFifo<L2Block[]>();
private readonly semaphore: Semaphore;
private readonly jobQueue = new SerialQueue();
private readonly blockQueue = new MemoryFifo<L2Block[]>();
private readonly proven: boolean;
private readonly pollIntervalMS: number;

constructor(private l2BlockSource: L2BlockSource, maxQueueSize: number, private pollIntervalMS = 10000) {
this.semaphore = new Semaphore(maxQueueSize);
constructor(
private l2BlockSource: L2BlockSource,
opts: {
maxQueueSize: number;
proven?: boolean;
pollIntervalMS?: number;
},
) {
this.pollIntervalMS = opts.pollIntervalMS ?? 1000;
this.proven = opts.proven ?? false;
this.semaphore = new Semaphore(opts.maxQueueSize);
}

/**
Expand Down Expand Up @@ -62,7 +73,7 @@ export class L2BlockDownloader {
private async collectBlocks() {
let totalBlocks = 0;
while (true) {
const blocks = await this.l2BlockSource.getBlocks(this.from, 10);
const blocks = await this.l2BlockSource.getBlocks(this.from, 10, this.proven);
if (!blocks.length) {
return totalBlocks;
}
Expand Down
9 changes: 8 additions & 1 deletion yarn-project/circuit-types/src/l2_block_source.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@ export interface L2BlockSource {
*/
getBlockNumber(): Promise<number>;

/**
* Gets the number of the latest L2 block proven seen by the block source implementation.
* @returns The number of the latest L2 block proven seen by the block source implementation.
*/
getProvenBlockNumber(): Promise<number>;

/**
* Gets an l2 block. If a negative number is passed, the block returned is the most recent.
* @param number - The block number to return (inclusive).
Expand All @@ -38,9 +44,10 @@ export interface L2BlockSource {
* Gets up to `limit` amount of L2 blocks starting from `from`.
* @param from - Number of the first block to return (inclusive).
* @param limit - The maximum number of blocks to return.
* @param proven - If true, only return blocks that have been proven.
* @returns The requested L2 blocks.
*/
getBlocks(from: number, limit: number): Promise<L2Block[]>;
getBlocks(from: number, limit: number, proven?: boolean): Promise<L2Block[]>;

/**
* Gets a tx effect.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ describe('L1Publisher integration', () => {
const worldStateConfig: WorldStateConfig = {
worldStateBlockCheckIntervalMS: 10000,
l2QueueSize: 10,
worldStateProvenBlocksOnly: false,
};
const worldStateSynchronizer = new ServerWorldStateSynchronizer(tmpStore, builderDb, blockSource, worldStateConfig);
await worldStateSynchronizer.start();
Expand Down
4 changes: 4 additions & 0 deletions yarn-project/p2p/src/client/mocks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ export class MockBlockSource implements L2BlockSource {
return Promise.resolve(this.l2Blocks.length - 1);
}

public getProvenBlockNumber(): Promise<number> {
return this.getBlockNumber();
}

/**
* Gets an l2 block.
* @param number - The block number to return (inclusive).
Expand Down
5 changes: 4 additions & 1 deletion yarn-project/p2p/src/client/p2p_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,10 @@ export class P2PClient implements P2P {
private log = createDebugLogger('aztec:p2p'),
) {
const { p2pBlockCheckIntervalMS: checkInterval, p2pL2QueueSize } = getP2PConfigEnvVars();
this.blockDownloader = new L2BlockDownloader(l2BlockSource, p2pL2QueueSize, checkInterval);
this.blockDownloader = new L2BlockDownloader(l2BlockSource, {
maxQueueSize: p2pL2QueueSize,
pollIntervalMS: checkInterval,
});
this.synchedBlockNumber = store.openSingleton('p2p_pool_last_l2_block');
}

Expand Down
Loading

0 comments on commit b0be9f5

Please sign in to comment.