Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(block-events-stream): add block events stream package
Browse files Browse the repository at this point in the history
holic committed Jun 26, 2023
1 parent 77b940d commit 51a7be0
Showing 17 changed files with 464 additions and 0 deletions.
6 changes: 6 additions & 0 deletions packages/block-events-stream/.eslintrc
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"extends": ["../../.eslintrc"],
"rules": {
"@typescript-eslint/explicit-function-return-type": "error"
}
}
1 change: 1 addition & 0 deletions packages/block-events-stream/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
dist
6 changes: 6 additions & 0 deletions packages/block-events-stream/.npmignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
*

!dist/**
!src/**
!package.json
!README.md
44 changes: 44 additions & 0 deletions packages/block-events-stream/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
{
"name": "@latticexyz/block-events-stream",
"version": "1.42.0",
"description": "Create a stream of EVM block events",
"repository": {
"type": "git",
"url": "https://github.com/latticexyz/mud.git",
"directory": "packages/block-events-stream"
},
"license": "MIT",
"type": "module",
"exports": {
".": "./dist/index.js"
},
"types": "src/index.ts",
"scripts": {
"build": "pnpm run build:js",
"build:js": "tsup",
"clean": "pnpm run clean:js",
"clean:js": "rimraf dist",
"dev": "tsup --watch",
"lint": "eslint .",
"test": "vitest typecheck --run --passWithNoTests && vitest --run --passWithNoTests"
},
"dependencies": {
"@latticexyz/common": "workspace:*",
"@latticexyz/config": "workspace:*",
"@latticexyz/schema-type": "workspace:*",
"@latticexyz/store": "workspace:*",
"abitype": "0.8.7",
"debug": "^4.3.4",
"rxjs": "7.5.5",
"viem": "1.1.7"
},
"devDependencies": {
"@types/debug": "^4.1.7",
"tsup": "^6.7.0",
"vitest": "0.31.4"
},
"publishConfig": {
"access": "public"
},
"gitHead": "914a1e0ae4a573d685841ca2ea921435057deb8f"
}
21 changes: 21 additions & 0 deletions packages/block-events-stream/src/common.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import { BehaviorSubject, Observable } from "rxjs";
import type { BlockNumber, Hex } from "viem";
import type { AbiEvent } from "abitype";
import { NonPendingLog } from "./isNonPendingLog";
import { GetLogsReturnType } from "./getLogs";

export type ReadonlyBehaviorSubject<T> = Pick<BehaviorSubject<T>, "subscribe" | "pipe" | "value" | "getValue">;

export type BlockEvents<TAbiEvent extends AbiEvent> = {
blockNumber: BlockNumber;
blockHash: Hex;
events: NonPendingLog<GetLogsReturnType<TAbiEvent[]>>[];
};

export type BlockEventsStream<TAbiEvent extends AbiEvent> = Observable<BlockEvents<TAbiEvent>>;

export type BlockEventsFromStream<TStream extends BlockEventsStream<AbiEvent>> = TStream extends BlockEventsStream<
infer TAbiEvent
>
? BlockEvents<TAbiEvent>
: never;
126 changes: 126 additions & 0 deletions packages/block-events-stream/src/createBlockEventsStream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
import { BehaviorSubject, Subject } from "rxjs";
import type { BlockNumber, Hex, PublicClient } from "viem";
import type { AbiEvent } from "abitype";
import { BlockEvents, BlockEventsStream, ReadonlyBehaviorSubject } from "./common";
import { bigIntMin } from "./utils";
import { isNonPendingLog } from "./isNonPendingLog";
import { debug } from "./debug";
import { createBlockNumberStream } from "./createBlockNumberStream";
import { getLogs } from "./getLogs";
import { storeEventsAbi } from "@latticexyz/store";

export type CreateBlockEventsStreamOptions<TAbiEvent extends AbiEvent> = {
publicClient: PublicClient;
fromBlock?: BlockNumber;
toBlock?: BlockNumber | ReadonlyBehaviorSubject<BlockNumber>;
address?: Hex;
events: readonly TAbiEvent[];
maxBlockRange?: number; // defaults to 1000
};

export async function createBlockEventsStream<TAbiEvent extends AbiEvent>({
publicClient,
fromBlock: initialFromBlock,
toBlock: initialToBlock,
address,
events,
maxBlockRange = 1000,
}: CreateBlockEventsStreamOptions<TAbiEvent>): Promise<BlockEventsStream<TAbiEvent>> {
debug("createBlockEventsStream", { initialFromBlock, initialToBlock, address, events, maxBlockRange });

if (initialFromBlock == null) {
debug("getting earliest block");
const earliestBlock = await publicClient.getBlock({ blockTag: "earliest" });
debug("earliest block", earliestBlock);
if (earliestBlock.number == null) {
// TODO: better error
throw new Error(`pending or missing earliest block`);
}
initialFromBlock = earliestBlock.number;
}

if (initialToBlock == null) {
debug("creating latest block number stream");
initialToBlock = await createBlockNumberStream({ publicClient, blockTag: "latest" });
}

const stream = new Subject<BlockEvents<TAbiEvent>>();
fetchBlockRange(
initialFromBlock,
maxBlockRange,
initialToBlock instanceof BehaviorSubject ? initialToBlock.value : initialToBlock
);

async function fetchBlockRange(fromBlock: bigint, maxBlockRange: number, lastBlockNumber: bigint): Promise<void> {
try {
const toBlock = bigIntMin(fromBlock + BigInt(maxBlockRange), lastBlockNumber);
debug("fetching block range", { fromBlock, toBlock });

// TODO: swap this with viem `getLogs` call when viem supports multiple events: https://github.com/wagmi-dev/viem/pull/633
const logs = await getLogs({
publicClient,
address,
fromBlock,
toBlock,
events: storeEventsAbi,
});

// TODO: do something other than just throwing out pending logs
const nonPendingLogs = logs.filter(isNonPendingLog);

if (logs.length !== nonPendingLogs.length) {
// TODO: better error
console.warn("pending logs discarded");
}

// TODO: handle RPC block range errors
// TODO: handle RPC rate limit errors (hopefully via client retry policy)

const blockNumbers = Array.from(new Set(nonPendingLogs.map((log) => log.blockNumber)));
blockNumbers.sort((a, b) => (a < b ? -1 : a > b ? 1 : 0));

for (const blockNumber of blockNumbers) {
const blockLogs = nonPendingLogs.filter((log) => log.blockNumber === blockNumber);
blockLogs.sort((a, b) => (a.logIndex < b.logIndex ? -1 : a.logIndex > b.logIndex ? 1 : 0));

if (blockLogs.length) {
debug("emitting events for block", { blockNumber, blockHash: blockLogs[0].blockHash, events: blockLogs });
stream.next({
blockNumber,
blockHash: blockLogs[0].blockHash,
events: blockLogs,
// TODO: figure out why we need to cast this
} as any as BlockEvents<TAbiEvent>);
}
}

if (toBlock < lastBlockNumber) {
fetchBlockRange(toBlock + 1n, maxBlockRange, lastBlockNumber);
return;
}

if (initialToBlock instanceof BehaviorSubject) {
if (initialToBlock.value > toBlock) {
fetchBlockRange(toBlock + 1n, maxBlockRange, initialToBlock.value);
return;
}

debug("waiting for next block");
const sub = initialToBlock.subscribe((blockNumber) => {
if (blockNumber > toBlock) {
sub.unsubscribe();
fetchBlockRange(toBlock + 1n, maxBlockRange, blockNumber);
}
});
return;
}

stream.complete();
} catch (error: unknown) {
// TODO: do more specific error handling?
stream.error(error);
}
}

return stream.asObservable();
}
44 changes: 44 additions & 0 deletions packages/block-events-stream/src/createBlockNumberStream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import { BehaviorSubject, filter, map } from "rxjs";
import type { Block, BlockNumber, BlockTag, PublicClient } from "viem";
import { createBlockStream } from "./createBlockStream";
import { ReadonlyBehaviorSubject } from "./common";
import { isNonPendingBlock } from "./isNonPendingBlock";

// TODO: pass through viem's types, e.g. WatchBlocksParameters -> GetBlockReturnType

export type CreateBlockNumberStreamOptions =
| {
publicClient: PublicClient;
blockTag: Omit<BlockTag, "pending">;
block$?: never;
}
| {
publicClient?: never;
blockTag?: never;
block$: ReadonlyBehaviorSubject<Block>;
};

export async function createBlockNumberStream({
publicClient,
blockTag,
block$: initialBlock$,
}: CreateBlockNumberStreamOptions): Promise<ReadonlyBehaviorSubject<BlockNumber>> {
const block$ = initialBlock$ ?? (await createBlockStream({ publicClient, blockTag: blockTag as BlockTag }));
const block = block$.value;

if (!block.number) {
// TODO: better error
throw new Error(`${blockTag} block missing or pending`);
}

const blockNumber$ = new BehaviorSubject<BlockNumber>(block.number);

block$
.pipe(
filter(isNonPendingBlock),
map((block) => block.number)
)
.subscribe(blockNumber$);

return blockNumber$;
}
38 changes: 38 additions & 0 deletions packages/block-events-stream/src/createBlockStream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import { BehaviorSubject } from "rxjs";
import type { Block, BlockTag, PublicClient } from "viem";
import { ReadonlyBehaviorSubject } from "./common";

// TODO: pass through viem's types, e.g. WatchBlocksParameters -> GetBlockReturnType
// TODO: make stream closeable to make use of unwatch?

export type CreateBlockStreamOptions = {
publicClient: PublicClient;
blockTag: BlockTag;
};

export function createBlockStream({
publicClient,
blockTag,
}: CreateBlockStreamOptions): Promise<ReadonlyBehaviorSubject<Block>> {
return new Promise((resolve, reject) => {
let stream: BehaviorSubject<Block> | undefined;
// TODO: do something with unwatch?
const unwatch = publicClient.watchBlocks({
blockTag,
emitOnBegin: true,
onBlock: (block) => {
if (!stream) {
stream = new BehaviorSubject(block);
// TODO: return actual readonly behavior subject rather than just a type?
resolve(stream as ReadonlyBehaviorSubject<Block>);
} else {
stream.next(block);
}
},
onError: (error) => {
reject(error);
stream?.error(error);
},
});
});
}
3 changes: 3 additions & 0 deletions packages/block-events-stream/src/debug.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
import createDebug from "debug";

export const debug = createDebug("mud:block-events-stream");
71 changes: 71 additions & 0 deletions packages/block-events-stream/src/getLogs.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import { AbiEvent } from "abitype";
import {
Address,
BlockNumber,
BlockTag,
Log,
PublicClient,
decodeEventLog,
encodeEventTopics,
numberToHex,
formatLog,
} from "viem";
import { isDefined } from "@latticexyz/common/utils";

// Based on https://github.com/wagmi-dev/viem/blob/main/src/actions/public/getLogs.ts
// TODO: swap this out once viem has support for multiple events: https://github.com/wagmi-dev/viem/pull/633

export type GetLogsOptions<TAbiEvents extends readonly AbiEvent[]> = {
publicClient: PublicClient;
address?: Address | Address[];
events: TAbiEvents;
fromBlock: BlockNumber | BlockTag;
toBlock: BlockNumber | BlockTag;
};

export type GetLogsReturnType<TAbiEvents extends readonly AbiEvent[]> = Log<
bigint,
number,
TAbiEvents[number],
true,
TAbiEvents
>;

export async function getLogs<TAbiEvents extends readonly AbiEvent[]>({
publicClient,
address,
events,
fromBlock,
toBlock,
}: GetLogsOptions<TAbiEvents>): Promise<GetLogsReturnType<TAbiEvents>[]> {
const topics = [events.flatMap((event) => encodeEventTopics({ abi: [event], eventName: event.name }))];

const logs = await publicClient.request({
method: "eth_getLogs",
params: [
{
address,
topics,
fromBlock: typeof fromBlock === "bigint" ? numberToHex(fromBlock) : fromBlock,
toBlock: typeof toBlock === "bigint" ? numberToHex(toBlock) : toBlock,
},
],
});

return logs
.map((log) => {
try {
const { eventName, args } = decodeEventLog({
abi: events,
data: log.data,
topics: log.topics,
strict: true,
});
return formatLog(log, { args, eventName });
} catch (err) {
// We're using strict mode, so just skip if there is an error decoding.
return;
}
})
.filter(isDefined) as GetLogsReturnType<TAbiEvents>[];
}
4 changes: 4 additions & 0 deletions packages/block-events-stream/src/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
export * from "./common";
export * from "./createBlockEventsStream";
export * from "./createBlockNumberStream";
export * from "./createBlockStream";
12 changes: 12 additions & 0 deletions packages/block-events-stream/src/isNonPendingBlock.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import type { Block } from "viem";

export type NonPendingBlock<TBlock extends Block> = TBlock & {
hash: NonNullable<TBlock["hash"]>;
logsBloom: NonNullable<TBlock["logsBloom"]>;
nonce: NonNullable<TBlock["nonce"]>;
number: NonNullable<TBlock["number"]>;
};

export function isNonPendingBlock<TBlock extends Block>(block: TBlock): block is NonPendingBlock<TBlock> {
return block.hash != null && block.logsBloom != null && block.nonce != null && block.number != null;
}
Loading

0 comments on commit 51a7be0

Please sign in to comment.