Skip to content

Commit

Permalink
feat(store-indexer): add latest stored block number to metrics (#2740)
Browse files Browse the repository at this point in the history
  • Loading branch information
alvrs authored Apr 25, 2024
1 parent 82ada7a commit 7dfb82f
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 40 deletions.
43 changes: 25 additions & 18 deletions packages/store-indexer/bin/postgres-indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,25 +46,30 @@ const { storageAdapter, tables } = await createStorageAdapter({ database, public

let startBlock = env.START_BLOCK;

// Resume from latest block stored in DB. This will throw if the DB doesn't exist yet, so we wrap in a try/catch and ignore the error.
// TODO: query if the DB exists instead of try/catch
try {
const chainState = await database
.select()
.from(tables.configTable)
.where(eq(tables.configTable.chainId, chainId))
.limit(1)
.execute()
// Get the first record in a way that returns a possible `undefined`
// TODO: move this to `.findFirst` after upgrading drizzle or `rows[0]` after enabling `noUncheckedIndexedAccess: true`
.then((rows) => rows.find(() => true));

if (chainState?.blockNumber != null) {
startBlock = chainState.blockNumber + 1n;
console.log("resuming from block number", startBlock);
async function getLatestStoredBlockNumber(): Promise<bigint | undefined> {
// Fetch latest block stored in DB. This will throw if the DB doesn't exist yet, so we wrap in a try/catch and ignore the error.
// TODO: query if the DB exists instead of try/catch
try {
const chainState = await database
.select()
.from(tables.configTable)
.where(eq(tables.configTable.chainId, chainId))
.limit(1)
.execute()
// Get the first record in a way that returns a possible `undefined`
// TODO: move this to `.findFirst` after upgrading drizzle or `rows[0]` after enabling `noUncheckedIndexedAccess: true`
.then((rows) => rows.find(() => true));

return chainState?.blockNumber;
} catch (error) {
// ignore errors for now
}
} catch (error) {
// ignore errors for now
}

const latestStoredBlockNumber = await getLatestStoredBlockNumber();
if (latestStoredBlockNumber != null) {
startBlock = latestStoredBlockNumber + 1n;
console.log("resuming from block number", startBlock);
}

const { latestBlockNumber$, storedBlockLogs$ } = await createStoreSync({
Expand Down Expand Up @@ -111,6 +116,8 @@ if (env.HEALTHCHECK_HOST != null || env.HEALTHCHECK_PORT != null) {
metrics({
isHealthy: () => true,
isReady: () => isCaughtUp,
getLatestStoredBlockNumber,
followBlockTag: env.FOLLOW_BLOCK_TAG,
}),
);
server.use(helloWorld());
Expand Down
63 changes: 42 additions & 21 deletions packages/store-indexer/bin/sqlite-indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,29 +48,48 @@ const database = drizzle(new Database(env.SQLITE_FILENAME));

let startBlock = env.START_BLOCK;

// Resume from latest block stored in DB. This will throw if the DB doesn't exist yet, so we wrap in a try/catch and ignore the error.
try {
const currentChainStates = database.select().from(chainState).where(eq(chainState.chainId, chainId)).all();
// TODO: replace this type workaround with `noUncheckedIndexedAccess: true` when we can fix all the issues related (https://github.com/latticexyz/mud/issues/1212)
const currentChainState: (typeof currentChainStates)[number] | undefined = currentChainStates[0];

if (currentChainState != null) {
if (currentChainState.schemaVersion != schemaVersion) {
console.log(
"schema version changed from",
currentChainState.schemaVersion,
"to",
schemaVersion,
"recreating database",
);
fs.truncateSync(env.SQLITE_FILENAME);
} else if (currentChainState.lastUpdatedBlockNumber != null) {
console.log("resuming from block number", currentChainState.lastUpdatedBlockNumber + 1n);
startBlock = currentChainState.lastUpdatedBlockNumber + 1n;
async function getCurrentChainState(): Promise<
| {
schemaVersion: number;
chainId: number;
lastUpdatedBlockNumber: bigint | null;
lastError: string | null;
}
| undefined
> {
// This will throw if the DB doesn't exist yet, so we wrap in a try/catch and ignore the error.
try {
const currentChainStates = database.select().from(chainState).where(eq(chainState.chainId, chainId)).all();
// TODO: replace this type workaround with `noUncheckedIndexedAccess: true` when we can fix all the issues related (https://github.com/latticexyz/mud/issues/1212)
const currentChainState: (typeof currentChainStates)[number] | undefined = currentChainStates[0];
return currentChainState;
} catch (error) {
// ignore errors, this is optional
}
}

async function getLatestStoredBlockNumber(): Promise<bigint | undefined> {
const currentChainState = await getCurrentChainState();
return currentChainState?.lastUpdatedBlockNumber ?? undefined;
}

const currentChainState = await getCurrentChainState();
if (currentChainState) {
// Reset the db if the version changed
if (currentChainState.schemaVersion != schemaVersion) {
console.log(
"schema version changed from",
currentChainState.schemaVersion,
"to",
schemaVersion,
"recreating database",
);
fs.truncateSync(env.SQLITE_FILENAME);
} else if (currentChainState.lastUpdatedBlockNumber != null) {
// Resume from latest block stored in DB. This will throw if the DB doesn't exist yet, so we wrap in a try/catch and ignore the error.
console.log("resuming from block number", currentChainState.lastUpdatedBlockNumber + 1n);
startBlock = currentChainState.lastUpdatedBlockNumber + 1n;
}
} catch (error) {
// ignore errors, this is optional
}

const { latestBlockNumber$, storedBlockLogs$ } = await syncToSqlite({
Expand Down Expand Up @@ -112,6 +131,8 @@ server.use(
metrics({
isHealthy: () => true,
isReady: () => isCaughtUp,
getLatestStoredBlockNumber,
followBlockTag: env.FOLLOW_BLOCK_TAG,
}),
);
server.use(helloWorld());
Expand Down
32 changes: 31 additions & 1 deletion packages/store-indexer/src/koa-middleware/metrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,19 @@ import promClient from "prom-client";
type MetricsOptions = {
isHealthy?: () => boolean;
isReady?: () => boolean;
getLatestStoredBlockNumber?: () => Promise<bigint | undefined>;
followBlockTag?: "latest" | "safe" | "finalized";
};

/**
* Middleware to add Prometheus metrics endpoints
*/
export function metrics({ isHealthy, isReady }: MetricsOptions = {}): Middleware {
export function metrics({
isHealthy,
isReady,
getLatestStoredBlockNumber,
followBlockTag,
}: MetricsOptions = {}): Middleware {
promClient.collectDefaultMetrics();
if (isHealthy != null) {
new promClient.Gauge({
Expand All @@ -31,6 +38,29 @@ export function metrics({ isHealthy, isReady }: MetricsOptions = {}): Middleware
});
}

if (getLatestStoredBlockNumber != null) {
new promClient.Gauge({
name: "latest_stored_block_number",
help: "Latest block number stored in the database",
async collect(): Promise<void> {
this.set(Number(await getLatestStoredBlockNumber()));
},
});
}

if (followBlockTag != null) {
const blockTagGauge = new promClient.Gauge({
name: "follow_block_tag",
help: "Block tag the indexer is following (0 = finalized, 1 = safe, 2 = latest)",
});
const blockTagToValue = {
finalized: 0,
safe: 1,
latest: 2,
};
blockTagGauge.set(blockTagToValue[followBlockTag]);
}

return async function metricsMiddleware(ctx, next): Promise<void> {
if (ctx.path === "/metrics") {
ctx.status = 200;
Expand Down

0 comments on commit 7dfb82f

Please sign in to comment.