Skip to content

Commit

Permalink
feat(store-indexer,store-sync): make chain optional, configure indexe…
Browse files Browse the repository at this point in the history
…r with RPC (#1234)
  • Loading branch information
holic authored Aug 3, 2023
1 parent 57a5260 commit 131c63e
Show file tree
Hide file tree
Showing 6 changed files with 51 additions and 25 deletions.
7 changes: 7 additions & 0 deletions .changeset/itchy-shoes-appear.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
"@latticexyz/store-indexer": minor
"@latticexyz/store-sync": minor
---

- Accept a plain viem `PublicClient` (instead of requiring a `Chain` to be set) in `store-sync` and `store-indexer` functions. These functions now fetch chain ID using `publicClient.getChainId()` when no `publicClient.chain.id` is present.
- Allow configuring `store-indexer` with a set of RPC URLs (`RPC_HTTP_URL` and `RPC_WS_URL`) instead of `CHAIN_ID`.
35 changes: 27 additions & 8 deletions packages/store-indexer/bin/sqlite-indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import cors from "cors";
import { eq } from "drizzle-orm";
import { drizzle } from "drizzle-orm/better-sqlite3";
import Database from "better-sqlite3";
import { createPublicClient, fallback, webSocket, http } from "viem";
import { createPublicClient, fallback, webSocket, http, Transport } from "viem";
import { createHTTPServer } from "@trpc/server/adapters/standalone";
import { createAppRouter } from "@latticexyz/store-sync/trpc-indexer";
import { chainState, schemaVersion } from "@latticexyz/store-sync/sqlite";
Expand All @@ -13,12 +13,16 @@ import { createStorageAdapter } from "../src/sqlite/createStorageAdapter";
import type { Chain } from "viem/chains";
import * as mudChains from "@latticexyz/common/chains";
import * as chains from "viem/chains";
import { isNotNull } from "@latticexyz/common/utils";

const possibleChains = Object.values({ ...mudChains, ...chains }) as Chain[];

// TODO: refine zod type to be either CHAIN_ID or RPC_HTTP_URL/RPC_WS_URL
const env = z
.object({
CHAIN_ID: z.coerce.number().positive(),
CHAIN_ID: z.coerce.number().positive().optional(),
RPC_HTTP_URL: z.string().optional(),
RPC_WS_URL: z.string().optional(),
START_BLOCK: z.coerce.bigint().nonnegative().default(0n),
MAX_BLOCK_RANGE: z.coerce.bigint().positive().default(1000n),
PORT: z.coerce.number().positive().default(3001),
Expand All @@ -30,24 +34,39 @@ const env = z
}),
});

const chain = possibleChains.find((c) => c.id === env.CHAIN_ID);
if (!chain) {
throw new Error(`Chain ${env.CHAIN_ID} not found`);
const chain = env.CHAIN_ID != null ? possibleChains.find((c) => c.id === env.CHAIN_ID) : undefined;
if (env.CHAIN_ID != null && !chain) {
console.warn(`No chain found for chain ID ${env.CHAIN_ID}`);
}

const transports: Transport[] = [
env.RPC_WS_URL ? webSocket(env.RPC_WS_URL) : null,
env.RPC_HTTP_URL ? http(env.RPC_HTTP_URL) : null,
].filter(isNotNull);

const publicClient = createPublicClient({
chain,
transport: fallback([webSocket(), http()]),
transport: fallback(
// If one or more RPC URLs are provided, we'll configure the transport with only those RPC URLs
transports.length > 0
? transports
: // Otherwise use the chain defaults
[webSocket(), http()]
),
pollingInterval: 1000,
});

// Fetch the chain ID from the RPC if no chain object was found for the provided chain ID.
// We do this to match the downstream logic, which also attempts to find the chain ID.
const chainId = chain?.id ?? (await publicClient.getChainId());

const database = drizzle(new Database(env.SQLITE_FILENAME));

let startBlock = env.START_BLOCK;

// Resume from latest block stored in DB. This will throw if the DB doesn't exist yet, so we wrap in a try/catch and ignore the error.
try {
const currentChainStates = database.select().from(chainState).where(eq(chainState.chainId, chain.id)).all();
const currentChainStates = database.select().from(chainState).where(eq(chainState.chainId, chainId)).all();
// TODO: replace this type workaround with `noUncheckedIndexedAccess: true` when we can fix all the issues related (https://github.com/latticexyz/mud/issues/1212)
const currentChainState: (typeof currentChainStates)[number] | undefined = currentChainStates[0];

Expand All @@ -70,7 +89,7 @@ try {
// ignore errors, this is optional
}

createIndexer({
await createIndexer({
database,
publicClient,
startBlock,
Expand Down
10 changes: 5 additions & 5 deletions packages/store-indexer/src/sqlite/createIndexer.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Chain, PublicClient, Transport } from "viem";
import { PublicClient } from "viem";
import {
createBlockStream,
isNonPendingBlock,
Expand All @@ -24,7 +24,7 @@ type CreateIndexerOptions = {
*
* [0]: https://viem.sh/docs/clients/public.html
*/
publicClient: PublicClient<Transport, Chain>;
publicClient: PublicClient;
/**
* Optional block number to start indexing from. Useful for resuming the indexer from a particular point in time or starting after a particular contract deployment.
*/
Expand All @@ -41,12 +41,12 @@ type CreateIndexerOptions = {
* @param {CreateIndexerOptions} options See `CreateIndexerOptions`.
* @returns A function to unsubscribe from the block stream, effectively stopping the indexer.
*/
export function createIndexer({
export async function createIndexer({
database,
publicClient,
startBlock = 0n,
maxBlockRange,
}: CreateIndexerOptions): () => void {
}: CreateIndexerOptions): Promise<() => void> {
const latestBlock$ = createBlockStream({ publicClient, blockTag: "latest" });

const latestBlockNumber$ = latestBlock$.pipe(
Expand All @@ -70,7 +70,7 @@ export function createIndexer({

const sub = blockLogs$
.pipe(
concatMap(blockLogsToStorage(sqliteStorage({ database, publicClient }))),
concatMap(blockLogsToStorage(await sqliteStorage({ database, publicClient }))),
tap(({ blockNumber, operations }) => {
debug("stored", operations.length, "operations for block", blockNumber);
})
Expand Down
8 changes: 3 additions & 5 deletions packages/store-sync/src/recs/syncToRecs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type SyncToRecsOptions<
config: TConfig;
address: Address;
// TODO: make this optional and return one if none provided (but will need chain ID at least)
publicClient: PublicClient<Transport, Chain>;
publicClient: PublicClient;
// TODO: generate these from config and return instead?
components: TComponents;
indexerUrl?: string;
Expand Down Expand Up @@ -94,10 +94,8 @@ export async function syncToRecs<
if (indexerUrl != null && initialState == null) {
const indexer = createIndexerClient({ url: indexerUrl });
try {
initialState = await indexer.findAll.query({
chainId: publicClient.chain.id,
address,
});
const chainId = publicClient.chain?.id ?? (await publicClient.getChainId());
initialState = await indexer.findAll.query({ chainId, address });
} catch (error) {
debug("couldn't get initial state from indexer", error);
}
Expand Down
2 changes: 1 addition & 1 deletion packages/store-sync/src/sqlite/sqliteStorage.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ describe("sqliteStorage", async () => {
'"no such table: __mudStoreTables"'
);

const storageAdapter = sqliteStorage({ database: db, publicClient });
const storageAdapter = await sqliteStorage({ database: db, publicClient });

expect(db.select().from(chainState).all()).toMatchInlineSnapshot("[]");
expect(db.select().from(mudStoreTables).all()).toMatchInlineSnapshot("[]");
Expand Down
14 changes: 8 additions & 6 deletions packages/store-sync/src/sqlite/sqliteStorage.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Chain, Hex, PublicClient, Transport, encodePacked, getAddress } from "viem";
import { Hex, PublicClient, encodePacked, getAddress } from "viem";
import { BaseSQLiteDatabase } from "drizzle-orm/sqlite-core";
import { and, eq, sql } from "drizzle-orm";
import { sqliteTableToSql } from "./sqliteTableToSql";
Expand All @@ -13,14 +13,16 @@ import { chainState, mudStoreTables } from "./internalTables";
import { getTables } from "./getTables";
import { schemaVersion } from "./schemaVersion";

export function sqliteStorage<TConfig extends StoreConfig = StoreConfig>({
export async function sqliteStorage<TConfig extends StoreConfig = StoreConfig>({
database,
publicClient,
}: {
database: BaseSQLiteDatabase<"sync", void>;
publicClient: PublicClient<Transport, Chain>;
publicClient: PublicClient;
config?: TConfig;
}): BlockLogsToStorageOptions<TConfig> {
}): Promise<BlockLogsToStorageOptions<TConfig>> {
const chainId = publicClient.chain?.id ?? (await publicClient.getChainId());

// TODO: should these run lazily before first `registerTables`?
database.run(sql.raw(sqliteTableToSql(chainState)));
database.run(sql.raw(sqliteTableToSql(mudStoreTables)));
Expand All @@ -29,7 +31,7 @@ export function sqliteStorage<TConfig extends StoreConfig = StoreConfig>({
async registerTables({ blockNumber, tables }) {
await database.transaction(async (tx) => {
for (const table of tables) {
debug(`creating table ${table.namespace}:${table.name} for world ${publicClient.chain.id}:${table.address}`);
debug(`creating table ${table.namespace}:${table.name} for world ${chainId}:${table.address}`);

const sqliteTable = createSqliteTable({
address: table.address,
Expand Down Expand Up @@ -170,7 +172,7 @@ export function sqliteStorage<TConfig extends StoreConfig = StoreConfig>({
tx.insert(chainState)
.values({
schemaVersion,
chainId: publicClient.chain.id,
chainId,
lastUpdatedBlockNumber: blockNumber,
})
.onConflictDoUpdate({
Expand Down

0 comments on commit 131c63e

Please sign in to comment.