From 5df1f31bc9d35969de6f03396905778748017f38 Mon Sep 17 00:00:00 2001 From: Kevin Ingersoll Date: Fri, 1 Dec 2023 13:37:48 +0000 Subject: [PATCH] feat(store-sync,store-indexer): sync from getLogs indexer endpoint (#1973) --- .changeset/angry-peas-heal.md | 16 +++ .changeset/wet-crabs-punch.md | 5 + .changeset/wicked-donuts-cheat.md | 5 + e2e/packages/sync-test/indexerSync.test.ts | 29 ++-- packages/common/src/utils/chunk.ts | 2 +- .../store-indexer/src/postgres/getLogs.ts | 5 +- .../src/sqlite/createQueryAdapter.ts | 56 +------- .../src/sqlite/getTablesWithRecords.ts | 75 +++++++++++ packages/store-sync/src/common.ts | 18 ++- packages/store-sync/src/createStoreSync.ts | 124 +++++++----------- packages/store-sync/src/getSnapshot.ts | 69 ++++++++++ packages/store-sync/src/index.ts | 2 + packages/store-sync/src/sqlite/getTables.ts | 6 +- .../store-sync/src/tablesWithRecordsToLogs.ts | 25 ++++ .../store-sync/src/trpc-indexer/common.ts | 4 +- 15 files changed, 289 insertions(+), 152 deletions(-) create mode 100644 .changeset/angry-peas-heal.md create mode 100644 .changeset/wet-crabs-punch.md create mode 100644 .changeset/wicked-donuts-cheat.md create mode 100644 packages/store-indexer/src/sqlite/getTablesWithRecords.ts create mode 100644 packages/store-sync/src/getSnapshot.ts create mode 100644 packages/store-sync/src/tablesWithRecordsToLogs.ts diff --git a/.changeset/angry-peas-heal.md b/.changeset/angry-peas-heal.md new file mode 100644 index 0000000000..9aa4513d05 --- /dev/null +++ b/.changeset/angry-peas-heal.md @@ -0,0 +1,16 @@ +--- +"@latticexyz/store-sync": minor +--- + +Refactored how we fetch snapshots from an indexer, preferring the new `getLogs` endpoint and falling back to the previous `findAll` if it isn't available. This refactor also prepares for an easier entry point for adding client caching of snapshots. + +The `initialState` option for various sync methods (`syncToPostgres`, `syncToRecs`, etc.) is now deprecated in favor of `initialBlockLogs`. For now, we'll automatically convert `initialState` into `initialBlockLogs`, but if you want to update your code, you can do: + +```ts +import { tablesWithRecordsToLogs } from "@latticexyz/store-sync"; + +const initialBlockLogs = { + blockNumber: initialState.blockNumber, + logs: tablesWithRecordsToLogs(initialState.tables), +}; +``` diff --git a/.changeset/wet-crabs-punch.md b/.changeset/wet-crabs-punch.md new file mode 100644 index 0000000000..eda8919841 --- /dev/null +++ b/.changeset/wet-crabs-punch.md @@ -0,0 +1,5 @@ +--- +"@latticexyz/common": minor +--- + +Updated `chunk` types to use readonly arrays diff --git a/.changeset/wicked-donuts-cheat.md b/.changeset/wicked-donuts-cheat.md new file mode 100644 index 0000000000..cb82c2abbc --- /dev/null +++ b/.changeset/wicked-donuts-cheat.md @@ -0,0 +1,5 @@ +--- +"@latticexyz/store-indexer": minor +--- + +Added `getLogs` query support to sqlite indexer diff --git a/e2e/packages/sync-test/indexerSync.test.ts b/e2e/packages/sync-test/indexerSync.test.ts index 61bc5a0e5f..6f3f3a751a 100644 --- a/e2e/packages/sync-test/indexerSync.test.ts +++ b/e2e/packages/sync-test/indexerSync.test.ts @@ -58,7 +58,7 @@ describe("Sync from indexer", async () => { await waitForInitialSync(page); expect(asyncErrorHandler.getErrors()).toHaveLength(1); - expect(asyncErrorHandler.getErrors()[0]).toContain("error fetching initial state from indexer"); + expect(asyncErrorHandler.getErrors()[0]).toContain("error getting snapshot"); }); describe.each([["sqlite"], ["postgres"]] as const)("%s indexer", (indexerType) => { @@ -128,18 +128,21 @@ describe("Sync from indexer", async () => { await waitForInitialSync(page); const entities = await callPageFunction(page, "getKeys", ["Position"]); - expect(entities).toEqual([ - { - x: 1, - y: 1, - zone: "0x6d61703100000000000000000000000000000000000000000000000000000000", - }, - { - x: 2, - y: -2, - zone: "0x6d61703100000000000000000000000000000000000000000000000000000000", - }, - ]); + expect(entities).toEqual( + // TODO: figure out how to make this consistently return the same order? may require https://github.com/latticexyz/mud/issues/1979 + expect.arrayContaining([ + { + x: 1, + y: 1, + zone: "0x6d61703100000000000000000000000000000000000000000000000000000000", + }, + { + x: 2, + y: -2, + zone: "0x6d61703100000000000000000000000000000000000000000000000000000000", + }, + ]) + ); // Should not have thrown errors asyncErrorHandler.expectNoAsyncErrors(); diff --git a/packages/common/src/utils/chunk.ts b/packages/common/src/utils/chunk.ts index 796b8ebca7..bab2facefa 100644 --- a/packages/common/src/utils/chunk.ts +++ b/packages/common/src/utils/chunk.ts @@ -1,4 +1,4 @@ -export function* chunk(arr: T[], n: number): Generator { +export function* chunk(arr: readonly T[], n: number): Generator { for (let i = 0; i < arr.length; i += n) { yield arr.slice(i, i + n); } diff --git a/packages/store-indexer/src/postgres/getLogs.ts b/packages/store-indexer/src/postgres/getLogs.ts index e2b4f54ee9..66860aee7c 100644 --- a/packages/store-indexer/src/postgres/getLogs.ts +++ b/packages/store-indexer/src/postgres/getLogs.ts @@ -54,7 +54,10 @@ export async function getLogs( .select() .from(tables.recordsTable) .where(or(...conditions)) - .orderBy(asc(tables.recordsTable.lastUpdatedBlockNumber)); + .orderBy( + asc(tables.recordsTable.lastUpdatedBlockNumber) + // TODO: add logIndex (https://github.com/latticexyz/mud/issues/1979) + ); const blockNumber = records.reduce( (max, record) => bigIntMax(max, record.lastUpdatedBlockNumber ?? 0n), diff --git a/packages/store-indexer/src/sqlite/createQueryAdapter.ts b/packages/store-indexer/src/sqlite/createQueryAdapter.ts index 8d68d5f7df..2ceb89c3fe 100644 --- a/packages/store-indexer/src/sqlite/createQueryAdapter.ts +++ b/packages/store-indexer/src/sqlite/createQueryAdapter.ts @@ -1,10 +1,7 @@ -import { eq } from "drizzle-orm"; import { BaseSQLiteDatabase } from "drizzle-orm/sqlite-core"; -import { buildTable, chainState, getTables } from "@latticexyz/store-sync/sqlite"; import { QueryAdapter } from "@latticexyz/store-sync/trpc-indexer"; -import { debug } from "../debug"; -import { getAddress } from "viem"; -import { decodeDynamicField } from "@latticexyz/protocol-parser"; +import { getTablesWithRecords } from "./getTablesWithRecords"; +import { tablesWithRecordsToLogs } from "@latticexyz/store-sync"; /** * Creates a storage adapter for the tRPC server/client to query data from SQLite. @@ -15,51 +12,12 @@ import { decodeDynamicField } from "@latticexyz/protocol-parser"; export async function createQueryAdapter(database: BaseSQLiteDatabase<"sync", any>): Promise { const adapter: QueryAdapter = { async getLogs(opts) { - // TODO - throw new Error("Not implemented"); + const { blockNumber, tables } = getTablesWithRecords(database, opts); + const logs = tablesWithRecordsToLogs(tables); + return { blockNumber: blockNumber ?? 0n, logs }; }, - async findAll({ chainId, address, filters = [] }) { - // If _any_ filter has a table ID, this will filter down all data to just those tables. Which mean we can't yet mix table filters with key-only filters. - // TODO: improve this so we can express this in the query (need to be able to query data across tables more easily) - const tableIds = Array.from(new Set(filters.map((filter) => filter.tableId))); - const tables = getTables(database) - .filter((table) => address == null || getAddress(address) === getAddress(table.address)) - .filter((table) => !tableIds.length || tableIds.includes(table.tableId)); - - const tablesWithRecords = tables.map((table) => { - const sqliteTable = buildTable(table); - const records = database.select().from(sqliteTable).where(eq(sqliteTable.__isDeleted, false)).all(); - const filteredRecords = !filters.length - ? records - : records.filter((record) => { - const keyTuple = decodeDynamicField("bytes32[]", record.__key); - return filters.some( - (filter) => - filter.tableId === table.tableId && - (filter.key0 == null || filter.key0 === keyTuple[0]) && - (filter.key1 == null || filter.key1 === keyTuple[1]) - ); - }); - return { - ...table, - records: filteredRecords.map((record) => ({ - key: Object.fromEntries(Object.entries(table.keySchema).map(([name]) => [name, record[name]])), - value: Object.fromEntries(Object.entries(table.valueSchema).map(([name]) => [name, record[name]])), - })), - }; - }); - - const metadata = database.select().from(chainState).where(eq(chainState.chainId, chainId)).all(); - const { lastUpdatedBlockNumber } = metadata[0] ?? {}; - - const result = { - blockNumber: lastUpdatedBlockNumber ?? null, - tables: tablesWithRecords, - }; - - debug("findAll", chainId, address, result); - - return result; + async findAll(opts) { + return getTablesWithRecords(database, opts); }, }; return adapter; diff --git a/packages/store-indexer/src/sqlite/getTablesWithRecords.ts b/packages/store-indexer/src/sqlite/getTablesWithRecords.ts new file mode 100644 index 0000000000..987de4e641 --- /dev/null +++ b/packages/store-indexer/src/sqlite/getTablesWithRecords.ts @@ -0,0 +1,75 @@ +import { asc, eq } from "drizzle-orm"; +import { BaseSQLiteDatabase } from "drizzle-orm/sqlite-core"; +import { buildTable, chainState, getTables } from "@latticexyz/store-sync/sqlite"; +import { Hex, getAddress } from "viem"; +import { decodeDynamicField } from "@latticexyz/protocol-parser"; +import { SyncFilter, TableWithRecords } from "@latticexyz/store-sync"; + +// TODO: refactor sqlite and replace this with getLogs to match postgres (https://github.com/latticexyz/mud/issues/1970) + +/** + * @deprecated + * */ +export function getTablesWithRecords( + database: BaseSQLiteDatabase<"sync", any>, + { + chainId, + address, + filters = [], + }: { + readonly chainId: number; + readonly address?: Hex; + readonly filters?: readonly SyncFilter[]; + } +): { blockNumber: bigint | null; tables: readonly TableWithRecords[] } { + const metadata = database + .select() + .from(chainState) + .where(eq(chainState.chainId, chainId)) + .limit(1) + .all() + .find(() => true); + + // If _any_ filter has a table ID, this will filter down all data to just those tables. Which mean we can't yet mix table filters with key-only filters. + // TODO: improve this so we can express this in the query (need to be able to query data across tables more easily) + const tableIds = Array.from(new Set(filters.map((filter) => filter.tableId))); + const tables = getTables(database) + .filter((table) => address == null || getAddress(address) === getAddress(table.address)) + .filter((table) => !tableIds.length || tableIds.includes(table.tableId)); + + const tablesWithRecords = tables.map((table) => { + const sqliteTable = buildTable(table); + const records = database + .select() + .from(sqliteTable) + .where(eq(sqliteTable.__isDeleted, false)) + .orderBy( + asc(sqliteTable.__lastUpdatedBlockNumber) + // TODO: add logIndex (https://github.com/latticexyz/mud/issues/1979) + ) + .all(); + const filteredRecords = !filters.length + ? records + : records.filter((record) => { + const keyTuple = decodeDynamicField("bytes32[]", record.__key); + return filters.some( + (filter) => + filter.tableId === table.tableId && + (filter.key0 == null || filter.key0 === keyTuple[0]) && + (filter.key1 == null || filter.key1 === keyTuple[1]) + ); + }); + return { + ...table, + records: filteredRecords.map((record) => ({ + key: Object.fromEntries(Object.entries(table.keySchema).map(([name]) => [name, record[name]])), + value: Object.fromEntries(Object.entries(table.valueSchema).map(([name]) => [name, record[name]])), + })), + }; + }); + + return { + blockNumber: metadata?.lastUpdatedBlockNumber ?? null, + tables: tablesWithRecords, + }; +} diff --git a/packages/store-sync/src/common.ts b/packages/store-sync/src/common.ts index 11418e2186..a7873ccbd9 100644 --- a/packages/store-sync/src/common.ts +++ b/packages/store-sync/src/common.ts @@ -34,7 +34,7 @@ export type Table = { export type TableWithRecords = Table & { records: TableRecord[] }; export type StoreEventsLog = Log; -export type BlockLogs = { blockNumber: StoreEventsLog["blockNumber"]; logs: StoreEventsLog[] }; +export type BlockLogs = { blockNumber: StoreEventsLog["blockNumber"]; logs: readonly StoreEventsLog[] }; // only two keys for now, to reduce complexity of creating indexes on SQL tables // TODO: make tableId optional to enable filtering just on keys (any table) @@ -90,11 +90,19 @@ export type SyncOptions = { */ indexerUrl?: string; /** - * Optional initial state to hydrate from. Useful if you're hydrating from your own indexer or cache. + * Optional initial state to hydrate from. Useful if you're hydrating from an indexer or cache. + * @deprecated Use `initialLogs` option instead. */ initialState?: { - blockNumber: bigint | null; - tables: TableWithRecords[]; + blockNumber: bigint; + tables: readonly TableWithRecords[]; + }; + /** + * Optional initial logs to hydrate from. Useful if you're hydrating from an indexer or cache. + */ + initialBlockLogs?: { + blockNumber: bigint; + logs: readonly StorageAdapterLog[]; }; }; @@ -108,7 +116,7 @@ export type SyncResult = { // TODO: add optional, original log to this? export type StorageAdapterLog = Partial & UnionPick; -export type StorageAdapterBlock = { blockNumber: BlockLogs["blockNumber"]; logs: StorageAdapterLog[] }; +export type StorageAdapterBlock = { blockNumber: BlockLogs["blockNumber"]; logs: readonly StorageAdapterLog[] }; export type StorageAdapter = (block: StorageAdapterBlock) => Promise; export const schemasTableId = storeTables.Tables.tableId; diff --git a/packages/store-sync/src/createStoreSync.ts b/packages/store-sync/src/createStoreSync.ts index aa0f0bb874..59d83bdf51 100644 --- a/packages/store-sync/src/createStoreSync.ts +++ b/packages/store-sync/src/createStoreSync.ts @@ -1,16 +1,6 @@ import { StoreConfig, storeEventsAbi } from "@latticexyz/store"; -import { Hex, TransactionReceiptNotFoundError, encodeAbiParameters, parseAbiParameters } from "viem"; -import { - StorageAdapter, - StorageAdapterBlock, - StorageAdapterLog, - SyncFilter, - SyncOptions, - SyncResult, - TableWithRecords, - internalTableIds, - storeTables, -} from "./common"; +import { Hex, TransactionReceiptNotFoundError } from "viem"; +import { StorageAdapter, StorageAdapterBlock, SyncFilter, SyncOptions, SyncResult, internalTableIds } from "./common"; import { createBlockStream, blockRangeToLogs, groupLogsByBlockNumber } from "@latticexyz/block-logs-stream"; import { filter, @@ -31,11 +21,9 @@ import { identity, } from "rxjs"; import { debug as parentDebug } from "./debug"; -import { createIndexerClient } from "./trpc-indexer"; import { SyncStep } from "./SyncStep"; -import { chunk, isDefined } from "@latticexyz/common/utils"; -import { encodeKey, encodeValueArgs } from "@latticexyz/protocol-parser"; -import { tableToLog } from "./tableToLog"; +import { bigIntMax, chunk, isDefined } from "@latticexyz/common/utils"; +import { getSnapshot } from "./getSnapshot"; const debug = parentDebug.extend("createStoreSync"); @@ -62,57 +50,53 @@ export async function createStoreSync startBlock: initialStartBlock = 0n, maxBlockRange, initialState, + initialBlockLogs, indexerUrl, }: CreateStoreSyncOptions): Promise { const filters: SyncFilter[] = initialFilters.length || tableIds.length ? [...initialFilters, ...tableIds.map((tableId) => ({ tableId })), ...defaultFilters] : []; - const initialState$ = defer( - async (): Promise< - | { - blockNumber: bigint | null; - tables: TableWithRecords[]; - } - | undefined - > => { - if (initialState) return initialState; - if (!indexerUrl) return; - debug("fetching initial state from indexer", indexerUrl); + const initialBlockLogs$ = defer(async (): Promise => { + const chainId = publicClient.chain?.id ?? (await publicClient.getChainId()); - onProgress?.({ - step: SyncStep.SNAPSHOT, - percentage: 0, - latestBlockNumber: 0n, - lastBlockNumberProcessed: 0n, - message: "Fetching snapshot from indexer", - }); + onProgress?.({ + step: SyncStep.SNAPSHOT, + percentage: 0, + latestBlockNumber: 0n, + lastBlockNumberProcessed: 0n, + message: "Getting snapshot", + }); - const indexer = createIndexerClient({ url: indexerUrl }); - const chainId = publicClient.chain?.id ?? (await publicClient.getChainId()); - const result = await indexer.findAll.query({ chainId, address, filters }); + const snapshot = await getSnapshot({ + chainId, + address, + filters, + initialState, + initialBlockLogs, + indexerUrl, + }); - onProgress?.({ - step: SyncStep.SNAPSHOT, - percentage: 100, - latestBlockNumber: 0n, - lastBlockNumberProcessed: 0n, - message: "Fetched snapshot from indexer", - }); + onProgress?.({ + step: SyncStep.SNAPSHOT, + percentage: 100, + latestBlockNumber: 0n, + lastBlockNumberProcessed: 0n, + message: "Got snapshot", + }); - return result; - } - ).pipe( + return snapshot; + }).pipe( catchError((error) => { - debug("error fetching initial state from indexer", error); + debug("error getting snapshot", error); onProgress?.({ step: SyncStep.SNAPSHOT, percentage: 100, latestBlockNumber: 0n, lastBlockNumberProcessed: initialStartBlock, - message: "Failed to fetch snapshot from indexer", + message: "Failed to get snapshot", }); return of(undefined); @@ -120,19 +104,10 @@ export async function createStoreSync shareReplay(1) ); - const startBlock$ = initialState$.pipe( - map((initialState) => initialState?.blockNumber ?? initialStartBlock), - // TODO: if start block is still 0, find via deploy event - tap((startBlock) => debug("starting sync from block", startBlock)) - ); - - const initialLogs$ = initialState$.pipe( - filter( - (initialState): initialState is { blockNumber: bigint; tables: TableWithRecords[] } => - initialState != null && initialState.blockNumber != null && initialState.tables.length > 0 - ), - concatMap(async ({ blockNumber, tables }) => { - debug("hydrating from initial state to block", blockNumber); + const storedInitialBlockLogs$ = initialBlockLogs$.pipe( + filter(isDefined), + concatMap(async ({ blockNumber, logs }) => { + debug("hydrating", logs.length, "logs to block", blockNumber); onProgress?.({ step: SyncStep.SNAPSHOT, @@ -142,23 +117,6 @@ export async function createStoreSync message: "Hydrating from snapshot", }); - const logs: StorageAdapterLog[] = [ - ...tables.map(tableToLog), - ...tables.flatMap((table) => - table.records.map( - (record): StorageAdapterLog => ({ - eventName: "Store_SetRecord", - address: table.address, - args: { - tableId: table.tableId, - keyTuple: encodeKey(table.keySchema, record.key), - ...encodeValueArgs(table.valueSchema, record.value), - }, - }) - ) - ), - ]; - // Split snapshot operations into chunks so we can update the progress callback (and ultimately render visual progress for the user). // This isn't ideal if we want to e.g. batch load these into a DB in a single DB tx, but we'll take it. // @@ -189,6 +147,12 @@ export async function createStoreSync shareReplay(1) ); + const startBlock$ = initialBlockLogs$.pipe( + map((block) => bigIntMax(block?.blockNumber ?? 0n, initialStartBlock)), + // TODO: if start block is still 0, find via deploy event + tap((startBlock) => debug("starting sync from block", startBlock)) + ); + const latestBlock$ = createBlockStream({ publicClient, blockTag: "latest" }).pipe(shareReplay(1)); const latestBlockNumber$ = latestBlock$.pipe( map((block) => block.number), @@ -231,7 +195,7 @@ export async function createStoreSync let lastBlockNumberProcessed: bigint | null = null; const storedBlockLogs$ = concat( - initialLogs$, + storedInitialBlockLogs$, blockLogs$.pipe( concatMap(async (block) => { await storageAdapter(block); diff --git a/packages/store-sync/src/getSnapshot.ts b/packages/store-sync/src/getSnapshot.ts new file mode 100644 index 0000000000..79ddcf90cc --- /dev/null +++ b/packages/store-sync/src/getSnapshot.ts @@ -0,0 +1,69 @@ +import { StorageAdapterBlock, SyncOptions } from "./common"; +import { debug as parentDebug } from "./debug"; +import { createIndexerClient } from "./trpc-indexer"; +import { TRPCClientError } from "@trpc/client"; +import { tablesWithRecordsToLogs } from "./tablesWithRecordsToLogs"; + +const debug = parentDebug.extend("getSnapshot"); + +type GetSnapshotOptions = Pick< + SyncOptions, + "address" | "filters" | "indexerUrl" | "initialBlockLogs" | "initialState" +> & { + chainId: number; +}; + +export async function getSnapshot({ + chainId, + address, + filters, + initialState, + initialBlockLogs, + indexerUrl, +}: GetSnapshotOptions): Promise { + // TODO: extend types to enforce this + if (initialBlockLogs && initialState) { + throw new Error("Only one of initialBlockLogs or initialState should be provided."); + } + + if (initialBlockLogs) return initialBlockLogs; + + // Backwards compatibility with older indexers + // TODO: remove in the future + if (initialState) { + return { + blockNumber: initialState.blockNumber, + logs: tablesWithRecordsToLogs(initialState.tables), + }; + } + + if (!indexerUrl) return; + + const indexer = createIndexerClient({ url: indexerUrl }); + + try { + debug("fetching logs from indexer", indexerUrl); + return await indexer.getLogs.query({ chainId, address, filters }); + } catch (error) { + if (error instanceof TRPCClientError) { + // Backwards compatibility with older indexers + // TODO: remove in the future + debug("failed to fetch logs, fetching table records instead", indexerUrl); + const result = await indexer.findAll.query({ chainId, address, filters }); + // warn after we fetch from old endpoint so we know that the indexer is accessible + console.warn( + `The indexer at ${indexerUrl} appears to be outdated. Consider upgrading to a recent version for better performance.` + ); + + // if the indexer returns no block number, it hasn't indexed this chain + if (result.blockNumber == null) { + return; + } + return { + blockNumber: result.blockNumber, + logs: tablesWithRecordsToLogs(result.tables), + }; + } + throw error; + } +} diff --git a/packages/store-sync/src/index.ts b/packages/store-sync/src/index.ts index deb192e47d..7efbf9258d 100644 --- a/packages/store-sync/src/index.ts +++ b/packages/store-sync/src/index.ts @@ -3,3 +3,5 @@ export * from "./createStoreSync"; export * from "./SyncStep"; export * from "./isTableRegistrationLog"; export * from "./logToTable"; +export * from "./tablesWithRecordsToLogs"; +export * from "./tableToLog"; diff --git a/packages/store-sync/src/sqlite/getTables.ts b/packages/store-sync/src/sqlite/getTables.ts index f0db291ddb..008a772167 100644 --- a/packages/store-sync/src/sqlite/getTables.ts +++ b/packages/store-sync/src/sqlite/getTables.ts @@ -1,5 +1,5 @@ import { BaseSQLiteDatabase } from "drizzle-orm/sqlite-core"; -import { inArray } from "drizzle-orm"; +import { asc, inArray } from "drizzle-orm"; import { Table } from "../common"; import { getTableName } from "./getTableName"; import { mudStoreTables } from "./internalTables"; @@ -15,6 +15,10 @@ export function getTables( .select() .from(mudStoreTables) .where(ids.length ? inArray(mudStoreTables.id, ids) : undefined) + .orderBy( + asc(mudStoreTables.lastUpdatedBlockNumber) + // TODO: add logIndex (https://github.com/latticexyz/mud/issues/1979) + ) .all(); return tables; diff --git a/packages/store-sync/src/tablesWithRecordsToLogs.ts b/packages/store-sync/src/tablesWithRecordsToLogs.ts new file mode 100644 index 0000000000..a7e35d5a89 --- /dev/null +++ b/packages/store-sync/src/tablesWithRecordsToLogs.ts @@ -0,0 +1,25 @@ +import { StorageAdapterLog, TableWithRecords } from "./common"; +import { encodeKey, encodeValueArgs } from "@latticexyz/protocol-parser"; +import { tableToLog } from "./tableToLog"; + +/** + * @internal + */ +export function tablesWithRecordsToLogs(tables: readonly TableWithRecords[]): StorageAdapterLog[] { + return [ + ...tables.map(tableToLog), + ...tables.flatMap((table) => + table.records.map( + (record): StorageAdapterLog => ({ + eventName: "Store_SetRecord", + address: table.address, + args: { + tableId: table.tableId, + keyTuple: encodeKey(table.keySchema, record.key), + ...encodeValueArgs(table.valueSchema, record.value), + }, + }) + ) + ), + ]; +} diff --git a/packages/store-sync/src/trpc-indexer/common.ts b/packages/store-sync/src/trpc-indexer/common.ts index eee07715a3..63bcd22ec3 100644 --- a/packages/store-sync/src/trpc-indexer/common.ts +++ b/packages/store-sync/src/trpc-indexer/common.ts @@ -5,9 +5,9 @@ export type QueryAdapter = { /** * @deprecated */ - findAll: (opts: { chainId: number; address?: Hex; filters?: SyncFilter[] }) => Promise<{ + findAll: (opts: { chainId: number; address?: Hex; filters?: readonly SyncFilter[] }) => Promise<{ blockNumber: bigint | null; - tables: TableWithRecords[]; + tables: readonly TableWithRecords[]; }>; getLogs: (opts: { readonly chainId: number;