Skip to content

Commit

Permalink
chore: try making the archiver less brittle
Browse files Browse the repository at this point in the history
  • Loading branch information
LHerskind committed Aug 9, 2024
1 parent c483039 commit 281f150
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 28 deletions.
29 changes: 24 additions & 5 deletions yarn-project/archiver/src/archiver/archiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,8 @@ export class Archiver implements ArchiveSource {
// the metadata
let retrievedBlocks: DataRetrieval<L2Block>;
{
// @todo @LHerskind Investigate how necessary that nextExpectedL2BlockNum really is.
// Also, I would expect it to break horribly if we have a reorg.
const retrievedBlockMetadata = await retrieveBlockMetadataFromRollup(
this.publicClient,
this.rollupAddress,
Expand All @@ -248,24 +250,41 @@ export class Archiver implements ArchiveSource {
([header]) => header.contentCommitment.txsEffectsHash,
);

// @note @LHerskind We will occasionally be hitting this point BEFORE, we have actually retrieved the bodies.
// The main reason this have not been an issue earlier is because:
// i) the design previously published the body in one tx and the header in another,
// which in an anvil auto mine world mean that they are separate blocks.
// ii) We have been lucky that latency have been small enough to not matter.
const blockBodiesFromStore = await this.store.getBlockBodies(retrievedBodyHashes);

if (retrievedBlockMetadata.retrievedData.length !== blockBodiesFromStore.length) {
throw new Error('Block headers length does not equal block bodies length');
this.log.warn('Block headers length does not equal block bodies length');
}

const blocks = retrievedBlockMetadata.retrievedData.map(
(blockMetadata, i) => new L2Block(blockMetadata[1], blockMetadata[0], blockBodiesFromStore[i]),
);
const blocks: L2Block[] = [];
for (let i = 0; i < retrievedBlockMetadata.retrievedData.length; i++) {
const [header, archive] = retrievedBlockMetadata.retrievedData[i];
const blockBody = blockBodiesFromStore[i];
if (blockBody) {
blocks.push(new L2Block(archive, header, blockBody));
} else {
this.log.warn(`Block body not found for block ${header.globalVariables.blockNumber.toBigInt()}.`);
}
}

(blocks.length ? this.log.verbose : this.log.debug)(
`Retrieved ${blocks.length || 'no'} new L2 blocks between L1 blocks ${
blocksSynchedTo + 1n
} and ${currentL1BlockNumber}.`,
);

// Set the `lastProcessedL1BlockNumber` to the smallest of the header and body retrieval
const min = (a: bigint, b: bigint) => (a < b ? a : b);
retrievedBlocks = {
lastProcessedL1BlockNumber: retrievedBlockMetadata.lastProcessedL1BlockNumber,
lastProcessedL1BlockNumber: min(
retrievedBlockMetadata.lastProcessedL1BlockNumber,
retrievedBlockBodies.lastProcessedL1BlockNumber,
),
retrievedData: blocks,
};
}
Expand Down
2 changes: 1 addition & 1 deletion yarn-project/archiver/src/archiver/archiver_store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ export interface ArchiverDataStore {
* @param txsEffectsHashes - A list of txsEffectsHashes.
* @returns The requested L2 block bodies
*/
getBlockBodies(txsEffectsHashes: Buffer[]): Promise<Body[]>;
getBlockBodies(txsEffectsHashes: Buffer[]): Promise<(Body | undefined)[]>;

/**
* Gets up to `limit` amount of L2 blocks starting from `from`.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,20 +30,21 @@ export class BlockBodyStore {
* @param txsEffectsHashes - The txsEffectsHashes list that corresponds to the blockBodies we want to retrieve
* @returns The requested L2 block bodies
*/
async getBlockBodies(txsEffectsHashes: Buffer[]): Promise<Body[]> {
async getBlockBodies(txsEffectsHashes: Buffer[]): Promise<(Body | undefined)[]> {
const blockBodiesBuffer = await this.db.transaction(() =>
txsEffectsHashes.map(txsEffectsHash => this.#blockBodies.get(txsEffectsHash.toString('hex'))),
);

if (blockBodiesBuffer.some(bodyBuffer => bodyBuffer === undefined)) {
this.log.error(
'Block body buffer is undefined',
txsEffectsHashes.map(txsEffectsHash => txsEffectsHash.toString('hex')),
);
throw new Error('Block body buffer is undefined');
const blockBodies: (Body | undefined)[] = [];
for (let i = 0; i < blockBodiesBuffer.length; i++) {
const blockBodyBuffer = blockBodiesBuffer[i];
if (blockBodyBuffer === undefined) {
this.log.warn(`Block body buffer is undefined for txsEffectsHash: ${txsEffectsHashes[i].toString('hex')}`);
}
blockBodies.push(blockBodyBuffer ? Body.fromBuffer(blockBodyBuffer) : undefined);
}

return blockBodiesBuffer.map(blockBodyBuffer => Body.fromBuffer(blockBodyBuffer!));
return blockBodies;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ export class KVArchiverDataStore implements ArchiverDataStore {
* @param txsEffectsHashes - A list of txsEffectsHashes (body hashes).
* @returns The requested L2 block bodies
*/
getBlockBodies(txsEffectsHashes: Buffer[]): Promise<Body[]> {
getBlockBodies(txsEffectsHashes: Buffer[]): Promise<(Body | undefined)[]> {
return this.#blockBodyStore.getBlockBodies(txsEffectsHashes);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,14 +177,10 @@ export class MemoryArchiverStore implements ArchiverDataStore {
* @param txsEffectsHashes - A list of txsEffectsHashes (body hashes).
* @returns The requested L2 block bodies
*/
getBlockBodies(txsEffectsHashes: Buffer[]): Promise<Body[]> {
const blockBodies = txsEffectsHashes.map(txsEffectsHash => this.l2BlockBodies.get(txsEffectsHash.toString('hex')));

if (blockBodies.some(bodyBuffer => bodyBuffer === undefined)) {
throw new Error('Block body is undefined');
}

return Promise.resolve(blockBodies as Body[]);
getBlockBodies(txsEffectsHashes: Buffer[]): Promise<(Body | undefined)[]> {
return Promise.resolve(
txsEffectsHashes.map(txsEffectsHash => this.l2BlockBodies.get(txsEffectsHash.toString('hex'))),
);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import {
type L2Block,
createDebugLogger,
mockTx,
sleep,
} from '@aztec/aztec.js';
// eslint-disable-next-line no-restricted-imports
import {
Expand Down Expand Up @@ -108,6 +109,7 @@ describe('L1Publisher integration', () => {
let feeRecipient: AztecAddress;

let ethCheatCodes: EthCheatCodes;
let worldStateSynchronizer: ServerWorldStateSynchronizer;

// To update the test data, run "export AZTEC_GENERATE_TEST_DATA=1" in shell and run the tests again
// If you have issues with RPC_URL, it is likely that you need to set the RPC_URL in the shell as well
Expand All @@ -118,10 +120,6 @@ describe('L1Publisher integration', () => {
const currentTime = (await publicClient.getBlock()).timestamp;
const currentSlot = await rollup.read.getCurrentSlot();
const timestamp = (await rollup.read.getTimestampForSlot([currentSlot + 1n])) - BigInt(ETHEREUM_SLOT_DURATION);

// @todo @LHerskind figure out why we have issues here if we do not entirely ENTER the next slot
// My guess would be that it is somewhere because of a bad calculation with global variables or such

if (timestamp > currentTime) {
await ethCheatCodes.warp(Number(timestamp));
}
Expand Down Expand Up @@ -167,7 +165,7 @@ describe('L1Publisher integration', () => {
l2QueueSize: 10,
worldStateProvenBlocksOnly: false,
};
const worldStateSynchronizer = new ServerWorldStateSynchronizer(tmpStore, builderDb, blockSource, worldStateConfig);
worldStateSynchronizer = new ServerWorldStateSynchronizer(tmpStore, builderDb, blockSource, worldStateConfig);
await worldStateSynchronizer.start();
builder = await TxProver.new(config, new NoopTelemetryClient());
prover = builder.createBlockProver(builderDb.asLatest());
Expand Down Expand Up @@ -371,6 +369,9 @@ describe('L1Publisher integration', () => {
let toConsume = await inbox.read.toConsume();

for (let i = 0; i < numberOfConsecutiveBlocks; i++) {
// @note Make sure that the state is up to date before we start building.
await worldStateSynchronizer.syncImmediate();

const l1ToL2Content = range(NUMBER_OF_L1_L2_MESSAGES_PER_ROLLUP, 128 * i + 1 + 0x400).map(fr);

for (let j = 0; j < l1ToL2Content.length; j++) {
Expand Down Expand Up @@ -484,6 +485,9 @@ describe('L1Publisher integration', () => {
const blockNumber = await publicClient.getBlockNumber();

for (let i = 0; i < numberOfConsecutiveBlocks; i++) {
// @note Make sure that the state is up to date before we start building.
await worldStateSynchronizer.syncImmediate();

const l1ToL2Messages = new Array(NUMBER_OF_L1_L2_MESSAGES_PER_ROLLUP).fill(new Fr(0n));
const txs = [makeEmptyProcessedTx(), makeEmptyProcessedTx()];

Expand Down

0 comments on commit 281f150

Please sign in to comment.