From 0aa7cc6965747e47d519f0b09e2930eac4b96145 Mon Sep 17 00:00:00 2001 From: Dhvani Patel Date: Thu, 13 Jun 2024 12:46:41 +0100 Subject: [PATCH] fix(store-sync): batch call block number with logs to account for unsynced RPCs --- packages/block-logs-stream/src/fetchLogs.ts | 13 ++++++++++- packages/store-sync/src/createStoreSync.ts | 24 ++++++++++++++------- 2 files changed, 28 insertions(+), 9 deletions(-) diff --git a/packages/block-logs-stream/src/fetchLogs.ts b/packages/block-logs-stream/src/fetchLogs.ts index d6de091619..34fd9d5657 100644 --- a/packages/block-logs-stream/src/fetchLogs.ts +++ b/packages/block-logs-stream/src/fetchLogs.ts @@ -90,7 +90,18 @@ export async function* fetchLogs({ try { const toBlock = fromBlock + blockRange; debug("getting logs", { fromBlock, toBlock }); - const logs = await publicClient.getLogs({ ...getLogsOpts, fromBlock, toBlock, strict: true }); + + const [latestBlockNumber, logs] = await Promise.all([ + publicClient.getBlockNumber({ cacheTime: 0 }), + publicClient.getLogs({ ...getLogsOpts, fromBlock, toBlock, strict: true }), + ]); + if (latestBlockNumber < toBlock) { + const blockTimeInSeconds = 2; + const seconds = Number(toBlock - latestBlockNumber) * blockTimeInSeconds; + debug(`latest block number ${latestBlockNumber} is less than toBlock ${toBlock}, retrying in ${seconds}s`); + await wait(1000 * seconds); + continue; + } yield { fromBlock, toBlock, logs }; fromBlock = toBlock + 1n; blockRange = bigIntMin(maxBlockRange, getLogsOpts.toBlock - fromBlock); diff --git a/packages/store-sync/src/createStoreSync.ts b/packages/store-sync/src/createStoreSync.ts index a359d8d813..859b2b3a59 100644 --- a/packages/store-sync/src/createStoreSync.ts +++ b/packages/store-sync/src/createStoreSync.ts @@ -187,19 +187,22 @@ export async function createStoreSync( tap((startBlock) => debug("starting sync from block", startBlock)), ); + let startBlock: bigint | null = null; + let endBlock: bigint | null = null; + let lastBlockNumberProcessed: bigint | null = null; + const latestBlock$ = createBlockStream({ publicClient, blockTag: followBlockTag }).pipe(shareReplay(1)); const latestBlockNumber$ = latestBlock$.pipe( map((block) => block.number), tap((blockNumber) => { debug("on block number", blockNumber, "for", followBlockTag, "block tag"); }), + filter((blockNumber) => { + return lastBlockNumberProcessed == null || blockNumber > lastBlockNumberProcessed; + }), shareReplay(1), ); - let startBlock: bigint | null = null; - let endBlock: bigint | null = null; - let lastBlockNumberProcessed: bigint | null = null; - const storedBlock$ = combineLatest([startBlock$, latestBlockNumber$]).pipe( map(([startBlock, endBlock]) => ({ startBlock, endBlock })), tap((range) => { @@ -207,15 +210,20 @@ export async function createStoreSync( endBlock = range.endBlock; }), concatMap((range) => { + const fromBlock = lastBlockNumberProcessed + ? bigIntMax(range.startBlock, lastBlockNumberProcessed + 1n) + : range.startBlock; + const toBlock = range.endBlock; + if (toBlock < fromBlock) { + throw new Error(`toBlock ${toBlock} is less than fromBlock ${fromBlock}`); + } const storedBlocks = fetchAndStoreLogs({ publicClient, address, events: storeEventsAbi, maxBlockRange, - fromBlock: lastBlockNumberProcessed - ? bigIntMax(range.startBlock, lastBlockNumberProcessed + 1n) - : range.startBlock, - toBlock: range.endBlock, + fromBlock: fromBlock, + toBlock: toBlock, storageAdapter, logFilter, });