Skip to content

Commit

Permalink
feat(block-logs-stream): add block logs stream package (#1070)
Browse files Browse the repository at this point in the history
* feat(block-events-stream): add block events stream package

* wip anvil test

* Revert "wip anvil test"

This reverts commit 1952a98.

* accidentally left in a store refernence

* Update packages/block-events-stream/src/createBlockEventsStream.ts

Co-authored-by: alvarius <[email protected]>

* make streams closeable

I don't love this design

* clean up

* add log back in

* move comments

* refactor with just streams

* add README with example

* renamed

* rename again and take in a tuple as input

* fix scope

* add TODO

* add tests for grouping logs

* wip rxjs tests

* move fetchLogs to async generator, add tests

* add block range tests

* get rid of old approach

* add note about timers

* use concatMap instead of exhaustMap

* update readme

* Update packages/block-events-stream/src/fetchLogs.test.ts

Co-authored-by: alvarius <[email protected]>

* update readme

* add tsdoc

* rename for consistent terminology

* changeset

* chore: synchronize versioning across all packages (#1102)

* empty

---------

Co-authored-by: alvarius <[email protected]>
Co-authored-by: alvrs <[email protected]>
  • Loading branch information
3 people authored Jul 4, 2023
1 parent 3d01bc7 commit 72b8069
Show file tree
Hide file tree
Showing 22 changed files with 1,260 additions and 0 deletions.
46 changes: 46 additions & 0 deletions .changeset/nasty-waves-divide.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
---
"@latticexyz/block-logs-stream": minor
---

Add block logs stream package

```ts
import { filter, map, mergeMap } from "rxjs";
import { createPublicClient, parseAbi } from "viem";
import {
createBlockStream,
isNonPendingBlock,
groupLogsByBlockNumber,
blockRangeToLogs,
} from "@latticexyz/block-logs-stream";

const publicClient = createPublicClient({
// your viem public client config here
});

const latestBlock$ = await createBlockStream({ publicClient, blockTag: "latest" });

const latestBlockNumber$ = latestBlock$.pipe(
filter(isNonPendingBlock),
map((block) => block.number)
);

latestBlockNumber$
.pipe(
map((latestBlockNumber) => ({ startBlock: 0n, endBlock: latestBlockNumber })),
blockRangeToLogs({
publicClient,
address,
events: parseAbi([
"event StoreDeleteRecord(bytes32 table, bytes32[] key)",
"event StoreSetField(bytes32 table, bytes32[] key, uint8 schemaIndex, bytes data)",
"event StoreSetRecord(bytes32 table, bytes32[] key, bytes data)",
"event StoreEphemeralRecord(bytes32 table, bytes32[] key, bytes data)",
]),
}),
mergeMap(({ logs }) => from(groupLogsByBlockNumber(logs)))
)
.subscribe((block) => {
console.log("got events for block", block);
});
```
6 changes: 6 additions & 0 deletions packages/block-logs-stream/.eslintrc
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"extends": ["../../.eslintrc"],
"rules": {
"@typescript-eslint/explicit-function-return-type": "error"
}
}
1 change: 1 addition & 0 deletions packages/block-logs-stream/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
dist
6 changes: 6 additions & 0 deletions packages/block-logs-stream/.npmignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
*

!dist/**
!src/**
!package.json
!README.md
49 changes: 49 additions & 0 deletions packages/block-logs-stream/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# Block logs stream

A set of utilities for efficiently retrieving blockchain event logs. Built on top of [viem][0] and [RxJS][1].

[0]: https://viem.sh/
[1]: https://rxjs.dev/

## Example

```ts
import { filter, map, mergeMap } from "rxjs";
import { createPublicClient, parseAbi } from "viem";
import {
createBlockStream,
isNonPendingBlock,
groupLogsByBlockNumber,
blockRangeToLogs,
} from "@latticexyz/block-logs-stream";

const publicClient = createPublicClient({
// your viem public client config here
});

const latestBlock$ = await createBlockStream({ publicClient, blockTag: "latest" });

const latestBlockNumber$ = latestBlock$.pipe(
filter(isNonPendingBlock),
map((block) => block.number)
);

latestBlockNumber$
.pipe(
map((latestBlockNumber) => ({ startBlock: 0n, endBlock: latestBlockNumber })),
blockRangeToLogs({
publicClient,
address,
events: parseAbi([
"event StoreDeleteRecord(bytes32 table, bytes32[] key)",
"event StoreSetField(bytes32 table, bytes32[] key, uint8 schemaIndex, bytes data)",
"event StoreSetRecord(bytes32 table, bytes32[] key, bytes data)",
"event StoreEphemeralRecord(bytes32 table, bytes32[] key, bytes data)",
]),
}),
mergeMap(({ logs }) => from(groupLogsByBlockNumber(logs)))
)
.subscribe((block) => {
console.log("got events for block", block);
});
```
43 changes: 43 additions & 0 deletions packages/block-logs-stream/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
{
"name": "@latticexyz/block-logs-stream",
"version": "1.42.0",
"description": "Create a stream of EVM block logs for events",
"repository": {
"type": "git",
"url": "https://github.com/latticexyz/mud.git",
"directory": "packages/block-logs-stream"
},
"license": "MIT",
"type": "module",
"exports": {
".": "./dist/index.js"
},
"types": "src/index.ts",
"scripts": {
"build": "pnpm run build:js",
"build:js": "tsup",
"clean": "pnpm run clean:js",
"clean:js": "rimraf dist",
"dev": "tsup --watch",
"lint": "eslint .",
"test": "vitest typecheck --run --passWithNoTests && vitest --run --passWithNoTests"
},
"dependencies": {
"@latticexyz/common": "workspace:*",
"@latticexyz/config": "workspace:*",
"@latticexyz/schema-type": "workspace:*",
"abitype": "0.8.7",
"debug": "^4.3.4",
"rxjs": "7.5.5",
"viem": "1.1.7"
},
"devDependencies": {
"@types/debug": "^4.1.7",
"tsup": "^6.7.0",
"vitest": "0.31.4"
},
"publishConfig": {
"access": "public"
},
"gitHead": "914a1e0ae4a573d685841ca2ea921435057deb8f"
}
128 changes: 128 additions & 0 deletions packages/block-logs-stream/src/blockRangeToLogs.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
import { describe, it, expect, vi, beforeEach } from "vitest";
import { blockRangeToLogs } from "./blockRangeToLogs";
import { Subject, lastValueFrom, map, toArray } from "rxjs";
import { EIP1193RequestFn, RpcLog, Transport, createPublicClient, createTransport } from "viem";
import { wait } from "./utils";

// TODO: there is a chance that these tests will need to be written differently with timers to avoid flakiness

const mockedTransportRequest = vi.fn<Parameters<EIP1193RequestFn>, ReturnType<EIP1193RequestFn>>();
const mockTransport: Transport = () =>
createTransport({
key: "mock",
name: "Mock Transport",
request: mockedTransportRequest as any,
type: "mock",
});

const publicClient = createPublicClient({
transport: mockTransport,
});

describe("blockRangeToLogs", () => {
beforeEach(() => {
mockedTransportRequest.mockClear();
});

it("processes block ranges in order", async () => {
const requests: any[] = [];
mockedTransportRequest.mockImplementation(async ({ method, params }): Promise<RpcLog[]> => {
requests.push(params);
if (method !== "eth_getLogs") throw new Error("not implemented");
await wait(450);
return [];
});

const latestBlockNumber$ = new Subject<bigint>();

const logs$ = latestBlockNumber$.pipe(
map((endBlock) => ({ startBlock: 0n, endBlock })),
blockRangeToLogs({
publicClient,
address: "0x",
events: [],
})
);

(async (): Promise<void> => {
for (let blockNumber = 1000n; blockNumber <= 1010n; blockNumber++) {
await wait(100);
latestBlockNumber$.next(blockNumber);
}
await wait(100);
latestBlockNumber$.complete();
})();

const results = await lastValueFrom(logs$.pipe(toArray()));

expect(requests).toMatchInlineSnapshot(`
[
[
{
"address": "0x",
"fromBlock": "0x0",
"toBlock": "0x3e8",
"topics": [
[],
],
},
],
[
{
"address": "0x",
"fromBlock": "0x3e9",
"toBlock": "0x3ec",
"topics": [
[],
],
},
],
[
{
"address": "0x",
"fromBlock": "0x3ed",
"toBlock": "0x3f0",
"topics": [
[],
],
},
],
[
{
"address": "0x",
"fromBlock": "0x3f1",
"toBlock": "0x3f2",
"topics": [
[],
],
},
],
]
`);

expect(results).toMatchInlineSnapshot(`
[
{
"fromBlock": 0n,
"logs": [],
"toBlock": 1000n,
},
{
"fromBlock": 1001n,
"logs": [],
"toBlock": 1004n,
},
{
"fromBlock": 1005n,
"logs": [],
"toBlock": 1008n,
},
{
"fromBlock": 1009n,
"logs": [],
"toBlock": 1010n,
},
]
`);
});
});
72 changes: 72 additions & 0 deletions packages/block-logs-stream/src/blockRangeToLogs.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import { EMPTY, OperatorFunction, concatMap, from, pipe, tap } from "rxjs";
import { FetchLogsResult, fetchLogs } from "./fetchLogs";
import { AbiEvent, Address } from "abitype";
import { BlockNumber, PublicClient } from "viem";

export type BlockRangeToLogsOptions<TAbiEvents extends readonly AbiEvent[]> = {
/**
* [viem `PublicClient`][0] used for fetching logs from the RPC.
*
* [0]: https://viem.sh/docs/clients/public.html
*/
publicClient: PublicClient;
/**
* Optional contract address(es) to fetch logs for.
*/
address?: Address | Address[];
/**
* Events to fetch logs for.
*/
events: TAbiEvents;
/**
* Optional maximum block range, if your RPC limits the amount of blocks fetched at a time.
*/
maxBlockRange?: bigint;
};

export type BlockRangeToLogsResult<TAbiEvents extends readonly AbiEvent[]> = OperatorFunction<
{ startBlock: BlockNumber; endBlock: BlockNumber },
FetchLogsResult<TAbiEvents>
>;

/**
* Takes in an observable of `Observable<{ startBlock: bigint, endBlock: bigint }>` and uses a viem `publicClient` to get logs for the contract `address` and matching `events` and emits the logs as they are fetched.
*
* @param {BlockRangeToLogsOptions<AbiEvent[]>} options See `BlockRangeToLogsOptions`.
* @returns {BlockRangeToLogsResult<AbiEvent[]>} An operator function that transforms a stream of block ranges into a stream of fetched logs.
*/
export function blockRangeToLogs<TAbiEvents extends readonly AbiEvent[]>({
publicClient,
address,
events,
maxBlockRange,
}: BlockRangeToLogsOptions<TAbiEvents>): BlockRangeToLogsResult<TAbiEvents> {
let fromBlock: bigint;
let toBlock: bigint;

return pipe(
tap(({ endBlock, startBlock }) => {
fromBlock ??= startBlock;
toBlock = endBlock;
}),
// concatMap only processes the next emission once the inner observable completes,
// so it always uses the latest`toBlock` value.
concatMap(() => {
if (fromBlock > toBlock) return EMPTY;
return from(
fetchLogs({
publicClient,
address,
events,
fromBlock,
toBlock,
maxBlockRange,
})
).pipe(
tap(({ toBlock }) => {
fromBlock = toBlock + 1n;
})
);
})
);
}
20 changes: 20 additions & 0 deletions packages/block-logs-stream/src/createBlockStream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import { Observable } from "rxjs";
import type { Block, BlockTag, PublicClient } from "viem";

export type CreateBlockStreamOptions = {
publicClient: PublicClient;
blockTag: BlockTag;
};

export type CreateBlockStreamResult = Observable<Block>;

export function createBlockStream({ publicClient, blockTag }: CreateBlockStreamOptions): CreateBlockStreamResult {
return new Observable(function subscribe(subscriber) {
return publicClient.watchBlocks({
blockTag,
emitOnBegin: true,
onBlock: (block) => subscriber.next(block),
onError: (error) => subscriber.error(error),
});
});
}
3 changes: 3 additions & 0 deletions packages/block-logs-stream/src/debug.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
import createDebug from "debug";

export const debug = createDebug("mud:block-events-stream");
Loading

0 comments on commit 72b8069

Please sign in to comment.