Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(store-indexer, store-sync): improve query performance and enable compression, add new api #2026

Merged
merged 24 commits into from
Dec 7, 2023
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
integrate new endpoint in client
  • Loading branch information
alvrs committed Dec 7, 2023
commit d890c5bd95bdc7ed664a6ba2a840191c33aab39d
6 changes: 3 additions & 3 deletions packages/store-indexer/bin/postgres-frontend.ts
Original file line number Diff line number Diff line change
@@ -6,11 +6,11 @@ import cors from "@koa/cors";
import Router from "@koa/router";
import { createKoaMiddleware } from "trpc-koa-adapter";
import { createAppRouter } from "@latticexyz/store-sync/trpc-indexer";
import { createQueryAdapter } from "../src/postgres/createQueryAdapter";
import { drizzle } from "drizzle-orm/postgres-js";
import postgres from "postgres";
import { frontendEnvSchema, parseEnv } from "./parseEnv";
import { fetchLogs } from "../src/postgres/fetchLogs";
import { createQueryAdapter } from "../src/postgres/deprecated/createQueryAdapter";
import { getLogs } from "../src/postgres/getLogs";

const env = parseEnv(
z.intersection(
@@ -25,7 +25,7 @@ const database = postgres(env.DATABASE_URL);

const server = new Koa();
server.use(cors());
server.use(fetchLogs(database));
server.use(getLogs(database));

const router = new Router();

Original file line number Diff line number Diff line change
@@ -3,7 +3,7 @@ import { PgDatabase } from "drizzle-orm/pg-core";
import { TableWithRecords, isTableRegistrationLog, logToTable, storeTables } from "@latticexyz/store-sync";
import { decodeKey, decodeValueArgs } from "@latticexyz/protocol-parser";
import { QueryAdapter } from "@latticexyz/store-sync/trpc-indexer";
import { debug } from "../debug";
import { debug } from "../../debug";
import { getLogs } from "./getLogs";
import { groupBy } from "@latticexyz/common/utils";

@@ -12,6 +12,7 @@ import { groupBy } from "@latticexyz/common/utils";
*
* @param {PgDatabase<any>} database Postgres database object from Drizzle
* @returns {Promise<QueryAdapter>} A set of methods used by tRPC endpoints.
* @deprecated
*/
export async function createQueryAdapter(database: PgDatabase<any>): Promise<QueryAdapter> {
const adapter: QueryAdapter = {
84 changes: 84 additions & 0 deletions packages/store-indexer/src/postgres/deprecated/getLogs.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
import { PgDatabase } from "drizzle-orm/pg-core";
import { Hex } from "viem";
import { StorageAdapterLog, SyncFilter } from "@latticexyz/store-sync";
import { tables } from "@latticexyz/store-sync/postgres";
import { and, asc, eq, or } from "drizzle-orm";
import { bigIntMax } from "@latticexyz/common/utils";
import { recordToLog } from "../recordToLog";
import { createBenchmark } from "@latticexyz/common";

/**
* @deprecated
*/
export async function getLogs(
database: PgDatabase<any>,
{
chainId,
address,
filters = [],
}: {
readonly chainId: number;
readonly address?: Hex;
readonly filters?: readonly SyncFilter[];
}
): Promise<{ blockNumber: bigint; logs: (StorageAdapterLog & { eventName: "Store_SetRecord" })[] }> {
const benchmark = createBenchmark("drizzleGetLogs");

const conditions = filters.length
? filters.map((filter) =>
and(
address != null ? eq(tables.recordsTable.address, address) : undefined,
eq(tables.recordsTable.tableId, filter.tableId),
filter.key0 != null ? eq(tables.recordsTable.key0, filter.key0) : undefined,
filter.key1 != null ? eq(tables.recordsTable.key1, filter.key1) : undefined
)
)
: address != null
? [eq(tables.recordsTable.address, address)]
: [];
benchmark("parse config");

// Query for the block number that the indexer (i.e. chain) is at, in case the
// indexer is further along in the chain than a given store/table's last updated
// block number. We'll then take the highest block number between the indexer's
// chain state and all the records in the query (in case the records updated
// between these queries). Using just the highest block number from the queries
// could potentially signal to the client an older-than-necessary block number,
// for stores/tables that haven't seen recent activity.
// TODO: move the block number query into the records query for atomicity so we don't have to merge them here
const chainState = await database
.select()
.from(tables.configTable)
.where(eq(tables.configTable.chainId, chainId))
.limit(1)
.execute()
// Get the first record in a way that returns a possible `undefined`
// TODO: move this to `.findFirst` after upgrading drizzle or `rows[0]` after enabling `noUncheckedIndexedAccess: true`
.then((rows) => rows.find(() => true));
const indexerBlockNumber = chainState?.lastUpdatedBlockNumber ?? 0n;
benchmark("query chainState");

const records = await database
.select()
.from(tables.recordsTable)
.where(or(...conditions))
.orderBy(
asc(tables.recordsTable.lastUpdatedBlockNumber)
// TODO: add logIndex (https://github.com/latticexyz/mud/issues/1979)
);
benchmark("query records");

const blockNumber = records.reduce(
(max, record) => bigIntMax(max, record.lastUpdatedBlockNumber ?? 0n),
indexerBlockNumber
);
benchmark("find block number");

const logs = records
// TODO: add this to the query, assuming we can optimize with an index
.filter((record) => !record.isDeleted)
.map((record) => recordToLog({ ...record, lastUpdatedBlockNumber: record.lastUpdatedBlockNumber.toString() }));
benchmark("map records to logs");

return { blockNumber, logs };
}
71 changes: 0 additions & 71 deletions packages/store-indexer/src/postgres/fetchLogs.ts

This file was deleted.

135 changes: 64 additions & 71 deletions packages/store-indexer/src/postgres/getLogs.ts
Original file line number Diff line number Diff line change
@@ -1,81 +1,74 @@
import { PgDatabase } from "drizzle-orm/pg-core";
import { Hex } from "viem";
import { StorageAdapterLog, SyncFilter } from "@latticexyz/store-sync";
import { tables } from "@latticexyz/store-sync/postgres";
import { and, asc, eq, or } from "drizzle-orm";
import { bigIntMax } from "@latticexyz/common/utils";
import { Sql } from "postgres";
import { Middleware } from "koa";
import Router from "@koa/router";
import compose from "koa-compose";
import { input } from "@latticexyz/store-sync/trpc-indexer";
import { storeTables } from "@latticexyz/store-sync";
import { queryLogs } from "./queryLogs";
import { recordToLog } from "./recordToLog";
import { debug } from "../debug";
import { createBenchmark } from "@latticexyz/common";
import superjson from "superjson";

export async function getLogs(
database: PgDatabase<any>,
{
chainId,
address,
filters = [],
}: {
readonly chainId: number;
readonly address?: Hex;
readonly filters?: readonly SyncFilter[];
}
): Promise<{ blockNumber: bigint; logs: (StorageAdapterLog & { eventName: "Store_SetRecord" })[] }> {
const benchmark = createBenchmark("getLogs");
export function getLogs(database: Sql): Middleware {
const router = new Router();

const conditions = filters.length
? filters.map((filter) =>
and(
address != null ? eq(tables.recordsTable.address, address) : undefined,
eq(tables.recordsTable.tableId, filter.tableId),
filter.key0 != null ? eq(tables.recordsTable.key0, filter.key0) : undefined,
filter.key1 != null ? eq(tables.recordsTable.key1, filter.key1) : undefined
)
)
: address != null
? [eq(tables.recordsTable.address, address)]
: [];
benchmark("parse config");
router.get("/get/logs", async (ctx) => {
const benchmark = createBenchmark("getLogs");

// Query for the block number that the indexer (i.e. chain) is at, in case the
// indexer is further along in the chain than a given store/table's last updated
// block number. We'll then take the highest block number between the indexer's
// chain state and all the records in the query (in case the records updated
// between these queries). Using just the highest block number from the queries
// could potentially signal to the client an older-than-necessary block number,
// for stores/tables that haven't seen recent activity.
// TODO: move the block number query into the records query for atomicity so we don't have to merge them here
const chainState = await database
.select()
.from(tables.configTable)
.where(eq(tables.configTable.chainId, chainId))
.limit(1)
.execute()
// Get the first record in a way that returns a possible `undefined`
// TODO: move this to `.findFirst` after upgrading drizzle or `rows[0]` after enabling `noUncheckedIndexedAccess: true`
.then((rows) => rows.find(() => true));
const indexerBlockNumber = chainState?.lastUpdatedBlockNumber ?? 0n;
benchmark("query chainState");
try {
const opts = input.parse(typeof ctx.query.input === "string" ? JSON.parse(ctx.query.input) : {});
opts.filters = opts.filters.length > 0 ? [...opts.filters, { tableId: storeTables.Tables.tableId }] : [];
benchmark("parse config");

const records = await database
.select()
.from(tables.recordsTable)
.where(or(...conditions))
.orderBy(
asc(tables.recordsTable.lastUpdatedBlockNumber)
// TODO: add logIndex (https://github.com/latticexyz/mud/issues/1979)
);
benchmark("query records");
const records = await queryLogs(database, opts ?? {}).execute();
benchmark("query records");

const blockNumber = records.reduce(
(max, record) => bigIntMax(max, record.lastUpdatedBlockNumber ?? 0n),
indexerBlockNumber
);
benchmark("find block number");
const logs = records.map(recordToLog);
benchmark("map records to logs");

const logs = records
// TODO: add this to the query, assuming we can optimize with an index
.filter((record) => !record.isDeleted)
.map((record) => recordToLog({ ...record, lastUpdatedBlockNumber: record.lastUpdatedBlockNumber.toString() }));
benchmark("map records to logs");
const blockNumber = BigInt(records[0].chainBlockNumber);

return { blockNumber, logs };
ctx.status = 200;

// TODO: replace superjson with more efficient encoding
ctx.body = superjson.stringify({ blockNumber, logs });
} catch (error) {
ctx.status = 500;
ctx.body = error;
debug(error);
}

// .cursor(100, async (rows) => {
// if (!hasEmittedConfig && rows.length) {
// ctx.send("config", {
// indexerVersion: rows[0].indexerVersion,
// chainId: rows[0].chainId,
// lastUpdatedBlockNumber: rows[0].chainBlockNumber,
// totalRows: rows[0].totalRows,
// });
// hasEmittedConfig = true;
// }

// rows.forEach((row) => {
// ctx.send("log", {
// // TODO: either properly encode bigints in a JSON-safe way or fix these types
// blockNumber: row.chainBlockNumber as unknown as bigint,
// address: row.address,
// eventName: "Store_SetRecord",
// args: {
// tableId: row.tableId,
// keyTuple: decodeDynamicField("bytes32[]", row.keyBytes),
// staticData: row.staticData ?? "0x",
// encodedLengths: row.encodedLengths ?? "0x",
// dynamicData: row.dynamicData ?? "0x",
// },
// });
// });
// });

// TODO: subscribe + continue writing
});

return compose([router.routes(), router.allowedMethods()]) as Middleware;
}
4 changes: 2 additions & 2 deletions packages/store-indexer/src/postgres/types.ts
alvrs marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -7,13 +7,13 @@ export type RecordData = {
staticData: Hex | null;
encodedLengths: Hex | null;
dynamicData: Hex | null;
lastUpdatedBlockNumber: string;
lastUpdatedBlockNumber: bigint;
};

export type RecordMetadata = {
indexerVersion: string;
chainId: string;
chainBlockNumber: string;
chainBlockNumber: bigint;
totalRows: number;
};

Loading