Skip to content

Commit

Permalink
feat(store-sync, store-indexer): order logs by logIndex (#2037)
Browse files Browse the repository at this point in the history
  • Loading branch information
alvrs authored Dec 7, 2023
1 parent 017e57a commit 85b9461
Show file tree
Hide file tree
Showing 11 changed files with 49 additions and 32 deletions.
6 changes: 6 additions & 0 deletions .changeset/fluffy-days-carry.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"@latticexyz/store-indexer": major
"@latticexyz/store-sync": major
---

The postgres indexer is now storing the `logIndex` of the last update of a record to be able to return the snapshot logs in the order they were emitted onchain.
4 changes: 2 additions & 2 deletions packages/store-indexer/bin/postgres-indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ try {
// TODO: move this to `.findFirst` after upgrading drizzle or `rows[0]` after enabling `noUncheckedIndexedAccess: true`
.then((rows) => rows.find(() => true));

if (chainState?.lastUpdatedBlockNumber != null) {
startBlock = chainState.lastUpdatedBlockNumber + 1n;
if (chainState?.blockNumber != null) {
startBlock = chainState.blockNumber + 1n;
console.log("resuming from block number", startBlock);
}
} catch (error) {
Expand Down
3 changes: 2 additions & 1 deletion packages/store-indexer/src/postgres/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ export type RecordData = {
staticData: Hex | null;
encodedLengths: Hex | null;
dynamicData: Hex | null;
lastUpdatedBlockNumber: string;
recordBlockNumber: string;
logIndex: number;
};

export type RecordMetadata = {
Expand Down
11 changes: 4 additions & 7 deletions packages/store-indexer/src/postgres/deprecated/getLogs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,29 +55,26 @@ export async function getLogs(
// Get the first record in a way that returns a possible `undefined`
// TODO: move this to `.findFirst` after upgrading drizzle or `rows[0]` after enabling `noUncheckedIndexedAccess: true`
.then((rows) => rows.find(() => true));
const indexerBlockNumber = chainState?.lastUpdatedBlockNumber ?? 0n;
const indexerBlockNumber = chainState?.blockNumber ?? 0n;
benchmark("query chainState");

const records = await database
.select()
.from(tables.recordsTable)
.where(or(...conditions))
.orderBy(
asc(tables.recordsTable.lastUpdatedBlockNumber)
asc(tables.recordsTable.blockNumber)
// TODO: add logIndex (https://github.com/latticexyz/mud/issues/1979)
);
benchmark("query records");

const blockNumber = records.reduce(
(max, record) => bigIntMax(max, record.lastUpdatedBlockNumber ?? 0n),
indexerBlockNumber
);
const blockNumber = records.reduce((max, record) => bigIntMax(max, record.blockNumber ?? 0n), indexerBlockNumber);
benchmark("find block number");

const logs = records
// TODO: add this to the query, assuming we can optimize with an index
.filter((record) => !record.isDeleted)
.map((record) => recordToLog({ ...record, lastUpdatedBlockNumber: record.lastUpdatedBlockNumber.toString() }));
.map(recordToLog);
benchmark("map records to logs");

return { blockNumber, logs };
Expand Down
7 changes: 4 additions & 3 deletions packages/store-indexer/src/postgres/queryLogs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ export function queryLogs(sql: Sql, opts: z.infer<typeof input>): PendingQuery<R
SELECT
version AS "indexerVersion",
chain_id AS "chainId",
last_updated_block_number AS "chainBlockNumber"
block_number AS "chainBlockNumber"
FROM ${sql(`${schemaName}.config`)}
LIMIT 1
),
Expand All @@ -58,10 +58,11 @@ export function queryLogs(sql: Sql, opts: z.infer<typeof input>): PendingQuery<R
'0x' || encode(static_data, 'hex') AS "staticData",
'0x' || encode(encoded_lengths, 'hex') AS "encodedLengths",
'0x' || encode(dynamic_data, 'hex') AS "dynamicData",
last_updated_block_number AS "recordBlockNumber"
block_number AS "recordBlockNumber",
log_index AS "logIndex"
FROM ${sql(`${schemaName}.records`)}
${where}
ORDER BY last_updated_block_number ASC
ORDER BY block_number, log_index ASC
)
SELECT
(SELECT COUNT(*) FROM records) AS "totalRows",
Expand Down
4 changes: 3 additions & 1 deletion packages/store-indexer/src/postgres/recordToLog.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ import { StorageAdapterLog } from "@latticexyz/store-sync";
import { decodeDynamicField } from "@latticexyz/protocol-parser";
import { RecordData } from "./common";

export function recordToLog(record: RecordData): StorageAdapterLog & { eventName: "Store_SetRecord" } {
export function recordToLog(
record: Omit<RecordData, "recordBlockNumber">
): StorageAdapterLog & { eventName: "Store_SetRecord" } {
return {
address: record.address,
eventName: "Store_SetRecord",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,9 @@ describe("createStorageAdapter", async () => {
expect(await db.select().from(storageAdapter.tables.configTable)).toMatchInlineSnapshot(`
[
{
"blockNumber": 12n,
"chainId": 31337,
"lastUpdatedBlockNumber": 12n,
"version": "0.0.2",
"version": "0.0.4",
},
]
`);
Expand All @@ -71,13 +71,14 @@ describe("createStorageAdapter", async () => {
[
{
"address": "0x6E9474e9c83676B9A71133FF96Db43E7AA0a4342",
"blockNumber": 12n,
"dynamicData": "0x000001a400000045",
"encodedLengths": "0x0000000000000000000000000000000000000000000000000800000000000008",
"isDeleted": false,
"key0": null,
"key1": null,
"keyBytes": "0x",
"lastUpdatedBlockNumber": 12n,
"logIndex": 1,
"staticData": null,
"tableId": "0x746200000000000000000000000000004e756d6265724c697374000000000000",
},
Expand Down
7 changes: 4 additions & 3 deletions packages/store-sync/src/postgres/createStorageAdapter.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@ describe("createStorageAdapter", async () => {
expect(await db.select().from(storageAdapter.tables.configTable)).toMatchInlineSnapshot(`
[
{
"blockNumber": 12n,
"chainId": 31337,
"lastUpdatedBlockNumber": 12n,
"version": "0.0.2",
"version": "0.0.4",
},
]
`);
Expand All @@ -69,13 +69,14 @@ describe("createStorageAdapter", async () => {
[
{
"address": "0x6E9474e9c83676B9A71133FF96Db43E7AA0a4342",
"blockNumber": 12n,
"dynamicData": "0x000001a400000045",
"encodedLengths": "0x0000000000000000000000000000000000000000000000000800000000000008",
"isDeleted": false,
"key0": null,
"key1": null,
"keyBytes": "0x",
"lastUpdatedBlockNumber": 12n,
"logIndex": 1,
"staticData": null,
"tableId": "0x746200000000000000000000000000004e756d6265724c697374000000000000",
},
Expand Down
25 changes: 16 additions & 9 deletions packages/store-sync/src/postgres/createStorageAdapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ export async function createStorageAdapter<TConfig extends StoreConfig = StoreCo
staticData: log.args.staticData,
encodedLengths: log.args.encodedLengths,
dynamicData: log.args.dynamicData,
lastUpdatedBlockNumber: blockNumber,
blockNumber,
logIndex: log.logIndex ?? 0,
isDeleted: false,
})
.onConflictDoUpdate({
Expand All @@ -63,7 +64,8 @@ export async function createStorageAdapter<TConfig extends StoreConfig = StoreCo
staticData: log.args.staticData,
encodedLengths: log.args.encodedLengths,
dynamicData: log.args.dynamicData,
lastUpdatedBlockNumber: blockNumber,
blockNumber,
logIndex: log.logIndex ?? 0,
isDeleted: false,
},
})
Expand Down Expand Up @@ -105,14 +107,16 @@ export async function createStorageAdapter<TConfig extends StoreConfig = StoreCo
key0: log.args.keyTuple[0],
key1: log.args.keyTuple[1],
staticData: newStaticData,
lastUpdatedBlockNumber: blockNumber,
blockNumber,
logIndex: log.logIndex ?? 0,
isDeleted: false,
})
.onConflictDoUpdate({
target: [tables.recordsTable.address, tables.recordsTable.tableId, tables.recordsTable.keyBytes],
set: {
staticData: newStaticData,
lastUpdatedBlockNumber: blockNumber,
blockNumber,
logIndex: log.logIndex,
isDeleted: false,
},
})
Expand Down Expand Up @@ -155,15 +159,17 @@ export async function createStorageAdapter<TConfig extends StoreConfig = StoreCo
key1: log.args.keyTuple[1],
encodedLengths: log.args.encodedLengths,
dynamicData: newDynamicData,
lastUpdatedBlockNumber: blockNumber,
blockNumber,
logIndex: log.logIndex ?? 0,
isDeleted: false,
})
.onConflictDoUpdate({
target: [tables.recordsTable.address, tables.recordsTable.tableId, tables.recordsTable.keyBytes],
set: {
encodedLengths: log.args.encodedLengths,
dynamicData: newDynamicData,
lastUpdatedBlockNumber: blockNumber,
blockNumber: blockNumber,
logIndex: log.logIndex ?? 0,
isDeleted: false,
},
})
Expand All @@ -181,7 +187,8 @@ export async function createStorageAdapter<TConfig extends StoreConfig = StoreCo
staticData: null,
encodedLengths: null,
dynamicData: null,
lastUpdatedBlockNumber: blockNumber,
blockNumber,
logIndex: log.logIndex ?? 0,
isDeleted: true,
})
.where(
Expand All @@ -200,12 +207,12 @@ export async function createStorageAdapter<TConfig extends StoreConfig = StoreCo
.values({
version,
chainId,
lastUpdatedBlockNumber: blockNumber,
blockNumber,
})
.onConflictDoUpdate({
target: [tables.configTable.chainId],
set: {
lastUpdatedBlockNumber: blockNumber,
blockNumber,
},
})
.execute();
Expand Down
5 changes: 3 additions & 2 deletions packages/store-sync/src/postgres/tables.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ const schemaName = transformSchemaName("mud");
const configTable = pgSchema(schemaName).table("config", {
version: varchar("version").notNull(),
chainId: asNumber("chain_id", "bigint").notNull().primaryKey(),
lastUpdatedBlockNumber: asBigInt("last_updated_block_number", "numeric").notNull(),
blockNumber: asBigInt("block_number", "numeric").notNull(),
});

const recordsTable = pgSchema(schemaName).table(
Expand All @@ -28,7 +28,8 @@ const recordsTable = pgSchema(schemaName).table(
encodedLengths: asHex("encoded_lengths"),
dynamicData: asHex("dynamic_data"),
isDeleted: boolean("is_deleted"),
lastUpdatedBlockNumber: asBigInt("last_updated_block_number", "numeric").notNull(),
blockNumber: asBigInt("block_number", "numeric").notNull(),
logIndex: asNumber("log_index", "numeric").notNull(),
},
(table) => ({
pk: primaryKey(table.address, table.tableId, table.keyBytes),
Expand Down
2 changes: 1 addition & 1 deletion packages/store-sync/src/postgres/version.ts
Original file line number Diff line number Diff line change
@@ -1 +1 @@
export const version = "0.0.2";
export const version = "0.0.4";

0 comments on commit 85b9461

Please sign in to comment.