diff --git a/yarn-project/archiver/package.json b/yarn-project/archiver/package.json index 66a7c825d9b..b837e1d9b38 100644 --- a/yarn-project/archiver/package.json +++ b/yarn-project/archiver/package.json @@ -39,6 +39,7 @@ "@aztec/circuits.js": "workspace:^", "@aztec/ethereum": "workspace:^", "@aztec/foundation": "workspace:^", + "@aztec/kv-store": "workspace:^", "@aztec/l1-artifacts": "workspace:^", "@types/lodash.omit": "^4.5.7", "debug": "^4.3.4", diff --git a/yarn-project/archiver/src/archiver/index.ts b/yarn-project/archiver/src/archiver/index.ts index 0ef2b0025c8..a7294537624 100644 --- a/yarn-project/archiver/src/archiver/index.ts +++ b/yarn-project/archiver/src/archiver/index.ts @@ -1,5 +1,5 @@ export * from './archiver.js'; export * from './config.js'; export { MemoryArchiverStore } from './memory_archiver_store/memory_archiver_store.js'; -export { LMDBArchiverStore } from './lmdb_archiver_store.js'; export { ArchiverDataStore } from './archiver_store.js'; +export { KVArchiverDataStore } from './kv_archiver_store/kv_archiver_store.js'; diff --git a/yarn-project/archiver/src/archiver/kv_archiver_store/block_store.ts b/yarn-project/archiver/src/archiver/kv_archiver_store/block_store.ts new file mode 100644 index 00000000000..025221fe73f --- /dev/null +++ b/yarn-project/archiver/src/archiver/kv_archiver_store/block_store.ts @@ -0,0 +1,170 @@ +import { INITIAL_L2_BLOCK_NUM, L2Block, L2Tx, TxHash } from '@aztec/circuit-types'; +import { AztecAddress } from '@aztec/circuits.js'; +import { createDebugLogger } from '@aztec/foundation/log'; +import { AztecKVStore, AztecMap, Range } from '@aztec/kv-store'; + +/* eslint-disable */ +type BlockIndexValue = [blockNumber: number, index: number]; + +type BlockContext = { + blockNumber: number; + l1BlockNumber: bigint; + block: Buffer; + blockHash: Buffer; +}; +/* eslint-enable */ + +/** + * LMDB implementation of the ArchiverDataStore interface. + */ +export class BlockStore { + /** Map block number to block data */ + #blocks: AztecMap; + + /** Index mapping transaction hash (as a string) to its location in a block */ + #txIndex: AztecMap; + + /** Index mapping a contract's address (as a string) to its location in a block */ + #contractIndex: AztecMap; + + #log = createDebugLogger('aztec:archiver:block_store'); + + constructor(private db: AztecKVStore) { + this.#blocks = db.createMap('archiver_blocks'); + + this.#txIndex = db.createMap('archiver_tx_index'); + this.#contractIndex = db.createMap('archiver_contract_index'); + } + + /** + * Append new blocks to the store's list. + * @param blocks - The L2 blocks to be added to the store. + * @returns True if the operation is successful. + */ + addBlocks(blocks: L2Block[]): Promise { + return this.db.transaction(() => { + for (const block of blocks) { + void this.#blocks.set(block.number, { + blockNumber: block.number, + block: block.toBuffer(), + l1BlockNumber: block.getL1BlockNumber(), + blockHash: block.getBlockHash(), + }); + + for (const [i, tx] of block.getTxs().entries()) { + if (tx.txHash.isZero()) { + continue; + } + void this.#txIndex.set(tx.txHash.toString(), [block.number, i]); + } + + for (const [i, contractData] of block.newContractData.entries()) { + if (contractData.contractAddress.isZero()) { + continue; + } + + void this.#contractIndex.set(contractData.contractAddress.toString(), [block.number, i]); + } + } + + return true; + }); + } + + /** + * Gets up to `limit` amount of L2 blocks starting from `from`. + * @param start - Number of the first block to return (inclusive). 
+ * @param limit - The number of blocks to return. + * @returns The requested L2 blocks, without logs attached + */ + *getBlocks(start: number, limit: number): IterableIterator { + for (const blockCtx of this.#blocks.values(this.#computeBlockRange(start, limit))) { + yield L2Block.fromBuffer(blockCtx.block, blockCtx.blockHash); + } + } + + /** + * Gets an L2 block. + * @param blockNumber - The number of the block to return. + * @returns The requested L2 block, without logs attached + */ + getBlock(blockNumber: number): L2Block | undefined { + const blockCtx = this.#blocks.get(blockNumber); + if (!blockCtx || !blockCtx.block) { + return undefined; + } + + const block = L2Block.fromBuffer(blockCtx.block, blockCtx.blockHash); + + return block; + } + + /** + * Gets an l2 tx. + * @param txHash - The txHash of the l2 tx. + * @returns The requested L2 tx. + */ + getL2Tx(txHash: TxHash): L2Tx | undefined { + const [blockNumber, txIndex] = this.getL2TxLocation(txHash) ?? []; + if (typeof blockNumber !== 'number' || typeof txIndex !== 'number') { + return undefined; + } + + const block = this.getBlock(blockNumber); + return block?.getTx(txIndex); + } + + /** + * Looks up which block included the requested L2 tx. + * @param txHash - The txHash of the l2 tx. + * @returns The block number and index of the tx. + */ + getL2TxLocation(txHash: TxHash): [blockNumber: number, txIndex: number] | undefined { + return this.#txIndex.get(txHash.toString()); + } + + /** + * Looks up which block deployed a particular contract. + * @param contractAddress - The address of the contract to look up. + * @returns The block number and index of the contract. + */ + getContractLocation(contractAddress: AztecAddress): [blockNumber: number, index: number] | undefined { + return this.#contractIndex.get(contractAddress.toString()); + } + + /** + * Gets the number of the latest L2 block processed. + * @returns The number of the latest L2 block processed. + */ + getBlockNumber(): number { + const [lastBlockNumber] = this.#blocks.keys({ reverse: true, limit: 1 }); + return typeof lastBlockNumber === 'number' ? lastBlockNumber : INITIAL_L2_BLOCK_NUM - 1; + } + + /** + * Gets the most recent L1 block processed. + * @returns The L1 block that published the latest L2 block + */ + getL1BlockNumber(): bigint { + const [lastBlock] = this.#blocks.values({ reverse: true, limit: 1 }); + if (!lastBlock) { + return 0n; + } else { + return lastBlock.l1BlockNumber; + } + } + + #computeBlockRange(start: number, limit: number): Required, 'start' | 'end'>> { + if (limit < 1) { + throw new Error(`Invalid limit: ${limit}`); + } + + if (start < INITIAL_L2_BLOCK_NUM) { + this.#log(`Clamping start block ${start} to ${INITIAL_L2_BLOCK_NUM}`); + start = INITIAL_L2_BLOCK_NUM; + } + + const end = start + limit; + return { start, end }; + } +} diff --git a/yarn-project/archiver/src/archiver/kv_archiver_store/contract_store.ts b/yarn-project/archiver/src/archiver/kv_archiver_store/contract_store.ts new file mode 100644 index 00000000000..055b25af20d --- /dev/null +++ b/yarn-project/archiver/src/archiver/kv_archiver_store/contract_store.ts @@ -0,0 +1,93 @@ +import { ContractData, ExtendedContractData } from '@aztec/circuit-types'; +import { AztecAddress } from '@aztec/foundation/aztec-address'; +import { createDebugLogger } from '@aztec/foundation/log'; +import { AztecKVStore, AztecMap } from '@aztec/kv-store'; + +import { BlockStore } from './block_store.js'; + +/** + * LMDB implementation of the ArchiverDataStore interface. 
+ */ +export class ContractStore { + #blockStore: BlockStore; + #extendedContractData: AztecMap; + #log = createDebugLogger('aztec:archiver:contract_store'); + + constructor(private db: AztecKVStore, blockStore: BlockStore) { + this.#extendedContractData = db.createMap('archiver_extended_contract_data'); + this.#blockStore = blockStore; + } + + /** + * Add new extended contract data from an L2 block to the store's list. + * @param data - List of contracts' data to be added. + * @param blockNum - Number of the L2 block the contract data was deployed in. + * @returns True if the operation is successful. + */ + addExtendedContractData(data: ExtendedContractData[], blockNum: number): Promise { + return this.#extendedContractData.swap(blockNum, (existingData = []) => { + existingData.push(...data.map(d => d.toBuffer())); + return existingData; + }); + } + + /** + * Get the extended contract data for this contract. + * @param contractAddress - The contract data address. + * @returns The extended contract data or undefined if not found. + */ + getExtendedContractData(contractAddress: AztecAddress): ExtendedContractData | undefined { + const [blockNumber, _] = this.#blockStore.getContractLocation(contractAddress) ?? []; + + if (typeof blockNumber !== 'number') { + return undefined; + } + + for (const contract of this.#extendedContractData.get(blockNumber) ?? []) { + const extendedContractData = ExtendedContractData.fromBuffer(contract); + if (extendedContractData.contractData.contractAddress.equals(contractAddress)) { + return extendedContractData; + } + } + + return undefined; + } + + /** + * Lookup all extended contract data in an L2 block. + * @param blockNumber - The block number to get all contract data from. + * @returns All extended contract data in the block (if found). + */ + getExtendedContractDataInBlock(blockNumber: number): Array { + return (this.#extendedContractData.get(blockNumber) ?? []).map(contract => + ExtendedContractData.fromBuffer(contract), + ); + } + + /** + * Get basic info for an L2 contract. + * Contains contract address & the ethereum portal address. + * @param contractAddress - The contract data address. + * @returns ContractData with the portal address (if we didn't throw an error). + */ + getContractData(contractAddress: AztecAddress): ContractData | undefined { + const [blockNumber, index] = this.#blockStore.getContractLocation(contractAddress) ?? []; + if (typeof blockNumber !== 'number' || typeof index !== 'number') { + return undefined; + } + + const block = this.#blockStore.getBlock(blockNumber); + return block?.newContractData[index]; + } + + /** + * Get basic info for an all L2 contracts deployed in a block. + * Contains contract address & the ethereum portal address. + * @param blockNumber - Number of the L2 block where contracts were deployed. + * @returns ContractData with the portal address (if we didn't throw an error). + */ + getContractDataInBlock(blockNumber: number): ContractData[] { + const block = this.#blockStore.getBlock(blockNumber); + return block?.newContractData ?? 
[]; + } +} diff --git a/yarn-project/archiver/src/archiver/kv_archiver_store/kv_archiver_store.test.ts b/yarn-project/archiver/src/archiver/kv_archiver_store/kv_archiver_store.test.ts new file mode 100644 index 00000000000..2903ea6fe9c --- /dev/null +++ b/yarn-project/archiver/src/archiver/kv_archiver_store/kv_archiver_store.test.ts @@ -0,0 +1,15 @@ +import { EthAddress } from '@aztec/circuits.js'; +import { AztecLmdbStore } from '@aztec/kv-store'; + +import { describeArchiverDataStore } from '../archiver_store_test_suite.js'; +import { KVArchiverDataStore } from './kv_archiver_store.js'; + +describe('KVArchiverDataStore', () => { + let archiverStore: KVArchiverDataStore; + + beforeEach(async () => { + archiverStore = new KVArchiverDataStore(await AztecLmdbStore.create(EthAddress.random())); + }); + + describeArchiverDataStore('ArchiverStore', () => archiverStore); +}); diff --git a/yarn-project/archiver/src/archiver/kv_archiver_store/kv_archiver_store.ts b/yarn-project/archiver/src/archiver/kv_archiver_store/kv_archiver_store.ts new file mode 100644 index 00000000000..74be29d9d58 --- /dev/null +++ b/yarn-project/archiver/src/archiver/kv_archiver_store/kv_archiver_store.ts @@ -0,0 +1,241 @@ +import { + ContractData, + ExtendedContractData, + GetUnencryptedLogsResponse, + L1ToL2Message, + L2Block, + L2BlockL2Logs, + L2Tx, + LogFilter, + LogType, + TxHash, +} from '@aztec/circuit-types'; +import { Fr } from '@aztec/circuits.js'; +import { AztecAddress } from '@aztec/foundation/aztec-address'; +import { createDebugLogger } from '@aztec/foundation/log'; +import { AztecKVStore } from '@aztec/kv-store'; + +import { ArchiverDataStore, ArchiverL1SynchPoint } from '../archiver_store.js'; +import { BlockStore } from './block_store.js'; +import { ContractStore } from './contract_store.js'; +import { LogStore } from './log_store.js'; +import { MessageStore } from './message_store.js'; + +/** + * LMDB implementation of the ArchiverDataStore interface. + */ +export class KVArchiverDataStore implements ArchiverDataStore { + #blockStore: BlockStore; + #logStore: LogStore; + #contractStore: ContractStore; + #messageStore: MessageStore; + + #log = createDebugLogger('aztec:archiver:lmdb'); + + constructor(db: AztecKVStore, logsMaxPageSize: number = 1000) { + this.#blockStore = new BlockStore(db); + this.#logStore = new LogStore(db, this.#blockStore, logsMaxPageSize); + this.#contractStore = new ContractStore(db, this.#blockStore); + this.#messageStore = new MessageStore(db); + } + + /** + * Append new blocks to the store's list. + * @param blocks - The L2 blocks to be added to the store. + * @returns True if the operation is successful. + */ + addBlocks(blocks: L2Block[]): Promise { + return this.#blockStore.addBlocks(blocks); + } + + /** + * Gets up to `limit` amount of L2 blocks starting from `from`. + * The blocks returned do not contain any logs. + * + * @param start - Number of the first block to return (inclusive). + * @param limit - The number of blocks to return. + * @returns The requested L2 blocks, without any logs attached + */ + getBlocks(start: number, limit: number): Promise { + try { + return Promise.resolve(Array.from(this.#blockStore.getBlocks(start, limit))); + } catch (err) { + // this function is sync so if any errors are thrown we need to make sure they're passed on as rejected Promises + return Promise.reject(err); + } + } + + /** + * Gets an l2 tx. + * @param txHash - The txHash of the l2 tx. + * @returns The requested L2 tx. 
+ */ + getL2Tx(txHash: TxHash): Promise { + return Promise.resolve(this.#blockStore.getL2Tx(txHash)); + } + + /** + * Append new logs to the store's list. + * @param encryptedLogs - The logs to be added to the store. + * @param unencryptedLogs - The type of the logs to be added to the store. + * @param blockNumber - The block for which to add the logs. + * @returns True if the operation is successful. + */ + addLogs( + encryptedLogs: L2BlockL2Logs | undefined, + unencryptedLogs: L2BlockL2Logs | undefined, + blockNumber: number, + ): Promise { + return this.#logStore.addLogs(encryptedLogs, unencryptedLogs, blockNumber); + } + + /** + * Append new pending L1 to L2 messages to the store. + * @param messages - The L1 to L2 messages to be added to the store. + * @param l1BlockNumber - The L1 block number for which to add the messages. + * @returns True if the operation is successful. + */ + addPendingL1ToL2Messages(messages: L1ToL2Message[], l1BlockNumber: bigint): Promise { + return Promise.resolve(this.#messageStore.addPendingMessages(messages, l1BlockNumber)); + } + + /** + * Remove pending L1 to L2 messages from the store (if they were cancelled). + * @param messages - The message keys to be removed from the store. + * @param l1BlockNumber - The L1 block number for which to remove the messages. + * @returns True if the operation is successful. + */ + cancelPendingL1ToL2Messages(messages: Fr[], l1BlockNumber: bigint): Promise { + return Promise.resolve(this.#messageStore.cancelPendingMessages(messages, l1BlockNumber)); + } + + /** + * Messages that have been published in an L2 block are confirmed. + * Add them to the confirmed store, also remove them from the pending store. + * @param entryKeys - The message keys to be removed from the store. + * @param blockNumber - The block for which to add the messages. + * @returns True if the operation is successful. + */ + confirmL1ToL2Messages(entryKeys: Fr[]): Promise { + return this.#messageStore.confirmPendingMessages(entryKeys); + } + + /** + * Gets up to `limit` amount of pending L1 to L2 messages, sorted by fee + * @param limit - The number of messages to return (by default NUMBER_OF_L1_L2_MESSAGES_PER_ROLLUP). + * @returns The requested L1 to L2 message keys. + */ + getPendingL1ToL2MessageKeys(limit: number): Promise { + return Promise.resolve(this.#messageStore.getPendingMessageKeysByFee(limit)); + } + + /** + * Gets the confirmed L1 to L2 message corresponding to the given message key. + * @param messageKey - The message key to look up. + * @returns The requested L1 to L2 message or throws if not found. + */ + getConfirmedL1ToL2Message(messageKey: Fr): Promise { + try { + return Promise.resolve(this.#messageStore.getConfirmedMessage(messageKey)); + } catch (err) { + return Promise.reject(err); + } + } + + /** + * Gets up to `limit` amount of logs starting from `from`. + * @param start - Number of the L2 block to which corresponds the first logs to be returned. + * @param limit - The number of logs to return. + * @param logType - Specifies whether to return encrypted or unencrypted logs. + * @returns The requested logs. + */ + getLogs(start: number, limit: number, logType: LogType): Promise { + try { + return Promise.resolve(Array.from(this.#logStore.getLogs(start, limit, logType))); + } catch (err) { + return Promise.reject(err); + } + } + + /** + * Gets unencrypted logs based on the provided filter. + * @param filter - The filter to apply to the logs. + * @returns The requested logs. 
+ */ + getUnencryptedLogs(filter: LogFilter): Promise { + try { + return Promise.resolve(this.#logStore.getUnencryptedLogs(filter)); + } catch (err) { + return Promise.reject(err); + } + } + + /** + * Add new extended contract data from an L2 block to the store's list. + * @param data - List of contracts' data to be added. + * @param blockNum - Number of the L2 block the contract data was deployed in. + * @returns True if the operation is successful. + */ + addExtendedContractData(data: ExtendedContractData[], blockNum: number): Promise { + return this.#contractStore.addExtendedContractData(data, blockNum); + } + + /** + * Get the extended contract data for this contract. + * @param contractAddress - The contract data address. + * @returns The extended contract data or undefined if not found. + */ + getExtendedContractData(contractAddress: AztecAddress): Promise { + return Promise.resolve(this.#contractStore.getExtendedContractData(contractAddress)); + } + + /** + * Lookup all extended contract data in an L2 block. + * @param blockNumber - The block number to get all contract data from. + * @returns All extended contract data in the block (if found). + */ + getExtendedContractDataInBlock(blockNumber: number): Promise { + return Promise.resolve(Array.from(this.#contractStore.getExtendedContractDataInBlock(blockNumber))); + } + + /** + * Get basic info for an L2 contract. + * Contains contract address & the ethereum portal address. + * @param contractAddress - The contract data address. + * @returns ContractData with the portal address (if we didn't throw an error). + */ + getContractData(contractAddress: AztecAddress): Promise { + return Promise.resolve(this.#contractStore.getContractData(contractAddress)); + } + + /** + * Get basic info for an all L2 contracts deployed in a block. + * Contains contract address & the ethereum portal address. + * @param blockNumber - Number of the L2 block where contracts were deployed. + * @returns ContractData with the portal address (if we didn't throw an error). + */ + getContractDataInBlock(blockNumber: number): Promise { + return Promise.resolve(Array.from(this.#contractStore.getContractDataInBlock(blockNumber))); + } + + /** + * Gets the number of the latest L2 block processed. + * @returns The number of the latest L2 block processed. 
+ */ + getBlockNumber(): Promise { + return Promise.resolve(this.#blockStore.getBlockNumber()); + } + + /** + * Gets the last L1 block number processed by the archiver + */ + getL1BlockNumber(): Promise { + const addedBlock = this.#blockStore.getL1BlockNumber(); + const { addedMessages, cancelledMessages } = this.#messageStore.getL1BlockNumber(); + return Promise.resolve({ + addedBlock, + addedMessages, + cancelledMessages, + }); + } +} diff --git a/yarn-project/archiver/src/archiver/kv_archiver_store/log_store.ts b/yarn-project/archiver/src/archiver/kv_archiver_store/log_store.ts new file mode 100644 index 00000000000..afab800fb48 --- /dev/null +++ b/yarn-project/archiver/src/archiver/kv_archiver_store/log_store.ts @@ -0,0 +1,174 @@ +import { + ExtendedUnencryptedL2Log, + GetUnencryptedLogsResponse, + INITIAL_L2_BLOCK_NUM, + L2BlockL2Logs, + LogFilter, + LogId, + LogType, + UnencryptedL2Log, +} from '@aztec/circuit-types'; +import { createDebugLogger } from '@aztec/foundation/log'; +import { AztecKVStore, AztecMap } from '@aztec/kv-store'; + +import { BlockStore } from './block_store.js'; + +/** + * A store for logs + */ +export class LogStore { + #encryptedLogs: AztecMap; + #unencryptedLogs: AztecMap; + #logsMaxPageSize: number; + #log = createDebugLogger('aztec:archiver:log_store'); + + constructor(private db: AztecKVStore, private blockStore: BlockStore, logsMaxPageSize: number = 1000) { + this.#encryptedLogs = db.createMap('archiver_encrypted_logs'); + this.#unencryptedLogs = db.createMap('archiver_unencrypted_logs'); + + this.#logsMaxPageSize = logsMaxPageSize; + } + + /** + * Append new logs to the store's list. + * @param encryptedLogs - The logs to be added to the store. + * @param unencryptedLogs - The type of the logs to be added to the store. + * @param blockNumber - The block for which to add the logs. + * @returns True if the operation is successful. + */ + addLogs( + encryptedLogs: L2BlockL2Logs | undefined, + unencryptedLogs: L2BlockL2Logs | undefined, + blockNumber: number, + ): Promise { + return this.db.transaction(() => { + if (encryptedLogs) { + void this.#encryptedLogs.set(blockNumber, encryptedLogs.toBuffer()); + } + + if (unencryptedLogs) { + void this.#unencryptedLogs.set(blockNumber, unencryptedLogs.toBuffer()); + } + + return true; + }); + } + + /** + * Gets up to `limit` amount of logs starting from `from`. + * @param start - Number of the L2 block to which corresponds the first logs to be returned. + * @param limit - The number of logs to return. + * @param logType - Specifies whether to return encrypted or unencrypted logs. + * @returns The requested logs. + */ + *getLogs(start: number, limit: number, logType: LogType): IterableIterator { + const logMap = logType === LogType.ENCRYPTED ? this.#encryptedLogs : this.#unencryptedLogs; + for (const buffer of logMap.values({ start, limit })) { + yield L2BlockL2Logs.fromBuffer(buffer); + } + } + + /** + * Gets unencrypted logs based on the provided filter. + * @param filter - The filter to apply to the logs. + * @returns The requested logs. 
+ */ + getUnencryptedLogs(filter: LogFilter): GetUnencryptedLogsResponse { + if (filter.afterLog) { + return this.#filterUnencryptedLogsBetweenBlocks(filter); + } else if (filter.txHash) { + return this.#filterUnencryptedLogsOfTx(filter); + } else { + return this.#filterUnencryptedLogsBetweenBlocks(filter); + } + } + + #filterUnencryptedLogsOfTx(filter: LogFilter): GetUnencryptedLogsResponse { + if (!filter.txHash) { + throw new Error('Missing txHash'); + } + + const [blockNumber, txIndex] = this.blockStore.getL2TxLocation(filter.txHash) ?? []; + if (typeof blockNumber !== 'number' || typeof txIndex !== 'number') { + return { logs: [], maxLogsHit: false }; + } + + const unencryptedLogsInBlock = this.#getBlockLogs(blockNumber, LogType.UNENCRYPTED); + const txLogs = unencryptedLogsInBlock.txLogs[txIndex].unrollLogs().map(log => UnencryptedL2Log.fromBuffer(log)); + + const logs: ExtendedUnencryptedL2Log[] = []; + const maxLogsHit = this.#accumulateLogs(logs, blockNumber, txIndex, txLogs, filter); + + return { logs, maxLogsHit }; + } + + #filterUnencryptedLogsBetweenBlocks(filter: LogFilter): GetUnencryptedLogsResponse { + const start = + filter.afterLog?.blockNumber ?? Math.max(filter.fromBlock ?? INITIAL_L2_BLOCK_NUM, INITIAL_L2_BLOCK_NUM); + const end = filter.toBlock; + + if (typeof end === 'number' && end < start) { + return { + logs: [], + maxLogsHit: true, + }; + } + + const logs: ExtendedUnencryptedL2Log[] = []; + + let maxLogsHit = false; + loopOverBlocks: for (const [blockNumber, logBuffer] of this.#unencryptedLogs.entries({ start, end })) { + const unencryptedLogsInBlock = L2BlockL2Logs.fromBuffer(logBuffer); + for (let txIndex = filter.afterLog?.txIndex ?? 0; txIndex < unencryptedLogsInBlock.txLogs.length; txIndex++) { + const txLogs = unencryptedLogsInBlock.txLogs[txIndex].unrollLogs().map(log => UnencryptedL2Log.fromBuffer(log)); + maxLogsHit = this.#accumulateLogs(logs, blockNumber, txIndex, txLogs, filter); + if (maxLogsHit) { + this.#log(`Max logs hit at block ${blockNumber}`); + break loopOverBlocks; + } + } + } + + return { logs, maxLogsHit }; + } + + #accumulateLogs( + results: ExtendedUnencryptedL2Log[], + blockNumber: number, + txIndex: number, + txLogs: UnencryptedL2Log[], + filter: LogFilter, + ): boolean { + let maxLogsHit = false; + let logIndex = typeof filter.afterLog?.logIndex === 'number' ? filter.afterLog.logIndex + 1 : 0; + for (; logIndex < txLogs.length; logIndex++) { + const log = txLogs[logIndex]; + if (filter.contractAddress && !log.contractAddress.equals(filter.contractAddress)) { + continue; + } + + if (filter.selector && !log.selector.equals(filter.selector)) { + continue; + } + + results.push(new ExtendedUnencryptedL2Log(new LogId(blockNumber, txIndex, logIndex), log)); + if (results.length >= this.#logsMaxPageSize) { + maxLogsHit = true; + break; + } + } + + return maxLogsHit; + } + + #getBlockLogs(blockNumber: number, logType: LogType): L2BlockL2Logs { + const logMap = logType === LogType.ENCRYPTED ? 
this.#encryptedLogs : this.#unencryptedLogs; + const buffer = logMap.get(blockNumber); + + if (!buffer) { + return new L2BlockL2Logs([]); + } + + return L2BlockL2Logs.fromBuffer(buffer); + } +} diff --git a/yarn-project/archiver/src/archiver/kv_archiver_store/message_store.ts b/yarn-project/archiver/src/archiver/kv_archiver_store/message_store.ts new file mode 100644 index 00000000000..81a83e21560 --- /dev/null +++ b/yarn-project/archiver/src/archiver/kv_archiver_store/message_store.ts @@ -0,0 +1,170 @@ +import { L1ToL2Message } from '@aztec/circuit-types'; +import { Fr } from '@aztec/circuits.js'; +import { createDebugLogger } from '@aztec/foundation/log'; +import { AztecCounter, AztecKVStore, AztecMap, AztecSingleton } from '@aztec/kv-store'; + +/** + * A message stored in the database + */ +type Message = { + /** The L1ToL2Message */ + message: Buffer; + /** The message's fee */ + fee: number; + /** Has it _ever_ been confirmed? */ + confirmed: boolean; +}; + +/** + * LMDB implementation of the ArchiverDataStore interface. + */ +export class MessageStore { + #messages: AztecMap; + #pendingMessagesByFee: AztecCounter<[number, string]>; + #lastL1BlockAddingMessages: AztecSingleton; + #lastL1BlockCancellingMessages: AztecSingleton; + + #log = createDebugLogger('aztec:archiver:message_store'); + + constructor(private db: AztecKVStore) { + this.#messages = db.createMap('archiver_l1_to_l2_messages'); + this.#pendingMessagesByFee = db.createCounter('archiver_messages_by_fee'); + this.#lastL1BlockAddingMessages = db.createSingleton('archiver_last_l1_block_adding_messages'); + this.#lastL1BlockCancellingMessages = db.createSingleton('archiver_last_l1_block_cancelling_messages'); + } + + /** + * Gets the last L1 block number that emitted new messages and the block that cancelled messages. + * @returns The last L1 block number processed + */ + getL1BlockNumber() { + return { + addedMessages: this.#lastL1BlockAddingMessages.get() ?? 0n, + cancelledMessages: this.#lastL1BlockCancellingMessages.get() ?? 0n, + }; + } + + /** + * Append new pending L1 to L2 messages to the store. + * @param messages - The L1 to L2 messages to be added to the store. + * @param l1BlockNumber - The L1 block number for which to add the messages. + * @returns True if the operation is successful. + */ + addPendingMessages(messages: L1ToL2Message[], l1BlockNumber: bigint): Promise { + return this.db.transaction(() => { + const lastL1BlockNumber = this.#lastL1BlockAddingMessages.get() ?? 0n; + if (lastL1BlockNumber >= l1BlockNumber) { + return false; + } + + void this.#lastL1BlockAddingMessages.set(l1BlockNumber); + + for (const message of messages) { + const messageKey = message.entryKey?.toString(); + if (!messageKey) { + throw new Error('Message does not have an entry key'); + } + + void this.#messages.setIfNotExists(messageKey, { + message: message.toBuffer(), + fee: message.fee, + confirmed: false, + }); + + void this.#pendingMessagesByFee.update([message.fee, messageKey], 1); + } + + return true; + }); + } + + /** + * Remove pending L1 to L2 messages from the store (if they were cancelled). + * @param messageKeys - The message keys to be removed from the store. + * @param l1BlockNumber - The L1 block number for which to remove the messages. + * @returns True if the operation is successful. + */ + cancelPendingMessages(messageKeys: Fr[], l1BlockNumber: bigint): Promise { + return this.db.transaction(() => { + const lastL1BlockNumber = this.#lastL1BlockCancellingMessages.get() ?? 
0n; + if (lastL1BlockNumber >= l1BlockNumber) { + return false; + } + + void this.#lastL1BlockCancellingMessages.set(l1BlockNumber); + + for (const messageKey of messageKeys) { + const messageCtx = this.#messages.get(messageKey.toString()); + if (!messageCtx) { + throw new Error(`Message ${messageKey.toString()} not found`); + } + + void this.#pendingMessagesByFee.update([messageCtx.fee, messageKey.toString()], -1); + } + + return true; + }); + } + + /** + * Messages that have been published in an L2 block are confirmed. + * Add them to the confirmed store, also remove them from the pending store. + * @param messageKeys - The message keys to be removed from the store. + * @returns True if the operation is successful. + */ + confirmPendingMessages(messageKeys: Fr[]): Promise { + return this.db.transaction(() => { + for (const messageKey of messageKeys) { + const messageCtx = this.#messages.get(messageKey.toString()); + if (!messageCtx) { + throw new Error(`Message ${messageKey.toString()} not found`); + } + messageCtx.confirmed = true; + + void this.#messages.set(messageKey.toString(), messageCtx); + void this.#pendingMessagesByFee.update([messageCtx.fee, messageKey.toString()], -1); + } + + return true; + }); + } + + /** + * Gets the confirmed L1 to L2 message corresponding to the given message key. + * @param messageKey - The message key to look up. + * @returns The requested L1 to L2 message or throws if not found. + */ + getConfirmedMessage(messageKey: Fr): L1ToL2Message { + const messageCtx = this.#messages.get(messageKey.toString()); + if (!messageCtx) { + throw new Error(`Message ${messageKey.toString()} not found`); + } + + if (!messageCtx.confirmed) { + throw new Error(`Message ${messageKey.toString()} not confirmed`); + } + + return L1ToL2Message.fromBuffer(messageCtx.message); + } + + /** + * Gets up to `limit` amount of pending L1 to L2 messages, sorted by fee + * @param limit - The number of messages to return (by default NUMBER_OF_L1_L2_MESSAGES_PER_ROLLUP). + * @returns The requested L1 to L2 message keys. 
+ */ + getPendingMessageKeysByFee(limit: number): Fr[] { + const messageKeys: Fr[] = []; + + for (const [[_, messageKey], count] of this.#pendingMessagesByFee.entries({ + reverse: true, + })) { + // put `count` copies of this message in the result list + messageKeys.push(...Array(count).fill(Fr.fromString(messageKey))); + if (messageKeys.length >= limit) { + break; + } + } + + return messageKeys; + } +} diff --git a/yarn-project/archiver/src/archiver/lmdb_archiver_store.test.ts b/yarn-project/archiver/src/archiver/lmdb_archiver_store.test.ts deleted file mode 100644 index 7a1ceacb4f1..00000000000 --- a/yarn-project/archiver/src/archiver/lmdb_archiver_store.test.ts +++ /dev/null @@ -1,14 +0,0 @@ -import { open } from 'lmdb'; - -import { describeArchiverDataStore } from './archiver_store_test_suite.js'; -import { LMDBArchiverStore } from './lmdb_archiver_store.js'; - -describe('LMDB Memory Store', () => { - let archiverStore: LMDBArchiverStore; - - beforeEach(() => { - archiverStore = new LMDBArchiverStore(open({} as any)); - }); - - describeArchiverDataStore('LMDBArchiverStore', () => archiverStore); -}); diff --git a/yarn-project/archiver/src/archiver/lmdb_archiver_store.ts b/yarn-project/archiver/src/archiver/lmdb_archiver_store.ts deleted file mode 100644 index 6343c7dd0d9..00000000000 --- a/yarn-project/archiver/src/archiver/lmdb_archiver_store.ts +++ /dev/null @@ -1,708 +0,0 @@ -import { - ContractData, - ExtendedContractData, - ExtendedUnencryptedL2Log, - GetUnencryptedLogsResponse, - INITIAL_L2_BLOCK_NUM, - L1ToL2Message, - L2Block, - L2BlockL2Logs, - L2Tx, - LogFilter, - LogId, - LogType, - TxHash, - UnencryptedL2Log, -} from '@aztec/circuit-types'; -import { Fr } from '@aztec/circuits.js'; -import { AztecAddress } from '@aztec/foundation/aztec-address'; -import { createDebugLogger } from '@aztec/foundation/log'; - -import { Database, RangeOptions, RootDatabase } from 'lmdb'; - -import { ArchiverDataStore, ArchiverL1SynchPoint } from './archiver_store.js'; - -/* eslint-disable */ -type L1ToL2MessageAndCount = { - message: Buffer; - pendingCount: number; - confirmedCount: number; -}; - -type BlockIndexValue = [blockNumber: number, index: number]; - -type BlockContext = { - block?: Uint8Array; - blockHash?: Uint8Array; - l1BlockNumber?: bigint; - encryptedLogs?: Uint8Array; - unencryptedLogs?: Uint8Array; - extendedContractData?: Array; -}; - -const L1_BLOCK_ADDED_PENDING_MESSAGE = 'l1BlockAddedPendingMessage'; -const L1_BLOCK_CANCELLED_MESSAGE = 'l1BlockCancelledMessage'; -/* eslint-enable */ - -/** - * LMDB implementation of the ArchiverDataStore interface. 
- */ -export class LMDBArchiverStore implements ArchiverDataStore { - #tables: { - /** Where block information will be stored */ - blocks: Database; - /** Transactions index */ - txIndex: Database; - /** Contracts index */ - contractIndex: Database; - /** L1 to L2 messages */ - l1ToL2Messages: Database; - /** Which blocks emitted which messages */ - l1ToL2MessagesByBlock: Database; - /** Pending L1 to L2 messages sorted by their fee, in buckets (dupSort=true) */ - pendingMessagesByFee: Database; - }; - - #logsMaxPageSize: number; - - #log = createDebugLogger('aztec:archiver:lmdb'); - - constructor(db: RootDatabase, logsMaxPageSize: number = 1000) { - this.#tables = { - blocks: db.openDB('blocks', { - keyEncoding: 'uint32', - encoding: 'msgpack', - }), - txIndex: db.openDB('tx_index', { - keyEncoding: 'binary', - encoding: 'msgpack', - }), - contractIndex: db.openDB('contract_index', { - keyEncoding: 'binary', - encoding: 'msgpack', - }), - l1ToL2Messages: db.openDB('l1_to_l2_messages', { - keyEncoding: 'binary', - encoding: 'msgpack', - }), - l1ToL2MessagesByBlock: db.openDB('l1_to_l2_message_nonces', { - keyEncoding: 'ordered-binary', - encoding: 'msgpack', - }), - pendingMessagesByFee: db.openDB('pending_messages_by_fee', { - keyEncoding: 'ordered-binary', - encoding: 'binary', - dupSort: true, - }), - }; - - this.#logsMaxPageSize = logsMaxPageSize; - } - - public async close() { - await Promise.all(Object.values(this.#tables).map(table => table.close())); - } - - /** - * Append new blocks to the store's list. - * @param blocks - The L2 blocks to be added to the store. - * @returns True if the operation is successful. - */ - addBlocks(blocks: L2Block[]): Promise { - // LMDB transactions are shared across databases, so we can use a single transaction for all the writes - // https://github.com/kriszyp/lmdb-js/blob/67505a979ab63187953355a88747a7ad703d50b6/README.md#dbopendbdatabase-stringnamestring - return this.#tables.blocks.transaction(() => { - for (const block of blocks) { - const blockCtx = this.#tables.blocks.get(block.number) ?? {}; - blockCtx.block = block.toBuffer(); - blockCtx.l1BlockNumber = block.getL1BlockNumber(); - blockCtx.blockHash = block.getBlockHash(); - - // no need to await, all writes are enqueued in the transaction - // awaiting would interrupt the execution flow of this callback and "leak" the transaction to some other part - // of the system and any writes would then be part of our transaction here - void this.#tables.blocks.put(block.number, blockCtx); - - for (const [i, tx] of block.getTxs().entries()) { - if (tx.txHash.isZero()) { - continue; - } - void this.#tables.txIndex.put(tx.txHash.buffer, [block.number, i]); - } - - for (const [i, contractData] of block.newContractData.entries()) { - if (contractData.contractAddress.isZero()) { - continue; - } - - void this.#tables.contractIndex.put(contractData.contractAddress.toBuffer(), [block.number, i]); - } - } - - return true; - }); - } - - /** - * Gets up to `limit` amount of L2 blocks starting from `from`. - * @param start - Number of the first block to return (inclusive). - * @param limit - The number of blocks to return. - * @returns The requested L2 blocks. - */ - getBlocks(start: number, limit: number): Promise { - try { - const blocks = this.#tables.blocks - .getRange(this.#computeBlockRange(start, limit)) - .filter(({ value }) => value.block) - .map(({ value }) => { - const block = L2Block.fromBuffer( - asBuffer(value.block!), - value.blockHash ? 
asBuffer(value.blockHash) : undefined, - ); - return block; - }).asArray; - - return Promise.resolve(blocks); - } catch (err) { - // this function is sync so if any errors are thrown we need to make sure they're passed on as rejected Promises - return Promise.reject(err); - } - } - - /** - * Gets an l2 tx. - * @param txHash - The txHash of the l2 tx. - * @returns The requested L2 tx. - */ - getL2Tx(txHash: TxHash): Promise { - const [blockNumber, txIndex] = this.#tables.txIndex.get(txHash.buffer) ?? []; - if (typeof blockNumber !== 'number' || typeof txIndex !== 'number') { - return Promise.resolve(undefined); - } - - const block = this.#getBlock(blockNumber); - return Promise.resolve(block?.getTx(txIndex)); - } - - /** - * Append new logs to the store's list. - * @param encryptedLogs - The logs to be added to the store. - * @param unencryptedLogs - The type of the logs to be added to the store. - * @param blockNumber - The block for which to add the logs. - * @returns True if the operation is successful. - */ - addLogs( - encryptedLogs: L2BlockL2Logs | undefined, - unencryptedLogs: L2BlockL2Logs | undefined, - blockNumber: number, - ): Promise { - return this.#tables.blocks.transaction(() => { - const blockCtx = this.#tables.blocks.get(blockNumber) ?? {}; - - if (encryptedLogs) { - blockCtx.encryptedLogs = encryptedLogs.toBuffer(); - } - - if (unencryptedLogs) { - blockCtx.unencryptedLogs = unencryptedLogs.toBuffer(); - } - - void this.#tables.blocks.put(blockNumber, blockCtx); - return true; - }); - } - - /** - * Append new pending L1 to L2 messages to the store. - * @param messages - The L1 to L2 messages to be added to the store. - * @param l1BlockNumber - The L1 block number for which to add the messages. - * @returns True if the operation is successful. - */ - addPendingL1ToL2Messages(messages: L1ToL2Message[], l1BlockNumber: bigint): Promise { - return this.#tables.l1ToL2Messages.transaction(() => { - if ((this.#tables.l1ToL2MessagesByBlock.get(L1_BLOCK_ADDED_PENDING_MESSAGE) ?? 0n) >= l1BlockNumber) { - return false; - } - // ensure we don't add the same messages twice - void this.#tables.l1ToL2MessagesByBlock.put(L1_BLOCK_ADDED_PENDING_MESSAGE, l1BlockNumber); - - for (const message of messages) { - const messageKey = message.entryKey?.toBuffer(); - if (!messageKey) { - throw new Error('Message does not have an entry key'); - } - - let messageCtx = this.#tables.l1ToL2Messages.get(messageKey); - if (!messageCtx) { - messageCtx = { - message: message.toBuffer(), - pendingCount: 0, - confirmedCount: 0, - }; - void this.#tables.l1ToL2Messages.put(messageKey, messageCtx); - } - - this.#updateMessageCountInTx(messageKey, message, 1, 0); - } - - return true; - }); - } - - /** - * Remove pending L1 to L2 messages from the store (if they were cancelled). - * @param cancelledMessages - The message keys to be removed from the store. - * @param l1BlockNumber - The L1 block number for which to remove the messages. - * @returns True if the operation is successful. - */ - cancelPendingL1ToL2Messages(cancelledMessages: Fr[], l1BlockNumber: bigint): Promise { - return this.#tables.l1ToL2Messages.transaction(() => { - if ((this.#tables.l1ToL2MessagesByBlock.get(L1_BLOCK_CANCELLED_MESSAGE) ?? 
0n) >= l1BlockNumber) { - return false; - } - void this.#tables.l1ToL2MessagesByBlock.put(L1_BLOCK_CANCELLED_MESSAGE, l1BlockNumber); - - for (const messageKey of cancelledMessages) { - const message = this.#getL1ToL2Message(messageKey.toBuffer()); - if (!message) { - continue; - } - this.#updateMessageCountInTx(messageKey.toBuffer(), message, -1, 0); - } - - return true; - }); - } - - /** - * Messages that have been published in an L2 block are confirmed. - * Add them to the confirmed store, also remove them from the pending store. - * @param entryKeys - The message keys to be removed from the store. - * @returns True if the operation is successful. - */ - confirmL1ToL2Messages(entryKeys: Fr[]): Promise { - return this.#tables.l1ToL2Messages.transaction(() => { - for (const entryKey of entryKeys) { - const messageKey = entryKey.toBuffer(); - const message = this.#getL1ToL2Message(messageKey); - this.#updateMessageCountInTx(messageKey, message, -1, 1); - } - return true; - }); - } - - /** - * Gets up to `limit` amount of pending L1 to L2 messages, sorted by fee - * @param limit - The number of messages to return (by default NUMBER_OF_L1_L2_MESSAGES_PER_ROLLUP). - * @returns The requested L1 to L2 message keys. - */ - getPendingL1ToL2MessageKeys(limit: number): Promise { - // start a read transaction in order to have a consistent view of the data - // this is all sync code, but better to be safe in case it changes in the future - // or we end up having multiple processes touching the same db - const transaction = this.#tables.pendingMessagesByFee.useReadTransaction(); - - try { - // get all the keys, in reverse order - const fees = this.#tables.pendingMessagesByFee.getKeys({ reverse: true, transaction }); - const messages: Fr[] = []; - - loopOverFees: for (const fee of fees) { - const pendingMessages = this.#tables.pendingMessagesByFee.getValues(fee, { transaction }); - this.#log(`Found pending messages for ${fee}`); - - for (const messageKey of pendingMessages) { - const messageWithCount = this.#tables.l1ToL2Messages.get(messageKey, { transaction }); - if (!messageWithCount || messageWithCount.pendingCount === 0) { - this.#log( - `Message ${messageKey.toString( - 'hex', - )} has no pending count but it got picked up by getPEndingL1ToL2MessageKeys`, - ); - continue; - } - const toAdd = Array(messageWithCount.pendingCount).fill(Fr.fromBuffer(messageKey)); - this.#log(`Adding ${toAdd.length} copies of ${messageKey.toString('hex')} for ${fee}`); - messages.push(...toAdd); - - if (messages.length >= limit) { - break loopOverFees; - } - } - } - - return Promise.resolve(messages); - } catch (err) { - return Promise.reject(err); - } finally { - transaction.done(); - } - } - - /** - * Gets the confirmed L1 to L2 message corresponding to the given message key. - * @param messageKey - The message key to look up. - * @returns The requested L1 to L2 message or throws if not found. - */ - getConfirmedL1ToL2Message(messageKey: Fr): Promise { - const value = this.#tables.l1ToL2Messages.get(messageKey.toBuffer()); - if (!value) { - return Promise.reject(new Error(`Message with key ${messageKey} not found`)); - } - - if (value.confirmedCount === 0) { - return Promise.reject(new Error(`Message with key ${messageKey} not confirmed`)); - } - - return Promise.resolve(L1ToL2Message.fromBuffer(value.message)); - } - - /** - * Gets up to `limit` amount of logs starting from `from`. - * @param start - Number of the L2 block to which corresponds the first logs to be returned. 
- * @param limit - The number of logs to return. - * @param logType - Specifies whether to return encrypted or unencrypted logs. - * @returns The requested logs. - */ - getLogs(start: number, limit: number, logType: LogType): Promise { - try { - const blockCtxKey = logType === LogType.ENCRYPTED ? 'encryptedLogs' : 'unencryptedLogs'; - const logs = this.#tables.blocks - .getRange(this.#computeBlockRange(start, limit)) - .map(({ value: { [blockCtxKey]: logs } }) => - logs ? L2BlockL2Logs.fromBuffer(asBuffer(logs)) : new L2BlockL2Logs([]), - ).asArray; - - return Promise.resolve(logs); - } catch (err) { - return Promise.reject(err); - } - } - - /** - * Gets unencrypted logs based on the provided filter. - * @param filter - The filter to apply to the logs. - * @returns The requested logs. - */ - getUnencryptedLogs(filter: LogFilter): Promise { - try { - if (filter.afterLog) { - return Promise.resolve(this.#filterUnencryptedLogsBetweenBlocks(filter)); - } else if (filter.txHash) { - return Promise.resolve(this.#filterUnencryptedLogsOfTx(filter)); - } else { - return Promise.resolve(this.#filterUnencryptedLogsBetweenBlocks(filter)); - } - } catch (err) { - return Promise.reject(err); - } - } - - #filterUnencryptedLogsOfTx(filter: LogFilter): GetUnencryptedLogsResponse { - if (!filter.txHash) { - throw new Error('Missing txHash'); - } - - const [blockNumber, txIndex] = this.#tables.txIndex.get(filter.txHash.buffer) ?? []; - if (typeof blockNumber !== 'number' || typeof txIndex !== 'number') { - return { logs: [], maxLogsHit: false }; - } - - const unencryptedLogsInBlock = this.#getBlockLogs(blockNumber, LogType.UNENCRYPTED); - const txLogs = unencryptedLogsInBlock.txLogs[txIndex].unrollLogs().map(log => UnencryptedL2Log.fromBuffer(log)); - - const logs: ExtendedUnencryptedL2Log[] = []; - const maxLogsHit = this.#accumulateLogs(logs, blockNumber, txIndex, txLogs, filter); - - return { logs, maxLogsHit }; - } - - #filterUnencryptedLogsBetweenBlocks(filter: LogFilter): GetUnencryptedLogsResponse { - const start = - filter.afterLog?.blockNumber ?? Math.max(filter.fromBlock ?? INITIAL_L2_BLOCK_NUM, INITIAL_L2_BLOCK_NUM); - const end = filter.toBlock; - - if (typeof end === 'number' && end < start) { - return { - logs: [], - maxLogsHit: true, - }; - } - - const logs: ExtendedUnencryptedL2Log[] = []; - - const blockNumbers = this.#tables.blocks.getKeys({ start, end, snapshot: false }); - let maxLogsHit = false; - - loopOverBlocks: for (const blockNumber of blockNumbers) { - const unencryptedLogsInBlock = this.#getBlockLogs(blockNumber, LogType.UNENCRYPTED); - for (let txIndex = filter.afterLog?.txIndex ?? 0; txIndex < unencryptedLogsInBlock.txLogs.length; txIndex++) { - const txLogs = unencryptedLogsInBlock.txLogs[txIndex].unrollLogs().map(log => UnencryptedL2Log.fromBuffer(log)); - maxLogsHit = this.#accumulateLogs(logs, blockNumber, txIndex, txLogs, filter); - if (maxLogsHit) { - break loopOverBlocks; - } - } - } - - return { logs, maxLogsHit }; - } - - #accumulateLogs( - results: ExtendedUnencryptedL2Log[], - blockNumber: number, - txIndex: number, - txLogs: UnencryptedL2Log[], - filter: LogFilter, - ): boolean { - let maxLogsHit = false; - let logIndex = typeof filter.afterLog?.logIndex === 'number' ? 
filter.afterLog.logIndex + 1 : 0; - for (; logIndex < txLogs.length; logIndex++) { - const log = txLogs[logIndex]; - if (filter.contractAddress && !log.contractAddress.equals(filter.contractAddress)) { - continue; - } - - if (filter.selector && !log.selector.equals(filter.selector)) { - continue; - } - - results.push(new ExtendedUnencryptedL2Log(new LogId(blockNumber, txIndex, logIndex), log)); - if (results.length >= this.#logsMaxPageSize) { - maxLogsHit = true; - break; - } - } - - return maxLogsHit; - } - - /** - * Add new extended contract data from an L2 block to the store's list. - * @param data - List of contracts' data to be added. - * @param blockNum - Number of the L2 block the contract data was deployed in. - * @returns True if the operation is successful. - */ - addExtendedContractData(data: ExtendedContractData[], blockNum: number): Promise { - return this.#tables.blocks.transaction(() => { - const blockCtx = this.#tables.blocks.get(blockNum) ?? {}; - if (!blockCtx.extendedContractData) { - blockCtx.extendedContractData = []; - } - this.#log(`Adding ${data.length} extended contract data to block ${blockNum}`); - blockCtx.extendedContractData.push(...data.map(data => data.toBuffer())); - void this.#tables.blocks.put(blockNum, blockCtx); - - return true; - }); - } - - /** - * Get the extended contract data for this contract. - * @param contractAddress - The contract data address. - * @returns The extended contract data or undefined if not found. - */ - getExtendedContractData(contractAddress: AztecAddress): Promise { - const [blockNumber, _] = this.#tables.contractIndex.get(contractAddress.toBuffer()) ?? []; - - if (typeof blockNumber !== 'number') { - return Promise.resolve(undefined); - } - - const blockCtx = this.#tables.blocks.get(blockNumber); - if (!blockCtx) { - return Promise.resolve(undefined); - } - - for (const data of blockCtx.extendedContractData ?? []) { - const extendedContractData = ExtendedContractData.fromBuffer(asBuffer(data)); - if (extendedContractData.contractData.contractAddress.equals(contractAddress)) { - return Promise.resolve(extendedContractData); - } - } - - return Promise.resolve(undefined); - } - - /** - * Lookup all extended contract data in an L2 block. - * @param blockNum - The block number to get all contract data from. - * @returns All extended contract data in the block (if found). - */ - getExtendedContractDataInBlock(blockNum: number): Promise { - const blockCtx = this.#tables.blocks.get(blockNum); - if (!blockCtx || !blockCtx.extendedContractData) { - return Promise.resolve([]); - } - - return Promise.resolve(blockCtx.extendedContractData.map(data => ExtendedContractData.fromBuffer(asBuffer(data)))); - } - - /** - * Get basic info for an L2 contract. - * Contains contract address & the ethereum portal address. - * @param contractAddress - The contract data address. - * @returns ContractData with the portal address (if we didn't throw an error). - */ - getContractData(contractAddress: AztecAddress): Promise { - const [blockNumber, index] = this.#tables.contractIndex.get(contractAddress.toBuffer()) ?? []; - if (typeof blockNumber !== 'number' || typeof index !== 'number') { - return Promise.resolve(undefined); - } - - const block = this.#getBlock(blockNumber); - return Promise.resolve(block?.newContractData[index]); - } - - /** - * Get basic info for an all L2 contracts deployed in a block. - * Contains contract address & the ethereum portal address. - * @param blockNumber - Number of the L2 block where contracts were deployed. 
- * @returns ContractData with the portal address (if we didn't throw an error). - */ - getContractDataInBlock(blockNumber: number): Promise { - const block = this.#getBlock(blockNumber); - return Promise.resolve(block?.newContractData ?? []); - } - - /** - * Gets the number of the latest L2 block processed. - * @returns The number of the latest L2 block processed. - */ - getBlockNumber(): Promise { - // inverse range with no start/end will return the last key - const [lastBlockNumber] = this.#tables.blocks.getKeys({ reverse: true, limit: 1 }).asArray; - return Promise.resolve(typeof lastBlockNumber === 'number' ? lastBlockNumber : INITIAL_L2_BLOCK_NUM - 1); - } - - /** - * Gets the last L1 block number processed by the archiver - */ - getL1BlockNumber(): Promise { - // inverse range with no start/end will return the last value - const [lastL2Block] = this.#tables.blocks.getRange({ reverse: true, limit: 1 }).asArray; - const addedBlock = lastL2Block?.value?.l1BlockNumber ?? 0n; - const addedMessages = this.#tables.l1ToL2MessagesByBlock.get(L1_BLOCK_ADDED_PENDING_MESSAGE) ?? 0n; - const cancelledMessages = this.#tables.l1ToL2MessagesByBlock.get(L1_BLOCK_CANCELLED_MESSAGE) ?? 0n; - - return Promise.resolve({ - addedBlock, - addedMessages, - cancelledMessages, - }); - } - - #getBlock(blockNumber: number, withLogs = false): L2Block | undefined { - const blockCtx = this.#tables.blocks.get(blockNumber); - if (!blockCtx || !blockCtx.block) { - return undefined; - } - - const block = L2Block.fromBuffer( - asBuffer(blockCtx.block), - blockCtx.blockHash ? asBuffer(blockCtx.blockHash) : undefined, - ); - - if (withLogs) { - if (blockCtx.encryptedLogs) { - block.attachLogs(L2BlockL2Logs.fromBuffer(asBuffer(blockCtx.encryptedLogs)), LogType.ENCRYPTED); - } - - if (blockCtx.unencryptedLogs) { - block.attachLogs(L2BlockL2Logs.fromBuffer(asBuffer(blockCtx.unencryptedLogs)), LogType.UNENCRYPTED); - } - } - - return block; - } - - #getBlockLogs(blockNumber: number, logType: LogType): L2BlockL2Logs { - const blockCtx = this.#tables.blocks.get(blockNumber); - const logs = blockCtx?.[logType === LogType.ENCRYPTED ? 'encryptedLogs' : 'unencryptedLogs']; - - if (!logs) { - return new L2BlockL2Logs([]); - } - - return L2BlockL2Logs.fromBuffer(asBuffer(logs)); - } - - #computeBlockRange(start: number, limit: number): Required> { - if (limit < 1) { - throw new Error(`Invalid limit: ${limit}`); - } - - if (start < INITIAL_L2_BLOCK_NUM) { - this.#log(`Clamping start block ${start} to ${INITIAL_L2_BLOCK_NUM}`); - start = INITIAL_L2_BLOCK_NUM; - } - - const end = start + limit; - return { start, end }; - } - - #getL1ToL2Message(entryKey: Buffer): L1ToL2Message { - const value = this.#tables.l1ToL2Messages.get(entryKey); - if (!value) { - throw new Error('Unknown message: ' + entryKey.toString()); - } - - return L1ToL2Message.fromBuffer(value.message); - } - - /** - * Atomically updates the pending and confirmed count for a message. - * If both counts are 0 after adding their respective deltas, the message is removed from the store. - * - * Only call this method from inside a _transaction_! - * - * @param messageKey - The message key to update. - * @param message - The message to update. - * @param deltaPendingCount - The amount to add to the pending count. - * @param deltaConfirmedCount - The amount to add to the confirmed count. 
- */ - #updateMessageCountInTx( - messageKey: Buffer, - message: L1ToL2Message, - deltaPendingCount: number, - deltaConfirmedCount: number, - ): void { - const entry = this.#tables.l1ToL2Messages.getEntry(messageKey); - if (!entry) { - return; - } - - const { value } = entry; - - value.pendingCount = Math.max(0, value.pendingCount + deltaPendingCount); - value.confirmedCount = Math.max(0, value.confirmedCount + deltaConfirmedCount); - - this.#log( - `Updating count of ${messageKey.toString('hex')} to ${value.pendingCount} pending and ${ - value.confirmedCount - } confirmed}`, - ); - - if (value.pendingCount === 0) { - this.#log(`Removing message ${messageKey.toString('hex')} from pending messages group with fee ${message.fee}`); - void this.#tables.pendingMessagesByFee.remove(message.fee, messageKey); - } else if (value.pendingCount > 0) { - this.#log(`Adding message ${messageKey.toString('hex')} to pending message group with fee ${message.fee}`); - void this.#tables.pendingMessagesByFee.put(message.fee, messageKey); - } - - if (value.pendingCount === 0 && value.confirmedCount === 0) { - void this.#tables.l1ToL2Messages.remove(messageKey); - } else { - void this.#tables.l1ToL2Messages.put(messageKey, value); - } - } -} - -/** - * Creates a Buffer viewing the same memory location as the passed array. - * @param arr - A Uint8Array - */ -function asBuffer(arr: Uint8Array | Buffer): Buffer { - return Buffer.isBuffer(arr) ? arr : Buffer.from(arr.buffer, arr.byteOffset, arr.length / arr.BYTES_PER_ELEMENT); -} diff --git a/yarn-project/archiver/tsconfig.json b/yarn-project/archiver/tsconfig.json index 902e0c05490..6a0c794ba6d 100644 --- a/yarn-project/archiver/tsconfig.json +++ b/yarn-project/archiver/tsconfig.json @@ -18,6 +18,9 @@ { "path": "../foundation" }, + { + "path": "../kv-store" + }, { "path": "../l1-artifacts" } diff --git a/yarn-project/aztec-node/src/aztec-node/server.ts b/yarn-project/aztec-node/src/aztec-node/server.ts index 1d24a3e17b4..0c65a9c488d 100644 --- a/yarn-project/aztec-node/src/aztec-node/server.ts +++ b/yarn-project/aztec-node/src/aztec-node/server.ts @@ -1,4 +1,4 @@ -import { Archiver, LMDBArchiverStore } from '@aztec/archiver'; +import { Archiver, KVArchiverDataStore } from '@aztec/archiver'; import { AztecNode, ContractData, @@ -107,10 +107,10 @@ export class AztecNodeService implements AztecNode { const log = createDebugLogger('aztec:node'); const store = await AztecLmdbStore.create(config.l1Contracts.rollupAddress, config.dataDirectory); - const [nodeDb, worldStateDb] = await openDb(config, log); + const [_, worldStateDb] = await openDb(config, log); // first create and sync the archiver - const archiverStore = new LMDBArchiverStore(nodeDb, config.maxLogs); + const archiverStore = new KVArchiverDataStore(store, config.maxLogs); const archiver = await Archiver.createAndSync(config, archiverStore, true); // we identify the P2P transaction protocol by using the rollup contract address. 
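
The node wiring above now hands the archiver the same shared `AztecLmdbStore` the rest of the node uses, instead of a privately opened LMDB environment. Below is a minimal sketch of standing the new store up on its own, based only on the constructors and exports touched in this diff (`AztecLmdbStore.create`, `KVArchiverDataStore`, `Archiver.createAndSync`); the `startArchiver` helper and the loosely typed `config` parameter are illustrative, not part of the change.

```ts
import { Archiver, KVArchiverDataStore } from '@aztec/archiver';
import { AztecLmdbStore } from '@aztec/kv-store';

// Illustrative helper; real code would use the node/archiver config types.
async function startArchiver(config: any) {
  // One shared LMDB environment, created the same way AztecNodeService does above.
  const store = await AztecLmdbStore.create(config.l1Contracts.rollupAddress, config.dataDirectory);

  // The archiver keeps its blocks, logs, contracts and L1-to-L2 messages as
  // separate maps inside that shared store.
  const archiverStore = new KVArchiverDataStore(store, config.maxLogs);

  // Same call AztecNodeService makes above (third argument as passed in server.ts).
  return Archiver.createAndSync(config, archiverStore, true);
}
```
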
diff --git a/yarn-project/aztec-nr/aztec/src/state_vars/public_state.nr b/yarn-project/aztec-nr/aztec/src/state_vars/public_state.nr index b24c4a3323e..10970a92c0d 100644 --- a/yarn-project/aztec-nr/aztec/src/state_vars/public_state.nr +++ b/yarn-project/aztec-nr/aztec/src/state_vars/public_state.nr @@ -31,7 +31,7 @@ impl PublicState { // docs:start:public_state_struct_read pub fn read(self) -> T { - assert(self.context.private.is_none(), "Public state writes only supported in public functions"); + assert(self.context.private.is_none(), "Public state reads only supported in public functions"); storage_read(self.storage_slot, self.serialization_methods.deserialize) } // docs:end:public_state_struct_read diff --git a/yarn-project/end-to-end/package.json b/yarn-project/end-to-end/package.json index a75d100da31..88ccca29173 100644 --- a/yarn-project/end-to-end/package.json +++ b/yarn-project/end-to-end/package.json @@ -32,6 +32,7 @@ "@aztec/cli": "workspace:^", "@aztec/ethereum": "workspace:^", "@aztec/foundation": "workspace:^", + "@aztec/kv-store": "workspace:^", "@aztec/l1-artifacts": "workspace:^", "@aztec/merkle-tree": "workspace:^", "@aztec/noir-contracts": "workspace:^", @@ -57,7 +58,6 @@ "koa": "^2.14.2", "koa-static": "^5.0.0", "levelup": "^5.1.1", - "lmdb": "^2.9.1", "lodash.compact": "^3.0.1", "lodash.every": "^4.6.0", "memdown": "^6.1.1", diff --git a/yarn-project/end-to-end/src/integration_archiver_l1_to_l2.test.ts b/yarn-project/end-to-end/src/integration_archiver_l1_to_l2.test.ts index b546356c630..eb57e00eb21 100644 --- a/yarn-project/end-to-end/src/integration_archiver_l1_to_l2.test.ts +++ b/yarn-project/end-to-end/src/integration_archiver_l1_to_l2.test.ts @@ -1,4 +1,4 @@ -import { Archiver, LMDBArchiverStore } from '@aztec/archiver'; +import { Archiver, KVArchiverDataStore } from '@aztec/archiver'; import { AztecNodeConfig } from '@aztec/aztec-node'; import { AztecAddress, @@ -10,9 +10,9 @@ import { Wallet, computeMessageSecretHash, } from '@aztec/aztec.js'; +import { AztecLmdbStore } from '@aztec/kv-store'; import { TokenContract } from '@aztec/noir-contracts/Token'; -import { open } from 'lmdb'; import { Chain, HttpTransport, PublicClient } from 'viem'; import { delay, deployAndInitializeTokenAndBridgeContracts, setNextBlockTimestamp, setup } from './fixtures/utils.js'; @@ -43,7 +43,7 @@ describe('archiver integration with l1 to l2 messages', () => { config.archiverPollingIntervalMS = 100; archiver = await Archiver.createAndSync( { ...config, l1Contracts: deployL1ContractsValues.l1ContractAddresses }, - new LMDBArchiverStore(open({} as any)), + new KVArchiverDataStore(await AztecLmdbStore.create(deployL1ContractsValues.l1ContractAddresses.rollupAddress)), ); const walletClient = deployL1ContractsValues.walletClient; diff --git a/yarn-project/end-to-end/tsconfig.json b/yarn-project/end-to-end/tsconfig.json index 2cd953c345d..e467ab65bf4 100644 --- a/yarn-project/end-to-end/tsconfig.json +++ b/yarn-project/end-to-end/tsconfig.json @@ -33,6 +33,9 @@ { "path": "../foundation" }, + { + "path": "../kv-store" + }, { "path": "../l1-artifacts" }, diff --git a/yarn-project/kv-store/src/index.ts b/yarn-project/kv-store/src/index.ts index 2a71333f9e6..b35a4fb3d53 100644 --- a/yarn-project/kv-store/src/index.ts +++ b/yarn-project/kv-store/src/index.ts @@ -1,5 +1,7 @@ export * from './interfaces/array.js'; export * from './interfaces/map.js'; +export * from './interfaces/counter.js'; export * from './interfaces/singleton.js'; export * from './interfaces/store.js'; export * from 
'./lmdb/store.js'; +export { Range } from './interfaces/common.js'; diff --git a/yarn-project/kv-store/src/interfaces/common.ts b/yarn-project/kv-store/src/interfaces/common.ts new file mode 100644 index 00000000000..c4e0effa8c8 --- /dev/null +++ b/yarn-project/kv-store/src/interfaces/common.ts @@ -0,0 +1,18 @@ +/** + * The key type for use with the kv-store + */ +export type Key = string | number | Array; + +/** + * A range of keys to iterate over. + */ +export type Range = { + /** The key of the first item to include */ + start?: K; + /** The key of the last item to include */ + end?: K; + /** Whether to iterate in reverse */ + reverse?: boolean; + /** The maximum number of items to iterate over */ + limit?: number; +}; diff --git a/yarn-project/kv-store/src/interfaces/counter.ts b/yarn-project/kv-store/src/interfaces/counter.ts new file mode 100644 index 00000000000..0f68626e691 --- /dev/null +++ b/yarn-project/kv-store/src/interfaces/counter.ts @@ -0,0 +1,43 @@ +import { Key, Range } from './common.js'; + +/** + * A map that counts how many times it sees a key. Once 0 is reached, that key is removed from the map. + * Iterating over the map will only return keys that have a count over 0. + * + * Keys are stored in sorted order + */ +export interface AztecCounter { + /** + * Resets the count of the given key to the given value. + * @param key - The key to reset + * @param value - The value to reset the key to + */ + set(key: K, value: number): Promise; + + /** + * Updates the count of the given key by the given delta. This can be used to increment or decrement the count. + * Once a key's count reaches 0, it is removed from the map. + * + * @param key - The key to update + * @param delta - The amount to modify the key by + */ + update(key: K, delta: number): Promise; + + /** + * Gets the current count. + * @param key - The key to get the count of + */ + get(key: K): number; + + /** + * Returns keys in the map in sorted order. Only returns keys that have been seen at least once. + * @param range - The range of keys to iterate over + */ + keys(range: Range): IterableIterator; + + /** + * Returns keys and their counts in the map sorted by the key. Only returns keys that have been seen at least once. + * @param range - The range of keys to iterate over + */ + entries(range: Range): IterableIterator<[K, number]>; +} diff --git a/yarn-project/kv-store/src/interfaces/map.ts b/yarn-project/kv-store/src/interfaces/map.ts index 8de773837b7..0916146a4ab 100644 --- a/yarn-project/kv-store/src/interfaces/map.ts +++ b/yarn-project/kv-store/src/interfaces/map.ts @@ -1,7 +1,9 @@ +import { Key, Range } from './common.js'; + /** * A map backed by a persistent store. */ -export interface AztecMap { +export interface AztecMap { /** * Gets the value at the given key. * @param key - The key to get the value from @@ -22,6 +24,13 @@ export interface AztecMap { */ set(key: K, val: V): Promise; + /** + * Atomically swap the value at the given key + * @param key - The key to swap the value at + * @param fn - The function to swap the value with + */ + swap(key: K, fn: (val: V | undefined) => V): Promise; + /** * Sets the value at the given key if it does not already exist. 
* @param key - The key to set the value at @@ -36,25 +45,28 @@ export interface AztecMap { delete(key: K): Promise; /** - * Iterates over the map's key-value entries + * Iterates over the map's key-value entries in the key's natural order + * @param range - The range of keys to iterate over */ - entries(): IterableIterator<[K, V]>; + entries(range?: Range): IterableIterator<[K, V]>; /** - * Iterates over the map's values + * Iterates over the map's values in the key's natural order + * @param range - The range of keys to iterate over */ - values(): IterableIterator; + values(range?: Range): IterableIterator; /** - * Iterates over the map's keys + * Iterates over the map's keys in the key's natural order + * @param range - The range of keys to iterate over */ - keys(): IterableIterator; + keys(range?: Range): IterableIterator; } /** * A map backed by a persistent store that can have multiple values for a single key. */ -export interface AztecMultiMap extends AztecMap { +export interface AztecMultiMap extends AztecMap { /** * Gets all the values at the given key. * @param key - The key to get the values from diff --git a/yarn-project/kv-store/src/interfaces/store.ts b/yarn-project/kv-store/src/interfaces/store.ts index d7ccfa3cd29..73a2901387c 100644 --- a/yarn-project/kv-store/src/interfaces/store.ts +++ b/yarn-project/kv-store/src/interfaces/store.ts @@ -1,4 +1,6 @@ import { AztecArray } from './array.js'; +import { Key } from './common.js'; +import { AztecCounter } from './counter.js'; import { AztecMap, AztecMultiMap } from './map.js'; import { AztecSingleton } from './singleton.js'; @@ -32,6 +34,12 @@ export interface AztecKVStore { */ createSingleton(name: string): AztecSingleton; + /** + * Creates a new count map. + * @param name - name of the counter + */ + createCounter(name: string): AztecCounter; + /** * Starts a transaction. All calls to read/write data while in a transaction are queued and executed atomically. 
* @param callback - The callback to execute in a transaction diff --git a/yarn-project/kv-store/src/lmdb/counter.test.ts b/yarn-project/kv-store/src/lmdb/counter.test.ts new file mode 100644 index 00000000000..fdd1204b15c --- /dev/null +++ b/yarn-project/kv-store/src/lmdb/counter.test.ts @@ -0,0 +1,122 @@ +import { randomBytes } from 'crypto'; +import { Database, open } from 'lmdb'; + +import { LmdbAztecCounter } from './counter.js'; + +describe('LmdbAztecCounter', () => { + let db: Database; + + beforeEach(() => { + db = open({} as any); + }); + + describe.each([ + ['floating point number', () => Math.random()], + ['integers', () => (Math.random() * 1000) | 0], + ['strings', () => randomBytes(8).toString('hex')], + ['strings', () => [Math.random(), randomBytes(8).toString('hex')]], + ])('counts occurrences of %s values', (_, genKey) => { + let counter: LmdbAztecCounter>; + beforeEach(() => { + counter = new LmdbAztecCounter(db, 'test'); + }); + + it('returns 0 for unknown keys', () => { + expect(counter.get(genKey())).toEqual(0); + }); + + it('increments values', async () => { + const key = genKey(); + await counter.update(key, 1); + + expect(counter.get(key)).toEqual(1); + }); + + it('decrements values', async () => { + const key = genKey(); + await counter.update(key, 1); + await counter.update(key, -1); + + expect(counter.get(key)).toEqual(0); + }); + + it('throws when decrementing below zero', async () => { + const key = genKey(); + await counter.update(key, 1); + + await expect(counter.update(key, -2)).rejects.toThrow(); + }); + + it('increments values by a delta', async () => { + const key = genKey(); + await counter.update(key, 1); + await counter.update(key, 2); + + expect(counter.get(key)).toEqual(3); + }); + + it('resets the counter', async () => { + const key = genKey(); + await counter.update(key, 1); + await counter.update(key, 2); + await counter.set(key, 0); + + expect(counter.get(key)).toEqual(0); + }); + + it('iterates over entries', async () => { + const key = genKey(); + await counter.update(key, 1); + await counter.update(key, 2); + + expect([...counter.entries()]).toEqual([[key, 3]]); + }); + }); + + it.each([ + [ + [ + ['c', 2342], + ['a', 8], + ['b', 1], + ], + [ + ['a', 8], + ['b', 1], + ['c', 2342], + ], + ], + [ + [ + [10, 2], + [18, 1], + [1, 2], + ], + [ + [1, 2], + [10, 2], + [18, 1], + ], + ], + [ + [ + [[10, 'a'], 1], + [[10, 'c'], 2], + [[11, 'b'], 1], + [[9, 'f'], 1], + [[10, 'b'], 1], + ], + [ + [[9, 'f'], 1], + [[10, 'a'], 1], + [[10, 'b'], 1], + [[10, 'c'], 2], + [[11, 'b'], 1], + ], + ], + ])('iterates in key order', async (insertOrder, expectedOrder) => { + const counter = new LmdbAztecCounter(db, 'test'); + await Promise.all(insertOrder.map(([key, value]) => counter.update(key, value as number))); + expect([...counter.entries()]).toEqual(expectedOrder); + }); +}); diff --git a/yarn-project/kv-store/src/lmdb/counter.ts b/yarn-project/kv-store/src/lmdb/counter.ts new file mode 100644 index 00000000000..74886e89dbf --- /dev/null +++ b/yarn-project/kv-store/src/lmdb/counter.ts @@ -0,0 +1,57 @@ +import { Key as BaseKey, Database } from 'lmdb'; + +import { Key, Range } from '../interfaces/common.js'; +import { AztecCounter } from '../interfaces/counter.js'; +import { LmdbAztecMap } from './map.js'; + +/** + * A counter implementation backed by LMDB + */ +export class LmdbAztecCounter implements AztecCounter { + #db: Database; + #name: string; + #map: LmdbAztecMap; + + constructor(db: Database, name: string) { + this.#db = db; + this.#name = name; + 
this.#map = new LmdbAztecMap(db, name); + } + + set(key: K, value: number): Promise { + return this.#map.set(key, value); + } + + update(key: K, delta = 1): Promise { + return this.#db.childTransaction(() => { + const current = this.#map.get(key) ?? 0; + const next = current + delta; + + if (next < 0) { + throw new Error(`Cannot update ${key} in counter ${this.#name} below zero`); + } + + if (next === 0) { + void this.#map.delete(key); + } else { + // store the key inside the entry because LMDB might return an internal representation + // of the key when iterating over the database + void this.#map.set(key, next); + } + + return true; + }); + } + + get(key: K): number { + return this.#map.get(key) ?? 0; + } + + entries(range: Range = {}): IterableIterator<[K, number]> { + return this.#map.entries(range); + } + + keys(range: Range = {}): IterableIterator { + return this.#map.keys(range); + } +} diff --git a/yarn-project/kv-store/src/lmdb/map.test.ts b/yarn-project/kv-store/src/lmdb/map.test.ts index 5319e0a26c3..007b4c4eb8f 100644 --- a/yarn-project/kv-store/src/lmdb/map.test.ts +++ b/yarn-project/kv-store/src/lmdb/map.test.ts @@ -41,26 +41,24 @@ describe('LmdbAztecMap', () => { await map.set('foo', 'bar'); await map.set('baz', 'qux'); - expect([...map.entries()]).toEqual( - expect.arrayContaining([ - ['foo', 'bar'], - ['baz', 'qux'], - ]), - ); + expect([...map.entries()]).toEqual([ + ['baz', 'qux'], + ['foo', 'bar'], + ]); }); it('should be able to iterate over values', async () => { await map.set('foo', 'bar'); - await map.set('baz', 'qux'); + await map.set('baz', 'quux'); - expect([...map.values()]).toEqual(expect.arrayContaining(['bar', 'qux'])); + expect([...map.values()]).toEqual(['quux', 'bar']); }); it('should be able to iterate over keys', async () => { await map.set('foo', 'bar'); await map.set('baz', 'qux'); - expect([...map.keys()]).toEqual(expect.arrayContaining(['foo', 'baz'])); + expect([...map.keys()]).toEqual(['baz', 'foo']); }); it('should be able to get multiple values for a single key', async () => { @@ -69,4 +67,33 @@ describe('LmdbAztecMap', () => { expect([...map.getValues('foo')]).toEqual(['bar', 'baz']); }); + + it('supports tuple keys', async () => { + const map = new LmdbAztecMap<[number, string], string>(db, 'test'); + + await map.set([5, 'bar'], 'val'); + await map.set([0, 'foo'], 'val'); + + expect([...map.keys()]).toEqual([ + [0, 'foo'], + [5, 'bar'], + ]); + + expect(map.get([5, 'bar'])).toEqual('val'); + }); + + it('supports range queries', async () => { + await map.set('a', 'a'); + await map.set('b', 'b'); + await map.set('c', 'c'); + await map.set('d', 'd'); + + expect([...map.keys({ start: 'b', end: 'c' })]).toEqual(['b']); + expect([...map.keys({ start: 'b' })]).toEqual(['b', 'c', 'd']); + expect([...map.keys({ end: 'c' })]).toEqual(['a', 'b']); + expect([...map.keys({ start: 'b', end: 'c', reverse: true })]).toEqual(['c']); + expect([...map.keys({ start: 'b', limit: 1 })]).toEqual(['b']); + expect([...map.keys({ start: 'b', reverse: true })]).toEqual(['d', 'c']); + expect([...map.keys({ end: 'b', reverse: true })]).toEqual(['b', 'a']); + }); }); diff --git a/yarn-project/kv-store/src/lmdb/map.ts b/yarn-project/kv-store/src/lmdb/map.ts index b883b809738..6e5fa67ef1e 100644 --- a/yarn-project/kv-store/src/lmdb/map.ts +++ b/yarn-project/kv-store/src/lmdb/map.ts @@ -1,20 +1,30 @@ -import { Database, Key } from 'lmdb'; +import { Database, RangeOptions } from 'lmdb'; +import { Key, Range } from '../interfaces/common.js'; import { AztecMultiMap } from 
'../interfaces/map.js'; /** The slot where a key-value entry would be stored */ -type MapKeyValueSlot = ['map', string, 'slot', K]; +type MapValueSlot = ['map', string, 'slot', K]; /** * A map backed by LMDB. */ -export class LmdbAztecMap implements AztecMultiMap { - protected db: Database>; +export class LmdbAztecMap implements AztecMultiMap { + protected db: Database<[K, V], MapValueSlot>; protected name: string; - constructor(rootDb: Database, mapName: string) { + #startSentinel: MapValueSlot; + #endSentinel: MapValueSlot; + + constructor(rootDb: Database, mapName: string) { this.name = mapName; - this.db = rootDb as Database>; + this.db = rootDb as Database<[K, V], MapValueSlot>; + + // sentinels are used to define the start and end of the map + // with LMDB's key encoding, no _primitive value_ can be "less than" an empty buffer or greater than Byte 255 + // these will be used later to answer range queries + this.#startSentinel = ['map', this.name, 'slot', Buffer.from([])]; + this.#endSentinel = ['map', this.name, 'slot', Buffer.from([255])]; } close(): Promise { @@ -22,13 +32,13 @@ export class LmdbAztecMap implements AztecMultiMap } get(key: K): V | undefined { - return this.db.get(this.#slot(key)) as V | undefined; + return this.db.get(this.#slot(key))?.[1]; } *getValues(key: K): IterableIterator { const values = this.db.getValues(this.#slot(key)); for (const value of values) { - yield value; + yield value?.[1]; } } @@ -37,13 +47,23 @@ export class LmdbAztecMap implements AztecMultiMap } set(key: K, val: V): Promise { - return this.db.put(this.#slot(key), val); + return this.db.put(this.#slot(key), [key, val]); + } + + swap(key: K, fn: (val: V | undefined) => V): Promise { + return this.db.childTransaction(() => { + const slot = this.#slot(key); + const entry = this.db.get(slot); + void this.db.put(slot, [key, fn(entry?.[1])]); + + return true; + }); } setIfNotExists(key: K, val: V): Promise { const slot = this.#slot(key); return this.db.ifNoExists(slot, () => { - void this.db.put(slot, val); + void this.db.put(slot, [key, val]); }); } @@ -52,37 +72,58 @@ export class LmdbAztecMap implements AztecMultiMap } async deleteValue(key: K, val: V): Promise { - await this.db.remove(this.#slot(key), val); + await this.db.remove(this.#slot(key), [key, val]); } - *entries(): IterableIterator<[K, V]> { - const iterator = this.db.getRange({ - start: ['map', this.name, 'slot'], - }); - - for (const { key, value } of iterator) { - if (key[0] !== 'map' || key[1] !== this.name) { - break; - } - - const originalKey = key[3]; - yield [originalKey, value]; + *entries(range: Range = {}): IterableIterator<[K, V]> { + const { reverse = false, limit } = range; + // LMDB has a quirk where it expects start > end when reverse=true + // in that case, we need to swap the start and end sentinels + const start = reverse + ? range.end + ? this.#slot(range.end) + : this.#endSentinel + : range.start + ? this.#slot(range.start) + : this.#startSentinel; + + const end = reverse + ? range.start + ? this.#slot(range.start) + : this.#startSentinel + : range.end + ? 
this.#slot(range.end) + : this.#endSentinel; + + const lmdbRange: RangeOptions = { + start, + end, + reverse, + limit, + }; + + const iterator = this.db.getRange(lmdbRange); + + for (const { + value: [key, value], + } of iterator) { + yield [key, value]; } } - *values(): IterableIterator { - for (const [_, value] of this.entries()) { + *values(range: Range = {}): IterableIterator { + for (const [_, value] of this.entries(range)) { yield value; } } - *keys(): IterableIterator { - for (const [key, _] of this.entries()) { + *keys(range: Range = {}): IterableIterator { + for (const [key, _] of this.entries(range)) { yield key; } } - #slot(key: K): MapKeyValueSlot { + #slot(key: K): MapValueSlot { return ['map', this.name, 'slot', key]; } } diff --git a/yarn-project/kv-store/src/lmdb/store.ts b/yarn-project/kv-store/src/lmdb/store.ts index 9ede111a875..8c3cebd0737 100644 --- a/yarn-project/kv-store/src/lmdb/store.ts +++ b/yarn-project/kv-store/src/lmdb/store.ts @@ -4,10 +4,12 @@ import { Logger, createDebugLogger } from '@aztec/foundation/log'; import { Database, Key, RootDatabase, open } from 'lmdb'; import { AztecArray } from '../interfaces/array.js'; +import { AztecCounter } from '../interfaces/counter.js'; import { AztecMap, AztecMultiMap } from '../interfaces/map.js'; import { AztecSingleton } from '../interfaces/singleton.js'; import { AztecKVStore } from '../interfaces/store.js'; import { LmdbAztecArray } from './array.js'; +import { LmdbAztecCounter } from './counter.js'; import { LmdbAztecMap } from './map.js'; import { LmdbAztecSingleton } from './singleton.js'; @@ -88,6 +90,10 @@ export class AztecLmdbStore implements AztecKVStore { return new LmdbAztecMap(this.#multiMapData, name); } + createCounter>(name: string): AztecCounter { + return new LmdbAztecCounter(this.#data, name); + } + /** * Creates a new AztecArray in the store. * @param name - Name of the array diff --git a/yarn-project/yarn.lock b/yarn-project/yarn.lock index d2945982290..5e7bbb29474 100644 --- a/yarn-project/yarn.lock +++ b/yarn-project/yarn.lock @@ -115,6 +115,7 @@ __metadata: "@aztec/circuits.js": "workspace:^" "@aztec/ethereum": "workspace:^" "@aztec/foundation": "workspace:^" + "@aztec/kv-store": "workspace:^" "@aztec/l1-artifacts": "workspace:^" "@jest/globals": ^29.5.0 "@types/debug": ^4.1.7 @@ -399,6 +400,7 @@ __metadata: "@aztec/cli": "workspace:^" "@aztec/ethereum": "workspace:^" "@aztec/foundation": "workspace:^" + "@aztec/kv-store": "workspace:^" "@aztec/l1-artifacts": "workspace:^" "@aztec/merkle-tree": "workspace:^" "@aztec/noir-contracts": "workspace:^" @@ -425,7 +427,6 @@ __metadata: koa: ^2.14.2 koa-static: ^5.0.0 levelup: ^5.1.1 - lmdb: ^2.9.1 lodash.compact: ^3.0.1 lodash.every: ^4.6.0 memdown: ^6.1.1
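Taken together, the kv-store additions in this diff (Range queries over AztecMap, the new swap helper, and AztecCounter) can be exercised roughly as follows. This is a minimal sketch against the interfaces shown above: the map and counter names, the demo function, and the explicit type arguments are illustrative assumptions, and the AztecKVStore instance is assumed to come from AztecLmdbStore.create as elsewhere in this diff.

import { AztecKVStore } from '@aztec/kv-store';

// Sketch only: names are placeholders; the APIs are the ones introduced above.
async function demoKvStoreAdditions(store: AztecKVStore) {
  const txsPerBlock = store.createMap<number, number>('demo_txs_per_block');
  const seenTxs = store.createCounter<string>('demo_seen_txs');

  await txsPerBlock.set(1, 4);
  await txsPerBlock.set(2, 7);
  await txsPerBlock.set(3, 2);

  // Range queries iterate keys in their natural order; per the map tests above, `end` is exclusive.
  const inRange = [...txsPerBlock.keys({ start: 1, end: 3 })]; // [1, 2]

  // Reverse iteration with a limit yields the most recently inserted (highest) key.
  const [latestBlock] = txsPerBlock.keys({ reverse: true, limit: 1 }); // 3

  // swap() performs an atomic read-modify-write of a single entry.
  await txsPerBlock.swap(2, current => (current ?? 0) + 1);

  // Counters track per-key occurrence counts; a key whose count reaches 0 is dropped,
  // and get() returns 0 for unknown keys.
  await seenTxs.update('0xabc', 1);
  await seenTxs.update('0xabc', -1);
  return { inRange, latestBlock, zero: seenTxs.get('0xabc') };
}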