Skip to content

Commit

Permalink
add getLogsFromLoadBalancedRpc
Browse files Browse the repository at this point in the history
  • Loading branch information
alvrs committed Dec 11, 2024
1 parent 1cea106 commit 9f20118
Show file tree
Hide file tree
Showing 4 changed files with 152 additions and 6 deletions.
1 change: 1 addition & 0 deletions packages/block-logs-stream/src/common.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export const blockNotFoundMessage = "block not found";
50 changes: 44 additions & 6 deletions packages/block-logs-stream/src/fetchLogs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ import { bigIntMax, bigIntMin, wait } from "@latticexyz/common/utils";
import { debug } from "./debug";
import { getAction } from "viem/utils";
import { getLogs as viem_getLogs } from "viem/actions";
import { getLogsFromLoadBalancedRpc } from "./getLogsFromLoadBalancedRpc";
import { blockNotFoundMessage } from "./common";

export type FetchLogsOptions<abiEvents extends readonly AbiEvent[]> = {
/**
Expand Down Expand Up @@ -47,6 +49,25 @@ export type FetchLogsOptions<abiEvents extends readonly AbiEvent[]> = {
*/
events: abiEvents;
}
| {
/**
* Explicitly handle potentially unsynced load balanced RPCs by using a batch call with `eth_getLogs` and `eth_getBlockByNumber`.
* See https://indexsupply.com/shovel/docs/#unsynchronized-ethereum-nodes
*/
handleUnsyncedLoadBalancedRpc: true;
/**
* The HTTP URL of the load balanced RPC.
*/
rpcUrl: string;
/**
* Optional contract address(es) to fetch logs for.
*/
address?: Address | Address[];
/**
* Events to fetch logs for.
*/
events: abiEvents;
}
>;

export type FetchLogsResult<abiEvents extends readonly AbiEvent[]> = {
Expand Down Expand Up @@ -89,6 +110,8 @@ const BLOCK_RANGE_ERRORS = [
"query exceeds max results",
];

const UNSYNCED_RPC_ERRORS = [blockNotFoundMessage];

/**
* An asynchronous generator function that fetches logs from the blockchain in a range of blocks.
*
Expand All @@ -113,12 +136,21 @@ export async function* fetchLogs<abiEvents extends readonly AbiEvent[]>({
}: FetchLogsOptions<abiEvents>): AsyncGenerator<FetchLogsResult<abiEvents>> {
const getLogs =
opts.getLogs ??
(async (blockRange): Promise<GetLogsReturnType<undefined, abiEvents, true, BlockNumber, BlockNumber>> =>
getAction(
opts.publicClient,
viem_getLogs,
"getLogs",
)({ ...blockRange, address: opts.address, events: opts.events, strict: true }));
(opts.handleUnsyncedLoadBalancedRpc
? async (blockRange): Promise<GetLogsReturnType<undefined, abiEvents, true, BlockNumber, BlockNumber>> =>
getLogsFromLoadBalancedRpc({
...blockRange,
rpcUrl: opts.rpcUrl,
address: opts.address,
events: opts.events,
strict: true,
})
: async (blockRange): Promise<GetLogsReturnType<undefined, abiEvents, true, BlockNumber, BlockNumber>> =>
getAction(
opts.publicClient,
viem_getLogs,
"getLogs",
)({ ...blockRange, address: opts.address, events: opts.events, strict: true }));

let fromBlock = initialFromBlock;
let blockRange = bigIntMin(maxBlockRange, initialToBlock - fromBlock);
Expand Down Expand Up @@ -158,6 +190,12 @@ export async function* fetchLogs<abiEvents extends readonly AbiEvent[]>({
continue;
}

if (opts.handleUnsyncedLoadBalancedRpc && UNSYNCED_RPC_ERRORS.some((e) => error.message.includes(e))) {
debug(`got unsynced rpc error, retrying`);
await wait(1000);
continue;
}

throw error;
}
}
Expand Down
106 changes: 106 additions & 0 deletions packages/block-logs-stream/src/getLogsFromLoadBalancedRpc.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
import {
AbiEvent,
BlockNumber,
BlockTag,
GetLogsParameters,
GetLogsReturnType,
EncodeEventTopicsParameters,
LogTopic,
RpcLog,
encodeEventTopics,
formatLog,
numberToHex,
parseEventLogs,
toHex,
Block,
} from "viem";
import { blockNotFoundMessage } from "./common";

/**
* Mostly equivalent to viem's `getLogs` action, but using a batch rpc call to check if the RPC has the requested block.
* If the RPC doesn't have the requested block, it will throw a "block not found" error.
* Expects a HTTP RPC endpoint that supports batch requests.
*/
export async function getLogsFromLoadBalancedRpc<
const abiEvent extends AbiEvent | undefined = undefined,
const abiEvents extends readonly AbiEvent[] | readonly unknown[] | undefined = abiEvent extends AbiEvent
? [abiEvent]
: undefined,
strict extends boolean | undefined = undefined,
fromBlock extends BlockNumber | BlockTag | undefined = undefined,
toBlock extends BlockNumber | BlockTag | undefined = undefined,
>({
rpcUrl,
address,
fromBlock,
toBlock,
event,
events: events_,
args,
strict: strict_,
}: Omit<GetLogsParameters<abiEvent, abiEvents, strict, fromBlock, toBlock>, "blockHash"> & { rpcUrl: string }): Promise<
GetLogsReturnType<abiEvent, abiEvents, strict, fromBlock, toBlock>
> {
const strict = strict_ ?? false;
const events = events_ ?? (event ? [event] : undefined);

let topics: LogTopic[] = [];
if (events) {
const encoded = (events as AbiEvent[]).flatMap((event) =>
encodeEventTopics({
abi: [event],
eventName: (event as AbiEvent).name,
args: events_ ? undefined : args,
} as EncodeEventTopicsParameters),
);
// TODO: Clean up type casting
topics = [encoded as LogTopic];
if (event) topics = topics[0] as LogTopic[];
}

const requests = [
{
method: "eth_getBlockByNumber",
params: [typeof toBlock === "bigint" || typeof toBlock === "number" ? toHex(toBlock) : toBlock, false],
id: 1,
jsonrpc: "2.0",
},
{
method: "eth_getLogs",
params: [
{
address,
topics,
fromBlock: typeof fromBlock === "bigint" ? numberToHex(fromBlock) : fromBlock,
toBlock: typeof toBlock === "bigint" ? numberToHex(toBlock) : toBlock,
},
],
id: 2,
jsonrpc: "2.0",
},
];

const results: [Block | undefined, RpcLog[]] = await fetch(rpcUrl, {
method: "POST",
headers: {
"Content-Type": "application/json",
},
body: JSON.stringify(requests),
}).then((res) => res.json());

const [block, logs] = results;

if (!block) {
// Throw an explicit error so the caller can retry instead of silently returning an empty array
throw new Error(blockNotFoundMessage);
}

const formattedLogs = logs.map((log) => formatLog(log));
if (!events) return formattedLogs as GetLogsReturnType<abiEvent, abiEvents, strict, fromBlock, toBlock>;
return parseEventLogs({
abi: events,
args: args as never,
logs: formattedLogs,
strict,
}) as unknown as GetLogsReturnType<abiEvent, abiEvents, strict, fromBlock, toBlock>;
}
1 change: 1 addition & 0 deletions packages/block-logs-stream/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ export * from "./createBlockStream";
export * from "./fetchBlockLogs";
export * from "./fetchLogs";
export * from "./groupLogsByBlockNumber";
export * from "./getLogsFromLoadBalancedRpc";

0 comments on commit 9f20118

Please sign in to comment.