diff --git a/packages/store-sync/src/createStoreSync.ts b/packages/store-sync/src/createStoreSync.ts index 6fff586545..a72de4a7db 100644 --- a/packages/store-sync/src/createStoreSync.ts +++ b/packages/store-sync/src/createStoreSync.ts @@ -28,9 +28,7 @@ import { mergeMap, BehaviorSubject, ignoreElements, - last, first, - defaultIfEmpty, } from "rxjs"; import { debug as parentDebug } from "./debug"; import { SyncStep } from "./SyncStep"; @@ -244,7 +242,8 @@ export async function createStoreSync({ storageAdapter, logFilter, }); - const storedBlock$ = from(storedBlocks); + + const storedBlock$ = from(storedBlocks).pipe(share()); return concat( storageAdapterLock$.pipe( @@ -252,32 +251,33 @@ export async function createStoreSync({ tap(() => storageAdapterLock$.next(true)), ignoreElements(), ), - storedBlock$, storedBlock$.pipe( - defaultIfEmpty(null), - last(), - concatMap(async (block) => { - if (block == null) return; - await applyOptimisticLogs(block.blockNumber); + tap(({ blockNumber, logs }) => { + debug("stored", logs.length, "logs for block", blockNumber); + lastBlockNumberProcessed = blockNumber; + }), + ), + of(true).pipe( + concatMap(async () => { + if (lastBlockNumberProcessed != null) { + await applyOptimisticLogs(lastBlockNumberProcessed); + } }), tap(() => storageAdapterLock$.next(false)), ignoreElements(), ), ); }), - tap(({ blockNumber, logs }) => { - debug("stored", logs.length, "logs for block", blockNumber); - lastBlockNumberProcessed = blockNumber; - + tap(({ blockNumber }) => { if (startBlock != null && endBlock != null) { if (blockNumber < endBlock) { const totalBlocks = endBlock - startBlock; - const processedBlocks = lastBlockNumberProcessed - startBlock; + const processedBlocks = blockNumber - startBlock; onProgress?.({ step: SyncStep.RPC, percentage: Number((processedBlocks * 1000n) / totalBlocks) / 10, latestBlockNumber: endBlock, - lastBlockNumberProcessed, + lastBlockNumberProcessed: blockNumber, message: "Hydrating from RPC", }); } else { @@ -285,7 +285,7 @@ export async function createStoreSync({ step: SyncStep.LIVE, percentage: 100, latestBlockNumber: endBlock, - lastBlockNumberProcessed, + lastBlockNumberProcessed: blockNumber, message: "All caught up!", }); } @@ -334,10 +334,9 @@ export async function createStoreSync({ if (receipt.status === "success") { const logs = parseEventLogs({ abi: storeEventsAbi, logs: receipt.logs }); if (logs.length) { - debug("applying", logs.length, "optimistic logs"); // wait for lock to clear + // unclear what happens if two waitForTransaction calls are triggered simultaneously and both get released for the same lock emission? await firstValueFrom(storageAdapterLock$.pipe(filter((lock) => lock === false))); - storageAdapterLock$.next(true); optimisticLogs = [...optimisticLogs, ...logs]; await applyOptimisticLogs(lastBlock.blockNumber);