Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(store-indexer, store-sync): improve query performance and enable compression, add new api #2026

Merged
merged 24 commits into from
Dec 7, 2023
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
pull over compress logic from sse PR
alvrs committed Dec 7, 2023
commit 058e5b785da14b5260fcb9e634d8e1577605acbc
3 changes: 3 additions & 0 deletions packages/common/src/utils/includes.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
export function includes<item>(items: item[], value: any): value is item {
return items.includes(value);
}
1 change: 1 addition & 0 deletions packages/common/src/utils/index.ts
Original file line number Diff line number Diff line change
@@ -6,6 +6,7 @@ export * from "./chunk";
export * from "./curry";
export * from "./groupBy";
export * from "./identity";
export * from "./includes";
export * from "./isDefined";
export * from "./isNotNull";
export * from "./iteratorToArray";
2 changes: 2 additions & 0 deletions packages/store-indexer/package.json
Original file line number Diff line number Diff line change
@@ -44,6 +44,7 @@
"@latticexyz/store-sync": "workspace:*",
"@trpc/client": "10.34.0",
"@trpc/server": "10.34.0",
"accepts": "^1.3.8",
"better-sqlite3": "^8.6.0",
"debug": "^4.3.4",
"dotenv": "^16.0.3",
@@ -58,6 +59,7 @@
"zod": "^3.21.4"
},
"devDependencies": {
"@types/accepts": "^1.3.7",
"@types/better-sqlite3": "^7.6.4",
"@types/cors": "^2.8.13",
"@types/debug": "^4.1.7",
48 changes: 48 additions & 0 deletions packages/store-indexer/src/compress.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import { Middleware } from "koa";
import { Readable, Stream } from "node:stream";
import accepts from "accepts";
import { Zlib, createBrotliCompress, createDeflate, createGzip } from "node:zlib";
import { includes } from "@latticexyz/common/utils";

// Loosely based on https://github.com/holic/koa-compress/blob/master/lib/index.js
// with better handling of streams better with occasional flushing

const encodings = {
br: createBrotliCompress,
gzip: createGzip,
deflate: createDeflate,
} as const;

const encodingNames = Object.keys(encodings) as (keyof typeof encodings)[];

function flushEvery<stream extends Zlib & Readable>(stream: stream, bytesThreshold: number): stream {
let bytesSinceFlush = 0;
stream.on("data", (data) => {
bytesSinceFlush += data.length;
if (bytesSinceFlush > bytesThreshold) {
bytesSinceFlush = 0;
stream.flush();
}
});
return stream;
Comment on lines +19 to +27
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be possible that the "last" bytes are stuck in the stream if the data isn't a multiple of bytesThreshold? Would we need some kind of timeout after which we flush even if there is no more data event?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

or is that handled by compressed.end(ctx.body)?

Copy link
Member

@holic holic Dec 6, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be possible that the "last" bytes are stuck in the stream if the data isn't a multiple of bytesThreshold? Would we need some kind of timeout after which we flush even if there is no more data event?

This might be possible for an open and long running stream, yes. But shouldn't matter as much while we're closing the stream when we get to the end of the query cursor.

The stream is flushed when the stream closes (which is separate from compressed.end(ctx.body), which pushes+closes the entirety of the body to the compressed, for non-streaming bodies).

I think it would be best to expose a .flush method in this middleware that we can access downstream in the eventStream middleware, but it was tricky to get it right with types, so I punted on that for now.

}

type CompressOptions = {
flushThreshold?: number;
};

export function compress({ flushThreshold = 1024 * 4 }: CompressOptions = {}): Middleware {
return async function compressMiddleware(ctx, next) {
ctx.vary("Accept-Encoding");

await next();

const encoding = accepts(ctx.req).encoding(encodingNames);
if (!includes(encodingNames, encoding)) return;

const compressed = flushEvery(encodings[encoding](), flushThreshold);

ctx.set("Content-Encoding", encoding);
ctx.body = ctx.body instanceof Stream ? ctx.body.pipe(compressed) : compressed.end(ctx.body);
};
}
33 changes: 2 additions & 31 deletions packages/store-indexer/src/postgres/getLogs.ts
Original file line number Diff line number Diff line change
@@ -9,11 +9,12 @@ import { recordToLog } from "./recordToLog";
import { debug } from "../debug";
import { createBenchmark } from "@latticexyz/common";
import superjson from "superjson";
import { compress } from "../compress";

export function getLogs(database: Sql): Middleware {
const router = new Router();

router.get("/get/logs", async (ctx) => {
router.get("/get/logs", compress(), async (ctx) => {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since this is now basically a REST API, can we call this /api/logs and name this function/module api or apiRoutes?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep! was waiting for naming feedback, wasn't very happy with /get

const benchmark = createBenchmark("getLogs");

try {
@@ -38,36 +39,6 @@ export function getLogs(database: Sql): Middleware {
ctx.body = error;
debug(error);
}

// .cursor(100, async (rows) => {
// if (!hasEmittedConfig && rows.length) {
// ctx.send("config", {
// indexerVersion: rows[0].indexerVersion,
// chainId: rows[0].chainId,
// lastUpdatedBlockNumber: rows[0].chainBlockNumber,
// totalRows: rows[0].totalRows,
// });
// hasEmittedConfig = true;
// }

// rows.forEach((row) => {
// ctx.send("log", {
// // TODO: either properly encode bigints in a JSON-safe way or fix these types
// blockNumber: row.chainBlockNumber as unknown as bigint,
// address: row.address,
// eventName: "Store_SetRecord",
// args: {
// tableId: row.tableId,
// keyTuple: decodeDynamicField("bytes32[]", row.keyBytes),
// staticData: row.staticData ?? "0x",
// encodedLengths: row.encodedLengths ?? "0x",
// dynamicData: row.dynamicData ?? "0x",
// },
// });
// });
// });

// TODO: subscribe + continue writing
});

return compose([router.routes(), router.allowedMethods()]) as Middleware;
8 changes: 7 additions & 1 deletion pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.