Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add option to explicitly handle fetching logs from load balanced rpcs #3394

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions packages/block-logs-stream/src/common.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
export const blockNotFoundMessage = "block not found";

export type HttpRpcUrl = `http${string}`;
50 changes: 44 additions & 6 deletions packages/block-logs-stream/src/fetchLogs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ import { bigIntMax, bigIntMin, wait } from "@latticexyz/common/utils";
import { debug } from "./debug";
import { getAction } from "viem/utils";
import { getLogs as viem_getLogs } from "viem/actions";
import { getLogsFromLoadBalancedRpc } from "./getLogsFromLoadBalancedRpc";
import { HttpRpcUrl, blockNotFoundMessage } from "./common";

export type FetchLogsOptions<abiEvents extends readonly AbiEvent[]> = {
/**
Expand Down Expand Up @@ -47,6 +49,25 @@ export type FetchLogsOptions<abiEvents extends readonly AbiEvent[]> = {
*/
events: abiEvents;
}
| {
/**
* Explicitly handle potentially unsynced load balanced RPCs by using a batch call with `eth_getLogs` and `eth_getBlockByNumber`.
* See https://indexsupply.com/shovel/docs/#unsynchronized-ethereum-nodes
*/
handleUnsyncedLoadBalancedRpc: true;
/**
* The HTTP URL of the load balanced RPC.
*/
httpRpcUrl: HttpRpcUrl;
/**
* Optional contract address(es) to fetch logs for.
*/
address?: Address | Address[];
/**
* Events to fetch logs for.
*/
events: abiEvents;
}
>;

export type FetchLogsResult<abiEvents extends readonly AbiEvent[]> = {
Expand Down Expand Up @@ -89,6 +110,8 @@ const BLOCK_RANGE_ERRORS = [
"query exceeds max results",
];

const UNSYNCED_RPC_ERRORS = [blockNotFoundMessage];

/**
* An asynchronous generator function that fetches logs from the blockchain in a range of blocks.
*
Expand All @@ -113,12 +136,21 @@ export async function* fetchLogs<abiEvents extends readonly AbiEvent[]>({
}: FetchLogsOptions<abiEvents>): AsyncGenerator<FetchLogsResult<abiEvents>> {
const getLogs =
opts.getLogs ??
(async (blockRange): Promise<GetLogsReturnType<undefined, abiEvents, true, BlockNumber, BlockNumber>> =>
getAction(
opts.publicClient,
viem_getLogs,
"getLogs",
)({ ...blockRange, address: opts.address, events: opts.events, strict: true }));
(opts.handleUnsyncedLoadBalancedRpc
? async (blockRange): Promise<GetLogsReturnType<undefined, abiEvents, true, BlockNumber, BlockNumber>> =>
getLogsFromLoadBalancedRpc({
...blockRange,
httpRpcUrl: opts.httpRpcUrl,
address: opts.address,
events: opts.events,
strict: true,
})
: async (blockRange): Promise<GetLogsReturnType<undefined, abiEvents, true, BlockNumber, BlockNumber>> =>
getAction(
opts.publicClient,
viem_getLogs,
"getLogs",
)({ ...blockRange, address: opts.address, events: opts.events, strict: true }));

let fromBlock = initialFromBlock;
let blockRange = bigIntMin(maxBlockRange, initialToBlock - fromBlock);
Expand Down Expand Up @@ -158,6 +190,12 @@ export async function* fetchLogs<abiEvents extends readonly AbiEvent[]>({
continue;
}

if (opts.handleUnsyncedLoadBalancedRpc && UNSYNCED_RPC_ERRORS.some((e) => error.message.includes(e))) {
debug(`got unsynced rpc error, retrying`);
await wait(1000);
continue;
}

throw error;
}
}
Expand Down
106 changes: 106 additions & 0 deletions packages/block-logs-stream/src/getLogsFromLoadBalancedRpc.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
import {
AbiEvent,
BlockNumber,
BlockTag,
GetLogsParameters,
GetLogsReturnType,
EncodeEventTopicsParameters,
LogTopic,
RpcLog,
encodeEventTopics,
formatLog,
numberToHex,
parseEventLogs,
toHex,
Block,
} from "viem";
import { HttpRpcUrl, blockNotFoundMessage } from "./common";

/**
* Mostly equivalent to viem's `getLogs` action, but using a batch rpc call to check if the RPC has the requested block.
* If the RPC doesn't have the requested block, it will throw a "block not found" error.
* Expects a HTTP RPC endpoint that supports batch requests.
*/
export async function getLogsFromLoadBalancedRpc<
const abiEvent extends AbiEvent | undefined = undefined,
const abiEvents extends readonly AbiEvent[] | readonly unknown[] | undefined = abiEvent extends AbiEvent
? [abiEvent]
: undefined,
strict extends boolean | undefined = undefined,
fromBlock extends BlockNumber | BlockTag | undefined = undefined,
toBlock extends BlockNumber | BlockTag | undefined = undefined,
>({
httpRpcUrl,
address,
fromBlock,
toBlock,
event,
events: events_,
args,
strict: strict_,
}: Omit<GetLogsParameters<abiEvent, abiEvents, strict, fromBlock, toBlock>, "blockHash"> & {
httpRpcUrl: HttpRpcUrl;
}): Promise<GetLogsReturnType<abiEvent, abiEvents, strict, fromBlock, toBlock>> {
const strict = strict_ ?? false;
const events = events_ ?? (event ? [event] : undefined);

let topics: LogTopic[] = [];
if (events) {
const encoded = (events as AbiEvent[]).flatMap((event) =>
encodeEventTopics({
abi: [event],
eventName: (event as AbiEvent).name,
args: events_ ? undefined : args,
} as EncodeEventTopicsParameters),
);
// TODO: Clean up type casting
topics = [encoded as LogTopic];
if (event) topics = topics[0] as LogTopic[];
}

const requests = [
{
method: "eth_getBlockByNumber",
params: [typeof toBlock === "bigint" || typeof toBlock === "number" ? toHex(toBlock) : toBlock, false],
id: 1,
jsonrpc: "2.0",
},
{
method: "eth_getLogs",
params: [
{
address,
topics,
fromBlock: typeof fromBlock === "bigint" ? numberToHex(fromBlock) : fromBlock,
toBlock: typeof toBlock === "bigint" ? numberToHex(toBlock) : toBlock,
},
],
id: 2,
jsonrpc: "2.0",
},
];

const results: [Block | undefined, RpcLog[]] = await fetch(httpRpcUrl, {
method: "POST",
headers: {
"Content-Type": "application/json",
},
body: JSON.stringify(requests),
}).then((res) => res.json());

const [block, logs] = results;

if (!block) {
// Throw an explicit error so the caller can retry instead of silently returning an empty array
throw new Error(blockNotFoundMessage);
}

const formattedLogs = logs.map((log) => formatLog(log));
if (!events) return formattedLogs as GetLogsReturnType<abiEvent, abiEvents, strict, fromBlock, toBlock>;
return parseEventLogs({
abi: events,
args: args as never,
logs: formattedLogs,
strict,
}) as unknown as GetLogsReturnType<abiEvent, abiEvents, strict, fromBlock, toBlock>;
}
2 changes: 2 additions & 0 deletions packages/block-logs-stream/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,5 @@ export * from "./createBlockStream";
export * from "./fetchBlockLogs";
export * from "./fetchLogs";
export * from "./groupLogsByBlockNumber";
export * from "./getLogsFromLoadBalancedRpc";
export * from "./common";
29 changes: 22 additions & 7 deletions packages/store-sync/src/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import storeConfig from "@latticexyz/store/mud.config";
import worldConfig from "@latticexyz/world/mud.config";
import { Table as ConfigTable, Schema } from "@latticexyz/config";
import { configToTables } from "./configToTables";
import { HttpRpcUrl } from "@latticexyz/block-logs-stream";

export const mudTables = {
...configToTables(storeConfig),
Expand Down Expand Up @@ -66,12 +67,6 @@ export type SyncFilter = {
};

export type SyncOptions = {
/**
* [viem `PublicClient`][0] used for fetching logs from the RPC.
*
* [0]: https://viem.sh/docs/clients/public.html
*/
publicClient: PublicClient;
/**
* MUD Store/World contract address
*/
Expand Down Expand Up @@ -115,7 +110,27 @@ export type SyncOptions = {
blockNumber: bigint;
logs: readonly StorageAdapterLog[];
};
};
} & (
| {
/**
* [viem `PublicClient`][0] used for fetching logs from the RPC.
*
* [0]: https://viem.sh/docs/clients/public.html
*/
publicClient: PublicClient;
}
| {
/**
* Explicitly handle potentially unsynced load balanced RPCs by using a batch call with `eth_getLogs` and `eth_getBlockByNumber`.
* See https://indexsupply.com/shovel/docs/#unsynchronized-ethereum-nodes
*/
handleUnsyncedLoadBalancedRpc: true;
/**
* The HTTP URL of the load balanced RPC.
*/
httpRpcUrl: HttpRpcUrl;
}
);

export type WaitForTransactionResult = Pick<TransactionReceipt, "blockNumber" | "status" | "transactionHash">;

Expand Down
9 changes: 6 additions & 3 deletions packages/store-sync/src/createStoreSync.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { storeEventsAbi } from "@latticexyz/store";
import { GetTransactionReceiptErrorType, Hex } from "viem";
import { GetTransactionReceiptErrorType, Hex, createPublicClient, http } from "viem";
import {
StorageAdapter,
StorageAdapterBlock,
Expand Down Expand Up @@ -59,7 +59,6 @@ type CreateStoreSyncOptions = SyncOptions & {
export async function createStoreSync({
storageAdapter,
onProgress,
publicClient,
address,
filters: initialFilters = [],
tableIds = [],
Expand All @@ -69,7 +68,11 @@ export async function createStoreSync({
initialState,
initialBlockLogs,
indexerUrl: initialIndexerUrl,
...opts
}: CreateStoreSyncOptions): Promise<SyncResult> {
const publicClient =
"publicClient" in opts ? opts.publicClient : createPublicClient({ transport: http(opts.httpRpcUrl) });

const filters: SyncFilter[] =
initialFilters.length || tableIds.length
? [...initialFilters, ...tableIds.map((tableId) => ({ tableId })), ...defaultFilters]
Expand Down Expand Up @@ -257,7 +260,6 @@ export async function createStoreSync({
map(([startBlock, endBlock]) => ({ startBlock, endBlock })),
concatMap((range) => {
const storedBlocks = fetchAndStoreLogs({
publicClient,
address,
events: storeEventsAbi,
maxBlockRange,
Expand All @@ -267,6 +269,7 @@ export async function createStoreSync({
toBlock: range.endBlock,
logFilter,
storageAdapter,
...opts,
});

return from(storedBlocks);
Expand Down
Loading