Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(cli): paginate world deploy logs #3217

Merged
merged 8 commits into from
Sep 23, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/good-crabs-trade.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@latticexyz/cli": patch
---

When deploying to an existing world, the deployer now paginates with [`fetchLogs`](https://github.com/latticexyz/mud/blob/main/packages/block-logs-stream/src/fetchLogs.ts) to find the world deployment.
7 changes: 7 additions & 0 deletions .changeset/large-windows-sort.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
"@latticexyz/block-logs-stream": patch
---

- For block range size errors, `fetchLogs` now reduces the max block range for subsequent requests in its loop. For block out of range or response size errors, only the current request's block range is reduced until the request succeeds, then it resets to the max block range.
- Added `fetchBlockLogs` to request all block logs in one async call rather than an async generator.
holic marked this conversation as resolved.
Show resolved Hide resolved
- Loosened the `publicClient` type and switched to tree shakable actions.
5 changes: 3 additions & 2 deletions packages/block-logs-stream/src/blockRangeToLogs.test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { describe, it, expect, vi, beforeEach } from "vitest";
import { blockRangeToLogs } from "./blockRangeToLogs";
import { Subject, lastValueFrom, map, toArray } from "rxjs";
import { EIP1193RequestFn, RpcLog, Transport, createPublicClient, createTransport } from "viem";
import { EIP1193RequestFn, RpcLog, Transport, createClient, createTransport } from "viem";
import { wait } from "@latticexyz/common/utils";

vi.useFakeTimers();
Expand All @@ -14,9 +14,10 @@ const mockTransport: Transport = () =>
// eslint-disable-next-line @typescript-eslint/no-explicit-any
request: mockedTransportRequest as any,
type: "mock",
retryCount: 0,
});

const publicClient = createPublicClient({
const publicClient = createClient({
transport: mockTransport,
});

Expand Down
20 changes: 10 additions & 10 deletions packages/block-logs-stream/src/blockRangeToLogs.ts
Original file line number Diff line number Diff line change
@@ -1,33 +1,33 @@
import { EMPTY, OperatorFunction, concatMap, from, pipe, tap } from "rxjs";
import { FetchLogsResult, fetchLogs } from "./fetchLogs";
import { AbiEvent } from "abitype";
import { Address, BlockNumber, PublicClient } from "viem";
import { Address, BlockNumber, Client } from "viem";
import { debug } from "./debug";

export type BlockRangeToLogsOptions<TAbiEvents extends readonly AbiEvent[]> = {
export type BlockRangeToLogsOptions<abiEvents extends readonly AbiEvent[]> = {
/**
* [viem `PublicClient`][0] used for fetching logs from the RPC.
* [viem `Client`][0] used for fetching logs from the RPC.
*
* [0]: https://viem.sh/docs/clients/public.html
*/
publicClient: PublicClient;
publicClient: Client;
/**
* Optional contract address(es) to fetch logs for.
*/
address?: Address | Address[];
/**
* Events to fetch logs for.
*/
events: TAbiEvents;
events: abiEvents;
/**
* Optional maximum block range, if your RPC limits the amount of blocks fetched at a time.
*/
maxBlockRange?: bigint;
};

export type BlockRangeToLogsResult<TAbiEvents extends readonly AbiEvent[]> = OperatorFunction<
export type BlockRangeToLogsResult<abiEvents extends readonly AbiEvent[]> = OperatorFunction<
{ startBlock: BlockNumber; endBlock: BlockNumber },
FetchLogsResult<TAbiEvents>
FetchLogsResult<abiEvents>
>;

/**
Expand All @@ -38,12 +38,12 @@ export type BlockRangeToLogsResult<TAbiEvents extends readonly AbiEvent[]> = Ope
* @param {BlockRangeToLogsOptions<AbiEvent[]>} options See `BlockRangeToLogsOptions`.
* @returns {BlockRangeToLogsResult<AbiEvent[]>} An operator function that transforms a stream of block ranges into a stream of fetched logs.
*/
export function blockRangeToLogs<TAbiEvents extends readonly AbiEvent[]>({
export function blockRangeToLogs<abiEvents extends readonly AbiEvent[]>({
publicClient,
address,
events,
maxBlockRange,
}: BlockRangeToLogsOptions<TAbiEvents>): BlockRangeToLogsResult<TAbiEvents> {
}: BlockRangeToLogsOptions<abiEvents>): BlockRangeToLogsResult<abiEvents> {
let fromBlock: bigint;
let toBlock: bigint;

Expand All @@ -56,7 +56,7 @@ export function blockRangeToLogs<TAbiEvents extends readonly AbiEvent[]>({
// so it always uses the latest `toBlock` value.
concatMap(() => {
if (fromBlock > toBlock) return EMPTY;
debug("fetching logs for block range", { fromBlock, toBlock });
debug(`fetching logs for block range ${fromBlock}-${toBlock}`);
return from(
fetchLogs({
publicClient,
Expand Down
22 changes: 14 additions & 8 deletions packages/block-logs-stream/src/createBlockStream.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,25 @@
import { Observable } from "rxjs";
import type { Block, BlockTag, PublicClient } from "viem";
import type { Block, BlockTag, Client } from "viem";
import { watchBlocks } from "viem/actions";
import { getAction } from "viem/utils";

export type CreateBlockStreamOptions<TBlockTag extends BlockTag> = {
publicClient: PublicClient;
blockTag: TBlockTag;
export type CreateBlockStreamOptions<blockTag extends BlockTag> = {
publicClient: Client;
blockTag: blockTag;
};

export type CreateBlockStreamResult<TBlockTag extends BlockTag> = Observable<Block<bigint, false, TBlockTag>>;
export type CreateBlockStreamResult<blockTag extends BlockTag> = Observable<Block<bigint, false, blockTag>>;

export function createBlockStream<TBlockTag extends BlockTag>({
export function createBlockStream<blockTag extends BlockTag>({
publicClient,
blockTag,
}: CreateBlockStreamOptions<TBlockTag>): CreateBlockStreamResult<TBlockTag> {
}: CreateBlockStreamOptions<blockTag>): CreateBlockStreamResult<blockTag> {
return new Observable(function subscribe(subscriber) {
return publicClient.watchBlocks({
return getAction(
publicClient,
watchBlocks,
"watchBlocks",
)({
blockTag,
emitOnBegin: true,
onBlock: (block) => subscriber.next(block),
Expand Down
25 changes: 25 additions & 0 deletions packages/block-logs-stream/src/fetchBlockLogs.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import { AbiEvent, Log } from "viem";
import { GroupLogsByBlockNumberResult, groupLogsByBlockNumber } from "./groupLogsByBlockNumber";
import { FetchLogsOptions, FetchLogsResult, fetchLogs } from "./fetchLogs";

/**
* Fetches all logs from the blockchain for the given range, grouped by block number.
*
* @remarks
* The function will fetch logs according to the given options.
* If the function encounters rate limits, it will retry until `maxRetryCount` is reached.
* If the function encounters a block range that is too large, it will half the block range and retry, until the block range can't be halved anymore.
*
* @param {FetchLogsOptions<AbiEvent[]>} options See `FetchLogsOptions`.
*
* @throws Will throw an error if the block range can't be reduced any further.
*/
export async function fetchBlockLogs<abiEvents extends readonly AbiEvent[]>(
opts: FetchLogsOptions<abiEvents>,
): Promise<GroupLogsByBlockNumberResult<Log<bigint, number, false, undefined, true, abiEvents, undefined>>> {
const results: FetchLogsResult<abiEvents>[] = [];
for await (const result of fetchLogs(opts)) {
results.push(result);
}
return groupLogsByBlockNumber(results.flatMap((result) => result.logs));
}
123 changes: 7 additions & 116 deletions packages/block-logs-stream/src/fetchLogs.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import {
RpcLog,
RpcRequestError,
Transport,
createPublicClient,
createClient,
createTransport,
hexToNumber,
} from "viem";
Expand All @@ -19,9 +19,10 @@ const mockTransport: Transport = () =>
// eslint-disable-next-line @typescript-eslint/no-explicit-any
request: mockedTransportRequest as any,
type: "mock",
retryCount: 0,
});

const publicClient = createPublicClient({
const publicClient = createClient({
transport: mockTransport,
});

Expand Down Expand Up @@ -175,36 +176,6 @@ describe("fetchLogs", () => {

expect(requests).toMatchInlineSnapshot(`
[
[
{
"address": "0x",
"fromBlock": "0x0",
"toBlock": "0x3e8",
"topics": [
[],
],
},
],
[
{
"address": "0x",
"fromBlock": "0x0",
"toBlock": "0x3e8",
"topics": [
[],
],
},
],
[
{
"address": "0x",
"fromBlock": "0x0",
"toBlock": "0x3e8",
"topics": [
[],
],
},
],
[
{
"address": "0x",
Expand All @@ -225,46 +196,6 @@ describe("fetchLogs", () => {
],
},
],
[
{
"address": "0x",
"fromBlock": "0x1f5",
"toBlock": "0x5dd",
"topics": [
[],
],
},
],
[
{
"address": "0x",
"fromBlock": "0x1f5",
"toBlock": "0x5dd",
"topics": [
[],
],
},
],
[
{
"address": "0x",
"fromBlock": "0x1f5",
"toBlock": "0x5dd",
"topics": [
[],
],
},
],
[
{
"address": "0x",
"fromBlock": "0x1f5",
"toBlock": "0x5dd",
"topics": [
[],
],
},
],
[
{
"address": "0x",
Expand All @@ -279,47 +210,7 @@ describe("fetchLogs", () => {
{
"address": "0x",
"fromBlock": "0x3ea",
"toBlock": "0x7d0",
"topics": [
[],
],
},
],
[
{
"address": "0x",
"fromBlock": "0x3ea",
"toBlock": "0x7d0",
"topics": [
[],
],
},
],
[
{
"address": "0x",
"fromBlock": "0x3ea",
"toBlock": "0x7d0",
"topics": [
[],
],
},
],
[
{
"address": "0x",
"fromBlock": "0x3ea",
"toBlock": "0x7d0",
"topics": [
[],
],
},
],
[
{
"address": "0x",
"fromBlock": "0x3ea",
"toBlock": "0x5dd",
"toBlock": "0x5de",
"topics": [
[],
],
Expand All @@ -328,7 +219,7 @@ describe("fetchLogs", () => {
[
{
"address": "0x",
"fromBlock": "0x5de",
"fromBlock": "0x5df",
"toBlock": "0x7d0",
"topics": [
[],
Expand All @@ -353,10 +244,10 @@ describe("fetchLogs", () => {
{
"fromBlock": 1002n,
"logs": [],
"toBlock": 1501n,
"toBlock": 1502n,
},
{
"fromBlock": 1502n,
"fromBlock": 1503n,
"logs": [],
"toBlock": 2000n,
},
Expand Down
Loading
Loading