Skip to content

Commit

Permalink
feat(block-events-stream): new block events stream package
Browse files Browse the repository at this point in the history
  • Loading branch information
holic committed Jun 22, 2023
1 parent 8ffed42 commit 093a852
Show file tree
Hide file tree
Showing 15 changed files with 390 additions and 0 deletions.
6 changes: 6 additions & 0 deletions packages/block-events-stream/.eslintrc
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"extends": ["../../.eslintrc"],
"rules": {
"@typescript-eslint/explicit-function-return-type": "error"
}
}
1 change: 1 addition & 0 deletions packages/block-events-stream/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
dist
6 changes: 6 additions & 0 deletions packages/block-events-stream/.npmignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
*

!dist/**
!src/**
!package.json
!README.md
21 changes: 21 additions & 0 deletions packages/block-events-stream/src/common.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import { BehaviorSubject, Observable } from "rxjs";
import type { BlockNumber, Hex } from "viem";
import type { AbiEvent } from "abitype";
import { NonPendingLog } from "./isNonPendingLog";
import { GetLogsReturnType } from "./getLogs";

export type ReadonlyBehaviorSubject<T> = Pick<BehaviorSubject<T>, "subscribe" | "pipe" | "value" | "getValue">;

export type BlockEvents<TAbiEvent extends AbiEvent> = {
blockNumber: BlockNumber;
blockHash: Hex;
events: NonPendingLog<GetLogsReturnType<TAbiEvent[]>>[];
};

export type BlockEventsStream<TAbiEvent extends AbiEvent> = Observable<BlockEvents<TAbiEvent>>;

export type BlockEventsFromStream<TStream extends BlockEventsStream<AbiEvent>> = TStream extends BlockEventsStream<
infer TAbiEvent
>
? BlockEvents<TAbiEvent>
: never;
130 changes: 130 additions & 0 deletions packages/block-events-stream/src/createBlockEventsStream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
import { BehaviorSubject, Subject, Subscribable } from "rxjs";
import type { BlockNumber, Hex, PublicClient } from "viem";
import type { AbiEvent } from "abitype";
import { BlockEvents, BlockEventsStream } from "./common";
import { bigIntMin } from "./utils";
import { isNonPendingLog } from "./isNonPendingLog";
import { debug } from "./debug";
import { createBlockNumberStream } from "./createBlockNumberStream";
import { getLogs } from "./getLogs";
import { storeEventsAbi } from "@latticexyz/store";

// TODO: add nice logging with debub lib or similar
// TODO: make `toBlock` accept a `BehaviorSubject<BlockNumber>` or add `latestBlockStream` so we only need one listener/watcher/poller
// TODO: consider excluding `pending` block tags so we can just assume all block numbers are present

export type CreateBlockEventsStreamOptions<TAbiEvent extends AbiEvent> = {
publicClient: PublicClient;
fromBlock?: BlockNumber;
toBlock?: BlockNumber | Subscribable<BlockNumber>;
address?: Hex;
events: readonly TAbiEvent[];
maxBlockRange?: number; // defaults to 1000
};

export async function createBlockEventsStream<TAbiEvent extends AbiEvent>({
publicClient,
fromBlock: initialFromBlock,
toBlock: initialToBlock,
address,
events,
maxBlockRange = 1000,
}: CreateBlockEventsStreamOptions<TAbiEvent>): Promise<BlockEventsStream<TAbiEvent>> {
debug("createBlockEventsStream", { initialFromBlock, initialToBlock, address, events, maxBlockRange });

if (initialFromBlock == null) {
debug("getting earliest block");
const earliestBlock = await publicClient.getBlock({ blockTag: "earliest" });
debug("earliest block", earliestBlock);
if (earliestBlock.number == null) {
// TODO: better error
throw new Error(`pending or missing earliest block`);
}
initialFromBlock = earliestBlock.number;
}

if (initialToBlock == null) {
debug("creating latest block number stream");
initialToBlock = await createBlockNumberStream({ publicClient, blockTag: "latest" });
}

const stream = new Subject<BlockEvents<TAbiEvent>>();
fetchBlockRange(
initialFromBlock,
maxBlockRange,
initialToBlock instanceof BehaviorSubject ? initialToBlock.value : initialToBlock
);

async function fetchBlockRange(fromBlock: bigint, maxBlockRange: number, lastBlockNumber: bigint): Promise<void> {
try {
const toBlock = bigIntMin(fromBlock + BigInt(maxBlockRange), lastBlockNumber);
debug("fetching block range", { fromBlock, toBlock });

// TODO: swap this with viem `getLogs` call when viem supports multiple events: https://github.com/wagmi-dev/viem/pull/633
const logs = await getLogs({
publicClient,
address,
fromBlock,
toBlock,
events: storeEventsAbi,
});

// TODO: do something other than just throwing out pending logs
const nonPendingLogs = logs.filter(isNonPendingLog);

if (logs.length !== nonPendingLogs.length) {
// TODO: better error
console.warn("pending logs discarded");
}

// TODO: handle RPC block range errors
// TODO: handle RPC rate limit errors (hopefully via client retry policy)

const blockNumbers = Array.from(new Set(nonPendingLogs.map((log) => log.blockNumber)));
blockNumbers.sort((a, b) => (a < b ? -1 : a > b ? 1 : 0));

for (const blockNumber of blockNumbers) {
const blockLogs = nonPendingLogs.filter((log) => log.blockNumber === blockNumber);
blockLogs.sort((a, b) => (a.logIndex < b.logIndex ? -1 : a.logIndex > b.logIndex ? 1 : 0));

if (blockLogs.length) {
debug("emitting events for block", { blockNumber, blockHash: blockLogs[0].blockHash, events: blockLogs });
stream.next({
blockNumber,
blockHash: blockLogs[0].blockHash,
events: blockLogs,
// TODO: figure out why we need to cast this
} as any as BlockEvents<TAbiEvent>);
}
}

if (toBlock < lastBlockNumber) {
fetchBlockRange(toBlock + 1n, maxBlockRange, lastBlockNumber);
return;
}

if (initialToBlock instanceof BehaviorSubject) {
if (initialToBlock.value > toBlock) {
fetchBlockRange(toBlock + 1n, maxBlockRange, initialToBlock.value);
return;
}

debug("waiting for next block");
const sub = initialToBlock.subscribe((blockNumber) => {
if (blockNumber > toBlock) {
sub.unsubscribe();
fetchBlockRange(toBlock + 1n, maxBlockRange, blockNumber);
}
});
return;
}

stream.complete();
} catch (error: unknown) {
// TODO: do more specific error handling?
stream.error(error);
}
}

return stream.asObservable();
}
47 changes: 47 additions & 0 deletions packages/block-events-stream/src/createBlockNumberStream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import { BehaviorSubject } from "rxjs";
import type { Block, BlockNumber, BlockTag, PublicClient } from "viem";
import { createBlockStream } from "./createBlockStream";
import { ReadonlyBehaviorSubject } from "./common";

// TODO: pass through viem's types, e.g. WatchBlocksParameters -> GetBlockReturnType
// TODO: make stream closeable?

export type CreateBlockNumberStreamOptions =
| {
publicClient: PublicClient;
blockTag: Omit<BlockTag, "pending">;
block$?: never;
}
| {
publicClient?: never;
blockTag?: never;
block$: ReadonlyBehaviorSubject<Block>;
};

export async function createBlockNumberStream({
publicClient,
blockTag,
block$: initialBlock$,
}: CreateBlockNumberStreamOptions): Promise<ReadonlyBehaviorSubject<BlockNumber>> {
const block$ = initialBlock$ ?? (await createBlockStream({ publicClient, blockTag: blockTag as BlockTag }));
const block = block$.value;
if (!block.number) {
// TODO: better error
throw new Error(`${blockTag} block missing or pending`);
}

const blockNumber$ = new BehaviorSubject<BlockNumber>(block.number);
// TODO: do something with unwatch?
const unwatch = block$.subscribe({
next: (block) => {
if (block.number) {
blockNumber$.next(block.number);
}
// TODO: warn/error on blocks with missing block number?
},
error: blockNumber$.error,
complete: blockNumber$.complete,
});

return blockNumber$;
}
38 changes: 38 additions & 0 deletions packages/block-events-stream/src/createBlockStream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import { BehaviorSubject } from "rxjs";
import type { Block, BlockTag, PublicClient } from "viem";
import { ReadonlyBehaviorSubject } from "./common";

// TODO: pass through viem's types, e.g. WatchBlocksParameters -> GetBlockReturnType
// TODO: make stream closeable?

export type CreateBlockStreamOptions = {
publicClient: PublicClient;
blockTag: BlockTag;
};

export function createBlockStream({
publicClient,
blockTag,
}: CreateBlockStreamOptions): Promise<ReadonlyBehaviorSubject<Block>> {
return new Promise((resolve, reject) => {
let stream: BehaviorSubject<Block> | undefined;
// TODO: do something with unwatch?
const unwatch = publicClient.watchBlocks({
blockTag,
emitOnBegin: true,
onBlock: (block) => {
if (!stream) {
stream = new BehaviorSubject(block);
// TODO: return actual readonly behavior subject rather than just a type?
resolve(stream as ReadonlyBehaviorSubject<Block>);
} else {
stream.next(block);
}
},
onError: (error) => {
reject(error);
stream?.error(error);
},
});
});
}
3 changes: 3 additions & 0 deletions packages/block-events-stream/src/debug.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
import createDebug from "debug";

export const debug = createDebug("mud:block-events-stream");
71 changes: 71 additions & 0 deletions packages/block-events-stream/src/getLogs.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import { AbiEvent } from "abitype";
import {
Address,
BlockNumber,
BlockTag,
Log,
PublicClient,
decodeEventLog,
encodeEventTopics,
numberToHex,
formatLog,
} from "viem";
import { isDefined } from "@latticexyz/common/utils";

// Based on https://github.com/wagmi-dev/viem/blob/main/src/actions/public/getLogs.ts
// TODO: swap this out once viem has support for multiple events: https://github.com/wagmi-dev/viem/pull/633

export type GetLogsOptions<TAbiEvents extends readonly AbiEvent[]> = {
publicClient: PublicClient;
address?: Address | Address[];
events: TAbiEvents;
fromBlock: BlockNumber | BlockTag;
toBlock: BlockNumber | BlockTag;
};

export type GetLogsReturnType<TAbiEvents extends readonly AbiEvent[]> = Log<
bigint,
number,
TAbiEvents[number],
true,
TAbiEvents
>;

export async function getLogs<TAbiEvents extends readonly AbiEvent[]>({
publicClient,
address,
events,
fromBlock,
toBlock,
}: GetLogsOptions<TAbiEvents>): Promise<GetLogsReturnType<TAbiEvents>[]> {
const topics = [events.flatMap((event) => encodeEventTopics({ abi: [event], eventName: event.name }))];

const logs = await publicClient.request({
method: "eth_getLogs",
params: [
{
address,
topics,
fromBlock: typeof fromBlock === "bigint" ? numberToHex(fromBlock) : fromBlock,
toBlock: typeof toBlock === "bigint" ? numberToHex(toBlock) : toBlock,
},
],
});

return logs
.map((log) => {
try {
const { eventName, args } = decodeEventLog({
abi: events,
data: log.data,
topics: log.topics,
strict: true,
});
return formatLog(log, { args, eventName });
} catch (err) {
// We're using strict mode, so just skip if there is an error decoding.
return;
}
})
.filter(isDefined) as GetLogsReturnType<TAbiEvents>[];
}
4 changes: 4 additions & 0 deletions packages/block-events-stream/src/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
export * from "./common";
export * from "./createBlockEventsStream";
export * from "./createBlockNumberStream";
export * from "./createBlockStream";
12 changes: 12 additions & 0 deletions packages/block-events-stream/src/isNonPendingBlock.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import type { Block } from "viem";

export type NonPendingBlock<TBlock extends Block> = TBlock & {
hash: NonNullable<TBlock["hash"]>;
logsBloom: NonNullable<TBlock["logsBloom"]>;
nonce: NonNullable<TBlock["nonce"]>;
number: NonNullable<TBlock["number"]>;
};

export function isNonPendingBlock<TBlock extends Block>(block: TBlock): block is NonPendingBlock<TBlock> {
return block.hash != null && block.logsBloom != null && block.nonce != null && block.number != null;
}
19 changes: 19 additions & 0 deletions packages/block-events-stream/src/isNonPendingLog.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import type { Log } from "viem";

export type NonPendingLog<TLog extends Log> = TLog & {
blockHash: NonNullable<TLog["blockHash"]>;
blockNumber: NonNullable<TLog["blockNumber"]>;
logIndex: NonNullable<TLog["logIndex"]>;
transactionHash: NonNullable<TLog["transactionHash"]>;
transactionIndex: NonNullable<TLog["transactionIndex"]>;
};

export function isNonPendingLog<TLog extends Log>(log: TLog): log is NonPendingLog<TLog> {
return (
log.blockHash != null &&
log.blockNumber != null &&
log.logIndex != null &&
log.transactionHash != null &&
log.transactionIndex != null
);
}
7 changes: 7 additions & 0 deletions packages/block-events-stream/src/utils.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
export function bigIntMin(...args: bigint[]): bigint {
return args.reduce((m, e) => (e < m ? e : m));
}

export function bigIntMax(...args: bigint[]): bigint {
return args.reduce((m, e) => (e > m ? e : m));
}
14 changes: 14 additions & 0 deletions packages/block-events-stream/tsconfig.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{
"compilerOptions": {
"target": "es2021",
"module": "esnext",
"moduleResolution": "node",
"declaration": true,
"sourceMap": true,
"outDir": "dist",
"isolatedModules": true,
"esModuleInterop": true,
"forceConsistentCasingInFileNames": true,
"strict": true
}
}
Loading

0 comments on commit 093a852

Please sign in to comment.