diff --git a/packages/common/src/LruMap.ts b/packages/common/src/LruMap.ts index 7d0d3b5f30..a1661857b1 100644 --- a/packages/common/src/LruMap.ts +++ b/packages/common/src/LruMap.ts @@ -12,6 +12,15 @@ export class LruMap extends Map { this.maxSize = size; } + override get(key: key): value | undefined { + const value = super.get(key); + if (this.has(key)) { + this.delete(key); + this.set(key, value as never); + } + return value; + } + override set(key: key, value: value): this { super.set(key, value); if (this.maxSize && this.size > this.maxSize) { diff --git a/packages/store-sync/src/createStoreSync.ts b/packages/store-sync/src/createStoreSync.ts index 121cf0fdd2..a781b78428 100644 --- a/packages/store-sync/src/createStoreSync.ts +++ b/packages/store-sync/src/createStoreSync.ts @@ -1,5 +1,5 @@ import { storeEventsAbi } from "@latticexyz/store"; -import { GetTransactionReceiptErrorType, Hex } from "viem"; +import { GetTransactionReceiptErrorType, Hex, parseEventLogs } from "viem"; import { StorageAdapter, StorageAdapterBlock, @@ -10,7 +10,7 @@ import { internalTableIds, WaitForTransactionResult, } from "./common"; -import { createBlockStream } from "@latticexyz/block-logs-stream"; +import { createBlockStream, groupLogsByBlockNumber } from "@latticexyz/block-logs-stream"; import { filter, map, @@ -25,14 +25,17 @@ import { catchError, shareReplay, combineLatest, - scan, mergeMap, + BehaviorSubject, + ignoreElements, + first, } from "rxjs"; import { debug as parentDebug } from "./debug"; import { SyncStep } from "./SyncStep"; import { bigIntMax, chunk, isDefined, waitForIdle } from "@latticexyz/common/utils"; import { getSnapshot } from "./getSnapshot"; import { fetchAndStoreLogs } from "./fetchAndStoreLogs"; +import { LruMap } from "@latticexyz/common"; const debug = parentDebug.extend("createStoreSync"); @@ -199,6 +202,27 @@ export async function createStoreSync({ let endBlock: bigint | null = null; let lastBlockNumberProcessed: bigint | null = null; + // For chains that provide guaranteed receipts ahead of block mining, we can apply the logs immediately. + // This works because, once the block is mined, the same logs will be applied. Store events are defined in + // such a way that reapplying the same logs will mean that the storage adapter + // is kept up to date. + + let optimisticLogs: readonly StoreEventsLog[] = []; + async function applyOptimisticLogs(blockNumber: bigint): Promise { + const logs = optimisticLogs.filter((log) => log.blockNumber > blockNumber); + if (logs.length) { + debug("applying", logs.length, "optimistic logs"); + const blocks = groupLogsByBlockNumber(logs).filter((block) => block.logs.length); + for (const block of blocks) { + debug("applying optimistic logs for block", block.blockNumber); + await storageAdapter(block); + } + } + optimisticLogs = logs; + } + + const storageAdapterLock$ = new BehaviorSubject(false); + const storedBlock$ = combineLatest([startBlock$, latestBlockNumber$]).pipe( map(([startBlock, endBlock]) => ({ startBlock, endBlock })), tap((range) => { @@ -219,21 +243,41 @@ export async function createStoreSync({ logFilter, }); - return from(storedBlocks); - }), - tap(({ blockNumber, logs }) => { - debug("stored", logs.length, "logs for block", blockNumber); - lastBlockNumberProcessed = blockNumber; + const storedBlock$ = from(storedBlocks).pipe(share()); + return concat( + storageAdapterLock$.pipe( + first((lock) => lock === false), + tap(() => storageAdapterLock$.next(true)), + ignoreElements(), + ), + storedBlock$.pipe( + tap(({ blockNumber, logs }) => { + debug("stored", logs.length, "logs for block", blockNumber); + lastBlockNumberProcessed = blockNumber; + }), + ), + of(true).pipe( + concatMap(async () => { + if (lastBlockNumberProcessed != null) { + await applyOptimisticLogs(lastBlockNumberProcessed); + } + }), + tap(() => storageAdapterLock$.next(false)), + ignoreElements(), + ), + ); + }), + tap(({ blockNumber }) => { if (startBlock != null && endBlock != null) { if (blockNumber < endBlock) { const totalBlocks = endBlock - startBlock; - const processedBlocks = lastBlockNumberProcessed - startBlock; + const processedBlocks = blockNumber - startBlock; onProgress?.({ step: SyncStep.RPC, percentage: Number((processedBlocks * 1000n) / totalBlocks) / 10, latestBlockNumber: endBlock, - lastBlockNumberProcessed, + lastBlockNumberProcessed: blockNumber, message: "Hydrating from RPC", }); } else { @@ -241,7 +285,7 @@ export async function createStoreSync({ step: SyncStep.LIVE, percentage: 100, latestBlockNumber: endBlock, - lastBlockNumberProcessed, + lastBlockNumberProcessed: blockNumber, message: "All caught up!", }); } @@ -250,27 +294,27 @@ export async function createStoreSync({ share(), ); - const storedBlockLogs$ = concat(storedInitialBlockLogs$, storedBlock$).pipe(share()); - // keep 10 blocks worth processed transactions in memory const recentBlocksWindow = 10; - // most recent block first, for ease of pulling the first one off the array - const recentBlocks$ = storedBlockLogs$.pipe( - scan( - (recentBlocks, block) => [block, ...recentBlocks].slice(0, recentBlocksWindow), - [], - ), - filter((recentBlocks) => recentBlocks.length > 0), - shareReplay(1), + const recentBlocks$ = new BehaviorSubject([]); + + const storedBlockLogs$ = concat(storedInitialBlockLogs$, storedBlock$).pipe( + tap((block) => { + // most recent block first, for ease of pulling the first one off the array + recentBlocks$.next([block, ...recentBlocks$.value].slice(0, recentBlocksWindow)); + }), + share(), ); // TODO: move to its own file so we can test it, have its own debug instance, etc. - async function waitForTransaction(tx: Hex): Promise { - debug("waiting for tx", tx); + const waitPromises = new LruMap>(1024); + function waitForTransaction(tx: Hex): Promise { + const existingPromise = waitPromises.get(tx); + if (existingPromise) return existingPromise; // This currently blocks for async call on each block processed // We could potentially speed this up a tiny bit by racing to see if 1) tx exists in processed block or 2) fetch tx receipt for latest block processed - const hasTransaction$ = recentBlocks$.pipe( + const receipt$ = recentBlocks$.pipe( // We use `mergeMap` instead of `concatMap` here to send the fetch request immediately when a new block range appears, // instead of sending the next request only when the previous one completed. mergeMap(async (blocks) => { @@ -284,11 +328,26 @@ export async function createStoreSync({ try { const lastBlock = blocks[0]; - debug("fetching tx receipt for block", lastBlock.blockNumber); - const { status, blockNumber, transactionHash } = await publicClient.getTransactionReceipt({ hash: tx }); - if (lastBlock.blockNumber >= blockNumber) { - return { status, blockNumber, transactionHash }; + debug("fetching tx receipt after seeing block", lastBlock.blockNumber); + const receipt = await publicClient.getTransactionReceipt({ hash: tx }); + debug("got receipt", receipt.status); + if (receipt.status === "success") { + const logs = parseEventLogs({ abi: storeEventsAbi, logs: receipt.logs }); + if (logs.length) { + // wait for lock to clear + // unclear what happens if two waitForTransaction calls are triggered simultaneously and both get released for the same lock emission? + await firstValueFrom(storageAdapterLock$.pipe(filter((lock) => lock === false))); + storageAdapterLock$.next(true); + optimisticLogs = [...optimisticLogs, ...logs]; + await applyOptimisticLogs(lastBlock.blockNumber); + storageAdapterLock$.next(false); + } } + return { + status: receipt.status, + blockNumber: receipt.blockNumber, + transactionHash: receipt.transactionHash, + }; } catch (e) { const error = e as GetTransactionReceiptErrorType; if (error.name === "TransactionReceiptNotFoundError") { @@ -297,9 +356,13 @@ export async function createStoreSync({ throw error; } }), + filter(isDefined), ); - return await firstValueFrom(hasTransaction$.pipe(filter(isDefined))); + debug("waiting for tx", tx); + const promise = firstValueFrom(receipt$); + waitPromises.set(tx, promise); + return promise; } return { diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 4ad28c4a1d..e70c8d50a1 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -809,10 +809,10 @@ importers: version: 8.3.4 jest: specifier: ^29.3.1 - version: 29.5.0(@types/node@18.15.11) + version: 29.5.0(@types/node@20.12.12) ts-jest: specifier: ^29.0.5 - version: 29.0.5(@babel/core@7.21.4)(@jest/types@29.6.3)(babel-jest@29.5.0(@babel/core@7.21.4))(jest@29.5.0(@types/node@18.15.11))(typescript@5.4.2) + version: 29.0.5(@babel/core@7.25.2)(@jest/types@29.6.3)(babel-jest@29.5.0(@babel/core@7.25.2))(jest@29.5.0(@types/node@20.12.12))(typescript@5.4.2) tsup: specifier: ^6.7.0 version: 6.7.0(postcss@8.4.31)(typescript@5.4.2) @@ -1200,10 +1200,10 @@ importers: version: 27.4.1 jest: specifier: ^29.3.1 - version: 29.5.0(@types/node@20.12.12) + version: 29.5.0(@types/node@18.15.11) ts-jest: specifier: ^29.0.5 - version: 29.0.5(@babel/core@7.25.2)(@jest/types@29.6.3)(babel-jest@29.5.0(@babel/core@7.25.2))(jest@29.5.0(@types/node@20.12.12))(typescript@5.4.2) + version: 29.0.5(@babel/core@7.21.4)(@jest/types@29.6.3)(babel-jest@29.5.0(@babel/core@7.21.4))(jest@29.5.0(@types/node@18.15.11))(typescript@5.4.2) tsup: specifier: ^6.7.0 version: 6.7.0(postcss@8.4.31)(typescript@5.4.2)