Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(store-indexer): separate postgres indexer/frontend services #1887

Merged
merged 8 commits into from
Nov 9, 2023
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changeset/large-hounds-type.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
"@latticexyz/store-indexer": major
---

Separated frontend server and indexer service for Postgres indexer. Now you can run the Postgres indexer with one writer and many readers.

If you were previously using the `postgres-indexer` binary, you'll now need to run both `postgres-indexer` and `postgres-frontend`.
37 changes: 22 additions & 15 deletions packages/store-indexer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,40 +11,47 @@ npm install @latticexyz/store-indexer

npm sqlite-indexer
# or
npm postgres-indexer
npm postgres-indexer & npm postgres-frontend
```

or execute the one of the package bins directly:

```sh
npx -p @latticexyz/store-indexer sqlite-indexer
# or
npx -p @latticexyz/store-indexer postgres-indexer
npx -p @latticexyz/store-indexer postgres-indexer & npx -p @latticexyz/store-indexer postgres-frontend
```

## Configuration

Each indexer can be configured with environment variables.

### Common environment variables
### Common environment variables for indexer

| Variable | Description | Default |
| ------------------ | ---------------------------------------------------------- | --------- |
| `HOST` | Host that the indexer server listens on | `0.0.0.0` |
| `PORT` | Port that the indexer server listens on | `3001` |
| `RPC_HTTP_URL` | HTTP URL for Ethereum RPC to fetch data from | |
| `RPC_WS_URL` | WebSocket URL for Ethereum RPC to fetch data from | |
| `START_BLOCK` | Block number to start indexing from | `0` |
| `MAX_BLOCK_RANGE` | Maximum number of blocks to fetch from the RPC per request | `1000` |
| `POLLING_INTERVAL` | How often to poll for new blocks (in milliseconds) | `1000` |
| Variable | Description | Default |
| ------------------ | ---------------------------------------------------------- | ------- |
| `RPC_HTTP_URL` | HTTP URL for Ethereum RPC to fetch data from | |
| `RPC_WS_URL` | WebSocket URL for Ethereum RPC to fetch data from | |
| `START_BLOCK` | Block number to start indexing from | `0` |
| `MAX_BLOCK_RANGE` | Maximum number of blocks to fetch from the RPC per request | `1000` |
| `POLLING_INTERVAL` | How often to poll for new blocks (in milliseconds) | `1000` |

Note that you only need one of `RPC_HTTP_URL` or `RPC_WS_URL`, but we recommend both. The WebSocket URL will be prioritized and fall back to the HTTP URL if there are any connection issues.

### Common environment variables for frontend

| Variable | Description | Default |
| -------- | ------------------------------------------------ | --------- |
| `HOST` | Host that the indexer frontend server listens on | `0.0.0.0` |
| `PORT` | Port that the indexer frontend server listens on | `3001` |

### Postgres indexer environment variables

| Variable | Description | Default |
| -------------- | ----------------------- | ------- |
| `DATABASE_URL` | Postgres connection URL | |
| Variable | Description | Default |
| ------------------ | --------------------------------------------------- | ------- |
| `DATABASE_URL` | Postgres connection URL | |
| `HEALTHCHECK_HOST` | Host that the indexer healthcheck server listens on | |
| `HEATHCHECK_PORT` | Port that the indexer healthcheck server listens on | |

### SQLite indexer environment variables

Expand Down
16 changes: 8 additions & 8 deletions packages/store-indexer/bin/parseEnv.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
import { z, ZodError, ZodIntersection, ZodTypeAny } from "zod";
import { z, ZodError, ZodTypeAny } from "zod";

const commonSchema = z.intersection(
export const frontendEnvSchema = z.object({
HOST: z.string().default("0.0.0.0"),
PORT: z.coerce.number().positive().default(3001),
});

export const indexerEnvSchema = z.intersection(
z.object({
HOST: z.string().default("0.0.0.0"),
PORT: z.coerce.number().positive().default(3001),
START_BLOCK: z.coerce.bigint().nonnegative().default(0n),
MAX_BLOCK_RANGE: z.coerce.bigint().positive().default(1000n),
POLLING_INTERVAL: z.coerce.number().positive().default(1000),
Expand All @@ -20,10 +23,7 @@ const commonSchema = z.intersection(
])
);

export function parseEnv<TSchema extends ZodTypeAny | undefined = undefined>(
schema?: TSchema
): z.infer<TSchema extends ZodTypeAny ? ZodIntersection<typeof commonSchema, TSchema> : typeof commonSchema> {
const envSchema = schema !== undefined ? z.intersection(commonSchema, schema) : commonSchema;
export function parseEnv<TSchema extends ZodTypeAny>(envSchema: TSchema): z.infer<TSchema> {
try {
return envSchema.parse(process.env);
} catch (error) {
Expand Down
46 changes: 46 additions & 0 deletions packages/store-indexer/bin/postgres-frontend.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
#!/usr/bin/env node
import "dotenv/config";
import { z } from "zod";
import fastify from "fastify";
import { fastifyTRPCPlugin } from "@trpc/server/adapters/fastify";
import { AppRouter, createAppRouter } from "@latticexyz/store-sync/trpc-indexer";
import { createQueryAdapter } from "../src/postgres/createQueryAdapter";
import { drizzle } from "drizzle-orm/postgres-js";
import postgres from "postgres";
import { frontendEnvSchema, parseEnv } from "./parseEnv";

const env = parseEnv(
z.intersection(
frontendEnvSchema,
z.object({
DATABASE_URL: z.string(),
})
)
);

const database = drizzle(postgres(env.DATABASE_URL));

// @see https://fastify.dev/docs/latest/
const server = fastify({
maxParamLength: 5000,
});

await server.register(import("@fastify/cors"));

// k8s healthchecks
server.get("/healthz", (req, res) => res.code(200).send());
server.get("/readyz", (req, res) => res.code(200).send());

// @see https://trpc.io/docs/server/adapters/fastify
server.register(fastifyTRPCPlugin<AppRouter>, {
prefix: "/trpc",
trpcOptions: {
router: createAppRouter(),
createContext: async () => ({
queryAdapter: await createQueryAdapter(database),
}),
},
});

await server.listen({ host: env.HOST, port: env.PORT });
console.log(`postgres indexer frontend listening on http://${env.HOST}:${env.PORT}`);
54 changes: 21 additions & 33 deletions packages/store-indexer/bin/postgres-indexer.ts
Original file line number Diff line number Diff line change
@@ -1,24 +1,25 @@
#!/usr/bin/env node
import "dotenv/config";
import { z } from "zod";
import { DefaultLogger, eq } from "drizzle-orm";
import { eq } from "drizzle-orm";
import { createPublicClient, fallback, webSocket, http, Transport } from "viem";
import fastify from "fastify";
import { fastifyTRPCPlugin } from "@trpc/server/adapters/fastify";
import { AppRouter, createAppRouter } from "@latticexyz/store-sync/trpc-indexer";
import { createQueryAdapter } from "../src/postgres/createQueryAdapter";
import { isDefined } from "@latticexyz/common/utils";
import { combineLatest, filter, first } from "rxjs";
import { drizzle } from "drizzle-orm/postgres-js";
import postgres from "postgres";
import { cleanDatabase, postgresStorage, schemaVersion } from "@latticexyz/store-sync/postgres";
import { createStoreSync } from "@latticexyz/store-sync";
import { parseEnv } from "./parseEnv";
import { indexerEnvSchema, parseEnv } from "./parseEnv";

const env = parseEnv(
z.object({
DATABASE_URL: z.string(),
})
z.intersection(
indexerEnvSchema,
z.object({
DATABASE_URL: z.string(),
HEALTHCHECK_HOST: z.string().optional(),
HEALTHCHECK_PORT: z.coerce.number().optional(),
})
)
);

const transports: Transport[] = [
Expand All @@ -34,9 +35,7 @@ const publicClient = createPublicClient({
});

const chainId = await publicClient.getChainId();
const database = drizzle(postgres(env.DATABASE_URL), {
logger: new DefaultLogger(),
});
const database = drizzle(postgres(env.DATABASE_URL));

const { storageAdapter, internalTables } = await postgresStorage({ database, publicClient });

Expand Down Expand Up @@ -93,27 +92,16 @@ combineLatest([latestBlockNumber$, storedBlockLogs$])
console.log("all caught up");
});

// @see https://fastify.dev/docs/latest/
const server = fastify({
maxParamLength: 5000,
});
if (env.HEALTHCHECK_HOST != null || env.HEALTHCHECK_PORT != null) {
const { default: fastify } = await import("fastify");

await server.register(import("@fastify/cors"));
const server = fastify();

// k8s healthchecks
server.get("/healthz", (req, res) => res.code(200).send());
server.get("/readyz", (req, res) => (isCaughtUp ? res.code(200).send("ready") : res.code(424).send("backfilling")));
// k8s healthchecks
server.get("/healthz", (req, res) => res.code(200).send());
server.get("/readyz", (req, res) => (isCaughtUp ? res.code(200).send("ready") : res.code(424).send("backfilling")));

// @see https://trpc.io/docs/server/adapters/fastify
server.register(fastifyTRPCPlugin<AppRouter>, {
prefix: "/trpc",
trpcOptions: {
router: createAppRouter(),
createContext: async () => ({
queryAdapter: await createQueryAdapter(database),
}),
},
});

await server.listen({ host: env.HOST, port: env.PORT });
console.log(`indexer server listening on http://${env.HOST}:${env.PORT}`);
server.listen({ host: env.HEALTHCHECK_HOST, port: env.HEALTHCHECK_PORT }, (error, address) => {
console.log(`postgres indexer healthcheck server listening on ${address}`);
});
}
13 changes: 8 additions & 5 deletions packages/store-indexer/bin/sqlite-indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,15 @@ import { chainState, schemaVersion, syncToSqlite } from "@latticexyz/store-sync/
import { createQueryAdapter } from "../src/sqlite/createQueryAdapter";
import { isDefined } from "@latticexyz/common/utils";
import { combineLatest, filter, first } from "rxjs";
import { parseEnv } from "./parseEnv";
import { frontendEnvSchema, indexerEnvSchema, parseEnv } from "./parseEnv";

const env = parseEnv(
z.object({
SQLITE_FILENAME: z.string().default("indexer.db"),
})
z.intersection(
z.intersection(indexerEnvSchema, frontendEnvSchema),
z.object({
SQLITE_FILENAME: z.string().default("indexer.db"),
})
)
);

const transports: Transport[] = [
Expand Down Expand Up @@ -106,4 +109,4 @@ server.register(fastifyTRPCPlugin<AppRouter>, {
});

await server.listen({ host: env.HOST, port: env.PORT });
console.log(`indexer server listening on http://${env.HOST}:${env.PORT}`);
console.log(`sqlite indexer frontend listening on http://${env.HOST}:${env.PORT}`);
4 changes: 3 additions & 1 deletion packages/store-indexer/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
},
"types": "src/index.ts",
"bin": {
"postgres-frontend": "./dist/bin/postgres-frontend.js",
"postgres-indexer": "./dist/bin/postgres-indexer.js",
"sqlite-indexer": "./dist/bin/sqlite-indexer.js"
},
Expand All @@ -24,7 +25,7 @@
"clean:js": "rimraf dist",
"dev": "tsup --watch",
"lint": "eslint .",
"start:postgres": "tsx bin/postgres-indexer",
"start:postgres": "concurrently -n indexer,frontend -c cyan,magenta 'tsx bin/postgres-indexer' 'tsx bin/postgres-frontend'",
"start:postgres:local": "DEBUG=mud:store-sync:createStoreSync DATABASE_URL=postgres://127.0.0.1/postgres RPC_HTTP_URL=http://127.0.0.1:8545 pnpm start:postgres",
"start:postgres:testnet": "DEBUG=mud:store-sync:createStoreSync DATABASE_URL=postgres://127.0.0.1/postgres RPC_HTTP_URL=https://follower.testnet-chain.linfra.xyz pnpm start:postgres",
"start:sqlite": "tsx bin/sqlite-indexer",
Expand Down Expand Up @@ -58,6 +59,7 @@
"@types/better-sqlite3": "^7.6.4",
"@types/cors": "^2.8.13",
"@types/debug": "^4.1.7",
"concurrently": "^8.2.2",
"tsup": "^6.7.0",
"tsx": "^3.12.6",
"vitest": "0.31.4"
Expand Down
2 changes: 1 addition & 1 deletion packages/store-indexer/tsup.config.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { defineConfig } from "tsup";

export default defineConfig({
entry: ["src/index.ts", "bin/postgres-indexer.ts", "bin/sqlite-indexer.ts"],
entry: ["src/index.ts", "bin/postgres-frontend.ts", "bin/postgres-indexer.ts", "bin/sqlite-indexer.ts"],
target: "esnext",
format: ["esm"],
dts: false,
Expand Down
Loading
Loading