From de3bc3d1fe78762e43b692f6f3334dee9b632c8f Mon Sep 17 00:00:00 2001 From: alvarius Date: Tue, 16 Apr 2024 14:15:40 +0100 Subject: [PATCH] fix(store-sync): reduce latency in waitForTransaction (#2665) Co-authored-by: Kevin Ingersoll --- .changeset/twelve-hairs-fry.md | 5 +++++ packages/store-sync/src/createStoreSync.ts | 11 +++++++++-- 2 files changed, 14 insertions(+), 2 deletions(-) create mode 100644 .changeset/twelve-hairs-fry.md diff --git a/.changeset/twelve-hairs-fry.md b/.changeset/twelve-hairs-fry.md new file mode 100644 index 0000000000..dd2a66514e --- /dev/null +++ b/.changeset/twelve-hairs-fry.md @@ -0,0 +1,5 @@ +--- +"@latticexyz/store-sync": patch +--- + +Small optimizations in `waitForTransaction` to parallelize network requests. diff --git a/packages/store-sync/src/createStoreSync.ts b/packages/store-sync/src/createStoreSync.ts index a489ea0f77..4431a27003 100644 --- a/packages/store-sync/src/createStoreSync.ts +++ b/packages/store-sync/src/createStoreSync.ts @@ -26,6 +26,7 @@ import { combineLatest, scan, identity, + mergeMap, } from "rxjs"; import { debug as parentDebug } from "./debug"; import { SyncStep } from "./SyncStep"; @@ -197,7 +198,9 @@ export async function createStoreSync( startBlock = range.startBlock; endBlock = range.endBlock; }), - concatMap((range) => { + // We use `map` instead of `concatMap` here to send the fetch request immediately when a new block range appears, + // instead of sending the next request only when the previous one completed. + map((range) => { const storedBlocks = fetchAndStoreLogs({ publicClient, address, @@ -213,6 +216,8 @@ export async function createStoreSync( return from(storedBlocks); }), + // `concatMap` turns the stream of promises into their awaited values + concatMap(identity), tap(({ blockNumber, logs }) => { debug("stored", logs.length, "logs for block", blockNumber); lastBlockNumberProcessed = blockNumber; @@ -263,7 +268,9 @@ export async function createStoreSync( // This currently blocks for async call on each block processed // We could potentially speed this up a tiny bit by racing to see if 1) tx exists in processed block or 2) fetch tx receipt for latest block processed const hasTransaction$ = recentBlocks$.pipe( - concatMap(async (blocks) => { + // We use `mergeMap` instead of `concatMap` here to send the fetch request immediately when a new block range appears, + // instead of sending the next request only when the previous one completed. + mergeMap(async (blocks) => { const txs = blocks.flatMap((block) => block.logs.map((op) => op.transactionHash).filter(isDefined)); if (txs.includes(tx)) return true;