-
Notifications
You must be signed in to change notification settings - Fork 196
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(store-sync): sync to RECS #1197
Conversation
🦋 Changeset detectedLatest commit: 543334c The changes in this PR will be included in the next version bump. This PR includes changesets to release 26 packages
Not sure what this means? Click here to learn what changesets are. Click here if you're a maintainer who wants to add another changeset to this PR |
add usage example
const operationsForTx$ = blockStorageOperations$.pipe( | ||
filter(({ blockNumber }) => blockNumber === receipt.blockNumber), | ||
map(({ operations }) => operations.filter((op) => op.log.transactionHash === tx)) | ||
); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is it possible that by the time this gets executed the block storage operations were already sent through the pipe and we would't get a result in firstValueFrom
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you're right, hmmm
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated!
import { SchemaToPrimitives } from "../common"; | ||
import { hexKeyTupleToEntity } from "./hexKeyTupleToEntity"; | ||
|
||
export function encodeEntity<TKeySchema extends Record<string, StaticAbiType>>( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was gonna move these into RECS but it's a bigger refactor and we have to figure out where to draw lines in terms of type definitions (where should KeySchema
and ValueSchema
live and, if not centralized, is it okay to duplicate them in a few packages?).
Will follow up and move these around later, but keeping them here for simplicity for now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After thinking more about this I think it makes sense to keep these utils in here instead of in recs
. In recs
land the entity is just a string, while the encoding and decoding based on key schema is a concept on top of it.
initialState?: { | ||
blockNumber: bigint | null; | ||
tables: (Table & { records: TableRecord[] })[]; | ||
}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I originally built this using indexerUrl
that would fetch from the indexer here, but it caused a circular dependency (because of tRPC types).
@alvrs and I talked about moving the indexer code into store-sync
package and reducing the store-indexer
package to just a "indexer runner", but now that I am looking back at this, I quite like the flexibility of being able to provide the initial state from ~anywhere, rather than strictly our indexer.
We could offer the option of either, but I worry about an explosion of options/code paths (provide chain ID or public client, indexer URL or initial state, etc).
Maybe a good middle ground is exposing a helper in store-indexer
that is getInitialState(indexerUrl, chainId, address)
that abstracts over the raw tRPC call:
const indexer = createTRPCProxyClient<AppRouter>({
transformer: superjson,
links: [httpBatchLink({ url: "http://127.0.0.1:3001" })],
});
const state = await indexer.findAll.query({
chainId: publicClient.chain.id,
address: networkConfig.worldAddress as Hex,
});
But honestly, this isn't much code, so maybe it's fine how it is? It would be nice to remove the need to know about our transformer, AppRouter
type, etc. so maybe an abstraction layer over it is nice. 🤷
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I quite like the flexibility of being able to provide the initial state from ~anywhere, rather than strictly our indexer.
agree
It would be nice to remove the need to know about our transformer, AppRouter type, etc. so maybe an abstraction layer over it is nice.
agree
What if we move it to one package but keep it modular (so users could do it as shown in your example) but also have a lean wrapper around it with simple options for the "default setup"?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
here's the approach I landed on:
const indexer = createIndexerClient({ url: "http://127.0.0.1:3001" });
const initialState = await (async () => {
try {
return await indexer.findAll.query({
chainId: publicClient.chain.id,
address: networkConfig.worldAddress as Hex,
});
} catch (error) {
console.log("couldn't get initial state from indexer", error);
return undefined;
}
})();
where indexer
is strongly typed based on the tRPC server types, and abstracts away setting up the tRPC client, transformer, etc.
we could further reduce this to something like getInitialState
but that seemed unnecessary and seems nice to demonstrate using the indexer
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll bring these changes over in a follow up to keep this PR focused on populating RECS from store
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated store-indexer package branch so it's built on top of this one: #1198
setComponent(components.SyncProgress, singletonEntity, { | ||
step: SyncStep.SNAPSHOT, | ||
message: `Hydrating from snapshot to block ${initialState.blockNumber}`, | ||
percentage: (recordsProcessed / numRecords) * 100, | ||
}); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should only update the sync progress every x records (eg 100 or 1000), otherwise every other update event is a progress update event, which might unnecessarily overwhelm the client if there are a lot of records in the initial state (think OPCraft)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I did a naive thing for now, but would like to move a bunch of the sync stuff over to a less-blocking approach with requestIdleCallback
or similar.
const latestBlockNumber = new BehaviorSubject<bigint | null>(null); | ||
{ | ||
const sub = latestBlockNumber$.subscribe(latestBlockNumber); | ||
world.registerDisposer(() => sub.unsubscribe()); | ||
} | ||
|
||
const blockLogs$ = latestBlockNumber$.pipe( | ||
tap((blockNumber) => { | ||
debug("latest block number", blockNumber); | ||
}), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why does latestBlockNumber
need to be a BehaviorSubject
? Could we just update a simple variable in this tap? (Would also save us from having to manage the subscription / registering the disposer in the World)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I originally had wired up like that, but it felt a little dirty/hard to read. I think the BehaviorSubject approach is effectively the same thing, minus the additional subscriber, and felt cleaner to me.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you elaborate on what was harder to read / felt dirtier?
To me it seems like the BehaviorSubject is just unnecessary overhead since we only ever read the current value via .value
. (Like it just feels weird to have all this code for a variable https://github.com/ReactiveX/rxjs/blob/master/src/internal/BehaviorSubject.ts, https://github.com/ReactiveX/rxjs/blob/master/src/internal/Subject.ts, https://github.com/ReactiveX/rxjs/blob/master/src/internal/Observable.ts even if there is little overhead at runtime). The other thing that feels a bit weird is that we're breaking the convention of using the $
suffix for streams here.
For the point of easier to read, we could still update the latestBlockNumber
as a variable in a subscription like here instead of in tap
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
chatted in person and ended up moving off of behavior subject
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm! just some minor non-blocking nits
@@ -115,7 +116,7 @@ export async function syncToRecs< | |||
setComponent(component, entity, record.value as ComponentValue); | |||
|
|||
recordsProcessed++; | |||
if (recordsProcessed % 1000 === 0) { | |||
if (recordsProcessed % recordsPerSyncProgressUpdate === 0) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nice
pulled out of #1113