From eeb15cc06fcbe80c37ba3926d9387f6bd5947234 Mon Sep 17 00:00:00 2001
From: Kevin Ingersoll
Date: Wed, 19 Jul 2023 17:20:07 +0100
Subject: [PATCH] feat(store-sync): rework blockLogsToStorage (#1176)

---
 .changeset/modern-bikes-build.md              |   7 +
 .../src/blockRangeToLogs.test.ts              |   2 +-
 packages/block-logs-stream/src/fetchLogs.ts   |   2 +-
 .../src/groupLogsByBlockNumber.test.ts        |  65 +++-
 .../src/groupLogsByBlockNumber.ts             |  26 +-
 .../src/isNonPendingBlock.ts                  |   2 +
 .../block-logs-stream/src/isNonPendingLog.ts  |   2 +
 packages/block-logs-stream/src/utils.ts       |  17 -
 .../src/blockEventsToStorage.test.ts          | 153 ---------
 .../store-sync/src/blockEventsToStorage.ts    | 234 --------------
 .../store-sync/src/blockLogsToStorage.test.ts | 214 ++++++++++++
 packages/store-sync/src/blockLogsToStorage.ts | 304 ++++++++++++++++++
 packages/store-sync/src/index.ts              |   2 +-
 13 files changed, 613 insertions(+), 417 deletions(-)
 create mode 100644 .changeset/modern-bikes-build.md
 delete mode 100644 packages/block-logs-stream/src/utils.ts
 delete mode 100644 packages/store-sync/src/blockEventsToStorage.test.ts
 delete mode 100644 packages/store-sync/src/blockEventsToStorage.ts
 create mode 100644 packages/store-sync/src/blockLogsToStorage.test.ts
 create mode 100644 packages/store-sync/src/blockLogsToStorage.ts

diff --git a/.changeset/modern-bikes-build.md b/.changeset/modern-bikes-build.md
new file mode 100644
index 0000000000..62245c397a
--- /dev/null
+++ b/.changeset/modern-bikes-build.md
@@ -0,0 +1,7 @@
+---
+"@latticexyz/block-logs-stream": minor
+"@latticexyz/store-sync": minor
+---
+
+- Replace `blockEventsToStorage` with `blockLogsToStorage`, which exposes a `storeOperations` callback to perform database writes from store operations. This helps encapsulate database adapters into a single wrapper/instance of `blockLogsToStorage` and allows a block of store operations to be wrapped in a single database transaction.
+- Add `toBlock` option to `groupLogsByBlockNumber` and remove `blockHash` from results. This helps track the last block number for a given set of logs when used in the context of RxJS streams.
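A minimal usage sketch of the wrapper described above (not part of the patch itself): the `db` client and its `transaction`, `saveTables`, `loadTables`, and `write` methods are hypothetical stand-ins for a concrete database adapter, while the callback shape (`registerTables`, `getTables`, `storeOperations`) follows `BlockLogsToStorageOptions` as defined later in this patch.

```ts
import { blockLogsToStorage } from "@latticexyz/store-sync";
import storeConfig from "@latticexyz/store/mud.config";

// Hypothetical database adapter; swap in a real client.
declare const db: {
  transaction(run: () => Promise<void>): Promise<void>;
  saveTables(tables: unknown[]): Promise<void>;
  loadTables(tables: { address: string; namespace: string; name: string }[]): Promise<any[]>;
  write(operation: unknown): Promise<void>;
};

// A single wrapper instance encapsulates the whole database adapter.
const blockToStorage = blockLogsToStorage<typeof storeConfig>({
  registerTables: async ({ blockNumber, tables }) => {
    await db.saveTables(tables);
  },
  getTables: async ({ blockNumber, tables }) => db.loadTables(tables),
  storeOperations: async ({ blockNumber, operations }) => {
    // All operations for a block arrive together, so they can be
    // committed atomically in one database transaction.
    await db.transaction(async () => {
      for (const operation of operations) {
        await db.write(operation);
      }
    });
  },
});
```

Each call to `blockToStorage(block)` then registers any new tables seen in that block, decodes its store events into typed operations, and hands them to `storeOperations` as one batch.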
diff --git a/packages/block-logs-stream/src/blockRangeToLogs.test.ts b/packages/block-logs-stream/src/blockRangeToLogs.test.ts index eb3c8e04fb..409ab125dd 100644 --- a/packages/block-logs-stream/src/blockRangeToLogs.test.ts +++ b/packages/block-logs-stream/src/blockRangeToLogs.test.ts @@ -2,7 +2,7 @@ import { describe, it, expect, vi, beforeEach } from "vitest"; import { blockRangeToLogs } from "./blockRangeToLogs"; import { Subject, lastValueFrom, map, toArray } from "rxjs"; import { EIP1193RequestFn, RpcLog, Transport, createPublicClient, createTransport } from "viem"; -import { wait } from "./utils"; +import { wait } from "@latticexyz/common/utils"; // TODO: there is a chance that these tests will need to be written differently with timers to avoid flakiness diff --git a/packages/block-logs-stream/src/fetchLogs.ts b/packages/block-logs-stream/src/fetchLogs.ts index 1031c856e8..2e1e60440e 100644 --- a/packages/block-logs-stream/src/fetchLogs.ts +++ b/packages/block-logs-stream/src/fetchLogs.ts @@ -1,7 +1,7 @@ import { AbiEvent, Address } from "abitype"; import { PublicClient, BlockNumber } from "viem"; +import { bigIntMin, wait } from "@latticexyz/common/utils"; import { GetLogsResult, getLogs } from "./getLogs"; -import { bigIntMin, wait } from "./utils"; import { debug } from "./debug"; export type FetchLogsOptions = { diff --git a/packages/block-logs-stream/src/groupLogsByBlockNumber.test.ts b/packages/block-logs-stream/src/groupLogsByBlockNumber.test.ts index 5c8633a832..0e3fd796f9 100644 --- a/packages/block-logs-stream/src/groupLogsByBlockNumber.test.ts +++ b/packages/block-logs-stream/src/groupLogsByBlockNumber.test.ts @@ -52,7 +52,6 @@ describe("groupLogsByBlockNumber", () => { expect(groupLogsByBlockNumber(logs)).toMatchInlineSnapshot(` [ { - "blockHash": "0x", "blockNumber": 1n, "logs": [ { @@ -79,7 +78,6 @@ describe("groupLogsByBlockNumber", () => { ], }, { - "blockHash": "0x", "blockNumber": 3n, "logs": [ { @@ -92,7 +90,6 @@ describe("groupLogsByBlockNumber", () => { ], }, { - "blockHash": "0x", "blockNumber": 5n, "logs": [ { @@ -107,4 +104,66 @@ describe("groupLogsByBlockNumber", () => { ] `); }); + + it("adds an entry for toBlock if block is not in logs", () => { + const logs = [ + { + blockNumber: 1n, + blockHash: "0x", + logIndex: 4, + transactionHash: "0x", + transactionIndex: 0, + }, + ] as any as Log[]; + + expect(groupLogsByBlockNumber(logs, 2n)).toMatchInlineSnapshot(` + [ + { + "blockNumber": 1n, + "logs": [ + { + "blockHash": "0x", + "blockNumber": 1n, + "logIndex": 4, + "transactionHash": "0x", + "transactionIndex": 0, + }, + ], + }, + { + "blockNumber": 2n, + "logs": [], + }, + ] + `); + }); + + it("does not add an entry for toBlock if block number is in logs", () => { + const logs = [ + { + blockNumber: 2n, + blockHash: "0x", + logIndex: 4, + transactionHash: "0x", + transactionIndex: 0, + }, + ] as any as Log[]; + + expect(groupLogsByBlockNumber(logs, 2n)).toMatchInlineSnapshot(` + [ + { + "blockNumber": 2n, + "logs": [ + { + "blockHash": "0x", + "blockNumber": 2n, + "logIndex": 4, + "transactionHash": "0x", + "transactionIndex": 0, + }, + ], + }, + ] + `); + }); }); diff --git a/packages/block-logs-stream/src/groupLogsByBlockNumber.ts b/packages/block-logs-stream/src/groupLogsByBlockNumber.ts index 93a711a1c4..f999dc88c9 100644 --- a/packages/block-logs-stream/src/groupLogsByBlockNumber.ts +++ b/packages/block-logs-stream/src/groupLogsByBlockNumber.ts @@ -1,12 +1,10 @@ -import { BlockNumber, Hex, Log } from "viem"; +import { BlockNumber, Log } from "viem"; 
 import { NonPendingLog, isNonPendingLog } from "./isNonPendingLog";
-import { bigIntSort } from "./utils";
-import { isDefined } from "@latticexyz/common/utils";
+import { isDefined, bigIntSort } from "@latticexyz/common/utils";
 import { debug } from "./debug";

 export type GroupLogsByBlockNumberResult<TLog extends Log> = {
   blockNumber: BlockNumber;
-  blockHash: Hex;
   logs: readonly NonPendingLog<TLog>[];
 }[];

@@ -19,11 +17,15 @@ export type GroupLogsByBlockNumberResult<TLog extends Log> = {
  * Pending logs are filtered out before processing, as they don't have block numbers.
  *
  * @param logs The logs to group by block number.
+ * @param toBlock If specified, always include this block number at the end, even if there are no logs.
  *
- * @returns An array of objects where each object represents a distinct block and includes the block number,
- * the block hash, and an array of logs for that block.
+ * @returns An array of objects where each object represents a distinct block and includes the block number
+ * and an array of logs for that block.
  */
-export function groupLogsByBlockNumber<TLog extends Log>(logs: readonly TLog[]): GroupLogsByBlockNumberResult<TLog> {
+export function groupLogsByBlockNumber<TLog extends Log>(
+  logs: readonly TLog[],
+  toBlock?: BlockNumber
+): GroupLogsByBlockNumberResult<TLog> {
   // Pending logs don't have block numbers, so filter them out.
   const nonPendingLogs = logs.filter(isNonPendingLog);
   if (logs.length !== nonPendingLogs.length) {
@@ -36,7 +38,7 @@ export function groupLogsByBlockNumber<TLog extends Log>(logs: readonly TLog[]):
   const blockNumbers = Array.from(new Set(nonPendingLogs.map((log) => log.blockNumber)));
   blockNumbers.sort(bigIntSort);

-  return blockNumbers
+  const groupedBlocks = blockNumbers
     .map((blockNumber) => {
       const blockLogs = nonPendingLogs.filter((log) => log.blockNumber === blockNumber);
       if (!blockLogs.length) return;
@@ -46,9 +48,19 @@ export function groupLogsByBlockNumber<TLog extends Log>(logs: readonly TLog[]):

       return {
         blockNumber,
-        blockHash: blockLogs[0].blockHash,
         logs: blockLogs,
       };
     })
     .filter(isDefined);
+
+  const lastBlockNumber = blockNumbers.length > 0 ? blockNumbers[blockNumbers.length - 1] : null;
+
+  if (toBlock != null && (lastBlockNumber == null || toBlock > lastBlockNumber)) {
+    groupedBlocks.push({
+      blockNumber: toBlock,
+      logs: [],
+    });
+  }
+
+  return groupedBlocks;
 }
diff --git a/packages/block-logs-stream/src/isNonPendingBlock.ts b/packages/block-logs-stream/src/isNonPendingBlock.ts
index 5545fc23a5..cbf39086ea 100644
--- a/packages/block-logs-stream/src/isNonPendingBlock.ts
+++ b/packages/block-logs-stream/src/isNonPendingBlock.ts
@@ -1,5 +1,7 @@
 import type { Block } from "viem";

+// TODO: get rid of this once https://github.com/wagmi-dev/viem/pull/847 lands
+
 export type NonPendingBlock<TBlock extends Block> = TBlock & {
   hash: NonNullable<TBlock["hash"]>;
   logsBloom: NonNullable<TBlock["logsBloom"]>;
diff --git a/packages/block-logs-stream/src/isNonPendingLog.ts b/packages/block-logs-stream/src/isNonPendingLog.ts
index 6110d18171..c93033f74e 100644
--- a/packages/block-logs-stream/src/isNonPendingLog.ts
+++ b/packages/block-logs-stream/src/isNonPendingLog.ts
@@ -1,5 +1,7 @@
 import type { Log } from "viem";

+// TODO: get rid of this once https://github.com/wagmi-dev/viem/pull/847 lands
+
 export type NonPendingLog<TLog extends Log> = TLog & {
   blockHash: NonNullable<TLog["blockHash"]>;
   blockNumber: NonNullable<TLog["blockNumber"]>;
diff --git a/packages/block-logs-stream/src/utils.ts b/packages/block-logs-stream/src/utils.ts
deleted file mode 100644
index b365dd9dd1..0000000000
--- a/packages/block-logs-stream/src/utils.ts
+++ /dev/null
@@ -1,17 +0,0 @@
-// javascript, y u no support bigints better?
-
-export function bigIntMin(...args: bigint[]): bigint {
-  return args.reduce((m, e) => (e < m ?
e : m)); -} - -export function bigIntMax(...args: bigint[]): bigint { - return args.reduce((m, e) => (e > m ? e : m)); -} - -export function bigIntSort(a: bigint, b: bigint): -1 | 0 | 1 { - return a < b ? -1 : a > b ? 1 : 0; -} - -export function wait(ms: number): Promise { - return new Promise((resolve) => setTimeout(resolve, ms)); -} diff --git a/packages/store-sync/src/blockEventsToStorage.test.ts b/packages/store-sync/src/blockEventsToStorage.test.ts deleted file mode 100644 index 486834f3d1..0000000000 --- a/packages/store-sync/src/blockEventsToStorage.test.ts +++ /dev/null @@ -1,153 +0,0 @@ -import { beforeEach, describe, expect, it, vi } from "vitest"; -import { BlockEventsToStorageOptions, blockEventsToStorage } from "./blockEventsToStorage"; -import storeConfig from "@latticexyz/store/mud.config"; - -const mockedCallbacks = { - registerTableSchema: vi.fn< - Parameters, - ReturnType - >(), - registerTableMetadata: vi.fn< - Parameters, - ReturnType - >(), - getTableSchema: vi.fn< - Parameters, - ReturnType - >(), - getTableMetadata: vi.fn< - Parameters, - ReturnType - >(), -}; - -const mockedDecode = blockEventsToStorage(mockedCallbacks as any as BlockEventsToStorageOptions); - -describe("blockEventsToStorage", () => { - beforeEach(() => { - vi.clearAllMocks(); - }); - - it("call setField with data properly decoded", async () => { - mockedCallbacks.getTableSchema.mockImplementation(async ({ namespace, name }) => { - if (namespace === "mudstore" && name === "StoreMetadata") { - return { - namespace: "mudstore", - name: "StoreMetadata", - schema: { - keySchema: { - staticFields: ["bytes32"], - dynamicFields: [], - }, - valueSchema: { - staticFields: [], - dynamicFields: ["string", "bytes"], - }, - }, - }; - } - - if (namespace === "" && name === "Inventory") { - return { - namespace: "", - name: "Inventory", - schema: { - keySchema: { - staticFields: ["address", "uint32", "uint32"], - dynamicFields: [], - }, - valueSchema: { - staticFields: ["uint32"], - dynamicFields: [], - }, - }, - }; - } - }); - - mockedCallbacks.getTableMetadata.mockImplementation(async ({ namespace, name }) => { - if (namespace === "" && name === "Inventory") { - return { - namespace: "", - name: "Inventory", - keyNames: ["owner", "item", "itemVariant"], - valueNames: ["amount"], - }; - } - }); - - const operations = await mockedDecode({ - blockNumber: 5448n, - blockHash: "0x03e962e7402b2ab295b92feac342a132111dd14b0d1fd4d4a0456fdc77981577", - logs: [ - { - address: "0x5fbdb2315678afecb367f032d93f642f64180aa3", - topics: ["0xd01f9f1368f831528fc9fe6442366b2b7d957fbfff3bcf7c24d9ab5fe51f8c46"], - data: "0x00000000000000000000000000000000496e76656e746f7279000000000000000000000000000000000000000000000000000000000000000000000000000080000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000001000000000000000000000000000000000000000000000000000000000000000003000000000000000000000000796eb990a3f9c431c69149c7a168b91596d87f600000000000000000000000000000000000000000000000000000000000000001000000000000000000000000000000000000000000000000000000000000000100000000000000000000000000000000000000000000000000000000000000040000000800000000000000000000000000000000000000000000000000000000", - blockHash: "0x03e962e7402b2ab295b92feac342a132111dd14b0d1fd4d4a0456fdc77981577", - blockNumber: 5448n, - transactionHash: "0xa6986924609542dc4c2d81c53799d8eab47109ef34ee1e422de595e19ee9bfa4", - transactionIndex: 0, - logIndex: 0, - removed: false, - args: { - table: 
"0x00000000000000000000000000000000496e76656e746f727900000000000000", - key: [ - "0x000000000000000000000000796eb990a3f9c431c69149c7a168b91596d87f60", - "0x0000000000000000000000000000000000000000000000000000000000000001", - "0x0000000000000000000000000000000000000000000000000000000000000001", - ], - schemaIndex: 0, - data: "0x00000008", - }, - eventName: "StoreSetField", - }, - ], - }); - - expect(operations).toMatchInlineSnapshot(` - { - "blockHash": "0x03e962e7402b2ab295b92feac342a132111dd14b0d1fd4d4a0456fdc77981577", - "blockNumber": 5448n, - "operations": [ - { - "keyTuple": { - "item": 1, - "itemVariant": 1, - "owner": "0x796eb990A3F9C431C69149c7a168b91596D87F60", - }, - "log": { - "address": "0x5fbdb2315678afecb367f032d93f642f64180aa3", - "args": { - "data": "0x00000008", - "key": [ - "0x000000000000000000000000796eb990a3f9c431c69149c7a168b91596d87f60", - "0x0000000000000000000000000000000000000000000000000000000000000001", - "0x0000000000000000000000000000000000000000000000000000000000000001", - ], - "schemaIndex": 0, - "table": "0x00000000000000000000000000000000496e76656e746f727900000000000000", - }, - "blockHash": "0x03e962e7402b2ab295b92feac342a132111dd14b0d1fd4d4a0456fdc77981577", - "blockNumber": 5448n, - "data": "0x00000000000000000000000000000000496e76656e746f7279000000000000000000000000000000000000000000000000000000000000000000000000000080000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000001000000000000000000000000000000000000000000000000000000000000000003000000000000000000000000796eb990a3f9c431c69149c7a168b91596d87f600000000000000000000000000000000000000000000000000000000000000001000000000000000000000000000000000000000000000000000000000000000100000000000000000000000000000000000000000000000000000000000000040000000800000000000000000000000000000000000000000000000000000000", - "eventName": "StoreSetField", - "logIndex": 0, - "removed": false, - "topics": [ - "0xd01f9f1368f831528fc9fe6442366b2b7d957fbfff3bcf7c24d9ab5fe51f8c46", - ], - "transactionHash": "0xa6986924609542dc4c2d81c53799d8eab47109ef34ee1e422de595e19ee9bfa4", - "transactionIndex": 0, - }, - "name": "Inventory", - "namespace": "", - "type": "SetField", - "value": 8, - "valueName": "amount", - }, - ], - } - `); - }); -}); diff --git a/packages/store-sync/src/blockEventsToStorage.ts b/packages/store-sync/src/blockEventsToStorage.ts deleted file mode 100644 index 23059b2cbd..0000000000 --- a/packages/store-sync/src/blockEventsToStorage.ts +++ /dev/null @@ -1,234 +0,0 @@ -import { - TableSchema, - decodeField, - decodeKeyTuple, - decodeRecord, - hexToTableSchema, - schemaIndexToAbiType, -} from "@latticexyz/protocol-parser"; -import { GroupLogsByBlockNumberResult, GetLogsResult } from "@latticexyz/block-logs-stream"; -import { StoreEventsAbi, StoreConfig } from "@latticexyz/store"; -import { TableId } from "@latticexyz/common"; -import { Hex, decodeAbiParameters, parseAbiParameters } from "viem"; -import { debug } from "./debug"; -// TODO: move these type helpers into store? -import { Key, Value } from "@latticexyz/store-cache"; -import { isDefined } from "@latticexyz/common/utils"; - -// TODO: change table schema/metadata APIs once we get both schema and field names in the same event - -// TODO: export these from store or world -export const schemaTableId = new TableId("mudstore", "schema"); -export const metadataTableId = new TableId("mudstore", "StoreMetadata"); - -// I don't love carrying all these types through. 
Ideally this should be the shape of the thing we want, rather than the specific return type from a function. -export type StoreEventsLog = GetLogsResult[number]; -export type BlockEvents = GroupLogsByBlockNumberResult[number]; - -export type StoredTableSchema = { - namespace: string; - name: string; - schema: TableSchema; -}; - -export type StoredTableMetadata = { - namespace: string; - name: string; - keyNames: readonly string[]; - valueNames: readonly string[]; -}; - -export type BaseStorageOperation = { - log: StoreEventsLog; - namespace: string; -}; - -export type SetRecordOperation = BaseStorageOperation & { - type: "SetRecord"; -} & { - [TTable in keyof TConfig["tables"]]: { - name: TTable; - keyTuple: Key; - record: Value; - }; - }[keyof TConfig["tables"]]; - -export type SetFieldOperation = BaseStorageOperation & { - type: "SetField"; -} & { - [TTable in keyof TConfig["tables"]]: { - name: TTable; - keyTuple: Key; - } & { - [TValue in keyof Value]: { - // TODO: standardize on calling these "fields" or "values" or maybe "columns" - valueName: TValue; - value: Value[TValue]; - }; - }[keyof Value]; - }[keyof TConfig["tables"]]; - -export type DeleteRecordOperation = BaseStorageOperation & { - type: "DeleteRecord"; -} & { - [TTable in keyof TConfig["tables"]]: { - name: TTable; - keyTuple: Key; - }; - }[keyof TConfig["tables"]]; - -export type StorageOperation = - | SetFieldOperation - | SetRecordOperation - | DeleteRecordOperation; - -export type BlockEventsToStorageOptions = { - registerTableSchema: (data: StoredTableSchema) => Promise; - registerTableMetadata: (data: StoredTableMetadata) => Promise; - getTableSchema: (opts: Pick) => Promise; - getTableMetadata: (opts: Pick) => Promise; -}; - -export function blockEventsToStorage({ - registerTableMetadata, - registerTableSchema, - getTableMetadata, - getTableSchema, -}: BlockEventsToStorageOptions): (block: BlockEvents) => Promise<{ - blockNumber: BlockEvents["blockNumber"]; - blockHash: BlockEvents["blockHash"]; - operations: StorageOperation[]; -}> { - return async (block) => { - // Find and register all new table schemas - // Store schemas are immutable, so we can parallelize this - await Promise.all( - block.logs.map(async (log) => { - if (log.eventName !== "StoreSetRecord") return; - if (log.args.table !== schemaTableId.toHex()) return; - - const [tableForSchema, ...otherKeys] = log.args.key; - if (otherKeys.length) { - debug("registerSchema event is expected to have only one key in key tuple, but got multiple", log); - } - - const tableId = TableId.fromHex(tableForSchema); - const schema = hexToTableSchema(log.args.data); - - await registerTableSchema({ ...tableId, schema }); - }) - ); - - const metadataTableSchema = await getTableSchema(metadataTableId); - if (!metadataTableSchema) { - // TODO: better error - throw new Error("metadata table schema was not registered"); - } - - // Find and register all new table metadata - // Table metadata is technically mutable, but all of our code assumes its immutable, so we'll continue that trend - // TODO: rework contracts so schemas+tables are combined and immutable - await Promise.all( - block.logs.map(async (log) => { - if (log.eventName !== "StoreSetRecord") return; - if (log.args.table !== metadataTableId.toHex()) return; - - const [tableForSchema, ...otherKeys] = log.args.key; - if (otherKeys.length) { - debug("setMetadata event is expected to have only one key in key tuple, but got multiple", log); - } - - const tableId = TableId.fromHex(tableForSchema); - const [tableName, 
abiEncodedFieldNames] = decodeRecord(metadataTableSchema.schema.valueSchema, log.args.data); - const valueNames = decodeAbiParameters(parseAbiParameters("string[]"), abiEncodedFieldNames as Hex)[0]; - - // TODO: add key names to table registration when we refactor it - await registerTableMetadata({ ...tableId, keyNames: [], valueNames }); - }) - ); - - const tables = Array.from(new Set(block.logs.map((log) => log.args.table))).map((tableHex) => - TableId.fromHex(tableHex) - ); - // TODO: combine these once we refactor table registration - const tableSchemas = Object.fromEntries( - await Promise.all(tables.map(async (table) => [table.toHex(), await getTableSchema(table)])) - ) as Record; - const tableMetadatas = Object.fromEntries( - await Promise.all(tables.map(async (table) => [table.toHex(), await getTableMetadata(table)])) - ) as Record; - - const operations = block.logs - .map((log): StorageOperation | undefined => { - const tableId = TableId.fromHex(log.args.table); - const tableSchema = tableSchemas[log.args.table]; - const tableMetadata = tableMetadatas[log.args.table]; - if (!tableSchema) { - debug("no table schema found for event, skipping", tableId.toString(), log); - return; - } - if (!tableMetadata) { - debug("no table metadata found for event, skipping", tableId.toString(), log); - return; - } - - const keyTupleValues = decodeKeyTuple(tableSchema.schema.keySchema, log.args.key); - const keyTuple = Object.fromEntries( - keyTupleValues.map((value, i) => [tableMetadata.keyNames[i] ?? i, value]) - ) as Key; - - if (log.eventName === "StoreSetRecord" || log.eventName === "StoreEphemeralRecord") { - const values = decodeRecord(tableSchema.schema.valueSchema, log.args.data); - const record = Object.fromEntries(tableMetadata.valueNames.map((name, i) => [name, values[i]])) as Value< - TConfig, - keyof TConfig["tables"] - >; - // TODO: decide if we should handle ephemeral records separately? 
- // they'll eventually be turned into "events", but unclear if that should translate to client storage operations - return { - log, - type: "SetRecord", - ...tableId, - keyTuple, - record, - }; - } - - if (log.eventName === "StoreSetField") { - const valueName = tableMetadata.valueNames[log.args.schemaIndex] as string & - keyof Value; - const value = decodeField( - schemaIndexToAbiType(tableSchema.schema.valueSchema, log.args.schemaIndex), - log.args.data - ) as Value[typeof valueName]; - return { - log, - type: "SetField", - ...tableId, - keyTuple, - valueName, - value, - }; - } - - if (log.eventName === "StoreDeleteRecord") { - return { - log, - type: "DeleteRecord", - ...tableId, - keyTuple, - }; - } - - debug("unknown store event or log, skipping", log); - return; - }) - .filter(isDefined); - - return { - blockNumber: block.blockNumber, - blockHash: block.blockHash, - operations, - }; - }; -} diff --git a/packages/store-sync/src/blockLogsToStorage.test.ts b/packages/store-sync/src/blockLogsToStorage.test.ts new file mode 100644 index 0000000000..e0c0baad43 --- /dev/null +++ b/packages/store-sync/src/blockLogsToStorage.test.ts @@ -0,0 +1,214 @@ +import { beforeEach, describe, expect, it, vi } from "vitest"; +import { BlockLogsToStorageOptions, blockLogsToStorage } from "./blockLogsToStorage"; +import storeConfig from "@latticexyz/store/mud.config"; +import { isDefined } from "@latticexyz/common/utils"; +import { Hex } from "viem"; + +const mockedCallbacks = { + registerTables: vi.fn< + Parameters, + ReturnType + >(), + getTables: vi.fn< + Parameters, + ReturnType + >(), + storeOperations: vi.fn< + Parameters, + ReturnType + >(), +}; + +const mockedDecode = blockLogsToStorage( + mockedCallbacks as any as BlockLogsToStorageOptions +); + +describe("blockLogsToStorage", () => { + beforeEach(() => { + vi.clearAllMocks(); + }); + + it("call setField with data properly decoded", async () => { + mockedCallbacks.getTables.mockImplementation(async ({ tables }) => { + return tables + .map((table) => { + if (table.namespace === "" && table.name === "Inventory") { + return { + ...table, + keySchema: { + owner: "address", + item: "uint32", + itemVariant: "uint32", + } as const, + valueSchema: { + amount: "uint32", + } as const, + }; + } + }) + .filter(isDefined); + }); + + const operations = await mockedDecode({ + blockNumber: 5448n, + logs: [ + { + address: "0x5fbdb2315678afecb367f032d93f642f64180aa3", + topics: ["0x912af873e852235aae78a1d25ae9bb28b616a67c36898c53a14fd8184504ee32"], + data: "0x6d756473746f72650000000000000000736368656d6100000000000000000000000000000000000000000000000000000000000000000000000000000000006000000000000000000000000000000000000000000000000000000000000000a0000000000000000000000000000000000000000000000000000000000000000100000000000000000000000000000000496e76656e746f72790000000000000000000000000000000000000000000000000000000000000000000000000000400004010003000000000000000000000000000000000000000000000000000000002001005f000000000000000000000000000000000000000000000000000000", + blockNumber: 5448n, + transactionHash: "0x2766c01dd2290a80e2b54c27e95ca303d7d362643a68bd71c7d8fdb620f2a3a6", + transactionIndex: 18, + blockHash: "0xc65212ced76e80c3d59fd210fca434d9ceebfc25b544989d5eaecec3d31f9ac9", + logIndex: 18, + removed: false, + args: { + table: "0x6d756473746f72650000000000000000736368656d6100000000000000000000", + key: ["0x00000000000000000000000000000000496e76656e746f727900000000000000"], + data: 
"0x0004010003000000000000000000000000000000000000000000000000000000002001005f000000000000000000000000000000000000000000000000000000", + }, + eventName: "StoreSetRecord", + }, + { + address: "0x5fbdb2315678afecb367f032d93f642f64180aa3", + topics: ["0x912af873e852235aae78a1d25ae9bb28b616a67c36898c53a14fd8184504ee32"], + data: "0x6d756473746f7265000000000000000053746f72654d65746164617461000000000000000000000000000000000000000000000000000000000000000000006000000000000000000000000000000000000000000000000000000000000000a0000000000000000000000000000000000000000000000000000000000000000100000000000000000000000000000000496e76656e746f72790000000000000000000000000000000000000000000000000000000000000000000000000000c9000000000000a9000000000900000000a0000000000000000000000000000000496e76656e746f7279000000000000000000000000000000000000000000000000000000000000002000000000000000000000000000000000000000000000000000000000000000010000000000000000000000000000000000000000000000000000000000000020000000000000000000000000000000000000000000000000000000000000000576616c75650000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000", + blockNumber: 5448n, + transactionHash: "0x80d6650fdd6656461e6639988d7baa8d6d228297df505d8bbd0a4efc273b382b", + transactionIndex: 44, + blockHash: "0x930656a2399ed2473449118a030cf9a3b3f770db4f74e9b565e2e0035c49bc6e", + logIndex: 44, + removed: false, + args: { + table: "0x6d756473746f7265000000000000000053746f72654d65746164617461000000", + key: ["0x00000000000000000000000000000000496e76656e746f727900000000000000"], + data: "0x000000000000a9000000000900000000a0000000000000000000000000000000496e76656e746f7279000000000000000000000000000000000000000000000000000000000000002000000000000000000000000000000000000000000000000000000000000000010000000000000000000000000000000000000000000000000000000000000020000000000000000000000000000000000000000000000000000000000000000576616c7565000000000000000000000000000000000000000000000000000000", + }, + eventName: "StoreSetRecord", + }, + { + address: "0x5fbdb2315678afecb367f032d93f642f64180aa3", + topics: ["0xd01f9f1368f831528fc9fe6442366b2b7d957fbfff3bcf7c24d9ab5fe51f8c46"], + data: "0x00000000000000000000000000000000496e76656e746f7279000000000000000000000000000000000000000000000000000000000000000000000000000080000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000001000000000000000000000000000000000000000000000000000000000000000003000000000000000000000000796eb990a3f9c431c69149c7a168b91596d87f600000000000000000000000000000000000000000000000000000000000000001000000000000000000000000000000000000000000000000000000000000000100000000000000000000000000000000000000000000000000000000000000040000000800000000000000000000000000000000000000000000000000000000", + blockHash: "0x03e962e7402b2ab295b92feac342a132111dd14b0d1fd4d4a0456fdc77981577", + blockNumber: 5448n, + transactionHash: "0xa6986924609542dc4c2d81c53799d8eab47109ef34ee1e422de595e19ee9bfa4", + transactionIndex: 88, + logIndex: 88, + removed: false, + args: { + table: "0x00000000000000000000000000000000496e76656e746f727900000000000000", + key: [ + "0x000000000000000000000000796eb990a3f9c431c69149c7a168b91596d87f60", + "0x0000000000000000000000000000000000000000000000000000000000000001", + "0x0000000000000000000000000000000000000000000000000000000000000001", + ], + schemaIndex: 0, + data: "0x00000008", + }, + eventName: "StoreSetField", + }, + ], + }); + + 
expect(mockedCallbacks.storeOperations).toMatchInlineSnapshot(` + [MockFunction spy] { + "calls": [ + [ + { + "blockNumber": 5448n, + "operations": [ + { + "fieldName": "amount", + "fieldValue": 8, + "key": { + "item": 1, + "itemVariant": 1, + "owner": "0x796eb990A3F9C431C69149c7a168b91596D87F60", + }, + "log": { + "address": "0x5fbdb2315678afecb367f032d93f642f64180aa3", + "args": { + "data": "0x00000008", + "key": [ + "0x000000000000000000000000796eb990a3f9c431c69149c7a168b91596d87f60", + "0x0000000000000000000000000000000000000000000000000000000000000001", + "0x0000000000000000000000000000000000000000000000000000000000000001", + ], + "schemaIndex": 0, + "table": "0x00000000000000000000000000000000496e76656e746f727900000000000000", + }, + "blockHash": "0x03e962e7402b2ab295b92feac342a132111dd14b0d1fd4d4a0456fdc77981577", + "blockNumber": 5448n, + "data": "0x00000000000000000000000000000000496e76656e746f7279000000000000000000000000000000000000000000000000000000000000000000000000000080000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000001000000000000000000000000000000000000000000000000000000000000000003000000000000000000000000796eb990a3f9c431c69149c7a168b91596d87f600000000000000000000000000000000000000000000000000000000000000001000000000000000000000000000000000000000000000000000000000000000100000000000000000000000000000000000000000000000000000000000000040000000800000000000000000000000000000000000000000000000000000000", + "eventName": "StoreSetField", + "logIndex": 88, + "removed": false, + "topics": [ + "0xd01f9f1368f831528fc9fe6442366b2b7d957fbfff3bcf7c24d9ab5fe51f8c46", + ], + "transactionHash": "0xa6986924609542dc4c2d81c53799d8eab47109ef34ee1e422de595e19ee9bfa4", + "transactionIndex": 88, + }, + "name": "Inventory", + "namespace": "", + "type": "SetField", + }, + ], + }, + ], + ], + "results": [ + { + "type": "return", + "value": undefined, + }, + ], + } + `); + + expect(operations).toMatchInlineSnapshot(` + { + "blockNumber": 5448n, + "operations": [ + { + "fieldName": "amount", + "fieldValue": 8, + "key": { + "item": 1, + "itemVariant": 1, + "owner": "0x796eb990A3F9C431C69149c7a168b91596D87F60", + }, + "log": { + "address": "0x5fbdb2315678afecb367f032d93f642f64180aa3", + "args": { + "data": "0x00000008", + "key": [ + "0x000000000000000000000000796eb990a3f9c431c69149c7a168b91596d87f60", + "0x0000000000000000000000000000000000000000000000000000000000000001", + "0x0000000000000000000000000000000000000000000000000000000000000001", + ], + "schemaIndex": 0, + "table": "0x00000000000000000000000000000000496e76656e746f727900000000000000", + }, + "blockHash": "0x03e962e7402b2ab295b92feac342a132111dd14b0d1fd4d4a0456fdc77981577", + "blockNumber": 5448n, + "data": "0x00000000000000000000000000000000496e76656e746f7279000000000000000000000000000000000000000000000000000000000000000000000000000080000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000001000000000000000000000000000000000000000000000000000000000000000003000000000000000000000000796eb990a3f9c431c69149c7a168b91596d87f600000000000000000000000000000000000000000000000000000000000000001000000000000000000000000000000000000000000000000000000000000000100000000000000000000000000000000000000000000000000000000000000040000000800000000000000000000000000000000000000000000000000000000", + "eventName": "StoreSetField", + "logIndex": 88, + "removed": false, + "topics": [ + 
"0xd01f9f1368f831528fc9fe6442366b2b7d957fbfff3bcf7c24d9ab5fe51f8c46", + ], + "transactionHash": "0xa6986924609542dc4c2d81c53799d8eab47109ef34ee1e422de595e19ee9bfa4", + "transactionIndex": 88, + }, + "name": "Inventory", + "namespace": "", + "type": "SetField", + }, + ], + } + `); + }); +}); diff --git a/packages/store-sync/src/blockLogsToStorage.ts b/packages/store-sync/src/blockLogsToStorage.ts new file mode 100644 index 0000000000..57471df2ea --- /dev/null +++ b/packages/store-sync/src/blockLogsToStorage.ts @@ -0,0 +1,304 @@ +import { + decodeField, + decodeKeyTuple, + decodeRecord, + hexToTableSchema, + abiTypesToSchema, + TableSchema, +} from "@latticexyz/protocol-parser"; +import { GroupLogsByBlockNumberResult, GetLogsResult } from "@latticexyz/block-logs-stream"; +import { StoreEventsAbi, StoreConfig } from "@latticexyz/store"; +import { TableId } from "@latticexyz/common"; +import { Address, Hex, decodeAbiParameters, parseAbiParameters } from "viem"; +import { debug } from "./debug"; +// TODO: move these type helpers into store? +import { Key, Value } from "@latticexyz/store-cache"; +import { isDefined } from "@latticexyz/common/utils"; +import { SchemaAbiType, StaticAbiType } from "@latticexyz/schema-type"; + +// TODO: change table schema/metadata APIs once we get both schema and field names in the same event (https://github.com/latticexyz/mud/pull/1182) + +// TODO: export these from store or world +export const schemaTableId = new TableId("mudstore", "schema"); +export const metadataTableId = new TableId("mudstore", "StoreMetadata"); + +export type StoreEventsLog = GetLogsResult[number]; +export type BlockLogs = GroupLogsByBlockNumberResult[number]; + +export type StoredTable = { + address: Address; + namespace: string; + name: string; + keySchema: Record; + valueSchema: Record; +}; + +export type BaseStorageOperation = { + log: StoreEventsLog; + namespace: string; +}; + +export type SetRecordOperation = BaseStorageOperation & { + type: "SetRecord"; +} & { + [TTable in keyof TConfig["tables"]]: { + name: TTable; + key: Key; + value: Value; + }; + }[keyof TConfig["tables"]]; + +export type SetFieldOperation = BaseStorageOperation & { + type: "SetField"; +} & { + [TTable in keyof TConfig["tables"]]: { + name: TTable; + key: Key; + } & { + [TValue in keyof Value]: { + fieldName: TValue; + fieldValue: Value[TValue]; + }; + }[keyof Value]; + }[keyof TConfig["tables"]]; + +export type DeleteRecordOperation = BaseStorageOperation & { + type: "DeleteRecord"; +} & { + [TTable in keyof TConfig["tables"]]: { + name: TTable; + key: Key; + }; + }[keyof TConfig["tables"]]; + +export type StorageOperation = + | SetFieldOperation + | SetRecordOperation + | DeleteRecordOperation; + +export type BlockLogsToStorageOptions = { + registerTables: (opts: { blockNumber: BlockLogs["blockNumber"]; tables: StoredTable[] }) => Promise; + getTables: (opts: { + blockNumber: BlockLogs["blockNumber"]; + tables: Pick[]; + }) => Promise; + storeOperations: (opts: { + blockNumber: BlockLogs["blockNumber"]; + operations: StorageOperation[]; + }) => Promise; +}; + +export type BlockLogsToStorageResult = (block: BlockLogs) => Promise<{ + blockNumber: BlockLogs["blockNumber"]; + operations: StorageOperation[]; +}>; + +type TableNamespace = string; +type TableName = string; +type TableKey = `${Address}:${TableNamespace}:${TableName}`; + +// hacky fix for schema registration + metadata events spanning multiple blocks +// TODO: remove this once schema registration+metadata is one event or tx 
(https://github.com/latticexyz/mud/pull/1182) +const visitedSchemas = new Map(); +const visitedMetadata = new Map< + TableKey, + { address: Address; tableId: TableId; keyNames: readonly string[]; valueNames: readonly string[] } +>(); + +export function blockLogsToStorage({ + registerTables, + getTables, + storeOperations, +}: BlockLogsToStorageOptions): BlockLogsToStorageResult { + return async (block) => { + const newTableKeys = new Set(); + + // First find all schema registration events. + block.logs.forEach((log) => { + if (log.eventName !== "StoreSetRecord") return; + if (log.args.table !== schemaTableId.toHex()) return; + + const [tableForSchema, ...otherKeys] = log.args.key; + if (otherKeys.length) { + debug("registerSchema event is expected to have only one key in key tuple, but got multiple", log); + } + + const tableId = TableId.fromHex(tableForSchema); + const schema = hexToTableSchema(log.args.data); + + const key: TableKey = `${log.address}:${tableId.namespace}:${tableId.name}`; + if (!visitedSchemas.has(key)) { + visitedSchemas.set(key, { address: log.address, tableId, schema }); + newTableKeys.add(key); + } + }); + + // Then find all metadata events. These should follow schema registration events and be in the same block (since they're in the same tx). + // TODO: rework contracts so schemas+tables are combined and immutable (https://github.com/latticexyz/mud/pull/1182) + block.logs.forEach((log) => { + if (log.eventName !== "StoreSetRecord") return; + if (log.args.table !== metadataTableId.toHex()) return; + + const [tableForSchema, ...otherKeys] = log.args.key; + if (otherKeys.length) { + debug("setMetadata event is expected to have only one key in key tuple, but got multiple", log); + } + + const tableId = TableId.fromHex(tableForSchema); + const [tableName, abiEncodedFieldNames] = decodeRecord( + // TODO: this is hardcoded for now while metadata is separate from table registration (https://github.com/latticexyz/mud/pull/1182) + { staticFields: [], dynamicFields: ["string", "bytes"] }, + log.args.data + ); + const valueNames = decodeAbiParameters(parseAbiParameters("string[]"), abiEncodedFieldNames as Hex)[0]; + // TODO: add key names to table registration when we refactor it (https://github.com/latticexyz/mud/pull/1182) + + const key: TableKey = `${log.address}:${tableId.namespace}:${tableName}`; + if (!visitedMetadata.has(key)) { + visitedMetadata.set(key, { address: log.address, tableId, keyNames: [], valueNames }); + newTableKeys.add(key); + } + }); + + const newTableIds = Array.from(newTableKeys).map((tableKey) => { + const [address, namespace, name] = tableKey.split(":"); + return { address: address as Hex, tableId: new TableId(namespace, name) }; + }); + + await registerTables({ + blockNumber: block.blockNumber, + tables: newTableIds + .map(({ address, tableId }) => { + const schema = Array.from(visitedSchemas.values()).find( + ({ address: schemaAddress, tableId: schemaTableId }) => + schemaAddress === address && schemaTableId.toHex() === tableId.toHex() + ); + const metadata = Array.from(visitedMetadata.values()).find( + ({ address: metadataAddress, tableId: metadataTableId }) => + metadataAddress === address && metadataTableId.toHex() === tableId.toHex() + ); + if (!schema) { + debug( + `no schema registration found for table ${tableId.toString()} in block ${block.blockNumber}, skipping` + ); + return; + } + if (!metadata) { + debug( + `no metadata registration found for table ${tableId.toString()} in block ${block.blockNumber}, 
skipping` + ); + return; + } + + const valueAbiTypes = [...schema.schema.valueSchema.staticFields, ...schema.schema.valueSchema.dynamicFields]; + + return { + address, + namespace: schema.tableId.namespace, + name: schema.tableId.name, + // TODO: replace with proper named key tuple (https://github.com/latticexyz/mud/pull/1182) + keySchema: Object.fromEntries(schema.schema.keySchema.staticFields.map((abiType, i) => [i, abiType])), + valueSchema: Object.fromEntries(valueAbiTypes.map((abiType, i) => [metadata.valueNames[i], abiType])), + }; + }) + .filter(isDefined), + }); + + const tableIds = Array.from( + new Set( + block.logs.map((log) => + JSON.stringify({ + address: log.address, + ...TableId.fromHex(log.args.table), + }) + ) + ) + ); + // TODO: combine these once we refactor table registration (https://github.com/latticexyz/mud/pull/1182) + const tables = Object.fromEntries( + ( + await getTables({ + blockNumber: block.blockNumber, + tables: tableIds.map((json) => JSON.parse(json)), + }) + ).map((table) => [`${table.address}:${new TableId(table.namespace, table.name).toHex()}`, table]) + ) as Record; + + const operations = block.logs + .map((log): StorageOperation | undefined => { + const tableId = TableId.fromHex(log.args.table); + const table = tables[`${log.address}:${log.args.table}`]; + if (!table) { + debug("no table found for event, skipping", tableId.toString(), log); + return; + } + + const keyNames = Object.keys(table.keySchema); + const keyValues = decodeKeyTuple( + { staticFields: Object.values(table.keySchema), dynamicFields: [] }, + log.args.key + ); + const key = Object.fromEntries(keyValues.map((value, i) => [keyNames[i], value])) as Key< + TConfig, + keyof TConfig["tables"] + >; + + const valueAbiTypes = Object.values(table.valueSchema); + const valueSchema = abiTypesToSchema(valueAbiTypes); + const fieldNames = Object.keys(table.valueSchema); + + if (log.eventName === "StoreSetRecord" || log.eventName === "StoreEphemeralRecord") { + const valueTuple = decodeRecord(valueSchema, log.args.data); + const value = Object.fromEntries(fieldNames.map((name, i) => [name, valueTuple[i]])) as Value< + TConfig, + keyof TConfig["tables"] + >; + // TODO: decide if we should handle ephemeral records separately? + // they'll eventually be turned into "events", but unclear if that should translate to client storage operations + return { + log, + type: "SetRecord", + ...tableId, + key, + value, + }; + } + + if (log.eventName === "StoreSetField") { + const fieldName = fieldNames[log.args.schemaIndex] as string & keyof Value; + const fieldValue = decodeField(valueAbiTypes[log.args.schemaIndex], log.args.data) as Value< + TConfig, + keyof TConfig["tables"] + >[typeof fieldName]; + return { + log, + type: "SetField", + ...tableId, + key, + fieldName, + fieldValue, + }; + } + + if (log.eventName === "StoreDeleteRecord") { + return { + log, + type: "DeleteRecord", + ...tableId, + key, + }; + } + + debug("unknown store event or log, skipping", log); + return; + }) + .filter(isDefined); + + await storeOperations({ blockNumber: block.blockNumber, operations }); + + return { + blockNumber: block.blockNumber, + operations, + }; + }; +} diff --git a/packages/store-sync/src/index.ts b/packages/store-sync/src/index.ts index 209e8e1fc5..c6f5104063 100644 --- a/packages/store-sync/src/index.ts +++ b/packages/store-sync/src/index.ts @@ -1 +1 @@ -export * from "./blockEventsToStorage"; +export * from "./blockLogsToStorage";
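Downstream, the two changes are meant to compose in an RxJS pipeline: `groupLogsByBlockNumber(logs, toBlock)` appends an empty trailing entry when the end of a fetched range produced no store events, and `blockLogsToStorage` consumes the grouped blocks one at a time. A rough wiring sketch, assuming an upstream `logBatches$` observable of `{ logs, toBlock }` batches (for example, derived from `blockRangeToLogs`) and a configured `blockToStorage` wrapper like the one sketched after the changeset:

```ts
import { from, map, mergeMap, concatMap, type Observable } from "rxjs";
import { groupLogsByBlockNumber } from "@latticexyz/block-logs-stream";
import { blockLogsToStorage, type StoreEventsLog } from "@latticexyz/store-sync";

// Assumed inputs: a stream of fetched log batches (e.g. from blockRangeToLogs)
// and the blockToStorage wrapper configured earlier.
declare const logBatches$: Observable<{ logs: StoreEventsLog[]; toBlock: bigint }>;
declare const blockToStorage: ReturnType<typeof blockLogsToStorage>;

const lastStoredBlockNumber$ = logBatches$.pipe(
  // Grouping with toBlock guarantees at least one (possibly empty) entry per
  // batch, so sync progress is observable even when a block has no store events.
  mergeMap(({ logs, toBlock }) => from(groupLogsByBlockNumber(logs, toBlock))),
  // concatMap keeps blocks in order and waits for each block's writes to finish.
  concatMap((block) => blockToStorage(block)),
  map(({ blockNumber }) => blockNumber)
);
```

Mapping out `blockNumber` at the end is what the changeset means by tracking the last block number for a given set of logs.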