Skip to content

Commit

Permalink
feat(store-sync): rework blockLogsToStorage (#1176)
Browse files Browse the repository at this point in the history
  • Loading branch information
holic authored Jul 19, 2023
1 parent 0c4f9fe commit eeb15cc
Show file tree
Hide file tree
Showing 13 changed files with 613 additions and 417 deletions.
7 changes: 7 additions & 0 deletions .changeset/modern-bikes-build.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
"@latticexyz/block-logs-stream": minor
"@latticexyz/store-sync": minor
---

- Replace `blockEventsToStorage` with `blockLogsToStorage` that exposes a `storeOperations` callback to perform database writes from store operations. This helps encapsulates database adapters into a single wrapper/instance of `blockLogsToStorage` and allows for wrapping a block of store operations in a database transaction.
- Add `toBlock` option to `groupLogsByBlockNumber` and remove `blockHash` from results. This helps track the last block number for a given set of logs when used in the context of RxJS streams.
2 changes: 1 addition & 1 deletion packages/block-logs-stream/src/blockRangeToLogs.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { describe, it, expect, vi, beforeEach } from "vitest";
import { blockRangeToLogs } from "./blockRangeToLogs";
import { Subject, lastValueFrom, map, toArray } from "rxjs";
import { EIP1193RequestFn, RpcLog, Transport, createPublicClient, createTransport } from "viem";
import { wait } from "./utils";
import { wait } from "@latticexyz/common/utils";

// TODO: there is a chance that these tests will need to be written differently with timers to avoid flakiness

Expand Down
2 changes: 1 addition & 1 deletion packages/block-logs-stream/src/fetchLogs.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { AbiEvent, Address } from "abitype";
import { PublicClient, BlockNumber } from "viem";
import { bigIntMin, wait } from "@latticexyz/common/utils";
import { GetLogsResult, getLogs } from "./getLogs";
import { bigIntMin, wait } from "./utils";
import { debug } from "./debug";

export type FetchLogsOptions<TAbiEvents extends readonly AbiEvent[]> = {
Expand Down
65 changes: 62 additions & 3 deletions packages/block-logs-stream/src/groupLogsByBlockNumber.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ describe("groupLogsByBlockNumber", () => {
expect(groupLogsByBlockNumber(logs)).toMatchInlineSnapshot(`
[
{
"blockHash": "0x",
"blockNumber": 1n,
"logs": [
{
Expand All @@ -79,7 +78,6 @@ describe("groupLogsByBlockNumber", () => {
],
},
{
"blockHash": "0x",
"blockNumber": 3n,
"logs": [
{
Expand All @@ -92,7 +90,6 @@ describe("groupLogsByBlockNumber", () => {
],
},
{
"blockHash": "0x",
"blockNumber": 5n,
"logs": [
{
Expand All @@ -107,4 +104,66 @@ describe("groupLogsByBlockNumber", () => {
]
`);
});

it("adds an entry for toBlock if block is not in logs", () => {
const logs = [
{
blockNumber: 1n,
blockHash: "0x",
logIndex: 4,
transactionHash: "0x",
transactionIndex: 0,
},
] as any as Log[];

expect(groupLogsByBlockNumber(logs, 2n)).toMatchInlineSnapshot(`
[
{
"blockNumber": 1n,
"logs": [
{
"blockHash": "0x",
"blockNumber": 1n,
"logIndex": 4,
"transactionHash": "0x",
"transactionIndex": 0,
},
],
},
{
"blockNumber": 2n,
"logs": [],
},
]
`);
});

it("does not add an entry for toBlock if block number is in logs", () => {
const logs = [
{
blockNumber: 2n,
blockHash: "0x",
logIndex: 4,
transactionHash: "0x",
transactionIndex: 0,
},
] as any as Log[];

expect(groupLogsByBlockNumber(logs, 2n)).toMatchInlineSnapshot(`
[
{
"blockNumber": 2n,
"logs": [
{
"blockHash": "0x",
"blockNumber": 2n,
"logIndex": 4,
"transactionHash": "0x",
"transactionIndex": 0,
},
],
},
]
`);
});
});
26 changes: 19 additions & 7 deletions packages/block-logs-stream/src/groupLogsByBlockNumber.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
import { BlockNumber, Hex, Log } from "viem";
import { BlockNumber, Log } from "viem";
import { NonPendingLog, isNonPendingLog } from "./isNonPendingLog";
import { bigIntSort } from "./utils";
import { isDefined } from "@latticexyz/common/utils";
import { isDefined, bigIntSort } from "@latticexyz/common/utils";
import { debug } from "./debug";

export type GroupLogsByBlockNumberResult<TLog extends Log> = {
blockNumber: BlockNumber;
blockHash: Hex;
logs: readonly NonPendingLog<TLog>[];
}[];

Expand All @@ -19,11 +17,15 @@ export type GroupLogsByBlockNumberResult<TLog extends Log> = {
* Pending logs are filtered out before processing, as they don't have block numbers.
*
* @param logs The logs to group by block number.
* @param toBlock If specified, always include this block number at the end, even if there are no logs.
*
* @returns An array of objects where each object represents a distinct block and includes the block number,
* the block hash, and an array of logs for that block.
*/
export function groupLogsByBlockNumber<TLog extends Log>(logs: readonly TLog[]): GroupLogsByBlockNumberResult<TLog> {
export function groupLogsByBlockNumber<TLog extends Log>(
logs: readonly TLog[],
toBlock?: BlockNumber
): GroupLogsByBlockNumberResult<TLog> {
// Pending logs don't have block numbers, so filter them out.
const nonPendingLogs = logs.filter(isNonPendingLog);
if (logs.length !== nonPendingLogs.length) {
Expand All @@ -36,7 +38,7 @@ export function groupLogsByBlockNumber<TLog extends Log>(logs: readonly TLog[]):
const blockNumbers = Array.from(new Set(nonPendingLogs.map((log) => log.blockNumber)));
blockNumbers.sort(bigIntSort);

return blockNumbers
const groupedBlocks = blockNumbers
.map((blockNumber) => {
const blockLogs = nonPendingLogs.filter((log) => log.blockNumber === blockNumber);
if (!blockLogs.length) return;
Expand All @@ -46,9 +48,19 @@ export function groupLogsByBlockNumber<TLog extends Log>(logs: readonly TLog[]):

return {
blockNumber,
blockHash: blockLogs[0].blockHash,
logs: blockLogs,
};
})
.filter(isDefined);

const lastBlockNumber = blockNumbers.length > 0 ? blockNumbers[blockNumbers.length - 1] : null;

if (toBlock != null && (lastBlockNumber == null || toBlock > lastBlockNumber)) {
groupedBlocks.push({
blockNumber: toBlock,
logs: [],
});
}

return groupedBlocks;
}
2 changes: 2 additions & 0 deletions packages/block-logs-stream/src/isNonPendingBlock.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import type { Block } from "viem";

// TODO: get rid of this once https://github.com/wagmi-dev/viem/pull/847 lands

export type NonPendingBlock<TBlock extends Block> = TBlock & {
hash: NonNullable<TBlock["hash"]>;
logsBloom: NonNullable<TBlock["logsBloom"]>;
Expand Down
2 changes: 2 additions & 0 deletions packages/block-logs-stream/src/isNonPendingLog.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import type { Log } from "viem";

// TODO: get rid of this once https://github.com/wagmi-dev/viem/pull/847 lands

export type NonPendingLog<TLog extends Log> = TLog & {
blockHash: NonNullable<TLog["blockHash"]>;
blockNumber: NonNullable<TLog["blockNumber"]>;
Expand Down
17 changes: 0 additions & 17 deletions packages/block-logs-stream/src/utils.ts

This file was deleted.

153 changes: 0 additions & 153 deletions packages/store-sync/src/blockEventsToStorage.test.ts

This file was deleted.

Loading

0 comments on commit eeb15cc

Please sign in to comment.