From 131c63e539a8e9947835dcc323c8b37562aed9ca Mon Sep 17 00:00:00 2001 From: Kevin Ingersoll Date: Thu, 3 Aug 2023 09:59:06 -0700 Subject: [PATCH] feat(store-indexer,store-sync): make chain optional, configure indexer with RPC (#1234) --- .changeset/itchy-shoes-appear.md | 7 ++++ packages/store-indexer/bin/sqlite-indexer.ts | 35 ++++++++++++++----- .../store-indexer/src/sqlite/createIndexer.ts | 10 +++--- packages/store-sync/src/recs/syncToRecs.ts | 8 ++--- .../src/sqlite/sqliteStorage.test.ts | 2 +- .../store-sync/src/sqlite/sqliteStorage.ts | 14 ++++---- 6 files changed, 51 insertions(+), 25 deletions(-) create mode 100644 .changeset/itchy-shoes-appear.md diff --git a/.changeset/itchy-shoes-appear.md b/.changeset/itchy-shoes-appear.md new file mode 100644 index 0000000000..c6e862d475 --- /dev/null +++ b/.changeset/itchy-shoes-appear.md @@ -0,0 +1,7 @@ +--- +"@latticexyz/store-indexer": minor +"@latticexyz/store-sync": minor +--- + +- Accept a plain viem `PublicClient` (instead of requiring a `Chain` to be set) in `store-sync` and `store-indexer` functions. These functions now fetch chain ID using `publicClient.getChainId()` when no `publicClient.chain.id` is present. +- Allow configuring `store-indexer` with a set of RPC URLs (`RPC_HTTP_URL` and `RPC_WS_URL`) instead of `CHAIN_ID`. diff --git a/packages/store-indexer/bin/sqlite-indexer.ts b/packages/store-indexer/bin/sqlite-indexer.ts index be108dfe76..6ea684b819 100644 --- a/packages/store-indexer/bin/sqlite-indexer.ts +++ b/packages/store-indexer/bin/sqlite-indexer.ts @@ -4,7 +4,7 @@ import cors from "cors"; import { eq } from "drizzle-orm"; import { drizzle } from "drizzle-orm/better-sqlite3"; import Database from "better-sqlite3"; -import { createPublicClient, fallback, webSocket, http } from "viem"; +import { createPublicClient, fallback, webSocket, http, Transport } from "viem"; import { createHTTPServer } from "@trpc/server/adapters/standalone"; import { createAppRouter } from "@latticexyz/store-sync/trpc-indexer"; import { chainState, schemaVersion } from "@latticexyz/store-sync/sqlite"; @@ -13,12 +13,16 @@ import { createStorageAdapter } from "../src/sqlite/createStorageAdapter"; import type { Chain } from "viem/chains"; import * as mudChains from "@latticexyz/common/chains"; import * as chains from "viem/chains"; +import { isNotNull } from "@latticexyz/common/utils"; const possibleChains = Object.values({ ...mudChains, ...chains }) as Chain[]; +// TODO: refine zod type to be either CHAIN_ID or RPC_HTTP_URL/RPC_WS_URL const env = z .object({ - CHAIN_ID: z.coerce.number().positive(), + CHAIN_ID: z.coerce.number().positive().optional(), + RPC_HTTP_URL: z.string().optional(), + RPC_WS_URL: z.string().optional(), START_BLOCK: z.coerce.bigint().nonnegative().default(0n), MAX_BLOCK_RANGE: z.coerce.bigint().positive().default(1000n), PORT: z.coerce.number().positive().default(3001), @@ -30,24 +34,39 @@ const env = z }), }); -const chain = possibleChains.find((c) => c.id === env.CHAIN_ID); -if (!chain) { - throw new Error(`Chain ${env.CHAIN_ID} not found`); +const chain = env.CHAIN_ID != null ? possibleChains.find((c) => c.id === env.CHAIN_ID) : undefined; +if (env.CHAIN_ID != null && !chain) { + console.warn(`No chain found for chain ID ${env.CHAIN_ID}`); } +const transports: Transport[] = [ + env.RPC_WS_URL ? webSocket(env.RPC_WS_URL) : null, + env.RPC_HTTP_URL ? http(env.RPC_HTTP_URL) : null, +].filter(isNotNull); + const publicClient = createPublicClient({ chain, - transport: fallback([webSocket(), http()]), + transport: fallback( + // If one or more RPC URLs are provided, we'll configure the transport with only those RPC URLs + transports.length > 0 + ? transports + : // Otherwise use the chain defaults + [webSocket(), http()] + ), pollingInterval: 1000, }); +// Fetch the chain ID from the RPC if no chain object was found for the provided chain ID. +// We do this to match the downstream logic, which also attempts to find the chain ID. +const chainId = chain?.id ?? (await publicClient.getChainId()); + const database = drizzle(new Database(env.SQLITE_FILENAME)); let startBlock = env.START_BLOCK; // Resume from latest block stored in DB. This will throw if the DB doesn't exist yet, so we wrap in a try/catch and ignore the error. try { - const currentChainStates = database.select().from(chainState).where(eq(chainState.chainId, chain.id)).all(); + const currentChainStates = database.select().from(chainState).where(eq(chainState.chainId, chainId)).all(); // TODO: replace this type workaround with `noUncheckedIndexedAccess: true` when we can fix all the issues related (https://github.com/latticexyz/mud/issues/1212) const currentChainState: (typeof currentChainStates)[number] | undefined = currentChainStates[0]; @@ -70,7 +89,7 @@ try { // ignore errors, this is optional } -createIndexer({ +await createIndexer({ database, publicClient, startBlock, diff --git a/packages/store-indexer/src/sqlite/createIndexer.ts b/packages/store-indexer/src/sqlite/createIndexer.ts index 42dc0cff2a..ea6fe2e7b3 100644 --- a/packages/store-indexer/src/sqlite/createIndexer.ts +++ b/packages/store-indexer/src/sqlite/createIndexer.ts @@ -1,4 +1,4 @@ -import { Chain, PublicClient, Transport } from "viem"; +import { PublicClient } from "viem"; import { createBlockStream, isNonPendingBlock, @@ -24,7 +24,7 @@ type CreateIndexerOptions = { * * [0]: https://viem.sh/docs/clients/public.html */ - publicClient: PublicClient; + publicClient: PublicClient; /** * Optional block number to start indexing from. Useful for resuming the indexer from a particular point in time or starting after a particular contract deployment. */ @@ -41,12 +41,12 @@ type CreateIndexerOptions = { * @param {CreateIndexerOptions} options See `CreateIndexerOptions`. * @returns A function to unsubscribe from the block stream, effectively stopping the indexer. */ -export function createIndexer({ +export async function createIndexer({ database, publicClient, startBlock = 0n, maxBlockRange, -}: CreateIndexerOptions): () => void { +}: CreateIndexerOptions): Promise<() => void> { const latestBlock$ = createBlockStream({ publicClient, blockTag: "latest" }); const latestBlockNumber$ = latestBlock$.pipe( @@ -70,7 +70,7 @@ export function createIndexer({ const sub = blockLogs$ .pipe( - concatMap(blockLogsToStorage(sqliteStorage({ database, publicClient }))), + concatMap(blockLogsToStorage(await sqliteStorage({ database, publicClient }))), tap(({ blockNumber, operations }) => { debug("stored", operations.length, "operations for block", blockNumber); }) diff --git a/packages/store-sync/src/recs/syncToRecs.ts b/packages/store-sync/src/recs/syncToRecs.ts index 0245f6bfef..870ff12195 100644 --- a/packages/store-sync/src/recs/syncToRecs.ts +++ b/packages/store-sync/src/recs/syncToRecs.ts @@ -39,7 +39,7 @@ type SyncToRecsOptions< config: TConfig; address: Address; // TODO: make this optional and return one if none provided (but will need chain ID at least) - publicClient: PublicClient; + publicClient: PublicClient; // TODO: generate these from config and return instead? components: TComponents; indexerUrl?: string; @@ -94,10 +94,8 @@ export async function syncToRecs< if (indexerUrl != null && initialState == null) { const indexer = createIndexerClient({ url: indexerUrl }); try { - initialState = await indexer.findAll.query({ - chainId: publicClient.chain.id, - address, - }); + const chainId = publicClient.chain?.id ?? (await publicClient.getChainId()); + initialState = await indexer.findAll.query({ chainId, address }); } catch (error) { debug("couldn't get initial state from indexer", error); } diff --git a/packages/store-sync/src/sqlite/sqliteStorage.test.ts b/packages/store-sync/src/sqlite/sqliteStorage.test.ts index ffe298a375..386bd61220 100644 --- a/packages/store-sync/src/sqlite/sqliteStorage.test.ts +++ b/packages/store-sync/src/sqlite/sqliteStorage.test.ts @@ -33,7 +33,7 @@ describe("sqliteStorage", async () => { '"no such table: __mudStoreTables"' ); - const storageAdapter = sqliteStorage({ database: db, publicClient }); + const storageAdapter = await sqliteStorage({ database: db, publicClient }); expect(db.select().from(chainState).all()).toMatchInlineSnapshot("[]"); expect(db.select().from(mudStoreTables).all()).toMatchInlineSnapshot("[]"); diff --git a/packages/store-sync/src/sqlite/sqliteStorage.ts b/packages/store-sync/src/sqlite/sqliteStorage.ts index 21a0b32465..1d4c39133e 100644 --- a/packages/store-sync/src/sqlite/sqliteStorage.ts +++ b/packages/store-sync/src/sqlite/sqliteStorage.ts @@ -1,4 +1,4 @@ -import { Chain, Hex, PublicClient, Transport, encodePacked, getAddress } from "viem"; +import { Hex, PublicClient, encodePacked, getAddress } from "viem"; import { BaseSQLiteDatabase } from "drizzle-orm/sqlite-core"; import { and, eq, sql } from "drizzle-orm"; import { sqliteTableToSql } from "./sqliteTableToSql"; @@ -13,14 +13,16 @@ import { chainState, mudStoreTables } from "./internalTables"; import { getTables } from "./getTables"; import { schemaVersion } from "./schemaVersion"; -export function sqliteStorage({ +export async function sqliteStorage({ database, publicClient, }: { database: BaseSQLiteDatabase<"sync", void>; - publicClient: PublicClient; + publicClient: PublicClient; config?: TConfig; -}): BlockLogsToStorageOptions { +}): Promise> { + const chainId = publicClient.chain?.id ?? (await publicClient.getChainId()); + // TODO: should these run lazily before first `registerTables`? database.run(sql.raw(sqliteTableToSql(chainState))); database.run(sql.raw(sqliteTableToSql(mudStoreTables))); @@ -29,7 +31,7 @@ export function sqliteStorage({ async registerTables({ blockNumber, tables }) { await database.transaction(async (tx) => { for (const table of tables) { - debug(`creating table ${table.namespace}:${table.name} for world ${publicClient.chain.id}:${table.address}`); + debug(`creating table ${table.namespace}:${table.name} for world ${chainId}:${table.address}`); const sqliteTable = createSqliteTable({ address: table.address, @@ -170,7 +172,7 @@ export function sqliteStorage({ tx.insert(chainState) .values({ schemaVersion, - chainId: publicClient.chain.id, + chainId, lastUpdatedBlockNumber: blockNumber, }) .onConflictDoUpdate({