Skip to content

Commit

Permalink
feat(store-sync,store-indexer): sync from getLogs indexer endpoint (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
holic authored Dec 1, 2023
1 parent 747d8d1 commit 5df1f31
Show file tree
Hide file tree
Showing 15 changed files with 289 additions and 152 deletions.
16 changes: 16 additions & 0 deletions .changeset/angry-peas-heal.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
---
"@latticexyz/store-sync": minor
---

Refactored how we fetch snapshots from an indexer, preferring the new `getLogs` endpoint and falling back to the previous `findAll` if it isn't available. This refactor also prepares for an easier entry point for adding client caching of snapshots.

The `initialState` option for various sync methods (`syncToPostgres`, `syncToRecs`, etc.) is now deprecated in favor of `initialBlockLogs`. For now, we'll automatically convert `initialState` into `initialBlockLogs`, but if you want to update your code, you can do:

```ts
import { tablesWithRecordsToLogs } from "@latticexyz/store-sync";

const initialBlockLogs = {
blockNumber: initialState.blockNumber,
logs: tablesWithRecordsToLogs(initialState.tables),
};
```
5 changes: 5 additions & 0 deletions .changeset/wet-crabs-punch.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@latticexyz/common": minor
---

Updated `chunk` types to use readonly arrays
5 changes: 5 additions & 0 deletions .changeset/wicked-donuts-cheat.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@latticexyz/store-indexer": minor
---

Added `getLogs` query support to sqlite indexer
29 changes: 16 additions & 13 deletions e2e/packages/sync-test/indexerSync.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ describe("Sync from indexer", async () => {
await waitForInitialSync(page);

expect(asyncErrorHandler.getErrors()).toHaveLength(1);
expect(asyncErrorHandler.getErrors()[0]).toContain("error fetching initial state from indexer");
expect(asyncErrorHandler.getErrors()[0]).toContain("error getting snapshot");
});

describe.each([["sqlite"], ["postgres"]] as const)("%s indexer", (indexerType) => {
Expand Down Expand Up @@ -128,18 +128,21 @@ describe("Sync from indexer", async () => {
await waitForInitialSync(page);

const entities = await callPageFunction(page, "getKeys", ["Position"]);
expect(entities).toEqual([
{
x: 1,
y: 1,
zone: "0x6d61703100000000000000000000000000000000000000000000000000000000",
},
{
x: 2,
y: -2,
zone: "0x6d61703100000000000000000000000000000000000000000000000000000000",
},
]);
expect(entities).toEqual(
// TODO: figure out how to make this consistently return the same order? may require https://github.com/latticexyz/mud/issues/1979
expect.arrayContaining([
{
x: 1,
y: 1,
zone: "0x6d61703100000000000000000000000000000000000000000000000000000000",
},
{
x: 2,
y: -2,
zone: "0x6d61703100000000000000000000000000000000000000000000000000000000",
},
])
);

// Should not have thrown errors
asyncErrorHandler.expectNoAsyncErrors();
Expand Down
2 changes: 1 addition & 1 deletion packages/common/src/utils/chunk.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
export function* chunk<T>(arr: T[], n: number): Generator<T[], void> {
export function* chunk<T>(arr: readonly T[], n: number): Generator<readonly T[], void> {
for (let i = 0; i < arr.length; i += n) {
yield arr.slice(i, i + n);
}
Expand Down
5 changes: 4 additions & 1 deletion packages/store-indexer/src/postgres/getLogs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,10 @@ export async function getLogs(
.select()
.from(tables.recordsTable)
.where(or(...conditions))
.orderBy(asc(tables.recordsTable.lastUpdatedBlockNumber));
.orderBy(
asc(tables.recordsTable.lastUpdatedBlockNumber)
// TODO: add logIndex (https://github.com/latticexyz/mud/issues/1979)
);

const blockNumber = records.reduce(
(max, record) => bigIntMax(max, record.lastUpdatedBlockNumber ?? 0n),
Expand Down
56 changes: 7 additions & 49 deletions packages/store-indexer/src/sqlite/createQueryAdapter.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
import { eq } from "drizzle-orm";
import { BaseSQLiteDatabase } from "drizzle-orm/sqlite-core";
import { buildTable, chainState, getTables } from "@latticexyz/store-sync/sqlite";
import { QueryAdapter } from "@latticexyz/store-sync/trpc-indexer";
import { debug } from "../debug";
import { getAddress } from "viem";
import { decodeDynamicField } from "@latticexyz/protocol-parser";
import { getTablesWithRecords } from "./getTablesWithRecords";
import { tablesWithRecordsToLogs } from "@latticexyz/store-sync";

/**
* Creates a storage adapter for the tRPC server/client to query data from SQLite.
Expand All @@ -15,51 +12,12 @@ import { decodeDynamicField } from "@latticexyz/protocol-parser";
export async function createQueryAdapter(database: BaseSQLiteDatabase<"sync", any>): Promise<QueryAdapter> {
const adapter: QueryAdapter = {
async getLogs(opts) {
// TODO
throw new Error("Not implemented");
const { blockNumber, tables } = getTablesWithRecords(database, opts);
const logs = tablesWithRecordsToLogs(tables);
return { blockNumber: blockNumber ?? 0n, logs };
},
async findAll({ chainId, address, filters = [] }) {
// If _any_ filter has a table ID, this will filter down all data to just those tables. Which mean we can't yet mix table filters with key-only filters.
// TODO: improve this so we can express this in the query (need to be able to query data across tables more easily)
const tableIds = Array.from(new Set(filters.map((filter) => filter.tableId)));
const tables = getTables(database)
.filter((table) => address == null || getAddress(address) === getAddress(table.address))
.filter((table) => !tableIds.length || tableIds.includes(table.tableId));

const tablesWithRecords = tables.map((table) => {
const sqliteTable = buildTable(table);
const records = database.select().from(sqliteTable).where(eq(sqliteTable.__isDeleted, false)).all();
const filteredRecords = !filters.length
? records
: records.filter((record) => {
const keyTuple = decodeDynamicField("bytes32[]", record.__key);
return filters.some(
(filter) =>
filter.tableId === table.tableId &&
(filter.key0 == null || filter.key0 === keyTuple[0]) &&
(filter.key1 == null || filter.key1 === keyTuple[1])
);
});
return {
...table,
records: filteredRecords.map((record) => ({
key: Object.fromEntries(Object.entries(table.keySchema).map(([name]) => [name, record[name]])),
value: Object.fromEntries(Object.entries(table.valueSchema).map(([name]) => [name, record[name]])),
})),
};
});

const metadata = database.select().from(chainState).where(eq(chainState.chainId, chainId)).all();
const { lastUpdatedBlockNumber } = metadata[0] ?? {};

const result = {
blockNumber: lastUpdatedBlockNumber ?? null,
tables: tablesWithRecords,
};

debug("findAll", chainId, address, result);

return result;
async findAll(opts) {
return getTablesWithRecords(database, opts);
},
};
return adapter;
Expand Down
75 changes: 75 additions & 0 deletions packages/store-indexer/src/sqlite/getTablesWithRecords.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
import { asc, eq } from "drizzle-orm";
import { BaseSQLiteDatabase } from "drizzle-orm/sqlite-core";
import { buildTable, chainState, getTables } from "@latticexyz/store-sync/sqlite";
import { Hex, getAddress } from "viem";
import { decodeDynamicField } from "@latticexyz/protocol-parser";
import { SyncFilter, TableWithRecords } from "@latticexyz/store-sync";

// TODO: refactor sqlite and replace this with getLogs to match postgres (https://github.com/latticexyz/mud/issues/1970)

/**
* @deprecated
* */
export function getTablesWithRecords(
database: BaseSQLiteDatabase<"sync", any>,
{
chainId,
address,
filters = [],
}: {
readonly chainId: number;
readonly address?: Hex;
readonly filters?: readonly SyncFilter[];
}
): { blockNumber: bigint | null; tables: readonly TableWithRecords[] } {
const metadata = database
.select()
.from(chainState)
.where(eq(chainState.chainId, chainId))
.limit(1)
.all()
.find(() => true);

// If _any_ filter has a table ID, this will filter down all data to just those tables. Which mean we can't yet mix table filters with key-only filters.
// TODO: improve this so we can express this in the query (need to be able to query data across tables more easily)
const tableIds = Array.from(new Set(filters.map((filter) => filter.tableId)));
const tables = getTables(database)
.filter((table) => address == null || getAddress(address) === getAddress(table.address))
.filter((table) => !tableIds.length || tableIds.includes(table.tableId));

const tablesWithRecords = tables.map((table) => {
const sqliteTable = buildTable(table);
const records = database
.select()
.from(sqliteTable)
.where(eq(sqliteTable.__isDeleted, false))
.orderBy(
asc(sqliteTable.__lastUpdatedBlockNumber)
// TODO: add logIndex (https://github.com/latticexyz/mud/issues/1979)
)
.all();
const filteredRecords = !filters.length
? records
: records.filter((record) => {
const keyTuple = decodeDynamicField("bytes32[]", record.__key);
return filters.some(
(filter) =>
filter.tableId === table.tableId &&
(filter.key0 == null || filter.key0 === keyTuple[0]) &&
(filter.key1 == null || filter.key1 === keyTuple[1])
);
});
return {
...table,
records: filteredRecords.map((record) => ({
key: Object.fromEntries(Object.entries(table.keySchema).map(([name]) => [name, record[name]])),
value: Object.fromEntries(Object.entries(table.valueSchema).map(([name]) => [name, record[name]])),
})),
};
});

return {
blockNumber: metadata?.lastUpdatedBlockNumber ?? null,
tables: tablesWithRecords,
};
}
18 changes: 13 additions & 5 deletions packages/store-sync/src/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ export type Table = {
export type TableWithRecords = Table & { records: TableRecord[] };

export type StoreEventsLog = Log<bigint, number, false, StoreEventsAbiItem, true, StoreEventsAbi>;
export type BlockLogs = { blockNumber: StoreEventsLog["blockNumber"]; logs: StoreEventsLog[] };
export type BlockLogs = { blockNumber: StoreEventsLog["blockNumber"]; logs: readonly StoreEventsLog[] };

// only two keys for now, to reduce complexity of creating indexes on SQL tables
// TODO: make tableId optional to enable filtering just on keys (any table)
Expand Down Expand Up @@ -90,11 +90,19 @@ export type SyncOptions<TConfig extends StoreConfig = StoreConfig> = {
*/
indexerUrl?: string;
/**
* Optional initial state to hydrate from. Useful if you're hydrating from your own indexer or cache.
* Optional initial state to hydrate from. Useful if you're hydrating from an indexer or cache.
* @deprecated Use `initialLogs` option instead.
*/
initialState?: {
blockNumber: bigint | null;
tables: TableWithRecords[];
blockNumber: bigint;
tables: readonly TableWithRecords[];
};
/**
* Optional initial logs to hydrate from. Useful if you're hydrating from an indexer or cache.
*/
initialBlockLogs?: {
blockNumber: bigint;
logs: readonly StorageAdapterLog[];
};
};

Expand All @@ -108,7 +116,7 @@ export type SyncResult = {

// TODO: add optional, original log to this?
export type StorageAdapterLog = Partial<StoreEventsLog> & UnionPick<StoreEventsLog, "address" | "eventName" | "args">;
export type StorageAdapterBlock = { blockNumber: BlockLogs["blockNumber"]; logs: StorageAdapterLog[] };
export type StorageAdapterBlock = { blockNumber: BlockLogs["blockNumber"]; logs: readonly StorageAdapterLog[] };
export type StorageAdapter = (block: StorageAdapterBlock) => Promise<void>;

export const schemasTableId = storeTables.Tables.tableId;
Expand Down
Loading

0 comments on commit 5df1f31

Please sign in to comment.