Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(store-sync): add support for live sync from indexer #3226

Merged
merged 23 commits into from
Oct 3, 2024
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/great-dragons-sit.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@latticexyz/store-sync": patch
---

Added support for streaming logs from the indexer.
61 changes: 46 additions & 15 deletions packages/store-sync/src/createStoreSync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,13 @@ import {
combineLatest,
scan,
mergeMap,
throwError,
} from "rxjs";
import { debug as parentDebug } from "./debug";
import { SyncStep } from "./SyncStep";
import { bigIntMax, chunk, isDefined, waitForIdle } from "@latticexyz/common/utils";
import { getSnapshot } from "./getSnapshot";
import { fromEventSource } from "./fromEventSource";
import { fetchAndStoreLogs } from "./fetchAndStoreLogs";

const debug = parentDebug.extend("createStoreSync");
Expand Down Expand Up @@ -61,7 +63,7 @@ export async function createStoreSync({
maxBlockRange,
initialState,
initialBlockLogs,
indexerUrl,
indexerUrl: indexerUrlInput,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

small nit: I tend to name these "initial"

Suggested change
indexerUrl: indexerUrlInput,
indexerUrl: initialIndexerUrl,

}: CreateStoreSyncOptions): Promise<SyncResult> {
const filters: SyncFilter[] =
initialFilters.length || tableIds.length
Expand All @@ -78,9 +80,17 @@ export async function createStoreSync({
)
: undefined;

const initialBlockLogs$ = defer(async (): Promise<StorageAdapterBlock | undefined> => {
const chainId = publicClient.chain?.id ?? (await publicClient.getChainId());
const indexerUrl =
indexerUrlInput !== false
? indexerUrlInput ??
(publicClient.chain && "indexerUrl" in publicClient.chain && typeof publicClient.chain.indexerUrl === "string"
? publicClient.chain.indexerUrl
: undefined)
: undefined;

const chainId = publicClient.chain?.id ?? (await publicClient.getChainId());

const initialBlockLogs$ = defer(async (): Promise<StorageAdapterBlock | undefined> => {
onProgress?.({
step: SyncStep.SNAPSHOT,
percentage: 0,
Expand All @@ -95,15 +105,7 @@ export async function createStoreSync({
filters,
initialState,
initialBlockLogs,
indexerUrl:
indexerUrl !== false
? indexerUrl ??
(publicClient.chain &&
"indexerUrl" in publicClient.chain &&
typeof publicClient.chain.indexerUrl === "string"
? publicClient.chain.indexerUrl
: undefined)
: undefined,
indexerUrl,
});

onProgress?.({
Expand Down Expand Up @@ -199,7 +201,28 @@ export async function createStoreSync({
let endBlock: bigint | null = null;
let lastBlockNumberProcessed: bigint | null = null;

const storedBlock$ = combineLatest([startBlock$, latestBlockNumber$]).pipe(
const storedIndexerLogs$ = indexerUrl
? startBlock$.pipe(
mergeMap((startBlock) => {
const url = new URL(
`api/logs-live?${new URLSearchParams({
input: JSON.stringify({ chainId, address, filters }),
block_num: startBlock.toString(),
include_tx_hash: "true",
})}`,
indexerUrl,
);
return fromEventSource<string>(url);
}),
map((messageEvent) => JSON.parse(messageEvent.data) as StorageAdapterBlock),
holic marked this conversation as resolved.
Show resolved Hide resolved
concatMap(async (block) => {
await storageAdapter(block);
return block;
}),
)
: throwError(() => new Error("No indexer URL provided"));
holic marked this conversation as resolved.
Show resolved Hide resolved

const storedRpcLogs$ = combineLatest([startBlock$, latestBlockNumber$]).pipe(
map(([startBlock, endBlock]) => ({ startBlock, endBlock })),
tap((range) => {
startBlock = range.startBlock;
Expand All @@ -215,13 +238,21 @@ export async function createStoreSync({
? bigIntMax(range.startBlock, lastBlockNumberProcessed + 1n)
: range.startBlock,
toBlock: range.endBlock,
storageAdapter,
logFilter,
storageAdapter,
});
holic marked this conversation as resolved.
Show resolved Hide resolved

return from(storedBlocks);
}),
tap(({ blockNumber, logs }) => {
);

const storedBlock$ = storedIndexerLogs$.pipe(
catchError((e) => {
debug("error streaming logs from indexer:", e);
debug("falling back to streaming logs from RPC");
return storedRpcLogs$;
}),
Copy link
Member

@holic holic Oct 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

have we tested the fallback behavior here? any way to make sure e2e tests cover this case like they do for the previous indexers?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The fallback behavior already kicks in if no indexer is provided or the existing indexer doesn't support this API, so in that sense it's covered by the existing e2e tests

tap(async ({ logs, blockNumber }) => {
debug("stored", logs.length, "logs for block", blockNumber);
lastBlockNumberProcessed = blockNumber;

Expand Down
10 changes: 10 additions & 0 deletions packages/store-sync/src/fromEventSource.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import { Observable } from "rxjs";

export function fromEventSource<T>(url: string | URL): Observable<MessageEvent<T>> {
return new Observable<MessageEvent>((subscriber) => {
const eventSource = new EventSource(url);
eventSource.onmessage = (ev): void => subscriber.next(ev);
eventSource.onerror = (ev): void => subscriber.error(ev);
holic marked this conversation as resolved.
Show resolved Hide resolved
return () => eventSource.close();
});
}
1 change: 1 addition & 0 deletions packages/store-sync/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ export * from "./logToTable";
export * from "./tablesWithRecordsToLogs";
export * from "./tableToLog";
export * from "./recordToLog";
export * from "./logToRecord";
Loading