From 9f2011840f84127502dd5766108a2ecfe6b6a0ed Mon Sep 17 00:00:00 2001 From: alvrs Date: Wed, 11 Dec 2024 15:14:20 +0000 Subject: [PATCH 1/2] add getLogsFromLoadBalancedRpc --- packages/block-logs-stream/src/common.ts | 1 + packages/block-logs-stream/src/fetchLogs.ts | 50 ++++++++- .../src/getLogsFromLoadBalancedRpc.ts | 106 ++++++++++++++++++ packages/block-logs-stream/src/index.ts | 1 + 4 files changed, 152 insertions(+), 6 deletions(-) create mode 100644 packages/block-logs-stream/src/common.ts create mode 100644 packages/block-logs-stream/src/getLogsFromLoadBalancedRpc.ts diff --git a/packages/block-logs-stream/src/common.ts b/packages/block-logs-stream/src/common.ts new file mode 100644 index 0000000000..57275c603b --- /dev/null +++ b/packages/block-logs-stream/src/common.ts @@ -0,0 +1 @@ +export const blockNotFoundMessage = "block not found"; diff --git a/packages/block-logs-stream/src/fetchLogs.ts b/packages/block-logs-stream/src/fetchLogs.ts index b5497ab04f..7c9120d42f 100644 --- a/packages/block-logs-stream/src/fetchLogs.ts +++ b/packages/block-logs-stream/src/fetchLogs.ts @@ -3,6 +3,8 @@ import { bigIntMax, bigIntMin, wait } from "@latticexyz/common/utils"; import { debug } from "./debug"; import { getAction } from "viem/utils"; import { getLogs as viem_getLogs } from "viem/actions"; +import { getLogsFromLoadBalancedRpc } from "./getLogsFromLoadBalancedRpc"; +import { blockNotFoundMessage } from "./common"; export type FetchLogsOptions = { /** @@ -47,6 +49,25 @@ export type FetchLogsOptions = { */ events: abiEvents; } + | { + /** + * Explicitly handle potentially unsynced load balanced RPCs by using a batch call with `eth_getLogs` and `eth_getBlockByNumber`. + * See https://indexsupply.com/shovel/docs/#unsynchronized-ethereum-nodes + */ + handleUnsyncedLoadBalancedRpc: true; + /** + * The HTTP URL of the load balanced RPC. + */ + rpcUrl: string; + /** + * Optional contract address(es) to fetch logs for. + */ + address?: Address | Address[]; + /** + * Events to fetch logs for. + */ + events: abiEvents; + } >; export type FetchLogsResult = { @@ -89,6 +110,8 @@ const BLOCK_RANGE_ERRORS = [ "query exceeds max results", ]; +const UNSYNCED_RPC_ERRORS = [blockNotFoundMessage]; + /** * An asynchronous generator function that fetches logs from the blockchain in a range of blocks. * @@ -113,12 +136,21 @@ export async function* fetchLogs({ }: FetchLogsOptions): AsyncGenerator> { const getLogs = opts.getLogs ?? - (async (blockRange): Promise> => - getAction( - opts.publicClient, - viem_getLogs, - "getLogs", - )({ ...blockRange, address: opts.address, events: opts.events, strict: true })); + (opts.handleUnsyncedLoadBalancedRpc + ? async (blockRange): Promise> => + getLogsFromLoadBalancedRpc({ + ...blockRange, + rpcUrl: opts.rpcUrl, + address: opts.address, + events: opts.events, + strict: true, + }) + : async (blockRange): Promise> => + getAction( + opts.publicClient, + viem_getLogs, + "getLogs", + )({ ...blockRange, address: opts.address, events: opts.events, strict: true })); let fromBlock = initialFromBlock; let blockRange = bigIntMin(maxBlockRange, initialToBlock - fromBlock); @@ -158,6 +190,12 @@ export async function* fetchLogs({ continue; } + if (opts.handleUnsyncedLoadBalancedRpc && UNSYNCED_RPC_ERRORS.some((e) => error.message.includes(e))) { + debug(`got unsynced rpc error, retrying`); + await wait(1000); + continue; + } + throw error; } } diff --git a/packages/block-logs-stream/src/getLogsFromLoadBalancedRpc.ts b/packages/block-logs-stream/src/getLogsFromLoadBalancedRpc.ts new file mode 100644 index 0000000000..28bc49f811 --- /dev/null +++ b/packages/block-logs-stream/src/getLogsFromLoadBalancedRpc.ts @@ -0,0 +1,106 @@ +import { + AbiEvent, + BlockNumber, + BlockTag, + GetLogsParameters, + GetLogsReturnType, + EncodeEventTopicsParameters, + LogTopic, + RpcLog, + encodeEventTopics, + formatLog, + numberToHex, + parseEventLogs, + toHex, + Block, +} from "viem"; +import { blockNotFoundMessage } from "./common"; + +/** + * Mostly equivalent to viem's `getLogs` action, but using a batch rpc call to check if the RPC has the requested block. + * If the RPC doesn't have the requested block, it will throw a "block not found" error. + * Expects a HTTP RPC endpoint that supports batch requests. + */ +export async function getLogsFromLoadBalancedRpc< + const abiEvent extends AbiEvent | undefined = undefined, + const abiEvents extends readonly AbiEvent[] | readonly unknown[] | undefined = abiEvent extends AbiEvent + ? [abiEvent] + : undefined, + strict extends boolean | undefined = undefined, + fromBlock extends BlockNumber | BlockTag | undefined = undefined, + toBlock extends BlockNumber | BlockTag | undefined = undefined, +>({ + rpcUrl, + address, + fromBlock, + toBlock, + event, + events: events_, + args, + strict: strict_, +}: Omit, "blockHash"> & { rpcUrl: string }): Promise< + GetLogsReturnType +> { + const strict = strict_ ?? false; + const events = events_ ?? (event ? [event] : undefined); + + let topics: LogTopic[] = []; + if (events) { + const encoded = (events as AbiEvent[]).flatMap((event) => + encodeEventTopics({ + abi: [event], + eventName: (event as AbiEvent).name, + args: events_ ? undefined : args, + } as EncodeEventTopicsParameters), + ); + // TODO: Clean up type casting + topics = [encoded as LogTopic]; + if (event) topics = topics[0] as LogTopic[]; + } + + const requests = [ + { + method: "eth_getBlockByNumber", + params: [typeof toBlock === "bigint" || typeof toBlock === "number" ? toHex(toBlock) : toBlock, false], + id: 1, + jsonrpc: "2.0", + }, + { + method: "eth_getLogs", + params: [ + { + address, + topics, + fromBlock: typeof fromBlock === "bigint" ? numberToHex(fromBlock) : fromBlock, + toBlock: typeof toBlock === "bigint" ? numberToHex(toBlock) : toBlock, + }, + ], + id: 2, + jsonrpc: "2.0", + }, + ]; + + const results: [Block | undefined, RpcLog[]] = await fetch(rpcUrl, { + method: "POST", + headers: { + "Content-Type": "application/json", + }, + body: JSON.stringify(requests), + }).then((res) => res.json()); + + const [block, logs] = results; + + if (!block) { + // Throw an explicit error so the caller can retry instead of silently returning an empty array + throw new Error(blockNotFoundMessage); + } + + const formattedLogs = logs.map((log) => formatLog(log)); + if (!events) return formattedLogs as GetLogsReturnType; + return parseEventLogs({ + abi: events, + args: args as never, + logs: formattedLogs, + strict, + }) as unknown as GetLogsReturnType; +} diff --git a/packages/block-logs-stream/src/index.ts b/packages/block-logs-stream/src/index.ts index 3d007724c0..b35763e9ff 100644 --- a/packages/block-logs-stream/src/index.ts +++ b/packages/block-logs-stream/src/index.ts @@ -3,3 +3,4 @@ export * from "./createBlockStream"; export * from "./fetchBlockLogs"; export * from "./fetchLogs"; export * from "./groupLogsByBlockNumber"; +export * from "./getLogsFromLoadBalancedRpc"; From 48c2d9c0f829316ac0ad2037de9f8d064a0f29b6 Mon Sep 17 00:00:00 2001 From: alvrs Date: Wed, 11 Dec 2024 15:37:32 +0000 Subject: [PATCH 2/2] add option to explicitly handle load balanced rpc to createStoreSync --- packages/block-logs-stream/src/common.ts | 2 ++ packages/block-logs-stream/src/fetchLogs.ts | 6 ++-- .../src/getLogsFromLoadBalancedRpc.ts | 12 ++++---- packages/block-logs-stream/src/index.ts | 1 + packages/store-sync/src/common.ts | 29 ++++++++++++++----- packages/store-sync/src/createStoreSync.ts | 9 ++++-- 6 files changed, 40 insertions(+), 19 deletions(-) diff --git a/packages/block-logs-stream/src/common.ts b/packages/block-logs-stream/src/common.ts index 57275c603b..4bc489dc67 100644 --- a/packages/block-logs-stream/src/common.ts +++ b/packages/block-logs-stream/src/common.ts @@ -1 +1,3 @@ export const blockNotFoundMessage = "block not found"; + +export type HttpRpcUrl = `http${string}`; diff --git a/packages/block-logs-stream/src/fetchLogs.ts b/packages/block-logs-stream/src/fetchLogs.ts index 7c9120d42f..5f7cd09cbf 100644 --- a/packages/block-logs-stream/src/fetchLogs.ts +++ b/packages/block-logs-stream/src/fetchLogs.ts @@ -4,7 +4,7 @@ import { debug } from "./debug"; import { getAction } from "viem/utils"; import { getLogs as viem_getLogs } from "viem/actions"; import { getLogsFromLoadBalancedRpc } from "./getLogsFromLoadBalancedRpc"; -import { blockNotFoundMessage } from "./common"; +import { HttpRpcUrl, blockNotFoundMessage } from "./common"; export type FetchLogsOptions = { /** @@ -58,7 +58,7 @@ export type FetchLogsOptions = { /** * The HTTP URL of the load balanced RPC. */ - rpcUrl: string; + httpRpcUrl: HttpRpcUrl; /** * Optional contract address(es) to fetch logs for. */ @@ -140,7 +140,7 @@ export async function* fetchLogs({ ? async (blockRange): Promise> => getLogsFromLoadBalancedRpc({ ...blockRange, - rpcUrl: opts.rpcUrl, + httpRpcUrl: opts.httpRpcUrl, address: opts.address, events: opts.events, strict: true, diff --git a/packages/block-logs-stream/src/getLogsFromLoadBalancedRpc.ts b/packages/block-logs-stream/src/getLogsFromLoadBalancedRpc.ts index 28bc49f811..aa496c8cf5 100644 --- a/packages/block-logs-stream/src/getLogsFromLoadBalancedRpc.ts +++ b/packages/block-logs-stream/src/getLogsFromLoadBalancedRpc.ts @@ -14,7 +14,7 @@ import { toHex, Block, } from "viem"; -import { blockNotFoundMessage } from "./common"; +import { HttpRpcUrl, blockNotFoundMessage } from "./common"; /** * Mostly equivalent to viem's `getLogs` action, but using a batch rpc call to check if the RPC has the requested block. @@ -30,7 +30,7 @@ export async function getLogsFromLoadBalancedRpc< fromBlock extends BlockNumber | BlockTag | undefined = undefined, toBlock extends BlockNumber | BlockTag | undefined = undefined, >({ - rpcUrl, + httpRpcUrl, address, fromBlock, toBlock, @@ -38,9 +38,9 @@ export async function getLogsFromLoadBalancedRpc< events: events_, args, strict: strict_, -}: Omit, "blockHash"> & { rpcUrl: string }): Promise< - GetLogsReturnType -> { +}: Omit, "blockHash"> & { + httpRpcUrl: HttpRpcUrl; +}): Promise> { const strict = strict_ ?? false; const events = events_ ?? (event ? [event] : undefined); @@ -80,7 +80,7 @@ export async function getLogsFromLoadBalancedRpc< }, ]; - const results: [Block | undefined, RpcLog[]] = await fetch(rpcUrl, { + const results: [Block | undefined, RpcLog[]] = await fetch(httpRpcUrl, { method: "POST", headers: { "Content-Type": "application/json", diff --git a/packages/block-logs-stream/src/index.ts b/packages/block-logs-stream/src/index.ts index b35763e9ff..96dcdcc690 100644 --- a/packages/block-logs-stream/src/index.ts +++ b/packages/block-logs-stream/src/index.ts @@ -4,3 +4,4 @@ export * from "./fetchBlockLogs"; export * from "./fetchLogs"; export * from "./groupLogsByBlockNumber"; export * from "./getLogsFromLoadBalancedRpc"; +export * from "./common"; diff --git a/packages/store-sync/src/common.ts b/packages/store-sync/src/common.ts index 02f9ee1fc1..a86e81a525 100644 --- a/packages/store-sync/src/common.ts +++ b/packages/store-sync/src/common.ts @@ -13,6 +13,7 @@ import storeConfig from "@latticexyz/store/mud.config"; import worldConfig from "@latticexyz/world/mud.config"; import { Table as ConfigTable, Schema } from "@latticexyz/config"; import { configToTables } from "./configToTables"; +import { HttpRpcUrl } from "@latticexyz/block-logs-stream"; export const mudTables = { ...configToTables(storeConfig), @@ -66,12 +67,6 @@ export type SyncFilter = { }; export type SyncOptions = { - /** - * [viem `PublicClient`][0] used for fetching logs from the RPC. - * - * [0]: https://viem.sh/docs/clients/public.html - */ - publicClient: PublicClient; /** * MUD Store/World contract address */ @@ -115,7 +110,27 @@ export type SyncOptions = { blockNumber: bigint; logs: readonly StorageAdapterLog[]; }; -}; +} & ( + | { + /** + * [viem `PublicClient`][0] used for fetching logs from the RPC. + * + * [0]: https://viem.sh/docs/clients/public.html + */ + publicClient: PublicClient; + } + | { + /** + * Explicitly handle potentially unsynced load balanced RPCs by using a batch call with `eth_getLogs` and `eth_getBlockByNumber`. + * See https://indexsupply.com/shovel/docs/#unsynchronized-ethereum-nodes + */ + handleUnsyncedLoadBalancedRpc: true; + /** + * The HTTP URL of the load balanced RPC. + */ + httpRpcUrl: HttpRpcUrl; + } +); export type WaitForTransactionResult = Pick; diff --git a/packages/store-sync/src/createStoreSync.ts b/packages/store-sync/src/createStoreSync.ts index ab3eb005a6..8c8f0a30b9 100644 --- a/packages/store-sync/src/createStoreSync.ts +++ b/packages/store-sync/src/createStoreSync.ts @@ -1,5 +1,5 @@ import { storeEventsAbi } from "@latticexyz/store"; -import { GetTransactionReceiptErrorType, Hex } from "viem"; +import { GetTransactionReceiptErrorType, Hex, createPublicClient, http } from "viem"; import { StorageAdapter, StorageAdapterBlock, @@ -59,7 +59,6 @@ type CreateStoreSyncOptions = SyncOptions & { export async function createStoreSync({ storageAdapter, onProgress, - publicClient, address, filters: initialFilters = [], tableIds = [], @@ -69,7 +68,11 @@ export async function createStoreSync({ initialState, initialBlockLogs, indexerUrl: initialIndexerUrl, + ...opts }: CreateStoreSyncOptions): Promise { + const publicClient = + "publicClient" in opts ? opts.publicClient : createPublicClient({ transport: http(opts.httpRpcUrl) }); + const filters: SyncFilter[] = initialFilters.length || tableIds.length ? [...initialFilters, ...tableIds.map((tableId) => ({ tableId })), ...defaultFilters] @@ -257,7 +260,6 @@ export async function createStoreSync({ map(([startBlock, endBlock]) => ({ startBlock, endBlock })), concatMap((range) => { const storedBlocks = fetchAndStoreLogs({ - publicClient, address, events: storeEventsAbi, maxBlockRange, @@ -267,6 +269,7 @@ export async function createStoreSync({ toBlock: range.endBlock, logFilter, storageAdapter, + ...opts, }); return from(storedBlocks);