Skip to content

Commit

Permalink
rearrange again
Browse files Browse the repository at this point in the history
  • Loading branch information
holic committed Sep 23, 2024
1 parent ae9b7a3 commit ad4eb36
Showing 1 changed file with 17 additions and 18 deletions.
35 changes: 17 additions & 18 deletions packages/store-sync/src/createStoreSync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,7 @@ import {
mergeMap,
BehaviorSubject,
ignoreElements,
last,
first,
defaultIfEmpty,
} from "rxjs";
import { debug as parentDebug } from "./debug";
import { SyncStep } from "./SyncStep";
Expand Down Expand Up @@ -244,48 +242,50 @@ export async function createStoreSync({
storageAdapter,
logFilter,
});
const storedBlock$ = from(storedBlocks);

const storedBlock$ = from(storedBlocks).pipe(share());

return concat(
storageAdapterLock$.pipe(
first((lock) => lock === false),
tap(() => storageAdapterLock$.next(true)),
ignoreElements(),
),
storedBlock$,
storedBlock$.pipe(
defaultIfEmpty(null),
last(),
concatMap(async (block) => {
if (block == null) return;
await applyOptimisticLogs(block.blockNumber);
tap(({ blockNumber, logs }) => {
debug("stored", logs.length, "logs for block", blockNumber);
lastBlockNumberProcessed = blockNumber;
}),
),
of(true).pipe(
concatMap(async () => {
if (lastBlockNumberProcessed != null) {
await applyOptimisticLogs(lastBlockNumberProcessed);
}
}),
tap(() => storageAdapterLock$.next(false)),
ignoreElements(),
),
);
}),
tap(({ blockNumber, logs }) => {
debug("stored", logs.length, "logs for block", blockNumber);
lastBlockNumberProcessed = blockNumber;

tap(({ blockNumber }) => {
if (startBlock != null && endBlock != null) {
if (blockNumber < endBlock) {
const totalBlocks = endBlock - startBlock;
const processedBlocks = lastBlockNumberProcessed - startBlock;
const processedBlocks = blockNumber - startBlock;
onProgress?.({
step: SyncStep.RPC,
percentage: Number((processedBlocks * 1000n) / totalBlocks) / 10,
latestBlockNumber: endBlock,
lastBlockNumberProcessed,
lastBlockNumberProcessed: blockNumber,
message: "Hydrating from RPC",
});
} else {
onProgress?.({
step: SyncStep.LIVE,
percentage: 100,
latestBlockNumber: endBlock,
lastBlockNumberProcessed,
lastBlockNumberProcessed: blockNumber,
message: "All caught up!",
});
}
Expand Down Expand Up @@ -334,10 +334,9 @@ export async function createStoreSync({
if (receipt.status === "success") {
const logs = parseEventLogs({ abi: storeEventsAbi, logs: receipt.logs });
if (logs.length) {
debug("applying", logs.length, "optimistic logs");
// wait for lock to clear
// unclear what happens if two waitForTransaction calls are triggered simultaneously and both get released for the same lock emission?
await firstValueFrom(storageAdapterLock$.pipe(filter((lock) => lock === false)));

storageAdapterLock$.next(true);
optimisticLogs = [...optimisticLogs, ...logs];
await applyOptimisticLogs(lastBlock.blockNumber);
Expand Down

0 comments on commit ad4eb36

Please sign in to comment.