Skip to content

Commit

Permalink
feat(store-sync): add client support for streaming logs from indexer (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
alvrs authored Oct 3, 2024
1 parent 111bb1b commit 61930ee
Show file tree
Hide file tree
Showing 7 changed files with 90 additions and 22 deletions.
5 changes: 5 additions & 0 deletions .changeset/great-dragons-sit.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@latticexyz/store-sync": patch
---

Added support for streaming logs from the indexer.
69 changes: 54 additions & 15 deletions packages/store-sync/src/createStoreSync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,16 @@ import {
combineLatest,
scan,
mergeMap,
throwError,
} from "rxjs";
import { debug as parentDebug } from "./debug";
import { SyncStep } from "./SyncStep";
import { bigIntMax, chunk, isDefined, waitForIdle } from "@latticexyz/common/utils";
import { getSnapshot } from "./getSnapshot";
import { fromEventSource } from "./fromEventSource";
import { fetchAndStoreLogs } from "./fetchAndStoreLogs";
import { isLogsApiResponse } from "./indexer-client/isLogsApiResponse";
import { toStorageAdatperBlock } from "./indexer-client/toStorageAdapterBlock";

const debug = parentDebug.extend("createStoreSync");

Expand Down Expand Up @@ -61,7 +65,7 @@ export async function createStoreSync({
maxBlockRange,
initialState,
initialBlockLogs,
indexerUrl,
indexerUrl: initialIndexerUrl,
}: CreateStoreSyncOptions): Promise<SyncResult> {
const filters: SyncFilter[] =
initialFilters.length || tableIds.length
Expand All @@ -78,9 +82,17 @@ export async function createStoreSync({
)
: undefined;

const initialBlockLogs$ = defer(async (): Promise<StorageAdapterBlock | undefined> => {
const chainId = publicClient.chain?.id ?? (await publicClient.getChainId());
const indexerUrl =
initialIndexerUrl !== false
? initialIndexerUrl ??
(publicClient.chain && "indexerUrl" in publicClient.chain && typeof publicClient.chain.indexerUrl === "string"
? publicClient.chain.indexerUrl
: undefined)
: undefined;

const chainId = publicClient.chain?.id ?? (await publicClient.getChainId());

const initialBlockLogs$ = defer(async (): Promise<StorageAdapterBlock | undefined> => {
onProgress?.({
step: SyncStep.SNAPSHOT,
percentage: 0,
Expand All @@ -95,15 +107,7 @@ export async function createStoreSync({
filters,
initialState,
initialBlockLogs,
indexerUrl:
indexerUrl !== false
? indexerUrl ??
(publicClient.chain &&
"indexerUrl" in publicClient.chain &&
typeof publicClient.chain.indexerUrl === "string"
? publicClient.chain.indexerUrl
: undefined)
: undefined,
indexerUrl,
});

onProgress?.({
Expand Down Expand Up @@ -199,7 +203,34 @@ export async function createStoreSync({
let endBlock: bigint | null = null;
let lastBlockNumberProcessed: bigint | null = null;

const storedBlock$ = combineLatest([startBlock$, latestBlockNumber$]).pipe(
const storedIndexerLogs$ = indexerUrl
? startBlock$.pipe(
mergeMap((startBlock) => {
const url = new URL(
`api/logs-live?${new URLSearchParams({
input: JSON.stringify({ chainId, address, filters }),
block_num: startBlock.toString(),
include_tx_hash: "true",
})}`,
indexerUrl,
);
return fromEventSource<string>(url);
}),
map((messageEvent) => {
const data = JSON.parse(messageEvent.data);
if (!isLogsApiResponse(data)) {
throw new Error("Received unexpected from indexer:" + messageEvent.data);
}
return toStorageAdatperBlock(data);
}),
concatMap(async (block) => {
await storageAdapter(block);
return block;
}),
)
: throwError(() => new Error("No indexer URL provided"));

const storedRpcLogs$ = combineLatest([startBlock$, latestBlockNumber$]).pipe(
map(([startBlock, endBlock]) => ({ startBlock, endBlock })),
tap((range) => {
startBlock = range.startBlock;
Expand All @@ -215,13 +246,21 @@ export async function createStoreSync({
? bigIntMax(range.startBlock, lastBlockNumberProcessed + 1n)
: range.startBlock,
toBlock: range.endBlock,
storageAdapter,
logFilter,
storageAdapter,
});

return from(storedBlocks);
}),
tap(({ blockNumber, logs }) => {
);

const storedBlock$ = storedIndexerLogs$.pipe(
catchError((e) => {
debug("failed to stream logs from indexer:", e.message);
debug("falling back to streaming logs from RPC");
return storedRpcLogs$;
}),
tap(async ({ logs, blockNumber }) => {
debug("stored", logs.length, "logs for block", blockNumber);
lastBlockNumberProcessed = blockNumber;

Expand Down
10 changes: 10 additions & 0 deletions packages/store-sync/src/fromEventSource.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import { Observable } from "rxjs";

export function fromEventSource<T>(url: string | URL): Observable<MessageEvent<T>> {
return new Observable<MessageEvent>((subscriber) => {
const eventSource = new EventSource(url);
eventSource.onmessage = (ev): void => subscriber.next(ev);
eventSource.onerror = (): void => subscriber.error(new Error("Event source failed" + new URL(url).origin));
return () => eventSource.close();
});
}
1 change: 1 addition & 0 deletions packages/store-sync/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ export * from "./logToTable";
export * from "./tablesWithRecordsToLogs";
export * from "./tableToLog";
export * from "./recordToLog";
export * from "./logToRecord";
11 changes: 4 additions & 7 deletions packages/store-sync/src/indexer-client/createIndexerClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ import { z } from "zod";
import { input } from "./input";
import { StorageAdapterBlock } from "../common";
import { Result } from "@latticexyz/common";
import { isLogsApiResponse } from "./isLogsApiResponse";
import { toStorageAdatperBlock } from "./toStorageAdapterBlock";

export type CreateIndexerClientOptions = {
/**
Expand Down Expand Up @@ -30,19 +32,14 @@ export function createIndexerClient({ url }: CreateIndexerClientOptions): Indexe

// TODO: return a readable stream instead of fetching the entire response at once
const result = await response.json();
if (!isStorageAdapterBlock(result)) {
if (!isLogsApiResponse(result)) {
return { error: result };
}

return { ok: { ...result, blockNumber: BigInt(result.blockNumber) } };
return { ok: toStorageAdatperBlock(result) };
} catch (error) {
return { error };
}
},
};
}

// eslint-disable-next-line @typescript-eslint/no-explicit-any
function isStorageAdapterBlock(data: any): data is Omit<StorageAdapterBlock, "blockNumber"> & { blockNumber: string } {
return data && typeof data.blockNumber === "string" && Array.isArray(data.logs);
}
10 changes: 10 additions & 0 deletions packages/store-sync/src/indexer-client/isLogsApiResponse.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import { StorageAdapterBlock } from "../common";

export type LogsApiResponse = Omit<StorageAdapterBlock, "blockNumber"> & { blockNumber: string };

export function isLogsApiResponse(
// eslint-disable-next-line @typescript-eslint/no-explicit-any
data: any,
): data is LogsApiResponse {
return data && typeof data.blockNumber === "string" && Array.isArray(data.logs);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
import { StorageAdapterBlock } from "../common";
import { LogsApiResponse } from "./isLogsApiResponse";

export function toStorageAdatperBlock(data: LogsApiResponse): StorageAdapterBlock {
return { ...data, blockNumber: BigInt(data.blockNumber) };
}

0 comments on commit 61930ee

Please sign in to comment.