Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(store-sync): add support for live sync from indexer #3226

Merged
merged 23 commits into from
Oct 3, 2024
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/great-dragons-sit.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@latticexyz/store-sync": patch
---

Added support for streaming logs from the indexer.
68 changes: 53 additions & 15 deletions packages/store-sync/src/createStoreSync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,15 @@ import {
combineLatest,
scan,
mergeMap,
throwError,
} from "rxjs";
import { debug as parentDebug } from "./debug";
import { SyncStep } from "./SyncStep";
import { bigIntMax, chunk, isDefined, waitForIdle } from "@latticexyz/common/utils";
import { getSnapshot } from "./getSnapshot";
import { fromEventSource } from "./fromEventSource";
import { fetchAndStoreLogs } from "./fetchAndStoreLogs";
import { isLogsApiResponse } from "./isLogsApiResponse";

const debug = parentDebug.extend("createStoreSync");

Expand Down Expand Up @@ -61,7 +64,7 @@ export async function createStoreSync({
maxBlockRange,
initialState,
initialBlockLogs,
indexerUrl,
indexerUrl: indexerUrlInput,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

small nit: I tend to name these "initial"

Suggested change
indexerUrl: indexerUrlInput,
indexerUrl: initialIndexerUrl,

}: CreateStoreSyncOptions): Promise<SyncResult> {
const filters: SyncFilter[] =
initialFilters.length || tableIds.length
Expand All @@ -78,9 +81,17 @@ export async function createStoreSync({
)
: undefined;

const initialBlockLogs$ = defer(async (): Promise<StorageAdapterBlock | undefined> => {
const chainId = publicClient.chain?.id ?? (await publicClient.getChainId());
const indexerUrl =
indexerUrlInput !== false
? indexerUrlInput ??
(publicClient.chain && "indexerUrl" in publicClient.chain && typeof publicClient.chain.indexerUrl === "string"
? publicClient.chain.indexerUrl
: undefined)
: undefined;

const chainId = publicClient.chain?.id ?? (await publicClient.getChainId());

const initialBlockLogs$ = defer(async (): Promise<StorageAdapterBlock | undefined> => {
onProgress?.({
step: SyncStep.SNAPSHOT,
percentage: 0,
Expand All @@ -95,15 +106,7 @@ export async function createStoreSync({
filters,
initialState,
initialBlockLogs,
indexerUrl:
indexerUrl !== false
? indexerUrl ??
(publicClient.chain &&
"indexerUrl" in publicClient.chain &&
typeof publicClient.chain.indexerUrl === "string"
? publicClient.chain.indexerUrl
: undefined)
: undefined,
indexerUrl,
});

onProgress?.({
Expand Down Expand Up @@ -199,7 +202,34 @@ export async function createStoreSync({
let endBlock: bigint | null = null;
let lastBlockNumberProcessed: bigint | null = null;

const storedBlock$ = combineLatest([startBlock$, latestBlockNumber$]).pipe(
const storedIndexerLogs$ = indexerUrl
? startBlock$.pipe(
mergeMap((startBlock) => {
const url = new URL(
`api/logs-live?${new URLSearchParams({
input: JSON.stringify({ chainId, address, filters }),
block_num: startBlock.toString(),
include_tx_hash: "true",
})}`,
indexerUrl,
);
return fromEventSource<string>(url);
}),
map((messageEvent) => {
const data = JSON.parse(messageEvent.data);
if (!isLogsApiResponse(data)) {
throw new Error("Received unexpected from indexer:" + messageEvent.data);
}
return { ...data, blockNumber: BigInt(data.blockNumber) };
Copy link
Member

@holic holic Oct 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not blocking but might be nice to move these lines into a toStorageAdapterBlock since we have this logic in a couple spots

arktype can help a lot with this pattern of parsing+validating+strong types but can save that for a later improvement

const parseManifest = type("string").pipe.try((s) => JSON.parse(s), SystemsManifest);

export const SystemsManifest = type({
systems: [
{
// labels
namespaceLabel: "string",
label: "string",
// resource ID
namespace: "string",
name: "string",
systemId: ["string", ":", (s): s is Hex => isHex(s, { strict: false })],
// abi
abi: "string[]",
worldAbi: "string[]",
},
"[]",
],
createdAt: "number",
});

}),
concatMap(async (block) => {
await storageAdapter(block);
return block;
}),
)
: throwError(() => new Error("No indexer URL provided"));
holic marked this conversation as resolved.
Show resolved Hide resolved

const storedRpcLogs$ = combineLatest([startBlock$, latestBlockNumber$]).pipe(
map(([startBlock, endBlock]) => ({ startBlock, endBlock })),
tap((range) => {
startBlock = range.startBlock;
Expand All @@ -215,13 +245,21 @@ export async function createStoreSync({
? bigIntMax(range.startBlock, lastBlockNumberProcessed + 1n)
: range.startBlock,
toBlock: range.endBlock,
storageAdapter,
logFilter,
storageAdapter,
});
holic marked this conversation as resolved.
Show resolved Hide resolved

return from(storedBlocks);
}),
tap(({ blockNumber, logs }) => {
);

const storedBlock$ = storedIndexerLogs$.pipe(
catchError((e) => {
debug("failed to stream logs from indexer:", e.message);
debug("falling back to streaming logs from RPC");
return storedRpcLogs$;
}),
Copy link
Member

@holic holic Oct 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

have we tested the fallback behavior here? any way to make sure e2e tests cover this case like they do for the previous indexers?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The fallback behavior already kicks in if no indexer is provided or the existing indexer doesn't support this API, so in that sense it's covered by the existing e2e tests

tap(async ({ logs, blockNumber }) => {
debug("stored", logs.length, "logs for block", blockNumber);
lastBlockNumberProcessed = blockNumber;

Expand Down
10 changes: 10 additions & 0 deletions packages/store-sync/src/fromEventSource.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import { Observable } from "rxjs";

export function fromEventSource<T>(url: string | URL): Observable<MessageEvent<T>> {
return new Observable<MessageEvent>((subscriber) => {
const eventSource = new EventSource(url);
eventSource.onmessage = (ev): void => subscriber.next(ev);
eventSource.onerror = (): void => subscriber.error(new Error("Event source failed" + new URL(url).origin));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
eventSource.onerror = (): void => subscriber.error(new Error("Event source failed" + new URL(url).origin));
// we immediately close instead of allowing `EventSource` to retry because the logs API doesn't support `Last-Event-ID` yet
eventSource.onerror = (): void => subscriber.error(new Error("Event source failed" + new URL(url).origin));

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since fromEventSource is technically not specific to the logs API, we should probably add an option to fromEventSource to enable/disable retries, so consumers can decide based on the API capabilities. Would leave for a followup

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return () => eventSource.close();
});
}
1 change: 1 addition & 0 deletions packages/store-sync/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ export * from "./logToTable";
export * from "./tablesWithRecordsToLogs";
export * from "./tableToLog";
export * from "./recordToLog";
export * from "./logToRecord";
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { z } from "zod";
import { input } from "./input";
import { StorageAdapterBlock } from "../common";
import { Result } from "@latticexyz/common";
import { isLogsApiResponse } from "../isLogsApiResponse";

export type CreateIndexerClientOptions = {
/**
Expand Down Expand Up @@ -30,7 +31,7 @@ export function createIndexerClient({ url }: CreateIndexerClientOptions): Indexe

// TODO: return a readable stream instead of fetching the entire response at once
const result = await response.json();
if (!isStorageAdapterBlock(result)) {
if (!isLogsApiResponse(result)) {
return { error: result };
}

Expand All @@ -41,8 +42,3 @@ export function createIndexerClient({ url }: CreateIndexerClientOptions): Indexe
},
};
}

// eslint-disable-next-line @typescript-eslint/no-explicit-any
function isStorageAdapterBlock(data: any): data is Omit<StorageAdapterBlock, "blockNumber"> & { blockNumber: string } {
return data && typeof data.blockNumber === "string" && Array.isArray(data.logs);
}
10 changes: 10 additions & 0 deletions packages/store-sync/src/isLogsApiResponse.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import { StorageAdapterBlock } from "./common";

export type LogsApiResponse = Omit<StorageAdapterBlock, "blockNumber"> & { blockNumber: string };

export function isLogsApiResponse(
// eslint-disable-next-line @typescript-eslint/no-explicit-any
data: any,
): data is LogsApiResponse {
return data && typeof data.blockNumber === "string" && Array.isArray(data.logs);
}
Loading