Skip to content

Commit

Permalink
refactor(store-indexer): clean up middleware (#2163)
Browse files Browse the repository at this point in the history
  • Loading branch information
holic authored Jan 19, 2024
1 parent 08b4221 commit 3a089d1
Show file tree
Hide file tree
Showing 14 changed files with 196 additions and 213 deletions.
38 changes: 15 additions & 23 deletions packages/store-indexer/bin/postgres-decoded-indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ import postgres from "postgres";
import { createStorageAdapter } from "@latticexyz/store-sync/postgres-decoded";
import { createStoreSync } from "@latticexyz/store-sync";
import { indexerEnvSchema, parseEnv } from "./parseEnv";
import { sentry } from "../src/koa-middleware/sentry";
import { healthcheck } from "../src/koa-middleware/healthcheck";
import { helloWorld } from "../src/koa-middleware/helloWorld";

const env = parseEnv(
z.intersection(
Expand All @@ -18,6 +21,7 @@ const env = parseEnv(
DATABASE_URL: z.string(),
HEALTHCHECK_HOST: z.string().optional(),
HEALTHCHECK_PORT: z.coerce.number().optional(),
SENTRY_DSN: z.string().optional(),
})
)
);
Expand Down Expand Up @@ -88,33 +92,21 @@ combineLatest([latestBlockNumber$, storedBlockLogs$])
if (env.HEALTHCHECK_HOST != null || env.HEALTHCHECK_PORT != null) {
const { default: Koa } = await import("koa");
const { default: cors } = await import("@koa/cors");
const { default: Router } = await import("@koa/router");

const server = new Koa();
server.use(cors());

const router = new Router();

router.get("/", (ctx) => {
ctx.body = "emit HelloWorld();";
});

// k8s healthchecks
router.get("/healthz", (ctx) => {
ctx.status = 200;
});
router.get("/readyz", (ctx) => {
if (isCaughtUp) {
ctx.status = 200;
ctx.body = "ready";
} else {
ctx.status = 424;
ctx.body = "backfilling";
}
});
if (env.SENTRY_DSN) {
server.use(sentry(env.SENTRY_DSN));
}

server.use(router.routes());
server.use(router.allowedMethods());
server.use(cors());
server.use(cors());

This comment has been minimized.

Copy link
@holic

holic Jan 19, 2024

Author Member

whoops, duplicated this line on accident

server.use(
healthcheck({
isReady: () => isCaughtUp,
})
);
server.use(helloWorld());

server.listen({ host: env.HEALTHCHECK_HOST, port: env.HEALTHCHECK_PORT });
console.log(
Expand Down
29 changes: 8 additions & 21 deletions packages/store-indexer/bin/postgres-frontend.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,23 @@ import "dotenv/config";
import { z } from "zod";
import Koa from "koa";
import cors from "@koa/cors";
import Router from "@koa/router";
import { createKoaMiddleware } from "trpc-koa-adapter";
import { createAppRouter } from "@latticexyz/store-sync/trpc-indexer";
import { drizzle } from "drizzle-orm/postgres-js";
import postgres from "postgres";
import { frontendEnvSchema, parseEnv } from "./parseEnv";
import { createQueryAdapter } from "../src/postgres/deprecated/createQueryAdapter";
import { apiRoutes } from "../src/postgres/apiRoutes";
import { registerSentryMiddlewares } from "../src/sentry";
import { sentry } from "../src/koa-middleware/sentry";
import { healthcheck } from "../src/koa-middleware/healthcheck";
import { helloWorld } from "../src/koa-middleware/helloWorld";

const env = parseEnv(
z.intersection(
frontendEnvSchema,
z.object({
DATABASE_URL: z.string(),
SENTRY_DSN: z.string().optional(),
})
)
);
Expand All @@ -26,30 +28,15 @@ const database = postgres(env.DATABASE_URL, { prepare: false });

const server = new Koa();

if (process.env.SENTRY_DSN) {
registerSentryMiddlewares(server);
if (env.SENTRY_DSN) {
server.use(sentry(env.SENTRY_DSN));
}

server.use(cors());
server.use(healthcheck());
server.use(helloWorld());
server.use(apiRoutes(database));

const router = new Router();

router.get("/", (ctx) => {
ctx.body = "emit HelloWorld();";
});

// k8s healthchecks
router.get("/healthz", (ctx) => {
ctx.status = 200;
});
router.get("/readyz", (ctx) => {
ctx.status = 200;
});

server.use(router.routes());
server.use(router.allowedMethods());

server.use(
createKoaMiddleware({
prefix: "/trpc",
Expand Down
33 changes: 9 additions & 24 deletions packages/store-indexer/bin/postgres-indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,33 +93,18 @@ combineLatest([latestBlockNumber$, storedBlockLogs$])
if (env.HEALTHCHECK_HOST != null || env.HEALTHCHECK_PORT != null) {
const { default: Koa } = await import("koa");
const { default: cors } = await import("@koa/cors");
const { default: Router } = await import("@koa/router");
const { healthcheck } = await import("../src/koa-middleware/healthcheck");
const { helloWorld } = await import("../src/koa-middleware/helloWorld");

const server = new Koa();
server.use(cors());

const router = new Router();

router.get("/", (ctx) => {
ctx.body = "emit HelloWorld();";
});

// k8s healthchecks
router.get("/healthz", (ctx) => {
ctx.status = 200;
});
router.get("/readyz", (ctx) => {
if (isCaughtUp) {
ctx.status = 200;
ctx.body = "ready";
} else {
ctx.status = 424;
ctx.body = "backfilling";
}
});

server.use(router.routes());
server.use(router.allowedMethods());
server.use(cors());
server.use(
healthcheck({
isReady: () => isCaughtUp,
})
);
server.use(helloWorld());

server.listen({ host: env.HEALTHCHECK_HOST, port: env.HEALTHCHECK_PORT });
console.log(
Expand Down
39 changes: 12 additions & 27 deletions packages/store-indexer/bin/sqlite-indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,17 @@ import Database from "better-sqlite3";
import { createPublicClient, fallback, webSocket, http, Transport } from "viem";
import Koa from "koa";
import cors from "@koa/cors";
import Router from "@koa/router";
import { createKoaMiddleware } from "trpc-koa-adapter";
import { createAppRouter } from "@latticexyz/store-sync/trpc-indexer";
import { chainState, schemaVersion, syncToSqlite } from "@latticexyz/store-sync/sqlite";
import { createQueryAdapter } from "../src/sqlite/createQueryAdapter";
import { isDefined } from "@latticexyz/common/utils";
import { combineLatest, filter, first } from "rxjs";
import { frontendEnvSchema, indexerEnvSchema, parseEnv } from "./parseEnv";
import { healthcheck } from "../src/koa-middleware/healthcheck";
import { helloWorld } from "../src/koa-middleware/helloWorld";
import { apiRoutes } from "../src/sqlite/apiRoutes";
import { registerSentryMiddlewares } from "../src/sentry";
import { sentry } from "../src/koa-middleware/sentry";

const env = parseEnv(
z.intersection(
Expand Down Expand Up @@ -93,35 +94,19 @@ combineLatest([latestBlockNumber$, storedBlockLogs$])
});

const server = new Koa();
server.use(cors());
server.use(apiRoutes(database));

if (env.SENTRY_DSN) {
registerSentryMiddlewares(server);
server.use(sentry(env.SENTRY_DSN));
}

const router = new Router();

router.get("/", (ctx) => {
ctx.body = "emit HelloWorld();";
});

// k8s healthchecks
router.get("/healthz", (ctx) => {
ctx.status = 200;
});
router.get("/readyz", (ctx) => {
if (isCaughtUp) {
ctx.status = 200;
ctx.body = "ready";
} else {
ctx.status = 424;
ctx.body = "backfilling";
}
});

server.use(router.routes());
server.use(router.allowedMethods());
server.use(cors());
server.use(
healthcheck({
isReady: () => isCaughtUp,
})
);
server.use(helloWorld());
server.use(apiRoutes(database));

server.use(
createKoaMiddleware({
Expand Down
1 change: 0 additions & 1 deletion packages/store-indexer/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@
"devDependencies": {
"@types/accepts": "^1.3.7",
"@types/better-sqlite3": "^7.6.4",
"@types/cors": "^2.8.13",
"@types/debug": "^4.1.7",
"@types/koa": "^2.13.12",
"@types/koa-compose": "^3.2.8",
Expand Down
File renamed without changes.
37 changes: 37 additions & 0 deletions packages/store-indexer/src/koa-middleware/healthcheck.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import { Middleware } from "koa";

type HealthcheckOptions = {
isHealthy?: () => boolean;
isReady?: () => boolean;
};

/**
* Middleware to add Kubernetes healthcheck endpoints
*/
export function healthcheck({ isHealthy, isReady }: HealthcheckOptions = {}): Middleware {
return async function healthcheckMiddleware(ctx, next): Promise<void> {
if (ctx.path === "/healthz") {
if (isHealthy == null || isHealthy()) {
ctx.status = 200;
ctx.body = "healthy";
} else {
ctx.status = 503;
ctx.body = "not healthy";
}
return;
}

if (ctx.path === "/readyz") {
if (isReady == null || isReady()) {
ctx.status = 200;
ctx.body = "ready";
} else {
ctx.status = 503;
ctx.body = "not ready";
}
return;
}

await next();
};
}
12 changes: 12 additions & 0 deletions packages/store-indexer/src/koa-middleware/helloWorld.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import { Middleware } from "koa";

export function helloWorld(): Middleware {
return async function helloWorldMiddleware(ctx, next): Promise<void> {
if (ctx.path === "/") {
ctx.status = 200;
ctx.body = "emit HelloWorld();";
return;
}
await next();
};
}
101 changes: 101 additions & 0 deletions packages/store-indexer/src/koa-middleware/sentry.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
import * as Sentry from "@sentry/node";
import { ProfilingIntegration } from "@sentry/profiling-node";
import { stripUrlQueryAndFragment } from "@sentry/utils";
import debug from "debug";
import Koa from "koa";
import compose from "koa-compose";

export function errorHandler(): Koa.Middleware {
return async function errorHandlerMiddleware(ctx, next) {
try {
await next();
} catch (err) {
Sentry.withScope((scope) => {
scope.addEventProcessor((event) => {
return Sentry.addRequestDataToEvent(event, ctx.request);
});
Sentry.captureException(err);
});
throw err;
}
};
}

export function requestHandler(): Koa.Middleware {
return async function requestHandlerMiddleware(ctx, next) {
await Sentry.runWithAsyncContext(async () => {
const hub = Sentry.getCurrentHub();
hub.configureScope((scope) =>
scope.addEventProcessor((event) =>
Sentry.addRequestDataToEvent(event, ctx.request, {
include: {
user: false,
},
})
)
);
await next();
});
};
}

export function tracing(): Koa.Middleware {
// creates a Sentry transaction per request
return async function tracingMiddleware(ctx, next) {
const reqMethod = (ctx.method || "").toUpperCase();
const reqUrl = ctx.url && stripUrlQueryAndFragment(ctx.url);

// Connect to trace of upstream app
let traceparentData;
if (ctx.request.get("sentry-trace")) {
traceparentData = Sentry.extractTraceparentData(ctx.request.get("sentry-trace"));
}

const transaction = Sentry.startTransaction({
name: `${reqMethod} ${reqUrl}`,
op: "http.server",
...traceparentData,
});

ctx.__sentry_transaction = transaction;

// We put the transaction on the scope so users can attach children to it
Sentry.getCurrentHub().configureScope((scope) => {
scope.setSpan(transaction);
});

ctx.res.on("finish", () => {
// Push `transaction.finish` to the next event loop so open spans have a chance to finish before the transaction closes
setImmediate(() => {
// If you're using koa router, set the matched route as transaction name
if (ctx._matchedRoute) {
const mountPath = ctx.mountPath || "";
transaction.setName(`${reqMethod} ${mountPath}${ctx._matchedRoute}`);
}

transaction.setHttpStatus(ctx.status);
transaction.finish();
});
});

await next();
};
}

export function sentry(dsn: string): Koa.Middleware {
debug("Initializing Sentry");
Sentry.init({
dsn,
integrations: [
// Automatically instrument Node.js libraries and frameworks
...Sentry.autoDiscoverNodePerformanceMonitoringIntegrations(),
new ProfilingIntegration(),
],
// Performance Monitoring
tracesSampleRate: 1.0,
// Set sampling rate for profiling - this is relative to tracesSampleRate
profilesSampleRate: 1.0,
});

return compose([errorHandler(), requestHandler(), tracing()]);
}
Loading

0 comments on commit 3a089d1

Please sign in to comment.