Skip to content

Commit

Permalink
add option to explicitly handle load balanced rpc to createStoreSync
Browse files Browse the repository at this point in the history
  • Loading branch information
alvrs committed Dec 11, 2024
1 parent 9f20118 commit 48c2d9c
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 19 deletions.
2 changes: 2 additions & 0 deletions packages/block-logs-stream/src/common.ts
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
export const blockNotFoundMessage = "block not found";

export type HttpRpcUrl = `http${string}`;
6 changes: 3 additions & 3 deletions packages/block-logs-stream/src/fetchLogs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { debug } from "./debug";
import { getAction } from "viem/utils";
import { getLogs as viem_getLogs } from "viem/actions";
import { getLogsFromLoadBalancedRpc } from "./getLogsFromLoadBalancedRpc";
import { blockNotFoundMessage } from "./common";
import { HttpRpcUrl, blockNotFoundMessage } from "./common";

export type FetchLogsOptions<abiEvents extends readonly AbiEvent[]> = {
/**
Expand Down Expand Up @@ -58,7 +58,7 @@ export type FetchLogsOptions<abiEvents extends readonly AbiEvent[]> = {
/**
* The HTTP URL of the load balanced RPC.
*/
rpcUrl: string;
httpRpcUrl: HttpRpcUrl;
/**
* Optional contract address(es) to fetch logs for.
*/
Expand Down Expand Up @@ -140,7 +140,7 @@ export async function* fetchLogs<abiEvents extends readonly AbiEvent[]>({
? async (blockRange): Promise<GetLogsReturnType<undefined, abiEvents, true, BlockNumber, BlockNumber>> =>
getLogsFromLoadBalancedRpc({
...blockRange,
rpcUrl: opts.rpcUrl,
httpRpcUrl: opts.httpRpcUrl,
address: opts.address,
events: opts.events,
strict: true,
Expand Down
12 changes: 6 additions & 6 deletions packages/block-logs-stream/src/getLogsFromLoadBalancedRpc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import {
toHex,
Block,
} from "viem";
import { blockNotFoundMessage } from "./common";
import { HttpRpcUrl, blockNotFoundMessage } from "./common";

/**
* Mostly equivalent to viem's `getLogs` action, but using a batch rpc call to check if the RPC has the requested block.
Expand All @@ -30,17 +30,17 @@ export async function getLogsFromLoadBalancedRpc<
fromBlock extends BlockNumber | BlockTag | undefined = undefined,
toBlock extends BlockNumber | BlockTag | undefined = undefined,
>({
rpcUrl,
httpRpcUrl,
address,
fromBlock,
toBlock,
event,
events: events_,
args,
strict: strict_,
}: Omit<GetLogsParameters<abiEvent, abiEvents, strict, fromBlock, toBlock>, "blockHash"> & { rpcUrl: string }): Promise<
GetLogsReturnType<abiEvent, abiEvents, strict, fromBlock, toBlock>
> {
}: Omit<GetLogsParameters<abiEvent, abiEvents, strict, fromBlock, toBlock>, "blockHash"> & {
httpRpcUrl: HttpRpcUrl;
}): Promise<GetLogsReturnType<abiEvent, abiEvents, strict, fromBlock, toBlock>> {
const strict = strict_ ?? false;
const events = events_ ?? (event ? [event] : undefined);

Expand Down Expand Up @@ -80,7 +80,7 @@ export async function getLogsFromLoadBalancedRpc<
},
];

const results: [Block | undefined, RpcLog[]] = await fetch(rpcUrl, {
const results: [Block | undefined, RpcLog[]] = await fetch(httpRpcUrl, {
method: "POST",
headers: {
"Content-Type": "application/json",
Expand Down
1 change: 1 addition & 0 deletions packages/block-logs-stream/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@ export * from "./fetchBlockLogs";
export * from "./fetchLogs";
export * from "./groupLogsByBlockNumber";
export * from "./getLogsFromLoadBalancedRpc";
export * from "./common";
29 changes: 22 additions & 7 deletions packages/store-sync/src/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import storeConfig from "@latticexyz/store/mud.config";
import worldConfig from "@latticexyz/world/mud.config";
import { Table as ConfigTable, Schema } from "@latticexyz/config";
import { configToTables } from "./configToTables";
import { HttpRpcUrl } from "@latticexyz/block-logs-stream";

export const mudTables = {
...configToTables(storeConfig),
Expand Down Expand Up @@ -66,12 +67,6 @@ export type SyncFilter = {
};

export type SyncOptions = {
/**
* [viem `PublicClient`][0] used for fetching logs from the RPC.
*
* [0]: https://viem.sh/docs/clients/public.html
*/
publicClient: PublicClient;
/**
* MUD Store/World contract address
*/
Expand Down Expand Up @@ -115,7 +110,27 @@ export type SyncOptions = {
blockNumber: bigint;
logs: readonly StorageAdapterLog[];
};
};
} & (
| {
/**
* [viem `PublicClient`][0] used for fetching logs from the RPC.
*
* [0]: https://viem.sh/docs/clients/public.html
*/
publicClient: PublicClient;
}
| {
/**
* Explicitly handle potentially unsynced load balanced RPCs by using a batch call with `eth_getLogs` and `eth_getBlockByNumber`.
* See https://indexsupply.com/shovel/docs/#unsynchronized-ethereum-nodes
*/
handleUnsyncedLoadBalancedRpc: true;
/**
* The HTTP URL of the load balanced RPC.
*/
httpRpcUrl: HttpRpcUrl;
}
);

export type WaitForTransactionResult = Pick<TransactionReceipt, "blockNumber" | "status" | "transactionHash">;

Expand Down
9 changes: 6 additions & 3 deletions packages/store-sync/src/createStoreSync.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { storeEventsAbi } from "@latticexyz/store";
import { GetTransactionReceiptErrorType, Hex } from "viem";
import { GetTransactionReceiptErrorType, Hex, createPublicClient, http } from "viem";
import {
StorageAdapter,
StorageAdapterBlock,
Expand Down Expand Up @@ -59,7 +59,6 @@ type CreateStoreSyncOptions = SyncOptions & {
export async function createStoreSync({
storageAdapter,
onProgress,
publicClient,
address,
filters: initialFilters = [],
tableIds = [],
Expand All @@ -69,7 +68,11 @@ export async function createStoreSync({
initialState,
initialBlockLogs,
indexerUrl: initialIndexerUrl,
...opts
}: CreateStoreSyncOptions): Promise<SyncResult> {
const publicClient =
"publicClient" in opts ? opts.publicClient : createPublicClient({ transport: http(opts.httpRpcUrl) });

const filters: SyncFilter[] =
initialFilters.length || tableIds.length
? [...initialFilters, ...tableIds.map((tableId) => ({ tableId })), ...defaultFilters]
Expand Down Expand Up @@ -257,7 +260,6 @@ export async function createStoreSync({
map(([startBlock, endBlock]) => ({ startBlock, endBlock })),
concatMap((range) => {
const storedBlocks = fetchAndStoreLogs({
publicClient,
address,
events: storeEventsAbi,
maxBlockRange,
Expand All @@ -267,6 +269,7 @@ export async function createStoreSync({
toBlock: range.endBlock,
logFilter,
storageAdapter,
...opts,
});

return from(storedBlocks);
Expand Down

0 comments on commit 48c2d9c

Please sign in to comment.