Skip to content

Commit

Permalink
feat(cli): paginate world deploy logs (#3217)
Browse files Browse the repository at this point in the history
  • Loading branch information
holic authored Sep 23, 2024
1 parent 7c7bdb2 commit 0f5b291
Show file tree
Hide file tree
Showing 14 changed files with 150 additions and 180 deletions.
5 changes: 5 additions & 0 deletions .changeset/good-crabs-trade.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@latticexyz/cli": patch
---

When deploying to an existing world, the deployer now paginates with [`fetchLogs`](https://github.com/latticexyz/mud/blob/main/packages/block-logs-stream/src/fetchLogs.ts) to find the world deployment.
7 changes: 7 additions & 0 deletions .changeset/large-windows-sort.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
"@latticexyz/block-logs-stream": patch
---

- For block range size errors, `fetchLogs` now reduces the max block range for subsequent requests in its loop. For block out of range or response size errors, only the current request's block range is reduced until the request succeeds, then it resets to the max block range.
- Added `fetchBlockLogs` to find all matching logs of the given block range, grouped by block number, in a single async call.
- Loosened the `publicClient` type and switched to tree shakable actions.
5 changes: 3 additions & 2 deletions packages/block-logs-stream/src/blockRangeToLogs.test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { describe, it, expect, vi, beforeEach } from "vitest";
import { blockRangeToLogs } from "./blockRangeToLogs";
import { Subject, lastValueFrom, map, toArray } from "rxjs";
import { EIP1193RequestFn, RpcLog, Transport, createPublicClient, createTransport } from "viem";
import { EIP1193RequestFn, RpcLog, Transport, createClient, createTransport } from "viem";
import { wait } from "@latticexyz/common/utils";

vi.useFakeTimers();
Expand All @@ -14,9 +14,10 @@ const mockTransport: Transport = () =>
// eslint-disable-next-line @typescript-eslint/no-explicit-any
request: mockedTransportRequest as any,
type: "mock",
retryCount: 0,
});

const publicClient = createPublicClient({
const publicClient = createClient({
transport: mockTransport,
});

Expand Down
20 changes: 10 additions & 10 deletions packages/block-logs-stream/src/blockRangeToLogs.ts
Original file line number Diff line number Diff line change
@@ -1,33 +1,33 @@
import { EMPTY, OperatorFunction, concatMap, from, pipe, tap } from "rxjs";
import { FetchLogsResult, fetchLogs } from "./fetchLogs";
import { AbiEvent } from "abitype";
import { Address, BlockNumber, PublicClient } from "viem";
import { Address, BlockNumber, Client } from "viem";
import { debug } from "./debug";

export type BlockRangeToLogsOptions<TAbiEvents extends readonly AbiEvent[]> = {
export type BlockRangeToLogsOptions<abiEvents extends readonly AbiEvent[]> = {
/**
* [viem `PublicClient`][0] used for fetching logs from the RPC.
* [viem `Client`][0] used for fetching logs from the RPC.
*
* [0]: https://viem.sh/docs/clients/public.html
*/
publicClient: PublicClient;
publicClient: Client;
/**
* Optional contract address(es) to fetch logs for.
*/
address?: Address | Address[];
/**
* Events to fetch logs for.
*/
events: TAbiEvents;
events: abiEvents;
/**
* Optional maximum block range, if your RPC limits the amount of blocks fetched at a time.
*/
maxBlockRange?: bigint;
};

export type BlockRangeToLogsResult<TAbiEvents extends readonly AbiEvent[]> = OperatorFunction<
export type BlockRangeToLogsResult<abiEvents extends readonly AbiEvent[]> = OperatorFunction<
{ startBlock: BlockNumber; endBlock: BlockNumber },
FetchLogsResult<TAbiEvents>
FetchLogsResult<abiEvents>
>;

/**
Expand All @@ -38,12 +38,12 @@ export type BlockRangeToLogsResult<TAbiEvents extends readonly AbiEvent[]> = Ope
* @param {BlockRangeToLogsOptions<AbiEvent[]>} options See `BlockRangeToLogsOptions`.
* @returns {BlockRangeToLogsResult<AbiEvent[]>} An operator function that transforms a stream of block ranges into a stream of fetched logs.
*/
export function blockRangeToLogs<TAbiEvents extends readonly AbiEvent[]>({
export function blockRangeToLogs<abiEvents extends readonly AbiEvent[]>({
publicClient,
address,
events,
maxBlockRange,
}: BlockRangeToLogsOptions<TAbiEvents>): BlockRangeToLogsResult<TAbiEvents> {
}: BlockRangeToLogsOptions<abiEvents>): BlockRangeToLogsResult<abiEvents> {
let fromBlock: bigint;
let toBlock: bigint;

Expand All @@ -56,7 +56,7 @@ export function blockRangeToLogs<TAbiEvents extends readonly AbiEvent[]>({
// so it always uses the latest `toBlock` value.
concatMap(() => {
if (fromBlock > toBlock) return EMPTY;
debug("fetching logs for block range", { fromBlock, toBlock });
debug(`fetching logs for block range ${fromBlock}-${toBlock}`);
return from(
fetchLogs({
publicClient,
Expand Down
22 changes: 14 additions & 8 deletions packages/block-logs-stream/src/createBlockStream.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,25 @@
import { Observable } from "rxjs";
import type { Block, BlockTag, PublicClient } from "viem";
import type { Block, BlockTag, Client } from "viem";
import { watchBlocks } from "viem/actions";
import { getAction } from "viem/utils";

export type CreateBlockStreamOptions<TBlockTag extends BlockTag> = {
publicClient: PublicClient;
blockTag: TBlockTag;
export type CreateBlockStreamOptions<blockTag extends BlockTag> = {
publicClient: Client;
blockTag: blockTag;
};

export type CreateBlockStreamResult<TBlockTag extends BlockTag> = Observable<Block<bigint, false, TBlockTag>>;
export type CreateBlockStreamResult<blockTag extends BlockTag> = Observable<Block<bigint, false, blockTag>>;

export function createBlockStream<TBlockTag extends BlockTag>({
export function createBlockStream<blockTag extends BlockTag>({
publicClient,
blockTag,
}: CreateBlockStreamOptions<TBlockTag>): CreateBlockStreamResult<TBlockTag> {
}: CreateBlockStreamOptions<blockTag>): CreateBlockStreamResult<blockTag> {
return new Observable(function subscribe(subscriber) {
return publicClient.watchBlocks({
return getAction(
publicClient,
watchBlocks,
"watchBlocks",
)({
blockTag,
emitOnBegin: true,
onBlock: (block) => subscriber.next(block),
Expand Down
26 changes: 26 additions & 0 deletions packages/block-logs-stream/src/fetchBlockLogs.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import { AbiEvent } from "viem";
import { GroupLogsByBlockNumberResult, groupLogsByBlockNumber } from "./groupLogsByBlockNumber";
import { FetchLogsOptions, FetchLogsResult, fetchLogs } from "./fetchLogs";
import { iteratorToArray } from "@latticexyz/common/utils";

/**
* Fetches all logs from the blockchain for the given range, grouped by block number.
*
* @remarks
* The function will fetch logs according to the given options.
* If the function encounters rate limits, it will retry until `maxRetryCount` is reached.
* If the function encounters a block range that is too large, it will half the block range and retry, until the block range can't be halved anymore.
*
* @param {FetchLogsOptions<AbiEvent[]>} options See `FetchLogsOptions`.
*
* @returns {GroupLogsByBlockNumberResult} See `GroupLogsByBlockNumberResult`.
*
* @throws Will throw an error if the block range can't be reduced any further.
*/
export async function fetchBlockLogs<abiEvents extends readonly AbiEvent[]>(
opts: FetchLogsOptions<abiEvents>,
): Promise<GroupLogsByBlockNumberResult<FetchLogsResult<abiEvents>["logs"][number]>> {
const fetchedLogs = await iteratorToArray(fetchLogs(opts));
const logs = fetchedLogs.flatMap(({ logs }) => logs);
return groupLogsByBlockNumber(logs);
}
123 changes: 7 additions & 116 deletions packages/block-logs-stream/src/fetchLogs.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import {
RpcLog,
RpcRequestError,
Transport,
createPublicClient,
createClient,
createTransport,
hexToNumber,
} from "viem";
Expand All @@ -19,9 +19,10 @@ const mockTransport: Transport = () =>
// eslint-disable-next-line @typescript-eslint/no-explicit-any
request: mockedTransportRequest as any,
type: "mock",
retryCount: 0,
});

const publicClient = createPublicClient({
const publicClient = createClient({
transport: mockTransport,
});

Expand Down Expand Up @@ -175,36 +176,6 @@ describe("fetchLogs", () => {

expect(requests).toMatchInlineSnapshot(`
[
[
{
"address": "0x",
"fromBlock": "0x0",
"toBlock": "0x3e8",
"topics": [
[],
],
},
],
[
{
"address": "0x",
"fromBlock": "0x0",
"toBlock": "0x3e8",
"topics": [
[],
],
},
],
[
{
"address": "0x",
"fromBlock": "0x0",
"toBlock": "0x3e8",
"topics": [
[],
],
},
],
[
{
"address": "0x",
Expand All @@ -225,46 +196,6 @@ describe("fetchLogs", () => {
],
},
],
[
{
"address": "0x",
"fromBlock": "0x1f5",
"toBlock": "0x5dd",
"topics": [
[],
],
},
],
[
{
"address": "0x",
"fromBlock": "0x1f5",
"toBlock": "0x5dd",
"topics": [
[],
],
},
],
[
{
"address": "0x",
"fromBlock": "0x1f5",
"toBlock": "0x5dd",
"topics": [
[],
],
},
],
[
{
"address": "0x",
"fromBlock": "0x1f5",
"toBlock": "0x5dd",
"topics": [
[],
],
},
],
[
{
"address": "0x",
Expand All @@ -279,47 +210,7 @@ describe("fetchLogs", () => {
{
"address": "0x",
"fromBlock": "0x3ea",
"toBlock": "0x7d0",
"topics": [
[],
],
},
],
[
{
"address": "0x",
"fromBlock": "0x3ea",
"toBlock": "0x7d0",
"topics": [
[],
],
},
],
[
{
"address": "0x",
"fromBlock": "0x3ea",
"toBlock": "0x7d0",
"topics": [
[],
],
},
],
[
{
"address": "0x",
"fromBlock": "0x3ea",
"toBlock": "0x7d0",
"topics": [
[],
],
},
],
[
{
"address": "0x",
"fromBlock": "0x3ea",
"toBlock": "0x5dd",
"toBlock": "0x5de",
"topics": [
[],
],
Expand All @@ -328,7 +219,7 @@ describe("fetchLogs", () => {
[
{
"address": "0x",
"fromBlock": "0x5de",
"fromBlock": "0x5df",
"toBlock": "0x7d0",
"topics": [
[],
Expand All @@ -353,10 +244,10 @@ describe("fetchLogs", () => {
{
"fromBlock": 1002n,
"logs": [],
"toBlock": 1501n,
"toBlock": 1502n,
},
{
"fromBlock": 1502n,
"fromBlock": 1503n,
"logs": [],
"toBlock": 2000n,
},
Expand Down
Loading

0 comments on commit 0f5b291

Please sign in to comment.