diff --git a/yarn-project/aztec-node/src/aztec-node/server.ts b/yarn-project/aztec-node/src/aztec-node/server.ts index 74c0ba9e824..8b5b4e001c6 100644 --- a/yarn-project/aztec-node/src/aztec-node/server.ts +++ b/yarn-project/aztec-node/src/aztec-node/server.ts @@ -32,8 +32,10 @@ import { import { MerkleTrees, ServerWorldStateSynchroniser, + WorldStateConfig, WorldStateSynchroniser, computePublicDataTreeLeafIndex, + getConfigEnvVars as getWorldStateConfig, } from '@aztec/world-state'; import { default as levelup } from 'levelup'; @@ -54,7 +56,6 @@ export class AztecNodeService implements AztecNode { protected unencryptedLogsSource: L2LogsSource, protected contractDataSource: ContractDataSource, protected l1ToL2MessageSource: L1ToL2MessageSource, - protected merkleTreeDB: MerkleTrees, protected worldStateSynchroniser: WorldStateSynchroniser, protected sequencer: SequencerClient, protected chainId: number, @@ -80,7 +81,8 @@ export class AztecNodeService implements AztecNode { // now create the merkle trees and the world state syncher const merkleTreeDB = await MerkleTrees.new(levelup(createMemDown()), await CircuitsWasm.get()); - const worldStateSynchroniser = new ServerWorldStateSynchroniser(merkleTreeDB, archiver); + const worldStateConfig: WorldStateConfig = getWorldStateConfig(); + const worldStateSynchroniser = new ServerWorldStateSynchroniser(merkleTreeDB, archiver, worldStateConfig); // start both and wait for them to sync from the block source await Promise.all([p2pClient.start(), worldStateSynchroniser.start()]); @@ -101,7 +103,6 @@ export class AztecNodeService implements AztecNode { archiver, archiver, archiver, - merkleTreeDB, worldStateSynchroniser, sequencer, config.chainId, @@ -216,7 +217,6 @@ export class AztecNodeService implements AztecNode { await this.sequencer.stop(); await this.p2pClient.stop(); await this.worldStateSynchroniser.stop(); - await this.merkleTreeDB.stop(); await this.blockSource.stop(); this.log.info(`Stopped`); } @@ -243,8 +243,9 @@ export class AztecNodeService implements AztecNode { * @param leafValue - The value to search for. * @returns The index of the given leaf in the contracts tree or undefined if not found. */ - public findContractIndex(leafValue: Buffer): Promise { - return this.merkleTreeDB.findLeafIndex(MerkleTreeId.CONTRACT_TREE, leafValue, false); + public async findContractIndex(leafValue: Buffer): Promise { + const committedDb = await this.getWorldState(); + return committedDb.findLeafIndex(MerkleTreeId.CONTRACT_TREE, leafValue); } /** @@ -252,8 +253,9 @@ export class AztecNodeService implements AztecNode { * @param leafIndex - The index of the leaf for which the sibling path is required. * @returns The sibling path for the leaf index. */ - public getContractPath(leafIndex: bigint): Promise> { - return this.merkleTreeDB.getSiblingPath(MerkleTreeId.CONTRACT_TREE, leafIndex, false); + public async getContractPath(leafIndex: bigint): Promise> { + const committedDb = await this.getWorldState(); + return committedDb.getSiblingPath(MerkleTreeId.CONTRACT_TREE, leafIndex); } /** @@ -261,8 +263,9 @@ export class AztecNodeService implements AztecNode { * @param leafValue - The value to search for. * @returns The index of the given leaf in the private data tree or undefined if not found. */ - public findCommitmentIndex(leafValue: Buffer): Promise { - return this.merkleTreeDB.findLeafIndex(MerkleTreeId.PRIVATE_DATA_TREE, leafValue, false); + public async findCommitmentIndex(leafValue: Buffer): Promise { + const committedDb = await this.getWorldState(); + return committedDb.findLeafIndex(MerkleTreeId.PRIVATE_DATA_TREE, leafValue); } /** @@ -270,8 +273,9 @@ export class AztecNodeService implements AztecNode { * @param leafIndex - The index of the leaf for which the sibling path is required. * @returns The sibling path for the leaf index. */ - public getDataTreePath(leafIndex: bigint): Promise> { - return this.merkleTreeDB.getSiblingPath(MerkleTreeId.PRIVATE_DATA_TREE, leafIndex, false); + public async getDataTreePath(leafIndex: bigint): Promise> { + const committedDb = await this.getWorldState(); + return committedDb.getSiblingPath(MerkleTreeId.PRIVATE_DATA_TREE, leafIndex); } /** @@ -282,12 +286,9 @@ export class AztecNodeService implements AztecNode { */ public async getL1ToL2MessageAndIndex(messageKey: Fr): Promise { // todo: #697 - make this one lookup. + const committedDb = await this.getWorldState(); const message = await this.l1ToL2MessageSource.getConfirmedL1ToL2Message(messageKey); - const index = (await this.merkleTreeDB.findLeafIndex( - MerkleTreeId.L1_TO_L2_MESSAGES_TREE, - messageKey.toBuffer(), - false, - ))!; + const index = (await committedDb.findLeafIndex(MerkleTreeId.L1_TO_L2_MESSAGES_TREE, messageKey.toBuffer()))!; return Promise.resolve({ message, index }); } @@ -296,8 +297,9 @@ export class AztecNodeService implements AztecNode { * @param leafIndex - Index of the leaf in the tree. * @returns The sibling path. */ - public getL1ToL2MessagesTreePath(leafIndex: bigint): Promise> { - return this.merkleTreeDB.getSiblingPath(MerkleTreeId.L1_TO_L2_MESSAGES_TREE, leafIndex, false); + public async getL1ToL2MessagesTreePath(leafIndex: bigint): Promise> { + const committedDb = await this.getWorldState(); + return committedDb.getSiblingPath(MerkleTreeId.L1_TO_L2_MESSAGES_TREE, leafIndex); } /** @@ -308,8 +310,9 @@ export class AztecNodeService implements AztecNode { * Note: Aztec's version of `eth_getStorageAt`. */ public async getPublicStorageAt(contract: AztecAddress, slot: bigint): Promise { + const committedDb = await this.getWorldState(); const leafIndex = computePublicDataTreeLeafIndex(contract, new Fr(slot), await CircuitsWasm.get()); - return this.merkleTreeDB.getLeafValue(MerkleTreeId.PUBLIC_DATA_TREE, leafIndex, false); + return committedDb.getLeafValue(MerkleTreeId.PUBLIC_DATA_TREE, leafIndex); } /** @@ -317,7 +320,7 @@ export class AztecNodeService implements AztecNode { * @returns The current committed roots for the data trees. */ public async getTreeRoots(): Promise> { - const committedDb = this.worldStateSynchroniser.getCommitted(); + const committedDb = await this.getWorldState(); const getTreeRoot = async (id: MerkleTreeId) => Fr.fromBuffer((await committedDb.getTreeInfo(id)).root); const [privateDataTree, nullifierTree, contractTree, l1ToL2MessagesTree, blocksTree, publicDataTree] = @@ -345,7 +348,7 @@ export class AztecNodeService implements AztecNode { * @returns The current committed block data. */ public async getHistoricBlockData(): Promise { - const committedDb = this.worldStateSynchroniser.getCommitted(); + const committedDb = await this.getWorldState(); const [roots, globalsHash] = await Promise.all([this.getTreeRoots(), committedDb.getLatestGlobalVariablesHash()]); return new HistoricBlockData( @@ -359,4 +362,27 @@ export class AztecNodeService implements AztecNode { globalsHash, ); } + + /** + * Returns an instance of MerkleTreeOperations having first ensured the world state is fully synched + * @returns An instance of a committed MerkleTreeOperations + */ + private async getWorldState() { + try { + // Attempt to sync the world state if necessary + await this.syncWorldState(); + } catch (err) { + this.log.error(`Error getting world state: ${err}`); + } + return this.worldStateSynchroniser.getCommitted(); + } + + /** + * Ensure we fully sync the world state + * @returns A promise that fulfils once the world state is synced + */ + private async syncWorldState() { + const blockSourceHeight = await this.blockSource.getBlockHeight(); + await this.worldStateSynchroniser.syncImmediate(blockSourceHeight); + } } diff --git a/yarn-project/aztec-sandbox/src/index.ts b/yarn-project/aztec-sandbox/src/index.ts index e58cfc26085..16ed1baeba6 100644 --- a/yarn-project/aztec-sandbox/src/index.ts +++ b/yarn-project/aztec-sandbox/src/index.ts @@ -98,7 +98,7 @@ async function main() { accountStrings.push(` Public Key: ${completeAddress.publicKey.toString()}\n\n`); } } - logger.info(`${splash}\n${github}\n\n`.concat(...accountStrings)); + logger.info(`${splash}\n${github}\n\n`.concat(...accountStrings).concat(`\nAztec Sandbox now ready for use!`)); } main().catch(err => { diff --git a/yarn-project/types/src/l2_block_downloader/l2_block_downloader.ts b/yarn-project/types/src/l2_block_downloader/l2_block_downloader.ts index 37ddc85650f..ccb051cd70e 100644 --- a/yarn-project/types/src/l2_block_downloader/l2_block_downloader.ts +++ b/yarn-project/types/src/l2_block_downloader/l2_block_downloader.ts @@ -1,4 +1,4 @@ -import { MemoryFifo, Semaphore } from '@aztec/foundation/fifo'; +import { MemoryFifo, Semaphore, SerialQueue } from '@aztec/foundation/fifo'; import { createDebugLogger } from '@aztec/foundation/log'; import { InterruptableSleep } from '@aztec/foundation/sleep'; @@ -18,7 +18,8 @@ export class L2BlockDownloader { private from = 0; private interruptableSleep = new InterruptableSleep(); private semaphore: Semaphore; - private queue = new MemoryFifo(); + private jobQueue = new SerialQueue(); + private blockQueue = new MemoryFifo(); constructor(private l2BlockSource: L2BlockSource, maxQueueSize: number, private pollIntervalMS = 10000) { this.semaphore = new Semaphore(maxQueueSize); @@ -29,59 +30,82 @@ export class L2BlockDownloader { * @param from - The block number to start downloading from. Defaults to INITIAL_L2_BLOCK_NUM. */ public start(from = INITIAL_L2_BLOCK_NUM) { - this.from = from; - if (this.running) { this.interruptableSleep.interrupt(); return; } - + this.from = from; this.running = true; const fn = async () => { while (this.running) { try { - const blocks = await this.l2BlockSource.getL2Blocks(this.from, 10); - - if (!blocks.length) { - await this.interruptableSleep.sleep(this.pollIntervalMS); - continue; - } - - // Blocks if there are maxQueueSize results in the queue, until released after the callback. - await this.semaphore.acquire(); - this.queue.put(blocks); - this.from += blocks.length; + await this.jobQueue.put(() => this.collectBlocks()); + await this.interruptableSleep.sleep(this.pollIntervalMS); } catch (err) { log.error(err); await this.interruptableSleep.sleep(this.pollIntervalMS); } } }; - + this.jobQueue.start(); this.runningPromise = fn(); } + /** + * Repeatedly queries the block source and adds the received blocks to the block queue. + * Stops when no further blocks are received. + * @returns The total number of blocks added to the block queue. + */ + private async collectBlocks() { + let totalBlocks = 0; + while (true) { + const blocks = await this.l2BlockSource.getL2Blocks(this.from, 10); + if (!blocks.length) { + return totalBlocks; + } + await this.semaphore.acquire(); + this.blockQueue.put(blocks); + this.from += blocks.length; + totalBlocks += blocks.length; + } + } + /** * Stops the downloader. */ public async stop() { this.running = false; this.interruptableSleep.interrupt(); - this.queue.cancel(); + await this.jobQueue.cancel(); + this.blockQueue.cancel(); await this.runningPromise; } /** * Gets the next batch of blocks from the queue. + * @param timeout - optional timeout value to prevent permanent blocking * @returns The next batch of blocks from the queue. */ - public async getL2Blocks() { - const blocks = await this.queue.get(); - if (!blocks) { + public async getL2Blocks(timeout?: number) { + try { + const blocks = await this.blockQueue.get(timeout); + if (!blocks) { + return []; + } + this.semaphore.release(); + return blocks; + } catch (err) { + // nothing to do return []; } - this.semaphore.release(); - return blocks; + } + + /** + * Forces an immediate request for blocks. + * @returns A promise that fulfills once the poll is complete + */ + public pollImmediate(): Promise { + return this.jobQueue.put(() => this.collectBlocks()); } } diff --git a/yarn-project/world-state/src/merkle-tree/merkle_tree_operations_facade.ts b/yarn-project/world-state/src/merkle-tree/merkle_tree_operations_facade.ts index 95bda95fc4f..b54fe8092af 100644 --- a/yarn-project/world-state/src/merkle-tree/merkle_tree_operations_facade.ts +++ b/yarn-project/world-state/src/merkle-tree/merkle_tree_operations_facade.ts @@ -44,8 +44,9 @@ export class MerkleTreeOperationsFacade implements MerkleTreeOperations { * @param index - The index of the leaf for which a sibling path is required. * @returns A promise with the sibling path of the specified leaf index. */ - getSiblingPath(treeId: MerkleTreeId, index: bigint): Promise> { - return this.trees.getSiblingPath(treeId, index, this.includeUncommitted); + async getSiblingPath(treeId: MerkleTreeId, index: bigint): Promise> { + const path = await this.trees.getSiblingPath(treeId, index, this.includeUncommitted); + return path as unknown as SiblingPath; } /** diff --git a/yarn-project/world-state/src/synchroniser/server_world_state_synchroniser.test.ts b/yarn-project/world-state/src/synchroniser/server_world_state_synchroniser.test.ts index ce4e592d7cd..513984392c6 100644 --- a/yarn-project/world-state/src/synchroniser/server_world_state_synchroniser.test.ts +++ b/yarn-project/world-state/src/synchroniser/server_world_state_synchroniser.test.ts @@ -26,7 +26,7 @@ import { import { jest } from '@jest/globals'; import times from 'lodash.times'; -import { MerkleTreeDb } from '../index.js'; +import { MerkleTreeDb, MerkleTrees, WorldStateConfig } from '../index.js'; import { ServerWorldStateSynchroniser } from './server_world_state_synchroniser.js'; import { WorldStateRunningState } from './world_state_synchroniser.js'; @@ -96,8 +96,13 @@ const getMockBlock = (blockNumber: number, newContractsCommitments?: Buffer[]) = return block; }; -const createSynchroniser = (merkleTreeDb: any, rollupSource: any) => - new ServerWorldStateSynchroniser(merkleTreeDb as MerkleTreeDb, rollupSource as L2BlockSource); +const createSynchroniser = (merkleTreeDb: any, rollupSource: any, blockCheckInterval = 100) => { + const worldStateConfig: WorldStateConfig = { + worldStateBlockCheckIntervalMS: blockCheckInterval, + l2QueueSize: 1000, + }; + return new ServerWorldStateSynchroniser(merkleTreeDb as MerkleTrees, rollupSource as L2BlockSource, worldStateConfig); +}; const log = createDebugLogger('aztec:server_world_state_synchroniser_test'); @@ -126,8 +131,27 @@ describe('server_world_state_synchroniser', () => { commit: jest.fn().mockImplementation(() => Promise.resolve()), rollback: jest.fn().mockImplementation(() => Promise.resolve()), handleL2Block: jest.fn().mockImplementation(() => Promise.resolve()), + stop: jest.fn().mockImplementation(() => Promise.resolve()), } as any; + const performInitialSync = async (server: ServerWorldStateSynchroniser) => { + // test initial state + let status = await server.status(); + expect(status.syncedToL2Block).toEqual(0); + expect(status.state).toEqual(WorldStateRunningState.IDLE); + + // create the initial blocks + nextBlocks = Array(LATEST_BLOCK_NUMBER) + .fill(0) + .map((_, index: number) => getMockBlock(index + 1)); + + // start the sync process and await it + await server.start().catch(err => log.error('Sync not completed: ', err)); + + status = await server.status(); + expect(status.syncedToL2Block).toBe(LATEST_BLOCK_NUMBER); + }; + it('can be constructed', () => { expect(() => createSynchroniser(merkleTreeDb, rollupSource)).not.toThrow(); }); @@ -284,4 +308,121 @@ describe('server_world_state_synchroniser', () => { expect(merkleTreeDb.handleL2Block).toHaveBeenCalledTimes(totalBlocks); await server.stop(); }); + + it('can immediately sync to latest', async () => { + const server = createSynchroniser(merkleTreeDb, rollupSource, 10000); + + await performInitialSync(server); + + // the server should now be asleep for a long time + // we will add a new block and force an immediate sync + nextBlocks = [getMockBlock(LATEST_BLOCK_NUMBER + 1)]; + await server.syncImmediate(); + + let status = await server.status(); + expect(status.syncedToL2Block).toBe(LATEST_BLOCK_NUMBER + 1); + + nextBlocks = [getMockBlock(LATEST_BLOCK_NUMBER + 2), getMockBlock(LATEST_BLOCK_NUMBER + 3)]; + await server.syncImmediate(); + + status = await server.status(); + expect(status.syncedToL2Block).toBe(LATEST_BLOCK_NUMBER + 3); + + // stop the synchroniser + await server.stop(); + + // check the final status + status = await server.status(); + expect(status.state).toEqual(WorldStateRunningState.STOPPED); + expect(status.syncedToL2Block).toEqual(LATEST_BLOCK_NUMBER + 3); + }); + + it('can immediately sync to a minimum block number', async () => { + const server = createSynchroniser(merkleTreeDb, rollupSource, 10000); + + await performInitialSync(server); + + // the server should now be asleep for a long time + // we will add 20 blocks and force a sync to at least LATEST + 5 + nextBlocks = Array(20) + .fill(0) + .map((_, index: number) => getMockBlock(index + 1 + LATEST_BLOCK_NUMBER)); + await server.syncImmediate(LATEST_BLOCK_NUMBER + 5); + + // we should have synced all of the blocks + let status = await server.status(); + expect(status.syncedToL2Block).toBe(LATEST_BLOCK_NUMBER + 20); + + // stop the synchroniser + await server.stop(); + + // check the final status + status = await server.status(); + expect(status.state).toEqual(WorldStateRunningState.STOPPED); + expect(status.syncedToL2Block).toEqual(LATEST_BLOCK_NUMBER + 20); + }); + + it('can immediately sync to a minimum block in the past', async () => { + const server = createSynchroniser(merkleTreeDb, rollupSource, 10000); + + await performInitialSync(server); + // syncing to a block in the past should succeed + await server.syncImmediate(LATEST_BLOCK_NUMBER - 1); + // syncing to the current block should succeed + await server.syncImmediate(LATEST_BLOCK_NUMBER); + + // we should have synced all of the blocks + let status = await server.status(); + expect(status.syncedToL2Block).toBe(LATEST_BLOCK_NUMBER); + + // stop the synchroniser + await server.stop(); + + // check the final status + status = await server.status(); + expect(status.state).toEqual(WorldStateRunningState.STOPPED); + expect(status.syncedToL2Block).toEqual(LATEST_BLOCK_NUMBER); + }); + + it('throws if you try to sync to an unavailable block', async () => { + const server = createSynchroniser(merkleTreeDb, rollupSource, 10000); + + await performInitialSync(server); + + // the server should now be asleep for a long time + // we will add 2 blocks and force a sync to at least LATEST + 5 + nextBlocks = Array(2) + .fill(0) + .map((_, index: number) => getMockBlock(index + 1 + LATEST_BLOCK_NUMBER)); + await expect(server.syncImmediate(LATEST_BLOCK_NUMBER + 5)).rejects.toThrow( + `Unable to sync to block height ${LATEST_BLOCK_NUMBER + 5}, currently synced to block ${LATEST_BLOCK_NUMBER + 2}`, + ); + + let status = await server.status(); + expect(status.syncedToL2Block).toBe(LATEST_BLOCK_NUMBER + 2); + + // stop the synchroniser + await server.stop(); + + // check the final status + status = await server.status(); + expect(status.state).toEqual(WorldStateRunningState.STOPPED); + expect(status.syncedToL2Block).toEqual(LATEST_BLOCK_NUMBER + 2); + }); + + it('throws if you try to immediate sync when not running', async () => { + const server = createSynchroniser(merkleTreeDb, rollupSource, 10000); + + // test initial state + const status = await server.status(); + expect(status.syncedToL2Block).toEqual(0); + expect(status.state).toEqual(WorldStateRunningState.IDLE); + + // create an initial block + nextBlocks = Array(LATEST_BLOCK_NUMBER) + .fill(0) + .map((_, index: number) => getMockBlock(index + 1)); + + await expect(server.syncImmediate()).rejects.toThrow(`World State is not running, unable to perform sync`); + }); }); diff --git a/yarn-project/world-state/src/synchroniser/server_world_state_synchroniser.ts b/yarn-project/world-state/src/synchroniser/server_world_state_synchroniser.ts index a7664a3c7fd..43fcebef38a 100644 --- a/yarn-project/world-state/src/synchroniser/server_world_state_synchroniser.ts +++ b/yarn-project/world-state/src/synchroniser/server_world_state_synchroniser.ts @@ -1,9 +1,10 @@ +import { SerialQueue } from '@aztec/foundation/fifo'; import { createDebugLogger } from '@aztec/foundation/log'; import { L2Block, L2BlockDownloader, L2BlockSource } from '@aztec/types'; -import { MerkleTreeDb, MerkleTreeOperations } from '../index.js'; +import { MerkleTreeOperations, MerkleTrees } from '../index.js'; import { MerkleTreeOperationsFacade } from '../merkle-tree/merkle_tree_operations_facade.js'; -import { getConfigEnvVars } from './config.js'; +import { WorldStateConfig } from './config.js'; import { WorldStateRunningState, WorldStateStatus, WorldStateSynchroniser } from './world_state_synchroniser.js'; /** @@ -18,16 +19,17 @@ export class ServerWorldStateSynchroniser implements WorldStateSynchroniser { private l2BlockDownloader: L2BlockDownloader; private syncPromise: Promise = Promise.resolve(); private syncResolve?: () => void = undefined; + private jobQueue = new SerialQueue(); private stopping = false; private runningPromise: Promise = Promise.resolve(); private currentState: WorldStateRunningState = WorldStateRunningState.IDLE; constructor( - private merkleTreeDb: MerkleTreeDb, + private merkleTreeDb: MerkleTrees, private l2BlockSource: L2BlockSource, + config: WorldStateConfig, private log = createDebugLogger('aztec:world_state'), ) { - const config = getConfigEnvVars(); this.l2BlockDownloader = new L2BlockDownloader( l2BlockSource, config.l2QueueSize, @@ -73,10 +75,10 @@ export class ServerWorldStateSynchroniser implements WorldStateSynchroniser { // start looking for further blocks const blockProcess = async () => { while (!this.stopping) { - const blocks = await this.l2BlockDownloader.getL2Blocks(); - await this.handleL2Blocks(blocks); + await this.jobQueue.put(() => this.collectAndProcessBlocks()); } }; + this.jobQueue.start(); this.runningPromise = blockProcess(); this.l2BlockDownloader.start(blockToDownloadFrom); this.log(`Started block downloader from block ${blockToDownloadFrom}`); @@ -87,6 +89,8 @@ export class ServerWorldStateSynchroniser implements WorldStateSynchroniser { this.log('Stopping world state...'); this.stopping = true; await this.l2BlockDownloader.stop(); + await this.jobQueue.cancel(); + await this.merkleTreeDb.stop(); await this.runningPromise; this.setCurrentState(WorldStateRunningState.STOPPED); } @@ -99,6 +103,56 @@ export class ServerWorldStateSynchroniser implements WorldStateSynchroniser { return Promise.resolve(status); } + /** + * Forces an immediate sync + * @param blockHeight - The minimum block height that we must sync to + * @returns A promise that resolves once the sync has completed. + */ + public async syncImmediate(blockHeight?: number): Promise { + if (this.currentState !== WorldStateRunningState.RUNNING) { + throw new Error(`World State is not running, unable to perform sync`); + } + // If we have been given a block height to sync to and we have reached that height + // then return. + if (blockHeight !== undefined && blockHeight <= this.currentL2BlockNum) { + return; + } + const blockToSyncTo = blockHeight === undefined ? 'latest' : `${blockHeight}`; + this.log(`World State at block ${this.currentL2BlockNum}, told to sync to block ${blockToSyncTo}...`); + // ensure any outstanding block updates are completed first. + await this.jobQueue.syncPoint(); + while (true) { + // Check the block height again + if (blockHeight !== undefined && blockHeight <= this.currentL2BlockNum) { + return; + } + // Poll for more blocks + const numBlocks = await this.l2BlockDownloader.pollImmediate(); + this.log(`Block download immediate poll yielded ${numBlocks} blocks`); + if (numBlocks) { + // More blocks were received, process them and go round again + await this.jobQueue.put(() => this.collectAndProcessBlocks()); + continue; + } + // No blocks are available, if we have been given a block height then we can't achieve it + if (blockHeight !== undefined) { + throw new Error( + `Unable to sync to block height ${blockHeight}, currently synced to block ${this.currentL2BlockNum}`, + ); + } + return; + } + } + + /** + * Checks for the availability of new blocks and processes them. + */ + private async collectAndProcessBlocks() { + // This request for blocks will timeout after 1 second if no blocks are received + const blocks = await this.l2BlockDownloader.getL2Blocks(1); + await this.handleL2Blocks(blocks); + } + /** * Handles a list of L2 blocks (i.e. Inserts the new commitments into the merkle tree). * @param l2Blocks - The L2 blocks to handle. diff --git a/yarn-project/world-state/src/synchroniser/world_state_synchroniser.ts b/yarn-project/world-state/src/synchroniser/world_state_synchroniser.ts index e7a14ee6d4e..e48143fc6f7 100644 --- a/yarn-project/world-state/src/synchroniser/world_state_synchroniser.ts +++ b/yarn-project/world-state/src/synchroniser/world_state_synchroniser.ts @@ -45,6 +45,13 @@ export interface WorldStateSynchroniser { */ stop(): Promise; + /** + * Forces an immediate sync to an optionally provided minimum block height + * @param blockHeight - The minimum block height that we must sync to + * @returns A promise that resolves once the sync has completed. + */ + syncImmediate(blockHeight?: number): Promise; + /** * Returns an instance of MerkleTreeOperations that will include uncommitted data. * @returns An instance of MerkleTreeOperations that will include uncommitted data. diff --git a/yarn-project/world-state/src/world-state-db/index.ts b/yarn-project/world-state/src/world-state-db/index.ts index 77081f3e0dd..418b0de19bb 100644 --- a/yarn-project/world-state/src/world-state-db/index.ts +++ b/yarn-project/world-state/src/world-state-db/index.ts @@ -123,7 +123,7 @@ export interface MerkleTreeOperations { * @param treeId - The tree to be queried for a sibling path. * @param index - The index of the leaf for which a sibling path should be returned. */ - getSiblingPath(treeId: MerkleTreeId, index: bigint): Promise>; + getSiblingPath(treeId: MerkleTreeId, index: bigint): Promise>; /** * Returns the previous index for a given value in an indexed tree.