Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(block-logs-stream): add block logs stream package #1070

Merged
merged 33 commits into from
Jul 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
51a7be0
feat(block-events-stream): add block events stream package
holic Jun 26, 2023
1952a98
wip anvil test
holic Jun 26, 2023
8ce8150
Revert "wip anvil test"
holic Jun 26, 2023
711e1af
accidentally left in a store refernence
holic Jun 26, 2023
c1af973
Update packages/block-events-stream/src/createBlockEventsStream.ts
holic Jun 27, 2023
0f55aeb
make streams closeable
holic Jun 28, 2023
9e1d9ea
clean up
holic Jun 28, 2023
3120c2f
add log back in
holic Jun 28, 2023
e9fcb0f
move comments
holic Jun 28, 2023
db64640
refactor with just streams
holic Jun 30, 2023
63f6cc0
add README with example
holic Jun 30, 2023
279b51b
Merge remote-tracking branch 'origin/main' into holic/block-sync-package
holic Jun 30, 2023
cf6b101
renamed
holic Jun 30, 2023
2a5ba98
rename again and take in a tuple as input
holic Jun 30, 2023
58d4541
fix scope
holic Jun 30, 2023
6a8d908
add TODO
holic Jun 30, 2023
e2b8650
add tests for grouping logs
holic Jun 30, 2023
3a4c02f
wip rxjs tests
holic Jul 3, 2023
6ab1699
move fetchLogs to async generator, add tests
holic Jul 3, 2023
18a389b
add block range tests
holic Jul 3, 2023
ad76b3c
get rid of old approach
holic Jul 3, 2023
5a61720
add note about timers
holic Jul 3, 2023
ee10159
use concatMap instead of exhaustMap
alvrs Jul 3, 2023
5489e92
update readme
holic Jul 4, 2023
ae64564
Update packages/block-events-stream/src/fetchLogs.test.ts
holic Jul 4, 2023
84a5c29
update readme
holic Jul 4, 2023
30673bb
add tsdoc
holic Jul 4, 2023
439f0c6
rename for consistent terminology
holic Jul 4, 2023
10f9214
Merge remote-tracking branch 'origin/main' into holic/block-sync-package
holic Jul 4, 2023
6009912
changeset
holic Jul 4, 2023
03148bf
chore: synchronize versioning across all packages (#1102)
alvrs Jul 4, 2023
5dd5236
empty
holic Jul 4, 2023
4891d89
Merge remote-tracking branch 'origin/main' into holic/block-sync-package
holic Jul 4, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions .changeset/nasty-waves-divide.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
---
"@latticexyz/block-logs-stream": minor
---

Add block logs stream package

```ts
import { filter, map, mergeMap } from "rxjs";
import { createPublicClient, parseAbi } from "viem";
import {
createBlockStream,
isNonPendingBlock,
groupLogsByBlockNumber,
blockRangeToLogs,
} from "@latticexyz/block-logs-stream";

const publicClient = createPublicClient({
// your viem public client config here
});

const latestBlock$ = await createBlockStream({ publicClient, blockTag: "latest" });

const latestBlockNumber$ = latestBlock$.pipe(
filter(isNonPendingBlock),
map((block) => block.number)
);

latestBlockNumber$
.pipe(
map((latestBlockNumber) => ({ startBlock: 0n, endBlock: latestBlockNumber })),
blockRangeToLogs({
publicClient,
address,
events: parseAbi([
"event StoreDeleteRecord(bytes32 table, bytes32[] key)",
"event StoreSetField(bytes32 table, bytes32[] key, uint8 schemaIndex, bytes data)",
"event StoreSetRecord(bytes32 table, bytes32[] key, bytes data)",
"event StoreEphemeralRecord(bytes32 table, bytes32[] key, bytes data)",
]),
}),
mergeMap(({ logs }) => from(groupLogsByBlockNumber(logs)))
)
.subscribe((block) => {
console.log("got events for block", block);
});
```
6 changes: 6 additions & 0 deletions packages/block-logs-stream/.eslintrc
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"extends": ["../../.eslintrc"],
"rules": {
"@typescript-eslint/explicit-function-return-type": "error"
}
}
1 change: 1 addition & 0 deletions packages/block-logs-stream/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
dist
6 changes: 6 additions & 0 deletions packages/block-logs-stream/.npmignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
*

!dist/**
!src/**
!package.json
!README.md
49 changes: 49 additions & 0 deletions packages/block-logs-stream/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# Block logs stream

A set of utilities for efficiently retrieving blockchain event logs. Built on top of [viem][0] and [RxJS][1].

[0]: https://viem.sh/
[1]: https://rxjs.dev/

## Example

```ts
import { filter, map, mergeMap } from "rxjs";
import { createPublicClient, parseAbi } from "viem";
import {
createBlockStream,
isNonPendingBlock,
groupLogsByBlockNumber,
blockRangeToLogs,
} from "@latticexyz/block-logs-stream";

const publicClient = createPublicClient({
// your viem public client config here
});

const latestBlock$ = await createBlockStream({ publicClient, blockTag: "latest" });

const latestBlockNumber$ = latestBlock$.pipe(
filter(isNonPendingBlock),
map((block) => block.number)
);

latestBlockNumber$
.pipe(
map((latestBlockNumber) => ({ startBlock: 0n, endBlock: latestBlockNumber })),
blockRangeToLogs({
publicClient,
address,
events: parseAbi([
"event StoreDeleteRecord(bytes32 table, bytes32[] key)",
"event StoreSetField(bytes32 table, bytes32[] key, uint8 schemaIndex, bytes data)",
"event StoreSetRecord(bytes32 table, bytes32[] key, bytes data)",
"event StoreEphemeralRecord(bytes32 table, bytes32[] key, bytes data)",
]),
}),
mergeMap(({ logs }) => from(groupLogsByBlockNumber(logs)))
)
.subscribe((block) => {
console.log("got events for block", block);
});
```
43 changes: 43 additions & 0 deletions packages/block-logs-stream/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
{
"name": "@latticexyz/block-logs-stream",
"version": "1.42.0",
"description": "Create a stream of EVM block logs for events",
"repository": {
"type": "git",
"url": "https://github.com/latticexyz/mud.git",
"directory": "packages/block-logs-stream"
},
"license": "MIT",
"type": "module",
"exports": {
".": "./dist/index.js"
},
"types": "src/index.ts",
"scripts": {
"build": "pnpm run build:js",
"build:js": "tsup",
"clean": "pnpm run clean:js",
"clean:js": "rimraf dist",
"dev": "tsup --watch",
"lint": "eslint .",
"test": "vitest typecheck --run --passWithNoTests && vitest --run --passWithNoTests"
},
"dependencies": {
"@latticexyz/common": "workspace:*",
"@latticexyz/config": "workspace:*",
"@latticexyz/schema-type": "workspace:*",
"abitype": "0.8.7",
"debug": "^4.3.4",
"rxjs": "7.5.5",
"viem": "1.1.7"
},
"devDependencies": {
"@types/debug": "^4.1.7",
"tsup": "^6.7.0",
"vitest": "0.31.4"
},
"publishConfig": {
"access": "public"
},
"gitHead": "914a1e0ae4a573d685841ca2ea921435057deb8f"
}
128 changes: 128 additions & 0 deletions packages/block-logs-stream/src/blockRangeToLogs.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
import { describe, it, expect, vi, beforeEach } from "vitest";
import { blockRangeToLogs } from "./blockRangeToLogs";
import { Subject, lastValueFrom, map, toArray } from "rxjs";
import { EIP1193RequestFn, RpcLog, Transport, createPublicClient, createTransport } from "viem";
import { wait } from "./utils";

// TODO: there is a chance that these tests will need to be written differently with timers to avoid flakiness

const mockedTransportRequest = vi.fn<Parameters<EIP1193RequestFn>, ReturnType<EIP1193RequestFn>>();
const mockTransport: Transport = () =>
createTransport({
key: "mock",
name: "Mock Transport",
request: mockedTransportRequest as any,
type: "mock",
});

const publicClient = createPublicClient({
transport: mockTransport,
});

describe("blockRangeToLogs", () => {
beforeEach(() => {
mockedTransportRequest.mockClear();
});

it("processes block ranges in order", async () => {
const requests: any[] = [];
mockedTransportRequest.mockImplementation(async ({ method, params }): Promise<RpcLog[]> => {
requests.push(params);
if (method !== "eth_getLogs") throw new Error("not implemented");
await wait(450);
return [];
});

const latestBlockNumber$ = new Subject<bigint>();

const logs$ = latestBlockNumber$.pipe(
map((endBlock) => ({ startBlock: 0n, endBlock })),
blockRangeToLogs({
publicClient,
address: "0x",
events: [],
})
);

(async (): Promise<void> => {
for (let blockNumber = 1000n; blockNumber <= 1010n; blockNumber++) {
await wait(100);
latestBlockNumber$.next(blockNumber);
}
await wait(100);
latestBlockNumber$.complete();
})();

const results = await lastValueFrom(logs$.pipe(toArray()));

expect(requests).toMatchInlineSnapshot(`
[
[
{
"address": "0x",
"fromBlock": "0x0",
"toBlock": "0x3e8",
"topics": [
[],
],
},
],
[
{
"address": "0x",
"fromBlock": "0x3e9",
"toBlock": "0x3ec",
"topics": [
[],
],
},
],
[
{
"address": "0x",
"fromBlock": "0x3ed",
"toBlock": "0x3f0",
"topics": [
[],
],
},
],
[
{
"address": "0x",
"fromBlock": "0x3f1",
"toBlock": "0x3f2",
"topics": [
[],
],
},
],
]
`);

expect(results).toMatchInlineSnapshot(`
[
{
"fromBlock": 0n,
"logs": [],
"toBlock": 1000n,
},
{
"fromBlock": 1001n,
"logs": [],
"toBlock": 1004n,
},
{
"fromBlock": 1005n,
"logs": [],
"toBlock": 1008n,
},
{
"fromBlock": 1009n,
"logs": [],
"toBlock": 1010n,
},
]
`);
});
});
72 changes: 72 additions & 0 deletions packages/block-logs-stream/src/blockRangeToLogs.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import { EMPTY, OperatorFunction, concatMap, from, pipe, tap } from "rxjs";
import { FetchLogsResult, fetchLogs } from "./fetchLogs";
import { AbiEvent, Address } from "abitype";
import { BlockNumber, PublicClient } from "viem";

export type BlockRangeToLogsOptions<TAbiEvents extends readonly AbiEvent[]> = {
/**
* [viem `PublicClient`][0] used for fetching logs from the RPC.
*
* [0]: https://viem.sh/docs/clients/public.html
*/
publicClient: PublicClient;
/**
* Optional contract address(es) to fetch logs for.
*/
address?: Address | Address[];
/**
* Events to fetch logs for.
*/
events: TAbiEvents;
/**
* Optional maximum block range, if your RPC limits the amount of blocks fetched at a time.
*/
maxBlockRange?: bigint;
};

export type BlockRangeToLogsResult<TAbiEvents extends readonly AbiEvent[]> = OperatorFunction<
{ startBlock: BlockNumber; endBlock: BlockNumber },
FetchLogsResult<TAbiEvents>
>;

/**
* Takes in an observable of `Observable<{ startBlock: bigint, endBlock: bigint }>` and uses a viem `publicClient` to get logs for the contract `address` and matching `events` and emits the logs as they are fetched.
*
* @param {BlockRangeToLogsOptions<AbiEvent[]>} options See `BlockRangeToLogsOptions`.
* @returns {BlockRangeToLogsResult<AbiEvent[]>} An operator function that transforms a stream of block ranges into a stream of fetched logs.
*/
export function blockRangeToLogs<TAbiEvents extends readonly AbiEvent[]>({
publicClient,
address,
events,
maxBlockRange,
}: BlockRangeToLogsOptions<TAbiEvents>): BlockRangeToLogsResult<TAbiEvents> {
let fromBlock: bigint;
let toBlock: bigint;

return pipe(
tap(({ endBlock, startBlock }) => {
fromBlock ??= startBlock;
toBlock = endBlock;
}),
// concatMap only processes the next emission once the inner observable completes,
// so it always uses the latest`toBlock` value.
concatMap(() => {
if (fromBlock > toBlock) return EMPTY;
return from(
fetchLogs({
publicClient,
address,
events,
fromBlock,
toBlock,
maxBlockRange,
})
).pipe(
tap(({ toBlock }) => {
fromBlock = toBlock + 1n;
})
);
})
);
}
20 changes: 20 additions & 0 deletions packages/block-logs-stream/src/createBlockStream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import { Observable } from "rxjs";
import type { Block, BlockTag, PublicClient } from "viem";

export type CreateBlockStreamOptions = {
publicClient: PublicClient;
blockTag: BlockTag;
};

export type CreateBlockStreamResult = Observable<Block>;

export function createBlockStream({ publicClient, blockTag }: CreateBlockStreamOptions): CreateBlockStreamResult {
return new Observable(function subscribe(subscriber) {
return publicClient.watchBlocks({
blockTag,
emitOnBegin: true,
onBlock: (block) => subscriber.next(block),
onError: (error) => subscriber.error(error),
});
});
}
3 changes: 3 additions & 0 deletions packages/block-logs-stream/src/debug.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
import createDebug from "debug";

export const debug = createDebug("mud:block-events-stream");
Loading