From 2e5ebfa715ce7fc3fdb885ebdce74f5a636cf842 Mon Sep 17 00:00:00 2001 From: Kevin Ingersoll Date: Wed, 2 Aug 2023 18:39:35 +0100 Subject: [PATCH 01/17] add syncToSqlite --- packages/store-sync/src/common.ts | 50 +++++++- packages/store-sync/src/recs/syncToRecs.ts | 39 ++---- .../store-sync/src/sqlite/syncToSqlite.ts | 115 ++++++++++++++++++ .../store-sync/src/trpc-indexer/common.ts | 2 +- .../src/trpc-indexer/createAppRouter.ts | 2 +- 5 files changed, 178 insertions(+), 30 deletions(-) create mode 100644 packages/store-sync/src/sqlite/syncToSqlite.ts diff --git a/packages/store-sync/src/common.ts b/packages/store-sync/src/common.ts index 41f00420d7..eb718a9b06 100644 --- a/packages/store-sync/src/common.ts +++ b/packages/store-sync/src/common.ts @@ -1,4 +1,4 @@ -import { Address, Hex } from "viem"; +import { Address, Block, Chain, Hex, PublicClient, TransactionReceipt, Transport } from "viem"; import { GetLogsResult, GroupLogsByBlockNumberResult, NonPendingLog } from "@latticexyz/block-logs-stream"; import { StoreEventsAbi, @@ -7,7 +7,10 @@ import { ValueSchema, ConfigToKeyPrimitives as Key, ConfigToValuePrimitives as Value, + TableRecord, } from "@latticexyz/store"; +import { Observable } from "rxjs"; +import { BlockStorageOperations } from "./blockLogsToStorage"; export type ChainId = number; export type WorldId = `${ChainId}:${Address}`; @@ -70,3 +73,48 @@ export type StorageOperation = | SetFieldOperation | SetRecordOperation | DeleteRecordOperation; + +export type SyncOptions = { + /** + * MUD config + */ + config: TConfig; + /** + * [viem `PublicClient`][0] used for fetching logs from the RPC. + * + * [0]: https://viem.sh/docs/clients/public.html + */ + publicClient: PublicClient; + /** + * MUD Store/World contract address + */ + address?: Address; + /** + * Optional block number to start indexing from. Useful for resuming the indexer from a particular point in time or starting after a particular contract deployment. + */ + startBlock?: bigint; + /** + * Optional maximum block range, if your RPC limits the amount of blocks fetched at a time. + */ + maxBlockRange?: bigint; + /** + * Optional MUD tRPC indexer URL to fetch initial state from. + */ + indexerUrl?: string; + /** + * Optional initial state to hydrate from. Useful if you're hydrating from your own indexer or cache. + */ + initialState?: { + blockNumber: bigint | null; + tables: (Table & { records: TableRecord[] })[]; + }; +}; + +export type SyncResult = { + latestBlock$: Observable; + latestBlockNumber$: Observable; + blockLogs$: Observable; + blockStorageOperations$: Observable>; + waitForTransaction: (tx: Hex) => Promise<{ receipt: TransactionReceipt }>; + destroy: () => void; +}; diff --git a/packages/store-sync/src/recs/syncToRecs.ts b/packages/store-sync/src/recs/syncToRecs.ts index 3a57f643ef..b04ddf5784 100644 --- a/packages/store-sync/src/recs/syncToRecs.ts +++ b/packages/store-sync/src/recs/syncToRecs.ts @@ -1,5 +1,5 @@ import { StoreConfig, storeEventsAbi } from "@latticexyz/store"; -import { Address, Block, Chain, Hex, PublicClient, TransactionReceipt, Transport } from "viem"; +import { Hex, TransactionReceipt } from "viem"; import { ComponentValue, Entity, @@ -9,16 +9,15 @@ import { getComponentValue, setComponent, } from "@latticexyz/recs"; -import { BlockLogs, Table } from "../common"; -import { TableRecord } from "@latticexyz/store"; +import { SyncOptions, SyncResult } from "../common"; import { createBlockStream, isNonPendingBlock, blockRangeToLogs, groupLogsByBlockNumber, } from "@latticexyz/block-logs-stream"; -import { filter, map, tap, mergeMap, from, concatMap, Observable, share, firstValueFrom } from "rxjs"; -import { BlockStorageOperations, blockLogsToStorage } from "../blockLogsToStorage"; +import { filter, map, tap, mergeMap, from, concatMap, share, firstValueFrom } from "rxjs"; +import { blockLogsToStorage } from "../blockLogsToStorage"; import { recsStorage } from "./recsStorage"; import { hexKeyTupleToEntity } from "./hexKeyTupleToEntity"; import { debug } from "./debug"; @@ -34,19 +33,10 @@ type SyncToRecsOptions< string, RecsComponent > -> = { +> = SyncOptions & { world: RecsWorld; - config: TConfig; - address: Address; - // TODO: make this optional and return one if none provided (but will need chain ID at least) - publicClient: PublicClient; // TODO: generate these from config and return instead? components: TComponents; - indexerUrl?: string; - initialState?: { - blockNumber: bigint | null; - tables: (Table & { records: TableRecord[] })[]; - }; }; type SyncToRecsResult< @@ -55,16 +45,10 @@ type SyncToRecsResult< string, RecsComponent > -> = { +> = SyncResult & { // TODO: return publicClient? components: TComponents & ReturnType; singletonEntity: Entity; - latestBlock$: Observable; - latestBlockNumber$: Observable; - blockLogs$: Observable; - blockStorageOperations$: Observable>; - waitForTransaction: (tx: Hex) => Promise<{ receipt: TransactionReceipt }>; - destroy: () => void; }; export async function syncToRecs< @@ -79,6 +63,8 @@ export async function syncToRecs< address, publicClient, components: initialComponents, + startBlock = 0n, + maxBlockRange, initialState, indexerUrl, }: SyncToRecsOptions): Promise> { @@ -89,8 +75,6 @@ export async function syncToRecs< const singletonEntity = world.registerEntity({ id: hexKeyTupleToEntity([]) }); - let startBlock = 0n; - if (indexerUrl != null && initialState == null) { const indexer = createIndexerClient({ url: indexerUrl }); try { @@ -172,17 +156,18 @@ export async function syncToRecs< publicClient, address, events: storeEventsAbi, + maxBlockRange, }), mergeMap(({ toBlock, logs }) => from(groupLogsByBlockNumber(logs, toBlock))), share() ); - let latestBlockNumberProcessed: bigint | null = null; + let lastBlockNumberProcessed: bigint | null = null; const blockStorageOperations$ = blockLogs$.pipe( concatMap(blockLogsToStorage(recsStorage({ components, config }))), tap(({ blockNumber, operations }) => { debug("stored", operations.length, "operations for block", blockNumber); - latestBlockNumberProcessed = blockNumber; + lastBlockNumberProcessed = blockNumber; if ( latestBlockNumber != null && @@ -217,7 +202,7 @@ export async function syncToRecs< const receipt = await publicClient.waitForTransactionReceipt({ hash: tx }); // If we haven't processed a block yet or we haven't processed the block for the tx, wait for it - if (latestBlockNumberProcessed == null || latestBlockNumberProcessed < receipt.blockNumber) { + if (lastBlockNumberProcessed == null || lastBlockNumberProcessed < receipt.blockNumber) { await firstValueFrom( blockStorageOperations$.pipe( filter(({ blockNumber }) => blockNumber != null && blockNumber >= receipt.blockNumber) diff --git a/packages/store-sync/src/sqlite/syncToSqlite.ts b/packages/store-sync/src/sqlite/syncToSqlite.ts new file mode 100644 index 0000000000..ae95618d8d --- /dev/null +++ b/packages/store-sync/src/sqlite/syncToSqlite.ts @@ -0,0 +1,115 @@ +import { Hex, TransactionReceipt } from "viem"; +import { + createBlockStream, + isNonPendingBlock, + blockRangeToLogs, + groupLogsByBlockNumber, +} from "@latticexyz/block-logs-stream"; +import { concatMap, filter, firstValueFrom, from, map, mergeMap, share, tap } from "rxjs"; +import { StoreConfig, storeEventsAbi } from "@latticexyz/store"; +import { BaseSQLiteDatabase } from "drizzle-orm/sqlite-core"; +import { debug } from "../debug"; +import { SyncOptions, SyncResult } from "../common"; +import { blockLogsToStorage } from "../blockLogsToStorage"; +import { sqliteStorage } from "./sqliteStorage"; + +type SyncToSqliteOptions = SyncOptions & { + /** + * [SQLite database object from Drizzle][0]. + * + * [0]: https://orm.drizzle.team/docs/installation-and-db-connection/sqlite/better-sqlite3 + */ + database: BaseSQLiteDatabase<"sync", any>; +}; + +type SyncToSqliteResult = SyncResult; + +/** + * Creates an indexer to process and store blockchain events. + * + * @param {CreateIndexerOptions} options See `CreateIndexerOptions`. + * @returns A function to unsubscribe from the block stream, effectively stopping the indexer. + */ +export function syncToSqlite({ + database, + publicClient, + address, + startBlock = 0n, + maxBlockRange, + indexerUrl, + initialState, +}: SyncToSqliteOptions): SyncToSqliteResult { + // TODO: sync initial state + + const latestBlock$ = createBlockStream({ publicClient, blockTag: "latest" }).pipe(share()); + + const latestBlockNumber$ = latestBlock$.pipe( + filter(isNonPendingBlock), + map((block) => block.number), + share() + ); + + let latestBlockNumber: bigint | null = null; + const blockLogs$ = latestBlockNumber$.pipe( + tap((blockNumber) => { + debug("latest block number", blockNumber); + latestBlockNumber = blockNumber; + }), + map((blockNumber) => ({ startBlock, endBlock: blockNumber })), + blockRangeToLogs({ + publicClient, + address, + events: storeEventsAbi, + maxBlockRange, + }), + tap(({ fromBlock, toBlock, logs }) => { + debug("found", logs.length, "logs for block", fromBlock, "-", toBlock); + }), + mergeMap(({ toBlock, logs }) => from(groupLogsByBlockNumber(logs, toBlock))), + share() + ); + + let lastBlockNumberProcessed: bigint | null = null; + const blockStorageOperations$ = blockLogs$.pipe( + concatMap(blockLogsToStorage(sqliteStorage({ database, publicClient }))), + tap(({ blockNumber, operations }) => { + debug("stored", operations.length, "operations for block", blockNumber); + lastBlockNumberProcessed = blockNumber; + + // TODO: store some notion of sync progress? + }), + share() + ); + + // Start the sync + const sub = blockStorageOperations$.subscribe(); + + async function waitForTransaction(tx: Hex): Promise<{ + receipt: TransactionReceipt; + }> { + // Wait for tx to be mined + const receipt = await publicClient.waitForTransactionReceipt({ hash: tx }); + + // If we haven't processed a block yet or we haven't processed the block for the tx, wait for it + if (lastBlockNumberProcessed == null || lastBlockNumberProcessed < receipt.blockNumber) { + await firstValueFrom( + blockStorageOperations$.pipe( + filter(({ blockNumber }) => blockNumber != null && blockNumber >= receipt.blockNumber) + ) + ); + } + + return { receipt }; + } + + return { + latestBlock$, + latestBlockNumber$, + blockLogs$, + blockStorageOperations$, + waitForTransaction, + destroy: (): void => { + sub.unsubscribe(); + }, + }; +} diff --git a/packages/store-sync/src/trpc-indexer/common.ts b/packages/store-sync/src/trpc-indexer/common.ts index 0e71ec464a..c7da106053 100644 --- a/packages/store-sync/src/trpc-indexer/common.ts +++ b/packages/store-sync/src/trpc-indexer/common.ts @@ -5,5 +5,5 @@ import type { Table } from "../common"; export type TableWithRecords = Table & { records: TableRecord[] }; export type StorageAdapter = { - findAll: (chainId: number, address: Hex) => Promise<{ blockNumber: bigint | null; tables: TableWithRecords[] }>; + findAll: (chainId: number, address?: Hex) => Promise<{ blockNumber: bigint | null; tables: TableWithRecords[] }>; }; diff --git a/packages/store-sync/src/trpc-indexer/createAppRouter.ts b/packages/store-sync/src/trpc-indexer/createAppRouter.ts index 2065d6a948..2e91c0c3cf 100644 --- a/packages/store-sync/src/trpc-indexer/createAppRouter.ts +++ b/packages/store-sync/src/trpc-indexer/createAppRouter.ts @@ -15,7 +15,7 @@ export function createAppRouter() { .input( z.object({ chainId: z.number(), - address: z.string().refine(isHex), + address: z.string().refine(isHex).optional(), }) ) .query(async (opts): ReturnType => { From 85951bb4b91dbab411c2c1270d57e15da86b0b3a Mon Sep 17 00:00:00 2001 From: Kevin Ingersoll Date: Wed, 2 Aug 2023 18:41:08 +0100 Subject: [PATCH 02/17] move indexer to syncToSqlite --- packages/store-indexer/bin/sqlite-indexer.ts | 5 +- .../store-indexer/src/sqlite/createIndexer.ts | 83 ------------------- packages/store-sync/src/common.ts | 2 +- packages/store-sync/src/sqlite/index.ts | 1 + 4 files changed, 4 insertions(+), 87 deletions(-) delete mode 100644 packages/store-indexer/src/sqlite/createIndexer.ts diff --git a/packages/store-indexer/bin/sqlite-indexer.ts b/packages/store-indexer/bin/sqlite-indexer.ts index be108dfe76..cc53d7d3d4 100644 --- a/packages/store-indexer/bin/sqlite-indexer.ts +++ b/packages/store-indexer/bin/sqlite-indexer.ts @@ -7,8 +7,7 @@ import Database from "better-sqlite3"; import { createPublicClient, fallback, webSocket, http } from "viem"; import { createHTTPServer } from "@trpc/server/adapters/standalone"; import { createAppRouter } from "@latticexyz/store-sync/trpc-indexer"; -import { chainState, schemaVersion } from "@latticexyz/store-sync/sqlite"; -import { createIndexer } from "../src/sqlite/createIndexer"; +import { chainState, schemaVersion, syncToSqlite } from "@latticexyz/store-sync/sqlite"; import { createStorageAdapter } from "../src/sqlite/createStorageAdapter"; import type { Chain } from "viem/chains"; import * as mudChains from "@latticexyz/common/chains"; @@ -70,7 +69,7 @@ try { // ignore errors, this is optional } -createIndexer({ +syncToSqlite({ database, publicClient, startBlock, diff --git a/packages/store-indexer/src/sqlite/createIndexer.ts b/packages/store-indexer/src/sqlite/createIndexer.ts deleted file mode 100644 index 42dc0cff2a..0000000000 --- a/packages/store-indexer/src/sqlite/createIndexer.ts +++ /dev/null @@ -1,83 +0,0 @@ -import { Chain, PublicClient, Transport } from "viem"; -import { - createBlockStream, - isNonPendingBlock, - blockRangeToLogs, - groupLogsByBlockNumber, -} from "@latticexyz/block-logs-stream"; -import { concatMap, filter, from, map, mergeMap, tap } from "rxjs"; -import { storeEventsAbi } from "@latticexyz/store"; -import { blockLogsToStorage } from "@latticexyz/store-sync"; -import { sqliteStorage } from "@latticexyz/store-sync/sqlite"; -import { BaseSQLiteDatabase } from "drizzle-orm/sqlite-core"; -import { debug } from "../debug"; - -type CreateIndexerOptions = { - /** - * [SQLite database object from Drizzle][0]. - * - * [0]: https://orm.drizzle.team/docs/installation-and-db-connection/sqlite/better-sqlite3 - */ - database: BaseSQLiteDatabase<"sync", any>; - /** - * [viem `PublicClient`][0] used for fetching logs from the RPC. - * - * [0]: https://viem.sh/docs/clients/public.html - */ - publicClient: PublicClient; - /** - * Optional block number to start indexing from. Useful for resuming the indexer from a particular point in time or starting after a particular contract deployment. - */ - startBlock?: bigint; - /** - * Optional maximum block range, if your RPC limits the amount of blocks fetched at a time. - */ - maxBlockRange?: bigint; -}; - -/** - * Creates an indexer to process and store blockchain events. - * - * @param {CreateIndexerOptions} options See `CreateIndexerOptions`. - * @returns A function to unsubscribe from the block stream, effectively stopping the indexer. - */ -export function createIndexer({ - database, - publicClient, - startBlock = 0n, - maxBlockRange, -}: CreateIndexerOptions): () => void { - const latestBlock$ = createBlockStream({ publicClient, blockTag: "latest" }); - - const latestBlockNumber$ = latestBlock$.pipe( - filter(isNonPendingBlock), - map((block) => block.number) - ); - - const blockLogs$ = latestBlockNumber$.pipe( - tap((latestBlockNumber) => debug("latest block number", latestBlockNumber)), - map((latestBlockNumber) => ({ startBlock, endBlock: latestBlockNumber })), - blockRangeToLogs({ - publicClient, - events: storeEventsAbi, - maxBlockRange, - }), - tap(({ fromBlock, toBlock, logs }) => { - debug("found", logs.length, "logs for block", fromBlock, "-", toBlock); - }), - mergeMap(({ toBlock, logs }) => from(groupLogsByBlockNumber(logs, toBlock))) - ); - - const sub = blockLogs$ - .pipe( - concatMap(blockLogsToStorage(sqliteStorage({ database, publicClient }))), - tap(({ blockNumber, operations }) => { - debug("stored", operations.length, "operations for block", blockNumber); - }) - ) - .subscribe(); - - return () => { - sub.unsubscribe(); - }; -} diff --git a/packages/store-sync/src/common.ts b/packages/store-sync/src/common.ts index eb718a9b06..97e96a74e6 100644 --- a/packages/store-sync/src/common.ts +++ b/packages/store-sync/src/common.ts @@ -78,7 +78,7 @@ export type SyncOptions = { /** * MUD config */ - config: TConfig; + config?: TConfig; /** * [viem `PublicClient`][0] used for fetching logs from the RPC. * diff --git a/packages/store-sync/src/sqlite/index.ts b/packages/store-sync/src/sqlite/index.ts index 9e82168c3f..bcfd3948ad 100644 --- a/packages/store-sync/src/sqlite/index.ts +++ b/packages/store-sync/src/sqlite/index.ts @@ -3,3 +3,4 @@ export * from "./getTables"; export * from "./internalTables"; export * from "./schemaVersion"; export * from "./sqliteStorage"; +export * from "./syncToSqlite"; From f296ff251b51a8aa18698b5a0c68d6e5625d9d32 Mon Sep 17 00:00:00 2001 From: Kevin Ingersoll Date: Thu, 3 Aug 2023 09:14:20 +0100 Subject: [PATCH 03/17] wip start moving out shared logic --- packages/store-sync/src/startSync.ts | 121 +++++++++++++++++++++++++++ 1 file changed, 121 insertions(+) create mode 100644 packages/store-sync/src/startSync.ts diff --git a/packages/store-sync/src/startSync.ts b/packages/store-sync/src/startSync.ts new file mode 100644 index 0000000000..1f3de14968 --- /dev/null +++ b/packages/store-sync/src/startSync.ts @@ -0,0 +1,121 @@ +import { StoreConfig, storeEventsAbi } from "@latticexyz/store"; +import { Hex, TransactionReceipt } from "viem"; +import { SyncOptions, SyncResult } from "./common"; +import { + createBlockStream, + isNonPendingBlock, + blockRangeToLogs, + groupLogsByBlockNumber, +} from "@latticexyz/block-logs-stream"; +import { filter, map, tap, mergeMap, from, concatMap, share, firstValueFrom } from "rxjs"; +import { blockLogsToStorage } from "./blockLogsToStorage"; +import { debug } from "./debug"; +import { createIndexerClient } from "./trpc-indexer"; +import { BlockLogsToStorageOptions } from "./blockLogsToStorage"; + +type StartSyncOptions = SyncOptions & { + storageAdapter: BlockLogsToStorageOptions; +}; + +type StartSyncResult = SyncResult; + +export async function startSync({ + storageAdapter, + config, + address, + publicClient, + startBlock = 0n, + maxBlockRange, + initialState, + indexerUrl, +}: StartSyncOptions): Promise> { + if (indexerUrl != null && initialState == null) { + const indexer = createIndexerClient({ url: indexerUrl }); + try { + initialState = await indexer.findAll.query({ + chainId: publicClient.chain.id, + address, + }); + } catch (error) { + debug("couldn't get initial state from indexer", error); + } + } + + if (initialState != null && initialState.blockNumber != null) { + debug("hydrating from initial state to block", initialState.blockNumber); + startBlock = initialState.blockNumber + 1n; + + // TODO: call storage adapter with initial state? + } + + // TODO: if startBlock is still 0, find via deploy event + + debug("starting sync from block", startBlock); + + const latestBlock$ = createBlockStream({ publicClient, blockTag: "latest" }).pipe(share()); + + const latestBlockNumber$ = latestBlock$.pipe( + filter(isNonPendingBlock), + map((block) => block.number), + share() + ); + + let latestBlockNumber: bigint | null = null; + const blockLogs$ = latestBlockNumber$.pipe( + tap((blockNumber) => { + debug("latest block number", blockNumber); + latestBlockNumber = blockNumber; + }), + map((blockNumber) => ({ startBlock, endBlock: blockNumber })), + blockRangeToLogs({ + publicClient, + address, + events: storeEventsAbi, + maxBlockRange, + }), + mergeMap(({ toBlock, logs }) => from(groupLogsByBlockNumber(logs, toBlock))), + share() + ); + + let lastBlockNumberProcessed: bigint | null = null; + const blockStorageOperations$ = blockLogs$.pipe( + concatMap(blockLogsToStorage(storageAdapter)), + tap(({ blockNumber, operations }) => { + debug("stored", operations.length, "operations for block", blockNumber); + lastBlockNumberProcessed = blockNumber; + }), + share() + ); + + // Start the sync + const sub = blockStorageOperations$.subscribe(); + + async function waitForTransaction(tx: Hex): Promise<{ + receipt: TransactionReceipt; + }> { + // Wait for tx to be mined + const receipt = await publicClient.waitForTransactionReceipt({ hash: tx }); + + // If we haven't processed a block yet or we haven't processed the block for the tx, wait for it + if (lastBlockNumberProcessed == null || lastBlockNumberProcessed < receipt.blockNumber) { + await firstValueFrom( + blockStorageOperations$.pipe( + filter(({ blockNumber }) => blockNumber != null && blockNumber >= receipt.blockNumber) + ) + ); + } + + return { receipt }; + } + + return { + latestBlock$, + latestBlockNumber$, + blockLogs$, + blockStorageOperations$, + waitForTransaction, + destroy: (): void => { + sub.unsubscribe(); + }, + }; +} From dc2f5faed2af2fb4977f5b9fb378184f0589bd59 Mon Sep 17 00:00:00 2001 From: Kevin Ingersoll Date: Thu, 3 Aug 2023 16:36:09 +0100 Subject: [PATCH 04/17] wip refactor --- packages/store-sync/src/SyncStep.ts | 6 + packages/store-sync/src/common.ts | 1 - packages/store-sync/src/createStoreSync.ts | 183 ++++++++++++++++++ packages/store-sync/src/index.ts | 1 + packages/store-sync/src/recs/common.ts | 7 - packages/store-sync/src/recs/syncToRecs.ts | 133 ++++--------- .../store-sync/src/sqlite/syncToSqlite.ts | 99 ++-------- packages/store-sync/src/startSync.ts | 121 ------------ 8 files changed, 253 insertions(+), 298 deletions(-) create mode 100644 packages/store-sync/src/SyncStep.ts create mode 100644 packages/store-sync/src/createStoreSync.ts delete mode 100644 packages/store-sync/src/startSync.ts diff --git a/packages/store-sync/src/SyncStep.ts b/packages/store-sync/src/SyncStep.ts new file mode 100644 index 0000000000..be0ce83c5b --- /dev/null +++ b/packages/store-sync/src/SyncStep.ts @@ -0,0 +1,6 @@ +export enum SyncStep { + INITIALIZE = "initialize", + SNAPSHOT = "snapshot", + RPC = "rpc", + LIVE = "live", +} diff --git a/packages/store-sync/src/common.ts b/packages/store-sync/src/common.ts index 97e96a74e6..f6412b82bc 100644 --- a/packages/store-sync/src/common.ts +++ b/packages/store-sync/src/common.ts @@ -116,5 +116,4 @@ export type SyncResult = { blockLogs$: Observable; blockStorageOperations$: Observable>; waitForTransaction: (tx: Hex) => Promise<{ receipt: TransactionReceipt }>; - destroy: () => void; }; diff --git a/packages/store-sync/src/createStoreSync.ts b/packages/store-sync/src/createStoreSync.ts new file mode 100644 index 0000000000..03eab4fabd --- /dev/null +++ b/packages/store-sync/src/createStoreSync.ts @@ -0,0 +1,183 @@ +import { StoreConfig, storeEventsAbi } from "@latticexyz/store"; +import { Hex, TransactionReceipt } from "viem"; +import { SetRecordOperation, SyncOptions, SyncResult } from "./common"; +import { + createBlockStream, + isNonPendingBlock, + blockRangeToLogs, + groupLogsByBlockNumber, +} from "@latticexyz/block-logs-stream"; +import { filter, map, tap, mergeMap, from, concatMap, share, firstValueFrom } from "rxjs"; +import { blockLogsToStorage } from "./blockLogsToStorage"; +import { debug } from "./debug"; +import { createIndexerClient } from "./trpc-indexer"; +import { BlockLogsToStorageOptions } from "./blockLogsToStorage"; +import { SyncStep } from "./SyncStep"; + +type CreateStoreSyncOptions = SyncOptions & { + storageAdapter: BlockLogsToStorageOptions; + onProgress?: (opts: { + step: SyncStep; + percentage: number; + latestBlockNumber: bigint; + lastBlockNumberProcessed: bigint; + }) => void; +}; + +type CreateStoreSyncResult = SyncResult; + +export async function createStoreSync({ + storageAdapter, + onProgress, + config, + address, + publicClient, + startBlock = 0n, + maxBlockRange, + initialState, + indexerUrl, +}: CreateStoreSyncOptions): Promise> { + if (indexerUrl != null && initialState == null) { + const indexer = createIndexerClient({ url: indexerUrl }); + try { + initialState = await indexer.findAll.query({ + chainId: publicClient.chain.id, + address, + }); + } catch (error) { + debug("couldn't get initial state from indexer", error); + } + } + + if (initialState != null) { + const { blockNumber, tables } = initialState; + if (blockNumber != null) { + debug("hydrating from initial state to block", initialState.blockNumber); + startBlock = blockNumber + 1n; + + await storageAdapter.registerTables({ blockNumber, tables }); + + const numRecords = initialState.tables.reduce((sum, table) => sum + table.records.length, 0); + const recordsPerProgressUpdate = Math.floor(numRecords / 100); + let recordsProcessed = 0; + let recordsProcessedSinceLastUpdate = 0; + + for (const table of initialState.tables) { + await storageAdapter.storeOperations({ + blockNumber, + operations: table.records.map( + (record) => + ({ + type: "SetRecord", + namespace: table.namespace, + name: table.name, + key: record.key, + value: record.value, + // TODO: refactor to not depend on log + log: {} as any, + } as SetRecordOperation) + ), + }); + + recordsProcessed += table.records.length; + recordsProcessedSinceLastUpdate += table.records.length; + + if (recordsProcessedSinceLastUpdate > recordsPerProgressUpdate) { + onProgress?.({ + step: SyncStep.SNAPSHOT, + percentage: (recordsProcessed / numRecords) * 100, + latestBlockNumber: 0n, + lastBlockNumberProcessed: blockNumber, + }); + recordsProcessedSinceLastUpdate = 0; + } + + debug(`hydrated ${table.records.length} records for table ${table.namespace}:${table.name}`); + } + } + } + + // TODO: if startBlock is still 0, find via deploy event + + debug("starting sync from block", startBlock); + + const latestBlock$ = createBlockStream({ publicClient, blockTag: "latest" }).pipe(share()); + + const latestBlockNumber$ = latestBlock$.pipe( + filter(isNonPendingBlock), + map((block) => block.number), + share() + ); + + let latestBlockNumber: bigint | null = null; + const blockLogs$ = latestBlockNumber$.pipe( + tap((blockNumber) => { + debug("latest block number", blockNumber); + latestBlockNumber = blockNumber; + }), + map((blockNumber) => ({ startBlock, endBlock: blockNumber })), + blockRangeToLogs({ + publicClient, + address, + events: storeEventsAbi, + maxBlockRange, + }), + mergeMap(({ toBlock, logs }) => from(groupLogsByBlockNumber(logs, toBlock))), + share() + ); + + let lastBlockNumberProcessed: bigint | null = null; + const blockStorageOperations$ = blockLogs$.pipe( + concatMap(blockLogsToStorage(storageAdapter)), + tap(({ blockNumber, operations }) => { + debug("stored", operations.length, "operations for block", blockNumber); + lastBlockNumberProcessed = blockNumber; + + // if ( + // latestBlockNumber != null && + // getComponentValue(components.SyncProgress, singletonEntity)?.step !== SyncStep.LIVE + // ) { + // if (blockNumber < latestBlockNumber) { + // setComponent(components.SyncProgress, singletonEntity, { + // step: SyncStep.RPC, + // message: `Hydrating from RPC to block ${latestBlockNumber}`, + // percentage: (Number(blockNumber) / Number(latestBlockNumber)) * 100, + // }); + // } else { + // setComponent(components.SyncProgress, singletonEntity, { + // step: SyncStep.LIVE, + // message: `All caught up!`, + // percentage: 100, + // }); + // } + // } + }), + share() + ); + + async function waitForTransaction(tx: Hex): Promise<{ + receipt: TransactionReceipt; + }> { + // Wait for tx to be mined + const receipt = await publicClient.waitForTransactionReceipt({ hash: tx }); + + // If we haven't processed a block yet or we haven't processed the block for the tx, wait for it + if (lastBlockNumberProcessed == null || lastBlockNumberProcessed < receipt.blockNumber) { + await firstValueFrom( + blockStorageOperations$.pipe( + filter(({ blockNumber }) => blockNumber != null && blockNumber >= receipt.blockNumber) + ) + ); + } + + return { receipt }; + } + + return { + latestBlock$, + latestBlockNumber$, + blockLogs$, + blockStorageOperations$, + waitForTransaction, + }; +} diff --git a/packages/store-sync/src/index.ts b/packages/store-sync/src/index.ts index a94b166d1b..a56300146c 100644 --- a/packages/store-sync/src/index.ts +++ b/packages/store-sync/src/index.ts @@ -1,2 +1,3 @@ export * from "./blockLogsToStorage"; export * from "./common"; +export * from "./SyncStep"; diff --git a/packages/store-sync/src/recs/common.ts b/packages/store-sync/src/recs/common.ts index 77327bc997..2c142431ba 100644 --- a/packages/store-sync/src/recs/common.ts +++ b/packages/store-sync/src/recs/common.ts @@ -4,10 +4,3 @@ export type StoreComponentMetadata = { keySchema: KeySchema; valueSchema: ValueSchema; }; - -export enum SyncStep { - INITIALIZE = "initialize", - SNAPSHOT = "snapshot", - RPC = "rpc", - LIVE = "live", -} diff --git a/packages/store-sync/src/recs/syncToRecs.ts b/packages/store-sync/src/recs/syncToRecs.ts index b04ddf5784..a719f63292 100644 --- a/packages/store-sync/src/recs/syncToRecs.ts +++ b/packages/store-sync/src/recs/syncToRecs.ts @@ -1,5 +1,4 @@ -import { StoreConfig, storeEventsAbi } from "@latticexyz/store"; -import { Hex, TransactionReceipt } from "viem"; +import { StoreConfig } from "@latticexyz/store"; import { ComponentValue, Entity, @@ -10,22 +9,16 @@ import { setComponent, } from "@latticexyz/recs"; import { SyncOptions, SyncResult } from "../common"; -import { - createBlockStream, - isNonPendingBlock, - blockRangeToLogs, - groupLogsByBlockNumber, -} from "@latticexyz/block-logs-stream"; -import { filter, map, tap, mergeMap, from, concatMap, share, firstValueFrom } from "rxjs"; -import { blockLogsToStorage } from "../blockLogsToStorage"; import { recsStorage } from "./recsStorage"; import { hexKeyTupleToEntity } from "./hexKeyTupleToEntity"; import { debug } from "./debug"; import { defineInternalComponents } from "./defineInternalComponents"; import { getTableKey } from "./getTableKey"; -import { StoreComponentMetadata, SyncStep } from "./common"; +import { StoreComponentMetadata } from "./common"; +import { SyncStep } from "../SyncStep"; import { encodeEntity } from "./encodeEntity"; import { createIndexerClient } from "../trpc-indexer"; +import { createStoreSync } from "../createStoreSync"; type SyncToRecsOptions< TConfig extends StoreConfig = StoreConfig, @@ -133,96 +126,56 @@ export async function syncToRecs< }); } - // TODO: if startBlock is still 0, find via deploy event - - debug("starting sync from block", startBlock); - - const latestBlock$ = createBlockStream({ publicClient, blockTag: "latest" }).pipe(share()); - - const latestBlockNumber$ = latestBlock$.pipe( - filter(isNonPendingBlock), - map((block) => block.number), - share() - ); + const storeSync = await createStoreSync({ + storageAdapter: recsStorage({ components, config }), + config, + address, + publicClient, + startBlock, + maxBlockRange, + indexerUrl, + initialState, + }); let latestBlockNumber: bigint | null = null; - const blockLogs$ = latestBlockNumber$.pipe( - tap((blockNumber) => { - debug("latest block number", blockNumber); + { + const sub = storeSync.latestBlockNumber$.subscribe((blockNumber) => { latestBlockNumber = blockNumber; - }), - map((blockNumber) => ({ startBlock, endBlock: blockNumber })), - blockRangeToLogs({ - publicClient, - address, - events: storeEventsAbi, - maxBlockRange, - }), - mergeMap(({ toBlock, logs }) => from(groupLogsByBlockNumber(logs, toBlock))), - share() - ); + }); + world.registerDisposer(sub.unsubscribe); + } + // Start the sync let lastBlockNumberProcessed: bigint | null = null; - const blockStorageOperations$ = blockLogs$.pipe( - concatMap(blockLogsToStorage(recsStorage({ components, config }))), - tap(({ blockNumber, operations }) => { - debug("stored", operations.length, "operations for block", blockNumber); - lastBlockNumberProcessed = blockNumber; - - if ( - latestBlockNumber != null && - getComponentValue(components.SyncProgress, singletonEntity)?.step !== SyncStep.LIVE - ) { - if (blockNumber < latestBlockNumber) { - setComponent(components.SyncProgress, singletonEntity, { - step: SyncStep.RPC, - message: `Hydrating from RPC to block ${latestBlockNumber}`, - percentage: (Number(blockNumber) / Number(latestBlockNumber)) * 100, - }); - } else { - setComponent(components.SyncProgress, singletonEntity, { - step: SyncStep.LIVE, - message: `All caught up!`, - percentage: 100, - }); - } + const sub = storeSync.blockStorageOperations$.subscribe(({ blockNumber, operations }) => { + debug("stored", operations.length, "operations for block", blockNumber); + lastBlockNumberProcessed = blockNumber; + + if ( + latestBlockNumber != null && + getComponentValue(components.SyncProgress, singletonEntity)?.step !== SyncStep.LIVE + ) { + if (blockNumber < latestBlockNumber) { + setComponent(components.SyncProgress, singletonEntity, { + step: SyncStep.RPC, + message: `Hydrating from RPC to block ${latestBlockNumber}`, + percentage: (Number(blockNumber) / Number(latestBlockNumber)) * 100, + }); + } else { + setComponent(components.SyncProgress, singletonEntity, { + step: SyncStep.LIVE, + message: `All caught up!`, + percentage: 100, + }); } - }), - share() - ); - - // Start the sync - const sub = blockStorageOperations$.subscribe(); - world.registerDisposer(() => sub.unsubscribe()); - - async function waitForTransaction(tx: Hex): Promise<{ - receipt: TransactionReceipt; - }> { - // Wait for tx to be mined - const receipt = await publicClient.waitForTransactionReceipt({ hash: tx }); - - // If we haven't processed a block yet or we haven't processed the block for the tx, wait for it - if (lastBlockNumberProcessed == null || lastBlockNumberProcessed < receipt.blockNumber) { - await firstValueFrom( - blockStorageOperations$.pipe( - filter(({ blockNumber }) => blockNumber != null && blockNumber >= receipt.blockNumber) - ) - ); } + }); - return { receipt }; - } + world.registerDisposer(sub.unsubscribe); return { + ...storeSync, components, singletonEntity, - latestBlock$, - latestBlockNumber$, - blockLogs$, - blockStorageOperations$, - waitForTransaction, - destroy: (): void => { - world.dispose(); - }, }; } diff --git a/packages/store-sync/src/sqlite/syncToSqlite.ts b/packages/store-sync/src/sqlite/syncToSqlite.ts index ae95618d8d..808d28e8af 100644 --- a/packages/store-sync/src/sqlite/syncToSqlite.ts +++ b/packages/store-sync/src/sqlite/syncToSqlite.ts @@ -1,17 +1,8 @@ -import { Hex, TransactionReceipt } from "viem"; -import { - createBlockStream, - isNonPendingBlock, - blockRangeToLogs, - groupLogsByBlockNumber, -} from "@latticexyz/block-logs-stream"; -import { concatMap, filter, firstValueFrom, from, map, mergeMap, share, tap } from "rxjs"; -import { StoreConfig, storeEventsAbi } from "@latticexyz/store"; +import { StoreConfig } from "@latticexyz/store"; import { BaseSQLiteDatabase } from "drizzle-orm/sqlite-core"; -import { debug } from "../debug"; import { SyncOptions, SyncResult } from "../common"; -import { blockLogsToStorage } from "../blockLogsToStorage"; import { sqliteStorage } from "./sqliteStorage"; +import { createStoreSync } from "../createStoreSync"; type SyncToSqliteOptions = SyncOptions & { /** @@ -22,7 +13,9 @@ type SyncToSqliteOptions = SyncOption database: BaseSQLiteDatabase<"sync", any>; }; -type SyncToSqliteResult = SyncResult; +type SyncToSqliteResult = SyncResult & { + destroy: () => void; +}; /** * Creates an indexer to process and store blockchain events. @@ -30,7 +23,8 @@ type SyncToSqliteResult = SyncResult< * @param {CreateIndexerOptions} options See `CreateIndexerOptions`. * @returns A function to unsubscribe from the block stream, effectively stopping the indexer. */ -export function syncToSqlite({ +export async function syncToSqlite({ + config, database, publicClient, address, @@ -38,76 +32,23 @@ export function syncToSqlite({ maxBlockRange, indexerUrl, initialState, -}: SyncToSqliteOptions): SyncToSqliteResult { - // TODO: sync initial state - - const latestBlock$ = createBlockStream({ publicClient, blockTag: "latest" }).pipe(share()); - - const latestBlockNumber$ = latestBlock$.pipe( - filter(isNonPendingBlock), - map((block) => block.number), - share() - ); - - let latestBlockNumber: bigint | null = null; - const blockLogs$ = latestBlockNumber$.pipe( - tap((blockNumber) => { - debug("latest block number", blockNumber); - latestBlockNumber = blockNumber; - }), - map((blockNumber) => ({ startBlock, endBlock: blockNumber })), - blockRangeToLogs({ - publicClient, - address, - events: storeEventsAbi, - maxBlockRange, - }), - tap(({ fromBlock, toBlock, logs }) => { - debug("found", logs.length, "logs for block", fromBlock, "-", toBlock); - }), - mergeMap(({ toBlock, logs }) => from(groupLogsByBlockNumber(logs, toBlock))), - share() - ); - - let lastBlockNumberProcessed: bigint | null = null; - const blockStorageOperations$ = blockLogs$.pipe( - concatMap(blockLogsToStorage(sqliteStorage({ database, publicClient }))), - tap(({ blockNumber, operations }) => { - debug("stored", operations.length, "operations for block", blockNumber); - lastBlockNumberProcessed = blockNumber; - - // TODO: store some notion of sync progress? - }), - share() - ); +}: SyncToSqliteOptions): Promise> { + const storeSync = await createStoreSync({ + storageAdapter: sqliteStorage({ database, publicClient }), + config, + address, + publicClient, + startBlock, + maxBlockRange, + indexerUrl, + initialState, + }); // Start the sync - const sub = blockStorageOperations$.subscribe(); - - async function waitForTransaction(tx: Hex): Promise<{ - receipt: TransactionReceipt; - }> { - // Wait for tx to be mined - const receipt = await publicClient.waitForTransactionReceipt({ hash: tx }); - - // If we haven't processed a block yet or we haven't processed the block for the tx, wait for it - if (lastBlockNumberProcessed == null || lastBlockNumberProcessed < receipt.blockNumber) { - await firstValueFrom( - blockStorageOperations$.pipe( - filter(({ blockNumber }) => blockNumber != null && blockNumber >= receipt.blockNumber) - ) - ); - } - - return { receipt }; - } + const sub = storeSync.blockStorageOperations$.subscribe(); return { - latestBlock$, - latestBlockNumber$, - blockLogs$, - blockStorageOperations$, - waitForTransaction, + ...storeSync, destroy: (): void => { sub.unsubscribe(); }, diff --git a/packages/store-sync/src/startSync.ts b/packages/store-sync/src/startSync.ts deleted file mode 100644 index 1f3de14968..0000000000 --- a/packages/store-sync/src/startSync.ts +++ /dev/null @@ -1,121 +0,0 @@ -import { StoreConfig, storeEventsAbi } from "@latticexyz/store"; -import { Hex, TransactionReceipt } from "viem"; -import { SyncOptions, SyncResult } from "./common"; -import { - createBlockStream, - isNonPendingBlock, - blockRangeToLogs, - groupLogsByBlockNumber, -} from "@latticexyz/block-logs-stream"; -import { filter, map, tap, mergeMap, from, concatMap, share, firstValueFrom } from "rxjs"; -import { blockLogsToStorage } from "./blockLogsToStorage"; -import { debug } from "./debug"; -import { createIndexerClient } from "./trpc-indexer"; -import { BlockLogsToStorageOptions } from "./blockLogsToStorage"; - -type StartSyncOptions = SyncOptions & { - storageAdapter: BlockLogsToStorageOptions; -}; - -type StartSyncResult = SyncResult; - -export async function startSync({ - storageAdapter, - config, - address, - publicClient, - startBlock = 0n, - maxBlockRange, - initialState, - indexerUrl, -}: StartSyncOptions): Promise> { - if (indexerUrl != null && initialState == null) { - const indexer = createIndexerClient({ url: indexerUrl }); - try { - initialState = await indexer.findAll.query({ - chainId: publicClient.chain.id, - address, - }); - } catch (error) { - debug("couldn't get initial state from indexer", error); - } - } - - if (initialState != null && initialState.blockNumber != null) { - debug("hydrating from initial state to block", initialState.blockNumber); - startBlock = initialState.blockNumber + 1n; - - // TODO: call storage adapter with initial state? - } - - // TODO: if startBlock is still 0, find via deploy event - - debug("starting sync from block", startBlock); - - const latestBlock$ = createBlockStream({ publicClient, blockTag: "latest" }).pipe(share()); - - const latestBlockNumber$ = latestBlock$.pipe( - filter(isNonPendingBlock), - map((block) => block.number), - share() - ); - - let latestBlockNumber: bigint | null = null; - const blockLogs$ = latestBlockNumber$.pipe( - tap((blockNumber) => { - debug("latest block number", blockNumber); - latestBlockNumber = blockNumber; - }), - map((blockNumber) => ({ startBlock, endBlock: blockNumber })), - blockRangeToLogs({ - publicClient, - address, - events: storeEventsAbi, - maxBlockRange, - }), - mergeMap(({ toBlock, logs }) => from(groupLogsByBlockNumber(logs, toBlock))), - share() - ); - - let lastBlockNumberProcessed: bigint | null = null; - const blockStorageOperations$ = blockLogs$.pipe( - concatMap(blockLogsToStorage(storageAdapter)), - tap(({ blockNumber, operations }) => { - debug("stored", operations.length, "operations for block", blockNumber); - lastBlockNumberProcessed = blockNumber; - }), - share() - ); - - // Start the sync - const sub = blockStorageOperations$.subscribe(); - - async function waitForTransaction(tx: Hex): Promise<{ - receipt: TransactionReceipt; - }> { - // Wait for tx to be mined - const receipt = await publicClient.waitForTransactionReceipt({ hash: tx }); - - // If we haven't processed a block yet or we haven't processed the block for the tx, wait for it - if (lastBlockNumberProcessed == null || lastBlockNumberProcessed < receipt.blockNumber) { - await firstValueFrom( - blockStorageOperations$.pipe( - filter(({ blockNumber }) => blockNumber != null && blockNumber >= receipt.blockNumber) - ) - ); - } - - return { receipt }; - } - - return { - latestBlock$, - latestBlockNumber$, - blockLogs$, - blockStorageOperations$, - waitForTransaction, - destroy: (): void => { - sub.unsubscribe(); - }, - }; -} From 0f715890458f59ce9593c392e89876a215d2fe98 Mon Sep 17 00:00:00 2001 From: Kevin Ingersoll Date: Thu, 3 Aug 2023 21:18:17 +0100 Subject: [PATCH 05/17] it works --- packages/store-sync/src/sqlite/syncToSqlite.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/store-sync/src/sqlite/syncToSqlite.ts b/packages/store-sync/src/sqlite/syncToSqlite.ts index a27aa3c889..ec27bd3f96 100644 --- a/packages/store-sync/src/sqlite/syncToSqlite.ts +++ b/packages/store-sync/src/sqlite/syncToSqlite.ts @@ -34,7 +34,7 @@ export async function syncToSqlite({ initialState, }: SyncToSqliteOptions): Promise> { const storeSync = await createStoreSync({ - storageAdapter: await sqliteStorage({ database, publicClient }), + storageAdapter: await sqliteStorage({ database, publicClient, config }), config, address, publicClient, From a1340d01a40e77074f0116e5def8330adae66c97 Mon Sep 17 00:00:00 2001 From: Kevin Ingersoll Date: Mon, 14 Aug 2023 14:57:28 +0100 Subject: [PATCH 06/17] move TableWithRecords --- packages/store-sync/src/common.ts | 4 +++- packages/store-sync/src/trpc-indexer/common.ts | 13 ++++++++----- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/packages/store-sync/src/common.ts b/packages/store-sync/src/common.ts index fe109ff0dd..4892733b13 100644 --- a/packages/store-sync/src/common.ts +++ b/packages/store-sync/src/common.ts @@ -27,6 +27,8 @@ export type Table = { valueSchema: ValueSchema; }; +export type TableWithRecords = Table & { records: TableRecord[] }; + export type StoreEventsLog = GetLogsResult[number]; export type BlockLogs = GroupLogsByBlockNumberResult[number]; @@ -106,7 +108,7 @@ export type SyncOptions = { */ initialState?: { blockNumber: bigint | null; - tables: (Table & { records: TableRecord[] })[]; + tables: TableWithRecords[]; }; }; diff --git a/packages/store-sync/src/trpc-indexer/common.ts b/packages/store-sync/src/trpc-indexer/common.ts index c7da106053..7089b74b0b 100644 --- a/packages/store-sync/src/trpc-indexer/common.ts +++ b/packages/store-sync/src/trpc-indexer/common.ts @@ -1,9 +1,12 @@ import { Hex } from "viem"; -import type { TableRecord } from "@latticexyz/store"; -import type { Table } from "../common"; - -export type TableWithRecords = Table & { records: TableRecord[] }; +import { TableWithRecords } from "../common"; export type StorageAdapter = { - findAll: (chainId: number, address?: Hex) => Promise<{ blockNumber: bigint | null; tables: TableWithRecords[] }>; + findAll: ( + chainId: number, + address?: Hex + ) => Promise<{ + blockNumber: bigint | null; + tables: TableWithRecords[]; + }>; }; From 96cc55e57b0f65766faedcf27c3c4e10805440d2 Mon Sep 17 00:00:00 2001 From: Kevin Ingersoll Date: Mon, 14 Aug 2023 15:45:18 +0100 Subject: [PATCH 07/17] finish recs progress, make log optional, add id for react keys --- packages/dev-tools/src/events/EventIcon.tsx | 21 ++++---- .../src/events/StorageOperationsTable.tsx | 6 +-- packages/store-sync/src/blockLogsToStorage.ts | 3 ++ packages/store-sync/src/common.ts | 3 +- packages/store-sync/src/createStoreSync.ts | 51 +++++++++---------- .../store-sync/src/recs/syncStepToMessage.ts | 17 +++++++ packages/store-sync/src/recs/syncToRecs.ts | 12 ++++- 7 files changed, 71 insertions(+), 42 deletions(-) create mode 100644 packages/store-sync/src/recs/syncStepToMessage.ts diff --git a/packages/dev-tools/src/events/EventIcon.tsx b/packages/dev-tools/src/events/EventIcon.tsx index 6aba7d871d..8b6aa682e3 100644 --- a/packages/dev-tools/src/events/EventIcon.tsx +++ b/packages/dev-tools/src/events/EventIcon.tsx @@ -1,21 +1,22 @@ import { assertExhaustive } from "@latticexyz/common/utils"; -import { StoreEventsAbiItem } from "@latticexyz/store"; +import { StoreConfig } from "@latticexyz/store"; +import { StorageOperation } from "@latticexyz/store-sync"; type Props = { - eventName: StoreEventsAbiItem["name"]; + type: StorageOperation["type"]; }; -export function EventIcon({ eventName }: Props) { - switch (eventName) { - case "StoreSetRecord": +export function EventIcon({ type }: Props) { + switch (type) { + case "SetRecord": return =; - case "StoreSetField": + case "SetField": return +; - case "StoreDeleteRecord": + case "DeleteRecord": return -; - case "StoreEphemeralRecord": - return ~; + // case "EphemeralRecord": + // return ~; default: - return assertExhaustive(eventName, `Unexpected event name: ${eventName}`); + return assertExhaustive(type, `Unexpected storage operation type: ${type}`); } } diff --git a/packages/dev-tools/src/events/StorageOperationsTable.tsx b/packages/dev-tools/src/events/StorageOperationsTable.tsx index b305fbd779..98d00297fd 100644 --- a/packages/dev-tools/src/events/StorageOperationsTable.tsx +++ b/packages/dev-tools/src/events/StorageOperationsTable.tsx @@ -23,16 +23,16 @@ export function StorageOperationsTable({ operations }: Props) { {operations.map((operation) => ( - + - {operation.log.blockNumber.toString()} + {operation.log?.blockNumber.toString()} {operation.namespace}:{operation.name} {serialize(operation.key)} - + {operation.type === "SetRecord" ? serialize(operation.value) : null} diff --git a/packages/store-sync/src/blockLogsToStorage.ts b/packages/store-sync/src/blockLogsToStorage.ts index 090dba77a3..dee2187715 100644 --- a/packages/store-sync/src/blockLogsToStorage.ts +++ b/packages/store-sync/src/blockLogsToStorage.ts @@ -201,6 +201,7 @@ export function blockLogsToStorage({ // TODO: decide if we should handle ephemeral records separately? // they'll eventually be turned into "events", but unclear if that should translate to client storage operations return { + id: `${log.blockHash}:${log.logIndex}`, log, type: "SetRecord", ...tableId, @@ -216,6 +217,7 @@ export function blockLogsToStorage({ keyof TConfig["tables"] >[typeof fieldName]; return { + id: `${log.blockHash}:${log.logIndex}`, log, type: "SetField", ...tableId, @@ -227,6 +229,7 @@ export function blockLogsToStorage({ if (log.eventName === "StoreDeleteRecord") { return { + id: `${log.blockHash}:${log.logIndex}`, log, type: "DeleteRecord", ...tableId, diff --git a/packages/store-sync/src/common.ts b/packages/store-sync/src/common.ts index 4892733b13..890f688845 100644 --- a/packages/store-sync/src/common.ts +++ b/packages/store-sync/src/common.ts @@ -33,7 +33,8 @@ export type StoreEventsLog = GetLogsResult[number]; export type BlockLogs = GroupLogsByBlockNumberResult[number]; export type BaseStorageOperation = { - log: NonPendingLog; + id: string; // TODO: better name for this to signal that this is client-only used for things like React keys + log?: NonPendingLog; namespace: TableNamespace; name: TableName; }; diff --git a/packages/store-sync/src/createStoreSync.ts b/packages/store-sync/src/createStoreSync.ts index 1df1a62572..83b5d3f223 100644 --- a/packages/store-sync/src/createStoreSync.ts +++ b/packages/store-sync/src/createStoreSync.ts @@ -1,4 +1,4 @@ -import { StoreConfig, storeEventsAbi } from "@latticexyz/store"; +import { ConfigToKeyPrimitives, ConfigToValuePrimitives, StoreConfig, storeEventsAbi } from "@latticexyz/store"; import { Hex, TransactionReceipt } from "viem"; import { SetRecordOperation, SyncOptions, SyncResult } from "./common"; import { @@ -29,7 +29,6 @@ type CreateStoreSyncResult = SyncResu export async function createStoreSync({ storageAdapter, onProgress, - config, address, publicClient, startBlock = 0n, @@ -64,16 +63,15 @@ export async function createStoreSync await storageAdapter.storeOperations({ blockNumber, operations: table.records.map( - (record) => + (record, i) => ({ + id: `${blockNumber}:${table.namespace}:${table.name}:${i}`, type: "SetRecord", namespace: table.namespace, name: table.name, - key: record.key, - value: record.value, - // TODO: refactor to not depend on log - log: {} as any, - } as SetRecordOperation) + key: record.key as ConfigToKeyPrimitives, + value: record.value as ConfigToValuePrimitives, + } as const satisfies SetRecordOperation) ), }); @@ -81,13 +79,13 @@ export async function createStoreSync recordsProcessedSinceLastUpdate += table.records.length; if (recordsProcessedSinceLastUpdate > recordsPerProgressUpdate) { + recordsProcessedSinceLastUpdate = 0; onProgress?.({ step: SyncStep.SNAPSHOT, percentage: (recordsProcessed / numRecords) * 100, latestBlockNumber: 0n, lastBlockNumberProcessed: blockNumber, }); - recordsProcessedSinceLastUpdate = 0; } debug(`hydrated ${table.records.length} records for table ${table.namespace}:${table.name}`); @@ -131,24 +129,23 @@ export async function createStoreSync debug("stored", operations.length, "operations for block", blockNumber); lastBlockNumberProcessed = blockNumber; - // if ( - // latestBlockNumber != null && - // getComponentValue(components.SyncProgress, singletonEntity)?.step !== SyncStep.LIVE - // ) { - // if (blockNumber < latestBlockNumber) { - // setComponent(components.SyncProgress, singletonEntity, { - // step: SyncStep.RPC, - // message: `Hydrating from RPC to block ${latestBlockNumber}`, - // percentage: (Number(blockNumber) / Number(latestBlockNumber)) * 100, - // }); - // } else { - // setComponent(components.SyncProgress, singletonEntity, { - // step: SyncStep.LIVE, - // message: `All caught up!`, - // percentage: 100, - // }); - // } - // } + if (onProgress != null && latestBlockNumber != null) { + if (blockNumber < latestBlockNumber) { + onProgress({ + step: SyncStep.RPC, + percentage: Number((lastBlockNumberProcessed * 1000n) / (latestBlockNumber * 1000n)) / 100, + latestBlockNumber, + lastBlockNumberProcessed, + }); + } else { + onProgress({ + step: SyncStep.LIVE, + percentage: 100, + latestBlockNumber, + lastBlockNumberProcessed, + }); + } + } }), share() ); diff --git a/packages/store-sync/src/recs/syncStepToMessage.ts b/packages/store-sync/src/recs/syncStepToMessage.ts new file mode 100644 index 0000000000..88e09e7503 --- /dev/null +++ b/packages/store-sync/src/recs/syncStepToMessage.ts @@ -0,0 +1,17 @@ +import { SyncStep } from "../SyncStep"; +import { assertExhaustive } from "@latticexyz/common/utils"; + +export function syncStepToMessage(step: SyncStep): string { + switch (step) { + case SyncStep.INITIALIZE: + return "Connecting"; + case SyncStep.SNAPSHOT: + return "Hydrating from snapshot"; + case SyncStep.RPC: + return "Hydrating from RPC"; + case SyncStep.LIVE: + return "All caught up!"; + default: + assertExhaustive(step); + } +} diff --git a/packages/store-sync/src/recs/syncToRecs.ts b/packages/store-sync/src/recs/syncToRecs.ts index 50993efb12..3fb3cbf102 100644 --- a/packages/store-sync/src/recs/syncToRecs.ts +++ b/packages/store-sync/src/recs/syncToRecs.ts @@ -1,5 +1,5 @@ import { StoreConfig } from "@latticexyz/store"; -import { World as RecsWorld } from "@latticexyz/recs"; +import { World as RecsWorld, setComponent } from "@latticexyz/recs"; import { SyncOptions, SyncResult } from "../common"; import { recsStorage } from "./recsStorage"; import { defineInternalComponents } from "./defineInternalComponents"; @@ -9,6 +9,7 @@ import storeConfig from "@latticexyz/store/mud.config"; import worldConfig from "@latticexyz/world/mud.config"; import { configToRecsComponents } from "./configToRecsComponents"; import { singletonEntity } from "./singletonEntity"; +import { syncStepToMessage } from "./syncStepToMessage"; type SyncToRecsOptions = SyncOptions & { world: RecsWorld; @@ -51,6 +52,15 @@ export async function syncToRecs({ maxBlockRange, indexerUrl, initialState, + onProgress: ({ step, percentage, latestBlockNumber, lastBlockNumberProcessed }) => { + setComponent(components.SyncProgress, singletonEntity, { + step, + percentage, + latestBlockNumber, + lastBlockNumberProcessed, + message: syncStepToMessage(step), + }); + }, }); const sub = storeSync.latestBlockNumber$.subscribe(); From d4cfc7aebceaeec0af5e153cd2d3e2bc64238557 Mon Sep 17 00:00:00 2001 From: Kevin Ingersoll Date: Mon, 14 Aug 2023 15:57:08 +0100 Subject: [PATCH 08/17] move SyncStep import --- .../client-phaser/src/ui/LoadingScreen/LoadingScreen.tsx | 3 ++- .../packages/client/src/ui/LoadingScreen/LoadingScreen.tsx | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/examples/minimal/packages/client-phaser/src/ui/LoadingScreen/LoadingScreen.tsx b/examples/minimal/packages/client-phaser/src/ui/LoadingScreen/LoadingScreen.tsx index ca4cb4d058..3f42736970 100644 --- a/examples/minimal/packages/client-phaser/src/ui/LoadingScreen/LoadingScreen.tsx +++ b/examples/minimal/packages/client-phaser/src/ui/LoadingScreen/LoadingScreen.tsx @@ -4,7 +4,8 @@ import { LoadingBar } from "./LoadingBar"; import { BootScreen } from "./BootScreen"; import { useComponentValue } from "@latticexyz/react"; import { useMUD } from "../../store"; -import { SyncStep, singletonEntity } from "@latticexyz/store-sync/recs"; +import { singletonEntity } from "@latticexyz/store-sync/recs"; +import { SyncStep } from "@latticexyz/store-sync"; export const LoadingScreen = () => { const { diff --git a/templates/phaser/packages/client/src/ui/LoadingScreen/LoadingScreen.tsx b/templates/phaser/packages/client/src/ui/LoadingScreen/LoadingScreen.tsx index ca4cb4d058..3f42736970 100644 --- a/templates/phaser/packages/client/src/ui/LoadingScreen/LoadingScreen.tsx +++ b/templates/phaser/packages/client/src/ui/LoadingScreen/LoadingScreen.tsx @@ -4,7 +4,8 @@ import { LoadingBar } from "./LoadingBar"; import { BootScreen } from "./BootScreen"; import { useComponentValue } from "@latticexyz/react"; import { useMUD } from "../../store"; -import { SyncStep, singletonEntity } from "@latticexyz/store-sync/recs"; +import { singletonEntity } from "@latticexyz/store-sync/recs"; +import { SyncStep } from "@latticexyz/store-sync"; export const LoadingScreen = () => { const { From fb5e48136fdbd82fee3fe20abf99575469c7988d Mon Sep 17 00:00:00 2001 From: Kevin Ingersoll Date: Mon, 14 Aug 2023 15:57:55 +0100 Subject: [PATCH 09/17] update snapshot --- packages/store-sync/src/blockLogsToStorage.test.ts | 2 ++ 1 file changed, 2 insertions(+) diff --git a/packages/store-sync/src/blockLogsToStorage.test.ts b/packages/store-sync/src/blockLogsToStorage.test.ts index 9e93b24ffe..c76841ff31 100644 --- a/packages/store-sync/src/blockLogsToStorage.test.ts +++ b/packages/store-sync/src/blockLogsToStorage.test.ts @@ -122,6 +122,7 @@ describe("blockLogsToStorage", () => { { "fieldName": "amount", "fieldValue": 8, + "id": "0x03e962e7402b2ab295b92feac342a132111dd14b0d1fd4d4a0456fdc77981577:88", "key": { "item": 1, "itemVariant": 1, @@ -175,6 +176,7 @@ describe("blockLogsToStorage", () => { { "fieldName": "amount", "fieldValue": 8, + "id": "0x03e962e7402b2ab295b92feac342a132111dd14b0d1fd4d4a0456fdc77981577:88", "key": { "item": 1, "itemVariant": 1, From cdc5a6f83b608360cb0eb3271a6bcc3b718b0771 Mon Sep 17 00:00:00 2001 From: Kevin Ingersoll Date: Mon, 14 Aug 2023 16:02:14 +0100 Subject: [PATCH 10/17] add message to assert --- packages/store-sync/src/recs/syncStepToMessage.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/store-sync/src/recs/syncStepToMessage.ts b/packages/store-sync/src/recs/syncStepToMessage.ts index 88e09e7503..8f702c3102 100644 --- a/packages/store-sync/src/recs/syncStepToMessage.ts +++ b/packages/store-sync/src/recs/syncStepToMessage.ts @@ -12,6 +12,6 @@ export function syncStepToMessage(step: SyncStep): string { case SyncStep.LIVE: return "All caught up!"; default: - assertExhaustive(step); + assertExhaustive(step, `Unexpected sync step: ${step}`); } } From 03eb28abc2354756f3642984299ddd1d5190ea10 Mon Sep 17 00:00:00 2001 From: Kevin Ingersoll Date: Mon, 14 Aug 2023 16:13:43 +0100 Subject: [PATCH 11/17] fixes after making log optional --- packages/store-sync/src/common.ts | 1 + packages/store-sync/src/createStoreSync.ts | 1 + packages/store-sync/src/recs/recsStorage.ts | 18 +++++++----------- .../store-sync/src/sqlite/sqliteStorage.ts | 13 +++++++------ 4 files changed, 16 insertions(+), 17 deletions(-) diff --git a/packages/store-sync/src/common.ts b/packages/store-sync/src/common.ts index 890f688845..64846b3685 100644 --- a/packages/store-sync/src/common.ts +++ b/packages/store-sync/src/common.ts @@ -35,6 +35,7 @@ export type BlockLogs = GroupLogsByBlockNumberResult[number]; export type BaseStorageOperation = { id: string; // TODO: better name for this to signal that this is client-only used for things like React keys log?: NonPendingLog; + address: Hex; namespace: TableNamespace; name: TableName; }; diff --git a/packages/store-sync/src/createStoreSync.ts b/packages/store-sync/src/createStoreSync.ts index 83b5d3f223..997e767419 100644 --- a/packages/store-sync/src/createStoreSync.ts +++ b/packages/store-sync/src/createStoreSync.ts @@ -67,6 +67,7 @@ export async function createStoreSync ({ id: `${blockNumber}:${table.namespace}:${table.name}:${i}`, type: "SetRecord", + address: table.address, namespace: table.namespace, name: table.name, key: record.key as ConfigToKeyPrimitives, diff --git a/packages/store-sync/src/recs/recsStorage.ts b/packages/store-sync/src/recs/recsStorage.ts index 42fefdaaa6..d0db64f0c2 100644 --- a/packages/store-sync/src/recs/recsStorage.ts +++ b/packages/store-sync/src/recs/recsStorage.ts @@ -12,14 +12,12 @@ import { updateComponent, } from "@latticexyz/recs"; import { isDefined } from "@latticexyz/common/utils"; -import { TableId } from "@latticexyz/common/deprecated"; import { schemaToDefaults } from "../schemaToDefaults"; -import { hexKeyTupleToEntity } from "./hexKeyTupleToEntity"; import { defineInternalComponents } from "./defineInternalComponents"; import { getTableKey } from "./getTableKey"; import { StoreComponentMetadata } from "./common"; - -// TODO: should we create components here from config rather than passing them in? +import { tableIdToHex } from "@latticexyz/common"; +import { encodeEntity } from "./encodeEntity"; export function recsStorage({ components, @@ -52,26 +50,24 @@ export function recsStorage({ const table = getComponentValue( components.TableMetadata, getTableKey({ - address: operation.log.address, + address: operation.address, namespace: operation.namespace, name: operation.name, }) as Entity )?.table; if (!table) { - debug( - `skipping update for unknown table: ${operation.namespace}:${operation.name} at ${operation.log.address}` - ); + debug(`skipping update for unknown table: ${operation.namespace}:${operation.name} at ${operation.address}`); continue; } - const tableId = new TableId(operation.namespace, operation.name).toString(); - const component = componentsByTableId[operation.log.args.table]; + const tableId = tableIdToHex(operation.namespace, operation.name); + const component = componentsByTableId[tableId]; if (!component) { debug(`skipping update for unknown component: ${tableId}. Available components: ${Object.keys(components)}`); continue; } - const entity = hexKeyTupleToEntity(operation.log.args.key); + const entity = encodeEntity(table.keySchema, operation.key); if (operation.type === "SetRecord") { debug("setting component", tableId, entity, operation.value); diff --git a/packages/store-sync/src/sqlite/sqliteStorage.ts b/packages/store-sync/src/sqlite/sqliteStorage.ts index 3c78ba86ee..f5f32a2989 100644 --- a/packages/store-sync/src/sqlite/sqliteStorage.ts +++ b/packages/store-sync/src/sqlite/sqliteStorage.ts @@ -1,4 +1,4 @@ -import { Hex, PublicClient, encodePacked, getAddress } from "viem"; +import { PublicClient, concatHex, encodeAbiParameters, getAddress } from "viem"; import { BaseSQLiteDatabase } from "drizzle-orm/sqlite-core"; import { and, eq, sql } from "drizzle-orm"; import { sqliteTableToSql } from "./sqliteTableToSql"; @@ -76,7 +76,7 @@ export async function sqliteStorage({ new Set( operations.map((operation) => JSON.stringify({ - address: getAddress(operation.log.address), + address: getAddress(operation.address), namespace: operation.namespace, name: operation.name, }) @@ -102,7 +102,7 @@ export async function sqliteStorage({ for (const operation of operations) { const table = tables.find( (table) => - table.address === getAddress(operation.log.address) && + table.address === getAddress(operation.address) && table.namespace === operation.namespace && table.name === operation.name ); @@ -112,9 +112,10 @@ export async function sqliteStorage({ } const sqliteTable = createSqliteTable(table); - const key = encodePacked( - operation.log.args.key.map(() => "bytes32"), - operation.log.args.key as Hex[] + const key = concatHex( + Object.entries(table.keySchema).map(([keyName, type]) => + encodeAbiParameters([{ type }], [operation.key[keyName]]) + ) ); if (operation.type === "SetRecord") { From 30709db3e49e205a4e598319b0abe259d3f8c77f Mon Sep 17 00:00:00 2001 From: Kevin Ingersoll Date: Mon, 14 Aug 2023 17:24:28 +0100 Subject: [PATCH 12/17] missed a spot --- packages/store-sync/src/blockLogsToStorage.test.ts | 2 ++ packages/store-sync/src/blockLogsToStorage.ts | 3 +++ 2 files changed, 5 insertions(+) diff --git a/packages/store-sync/src/blockLogsToStorage.test.ts b/packages/store-sync/src/blockLogsToStorage.test.ts index c76841ff31..1e69fa54dd 100644 --- a/packages/store-sync/src/blockLogsToStorage.test.ts +++ b/packages/store-sync/src/blockLogsToStorage.test.ts @@ -120,6 +120,7 @@ describe("blockLogsToStorage", () => { "blockNumber": 5448n, "operations": [ { + "address": "0x5FbDB2315678afecb367f032d93F642f64180aa3", "fieldName": "amount", "fieldValue": 8, "id": "0x03e962e7402b2ab295b92feac342a132111dd14b0d1fd4d4a0456fdc77981577:88", @@ -174,6 +175,7 @@ describe("blockLogsToStorage", () => { "blockNumber": 5448n, "operations": [ { + "address": "0x5FbDB2315678afecb367f032d93F642f64180aa3", "fieldName": "amount", "fieldValue": 8, "id": "0x03e962e7402b2ab295b92feac342a132111dd14b0d1fd4d4a0456fdc77981577:88", diff --git a/packages/store-sync/src/blockLogsToStorage.ts b/packages/store-sync/src/blockLogsToStorage.ts index dee2187715..fabc11637e 100644 --- a/packages/store-sync/src/blockLogsToStorage.ts +++ b/packages/store-sync/src/blockLogsToStorage.ts @@ -203,6 +203,7 @@ export function blockLogsToStorage({ return { id: `${log.blockHash}:${log.logIndex}`, log, + address: getAddress(log.address), type: "SetRecord", ...tableId, key, @@ -219,6 +220,7 @@ export function blockLogsToStorage({ return { id: `${log.blockHash}:${log.logIndex}`, log, + address: getAddress(log.address), type: "SetField", ...tableId, key, @@ -231,6 +233,7 @@ export function blockLogsToStorage({ return { id: `${log.blockHash}:${log.logIndex}`, log, + address: getAddress(log.address), type: "DeleteRecord", ...tableId, key, From 2962c4cf180b8a680e63637e1be2f2df6c8bd75a Mon Sep 17 00:00:00 2001 From: Kevin Ingersoll Date: Tue, 15 Aug 2023 09:07:21 +0100 Subject: [PATCH 13/17] some fixes --- e2e/packages/client-vanilla/src/mud/setupNetwork.ts | 2 -- packages/store-sync/src/createStoreSync.ts | 4 +++- packages/store-sync/src/recs/syncToRecs.ts | 6 ++++-- packages/store-sync/src/sqlite/syncToSqlite.ts | 2 +- 4 files changed, 8 insertions(+), 6 deletions(-) diff --git a/e2e/packages/client-vanilla/src/mud/setupNetwork.ts b/e2e/packages/client-vanilla/src/mud/setupNetwork.ts index 4a615e6381..341c3e303d 100644 --- a/e2e/packages/client-vanilla/src/mud/setupNetwork.ts +++ b/e2e/packages/client-vanilla/src/mud/setupNetwork.ts @@ -18,8 +18,6 @@ export async function setupNetwork() { pollingInterval: 1000, } as const satisfies ClientConfig; - console.log("client options", clientOptions); - const publicClient = createPublicClient(clientOptions); const burnerAccount = createBurnerAccount(networkConfig.privateKey as Hex); diff --git a/packages/store-sync/src/createStoreSync.ts b/packages/store-sync/src/createStoreSync.ts index 997e767419..d8692821d8 100644 --- a/packages/store-sync/src/createStoreSync.ts +++ b/packages/store-sync/src/createStoreSync.ts @@ -9,11 +9,13 @@ import { } from "@latticexyz/block-logs-stream"; import { filter, map, tap, mergeMap, from, concatMap, share, firstValueFrom } from "rxjs"; import { blockLogsToStorage } from "./blockLogsToStorage"; -import { debug } from "./debug"; +import { debug as parentDebug } from "./debug"; import { createIndexerClient } from "./trpc-indexer"; import { BlockLogsToStorageOptions } from "./blockLogsToStorage"; import { SyncStep } from "./SyncStep"; +const debug = parentDebug.extend("createStoreSync"); + type CreateStoreSyncOptions = SyncOptions & { storageAdapter: BlockLogsToStorageOptions; onProgress?: (opts: { diff --git a/packages/store-sync/src/recs/syncToRecs.ts b/packages/store-sync/src/recs/syncToRecs.ts index 3fb3cbf102..858ebff912 100644 --- a/packages/store-sync/src/recs/syncToRecs.ts +++ b/packages/store-sync/src/recs/syncToRecs.ts @@ -29,7 +29,7 @@ export async function syncToRecs({ config, address, publicClient, - startBlock = 0n, + startBlock, maxBlockRange, initialState, indexerUrl, @@ -53,6 +53,8 @@ export async function syncToRecs({ indexerUrl, initialState, onProgress: ({ step, percentage, latestBlockNumber, lastBlockNumberProcessed }) => { + console.log("got progress", step, percentage); + // TODO: stop updating once live? setComponent(components.SyncProgress, singletonEntity, { step, percentage, @@ -63,7 +65,7 @@ export async function syncToRecs({ }, }); - const sub = storeSync.latestBlockNumber$.subscribe(); + const sub = storeSync.blockStorageOperations$.subscribe(); world.registerDisposer(() => sub.unsubscribe()); return { diff --git a/packages/store-sync/src/sqlite/syncToSqlite.ts b/packages/store-sync/src/sqlite/syncToSqlite.ts index ec27bd3f96..89ed96ca12 100644 --- a/packages/store-sync/src/sqlite/syncToSqlite.ts +++ b/packages/store-sync/src/sqlite/syncToSqlite.ts @@ -28,7 +28,7 @@ export async function syncToSqlite({ database, publicClient, address, - startBlock = 0n, + startBlock, maxBlockRange, indexerUrl, initialState, From 92d823d47013b1d4db0f0a9886fef6713dbb0bf2 Mon Sep 17 00:00:00 2001 From: Kevin Ingersoll Date: Tue, 15 Aug 2023 09:14:20 +0100 Subject: [PATCH 14/17] fix indexer live check --- e2e/packages/sync-test/setup/startIndexer.ts | 2 +- packages/store-sync/src/createStoreSync.ts | 7 ++++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/e2e/packages/sync-test/setup/startIndexer.ts b/e2e/packages/sync-test/setup/startIndexer.ts index 38c7f1904b..9ec0f02459 100644 --- a/e2e/packages/sync-test/setup/startIndexer.ts +++ b/e2e/packages/sync-test/setup/startIndexer.ts @@ -17,7 +17,7 @@ export function startIndexer( const proc = execa("pnpm", ["start"], { cwd: path.join(__dirname, "..", "..", "..", "..", "packages", "store-indexer"), env: { - DEBUG: "mud:store-indexer", + DEBUG: "mud:*", PORT: port.toString(), CHAIN_ID: "31337", RPC_HTTP_URL: rpcUrl, diff --git a/packages/store-sync/src/createStoreSync.ts b/packages/store-sync/src/createStoreSync.ts index d8692821d8..0a6ed2414c 100644 --- a/packages/store-sync/src/createStoreSync.ts +++ b/packages/store-sync/src/createStoreSync.ts @@ -132,16 +132,17 @@ export async function createStoreSync debug("stored", operations.length, "operations for block", blockNumber); lastBlockNumberProcessed = blockNumber; - if (onProgress != null && latestBlockNumber != null) { + if (latestBlockNumber != null) { if (blockNumber < latestBlockNumber) { - onProgress({ + onProgress?.({ step: SyncStep.RPC, percentage: Number((lastBlockNumberProcessed * 1000n) / (latestBlockNumber * 1000n)) / 100, latestBlockNumber, lastBlockNumberProcessed, }); } else { - onProgress({ + debug("all caught up"); + onProgress?.({ step: SyncStep.LIVE, percentage: 100, latestBlockNumber, From 7c3209582a70e50c8a1f4dab2d08af0d6beeb45f Mon Sep 17 00:00:00 2001 From: Kevin Ingersoll Date: Tue, 15 Aug 2023 10:23:05 +0100 Subject: [PATCH 15/17] rework log handling, move 'all caught up' message --- e2e/packages/sync-test/setup/startIndexer.ts | 35 ++++++++------------ packages/store-indexer/bin/sqlite-indexer.ts | 15 ++++++++- packages/store-sync/src/createStoreSync.ts | 1 - 3 files changed, 28 insertions(+), 23 deletions(-) diff --git a/e2e/packages/sync-test/setup/startIndexer.ts b/e2e/packages/sync-test/setup/startIndexer.ts index 9ec0f02459..d13b5a3e58 100644 --- a/e2e/packages/sync-test/setup/startIndexer.ts +++ b/e2e/packages/sync-test/setup/startIndexer.ts @@ -11,6 +11,10 @@ export function startIndexer( ) { let resolve: () => void; let reject: (reason?: string) => void; + const doneSyncing = new Promise((res, rej) => { + resolve = res; + reject = rej; + }); console.log(chalk.magenta("[indexer]:"), "start syncing"); @@ -32,31 +36,23 @@ export function startIndexer( reject(errorMessage); }); - proc.stdout?.on("data", (data) => { - const dataString = data.toString(); - const errors = extractLineContaining("ERROR", dataString).join("\n"); + function onLog(data: string) { + const errors = extractLineContaining("ERROR", data).join("\n"); if (errors) { - console.log(chalk.magenta("[indexer error]:", errors)); - reject(errors); - } - console.log(chalk.magentaBright("[indexer]:", dataString)); - }); - - proc.stderr?.on("data", (data) => { - const dataString = data.toString(); - const modeErrors = extractLineContaining("ERROR", dataString).join("\n"); - if (modeErrors) { - const errorMessage = chalk.magenta("[indexer error]:", modeErrors); + const errorMessage = chalk.magenta("[indexer error]:", errors); console.log(errorMessage); reportError(errorMessage); - reject(modeErrors); + reject(errors); } if (data.toString().includes("all caught up")) { console.log(chalk.magenta("[indexer]:"), "done syncing"); resolve(); } - console.log(chalk.magentaBright("[indexer ingress]:", dataString)); - }); + console.log(chalk.magentaBright("[indexer]:", data)); + } + + proc.stdout?.on("data", (data) => onLog(data.toString())); + proc.stderr?.on("data", (data) => onLog(data.toString())); function cleanUp() { // attempt to clean up sqlite file @@ -75,10 +71,7 @@ export function startIndexer( return { url: `http://127.0.0.1:${port}`, - doneSyncing: new Promise((res, rej) => { - resolve = res; - reject = rej; - }), + doneSyncing, process: proc, kill: () => new Promise((resolve) => { diff --git a/packages/store-indexer/bin/sqlite-indexer.ts b/packages/store-indexer/bin/sqlite-indexer.ts index b22b606e6f..5d34f2e2d8 100644 --- a/packages/store-indexer/bin/sqlite-indexer.ts +++ b/packages/store-indexer/bin/sqlite-indexer.ts @@ -13,6 +13,8 @@ import type { Chain } from "viem/chains"; import * as mudChains from "@latticexyz/common/chains"; import * as chains from "viem/chains"; import { isNotNull } from "@latticexyz/common/utils"; +import { combineLatest, filter, first } from "rxjs"; +import { debug } from "../src/debug"; const possibleChains = Object.values({ ...mudChains, ...chains }) as Chain[]; @@ -88,13 +90,24 @@ try { // ignore errors, this is optional } -await syncToSqlite({ +const { latestBlockNumber$, blockStorageOperations$ } = await syncToSqlite({ database, publicClient, startBlock, maxBlockRange: env.MAX_BLOCK_RANGE, }); +combineLatest([latestBlockNumber$, blockStorageOperations$]) + .pipe( + filter( + ([latestBlockNumber, { blockNumber: lastBlockNumberProcessed }]) => latestBlockNumber === lastBlockNumberProcessed + ), + first() + ) + .subscribe(() => { + console.log("all caught up"); + }); + const server = createHTTPServer({ middleware: cors(), router: createAppRouter(), diff --git a/packages/store-sync/src/createStoreSync.ts b/packages/store-sync/src/createStoreSync.ts index 0a6ed2414c..2e16bf1d43 100644 --- a/packages/store-sync/src/createStoreSync.ts +++ b/packages/store-sync/src/createStoreSync.ts @@ -141,7 +141,6 @@ export async function createStoreSync lastBlockNumberProcessed, }); } else { - debug("all caught up"); onProgress?.({ step: SyncStep.LIVE, percentage: 100, From 94625b4bf524b70420e164b4fdbc62860d18f2a6 Mon Sep 17 00:00:00 2001 From: Kevin Ingersoll Date: Tue, 15 Aug 2023 02:48:30 -0700 Subject: [PATCH 16/17] Create smooth-elephants-wave.md --- .changeset/smooth-elephants-wave.md | 9 +++++++++ 1 file changed, 9 insertions(+) create mode 100644 .changeset/smooth-elephants-wave.md diff --git a/.changeset/smooth-elephants-wave.md b/.changeset/smooth-elephants-wave.md new file mode 100644 index 0000000000..63b368bfe4 --- /dev/null +++ b/.changeset/smooth-elephants-wave.md @@ -0,0 +1,9 @@ +--- +"@latticexyz/dev-tools": patch +"@latticexyz/store-indexer": minor +"@latticexyz/store-sync": minor +--- + +Store sync logic is now consolidated into a `createStoreSync` function exported from `@latticexyz/store-sync`. This simplifies each storage sync strategy to just a simple wrapper around the storage adapter. You can now sync to RECS with `syncToRecs` or SQLite with `syncToSqlite` and PostgreSQL support coming soon. + +There are no breaking changes if you were just using `syncToRecs` from `@latticexyz/store-sync` or running the `sqlite-indexer` binary from `@latticexyz/store-indexer`. From 2cda3999e8b20e9455bf975d036930840bb94530 Mon Sep 17 00:00:00 2001 From: Kevin Ingersoll Date: Tue, 15 Aug 2023 10:56:37 +0100 Subject: [PATCH 17/17] remove ID --- packages/dev-tools/src/events/StorageOperationsTable.tsx | 9 ++++++++- packages/store-sync/src/blockLogsToStorage.test.ts | 2 -- packages/store-sync/src/blockLogsToStorage.ts | 3 --- packages/store-sync/src/common.ts | 1 - packages/store-sync/src/createStoreSync.ts | 3 +-- 5 files changed, 9 insertions(+), 9 deletions(-) diff --git a/packages/dev-tools/src/events/StorageOperationsTable.tsx b/packages/dev-tools/src/events/StorageOperationsTable.tsx index 98d00297fd..bcf63d8876 100644 --- a/packages/dev-tools/src/events/StorageOperationsTable.tsx +++ b/packages/dev-tools/src/events/StorageOperationsTable.tsx @@ -23,7 +23,14 @@ export function StorageOperationsTable({ operations }: Props) { {operations.map((operation) => ( - + {operation.log?.blockNumber.toString()} diff --git a/packages/store-sync/src/blockLogsToStorage.test.ts b/packages/store-sync/src/blockLogsToStorage.test.ts index 1e69fa54dd..30a682321c 100644 --- a/packages/store-sync/src/blockLogsToStorage.test.ts +++ b/packages/store-sync/src/blockLogsToStorage.test.ts @@ -123,7 +123,6 @@ describe("blockLogsToStorage", () => { "address": "0x5FbDB2315678afecb367f032d93F642f64180aa3", "fieldName": "amount", "fieldValue": 8, - "id": "0x03e962e7402b2ab295b92feac342a132111dd14b0d1fd4d4a0456fdc77981577:88", "key": { "item": 1, "itemVariant": 1, @@ -178,7 +177,6 @@ describe("blockLogsToStorage", () => { "address": "0x5FbDB2315678afecb367f032d93F642f64180aa3", "fieldName": "amount", "fieldValue": 8, - "id": "0x03e962e7402b2ab295b92feac342a132111dd14b0d1fd4d4a0456fdc77981577:88", "key": { "item": 1, "itemVariant": 1, diff --git a/packages/store-sync/src/blockLogsToStorage.ts b/packages/store-sync/src/blockLogsToStorage.ts index fabc11637e..652c92069b 100644 --- a/packages/store-sync/src/blockLogsToStorage.ts +++ b/packages/store-sync/src/blockLogsToStorage.ts @@ -201,7 +201,6 @@ export function blockLogsToStorage({ // TODO: decide if we should handle ephemeral records separately? // they'll eventually be turned into "events", but unclear if that should translate to client storage operations return { - id: `${log.blockHash}:${log.logIndex}`, log, address: getAddress(log.address), type: "SetRecord", @@ -218,7 +217,6 @@ export function blockLogsToStorage({ keyof TConfig["tables"] >[typeof fieldName]; return { - id: `${log.blockHash}:${log.logIndex}`, log, address: getAddress(log.address), type: "SetField", @@ -231,7 +229,6 @@ export function blockLogsToStorage({ if (log.eventName === "StoreDeleteRecord") { return { - id: `${log.blockHash}:${log.logIndex}`, log, address: getAddress(log.address), type: "DeleteRecord", diff --git a/packages/store-sync/src/common.ts b/packages/store-sync/src/common.ts index 64846b3685..b0d5b3ba63 100644 --- a/packages/store-sync/src/common.ts +++ b/packages/store-sync/src/common.ts @@ -33,7 +33,6 @@ export type StoreEventsLog = GetLogsResult[number]; export type BlockLogs = GroupLogsByBlockNumberResult[number]; export type BaseStorageOperation = { - id: string; // TODO: better name for this to signal that this is client-only used for things like React keys log?: NonPendingLog; address: Hex; namespace: TableNamespace; diff --git a/packages/store-sync/src/createStoreSync.ts b/packages/store-sync/src/createStoreSync.ts index 2e16bf1d43..0e99d49685 100644 --- a/packages/store-sync/src/createStoreSync.ts +++ b/packages/store-sync/src/createStoreSync.ts @@ -65,9 +65,8 @@ export async function createStoreSync await storageAdapter.storeOperations({ blockNumber, operations: table.records.map( - (record, i) => + (record) => ({ - id: `${blockNumber}:${table.namespace}:${table.name}:${i}`, type: "SetRecord", address: table.address, namespace: table.namespace,