diff --git a/packages/store-sync/src/common.ts b/packages/store-sync/src/common.ts index a7873ccbd9..bd5a20aee9 100644 --- a/packages/store-sync/src/common.ts +++ b/packages/store-sync/src/common.ts @@ -109,7 +109,6 @@ export type SyncOptions = { export type SyncResult = { latestBlock$: Observable; latestBlockNumber$: Observable; - blockLogs$: Observable; storedBlockLogs$: Observable; waitForTransaction: (tx: Hex) => Promise; }; diff --git a/packages/store-sync/src/createStoreSync.ts b/packages/store-sync/src/createStoreSync.ts index 59d83bdf51..5bcffb6f89 100644 --- a/packages/store-sync/src/createStoreSync.ts +++ b/packages/store-sync/src/createStoreSync.ts @@ -1,12 +1,19 @@ import { StoreConfig, storeEventsAbi } from "@latticexyz/store"; import { Hex, TransactionReceiptNotFoundError } from "viem"; -import { StorageAdapter, StorageAdapterBlock, SyncFilter, SyncOptions, SyncResult, internalTableIds } from "./common"; -import { createBlockStream, blockRangeToLogs, groupLogsByBlockNumber } from "@latticexyz/block-logs-stream"; +import { + StorageAdapter, + StorageAdapterBlock, + StoreEventsLog, + SyncFilter, + SyncOptions, + SyncResult, + internalTableIds, +} from "./common"; +import { createBlockStream } from "@latticexyz/block-logs-stream"; import { filter, map, tap, - mergeMap, from, concat, concatMap, @@ -24,6 +31,7 @@ import { debug as parentDebug } from "./debug"; import { SyncStep } from "./SyncStep"; import { bigIntMax, chunk, isDefined } from "@latticexyz/common/utils"; import { getSnapshot } from "./getSnapshot"; +import { fetchAndStoreLogs } from "./fetchAndStoreLogs"; const debug = parentDebug.extend("createStoreSync"); @@ -58,6 +66,16 @@ export async function createStoreSync ? [...initialFilters, ...tableIds.map((tableId) => ({ tableId })), ...defaultFilters] : []; + const logFilter = filters.length + ? (log: StoreEventsLog): boolean => + filters.some( + (filter) => + filter.tableId === log.args.tableId && + (filter.key0 == null || filter.key0 === log.args.keyTuple[0]) && + (filter.key1 == null || filter.key1 === log.args.keyTuple[1]) + ) + : undefined; + const initialBlockLogs$ = defer(async (): Promise => { const chainId = publicClient.chain?.id ?? (await publicClient.getChainId()); @@ -164,71 +182,60 @@ export async function createStoreSync let startBlock: bigint | null = null; let endBlock: bigint | null = null; - const blockLogs$ = combineLatest([startBlock$, latestBlockNumber$]).pipe( + let lastBlockNumberProcessed: bigint | null = null; + + const storedBlock$ = combineLatest([startBlock$, latestBlockNumber$]).pipe( map(([startBlock, endBlock]) => ({ startBlock, endBlock })), tap((range) => { startBlock = range.startBlock; endBlock = range.endBlock; }), - blockRangeToLogs({ - publicClient, - address, - events: storeEventsAbi, - // TODO: pass filters in here so we can filter at RPC level - maxBlockRange, + concatMap((range) => { + const storedBlocks = fetchAndStoreLogs({ + publicClient, + address, + events: storeEventsAbi, + maxBlockRange, + fromBlock: lastBlockNumberProcessed + ? bigIntMax(range.startBlock, lastBlockNumberProcessed + 1n) + : range.startBlock, + toBlock: range.endBlock, + storageAdapter, + logFilter, + }); + + return from(storedBlocks); }), - map(({ toBlock, logs }) => { - if (!filters.length) return { toBlock, logs }; - const filteredLogs = logs.filter((log) => - filters.some( - (filter) => - filter.tableId === log.args.tableId && - (filter.key0 == null || filter.key0 === log.args.keyTuple[0]) && - (filter.key1 == null || filter.key1 === log.args.keyTuple[1]) - ) - ); - return { toBlock, logs: filteredLogs }; + tap(({ blockNumber, logs }) => { + debug("stored", logs.length, "logs for block", blockNumber); + lastBlockNumberProcessed = blockNumber; + + if (startBlock != null && endBlock != null) { + if (blockNumber < endBlock) { + const totalBlocks = endBlock - startBlock; + const processedBlocks = lastBlockNumberProcessed - startBlock; + onProgress?.({ + step: SyncStep.RPC, + percentage: Number((processedBlocks * 1000n) / totalBlocks) / 10, + latestBlockNumber: endBlock, + lastBlockNumberProcessed, + message: "Hydrating from RPC", + }); + } else { + onProgress?.({ + step: SyncStep.LIVE, + percentage: 100, + latestBlockNumber: endBlock, + lastBlockNumberProcessed, + message: "All caught up!", + }); + } + } }), - mergeMap(({ toBlock, logs }) => from(groupLogsByBlockNumber(logs, toBlock))), share() ); - let lastBlockNumberProcessed: bigint | null = null; - const storedBlockLogs$ = concat( - storedInitialBlockLogs$, - blockLogs$.pipe( - concatMap(async (block) => { - await storageAdapter(block); - return block; - }), - tap(({ blockNumber, logs }) => { - debug("stored", logs.length, "logs for block", blockNumber); - lastBlockNumberProcessed = blockNumber; - - if (startBlock != null && endBlock != null) { - if (blockNumber < endBlock) { - const totalBlocks = endBlock - startBlock; - const processedBlocks = lastBlockNumberProcessed - startBlock; - onProgress?.({ - step: SyncStep.RPC, - percentage: Number((processedBlocks * 1000n) / totalBlocks) / 10, - latestBlockNumber: endBlock, - lastBlockNumberProcessed, - message: "Hydrating from RPC", - }); - } else { - onProgress?.({ - step: SyncStep.LIVE, - percentage: 100, - latestBlockNumber: endBlock, - lastBlockNumberProcessed, - message: "All caught up!", - }); - } - } - }) - ) - ).pipe(share()); + const storedBlockLogs$ = concat(storedInitialBlockLogs$, storedBlock$).pipe(share()); // keep 10 blocks worth processed transactions in memory const recentBlocksWindow = 10; @@ -274,7 +281,6 @@ export async function createStoreSync return { latestBlock$, latestBlockNumber$, - blockLogs$, storedBlockLogs$, waitForTransaction, }; diff --git a/packages/store-sync/src/fetchAndStoreLogs.ts b/packages/store-sync/src/fetchAndStoreLogs.ts new file mode 100644 index 0000000000..76a5f734e5 --- /dev/null +++ b/packages/store-sync/src/fetchAndStoreLogs.ts @@ -0,0 +1,22 @@ +import { FetchLogsOptions, fetchLogs, groupLogsByBlockNumber } from "@latticexyz/block-logs-stream"; +import { StoreEventsAbi } from "@latticexyz/store"; +import { StorageAdapter, StorageAdapterBlock, StoreEventsLog } from "./common"; + +type FetchAndStoreLogsOptions = FetchLogsOptions & { + storageAdapter: StorageAdapter; + logFilter?: (log: StoreEventsLog) => boolean; +}; + +export async function* fetchAndStoreLogs({ + storageAdapter, + logFilter, + ...fetchLogsOptions +}: FetchAndStoreLogsOptions): AsyncGenerator { + for await (const { logs, toBlock } of fetchLogs(fetchLogsOptions)) { + const blocks = groupLogsByBlockNumber(logFilter ? logs.filter(logFilter) : logs, toBlock); + for (const block of blocks) { + await storageAdapter(block); + yield block; + } + } +}