diff --git a/.changeset/great-dragons-sit.md b/.changeset/great-dragons-sit.md new file mode 100644 index 0000000000..fd4c38e467 --- /dev/null +++ b/.changeset/great-dragons-sit.md @@ -0,0 +1,5 @@ +--- +"@latticexyz/store-sync": patch +--- + +Added support for streaming logs from the indexer. diff --git a/packages/store-sync/src/createStoreSync.ts b/packages/store-sync/src/createStoreSync.ts index 121cf0fdd2..2f210fdf2b 100644 --- a/packages/store-sync/src/createStoreSync.ts +++ b/packages/store-sync/src/createStoreSync.ts @@ -27,12 +27,16 @@ import { combineLatest, scan, mergeMap, + throwError, } from "rxjs"; import { debug as parentDebug } from "./debug"; import { SyncStep } from "./SyncStep"; import { bigIntMax, chunk, isDefined, waitForIdle } from "@latticexyz/common/utils"; import { getSnapshot } from "./getSnapshot"; +import { fromEventSource } from "./fromEventSource"; import { fetchAndStoreLogs } from "./fetchAndStoreLogs"; +import { isLogsApiResponse } from "./indexer-client/isLogsApiResponse"; +import { toStorageAdatperBlock } from "./indexer-client/toStorageAdapterBlock"; const debug = parentDebug.extend("createStoreSync"); @@ -61,7 +65,7 @@ export async function createStoreSync({ maxBlockRange, initialState, initialBlockLogs, - indexerUrl, + indexerUrl: initialIndexerUrl, }: CreateStoreSyncOptions): Promise { const filters: SyncFilter[] = initialFilters.length || tableIds.length @@ -78,9 +82,17 @@ export async function createStoreSync({ ) : undefined; - const initialBlockLogs$ = defer(async (): Promise => { - const chainId = publicClient.chain?.id ?? (await publicClient.getChainId()); + const indexerUrl = + initialIndexerUrl !== false + ? initialIndexerUrl ?? + (publicClient.chain && "indexerUrl" in publicClient.chain && typeof publicClient.chain.indexerUrl === "string" + ? publicClient.chain.indexerUrl + : undefined) + : undefined; + + const chainId = publicClient.chain?.id ?? (await publicClient.getChainId()); + const initialBlockLogs$ = defer(async (): Promise => { onProgress?.({ step: SyncStep.SNAPSHOT, percentage: 0, @@ -95,15 +107,7 @@ export async function createStoreSync({ filters, initialState, initialBlockLogs, - indexerUrl: - indexerUrl !== false - ? indexerUrl ?? - (publicClient.chain && - "indexerUrl" in publicClient.chain && - typeof publicClient.chain.indexerUrl === "string" - ? publicClient.chain.indexerUrl - : undefined) - : undefined, + indexerUrl, }); onProgress?.({ @@ -199,7 +203,34 @@ export async function createStoreSync({ let endBlock: bigint | null = null; let lastBlockNumberProcessed: bigint | null = null; - const storedBlock$ = combineLatest([startBlock$, latestBlockNumber$]).pipe( + const storedIndexerLogs$ = indexerUrl + ? startBlock$.pipe( + mergeMap((startBlock) => { + const url = new URL( + `api/logs-live?${new URLSearchParams({ + input: JSON.stringify({ chainId, address, filters }), + block_num: startBlock.toString(), + include_tx_hash: "true", + })}`, + indexerUrl, + ); + return fromEventSource(url); + }), + map((messageEvent) => { + const data = JSON.parse(messageEvent.data); + if (!isLogsApiResponse(data)) { + throw new Error("Received unexpected from indexer:" + messageEvent.data); + } + return toStorageAdatperBlock(data); + }), + concatMap(async (block) => { + await storageAdapter(block); + return block; + }), + ) + : throwError(() => new Error("No indexer URL provided")); + + const storedRpcLogs$ = combineLatest([startBlock$, latestBlockNumber$]).pipe( map(([startBlock, endBlock]) => ({ startBlock, endBlock })), tap((range) => { startBlock = range.startBlock; @@ -215,13 +246,21 @@ export async function createStoreSync({ ? bigIntMax(range.startBlock, lastBlockNumberProcessed + 1n) : range.startBlock, toBlock: range.endBlock, - storageAdapter, logFilter, + storageAdapter, }); return from(storedBlocks); }), - tap(({ blockNumber, logs }) => { + ); + + const storedBlock$ = storedIndexerLogs$.pipe( + catchError((e) => { + debug("failed to stream logs from indexer:", e.message); + debug("falling back to streaming logs from RPC"); + return storedRpcLogs$; + }), + tap(async ({ logs, blockNumber }) => { debug("stored", logs.length, "logs for block", blockNumber); lastBlockNumberProcessed = blockNumber; diff --git a/packages/store-sync/src/fromEventSource.ts b/packages/store-sync/src/fromEventSource.ts new file mode 100644 index 0000000000..86b7b85af8 --- /dev/null +++ b/packages/store-sync/src/fromEventSource.ts @@ -0,0 +1,10 @@ +import { Observable } from "rxjs"; + +export function fromEventSource(url: string | URL): Observable> { + return new Observable((subscriber) => { + const eventSource = new EventSource(url); + eventSource.onmessage = (ev): void => subscriber.next(ev); + eventSource.onerror = (): void => subscriber.error(new Error("Event source failed" + new URL(url).origin)); + return () => eventSource.close(); + }); +} diff --git a/packages/store-sync/src/index.ts b/packages/store-sync/src/index.ts index de276bd499..61f07f1506 100644 --- a/packages/store-sync/src/index.ts +++ b/packages/store-sync/src/index.ts @@ -7,3 +7,4 @@ export * from "./logToTable"; export * from "./tablesWithRecordsToLogs"; export * from "./tableToLog"; export * from "./recordToLog"; +export * from "./logToRecord"; diff --git a/packages/store-sync/src/indexer-client/createIndexerClient.ts b/packages/store-sync/src/indexer-client/createIndexerClient.ts index af5cf57f87..676d1903e6 100644 --- a/packages/store-sync/src/indexer-client/createIndexerClient.ts +++ b/packages/store-sync/src/indexer-client/createIndexerClient.ts @@ -2,6 +2,8 @@ import { z } from "zod"; import { input } from "./input"; import { StorageAdapterBlock } from "../common"; import { Result } from "@latticexyz/common"; +import { isLogsApiResponse } from "./isLogsApiResponse"; +import { toStorageAdatperBlock } from "./toStorageAdapterBlock"; export type CreateIndexerClientOptions = { /** @@ -30,19 +32,14 @@ export function createIndexerClient({ url }: CreateIndexerClientOptions): Indexe // TODO: return a readable stream instead of fetching the entire response at once const result = await response.json(); - if (!isStorageAdapterBlock(result)) { + if (!isLogsApiResponse(result)) { return { error: result }; } - return { ok: { ...result, blockNumber: BigInt(result.blockNumber) } }; + return { ok: toStorageAdatperBlock(result) }; } catch (error) { return { error }; } }, }; } - -// eslint-disable-next-line @typescript-eslint/no-explicit-any -function isStorageAdapterBlock(data: any): data is Omit & { blockNumber: string } { - return data && typeof data.blockNumber === "string" && Array.isArray(data.logs); -} diff --git a/packages/store-sync/src/indexer-client/isLogsApiResponse.ts b/packages/store-sync/src/indexer-client/isLogsApiResponse.ts new file mode 100644 index 0000000000..31e9423760 --- /dev/null +++ b/packages/store-sync/src/indexer-client/isLogsApiResponse.ts @@ -0,0 +1,10 @@ +import { StorageAdapterBlock } from "../common"; + +export type LogsApiResponse = Omit & { blockNumber: string }; + +export function isLogsApiResponse( + // eslint-disable-next-line @typescript-eslint/no-explicit-any + data: any, +): data is LogsApiResponse { + return data && typeof data.blockNumber === "string" && Array.isArray(data.logs); +} diff --git a/packages/store-sync/src/indexer-client/toStorageAdapterBlock.ts b/packages/store-sync/src/indexer-client/toStorageAdapterBlock.ts new file mode 100644 index 0000000000..702a92b712 --- /dev/null +++ b/packages/store-sync/src/indexer-client/toStorageAdapterBlock.ts @@ -0,0 +1,6 @@ +import { StorageAdapterBlock } from "../common"; +import { LogsApiResponse } from "./isLogsApiResponse"; + +export function toStorageAdatperBlock(data: LogsApiResponse): StorageAdapterBlock { + return { ...data, blockNumber: BigInt(data.blockNumber) }; +}