Skip to content

Commit

Permalink
feat(store-indexer): separate postgres indexer/frontend services (#1887)
Browse files Browse the repository at this point in the history
  • Loading branch information
holic authored Nov 9, 2023
1 parent 3122cdf commit 5ecccfe
Show file tree
Hide file tree
Showing 9 changed files with 171 additions and 64 deletions.
9 changes: 9 additions & 0 deletions .changeset/large-hounds-type.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
---
"@latticexyz/store-indexer": major
---

Separated frontend server and indexer service for Postgres indexer. Now you can run the Postgres indexer with one writer and many readers.

If you were previously using the `postgres-indexer` binary, you'll now need to run both `postgres-indexer` and `postgres-frontend`.

For consistency, the Postgres database logs are now disabled by default. If you were using these, please let us know so we can add them back in with an environment variable flag.
37 changes: 22 additions & 15 deletions packages/store-indexer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,40 +11,47 @@ npm install @latticexyz/store-indexer

npm sqlite-indexer
# or
npm postgres-indexer
npm postgres-indexer & npm postgres-frontend
```

or execute the one of the package bins directly:

```sh
npx -p @latticexyz/store-indexer sqlite-indexer
# or
npx -p @latticexyz/store-indexer postgres-indexer
npx -p @latticexyz/store-indexer postgres-indexer & npx -p @latticexyz/store-indexer postgres-frontend
```

## Configuration

Each indexer can be configured with environment variables.

### Common environment variables
### Common environment variables for indexer

| Variable | Description | Default |
| ------------------ | ---------------------------------------------------------- | --------- |
| `HOST` | Host that the indexer server listens on | `0.0.0.0` |
| `PORT` | Port that the indexer server listens on | `3001` |
| `RPC_HTTP_URL` | HTTP URL for Ethereum RPC to fetch data from | |
| `RPC_WS_URL` | WebSocket URL for Ethereum RPC to fetch data from | |
| `START_BLOCK` | Block number to start indexing from | `0` |
| `MAX_BLOCK_RANGE` | Maximum number of blocks to fetch from the RPC per request | `1000` |
| `POLLING_INTERVAL` | How often to poll for new blocks (in milliseconds) | `1000` |
| Variable | Description | Default |
| ------------------ | ---------------------------------------------------------- | ------- |
| `RPC_HTTP_URL` | HTTP URL for Ethereum RPC to fetch data from | |
| `RPC_WS_URL` | WebSocket URL for Ethereum RPC to fetch data from | |
| `START_BLOCK` | Block number to start indexing from | `0` |
| `MAX_BLOCK_RANGE` | Maximum number of blocks to fetch from the RPC per request | `1000` |
| `POLLING_INTERVAL` | How often to poll for new blocks (in milliseconds) | `1000` |

Note that you only need one of `RPC_HTTP_URL` or `RPC_WS_URL`, but we recommend both. The WebSocket URL will be prioritized and fall back to the HTTP URL if there are any connection issues.

### Common environment variables for frontend

| Variable | Description | Default |
| -------- | ------------------------------------------------ | --------- |
| `HOST` | Host that the indexer frontend server listens on | `0.0.0.0` |
| `PORT` | Port that the indexer frontend server listens on | `3001` |

### Postgres indexer environment variables

| Variable | Description | Default |
| -------------- | ----------------------- | ------- |
| `DATABASE_URL` | Postgres connection URL | |
| Variable | Description | Default |
| ------------------ | --------------------------------------------------- | ------- |
| `DATABASE_URL` | Postgres connection URL | |
| `HEALTHCHECK_HOST` | Host that the indexer healthcheck server listens on | |
| `HEATHCHECK_PORT` | Port that the indexer healthcheck server listens on | |

### SQLite indexer environment variables

Expand Down
16 changes: 8 additions & 8 deletions packages/store-indexer/bin/parseEnv.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
import { z, ZodError, ZodIntersection, ZodTypeAny } from "zod";
import { z, ZodError, ZodTypeAny } from "zod";

const commonSchema = z.intersection(
export const frontendEnvSchema = z.object({
HOST: z.string().default("0.0.0.0"),
PORT: z.coerce.number().positive().default(3001),
});

export const indexerEnvSchema = z.intersection(
z.object({
HOST: z.string().default("0.0.0.0"),
PORT: z.coerce.number().positive().default(3001),
START_BLOCK: z.coerce.bigint().nonnegative().default(0n),
MAX_BLOCK_RANGE: z.coerce.bigint().positive().default(1000n),
POLLING_INTERVAL: z.coerce.number().positive().default(1000),
Expand All @@ -20,10 +23,7 @@ const commonSchema = z.intersection(
])
);

export function parseEnv<TSchema extends ZodTypeAny | undefined = undefined>(
schema?: TSchema
): z.infer<TSchema extends ZodTypeAny ? ZodIntersection<typeof commonSchema, TSchema> : typeof commonSchema> {
const envSchema = schema !== undefined ? z.intersection(commonSchema, schema) : commonSchema;
export function parseEnv<TSchema extends ZodTypeAny>(envSchema: TSchema): z.infer<TSchema> {
try {
return envSchema.parse(process.env);
} catch (error) {
Expand Down
46 changes: 46 additions & 0 deletions packages/store-indexer/bin/postgres-frontend.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
#!/usr/bin/env node
import "dotenv/config";
import { z } from "zod";
import fastify from "fastify";
import { fastifyTRPCPlugin } from "@trpc/server/adapters/fastify";
import { AppRouter, createAppRouter } from "@latticexyz/store-sync/trpc-indexer";
import { createQueryAdapter } from "../src/postgres/createQueryAdapter";
import { drizzle } from "drizzle-orm/postgres-js";
import postgres from "postgres";
import { frontendEnvSchema, parseEnv } from "./parseEnv";

const env = parseEnv(
z.intersection(
frontendEnvSchema,
z.object({
DATABASE_URL: z.string(),
})
)
);

const database = drizzle(postgres(env.DATABASE_URL));

// @see https://fastify.dev/docs/latest/
const server = fastify({
maxParamLength: 5000,
});

await server.register(import("@fastify/cors"));

// k8s healthchecks
server.get("/healthz", (req, res) => res.code(200).send());
server.get("/readyz", (req, res) => res.code(200).send());

// @see https://trpc.io/docs/server/adapters/fastify
server.register(fastifyTRPCPlugin<AppRouter>, {
prefix: "/trpc",
trpcOptions: {
router: createAppRouter(),
createContext: async () => ({
queryAdapter: await createQueryAdapter(database),
}),
},
});

await server.listen({ host: env.HOST, port: env.PORT });
console.log(`postgres indexer frontend listening on http://${env.HOST}:${env.PORT}`);
54 changes: 21 additions & 33 deletions packages/store-indexer/bin/postgres-indexer.ts
Original file line number Diff line number Diff line change
@@ -1,24 +1,25 @@
#!/usr/bin/env node
import "dotenv/config";
import { z } from "zod";
import { DefaultLogger, eq } from "drizzle-orm";
import { eq } from "drizzle-orm";
import { createPublicClient, fallback, webSocket, http, Transport } from "viem";
import fastify from "fastify";
import { fastifyTRPCPlugin } from "@trpc/server/adapters/fastify";
import { AppRouter, createAppRouter } from "@latticexyz/store-sync/trpc-indexer";
import { createQueryAdapter } from "../src/postgres/createQueryAdapter";
import { isDefined } from "@latticexyz/common/utils";
import { combineLatest, filter, first } from "rxjs";
import { drizzle } from "drizzle-orm/postgres-js";
import postgres from "postgres";
import { cleanDatabase, postgresStorage, schemaVersion } from "@latticexyz/store-sync/postgres";
import { createStoreSync } from "@latticexyz/store-sync";
import { parseEnv } from "./parseEnv";
import { indexerEnvSchema, parseEnv } from "./parseEnv";

const env = parseEnv(
z.object({
DATABASE_URL: z.string(),
})
z.intersection(
indexerEnvSchema,
z.object({
DATABASE_URL: z.string(),
HEALTHCHECK_HOST: z.string().optional(),
HEALTHCHECK_PORT: z.coerce.number().optional(),
})
)
);

const transports: Transport[] = [
Expand All @@ -34,9 +35,7 @@ const publicClient = createPublicClient({
});

const chainId = await publicClient.getChainId();
const database = drizzle(postgres(env.DATABASE_URL), {
logger: new DefaultLogger(),
});
const database = drizzle(postgres(env.DATABASE_URL));

const { storageAdapter, internalTables } = await postgresStorage({ database, publicClient });

Expand Down Expand Up @@ -93,27 +92,16 @@ combineLatest([latestBlockNumber$, storedBlockLogs$])
console.log("all caught up");
});

// @see https://fastify.dev/docs/latest/
const server = fastify({
maxParamLength: 5000,
});
if (env.HEALTHCHECK_HOST != null || env.HEALTHCHECK_PORT != null) {
const { default: fastify } = await import("fastify");

await server.register(import("@fastify/cors"));
const server = fastify();

// k8s healthchecks
server.get("/healthz", (req, res) => res.code(200).send());
server.get("/readyz", (req, res) => (isCaughtUp ? res.code(200).send("ready") : res.code(424).send("backfilling")));
// k8s healthchecks
server.get("/healthz", (req, res) => res.code(200).send());
server.get("/readyz", (req, res) => (isCaughtUp ? res.code(200).send("ready") : res.code(424).send("backfilling")));

// @see https://trpc.io/docs/server/adapters/fastify
server.register(fastifyTRPCPlugin<AppRouter>, {
prefix: "/trpc",
trpcOptions: {
router: createAppRouter(),
createContext: async () => ({
queryAdapter: await createQueryAdapter(database),
}),
},
});

await server.listen({ host: env.HOST, port: env.PORT });
console.log(`indexer server listening on http://${env.HOST}:${env.PORT}`);
server.listen({ host: env.HEALTHCHECK_HOST, port: env.HEALTHCHECK_PORT }, (error, address) => {
console.log(`postgres indexer healthcheck server listening on ${address}`);
});
}
13 changes: 8 additions & 5 deletions packages/store-indexer/bin/sqlite-indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,15 @@ import { chainState, schemaVersion, syncToSqlite } from "@latticexyz/store-sync/
import { createQueryAdapter } from "../src/sqlite/createQueryAdapter";
import { isDefined } from "@latticexyz/common/utils";
import { combineLatest, filter, first } from "rxjs";
import { parseEnv } from "./parseEnv";
import { frontendEnvSchema, indexerEnvSchema, parseEnv } from "./parseEnv";

const env = parseEnv(
z.object({
SQLITE_FILENAME: z.string().default("indexer.db"),
})
z.intersection(
z.intersection(indexerEnvSchema, frontendEnvSchema),
z.object({
SQLITE_FILENAME: z.string().default("indexer.db"),
})
)
);

const transports: Transport[] = [
Expand Down Expand Up @@ -106,4 +109,4 @@ server.register(fastifyTRPCPlugin<AppRouter>, {
});

await server.listen({ host: env.HOST, port: env.PORT });
console.log(`indexer server listening on http://${env.HOST}:${env.PORT}`);
console.log(`sqlite indexer frontend listening on http://${env.HOST}:${env.PORT}`);
4 changes: 3 additions & 1 deletion packages/store-indexer/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
},
"types": "src/index.ts",
"bin": {
"postgres-frontend": "./dist/bin/postgres-frontend.js",
"postgres-indexer": "./dist/bin/postgres-indexer.js",
"sqlite-indexer": "./dist/bin/sqlite-indexer.js"
},
Expand All @@ -24,7 +25,7 @@
"clean:js": "rimraf dist",
"dev": "tsup --watch",
"lint": "eslint .",
"start:postgres": "tsx bin/postgres-indexer",
"start:postgres": "concurrently -n indexer,frontend -c cyan,magenta 'tsx bin/postgres-indexer' 'tsx bin/postgres-frontend'",
"start:postgres:local": "DEBUG=mud:store-sync:createStoreSync DATABASE_URL=postgres://127.0.0.1/postgres RPC_HTTP_URL=http://127.0.0.1:8545 pnpm start:postgres",
"start:postgres:testnet": "DEBUG=mud:store-sync:createStoreSync DATABASE_URL=postgres://127.0.0.1/postgres RPC_HTTP_URL=https://follower.testnet-chain.linfra.xyz pnpm start:postgres",
"start:sqlite": "tsx bin/sqlite-indexer",
Expand Down Expand Up @@ -58,6 +59,7 @@
"@types/better-sqlite3": "^7.6.4",
"@types/cors": "^2.8.13",
"@types/debug": "^4.1.7",
"concurrently": "^8.2.2",
"tsup": "^6.7.0",
"tsx": "^3.12.6",
"vitest": "0.31.4"
Expand Down
2 changes: 1 addition & 1 deletion packages/store-indexer/tsup.config.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { defineConfig } from "tsup";

export default defineConfig({
entry: ["src/index.ts", "bin/postgres-indexer.ts", "bin/sqlite-indexer.ts"],
entry: ["src/index.ts", "bin/postgres-frontend.ts", "bin/postgres-indexer.ts", "bin/sqlite-indexer.ts"],
target: "esnext",
format: ["esm"],
dts: false,
Expand Down
Loading

0 comments on commit 5ecccfe

Please sign in to comment.