Skip to content

Commit

Permalink
feat(store-indexer): command to run decoded indexer
Browse files Browse the repository at this point in the history
  • Loading branch information
yonadaa committed Dec 11, 2023
1 parent 392c4b8 commit 655efa0
Show file tree
Hide file tree
Showing 2 changed files with 127 additions and 1 deletion.
123 changes: 123 additions & 0 deletions packages/store-indexer/bin/postgres-decoded-indexer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
#!/usr/bin/env node
import "dotenv/config";
import { z } from "zod";
import { eq } from "drizzle-orm";
import { createPublicClient, fallback, webSocket, http, Transport } from "viem";
import { isDefined } from "@latticexyz/common/utils";
import { combineLatest, filter, first } from "rxjs";
import { drizzle } from "drizzle-orm/postgres-js";
import postgres from "postgres";
import { createStorageAdapter } from "@latticexyz/store-sync/postgres-decoded";
import { createStoreSync } from "@latticexyz/store-sync";
import { indexerEnvSchema, parseEnv } from "./parseEnv";

const env = parseEnv(
z.intersection(
indexerEnvSchema,
z.object({
DATABASE_URL: z.string(),
HEALTHCHECK_HOST: z.string().optional(),
HEALTHCHECK_PORT: z.coerce.number().optional(),
})
)
);

const transports: Transport[] = [
// prefer WS when specified
env.RPC_WS_URL ? webSocket(env.RPC_WS_URL) : undefined,
// otherwise use or fallback to HTTP
env.RPC_HTTP_URL ? http(env.RPC_HTTP_URL) : undefined,
].filter(isDefined);

const publicClient = createPublicClient({
transport: fallback(transports),
pollingInterval: env.POLLING_INTERVAL,
});

const chainId = await publicClient.getChainId();
const database = drizzle(postgres(env.DATABASE_URL, { prepare: false }));

const { storageAdapter, tables } = await createStorageAdapter({ database, publicClient });

let startBlock = env.START_BLOCK;

// Resume from latest block stored in DB. This will throw if the DB doesn't exist yet, so we wrap in a try/catch and ignore the error.
// TODO: query if the DB exists instead of try/catch
try {
const chainState = await database
.select()
.from(tables.configTable)
.where(eq(tables.configTable.chainId, chainId))
.limit(1)
.execute()
// Get the first record in a way that returns a possible `undefined`
// TODO: move this to `.findFirst` after upgrading drizzle or `rows[0]` after enabling `noUncheckedIndexedAccess: true`
.then((rows) => rows.find(() => true));

if (chainState?.blockNumber != null) {
startBlock = chainState.blockNumber + 1n;
console.log("resuming from block number", startBlock);
}
} catch (error) {
// ignore errors for now
}

const { latestBlockNumber$, storedBlockLogs$ } = await createStoreSync({
storageAdapter,
publicClient,
startBlock,
maxBlockRange: env.MAX_BLOCK_RANGE,
address: env.STORE_ADDRESS,
});

storedBlockLogs$.subscribe();

let isCaughtUp = false;
combineLatest([latestBlockNumber$, storedBlockLogs$])
.pipe(
filter(
([latestBlockNumber, { blockNumber: lastBlockNumberProcessed }]) => latestBlockNumber === lastBlockNumberProcessed
),
first()
)
.subscribe(() => {
isCaughtUp = true;
console.log("all caught up");
});

if (env.HEALTHCHECK_HOST != null || env.HEALTHCHECK_PORT != null) {
const { default: Koa } = await import("koa");
const { default: cors } = await import("@koa/cors");
const { default: Router } = await import("@koa/router");

const server = new Koa();
server.use(cors());

const router = new Router();

router.get("/", (ctx) => {
ctx.body = "emit HelloWorld();";
});

// k8s healthchecks
router.get("/healthz", (ctx) => {
ctx.status = 200;
});
router.get("/readyz", (ctx) => {
if (isCaughtUp) {
ctx.status = 200;
ctx.body = "ready";
} else {
ctx.status = 424;
ctx.body = "backfilling";
}
});

server.use(router.routes());
server.use(router.allowedMethods());

server.listen({ host: env.HEALTHCHECK_HOST, port: env.HEALTHCHECK_PORT });
console.log(
`postgres indexer healthcheck server listening on http://${env.HEALTHCHECK_HOST}:${env.HEALTHCHECK_PORT}`
);
}
5 changes: 4 additions & 1 deletion packages/store-indexer/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@
"dev": "tsup --watch",
"lint": "eslint .",
"start:postgres": "concurrently -n indexer,frontend -c cyan,magenta 'tsx bin/postgres-indexer' 'tsx bin/postgres-frontend'",
"start:postgres-decoded": "tsx bin/postgres-decoded-indexer",
"start:postgres-decoded:local": "DATABASE_URL=postgres://127.0.0.1/postgres RPC_HTTP_URL=http://127.0.0.1:8545 pnpm start:postgres-decoded",
"start:postgres-decoded:testnet": "DATABASE_URL=postgres://127.0.0.1/postgres RPC_HTTP_URL=https://rpc.holesky.redstone.xyz pnpm start:postgres-decoded",
"start:postgres:local": "DATABASE_URL=postgres://127.0.0.1/postgres RPC_HTTP_URL=http://127.0.0.1:8545 pnpm start:postgres",
"start:postgres:testnet": "DATABASE_URL=postgres://127.0.0.1/postgres RPC_HTTP_URL=https://rpc.holesky.redstone.xyz pnpm start:postgres",
"start:sqlite": "tsx bin/sqlite-indexer",
Expand Down Expand Up @@ -79,4 +82,4 @@
"access": "public"
},
"gitHead": "914a1e0ae4a573d685841ca2ea921435057deb8f"
}
}

0 comments on commit 655efa0

Please sign in to comment.