From 4c89941d0c3803ce72b86e76eead95a23d80d810 Mon Sep 17 00:00:00 2001 From: PhilWindle <60546371+PhilWindle@users.noreply.github.com> Date: Tue, 22 Aug 2023 18:39:20 +0100 Subject: [PATCH] fix: Fix race condition between RPC Server and Aztec Node (#1700) This PR introduces the following changes: 1. Refactored the way in which the world-state syncs it's blocks from the configured block source. 2. When the world state is accessed, the node first checks to see if it is in sync. If not then it performs an immediate sync to bring it up to the latest state. # Checklist: Remove the checklist to signal you've completed it. Enable auto-merge if the PR is ready to merge. - [ ] If the pull request requires a cryptography review (e.g. cryptographic algorithm implementations) I have added the 'crypto' tag. - [ ] I have reviewed my diff in github, line by line and removed unexpected formatting changes, testing logs, or commented-out code. - [ ] Every change is related to the PR description. - [ ] I have [linked](https://docs.github.com/en/issues/tracking-your-work-with-issues/linking-a-pull-request-to-an-issue) this pull request to relevant issues (if any exist). --------- Co-authored-by: Santiago Palladino --- .../aztec-node/src/aztec-node/server.ts | 70 ++++++--- yarn-project/aztec-sandbox/src/index.ts | 2 +- .../l2_block_downloader.ts | 70 ++++++--- .../merkle_tree_operations_facade.ts | 5 +- .../server_world_state_synchroniser.test.ts | 147 +++++++++++++++++- .../server_world_state_synchroniser.ts | 66 +++++++- .../synchroniser/world_state_synchroniser.ts | 7 + .../world-state/src/world-state-db/index.ts | 2 +- 8 files changed, 311 insertions(+), 58 deletions(-) diff --git a/yarn-project/aztec-node/src/aztec-node/server.ts b/yarn-project/aztec-node/src/aztec-node/server.ts index 74c0ba9e824..8b5b4e001c6 100644 --- a/yarn-project/aztec-node/src/aztec-node/server.ts +++ b/yarn-project/aztec-node/src/aztec-node/server.ts @@ -32,8 +32,10 @@ import { import { MerkleTrees, ServerWorldStateSynchroniser, + WorldStateConfig, WorldStateSynchroniser, computePublicDataTreeLeafIndex, + getConfigEnvVars as getWorldStateConfig, } from '@aztec/world-state'; import { default as levelup } from 'levelup'; @@ -54,7 +56,6 @@ export class AztecNodeService implements AztecNode { protected unencryptedLogsSource: L2LogsSource, protected contractDataSource: ContractDataSource, protected l1ToL2MessageSource: L1ToL2MessageSource, - protected merkleTreeDB: MerkleTrees, protected worldStateSynchroniser: WorldStateSynchroniser, protected sequencer: SequencerClient, protected chainId: number, @@ -80,7 +81,8 @@ export class AztecNodeService implements AztecNode { // now create the merkle trees and the world state syncher const merkleTreeDB = await MerkleTrees.new(levelup(createMemDown()), await CircuitsWasm.get()); - const worldStateSynchroniser = new ServerWorldStateSynchroniser(merkleTreeDB, archiver); + const worldStateConfig: WorldStateConfig = getWorldStateConfig(); + const worldStateSynchroniser = new ServerWorldStateSynchroniser(merkleTreeDB, archiver, worldStateConfig); // start both and wait for them to sync from the block source await Promise.all([p2pClient.start(), worldStateSynchroniser.start()]); @@ -101,7 +103,6 @@ export class AztecNodeService implements AztecNode { archiver, archiver, archiver, - merkleTreeDB, worldStateSynchroniser, sequencer, config.chainId, @@ -216,7 +217,6 @@ export class AztecNodeService implements AztecNode { await this.sequencer.stop(); await this.p2pClient.stop(); await this.worldStateSynchroniser.stop(); - await this.merkleTreeDB.stop(); await this.blockSource.stop(); this.log.info(`Stopped`); } @@ -243,8 +243,9 @@ export class AztecNodeService implements AztecNode { * @param leafValue - The value to search for. * @returns The index of the given leaf in the contracts tree or undefined if not found. */ - public findContractIndex(leafValue: Buffer): Promise { - return this.merkleTreeDB.findLeafIndex(MerkleTreeId.CONTRACT_TREE, leafValue, false); + public async findContractIndex(leafValue: Buffer): Promise { + const committedDb = await this.getWorldState(); + return committedDb.findLeafIndex(MerkleTreeId.CONTRACT_TREE, leafValue); } /** @@ -252,8 +253,9 @@ export class AztecNodeService implements AztecNode { * @param leafIndex - The index of the leaf for which the sibling path is required. * @returns The sibling path for the leaf index. */ - public getContractPath(leafIndex: bigint): Promise> { - return this.merkleTreeDB.getSiblingPath(MerkleTreeId.CONTRACT_TREE, leafIndex, false); + public async getContractPath(leafIndex: bigint): Promise> { + const committedDb = await this.getWorldState(); + return committedDb.getSiblingPath(MerkleTreeId.CONTRACT_TREE, leafIndex); } /** @@ -261,8 +263,9 @@ export class AztecNodeService implements AztecNode { * @param leafValue - The value to search for. * @returns The index of the given leaf in the private data tree or undefined if not found. */ - public findCommitmentIndex(leafValue: Buffer): Promise { - return this.merkleTreeDB.findLeafIndex(MerkleTreeId.PRIVATE_DATA_TREE, leafValue, false); + public async findCommitmentIndex(leafValue: Buffer): Promise { + const committedDb = await this.getWorldState(); + return committedDb.findLeafIndex(MerkleTreeId.PRIVATE_DATA_TREE, leafValue); } /** @@ -270,8 +273,9 @@ export class AztecNodeService implements AztecNode { * @param leafIndex - The index of the leaf for which the sibling path is required. * @returns The sibling path for the leaf index. */ - public getDataTreePath(leafIndex: bigint): Promise> { - return this.merkleTreeDB.getSiblingPath(MerkleTreeId.PRIVATE_DATA_TREE, leafIndex, false); + public async getDataTreePath(leafIndex: bigint): Promise> { + const committedDb = await this.getWorldState(); + return committedDb.getSiblingPath(MerkleTreeId.PRIVATE_DATA_TREE, leafIndex); } /** @@ -282,12 +286,9 @@ export class AztecNodeService implements AztecNode { */ public async getL1ToL2MessageAndIndex(messageKey: Fr): Promise { // todo: #697 - make this one lookup. + const committedDb = await this.getWorldState(); const message = await this.l1ToL2MessageSource.getConfirmedL1ToL2Message(messageKey); - const index = (await this.merkleTreeDB.findLeafIndex( - MerkleTreeId.L1_TO_L2_MESSAGES_TREE, - messageKey.toBuffer(), - false, - ))!; + const index = (await committedDb.findLeafIndex(MerkleTreeId.L1_TO_L2_MESSAGES_TREE, messageKey.toBuffer()))!; return Promise.resolve({ message, index }); } @@ -296,8 +297,9 @@ export class AztecNodeService implements AztecNode { * @param leafIndex - Index of the leaf in the tree. * @returns The sibling path. */ - public getL1ToL2MessagesTreePath(leafIndex: bigint): Promise> { - return this.merkleTreeDB.getSiblingPath(MerkleTreeId.L1_TO_L2_MESSAGES_TREE, leafIndex, false); + public async getL1ToL2MessagesTreePath(leafIndex: bigint): Promise> { + const committedDb = await this.getWorldState(); + return committedDb.getSiblingPath(MerkleTreeId.L1_TO_L2_MESSAGES_TREE, leafIndex); } /** @@ -308,8 +310,9 @@ export class AztecNodeService implements AztecNode { * Note: Aztec's version of `eth_getStorageAt`. */ public async getPublicStorageAt(contract: AztecAddress, slot: bigint): Promise { + const committedDb = await this.getWorldState(); const leafIndex = computePublicDataTreeLeafIndex(contract, new Fr(slot), await CircuitsWasm.get()); - return this.merkleTreeDB.getLeafValue(MerkleTreeId.PUBLIC_DATA_TREE, leafIndex, false); + return committedDb.getLeafValue(MerkleTreeId.PUBLIC_DATA_TREE, leafIndex); } /** @@ -317,7 +320,7 @@ export class AztecNodeService implements AztecNode { * @returns The current committed roots for the data trees. */ public async getTreeRoots(): Promise> { - const committedDb = this.worldStateSynchroniser.getCommitted(); + const committedDb = await this.getWorldState(); const getTreeRoot = async (id: MerkleTreeId) => Fr.fromBuffer((await committedDb.getTreeInfo(id)).root); const [privateDataTree, nullifierTree, contractTree, l1ToL2MessagesTree, blocksTree, publicDataTree] = @@ -345,7 +348,7 @@ export class AztecNodeService implements AztecNode { * @returns The current committed block data. */ public async getHistoricBlockData(): Promise { - const committedDb = this.worldStateSynchroniser.getCommitted(); + const committedDb = await this.getWorldState(); const [roots, globalsHash] = await Promise.all([this.getTreeRoots(), committedDb.getLatestGlobalVariablesHash()]); return new HistoricBlockData( @@ -359,4 +362,27 @@ export class AztecNodeService implements AztecNode { globalsHash, ); } + + /** + * Returns an instance of MerkleTreeOperations having first ensured the world state is fully synched + * @returns An instance of a committed MerkleTreeOperations + */ + private async getWorldState() { + try { + // Attempt to sync the world state if necessary + await this.syncWorldState(); + } catch (err) { + this.log.error(`Error getting world state: ${err}`); + } + return this.worldStateSynchroniser.getCommitted(); + } + + /** + * Ensure we fully sync the world state + * @returns A promise that fulfils once the world state is synced + */ + private async syncWorldState() { + const blockSourceHeight = await this.blockSource.getBlockHeight(); + await this.worldStateSynchroniser.syncImmediate(blockSourceHeight); + } } diff --git a/yarn-project/aztec-sandbox/src/index.ts b/yarn-project/aztec-sandbox/src/index.ts index e58cfc26085..16ed1baeba6 100644 --- a/yarn-project/aztec-sandbox/src/index.ts +++ b/yarn-project/aztec-sandbox/src/index.ts @@ -98,7 +98,7 @@ async function main() { accountStrings.push(` Public Key: ${completeAddress.publicKey.toString()}\n\n`); } } - logger.info(`${splash}\n${github}\n\n`.concat(...accountStrings)); + logger.info(`${splash}\n${github}\n\n`.concat(...accountStrings).concat(`\nAztec Sandbox now ready for use!`)); } main().catch(err => { diff --git a/yarn-project/types/src/l2_block_downloader/l2_block_downloader.ts b/yarn-project/types/src/l2_block_downloader/l2_block_downloader.ts index 37ddc85650f..ccb051cd70e 100644 --- a/yarn-project/types/src/l2_block_downloader/l2_block_downloader.ts +++ b/yarn-project/types/src/l2_block_downloader/l2_block_downloader.ts @@ -1,4 +1,4 @@ -import { MemoryFifo, Semaphore } from '@aztec/foundation/fifo'; +import { MemoryFifo, Semaphore, SerialQueue } from '@aztec/foundation/fifo'; import { createDebugLogger } from '@aztec/foundation/log'; import { InterruptableSleep } from '@aztec/foundation/sleep'; @@ -18,7 +18,8 @@ export class L2BlockDownloader { private from = 0; private interruptableSleep = new InterruptableSleep(); private semaphore: Semaphore; - private queue = new MemoryFifo(); + private jobQueue = new SerialQueue(); + private blockQueue = new MemoryFifo(); constructor(private l2BlockSource: L2BlockSource, maxQueueSize: number, private pollIntervalMS = 10000) { this.semaphore = new Semaphore(maxQueueSize); @@ -29,59 +30,82 @@ export class L2BlockDownloader { * @param from - The block number to start downloading from. Defaults to INITIAL_L2_BLOCK_NUM. */ public start(from = INITIAL_L2_BLOCK_NUM) { - this.from = from; - if (this.running) { this.interruptableSleep.interrupt(); return; } - + this.from = from; this.running = true; const fn = async () => { while (this.running) { try { - const blocks = await this.l2BlockSource.getL2Blocks(this.from, 10); - - if (!blocks.length) { - await this.interruptableSleep.sleep(this.pollIntervalMS); - continue; - } - - // Blocks if there are maxQueueSize results in the queue, until released after the callback. - await this.semaphore.acquire(); - this.queue.put(blocks); - this.from += blocks.length; + await this.jobQueue.put(() => this.collectBlocks()); + await this.interruptableSleep.sleep(this.pollIntervalMS); } catch (err) { log.error(err); await this.interruptableSleep.sleep(this.pollIntervalMS); } } }; - + this.jobQueue.start(); this.runningPromise = fn(); } + /** + * Repeatedly queries the block source and adds the received blocks to the block queue. + * Stops when no further blocks are received. + * @returns The total number of blocks added to the block queue. + */ + private async collectBlocks() { + let totalBlocks = 0; + while (true) { + const blocks = await this.l2BlockSource.getL2Blocks(this.from, 10); + if (!blocks.length) { + return totalBlocks; + } + await this.semaphore.acquire(); + this.blockQueue.put(blocks); + this.from += blocks.length; + totalBlocks += blocks.length; + } + } + /** * Stops the downloader. */ public async stop() { this.running = false; this.interruptableSleep.interrupt(); - this.queue.cancel(); + await this.jobQueue.cancel(); + this.blockQueue.cancel(); await this.runningPromise; } /** * Gets the next batch of blocks from the queue. + * @param timeout - optional timeout value to prevent permanent blocking * @returns The next batch of blocks from the queue. */ - public async getL2Blocks() { - const blocks = await this.queue.get(); - if (!blocks) { + public async getL2Blocks(timeout?: number) { + try { + const blocks = await this.blockQueue.get(timeout); + if (!blocks) { + return []; + } + this.semaphore.release(); + return blocks; + } catch (err) { + // nothing to do return []; } - this.semaphore.release(); - return blocks; + } + + /** + * Forces an immediate request for blocks. + * @returns A promise that fulfills once the poll is complete + */ + public pollImmediate(): Promise { + return this.jobQueue.put(() => this.collectBlocks()); } } diff --git a/yarn-project/world-state/src/merkle-tree/merkle_tree_operations_facade.ts b/yarn-project/world-state/src/merkle-tree/merkle_tree_operations_facade.ts index 95bda95fc4f..b54fe8092af 100644 --- a/yarn-project/world-state/src/merkle-tree/merkle_tree_operations_facade.ts +++ b/yarn-project/world-state/src/merkle-tree/merkle_tree_operations_facade.ts @@ -44,8 +44,9 @@ export class MerkleTreeOperationsFacade implements MerkleTreeOperations { * @param index - The index of the leaf for which a sibling path is required. * @returns A promise with the sibling path of the specified leaf index. */ - getSiblingPath(treeId: MerkleTreeId, index: bigint): Promise> { - return this.trees.getSiblingPath(treeId, index, this.includeUncommitted); + async getSiblingPath(treeId: MerkleTreeId, index: bigint): Promise> { + const path = await this.trees.getSiblingPath(treeId, index, this.includeUncommitted); + return path as unknown as SiblingPath; } /** diff --git a/yarn-project/world-state/src/synchroniser/server_world_state_synchroniser.test.ts b/yarn-project/world-state/src/synchroniser/server_world_state_synchroniser.test.ts index ce4e592d7cd..513984392c6 100644 --- a/yarn-project/world-state/src/synchroniser/server_world_state_synchroniser.test.ts +++ b/yarn-project/world-state/src/synchroniser/server_world_state_synchroniser.test.ts @@ -26,7 +26,7 @@ import { import { jest } from '@jest/globals'; import times from 'lodash.times'; -import { MerkleTreeDb } from '../index.js'; +import { MerkleTreeDb, MerkleTrees, WorldStateConfig } from '../index.js'; import { ServerWorldStateSynchroniser } from './server_world_state_synchroniser.js'; import { WorldStateRunningState } from './world_state_synchroniser.js'; @@ -96,8 +96,13 @@ const getMockBlock = (blockNumber: number, newContractsCommitments?: Buffer[]) = return block; }; -const createSynchroniser = (merkleTreeDb: any, rollupSource: any) => - new ServerWorldStateSynchroniser(merkleTreeDb as MerkleTreeDb, rollupSource as L2BlockSource); +const createSynchroniser = (merkleTreeDb: any, rollupSource: any, blockCheckInterval = 100) => { + const worldStateConfig: WorldStateConfig = { + worldStateBlockCheckIntervalMS: blockCheckInterval, + l2QueueSize: 1000, + }; + return new ServerWorldStateSynchroniser(merkleTreeDb as MerkleTrees, rollupSource as L2BlockSource, worldStateConfig); +}; const log = createDebugLogger('aztec:server_world_state_synchroniser_test'); @@ -126,8 +131,27 @@ describe('server_world_state_synchroniser', () => { commit: jest.fn().mockImplementation(() => Promise.resolve()), rollback: jest.fn().mockImplementation(() => Promise.resolve()), handleL2Block: jest.fn().mockImplementation(() => Promise.resolve()), + stop: jest.fn().mockImplementation(() => Promise.resolve()), } as any; + const performInitialSync = async (server: ServerWorldStateSynchroniser) => { + // test initial state + let status = await server.status(); + expect(status.syncedToL2Block).toEqual(0); + expect(status.state).toEqual(WorldStateRunningState.IDLE); + + // create the initial blocks + nextBlocks = Array(LATEST_BLOCK_NUMBER) + .fill(0) + .map((_, index: number) => getMockBlock(index + 1)); + + // start the sync process and await it + await server.start().catch(err => log.error('Sync not completed: ', err)); + + status = await server.status(); + expect(status.syncedToL2Block).toBe(LATEST_BLOCK_NUMBER); + }; + it('can be constructed', () => { expect(() => createSynchroniser(merkleTreeDb, rollupSource)).not.toThrow(); }); @@ -284,4 +308,121 @@ describe('server_world_state_synchroniser', () => { expect(merkleTreeDb.handleL2Block).toHaveBeenCalledTimes(totalBlocks); await server.stop(); }); + + it('can immediately sync to latest', async () => { + const server = createSynchroniser(merkleTreeDb, rollupSource, 10000); + + await performInitialSync(server); + + // the server should now be asleep for a long time + // we will add a new block and force an immediate sync + nextBlocks = [getMockBlock(LATEST_BLOCK_NUMBER + 1)]; + await server.syncImmediate(); + + let status = await server.status(); + expect(status.syncedToL2Block).toBe(LATEST_BLOCK_NUMBER + 1); + + nextBlocks = [getMockBlock(LATEST_BLOCK_NUMBER + 2), getMockBlock(LATEST_BLOCK_NUMBER + 3)]; + await server.syncImmediate(); + + status = await server.status(); + expect(status.syncedToL2Block).toBe(LATEST_BLOCK_NUMBER + 3); + + // stop the synchroniser + await server.stop(); + + // check the final status + status = await server.status(); + expect(status.state).toEqual(WorldStateRunningState.STOPPED); + expect(status.syncedToL2Block).toEqual(LATEST_BLOCK_NUMBER + 3); + }); + + it('can immediately sync to a minimum block number', async () => { + const server = createSynchroniser(merkleTreeDb, rollupSource, 10000); + + await performInitialSync(server); + + // the server should now be asleep for a long time + // we will add 20 blocks and force a sync to at least LATEST + 5 + nextBlocks = Array(20) + .fill(0) + .map((_, index: number) => getMockBlock(index + 1 + LATEST_BLOCK_NUMBER)); + await server.syncImmediate(LATEST_BLOCK_NUMBER + 5); + + // we should have synced all of the blocks + let status = await server.status(); + expect(status.syncedToL2Block).toBe(LATEST_BLOCK_NUMBER + 20); + + // stop the synchroniser + await server.stop(); + + // check the final status + status = await server.status(); + expect(status.state).toEqual(WorldStateRunningState.STOPPED); + expect(status.syncedToL2Block).toEqual(LATEST_BLOCK_NUMBER + 20); + }); + + it('can immediately sync to a minimum block in the past', async () => { + const server = createSynchroniser(merkleTreeDb, rollupSource, 10000); + + await performInitialSync(server); + // syncing to a block in the past should succeed + await server.syncImmediate(LATEST_BLOCK_NUMBER - 1); + // syncing to the current block should succeed + await server.syncImmediate(LATEST_BLOCK_NUMBER); + + // we should have synced all of the blocks + let status = await server.status(); + expect(status.syncedToL2Block).toBe(LATEST_BLOCK_NUMBER); + + // stop the synchroniser + await server.stop(); + + // check the final status + status = await server.status(); + expect(status.state).toEqual(WorldStateRunningState.STOPPED); + expect(status.syncedToL2Block).toEqual(LATEST_BLOCK_NUMBER); + }); + + it('throws if you try to sync to an unavailable block', async () => { + const server = createSynchroniser(merkleTreeDb, rollupSource, 10000); + + await performInitialSync(server); + + // the server should now be asleep for a long time + // we will add 2 blocks and force a sync to at least LATEST + 5 + nextBlocks = Array(2) + .fill(0) + .map((_, index: number) => getMockBlock(index + 1 + LATEST_BLOCK_NUMBER)); + await expect(server.syncImmediate(LATEST_BLOCK_NUMBER + 5)).rejects.toThrow( + `Unable to sync to block height ${LATEST_BLOCK_NUMBER + 5}, currently synced to block ${LATEST_BLOCK_NUMBER + 2}`, + ); + + let status = await server.status(); + expect(status.syncedToL2Block).toBe(LATEST_BLOCK_NUMBER + 2); + + // stop the synchroniser + await server.stop(); + + // check the final status + status = await server.status(); + expect(status.state).toEqual(WorldStateRunningState.STOPPED); + expect(status.syncedToL2Block).toEqual(LATEST_BLOCK_NUMBER + 2); + }); + + it('throws if you try to immediate sync when not running', async () => { + const server = createSynchroniser(merkleTreeDb, rollupSource, 10000); + + // test initial state + const status = await server.status(); + expect(status.syncedToL2Block).toEqual(0); + expect(status.state).toEqual(WorldStateRunningState.IDLE); + + // create an initial block + nextBlocks = Array(LATEST_BLOCK_NUMBER) + .fill(0) + .map((_, index: number) => getMockBlock(index + 1)); + + await expect(server.syncImmediate()).rejects.toThrow(`World State is not running, unable to perform sync`); + }); }); diff --git a/yarn-project/world-state/src/synchroniser/server_world_state_synchroniser.ts b/yarn-project/world-state/src/synchroniser/server_world_state_synchroniser.ts index a7664a3c7fd..43fcebef38a 100644 --- a/yarn-project/world-state/src/synchroniser/server_world_state_synchroniser.ts +++ b/yarn-project/world-state/src/synchroniser/server_world_state_synchroniser.ts @@ -1,9 +1,10 @@ +import { SerialQueue } from '@aztec/foundation/fifo'; import { createDebugLogger } from '@aztec/foundation/log'; import { L2Block, L2BlockDownloader, L2BlockSource } from '@aztec/types'; -import { MerkleTreeDb, MerkleTreeOperations } from '../index.js'; +import { MerkleTreeOperations, MerkleTrees } from '../index.js'; import { MerkleTreeOperationsFacade } from '../merkle-tree/merkle_tree_operations_facade.js'; -import { getConfigEnvVars } from './config.js'; +import { WorldStateConfig } from './config.js'; import { WorldStateRunningState, WorldStateStatus, WorldStateSynchroniser } from './world_state_synchroniser.js'; /** @@ -18,16 +19,17 @@ export class ServerWorldStateSynchroniser implements WorldStateSynchroniser { private l2BlockDownloader: L2BlockDownloader; private syncPromise: Promise = Promise.resolve(); private syncResolve?: () => void = undefined; + private jobQueue = new SerialQueue(); private stopping = false; private runningPromise: Promise = Promise.resolve(); private currentState: WorldStateRunningState = WorldStateRunningState.IDLE; constructor( - private merkleTreeDb: MerkleTreeDb, + private merkleTreeDb: MerkleTrees, private l2BlockSource: L2BlockSource, + config: WorldStateConfig, private log = createDebugLogger('aztec:world_state'), ) { - const config = getConfigEnvVars(); this.l2BlockDownloader = new L2BlockDownloader( l2BlockSource, config.l2QueueSize, @@ -73,10 +75,10 @@ export class ServerWorldStateSynchroniser implements WorldStateSynchroniser { // start looking for further blocks const blockProcess = async () => { while (!this.stopping) { - const blocks = await this.l2BlockDownloader.getL2Blocks(); - await this.handleL2Blocks(blocks); + await this.jobQueue.put(() => this.collectAndProcessBlocks()); } }; + this.jobQueue.start(); this.runningPromise = blockProcess(); this.l2BlockDownloader.start(blockToDownloadFrom); this.log(`Started block downloader from block ${blockToDownloadFrom}`); @@ -87,6 +89,8 @@ export class ServerWorldStateSynchroniser implements WorldStateSynchroniser { this.log('Stopping world state...'); this.stopping = true; await this.l2BlockDownloader.stop(); + await this.jobQueue.cancel(); + await this.merkleTreeDb.stop(); await this.runningPromise; this.setCurrentState(WorldStateRunningState.STOPPED); } @@ -99,6 +103,56 @@ export class ServerWorldStateSynchroniser implements WorldStateSynchroniser { return Promise.resolve(status); } + /** + * Forces an immediate sync + * @param blockHeight - The minimum block height that we must sync to + * @returns A promise that resolves once the sync has completed. + */ + public async syncImmediate(blockHeight?: number): Promise { + if (this.currentState !== WorldStateRunningState.RUNNING) { + throw new Error(`World State is not running, unable to perform sync`); + } + // If we have been given a block height to sync to and we have reached that height + // then return. + if (blockHeight !== undefined && blockHeight <= this.currentL2BlockNum) { + return; + } + const blockToSyncTo = blockHeight === undefined ? 'latest' : `${blockHeight}`; + this.log(`World State at block ${this.currentL2BlockNum}, told to sync to block ${blockToSyncTo}...`); + // ensure any outstanding block updates are completed first. + await this.jobQueue.syncPoint(); + while (true) { + // Check the block height again + if (blockHeight !== undefined && blockHeight <= this.currentL2BlockNum) { + return; + } + // Poll for more blocks + const numBlocks = await this.l2BlockDownloader.pollImmediate(); + this.log(`Block download immediate poll yielded ${numBlocks} blocks`); + if (numBlocks) { + // More blocks were received, process them and go round again + await this.jobQueue.put(() => this.collectAndProcessBlocks()); + continue; + } + // No blocks are available, if we have been given a block height then we can't achieve it + if (blockHeight !== undefined) { + throw new Error( + `Unable to sync to block height ${blockHeight}, currently synced to block ${this.currentL2BlockNum}`, + ); + } + return; + } + } + + /** + * Checks for the availability of new blocks and processes them. + */ + private async collectAndProcessBlocks() { + // This request for blocks will timeout after 1 second if no blocks are received + const blocks = await this.l2BlockDownloader.getL2Blocks(1); + await this.handleL2Blocks(blocks); + } + /** * Handles a list of L2 blocks (i.e. Inserts the new commitments into the merkle tree). * @param l2Blocks - The L2 blocks to handle. diff --git a/yarn-project/world-state/src/synchroniser/world_state_synchroniser.ts b/yarn-project/world-state/src/synchroniser/world_state_synchroniser.ts index e7a14ee6d4e..e48143fc6f7 100644 --- a/yarn-project/world-state/src/synchroniser/world_state_synchroniser.ts +++ b/yarn-project/world-state/src/synchroniser/world_state_synchroniser.ts @@ -45,6 +45,13 @@ export interface WorldStateSynchroniser { */ stop(): Promise; + /** + * Forces an immediate sync to an optionally provided minimum block height + * @param blockHeight - The minimum block height that we must sync to + * @returns A promise that resolves once the sync has completed. + */ + syncImmediate(blockHeight?: number): Promise; + /** * Returns an instance of MerkleTreeOperations that will include uncommitted data. * @returns An instance of MerkleTreeOperations that will include uncommitted data. diff --git a/yarn-project/world-state/src/world-state-db/index.ts b/yarn-project/world-state/src/world-state-db/index.ts index 77081f3e0dd..418b0de19bb 100644 --- a/yarn-project/world-state/src/world-state-db/index.ts +++ b/yarn-project/world-state/src/world-state-db/index.ts @@ -123,7 +123,7 @@ export interface MerkleTreeOperations { * @param treeId - The tree to be queried for a sibling path. * @param index - The index of the leaf for which a sibling path should be returned. */ - getSiblingPath(treeId: MerkleTreeId, index: bigint): Promise>; + getSiblingPath(treeId: MerkleTreeId, index: bigint): Promise>; /** * Returns the previous index for a given value in an indexed tree.