From 93f9315d2e9f1d51195ec831f6dd3e6fe16c468e Mon Sep 17 00:00:00 2001 From: Alex Gherghisan Date: Fri, 15 Dec 2023 12:42:32 +0000 Subject: [PATCH] feat: PXE adds note processors for stored accounts (#3673) This PR makes the PXE add note processors for each stored key in the keystore when it is started. This way any previously stored accounts start syncing with the chain immediately. --- .../archiver/src/archiver/archiver.ts | 25 +++++----- yarn-project/pxe/src/config/index.ts | 2 +- .../pxe/src/database/kv_pxe_database.ts | 34 ++++++++++--- yarn-project/pxe/src/database/memory_db.ts | 20 +++++++- yarn-project/pxe/src/database/pxe_database.ts | 25 +++++++++- .../src/database/pxe_database_test_suite.ts | 6 +-- .../src/note_processor/note_processor.test.ts | 22 +++++++++ .../pxe/src/note_processor/note_processor.ts | 25 +++++----- .../pxe/src/pxe_service/pxe_service.ts | 19 ++++++- .../pxe/src/synchronizer/synchronizer.ts | 49 ++++++++++--------- 10 files changed, 161 insertions(+), 66 deletions(-) diff --git a/yarn-project/archiver/src/archiver/archiver.ts b/yarn-project/archiver/src/archiver/archiver.ts index e8860ceac92..1357d331861 100644 --- a/yarn-project/archiver/src/archiver/archiver.ts +++ b/yarn-project/archiver/src/archiver/archiver.ts @@ -47,11 +47,6 @@ export class Archiver implements L2BlockSource, L2LogsSource, ContractDataSource */ private runningPromise?: RunningPromise; - /** - * Next L1 block number to fetch `L2BlockProcessed` logs from (i.e. `fromBlock` in eth_getLogs). - */ - private nextL2BlockFromL1Block = 0n; - /** * Use this to track logged block in order to avoid repeating the same message. */ @@ -220,11 +215,21 @@ export class Archiver implements L2BlockSource, L2LogsSource, ContractDataSource this.publicClient, this.rollupAddress, blockUntilSynced, - this.nextL2BlockFromL1Block, + lastProcessedL1BlockNumber + 1n, currentL1BlockNumber, nextExpectedL2BlockNum, ); + if (retrievedBlocks.retrievedData.length === 0) { + return; + } else { + this.log( + `Retrieved ${retrievedBlocks.retrievedData.length} new L2 blocks between L1 blocks ${ + lastProcessedL1BlockNumber + 1n + } and ${currentL1BlockNumber}.`, + ); + } + // create the block number -> block hash mapping to ensure we retrieve the appropriate events const blockHashMapping: { [key: number]: Buffer | undefined } = {}; retrievedBlocks.retrievedData.forEach((block: L2Block) => { @@ -234,13 +239,10 @@ export class Archiver implements L2BlockSource, L2LogsSource, ContractDataSource this.publicClient, this.contractDeploymentEmitterAddress, blockUntilSynced, - this.nextL2BlockFromL1Block, + lastProcessedL1BlockNumber + 1n, currentL1BlockNumber, blockHashMapping, ); - if (retrievedBlocks.retrievedData.length === 0) { - return; - } this.log(`Retrieved ${retrievedBlocks.retrievedData.length} block(s) from chain`); @@ -280,9 +282,6 @@ export class Archiver implements L2BlockSource, L2LogsSource, ContractDataSource ); }), ); - - // set the L1 block for the next search - this.nextL2BlockFromL1Block = retrievedBlocks.nextEthBlockNumber; } /** diff --git a/yarn-project/pxe/src/config/index.ts b/yarn-project/pxe/src/config/index.ts index e96a5e64519..7ad14c519c6 100644 --- a/yarn-project/pxe/src/config/index.ts +++ b/yarn-project/pxe/src/config/index.ts @@ -10,7 +10,7 @@ import { fileURLToPath } from 'url'; export interface PXEServiceConfig { /** The interval to wait between polling for new blocks. */ l2BlockPollingIntervalMS: number; - /** L2 block to start scanning from */ + /** L2 block to start scanning from for new accounts */ l2StartingBlock: number; /** Where to store PXE data. If not set will store in memory */ diff --git a/yarn-project/pxe/src/database/kv_pxe_database.ts b/yarn-project/pxe/src/database/kv_pxe_database.ts index 8e5e27c2b8b..9c8768ad904 100644 --- a/yarn-project/pxe/src/database/kv_pxe_database.ts +++ b/yarn-project/pxe/src/database/kv_pxe_database.ts @@ -1,5 +1,5 @@ import { AztecAddress, BlockHeader, CompleteAddress } from '@aztec/circuits.js'; -import { Fr } from '@aztec/foundation/fields'; +import { Fr, Point } from '@aztec/foundation/fields'; import { AztecArray, AztecKVStore, AztecMap, AztecMultiMap, AztecSingleton } from '@aztec/kv-store'; import { ContractDao, MerkleTreeId, NoteFilter, PublicKey } from '@aztec/types'; @@ -7,18 +7,20 @@ import { NoteDao } from './note_dao.js'; import { PxeDatabase } from './pxe_database.js'; /** Serialized structure of a block header */ -type SerializedBlockHeader = { +type SynchronizedBlock = { /** The tree roots when the block was created */ roots: Record; /** The hash of the global variables */ globalVariablesHash: string; + /** The block number */ + blockNumber: number; }; /** * A PXE database backed by LMDB. */ export class KVPxeDatabase implements PxeDatabase { - #blockHeader: AztecSingleton; + #synchronizedBlock: AztecSingleton; #addresses: AztecArray; #addressIndex: AztecMap; #authWitnesses: AztecMap; @@ -30,6 +32,7 @@ export class KVPxeDatabase implements PxeDatabase { #notesByStorageSlot: AztecMultiMap; #notesByTxHash: AztecMultiMap; #notesByOwner: AztecMultiMap; + #syncedBlockPerPublicKey: AztecMap; #db: AztecKVStore; constructor(db: AztecKVStore) { @@ -40,9 +43,11 @@ export class KVPxeDatabase implements PxeDatabase { this.#authWitnesses = db.createMap('auth_witnesses'); this.#capsules = db.createArray('capsules'); - this.#blockHeader = db.createSingleton('block_header'); this.#contracts = db.createMap('contracts'); + this.#synchronizedBlock = db.createSingleton('block_header'); + this.#syncedBlockPerPublicKey = db.createMap('synced_block_per_public_key'); + this.#notes = db.createArray('notes'); this.#nullifiedNotes = db.createMap('nullified_notes'); @@ -173,7 +178,7 @@ export class KVPxeDatabase implements PxeDatabase { } getTreeRoots(): Record { - const roots = this.#blockHeader.get()?.roots; + const roots = this.#synchronizedBlock.get()?.roots; if (!roots) { throw new Error(`Tree roots not set`); } @@ -188,8 +193,9 @@ export class KVPxeDatabase implements PxeDatabase { }; } - async setBlockHeader(blockHeader: BlockHeader): Promise { - await this.#blockHeader.set({ + async setBlockData(blockNumber: number, blockHeader: BlockHeader): Promise { + await this.#synchronizedBlock.set({ + blockNumber, globalVariablesHash: blockHeader.globalVariablesHash.toString(), roots: { [MerkleTreeId.NOTE_HASH_TREE]: blockHeader.noteHashTreeRoot.toString(), @@ -202,8 +208,12 @@ export class KVPxeDatabase implements PxeDatabase { }); } + getBlockNumber(): number | undefined { + return this.#synchronizedBlock.get()?.blockNumber; + } + getBlockHeader(): BlockHeader { - const value = this.#blockHeader.get(); + const value = this.#synchronizedBlock.get(); if (!value) { throw new Error(`Block header not set`); } @@ -261,6 +271,14 @@ export class KVPxeDatabase implements PxeDatabase { return Promise.resolve(Array.from(this.#addresses).map(v => CompleteAddress.fromBuffer(v))); } + getSynchedBlockNumberForPublicKey(publicKey: Point): number | undefined { + return this.#syncedBlockPerPublicKey.get(publicKey.toString()); + } + + setSynchedBlockNumberForPublicKey(publicKey: Point, blockNumber: number): Promise { + return this.#syncedBlockPerPublicKey.set(publicKey.toString(), blockNumber); + } + estimateSize(): number { const notesSize = Array.from(this.#getAllNonNullifiedNotes()).reduce((sum, note) => sum + note.getSize(), 0); const authWitsSize = Array.from(this.#authWitnesses.values()).reduce( diff --git a/yarn-project/pxe/src/database/memory_db.ts b/yarn-project/pxe/src/database/memory_db.ts index 641603b0210..2cacfd651b9 100644 --- a/yarn-project/pxe/src/database/memory_db.ts +++ b/yarn-project/pxe/src/database/memory_db.ts @@ -1,6 +1,6 @@ import { BlockHeader, CompleteAddress, PublicKey } from '@aztec/circuits.js'; import { AztecAddress } from '@aztec/foundation/aztec-address'; -import { Fr } from '@aztec/foundation/fields'; +import { Fr, Point } from '@aztec/foundation/fields'; import { createDebugLogger } from '@aztec/foundation/log'; import { MerkleTreeId, NoteFilter } from '@aztec/types'; @@ -18,8 +18,10 @@ export class MemoryDB extends MemoryContractDatabase implements PxeDatabase { private notesTable: NoteDao[] = []; private treeRoots: Record | undefined; private globalVariablesHash: Fr | undefined; + private blockNumber: number | undefined; private addresses: CompleteAddress[] = []; private authWitnesses: Record = {}; + private syncedBlockPerPublicKey = new Map(); // A capsule is a "blob" of data that is passed to the contract through an oracle. // We are using a stack to keep track of the capsules that are passed to the contract. private capsuleStack: Fr[][] = []; @@ -134,8 +136,9 @@ export class MemoryDB extends MemoryContractDatabase implements PxeDatabase { ); } - public setBlockHeader(blockHeader: BlockHeader): Promise { + public setBlockData(blockNumber: number, blockHeader: BlockHeader): Promise { this.globalVariablesHash = blockHeader.globalVariablesHash; + this.blockNumber = blockNumber; this.setTreeRoots({ [MerkleTreeId.NOTE_HASH_TREE]: blockHeader.noteHashTreeRoot, [MerkleTreeId.NULLIFIER_TREE]: blockHeader.nullifierTreeRoot, @@ -148,6 +151,10 @@ export class MemoryDB extends MemoryContractDatabase implements PxeDatabase { return Promise.resolve(); } + public getBlockNumber(): number | undefined { + return this.blockNumber; + } + public addCompleteAddress(completeAddress: CompleteAddress): Promise { const accountIndex = this.addresses.findIndex(r => r.address.equals(completeAddress.address)); if (accountIndex !== -1) { @@ -174,6 +181,15 @@ export class MemoryDB extends MemoryContractDatabase implements PxeDatabase { return Promise.resolve(this.addresses); } + getSynchedBlockNumberForPublicKey(publicKey: Point): number | undefined { + return this.syncedBlockPerPublicKey.get(publicKey.toString()); + } + + setSynchedBlockNumberForPublicKey(publicKey: Point, blockNumber: number): Promise { + this.syncedBlockPerPublicKey.set(publicKey.toString(), blockNumber); + return Promise.resolve(true); + } + public estimateSize() { const notesSize = this.notesTable.reduce((sum, note) => sum + note.getSize(), 0); const treeRootsSize = this.treeRoots ? Object.entries(this.treeRoots).length * Fr.SIZE_IN_BYTES : 0; diff --git a/yarn-project/pxe/src/database/pxe_database.ts b/yarn-project/pxe/src/database/pxe_database.ts index 0c7f1551770..b2e9432a58b 100644 --- a/yarn-project/pxe/src/database/pxe_database.ts +++ b/yarn-project/pxe/src/database/pxe_database.ts @@ -79,6 +79,12 @@ export interface PxeDatabase extends ContractDatabase { */ getTreeRoots(): Record; + /** + * Gets the most recently processed block number. + * @returns The most recently processed block number or undefined if never synched. + */ + getBlockNumber(): number | undefined; + /** * Retrieve the stored Block Header from the database. * The function returns a Promise that resolves to the Block Header. @@ -86,6 +92,9 @@ export interface PxeDatabase extends ContractDatabase { * Throws an error if the block header is not available within the database. * * note: this data is a combination of the tree roots and the global variables hash. + * + * @returns The Block Header. + * @throws If no block have been processed yet. */ getBlockHeader(): BlockHeader; @@ -94,10 +103,11 @@ export interface PxeDatabase extends ContractDatabase { * This function updates the 'global variables hash' and `tree roots` property of the instance * Note that this will overwrite any existing hash or roots in the database. * + * @param blockNumber - The block number of the most recent block * @param blockHeader - An object containing the most recent block header. * @returns A Promise that resolves when the hash has been successfully updated in the database. */ - setBlockHeader(blockHeader: BlockHeader): Promise; + setBlockData(blockNumber: number, blockHeader: BlockHeader): Promise; /** * Adds complete address to the database. @@ -121,6 +131,19 @@ export interface PxeDatabase extends ContractDatabase { */ getCompleteAddresses(): Promise; + /** + * Updates up to which block number we have processed notes for a given public key. + * @param publicKey - The public key to set the synched block number for. + * @param blockNumber - The block number to set. + */ + setSynchedBlockNumberForPublicKey(publicKey: PublicKey, blockNumber: number): Promise; + + /** + * Get the synched block number for a given public key. + * @param publicKey - The public key to get the synched block number for. + */ + getSynchedBlockNumberForPublicKey(publicKey: PublicKey): number | undefined; + /** * Returns the estimated size in bytes of this db. * @returns The estimated size in bytes of this db. diff --git a/yarn-project/pxe/src/database/pxe_database_test_suite.ts b/yarn-project/pxe/src/database/pxe_database_test_suite.ts index 69eaff032b9..9f705256b9d 100644 --- a/yarn-project/pxe/src/database/pxe_database_test_suite.ts +++ b/yarn-project/pxe/src/database/pxe_database_test_suite.ts @@ -1,6 +1,6 @@ import { AztecAddress, BlockHeader, CompleteAddress } from '@aztec/circuits.js'; import { Fr, Point } from '@aztec/foundation/fields'; -import { MerkleTreeId, NoteFilter, randomTxHash } from '@aztec/types'; +import { INITIAL_L2_BLOCK_NUM, MerkleTreeId, NoteFilter, randomTxHash } from '@aztec/types'; import { NoteDao } from './note_dao.js'; import { randomNoteDao } from './note_dao.test.js'; @@ -155,13 +155,13 @@ export function describePxeDatabase(getDatabase: () => PxeDatabase) { const blockHeader = BlockHeader.random(); blockHeader.privateKernelVkTreeRoot = Fr.zero(); - await database.setBlockHeader(blockHeader); + await database.setBlockData(INITIAL_L2_BLOCK_NUM, blockHeader); expect(database.getBlockHeader()).toEqual(blockHeader); }); it('retrieves the merkle tree roots from the block', async () => { const blockHeader = BlockHeader.random(); - await database.setBlockHeader(blockHeader); + await database.setBlockData(INITIAL_L2_BLOCK_NUM, blockHeader); expect(database.getTreeRoots()).toEqual({ [MerkleTreeId.NOTE_HASH_TREE]: blockHeader.noteHashTreeRoot, [MerkleTreeId.NULLIFIER_TREE]: blockHeader.nullifierTreeRoot, diff --git a/yarn-project/pxe/src/note_processor/note_processor.test.ts b/yarn-project/pxe/src/note_processor/note_processor.test.ts index a317c24cddd..f3ee16ea28a 100644 --- a/yarn-project/pxe/src/note_processor/note_processor.test.ts +++ b/yarn-project/pxe/src/note_processor/note_processor.test.ts @@ -223,4 +223,26 @@ describe('Note Processor', () => { addedNoteDaos.forEach(info => nonceSet.add(info.nonce.value)); expect(nonceSet.size).toBe(notes.length); }); + + it('advances the block number', async () => { + const { blockContexts, encryptedLogsArr } = mockData([[2]]); + await noteProcessor.process(blockContexts, encryptedLogsArr); + expect(noteProcessor.status.syncedToBlock).toEqual(blockContexts.at(-1)?.block.number); + }); + + it('should restore the last block number processed and ignore the starting block', async () => { + const { blockContexts, encryptedLogsArr } = mockData([[2]]); + await noteProcessor.process(blockContexts, encryptedLogsArr); + + const newNoteProcessor = new NoteProcessor( + owner.getPublicKey(), + keyStore, + database, + aztecNode, + INITIAL_L2_BLOCK_NUM, + simulator, + ); + + expect(newNoteProcessor.status).toEqual(noteProcessor.status); + }); }); diff --git a/yarn-project/pxe/src/note_processor/note_processor.ts b/yarn-project/pxe/src/note_processor/note_processor.ts index 436085df589..7a69fb2e007 100644 --- a/yarn-project/pxe/src/note_processor/note_processor.ts +++ b/yarn-project/pxe/src/note_processor/note_processor.ts @@ -4,7 +4,7 @@ import { Grumpkin } from '@aztec/circuits.js/barretenberg'; import { Fr } from '@aztec/foundation/fields'; import { createDebugLogger } from '@aztec/foundation/log'; import { Timer } from '@aztec/foundation/timer'; -import { AztecNode, KeyStore, L1NotePayload, L2BlockContext, L2BlockL2Logs } from '@aztec/types'; +import { AztecNode, INITIAL_L2_BLOCK_NUM, KeyStore, L1NotePayload, L2BlockContext, L2BlockL2Logs } from '@aztec/types'; import { NoteProcessorStats } from '@aztec/types/stats'; import { PxeDatabase } from '../database/index.js'; @@ -30,9 +30,6 @@ interface ProcessedData { * before storing them against their owner. */ export class NoteProcessor { - /** The latest L2 block number that the note processor has synchronized to. */ - private syncedToBlock = 0; - /** Keeps track of processing time since an instance is created. */ public readonly timer: Timer = new Timer(); @@ -47,12 +44,10 @@ export class NoteProcessor { private keyStore: KeyStore, private db: PxeDatabase, private node: AztecNode, - private startingBlock: number, + private startingBlock: number = INITIAL_L2_BLOCK_NUM, private simulator = getAcirSimulator(db, node, keyStore), private log = createDebugLogger('aztec:note_processor'), - ) { - this.syncedToBlock = this.startingBlock - 1; - } + ) {} /** * Check if the NoteProcessor is synchronized with the remote block number. @@ -63,14 +58,18 @@ export class NoteProcessor { */ public async isSynchronized() { const remoteBlockNumber = await this.node.getBlockNumber(); - return this.syncedToBlock === remoteBlockNumber; + return this.getSyncedToBlock() === remoteBlockNumber; } /** * Returns synchronization status (ie up to which block has been synced ) for this note processor. */ public get status() { - return { syncedToBlock: this.syncedToBlock }; + return { syncedToBlock: this.getSyncedToBlock() }; + } + + private getSyncedToBlock(): number { + return this.db.getSynchedBlockNumberForPublicKey(this.publicKey) ?? this.startingBlock - 1; } /** @@ -171,8 +170,10 @@ export class NoteProcessor { await this.processBlocksAndNotes(blocksAndNotes); - this.syncedToBlock = l2BlockContexts[l2BlockContexts.length - 1].block.number; - this.log(`Synched block ${this.syncedToBlock}`); + const syncedToBlock = l2BlockContexts[l2BlockContexts.length - 1].block.number; + await this.db.setSynchedBlockNumberForPublicKey(this.publicKey, syncedToBlock); + + this.log(`Synched block ${syncedToBlock}`); } /** diff --git a/yarn-project/pxe/src/pxe_service/pxe_service.ts b/yarn-project/pxe/src/pxe_service/pxe_service.ts index f4cf6eb29d9..e761155a41c 100644 --- a/yarn-project/pxe/src/pxe_service/pxe_service.ts +++ b/yarn-project/pxe/src/pxe_service/pxe_service.ts @@ -92,12 +92,27 @@ export class PXEService implements PXE { * @returns A promise that resolves when the server has started successfully. */ public async start() { - const { l2BlockPollingIntervalMS, l2StartingBlock } = this.config; - await this.synchronizer.start(l2StartingBlock, 1, l2BlockPollingIntervalMS); + const { l2BlockPollingIntervalMS } = this.config; + await this.synchronizer.start(1, l2BlockPollingIntervalMS); + await this.restoreNoteProcessors(); const info = await this.getNodeInfo(); this.log.info(`Started PXE connected to chain ${info.chainId} version ${info.protocolVersion}`); } + private async restoreNoteProcessors() { + const publicKeys = await this.keyStore.getAccounts(); + const publicKeysSet = new Set(publicKeys.map(k => k.toString())); + + const registeredAddresses = await this.db.getCompleteAddresses(); + + for (const address of registeredAddresses) { + if (!publicKeysSet.has(address.publicKey.toString())) { + continue; + } + this.synchronizer.addAccount(address.publicKey, this.keyStore, this.config.l2StartingBlock); + } + } + /** * Stops the PXE Service, halting processing of new transactions and shutting down the synchronizer. * This function ensures that all ongoing tasks are completed before stopping the server. diff --git a/yarn-project/pxe/src/synchronizer/synchronizer.ts b/yarn-project/pxe/src/synchronizer/synchronizer.ts index 76663ea34fb..434b7de0c91 100644 --- a/yarn-project/pxe/src/synchronizer/synchronizer.ts +++ b/yarn-project/pxe/src/synchronizer/synchronizer.ts @@ -20,8 +20,7 @@ export class Synchronizer { private noteProcessors: NoteProcessor[] = []; private interruptibleSleep = new InterruptibleSleep(); private running = false; - private initialSyncBlockNumber = 0; - private synchedToBlock = 0; + private initialSyncBlockNumber = INITIAL_L2_BLOCK_NUM - 1; private log: DebugLogger; private noteProcessorsToCatchUp: NoteProcessor[] = []; @@ -34,21 +33,15 @@ export class Synchronizer { * Continuously processes the fetched data for all note processors until stopped. If there is no data * available, it retries after a specified interval. * - * @param from - The starting position for fetching encrypted logs and blocks. * @param limit - The maximum number of encrypted, unencrypted logs and blocks to fetch in each iteration. * @param retryInterval - The time interval (in ms) to wait before retrying if no data is available. */ - public async start(from = INITIAL_L2_BLOCK_NUM, limit = 1, retryInterval = 1000) { + public async start(limit = 1, retryInterval = 1000) { if (this.running) { return; } this.running = true; - if (from < this.synchedToBlock + 1) { - throw new Error(`From block ${from} is smaller than the currently synched block ${this.synchedToBlock}`); - } - this.synchedToBlock = from - 1; - await this.initialSync(); const run = async () => { @@ -68,14 +61,17 @@ export class Synchronizer { } protected async initialSync() { - const [blockNumber, blockHeader] = await Promise.all([this.node.getBlockNumber(), this.node.getBlockHeader()]); - this.initialSyncBlockNumber = blockNumber; - this.synchedToBlock = this.initialSyncBlockNumber; - await this.db.setBlockHeader(blockHeader); + // fast forward to the latest block + const [latestBlockNumber, latestBlockHeader] = await Promise.all([ + this.node.getBlockNumber(), + this.node.getBlockHeader(), + ]); + this.initialSyncBlockNumber = latestBlockNumber; + await this.db.setBlockData(latestBlockNumber, latestBlockHeader); } protected async work(limit = 1, retryInterval = 1000): Promise { - const from = this.synchedToBlock + 1; + const from = this.getSynchedBlockNumber() + 1; try { let encryptedLogs = await this.node.getLogs(from, limit, LogType.ENCRYPTED); if (!encryptedLogs.length) { @@ -112,8 +108,8 @@ export class Synchronizer { block.attachLogs(unencryptedLogs[i], LogType.UNENCRYPTED); }); - // Wrap blocks in block contexts. - const blockContexts = blocks.map(block => new L2BlockContext(block)); + // Wrap blocks in block contexts & only keep those that match our query + const blockContexts = blocks.filter(block => block.number >= from).map(block => new L2BlockContext(block)); // Update latest tree roots from the most recent block const latestBlock = blockContexts[blockContexts.length - 1]; @@ -124,8 +120,6 @@ export class Synchronizer { for (const noteProcessor of this.noteProcessors) { await noteProcessor.process(blockContexts, encryptedLogs); } - - this.synchedToBlock = latestBlock.block.number; } catch (err) { this.log.error(`Error in synchronizer work`, err); await this.interruptibleSleep.sleep(retryInterval); @@ -134,7 +128,9 @@ export class Synchronizer { protected async workNoteProcessorCatchUp(limit = 1, retryInterval = 1000): Promise { const noteProcessor = this.noteProcessorsToCatchUp[0]; - if (noteProcessor.status.syncedToBlock === this.synchedToBlock) { + const toBlockNumber = this.getSynchedBlockNumber(); + + if (noteProcessor.status.syncedToBlock >= toBlockNumber) { // Note processor already synched, nothing to do this.noteProcessorsToCatchUp.shift(); this.noteProcessors.push(noteProcessor); @@ -143,7 +139,7 @@ export class Synchronizer { const from = noteProcessor.status.syncedToBlock + 1; // Ensuring that the note processor does not sync further than the main sync. - limit = Math.min(limit, this.synchedToBlock - from + 1); + limit = Math.min(limit, toBlockNumber - from + 1); if (limit < 1) { throw new Error(`Unexpected limit ${limit} for note processor catch up`); @@ -176,7 +172,7 @@ export class Synchronizer { this.log(`Forwarding ${logCount} encrypted logs and blocks to note processor in catch up mode`); await noteProcessor.process(blockContexts, encryptedLogs); - if (noteProcessor.status.syncedToBlock === this.synchedToBlock) { + if (noteProcessor.status.syncedToBlock === toBlockNumber) { // Note processor caught up, move it to `noteProcessors` from `noteProcessorsToCatchUp`. this.log(`Note processor for ${noteProcessor.publicKey.toString()} has caught up`, { eventName: 'note-processor-caught-up', @@ -212,7 +208,7 @@ export class Synchronizer { globalsHash, ); - await this.db.setBlockHeader(blockHeader); + await this.db.setBlockData(block.number, blockHeader); } /** @@ -272,6 +268,10 @@ export class Synchronizer { return await processor.isSynchronized(); } + private getSynchedBlockNumber() { + return this.db.getBlockNumber() ?? this.initialSyncBlockNumber; + } + /** * Checks whether all the blocks were processed (tree roots updated, txs updated with block info, etc.). * @returns True if there are no outstanding blocks to be synched. @@ -280,7 +280,7 @@ export class Synchronizer { */ public async isGlobalStateSynchronized() { const latest = await this.node.getBlockNumber(); - return latest <= this.synchedToBlock; + return latest <= this.getSynchedBlockNumber(); } /** @@ -288,8 +288,9 @@ export class Synchronizer { * @returns The latest block synchronized for blocks, and the latest block synched for notes for each public key being tracked. */ public getSyncStatus() { + const lastBlockNumber = this.getSynchedBlockNumber(); return { - blocks: this.synchedToBlock, + blocks: lastBlockNumber, notes: Object.fromEntries(this.noteProcessors.map(n => [n.publicKey.toString(), n.status.syncedToBlock])), }; }