feat(store-sync): sync to RECS #1197

Merged
merged 13 commits into from
Jul 31, 2023

Conversation

holic
Member

@holic holic commented Jul 27, 2023

pulled out of #1113

@changeset-bot

changeset-bot bot commented Jul 27, 2023

🦋 Changeset detected

Latest commit: 543334c

The changes in this PR will be included in the next version bump.

This PR includes changesets to release 26 packages
Name Type
@latticexyz/store-sync Patch
@latticexyz/block-logs-stream Patch
@latticexyz/cli Patch
@latticexyz/common Patch
@latticexyz/config Patch
create-mud Patch
@latticexyz/dev-tools Patch
@latticexyz/ecs-browser Patch
@latticexyz/gas-report Patch
@latticexyz/network Patch
@latticexyz/noise Patch
@latticexyz/phaserx Patch
@latticexyz/protocol-parser Patch
@latticexyz/react Patch
@latticexyz/recs Patch
@latticexyz/schema-type Patch
@latticexyz/services Patch
@latticexyz/solecs Patch
solhint-config-mud Patch
solhint-plugin-mud Patch
@latticexyz/std-client Patch
@latticexyz/std-contracts Patch
@latticexyz/store-cache Patch
@latticexyz/store Patch
@latticexyz/utils Patch
@latticexyz/world Patch


@holic holic marked this pull request as ready for review July 27, 2023 13:34
@holic holic requested a review from alvrs as a code owner July 27, 2023 13:34
add usage example
Comment on lines 194 to 197
const operationsForTx$ = blockStorageOperations$.pipe(
  filter(({ blockNumber }) => blockNumber === receipt.blockNumber),
  map(({ operations }) => operations.filter((op) => op.log.transactionHash === tx))
);
Member

is it possible that by the time this gets executed the block storage operations were already sent through the pipe and we wouldn't get a result in firstValueFrom?

Member Author

I think you're right, hmmm

Member Author

updated!
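
The thread does not say what the update was. As a rough, dependency-free sketch of the race being described: a late subscriber to a hot stream never sees a block that was already emitted, while a stream that replays recent emissions (in RxJS terms, something like `shareReplay`) still delivers it. `ReplayStream`, `BlockOperations`, and `blocksSeenForTx` are hypothetical names for illustration and are not part of store-sync.

```typescript
// Hypothetical, dependency-free stand-in for a hot stream of per-block storage operations.
// Block numbers are plain numbers here for brevity; the PR's streams carry bigint block numbers.
type BlockOperations = { blockNumber: number; txHashes: string[] };
type Listener<T> = (value: T) => void;

class ReplayStream<T> {
  private readonly bufferSize: number;
  private readonly buffer: T[] = [];
  private readonly listeners: Listener<T>[] = [];

  // bufferSize 0 behaves like a plain hot stream: late subscribers see nothing from the past.
  constructor(bufferSize: number) {
    this.bufferSize = bufferSize;
  }

  emit(value: T): void {
    if (this.bufferSize > 0) {
      this.buffer.push(value);
      if (this.buffer.length > this.bufferSize) this.buffer.shift();
    }
    // Iterate over a copy so a listener unsubscribing mid-emit does not disturb the loop.
    this.listeners.slice().forEach((listener) => listener(value));
  }

  // Replays buffered values first, then delivers live ones. Returns an unsubscribe function.
  subscribe(listener: Listener<T>): () => void {
    this.buffer.forEach((value) => listener(value));
    this.listeners.push(listener);
    return () => {
      const index = this.listeners.indexOf(listener);
      if (index !== -1) this.listeners.splice(index, 1);
    };
  }
}

// Returns the block numbers a subscriber arriving now would see for the given transaction.
function blocksSeenForTx(stream: ReplayStream<BlockOperations>, tx: string): number[] {
  const seen: number[] = [];
  const unsubscribe = stream.subscribe((block) => {
    if (block.txHashes.indexOf(tx) !== -1) seen.push(block.blockNumber);
  });
  unsubscribe();
  return seen;
}

const hot = new ReplayStream<BlockOperations>(0);
const replayed = new ReplayStream<BlockOperations>(10);
// The block containing the transaction is processed before anyone asks about it.
[hot, replayed].forEach((stream) => stream.emit({ blockNumber: 5, txHashes: ["0xabc"] }));

const missed = blocksSeenForTx(hot, "0xabc"); // late subscriber on the plain hot stream
const caught = blocksSeenForTx(replayed, "0xabc"); // late subscriber on the replaying stream
console.log(missed.length, caught);
```

Subscribing before the transaction is sent would avoid the race as well; the replay buffer is only one possible remedy.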

@holic holic changed the title from "feat(store-sync): add sync to RECS strategy and utils" to "feat(store-sync): sync to RECS" Jul 27, 2023
@holic holic requested a review from alvrs July 28, 2023 09:51
import { SchemaToPrimitives } from "../common";
import { hexKeyTupleToEntity } from "./hexKeyTupleToEntity";

export function encodeEntity<TKeySchema extends Record<string, StaticAbiType>>(
Member Author

@holic holic Jul 28, 2023

I was gonna move these into RECS but it's a bigger refactor and we have to figure out where to draw lines in terms of type definitions (where should KeySchema and ValueSchema live and, if not centralized, is it okay to duplicate them in a few packages?).

Will follow up and move these around later, but keeping them here for simplicity for now.

Member

After thinking more about this I think it makes sense to keep these utils in here instead of in recs. In recs land the entity is just a string, while the encoding and decoding based on key schema is a concept on top of it.
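
To illustrate the layering described here, a minimal sketch follows in which the entity is only an opaque string and the key-tuple encoding sits on top of it. The function names (`keyTupleToEntity`, `entityToKeyTuple`, `numberToWord`) and the concatenate-32-byte-words scheme are assumptions for illustration, not necessarily what `encodeEntity` or `hexKeyTupleToEntity` do in this PR.

```typescript
// Hypothetical sketch: an entity is an opaque string; key-schema encoding is layered on top.
type Hex = `0x${string}`;
type Entity = string;

const WORD_HEX_LENGTH = 64; // 32 bytes, hex-encoded

// Joins 32-byte hex words into a single string that can serve as an entity ID.
function keyTupleToEntity(keyTuple: readonly Hex[]): Entity {
  for (const key of keyTuple) {
    if (!/^0x[0-9a-fA-F]{64}$/.test(key)) {
      throw new Error(`expected a 32-byte hex word, got ${key}`);
    }
  }
  return "0x" + keyTuple.map((key) => key.slice(2)).join("");
}

// Splits an entity string back into its 32-byte hex words.
function entityToKeyTuple(entity: Entity): Hex[] {
  const body = entity.slice(2);
  if (entity.slice(0, 2) !== "0x" || body.length % WORD_HEX_LENGTH !== 0) {
    throw new Error(`entity is not a whole number of 32-byte words: ${entity}`);
  }
  const words: Hex[] = [];
  for (let i = 0; i < body.length; i += WORD_HEX_LENGTH) {
    words.push(`0x${body.slice(i, i + WORD_HEX_LENGTH)}` as Hex);
  }
  return words;
}

// Left-pads a non-negative integer to a 32-byte word (a stand-in for ABI-encoding a uint key).
function numberToWord(value: number): Hex {
  if (!Number.isSafeInteger(value) || value < 0) {
    throw new RangeError(`expected a non-negative safe integer, got ${value}`);
  }
  return `0x${value.toString(16).padStart(WORD_HEX_LENGTH, "0")}` as Hex;
}

const keyTuple = [numberToWord(1), numberToWord(255)];
const entity = keyTupleToEntity(keyTuple); // one string, usable wherever an entity is expected
const roundTripped = entityToKeyTuple(entity); // the same two words again
console.log(entity.length, roundTripped.length);
```

Code that only needs an entity ID can treat `entity` as a plain string; only code that knows the key schema needs the encode and decode helpers.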

initialState?: {
  blockNumber: bigint | null;
  tables: (Table & { records: TableRecord[] })[];
};
Member Author

@holic holic Jul 28, 2023

I originally built this using indexerUrl that would fetch from the indexer here, but it caused a circular dependency (because of tRPC types).

@alvrs and I talked about moving the indexer code into store-sync package and reducing the store-indexer package to just an "indexer runner", but now that I am looking back at this, I quite like the flexibility of being able to provide the initial state from ~anywhere, rather than strictly our indexer.

We could offer the option of either, but I worry about an explosion of options/code paths (provide chain ID or public client, indexer URL or initial state, etc).

Maybe a good middle ground is exposing a helper in store-indexer that is getInitialState(indexerUrl, chainId, address) that abstracts over the raw tRPC call:

const indexer = createTRPCProxyClient<AppRouter>({
  transformer: superjson,
  links: [httpBatchLink({ url: "http://127.0.0.1:3001" })],
});
const state = await indexer.findAll.query({
  chainId: publicClient.chain.id,
  address: networkConfig.worldAddress as Hex,
});

But honestly, this isn't much code, so maybe it's fine how it is? It would be nice to remove the need to know about our transformer, AppRouter type, etc. so maybe an abstraction layer over it is nice. 🤷

Member

I quite like the flexibility of being able to provide the initial state from ~anywhere, rather than strictly our indexer.

agree

It would be nice to remove the need to know about our transformer, AppRouter type, etc. so maybe an abstraction layer over it is nice.

agree

What if we move it to one package but keep it modular (so users could do it as shown in your example) but also have a lean wrapper around it with simple options for the "default setup"?

Member Author

here's the approach I landed on:

const indexer = createIndexerClient({ url: "http://127.0.0.1:3001" });
const initialState = await (async () => {
  try {
    return await indexer.findAll.query({
      chainId: publicClient.chain.id,
      address: networkConfig.worldAddress as Hex,
    });
  } catch (error) {
    console.log("couldn't get initial state from indexer", error);
    return undefined;
  }
})();

where indexer is strongly typed based on the tRPC server types, and abstracts away setting up the tRPC client, transformer, etc.

we could further reduce this to something like getInitialState, but that seemed unnecessary, and it seems nice to demonstrate using the indexer

Member Author

okay, took another pass and got it down to an option in syncToRecs

image

internally in syncToRecs, it does this:

image

Member Author

I'll bring these changes over in a follow up to keep this PR focused on populating RECS from store

Member Author

updated store-indexer package branch so it's built on top of this one: #1198

Comment on lines 117 to 121
setComponent(components.SyncProgress, singletonEntity, {
  step: SyncStep.SNAPSHOT,
  message: `Hydrating from snapshot to block ${initialState.blockNumber}`,
  percentage: (recordsProcessed / numRecords) * 100,
});
Member

I think we should only update the sync progress every x records (eg 100 or 1000), otherwise every other update event is a progress update event, which might unnecessarily overwhelm the client if there are a lot of records in the initial state (think OPCraft)

Member Author

I did a naive thing for now, but would like to move a bunch of the sync stuff over to a less-blocking approach with requestIdleCallback or similar.
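
A minimal sketch of the throttling idea from this thread: report progress only every N records instead of after each one. `hydrate`, `ProgressUpdate`, and the callback parameters are hypothetical names, and the extra report on the final record is an addition of this sketch, not something the thread states.

```typescript
type ProgressUpdate = { recordsProcessed: number; percentage: number };

// Processes every record, but reports progress only every `recordsPerUpdate` records,
// plus once on the final record so the last report always reaches 100%.
function hydrate<T>(
  records: readonly T[],
  processRecord: (record: T) => void,
  onProgress: (update: ProgressUpdate) => void,
  recordsPerUpdate: number = 1000
): void {
  if (!Number.isInteger(recordsPerUpdate) || recordsPerUpdate < 1) {
    throw new RangeError("recordsPerUpdate must be a positive integer");
  }
  const numRecords = records.length;
  let recordsProcessed = 0;
  for (const record of records) {
    processRecord(record);
    recordsProcessed++;
    const isCheckpoint = recordsProcessed % recordsPerUpdate === 0;
    const isLast = recordsProcessed === numRecords;
    if (isCheckpoint || isLast) {
      onProgress({ recordsProcessed, percentage: (recordsProcessed / numRecords) * 100 });
    }
  }
}

// 2500 records with an update every 1000 records: reports at 1000, 2000, and 2500.
const demoRecords = Array.from({ length: 2500 }, (_, index) => index);
const progressUpdates: ProgressUpdate[] = [];
let processedCount = 0;
hydrate(
  demoRecords,
  () => {
    processedCount++;
  },
  (update) => {
    progressUpdates.push(update);
  },
  1000
);
console.log(processedCount, progressUpdates.length);
```

This keeps the work synchronous; moving to `requestIdleCallback` or similar, as mentioned above, would be a separate change.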

Comment on lines 139 to 148
const latestBlockNumber = new BehaviorSubject<bigint | null>(null);
{
  const sub = latestBlockNumber$.subscribe(latestBlockNumber);
  world.registerDisposer(() => sub.unsubscribe());
}

const blockLogs$ = latestBlockNumber$.pipe(
  tap((blockNumber) => {
    debug("latest block number", blockNumber);
  }),
Member

why does latestBlockNumber need to be a BehaviorSubject? Could we just update a simple variable in this tap? (Would also save us from having to manage the subscription / registering the disposer in the World)

Member Author

@holic holic Jul 31, 2023

I originally had it wired up like that, but it felt a little dirty/hard to read. I think the BehaviorSubject approach is effectively the same thing, minus the additional subscriber, and felt cleaner to me.

Member

Can you elaborate on what was harder to read / felt dirtier?

To me it seems like the BehaviorSubject is just unnecessary overhead since we only ever read the current value via .value. (Like it just feels weird to have all this code for a variable https://github.com/ReactiveX/rxjs/blob/master/src/internal/BehaviorSubject.ts, https://github.com/ReactiveX/rxjs/blob/master/src/internal/Subject.ts, https://github.com/ReactiveX/rxjs/blob/master/src/internal/Observable.ts even if there is little overhead at runtime). The other thing that feels a bit weird is that we're breaking the convention of using the $ suffix for streams here.

For the point of easier to read, we could still update the latestBlockNumber as a variable in a subscription like here instead of in tap.

Member Author

chatted in person and ended up moving off of behavior subject
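
The thread does not show the code that replaced the behavior subject. As one possible shape of the plain-variable alternative discussed above, here is a dependency-free sketch in which a variable is kept current from a subscription and read through a getter. `BlockNumberSource`, `trackLatestBlockNumber`, and `createManualSource` are hypothetical; with RxJS the source would simply be an observable's `subscribe`.

```typescript
// Hypothetical minimal stream interface standing in for an observable.
// Block numbers are plain numbers here for brevity; the PR uses bigint.
type Unsubscribe = () => void;
type BlockNumberSource = {
  subscribe(listener: (blockNumber: number) => void): Unsubscribe;
};

// Keeps a plain variable up to date from a stream instead of mirroring it into a subject.
function trackLatestBlockNumber(source: BlockNumberSource) {
  let latestBlockNumber: number | null = null;
  const unsubscribe = source.subscribe((blockNumber) => {
    latestBlockNumber = blockNumber;
  });
  return {
    // Read through a getter so callers always see the current value.
    getLatestBlockNumber: (): number | null => latestBlockNumber,
    dispose: unsubscribe,
  };
}

// Tiny in-memory source, for demonstration only.
function createManualSource() {
  const listeners: Array<(blockNumber: number) => void> = [];
  const source: BlockNumberSource = {
    subscribe(listener) {
      listeners.push(listener);
      return () => {
        const index = listeners.indexOf(listener);
        if (index !== -1) listeners.splice(index, 1);
      };
    },
  };
  const push = (blockNumber: number): void => {
    listeners.slice().forEach((listener) => listener(blockNumber));
  };
  return { source, push };
}

const manual = createManualSource();
const tracker = trackLatestBlockNumber(manual.source);
const beforeFirstBlock = tracker.getLatestBlockNumber(); // null until a block arrives
manual.push(100);
manual.push(101);
const whileSubscribed = tracker.getLatestBlockNumber(); // the most recent block pushed
tracker.dispose();
manual.push(102);
const afterDispose = tracker.getLatestBlockNumber(); // unchanged: no longer subscribed
console.log(beforeFirstBlock, whileSubscribed, afterDispose);
```

Updating the variable inside an existing `tap` instead, as first suggested in this thread, would avoid the separate subscription and its disposer entirely.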

alvrs
alvrs previously approved these changes Jul 31, 2023
Member

@alvrs alvrs left a comment

lgtm! just some minor non-blocking nits

@@ -115,7 +116,7 @@ export async function syncToRecs<
 setComponent(component, entity, record.value as ComponentValue);

 recordsProcessed++;
-if (recordsProcessed % 1000 === 0) {
+if (recordsProcessed % recordsPerSyncProgressUpdate === 0) {
Member

nice

@holic holic merged commit 9e5baf4 into main Jul 31, 2023
@holic holic deleted the holic/store-sync-recs branch July 31, 2023 12:07
2 participants