diff --git a/.changeset/tough-moose-pay.md b/.changeset/tough-moose-pay.md new file mode 100644 index 0000000000..42ced06f9f --- /dev/null +++ b/.changeset/tough-moose-pay.md @@ -0,0 +1,7 @@ +--- +"@latticexyz/common": minor +--- + +- Added a `Result` type for more explicit and typesafe error handling ([inspired by Rust](https://doc.rust-lang.org/std/result/)). + +- Added an `includes` util as typesafe alternative to [`Array.prototype.includes()`](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Array/includes). diff --git a/.changeset/wild-years-search.md b/.changeset/wild-years-search.md new file mode 100644 index 0000000000..2ca0461a39 --- /dev/null +++ b/.changeset/wild-years-search.md @@ -0,0 +1,22 @@ +--- +"@latticexyz/store-indexer": minor +"@latticexyz/store-sync": minor +--- + +- Improved query performance by 10x by moving from drizzle ORM to handcrafted SQL. +- Moved away from `trpc` for more granular control over the transport layer. + Added an `/api/logs` endpoint using the new query and gzip compression for 40x less data transferred over the wire. + Deprecated the `/trpc/getLogs` and `/trpc/findAll` endpoints. +- Added a `createIndexerClient` client for the new `/api` indexer API exported from `@latticexyz/store-sync/indexer-client`. + The `createIndexerClient` export from `@latticexyz/store-sync/trpc-indexer` is deprecated. 
+ +```diff +- import { createIndexerClient } from "@latticexyz/store-sync/trpc-indexer"; ++ import { createIndexerClient } from "@latticexyz/store-sync/indexer-client"; + +- const indexer = createIndexerClient({ url: "https://indexer.holesky.redstone.xyz/trpc" }); ++ const indexer = createIndexerClient({ url: "https://indexer.holesky.redstone.xyz" }); + +- const snapshot = indexer.getLogs.query(options); ++ const snapshot = indexer.getLogs(options); +``` diff --git a/e2e/packages/contracts/worlds.json b/e2e/packages/contracts/worlds.json index a3a1b09f18..3fb8535203 100644 --- a/e2e/packages/contracts/worlds.json +++ b/e2e/packages/contracts/worlds.json @@ -1,5 +1,5 @@ { "31337": { - "address": "0x6e9474e9c83676b9a71133ff96db43e7aa0a4342" + "address": "0x2ea123a56f2e986c9844bf4dc13050c4df200b29" } } \ No newline at end of file diff --git a/e2e/packages/sync-test/indexerSync.test.ts b/e2e/packages/sync-test/indexerSync.test.ts index 6f3f3a751a..f23edf5f16 100644 --- a/e2e/packages/sync-test/indexerSync.test.ts +++ b/e2e/packages/sync-test/indexerSync.test.ts @@ -23,7 +23,7 @@ import { waitForInitialSync } from "./data/waitForInitialSync"; const env = z .object({ - DATABASE_URL: z.string().default("postgres://127.0.0.1/postgres"), + DATABASE_URL: z.string().default("postgres://127.0.0.1/postgres_e2e"), }) .parse(process.env, { errorMap: (issue) => ({ @@ -57,18 +57,20 @@ describe("Sync from indexer", async () => { await openClientWithRootAccount(page, { indexerUrl: `http://127.0.0.1:9999/trpc` }); await waitForInitialSync(page); - expect(asyncErrorHandler.getErrors()).toHaveLength(1); - expect(asyncErrorHandler.getErrors()[0]).toContain("error getting snapshot"); + const errors = asyncErrorHandler.getErrors(); + expect(errors).toHaveLength(2); + expect(errors[0]).toContain("Failed to fetch"); + expect(errors[1]).toContain("error getting snapshot"); }); describe.each([["sqlite"], ["postgres"]] as const)("%s indexer", (indexerType) => { let indexerIteration = 1; - let 
indexer: ReturnType<typeof startIndexer>; + let indexer: Awaited<ReturnType<typeof startIndexer>>; beforeEach(async () => { // Start indexer const port = 3000 + indexerIteration++; - indexer = startIndexer({ + indexer = await startIndexer({ port, rpcHttpUrl, reportError: asyncErrorHandler.reportError, diff --git a/e2e/packages/sync-test/package.json b/e2e/packages/sync-test/package.json index e6c5fdcc27..ca224633e7 100644 --- a/e2e/packages/sync-test/package.json +++ b/e2e/packages/sync-test/package.json @@ -4,8 +4,9 @@ "private": true, "license": "MIT", "scripts": { - "test": "vitest --run", - "test:ci": "pnpm run test" + "setup": "psql postgres://127.0.0.1/postgres -c \"CREATE DATABASE postgres_e2e;\"", + "test": "DATABASE_URL=postgres://127.0.0.1/postgres_e2e vitest --run", + "test:ci": "vitest --run" }, "devDependencies": { "@latticexyz/cli": "link:../../../packages/cli", @@ -20,8 +21,10 @@ "abitype": "0.9.8", "chalk": "^5.2.0", "dotenv": "^16.0.3", + "drizzle-orm": "^0.28.5", "execa": "^7.1.1", "happy-dom": "^12.10.3", + "postgres": "3.3.5", "typescript": "5.1.6", "viem": "1.14.0", "vite": "^4.2.1", diff --git a/e2e/packages/sync-test/setup/startIndexer.ts b/e2e/packages/sync-test/setup/startIndexer.ts index 7225947c71..5bff7e397f 100644 --- a/e2e/packages/sync-test/setup/startIndexer.ts +++ b/e2e/packages/sync-test/setup/startIndexer.ts @@ -1,6 +1,9 @@ import chalk from "chalk"; import { execa } from "execa"; import { rmSync } from "node:fs"; +import { cleanDatabase } from "@latticexyz/store-sync/postgres"; +import { drizzle } from "drizzle-orm/postgres-js"; +import postgres from "postgres"; import path from "node:path"; type IndexerOptions = @@ -19,7 +22,7 @@ type StartIndexerOptions = { reportError: (error: string) => void; } & IndexerOptions; -export function startIndexer(opts: StartIndexerOptions) { +export async function startIndexer(opts: StartIndexerOptions) { let resolve: () => void; let reject: (reason?: string) => void; const doneSyncing = new Promise<void>((res, rej) => { @@ -37,6 +40,9 @@ export function 
startIndexer(opts: StartIndexerOptions) { }; console.log(chalk.magenta("[indexer]:"), "starting indexer", env); + // Clean the test db + await cleanUp(); + const proc = execa("pnpm", opts.indexer === "postgres" ? ["start:postgres"] : ["start:sqlite"], { cwd: path.join(__dirname, "..", "..", "..", "..", "packages", "store-indexer"), env, @@ -67,13 +73,22 @@ export function startIndexer(opts: StartIndexerOptions) { proc.stdout?.on("data", (data) => onLog(data.toString())); proc.stderr?.on("data", (data) => onLog(data.toString())); - function cleanUp() { + async function cleanUp() { // attempt to clean up sqlite file if (opts.indexer === "sqlite") { try { rmSync(opts.sqliteFilename); } catch (error) { - console.log("could not delete", opts.sqliteFilename, error); + console.log("could not delete", opts.sqliteFilename); + } + } + + // attempt to clean up the postgres db + if (opts.indexer === "postgres") { + try { + await cleanDatabase(drizzle(postgres(opts.databaseUrl))); + } catch (error) { + console.log("could not clean postgres database"); } } } diff --git a/e2e/pnpm-lock.yaml b/e2e/pnpm-lock.yaml index 27aa485a1f..ab2975794e 100644 --- a/e2e/pnpm-lock.yaml +++ b/e2e/pnpm-lock.yaml @@ -165,12 +165,18 @@ importers: dotenv: specifier: ^16.0.3 version: 16.0.3 + drizzle-orm: + specifier: ^0.28.5 + version: 0.28.5(postgres@3.3.5) execa: specifier: ^7.1.1 version: 7.1.1 happy-dom: specifier: ^12.10.3 version: 12.10.3 + postgres: + specifier: 3.3.5 + version: 3.3.5 typescript: specifier: 5.1.6 version: 5.1.6 @@ -1019,6 +1025,71 @@ packages: engines: {node: '>=12'} dev: true + /drizzle-orm@0.28.5(postgres@3.3.5): + resolution: {integrity: sha512-6r6Iw4c38NAmW6TiKH3TUpGUQ1YdlEoLJOQptn8XPx3Z63+vFNKfAiANqrIiYZiMjKR9+NYAL219nFrmo1duXA==} + peerDependencies: + '@aws-sdk/client-rds-data': '>=3' + '@cloudflare/workers-types': '>=3' + '@libsql/client': '*' + '@neondatabase/serverless': '>=0.1' + '@opentelemetry/api': ^1.4.1 + '@planetscale/database': '>=1' + 
'@types/better-sqlite3': '*' + '@types/pg': '*' + '@types/sql.js': '*' + '@vercel/postgres': '*' + better-sqlite3: '>=7' + bun-types: '*' + knex: '*' + kysely: '*' + mysql2: '>=2' + pg: '>=8' + postgres: '>=3' + sql.js: '>=1' + sqlite3: '>=5' + peerDependenciesMeta: + '@aws-sdk/client-rds-data': + optional: true + '@cloudflare/workers-types': + optional: true + '@libsql/client': + optional: true + '@neondatabase/serverless': + optional: true + '@opentelemetry/api': + optional: true + '@planetscale/database': + optional: true + '@types/better-sqlite3': + optional: true + '@types/pg': + optional: true + '@types/sql.js': + optional: true + '@vercel/postgres': + optional: true + better-sqlite3: + optional: true + bun-types: + optional: true + knex: + optional: true + kysely: + optional: true + mysql2: + optional: true + pg: + optional: true + postgres: + optional: true + sql.js: + optional: true + sqlite3: + optional: true + dependencies: + postgres: 3.3.5 + dev: true + /emoji-regex@8.0.0: resolution: {integrity: sha512-MSjYzcWNOA0ewAHpz0MxpYFvwg6yjy1NG3xteoqz644VCo/RPgnr1/GGt+ic3iJTzQ8Eu3TdM14SawnVUmGE6A==} dev: false @@ -1475,6 +1546,10 @@ packages: source-map-js: 1.0.2 dev: true + /postgres@3.3.5: + resolution: {integrity: sha512-+JD93VELV9gHkqpV5gdL5/70HdGtEw4/XE1S4BC8f1mcPmdib3K5XsKVbnR1XcAyC41zOnifJ+9YRKxdIsXiUw==} + dev: true + /prettier@2.6.2: resolution: {integrity: sha512-PkUpF+qoXTqhOeWL9fu7As8LXsIUZ1WYaJiY/a7McAQzxjk82OF0tibkFXVCDImZtWxbvojFjerkiLb0/q8mew==} engines: {node: '>=10.13.0'} diff --git a/packages/common/src/index.ts b/packages/common/src/index.ts index c5a2061a44..633ae85b37 100644 --- a/packages/common/src/index.ts +++ b/packages/common/src/index.ts @@ -10,6 +10,7 @@ export * from "./hexToResource"; export * from "./readHex"; export * from "./resourceToHex"; export * from "./resourceTypes"; +export * from "./result"; export * from "./sendTransaction"; export * from "./spliceHex"; export * from "./transportObserver"; diff --git 
a/packages/common/src/result.ts b/packages/common/src/result.ts new file mode 100644 index 0000000000..f3228a72f1 --- /dev/null +++ b/packages/common/src/result.ts @@ -0,0 +1,10 @@ +// Inspired by https://doc.rust-lang.org/std/result/ +export type Result<Ok, Err = unknown> = { ok: Ok } | { error: Err }; + +export function isOk<Ok, Err>(result: Result<Ok, Err>): result is { ok: Ok } { + return "ok" in result; +} + +export function isError<Ok, Err>(result: Result<Ok, Err>): result is { error: Err } { + return "error" in result; +} diff --git a/packages/common/src/utils/includes.ts b/packages/common/src/utils/includes.ts new file mode 100644 index 0000000000..5b4e83e6b8 --- /dev/null +++ b/packages/common/src/utils/includes.ts @@ -0,0 +1,3 @@ +export function includes<item>(items: item[], value: any): value is item { + return items.includes(value); +} diff --git a/packages/common/src/utils/index.ts b/packages/common/src/utils/index.ts index f1d4991554..37d4ce5838 100644 --- a/packages/common/src/utils/index.ts +++ b/packages/common/src/utils/index.ts @@ -6,6 +6,7 @@ export * from "./chunk"; export * from "./curry"; export * from "./groupBy"; export * from "./identity"; +export * from "./includes"; export * from "./isDefined"; export * from "./isNotNull"; export * from "./iteratorToArray"; diff --git a/packages/store-indexer/bin/postgres-frontend.ts b/packages/store-indexer/bin/postgres-frontend.ts index bab85b1fd4..0b1cc3cf9f 100644 --- a/packages/store-indexer/bin/postgres-frontend.ts +++ b/packages/store-indexer/bin/postgres-frontend.ts @@ -6,10 +6,11 @@ import cors from "@koa/cors"; import Router from "@koa/router"; import { createKoaMiddleware } from "trpc-koa-adapter"; import { createAppRouter } from "@latticexyz/store-sync/trpc-indexer"; -import { createQueryAdapter } from "../src/postgres/createQueryAdapter"; import { drizzle } from "drizzle-orm/postgres-js"; import postgres from "postgres"; import { frontendEnvSchema, parseEnv } from "./parseEnv"; +import { createQueryAdapter } from 
"../src/postgres/deprecated/createQueryAdapter"; +import { apiRoutes } from "../src/postgres/apiRoutes"; const env = parseEnv( z.intersection( @@ -20,10 +21,11 @@ const env = parseEnv( ) ); -const database = drizzle(postgres(env.DATABASE_URL)); +const database = postgres(env.DATABASE_URL); const server = new Koa(); server.use(cors()); +server.use(apiRoutes(database)); const router = new Router(); @@ -47,7 +49,7 @@ server.use( prefix: "/trpc", router: createAppRouter(), createContext: async () => ({ - queryAdapter: await createQueryAdapter(database), + queryAdapter: await createQueryAdapter(drizzle(database)), }), }) ); diff --git a/packages/store-indexer/bin/postgres-indexer.ts b/packages/store-indexer/bin/postgres-indexer.ts index 1f553f1f73..ea17b7d7d7 100644 --- a/packages/store-indexer/bin/postgres-indexer.ts +++ b/packages/store-indexer/bin/postgres-indexer.ts @@ -39,7 +39,7 @@ const database = drizzle(postgres(env.DATABASE_URL)); if (await shouldCleanDatabase(database, chainId)) { console.log("outdated database detected, clearing data to start fresh"); - cleanDatabase(database); + await cleanDatabase(database); } const { storageAdapter, tables } = await createStorageAdapter({ database, publicClient }); diff --git a/packages/store-indexer/bin/sqlite-indexer.ts b/packages/store-indexer/bin/sqlite-indexer.ts index b5f4b7d088..835b2a75c2 100644 --- a/packages/store-indexer/bin/sqlite-indexer.ts +++ b/packages/store-indexer/bin/sqlite-indexer.ts @@ -16,6 +16,7 @@ import { createQueryAdapter } from "../src/sqlite/createQueryAdapter"; import { isDefined } from "@latticexyz/common/utils"; import { combineLatest, filter, first } from "rxjs"; import { frontendEnvSchema, indexerEnvSchema, parseEnv } from "./parseEnv"; +import { apiRoutes } from "../src/sqlite/apiRoutes"; const env = parseEnv( z.intersection( @@ -91,6 +92,7 @@ combineLatest([latestBlockNumber$, storedBlockLogs$]) const server = new Koa(); server.use(cors()); +server.use(apiRoutes(database)); const router 
= new Router(); diff --git a/packages/store-indexer/package.json b/packages/store-indexer/package.json index edc7d92d68..6b946572ee 100644 --- a/packages/store-indexer/package.json +++ b/packages/store-indexer/package.json @@ -44,12 +44,14 @@ "@latticexyz/store-sync": "workspace:*", "@trpc/client": "10.34.0", "@trpc/server": "10.34.0", + "accepts": "^1.3.8", "better-sqlite3": "^8.6.0", "debug": "^4.3.4", "dotenv": "^16.0.3", "drizzle-orm": "^0.28.5", "koa": "^2.14.2", - "postgres": "^3.3.5", + "koa-compose": "^4.1.0", + "postgres": "3.3.5", "rxjs": "7.5.5", "superjson": "^1.12.4", "trpc-koa-adapter": "^1.1.3", @@ -57,10 +59,12 @@ "zod": "^3.21.4" }, "devDependencies": { + "@types/accepts": "^1.3.7", "@types/better-sqlite3": "^7.6.4", "@types/cors": "^2.8.13", "@types/debug": "^4.1.7", "@types/koa": "^2.13.12", + "@types/koa-compose": "^3.2.8", "@types/koa__cors": "^4.0.3", "@types/koa__router": "^12.0.4", "concurrently": "^8.2.2", diff --git a/packages/store-indexer/src/compress.ts b/packages/store-indexer/src/compress.ts new file mode 100644 index 0000000000..24eb2298e4 --- /dev/null +++ b/packages/store-indexer/src/compress.ts @@ -0,0 +1,48 @@ +import { Middleware } from "koa"; +import { Readable, Stream } from "node:stream"; +import accepts from "accepts"; +import { Zlib, createBrotliCompress, createDeflate, createGzip } from "node:zlib"; +import { includes } from "@latticexyz/common/utils"; + +// Loosely based on https://github.com/holic/koa-compress/blob/master/lib/index.js +// with better handling of streams better with occasional flushing + +const encodings = { + br: createBrotliCompress, + gzip: createGzip, + deflate: createDeflate, +} as const; + +const encodingNames = Object.keys(encodings) as (keyof typeof encodings)[]; + +function flushEvery<stream extends Zlib & Readable>(stream: stream, bytesThreshold: number): stream { + let bytesSinceFlush = 0; + stream.on("data", (data) => { + bytesSinceFlush += data.length; + if (bytesSinceFlush > bytesThreshold) { + bytesSinceFlush = 0; 
+ stream.flush(); + } + }); + return stream; +} + +type CompressOptions = { + flushThreshold?: number; +}; + +export function compress({ flushThreshold = 1024 * 4 }: CompressOptions = {}): Middleware { + return async function compressMiddleware(ctx, next) { + ctx.vary("Accept-Encoding"); + + await next(); + + const encoding = accepts(ctx.req).encoding(encodingNames); + if (!includes(encodingNames, encoding)) return; + + const compressed = flushEvery(encodings[encoding](), flushThreshold); + + ctx.set("Content-Encoding", encoding); + ctx.body = ctx.body instanceof Stream ? ctx.body.pipe(compressed) : compressed.end(ctx.body); + }; +} diff --git a/packages/store-indexer/src/postgres/apiRoutes.ts b/packages/store-indexer/src/postgres/apiRoutes.ts new file mode 100644 index 0000000000..dfaf74990e --- /dev/null +++ b/packages/store-indexer/src/postgres/apiRoutes.ts @@ -0,0 +1,47 @@ +import { Sql } from "postgres"; +import { Middleware } from "koa"; +import Router from "@koa/router"; +import compose from "koa-compose"; +import { input } from "@latticexyz/store-sync/indexer-client"; +import { storeTables } from "@latticexyz/store-sync"; +import { queryLogs } from "./queryLogs"; +import { recordToLog } from "./recordToLog"; +import { debug } from "../debug"; +import { createBenchmark } from "@latticexyz/common"; +import { compress } from "../compress"; + +export function apiRoutes(database: Sql): Middleware { + const router = new Router(); + + router.get("/api/logs", compress(), async (ctx) => { + const benchmark = createBenchmark("postgres:logs"); + let options: ReturnType<typeof input.parse>; + + try { + options = input.parse(typeof ctx.query.input === "string" ? JSON.parse(ctx.query.input) : {}); + } catch (error) { + ctx.status = 400; + ctx.body = JSON.stringify(error); + debug(error); + return; + } + + try { + options.filters = options.filters.length > 0 ? [...options.filters, { tableId: storeTables.Tables.tableId }] : []; + const records = await queryLogs(database, options ?? 
{}).execute(); + benchmark("query records"); + const logs = records.map(recordToLog); + benchmark("map records to logs"); + const blockNumber = records[0]?.chainBlockNumber ?? "-1"; + + ctx.body = JSON.stringify({ blockNumber, logs }); + ctx.status = 200; + } catch (error) { + ctx.status = 500; + ctx.body = JSON.stringify(error); + debug(error); + } + }); + + return compose([router.routes(), router.allowedMethods()]) as Middleware; +} diff --git a/packages/store-indexer/src/postgres/common.ts b/packages/store-indexer/src/postgres/common.ts new file mode 100644 index 0000000000..ff4807cea7 --- /dev/null +++ b/packages/store-indexer/src/postgres/common.ts @@ -0,0 +1,20 @@ +import { Hex } from "viem"; + +export type RecordData = { + address: Hex; + tableId: Hex; + keyBytes: Hex; + staticData: Hex | null; + encodedLengths: Hex | null; + dynamicData: Hex | null; + lastUpdatedBlockNumber: string; +}; + +export type RecordMetadata = { + indexerVersion: string; + chainId: string; + chainBlockNumber: string; + totalRows: number; +}; + +export type Record = RecordData & RecordMetadata; diff --git a/packages/store-indexer/src/postgres/createQueryAdapter.ts b/packages/store-indexer/src/postgres/deprecated/createQueryAdapter.ts similarity index 97% rename from packages/store-indexer/src/postgres/createQueryAdapter.ts rename to packages/store-indexer/src/postgres/deprecated/createQueryAdapter.ts index c38c21ffbb..db0f6ddf7c 100644 --- a/packages/store-indexer/src/postgres/createQueryAdapter.ts +++ b/packages/store-indexer/src/postgres/deprecated/createQueryAdapter.ts @@ -3,7 +3,7 @@ import { PgDatabase } from "drizzle-orm/pg-core"; import { TableWithRecords, isTableRegistrationLog, logToTable, storeTables } from "@latticexyz/store-sync"; import { decodeKey, decodeValueArgs } from "@latticexyz/protocol-parser"; import { QueryAdapter } from "@latticexyz/store-sync/trpc-indexer"; -import { debug } from "../debug"; +import { debug } from "../../debug"; import { getLogs } from 
"./getLogs"; import { groupBy } from "@latticexyz/common/utils"; @@ -12,6 +12,7 @@ import { groupBy } from "@latticexyz/common/utils"; * * @param {PgDatabase} database Postgres database object from Drizzle * @returns {Promise} A set of methods used by tRPC endpoints. + * @deprecated */ export async function createQueryAdapter(database: PgDatabase): Promise { const adapter: QueryAdapter = { diff --git a/packages/store-indexer/src/postgres/getLogs.ts b/packages/store-indexer/src/postgres/deprecated/getLogs.ts similarity index 84% rename from packages/store-indexer/src/postgres/getLogs.ts rename to packages/store-indexer/src/postgres/deprecated/getLogs.ts index b6e19aff14..30896d2d21 100644 --- a/packages/store-indexer/src/postgres/getLogs.ts +++ b/packages/store-indexer/src/postgres/deprecated/getLogs.ts @@ -3,9 +3,13 @@ import { Hex } from "viem"; import { StorageAdapterLog, SyncFilter } from "@latticexyz/store-sync"; import { tables } from "@latticexyz/store-sync/postgres"; import { and, asc, eq, or } from "drizzle-orm"; -import { decodeDynamicField } from "@latticexyz/protocol-parser"; import { bigIntMax } from "@latticexyz/common/utils"; +import { recordToLog } from "../recordToLog"; +import { createBenchmark } from "@latticexyz/common"; +/** + * @deprecated + */ export async function getLogs( database: PgDatabase, { @@ -18,6 +22,8 @@ export async function getLogs( readonly filters?: readonly SyncFilter[]; } ): Promise<{ blockNumber: bigint; logs: (StorageAdapterLog & { eventName: "Store_SetRecord" })[] }> { + const benchmark = createBenchmark("drizzleGetLogs"); + const conditions = filters.length ? filters.map((filter) => and( @@ -30,6 +36,7 @@ export async function getLogs( : address != null ? [eq(tables.recordsTable.address, address)] : []; + benchmark("parse config"); // Query for the block number that the indexer (i.e. 
chain) is at, in case the // indexer is further along in the chain than a given store/table's last updated @@ -49,6 +56,7 @@ export async function getLogs( // TODO: move this to `.findFirst` after upgrading drizzle or `rows[0]` after enabling `noUncheckedIndexedAccess: true` .then((rows) => rows.find(() => true)); const indexerBlockNumber = chainState?.lastUpdatedBlockNumber ?? 0n; + benchmark("query chainState"); const records = await database .select() @@ -58,29 +66,19 @@ export async function getLogs( asc(tables.recordsTable.lastUpdatedBlockNumber) // TODO: add logIndex (https://github.com/latticexyz/mud/issues/1979) ); + benchmark("query records"); const blockNumber = records.reduce( (max, record) => bigIntMax(max, record.lastUpdatedBlockNumber ?? 0n), indexerBlockNumber ); + benchmark("find block number"); const logs = records // TODO: add this to the query, assuming we can optimize with an index .filter((record) => !record.isDeleted) - .map( - (record) => - ({ - address: record.address, - eventName: "Store_SetRecord", - args: { - tableId: record.tableId, - keyTuple: decodeDynamicField("bytes32[]", record.keyBytes), - staticData: record.staticData ?? "0x", - encodedLengths: record.encodedLengths ?? "0x", - dynamicData: record.dynamicData ?? 
"0x", - }, - } as const) - ); + .map((record) => recordToLog({ ...record, lastUpdatedBlockNumber: record.lastUpdatedBlockNumber.toString() })); + benchmark("map records to logs"); return { blockNumber, logs }; } diff --git a/packages/store-indexer/src/postgres/queryLogs.ts b/packages/store-indexer/src/postgres/queryLogs.ts new file mode 100644 index 0000000000..e4981234e7 --- /dev/null +++ b/packages/store-indexer/src/postgres/queryLogs.ts @@ -0,0 +1,71 @@ +import { isNotNull } from "@latticexyz/common/utils"; +import { PendingQuery, Row, Sql } from "postgres"; +import { hexToBytes } from "viem"; +import { z } from "zod"; +import { input } from "@latticexyz/store-sync/indexer-client"; +import { transformSchemaName } from "@latticexyz/store-sync/postgres"; +import { Record } from "./common"; + +const schemaName = transformSchemaName("mud"); + +function and(sql: Sql, conditions: PendingQuery<Row[]>[]): PendingQuery<Row[]> { + return sql`(${conditions.reduce((query, condition) => sql`${query} AND ${condition}`)})`; +} + +function or(sql: Sql, conditions: PendingQuery<Row[]>[]): PendingQuery<Row[]> { + return sql`(${conditions.reduce((query, condition) => sql`${query} OR ${condition}`)})`; +} + +export function queryLogs(sql: Sql, opts: z.infer<typeof input>): PendingQuery<Record[]> { + const conditions = opts.filters.length + ? opts.filters.map((filter) => + and( + sql, + [ + opts.address != null ? sql`address = ${hexToBytes(opts.address)}` : null, + sql`table_id = ${hexToBytes(filter.tableId)}`, + filter.key0 != null ? sql`key0 = ${hexToBytes(filter.key0)}` : null, + filter.key1 != null ? sql`key1 = ${hexToBytes(filter.key1)}` : null, + ].filter(isNotNull) + ) + ) + : opts.address != null + ? [sql`address = ${hexToBytes(opts.address)}`] + : []; + + const where = sql`WHERE ${and( + sql, + [sql`is_deleted != true`, conditions.length ? 
or(sql, conditions) : null].filter(isNotNull) + )}`; + + // TODO: implement bytea <> hex columns via custom types: https://github.com/porsager/postgres#custom-types + // TODO: sort by logIndex (https://github.com/latticexyz/mud/issues/1979) + return sql` + WITH + config AS ( + SELECT + version AS "indexerVersion", + chain_id AS "chainId", + last_updated_block_number AS "chainBlockNumber" + FROM ${sql(`${schemaName}.config`)} + LIMIT 1 + ), + records AS ( + SELECT + '0x' || encode(address, 'hex') AS address, + '0x' || encode(table_id, 'hex') AS "tableId", + '0x' || encode(key_bytes, 'hex') AS "keyBytes", + '0x' || encode(static_data, 'hex') AS "staticData", + '0x' || encode(encoded_lengths, 'hex') AS "encodedLengths", + '0x' || encode(dynamic_data, 'hex') AS "dynamicData", + last_updated_block_number AS "recordBlockNumber" + FROM ${sql(`${schemaName}.records`)} + ${where} + ORDER BY last_updated_block_number ASC + ) + SELECT + (SELECT COUNT(*) FROM records) AS "totalRows", + * + FROM config, records + `; +} diff --git a/packages/store-indexer/src/postgres/recordToLog.ts b/packages/store-indexer/src/postgres/recordToLog.ts new file mode 100644 index 0000000000..21f7d720a7 --- /dev/null +++ b/packages/store-indexer/src/postgres/recordToLog.ts @@ -0,0 +1,17 @@ +import { StorageAdapterLog } from "@latticexyz/store-sync"; +import { decodeDynamicField } from "@latticexyz/protocol-parser"; +import { RecordData } from "./common"; + +export function recordToLog(record: RecordData): StorageAdapterLog & { eventName: "Store_SetRecord" } { + return { + address: record.address, + eventName: "Store_SetRecord", + args: { + tableId: record.tableId, + keyTuple: decodeDynamicField("bytes32[]", record.keyBytes), + staticData: record.staticData ?? "0x", + encodedLengths: record.encodedLengths ?? "0x", + dynamicData: record.dynamicData ?? 
"0x", + }, + } as const; +} diff --git a/packages/store-indexer/src/sqlite/apiRoutes.ts b/packages/store-indexer/src/sqlite/apiRoutes.ts new file mode 100644 index 0000000000..c83e87ef0e --- /dev/null +++ b/packages/store-indexer/src/sqlite/apiRoutes.ts @@ -0,0 +1,47 @@ +import { Middleware } from "koa"; +import Router from "@koa/router"; +import compose from "koa-compose"; +import { input } from "@latticexyz/store-sync/indexer-client"; +import { storeTables, tablesWithRecordsToLogs } from "@latticexyz/store-sync"; +import { debug } from "../debug"; +import { createBenchmark } from "@latticexyz/common"; +import { compress } from "../compress"; +import { getTablesWithRecords } from "./getTablesWithRecords"; +import { BaseSQLiteDatabase } from "drizzle-orm/sqlite-core"; + +export function apiRoutes(database: BaseSQLiteDatabase<"sync", any>): Middleware { + const router = new Router(); + + router.get("/api/logs", compress(), async (ctx) => { + const benchmark = createBenchmark("sqlite:logs"); + + let options: ReturnType<typeof input.parse>; + + try { + options = input.parse(typeof ctx.query.input === "string" ? JSON.parse(ctx.query.input) : {}); + } catch (error) { + ctx.status = 400; + ctx.body = JSON.stringify(error); + debug(error); + return; + } + + try { + options.filters = options.filters.length > 0 ? [...options.filters, { tableId: storeTables.Tables.tableId }] : []; + benchmark("parse config"); + const { blockNumber, tables } = getTablesWithRecords(database, options); + benchmark("query tables with records"); + const logs = tablesWithRecordsToLogs(tables); + benchmark("convert records to logs"); + + ctx.body = JSON.stringify({ blockNumber: blockNumber?.toString() ?? 
"-1", logs }); + ctx.status = 200; + } catch (error) { + ctx.status = 500; + ctx.body = JSON.stringify(error); + debug(error); + } + }); + + return compose([router.routes(), router.allowedMethods()]) as Middleware; +} diff --git a/packages/store-sync/package.json b/packages/store-sync/package.json index 048c5dde88..d7fc0c3ce6 100644 --- a/packages/store-sync/package.json +++ b/packages/store-sync/package.json @@ -11,6 +11,7 @@ "type": "module", "exports": { ".": "./dist/index.js", + "./indexer-client": "./dist/indexer-client/index.js", "./postgres": "./dist/postgres/index.js", "./postgres-decoded": "./dist/postgres-decoded/index.js", "./recs": "./dist/recs/index.js", @@ -23,6 +24,9 @@ "index": [ "./src/index.ts" ], + "indexer-client": [ + "./src/indexer-client/index.ts" + ], "postgres": [ "./src/postgres/index.ts" ], diff --git a/packages/store-sync/src/getSnapshot.ts b/packages/store-sync/src/getSnapshot.ts index 79ddcf90cc..c79f2df255 100644 --- a/packages/store-sync/src/getSnapshot.ts +++ b/packages/store-sync/src/getSnapshot.ts @@ -1,8 +1,10 @@ import { StorageAdapterBlock, SyncOptions } from "./common"; import { debug as parentDebug } from "./debug"; -import { createIndexerClient } from "./trpc-indexer"; import { TRPCClientError } from "@trpc/client"; import { tablesWithRecordsToLogs } from "./tablesWithRecordsToLogs"; +import { createIndexerClient as createTrpcIndexerClient } from "./trpc-indexer"; +import { createIndexerClient } from "./indexer-client"; +import { isOk } from "@latticexyz/common"; const debug = parentDebug.extend("getSnapshot"); @@ -39,17 +41,26 @@ export async function getSnapshot({ if (!indexerUrl) return; - const indexer = createIndexerClient({ url: indexerUrl }); + const indexerOrigin = new URL(indexerUrl).origin; + const indexer = createIndexerClient({ url: indexerOrigin }); + const trpcIndexer = createTrpcIndexerClient({ url: `${indexerOrigin}/trpc` }); + + debug("fetching logs from indexer via get", indexerUrl); + const result = await 
indexer.getLogs({ chainId, address, filters }); + if (isOk(result)) return result.ok; + console.warn(result.error); try { - debug("fetching logs from indexer", indexerUrl); - return await indexer.getLogs.query({ chainId, address, filters }); + // Backwards compatibility with older indexers + // TODO: remove in the future + debug("fetching logs from indexer via trpc", indexerUrl); + return await trpcIndexer.getLogs.query({ chainId, address, filters }); } catch (error) { if (error instanceof TRPCClientError) { // Backwards compatibility with older indexers // TODO: remove in the future debug("failed to fetch logs, fetching table records instead", indexerUrl); - const result = await indexer.findAll.query({ chainId, address, filters }); + const result = await trpcIndexer.findAll.query({ chainId, address, filters }); // warn after we fetch from old endpoint so we know that the indexer is accessible console.warn( `The indexer at ${indexerUrl} appears to be outdated. Consider upgrading to a recent version for better performance.` diff --git a/packages/store-sync/src/indexer-client/createIndexerClient.ts b/packages/store-sync/src/indexer-client/createIndexerClient.ts new file mode 100644 index 0000000000..6c19bfb0ac --- /dev/null +++ b/packages/store-sync/src/indexer-client/createIndexerClient.ts @@ -0,0 +1,47 @@ +import { z } from "zod"; +import { input } from "./input"; +import { StorageAdapterBlock } from "../common"; +import { Result } from "@latticexyz/common"; + +type CreateIndexerClientOptions = { + /** + * Indexer endpoint URL like `https://indexer.holesky.redstone.xyz`. + */ + url: string; +}; + +type IndexerClient = { + getLogs: (opts: z.input<typeof input>) => Promise<Result<StorageAdapterBlock>>; +}; + +/** + * Creates a client to talk to a MUD indexer. + * + * @param {CreateIndexerClientOptions} options See `CreateIndexerClientOptions`. + * @returns {IndexerClient} A typed indexer client. 
+ */ +export function createIndexerClient({ url }: CreateIndexerClientOptions): IndexerClient { + return { + getLogs: async (opts): Promise<Result<StorageAdapterBlock>> => { + try { + const input = encodeURIComponent(JSON.stringify(opts)); + const urlOrigin = new URL(url).origin; + const response = await fetch(`${urlOrigin}/api/logs?input=${input}`, { method: "GET" }); + + // TODO: return a readable stream instead of fetching the entire response at once + const result = await response.json(); + if (!isStorageAdapterBlock(result)) { + return { error: result }; + } + + return { ok: { ...result, blockNumber: BigInt(result.blockNumber) } }; + } catch (error) { + return { error }; + } + }, + }; +} + +function isStorageAdapterBlock(data: any): data is Omit<StorageAdapterBlock, "blockNumber"> & { blockNumber: string } { + return data && typeof data.blockNumber === "string" && Array.isArray(data.logs); +} diff --git a/packages/store-sync/src/indexer-client/index.ts b/packages/store-sync/src/indexer-client/index.ts new file mode 100644 index 0000000000..7036d5e4df --- /dev/null +++ b/packages/store-sync/src/indexer-client/index.ts @@ -0,0 +1,2 @@ +export * from "./createIndexerClient"; +export * from "./input"; diff --git a/packages/store-sync/src/indexer-client/input.ts b/packages/store-sync/src/indexer-client/input.ts new file mode 100644 index 0000000000..14107f1faa --- /dev/null +++ b/packages/store-sync/src/indexer-client/input.ts @@ -0,0 +1,16 @@ +import { isHex } from "viem"; +import { z } from "zod"; + +export const input = z.object({ + chainId: z.number(), + address: z.string().refine(isHex).optional(), + filters: z + .array( + z.object({ + tableId: z.string().refine(isHex), + key0: z.string().refine(isHex).optional(), + key1: z.string().refine(isHex).optional(), + }) + ) + .default([]), +}); diff --git a/packages/store-sync/src/postgres/index.ts b/packages/store-sync/src/postgres/index.ts index 28ae0479f9..bc1d77da97 100644 --- a/packages/store-sync/src/postgres/index.ts +++ b/packages/store-sync/src/postgres/index.ts @@ -4,4 
+4,5 @@ export * from "./createStorageAdapter"; export * from "./setupTables"; export * from "./shouldCleanDatabase"; export * from "./syncToPostgres"; +export * from "./transformSchemaName"; export * from "./tables"; diff --git a/packages/store-sync/src/trpc-indexer/createAppRouter.ts b/packages/store-sync/src/trpc-indexer/createAppRouter.ts index 9090c6fa1c..773810c0ea 100644 --- a/packages/store-sync/src/trpc-indexer/createAppRouter.ts +++ b/packages/store-sync/src/trpc-indexer/createAppRouter.ts @@ -1,9 +1,11 @@ -import { z } from "zod"; import { QueryAdapter } from "./common"; -import { isHex } from "viem"; import { initTRPC } from "@trpc/server"; import superjson from "superjson"; +import { input } from "../indexer-client/input"; +/** + * @deprecated + */ // eslint-disable-next-line @typescript-eslint/explicit-function-return-type export function createAppRouter() { const t = initTRPC.context<{ queryAdapter: QueryAdapter }>().create({ @@ -11,49 +13,17 @@ export function createAppRouter() { }); return t.router({ - getLogs: t.procedure - .input( - z.object({ - chainId: z.number(), - address: z.string().refine(isHex).optional(), - filters: z - .array( - z.object({ - tableId: z.string().refine(isHex), - key0: z.string().refine(isHex).optional(), - key1: z.string().refine(isHex).optional(), - }) - ) - .optional(), - }) - ) - .query(async (opts): ReturnType<QueryAdapter["getLogs"]> => { - const { queryAdapter } = opts.ctx; - const { chainId, address, filters } = opts.input; - return queryAdapter.getLogs({ chainId, address, filters }); - }), + getLogs: t.procedure.input(input).query(async (opts): ReturnType<QueryAdapter["getLogs"]> => { + const { queryAdapter } = opts.ctx; + const { chainId, address, filters } = opts.input; + return queryAdapter.getLogs({ chainId, address, filters }); + }), - findAll: t.procedure - .input( - z.object({ - chainId: z.number(), - address: z.string().refine(isHex).optional(), - filters: z - .array( - z.object({ - tableId: z.string().refine(isHex), - key0: 
z.string().refine(isHex).optional(), - key1: z.string().refine(isHex).optional(), - }) - ) - .optional(), - }) - ) - .query(async (opts): ReturnType<QueryAdapter["findAll"]> => { - const { queryAdapter } = opts.ctx; - const { chainId, address, filters } = opts.input; - return queryAdapter.findAll({ chainId, address, filters }); - }), + findAll: t.procedure.input(input).query(async (opts): ReturnType<QueryAdapter["findAll"]> => { + const { queryAdapter } = opts.ctx; + const { chainId, address, filters } = opts.input; + return queryAdapter.findAll({ chainId, address, filters }); + }), }); } diff --git a/packages/store-sync/src/trpc-indexer/createIndexerClient.ts b/packages/store-sync/src/trpc-indexer/createIndexerClient.ts index 89a804179f..4d8dffcaa0 100644 --- a/packages/store-sync/src/trpc-indexer/createIndexerClient.ts +++ b/packages/store-sync/src/trpc-indexer/createIndexerClient.ts @@ -14,6 +14,7 @@ type CreateIndexerClientOptions = { * * @param {CreateIndexerClientOptions} options See `CreateIndexerClientOptions`. * @returns {CreateTRPCProxyClient<AppRouter>} A typed tRPC client. 
+ * @deprecated Use the `createIndexerClient` export from `@latticexyz/store-sync/indexer-client` */ export function createIndexerClient({ url }: CreateIndexerClientOptions): CreateTRPCProxyClient<AppRouter> { return createTRPCProxyClient<AppRouter>({ diff --git a/packages/store-sync/tsup.config.ts b/packages/store-sync/tsup.config.ts index 15719d50b0..184fc6c6ff 100644 --- a/packages/store-sync/tsup.config.ts +++ b/packages/store-sync/tsup.config.ts @@ -8,6 +8,7 @@ export default defineConfig({ "src/postgres-decoded/index.ts", "src/recs/index.ts", "src/trpc-indexer/index.ts", + "src/indexer-client/index.ts", "src/zustand/index.ts", ], target: "esnext", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index eeb4e563fb..780b594b6d 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -851,6 +851,9 @@ importers: '@trpc/server': specifier: 10.34.0 version: 10.34.0 + accepts: + specifier: ^1.3.8 + version: 1.3.8 better-sqlite3: specifier: ^8.6.0 version: 8.6.0 @@ -866,8 +869,11 @@ importers: koa: specifier: ^2.14.2 version: 2.14.2 + koa-compose: + specifier: ^4.1.0 + version: 4.1.0 postgres: - specifier: ^3.3.5 + specifier: 3.3.5 version: 3.3.5 rxjs: specifier: 7.5.5 version: 7.5.5 @@ -885,6 +891,9 @@ importers: specifier: ^3.21.4 version: 3.21.4 devDependencies: + '@types/accepts': + specifier: ^1.3.7 + version: 1.3.7 '@types/better-sqlite3': specifier: ^7.6.4 version: 7.6.4 @@ -897,6 +906,9 @@ importers: '@types/koa': specifier: ^2.13.12 version: 2.13.12 + '@types/koa-compose': + specifier: ^3.2.8 + version: 3.2.8 '@types/koa__cors': specifier: ^4.0.3 version: 4.0.3 @@ -7573,7 +7585,7 @@ packages: jest-util: 29.5.0 jest-validate: 29.5.0 prompts: 2.4.2 - yargs: 17.7.1 + yargs: 17.7.2 transitivePeerDependencies: - '@types/node' - supports-color