
feat(store-indexer,store-sync): filter by table and key #1794

Merged · 15 commits · Oct 25, 2023
1 change: 1 addition & 0 deletions packages/store-indexer/package.json
@@ -37,6 +37,7 @@
"@fastify/cors": "^8.3.0",
"@latticexyz/block-logs-stream": "workspace:*",
"@latticexyz/common": "workspace:*",
"@latticexyz/protocol-parser": "workspace:*",
"@latticexyz/store": "workspace:*",
"@latticexyz/store-sync": "workspace:*",
"@trpc/client": "10.34.0",
20 changes: 18 additions & 2 deletions packages/store-indexer/src/postgres/createQueryAdapter.ts
@@ -4,6 +4,8 @@ import { buildTable, buildInternalTables, getTables } from "@latticexyz/store-sy
import { QueryAdapter } from "@latticexyz/store-sync/trpc-indexer";
import { debug } from "../debug";
import { getAddress } from "viem";
+ import { isDefined } from "@latticexyz/common/utils";
+ import { decodeDynamicField } from "@latticexyz/protocol-parser";

/**
* Creates a query adapter for the tRPC server/client to query data from Postgres.
@@ -13,7 +15,10 @@ import { getAddress } from "viem";
*/
export async function createQueryAdapter(database: PgDatabase<any>): Promise<QueryAdapter> {
const adapter: QueryAdapter = {
- async findAll({ chainId, address, tableIds = [] }) {
+ async findAll({ chainId, address, filters = [] }) {
+   // If _any_ filter has a table ID, this will filter down all data to just those tables, which means we can't yet mix table filters with key-only filters.

Review comment (Member): Should we add this to the changeset? Might make it easier to update in the docs later.

Reply (Member Author): I ended up making tableId required for now, so this comment doesn't really apply, but I left it here so that we can revisit this once we make tableId optional.

+   // TODO: improve this so we can express this in the query (likely need to build and query the "megatable" rather than each distinct SQL table)
+   const tableIds = Array.from(new Set(filters.map((filter) => filter.tableId).filter(isDefined)));
const tables = (await getTables(database))
.filter((table) => address == null || getAddress(address) === getAddress(table.address))
.filter((table) => !tableIds.length || tableIds.includes(table.tableId));
@@ -22,9 +27,20 @@ export async function createQueryAdapter(database: PgDatabase<any>): Promise<Que
tables.map(async (table) => {
const sqliteTable = buildTable(table);
const records = await database.select().from(sqliteTable).where(eq(sqliteTable.__isDeleted, false)).execute();
+ const filteredRecords = !filters.length
+   ? records
+   : records.filter((record) => {
+       const keyTuple = decodeDynamicField("bytes32[]", record.__key);
+       return filters.some(
+         (filter) =>
+           (filter.tableId == null || filter.tableId === table.tableId) &&
+           (filter.key0 == null || filter.key0 === keyTuple[0]) &&
+           (filter.key1 == null || filter.key1 === keyTuple[1])
+       );
+     });
return {
...table,
- records: records.map((record) => ({
+ records: filteredRecords.map((record) => ({
key: Object.fromEntries(Object.entries(table.keySchema).map(([name]) => [name, record[name]])),
value: Object.fromEntries(Object.entries(table.valueSchema).map(([name]) => [name, record[name]])),
})),
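The filter semantics above: filters are OR'd together, and within a single filter every defined field must match, with undefined fields acting as wildcards. A minimal standalone sketch of that predicate (recordMatchesFilters is a hypothetical helper name, and this assumes SyncFilter is re-exported from the store-sync package root):

```ts
import { Hex } from "viem";
import { SyncFilter } from "@latticexyz/store-sync";

// Hypothetical helper mirroring the predicate above: a record survives if any
// filter matches it (OR), and a filter matches when each of its defined
// fields matches (AND). An empty filter list matches everything.
function recordMatchesFilters(filters: SyncFilter[], tableId: Hex, keyTuple: readonly Hex[]): boolean {
  if (!filters.length) return true;
  return filters.some(
    (filter) =>
      (filter.tableId == null || filter.tableId === tableId) &&
      (filter.key0 == null || filter.key0 === keyTuple[0]) &&
      (filter.key1 == null || filter.key1 === keyTuple[1])
  );
}
```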
20 changes: 18 additions & 2 deletions packages/store-indexer/src/sqlite/createQueryAdapter.ts
@@ -4,6 +4,8 @@ import { buildTable, chainState, getTables } from "@latticexyz/store-sync/sqlite
import { QueryAdapter } from "@latticexyz/store-sync/trpc-indexer";
import { debug } from "../debug";
import { getAddress } from "viem";
+ import { isDefined } from "@latticexyz/common/utils";
+ import { decodeDynamicField } from "@latticexyz/protocol-parser";

/**
* Creates a storage adapter for the tRPC server/client to query data from SQLite.
@@ -13,17 +15,31 @@ import { getAddress } from "viem";
*/
export async function createQueryAdapter(database: BaseSQLiteDatabase<"sync", any>): Promise<QueryAdapter> {
const adapter: QueryAdapter = {
- async findAll({ chainId, address, tableIds = [] }) {
+ async findAll({ chainId, address, filters = [] }) {
+   // If _any_ filter has a table ID, this will filter down all data to just those tables, which means we can't yet mix table filters with key-only filters.
+   // TODO: improve this so we can express this in the query (likely need to build and query the "megatable" rather than each distinct SQL table)
+   const tableIds = Array.from(new Set(filters.map((filter) => filter.tableId).filter(isDefined)));
const tables = getTables(database)
.filter((table) => address == null || getAddress(address) === getAddress(table.address))
.filter((table) => !tableIds.length || tableIds.includes(table.tableId));

const tablesWithRecords = tables.map((table) => {
const sqliteTable = buildTable(table);
const records = database.select().from(sqliteTable).where(eq(sqliteTable.__isDeleted, false)).all();
+ const filteredRecords = !filters.length
+   ? records
+   : records.filter((record) => {
+       const keyTuple = decodeDynamicField("bytes32[]", record.__key);
+       return filters.some(
+         (filter) =>
+           (filter.tableId == null || filter.tableId === table.tableId) &&
+           (filter.key0 == null || filter.key0 === keyTuple[0]) &&
+           (filter.key1 == null || filter.key1 === keyTuple[1])
+       );
+     });
return {
...table,
- records: records.map((record) => ({
+ records: filteredRecords.map((record) => ({
key: Object.fromEntries(Object.entries(table.keySchema).map(([name]) => [name, record[name]])),
value: Object.fromEntries(Object.entries(table.valueSchema).map(([name]) => [name, record[name]])),
})),
12 changes: 10 additions & 2 deletions packages/store-sync/src/common.ts
@@ -26,6 +26,14 @@ export type TableWithRecords = Table & { records: TableRecord[] };
export type StoreEventsLog = Log<bigint, number, false, StoreEventsAbiItem, true, StoreEventsAbi>;
export type BlockLogs = { blockNumber: StoreEventsLog["blockNumber"]; logs: StoreEventsLog[] };

+ // TODO: should we add address here?

Review comment (Member Author): I think it's best if we keep address outside of this to align with eth_getLogs filtering capability (where address is separate from topics): https://ethereum.org/en/developers/docs/apis/json-rpc/#eth_getlogs

+ export type SyncFilter = {
+   tableId?: Hex;
+   key0?: Hex;
+   key1?: Hex;
+   // TODO: decide how many keys to support
+ };

export type SyncOptions<TConfig extends StoreConfig = StoreConfig> = {
/**
* MUD config
@@ -42,9 +50,9 @@ export type SyncOptions<TConfig extends StoreConfig = StoreConfig> = {
*/
address?: Address;
/**
- * Optional table IDs to filter indexer state and RPC state.
+ * Optional filters for indexer and RPC state. Useful to narrow down the data received by the client for large worlds.
*/
- tableIds?: Hex[];
+ filters?: SyncFilter[];
/**
* Optional block number to start indexing from. Useful for resuming the indexer from a particular point in time or starting after a particular contract deployment.
*/
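As a usage sketch for the new option (the table and key values below are placeholders, not values from this PR, and this assumes SyncFilter is re-exported from the package root):

```ts
import { SyncFilter } from "@latticexyz/store-sync";

// Placeholder 32-byte IDs for illustration only.
const positionTableId = "0x0000000000000000000000000000000000000000000000000000000000000001";
const inventoryTableId = "0x0000000000000000000000000000000000000000000000000000000000000002";
const playerEntity = "0x0000000000000000000000000000000000000000000000000000000000000003";

const filters: SyncFilter[] = [
  // every record of one table
  { tableId: positionTableId },
  // only records of another table whose first key is a specific player
  { tableId: inventoryTableId, key0: playerEntity },
];
```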
22 changes: 19 additions & 3 deletions packages/store-sync/src/createStoreSync.ts
@@ -4,6 +4,7 @@ import {
StorageAdapter,
StorageAdapterBlock,
StorageAdapterLog,
+   SyncFilter,
SyncOptions,
SyncResult,
TableWithRecords,
@@ -36,6 +37,8 @@ import { internalTableIds } from "./internalTableIds";

const debug = parentDebug.extend("createStoreSync");

+ const defaultFilters: SyncFilter[] = internalTableIds.map((tableId) => ({ tableId }));

type CreateStoreSyncOptions<TConfig extends StoreConfig = StoreConfig> = SyncOptions<TConfig> & {
storageAdapter: StorageAdapter;
onProgress?: (opts: {
@@ -52,13 +55,13 @@ export async function createStoreSync<TConfig extends StoreConfig = StoreConfig>(
onProgress,
publicClient,
address,
- tableIds = [],
+ filters: initialFilters = [],
startBlock: initialStartBlock = 0n,
maxBlockRange,
initialState,
indexerUrl,
}: CreateStoreSyncOptions<TConfig>): Promise<SyncResult> {
- const includedTableIds = new Set(tableIds.length ? [...internalTableIds, ...tableIds] : []);
+ const filters = initialFilters.length ? [...initialFilters, ...defaultFilters] : [];
const initialState$ = defer(
async (): Promise<
| {
@@ -82,7 +85,7 @@ export async function createStoreSync<TConfig extends StoreConfig = StoreConfig>

const indexer = createIndexerClient({ url: indexerUrl });
const chainId = publicClient.chain?.id ?? (await publicClient.getChainId());
- const result = await indexer.findAll.query({ chainId, address, tableIds: Array.from(includedTableIds) });
+ const result = await indexer.findAll.query({ chainId, address, filters });

onProgress?.({
step: SyncStep.SNAPSHOT,
@@ -198,8 +201,21 @@ export async function createStoreSync<TConfig extends StoreConfig = StoreConfig>
publicClient,
address,
events: storeEventsAbi,
+ // TODO: pass filters in here so we can filter at RPC level
maxBlockRange,
}),
+ map(({ toBlock, logs }) => {
+   if (!filters.length) return { toBlock, logs };
+   const filteredLogs = logs.filter((log) =>
+     filters.some(
+       (filter) =>
+         (filter.tableId == null || filter.tableId === log.args.tableId) &&
+         (filter.key0 == null || filter.key0 === log.args.keyTuple[0]) &&
+         (filter.key1 == null || filter.key1 === log.args.keyTuple[1])
+     )
+   );
+   return { toBlock, logs: filteredLogs };
+ }),
mergeMap(({ toBlock, logs }) => from(groupLogsByBlockNumber(logs, toBlock))),
share()
);
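One subtlety in the merge above: passing any filter implicitly re-includes MUD's internal tables so core sync bookkeeping still arrives, while passing no filters disables filtering entirely. A sketch of that rule in isolation, with a placeholder internal table ID:

```ts
import { SyncFilter } from "@latticexyz/store-sync";

// Placeholder stand-in for the filters derived from internalTableIds.
const defaultFilters: SyncFilter[] = [
  { tableId: "0x0000000000000000000000000000000000000000000000000000000000000000" },
];

function effectiveFilters(userFilters: SyncFilter[]): SyncFilter[] {
  // No user filters: return [] so nothing is filtered out downstream.
  // Any user filter: append the internal-table filters, otherwise the client
  // would filter away records its own sync machinery depends on.
  return userFilters.length ? [...userFilters, ...defaultFilters] : [];
}
```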
12 changes: 2 additions & 10 deletions packages/store-sync/src/postgres/syncToPostgres.ts
@@ -28,23 +28,15 @@ export async function syncToPostgres<TConfig extends StoreConfig = StoreConfig>(
config,
database,
publicClient,
- address,
- startBlock,
- maxBlockRange,
- indexerUrl,
- initialState,
  startSync = true,
+ ...syncOptions
}: SyncToPostgresOptions<TConfig>): Promise<SyncToPostgresResult> {
const { storageAdapter } = await postgresStorage({ database, publicClient, config });
const storeSync = await createStoreSync({
storageAdapter,
config,
- address,
  publicClient,
- startBlock,
- maxBlockRange,
- indexerUrl,
- initialState,
+ ...syncOptions,
});

const sub = startSync ? storeSync.storedBlockLogs$.subscribe() : null;
14 changes: 2 additions & 12 deletions packages/store-sync/src/recs/syncToRecs.ts
@@ -20,13 +20,8 @@ type SyncToRecsResult<TConfig extends StoreConfig = StoreConfig> = SyncResult &
export async function syncToRecs<TConfig extends StoreConfig = StoreConfig>({
world,
config,
- address,
  publicClient,
- startBlock,
- maxBlockRange,
- initialState,
- indexerUrl,
- startSync = true,
+ ...syncOptions
}: SyncToRecsOptions<TConfig>): Promise<SyncToRecsResult<TConfig>> {
const { storageAdapter, components } = recsStorage({
world,
@@ -38,12 +33,7 @@ export async function syncToRecs<TConfig extends StoreConfig = StoreConfig>({
const storeSync = await createStoreSync({
storageAdapter,
config,
- address,
  publicClient,
- startBlock,
- maxBlockRange,
- indexerUrl,
- initialState,
+ ...syncOptions,
onProgress: ({ step, percentage, latestBlockNumber, lastBlockNumberProcessed, message }) => {
// already live, no need for more progress updates
if (getComponentValue(components.SyncProgress, singletonEntity)?.step === SyncStep.LIVE) return;
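With the rest spread in place, callers can pass filters straight through to createStoreSync. A hypothetical client setup (the world address, table ID, key, and config import path are placeholders):

```ts
import { createPublicClient, http } from "viem";
import { foundry } from "viem/chains";
import { createWorld } from "@latticexyz/recs";
import { syncToRecs } from "@latticexyz/store-sync/recs";
import mudConfig from "../mud.config"; // placeholder path to the project's MUD config

const publicClient = createPublicClient({ chain: foundry, transport: http() });

const { components } = await syncToRecs({
  world: createWorld(),
  config: mudConfig,
  publicClient,
  address: "0x0000000000000000000000000000000000000000", // placeholder world address
  // Only sync records of one table whose first key matches one entity:
  filters: [
    {
      tableId: "0x0000000000000000000000000000000000000000000000000000000000000001",
      key0: "0x0000000000000000000000000000000000000000000000000000000000000002",
    },
  ],
});
```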
12 changes: 2 additions & 10 deletions packages/store-sync/src/sqlite/syncToSqlite.ts
@@ -28,22 +28,14 @@ export async function syncToSqlite<TConfig extends StoreConfig = StoreConfig>({
config,
database,
publicClient,
- address,
- startBlock,
- maxBlockRange,
- indexerUrl,
- initialState,
  startSync = true,
+ ...syncOptions
}: SyncToSqliteOptions<TConfig>): Promise<SyncToSqliteResult> {
const storeSync = await createStoreSync({
storageAdapter: await sqliteStorage({ database, publicClient, config }),
config,
- address,
  publicClient,
- startBlock,
- maxBlockRange,
- indexerUrl,
- initialState,
+ ...syncOptions,
});

const sub = startSync ? storeSync.storedBlockLogs$.subscribe() : null;
4 changes: 2 additions & 2 deletions packages/store-sync/src/trpc-indexer/common.ts
@@ -1,8 +1,8 @@
import { Hex } from "viem";
- import { TableWithRecords } from "../common";
+ import { SyncFilter, TableWithRecords } from "../common";

export type QueryAdapter = {
- findAll: (opts: { chainId: number; address?: Hex; tableIds?: Hex[] }) => Promise<{
+ findAll: (opts: { chainId: number; address?: Hex; filters?: SyncFilter[] }) => Promise<{
blockNumber: bigint | null;
tables: TableWithRecords[];
}>;
14 changes: 11 additions & 3 deletions packages/store-sync/src/trpc-indexer/createAppRouter.ts
@@ -16,13 +16,21 @@ export function createAppRouter() {
z.object({
chainId: z.number(),
address: z.string().refine(isHex).optional(),
-       tableIds: z.array(z.string().refine(isHex)).optional(),
+       filters: z
+         .array(
+           z.object({
+             tableId: z.string().refine(isHex).optional(),
+             key0: z.string().refine(isHex).optional(),
+             key1: z.string().refine(isHex).optional(),
+           })
+         )
+         .optional(),
})
)
.query(async (opts): ReturnType<QueryAdapter["findAll"]> => {
const { queryAdapter } = opts.ctx;
-     const { chainId, address, tableIds } = opts.input;
-     return queryAdapter.findAll({ chainId, address, tableIds });
+     const { chainId, address, filters } = opts.input;
+     return queryAdapter.findAll({ chainId, address, filters });
}),
});
}
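On the query side, the same filters shape flows through the tRPC indexer client and is validated by the zod schema above. A hypothetical call (the URL and IDs are placeholders):

```ts
import { createIndexerClient } from "@latticexyz/store-sync/trpc-indexer";

const indexer = createIndexerClient({ url: "http://localhost:3001/trpc" }); // placeholder URL

const { blockNumber, tables } = await indexer.findAll.query({
  chainId: 31337, // placeholder chain ID (anvil/foundry default)
  filters: [
    {
      tableId: "0x0000000000000000000000000000000000000000000000000000000000000001",
      key0: "0x0000000000000000000000000000000000000000000000000000000000000002",
    },
  ],
});

console.log("snapshot at block", blockNumber, "with", tables.length, "tables");
```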
3 changes: 3 additions & 0 deletions pnpm-lock.yaml
