diff --git a/yarn-project/archiver/src/archiver/store/archiver_store.ts b/yarn-project/archiver/src/archiver/store/archiver_store.ts new file mode 100644 index 00000000000..083f65dfd0b --- /dev/null +++ b/yarn-project/archiver/src/archiver/store/archiver_store.ts @@ -0,0 +1,225 @@ +import { Fr } from '@aztec/circuits.js'; +import { AztecAddress } from '@aztec/foundation/aztec-address'; +import { createDebugLogger } from '@aztec/foundation/log'; +import { AztecKVStore } from '@aztec/kv-store'; +import { + CancelledL1ToL2Message, + ContractData, + ExtendedContractData, + GetUnencryptedLogsResponse, + L1ToL2Message, + L2Block, + L2BlockL2Logs, + L2Tx, + LogFilter, + LogType, + PendingL1ToL2Message, + TxHash, +} from '@aztec/types'; + +import { ArchiverDataStore } from '../archiver_store.js'; +import { BlockStore } from './block_store.js'; +import { ContractStore } from './contract_store.js'; +import { LogStore } from './log_store.js'; +import { MessageStore } from './message_store.js'; + +/** + * LMDB implementation of the ArchiverDataStore interface. + */ +export class ArchiverStore implements ArchiverDataStore { + #blockStore: BlockStore; + #logStore: LogStore; + #contractStore: ContractStore; + #messageStore: MessageStore; + + #log = createDebugLogger('aztec:archiver:lmdb'); + + constructor(private db: AztecKVStore, logsMaxPageSize: number = 1000) { + this.#blockStore = new BlockStore(db); + this.#logStore = new LogStore(db, this.#blockStore, logsMaxPageSize); + this.#contractStore = new ContractStore(db, this.#blockStore); + this.#messageStore = new MessageStore(db); + } + + /** + * Append new blocks to the store's list. + * @param blocks - The L2 blocks to be added to the store. + * @returns True if the operation is successful. + */ + addBlocks(blocks: L2Block[]): Promise { + return this.#blockStore.addBlocks(blocks); + } + + /** + * Gets up to `limit` amount of L2 blocks starting from `from`. + * @param start - Number of the first block to return (inclusive). + * @param limit - The number of blocks to return. + * @returns The requested L2 blocks. + */ + getBlocks(start: number, limit: number): Promise { + try { + return Promise.resolve(Array.from(this.#blockStore.getBlocks(start, limit))); + } catch (err) { + // this function is sync so if any errors are thrown we need to make sure they're passed on as rejected Promises + return Promise.reject(err); + } + } + + /** + * Gets an l2 tx. + * @param txHash - The txHash of the l2 tx. + * @returns The requested L2 tx. + */ + getL2Tx(txHash: TxHash): Promise { + return Promise.resolve(this.#blockStore.getL2Tx(txHash)); + } + + /** + * Append new logs to the store's list. + * @param encryptedLogs - The logs to be added to the store. + * @param unencryptedLogs - The type of the logs to be added to the store. + * @param blockNumber - The block for which to add the logs. + * @returns True if the operation is successful. + */ + addLogs( + encryptedLogs: L2BlockL2Logs | undefined, + unencryptedLogs: L2BlockL2Logs | undefined, + blockNumber: number, + ): Promise { + return this.#logStore.addLogs(encryptedLogs, unencryptedLogs, blockNumber); + } + + /** + * Append new pending L1 to L2 messages to the store. + * @param messages - The L1 to L2 messages to be added to the store. + * @returns True if the operation is successful. + */ + addPendingL1ToL2Messages(messages: PendingL1ToL2Message[]): Promise { + return this.#messageStore.addPendingL1ToL2Messages(messages); + } + + /** + * Remove pending L1 to L2 messages from the store (if they were cancelled). + * @param messages - The message keys to be removed from the store. + * @returns True if the operation is successful. + */ + cancelPendingL1ToL2Messages(messages: CancelledL1ToL2Message[]): Promise { + return this.#messageStore.cancelPendingL1ToL2Messages(messages); + } + + /** + * Messages that have been published in an L2 block are confirmed. + * Add them to the confirmed store, also remove them from the pending store. + * @param entryKeys - The message keys to be removed from the store. + * @returns True if the operation is successful. + */ + confirmL1ToL2Messages(entryKeys: Fr[]): Promise { + return this.#messageStore.confirmL1ToL2Messages(entryKeys); + } + + /** + * Gets up to `limit` amount of pending L1 to L2 messages, sorted by fee + * @param limit - The number of messages to return (by default NUMBER_OF_L1_L2_MESSAGES_PER_ROLLUP). + * @returns The requested L1 to L2 message keys. + */ + getPendingL1ToL2MessageKeys(limit: number): Promise { + return this.#messageStore.getPendingL1ToL2MessageKeys(limit); + } + + /** + * Gets the confirmed L1 to L2 message corresponding to the given message key. + * @param messageKey - The message key to look up. + * @returns The requested L1 to L2 message or throws if not found. + */ + getConfirmedL1ToL2Message(messageKey: Fr): Promise { + return this.#messageStore.getConfirmedL1ToL2Message(messageKey); + } + + /** + * Gets up to `limit` amount of logs starting from `from`. + * @param start - Number of the L2 block to which corresponds the first logs to be returned. + * @param limit - The number of logs to return. + * @param logType - Specifies whether to return encrypted or unencrypted logs. + * @returns The requested logs. + */ + getLogs(start: number, limit: number, logType: LogType): Promise { + try { + return Promise.resolve(Array.from(this.#logStore.getLogs(start, limit, logType))); + } catch (err) { + return Promise.reject(err); + } + } + + /** + * Gets unencrypted logs based on the provided filter. + * @param filter - The filter to apply to the logs. + * @returns The requested logs. + */ + getUnencryptedLogs(filter: LogFilter): Promise { + try { + return Promise.resolve(this.#logStore.getUnencryptedLogs(filter)); + } catch (err) { + return Promise.reject(err); + } + } + + /** + * Add new extended contract data from an L2 block to the store's list. + * @param data - List of contracts' data to be added. + * @param blockNum - Number of the L2 block the contract data was deployed in. + * @returns True if the operation is successful. + */ + addExtendedContractData(data: ExtendedContractData[], blockNum: number): Promise { + return this.#contractStore.addExtendedContractData(data, blockNum); + } + + /** + * Get the extended contract data for this contract. + * @param contractAddress - The contract data address. + * @returns The extended contract data or undefined if not found. + */ + getExtendedContractData(contractAddress: AztecAddress): Promise { + return Promise.resolve(this.#contractStore.getExtendedContractData(contractAddress)); + } + + /** + * Lookup all extended contract data in an L2 block. + * @param blockNumber - The block number to get all contract data from. + * @returns All extended contract data in the block (if found). + */ + getExtendedContractDataInBlock(blockNumber: number): Promise { + return Promise.resolve(Array.from(this.#contractStore.getExtendedContractDataInBlock(blockNumber))); + } + + /** + * Get basic info for an L2 contract. + * Contains contract address & the ethereum portal address. + * @param contractAddress - The contract data address. + * @returns ContractData with the portal address (if we didn't throw an error). + */ + getContractData(contractAddress: AztecAddress): Promise { + return Promise.resolve(this.#contractStore.getContractData(contractAddress)); + } + + /** + * Get basic info for an all L2 contracts deployed in a block. + * Contains contract address & the ethereum portal address. + * @param blockNumber - Number of the L2 block where contracts were deployed. + * @returns ContractData with the portal address (if we didn't throw an error). + */ + getContractDataInBlock(blockNumber: number): Promise { + return Promise.resolve(Array.from(this.#contractStore.getContractDataInBlock(blockNumber))); + } + + /** + * Gets the number of the latest L2 block processed. + * @returns The number of the latest L2 block processed. + */ + getBlockNumber(): Promise { + return Promise.resolve(this.#blockStore.getBlockNumber()); + } + + getL1BlockNumber(): Promise { + return Promise.resolve(this.#blockStore.getL1BlockNumber()); + } +} diff --git a/yarn-project/archiver/src/archiver/store/block_store.ts b/yarn-project/archiver/src/archiver/store/block_store.ts new file mode 100644 index 00000000000..f39fa04d1bf --- /dev/null +++ b/yarn-project/archiver/src/archiver/store/block_store.ts @@ -0,0 +1,152 @@ +import { AztecAddress } from '@aztec/circuits.js'; +import { toBigIntBE, toBufferBE } from '@aztec/foundation/bigint-buffer'; +import { createDebugLogger } from '@aztec/foundation/log'; +import { AztecKVStore, AztecMap, MapRange } from '@aztec/kv-store'; +import { INITIAL_L2_BLOCK_NUM, L2Block, L2Tx, TxHash } from '@aztec/types'; + +/* eslint-disable */ +type BlockIndexValue = [blockNumber: number, index: number]; + +type BlockContext = { + blockNumber: number; + l1BlockNumber: Buffer; + block: Buffer; + blockHash: Buffer; +}; +/* eslint-enable */ + +/** + * LMDB implementation of the ArchiverDataStore interface. + */ +export class BlockStore { + /** Map block number to block data */ + #blocks: AztecMap; + + /** Index mapping transaction hash (as a string) to its location in a block */ + #txIndex: AztecMap; + + /** Index mapping a contract's address (as a string) to its location in a block */ + #contractIndex: AztecMap; + + #log = createDebugLogger('aztec:archiver:block_store'); + + constructor(private db: AztecKVStore) { + this.#blocks = db.createMap('archiver_blocks'); + + this.#txIndex = db.createMap('archiver_tx_index'); + this.#contractIndex = db.createMap('archiver_contract_index'); + } + + /** + * Append new blocks to the store's list. + * @param blocks - The L2 blocks to be added to the store. + * @returns True if the operation is successful. + */ + addBlocks(blocks: L2Block[]): Promise { + return this.db.transaction(() => { + for (const block of blocks) { + void this.#blocks.set(block.number, { + blockNumber: block.number, + block: block.toBuffer(), + l1BlockNumber: toBufferBE(block.getL1BlockNumber(), 32), + blockHash: block.getBlockHash(), + }); + + for (const [i, tx] of block.getTxs().entries()) { + if (tx.txHash.isZero()) { + continue; + } + void this.#txIndex.set(tx.txHash.toString(), [block.number, i]); + } + + for (const [i, contractData] of block.newContractData.entries()) { + if (contractData.contractAddress.isZero()) { + continue; + } + + void this.#contractIndex.set(contractData.contractAddress.toString(), [block.number, i]); + } + } + + return true; + }); + } + + /** + * Gets up to `limit` amount of L2 blocks starting from `from`. + * @param start - Number of the first block to return (inclusive). + * @param limit - The number of blocks to return. + * @returns The requested L2 blocks. + */ + *getBlocks(start: number, limit: number): IterableIterator { + for (const blockCtx of this.#blocks.values(this.#computeBlockRange(start, limit))) { + yield L2Block.fromBuffer(blockCtx.block, blockCtx.blockHash); + } + } + + getBlock(blockNumber: number): L2Block | undefined { + const blockCtx = this.#blocks.get(blockNumber); + if (!blockCtx || !blockCtx.block) { + return undefined; + } + + const block = L2Block.fromBuffer(blockCtx.block, blockCtx.blockHash); + + return block; + } + + /** + * Gets an l2 tx. + * @param txHash - The txHash of the l2 tx. + * @returns The requested L2 tx. + */ + getL2Tx(txHash: TxHash): L2Tx | undefined { + const [blockNumber, txIndex] = this.#txIndex.get(txHash.toString()) ?? []; + if (typeof blockNumber !== 'number' || typeof txIndex !== 'number') { + return undefined; + } + + const block = this.getBlock(blockNumber); + return block?.getTx(txIndex); + } + + getL2TxLocation(txHash: TxHash): [blockNumber: number, txIndex: number] | undefined { + return this.#txIndex.get(txHash.toString()); + } + + getContractLocation(contractAddress: AztecAddress): [blockNumber: number, index: number] | undefined { + return this.#contractIndex.get(contractAddress.toString()); + } + + /** + * Gets the number of the latest L2 block processed. + * @returns The number of the latest L2 block processed. + */ + getBlockNumber(): number { + const [lastBlockNumber] = this.#blocks.keys({ reverse: true, limit: 1 }); + return typeof lastBlockNumber === 'number' ? lastBlockNumber : INITIAL_L2_BLOCK_NUM - 1; + } + + getL1BlockNumber(): bigint { + const [lastBlock] = this.#blocks.values({ reverse: true, limit: 1 }); + if (!lastBlock) { + return 0n; + } else { + return toBigIntBE(lastBlock.l1BlockNumber); + } + } + + #computeBlockRange(start: number, limit: number): Required, 'start' | 'end'>> { + if (limit < 1) { + throw new Error(`Invalid limit: ${limit}`); + } + + if (start < INITIAL_L2_BLOCK_NUM) { + this.#log(`Clamping start block ${start} to ${INITIAL_L2_BLOCK_NUM}`); + start = INITIAL_L2_BLOCK_NUM; + } + + const end = start + limit; + return { start, end }; + } +} diff --git a/yarn-project/archiver/src/archiver/store/contract_store.ts b/yarn-project/archiver/src/archiver/store/contract_store.ts new file mode 100644 index 00000000000..027e90b644f --- /dev/null +++ b/yarn-project/archiver/src/archiver/store/contract_store.ts @@ -0,0 +1,93 @@ +import { AztecAddress } from '@aztec/foundation/aztec-address'; +import { createDebugLogger } from '@aztec/foundation/log'; +import { AztecKVStore, AztecMultiMap } from '@aztec/kv-store'; +import { ContractData, ExtendedContractData } from '@aztec/types'; + +import { BlockStore } from './block_store.js'; + +/** + * LMDB implementation of the ArchiverDataStore interface. + */ +export class ContractStore { + #blockStore: BlockStore; + #extendedContractData: AztecMultiMap; + #log = createDebugLogger('aztec:archiver:contract_store'); + + constructor(private db: AztecKVStore, blockStore: BlockStore) { + this.#extendedContractData = db.createMultiMap('archiver_extended_contract_data'); + this.#blockStore = blockStore; + } + + /** + * Add new extended contract data from an L2 block to the store's list. + * @param data - List of contracts' data to be added. + * @param blockNum - Number of the L2 block the contract data was deployed in. + * @returns True if the operation is successful. + */ + addExtendedContractData(data: ExtendedContractData[], blockNum: number): Promise { + return this.#extendedContractData.setValues( + blockNum, + data.map(d => d.toBuffer()), + ); + } + + /** + * Get the extended contract data for this contract. + * @param contractAddress - The contract data address. + * @returns The extended contract data or undefined if not found. + */ + getExtendedContractData(contractAddress: AztecAddress): ExtendedContractData | undefined { + const [blockNumber, _] = this.#blockStore.getContractLocation(contractAddress) ?? []; + + if (typeof blockNumber !== 'number') { + return undefined; + } + + for (const contract of this.#extendedContractData.getValues(blockNumber)) { + const extendedContractData = ExtendedContractData.fromBuffer(contract); + if (extendedContractData.contractData.contractAddress.equals(contractAddress)) { + return extendedContractData; + } + } + + return undefined; + } + + /** + * Lookup all extended contract data in an L2 block. + * @param blockNumber - The block number to get all contract data from. + * @returns All extended contract data in the block (if found). + */ + *getExtendedContractDataInBlock(blockNumber: number): IterableIterator { + for (const contract of this.#extendedContractData.getValues(blockNumber)) { + yield ExtendedContractData.fromBuffer(contract); + } + } + + /** + * Get basic info for an L2 contract. + * Contains contract address & the ethereum portal address. + * @param contractAddress - The contract data address. + * @returns ContractData with the portal address (if we didn't throw an error). + */ + getContractData(contractAddress: AztecAddress): ContractData | undefined { + const [blockNumber, index] = this.#blockStore.getContractLocation(contractAddress) ?? []; + if (typeof blockNumber !== 'number' || typeof index !== 'number') { + return undefined; + } + + const block = this.#blockStore.getBlock(blockNumber); + return block?.newContractData[index]; + } + + /** + * Get basic info for an all L2 contracts deployed in a block. + * Contains contract address & the ethereum portal address. + * @param blockNumber - Number of the L2 block where contracts were deployed. + * @returns ContractData with the portal address (if we didn't throw an error). + */ + getContractDataInBlock(blockNumber: number): ContractData[] { + const block = this.#blockStore.getBlock(blockNumber); + return block?.newContractData ?? []; + } +} diff --git a/yarn-project/archiver/src/archiver/store/log_store.ts b/yarn-project/archiver/src/archiver/store/log_store.ts new file mode 100644 index 00000000000..5ca82bbfc63 --- /dev/null +++ b/yarn-project/archiver/src/archiver/store/log_store.ts @@ -0,0 +1,174 @@ +import { createDebugLogger } from '@aztec/foundation/log'; +import { AztecKVStore, AztecMap } from '@aztec/kv-store'; +import { + ExtendedUnencryptedL2Log, + GetUnencryptedLogsResponse, + INITIAL_L2_BLOCK_NUM, + L2BlockL2Logs, + LogFilter, + LogId, + LogType, + UnencryptedL2Log, +} from '@aztec/types'; + +import { BlockStore } from './block_store.js'; + +/** + * A store for logs + */ +export class LogStore { + #encryptedLogs: AztecMap; + #unencryptedLogs: AztecMap; + #logsMaxPageSize: number; + #log = createDebugLogger('aztec:archiver:log_store'); + + constructor(private db: AztecKVStore, private blockStore: BlockStore, logsMaxPageSize: number = 1000) { + this.#encryptedLogs = db.createMap('archiver_encrypted_logs'); + this.#unencryptedLogs = db.createMap('archiver_unencrypted_logs'); + + this.#logsMaxPageSize = logsMaxPageSize; + } + + /** + * Append new logs to the store's list. + * @param encryptedLogs - The logs to be added to the store. + * @param unencryptedLogs - The type of the logs to be added to the store. + * @param blockNumber - The block for which to add the logs. + * @returns True if the operation is successful. + */ + addLogs( + encryptedLogs: L2BlockL2Logs | undefined, + unencryptedLogs: L2BlockL2Logs | undefined, + blockNumber: number, + ): Promise { + return this.db.transaction(() => { + if (encryptedLogs) { + void this.#encryptedLogs.set(blockNumber, encryptedLogs.toBuffer()); + } + + if (unencryptedLogs) { + void this.#unencryptedLogs.set(blockNumber, unencryptedLogs.toBuffer()); + } + + return true; + }); + } + + /** + * Gets up to `limit` amount of logs starting from `from`. + * @param start - Number of the L2 block to which corresponds the first logs to be returned. + * @param limit - The number of logs to return. + * @param logType - Specifies whether to return encrypted or unencrypted logs. + * @returns The requested logs. + */ + *getLogs(start: number, limit: number, logType: LogType): IterableIterator { + const logMap = logType === LogType.ENCRYPTED ? this.#encryptedLogs : this.#unencryptedLogs; + for (const buffer of logMap.values({ start, limit })) { + yield L2BlockL2Logs.fromBuffer(buffer); + } + } + + /** + * Gets unencrypted logs based on the provided filter. + * @param filter - The filter to apply to the logs. + * @returns The requested logs. + */ + getUnencryptedLogs(filter: LogFilter): GetUnencryptedLogsResponse { + if (filter.afterLog) { + return this.#filterUnencryptedLogsBetweenBlocks(filter); + } else if (filter.txHash) { + return this.#filterUnencryptedLogsOfTx(filter); + } else { + return this.#filterUnencryptedLogsBetweenBlocks(filter); + } + } + + #filterUnencryptedLogsOfTx(filter: LogFilter): GetUnencryptedLogsResponse { + if (!filter.txHash) { + throw new Error('Missing txHash'); + } + + const [blockNumber, txIndex] = this.blockStore.getL2TxLocation(filter.txHash) ?? []; + if (typeof blockNumber !== 'number' || typeof txIndex !== 'number') { + return { logs: [], maxLogsHit: false }; + } + + const unencryptedLogsInBlock = this.#getBlockLogs(blockNumber, LogType.UNENCRYPTED); + const txLogs = unencryptedLogsInBlock.txLogs[txIndex].unrollLogs().map(log => UnencryptedL2Log.fromBuffer(log)); + + const logs: ExtendedUnencryptedL2Log[] = []; + const maxLogsHit = this.#accumulateLogs(logs, blockNumber, txIndex, txLogs, filter); + + return { logs, maxLogsHit }; + } + + #filterUnencryptedLogsBetweenBlocks(filter: LogFilter): GetUnencryptedLogsResponse { + const start = + filter.afterLog?.blockNumber ?? Math.max(filter.fromBlock ?? INITIAL_L2_BLOCK_NUM, INITIAL_L2_BLOCK_NUM); + const end = filter.toBlock; + + if (typeof end === 'number' && end < start) { + return { + logs: [], + maxLogsHit: true, + }; + } + + const logs: ExtendedUnencryptedL2Log[] = []; + + let maxLogsHit = false; + loopOverBlocks: for (const [blockNumber, logBuffer] of this.#unencryptedLogs.entries({ start, end })) { + const unencryptedLogsInBlock = L2BlockL2Logs.fromBuffer(logBuffer); + for (let txIndex = filter.afterLog?.txIndex ?? 0; txIndex < unencryptedLogsInBlock.txLogs.length; txIndex++) { + const txLogs = unencryptedLogsInBlock.txLogs[txIndex].unrollLogs().map(log => UnencryptedL2Log.fromBuffer(log)); + maxLogsHit = this.#accumulateLogs(logs, blockNumber, txIndex, txLogs, filter); + if (maxLogsHit) { + this.#log(`Max logs hit at block ${blockNumber}`); + break loopOverBlocks; + } + } + } + + return { logs, maxLogsHit }; + } + + #accumulateLogs( + results: ExtendedUnencryptedL2Log[], + blockNumber: number, + txIndex: number, + txLogs: UnencryptedL2Log[], + filter: LogFilter, + ): boolean { + let maxLogsHit = false; + let logIndex = typeof filter.afterLog?.logIndex === 'number' ? filter.afterLog.logIndex + 1 : 0; + for (; logIndex < txLogs.length; logIndex++) { + const log = txLogs[logIndex]; + if (filter.contractAddress && !log.contractAddress.equals(filter.contractAddress)) { + continue; + } + + if (filter.selector && !log.selector.equals(filter.selector)) { + continue; + } + + results.push(new ExtendedUnencryptedL2Log(new LogId(blockNumber, txIndex, logIndex), log)); + if (results.length >= this.#logsMaxPageSize) { + maxLogsHit = true; + break; + } + } + + return maxLogsHit; + } + + #getBlockLogs(blockNumber: number, logType: LogType): L2BlockL2Logs { + const logMap = logType === LogType.ENCRYPTED ? this.#encryptedLogs : this.#unencryptedLogs; + const buffer = logMap.get(blockNumber); + + if (!buffer) { + return new L2BlockL2Logs([]); + } + + return L2BlockL2Logs.fromBuffer(buffer); + } +} diff --git a/yarn-project/archiver/src/archiver/store/message_store.ts b/yarn-project/archiver/src/archiver/store/message_store.ts new file mode 100644 index 00000000000..24b257d481c --- /dev/null +++ b/yarn-project/archiver/src/archiver/store/message_store.ts @@ -0,0 +1,238 @@ +import { Fr } from '@aztec/circuits.js'; +import { toBufferBE } from '@aztec/foundation/bigint-buffer'; +import { createDebugLogger } from '@aztec/foundation/log'; +import { AztecKVStore, AztecMap, AztecMultiMap } from '@aztec/kv-store'; +import { CancelledL1ToL2Message, L1ToL2Message, PendingL1ToL2Message } from '@aztec/types'; + +/* eslint-disable */ +type L1ToL2MessageAndCount = { + message: Buffer; + pendingCount: number; + confirmedCount: number; +}; + +type L1ToL2MessageBlockKey = `${string}:${'newMessage' | 'cancelledMessage'}:${number}`; + +function l1ToL2MessageBlockKey( + l1BlockNumber: bigint, + key: 'newMessage' | 'cancelledMessage', + indexInBlock: number, +): L1ToL2MessageBlockKey { + return `${toBufferBE(l1BlockNumber, 32).toString('hex')}:${key}:${indexInBlock}`; +} + +/* eslint-enable */ + +/** + * LMDB implementation of the ArchiverDataStore interface. + */ +export class MessageStore { + /** L1 to L2 messages */ + #messages: AztecMap; + + #pendingCount: AztecMap; + #confirmedCount: AztecMap; + + /** Pending L1 to L2 messages sorted by their fee, in buckets (dupSort=true) */ + #pendingMessagesByFee: AztecMultiMap; + + /** Map to check if a message has already been processed */ + #duplicateCheck: AztecMap; + + #log = createDebugLogger('aztec:archiver:message_store'); + + constructor(private db: AztecKVStore) { + this.#messages = db.createMap('archiver_l1_to_l2_messages'); + this.#pendingCount = db.createMap('archiver_pending_count'); + this.#confirmedCount = db.createMap('archiver_confirmed_count'); + this.#duplicateCheck = db.createMap('archiver_duplicate_check'); + this.#pendingMessagesByFee = db.createMultiMap('archiver_pending_messages_by_fee'); + } + + /** + * Append new pending L1 to L2 messages to the store. + * @param messages - The L1 to L2 messages to be added to the store. + * @returns True if the operation is successful. + */ + addPendingL1ToL2Messages(messages: PendingL1ToL2Message[]): Promise { + return this.db.transaction(() => { + for (const { message, blockNumber, indexInBlock } of messages) { + const messageKey = message.entryKey?.toBuffer(); + if (!messageKey) { + throw new Error('Message does not have an entry key'); + } + + if (this.#duplicateCheck.get(l1ToL2MessageBlockKey(blockNumber, 'newMessage', indexInBlock))) { + // no-op + continue; + } + + void this.#duplicateCheck.set(l1ToL2MessageBlockKey(blockNumber, 'newMessage', indexInBlock), true); + void this.#messages.set(messageKey.toString(), message.toBuffer()); + + this.#updateMessageCountInTx(messageKey, message, -1, 0); + } + return true; + }); + } + + /** + * Remove pending L1 to L2 messages from the store (if they were cancelled). + * @param messages - The message keys to be removed from the store. + * @returns True if the operation is successful. + */ + cancelPendingL1ToL2Messages(messages: CancelledL1ToL2Message[]): Promise { + return this.db.transaction(() => { + for (const { blockNumber, indexInBlock, entryKey } of messages) { + const messageKey = entryKey.toBuffer(); + const dupeKey = l1ToL2MessageBlockKey(blockNumber, 'cancelledMessage', indexInBlock); + const messageInBlock = this.#duplicateCheck.get(dupeKey); + if (messageInBlock) { + continue; + } + + void this.#duplicateCheck.set(dupeKey, true); + + const message = this.#getL1ToL2Message(messageKey); + this.#updateMessageCountInTx(messageKey, message, -1, 0); + } + return true; + }); + } + + /** + * Messages that have been published in an L2 block are confirmed. + * Add them to the confirmed store, also remove them from the pending store. + * @param _entryKeys - The message keys to be removed from the store. + * @returns True if the operation is successful. + */ + confirmL1ToL2Messages(_entryKeys: Fr[]): Promise { + return this.db.transaction(() => { + // for (const entryKey of entryKeys) { + // const messageKey = entryKey.toBuffer(); + // const message = this.#getL1ToL2Message(messageKey); + // this.#updateMessageCountInTx(messageKey, message, -1, 1); + // } + return true; + }); + } + + /** + * Gets up to `limit` amount of pending L1 to L2 messages, sorted by fee + * @param _limit - The number of messages to return (by default NUMBER_OF_L1_L2_MESSAGES_PER_ROLLUP). + * @returns The requested L1 to L2 message keys. + */ + getPendingL1ToL2MessageKeys(_limit: number): Promise { + // start a read transaction in order to have a consistent view of the data + // this is all sync code, but better to be safe in case it changes in the future + // or we end up having multiple processes touching the same db + // const transaction = this.#tables.pendingMessagesByFee.useReadTransaction(); + + // try { + // // get all the keys, in reverse order + // const fees = this.#tables.pendingMessagesByFee.getKeys({ reverse: true, transaction }); + // const messages: Fr[] = []; + + // loopOverFees: for (const fee of fees) { + // const pendingMessages = this.#tables.pendingMessagesByFee.getValues(fee, { transaction }); + // this.#log(`Found pending messages for ${fee}`); + + // for (const messageKey of pendingMessages) { + // const messageWithCount = this.#tables.l1ToL2Messages.get(messageKey, { transaction }); + // if (!messageWithCount || messageWithCount.pendingCount === 0) { + // this.#log( + // `Message ${messageKey.toString( + // 'hex', + // )} has no pending count but it got picked up by getPEndingL1ToL2MessageKeys`, + // ); + // continue; + // } + // const toAdd = Array(messageWithCount.pendingCount).fill(Fr.fromBuffer(messageKey)); + // this.#log(`Adding ${toAdd.length} copies of ${messageKey.toString('hex')} for ${fee}`); + // messages.push(...toAdd); + + // if (messages.length >= limit) { + // break loopOverFees; + // } + // } + // } + + // return Promise.resolve(messages); + // } catch (err) { + // return Promise.reject(err); + // } finally { + // transaction.done(); + // } + return Promise.resolve([]); + } + + /** + * Gets the confirmed L1 to L2 message corresponding to the given message key. + * @param messageKey - The message key to look up. + * @returns The requested L1 to L2 message or throws if not found. + */ + getConfirmedL1ToL2Message(messageKey: Fr): Promise { + const value = this.#messages.get(messageKey.toString()); + if (!value) { + return Promise.reject(new Error(`Message with key ${messageKey} not found`)); + } + + if (value.confirmedCount === 0) { + return Promise.reject(new Error(`Message with key ${messageKey} not confirmed`)); + } + + return Promise.resolve(L1ToL2Message.fromBuffer(value.message)); + } + + #getL1ToL2Message(entryKey: Buffer): L1ToL2Message { + const value = this.#messages.get(entryKey.toString()); + if (!value) { + throw new Error('Unknown message: ' + entryKey.toString()); + } + + return L1ToL2Message.fromBuffer(value.message); + } + + /** + * Atomically updates the pending and confirmed count for a message. + * If both counts are 0 after adding their respective deltas, the message is removed from the store. + * + * Only call this method from inside a _transaction_! + * + * @param messageKey - The message key to update. + * @param message - The message to update. + * @param deltaPendingCount - The amount to add to the pending count. + * @param deltaConfirmedCount - The amount to add to the confirmed count. + */ + #updateMessageCountInTx( + messageKey: Buffer, + message: L1ToL2Message, + deltaPendingCount: number, + deltaConfirmedCount: number, + ): void { + // const entry = this.#l1ToL2Messages.getEntry(messageKey); + // if (!entry) { + // return; + // } + // const { value } = entry; + // value.pendingCount = Math.max(0, value.pendingCount + deltaPendingCount); + // value.confirmedCount = Math.max(0, value.confirmedCount + deltaConfirmedCount); + // this.#log( + // `Updating count of ${messageKey.toString('hex')} to ${value.pendingCount} pending and ${ + // value.confirmedCount + // } confirmed}`, + // ); + // if (value.pendingCount === 0) { + // this.#log(`Removing message ${messageKey.toString('hex')} from pending messages group with fee ${message.fee}`); + // void this.#tables.pendingMessagesByFee.remove(message.fee, messageKey); + // } else if (value.pendingCount > 0) { + // this.#log(`Adding message ${messageKey.toString('hex')} to pending message group with fee ${message.fee}`); + // void this.#tables.pendingMessagesByFee.put(message.fee, messageKey); + // } + // if (value.pendingCount === 0 && value.confirmedCount === 0) { + // void this.#tables.l1ToL2Messages.remove(messageKey); + // } else { + // void this.#tables.l1ToL2Messages.put(messageKey, value); + // } + } +} diff --git a/yarn-project/kv-store/src/lmdb/collection.ts b/yarn-project/kv-store/src/lmdb/collection.ts new file mode 100644 index 00000000000..13ea5904e94 --- /dev/null +++ b/yarn-project/kv-store/src/lmdb/collection.ts @@ -0,0 +1,73 @@ +/* eslint-disable jsdoc/require-jsdoc */ +import { Database, Key } from 'lmdb'; + +type ItemId = number | string; + +/** The shape of a key that stores a value in an array */ +type CollectionKey = [ds: 'collection', name: string, item: 'item', itemId: ItemId | Buffer]; + +type RangeSpec = { + /** The index of the first item to include */ + from?: number; + /** The index of the last item to include */ + to?: number; + reverse?: boolean; + limit?: number; +}; + +export class AztecLmdbCollection { + #db: Database; + #name: string; + + #keyStart: CollectionKey; + #keyEnd: CollectionKey; + + constructor(db: Database, name: string) { + this.#name = name; + this.#db = db as Database; + + // exploit the fact that the keys are sorted lexicographically + // and primitive values go in between an empty Buffer and a Buffer of 255 + this.#keyStart = ['collection', name, 'item', Buffer.alloc(0)]; + this.#keyEnd = ['collection', name, 'item', Buffer.alloc(255)]; + } + + upsert(...values: T[]) { + return this.#db.childTransaction(() => { + for (const val of values) { + void this.#db.put(this.#key(val.id), val); + } + }); + } + + delete(...ids: ItemId[]) { + return this.#db.childTransaction(() => { + for (const id of ids) { + void this.#db.remove(this.#key(id)); + } + }); + } + + get(id: ItemId): T | undefined { + return this.#db.get(this.#key(id)); + } + + *values(range: RangeSpec = {}): IterableIterator { + const { from, to, reverse = false, limit } = range; + + const iterator = this.#db.getRange({ + start: typeof from !== 'undefined' ? this.#key(from) : this.#keyStart, + end: typeof to !== 'undefined' ? this.#key(to) : this.#keyEnd, + reverse, + limit, + }); + + for (const { value } of iterator) { + yield value; + } + } + + #key(itemId: ItemId): CollectionKey { + return ['collection', this.#name, 'item', itemId]; + } +}