Skip to content

Commit

Permalink
postgres indexer
Browse files Browse the repository at this point in the history
  • Loading branch information
holic committed Aug 23, 2023
1 parent 8213907 commit 7b93807
Show file tree
Hide file tree
Showing 11 changed files with 237 additions and 36 deletions.
136 changes: 136 additions & 0 deletions packages/store-indexer/bin/postgres-indexer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
import fs from "node:fs";
import { z } from "zod";
import { DefaultLogger, eq } from "drizzle-orm";
import { createPublicClient, fallback, webSocket, http, Transport } from "viem";
import fastify from "fastify";
import { fastifyTRPCPlugin } from "@trpc/server/adapters/fastify";
import { createAppRouter } from "@latticexyz/store-sync/trpc-indexer";
import { createStorageAdapter } from "../src/postgres/createStorageAdapter";
import type { Chain } from "viem/chains";
import * as mudChains from "@latticexyz/common/chains";
import * as chains from "viem/chains";
import { isNotNull } from "@latticexyz/common/utils";
import { combineLatest, filter, first } from "rxjs";
import { drizzle } from "drizzle-orm/postgres-js";
import postgres from "postgres";
import { createInternalTables, schemaVersion, syncToPostgres } from "@latticexyz/store-sync/postgres";

const possibleChains = Object.values({ ...mudChains, ...chains }) as Chain[];

// TODO: refine zod type to be either CHAIN_ID or RPC_HTTP_URL/RPC_WS_URL
const env = z
.object({
CHAIN_ID: z.coerce.number().positive().optional(),
RPC_HTTP_URL: z.string().optional(),
RPC_WS_URL: z.string().optional(),
START_BLOCK: z.coerce.bigint().nonnegative().default(0n),
MAX_BLOCK_RANGE: z.coerce.bigint().positive().default(1000n),
PORT: z.coerce.number().positive().default(3001),
DATABASE_URL: z.string(),
})
.parse(process.env, {
errorMap: (issue) => ({
message: `Missing or invalid environment variable: ${issue.path.join(".")}`,
}),
});

const chain = env.CHAIN_ID != null ? possibleChains.find((c) => c.id === env.CHAIN_ID) : undefined;
if (env.CHAIN_ID != null && !chain) {
console.warn(`No chain found for chain ID ${env.CHAIN_ID}`);
}

const transports: Transport[] = [
env.RPC_WS_URL ? webSocket(env.RPC_WS_URL) : null,
env.RPC_HTTP_URL ? http(env.RPC_HTTP_URL) : null,
].filter(isNotNull);

const publicClient = createPublicClient({
chain,
transport: fallback(
// If one or more RPC URLs are provided, we'll configure the transport with only those RPC URLs
transports.length > 0
? transports
: // Otherwise use the chain defaults
[webSocket(), http()]
),
pollingInterval: 1000,
});

// Fetch the chain ID from the RPC if no chain object was found for the provided chain ID.
// We do this to match the downstream logic, which also attempts to find the chain ID.
const chainId = chain?.id ?? (await publicClient.getChainId());

const database = drizzle(postgres(env.DATABASE_URL), {
logger: new DefaultLogger(),
});

let startBlock = env.START_BLOCK;

// Resume from latest block stored in DB. This will throw if the DB doesn't exist yet, so we wrap in a try/catch and ignore the error.
try {
const internalTables = createInternalTables();
const currentChainStates = await database
.select()
.from(internalTables.chain)
.where(eq(internalTables.chain.chainId, chainId))
.execute();
// TODO: replace this type workaround with `noUncheckedIndexedAccess: true` when we can fix all the issues related (https://github.com/latticexyz/mud/issues/1212)
const currentChainState: (typeof currentChainStates)[number] | undefined = currentChainStates[0];

if (currentChainState != null) {
if (currentChainState.schemaVersion != schemaVersion) {
console.log(
"schema version changed from",
currentChainState.schemaVersion,
"to",
schemaVersion,
"recreating database"
);
fs.truncateSync(env.DATABASE_URL);
} else if (currentChainState.lastUpdatedBlockNumber != null) {
console.log("resuming from block number", currentChainState.lastUpdatedBlockNumber + 1n);
startBlock = currentChainState.lastUpdatedBlockNumber + 1n;
}
}
} catch (error) {
// ignore errors, this is optional
}

const { latestBlockNumber$, blockStorageOperations$ } = await syncToPostgres({
database,
publicClient,
startBlock,
maxBlockRange: env.MAX_BLOCK_RANGE,
});

combineLatest([latestBlockNumber$, blockStorageOperations$])
.pipe(
filter(
([latestBlockNumber, { blockNumber: lastBlockNumberProcessed }]) => latestBlockNumber === lastBlockNumberProcessed
),
first()
)
.subscribe(() => {
console.log("all caught up");
});

// @see https://fastify.dev/docs/latest/
const server = fastify({
maxParamLength: 5000,
});

await server.register(import("@fastify/cors"));

// @see https://trpc.io/docs/server/adapters/fastify
server.register(fastifyTRPCPlugin, {
prefix: "/trpc",
trpcOptions: {
router: createAppRouter(),
createContext: async () => ({
storageAdapter: await createStorageAdapter(database),
}),
},
});

await server.listen({ port: env.PORT });
console.log(`indexer server listening on http://127.0.0.1:${env.PORT}`);
1 change: 0 additions & 1 deletion packages/store-indexer/bin/sqlite-indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import * as mudChains from "@latticexyz/common/chains";
import * as chains from "viem/chains";
import { isNotNull } from "@latticexyz/common/utils";
import { combineLatest, filter, first } from "rxjs";
import { debug } from "../src/debug";

const possibleChains = Object.values({ ...mudChains, ...chains }) as Chain[];

Expand Down
9 changes: 5 additions & 4 deletions packages/store-indexer/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@
"clean:js": "rimraf dist",
"dev": "tsup --watch",
"lint": "eslint .",
"start": "tsx bin/sqlite-indexer",
"start:local": "SQLITE_FILENAME=anvil.db CHAIN_ID=31337 pnpm start",
"start:testnet": "SQLITE_FILENAME=testnet.db CHAIN_ID=4242 START_BLOCK=19037160 pnpm start",
"start:testnet2": "SQLITE_FILENAME=testnet2.db CHAIN_ID=4243 pnpm start",
"start": "tsx bin/postgres-indexer",
"start:local": "DATABASE_URL=http://127.0.0.1/postgres CHAIN_ID=31337 pnpm start",
"start:testnet": "DATABASE_URL=http://127.0.0.1/postgres CHAIN_ID=4242 START_BLOCK=19037160 pnpm start",
"start:testnet2": "DATABASE_URL=http://127.0.0.1/postgres CHAIN_ID=4243 pnpm start",
"test": "tsc --noEmit --skipLibCheck"
},
"dependencies": {
Expand All @@ -39,6 +39,7 @@
"debug": "^4.3.4",
"drizzle-orm": "^0.27.0",
"fastify": "^4.21.0",
"postgres": "^3.3.5",
"rxjs": "7.5.5",
"superjson": "^1.12.4",
"viem": "1.6.0",
Expand Down
54 changes: 54 additions & 0 deletions packages/store-indexer/src/postgres/createStorageAdapter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import { eq } from "drizzle-orm";
import { PgDatabase } from "drizzle-orm/pg-core";
import { createTable, createInternalTables, getTables } from "@latticexyz/store-sync/postgres";
import { StorageAdapter } from "@latticexyz/store-sync/trpc-indexer";
import { debug } from "../debug";
import { getAddress } from "viem";

/**
* Creates a storage adapter for the tRPC server/client to query data from Postgres.
*
* @param {PgDatabase<any>} database Postgres database object from Drizzle
* @returns {Promise<StorageAdapter>} A set of methods used by tRPC endpoints.
*/
export async function createStorageAdapter(database: PgDatabase<any>): Promise<StorageAdapter> {
const adapter: StorageAdapter = {
async findAll(chainId, address) {
const internalTables = createInternalTables();
const tables = (await getTables(database)).filter(
(table) => address != null && getAddress(address) === getAddress(table.address)
);

const tablesWithRecords = await Promise.all(
tables.map(async (table) => {
const sqliteTable = createTable(table);
const records = await database.select().from(sqliteTable).where(eq(sqliteTable.__isDeleted, false)).execute();
return {
...table,
records: records.map((record) => ({
key: Object.fromEntries(Object.entries(table.keySchema).map(([name]) => [name, record[name]])),
value: Object.fromEntries(Object.entries(table.valueSchema).map(([name]) => [name, record[name]])),
})),
};
})
);

const metadata = await database
.select()
.from(internalTables.chain)
.where(eq(internalTables.chain.chainId, chainId))
.execute();
const { lastUpdatedBlockNumber } = metadata[0] ?? {};

const result = {
blockNumber: lastUpdatedBlockNumber ?? null,
tables: tablesWithRecords,
};

debug("findAll", chainId, address, result);

return result;
},
};
return adapter;
}
10 changes: 7 additions & 3 deletions packages/store-sync/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,25 @@
"type": "module",
"exports": {
".": "./dist/index.js",
"./sqlite": "./dist/sqlite/index.js",
"./postgres": "./dist/postgres/index.js",
"./recs": "./dist/recs/index.js",
"./sqlite": "./dist/sqlite/index.js",
"./trpc-indexer": "./dist/trpc-indexer/index.js"
},
"typesVersions": {
"*": {
"index": [
"./src/index.ts"
],
"sqlite": [
"./src/sqlite/index.ts"
"postgres": [
"./src/postgres/index.ts"
],
"recs": [
"./src/recs/index.ts"
],
"sqlite": [
"./src/sqlite/index.ts"
],
"trpc-indexer": [
"./src/trpc-indexer/index.ts"
]
Expand Down
3 changes: 2 additions & 1 deletion packages/store-sync/src/postgres/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ export * from "./getTables";
export * from "./createInternalTables";
export * from "./schemaVersion";
export * from "./postgresStorage";
export * from "./syncToSqlite";
export * from "./setupTables";
export * from "./syncToPostgres";
8 changes: 4 additions & 4 deletions packages/store-sync/src/postgres/postgresStorage.test.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
import { beforeEach, describe, expect, it, vi } from "vitest";
import { postgresStorage } from "./postgresStorage";
import { getTables } from "./getTables";
import { PgDatabase, QueryResultHKT } from "drizzle-orm/pg-core";
import { DefaultLogger } from "drizzle-orm";
import { createTable } from "./createTable";
import { drizzle } from "drizzle-orm/postgres-js";
import postgres from "postgres";
import { createPublicClient, http } from "viem";
import { foundry } from "viem/chains";
import { blockLogsToStorage } from "../blockLogsToStorage";
import postgres from "postgres";
import * as transformSchemaNameExports from "./transformSchemaName";
import { getTables } from "./getTables";
import { postgresStorage } from "./postgresStorage";
import { createTable } from "./createTable";

vi.spyOn(transformSchemaNameExports, "transformSchemaName").mockImplementation(
(schemaName) => `${process.pid}_${process.env.VITEST_POOL_ID}__${schemaName}`
Expand Down
16 changes: 6 additions & 10 deletions packages/store-sync/src/postgres/postgresStorage.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { PublicClient, concatHex, encodeAbiParameters } from "viem";
import { PgDatabase, QueryResultHKT } from "drizzle-orm/pg-core";
import { and, eq } from "drizzle-orm";
import { and, eq, inArray } from "drizzle-orm";
import { createTable } from "./createTable";
import { schemaToDefaults } from "../schemaToDefaults";
import { BlockLogsToStorageOptions } from "../blockLogsToStorage";
Expand All @@ -10,7 +10,6 @@ import { createInternalTables } from "./createInternalTables";
import { getTables } from "./getTables";
import { schemaVersion } from "./schemaVersion";
import { tableIdToHex } from "@latticexyz/common";
import { identity } from "@latticexyz/common/utils";
import { setupTables } from "./setupTables";
import { getTableKey } from "./getTableKey";

Expand Down Expand Up @@ -83,17 +82,14 @@ export async function postgresStorage<TConfig extends StoreConfig = StoreConfig>
const tables = await getTables(database, operations.map(getTableKey));

await database.transaction(async (tx) => {
for (const { address, namespace, name } of tables) {
const tablesWithOperations = tables.filter((table) =>
operations.some((op) => getTableKey(op) === getTableKey(table))
);
if (tablesWithOperations.length) {
await tx
.update(internalTables.tables)
.set({ lastUpdatedBlockNumber: blockNumber })
.where(
and(
eq(internalTables.tables.address, address),
eq(internalTables.tables.namespace, namespace),
eq(internalTables.tables.name, name)
)
)
.where(inArray(internalTables.tables.id, [...new Set(tablesWithOperations.map(getTableKey))]))
.execute();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
import { StoreConfig } from "@latticexyz/store";
import { BaseSQLiteDatabase } from "drizzle-orm/sqlite-core";
import { PgDatabase } from "drizzle-orm/pg-core";
import { SyncOptions, SyncResult } from "../common";
import { sqliteStorage } from "./postgresStorage";
import { postgresStorage } from "./postgresStorage";
import { createStoreSync } from "../createStoreSync";

type SyncToSqliteOptions<TConfig extends StoreConfig = StoreConfig> = SyncOptions<TConfig> & {
type SyncToPostgresOptions<TConfig extends StoreConfig = StoreConfig> = SyncOptions<TConfig> & {
/**
* [SQLite database object from Drizzle][0].
* [Postgres database object from Drizzle][0].
*
* [0]: https://orm.drizzle.team/docs/installation-and-db-connection/sqlite/better-sqlite3
* [0]: https://orm.drizzle.team/docs/installation-and-db-connection/postgresql/postgresjs
*/
database: BaseSQLiteDatabase<"sync", any>;
database: PgDatabase<any>;
startSync?: boolean;
};

type SyncToSqliteResult<TConfig extends StoreConfig = StoreConfig> = SyncResult<TConfig> & {
type SyncToPostgresResult<TConfig extends StoreConfig = StoreConfig> = SyncResult<TConfig> & {
stopSync: () => void;
};

Expand All @@ -24,7 +24,7 @@ type SyncToSqliteResult<TConfig extends StoreConfig = StoreConfig> = SyncResult<
* @param {CreateIndexerOptions} options See `CreateIndexerOptions`.
* @returns A function to unsubscribe from the block stream, effectively stopping the indexer.
*/
export async function syncToSqlite<TConfig extends StoreConfig = StoreConfig>({
export async function syncToPostgres<TConfig extends StoreConfig = StoreConfig>({
config,
database,
publicClient,
Expand All @@ -34,9 +34,9 @@ export async function syncToSqlite<TConfig extends StoreConfig = StoreConfig>({
indexerUrl,
initialState,
startSync = true,
}: SyncToSqliteOptions<TConfig>): Promise<SyncToSqliteResult<TConfig>> {
}: SyncToPostgresOptions<TConfig>): Promise<SyncToPostgresResult<TConfig>> {
const storeSync = await createStoreSync({
storageAdapter: await sqliteStorage({ database, publicClient, config }),
storageAdapter: await postgresStorage({ database, publicClient, config }),
config,
address,
publicClient,
Expand Down
8 changes: 7 additions & 1 deletion packages/store-sync/tsup.config.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
import { defineConfig } from "tsup";

export default defineConfig({
entry: ["src/index.ts", "src/sqlite/index.ts", "src/recs/index.ts", "src/trpc-indexer/index.ts"],
entry: [
"src/index.ts",
"src/sqlite/index.ts",
"src/postgres/index.ts",
"src/recs/index.ts",
"src/trpc-indexer/index.ts",
],
target: "esnext",
format: ["esm"],
dts: false,
Expand Down
Loading

0 comments on commit 7b93807

Please sign in to comment.