Skip to content

Commit

Permalink
feat(cli,store): fetch table-specific logs (#3245)
Browse files Browse the repository at this point in the history
  • Loading branch information
holic authored Oct 6, 2024
1 parent 8fa3c6a commit 7ddcf64
Show file tree
Hide file tree
Showing 25 changed files with 949 additions and 139 deletions.
5 changes: 5 additions & 0 deletions .changeset/early-colts-smile.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@latticexyz/block-logs-stream": patch
---

`fetchLogs` and `blockRangeToLogs` now accept a `getLogs` option to override the default behavior.
5 changes: 5 additions & 0 deletions .changeset/gold-mangos-sneeze.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@latticexyz/store": patch
---

Added `getStoreLogs` and `flattenStoreLogs` to aid in fetching data from store contracts. For now, these are internal exports and considered unstable/experimental.
5 changes: 5 additions & 0 deletions .changeset/nervous-clouds-collect.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@latticexyz/cli": patch
---

Deployer now has a better method for fetching store logs from the world that should be more efficient and resilient to block range errors and rate limiting.
12 changes: 12 additions & 0 deletions .changeset/ten-llamas-protect.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
---
"@latticexyz/common": patch
---

Added `logSort` method to help when sorting logs fetched from RPC, where they come back ordered relative to the topics used.

```ts
import { logSort } from "@latticexyz/common";

const logs = getLogs(...);
logs.sort(logSort);
```
48 changes: 10 additions & 38 deletions packages/block-logs-stream/src/blockRangeToLogs.ts
Original file line number Diff line number Diff line change
@@ -1,29 +1,13 @@
import { EMPTY, OperatorFunction, concatMap, from, pipe, tap } from "rxjs";
import { FetchLogsResult, fetchLogs } from "./fetchLogs";
import { FetchLogsOptions, FetchLogsResult, fetchLogs } from "./fetchLogs";
import { AbiEvent } from "abitype";
import { Address, BlockNumber, Client } from "viem";
import { BlockNumber, UnionOmit } from "viem";
import { debug } from "./debug";

export type BlockRangeToLogsOptions<abiEvents extends readonly AbiEvent[]> = {
/**
* [viem `Client`][0] used for fetching logs from the RPC.
*
* [0]: https://viem.sh/docs/clients/public.html
*/
publicClient: Client;
/**
* Optional contract address(es) to fetch logs for.
*/
address?: Address | Address[];
/**
* Events to fetch logs for.
*/
events: abiEvents;
/**
* Optional maximum block range, if your RPC limits the amount of blocks fetched at a time.
*/
maxBlockRange?: bigint;
};
export type BlockRangeToLogsOptions<abiEvents extends readonly AbiEvent[]> = UnionOmit<
FetchLogsOptions<abiEvents>,
"fromBlock" | "toBlock"
>;

export type BlockRangeToLogsResult<abiEvents extends readonly AbiEvent[]> = OperatorFunction<
{ startBlock: BlockNumber; endBlock: BlockNumber },
Expand All @@ -38,12 +22,9 @@ export type BlockRangeToLogsResult<abiEvents extends readonly AbiEvent[]> = Oper
* @param {BlockRangeToLogsOptions<AbiEvent[]>} options See `BlockRangeToLogsOptions`.
* @returns {BlockRangeToLogsResult<AbiEvent[]>} An operator function that transforms a stream of block ranges into a stream of fetched logs.
*/
export function blockRangeToLogs<abiEvents extends readonly AbiEvent[]>({
publicClient,
address,
events,
maxBlockRange,
}: BlockRangeToLogsOptions<abiEvents>): BlockRangeToLogsResult<abiEvents> {
export function blockRangeToLogs<abiEvents extends readonly AbiEvent[]>(
opts: BlockRangeToLogsOptions<abiEvents>,
): BlockRangeToLogsResult<abiEvents> {
let fromBlock: bigint;
let toBlock: bigint;

Expand All @@ -57,16 +38,7 @@ export function blockRangeToLogs<abiEvents extends readonly AbiEvent[]>({
concatMap(() => {
if (fromBlock > toBlock) return EMPTY;
debug(`fetching logs for block range ${fromBlock}-${toBlock}`);
return from(
fetchLogs({
publicClient,
address,
events,
fromBlock,
toBlock,
maxBlockRange,
}),
).pipe(
return from(fetchLogs<abiEvents>({ ...opts, fromBlock, toBlock })).pipe(
tap(({ toBlock }) => {
fromBlock = toBlock + 1n;
}),
Expand Down
74 changes: 46 additions & 28 deletions packages/block-logs-stream/src/fetchLogs.ts
Original file line number Diff line number Diff line change
@@ -1,25 +1,11 @@
import { AbiEvent } from "abitype";
import { Address, Client, BlockNumber, GetLogsReturnType } from "viem";
import { Address, Client, BlockNumber, GetLogsReturnType, OneOf } from "viem";
import { bigIntMin, wait } from "@latticexyz/common/utils";
import { debug } from "./debug";
import { getAction } from "viem/utils";
import { getLogs } from "viem/actions";
import { getLogs as viem_getLogs } from "viem/actions";

export type FetchLogsOptions<abiEvents extends readonly AbiEvent[]> = {
/**
* [viem `Client`][0] used for fetching logs from the RPC.
*
* [0]: https://viem.sh/docs/clients/public.html
*/
publicClient: Client;
/**
* Optional contract address(es) to fetch logs for.
*/
address?: Address | Address[];
/**
* Events to fetch logs for.
*/
events: abiEvents;
/**
* The block number to start fetching logs from (inclusive).
*/
Expand All @@ -36,7 +22,33 @@ export type FetchLogsOptions<abiEvents extends readonly AbiEvent[]> = {
* Optional maximum amount of retries if the RPC returns a rate limit error. Defaults to 3.
*/
maxRetryCount?: number;
};
} & OneOf<
| {
/**
* Async function to return logs for the given block range.
*/
getLogs: (args: {
fromBlock: bigint;
toBlock: bigint;
}) => Promise<GetLogsReturnType<undefined, abiEvents, true, BlockNumber, BlockNumber>>;
}
| {
/**
* [viem `Client`][0] used for fetching logs from the RPC.
*
* [0]: https://viem.sh/docs/clients/public.html
*/
publicClient: Client;
/**
* Optional contract address(es) to fetch logs for.
*/
address?: Address | Address[];
/**
* Events to fetch logs for.
*/
events: abiEvents;
}
>;

export type FetchLogsResult<abiEvents extends readonly AbiEvent[]> = {
fromBlock: BlockNumber;
Expand Down Expand Up @@ -96,25 +108,31 @@ const BLOCK_RANGE_ERRORS = [
export async function* fetchLogs<abiEvents extends readonly AbiEvent[]>({
maxBlockRange = 1000n,
maxRetryCount = 3,
publicClient,
...getLogsOpts
fromBlock: initialFromBlock,
toBlock: initialToBlock,
...opts
}: FetchLogsOptions<abiEvents>): AsyncGenerator<FetchLogsResult<abiEvents>> {
let fromBlock = getLogsOpts.fromBlock;
let blockRange = bigIntMin(maxBlockRange, getLogsOpts.toBlock - fromBlock);
const getLogs =
opts.getLogs ??
(async (blockRange): Promise<GetLogsReturnType<undefined, abiEvents, true, BlockNumber, BlockNumber>> =>
getAction(
opts.publicClient,
viem_getLogs,
"getLogs",
)({ ...blockRange, address: opts.address, events: opts.events, strict: true }));

let fromBlock = initialFromBlock;
let blockRange = bigIntMin(maxBlockRange, initialToBlock - fromBlock);
let retryCount = 0;

while (fromBlock <= getLogsOpts.toBlock) {
while (fromBlock <= initialToBlock) {
try {
const toBlock = fromBlock + blockRange;
debug(`getting logs for blocks ${fromBlock}-${toBlock} (${blockRange} blocks, ${maxBlockRange} max)`);
const logs = await getAction(
publicClient,
getLogs,
"getLogs",
)({ ...getLogsOpts, fromBlock, toBlock, strict: true });
const logs = await getLogs({ fromBlock, toBlock });
yield { fromBlock, toBlock, logs };
fromBlock = toBlock + 1n;
blockRange = bigIntMin(maxBlockRange, getLogsOpts.toBlock - fromBlock);
blockRange = bigIntMin(maxBlockRange, initialToBlock - fromBlock);
retryCount = 0;
} catch (error: unknown) {
if (!(error instanceof Error)) throw error;
Expand Down
23 changes: 7 additions & 16 deletions packages/block-logs-stream/src/groupLogsByBlockNumber.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { BlockNumber } from "viem";
import { bigIntSort, isDefined } from "@latticexyz/common/utils";
import { logSort } from "@latticexyz/common";
import { bigIntSort, groupBy } from "@latticexyz/common/utils";

type PartialLog = { readonly blockNumber: bigint; readonly logIndex: number };

Expand Down Expand Up @@ -29,22 +30,12 @@ export function groupLogsByBlockNumber<log extends PartialLog>(
const blockNumbers = Array.from(new Set(logs.map((log) => log.blockNumber)));
blockNumbers.sort(bigIntSort);

const groupedBlocks = blockNumbers
.map((blockNumber) => {
const blockLogs = logs.filter((log) => log.blockNumber === blockNumber);
if (!blockLogs.length) return;
blockLogs.sort((a, b) => (a.logIndex < b.logIndex ? -1 : a.logIndex > b.logIndex ? 1 : 0));
const sortedLogs = logs.slice().sort(logSort);
const groupedBlocks = Array.from(groupBy(sortedLogs, (log) => log.blockNumber).entries())
.map(([blockNumber, logs]) => ({ blockNumber, logs }))
.filter((block) => block.logs.length > 0);

if (!blockLogs.length) return;

return {
blockNumber,
logs: blockLogs,
};
})
.filter(isDefined);

const lastBlockNumber = blockNumbers.length > 0 ? blockNumbers[blockNumbers.length - 1] : null;
const lastBlockNumber = blockNumbers.at(-1);

if (toBlock != null && (lastBlockNumber == null || toBlock > lastBlockNumber)) {
groupedBlocks.push({
Expand Down
26 changes: 13 additions & 13 deletions packages/cli/src/deploy/getResourceAccess.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import { Client, parseAbiItem, Hex, Address, getAddress } from "viem";
import { Client, Hex, Address, getAddress } from "viem";
import { WorldDeploy } from "./common";
import { debug } from "./debug";
import { storeSpliceStaticDataEvent } from "@latticexyz/store";
import { getLogs } from "viem/actions";
import { decodeKey, getKeySchema, getSchemaTypes } from "@latticexyz/protocol-parser/internal";
import { getTableValue } from "./getTableValue";
import worldConfig from "@latticexyz/world/mud.config";
import { fetchBlockLogs } from "@latticexyz/block-logs-stream";
import { flattenStoreLogs, getStoreLogs } from "@latticexyz/store/internal";

export async function getResourceAccess({
client,
Expand All @@ -14,21 +14,21 @@ export async function getResourceAccess({
readonly client: Client;
readonly worldDeploy: WorldDeploy;
}): Promise<readonly { readonly resourceId: Hex; readonly address: Address }[]> {
// This assumes we only use `ResourceAccess._set(...)`, which is true as of this writing.
// TODO: PR to viem's getLogs to accept topics array so we can filter on all store events and quickly recreate this table's current state

debug("looking up resource access for", worldDeploy.address);

const logs = await getLogs(client, {
strict: true,
const blockLogs = await fetchBlockLogs({
fromBlock: worldDeploy.deployBlock,
toBlock: worldDeploy.stateBlock,
address: worldDeploy.address,
// our usage of `ResourceAccess._set(...)` emits a splice instead of set record
// TODO: https://github.com/latticexyz/mud/issues/479
event: parseAbiItem(storeSpliceStaticDataEvent),
args: { tableId: worldConfig.namespaces.world.tables.ResourceAccess.tableId },
async getLogs({ fromBlock, toBlock }) {
return getStoreLogs(client, {
address: worldDeploy.address,
fromBlock,
toBlock,
tableId: worldConfig.namespaces.world.tables.ResourceAccess.tableId,
});
},
});
const logs = flattenStoreLogs(blockLogs.flatMap((block) => block.logs));

const keys = logs.map((log) =>
decodeKey(getSchemaTypes(getKeySchema(worldConfig.namespaces.world.tables.ResourceAccess)), log.args.keyTuple),
Expand Down
43 changes: 16 additions & 27 deletions packages/cli/src/deploy/getResourceIds.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
import { Client, parseAbiItem, Hex, HttpRequestError } from "viem";
import { getLogs } from "viem/actions";
import { storeSpliceStaticDataEvent } from "@latticexyz/store";
import { Client, Hex } from "viem";
import { flattenStoreLogs, getStoreLogs } from "@latticexyz/store/internal";
import { WorldDeploy } from "./common";
import { debug } from "./debug";
import pRetry from "p-retry";
import storeConfig from "@latticexyz/store/mud.config";
import { fetchBlockLogs } from "@latticexyz/block-logs-stream";

export async function getResourceIds({
client,
Expand All @@ -13,32 +12,22 @@ export async function getResourceIds({
readonly client: Client;
readonly worldDeploy: WorldDeploy;
}): Promise<readonly Hex[]> {
// This assumes we only use `ResourceIds._setExists(true)`, which is true as of this writing.
// TODO: PR to viem's getLogs to accept topics array so we can filter on all store events and quickly recreate this table's current state

debug("looking up resource IDs for", worldDeploy.address);
const logs = await pRetry(
() =>
getLogs(client, {
strict: true,
address: worldDeploy.address,
fromBlock: worldDeploy.deployBlock,
toBlock: worldDeploy.stateBlock,
event: parseAbiItem(storeSpliceStaticDataEvent),
args: { tableId: storeConfig.namespaces.store.tables.ResourceIds.tableId },
}),
{
retries: 3,
onFailedAttempt: async (error) => {
const shouldRetry =
error instanceof HttpRequestError && error.status === 400 && error.message.includes("block is out of range");

if (!shouldRetry) {
throw error;
}
},
const blockLogs = await fetchBlockLogs({
fromBlock: worldDeploy.deployBlock,
toBlock: worldDeploy.stateBlock,
async getLogs({ fromBlock, toBlock }) {
return getStoreLogs(client, {
address: worldDeploy.address,
fromBlock,
toBlock,
tableId: storeConfig.namespaces.store.tables.ResourceIds.tableId,
});
},
);
});
const logs = flattenStoreLogs(blockLogs.flatMap((block) => block.logs));

const resourceIds = logs.map((log) => log.args.keyTuple[0]);
debug("found", resourceIds.length, "resource IDs for", worldDeploy.address);

Expand Down
Loading

0 comments on commit 7ddcf64

Please sign in to comment.