Skip to content

Commit

Permalink
Merge branch 'main' into patch-2
Browse files Browse the repository at this point in the history
  • Loading branch information
aLIEzsss4 authored Aug 31, 2023
2 parents a667a14 + d8f0ee3 commit f792df0
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 41 deletions.
1 change: 0 additions & 1 deletion packages/store-sync/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@
"debug": "^4.3.4",
"drizzle-orm": "^0.27.0",
"kysely": "^0.26.1",
"p-retry": "^5.1.2",
"rxjs": "7.5.5",
"sql.js": "^1.8.0",
"superjson": "^1.12.4",
Expand Down
2 changes: 1 addition & 1 deletion packages/store-sync/src/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -119,5 +119,5 @@ export type SyncResult<TConfig extends StoreConfig = StoreConfig> = {
latestBlockNumber$: Observable<bigint>;
blockLogs$: Observable<BlockLogs>;
blockStorageOperations$: Observable<BlockStorageOperations<TConfig>>;
waitForTransaction: (tx: Hex) => Promise<{ receipt: TransactionReceipt }>;
waitForTransaction: (tx: Hex) => Promise<void>;
};
76 changes: 40 additions & 36 deletions packages/store-sync/src/createStoreSync.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { ConfigToKeyPrimitives, ConfigToValuePrimitives, StoreConfig, storeEventsAbi } from "@latticexyz/store";
import { Hex, TransactionReceipt, WaitForTransactionReceiptTimeoutError } from "viem";
import { Hex, TransactionReceiptNotFoundError } from "viem";
import { SetRecordOperation, SyncOptions, SyncResult, TableWithRecords } from "./common";
import { createBlockStream, blockRangeToLogs, groupLogsByBlockNumber } from "@latticexyz/block-logs-stream";
import {
Expand All @@ -17,14 +17,15 @@ import {
catchError,
shareReplay,
combineLatest,
scan,
identity,
} from "rxjs";
import pRetry from "p-retry";
import { blockLogsToStorage } from "./blockLogsToStorage";
import { BlockStorageOperations, blockLogsToStorage } from "./blockLogsToStorage";
import { debug as parentDebug } from "./debug";
import { createIndexerClient } from "./trpc-indexer";
import { BlockLogsToStorageOptions } from "./blockLogsToStorage";
import { SyncStep } from "./SyncStep";
import { chunk } from "@latticexyz/common/utils";
import { chunk, isDefined } from "@latticexyz/common/utils";

const debug = parentDebug.extend("createStoreSync");

Expand Down Expand Up @@ -229,42 +230,45 @@ export async function createStoreSync<TConfig extends StoreConfig = StoreConfig>
)
).pipe(share());

async function waitForTransaction(tx: Hex): Promise<{
receipt: TransactionReceipt;
}> {
// viem doesn't retry timeouts, so we'll wrap in a retry
const receipt = await pRetry(
(attempt) => {
// Wait for tx to be mined
debug("waiting for tx receipt", tx, "attempt", attempt);
return publicClient.waitForTransactionReceipt({
hash: tx,
timeout: publicClient.pollingInterval * 2 * attempt,
});
},
{
retries: 3,
onFailedAttempt: (error) => {
if (error instanceof WaitForTransactionReceiptTimeoutError) {
debug("timed out waiting for tx receipt, trying again", tx);
return;
// keep 10 blocks worth processed transactions in memory
const recentBlocksWindow = 10;
// most recent block first, for ease of pulling the first one off the array
const recentBlocks$ = blockStorageOperations$.pipe(
scan<BlockStorageOperations, BlockStorageOperations[]>(
(recentBlocks, block) => [block, ...recentBlocks].slice(0, recentBlocksWindow),
[]
),
filter((recentBlocks) => recentBlocks.length > 0),
shareReplay(1)
);

// TODO: move to its own file so we can test it, have its own debug instance, etc.
async function waitForTransaction(tx: Hex): Promise<void> {
debug("waiting for tx", tx);

// This currently blocks for async call on each block processed
// We could potentially speed this up a tiny bit by racing to see if 1) tx exists in processed block or 2) fetch tx receipt for latest block processed
const hasTransaction$ = recentBlocks$.pipe(
concatMap(async (blocks) => {
const txs = blocks.flatMap((block) => block.operations.map((op) => op.log?.transactionHash).filter(isDefined));
if (txs.includes(tx)) return true;

try {
const lastBlock = blocks[0];
debug("fetching tx receipt for block", lastBlock.blockNumber);
const receipt = await publicClient.getTransactionReceipt({ hash: tx });
return lastBlock.blockNumber >= receipt.blockNumber;
} catch (error) {
if (error instanceof TransactionReceiptNotFoundError) {
return false;
}
throw error;
},
}
}
}),
tap((result) => debug("has tx?", tx, result))
);
debug("got tx receipt", tx, receipt);

// If we haven't processed a block yet or we haven't processed the block for the tx, wait for it
if (lastBlockNumberProcessed == null || lastBlockNumberProcessed < receipt.blockNumber) {
debug("waiting for tx block to be processed", tx, receipt.blockNumber);
await firstValueFrom(
blockStorageOperations$.pipe(filter(({ blockNumber }) => blockNumber >= receipt.blockNumber))
);
}
debug("tx block was processed", tx, receipt.blockNumber);

return { receipt };
await firstValueFrom(hasTransaction$.pipe(filter(identity)));
}

return {
Expand Down
3 changes: 0 additions & 3 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit f792df0

Please sign in to comment.