Skip to content

Commit

Permalink
feat(store-indexer): run indexers with npx (#1526)
Browse files Browse the repository at this point in the history
  • Loading branch information
holic authored Sep 17, 2023
1 parent 759514d commit 498d05e
Show file tree
Hide file tree
Showing 6 changed files with 92 additions and 79 deletions.
25 changes: 25 additions & 0 deletions .changeset/proud-insects-perform.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
---
"@latticexyz/store-indexer": minor
---

You can now install and run `@latticexyz/store-indexer` from the npm package itself, without having to clone/build the MUD repo:

```sh
npm install @latticexyz/store-indexer

npm sqlite-indexer
# or
npm postgres-indexer
```

or

```sh
npx -p @latticexyz/store-indexer sqlite-indexer
# or
npx -p @latticexyz/store-indexer postgres-indexer
```

The binary will also load the nearby `.env` file for easier local configuration.

We've removed the `CHAIN_ID` requirement and instead require just a `RPC_HTTP_URL` or `RPC_WS_URL` or both. You can now also adjust the polling interval with `POLLING_INTERVAL` (defaults to 1000ms, which corresponds to MUD's default block time).
70 changes: 30 additions & 40 deletions packages/store-indexer/bin/postgres-indexer.ts
Original file line number Diff line number Diff line change
@@ -1,74 +1,64 @@
#!/usr/bin/env node
import "dotenv/config";
import { z } from "zod";
import { DefaultLogger, eq } from "drizzle-orm";
import { createPublicClient, fallback, webSocket, http, Transport } from "viem";
import fastify from "fastify";
import { fastifyTRPCPlugin } from "@trpc/server/adapters/fastify";
import { AppRouter, createAppRouter } from "@latticexyz/store-sync/trpc-indexer";
import { createQueryAdapter } from "../src/postgres/createQueryAdapter";
import type { Chain } from "viem/chains";
import * as mudChains from "@latticexyz/common/chains";
import * as chains from "viem/chains";
import { isNotNull } from "@latticexyz/common/utils";
import { isDefined } from "@latticexyz/common/utils";
import { combineLatest, filter, first } from "rxjs";
import { drizzle } from "drizzle-orm/postgres-js";
import postgres from "postgres";
import { cleanDatabase, postgresStorage, schemaVersion } from "@latticexyz/store-sync/postgres";
import { createStoreSync } from "@latticexyz/store-sync";

const possibleChains = Object.values({ ...mudChains, ...chains }) as Chain[];

// TODO: refine zod type to be either CHAIN_ID or RPC_HTTP_URL/RPC_WS_URL
const env = z
.object({
CHAIN_ID: z.coerce.number().positive().optional(),
RPC_HTTP_URL: z.string().optional(),
RPC_WS_URL: z.string().optional(),
START_BLOCK: z.coerce.bigint().nonnegative().default(0n),
MAX_BLOCK_RANGE: z.coerce.bigint().positive().default(1000n),
HOST: z.string().default("0.0.0.0"),
PORT: z.coerce.number().positive().default(3001),
DATABASE_URL: z.string(),
})
.intersection(
z.object({
HOST: z.string().default("0.0.0.0"),
PORT: z.coerce.number().positive().default(3001),
DATABASE_URL: z.string(),
START_BLOCK: z.coerce.bigint().nonnegative().default(0n),
MAX_BLOCK_RANGE: z.coerce.bigint().positive().default(1000n),
POLLING_INTERVAL: z.coerce.number().positive().default(1000),
}),
z
.object({
RPC_HTTP_URL: z.string(),
RPC_WS_URL: z.string(),
})
.partial()
.refine((values) => Object.values(values).some(isDefined))
)
.parse(process.env, {
errorMap: (issue) => ({
message: `Missing or invalid environment variable: ${issue.path.join(".")}`,
}),
});

const chain = env.CHAIN_ID != null ? possibleChains.find((c) => c.id === env.CHAIN_ID) : undefined;
if (env.CHAIN_ID != null && !chain) {
console.warn(`No chain found for chain ID ${env.CHAIN_ID}`);
}

const transports: Transport[] = [
env.RPC_WS_URL ? webSocket(env.RPC_WS_URL) : null,
env.RPC_HTTP_URL ? http(env.RPC_HTTP_URL) : null,
].filter(isNotNull);
// prefer WS when specified
env.RPC_WS_URL ? webSocket(env.RPC_WS_URL) : undefined,
// otherwise use or fallback to HTTP
env.RPC_HTTP_URL ? http(env.RPC_HTTP_URL) : undefined,
].filter(isDefined);

const publicClient = createPublicClient({
chain,
transport: fallback(
// If one or more RPC URLs are provided, we'll configure the transport with only those RPC URLs
transports.length > 0
? transports
: // Otherwise use the chain defaults
[webSocket(), http()]
),
pollingInterval: 1000,
transport: fallback(transports),
pollingInterval: env.POLLING_INTERVAL,
});

// Fetch the chain ID from the RPC if no chain object was found for the provided chain ID.
// We do this to match the downstream logic, which also attempts to find the chain ID.
const chainId = chain?.id ?? (await publicClient.getChainId());

const chainId = await publicClient.getChainId();
const database = drizzle(postgres(env.DATABASE_URL), {
logger: new DefaultLogger(),
});

let startBlock = env.START_BLOCK;

const { storageAdapter, internalTables } = await postgresStorage({ database, publicClient });

let startBlock = env.START_BLOCK;

// Resume from latest block stored in DB. This will throw if the DB doesn't exist yet, so we wrap in a try/catch and ignore the error.
try {
const currentChainStates = await database
Expand Down
66 changes: 28 additions & 38 deletions packages/store-indexer/bin/sqlite-indexer.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#!/usr/bin/env node
import "dotenv/config";
import fs from "node:fs";
import { z } from "zod";
import { eq } from "drizzle-orm";
Expand All @@ -9,58 +11,46 @@ import { fastifyTRPCPlugin } from "@trpc/server/adapters/fastify";
import { AppRouter, createAppRouter } from "@latticexyz/store-sync/trpc-indexer";
import { chainState, schemaVersion, syncToSqlite } from "@latticexyz/store-sync/sqlite";
import { createQueryAdapter } from "../src/sqlite/createQueryAdapter";
import type { Chain } from "viem/chains";
import * as mudChains from "@latticexyz/common/chains";
import * as chains from "viem/chains";
import { isNotNull } from "@latticexyz/common/utils";
import { isDefined } from "@latticexyz/common/utils";
import { combineLatest, filter, first } from "rxjs";

const possibleChains = Object.values({ ...mudChains, ...chains }) as Chain[];

// TODO: refine zod type to be either CHAIN_ID or RPC_HTTP_URL/RPC_WS_URL
const env = z
.object({
CHAIN_ID: z.coerce.number().positive().optional(),
RPC_HTTP_URL: z.string().optional(),
RPC_WS_URL: z.string().optional(),
START_BLOCK: z.coerce.bigint().nonnegative().default(0n),
MAX_BLOCK_RANGE: z.coerce.bigint().positive().default(1000n),
HOST: z.string().default("0.0.0.0"),
PORT: z.coerce.number().positive().default(3001),
SQLITE_FILENAME: z.string().default("indexer.db"),
})
.intersection(
z.object({
HOST: z.string().default("0.0.0.0"),
PORT: z.coerce.number().positive().default(3001),
SQLITE_FILENAME: z.string().default("indexer.db"),
START_BLOCK: z.coerce.bigint().nonnegative().default(0n),
MAX_BLOCK_RANGE: z.coerce.bigint().positive().default(1000n),
POLLING_INTERVAL: z.coerce.number().positive().default(1000),
}),
z
.object({
RPC_HTTP_URL: z.string(),
RPC_WS_URL: z.string(),
})
.partial()
.refine((values) => Object.values(values).some(isDefined))
)
.parse(process.env, {
errorMap: (issue) => ({
message: `Missing or invalid environment variable: ${issue.path.join(".")}`,
}),
});

const chain = env.CHAIN_ID != null ? possibleChains.find((c) => c.id === env.CHAIN_ID) : undefined;
if (env.CHAIN_ID != null && !chain) {
console.warn(`No chain found for chain ID ${env.CHAIN_ID}`);
}

const transports: Transport[] = [
env.RPC_WS_URL ? webSocket(env.RPC_WS_URL) : null,
env.RPC_HTTP_URL ? http(env.RPC_HTTP_URL) : null,
].filter(isNotNull);
// prefer WS when specified
env.RPC_WS_URL ? webSocket(env.RPC_WS_URL) : undefined,
// otherwise use or fallback to HTTP
env.RPC_HTTP_URL ? http(env.RPC_HTTP_URL) : undefined,
].filter(isDefined);

const publicClient = createPublicClient({
chain,
transport: fallback(
// If one or more RPC URLs are provided, we'll configure the transport with only those RPC URLs
transports.length > 0
? transports
: // Otherwise use the chain defaults
[webSocket(), http()]
),
pollingInterval: 1000,
transport: fallback(transports),
pollingInterval: env.POLLING_INTERVAL,
});

// Fetch the chain ID from the RPC if no chain object was found for the provided chain ID.
// We do this to match the downstream logic, which also attempts to find the chain ID.
const chainId = chain?.id ?? (await publicClient.getChainId());

const chainId = await publicClient.getChainId();
const database = drizzle(new Database(env.SQLITE_FILENAME));

let startBlock = env.START_BLOCK;
Expand Down
5 changes: 5 additions & 0 deletions packages/store-indexer/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@
".": "./dist/index.js"
},
"types": "src/index.ts",
"bin": {
"postgres-indexer": "./dist/bin/postgres-indexer.js",
"sqlite-indexer": "./dist/bin/sqlite-indexer.js"
},
"scripts": {
"build": "pnpm run build:js",
"build:js": "tsup",
Expand Down Expand Up @@ -42,6 +46,7 @@
"@wagmi/chains": "^0.2.22",
"better-sqlite3": "^8.6.0",
"debug": "^4.3.4",
"dotenv": "^16.0.3",
"drizzle-orm": "^0.28.5",
"fastify": "^4.21.0",
"postgres": "^3.3.5",
Expand Down
2 changes: 1 addition & 1 deletion packages/store-indexer/tsup.config.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { defineConfig } from "tsup";

export default defineConfig({
entry: ["src/index.ts"],
entry: ["src/index.ts", "bin/postgres-indexer.ts", "bin/sqlite-indexer.ts"],
target: "esnext",
format: ["esm"],
dts: false,
Expand Down
3 changes: 3 additions & 0 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 498d05e

Please sign in to comment.