Skip to content

Commit

Permalink
feat(store-sync): fetch and store logs
Browse files Browse the repository at this point in the history
  • Loading branch information
holic committed Dec 4, 2023
1 parent 854de07 commit 70091bb
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 60 deletions.
1 change: 0 additions & 1 deletion packages/store-sync/src/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,6 @@ export type SyncOptions<TConfig extends StoreConfig = StoreConfig> = {
export type SyncResult = {
latestBlock$: Observable<Block>;
latestBlockNumber$: Observable<bigint>;
blockLogs$: Observable<BlockLogs>;
storedBlockLogs$: Observable<StorageAdapterBlock>;
waitForTransaction: (tx: Hex) => Promise<void>;
};
Expand Down
124 changes: 65 additions & 59 deletions packages/store-sync/src/createStoreSync.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,19 @@
import { StoreConfig, storeEventsAbi } from "@latticexyz/store";
import { Hex, TransactionReceiptNotFoundError } from "viem";
import { StorageAdapter, StorageAdapterBlock, SyncFilter, SyncOptions, SyncResult, internalTableIds } from "./common";
import { createBlockStream, blockRangeToLogs, groupLogsByBlockNumber } from "@latticexyz/block-logs-stream";
import {
StorageAdapter,
StorageAdapterBlock,
StoreEventsLog,
SyncFilter,
SyncOptions,
SyncResult,
internalTableIds,
} from "./common";
import { createBlockStream } from "@latticexyz/block-logs-stream";
import {
filter,
map,
tap,
mergeMap,
from,
concat,
concatMap,
Expand All @@ -24,6 +31,7 @@ import { debug as parentDebug } from "./debug";
import { SyncStep } from "./SyncStep";
import { bigIntMax, chunk, isDefined } from "@latticexyz/common/utils";
import { getSnapshot } from "./getSnapshot";
import { fetchAndStoreLogs } from "./fetchAndStoreLogs";

const debug = parentDebug.extend("createStoreSync");

Expand Down Expand Up @@ -58,6 +66,16 @@ export async function createStoreSync<TConfig extends StoreConfig = StoreConfig>
? [...initialFilters, ...tableIds.map((tableId) => ({ tableId })), ...defaultFilters]
: [];

const logFilter = filters.length
? (log: StoreEventsLog): boolean =>
filters.some(
(filter) =>
filter.tableId === log.args.tableId &&
(filter.key0 == null || filter.key0 === log.args.keyTuple[0]) &&
(filter.key1 == null || filter.key1 === log.args.keyTuple[1])
)
: undefined;

const initialBlockLogs$ = defer(async (): Promise<StorageAdapterBlock | undefined> => {
const chainId = publicClient.chain?.id ?? (await publicClient.getChainId());

Expand Down Expand Up @@ -164,71 +182,60 @@ export async function createStoreSync<TConfig extends StoreConfig = StoreConfig>

let startBlock: bigint | null = null;
let endBlock: bigint | null = null;
const blockLogs$ = combineLatest([startBlock$, latestBlockNumber$]).pipe(
let lastBlockNumberProcessed: bigint | null = null;

const storedBlock$ = combineLatest([startBlock$, latestBlockNumber$]).pipe(
map(([startBlock, endBlock]) => ({ startBlock, endBlock })),
tap((range) => {
startBlock = range.startBlock;
endBlock = range.endBlock;
}),
blockRangeToLogs({
publicClient,
address,
events: storeEventsAbi,
// TODO: pass filters in here so we can filter at RPC level
maxBlockRange,
concatMap((range) => {
const storedBlocks = fetchAndStoreLogs({
publicClient,
address,
events: storeEventsAbi,
maxBlockRange,
fromBlock: lastBlockNumberProcessed
? bigIntMax(range.startBlock, lastBlockNumberProcessed + 1n)
: range.startBlock,
toBlock: range.endBlock,
storageAdapter,
logFilter,
});

return from(storedBlocks);
}),
map(({ toBlock, logs }) => {
if (!filters.length) return { toBlock, logs };
const filteredLogs = logs.filter((log) =>
filters.some(
(filter) =>
filter.tableId === log.args.tableId &&
(filter.key0 == null || filter.key0 === log.args.keyTuple[0]) &&
(filter.key1 == null || filter.key1 === log.args.keyTuple[1])
)
);
return { toBlock, logs: filteredLogs };
tap(({ blockNumber, logs }) => {
debug("stored", logs.length, "logs for block", blockNumber);
lastBlockNumberProcessed = blockNumber;

if (startBlock != null && endBlock != null) {
if (blockNumber < endBlock) {
const totalBlocks = endBlock - startBlock;
const processedBlocks = lastBlockNumberProcessed - startBlock;
onProgress?.({
step: SyncStep.RPC,
percentage: Number((processedBlocks * 1000n) / totalBlocks) / 10,
latestBlockNumber: endBlock,
lastBlockNumberProcessed,
message: "Hydrating from RPC",
});
} else {
onProgress?.({
step: SyncStep.LIVE,
percentage: 100,
latestBlockNumber: endBlock,
lastBlockNumberProcessed,
message: "All caught up!",
});
}
}
}),
mergeMap(({ toBlock, logs }) => from(groupLogsByBlockNumber(logs, toBlock))),
share()
);

let lastBlockNumberProcessed: bigint | null = null;
const storedBlockLogs$ = concat(
storedInitialBlockLogs$,
blockLogs$.pipe(
concatMap(async (block) => {
await storageAdapter(block);
return block;
}),
tap(({ blockNumber, logs }) => {
debug("stored", logs.length, "logs for block", blockNumber);
lastBlockNumberProcessed = blockNumber;

if (startBlock != null && endBlock != null) {
if (blockNumber < endBlock) {
const totalBlocks = endBlock - startBlock;
const processedBlocks = lastBlockNumberProcessed - startBlock;
onProgress?.({
step: SyncStep.RPC,
percentage: Number((processedBlocks * 1000n) / totalBlocks) / 10,
latestBlockNumber: endBlock,
lastBlockNumberProcessed,
message: "Hydrating from RPC",
});
} else {
onProgress?.({
step: SyncStep.LIVE,
percentage: 100,
latestBlockNumber: endBlock,
lastBlockNumberProcessed,
message: "All caught up!",
});
}
}
})
)
).pipe(share());
const storedBlockLogs$ = concat(storedInitialBlockLogs$, storedBlock$).pipe(share());

// keep 10 blocks worth processed transactions in memory
const recentBlocksWindow = 10;
Expand Down Expand Up @@ -274,7 +281,6 @@ export async function createStoreSync<TConfig extends StoreConfig = StoreConfig>
return {
latestBlock$,
latestBlockNumber$,
blockLogs$,
storedBlockLogs$,
waitForTransaction,
};
Expand Down
22 changes: 22 additions & 0 deletions packages/store-sync/src/fetchAndStoreLogs.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import { FetchLogsOptions, fetchLogs, groupLogsByBlockNumber } from "@latticexyz/block-logs-stream";
import { StoreEventsAbi } from "@latticexyz/store";
import { StorageAdapter, StorageAdapterBlock, StoreEventsLog } from "./common";

type FetchAndStoreLogsOptions = FetchLogsOptions<StoreEventsAbi> & {
storageAdapter: StorageAdapter;
logFilter?: (log: StoreEventsLog) => boolean;
};

export async function* fetchAndStoreLogs({
storageAdapter,
logFilter,
...fetchLogsOptions
}: FetchAndStoreLogsOptions): AsyncGenerator<StorageAdapterBlock> {
for await (const { logs, toBlock } of fetchLogs(fetchLogsOptions)) {
const blocks = groupLogsByBlockNumber(logFilter ? logs.filter(logFilter) : logs, toBlock);
for (const block of blocks) {
await storageAdapter(block);
yield block;
}
}
}

0 comments on commit 70091bb

Please sign in to comment.