From 51a7be00825e2a90737e824356d3067ef27951aa Mon Sep 17 00:00:00 2001 From: Kevin Ingersoll Date: Mon, 26 Jun 2023 14:36:29 +0100 Subject: [PATCH 01/30] feat(block-events-stream): add block events stream package --- packages/block-events-stream/.eslintrc | 6 + packages/block-events-stream/.gitignore | 1 + packages/block-events-stream/.npmignore | 6 + packages/block-events-stream/package.json | 44 ++++++ packages/block-events-stream/src/common.ts | 21 +++ .../src/createBlockEventsStream.ts | 126 ++++++++++++++++++ .../src/createBlockNumberStream.ts | 44 ++++++ .../src/createBlockStream.ts | 38 ++++++ packages/block-events-stream/src/debug.ts | 3 + packages/block-events-stream/src/getLogs.ts | 71 ++++++++++ packages/block-events-stream/src/index.ts | 4 + .../src/isNonPendingBlock.ts | 12 ++ .../src/isNonPendingLog.ts | 19 +++ packages/block-events-stream/src/utils.ts | 7 + packages/block-events-stream/tsconfig.json | 14 ++ packages/block-events-stream/tsup.config.ts | 11 ++ pnpm-lock.yaml | 37 +++++ 17 files changed, 464 insertions(+) create mode 100644 packages/block-events-stream/.eslintrc create mode 100644 packages/block-events-stream/.gitignore create mode 100644 packages/block-events-stream/.npmignore create mode 100644 packages/block-events-stream/package.json create mode 100644 packages/block-events-stream/src/common.ts create mode 100644 packages/block-events-stream/src/createBlockEventsStream.ts create mode 100644 packages/block-events-stream/src/createBlockNumberStream.ts create mode 100644 packages/block-events-stream/src/createBlockStream.ts create mode 100644 packages/block-events-stream/src/debug.ts create mode 100644 packages/block-events-stream/src/getLogs.ts create mode 100644 packages/block-events-stream/src/index.ts create mode 100644 packages/block-events-stream/src/isNonPendingBlock.ts create mode 100644 packages/block-events-stream/src/isNonPendingLog.ts create mode 100644 packages/block-events-stream/src/utils.ts create mode 100644 packages/block-events-stream/tsconfig.json create mode 100644 packages/block-events-stream/tsup.config.ts diff --git a/packages/block-events-stream/.eslintrc b/packages/block-events-stream/.eslintrc new file mode 100644 index 0000000000..6db0063ad7 --- /dev/null +++ b/packages/block-events-stream/.eslintrc @@ -0,0 +1,6 @@ +{ + "extends": ["../../.eslintrc"], + "rules": { + "@typescript-eslint/explicit-function-return-type": "error" + } +} diff --git a/packages/block-events-stream/.gitignore b/packages/block-events-stream/.gitignore new file mode 100644 index 0000000000..1521c8b765 --- /dev/null +++ b/packages/block-events-stream/.gitignore @@ -0,0 +1 @@ +dist diff --git a/packages/block-events-stream/.npmignore b/packages/block-events-stream/.npmignore new file mode 100644 index 0000000000..84815f1eba --- /dev/null +++ b/packages/block-events-stream/.npmignore @@ -0,0 +1,6 @@ +* + +!dist/** +!src/** +!package.json +!README.md diff --git a/packages/block-events-stream/package.json b/packages/block-events-stream/package.json new file mode 100644 index 0000000000..1ae4581be5 --- /dev/null +++ b/packages/block-events-stream/package.json @@ -0,0 +1,44 @@ +{ + "name": "@latticexyz/block-events-stream", + "version": "1.42.0", + "description": "Create a stream of EVM block events", + "repository": { + "type": "git", + "url": "https://github.com/latticexyz/mud.git", + "directory": "packages/block-events-stream" + }, + "license": "MIT", + "type": "module", + "exports": { + ".": "./dist/index.js" + }, + "types": "src/index.ts", + "scripts": { + "build": "pnpm run build:js", + "build:js": "tsup", + "clean": "pnpm run clean:js", + "clean:js": "rimraf dist", + "dev": "tsup --watch", + "lint": "eslint .", + "test": "vitest typecheck --run --passWithNoTests && vitest --run --passWithNoTests" + }, + "dependencies": { + "@latticexyz/common": "workspace:*", + "@latticexyz/config": "workspace:*", + "@latticexyz/schema-type": "workspace:*", + "@latticexyz/store": "workspace:*", + "abitype": "0.8.7", + "debug": "^4.3.4", + "rxjs": "7.5.5", + "viem": "1.1.7" + }, + "devDependencies": { + "@types/debug": "^4.1.7", + "tsup": "^6.7.0", + "vitest": "0.31.4" + }, + "publishConfig": { + "access": "public" + }, + "gitHead": "914a1e0ae4a573d685841ca2ea921435057deb8f" +} diff --git a/packages/block-events-stream/src/common.ts b/packages/block-events-stream/src/common.ts new file mode 100644 index 0000000000..2933aba815 --- /dev/null +++ b/packages/block-events-stream/src/common.ts @@ -0,0 +1,21 @@ +import { BehaviorSubject, Observable } from "rxjs"; +import type { BlockNumber, Hex } from "viem"; +import type { AbiEvent } from "abitype"; +import { NonPendingLog } from "./isNonPendingLog"; +import { GetLogsReturnType } from "./getLogs"; + +export type ReadonlyBehaviorSubject = Pick, "subscribe" | "pipe" | "value" | "getValue">; + +export type BlockEvents = { + blockNumber: BlockNumber; + blockHash: Hex; + events: NonPendingLog>[]; +}; + +export type BlockEventsStream = Observable>; + +export type BlockEventsFromStream> = TStream extends BlockEventsStream< + infer TAbiEvent +> + ? BlockEvents + : never; diff --git a/packages/block-events-stream/src/createBlockEventsStream.ts b/packages/block-events-stream/src/createBlockEventsStream.ts new file mode 100644 index 0000000000..c3622a7481 --- /dev/null +++ b/packages/block-events-stream/src/createBlockEventsStream.ts @@ -0,0 +1,126 @@ +import { BehaviorSubject, Subject } from "rxjs"; +import type { BlockNumber, Hex, PublicClient } from "viem"; +import type { AbiEvent } from "abitype"; +import { BlockEvents, BlockEventsStream, ReadonlyBehaviorSubject } from "./common"; +import { bigIntMin } from "./utils"; +import { isNonPendingLog } from "./isNonPendingLog"; +import { debug } from "./debug"; +import { createBlockNumberStream } from "./createBlockNumberStream"; +import { getLogs } from "./getLogs"; +import { storeEventsAbi } from "@latticexyz/store"; + +export type CreateBlockEventsStreamOptions = { + publicClient: PublicClient; + fromBlock?: BlockNumber; + toBlock?: BlockNumber | ReadonlyBehaviorSubject; + address?: Hex; + events: readonly TAbiEvent[]; + maxBlockRange?: number; // defaults to 1000 +}; + +export async function createBlockEventsStream({ + publicClient, + fromBlock: initialFromBlock, + toBlock: initialToBlock, + address, + events, + maxBlockRange = 1000, +}: CreateBlockEventsStreamOptions): Promise> { + debug("createBlockEventsStream", { initialFromBlock, initialToBlock, address, events, maxBlockRange }); + + if (initialFromBlock == null) { + debug("getting earliest block"); + const earliestBlock = await publicClient.getBlock({ blockTag: "earliest" }); + debug("earliest block", earliestBlock); + if (earliestBlock.number == null) { + // TODO: better error + throw new Error(`pending or missing earliest block`); + } + initialFromBlock = earliestBlock.number; + } + + if (initialToBlock == null) { + debug("creating latest block number stream"); + initialToBlock = await createBlockNumberStream({ publicClient, blockTag: "latest" }); + } + + const stream = new Subject>(); + fetchBlockRange( + initialFromBlock, + maxBlockRange, + initialToBlock instanceof BehaviorSubject ? initialToBlock.value : initialToBlock + ); + + async function fetchBlockRange(fromBlock: bigint, maxBlockRange: number, lastBlockNumber: bigint): Promise { + try { + const toBlock = bigIntMin(fromBlock + BigInt(maxBlockRange), lastBlockNumber); + debug("fetching block range", { fromBlock, toBlock }); + + // TODO: swap this with viem `getLogs` call when viem supports multiple events: https://github.com/wagmi-dev/viem/pull/633 + const logs = await getLogs({ + publicClient, + address, + fromBlock, + toBlock, + events: storeEventsAbi, + }); + + // TODO: do something other than just throwing out pending logs + const nonPendingLogs = logs.filter(isNonPendingLog); + + if (logs.length !== nonPendingLogs.length) { + // TODO: better error + console.warn("pending logs discarded"); + } + + // TODO: handle RPC block range errors + // TODO: handle RPC rate limit errors (hopefully via client retry policy) + + const blockNumbers = Array.from(new Set(nonPendingLogs.map((log) => log.blockNumber))); + blockNumbers.sort((a, b) => (a < b ? -1 : a > b ? 1 : 0)); + + for (const blockNumber of blockNumbers) { + const blockLogs = nonPendingLogs.filter((log) => log.blockNumber === blockNumber); + blockLogs.sort((a, b) => (a.logIndex < b.logIndex ? -1 : a.logIndex > b.logIndex ? 1 : 0)); + + if (blockLogs.length) { + debug("emitting events for block", { blockNumber, blockHash: blockLogs[0].blockHash, events: blockLogs }); + stream.next({ + blockNumber, + blockHash: blockLogs[0].blockHash, + events: blockLogs, + // TODO: figure out why we need to cast this + } as any as BlockEvents); + } + } + + if (toBlock < lastBlockNumber) { + fetchBlockRange(toBlock + 1n, maxBlockRange, lastBlockNumber); + return; + } + + if (initialToBlock instanceof BehaviorSubject) { + if (initialToBlock.value > toBlock) { + fetchBlockRange(toBlock + 1n, maxBlockRange, initialToBlock.value); + return; + } + + debug("waiting for next block"); + const sub = initialToBlock.subscribe((blockNumber) => { + if (blockNumber > toBlock) { + sub.unsubscribe(); + fetchBlockRange(toBlock + 1n, maxBlockRange, blockNumber); + } + }); + return; + } + + stream.complete(); + } catch (error: unknown) { + // TODO: do more specific error handling? + stream.error(error); + } + } + + return stream.asObservable(); +} diff --git a/packages/block-events-stream/src/createBlockNumberStream.ts b/packages/block-events-stream/src/createBlockNumberStream.ts new file mode 100644 index 0000000000..6fdb6ed1fb --- /dev/null +++ b/packages/block-events-stream/src/createBlockNumberStream.ts @@ -0,0 +1,44 @@ +import { BehaviorSubject, filter, map } from "rxjs"; +import type { Block, BlockNumber, BlockTag, PublicClient } from "viem"; +import { createBlockStream } from "./createBlockStream"; +import { ReadonlyBehaviorSubject } from "./common"; +import { isNonPendingBlock } from "./isNonPendingBlock"; + +// TODO: pass through viem's types, e.g. WatchBlocksParameters -> GetBlockReturnType + +export type CreateBlockNumberStreamOptions = + | { + publicClient: PublicClient; + blockTag: Omit; + block$?: never; + } + | { + publicClient?: never; + blockTag?: never; + block$: ReadonlyBehaviorSubject; + }; + +export async function createBlockNumberStream({ + publicClient, + blockTag, + block$: initialBlock$, +}: CreateBlockNumberStreamOptions): Promise> { + const block$ = initialBlock$ ?? (await createBlockStream({ publicClient, blockTag: blockTag as BlockTag })); + const block = block$.value; + + if (!block.number) { + // TODO: better error + throw new Error(`${blockTag} block missing or pending`); + } + + const blockNumber$ = new BehaviorSubject(block.number); + + block$ + .pipe( + filter(isNonPendingBlock), + map((block) => block.number) + ) + .subscribe(blockNumber$); + + return blockNumber$; +} diff --git a/packages/block-events-stream/src/createBlockStream.ts b/packages/block-events-stream/src/createBlockStream.ts new file mode 100644 index 0000000000..837a782e61 --- /dev/null +++ b/packages/block-events-stream/src/createBlockStream.ts @@ -0,0 +1,38 @@ +import { BehaviorSubject } from "rxjs"; +import type { Block, BlockTag, PublicClient } from "viem"; +import { ReadonlyBehaviorSubject } from "./common"; + +// TODO: pass through viem's types, e.g. WatchBlocksParameters -> GetBlockReturnType +// TODO: make stream closeable to make use of unwatch? + +export type CreateBlockStreamOptions = { + publicClient: PublicClient; + blockTag: BlockTag; +}; + +export function createBlockStream({ + publicClient, + blockTag, +}: CreateBlockStreamOptions): Promise> { + return new Promise((resolve, reject) => { + let stream: BehaviorSubject | undefined; + // TODO: do something with unwatch? + const unwatch = publicClient.watchBlocks({ + blockTag, + emitOnBegin: true, + onBlock: (block) => { + if (!stream) { + stream = new BehaviorSubject(block); + // TODO: return actual readonly behavior subject rather than just a type? + resolve(stream as ReadonlyBehaviorSubject); + } else { + stream.next(block); + } + }, + onError: (error) => { + reject(error); + stream?.error(error); + }, + }); + }); +} diff --git a/packages/block-events-stream/src/debug.ts b/packages/block-events-stream/src/debug.ts new file mode 100644 index 0000000000..b6536cc8aa --- /dev/null +++ b/packages/block-events-stream/src/debug.ts @@ -0,0 +1,3 @@ +import createDebug from "debug"; + +export const debug = createDebug("mud:block-events-stream"); diff --git a/packages/block-events-stream/src/getLogs.ts b/packages/block-events-stream/src/getLogs.ts new file mode 100644 index 0000000000..242b75471d --- /dev/null +++ b/packages/block-events-stream/src/getLogs.ts @@ -0,0 +1,71 @@ +import { AbiEvent } from "abitype"; +import { + Address, + BlockNumber, + BlockTag, + Log, + PublicClient, + decodeEventLog, + encodeEventTopics, + numberToHex, + formatLog, +} from "viem"; +import { isDefined } from "@latticexyz/common/utils"; + +// Based on https://github.com/wagmi-dev/viem/blob/main/src/actions/public/getLogs.ts +// TODO: swap this out once viem has support for multiple events: https://github.com/wagmi-dev/viem/pull/633 + +export type GetLogsOptions = { + publicClient: PublicClient; + address?: Address | Address[]; + events: TAbiEvents; + fromBlock: BlockNumber | BlockTag; + toBlock: BlockNumber | BlockTag; +}; + +export type GetLogsReturnType = Log< + bigint, + number, + TAbiEvents[number], + true, + TAbiEvents +>; + +export async function getLogs({ + publicClient, + address, + events, + fromBlock, + toBlock, +}: GetLogsOptions): Promise[]> { + const topics = [events.flatMap((event) => encodeEventTopics({ abi: [event], eventName: event.name }))]; + + const logs = await publicClient.request({ + method: "eth_getLogs", + params: [ + { + address, + topics, + fromBlock: typeof fromBlock === "bigint" ? numberToHex(fromBlock) : fromBlock, + toBlock: typeof toBlock === "bigint" ? numberToHex(toBlock) : toBlock, + }, + ], + }); + + return logs + .map((log) => { + try { + const { eventName, args } = decodeEventLog({ + abi: events, + data: log.data, + topics: log.topics, + strict: true, + }); + return formatLog(log, { args, eventName }); + } catch (err) { + // We're using strict mode, so just skip if there is an error decoding. + return; + } + }) + .filter(isDefined) as GetLogsReturnType[]; +} diff --git a/packages/block-events-stream/src/index.ts b/packages/block-events-stream/src/index.ts new file mode 100644 index 0000000000..c767c01088 --- /dev/null +++ b/packages/block-events-stream/src/index.ts @@ -0,0 +1,4 @@ +export * from "./common"; +export * from "./createBlockEventsStream"; +export * from "./createBlockNumberStream"; +export * from "./createBlockStream"; diff --git a/packages/block-events-stream/src/isNonPendingBlock.ts b/packages/block-events-stream/src/isNonPendingBlock.ts new file mode 100644 index 0000000000..5545fc23a5 --- /dev/null +++ b/packages/block-events-stream/src/isNonPendingBlock.ts @@ -0,0 +1,12 @@ +import type { Block } from "viem"; + +export type NonPendingBlock = TBlock & { + hash: NonNullable; + logsBloom: NonNullable; + nonce: NonNullable; + number: NonNullable; +}; + +export function isNonPendingBlock(block: TBlock): block is NonPendingBlock { + return block.hash != null && block.logsBloom != null && block.nonce != null && block.number != null; +} diff --git a/packages/block-events-stream/src/isNonPendingLog.ts b/packages/block-events-stream/src/isNonPendingLog.ts new file mode 100644 index 0000000000..6110d18171 --- /dev/null +++ b/packages/block-events-stream/src/isNonPendingLog.ts @@ -0,0 +1,19 @@ +import type { Log } from "viem"; + +export type NonPendingLog = TLog & { + blockHash: NonNullable; + blockNumber: NonNullable; + logIndex: NonNullable; + transactionHash: NonNullable; + transactionIndex: NonNullable; +}; + +export function isNonPendingLog(log: TLog): log is NonPendingLog { + return ( + log.blockHash != null && + log.blockNumber != null && + log.logIndex != null && + log.transactionHash != null && + log.transactionIndex != null + ); +} diff --git a/packages/block-events-stream/src/utils.ts b/packages/block-events-stream/src/utils.ts new file mode 100644 index 0000000000..33b0f506bc --- /dev/null +++ b/packages/block-events-stream/src/utils.ts @@ -0,0 +1,7 @@ +export function bigIntMin(...args: bigint[]): bigint { + return args.reduce((m, e) => (e < m ? e : m)); +} + +export function bigIntMax(...args: bigint[]): bigint { + return args.reduce((m, e) => (e > m ? e : m)); +} diff --git a/packages/block-events-stream/tsconfig.json b/packages/block-events-stream/tsconfig.json new file mode 100644 index 0000000000..e590f0c026 --- /dev/null +++ b/packages/block-events-stream/tsconfig.json @@ -0,0 +1,14 @@ +{ + "compilerOptions": { + "target": "es2021", + "module": "esnext", + "moduleResolution": "node", + "declaration": true, + "sourceMap": true, + "outDir": "dist", + "isolatedModules": true, + "esModuleInterop": true, + "forceConsistentCasingInFileNames": true, + "strict": true + } +} diff --git a/packages/block-events-stream/tsup.config.ts b/packages/block-events-stream/tsup.config.ts new file mode 100644 index 0000000000..b755469f90 --- /dev/null +++ b/packages/block-events-stream/tsup.config.ts @@ -0,0 +1,11 @@ +import { defineConfig } from "tsup"; + +export default defineConfig({ + entry: ["src/index.ts"], + target: "esnext", + format: ["esm"], + dts: false, + sourcemap: true, + clean: true, + minify: true, +}); diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 8b6dc5b620..42053798e5 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -45,6 +45,43 @@ importers: specifier: ^4.9.5 version: 4.9.5 + packages/block-events-stream: + dependencies: + '@latticexyz/common': + specifier: workspace:* + version: link:../common + '@latticexyz/config': + specifier: workspace:* + version: link:../config + '@latticexyz/schema-type': + specifier: workspace:* + version: link:../schema-type + '@latticexyz/store': + specifier: workspace:* + version: link:../store + abitype: + specifier: 0.8.7 + version: 0.8.7(typescript@5.0.4) + debug: + specifier: ^4.3.4 + version: 4.3.4(supports-color@8.1.1) + rxjs: + specifier: 7.5.5 + version: 7.5.5 + viem: + specifier: 1.1.7 + version: 1.1.7(typescript@5.0.4) + devDependencies: + '@types/debug': + specifier: ^4.1.7 + version: 4.1.7 + tsup: + specifier: ^6.7.0 + version: 6.7.0(postcss@8.4.23)(typescript@5.0.4) + vitest: + specifier: 0.31.4 + version: 0.31.4 + packages/cli: dependencies: '@ethersproject/abi': From 1952a98f80a84ef3aa4ad1146762322fb264c193 Mon Sep 17 00:00:00 2001 From: Kevin Ingersoll Date: Mon, 26 Jun 2023 16:37:54 +0100 Subject: [PATCH 02/30] wip anvil test --- packages/block-events-stream/package.json | 1 + .../src/createBlockEventsStream.test.ts | 28 ++++++++++ pnpm-lock.yaml | 54 ++++++++++++++++--- 3 files changed, 76 insertions(+), 7 deletions(-) create mode 100644 packages/block-events-stream/src/createBlockEventsStream.test.ts diff --git a/packages/block-events-stream/package.json b/packages/block-events-stream/package.json index 1ae4581be5..4cb77520dc 100644 --- a/packages/block-events-stream/package.json +++ b/packages/block-events-stream/package.json @@ -34,6 +34,7 @@ }, "devDependencies": { "@types/debug": "^4.1.7", + "@viem/anvil": "^0.0.6", "tsup": "^6.7.0", "vitest": "0.31.4" }, diff --git a/packages/block-events-stream/src/createBlockEventsStream.test.ts b/packages/block-events-stream/src/createBlockEventsStream.test.ts new file mode 100644 index 0000000000..a1ed8367c4 --- /dev/null +++ b/packages/block-events-stream/src/createBlockEventsStream.test.ts @@ -0,0 +1,28 @@ +import { describe, expect, it } from "vitest"; +import { createBlockEventsStream } from "./createBlockEventsStream"; +import { createPublicClient, createTestClient, http } from "viem"; +import { foundry } from "viem/chains"; +import { storeEventsAbi } from "@latticexyz/store"; +import { createAnvil } from "@viem/anvil"; + +describe("createBlockEventsStream", () => { + it("streams events grouped by block", async () => { + const anvil = createAnvil(); + await anvil.start(); + + const publicClient = createPublicClient({ + chain: foundry, + transport: http(), + }); + + const testClient = createTestClient({ + chain: foundry, + mode: "anvil", + transport: http(), + }); + + const block$ = await createBlockEventsStream({ publicClient, events: storeEventsAbi }); + + await anvil.stop(); + }); +}); diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 42053798e5..8d8f67fdd1 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -75,6 +75,9 @@ importers: '@types/debug': specifier: ^4.1.7 version: 4.1.7 + '@viem/anvil': + specifier: ^0.0.6 + version: 0.0.6(debug@4.3.4) tsup: specifier: ^6.7.0 version: 6.7.0(postcss@8.4.23)(typescript@5.0.4) @@ -4019,6 +4022,19 @@ packages: '@use-gesture/core': 10.2.9 dev: false + /@viem/anvil@0.0.6(debug@4.3.4): + resolution: {integrity: sha512-OjKR/+FVwzuygXYFqP8MBal1SXG8bT2gbZwqqB0XuLw81LNBBvmE/Repm6+5kkBh4IUj0PhYdrqOsnayS14Gtg==} + dependencies: + execa: 7.1.1 + get-port: 6.1.2 + http-proxy: 1.18.1(debug@4.3.4) + ws: 8.13.0 + transitivePeerDependencies: + - bufferutil + - debug + - utf-8-validate + dev: true + /@vitejs/plugin-react@4.0.0(vite@4.3.6): resolution: {integrity: sha512-HX0XzMjL3hhOYm+0s95pb0Z7F8O81G7joUHgfDd/9J/ZZf5k4xX6QAMFkKsHFxaHlf6X7GD7+XuaZ66ULiJuhQ==} engines: {node: ^14.18.0 || >=16.0.0} @@ -6621,6 +6637,21 @@ packages: strip-final-newline: 3.0.0 dev: false + /execa@7.1.1: + resolution: {integrity: sha512-wH0eMf/UXckdUYnO21+HDztteVv05rq2GXksxT4fCGeHkBhw1DROXh40wcjMcRqDOWE7iPJ4n3M7e2+YFP+76Q==} + engines: {node: ^14.18.0 || ^16.14.0 || >=18.0.0} + dependencies: + cross-spawn: 7.0.3 + get-stream: 6.0.1 + human-signals: 4.3.1 + is-stream: 3.0.0 + merge-stream: 2.0.0 + npm-run-path: 5.1.0 + onetime: 6.0.0 + signal-exit: 3.0.7 + strip-final-newline: 3.0.0 + dev: true + /execcli@5.0.6: resolution: {integrity: sha512-du+uy/Ew2P90PKjSHI89u/XuqVaBDzvaJ6ePn40JaOy7owFQNsYDbd5AoR5A559HEAb1i5HO22rJxtgVonf5Bg==} engines: {node: '>=8', npm: '>=4'} @@ -7052,6 +7083,11 @@ packages: engines: {node: '>=8'} dev: true + /get-port@6.1.2: + resolution: {integrity: sha512-BrGGraKm2uPqurfGVj/z97/zv8dPleC6x9JBNRTrDNtCkkRF4rPwrQXFgL7+I+q8QSdU4ntLQX2D7KIxSy8nGw==} + engines: {node: ^12.20.0 || ^14.13.1 || >=16.0.0} + dev: true + /get-stream@5.2.0: resolution: {integrity: sha512-nBF+F1rAZVCu/p7rjzgA+Yb4lfYXrpl7a6VmJrU8wF9I1CKvP/QwPNZHnOlwbTkY6dvtFIzFMSyQXbLoTQPRpA==} engines: {node: '>=8'} @@ -7516,6 +7552,17 @@ packages: - supports-color dev: true + /http-proxy@1.18.1(debug@4.3.4): + resolution: {integrity: sha512-7mz/721AbnJwIVbnaSv1Cz3Am0ZLT/UBwkC92VlxhXv/k/BBQfM2fXElQNC27BVGr0uwUpplYPQM9LnaBMR5NQ==} + engines: {node: '>=8.0.0'} + dependencies: + eventemitter3: 4.0.7 + follow-redirects: 1.15.2(debug@4.3.4) + requires-port: 1.0.0 + transitivePeerDependencies: + - debug + dev: true + /https-proxy-agent@5.0.1: resolution: {integrity: sha512-dFcAjpTQFgoLMzC2VwU+C/CbS7uRL0lWmxDITmqm7C+7F0Odmj6s9l6alZc6AELXhrnggM2CeWSXHGOdX2YtwA==} engines: {node: '>= 6'} @@ -7538,7 +7585,6 @@ packages: /human-signals@4.3.1: resolution: {integrity: sha512-nZXjEF2nbo7lIw3mgYjItAfgQXog3OjJogSbKa2CQIIvSGWcKgeJnQlNXip6NglNzYH45nSRiEVimMvYL8DDqQ==} engines: {node: '>=14.18.0'} - dev: false /humanize-ms@1.2.1: resolution: {integrity: sha512-Fl70vYtsAFb/C06PTS9dZBo7ihau+Tu/DNCk/OyHhea07S+aeMWpFFkUaXRa8fI+ScZbEI8dfSxwY7gxZ9SAVQ==} @@ -8015,7 +8061,6 @@ packages: /is-stream@3.0.0: resolution: {integrity: sha512-LnQR4bZ9IADDRSkvpqMGvt/tEJWclzklNgSw48V5EAaAeDd6qGvN8ei6k5p0tvxSR171VmGyHuTiAOfxAbr8kA==} engines: {node: ^12.20.0 || ^14.13.1 || >=16.0.0} - dev: false /is-string@1.0.7: resolution: {integrity: sha512-tE2UXzivje6ofPW7l23cjDOMa09gb7xlAqG6jG5ej6uPV32TlWP3NKPigtaGeHNu9fohccRYvIiZMfOOnOYUtg==} @@ -9518,7 +9563,6 @@ packages: /mimic-fn@4.0.0: resolution: {integrity: sha512-vqiC06CuhBTUdZH+RYl8sFrL096vA45Ok5ISO6sE/Mr1jRbGH4Csnhi8f3wKVl7x8mO4Au7Ir9D3Oyv1VYMFJw==} engines: {node: '>=12'} - dev: false /min-indent@1.0.1: resolution: {integrity: sha512-I9jwMn07Sy/IwOj3zVkVik2JTvgpaykDZEigL6Rx6N9LbMywwUSMtxET+7lVoDLLd3O3IXwJwvuuns8UB/HeAg==} @@ -10142,7 +10186,6 @@ packages: engines: {node: ^12.20.0 || ^14.13.1 || >=16.0.0} dependencies: path-key: 4.0.0 - dev: false /npmlog@6.0.2: resolution: {integrity: sha512-/vBvz5Jfr9dT/aFWd0FIRf+T/Q2WBsLENygUaFUqstqsycmZAP/t5BvFJTK0viFmSUxiUKTUplWy5vt+rvKIxg==} @@ -10351,7 +10394,6 @@ packages: engines: {node: '>=12'} dependencies: mimic-fn: 4.0.0 - dev: false /open@8.4.2: resolution: {integrity: sha512-7x81NCL719oNbsq/3mh+hVrAWmFuEYUqrq/Iw3kUzH8ReypT9QQ0BLoJS7/G9k6N81XjW4qHWtjWwe/9eLy1EQ==} @@ -10687,7 +10729,6 @@ packages: /path-key@4.0.0: resolution: {integrity: sha512-haREypq7xkM7ErfgIyA0z+Bj4AGKlMSdlQE2jvJo6huWD1EdkKYV+G/T4nq0YEF2vgTT8kqMFKo1uHn950r4SQ==} engines: {node: '>=12'} - dev: false /path-parse@1.0.7: resolution: {integrity: sha512-LDJzPVEEEPR+y48z93A0Ed0yXb8pAByGWo/k5YYdYgpY2/2EsOsksJrq7lOHxryrVOn1ejG6oAp8ahvOIQD8sw==} @@ -12138,7 +12179,6 @@ packages: /strip-final-newline@3.0.0: resolution: {integrity: sha512-dOESqjYr96iWYylGObzd39EuNTa5VJxyvVAEm5Jnh7KGo75V43Hk1odPQkNDyXNmUR6k+gEiDVXnjB8HJ3crXw==} engines: {node: '>=12'} - dev: false /strip-hex-prefix@1.0.0: resolution: {integrity: sha512-q8d4ue7JGEiVcypji1bALTos+0pWtyGlivAWyPuTkHzuTCJqrK9sWxYQZUq6Nq3cuyv3bm734IhHvHtGGURU6A==} From 8ce81506c0f3ba5d90d9ea74746fa81e7d6cef42 Mon Sep 17 00:00:00 2001 From: Kevin Ingersoll Date: Mon, 26 Jun 2023 19:50:11 +0100 Subject: [PATCH 03/30] Revert "wip anvil test" This reverts commit 1952a98f80a84ef3aa4ad1146762322fb264c193. --- packages/block-events-stream/package.json | 1 - .../src/createBlockEventsStream.test.ts | 28 ---------- pnpm-lock.yaml | 54 +++---------------- 3 files changed, 7 insertions(+), 76 deletions(-) delete mode 100644 packages/block-events-stream/src/createBlockEventsStream.test.ts diff --git a/packages/block-events-stream/package.json b/packages/block-events-stream/package.json index 4cb77520dc..1ae4581be5 100644 --- a/packages/block-events-stream/package.json +++ b/packages/block-events-stream/package.json @@ -34,7 +34,6 @@ }, "devDependencies": { "@types/debug": "^4.1.7", - "@viem/anvil": "^0.0.6", "tsup": "^6.7.0", "vitest": "0.31.4" }, diff --git a/packages/block-events-stream/src/createBlockEventsStream.test.ts b/packages/block-events-stream/src/createBlockEventsStream.test.ts deleted file mode 100644 index a1ed8367c4..0000000000 --- a/packages/block-events-stream/src/createBlockEventsStream.test.ts +++ /dev/null @@ -1,28 +0,0 @@ -import { describe, expect, it } from "vitest"; -import { createBlockEventsStream } from "./createBlockEventsStream"; -import { createPublicClient, createTestClient, http } from "viem"; -import { foundry } from "viem/chains"; -import { storeEventsAbi } from "@latticexyz/store"; -import { createAnvil } from "@viem/anvil"; - -describe("createBlockEventsStream", () => { - it("streams events grouped by block", async () => { - const anvil = createAnvil(); - await anvil.start(); - - const publicClient = createPublicClient({ - chain: foundry, - transport: http(), - }); - - const testClient = createTestClient({ - chain: foundry, - mode: "anvil", - transport: http(), - }); - - const block$ = await createBlockEventsStream({ publicClient, events: storeEventsAbi }); - - await anvil.stop(); - }); -}); diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 8d8f67fdd1..42053798e5 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -75,9 +75,6 @@ importers: '@types/debug': specifier: ^4.1.7 version: 4.1.7 - '@viem/anvil': - specifier: ^0.0.6 - version: 0.0.6(debug@4.3.4) tsup: specifier: ^6.7.0 version: 6.7.0(postcss@8.4.23)(typescript@5.0.4) @@ -4022,19 +4019,6 @@ packages: '@use-gesture/core': 10.2.9 dev: false - /@viem/anvil@0.0.6(debug@4.3.4): - resolution: {integrity: sha512-OjKR/+FVwzuygXYFqP8MBal1SXG8bT2gbZwqqB0XuLw81LNBBvmE/Repm6+5kkBh4IUj0PhYdrqOsnayS14Gtg==} - dependencies: - execa: 7.1.1 - get-port: 6.1.2 - http-proxy: 1.18.1(debug@4.3.4) - ws: 8.13.0 - transitivePeerDependencies: - - bufferutil - - debug - - utf-8-validate - dev: true - /@vitejs/plugin-react@4.0.0(vite@4.3.6): resolution: {integrity: sha512-HX0XzMjL3hhOYm+0s95pb0Z7F8O81G7joUHgfDd/9J/ZZf5k4xX6QAMFkKsHFxaHlf6X7GD7+XuaZ66ULiJuhQ==} engines: {node: ^14.18.0 || >=16.0.0} @@ -6637,21 +6621,6 @@ packages: strip-final-newline: 3.0.0 dev: false - /execa@7.1.1: - resolution: {integrity: sha512-wH0eMf/UXckdUYnO21+HDztteVv05rq2GXksxT4fCGeHkBhw1DROXh40wcjMcRqDOWE7iPJ4n3M7e2+YFP+76Q==} - engines: {node: ^14.18.0 || ^16.14.0 || >=18.0.0} - dependencies: - cross-spawn: 7.0.3 - get-stream: 6.0.1 - human-signals: 4.3.1 - is-stream: 3.0.0 - merge-stream: 2.0.0 - npm-run-path: 5.1.0 - onetime: 6.0.0 - signal-exit: 3.0.7 - strip-final-newline: 3.0.0 - dev: true - /execcli@5.0.6: resolution: {integrity: sha512-du+uy/Ew2P90PKjSHI89u/XuqVaBDzvaJ6ePn40JaOy7owFQNsYDbd5AoR5A559HEAb1i5HO22rJxtgVonf5Bg==} engines: {node: '>=8', npm: '>=4'} @@ -7083,11 +7052,6 @@ packages: engines: {node: '>=8'} dev: true - /get-port@6.1.2: - resolution: {integrity: sha512-BrGGraKm2uPqurfGVj/z97/zv8dPleC6x9JBNRTrDNtCkkRF4rPwrQXFgL7+I+q8QSdU4ntLQX2D7KIxSy8nGw==} - engines: {node: ^12.20.0 || ^14.13.1 || >=16.0.0} - dev: true - /get-stream@5.2.0: resolution: {integrity: sha512-nBF+F1rAZVCu/p7rjzgA+Yb4lfYXrpl7a6VmJrU8wF9I1CKvP/QwPNZHnOlwbTkY6dvtFIzFMSyQXbLoTQPRpA==} engines: {node: '>=8'} @@ -7552,17 +7516,6 @@ packages: - supports-color dev: true - /http-proxy@1.18.1(debug@4.3.4): - resolution: {integrity: sha512-7mz/721AbnJwIVbnaSv1Cz3Am0ZLT/UBwkC92VlxhXv/k/BBQfM2fXElQNC27BVGr0uwUpplYPQM9LnaBMR5NQ==} - engines: {node: '>=8.0.0'} - dependencies: - eventemitter3: 4.0.7 - follow-redirects: 1.15.2(debug@4.3.4) - requires-port: 1.0.0 - transitivePeerDependencies: - - debug - dev: true - /https-proxy-agent@5.0.1: resolution: {integrity: sha512-dFcAjpTQFgoLMzC2VwU+C/CbS7uRL0lWmxDITmqm7C+7F0Odmj6s9l6alZc6AELXhrnggM2CeWSXHGOdX2YtwA==} engines: {node: '>= 6'} @@ -7585,6 +7538,7 @@ packages: /human-signals@4.3.1: resolution: {integrity: sha512-nZXjEF2nbo7lIw3mgYjItAfgQXog3OjJogSbKa2CQIIvSGWcKgeJnQlNXip6NglNzYH45nSRiEVimMvYL8DDqQ==} engines: {node: '>=14.18.0'} + dev: false /humanize-ms@1.2.1: resolution: {integrity: sha512-Fl70vYtsAFb/C06PTS9dZBo7ihau+Tu/DNCk/OyHhea07S+aeMWpFFkUaXRa8fI+ScZbEI8dfSxwY7gxZ9SAVQ==} @@ -8061,6 +8015,7 @@ packages: /is-stream@3.0.0: resolution: {integrity: sha512-LnQR4bZ9IADDRSkvpqMGvt/tEJWclzklNgSw48V5EAaAeDd6qGvN8ei6k5p0tvxSR171VmGyHuTiAOfxAbr8kA==} engines: {node: ^12.20.0 || ^14.13.1 || >=16.0.0} + dev: false /is-string@1.0.7: resolution: {integrity: sha512-tE2UXzivje6ofPW7l23cjDOMa09gb7xlAqG6jG5ej6uPV32TlWP3NKPigtaGeHNu9fohccRYvIiZMfOOnOYUtg==} @@ -9563,6 +9518,7 @@ packages: /mimic-fn@4.0.0: resolution: {integrity: sha512-vqiC06CuhBTUdZH+RYl8sFrL096vA45Ok5ISO6sE/Mr1jRbGH4Csnhi8f3wKVl7x8mO4Au7Ir9D3Oyv1VYMFJw==} engines: {node: '>=12'} + dev: false /min-indent@1.0.1: resolution: {integrity: sha512-I9jwMn07Sy/IwOj3zVkVik2JTvgpaykDZEigL6Rx6N9LbMywwUSMtxET+7lVoDLLd3O3IXwJwvuuns8UB/HeAg==} @@ -10186,6 +10142,7 @@ packages: engines: {node: ^12.20.0 || ^14.13.1 || >=16.0.0} dependencies: path-key: 4.0.0 + dev: false /npmlog@6.0.2: resolution: {integrity: sha512-/vBvz5Jfr9dT/aFWd0FIRf+T/Q2WBsLENygUaFUqstqsycmZAP/t5BvFJTK0viFmSUxiUKTUplWy5vt+rvKIxg==} @@ -10394,6 +10351,7 @@ packages: engines: {node: '>=12'} dependencies: mimic-fn: 4.0.0 + dev: false /open@8.4.2: resolution: {integrity: sha512-7x81NCL719oNbsq/3mh+hVrAWmFuEYUqrq/Iw3kUzH8ReypT9QQ0BLoJS7/G9k6N81XjW4qHWtjWwe/9eLy1EQ==} @@ -10729,6 +10687,7 @@ packages: /path-key@4.0.0: resolution: {integrity: sha512-haREypq7xkM7ErfgIyA0z+Bj4AGKlMSdlQE2jvJo6huWD1EdkKYV+G/T4nq0YEF2vgTT8kqMFKo1uHn950r4SQ==} engines: {node: '>=12'} + dev: false /path-parse@1.0.7: resolution: {integrity: sha512-LDJzPVEEEPR+y48z93A0Ed0yXb8pAByGWo/k5YYdYgpY2/2EsOsksJrq7lOHxryrVOn1ejG6oAp8ahvOIQD8sw==} @@ -12179,6 +12138,7 @@ packages: /strip-final-newline@3.0.0: resolution: {integrity: sha512-dOESqjYr96iWYylGObzd39EuNTa5VJxyvVAEm5Jnh7KGo75V43Hk1odPQkNDyXNmUR6k+gEiDVXnjB8HJ3crXw==} engines: {node: '>=12'} + dev: false /strip-hex-prefix@1.0.0: resolution: {integrity: sha512-q8d4ue7JGEiVcypji1bALTos+0pWtyGlivAWyPuTkHzuTCJqrK9sWxYQZUq6Nq3cuyv3bm734IhHvHtGGURU6A==} From 711e1afd1c120c01166b4503f32e6ea8079bd155 Mon Sep 17 00:00:00 2001 From: Kevin Ingersoll Date: Mon, 26 Jun 2023 19:51:33 +0100 Subject: [PATCH 04/30] accidentally left in a store refernence --- packages/block-events-stream/package.json | 1 - packages/block-events-stream/src/createBlockEventsStream.ts | 3 +-- pnpm-lock.yaml | 3 --- 3 files changed, 1 insertion(+), 6 deletions(-) diff --git a/packages/block-events-stream/package.json b/packages/block-events-stream/package.json index 1ae4581be5..654b008318 100644 --- a/packages/block-events-stream/package.json +++ b/packages/block-events-stream/package.json @@ -26,7 +26,6 @@ "@latticexyz/common": "workspace:*", "@latticexyz/config": "workspace:*", "@latticexyz/schema-type": "workspace:*", - "@latticexyz/store": "workspace:*", "abitype": "0.8.7", "debug": "^4.3.4", "rxjs": "7.5.5", diff --git a/packages/block-events-stream/src/createBlockEventsStream.ts b/packages/block-events-stream/src/createBlockEventsStream.ts index c3622a7481..4ca2ec0065 100644 --- a/packages/block-events-stream/src/createBlockEventsStream.ts +++ b/packages/block-events-stream/src/createBlockEventsStream.ts @@ -7,7 +7,6 @@ import { isNonPendingLog } from "./isNonPendingLog"; import { debug } from "./debug"; import { createBlockNumberStream } from "./createBlockNumberStream"; import { getLogs } from "./getLogs"; -import { storeEventsAbi } from "@latticexyz/store"; export type CreateBlockEventsStreamOptions = { publicClient: PublicClient; @@ -62,7 +61,7 @@ export async function createBlockEventsStream({ address, fromBlock, toBlock, - events: storeEventsAbi, + events, }); // TODO: do something other than just throwing out pending logs diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 42053798e5..458d8d4af4 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -56,9 +56,6 @@ importers: '@latticexyz/schema-type': specifier: workspace:* version: link:../schema-type - '@latticexyz/store': - specifier: workspace:* - version: link:../store abitype: specifier: 0.8.7 version: 0.8.7(typescript@5.0.4) From c1af973f1a6cf72393a7d21dc57b1b1fdbf3e04f Mon Sep 17 00:00:00 2001 From: Kevin Ingersoll Date: Tue, 27 Jun 2023 08:52:37 -0700 Subject: [PATCH 05/30] Update packages/block-events-stream/src/createBlockEventsStream.ts Co-authored-by: alvarius <89248902+alvrs@users.noreply.github.com> --- packages/block-events-stream/src/createBlockEventsStream.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/packages/block-events-stream/src/createBlockEventsStream.ts b/packages/block-events-stream/src/createBlockEventsStream.ts index 4ca2ec0065..74eabf7c6f 100644 --- a/packages/block-events-stream/src/createBlockEventsStream.ts +++ b/packages/block-events-stream/src/createBlockEventsStream.ts @@ -14,7 +14,8 @@ export type CreateBlockEventsStreamOptions = { toBlock?: BlockNumber | ReadonlyBehaviorSubject; address?: Hex; events: readonly TAbiEvent[]; - maxBlockRange?: number; // defaults to 1000 + /** Defaults to 1000 */ + maxBlockRange?: number; }; export async function createBlockEventsStream({ From 0f55aeb0914d9fcc36ba73c97c245bea6eb3065c Mon Sep 17 00:00:00 2001 From: Kevin Ingersoll Date: Wed, 28 Jun 2023 10:22:22 +0100 Subject: [PATCH 06/30] make streams closeable I don't love this design --- .../src/createBlockEventsStream.ts | 40 ++++++++++++++----- .../src/createBlockNumberStream.ts | 30 +++++++++++--- .../src/createBlockStream.ts | 13 ++++-- 3 files changed, 64 insertions(+), 19 deletions(-) diff --git a/packages/block-events-stream/src/createBlockEventsStream.ts b/packages/block-events-stream/src/createBlockEventsStream.ts index 74eabf7c6f..9340409575 100644 --- a/packages/block-events-stream/src/createBlockEventsStream.ts +++ b/packages/block-events-stream/src/createBlockEventsStream.ts @@ -18,6 +18,11 @@ export type CreateBlockEventsStreamOptions = { maxBlockRange?: number; }; +export type CreateBlockEventsStreamResult = { + stream: BlockEventsStream; + close: () => void; +}; + export async function createBlockEventsStream({ publicClient, fromBlock: initialFromBlock, @@ -25,7 +30,7 @@ export async function createBlockEventsStream({ address, events, maxBlockRange = 1000, -}: CreateBlockEventsStreamOptions): Promise> { +}: CreateBlockEventsStreamOptions): Promise> { debug("createBlockEventsStream", { initialFromBlock, initialToBlock, address, events, maxBlockRange }); if (initialFromBlock == null) { @@ -39,9 +44,12 @@ export async function createBlockEventsStream({ initialFromBlock = earliestBlock.number; } + let closeInitialToBlock: (() => void) | undefined; if (initialToBlock == null) { debug("creating latest block number stream"); - initialToBlock = await createBlockNumberStream({ publicClient, blockTag: "latest" }); + const blockNumber$ = await createBlockNumberStream({ publicClient, blockTag: "latest" }); + initialToBlock = blockNumber$.stream; + closeInitialToBlock = blockNumber$.close; } const stream = new Subject>(); @@ -105,14 +113,15 @@ export async function createBlockEventsStream({ return; } - debug("waiting for next block"); - const sub = initialToBlock.subscribe((blockNumber) => { - if (blockNumber > toBlock) { - sub.unsubscribe(); - fetchBlockRange(toBlock + 1n, maxBlockRange, blockNumber); - } - }); - return; + if (!initialToBlock.closed) { + const sub = initialToBlock.subscribe((blockNumber) => { + if (blockNumber > toBlock) { + sub.unsubscribe(); + fetchBlockRange(toBlock + 1n, maxBlockRange, blockNumber); + } + }); + return; + } } stream.complete(); @@ -122,5 +131,14 @@ export async function createBlockEventsStream({ } } - return stream.asObservable(); + return { + stream: stream.asObservable(), + close: (): void => { + stream.complete(); + if (initialToBlock instanceof BehaviorSubject) { + initialToBlock.complete(); + } + closeInitialToBlock?.(); + }, + }; } diff --git a/packages/block-events-stream/src/createBlockNumberStream.ts b/packages/block-events-stream/src/createBlockNumberStream.ts index 6fdb6ed1fb..6270e0afbb 100644 --- a/packages/block-events-stream/src/createBlockNumberStream.ts +++ b/packages/block-events-stream/src/createBlockNumberStream.ts @@ -18,13 +18,26 @@ export type CreateBlockNumberStreamOptions = block$: ReadonlyBehaviorSubject; }; +export type CreateBlockNumberStreamResult = { + stream: ReadonlyBehaviorSubject; + close: () => void; +}; + export async function createBlockNumberStream({ publicClient, blockTag, block$: initialBlock$, -}: CreateBlockNumberStreamOptions): Promise> { - const block$ = initialBlock$ ?? (await createBlockStream({ publicClient, blockTag: blockTag as BlockTag })); - const block = block$.value; +}: CreateBlockNumberStreamOptions): Promise { + const block$ = initialBlock$ + ? { + stream: initialBlock$, + close: (): void => { + // don't close the user-provided stream + }, + } + : await createBlockStream({ publicClient, blockTag: blockTag as BlockTag }); + + const block = block$.stream.value; if (!block.number) { // TODO: better error @@ -33,12 +46,19 @@ export async function createBlockNumberStream({ const blockNumber$ = new BehaviorSubject(block.number); - block$ + const { unsubscribe } = block$.stream .pipe( filter(isNonPendingBlock), map((block) => block.number) ) .subscribe(blockNumber$); - return blockNumber$; + return { + stream: blockNumber$, + close: (): void => { + unsubscribe(); + blockNumber$.complete(); + block$.close(); + }, + }; } diff --git a/packages/block-events-stream/src/createBlockStream.ts b/packages/block-events-stream/src/createBlockStream.ts index 837a782e61..dd6022424d 100644 --- a/packages/block-events-stream/src/createBlockStream.ts +++ b/packages/block-events-stream/src/createBlockStream.ts @@ -3,17 +3,21 @@ import type { Block, BlockTag, PublicClient } from "viem"; import { ReadonlyBehaviorSubject } from "./common"; // TODO: pass through viem's types, e.g. WatchBlocksParameters -> GetBlockReturnType -// TODO: make stream closeable to make use of unwatch? export type CreateBlockStreamOptions = { publicClient: PublicClient; blockTag: BlockTag; }; +export type CreateBlockStreamResult = { + stream: ReadonlyBehaviorSubject; + close: () => void; +}; + export function createBlockStream({ publicClient, blockTag, -}: CreateBlockStreamOptions): Promise> { +}: CreateBlockStreamOptions): Promise { return new Promise((resolve, reject) => { let stream: BehaviorSubject | undefined; // TODO: do something with unwatch? @@ -24,7 +28,10 @@ export function createBlockStream({ if (!stream) { stream = new BehaviorSubject(block); // TODO: return actual readonly behavior subject rather than just a type? - resolve(stream as ReadonlyBehaviorSubject); + resolve({ + stream: stream as ReadonlyBehaviorSubject, + close: () => unwatch(), + }); } else { stream.next(block); } From 9e1d9ea79b64c40e45ba3e0dff0fcbc34f3d3792 Mon Sep 17 00:00:00 2001 From: Kevin Ingersoll Date: Wed, 28 Jun 2023 10:46:39 +0100 Subject: [PATCH 07/30] clean up --- .../block-events-stream/src/createBlockEventsStream.ts | 10 ++++------ .../block-events-stream/src/createBlockNumberStream.ts | 4 +--- packages/block-events-stream/src/createBlockStream.ts | 3 --- packages/block-events-stream/src/getLogs.ts | 1 + packages/block-events-stream/src/utils.ts | 6 ++++++ 5 files changed, 12 insertions(+), 12 deletions(-) diff --git a/packages/block-events-stream/src/createBlockEventsStream.ts b/packages/block-events-stream/src/createBlockEventsStream.ts index 9340409575..793f47f54d 100644 --- a/packages/block-events-stream/src/createBlockEventsStream.ts +++ b/packages/block-events-stream/src/createBlockEventsStream.ts @@ -2,7 +2,7 @@ import { BehaviorSubject, Subject } from "rxjs"; import type { BlockNumber, Hex, PublicClient } from "viem"; import type { AbiEvent } from "abitype"; import { BlockEvents, BlockEventsStream, ReadonlyBehaviorSubject } from "./common"; -import { bigIntMin } from "./utils"; +import { bigIntMin, bigIntSort } from "./utils"; import { isNonPendingLog } from "./isNonPendingLog"; import { debug } from "./debug"; import { createBlockNumberStream } from "./createBlockNumberStream"; @@ -38,7 +38,7 @@ export async function createBlockEventsStream({ const earliestBlock = await publicClient.getBlock({ blockTag: "earliest" }); debug("earliest block", earliestBlock); if (earliestBlock.number == null) { - // TODO: better error + // This is an edge case that shouldn't happen unless you are ignoring types or something weird happens with viem/RPC. throw new Error(`pending or missing earliest block`); } initialFromBlock = earliestBlock.number; @@ -73,11 +73,9 @@ export async function createBlockEventsStream({ events, }); - // TODO: do something other than just throwing out pending logs const nonPendingLogs = logs.filter(isNonPendingLog); - if (logs.length !== nonPendingLogs.length) { - // TODO: better error + // This is an edge case that shouldn't happen unless you are ignoring types or something weird happens with viem/RPC. console.warn("pending logs discarded"); } @@ -85,7 +83,7 @@ export async function createBlockEventsStream({ // TODO: handle RPC rate limit errors (hopefully via client retry policy) const blockNumbers = Array.from(new Set(nonPendingLogs.map((log) => log.blockNumber))); - blockNumbers.sort((a, b) => (a < b ? -1 : a > b ? 1 : 0)); + blockNumbers.sort(bigIntSort); for (const blockNumber of blockNumbers) { const blockLogs = nonPendingLogs.filter((log) => log.blockNumber === blockNumber); diff --git a/packages/block-events-stream/src/createBlockNumberStream.ts b/packages/block-events-stream/src/createBlockNumberStream.ts index 6270e0afbb..e7a525c6d1 100644 --- a/packages/block-events-stream/src/createBlockNumberStream.ts +++ b/packages/block-events-stream/src/createBlockNumberStream.ts @@ -4,8 +4,6 @@ import { createBlockStream } from "./createBlockStream"; import { ReadonlyBehaviorSubject } from "./common"; import { isNonPendingBlock } from "./isNonPendingBlock"; -// TODO: pass through viem's types, e.g. WatchBlocksParameters -> GetBlockReturnType - export type CreateBlockNumberStreamOptions = | { publicClient: PublicClient; @@ -40,7 +38,7 @@ export async function createBlockNumberStream({ const block = block$.stream.value; if (!block.number) { - // TODO: better error + // This is an edge case that shouldn't happen unless you are ignoring types. throw new Error(`${blockTag} block missing or pending`); } diff --git a/packages/block-events-stream/src/createBlockStream.ts b/packages/block-events-stream/src/createBlockStream.ts index dd6022424d..77551ab571 100644 --- a/packages/block-events-stream/src/createBlockStream.ts +++ b/packages/block-events-stream/src/createBlockStream.ts @@ -2,8 +2,6 @@ import { BehaviorSubject } from "rxjs"; import type { Block, BlockTag, PublicClient } from "viem"; import { ReadonlyBehaviorSubject } from "./common"; -// TODO: pass through viem's types, e.g. WatchBlocksParameters -> GetBlockReturnType - export type CreateBlockStreamOptions = { publicClient: PublicClient; blockTag: BlockTag; @@ -20,7 +18,6 @@ export function createBlockStream({ }: CreateBlockStreamOptions): Promise { return new Promise((resolve, reject) => { let stream: BehaviorSubject | undefined; - // TODO: do something with unwatch? const unwatch = publicClient.watchBlocks({ blockTag, emitOnBegin: true, diff --git a/packages/block-events-stream/src/getLogs.ts b/packages/block-events-stream/src/getLogs.ts index 242b75471d..2b70b0941e 100644 --- a/packages/block-events-stream/src/getLogs.ts +++ b/packages/block-events-stream/src/getLogs.ts @@ -64,6 +64,7 @@ export async function getLogs({ return formatLog(log, { args, eventName }); } catch (err) { // We're using strict mode, so just skip if there is an error decoding. + // https://viem.sh/docs/actions/public/getLogs.html#strict-mode return; } }) diff --git a/packages/block-events-stream/src/utils.ts b/packages/block-events-stream/src/utils.ts index 33b0f506bc..c396d67e39 100644 --- a/packages/block-events-stream/src/utils.ts +++ b/packages/block-events-stream/src/utils.ts @@ -1,3 +1,5 @@ +// javascript, y u no support bigints better? + export function bigIntMin(...args: bigint[]): bigint { return args.reduce((m, e) => (e < m ? e : m)); } @@ -5,3 +7,7 @@ export function bigIntMin(...args: bigint[]): bigint { export function bigIntMax(...args: bigint[]): bigint { return args.reduce((m, e) => (e > m ? e : m)); } + +export function bigIntSort(a: bigint, b: bigint): -1 | 0 | 1 { + return a < b ? -1 : a > b ? 1 : 0; +} From 3120c2f56d1aa584ab75160a39c51b7e2498954b Mon Sep 17 00:00:00 2001 From: Kevin Ingersoll Date: Wed, 28 Jun 2023 10:47:24 +0100 Subject: [PATCH 08/30] add log back in --- packages/block-events-stream/src/createBlockEventsStream.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/block-events-stream/src/createBlockEventsStream.ts b/packages/block-events-stream/src/createBlockEventsStream.ts index 793f47f54d..0b950fbef9 100644 --- a/packages/block-events-stream/src/createBlockEventsStream.ts +++ b/packages/block-events-stream/src/createBlockEventsStream.ts @@ -112,6 +112,7 @@ export async function createBlockEventsStream({ } if (!initialToBlock.closed) { + debug("waiting for next block"); const sub = initialToBlock.subscribe((blockNumber) => { if (blockNumber > toBlock) { sub.unsubscribe(); From e9fcb0f748b73af7fbf6e2288f33ad1b28c5c433 Mon Sep 17 00:00:00 2001 From: Kevin Ingersoll Date: Wed, 28 Jun 2023 11:06:36 +0100 Subject: [PATCH 09/30] move comments --- packages/block-events-stream/src/createBlockEventsStream.ts | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/packages/block-events-stream/src/createBlockEventsStream.ts b/packages/block-events-stream/src/createBlockEventsStream.ts index 0b950fbef9..46e2d343a1 100644 --- a/packages/block-events-stream/src/createBlockEventsStream.ts +++ b/packages/block-events-stream/src/createBlockEventsStream.ts @@ -64,6 +64,8 @@ export async function createBlockEventsStream({ const toBlock = bigIntMin(fromBlock + BigInt(maxBlockRange), lastBlockNumber); debug("fetching block range", { fromBlock, toBlock }); + // TODO: handle RPC block range errors + // TODO: handle RPC rate limit errors (hopefully via viem client retry policy) // TODO: swap this with viem `getLogs` call when viem supports multiple events: https://github.com/wagmi-dev/viem/pull/633 const logs = await getLogs({ publicClient, @@ -79,9 +81,6 @@ export async function createBlockEventsStream({ console.warn("pending logs discarded"); } - // TODO: handle RPC block range errors - // TODO: handle RPC rate limit errors (hopefully via client retry policy) - const blockNumbers = Array.from(new Set(nonPendingLogs.map((log) => log.blockNumber))); blockNumbers.sort(bigIntSort); From db64640a7290c13d446e3a2ccb3ff2243ff0de59 Mon Sep 17 00:00:00 2001 From: Kevin Ingersoll Date: Fri, 30 Jun 2023 10:03:02 +0100 Subject: [PATCH 10/30] refactor with just streams --- packages/block-events-stream/src/common.ts | 21 --- .../src/createBlockEventsStream.ts | 142 ------------------ .../src/createBlockNumberStream.ts | 62 -------- .../src/createBlockStream.ts | 36 +---- packages/block-events-stream/src/fetchLogs.ts | 63 ++++++++ packages/block-events-stream/src/getLogs.ts | 8 +- .../src/groupLogsByBlockNumber.ts | 36 +++++ packages/block-events-stream/src/index.ts | 8 +- .../src/latestBlockNumberToLogs.ts | 43 ++++++ packages/block-events-stream/src/utils.ts | 4 + 10 files changed, 162 insertions(+), 261 deletions(-) delete mode 100644 packages/block-events-stream/src/common.ts delete mode 100644 packages/block-events-stream/src/createBlockEventsStream.ts delete mode 100644 packages/block-events-stream/src/createBlockNumberStream.ts create mode 100644 packages/block-events-stream/src/fetchLogs.ts create mode 100644 packages/block-events-stream/src/groupLogsByBlockNumber.ts create mode 100644 packages/block-events-stream/src/latestBlockNumberToLogs.ts diff --git a/packages/block-events-stream/src/common.ts b/packages/block-events-stream/src/common.ts deleted file mode 100644 index 2933aba815..0000000000 --- a/packages/block-events-stream/src/common.ts +++ /dev/null @@ -1,21 +0,0 @@ -import { BehaviorSubject, Observable } from "rxjs"; -import type { BlockNumber, Hex } from "viem"; -import type { AbiEvent } from "abitype"; -import { NonPendingLog } from "./isNonPendingLog"; -import { GetLogsReturnType } from "./getLogs"; - -export type ReadonlyBehaviorSubject = Pick, "subscribe" | "pipe" | "value" | "getValue">; - -export type BlockEvents = { - blockNumber: BlockNumber; - blockHash: Hex; - events: NonPendingLog>[]; -}; - -export type BlockEventsStream = Observable>; - -export type BlockEventsFromStream> = TStream extends BlockEventsStream< - infer TAbiEvent -> - ? BlockEvents - : never; diff --git a/packages/block-events-stream/src/createBlockEventsStream.ts b/packages/block-events-stream/src/createBlockEventsStream.ts deleted file mode 100644 index 46e2d343a1..0000000000 --- a/packages/block-events-stream/src/createBlockEventsStream.ts +++ /dev/null @@ -1,142 +0,0 @@ -import { BehaviorSubject, Subject } from "rxjs"; -import type { BlockNumber, Hex, PublicClient } from "viem"; -import type { AbiEvent } from "abitype"; -import { BlockEvents, BlockEventsStream, ReadonlyBehaviorSubject } from "./common"; -import { bigIntMin, bigIntSort } from "./utils"; -import { isNonPendingLog } from "./isNonPendingLog"; -import { debug } from "./debug"; -import { createBlockNumberStream } from "./createBlockNumberStream"; -import { getLogs } from "./getLogs"; - -export type CreateBlockEventsStreamOptions = { - publicClient: PublicClient; - fromBlock?: BlockNumber; - toBlock?: BlockNumber | ReadonlyBehaviorSubject; - address?: Hex; - events: readonly TAbiEvent[]; - /** Defaults to 1000 */ - maxBlockRange?: number; -}; - -export type CreateBlockEventsStreamResult = { - stream: BlockEventsStream; - close: () => void; -}; - -export async function createBlockEventsStream({ - publicClient, - fromBlock: initialFromBlock, - toBlock: initialToBlock, - address, - events, - maxBlockRange = 1000, -}: CreateBlockEventsStreamOptions): Promise> { - debug("createBlockEventsStream", { initialFromBlock, initialToBlock, address, events, maxBlockRange }); - - if (initialFromBlock == null) { - debug("getting earliest block"); - const earliestBlock = await publicClient.getBlock({ blockTag: "earliest" }); - debug("earliest block", earliestBlock); - if (earliestBlock.number == null) { - // This is an edge case that shouldn't happen unless you are ignoring types or something weird happens with viem/RPC. - throw new Error(`pending or missing earliest block`); - } - initialFromBlock = earliestBlock.number; - } - - let closeInitialToBlock: (() => void) | undefined; - if (initialToBlock == null) { - debug("creating latest block number stream"); - const blockNumber$ = await createBlockNumberStream({ publicClient, blockTag: "latest" }); - initialToBlock = blockNumber$.stream; - closeInitialToBlock = blockNumber$.close; - } - - const stream = new Subject>(); - fetchBlockRange( - initialFromBlock, - maxBlockRange, - initialToBlock instanceof BehaviorSubject ? initialToBlock.value : initialToBlock - ); - - async function fetchBlockRange(fromBlock: bigint, maxBlockRange: number, lastBlockNumber: bigint): Promise { - try { - const toBlock = bigIntMin(fromBlock + BigInt(maxBlockRange), lastBlockNumber); - debug("fetching block range", { fromBlock, toBlock }); - - // TODO: handle RPC block range errors - // TODO: handle RPC rate limit errors (hopefully via viem client retry policy) - // TODO: swap this with viem `getLogs` call when viem supports multiple events: https://github.com/wagmi-dev/viem/pull/633 - const logs = await getLogs({ - publicClient, - address, - fromBlock, - toBlock, - events, - }); - - const nonPendingLogs = logs.filter(isNonPendingLog); - if (logs.length !== nonPendingLogs.length) { - // This is an edge case that shouldn't happen unless you are ignoring types or something weird happens with viem/RPC. - console.warn("pending logs discarded"); - } - - const blockNumbers = Array.from(new Set(nonPendingLogs.map((log) => log.blockNumber))); - blockNumbers.sort(bigIntSort); - - for (const blockNumber of blockNumbers) { - const blockLogs = nonPendingLogs.filter((log) => log.blockNumber === blockNumber); - blockLogs.sort((a, b) => (a.logIndex < b.logIndex ? -1 : a.logIndex > b.logIndex ? 1 : 0)); - - if (blockLogs.length) { - debug("emitting events for block", { blockNumber, blockHash: blockLogs[0].blockHash, events: blockLogs }); - stream.next({ - blockNumber, - blockHash: blockLogs[0].blockHash, - events: blockLogs, - // TODO: figure out why we need to cast this - } as any as BlockEvents); - } - } - - if (toBlock < lastBlockNumber) { - fetchBlockRange(toBlock + 1n, maxBlockRange, lastBlockNumber); - return; - } - - if (initialToBlock instanceof BehaviorSubject) { - if (initialToBlock.value > toBlock) { - fetchBlockRange(toBlock + 1n, maxBlockRange, initialToBlock.value); - return; - } - - if (!initialToBlock.closed) { - debug("waiting for next block"); - const sub = initialToBlock.subscribe((blockNumber) => { - if (blockNumber > toBlock) { - sub.unsubscribe(); - fetchBlockRange(toBlock + 1n, maxBlockRange, blockNumber); - } - }); - return; - } - } - - stream.complete(); - } catch (error: unknown) { - // TODO: do more specific error handling? - stream.error(error); - } - } - - return { - stream: stream.asObservable(), - close: (): void => { - stream.complete(); - if (initialToBlock instanceof BehaviorSubject) { - initialToBlock.complete(); - } - closeInitialToBlock?.(); - }, - }; -} diff --git a/packages/block-events-stream/src/createBlockNumberStream.ts b/packages/block-events-stream/src/createBlockNumberStream.ts deleted file mode 100644 index e7a525c6d1..0000000000 --- a/packages/block-events-stream/src/createBlockNumberStream.ts +++ /dev/null @@ -1,62 +0,0 @@ -import { BehaviorSubject, filter, map } from "rxjs"; -import type { Block, BlockNumber, BlockTag, PublicClient } from "viem"; -import { createBlockStream } from "./createBlockStream"; -import { ReadonlyBehaviorSubject } from "./common"; -import { isNonPendingBlock } from "./isNonPendingBlock"; - -export type CreateBlockNumberStreamOptions = - | { - publicClient: PublicClient; - blockTag: Omit; - block$?: never; - } - | { - publicClient?: never; - blockTag?: never; - block$: ReadonlyBehaviorSubject; - }; - -export type CreateBlockNumberStreamResult = { - stream: ReadonlyBehaviorSubject; - close: () => void; -}; - -export async function createBlockNumberStream({ - publicClient, - blockTag, - block$: initialBlock$, -}: CreateBlockNumberStreamOptions): Promise { - const block$ = initialBlock$ - ? { - stream: initialBlock$, - close: (): void => { - // don't close the user-provided stream - }, - } - : await createBlockStream({ publicClient, blockTag: blockTag as BlockTag }); - - const block = block$.stream.value; - - if (!block.number) { - // This is an edge case that shouldn't happen unless you are ignoring types. - throw new Error(`${blockTag} block missing or pending`); - } - - const blockNumber$ = new BehaviorSubject(block.number); - - const { unsubscribe } = block$.stream - .pipe( - filter(isNonPendingBlock), - map((block) => block.number) - ) - .subscribe(blockNumber$); - - return { - stream: blockNumber$, - close: (): void => { - unsubscribe(); - blockNumber$.complete(); - block$.close(); - }, - }; -} diff --git a/packages/block-events-stream/src/createBlockStream.ts b/packages/block-events-stream/src/createBlockStream.ts index 77551ab571..0603aa08e5 100644 --- a/packages/block-events-stream/src/createBlockStream.ts +++ b/packages/block-events-stream/src/createBlockStream.ts @@ -1,42 +1,20 @@ -import { BehaviorSubject } from "rxjs"; +import { Observable } from "rxjs"; import type { Block, BlockTag, PublicClient } from "viem"; -import { ReadonlyBehaviorSubject } from "./common"; export type CreateBlockStreamOptions = { publicClient: PublicClient; blockTag: BlockTag; }; -export type CreateBlockStreamResult = { - stream: ReadonlyBehaviorSubject; - close: () => void; -}; +export type CreateBlockStreamResult = Observable; -export function createBlockStream({ - publicClient, - blockTag, -}: CreateBlockStreamOptions): Promise { - return new Promise((resolve, reject) => { - let stream: BehaviorSubject | undefined; - const unwatch = publicClient.watchBlocks({ +export function createBlockStream({ publicClient, blockTag }: CreateBlockStreamOptions): CreateBlockStreamResult { + return new Observable(function subscribe(subscriber) { + return publicClient.watchBlocks({ blockTag, emitOnBegin: true, - onBlock: (block) => { - if (!stream) { - stream = new BehaviorSubject(block); - // TODO: return actual readonly behavior subject rather than just a type? - resolve({ - stream: stream as ReadonlyBehaviorSubject, - close: () => unwatch(), - }); - } else { - stream.next(block); - } - }, - onError: (error) => { - reject(error); - stream?.error(error); - }, + onBlock: subscriber.next, + onError: subscriber.error, }); }); } diff --git a/packages/block-events-stream/src/fetchLogs.ts b/packages/block-events-stream/src/fetchLogs.ts new file mode 100644 index 0000000000..8c443eeb00 --- /dev/null +++ b/packages/block-events-stream/src/fetchLogs.ts @@ -0,0 +1,63 @@ +import { AbiEvent, Address } from "abitype"; +import { GetLogsResult, getLogs } from "./getLogs"; +import { bigIntMin, wait } from "./utils"; +import { PublicClient, BlockNumber, LimitExceededRpcError } from "viem"; +import { debug } from "./debug"; + +export type FetchLogsOptions = { + publicClient: PublicClient; + address?: Address | Address[]; + events: TAbiEvents; + fromBlock: BlockNumber; + toBlock: BlockNumber; + maxBlockRange?: bigint; + retryCount?: number; +}; + +export type FetchLogsResult = { + fromBlock: BlockNumber; + toBlock: BlockNumber; + logs: GetLogsResult; +}; + +export async function fetchLogs({ + maxBlockRange = 1000n, + retryCount = 0, + ...getLogsOpts +}: FetchLogsOptions): Promise> { + try { + const fromBlock = getLogsOpts.fromBlock; + const blockRange = bigIntMin(maxBlockRange, getLogsOpts.toBlock - fromBlock); + const toBlock = fromBlock + blockRange; + + const logs = await getLogs({ ...getLogsOpts, fromBlock, toBlock }); + return { fromBlock, toBlock, logs }; + } catch (error: unknown) { + if (!(error instanceof Error)) throw error; + + if (error.message === "rate limit" && retryCount < 10) { + const seconds = 2 * retryCount; + debug(`too many requests, retrying in ${seconds}s`, error); + await wait(1000 * seconds); + return await fetchLogs({ ...getLogsOpts, maxBlockRange, retryCount: retryCount + 1 }); + } + + // TODO: replace this with a real error + if (error.message === "block range exceeded") { + const blockRange = getLogsOpts.toBlock - getLogsOpts.fromBlock; + const newBlockRange = blockRange / 2n; + if (newBlockRange <= 0n) { + throw new Error("can't reduce block range any further"); + } + debug("block range exceeded, trying a smaller block range", error); + return await fetchLogs({ + ...getLogsOpts, + toBlock: getLogsOpts.fromBlock + newBlockRange, + maxBlockRange, + retryCount, + }); + } + + throw error; + } +} diff --git a/packages/block-events-stream/src/getLogs.ts b/packages/block-events-stream/src/getLogs.ts index 2b70b0941e..0d3a4cc11f 100644 --- a/packages/block-events-stream/src/getLogs.ts +++ b/packages/block-events-stream/src/getLogs.ts @@ -23,13 +23,13 @@ export type GetLogsOptions = { toBlock: BlockNumber | BlockTag; }; -export type GetLogsReturnType = Log< +export type GetLogsResult = Log< bigint, number, TAbiEvents[number], true, TAbiEvents ->; +>[]; export async function getLogs({ publicClient, @@ -37,7 +37,7 @@ export async function getLogs({ events, fromBlock, toBlock, -}: GetLogsOptions): Promise[]> { +}: GetLogsOptions): Promise> { const topics = [events.flatMap((event) => encodeEventTopics({ abi: [event], eventName: event.name }))]; const logs = await publicClient.request({ @@ -68,5 +68,5 @@ export async function getLogs({ return; } }) - .filter(isDefined) as GetLogsReturnType[]; + .filter(isDefined) as GetLogsResult; } diff --git a/packages/block-events-stream/src/groupLogsByBlockNumber.ts b/packages/block-events-stream/src/groupLogsByBlockNumber.ts new file mode 100644 index 0000000000..ac94f25f40 --- /dev/null +++ b/packages/block-events-stream/src/groupLogsByBlockNumber.ts @@ -0,0 +1,36 @@ +import { BlockNumber, Hex, Log } from "viem"; +import { NonPendingLog, isNonPendingLog } from "./isNonPendingLog"; +import { bigIntSort } from "./utils"; +import { isDefined } from "@latticexyz/common/utils"; + +export function groupLogsByBlockNumber( + logs: TLog[] +): { blockNumber: BlockNumber; blockHash: Hex; events: NonPendingLog[] }[] { + // Pending logs don't have block numbers, so filter them out. + const nonPendingLogs = logs.filter(isNonPendingLog); + if (logs.length !== nonPendingLogs.length) { + console.warn( + "pending logs discarded", + logs.filter((log) => !isNonPendingLog(log)) + ); + } + + const blockNumbers = Array.from(new Set(nonPendingLogs.map((log) => log.blockNumber))); + blockNumbers.sort(bigIntSort); + + return blockNumbers + .map((blockNumber) => { + const blockLogs = nonPendingLogs.filter((log) => log.blockNumber === blockNumber); + if (!blockLogs.length) return; + blockLogs.sort((a, b) => (a.logIndex < b.logIndex ? -1 : a.logIndex > b.logIndex ? 1 : 0)); + + if (!blockLogs.length) return; + + return { + blockNumber, + blockHash: blockLogs[0].blockHash, + events: blockLogs, + }; + }) + .filter(isDefined); +} diff --git a/packages/block-events-stream/src/index.ts b/packages/block-events-stream/src/index.ts index c767c01088..468f28bf0d 100644 --- a/packages/block-events-stream/src/index.ts +++ b/packages/block-events-stream/src/index.ts @@ -1,4 +1,6 @@ -export * from "./common"; -export * from "./createBlockEventsStream"; -export * from "./createBlockNumberStream"; export * from "./createBlockStream"; +export * from "./fetchLogs"; +export * from "./groupLogsByBlockNumber"; +export * from "./isNonPendingBlock"; +export * from "./isNonPendingLog"; +export * from "./latestBlockNumberToLogs"; diff --git a/packages/block-events-stream/src/latestBlockNumberToLogs.ts b/packages/block-events-stream/src/latestBlockNumberToLogs.ts new file mode 100644 index 0000000000..c4f037ed83 --- /dev/null +++ b/packages/block-events-stream/src/latestBlockNumberToLogs.ts @@ -0,0 +1,43 @@ +import { OperatorFunction, exhaustMap, from, tap } from "rxjs"; +import { FetchLogsResult, fetchLogs } from "./fetchLogs"; +import { AbiEvent, Address } from "abitype"; +import { BlockNumber, PublicClient } from "viem"; + +export type BlockRangeToLogsOptions = { + publicClient: PublicClient; + address?: Address | Address[]; + events: TAbiEvents; + fromBlock: BlockNumber; + maxBlockRange?: bigint; +}; + +export type BlockRangeToLogsResult = OperatorFunction< + BlockNumber, + FetchLogsResult +>; + +export function blockRangeToLogs({ + publicClient, + address, + events, + fromBlock: initialFromBlock, + maxBlockRange, +}: BlockRangeToLogsOptions): BlockRangeToLogsResult { + let fromBlock = initialFromBlock; + return exhaustMap((latestBlockNumber: bigint) => + from( + fetchLogs({ + publicClient, + address, + events, + fromBlock, + toBlock: latestBlockNumber, + maxBlockRange, + }) + ).pipe( + tap((result) => { + fromBlock = result.toBlock + 1n; + }) + ) + ); +} diff --git a/packages/block-events-stream/src/utils.ts b/packages/block-events-stream/src/utils.ts index c396d67e39..b365dd9dd1 100644 --- a/packages/block-events-stream/src/utils.ts +++ b/packages/block-events-stream/src/utils.ts @@ -11,3 +11,7 @@ export function bigIntMax(...args: bigint[]): bigint { export function bigIntSort(a: bigint, b: bigint): -1 | 0 | 1 { return a < b ? -1 : a > b ? 1 : 0; } + +export function wait(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); +} From 63f6cc02b275c73fdebf6451337e756267fc4476 Mon Sep 17 00:00:00 2001 From: Kevin Ingersoll Date: Fri, 30 Jun 2023 10:05:51 +0100 Subject: [PATCH 11/30] add README with example --- packages/block-events-stream/README.md | 29 ++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) create mode 100644 packages/block-events-stream/README.md diff --git a/packages/block-events-stream/README.md b/packages/block-events-stream/README.md new file mode 100644 index 0000000000..aecbca0802 --- /dev/null +++ b/packages/block-events-stream/README.md @@ -0,0 +1,29 @@ +# Block events stream + +## Example + +```ts +import { + createBlockStream, + isNonPendingBlock, + blockRangeToLogs, + groupLogsByBlockNumber, +} from "@latticexyz/block-events-stream"; + +const latestBlock$ = await createBlockStream({ publicClient, blockTag: "latest" }); + +const latestBlockNumber$ = latestBlock$.pipe( + filter(isNonPendingBlock), + map((block) => block.number) +); + +latestBlockNumber$ + .pipe( + blockRangeToLogs(0n), + map(({ logs }) => from(groupLogsByBlockNumber(logs))), + mergeAll() + ) + .subscribe((block) => { + console.log("got events for block", block); + }); +``` From cf6b1018852bcd97bf09472af636442b1b2c05bb Mon Sep 17 00:00:00 2001 From: Kevin Ingersoll Date: Fri, 30 Jun 2023 14:17:11 +0100 Subject: [PATCH 12/30] renamed --- packages/block-events-stream/README.md | 9 +++++++-- .../block-events-stream/src/latestBlockNumberToLogs.ts | 8 ++++---- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/packages/block-events-stream/README.md b/packages/block-events-stream/README.md index aecbca0802..1ea36072dc 100644 --- a/packages/block-events-stream/README.md +++ b/packages/block-events-stream/README.md @@ -6,8 +6,8 @@ import { createBlockStream, isNonPendingBlock, - blockRangeToLogs, groupLogsByBlockNumber, + latestBlockNumberToLogs, } from "@latticexyz/block-events-stream"; const latestBlock$ = await createBlockStream({ publicClient, blockTag: "latest" }); @@ -19,7 +19,12 @@ const latestBlockNumber$ = latestBlock$.pipe( latestBlockNumber$ .pipe( - blockRangeToLogs(0n), + latestBlockNumberToLogs({ + publicClient, + address, + events, + fromBlock: 0n, + }), map(({ logs }) => from(groupLogsByBlockNumber(logs))), mergeAll() ) diff --git a/packages/block-events-stream/src/latestBlockNumberToLogs.ts b/packages/block-events-stream/src/latestBlockNumberToLogs.ts index c4f037ed83..d975ec1e44 100644 --- a/packages/block-events-stream/src/latestBlockNumberToLogs.ts +++ b/packages/block-events-stream/src/latestBlockNumberToLogs.ts @@ -3,7 +3,7 @@ import { FetchLogsResult, fetchLogs } from "./fetchLogs"; import { AbiEvent, Address } from "abitype"; import { BlockNumber, PublicClient } from "viem"; -export type BlockRangeToLogsOptions = { +export type LatestBlockNumberToLogsOptions = { publicClient: PublicClient; address?: Address | Address[]; events: TAbiEvents; @@ -11,18 +11,18 @@ export type BlockRangeToLogsOptions = { maxBlockRange?: bigint; }; -export type BlockRangeToLogsResult = OperatorFunction< +export type LatestBlockNumberToLogsResult = OperatorFunction< BlockNumber, FetchLogsResult >; -export function blockRangeToLogs({ +export function latestBlockNumberToLogs({ publicClient, address, events, fromBlock: initialFromBlock, maxBlockRange, -}: BlockRangeToLogsOptions): BlockRangeToLogsResult { +}: LatestBlockNumberToLogsOptions): LatestBlockNumberToLogsResult { let fromBlock = initialFromBlock; return exhaustMap((latestBlockNumber: bigint) => from( From 2a5ba9887d0fbfd8d8941b1f0ba13aa7b544e33f Mon Sep 17 00:00:00 2001 From: Kevin Ingersoll Date: Fri, 30 Jun 2023 15:44:46 +0100 Subject: [PATCH 13/30] rename again and take in a tuple as input --- packages/block-events-stream/README.md | 6 ++--- ...ockNumberToLogs.ts => blockRangeToLogs.ts} | 24 +++++++++---------- packages/block-events-stream/src/index.ts | 2 +- 3 files changed, 15 insertions(+), 17 deletions(-) rename packages/block-events-stream/src/{latestBlockNumberToLogs.ts => blockRangeToLogs.ts} (52%) diff --git a/packages/block-events-stream/README.md b/packages/block-events-stream/README.md index 1ea36072dc..ac8404ef07 100644 --- a/packages/block-events-stream/README.md +++ b/packages/block-events-stream/README.md @@ -7,7 +7,7 @@ import { createBlockStream, isNonPendingBlock, groupLogsByBlockNumber, - latestBlockNumberToLogs, + blockRangeToLogs, } from "@latticexyz/block-events-stream"; const latestBlock$ = await createBlockStream({ publicClient, blockTag: "latest" }); @@ -19,11 +19,11 @@ const latestBlockNumber$ = latestBlock$.pipe( latestBlockNumber$ .pipe( - latestBlockNumberToLogs({ + map((latestBlockNumber) => [0n, latestBlockNumber]) + blockRangeToLogs({ publicClient, address, events, - fromBlock: 0n, }), map(({ logs }) => from(groupLogsByBlockNumber(logs))), mergeAll() diff --git a/packages/block-events-stream/src/latestBlockNumberToLogs.ts b/packages/block-events-stream/src/blockRangeToLogs.ts similarity index 52% rename from packages/block-events-stream/src/latestBlockNumberToLogs.ts rename to packages/block-events-stream/src/blockRangeToLogs.ts index d975ec1e44..c2f7714bfb 100644 --- a/packages/block-events-stream/src/latestBlockNumberToLogs.ts +++ b/packages/block-events-stream/src/blockRangeToLogs.ts @@ -3,41 +3,39 @@ import { FetchLogsResult, fetchLogs } from "./fetchLogs"; import { AbiEvent, Address } from "abitype"; import { BlockNumber, PublicClient } from "viem"; -export type LatestBlockNumberToLogsOptions = { +export type BlockRangeToLogsOptions = { publicClient: PublicClient; address?: Address | Address[]; events: TAbiEvents; - fromBlock: BlockNumber; maxBlockRange?: bigint; }; -export type LatestBlockNumberToLogsResult = OperatorFunction< - BlockNumber, +export type BlockRangeToLogsResult = OperatorFunction< + [BlockNumber, BlockNumber], FetchLogsResult >; -export function latestBlockNumberToLogs({ +export function blockRangeToLogs({ publicClient, address, events, - fromBlock: initialFromBlock, maxBlockRange, -}: LatestBlockNumberToLogsOptions): LatestBlockNumberToLogsResult { - let fromBlock = initialFromBlock; - return exhaustMap((latestBlockNumber: bigint) => - from( +}: BlockRangeToLogsOptions): BlockRangeToLogsResult { + return exhaustMap(([startBlock, endBlock]) => { + let fromBlock = startBlock; + return from( fetchLogs({ publicClient, address, events, fromBlock, - toBlock: latestBlockNumber, + toBlock: endBlock, maxBlockRange, }) ).pipe( tap((result) => { fromBlock = result.toBlock + 1n; }) - ) - ); + ); + }); } diff --git a/packages/block-events-stream/src/index.ts b/packages/block-events-stream/src/index.ts index 468f28bf0d..f7ee62ef5b 100644 --- a/packages/block-events-stream/src/index.ts +++ b/packages/block-events-stream/src/index.ts @@ -1,6 +1,6 @@ +export * from "./blockRangeToLogs"; export * from "./createBlockStream"; export * from "./fetchLogs"; export * from "./groupLogsByBlockNumber"; export * from "./isNonPendingBlock"; export * from "./isNonPendingLog"; -export * from "./latestBlockNumberToLogs"; From 58d454172122bfbf0cd8c50551325de715b26f17 Mon Sep 17 00:00:00 2001 From: Kevin Ingersoll Date: Fri, 30 Jun 2023 15:52:14 +0100 Subject: [PATCH 14/30] fix scope --- packages/block-events-stream/src/blockRangeToLogs.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/packages/block-events-stream/src/blockRangeToLogs.ts b/packages/block-events-stream/src/blockRangeToLogs.ts index c2f7714bfb..12cb3e71cb 100644 --- a/packages/block-events-stream/src/blockRangeToLogs.ts +++ b/packages/block-events-stream/src/blockRangeToLogs.ts @@ -21,8 +21,9 @@ export function blockRangeToLogs({ events, maxBlockRange, }: BlockRangeToLogsOptions): BlockRangeToLogsResult { + let fromBlock: bigint | null = null; return exhaustMap(([startBlock, endBlock]) => { - let fromBlock = startBlock; + fromBlock ??= startBlock; return from( fetchLogs({ publicClient, From 6a8d908bf292abddfd1de0c1abe41de19faf98c4 Mon Sep 17 00:00:00 2001 From: Kevin Ingersoll Date: Fri, 30 Jun 2023 16:02:31 +0100 Subject: [PATCH 15/30] add TODO --- packages/block-events-stream/src/fetchLogs.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/packages/block-events-stream/src/fetchLogs.ts b/packages/block-events-stream/src/fetchLogs.ts index 8c443eeb00..7719a6e4c2 100644 --- a/packages/block-events-stream/src/fetchLogs.ts +++ b/packages/block-events-stream/src/fetchLogs.ts @@ -1,7 +1,7 @@ import { AbiEvent, Address } from "abitype"; import { GetLogsResult, getLogs } from "./getLogs"; import { bigIntMin, wait } from "./utils"; -import { PublicClient, BlockNumber, LimitExceededRpcError } from "viem"; +import { PublicClient, BlockNumber } from "viem"; import { debug } from "./debug"; export type FetchLogsOptions = { @@ -35,6 +35,7 @@ export async function fetchLogs({ } catch (error: unknown) { if (!(error instanceof Error)) throw error; + // TODO: replace this with a real error if (error.message === "rate limit" && retryCount < 10) { const seconds = 2 * retryCount; debug(`too many requests, retrying in ${seconds}s`, error); From e2b8650c1bff6470d91f2a6ce4e12549a79c0165 Mon Sep 17 00:00:00 2001 From: Kevin Ingersoll Date: Fri, 30 Jun 2023 16:13:02 +0100 Subject: [PATCH 16/30] add tests for grouping logs --- .../src/groupLogsByBlockNumber.test.ts | 110 ++++++++++++++++++ .../src/groupLogsByBlockNumber.ts | 4 +- 2 files changed, 112 insertions(+), 2 deletions(-) create mode 100644 packages/block-events-stream/src/groupLogsByBlockNumber.test.ts diff --git a/packages/block-events-stream/src/groupLogsByBlockNumber.test.ts b/packages/block-events-stream/src/groupLogsByBlockNumber.test.ts new file mode 100644 index 0000000000..ceee06e5be --- /dev/null +++ b/packages/block-events-stream/src/groupLogsByBlockNumber.test.ts @@ -0,0 +1,110 @@ +import { describe, it, expect } from "vitest"; +import { groupLogsByBlockNumber } from "./groupLogsByBlockNumber"; +import { Log } from "viem"; + +describe("groupLogsByBlockNumber", () => { + it("groups logs by block number and correctly sorts them", () => { + const logs = [ + { + blockNumber: 1n, + blockHash: "0x", + logIndex: 4, + transactionHash: "0x", + transactionIndex: 0, + }, + { + blockNumber: 5n, + blockHash: "0x", + logIndex: 0, + transactionHash: "0x", + transactionIndex: 0, + }, + { + blockNumber: 1n, + blockHash: "0x", + logIndex: 0, + transactionHash: "0x", + transactionIndex: 0, + }, + { + blockNumber: 1n, + blockHash: "0x", + logIndex: 2, + transactionHash: "0x", + transactionIndex: 0, + }, + { + blockNumber: null, + blockHash: null, + logIndex: null, + transactionHash: null, + transactionIndex: null, + }, + { + blockNumber: 3n, + blockHash: "0x", + logIndex: 3, + transactionHash: "0x", + transactionIndex: 0, + }, + ] as any as Log[]; + + expect(groupLogsByBlockNumber(logs)).toMatchInlineSnapshot(` + [ + { + "blockHash": "0x", + "blockNumber": 1n, + "events": [ + { + "blockHash": "0x", + "blockNumber": 1n, + "logIndex": 0, + "transactionHash": "0x", + "transactionIndex": 0, + }, + { + "blockHash": "0x", + "blockNumber": 1n, + "logIndex": 2, + "transactionHash": "0x", + "transactionIndex": 0, + }, + { + "blockHash": "0x", + "blockNumber": 1n, + "logIndex": 4, + "transactionHash": "0x", + "transactionIndex": 0, + }, + ], + }, + { + "blockHash": "0x", + "blockNumber": 3n, + "events": [ + { + "blockHash": "0x", + "blockNumber": 3n, + "logIndex": 3, + "transactionHash": "0x", + "transactionIndex": 0, + }, + ], + }, + { + "blockHash": "0x", + "blockNumber": 5n, + "events": [ + { + "blockHash": "0x", + "blockNumber": 5n, + "logIndex": 0, + "transactionHash": "0x", + "transactionIndex": 0, + }, + ], + }, + ] + `); + }); +}); diff --git a/packages/block-events-stream/src/groupLogsByBlockNumber.ts b/packages/block-events-stream/src/groupLogsByBlockNumber.ts index ac94f25f40..4784214e1c 100644 --- a/packages/block-events-stream/src/groupLogsByBlockNumber.ts +++ b/packages/block-events-stream/src/groupLogsByBlockNumber.ts @@ -4,8 +4,8 @@ import { bigIntSort } from "./utils"; import { isDefined } from "@latticexyz/common/utils"; export function groupLogsByBlockNumber( - logs: TLog[] -): { blockNumber: BlockNumber; blockHash: Hex; events: NonPendingLog[] }[] { + logs: readonly TLog[] +): { blockNumber: BlockNumber; blockHash: Hex; events: readonly NonPendingLog[] }[] { // Pending logs don't have block numbers, so filter them out. const nonPendingLogs = logs.filter(isNonPendingLog); if (logs.length !== nonPendingLogs.length) { From 3a4c02f5246e11e533dba4aaa17cc6128a69ae1b Mon Sep 17 00:00:00 2001 From: Kevin Ingersoll Date: Mon, 3 Jul 2023 10:42:14 +0100 Subject: [PATCH 17/30] wip rxjs tests --- packages/block-events-stream/README.md | 2 +- .../src/blockRangeToLogs.test.ts | 58 +++++++++++++++++++ .../src/blockRangeToLogs.ts | 4 +- .../src/groupLogsByBlockNumber.ts | 3 +- 4 files changed, 63 insertions(+), 4 deletions(-) create mode 100644 packages/block-events-stream/src/blockRangeToLogs.test.ts diff --git a/packages/block-events-stream/README.md b/packages/block-events-stream/README.md index ac8404ef07..5e4b2fb63f 100644 --- a/packages/block-events-stream/README.md +++ b/packages/block-events-stream/README.md @@ -19,7 +19,7 @@ const latestBlockNumber$ = latestBlock$.pipe( latestBlockNumber$ .pipe( - map((latestBlockNumber) => [0n, latestBlockNumber]) + map((latestBlockNumber) => [0n, latestBlockNumber]), blockRangeToLogs({ publicClient, address, diff --git a/packages/block-events-stream/src/blockRangeToLogs.test.ts b/packages/block-events-stream/src/blockRangeToLogs.test.ts new file mode 100644 index 0000000000..0dc13c3ec3 --- /dev/null +++ b/packages/block-events-stream/src/blockRangeToLogs.test.ts @@ -0,0 +1,58 @@ +import { describe, it, expect, vi } from "vitest"; +import { blockRangeToLogs } from "./blockRangeToLogs"; +import { Subject, firstValueFrom, map } from "rxjs"; +import { Transport, createPublicClient, createTransport } from "viem"; +import * as fetchLogsExports from "./fetchLogs"; +import { bigIntMin } from "./utils"; + +const mockTransport: Transport = () => + createTransport({ + key: "mock", + name: "Mock Transport", + request: vi.fn(() => null) as any, + type: "mock", + }); + +describe("blockRangeToLogs", () => { + it("processes block ranges in order", async () => { + const publicClient = createPublicClient({ + transport: mockTransport, + }); + + const latestBlockNumber$ = new Subject(); + + const logs$ = latestBlockNumber$.pipe( + map((endBlock) => ({ startBlock: 0n, endBlock })), + blockRangeToLogs({ + publicClient, + address: "0x", + events: [], + }) + ); + + latestBlockNumber$.next(1000n); + latestBlockNumber$.next(1001n); + latestBlockNumber$.next(1002n); + + const logs = await firstValueFrom(logs$); + + expect(logs).toMatchInlineSnapshot(); + + // await new Promise((resolve) => setTimeout(resolve, 1000)); + + // // expect(spy).toHaveBeenCalledTimes(1); + // // expect(await firstValueFrom(logs$)).toMatchInlineSnapshot(``); + // expect(spy).toHaveBeenCalledTimes(1); + // expect(spy).toHaveBeenCalledWith({ + // address: "0x", + // fromBlock: 0n, + // toBlock: 1000n, + // maxBlockRange: 1000n, + // }); + // expect(await firstValueFrom(logs$)).toMatchObject({ + // fromBlock: 0n, + // toBlock: 1000n, + // logs: [], + // }); + }); +}); diff --git a/packages/block-events-stream/src/blockRangeToLogs.ts b/packages/block-events-stream/src/blockRangeToLogs.ts index 12cb3e71cb..6906cc6a02 100644 --- a/packages/block-events-stream/src/blockRangeToLogs.ts +++ b/packages/block-events-stream/src/blockRangeToLogs.ts @@ -11,7 +11,7 @@ export type BlockRangeToLogsOptions = { }; export type BlockRangeToLogsResult = OperatorFunction< - [BlockNumber, BlockNumber], + { startBlock: BlockNumber; endBlock: BlockNumber }, FetchLogsResult >; @@ -22,7 +22,7 @@ export function blockRangeToLogs({ maxBlockRange, }: BlockRangeToLogsOptions): BlockRangeToLogsResult { let fromBlock: bigint | null = null; - return exhaustMap(([startBlock, endBlock]) => { + return exhaustMap(({ startBlock, endBlock }) => { fromBlock ??= startBlock; return from( fetchLogs({ diff --git a/packages/block-events-stream/src/groupLogsByBlockNumber.ts b/packages/block-events-stream/src/groupLogsByBlockNumber.ts index 4784214e1c..fb21cde02d 100644 --- a/packages/block-events-stream/src/groupLogsByBlockNumber.ts +++ b/packages/block-events-stream/src/groupLogsByBlockNumber.ts @@ -2,6 +2,7 @@ import { BlockNumber, Hex, Log } from "viem"; import { NonPendingLog, isNonPendingLog } from "./isNonPendingLog"; import { bigIntSort } from "./utils"; import { isDefined } from "@latticexyz/common/utils"; +import { debug } from "./debug"; export function groupLogsByBlockNumber( logs: readonly TLog[] @@ -9,7 +10,7 @@ export function groupLogsByBlockNumber( // Pending logs don't have block numbers, so filter them out. const nonPendingLogs = logs.filter(isNonPendingLog); if (logs.length !== nonPendingLogs.length) { - console.warn( + debug( "pending logs discarded", logs.filter((log) => !isNonPendingLog(log)) ); From 6ab1699c1812c3cae6a0132b4ba64cb81da0ae49 Mon Sep 17 00:00:00 2001 From: Kevin Ingersoll Date: Mon, 3 Jul 2023 13:36:09 +0100 Subject: [PATCH 18/30] move fetchLogs to async generator, add tests --- .../src/blockRangeToLogs.test.ts | 20 +- .../src/blockRangeToLogs.ts | 2 +- .../src/createBlockStream.ts | 4 +- .../block-events-stream/src/fetchLogs.test.ts | 436 ++++++++++++++++++ packages/block-events-stream/src/fetchLogs.ts | 69 ++- .../src/fetchLogsSubset.ts | 64 +++ packages/block-events-stream/src/index.ts | 2 +- 7 files changed, 553 insertions(+), 44 deletions(-) create mode 100644 packages/block-events-stream/src/fetchLogs.test.ts create mode 100644 packages/block-events-stream/src/fetchLogsSubset.ts diff --git a/packages/block-events-stream/src/blockRangeToLogs.test.ts b/packages/block-events-stream/src/blockRangeToLogs.test.ts index 0dc13c3ec3..00617b6562 100644 --- a/packages/block-events-stream/src/blockRangeToLogs.test.ts +++ b/packages/block-events-stream/src/blockRangeToLogs.test.ts @@ -2,7 +2,7 @@ import { describe, it, expect, vi } from "vitest"; import { blockRangeToLogs } from "./blockRangeToLogs"; import { Subject, firstValueFrom, map } from "rxjs"; import { Transport, createPublicClient, createTransport } from "viem"; -import * as fetchLogsExports from "./fetchLogs"; +import * as fetchLogsExports from "./fetchLogsSubset"; import { bigIntMin } from "./utils"; const mockTransport: Transport = () => @@ -19,6 +19,15 @@ describe("blockRangeToLogs", () => { transport: mockTransport, }); + const spy = vi.spyOn(fetchLogsExports, "fetchLogs"); + spy.mockImplementation(async ({ fromBlock, toBlock, maxBlockRange = 1000n }) => { + return { + fromBlock, + toBlock: bigIntMin(toBlock, fromBlock + maxBlockRange), + logs: [], + }; + }); + const latestBlockNumber$ = new Subject(); const logs$ = latestBlockNumber$.pipe( @@ -30,13 +39,14 @@ describe("blockRangeToLogs", () => { }) ); - latestBlockNumber$.next(1000n); - latestBlockNumber$.next(1001n); - latestBlockNumber$.next(1002n); + setTimeout(() => latestBlockNumber$.next(1000n), 100); + setTimeout(() => latestBlockNumber$.next(1001n), 200); + setTimeout(() => latestBlockNumber$.next(1002n), 300); const logs = await firstValueFrom(logs$); - expect(logs).toMatchInlineSnapshot(); + console.log("got logs", logs); + // expect(logs).toMatchInlineSnapshot(); // await new Promise((resolve) => setTimeout(resolve, 1000)); diff --git a/packages/block-events-stream/src/blockRangeToLogs.ts b/packages/block-events-stream/src/blockRangeToLogs.ts index 6906cc6a02..bce7d3fd97 100644 --- a/packages/block-events-stream/src/blockRangeToLogs.ts +++ b/packages/block-events-stream/src/blockRangeToLogs.ts @@ -1,5 +1,5 @@ import { OperatorFunction, exhaustMap, from, tap } from "rxjs"; -import { FetchLogsResult, fetchLogs } from "./fetchLogs"; +import { FetchLogsResult, fetchLogs } from "./fetchLogsSubset"; import { AbiEvent, Address } from "abitype"; import { BlockNumber, PublicClient } from "viem"; diff --git a/packages/block-events-stream/src/createBlockStream.ts b/packages/block-events-stream/src/createBlockStream.ts index 0603aa08e5..b51e11ff6b 100644 --- a/packages/block-events-stream/src/createBlockStream.ts +++ b/packages/block-events-stream/src/createBlockStream.ts @@ -13,8 +13,8 @@ export function createBlockStream({ publicClient, blockTag }: CreateBlockStreamO return publicClient.watchBlocks({ blockTag, emitOnBegin: true, - onBlock: subscriber.next, - onError: subscriber.error, + onBlock: (block) => subscriber.next(block), + onError: (error) => subscriber.error(error), }); }); } diff --git a/packages/block-events-stream/src/fetchLogs.test.ts b/packages/block-events-stream/src/fetchLogs.test.ts new file mode 100644 index 0000000000..eb0bb7eee3 --- /dev/null +++ b/packages/block-events-stream/src/fetchLogs.test.ts @@ -0,0 +1,436 @@ +import { describe, it, expect, vi } from "vitest"; +import { + EIP1193RequestFn, + LimitExceededRpcError, + RpcLog, + RpcRequestError, + Transport, + createPublicClient, + createTransport, + hexToNumber, +} from "viem"; +import { fetchLogs } from "./fetchLogs"; + +const mockedTransportRequest = vi.fn, ReturnType>(); +const mockTransport: Transport = () => + createTransport({ + key: "mock", + name: "Mock Transport", + request: mockedTransportRequest as any, + type: "mock", + }); + +const publicClient = createPublicClient({ + transport: mockTransport, +}); + +describe("fetchLogs", () => { + it("yields chunks of logs for the block range", async () => { + const requests: any[] = []; + mockedTransportRequest.mockImplementation(async ({ method, params }): Promise => { + requests.push(params); + if (method !== "eth_getLogs") throw new Error("not implemented"); + return []; + }); + + const results = []; + for await (const result of fetchLogs({ + publicClient, + address: "0x", + events: [], + fromBlock: 0n, + toBlock: 500n, + maxBlockRange: 100n, + })) { + results.push(result); + } + + expect(requests).toMatchInlineSnapshot(` + [ + [ + { + "address": "0x", + "fromBlock": "0x0", + "toBlock": "0x64", + "topics": [ + [], + ], + }, + ], + [ + { + "address": "0x", + "fromBlock": "0x65", + "toBlock": "0xc9", + "topics": [ + [], + ], + }, + ], + [ + { + "address": "0x", + "fromBlock": "0xca", + "toBlock": "0x12e", + "topics": [ + [], + ], + }, + ], + [ + { + "address": "0x", + "fromBlock": "0x12f", + "toBlock": "0x193", + "topics": [ + [], + ], + }, + ], + [ + { + "address": "0x", + "fromBlock": "0x194", + "toBlock": "0x1f4", + "topics": [ + [], + ], + }, + ], + ] + `); + + expect(results).toMatchInlineSnapshot(` + [ + { + "fromBlock": 0n, + "logs": [], + "toBlock": 100n, + }, + { + "fromBlock": 101n, + "logs": [], + "toBlock": 201n, + }, + { + "fromBlock": 202n, + "logs": [], + "toBlock": 302n, + }, + { + "fromBlock": 303n, + "logs": [], + "toBlock": 403n, + }, + { + "fromBlock": 404n, + "logs": [], + "toBlock": 500n, + }, + ] + `); + }); + + it("reduces block range if block range is exceeded", async () => { + const requests: any[] = []; + mockedTransportRequest.mockImplementation(async ({ method, params }): Promise => { + if (method !== "eth_getLogs") throw new Error("not implemented"); + requests.push(params); + + if (hexToNumber(params[0].toBlock) - hexToNumber(params[0].fromBlock) > 500) { + throw new LimitExceededRpcError( + new RpcRequestError({ + body: params[0], + url: "https://viem.sh", + error: { + code: -32005, + message: "block range exceeded", + }, + }) + ); + } + + return []; + }); + + const results = []; + for await (const result of fetchLogs({ + publicClient, + address: "0x", + events: [], + fromBlock: 0n, + toBlock: 2000n, + })) { + results.push(result); + } + + expect(requests).toMatchInlineSnapshot(` + [ + [ + { + "address": "0x", + "fromBlock": "0x0", + "toBlock": "0x3e8", + "topics": [ + [], + ], + }, + ], + [ + { + "address": "0x", + "fromBlock": "0x0", + "toBlock": "0x3e8", + "topics": [ + [], + ], + }, + ], + [ + { + "address": "0x", + "fromBlock": "0x0", + "toBlock": "0x3e8", + "topics": [ + [], + ], + }, + ], + [ + { + "address": "0x", + "fromBlock": "0x0", + "toBlock": "0x3e8", + "topics": [ + [], + ], + }, + ], + [ + { + "address": "0x", + "fromBlock": "0x0", + "toBlock": "0x1f4", + "topics": [ + [], + ], + }, + ], + [ + { + "address": "0x", + "fromBlock": "0x1f5", + "toBlock": "0x5dd", + "topics": [ + [], + ], + }, + ], + [ + { + "address": "0x", + "fromBlock": "0x1f5", + "toBlock": "0x5dd", + "topics": [ + [], + ], + }, + ], + [ + { + "address": "0x", + "fromBlock": "0x1f5", + "toBlock": "0x5dd", + "topics": [ + [], + ], + }, + ], + [ + { + "address": "0x", + "fromBlock": "0x1f5", + "toBlock": "0x5dd", + "topics": [ + [], + ], + }, + ], + [ + { + "address": "0x", + "fromBlock": "0x1f5", + "toBlock": "0x3e9", + "topics": [ + [], + ], + }, + ], + [ + { + "address": "0x", + "fromBlock": "0x3ea", + "toBlock": "0x7d0", + "topics": [ + [], + ], + }, + ], + [ + { + "address": "0x", + "fromBlock": "0x3ea", + "toBlock": "0x7d0", + "topics": [ + [], + ], + }, + ], + [ + { + "address": "0x", + "fromBlock": "0x3ea", + "toBlock": "0x7d0", + "topics": [ + [], + ], + }, + ], + [ + { + "address": "0x", + "fromBlock": "0x3ea", + "toBlock": "0x7d0", + "topics": [ + [], + ], + }, + ], + [ + { + "address": "0x", + "fromBlock": "0x3ea", + "toBlock": "0x5dd", + "topics": [ + [], + ], + }, + ], + [ + { + "address": "0x", + "fromBlock": "0x5de", + "toBlock": "0x7d0", + "topics": [ + [], + ], + }, + ], + ] + `); + + expect(results).toMatchInlineSnapshot(` + [ + { + "fromBlock": 0n, + "logs": [], + "toBlock": 500n, + }, + { + "fromBlock": 501n, + "logs": [], + "toBlock": 1001n, + }, + { + "fromBlock": 1002n, + "logs": [], + "toBlock": 1501n, + }, + { + "fromBlock": 1502n, + "logs": [], + "toBlock": 2000n, + }, + ] + `); + }); + + it("retries if rate limit is exceeded", async () => { + const requests: any[] = []; + mockedTransportRequest.mockImplementation(async ({ method, params }): Promise => { + if (method !== "eth_getLogs") throw new Error("not implemented"); + requests.push(params); + + if (requests.length < 3) { + throw new LimitExceededRpcError( + new RpcRequestError({ + body: params[0], + url: "https://viem.sh", + error: { + code: -32005, + message: "rate limit exceeded", + }, + }) + ); + } + + return []; + }); + + const results = []; + for await (const result of fetchLogs({ + publicClient, + address: "0x", + events: [], + fromBlock: 0n, + toBlock: 500n, + })) { + results.push(result); + } + + expect(requests).toMatchInlineSnapshot(` + [ + [ + { + "address": "0x", + "fromBlock": "0x0", + "toBlock": "0x1f4", + "topics": [ + [], + ], + }, + ], + [ + { + "address": "0x", + "fromBlock": "0x0", + "toBlock": "0x1f4", + "topics": [ + [], + ], + }, + ], + [ + { + "address": "0x", + "fromBlock": "0x0", + "toBlock": "0x1f4", + "topics": [ + [], + ], + }, + ], + ] + `); + + expect(results).toMatchInlineSnapshot(` + [ + { + "fromBlock": 0n, + "logs": [], + "toBlock": 500n, + }, + ] + `); + }); +}); diff --git a/packages/block-events-stream/src/fetchLogs.ts b/packages/block-events-stream/src/fetchLogs.ts index 7719a6e4c2..e36eaf143d 100644 --- a/packages/block-events-stream/src/fetchLogs.ts +++ b/packages/block-events-stream/src/fetchLogs.ts @@ -1,7 +1,7 @@ import { AbiEvent, Address } from "abitype"; +import { PublicClient, BlockNumber } from "viem"; import { GetLogsResult, getLogs } from "./getLogs"; import { bigIntMin, wait } from "./utils"; -import { PublicClient, BlockNumber } from "viem"; import { debug } from "./debug"; export type FetchLogsOptions = { @@ -11,7 +11,7 @@ export type FetchLogsOptions = { fromBlock: BlockNumber; toBlock: BlockNumber; maxBlockRange?: bigint; - retryCount?: number; + maxRetryCount?: number; }; export type FetchLogsResult = { @@ -20,45 +20,44 @@ export type FetchLogsResult = { logs: GetLogsResult; }; -export async function fetchLogs({ +export async function* fetchLogs({ maxBlockRange = 1000n, - retryCount = 0, + maxRetryCount = 3, ...getLogsOpts -}: FetchLogsOptions): Promise> { - try { - const fromBlock = getLogsOpts.fromBlock; - const blockRange = bigIntMin(maxBlockRange, getLogsOpts.toBlock - fromBlock); - const toBlock = fromBlock + blockRange; +}: FetchLogsOptions): AsyncGenerator> { + let fromBlock = getLogsOpts.fromBlock; + let blockRange = bigIntMin(maxBlockRange, getLogsOpts.toBlock - fromBlock); + let retryCount = 0; - const logs = await getLogs({ ...getLogsOpts, fromBlock, toBlock }); - return { fromBlock, toBlock, logs }; - } catch (error: unknown) { - if (!(error instanceof Error)) throw error; + while (fromBlock < getLogsOpts.toBlock) { + try { + const toBlock = fromBlock + blockRange; + const logs = await getLogs({ ...getLogsOpts, fromBlock, toBlock }); + yield { fromBlock, toBlock, logs }; + fromBlock = toBlock + 1n; + blockRange = bigIntMin(maxBlockRange, getLogsOpts.toBlock - fromBlock); + } catch (error: unknown) { + if (!(error instanceof Error)) throw error; - // TODO: replace this with a real error - if (error.message === "rate limit" && retryCount < 10) { - const seconds = 2 * retryCount; - debug(`too many requests, retrying in ${seconds}s`, error); - await wait(1000 * seconds); - return await fetchLogs({ ...getLogsOpts, maxBlockRange, retryCount: retryCount + 1 }); - } + if (error.message.includes("rate limit exceeded") && retryCount < maxRetryCount) { + const seconds = 2 * retryCount; + debug(`too many requests, retrying in ${seconds}s`, error); + await wait(1000 * seconds); + retryCount += 1; + continue; + } - // TODO: replace this with a real error - if (error.message === "block range exceeded") { - const blockRange = getLogsOpts.toBlock - getLogsOpts.fromBlock; - const newBlockRange = blockRange / 2n; - if (newBlockRange <= 0n) { - throw new Error("can't reduce block range any further"); + if (error.message.includes("block range exceeded")) { + blockRange /= 2n; + if (blockRange <= 0n) { + throw new Error("can't reduce block range any further"); + } + debug("block range exceeded, trying a smaller block range", error); + // TODO: adjust maxBlockRange down if we consistently hit this for a given block range size + continue; } - debug("block range exceeded, trying a smaller block range", error); - return await fetchLogs({ - ...getLogsOpts, - toBlock: getLogsOpts.fromBlock + newBlockRange, - maxBlockRange, - retryCount, - }); - } - throw error; + throw error; + } } } diff --git a/packages/block-events-stream/src/fetchLogsSubset.ts b/packages/block-events-stream/src/fetchLogsSubset.ts new file mode 100644 index 0000000000..7719a6e4c2 --- /dev/null +++ b/packages/block-events-stream/src/fetchLogsSubset.ts @@ -0,0 +1,64 @@ +import { AbiEvent, Address } from "abitype"; +import { GetLogsResult, getLogs } from "./getLogs"; +import { bigIntMin, wait } from "./utils"; +import { PublicClient, BlockNumber } from "viem"; +import { debug } from "./debug"; + +export type FetchLogsOptions = { + publicClient: PublicClient; + address?: Address | Address[]; + events: TAbiEvents; + fromBlock: BlockNumber; + toBlock: BlockNumber; + maxBlockRange?: bigint; + retryCount?: number; +}; + +export type FetchLogsResult = { + fromBlock: BlockNumber; + toBlock: BlockNumber; + logs: GetLogsResult; +}; + +export async function fetchLogs({ + maxBlockRange = 1000n, + retryCount = 0, + ...getLogsOpts +}: FetchLogsOptions): Promise> { + try { + const fromBlock = getLogsOpts.fromBlock; + const blockRange = bigIntMin(maxBlockRange, getLogsOpts.toBlock - fromBlock); + const toBlock = fromBlock + blockRange; + + const logs = await getLogs({ ...getLogsOpts, fromBlock, toBlock }); + return { fromBlock, toBlock, logs }; + } catch (error: unknown) { + if (!(error instanceof Error)) throw error; + + // TODO: replace this with a real error + if (error.message === "rate limit" && retryCount < 10) { + const seconds = 2 * retryCount; + debug(`too many requests, retrying in ${seconds}s`, error); + await wait(1000 * seconds); + return await fetchLogs({ ...getLogsOpts, maxBlockRange, retryCount: retryCount + 1 }); + } + + // TODO: replace this with a real error + if (error.message === "block range exceeded") { + const blockRange = getLogsOpts.toBlock - getLogsOpts.fromBlock; + const newBlockRange = blockRange / 2n; + if (newBlockRange <= 0n) { + throw new Error("can't reduce block range any further"); + } + debug("block range exceeded, trying a smaller block range", error); + return await fetchLogs({ + ...getLogsOpts, + toBlock: getLogsOpts.fromBlock + newBlockRange, + maxBlockRange, + retryCount, + }); + } + + throw error; + } +} diff --git a/packages/block-events-stream/src/index.ts b/packages/block-events-stream/src/index.ts index f7ee62ef5b..1bfcd73889 100644 --- a/packages/block-events-stream/src/index.ts +++ b/packages/block-events-stream/src/index.ts @@ -1,6 +1,6 @@ export * from "./blockRangeToLogs"; export * from "./createBlockStream"; -export * from "./fetchLogs"; +export * from "./fetchLogsSubset"; export * from "./groupLogsByBlockNumber"; export * from "./isNonPendingBlock"; export * from "./isNonPendingLog"; From 18a389beda35549d42929b8d771a58178fb2dc8b Mon Sep 17 00:00:00 2001 From: Kevin Ingersoll Date: Mon, 3 Jul 2023 17:00:49 +0100 Subject: [PATCH 19/30] add block range tests --- .../src/blockRangeToLogs.test.ts | 124 ++++++++++++------ .../src/blockRangeToLogs.ts | 2 +- .../block-events-stream/src/fetchLogs.test.ts | 12 +- 3 files changed, 93 insertions(+), 45 deletions(-) diff --git a/packages/block-events-stream/src/blockRangeToLogs.test.ts b/packages/block-events-stream/src/blockRangeToLogs.test.ts index 00617b6562..10449d1042 100644 --- a/packages/block-events-stream/src/blockRangeToLogs.test.ts +++ b/packages/block-events-stream/src/blockRangeToLogs.test.ts @@ -1,31 +1,34 @@ -import { describe, it, expect, vi } from "vitest"; +import { describe, it, expect, vi, beforeEach } from "vitest"; import { blockRangeToLogs } from "./blockRangeToLogs"; -import { Subject, firstValueFrom, map } from "rxjs"; -import { Transport, createPublicClient, createTransport } from "viem"; -import * as fetchLogsExports from "./fetchLogsSubset"; -import { bigIntMin } from "./utils"; +import { Subject, lastValueFrom, map, toArray } from "rxjs"; +import { EIP1193RequestFn, RpcLog, Transport, createPublicClient, createTransport } from "viem"; +import { wait } from "./utils"; +const mockedTransportRequest = vi.fn, ReturnType>(); const mockTransport: Transport = () => createTransport({ key: "mock", name: "Mock Transport", - request: vi.fn(() => null) as any, + request: mockedTransportRequest as any, type: "mock", }); +const publicClient = createPublicClient({ + transport: mockTransport, +}); + describe("blockRangeToLogs", () => { - it("processes block ranges in order", async () => { - const publicClient = createPublicClient({ - transport: mockTransport, - }); + beforeEach(() => { + mockedTransportRequest.mockClear(); + }); - const spy = vi.spyOn(fetchLogsExports, "fetchLogs"); - spy.mockImplementation(async ({ fromBlock, toBlock, maxBlockRange = 1000n }) => { - return { - fromBlock, - toBlock: bigIntMin(toBlock, fromBlock + maxBlockRange), - logs: [], - }; + it("processes block ranges in order", async () => { + const requests: any[] = []; + mockedTransportRequest.mockImplementation(async ({ method, params }): Promise => { + requests.push(params); + if (method !== "eth_getLogs") throw new Error("not implemented"); + await wait(400); + return []; }); const latestBlockNumber$ = new Subject(); @@ -36,33 +39,74 @@ describe("blockRangeToLogs", () => { publicClient, address: "0x", events: [], - }) + }), + toArray() ); - setTimeout(() => latestBlockNumber$.next(1000n), 100); - setTimeout(() => latestBlockNumber$.next(1001n), 200); - setTimeout(() => latestBlockNumber$.next(1002n), 300); - - const logs = await firstValueFrom(logs$); + (async (): Promise => { + for (let blockNumber = 1000n; blockNumber <= 1010n; blockNumber++) { + await wait(100); + latestBlockNumber$.next(blockNumber); + } + await wait(100); + latestBlockNumber$.complete(); + })(); - console.log("got logs", logs); - // expect(logs).toMatchInlineSnapshot(); + const results = await lastValueFrom(logs$); - // await new Promise((resolve) => setTimeout(resolve, 1000)); + expect(requests).toMatchInlineSnapshot(` + [ + [ + { + "address": "0x", + "fromBlock": "0x0", + "toBlock": "0x3e8", + "topics": [ + [], + ], + }, + ], + [ + { + "address": "0x", + "fromBlock": "0x3e9", + "toBlock": "0x3ec", + "topics": [ + [], + ], + }, + ], + [ + { + "address": "0x", + "fromBlock": "0x3ed", + "toBlock": "0x3f0", + "topics": [ + [], + ], + }, + ], + ] + `); - // // expect(spy).toHaveBeenCalledTimes(1); - // // expect(await firstValueFrom(logs$)).toMatchInlineSnapshot(``); - // expect(spy).toHaveBeenCalledTimes(1); - // expect(spy).toHaveBeenCalledWith({ - // address: "0x", - // fromBlock: 0n, - // toBlock: 1000n, - // maxBlockRange: 1000n, - // }); - // expect(await firstValueFrom(logs$)).toMatchObject({ - // fromBlock: 0n, - // toBlock: 1000n, - // logs: [], - // }); + expect(results).toMatchInlineSnapshot(` + [ + { + "fromBlock": 0n, + "logs": [], + "toBlock": 1000n, + }, + { + "fromBlock": 1001n, + "logs": [], + "toBlock": 1004n, + }, + { + "fromBlock": 1005n, + "logs": [], + "toBlock": 1008n, + }, + ] + `); }); }); diff --git a/packages/block-events-stream/src/blockRangeToLogs.ts b/packages/block-events-stream/src/blockRangeToLogs.ts index bce7d3fd97..6906cc6a02 100644 --- a/packages/block-events-stream/src/blockRangeToLogs.ts +++ b/packages/block-events-stream/src/blockRangeToLogs.ts @@ -1,5 +1,5 @@ import { OperatorFunction, exhaustMap, from, tap } from "rxjs"; -import { FetchLogsResult, fetchLogs } from "./fetchLogsSubset"; +import { FetchLogsResult, fetchLogs } from "./fetchLogs"; import { AbiEvent, Address } from "abitype"; import { BlockNumber, PublicClient } from "viem"; diff --git a/packages/block-events-stream/src/fetchLogs.test.ts b/packages/block-events-stream/src/fetchLogs.test.ts index eb0bb7eee3..cea0119d63 100644 --- a/packages/block-events-stream/src/fetchLogs.test.ts +++ b/packages/block-events-stream/src/fetchLogs.test.ts @@ -1,4 +1,4 @@ -import { describe, it, expect, vi } from "vitest"; +import { describe, it, expect, vi, beforeEach } from "vitest"; import { EIP1193RequestFn, LimitExceededRpcError, @@ -25,6 +25,10 @@ const publicClient = createPublicClient({ }); describe("fetchLogs", () => { + beforeEach(() => { + mockedTransportRequest.mockClear(); + }); + it("yields chunks of logs for the block range", async () => { const requests: any[] = []; mockedTransportRequest.mockImplementation(async ({ method, params }): Promise => { @@ -137,10 +141,10 @@ describe("fetchLogs", () => { if (method !== "eth_getLogs") throw new Error("not implemented"); requests.push(params); - if (hexToNumber(params[0].toBlock) - hexToNumber(params[0].fromBlock) > 500) { + if (hexToNumber((params as any)[0].toBlock) - hexToNumber((params as any)[0].fromBlock) > 500) { throw new LimitExceededRpcError( new RpcRequestError({ - body: params[0], + body: (params as any)[0], url: "https://viem.sh", error: { code: -32005, @@ -364,7 +368,7 @@ describe("fetchLogs", () => { if (requests.length < 3) { throw new LimitExceededRpcError( new RpcRequestError({ - body: params[0], + body: (params as any)[0], url: "https://viem.sh", error: { code: -32005, From ad76b3c2f7b154a4d6de8bc152e4386430ea746b Mon Sep 17 00:00:00 2001 From: Kevin Ingersoll Date: Mon, 3 Jul 2023 17:01:57 +0100 Subject: [PATCH 20/30] get rid of old approach --- .../src/fetchLogsSubset.ts | 64 ------------------- packages/block-events-stream/src/index.ts | 2 +- 2 files changed, 1 insertion(+), 65 deletions(-) delete mode 100644 packages/block-events-stream/src/fetchLogsSubset.ts diff --git a/packages/block-events-stream/src/fetchLogsSubset.ts b/packages/block-events-stream/src/fetchLogsSubset.ts deleted file mode 100644 index 7719a6e4c2..0000000000 --- a/packages/block-events-stream/src/fetchLogsSubset.ts +++ /dev/null @@ -1,64 +0,0 @@ -import { AbiEvent, Address } from "abitype"; -import { GetLogsResult, getLogs } from "./getLogs"; -import { bigIntMin, wait } from "./utils"; -import { PublicClient, BlockNumber } from "viem"; -import { debug } from "./debug"; - -export type FetchLogsOptions = { - publicClient: PublicClient; - address?: Address | Address[]; - events: TAbiEvents; - fromBlock: BlockNumber; - toBlock: BlockNumber; - maxBlockRange?: bigint; - retryCount?: number; -}; - -export type FetchLogsResult = { - fromBlock: BlockNumber; - toBlock: BlockNumber; - logs: GetLogsResult; -}; - -export async function fetchLogs({ - maxBlockRange = 1000n, - retryCount = 0, - ...getLogsOpts -}: FetchLogsOptions): Promise> { - try { - const fromBlock = getLogsOpts.fromBlock; - const blockRange = bigIntMin(maxBlockRange, getLogsOpts.toBlock - fromBlock); - const toBlock = fromBlock + blockRange; - - const logs = await getLogs({ ...getLogsOpts, fromBlock, toBlock }); - return { fromBlock, toBlock, logs }; - } catch (error: unknown) { - if (!(error instanceof Error)) throw error; - - // TODO: replace this with a real error - if (error.message === "rate limit" && retryCount < 10) { - const seconds = 2 * retryCount; - debug(`too many requests, retrying in ${seconds}s`, error); - await wait(1000 * seconds); - return await fetchLogs({ ...getLogsOpts, maxBlockRange, retryCount: retryCount + 1 }); - } - - // TODO: replace this with a real error - if (error.message === "block range exceeded") { - const blockRange = getLogsOpts.toBlock - getLogsOpts.fromBlock; - const newBlockRange = blockRange / 2n; - if (newBlockRange <= 0n) { - throw new Error("can't reduce block range any further"); - } - debug("block range exceeded, trying a smaller block range", error); - return await fetchLogs({ - ...getLogsOpts, - toBlock: getLogsOpts.fromBlock + newBlockRange, - maxBlockRange, - retryCount, - }); - } - - throw error; - } -} diff --git a/packages/block-events-stream/src/index.ts b/packages/block-events-stream/src/index.ts index 1bfcd73889..f7ee62ef5b 100644 --- a/packages/block-events-stream/src/index.ts +++ b/packages/block-events-stream/src/index.ts @@ -1,6 +1,6 @@ export * from "./blockRangeToLogs"; export * from "./createBlockStream"; -export * from "./fetchLogsSubset"; +export * from "./fetchLogs"; export * from "./groupLogsByBlockNumber"; export * from "./isNonPendingBlock"; export * from "./isNonPendingLog"; From 5a6172026c90d1efb1dd4659d0185d3e39c4657d Mon Sep 17 00:00:00 2001 From: Kevin Ingersoll Date: Mon, 3 Jul 2023 17:22:50 +0100 Subject: [PATCH 21/30] add note about timers --- packages/block-events-stream/src/blockRangeToLogs.test.ts | 2 ++ 1 file changed, 2 insertions(+) diff --git a/packages/block-events-stream/src/blockRangeToLogs.test.ts b/packages/block-events-stream/src/blockRangeToLogs.test.ts index 10449d1042..58e8659419 100644 --- a/packages/block-events-stream/src/blockRangeToLogs.test.ts +++ b/packages/block-events-stream/src/blockRangeToLogs.test.ts @@ -4,6 +4,8 @@ import { Subject, lastValueFrom, map, toArray } from "rxjs"; import { EIP1193RequestFn, RpcLog, Transport, createPublicClient, createTransport } from "viem"; import { wait } from "./utils"; +// TODO: there is a chance that these tests will need to be written differently with timers to avoid flakiness + const mockedTransportRequest = vi.fn, ReturnType>(); const mockTransport: Transport = () => createTransport({ From ee1015926ebe6ffc7a41b8d439fece29849aa2e0 Mon Sep 17 00:00:00 2001 From: alvrs Date: Mon, 3 Jul 2023 23:25:54 +0100 Subject: [PATCH 22/30] use concatMap instead of exhaustMap --- .../src/blockRangeToLogs.test.ts | 17 ++++++- .../src/blockRangeToLogs.ts | 48 +++++++++++-------- packages/block-events-stream/src/fetchLogs.ts | 2 +- 3 files changed, 46 insertions(+), 21 deletions(-) diff --git a/packages/block-events-stream/src/blockRangeToLogs.test.ts b/packages/block-events-stream/src/blockRangeToLogs.test.ts index 58e8659419..be1b49a5fc 100644 --- a/packages/block-events-stream/src/blockRangeToLogs.test.ts +++ b/packages/block-events-stream/src/blockRangeToLogs.test.ts @@ -29,7 +29,7 @@ describe("blockRangeToLogs", () => { mockedTransportRequest.mockImplementation(async ({ method, params }): Promise => { requests.push(params); if (method !== "eth_getLogs") throw new Error("not implemented"); - await wait(400); + await wait(450); return []; }); @@ -88,6 +88,16 @@ describe("blockRangeToLogs", () => { ], }, ], + [ + { + "address": "0x", + "fromBlock": "0x3f1", + "toBlock": "0x3f2", + "topics": [ + [], + ], + }, + ], ] `); @@ -108,6 +118,11 @@ describe("blockRangeToLogs", () => { "logs": [], "toBlock": 1008n, }, + { + "fromBlock": 1009n, + "logs": [], + "toBlock": 1010n, + }, ] `); }); diff --git a/packages/block-events-stream/src/blockRangeToLogs.ts b/packages/block-events-stream/src/blockRangeToLogs.ts index 6906cc6a02..faafce7d1a 100644 --- a/packages/block-events-stream/src/blockRangeToLogs.ts +++ b/packages/block-events-stream/src/blockRangeToLogs.ts @@ -1,4 +1,4 @@ -import { OperatorFunction, exhaustMap, from, tap } from "rxjs"; +import { EMPTY, OperatorFunction, concatMap, from, pipe, tap } from "rxjs"; import { FetchLogsResult, fetchLogs } from "./fetchLogs"; import { AbiEvent, Address } from "abitype"; import { BlockNumber, PublicClient } from "viem"; @@ -21,22 +21,32 @@ export function blockRangeToLogs({ events, maxBlockRange, }: BlockRangeToLogsOptions): BlockRangeToLogsResult { - let fromBlock: bigint | null = null; - return exhaustMap(({ startBlock, endBlock }) => { - fromBlock ??= startBlock; - return from( - fetchLogs({ - publicClient, - address, - events, - fromBlock, - toBlock: endBlock, - maxBlockRange, - }) - ).pipe( - tap((result) => { - fromBlock = result.toBlock + 1n; - }) - ); - }); + let fromBlock: bigint; + let toBlock: bigint; + + return pipe( + tap(({ endBlock, startBlock }) => { + fromBlock ??= startBlock; + toBlock = endBlock; + }), + // concatMap only processes the next emission once the inner observable completes, + // so it always uses the latest toBlock value. + concatMap(() => { + if (fromBlock > toBlock) return EMPTY; + return from( + fetchLogs({ + publicClient, + address, + events, + fromBlock, + toBlock, + maxBlockRange, + }) + ).pipe( + tap(({ toBlock }) => { + fromBlock = toBlock + 1n; + }) + ); + }) + ); } diff --git a/packages/block-events-stream/src/fetchLogs.ts b/packages/block-events-stream/src/fetchLogs.ts index e36eaf143d..711ee5469f 100644 --- a/packages/block-events-stream/src/fetchLogs.ts +++ b/packages/block-events-stream/src/fetchLogs.ts @@ -29,7 +29,7 @@ export async function* fetchLogs({ let blockRange = bigIntMin(maxBlockRange, getLogsOpts.toBlock - fromBlock); let retryCount = 0; - while (fromBlock < getLogsOpts.toBlock) { + while (fromBlock <= getLogsOpts.toBlock) { try { const toBlock = fromBlock + blockRange; const logs = await getLogs({ ...getLogsOpts, fromBlock, toBlock }); From 5489e92cb01aa44af69ae28446caa192197926fa Mon Sep 17 00:00:00 2001 From: Kevin Ingersoll Date: Tue, 4 Jul 2023 08:19:38 +0100 Subject: [PATCH 23/30] update readme --- packages/block-events-stream/README.md | 2 +- packages/block-events-stream/src/blockRangeToLogs.test.ts | 5 ++--- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/packages/block-events-stream/README.md b/packages/block-events-stream/README.md index 5e4b2fb63f..caa31f62b4 100644 --- a/packages/block-events-stream/README.md +++ b/packages/block-events-stream/README.md @@ -19,7 +19,7 @@ const latestBlockNumber$ = latestBlock$.pipe( latestBlockNumber$ .pipe( - map((latestBlockNumber) => [0n, latestBlockNumber]), + map((latestBlockNumber) => ({ startBlock: 0n, endBlock: latestBlockNumber })), blockRangeToLogs({ publicClient, address, diff --git a/packages/block-events-stream/src/blockRangeToLogs.test.ts b/packages/block-events-stream/src/blockRangeToLogs.test.ts index be1b49a5fc..eb3c8e04fb 100644 --- a/packages/block-events-stream/src/blockRangeToLogs.test.ts +++ b/packages/block-events-stream/src/blockRangeToLogs.test.ts @@ -41,8 +41,7 @@ describe("blockRangeToLogs", () => { publicClient, address: "0x", events: [], - }), - toArray() + }) ); (async (): Promise => { @@ -54,7 +53,7 @@ describe("blockRangeToLogs", () => { latestBlockNumber$.complete(); })(); - const results = await lastValueFrom(logs$); + const results = await lastValueFrom(logs$.pipe(toArray())); expect(requests).toMatchInlineSnapshot(` [ From ae64564fc652228d8aadabfbc3ed79c1ab144f25 Mon Sep 17 00:00:00 2001 From: Kevin Ingersoll Date: Tue, 4 Jul 2023 03:55:08 -0700 Subject: [PATCH 24/30] Update packages/block-events-stream/src/fetchLogs.test.ts Co-authored-by: alvarius <89248902+alvrs@users.noreply.github.com> --- packages/block-events-stream/src/fetchLogs.test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/block-events-stream/src/fetchLogs.test.ts b/packages/block-events-stream/src/fetchLogs.test.ts index cea0119d63..7f95f507b9 100644 --- a/packages/block-events-stream/src/fetchLogs.test.ts +++ b/packages/block-events-stream/src/fetchLogs.test.ts @@ -145,7 +145,7 @@ describe("fetchLogs", () => { throw new LimitExceededRpcError( new RpcRequestError({ body: (params as any)[0], - url: "https://viem.sh", + url: "https://mud.dev", error: { code: -32005, message: "block range exceeded", From 84a5c299c6d6bdec0605f683b0416ebf7a5d05c4 Mon Sep 17 00:00:00 2001 From: Kevin Ingersoll Date: Tue, 4 Jul 2023 11:59:08 +0100 Subject: [PATCH 25/30] update readme --- packages/block-events-stream/README.md | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/packages/block-events-stream/README.md b/packages/block-events-stream/README.md index caa31f62b4..ab67b02848 100644 --- a/packages/block-events-stream/README.md +++ b/packages/block-events-stream/README.md @@ -3,6 +3,8 @@ ## Example ```ts +import { filter, map, mergeMap } from "rxjs"; +import { createPublicClient, parseAbi } from "viem"; import { createBlockStream, isNonPendingBlock, @@ -10,6 +12,10 @@ import { blockRangeToLogs, } from "@latticexyz/block-events-stream"; +const publicClient = createPublicClient({ + // your viem public client config here +}); + const latestBlock$ = await createBlockStream({ publicClient, blockTag: "latest" }); const latestBlockNumber$ = latestBlock$.pipe( @@ -23,10 +29,14 @@ latestBlockNumber$ blockRangeToLogs({ publicClient, address, - events, + events: parseAbi([ + "event StoreDeleteRecord(bytes32 table, bytes32[] key)", + "event StoreSetField(bytes32 table, bytes32[] key, uint8 schemaIndex, bytes data)", + "event StoreSetRecord(bytes32 table, bytes32[] key, bytes data)", + "event StoreEphemeralRecord(bytes32 table, bytes32[] key, bytes data)", + ]), }), - map(({ logs }) => from(groupLogsByBlockNumber(logs))), - mergeAll() + mergeMap(({ logs }) => from(groupLogsByBlockNumber(logs))) ) .subscribe((block) => { console.log("got events for block", block); From 30673bbf266617d9ac93818bbaf21228c846c468 Mon Sep 17 00:00:00 2001 From: Kevin Ingersoll Date: Tue, 4 Jul 2023 12:24:37 +0100 Subject: [PATCH 26/30] add tsdoc --- .../src/blockRangeToLogs.ts | 22 ++++++++++- packages/block-events-stream/src/fetchLogs.ts | 38 +++++++++++++++++++ .../src/groupLogsByBlockNumber.ts | 13 +++++++ 3 files changed, 72 insertions(+), 1 deletion(-) diff --git a/packages/block-events-stream/src/blockRangeToLogs.ts b/packages/block-events-stream/src/blockRangeToLogs.ts index faafce7d1a..07df9e2512 100644 --- a/packages/block-events-stream/src/blockRangeToLogs.ts +++ b/packages/block-events-stream/src/blockRangeToLogs.ts @@ -4,9 +4,23 @@ import { AbiEvent, Address } from "abitype"; import { BlockNumber, PublicClient } from "viem"; export type BlockRangeToLogsOptions = { + /** + * [viem `PublicClient`][0] used for fetching logs from the RPC. + * + * [0]: https://viem.sh/docs/clients/public.html + */ publicClient: PublicClient; + /** + * Optional contract address(es) to fetch logs for. + */ address?: Address | Address[]; + /** + * Events to fetch logs for. + */ events: TAbiEvents; + /** + * Optional maximum block range, if your RPC limits the amount of blocks fetched at a time. + */ maxBlockRange?: bigint; }; @@ -15,6 +29,12 @@ export type BlockRangeToLogsResult = Ope FetchLogsResult >; +/** + * Takes in an observable of `Observable<{ startBlock: bigint, endBlock: bigint }>` and uses a viem `publicClient` to get logs for the contract `address` and matching `events` and emits the logs as they are fetched. + * + * @param {BlockRangeToLogsOptions} options See `BlockRangeToLogsOptions`. + * @returns {BlockRangeToLogsResult} An operator function that transforms a stream of block ranges into a stream of fetched logs. + */ export function blockRangeToLogs({ publicClient, address, @@ -30,7 +50,7 @@ export function blockRangeToLogs({ toBlock = endBlock; }), // concatMap only processes the next emission once the inner observable completes, - // so it always uses the latest toBlock value. + // so it always uses the latest`toBlock` value. concatMap(() => { if (fromBlock > toBlock) return EMPTY; return from( diff --git a/packages/block-events-stream/src/fetchLogs.ts b/packages/block-events-stream/src/fetchLogs.ts index 711ee5469f..1031c856e8 100644 --- a/packages/block-events-stream/src/fetchLogs.ts +++ b/packages/block-events-stream/src/fetchLogs.ts @@ -5,12 +5,35 @@ import { bigIntMin, wait } from "./utils"; import { debug } from "./debug"; export type FetchLogsOptions = { + /** + * [viem `PublicClient`][0] used for fetching logs from the RPC. + * + * [0]: https://viem.sh/docs/clients/public.html + */ publicClient: PublicClient; + /** + * Optional contract address(es) to fetch logs for. + */ address?: Address | Address[]; + /** + * Events to fetch logs for. + */ events: TAbiEvents; + /** + * The block number to start fetching logs from (inclusive). + */ fromBlock: BlockNumber; + /** + * The block number to stop fetching logs at (inclusive). + */ toBlock: BlockNumber; + /** + * Optional maximum block range, if your RPC limits the amount of blocks fetched at a time. Defaults to 1000n. + */ maxBlockRange?: bigint; + /** + * Optional maximum amount of retries if the RPC returns a rate limit error. Defaults to 3. + */ maxRetryCount?: number; }; @@ -20,6 +43,21 @@ export type FetchLogsResult = { logs: GetLogsResult; }; +/** + * An asynchronous generator function that fetches logs from the blockchain in a range of blocks. + * + * @remarks + * The function will fetch logs according to the given options. + * It will iteratively move forward in the block range, yielding fetched logs as they become available. + * If the function encounters rate limits, it will retry until `maxRetryCount` is reached. + * If the function encounters a block range that is too large, it will half the block range and retry, until the block range can't be halved anymore. + * + * @param {FetchLogsOptions} options See `FetchLogsOptions`. + * + * @yields The result of the fetched logs for each block range in the given range. + * + * @throws Will throw an error if the block range can't be reduced any further. + */ export async function* fetchLogs({ maxBlockRange = 1000n, maxRetryCount = 3, diff --git a/packages/block-events-stream/src/groupLogsByBlockNumber.ts b/packages/block-events-stream/src/groupLogsByBlockNumber.ts index fb21cde02d..2f6319a9fb 100644 --- a/packages/block-events-stream/src/groupLogsByBlockNumber.ts +++ b/packages/block-events-stream/src/groupLogsByBlockNumber.ts @@ -4,6 +4,19 @@ import { bigIntSort } from "./utils"; import { isDefined } from "@latticexyz/common/utils"; import { debug } from "./debug"; +/** + * Groups logs by their block number. + * + * @remarks + * This function takes an array of logs and returns a new array where each item corresponds to a distinct block number. + * Each item in the output array includes the block number, the block hash, and an array of all logs for that block. + * Pending logs are filtered out before processing, as they don't have block numbers. + * + * @param logs The logs to group by block number. + * + * @returns An array of objects where each object represents a distinct block and includes the block number, + * the block hash, and an array of logs for that block. + */ export function groupLogsByBlockNumber( logs: readonly TLog[] ): { blockNumber: BlockNumber; blockHash: Hex; events: readonly NonPendingLog[] }[] { From 439f0c680f3db356d8e57760577707519e570b32 Mon Sep 17 00:00:00 2001 From: Kevin Ingersoll Date: Tue, 4 Jul 2023 12:32:52 +0100 Subject: [PATCH 27/30] rename for consistent terminology --- .../{block-events-stream => block-logs-stream}/.eslintrc | 0 .../.gitignore | 0 .../.npmignore | 0 .../{block-events-stream => block-logs-stream}/README.md | 9 +++++++-- .../package.json | 6 +++--- .../src/blockRangeToLogs.test.ts | 0 .../src/blockRangeToLogs.ts | 0 .../src/createBlockStream.ts | 0 .../src/debug.ts | 0 .../src/fetchLogs.test.ts | 0 .../src/fetchLogs.ts | 0 .../src/getLogs.ts | 0 .../src/groupLogsByBlockNumber.test.ts | 6 +++--- .../src/groupLogsByBlockNumber.ts | 4 ++-- .../src/index.ts | 0 .../src/isNonPendingBlock.ts | 0 .../src/isNonPendingLog.ts | 0 .../src/utils.ts | 0 .../tsconfig.json | 0 .../tsup.config.ts | 0 pnpm-lock.yaml | 2 +- 21 files changed, 16 insertions(+), 11 deletions(-) rename packages/{block-events-stream => block-logs-stream}/.eslintrc (100%) rename packages/{block-events-stream => block-logs-stream}/.gitignore (100%) rename packages/{block-events-stream => block-logs-stream}/.npmignore (100%) rename packages/{block-events-stream => block-logs-stream}/README.md (84%) rename packages/{block-events-stream => block-logs-stream}/package.json (86%) rename packages/{block-events-stream => block-logs-stream}/src/blockRangeToLogs.test.ts (100%) rename packages/{block-events-stream => block-logs-stream}/src/blockRangeToLogs.ts (100%) rename packages/{block-events-stream => block-logs-stream}/src/createBlockStream.ts (100%) rename packages/{block-events-stream => block-logs-stream}/src/debug.ts (100%) rename packages/{block-events-stream => block-logs-stream}/src/fetchLogs.test.ts (100%) rename packages/{block-events-stream => block-logs-stream}/src/fetchLogs.ts (100%) rename packages/{block-events-stream => block-logs-stream}/src/getLogs.ts (100%) rename packages/{block-events-stream => block-logs-stream}/src/groupLogsByBlockNumber.test.ts (97%) rename packages/{block-events-stream => block-logs-stream}/src/groupLogsByBlockNumber.ts (93%) rename packages/{block-events-stream => block-logs-stream}/src/index.ts (100%) rename packages/{block-events-stream => block-logs-stream}/src/isNonPendingBlock.ts (100%) rename packages/{block-events-stream => block-logs-stream}/src/isNonPendingLog.ts (100%) rename packages/{block-events-stream => block-logs-stream}/src/utils.ts (100%) rename packages/{block-events-stream => block-logs-stream}/tsconfig.json (100%) rename packages/{block-events-stream => block-logs-stream}/tsup.config.ts (100%) diff --git a/packages/block-events-stream/.eslintrc b/packages/block-logs-stream/.eslintrc similarity index 100% rename from packages/block-events-stream/.eslintrc rename to packages/block-logs-stream/.eslintrc diff --git a/packages/block-events-stream/.gitignore b/packages/block-logs-stream/.gitignore similarity index 100% rename from packages/block-events-stream/.gitignore rename to packages/block-logs-stream/.gitignore diff --git a/packages/block-events-stream/.npmignore b/packages/block-logs-stream/.npmignore similarity index 100% rename from packages/block-events-stream/.npmignore rename to packages/block-logs-stream/.npmignore diff --git a/packages/block-events-stream/README.md b/packages/block-logs-stream/README.md similarity index 84% rename from packages/block-events-stream/README.md rename to packages/block-logs-stream/README.md index ab67b02848..1cbdc26dfd 100644 --- a/packages/block-events-stream/README.md +++ b/packages/block-logs-stream/README.md @@ -1,4 +1,9 @@ -# Block events stream +# Block logs stream + +A set of utilities for efficiently retrieving blockchain event logs. Built on top of [viem][0] and [RxJS][1]. + +[0]: https://viem.sh/ +[1]: https://rxjs.dev/ ## Example @@ -10,7 +15,7 @@ import { isNonPendingBlock, groupLogsByBlockNumber, blockRangeToLogs, -} from "@latticexyz/block-events-stream"; +} from "@latticexyz/block-logs-stream"; const publicClient = createPublicClient({ // your viem public client config here diff --git a/packages/block-events-stream/package.json b/packages/block-logs-stream/package.json similarity index 86% rename from packages/block-events-stream/package.json rename to packages/block-logs-stream/package.json index 654b008318..15efcc768e 100644 --- a/packages/block-events-stream/package.json +++ b/packages/block-logs-stream/package.json @@ -1,11 +1,11 @@ { - "name": "@latticexyz/block-events-stream", + "name": "@latticexyz/block-logs-stream", "version": "1.42.0", - "description": "Create a stream of EVM block events", + "description": "Create a stream of EVM block logs for events", "repository": { "type": "git", "url": "https://github.com/latticexyz/mud.git", - "directory": "packages/block-events-stream" + "directory": "packages/block-logs-stream" }, "license": "MIT", "type": "module", diff --git a/packages/block-events-stream/src/blockRangeToLogs.test.ts b/packages/block-logs-stream/src/blockRangeToLogs.test.ts similarity index 100% rename from packages/block-events-stream/src/blockRangeToLogs.test.ts rename to packages/block-logs-stream/src/blockRangeToLogs.test.ts diff --git a/packages/block-events-stream/src/blockRangeToLogs.ts b/packages/block-logs-stream/src/blockRangeToLogs.ts similarity index 100% rename from packages/block-events-stream/src/blockRangeToLogs.ts rename to packages/block-logs-stream/src/blockRangeToLogs.ts diff --git a/packages/block-events-stream/src/createBlockStream.ts b/packages/block-logs-stream/src/createBlockStream.ts similarity index 100% rename from packages/block-events-stream/src/createBlockStream.ts rename to packages/block-logs-stream/src/createBlockStream.ts diff --git a/packages/block-events-stream/src/debug.ts b/packages/block-logs-stream/src/debug.ts similarity index 100% rename from packages/block-events-stream/src/debug.ts rename to packages/block-logs-stream/src/debug.ts diff --git a/packages/block-events-stream/src/fetchLogs.test.ts b/packages/block-logs-stream/src/fetchLogs.test.ts similarity index 100% rename from packages/block-events-stream/src/fetchLogs.test.ts rename to packages/block-logs-stream/src/fetchLogs.test.ts diff --git a/packages/block-events-stream/src/fetchLogs.ts b/packages/block-logs-stream/src/fetchLogs.ts similarity index 100% rename from packages/block-events-stream/src/fetchLogs.ts rename to packages/block-logs-stream/src/fetchLogs.ts diff --git a/packages/block-events-stream/src/getLogs.ts b/packages/block-logs-stream/src/getLogs.ts similarity index 100% rename from packages/block-events-stream/src/getLogs.ts rename to packages/block-logs-stream/src/getLogs.ts diff --git a/packages/block-events-stream/src/groupLogsByBlockNumber.test.ts b/packages/block-logs-stream/src/groupLogsByBlockNumber.test.ts similarity index 97% rename from packages/block-events-stream/src/groupLogsByBlockNumber.test.ts rename to packages/block-logs-stream/src/groupLogsByBlockNumber.test.ts index ceee06e5be..5c8633a832 100644 --- a/packages/block-events-stream/src/groupLogsByBlockNumber.test.ts +++ b/packages/block-logs-stream/src/groupLogsByBlockNumber.test.ts @@ -54,7 +54,7 @@ describe("groupLogsByBlockNumber", () => { { "blockHash": "0x", "blockNumber": 1n, - "events": [ + "logs": [ { "blockHash": "0x", "blockNumber": 1n, @@ -81,7 +81,7 @@ describe("groupLogsByBlockNumber", () => { { "blockHash": "0x", "blockNumber": 3n, - "events": [ + "logs": [ { "blockHash": "0x", "blockNumber": 3n, @@ -94,7 +94,7 @@ describe("groupLogsByBlockNumber", () => { { "blockHash": "0x", "blockNumber": 5n, - "events": [ + "logs": [ { "blockHash": "0x", "blockNumber": 5n, diff --git a/packages/block-events-stream/src/groupLogsByBlockNumber.ts b/packages/block-logs-stream/src/groupLogsByBlockNumber.ts similarity index 93% rename from packages/block-events-stream/src/groupLogsByBlockNumber.ts rename to packages/block-logs-stream/src/groupLogsByBlockNumber.ts index 2f6319a9fb..0c24ce1312 100644 --- a/packages/block-events-stream/src/groupLogsByBlockNumber.ts +++ b/packages/block-logs-stream/src/groupLogsByBlockNumber.ts @@ -19,7 +19,7 @@ import { debug } from "./debug"; */ export function groupLogsByBlockNumber( logs: readonly TLog[] -): { blockNumber: BlockNumber; blockHash: Hex; events: readonly NonPendingLog[] }[] { +): { blockNumber: BlockNumber; blockHash: Hex; logs: readonly NonPendingLog[] }[] { // Pending logs don't have block numbers, so filter them out. const nonPendingLogs = logs.filter(isNonPendingLog); if (logs.length !== nonPendingLogs.length) { @@ -43,7 +43,7 @@ export function groupLogsByBlockNumber( return { blockNumber, blockHash: blockLogs[0].blockHash, - events: blockLogs, + logs: blockLogs, }; }) .filter(isDefined); diff --git a/packages/block-events-stream/src/index.ts b/packages/block-logs-stream/src/index.ts similarity index 100% rename from packages/block-events-stream/src/index.ts rename to packages/block-logs-stream/src/index.ts diff --git a/packages/block-events-stream/src/isNonPendingBlock.ts b/packages/block-logs-stream/src/isNonPendingBlock.ts similarity index 100% rename from packages/block-events-stream/src/isNonPendingBlock.ts rename to packages/block-logs-stream/src/isNonPendingBlock.ts diff --git a/packages/block-events-stream/src/isNonPendingLog.ts b/packages/block-logs-stream/src/isNonPendingLog.ts similarity index 100% rename from packages/block-events-stream/src/isNonPendingLog.ts rename to packages/block-logs-stream/src/isNonPendingLog.ts diff --git a/packages/block-events-stream/src/utils.ts b/packages/block-logs-stream/src/utils.ts similarity index 100% rename from packages/block-events-stream/src/utils.ts rename to packages/block-logs-stream/src/utils.ts diff --git a/packages/block-events-stream/tsconfig.json b/packages/block-logs-stream/tsconfig.json similarity index 100% rename from packages/block-events-stream/tsconfig.json rename to packages/block-logs-stream/tsconfig.json diff --git a/packages/block-events-stream/tsup.config.ts b/packages/block-logs-stream/tsup.config.ts similarity index 100% rename from packages/block-events-stream/tsup.config.ts rename to packages/block-logs-stream/tsup.config.ts diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 012a4bc56a..87d85e7f48 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -45,7 +45,7 @@ importers: specifier: ^4.9.5 version: 4.9.5 - packages/block-events-stream: + packages/block-logs-stream: dependencies: '@latticexyz/common': specifier: workspace:* From 600991261f2d1f3fae4df53f1ea9d83e43477a2a Mon Sep 17 00:00:00 2001 From: Kevin Ingersoll Date: Tue, 4 Jul 2023 04:41:13 -0700 Subject: [PATCH 28/30] changeset --- .changeset/nasty-waves-divide.md | 46 ++++++++++++++++++++++++++++++++ 1 file changed, 46 insertions(+) create mode 100644 .changeset/nasty-waves-divide.md diff --git a/.changeset/nasty-waves-divide.md b/.changeset/nasty-waves-divide.md new file mode 100644 index 0000000000..f85dd96047 --- /dev/null +++ b/.changeset/nasty-waves-divide.md @@ -0,0 +1,46 @@ +--- +"@latticexyz/block-logs-stream": minor +--- + +Add block logs stream package + +```ts +import { filter, map, mergeMap } from "rxjs"; +import { createPublicClient, parseAbi } from "viem"; +import { + createBlockStream, + isNonPendingBlock, + groupLogsByBlockNumber, + blockRangeToLogs, +} from "@latticexyz/block-logs-stream"; + +const publicClient = createPublicClient({ + // your viem public client config here +}); + +const latestBlock$ = await createBlockStream({ publicClient, blockTag: "latest" }); + +const latestBlockNumber$ = latestBlock$.pipe( + filter(isNonPendingBlock), + map((block) => block.number) +); + +latestBlockNumber$ + .pipe( + map((latestBlockNumber) => ({ startBlock: 0n, endBlock: latestBlockNumber })), + blockRangeToLogs({ + publicClient, + address, + events: parseAbi([ + "event StoreDeleteRecord(bytes32 table, bytes32[] key)", + "event StoreSetField(bytes32 table, bytes32[] key, uint8 schemaIndex, bytes data)", + "event StoreSetRecord(bytes32 table, bytes32[] key, bytes data)", + "event StoreEphemeralRecord(bytes32 table, bytes32[] key, bytes data)", + ]), + }), + mergeMap(({ logs }) => from(groupLogsByBlockNumber(logs))) + ) + .subscribe((block) => { + console.log("got events for block", block); + }); +``` From 03148bfe730ee68eef19d24c2db57d43d6500389 Mon Sep 17 00:00:00 2001 From: alvarius <89248902+alvrs@users.noreply.github.com> Date: Tue, 4 Jul 2023 12:43:48 +0100 Subject: [PATCH 29/30] chore: synchronize versioning across all packages (#1102) --- .changeset/config.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.changeset/config.json b/.changeset/config.json index 6b8959592a..c89ee3a73d 100644 --- a/.changeset/config.json +++ b/.changeset/config.json @@ -2,7 +2,7 @@ "$schema": "https://unpkg.com/@changesets/config@2.3.0/schema.json", "changelog": ["@changesets/changelog-github", { "repo": "latticexyz/mud" }], "commit": false, - "fixed": [], + "fixed": [["*"]], "linked": [], "access": "public", "baseBranch": "main", From 5dd52364b15a31daf65dc110e9a822610cd56b78 Mon Sep 17 00:00:00 2001 From: Kevin Ingersoll Date: Tue, 4 Jul 2023 12:51:02 +0100 Subject: [PATCH 30/30] empty