diff --git a/packages/store-indexer/bin/postgres-indexer.ts b/packages/store-indexer/bin/postgres-indexer.ts new file mode 100644 index 0000000000..9e8b5e27c1 --- /dev/null +++ b/packages/store-indexer/bin/postgres-indexer.ts @@ -0,0 +1,136 @@ +import fs from "node:fs"; +import { z } from "zod"; +import { DefaultLogger, eq } from "drizzle-orm"; +import { createPublicClient, fallback, webSocket, http, Transport } from "viem"; +import fastify from "fastify"; +import { fastifyTRPCPlugin } from "@trpc/server/adapters/fastify"; +import { createAppRouter } from "@latticexyz/store-sync/trpc-indexer"; +import { createStorageAdapter } from "../src/postgres/createStorageAdapter"; +import type { Chain } from "viem/chains"; +import * as mudChains from "@latticexyz/common/chains"; +import * as chains from "viem/chains"; +import { isNotNull } from "@latticexyz/common/utils"; +import { combineLatest, filter, first } from "rxjs"; +import { drizzle } from "drizzle-orm/postgres-js"; +import postgres from "postgres"; +import { createInternalTables, schemaVersion, syncToPostgres } from "@latticexyz/store-sync/postgres"; + +const possibleChains = Object.values({ ...mudChains, ...chains }) as Chain[]; + +// TODO: refine zod type to be either CHAIN_ID or RPC_HTTP_URL/RPC_WS_URL +const env = z + .object({ + CHAIN_ID: z.coerce.number().positive().optional(), + RPC_HTTP_URL: z.string().optional(), + RPC_WS_URL: z.string().optional(), + START_BLOCK: z.coerce.bigint().nonnegative().default(0n), + MAX_BLOCK_RANGE: z.coerce.bigint().positive().default(1000n), + PORT: z.coerce.number().positive().default(3001), + DATABASE_URL: z.string(), + }) + .parse(process.env, { + errorMap: (issue) => ({ + message: `Missing or invalid environment variable: ${issue.path.join(".")}`, + }), + }); + +const chain = env.CHAIN_ID != null ? possibleChains.find((c) => c.id === env.CHAIN_ID) : undefined; +if (env.CHAIN_ID != null && !chain) { + console.warn(`No chain found for chain ID ${env.CHAIN_ID}`); +} + +const transports: Transport[] = [ + env.RPC_WS_URL ? webSocket(env.RPC_WS_URL) : null, + env.RPC_HTTP_URL ? http(env.RPC_HTTP_URL) : null, +].filter(isNotNull); + +const publicClient = createPublicClient({ + chain, + transport: fallback( + // If one or more RPC URLs are provided, we'll configure the transport with only those RPC URLs + transports.length > 0 + ? transports + : // Otherwise use the chain defaults + [webSocket(), http()] + ), + pollingInterval: 1000, +}); + +// Fetch the chain ID from the RPC if no chain object was found for the provided chain ID. +// We do this to match the downstream logic, which also attempts to find the chain ID. +const chainId = chain?.id ?? (await publicClient.getChainId()); + +const database = drizzle(postgres(env.DATABASE_URL), { + logger: new DefaultLogger(), +}); + +let startBlock = env.START_BLOCK; + +// Resume from latest block stored in DB. This will throw if the DB doesn't exist yet, so we wrap in a try/catch and ignore the error. +try { + const internalTables = createInternalTables(); + const currentChainStates = await database + .select() + .from(internalTables.chain) + .where(eq(internalTables.chain.chainId, chainId)) + .execute(); + // TODO: replace this type workaround with `noUncheckedIndexedAccess: true` when we can fix all the issues related (https://github.com/latticexyz/mud/issues/1212) + const currentChainState: (typeof currentChainStates)[number] | undefined = currentChainStates[0]; + + if (currentChainState != null) { + if (currentChainState.schemaVersion != schemaVersion) { + console.log( + "schema version changed from", + currentChainState.schemaVersion, + "to", + schemaVersion, + "recreating database" + ); + fs.truncateSync(env.DATABASE_URL); + } else if (currentChainState.lastUpdatedBlockNumber != null) { + console.log("resuming from block number", currentChainState.lastUpdatedBlockNumber + 1n); + startBlock = currentChainState.lastUpdatedBlockNumber + 1n; + } + } +} catch (error) { + // ignore errors, this is optional +} + +const { latestBlockNumber$, blockStorageOperations$ } = await syncToPostgres({ + database, + publicClient, + startBlock, + maxBlockRange: env.MAX_BLOCK_RANGE, +}); + +combineLatest([latestBlockNumber$, blockStorageOperations$]) + .pipe( + filter( + ([latestBlockNumber, { blockNumber: lastBlockNumberProcessed }]) => latestBlockNumber === lastBlockNumberProcessed + ), + first() + ) + .subscribe(() => { + console.log("all caught up"); + }); + +// @see https://fastify.dev/docs/latest/ +const server = fastify({ + maxParamLength: 5000, +}); + +await server.register(import("@fastify/cors")); + +// @see https://trpc.io/docs/server/adapters/fastify +server.register(fastifyTRPCPlugin, { + prefix: "/trpc", + trpcOptions: { + router: createAppRouter(), + createContext: async () => ({ + storageAdapter: await createStorageAdapter(database), + }), + }, +}); + +await server.listen({ port: env.PORT }); +console.log(`indexer server listening on http://127.0.0.1:${env.PORT}`); diff --git a/packages/store-indexer/bin/sqlite-indexer.ts b/packages/store-indexer/bin/sqlite-indexer.ts index c16d7b33aa..81afeff511 100644 --- a/packages/store-indexer/bin/sqlite-indexer.ts +++ b/packages/store-indexer/bin/sqlite-indexer.ts @@ -14,7 +14,6 @@ import * as mudChains from "@latticexyz/common/chains"; import * as chains from "viem/chains"; import { isNotNull } from "@latticexyz/common/utils"; import { combineLatest, filter, first } from "rxjs"; -import { debug } from "../src/debug"; const possibleChains = Object.values({ ...mudChains, ...chains }) as Chain[]; diff --git a/packages/store-indexer/package.json b/packages/store-indexer/package.json index c051b86bce..656772ff39 100644 --- a/packages/store-indexer/package.json +++ b/packages/store-indexer/package.json @@ -20,10 +20,10 @@ "clean:js": "rimraf dist", "dev": "tsup --watch", "lint": "eslint .", - "start": "tsx bin/sqlite-indexer", - "start:local": "SQLITE_FILENAME=anvil.db CHAIN_ID=31337 pnpm start", - "start:testnet": "SQLITE_FILENAME=testnet.db CHAIN_ID=4242 START_BLOCK=19037160 pnpm start", - "start:testnet2": "SQLITE_FILENAME=testnet2.db CHAIN_ID=4243 pnpm start", + "start": "tsx bin/postgres-indexer", + "start:local": "DATABASE_URL=http://127.0.0.1/postgres CHAIN_ID=31337 pnpm start", + "start:testnet": "DATABASE_URL=http://127.0.0.1/postgres CHAIN_ID=4242 START_BLOCK=19037160 pnpm start", + "start:testnet2": "DATABASE_URL=http://127.0.0.1/postgres CHAIN_ID=4243 pnpm start", "test": "tsc --noEmit --skipLibCheck" }, "dependencies": { @@ -39,6 +39,7 @@ "debug": "^4.3.4", "drizzle-orm": "^0.27.0", "fastify": "^4.21.0", + "postgres": "^3.3.5", "rxjs": "7.5.5", "superjson": "^1.12.4", "viem": "1.6.0", diff --git a/packages/store-indexer/src/postgres/createStorageAdapter.ts b/packages/store-indexer/src/postgres/createStorageAdapter.ts new file mode 100644 index 0000000000..b9d836033b --- /dev/null +++ b/packages/store-indexer/src/postgres/createStorageAdapter.ts @@ -0,0 +1,54 @@ +import { eq } from "drizzle-orm"; +import { PgDatabase } from "drizzle-orm/pg-core"; +import { createTable, createInternalTables, getTables } from "@latticexyz/store-sync/postgres"; +import { StorageAdapter } from "@latticexyz/store-sync/trpc-indexer"; +import { debug } from "../debug"; +import { getAddress } from "viem"; + +/** + * Creates a storage adapter for the tRPC server/client to query data from Postgres. + * + * @param {PgDatabase} database Postgres database object from Drizzle + * @returns {Promise} A set of methods used by tRPC endpoints. + */ +export async function createStorageAdapter(database: PgDatabase): Promise { + const adapter: StorageAdapter = { + async findAll(chainId, address) { + const internalTables = createInternalTables(); + const tables = (await getTables(database)).filter( + (table) => address != null && getAddress(address) === getAddress(table.address) + ); + + const tablesWithRecords = await Promise.all( + tables.map(async (table) => { + const sqliteTable = createTable(table); + const records = await database.select().from(sqliteTable).where(eq(sqliteTable.__isDeleted, false)).execute(); + return { + ...table, + records: records.map((record) => ({ + key: Object.fromEntries(Object.entries(table.keySchema).map(([name]) => [name, record[name]])), + value: Object.fromEntries(Object.entries(table.valueSchema).map(([name]) => [name, record[name]])), + })), + }; + }) + ); + + const metadata = await database + .select() + .from(internalTables.chain) + .where(eq(internalTables.chain.chainId, chainId)) + .execute(); + const { lastUpdatedBlockNumber } = metadata[0] ?? {}; + + const result = { + blockNumber: lastUpdatedBlockNumber ?? null, + tables: tablesWithRecords, + }; + + debug("findAll", chainId, address, result); + + return result; + }, + }; + return adapter; +} diff --git a/packages/store-sync/package.json b/packages/store-sync/package.json index ada790e7a2..b48672000e 100644 --- a/packages/store-sync/package.json +++ b/packages/store-sync/package.json @@ -11,8 +11,9 @@ "type": "module", "exports": { ".": "./dist/index.js", - "./sqlite": "./dist/sqlite/index.js", + "./postgres": "./dist/postgres/index.js", "./recs": "./dist/recs/index.js", + "./sqlite": "./dist/sqlite/index.js", "./trpc-indexer": "./dist/trpc-indexer/index.js" }, "typesVersions": { @@ -20,12 +21,15 @@ "index": [ "./src/index.ts" ], - "sqlite": [ - "./src/sqlite/index.ts" + "postgres": [ + "./src/postgres/index.ts" ], "recs": [ "./src/recs/index.ts" ], + "sqlite": [ + "./src/sqlite/index.ts" + ], "trpc-indexer": [ "./src/trpc-indexer/index.ts" ] diff --git a/packages/store-sync/src/postgres/index.ts b/packages/store-sync/src/postgres/index.ts index eaadeb8369..0de0aa29bd 100644 --- a/packages/store-sync/src/postgres/index.ts +++ b/packages/store-sync/src/postgres/index.ts @@ -3,4 +3,5 @@ export * from "./getTables"; export * from "./createInternalTables"; export * from "./schemaVersion"; export * from "./postgresStorage"; -export * from "./syncToSqlite"; +export * from "./setupTables"; +export * from "./syncToPostgres"; diff --git a/packages/store-sync/src/postgres/postgresStorage.test.ts b/packages/store-sync/src/postgres/postgresStorage.test.ts index 7cd8863d16..bd05bcbd1d 100644 --- a/packages/store-sync/src/postgres/postgresStorage.test.ts +++ b/packages/store-sync/src/postgres/postgresStorage.test.ts @@ -1,15 +1,15 @@ import { beforeEach, describe, expect, it, vi } from "vitest"; -import { postgresStorage } from "./postgresStorage"; -import { getTables } from "./getTables"; import { PgDatabase, QueryResultHKT } from "drizzle-orm/pg-core"; import { DefaultLogger } from "drizzle-orm"; -import { createTable } from "./createTable"; import { drizzle } from "drizzle-orm/postgres-js"; +import postgres from "postgres"; import { createPublicClient, http } from "viem"; import { foundry } from "viem/chains"; import { blockLogsToStorage } from "../blockLogsToStorage"; -import postgres from "postgres"; import * as transformSchemaNameExports from "./transformSchemaName"; +import { getTables } from "./getTables"; +import { postgresStorage } from "./postgresStorage"; +import { createTable } from "./createTable"; vi.spyOn(transformSchemaNameExports, "transformSchemaName").mockImplementation( (schemaName) => `${process.pid}_${process.env.VITEST_POOL_ID}__${schemaName}` diff --git a/packages/store-sync/src/postgres/postgresStorage.ts b/packages/store-sync/src/postgres/postgresStorage.ts index 6369709a1a..35b8b9db9e 100644 --- a/packages/store-sync/src/postgres/postgresStorage.ts +++ b/packages/store-sync/src/postgres/postgresStorage.ts @@ -1,6 +1,6 @@ import { PublicClient, concatHex, encodeAbiParameters } from "viem"; import { PgDatabase, QueryResultHKT } from "drizzle-orm/pg-core"; -import { and, eq } from "drizzle-orm"; +import { and, eq, inArray } from "drizzle-orm"; import { createTable } from "./createTable"; import { schemaToDefaults } from "../schemaToDefaults"; import { BlockLogsToStorageOptions } from "../blockLogsToStorage"; @@ -10,7 +10,6 @@ import { createInternalTables } from "./createInternalTables"; import { getTables } from "./getTables"; import { schemaVersion } from "./schemaVersion"; import { tableIdToHex } from "@latticexyz/common"; -import { identity } from "@latticexyz/common/utils"; import { setupTables } from "./setupTables"; import { getTableKey } from "./getTableKey"; @@ -83,17 +82,14 @@ export async function postgresStorage const tables = await getTables(database, operations.map(getTableKey)); await database.transaction(async (tx) => { - for (const { address, namespace, name } of tables) { + const tablesWithOperations = tables.filter((table) => + operations.some((op) => getTableKey(op) === getTableKey(table)) + ); + if (tablesWithOperations.length) { await tx .update(internalTables.tables) .set({ lastUpdatedBlockNumber: blockNumber }) - .where( - and( - eq(internalTables.tables.address, address), - eq(internalTables.tables.namespace, namespace), - eq(internalTables.tables.name, name) - ) - ) + .where(inArray(internalTables.tables.id, [...new Set(tablesWithOperations.map(getTableKey))])) .execute(); } diff --git a/packages/store-sync/src/postgres/syncToSqlite.ts b/packages/store-sync/src/postgres/syncToPostgres.ts similarity index 60% rename from packages/store-sync/src/postgres/syncToSqlite.ts rename to packages/store-sync/src/postgres/syncToPostgres.ts index ee6814b405..be649f4e78 100644 --- a/packages/store-sync/src/postgres/syncToSqlite.ts +++ b/packages/store-sync/src/postgres/syncToPostgres.ts @@ -1,20 +1,20 @@ import { StoreConfig } from "@latticexyz/store"; -import { BaseSQLiteDatabase } from "drizzle-orm/sqlite-core"; +import { PgDatabase } from "drizzle-orm/pg-core"; import { SyncOptions, SyncResult } from "../common"; -import { sqliteStorage } from "./postgresStorage"; +import { postgresStorage } from "./postgresStorage"; import { createStoreSync } from "../createStoreSync"; -type SyncToSqliteOptions = SyncOptions & { +type SyncToPostgresOptions = SyncOptions & { /** - * [SQLite database object from Drizzle][0]. + * [Postgres database object from Drizzle][0]. * - * [0]: https://orm.drizzle.team/docs/installation-and-db-connection/sqlite/better-sqlite3 + * [0]: https://orm.drizzle.team/docs/installation-and-db-connection/postgresql/postgresjs */ - database: BaseSQLiteDatabase<"sync", any>; + database: PgDatabase; startSync?: boolean; }; -type SyncToSqliteResult = SyncResult & { +type SyncToPostgresResult = SyncResult & { stopSync: () => void; }; @@ -24,7 +24,7 @@ type SyncToSqliteResult = SyncResult< * @param {CreateIndexerOptions} options See `CreateIndexerOptions`. * @returns A function to unsubscribe from the block stream, effectively stopping the indexer. */ -export async function syncToSqlite({ +export async function syncToPostgres({ config, database, publicClient, @@ -34,9 +34,9 @@ export async function syncToSqlite({ indexerUrl, initialState, startSync = true, -}: SyncToSqliteOptions): Promise> { +}: SyncToPostgresOptions): Promise> { const storeSync = await createStoreSync({ - storageAdapter: await sqliteStorage({ database, publicClient, config }), + storageAdapter: await postgresStorage({ database, publicClient, config }), config, address, publicClient, diff --git a/packages/store-sync/tsup.config.ts b/packages/store-sync/tsup.config.ts index f2e8f2539d..7bc63a088a 100644 --- a/packages/store-sync/tsup.config.ts +++ b/packages/store-sync/tsup.config.ts @@ -1,7 +1,13 @@ import { defineConfig } from "tsup"; export default defineConfig({ - entry: ["src/index.ts", "src/sqlite/index.ts", "src/recs/index.ts", "src/trpc-indexer/index.ts"], + entry: [ + "src/index.ts", + "src/sqlite/index.ts", + "src/postgres/index.ts", + "src/recs/index.ts", + "src/trpc-indexer/index.ts", + ], target: "esnext", format: ["esm"], dts: false, diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index ce54c6501f..9cfd882fa7 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -998,10 +998,13 @@ importers: version: 4.3.4(supports-color@8.1.1) drizzle-orm: specifier: ^0.27.0 - version: 0.27.0(@types/better-sqlite3@7.6.4)(better-sqlite3@8.4.0) + version: 0.27.0(@types/better-sqlite3@7.6.4)(better-sqlite3@8.4.0)(postgres@3.3.5) fastify: specifier: ^4.21.0 version: 4.21.0 + postgres: + specifier: ^3.3.5 + version: 3.3.5 rxjs: specifier: 7.5.5 version: 7.5.5 @@ -5255,7 +5258,7 @@ packages: detect-libc: 1.0.3 dev: true - /drizzle-orm@0.27.0(@types/better-sqlite3@7.6.4)(better-sqlite3@8.4.0): + /drizzle-orm@0.27.0(@types/better-sqlite3@7.6.4)(better-sqlite3@8.4.0)(postgres@3.3.5): resolution: {integrity: sha512-LGiJ0icB+wQwgbSCOvAjONY8Ec6G/EDzQQP5PmUaQYeI9OqgpVKHC2T1fFIbvk5dabWsbokJ5NOciVAxriStig==} peerDependencies: '@aws-sdk/client-rds-data': '>=3' @@ -5319,6 +5322,7 @@ packages: dependencies: '@types/better-sqlite3': 7.6.4 better-sqlite3: 8.4.0 + postgres: 3.3.5 dev: false /drizzle-orm@0.27.0(@types/sql.js@1.4.4)(kysely@0.26.1)(postgres@3.3.5)(sql.js@1.8.0):