Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(store-sync): rework blockLogsToStorage #1176

Merged
merged 10 commits into from
Jul 19, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changeset/modern-bikes-build.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
"@latticexyz/block-logs-stream": minor
"@latticexyz/store-sync": minor
---

- Replace `blockEventsToStorage` with `blockLogsToStorage` that exposes a `storeOperations` callback to perform database writes from store operations. This helps encapsulates database adapters into a single wrapper/instance of `blockLogsToStorage` and allows for wrapping a block of store operations in a database transaction.
- Add `toBlock` option to `groupLogsByBlockNumber` and remove `blockHash` from results. This helps track the last block number for a given set of logs when used in the context of RxJS streams.
2 changes: 1 addition & 1 deletion packages/block-logs-stream/src/blockRangeToLogs.test.ts
Original file line number Diff line number Diff line change
@@ -2,7 +2,7 @@ import { describe, it, expect, vi, beforeEach } from "vitest";
import { blockRangeToLogs } from "./blockRangeToLogs";
import { Subject, lastValueFrom, map, toArray } from "rxjs";
import { EIP1193RequestFn, RpcLog, Transport, createPublicClient, createTransport } from "viem";
import { wait } from "./utils";
import { wait } from "@latticexyz/common/utils";

// TODO: there is a chance that these tests will need to be written differently with timers to avoid flakiness

2 changes: 1 addition & 1 deletion packages/block-logs-stream/src/fetchLogs.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { AbiEvent, Address } from "abitype";
import { PublicClient, BlockNumber } from "viem";
import { bigIntMin, wait } from "@latticexyz/common/utils";
import { GetLogsResult, getLogs } from "./getLogs";
import { bigIntMin, wait } from "./utils";
import { debug } from "./debug";

export type FetchLogsOptions<TAbiEvents extends readonly AbiEvent[]> = {
65 changes: 62 additions & 3 deletions packages/block-logs-stream/src/groupLogsByBlockNumber.test.ts
Original file line number Diff line number Diff line change
@@ -52,7 +52,6 @@ describe("groupLogsByBlockNumber", () => {
expect(groupLogsByBlockNumber(logs)).toMatchInlineSnapshot(`
[
{
"blockHash": "0x",
"blockNumber": 1n,
"logs": [
{
@@ -79,7 +78,6 @@ describe("groupLogsByBlockNumber", () => {
],
},
{
"blockHash": "0x",
"blockNumber": 3n,
"logs": [
{
@@ -92,7 +90,6 @@ describe("groupLogsByBlockNumber", () => {
],
},
{
"blockHash": "0x",
"blockNumber": 5n,
"logs": [
{
@@ -107,4 +104,66 @@ describe("groupLogsByBlockNumber", () => {
]
`);
});

it("adds an entry for toBlock if block is not in logs", () => {
const logs = [
{
blockNumber: 1n,
blockHash: "0x",
logIndex: 4,
transactionHash: "0x",
transactionIndex: 0,
},
] as any as Log[];

expect(groupLogsByBlockNumber(logs, 2n)).toMatchInlineSnapshot(`
[
{
"blockNumber": 1n,
"logs": [
{
"blockHash": "0x",
"blockNumber": 1n,
"logIndex": 4,
"transactionHash": "0x",
"transactionIndex": 0,
},
],
},
{
"blockNumber": 2n,
"logs": [],
},
]
`);
});

it("does not add an entry for toBlock if block number is in logs", () => {
const logs = [
{
blockNumber: 2n,
blockHash: "0x",
logIndex: 4,
transactionHash: "0x",
transactionIndex: 0,
},
] as any as Log[];

expect(groupLogsByBlockNumber(logs, 2n)).toMatchInlineSnapshot(`
[
{
"blockNumber": 2n,
"logs": [
{
"blockHash": "0x",
"blockNumber": 2n,
"logIndex": 4,
"transactionHash": "0x",
"transactionIndex": 0,
},
],
},
]
`);
});
});
26 changes: 19 additions & 7 deletions packages/block-logs-stream/src/groupLogsByBlockNumber.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
import { BlockNumber, Hex, Log } from "viem";
import { BlockNumber, Log } from "viem";
import { NonPendingLog, isNonPendingLog } from "./isNonPendingLog";
import { bigIntSort } from "./utils";
import { isDefined } from "@latticexyz/common/utils";
import { isDefined, bigIntSort } from "@latticexyz/common/utils";
import { debug } from "./debug";

export type GroupLogsByBlockNumberResult<TLog extends Log> = {
blockNumber: BlockNumber;
blockHash: Hex;
logs: readonly NonPendingLog<TLog>[];
}[];

@@ -19,11 +17,15 @@ export type GroupLogsByBlockNumberResult<TLog extends Log> = {
* Pending logs are filtered out before processing, as they don't have block numbers.
*
* @param logs The logs to group by block number.
* @param toBlock If specified, always include this block number at the end, even if there are no logs.
*
* @returns An array of objects where each object represents a distinct block and includes the block number,
* the block hash, and an array of logs for that block.
*/
export function groupLogsByBlockNumber<TLog extends Log>(logs: readonly TLog[]): GroupLogsByBlockNumberResult<TLog> {
export function groupLogsByBlockNumber<TLog extends Log>(
logs: readonly TLog[],
toBlock?: BlockNumber
): GroupLogsByBlockNumberResult<TLog> {
// Pending logs don't have block numbers, so filter them out.
const nonPendingLogs = logs.filter(isNonPendingLog);
if (logs.length !== nonPendingLogs.length) {
@@ -36,7 +38,7 @@ export function groupLogsByBlockNumber<TLog extends Log>(logs: readonly TLog[]):
const blockNumbers = Array.from(new Set(nonPendingLogs.map((log) => log.blockNumber)));
blockNumbers.sort(bigIntSort);

return blockNumbers
const groupedBlocks = blockNumbers
.map((blockNumber) => {
const blockLogs = nonPendingLogs.filter((log) => log.blockNumber === blockNumber);
if (!blockLogs.length) return;
@@ -46,9 +48,19 @@ export function groupLogsByBlockNumber<TLog extends Log>(logs: readonly TLog[]):

return {
blockNumber,
blockHash: blockLogs[0].blockHash,
logs: blockLogs,
};
})
.filter(isDefined);

const lastBlockNumber = blockNumbers.length > 0 ? blockNumbers[blockNumbers.length - 1] : null;

if (toBlock != null && (lastBlockNumber == null || toBlock > lastBlockNumber)) {
groupedBlocks.push({
blockNumber: toBlock,
logs: [],
});
}

return groupedBlocks;
}
2 changes: 2 additions & 0 deletions packages/block-logs-stream/src/isNonPendingBlock.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import type { Block } from "viem";

// TODO: get rid of this once https://github.com/wagmi-dev/viem/pull/847 lands

export type NonPendingBlock<TBlock extends Block> = TBlock & {
hash: NonNullable<TBlock["hash"]>;
logsBloom: NonNullable<TBlock["logsBloom"]>;
2 changes: 2 additions & 0 deletions packages/block-logs-stream/src/isNonPendingLog.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import type { Log } from "viem";

// TODO: get rid of this once https://github.com/wagmi-dev/viem/pull/847 lands

export type NonPendingLog<TLog extends Log> = TLog & {
blockHash: NonNullable<TLog["blockHash"]>;
blockNumber: NonNullable<TLog["blockNumber"]>;
17 changes: 0 additions & 17 deletions packages/block-logs-stream/src/utils.ts

This file was deleted.

153 changes: 0 additions & 153 deletions packages/store-sync/src/blockEventsToStorage.test.ts

This file was deleted.

Loading