From 15cddaebc2bc0e9f7c613634f405900227675267 Mon Sep 17 00:00:00 2001 From: Kevin Ingersoll Date: Fri, 20 Sep 2024 17:28:06 +0100 Subject: [PATCH 01/15] add reth block range errors, accept regular client --- packages/block-logs-stream/src/fetchLogs.ts | 30 ++++++++++++++------- 1 file changed, 20 insertions(+), 10 deletions(-) diff --git a/packages/block-logs-stream/src/fetchLogs.ts b/packages/block-logs-stream/src/fetchLogs.ts index d6de091619..cc76142a78 100644 --- a/packages/block-logs-stream/src/fetchLogs.ts +++ b/packages/block-logs-stream/src/fetchLogs.ts @@ -1,15 +1,17 @@ import { AbiEvent } from "abitype"; -import { Address, PublicClient, BlockNumber, GetLogsReturnType } from "viem"; +import { Address, Client, BlockNumber, GetLogsReturnType } from "viem"; import { bigIntMin, wait } from "@latticexyz/common/utils"; import { debug } from "./debug"; +import { getAction } from "viem/utils"; +import { getLogs } from "viem/actions"; -export type FetchLogsOptions = { +export type FetchLogsOptions = { /** - * [viem `PublicClient`][0] used for fetching logs from the RPC. + * [viem `Client`][0] used for fetching logs from the RPC. * * [0]: https://viem.sh/docs/clients/public.html */ - publicClient: PublicClient; + publicClient: Client; /** * Optional contract address(es) to fetch logs for. */ @@ -17,7 +19,7 @@ export type FetchLogsOptions = { /** * Events to fetch logs for. */ - events: TAbiEvents; + events: abiEvents; /** * The block number to start fetching logs from (inclusive). */ @@ -36,10 +38,10 @@ export type FetchLogsOptions = { maxRetryCount?: number; }; -export type FetchLogsResult = { +export type FetchLogsResult = { fromBlock: BlockNumber; toBlock: BlockNumber; - logs: GetLogsReturnType; + logs: GetLogsReturnType; }; const RATE_LIMIT_ERRORS = [ @@ -59,6 +61,10 @@ const BLOCK_RANGE_TOO_LARGE_ERRORS = [ // https://github.com/ethereum-optimism/optimism/blob/4fb534ab3d924ac87383e1e70ae4872340d68d9d/proxyd/backend.go#L98 // https://github.com/ethereum-optimism/optimism/blob/4fb534ab3d924ac87383e1e70ae4872340d68d9d/proxyd/rewriter.go#L35 "block is out of range", + // https://github.com/paradigmxyz/reth/blob/b5adf24a65e83bc48da16fd722d369a28d12f644/crates/rpc/rpc-eth-types/src/logs_utils.rs#L25 + "query exceeds max block range", + // https://github.com/paradigmxyz/reth/blob/b5adf24a65e83bc48da16fd722d369a28d12f644/crates/rpc/rpc-eth-types/src/logs_utils.rs#L28 + "query exceeds max results", ]; /** @@ -76,12 +82,12 @@ const BLOCK_RANGE_TOO_LARGE_ERRORS = [ * * @throws Will throw an error if the block range can't be reduced any further. */ -export async function* fetchLogs({ +export async function* fetchLogs({ maxBlockRange = 1000n, maxRetryCount = 3, publicClient, ...getLogsOpts -}: FetchLogsOptions): AsyncGenerator> { +}: FetchLogsOptions): AsyncGenerator> { let fromBlock = getLogsOpts.fromBlock; let blockRange = bigIntMin(maxBlockRange, getLogsOpts.toBlock - fromBlock); let retryCount = 0; @@ -90,7 +96,11 @@ export async function* fetchLogs({ try { const toBlock = fromBlock + blockRange; debug("getting logs", { fromBlock, toBlock }); - const logs = await publicClient.getLogs({ ...getLogsOpts, fromBlock, toBlock, strict: true }); + const logs = await getAction( + publicClient, + getLogs, + "getLogs", + )({ ...getLogsOpts, fromBlock, toBlock, strict: true }); yield { fromBlock, toBlock, logs }; fromBlock = toBlock + 1n; blockRange = bigIntMin(maxBlockRange, getLogsOpts.toBlock - fromBlock); From 7a7611e6febf987205655c9e4eec2f86dc176dfd Mon Sep 17 00:00:00 2001 From: Kevin Ingersoll Date: Fri, 20 Sep 2024 18:20:52 +0100 Subject: [PATCH 02/15] fetch block logs --- .../block-logs-stream/src/fetchBlockLogs.ts | 25 +++++++++++++++++++ .../src/groupLogsByBlockNumber.ts | 12 ++++----- packages/block-logs-stream/src/index.ts | 1 + 3 files changed, 32 insertions(+), 6 deletions(-) create mode 100644 packages/block-logs-stream/src/fetchBlockLogs.ts diff --git a/packages/block-logs-stream/src/fetchBlockLogs.ts b/packages/block-logs-stream/src/fetchBlockLogs.ts new file mode 100644 index 0000000000..885592e45f --- /dev/null +++ b/packages/block-logs-stream/src/fetchBlockLogs.ts @@ -0,0 +1,25 @@ +import { AbiEvent, Log } from "viem"; +import { GroupLogsByBlockNumberResult, groupLogsByBlockNumber } from "./groupLogsByBlockNumber"; +import { FetchLogsOptions, FetchLogsResult, fetchLogs } from "./fetchLogs"; + +/** + * Fetches all logs from the blockchain for the given range, grouped by block number. + * + * @remarks + * The function will fetch logs according to the given options. + * If the function encounters rate limits, it will retry until `maxRetryCount` is reached. + * If the function encounters a block range that is too large, it will half the block range and retry, until the block range can't be halved anymore. + * + * @param {FetchLogsOptions} options See `FetchLogsOptions`. + * + * @throws Will throw an error if the block range can't be reduced any further. + */ +export async function fetchBlockLogs( + opts: FetchLogsOptions, +): Promise>> { + const results: FetchLogsResult[] = []; + for await (const result of fetchLogs(opts)) { + results.push(result); + } + return groupLogsByBlockNumber(results.flatMap((result) => result.logs)); +} diff --git a/packages/block-logs-stream/src/groupLogsByBlockNumber.ts b/packages/block-logs-stream/src/groupLogsByBlockNumber.ts index 37fd614d35..b5bf5ad483 100644 --- a/packages/block-logs-stream/src/groupLogsByBlockNumber.ts +++ b/packages/block-logs-stream/src/groupLogsByBlockNumber.ts @@ -3,9 +3,9 @@ import { bigIntSort, isDefined } from "@latticexyz/common/utils"; type PartialLog = { blockNumber: bigint; logIndex: number }; -export type GroupLogsByBlockNumberResult = { - blockNumber: TLog["blockNumber"]; - logs: TLog[]; +export type GroupLogsByBlockNumberResult = { + blockNumber: log["blockNumber"]; + logs: readonly log[]; }[]; /** @@ -22,10 +22,10 @@ export type GroupLogsByBlockNumberResult = { * @returns An array of objects where each object represents a distinct block and includes the block number, * the block hash, and an array of logs for that block. */ -export function groupLogsByBlockNumber( - logs: readonly TLog[], +export function groupLogsByBlockNumber( + logs: readonly log[], toBlock?: BlockNumber, -): GroupLogsByBlockNumberResult { +): GroupLogsByBlockNumberResult { const blockNumbers = Array.from(new Set(logs.map((log) => log.blockNumber))); blockNumbers.sort(bigIntSort); diff --git a/packages/block-logs-stream/src/index.ts b/packages/block-logs-stream/src/index.ts index b765828242..3d007724c0 100644 --- a/packages/block-logs-stream/src/index.ts +++ b/packages/block-logs-stream/src/index.ts @@ -1,4 +1,5 @@ export * from "./blockRangeToLogs"; export * from "./createBlockStream"; +export * from "./fetchBlockLogs"; export * from "./fetchLogs"; export * from "./groupLogsByBlockNumber"; From b5ae562b268a5bcaeaabbf83c5c96809ff4db746 Mon Sep 17 00:00:00 2001 From: Kevin Ingersoll Date: Fri, 20 Sep 2024 18:27:19 +0100 Subject: [PATCH 03/15] fetch all logs when looking up deploy --- packages/cli/package.json | 1 + packages/cli/src/deploy/getWorldDeploy.ts | 23 +++++++++++++---------- pnpm-lock.yaml | 3 +++ 3 files changed, 17 insertions(+), 10 deletions(-) diff --git a/packages/cli/package.json b/packages/cli/package.json index 65257c49ae..18ff2f89c7 100644 --- a/packages/cli/package.json +++ b/packages/cli/package.json @@ -42,6 +42,7 @@ "@ark/util": "catalog:", "@aws-sdk/client-kms": "^3.556.0", "@latticexyz/abi-ts": "workspace:*", + "@latticexyz/block-logs-stream": "workspace:*", "@latticexyz/common": "workspace:*", "@latticexyz/config": "workspace:*", "@latticexyz/gas-report": "workspace:*", diff --git a/packages/cli/src/deploy/getWorldDeploy.ts b/packages/cli/src/deploy/getWorldDeploy.ts index 9427f77c93..e3955f3fba 100644 --- a/packages/cli/src/deploy/getWorldDeploy.ts +++ b/packages/cli/src/deploy/getWorldDeploy.ts @@ -1,5 +1,6 @@ import { Client, Address, getAddress, parseAbi } from "viem"; -import { getBlockNumber, getLogs } from "viem/actions"; +import { getBlock } from "viem/actions"; +import { fetchBlockLogs } from "@latticexyz/block-logs-stream"; import { WorldDeploy, worldDeployEvents } from "./common"; import { debug } from "./debug"; import { logsToWorldDeploy } from "./logsToWorldDeploy"; @@ -16,20 +17,22 @@ export async function getWorldDeploy(client: Client, worldAddress: Address): Pro debug("looking up world deploy for", address); - const stateBlock = await getBlockNumber(client); - const logs = await getLogs(client, { - strict: true, + const [fromBlock, toBlock] = await Promise.all([ + getBlock(client, { blockTag: "earliest" }), + getBlock(client, { blockTag: "latest" }), + ]); + + const blockLogs = await fetchBlockLogs({ + publicClient: client, address, events: parseAbi(worldDeployEvents), - // this may fail for certain RPC providers with block range limits - // if so, could potentially use our fetchLogs helper (which does pagination) - fromBlock: "earliest", - toBlock: stateBlock, + fromBlock: fromBlock.number, + toBlock: toBlock.number, }); deploy = { - ...logsToWorldDeploy(logs), - stateBlock, + ...logsToWorldDeploy(blockLogs.flatMap((block) => block.logs)), + stateBlock: toBlock.number, }; deploys.set(address, deploy); diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 8e27e8f060..f56fb2d432 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -157,6 +157,9 @@ importers: '@latticexyz/abi-ts': specifier: workspace:* version: link:../abi-ts + '@latticexyz/block-logs-stream': + specifier: workspace:* + version: link:../block-logs-stream '@latticexyz/common': specifier: workspace:* version: link:../common From d18bbd295f43e8dcf173315358343ce3c898e11c Mon Sep 17 00:00:00 2001 From: Kevin Ingersoll Date: Fri, 20 Sep 2024 18:27:35 +0100 Subject: [PATCH 04/15] install sort package for faster git hook --- package.json | 5 +-- pnpm-lock.yaml | 86 +++++++++++++++++++++++++++++++++++++++++++++----- 2 files changed, 81 insertions(+), 10 deletions(-) diff --git a/package.json b/package.json index 3248223c2c..3c2ad5b267 100644 --- a/package.json +++ b/package.json @@ -15,6 +15,7 @@ "dev": "TSUP_SKIP_DTS=true turbo run dev --concurrency 100", "dist-tag-rm": "pnpm recursive exec -- sh -c 'npm dist-tag rm $(cat package.json | jq -r \".name\") $TAG || true'", "docs:generate:api": "tsx scripts/render-api-docs.ts", + "fix:package-json": "sort-package-json package.json 'packages/*/package.json' 'templates/*/package.json' 'templates/*/packages/*/package.json' 'examples/*/package.json' 'examples/*/packages/*/package.json' 'e2e/*/package.json' 'e2e/*/packages/*/package.json' 'docs/package.json' 'test/*/package.json'", "foundryup": "curl -L https://foundry.paradigm.xyz | bash && bash ~/.foundry/bin/foundryup", "gas-report": "pnpm run --recursive --parallel gas-report", "lint": "pnpm prettier:check && eslint . --ext .ts --ext .tsx", @@ -24,7 +25,6 @@ "release:check": "changeset status --verbose --since=origin/main", "release:publish": "pnpm install && pnpm build && changeset publish", "release:version": "changeset version && pnpm install --lockfile-only && pnpm run changelog:generate", - "sort-package-json": "npx sort-package-json package.json 'packages/*/package.json' 'templates/*/package.json' 'templates/*/packages/*/package.json' 'examples/*/package.json' 'examples/*/packages/*/package.json' 'e2e/*/package.json' 'e2e/*/packages/*/package.json' 'docs/package.json' 'test/*/package.json'", "test": "pnpm run --recursive test", "test:ci": "pnpm run --recursive --parallel test:ci", "type-bench": "pnpm --filter ./test/ts-benchmarks bench", @@ -33,7 +33,7 @@ "lint-staged": { "*.{ts,tsx}": "eslint --cache --fix", "*.{ts,tsx,css,md,mdx,sol}": "prettier --write", - "package.json": "pnpm sort-package-json" + "package.json": "pnpm fix:package-json" }, "devDependencies": { "@ark/attest": "catalog:", @@ -50,6 +50,7 @@ "prettier": "3.2.5", "prettier-plugin-solidity": "1.3.1", "shx": "^0.3.4", + "sort-package-json": "^2.10.1", "tsx": "4.16.2", "turbo": "^1.9.3", "typescript": "5.4.2" diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index f56fb2d432..c71f9b0904 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -74,6 +74,9 @@ importers: shx: specifier: ^0.3.4 version: 0.3.4 + sort-package-json: + specifier: ^2.10.1 + version: 2.10.1 tsx: specifier: 4.16.2 version: 4.16.2 @@ -806,10 +809,10 @@ importers: version: 8.3.4 jest: specifier: ^29.3.1 - version: 29.5.0(@types/node@18.15.11) + version: 29.5.0(@types/node@20.12.12) ts-jest: specifier: ^29.0.5 - version: 29.0.5(@babel/core@7.21.4)(@jest/types@29.6.3)(babel-jest@29.5.0(@babel/core@7.21.4))(jest@29.5.0(@types/node@18.15.11))(typescript@5.4.2) + version: 29.0.5(@babel/core@7.25.2)(@jest/types@29.6.3)(babel-jest@29.5.0(@babel/core@7.25.2))(jest@29.5.0(@types/node@20.12.12))(typescript@5.4.2) tsup: specifier: ^6.7.0 version: 6.7.0(postcss@8.4.31)(typescript@5.4.2) @@ -1197,10 +1200,10 @@ importers: version: 27.4.1 jest: specifier: ^29.3.1 - version: 29.5.0(@types/node@20.12.12) + version: 29.5.0(@types/node@18.15.11) ts-jest: specifier: ^29.0.5 - version: 29.0.5(@babel/core@7.25.2)(@jest/types@29.6.3)(babel-jest@29.5.0(@babel/core@7.25.2))(jest@29.5.0(@types/node@20.12.12))(typescript@5.4.2) + version: 29.0.5(@babel/core@7.21.4)(@jest/types@29.6.3)(babel-jest@29.5.0(@babel/core@7.21.4))(jest@29.5.0(@types/node@18.15.11))(typescript@5.4.2) tsup: specifier: ^6.7.0 version: 6.7.0(postcss@8.4.31)(typescript@5.4.2) @@ -6375,6 +6378,10 @@ packages: resolution: {integrity: sha512-reYkTUJAZb9gUuZ2RvVCNhVHdg62RHnJ7WJl8ftMi4diZ6NWlciOzQN88pUhSELEwflJht4oQDv0F0BMlwaYtA==} engines: {node: '>=8'} + detect-indent@7.0.1: + resolution: {integrity: sha512-Mc7QhQ8s+cLrnUfU/Ji94vG/r8M26m8f++vyres4ZoojaRDpZ1eSIh/EpzLNwlWuvzSZ3UbDFspjFvTDXe6e/g==} + engines: {node: '>=12.20'} + detect-libc@1.0.3: resolution: {integrity: sha512-pGjwhsmsp4kL2RTz08wcOlGN83otlqHeD/Z5T8GXZB+/YcpQ/dgo+lbU8ZsGxV0HIvqqxo9l7mqYwyYMD9bKDg==} engines: {node: '>=0.10'} @@ -6388,6 +6395,10 @@ packages: resolution: {integrity: sha512-TLz+x/vEXm/Y7P7wn1EJFNLxYpUD4TgMosxY6fAVJUnJMbupHBOncxyWUG9OpTaH9EBD7uFI5LfEgmMOc54DsA==} engines: {node: '>=8'} + detect-newline@4.0.1: + resolution: {integrity: sha512-qE3Veg1YXzGHQhlA6jzebZN2qVf6NX+A7m7qlhCGG30dJixrAQhYOsJjsnBjJkCSmuOPpCk30145fr8FV0bzog==} + engines: {node: ^12.20.0 || ^14.13.1 || >=16.0.0} + detect-node-es@1.1.0: resolution: {integrity: sha512-ypdmJU/TbBby2Dxibuv7ZLW3Bs1QEmM7nHjEANfohJLvE0XVujisn1qPJcZxg+qDucsr+bP6fLD1rPS3AhJ7EQ==} @@ -7226,6 +7237,10 @@ packages: resolution: {integrity: sha512-BrGGraKm2uPqurfGVj/z97/zv8dPleC6x9JBNRTrDNtCkkRF4rPwrQXFgL7+I+q8QSdU4ntLQX2D7KIxSy8nGw==} engines: {node: ^12.20.0 || ^14.13.1 || >=16.0.0} + get-stdin@9.0.0: + resolution: {integrity: sha512-dVKBjfWisLAicarI2Sf+JuBE/DghV4UzNAVe9yhEJuzeREd3JhOTE9cUaJTeSa77fsbQUK3pcOpJfM59+VKZaA==} + engines: {node: '>=12'} + get-stream@6.0.1: resolution: {integrity: sha512-ts6Wi+2j3jQjqi70w5AlN8DFnkSwC+MqmxEzdEALB2qXZYV3X/b1CTfgPLGJNMeAWxdPfU8FO1ms3NUfaHCPYg==} engines: {node: '>=10'} @@ -7248,6 +7263,9 @@ packages: get-tsconfig@4.7.5: resolution: {integrity: sha512-ZCuZCnlqNzjb4QprAzXKdpp/gh6KTxSJuw3IBsPnV/7fV4NxC9ckB+vPTt8w7fJA0TaSD7c55BR47JD6MEDyDw==} + git-hooks-list@3.1.0: + resolution: {integrity: sha512-LF8VeHeR7v+wAbXqfgRlTSX/1BJR9Q1vEMR8JAz1cEg6GX07+zyj3sAdDvYjj/xnlIfVuGgj4qBei1K3hKH+PA==} + gitconfig@2.0.8: resolution: {integrity: sha512-qOB1QswIHFNKAOPN0pEu7U1iyajLBv3Tz5X630UlkAtKM904I4dO7XIjH84wmR2SUVAgaVR99UC9U4ABJujAJQ==} engines: {node: '>=6', npm: '>=3'} @@ -7302,6 +7320,10 @@ packages: resolution: {integrity: sha512-jhIXaOzy1sb8IyocaruWSn1TjmnBVs8Ayhcy83rmxNJ8q2uWKCAj3CnJY+KpGSXCueAPc0i05kVvVKtP1t9S3g==} engines: {node: '>=10'} + globby@13.2.2: + resolution: {integrity: sha512-Y1zNGV+pzQdh7H39l9zgB4PJqjRNqydvdYCDG4HFXM4XuvSaQQlEc91IU1yALL8gUTDomgBAfz3XJdmUS+oo0w==} + engines: {node: ^12.20.0 || ^14.13.1 || >=16.0.0} + gopd@1.0.1: resolution: {integrity: sha512-d65bNlIadxvpb/A2abVdlqKqV563juRnZ1Wtk6s1sIR8uNsXR70xqIzVqxVf1eTqDunwT2MkczEeaezCKTZhwA==} @@ -7693,6 +7715,10 @@ packages: resolution: {integrity: sha512-Fd4gABb+ycGAmKou8eMftCupSir5lRxqf4aD/vd0cD2qc4HL07OjCeuHMr8Ro4CoMaeCKDB0/ECBOVWjTwUvPQ==} engines: {node: '>=8'} + is-plain-obj@4.1.0: + resolution: {integrity: sha512-+Pgi+vMuUNkJyExiMBt5IlFoMyKnr5zhJ4Uspz58WOhBF5QoIZkFyNHIbBAtHwzVAgk5RtndVNsDRN61/mmDqg==} + engines: {node: '>=12'} + is-plain-object@2.0.4: resolution: {integrity: sha512-h5PpgXkWitc38BBMYawTYMWJHFZJVnBquFE57xFpjB8pJFiF6gZ+bU+WyI/yqXiFR5mdLsgYNaPe8uao6Uv9Og==} engines: {node: '>=0.10.0'} @@ -10009,6 +10035,10 @@ packages: resolution: {integrity: sha512-g9Q1haeby36OSStwb4ntCGGGaKsaVSjQ68fBxoQcutl5fS1vuY18H3wSt3jFyFtrkx+Kz0V1G85A4MyAdDMi2Q==} engines: {node: '>=8'} + slash@4.0.0: + resolution: {integrity: sha512-3dOsAHXXUkQTpOYcoAxLIorMTp4gIQr5IW3iVb7A7lFIp0VHhnynm9izx6TssdrIcVIESAlVjtnO2K8bg+Coew==} + engines: {node: '>=12'} + slice-ansi@2.1.0: resolution: {integrity: sha512-Qu+VC3EwYLldKa1fCxuuvULvSJOKEgk9pi8dZeCVK7TqBfUNTH4sFkk4joj8afVSfAYgJoSOetjx9QWOJ5mYoQ==} engines: {node: '>=6'} @@ -10068,6 +10098,13 @@ packages: react: ^18.0.0 react-dom: ^18.0.0 + sort-object-keys@1.1.3: + resolution: {integrity: sha512-855pvK+VkU7PaKYPc+Jjnmt4EzejQHyhhF33q31qG8x7maDzkeFhAAThdCYay11CISO+qAMwjOBP+fPZe0IPyg==} + + sort-package-json@2.10.1: + resolution: {integrity: sha512-d76wfhgUuGypKqY72Unm5LFnMpACbdxXsLPcL27pOsSrmVqH3PztFp1uq+Z22suk15h7vXmTesuh2aEjdCqb5w==} + hasBin: true + source-map-js@1.0.2: resolution: {integrity: sha512-R0XvVJ9WusLiqTCEiGCmICCMplcCkIwwR11mOSD9CR5u+IXYdiseeEuXCVAjS54zqwkLcPNnmU4OeJ6tUrWhDw==} engines: {node: '>=0.10.0'} @@ -17800,12 +17837,16 @@ snapshots: detect-indent@6.1.0: {} + detect-indent@7.0.1: {} + detect-libc@1.0.3: {} detect-libc@2.0.2: {} detect-newline@3.1.0: {} + detect-newline@4.0.1: {} + detect-node-es@1.1.0: {} didyoumean@1.2.2: {} @@ -18978,6 +19019,8 @@ snapshots: get-port@6.1.2: {} + get-stdin@9.0.0: {} + get-stream@6.0.1: {} get-stream@8.0.1: {} @@ -18999,6 +19042,8 @@ snapshots: dependencies: resolve-pkg-maps: 1.0.0 + git-hooks-list@3.1.0: {} + gitconfig@2.0.8: dependencies: argx: 3.0.2 @@ -19082,6 +19127,14 @@ snapshots: merge2: 1.4.1 slash: 3.0.0 + globby@13.2.2: + dependencies: + dir-glob: 3.0.1 + fast-glob: 3.3.2 + ignore: 5.2.4 + merge2: 1.4.1 + slash: 4.0.0 + gopd@1.0.1: dependencies: get-intrinsic: 1.1.3 @@ -19475,6 +19528,8 @@ snapshots: is-path-inside@3.0.3: {} + is-plain-obj@4.1.0: {} + is-plain-object@2.0.4: dependencies: isobject: 3.0.1 @@ -19972,7 +20027,7 @@ snapshots: jest-util: 29.5.0 natural-compare: 1.4.0 pretty-format: 29.5.0 - semver: 7.6.0 + semver: 7.6.3 transitivePeerDependencies: - supports-color @@ -20920,11 +20975,11 @@ snapshots: node-abi@3.45.0: dependencies: - semver: 7.6.0 + semver: 7.6.3 node-abi@3.52.0: dependencies: - semver: 7.5.0 + semver: 7.6.3 node-abort-controller@3.1.1: {} @@ -20966,7 +21021,7 @@ snapshots: nopt: 6.0.0 npmlog: 6.0.2 rimraf: 3.0.2 - semver: 7.5.0 + semver: 7.6.3 tar: 6.2.0 which: 2.0.2 transitivePeerDependencies: @@ -22269,6 +22324,8 @@ snapshots: slash@3.0.0: {} + slash@4.0.0: {} + slice-ansi@2.1.0: dependencies: ansi-styles: 3.2.1 @@ -22362,6 +22419,19 @@ snapshots: react: 18.2.0 react-dom: 18.2.0(react@18.2.0) + sort-object-keys@1.1.3: {} + + sort-package-json@2.10.1: + dependencies: + detect-indent: 7.0.1 + detect-newline: 4.0.1 + get-stdin: 9.0.0 + git-hooks-list: 3.1.0 + globby: 13.2.2 + is-plain-obj: 4.1.0 + semver: 7.6.3 + sort-object-keys: 1.1.3 + source-map-js@1.0.2: {} source-map-support@0.5.13: From 42169ee51f40de5c4ebc68a6ff2aa05530bd6f92 Mon Sep 17 00:00:00 2001 From: Kevin Ingersoll Date: Fri, 20 Sep 2024 20:58:40 +0100 Subject: [PATCH 05/15] immediate fetch receipt --- packages/store-sync/src/common.ts | 4 +-- packages/store-sync/src/createStoreSync.ts | 31 ++++++++------------ packages/store-sync/src/stash/syncToStash.ts | 13 +++----- 3 files changed, 19 insertions(+), 29 deletions(-) diff --git a/packages/store-sync/src/common.ts b/packages/store-sync/src/common.ts index 44ad08ff9a..91ff8b03fc 100644 --- a/packages/store-sync/src/common.ts +++ b/packages/store-sync/src/common.ts @@ -66,11 +66,11 @@ export type SyncFilter = { key1?: Hex; }; -export type SyncOptions = { +export type SyncOptions = { /** * MUD config */ - config?: config; + config?: StoreConfig; /** * [viem `PublicClient`][0] used for fetching logs from the RPC. * diff --git a/packages/store-sync/src/createStoreSync.ts b/packages/store-sync/src/createStoreSync.ts index a7a5ce992a..adf38c5311 100644 --- a/packages/store-sync/src/createStoreSync.ts +++ b/packages/store-sync/src/createStoreSync.ts @@ -25,21 +25,20 @@ import { catchError, shareReplay, combineLatest, - scan, mergeMap, + BehaviorSubject, } from "rxjs"; import { debug as parentDebug } from "./debug"; import { SyncStep } from "./SyncStep"; import { bigIntMax, chunk, isDefined, waitForIdle } from "@latticexyz/common/utils"; import { getSnapshot } from "./getSnapshot"; import { fetchAndStoreLogs } from "./fetchAndStoreLogs"; -import { Store as StoreConfig } from "@latticexyz/store"; const debug = parentDebug.extend("createStoreSync"); const defaultFilters: SyncFilter[] = internalTableIds.map((tableId) => ({ tableId })); -type CreateStoreSyncOptions = SyncOptions & { +type CreateStoreSyncOptions = SyncOptions & { storageAdapter: StorageAdapter; onProgress?: (opts: { step: SyncStep; @@ -50,7 +49,7 @@ type CreateStoreSyncOptions = SyncOpti }) => void; }; -export async function createStoreSync({ +export async function createStoreSync({ storageAdapter, onProgress, publicClient, @@ -63,7 +62,7 @@ export async function createStoreSync( initialState, initialBlockLogs, indexerUrl, -}: CreateStoreSyncOptions): Promise { +}: CreateStoreSyncOptions): Promise { const filters: SyncFilter[] = initialFilters.length || tableIds.length ? [...initialFilters, ...tableIds.map((tableId) => ({ tableId })), ...defaultFilters] @@ -251,18 +250,16 @@ export async function createStoreSync( share(), ); - const storedBlockLogs$ = concat(storedInitialBlockLogs$, storedBlock$).pipe(share()); - // keep 10 blocks worth processed transactions in memory const recentBlocksWindow = 10; - // most recent block first, for ease of pulling the first one off the array - const recentBlocks$ = storedBlockLogs$.pipe( - scan( - (recentBlocks, block) => [block, ...recentBlocks].slice(0, recentBlocksWindow), - [], - ), - filter((recentBlocks) => recentBlocks.length > 0), - shareReplay(1), + const recentBlocks$ = new BehaviorSubject([]); + + const storedBlockLogs$ = concat(storedInitialBlockLogs$, storedBlock$).pipe( + tap((block) => { + // most recent block first, for ease of pulling the first one off the array + recentBlocks$.next([block, ...recentBlocks$.value].slice(0, recentBlocksWindow)); + }), + share(), ); // TODO: move to its own file so we can test it, have its own debug instance, etc. @@ -285,18 +282,16 @@ export async function createStoreSync( try { const lastBlock = blocks[0]; - debug("fetching tx receipt for block", lastBlock.blockNumber); + debug("fetching tx receipt after seeing block", lastBlock.blockNumber); const { status, blockNumber, transactionHash } = await publicClient.getTransactionReceipt({ hash: tx }); if (lastBlock.blockNumber >= blockNumber) { return { status, blockNumber, transactionHash }; } } catch (e) { const error = e as GetTransactionReceiptErrorType; - if (error.name === "TransactionReceiptNotFoundError") { return; } - throw error; } }), diff --git a/packages/store-sync/src/stash/syncToStash.ts b/packages/store-sync/src/stash/syncToStash.ts index d9000e4e0c..b70d0a810d 100644 --- a/packages/store-sync/src/stash/syncToStash.ts +++ b/packages/store-sync/src/stash/syncToStash.ts @@ -1,9 +1,8 @@ import { getRecord, setRecord, registerTable, Stash } from "@latticexyz/stash/internal"; -import { Address, Client, publicActions } from "viem"; import { createStorageAdapter } from "./createStorageAdapter"; import { defineTable } from "@latticexyz/store/config/v2"; import { SyncStep } from "../SyncStep"; -import { SyncResult } from "../common"; +import { SyncOptions, SyncResult } from "../common"; import { createStoreSync } from "../createStoreSync"; import { getSchemaPrimitives, getValueSchema } from "@latticexyz/protocol-parser/internal"; @@ -28,10 +27,8 @@ export const initialProgress = { message: "Connecting", } satisfies getSchemaPrimitives>; -export type SyncToStashOptions = { +export type SyncToStashOptions = Omit & { stash: Stash; - client: Client; - address: Address; startSync?: boolean; }; @@ -41,18 +38,16 @@ export type SyncToStashResult = SyncResult & { export async function syncToStash({ stash, - client, - address, startSync = true, + ...opts }: SyncToStashOptions): Promise { registerTable({ stash, table: SyncProgress }); const storageAdapter = createStorageAdapter({ stash }); const sync = await createStoreSync({ + ...opts, storageAdapter, - publicClient: client.extend(publicActions) as never, - address, onProgress: (nextValue) => { const currentValue = getRecord({ stash, table: SyncProgress, key: {} }); // update sync progress until we're caught up and live From 63fcc48741099f5b4e8c397e814f873f9a2a86b1 Mon Sep 17 00:00:00 2001 From: Kevin Ingersoll Date: Fri, 20 Sep 2024 21:15:49 +0100 Subject: [PATCH 06/15] apply optimistic logs --- packages/store-sync/src/createStoreSync.ts | 40 +++++++++++++++++++--- 1 file changed, 35 insertions(+), 5 deletions(-) diff --git a/packages/store-sync/src/createStoreSync.ts b/packages/store-sync/src/createStoreSync.ts index adf38c5311..3c175919c5 100644 --- a/packages/store-sync/src/createStoreSync.ts +++ b/packages/store-sync/src/createStoreSync.ts @@ -1,5 +1,5 @@ import { storeEventsAbi } from "@latticexyz/store"; -import { GetTransactionReceiptErrorType, Hex } from "viem"; +import { GetTransactionReceiptErrorType, Hex, parseEventLogs } from "viem"; import { StorageAdapter, StorageAdapterBlock, @@ -10,7 +10,7 @@ import { internalTableIds, WaitForTransactionResult, } from "./common"; -import { createBlockStream } from "@latticexyz/block-logs-stream"; +import { createBlockStream, groupLogsByBlockNumber } from "@latticexyz/block-logs-stream"; import { filter, map, @@ -198,6 +198,23 @@ export async function createStoreSync({ let startBlock: bigint | null = null; let endBlock: bigint | null = null; let lastBlockNumberProcessed: bigint | null = null; + let optimisticLogs: readonly StoreEventsLog[] = []; + + // For chains that provide guaranteed receipts ahead of block mining, we can apply the logs immediately. + // This works because, once the block is mined, the same logs will be applied. Store events are defined in + // such a way that reapplying the same logs, even if the order changes, will mean that the storage adapter + // is kept up to date. + async function applyOptimisticLogs(): Promise { + const blocks = groupLogsByBlockNumber(optimisticLogs).filter( + (block) => + block.logs.length > 0 && (lastBlockNumberProcessed == null || block.blockNumber > lastBlockNumberProcessed), + ); + for (const block of blocks) { + await storageAdapter(block); + } + optimisticLogs = blocks.flatMap((block) => block.logs); + return blocks; + } const storedBlock$ = combineLatest([startBlock$, latestBlockNumber$]).pipe( map(([startBlock, endBlock]) => ({ startBlock, endBlock })), @@ -247,6 +264,10 @@ export async function createStoreSync({ } } }), + concatMap(async (block) => { + await applyOptimisticLogs(); + return block; + }), share(), ); @@ -283,9 +304,18 @@ export async function createStoreSync({ try { const lastBlock = blocks[0]; debug("fetching tx receipt after seeing block", lastBlock.blockNumber); - const { status, blockNumber, transactionHash } = await publicClient.getTransactionReceipt({ hash: tx }); - if (lastBlock.blockNumber >= blockNumber) { - return { status, blockNumber, transactionHash }; + const receipt = await publicClient.getTransactionReceipt({ hash: tx }); + if (receipt.status === "success") { + const logs = parseEventLogs({ abi: storeEventsAbi, logs: receipt.logs }); + optimisticLogs = [...optimisticLogs, ...logs]; + await applyOptimisticLogs(); + } + if (lastBlock.blockNumber >= receipt.blockNumber) { + return { + status: receipt.status, + blockNumber: receipt.blockNumber, + transactionHash: receipt.transactionHash, + }; } } catch (e) { const error = e as GetTransactionReceiptErrorType; From a1d88b8b3a0fd5c1b010079b1e18ff8df190c4bb Mon Sep 17 00:00:00 2001 From: Kevin Ingersoll Date: Fri, 20 Sep 2024 21:23:15 +0100 Subject: [PATCH 07/15] fix up transactionQueue types --- packages/common/src/actions/transactionQueue.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/packages/common/src/actions/transactionQueue.ts b/packages/common/src/actions/transactionQueue.ts index d6fe03353b..9c44847e23 100644 --- a/packages/common/src/actions/transactionQueue.ts +++ b/packages/common/src/actions/transactionQueue.ts @@ -19,10 +19,10 @@ export type TransactionQueueOptions = { queueConcurrency?: number; }; -export function transactionQueue( +export function transactionQueue( opts: TransactionQueueOptions = {}, -): ( - client: Client, +): ( + client: Client, ) => Pick, "writeContract" | "sendTransaction"> { return (client) => ({ // Applies to: `client.writeContract`, `getContract(client, ...).write` From f9352b9cf6811773240533e77fe4ec0bb138667a Mon Sep 17 00:00:00 2001 From: Kevin Ingersoll Date: Fri, 20 Sep 2024 21:37:39 +0100 Subject: [PATCH 08/15] remove config requirement --- packages/store-sync/src/common.ts | 5 ----- .../postgres-decoded/createStorageAdapter.ts | 7 ++----- .../src/postgres-decoded/syncToPostgres.ts | 17 +++++------------ .../src/postgres/createStorageAdapter.ts | 4 +--- .../store-sync/src/postgres/syncToPostgres.ts | 11 ++++------- packages/store-sync/src/recs/syncToRecs.ts | 9 ++++----- packages/store-sync/src/sqlite/sqliteStorage.ts | 4 +--- packages/store-sync/src/sqlite/syncToSqlite.ts | 11 ++++------- .../store-sync/src/zustand/syncToZustand.ts | 5 +---- 9 files changed, 22 insertions(+), 51 deletions(-) diff --git a/packages/store-sync/src/common.ts b/packages/store-sync/src/common.ts index 91ff8b03fc..02f9ee1fc1 100644 --- a/packages/store-sync/src/common.ts +++ b/packages/store-sync/src/common.ts @@ -11,7 +11,6 @@ import { } from "@latticexyz/protocol-parser/internal"; import storeConfig from "@latticexyz/store/mud.config"; import worldConfig from "@latticexyz/world/mud.config"; -import { Store as StoreConfig } from "@latticexyz/store"; import { Table as ConfigTable, Schema } from "@latticexyz/config"; import { configToTables } from "./configToTables"; @@ -67,10 +66,6 @@ export type SyncFilter = { }; export type SyncOptions = { - /** - * MUD config - */ - config?: StoreConfig; /** * [viem `PublicClient`][0] used for fetching logs from the RPC. * diff --git a/packages/store-sync/src/postgres-decoded/createStorageAdapter.ts b/packages/store-sync/src/postgres-decoded/createStorageAdapter.ts index 4f40357dcd..6f60f6d971 100644 --- a/packages/store-sync/src/postgres-decoded/createStorageAdapter.ts +++ b/packages/store-sync/src/postgres-decoded/createStorageAdapter.ts @@ -2,7 +2,6 @@ import { Hex, PublicClient, concatHex, getAddress } from "viem"; import { PgDatabase, QueryResultHKT } from "drizzle-orm/pg-core"; import { and, eq } from "drizzle-orm"; import { buildTable } from "./buildTable"; -import { Store as StoreConfig } from "@latticexyz/store"; import { debug } from "./debug"; import { StorageAdapter, StorageAdapterBlock } from "../common"; import { isTableRegistrationLog } from "../isTableRegistrationLog"; @@ -22,16 +21,14 @@ export type PostgresStorageAdapter = { cleanUp: () => Promise; }; -export async function createStorageAdapter({ +export async function createStorageAdapter({ database, publicClient, - config, }: { database: PgDatabase; publicClient: PublicClient; - config?: config; }): Promise { - const bytesStorageAdapter = await createBytesStorageAdapter({ database, publicClient, config }); + const bytesStorageAdapter = await createBytesStorageAdapter({ database, publicClient }); const cleanUp: (() => Promise)[] = []; async function postgresStorageAdapter({ blockNumber, logs }: StorageAdapterBlock): Promise { diff --git a/packages/store-sync/src/postgres-decoded/syncToPostgres.ts b/packages/store-sync/src/postgres-decoded/syncToPostgres.ts index 6f42469ed9..da97f864b4 100644 --- a/packages/store-sync/src/postgres-decoded/syncToPostgres.ts +++ b/packages/store-sync/src/postgres-decoded/syncToPostgres.ts @@ -1,10 +1,9 @@ -import { Store as StoreConfig } from "@latticexyz/store"; import { PgDatabase } from "drizzle-orm/pg-core"; import { SyncOptions, SyncResult } from "../common"; import { createStorageAdapter } from "./createStorageAdapter"; import { createStoreSync } from "../createStoreSync"; -export type SyncToPostgresOptions = SyncOptions & { +export type SyncToPostgresOptions = SyncOptions & { /** * [Postgres database object from Drizzle][0]. * @@ -25,20 +24,14 @@ export type SyncToPostgresResult = SyncResult & { * @param {CreateIndexerOptions} options See `CreateIndexerOptions`. * @returns A function to unsubscribe from the block stream, effectively stopping the indexer. */ -export async function syncToPostgres({ - config, +export async function syncToPostgres({ database, publicClient, startSync = true, ...syncOptions -}: SyncToPostgresOptions): Promise { - const { storageAdapter } = await createStorageAdapter({ database, publicClient, config }); - const storeSync = await createStoreSync({ - storageAdapter, - config, - publicClient, - ...syncOptions, - }); +}: SyncToPostgresOptions): Promise { + const { storageAdapter } = await createStorageAdapter({ database, publicClient }); + const storeSync = await createStoreSync({ storageAdapter, publicClient, ...syncOptions }); const sub = startSync ? storeSync.storedBlockLogs$.subscribe() : null; const stopSync = (): void => { diff --git a/packages/store-sync/src/postgres/createStorageAdapter.ts b/packages/store-sync/src/postgres/createStorageAdapter.ts index c44a4b39b9..b578be9a3e 100644 --- a/packages/store-sync/src/postgres/createStorageAdapter.ts +++ b/packages/store-sync/src/postgres/createStorageAdapter.ts @@ -1,7 +1,6 @@ import { PublicClient, encodePacked, size } from "viem"; import { PgDatabase, QueryResultHKT } from "drizzle-orm/pg-core"; import { and, eq } from "drizzle-orm"; -import { Store as StoreConfig } from "@latticexyz/store"; import { debug } from "./debug"; import { tables } from "./tables"; import { spliceHex } from "@latticexyz/common"; @@ -17,13 +16,12 @@ export type PostgresStorageAdapter = { cleanUp: () => Promise; }; -export async function createStorageAdapter({ +export async function createStorageAdapter({ database, publicClient, }: { database: PgDatabase; publicClient: PublicClient; - config?: config; }): Promise { const cleanUp: (() => Promise)[] = []; diff --git a/packages/store-sync/src/postgres/syncToPostgres.ts b/packages/store-sync/src/postgres/syncToPostgres.ts index 6f42469ed9..074979c5c4 100644 --- a/packages/store-sync/src/postgres/syncToPostgres.ts +++ b/packages/store-sync/src/postgres/syncToPostgres.ts @@ -1,10 +1,9 @@ -import { Store as StoreConfig } from "@latticexyz/store"; import { PgDatabase } from "drizzle-orm/pg-core"; import { SyncOptions, SyncResult } from "../common"; import { createStorageAdapter } from "./createStorageAdapter"; import { createStoreSync } from "../createStoreSync"; -export type SyncToPostgresOptions = SyncOptions & { +export type SyncToPostgresOptions = SyncOptions & { /** * [Postgres database object from Drizzle][0]. * @@ -25,17 +24,15 @@ export type SyncToPostgresResult = SyncResult & { * @param {CreateIndexerOptions} options See `CreateIndexerOptions`. * @returns A function to unsubscribe from the block stream, effectively stopping the indexer. */ -export async function syncToPostgres({ - config, +export async function syncToPostgres({ database, publicClient, startSync = true, ...syncOptions -}: SyncToPostgresOptions): Promise { - const { storageAdapter } = await createStorageAdapter({ database, publicClient, config }); +}: SyncToPostgresOptions): Promise { + const { storageAdapter } = await createStorageAdapter({ database, publicClient }); const storeSync = await createStoreSync({ storageAdapter, - config, publicClient, ...syncOptions, }); diff --git a/packages/store-sync/src/recs/syncToRecs.ts b/packages/store-sync/src/recs/syncToRecs.ts index 4bf73937f6..2adb70b042 100644 --- a/packages/store-sync/src/recs/syncToRecs.ts +++ b/packages/store-sync/src/recs/syncToRecs.ts @@ -9,10 +9,10 @@ import { SyncStep } from "../SyncStep"; import { configToTables } from "../configToTables"; import { merge } from "@ark/util"; -export type SyncToRecsOptions = Omit< - SyncOptions, - "config" -> & { +export type SyncToRecsOptions< + config extends StoreConfig = StoreConfig, + extraTables extends Tables = Tables, +> = SyncOptions & { world: RecsWorld; config: config; tables?: extraTables; @@ -46,7 +46,6 @@ export async function syncToRecs { // already live, no need for more progress updates diff --git a/packages/store-sync/src/sqlite/sqliteStorage.ts b/packages/store-sync/src/sqlite/sqliteStorage.ts index cb77704c3e..1f623ce8e3 100644 --- a/packages/store-sync/src/sqlite/sqliteStorage.ts +++ b/packages/store-sync/src/sqlite/sqliteStorage.ts @@ -3,7 +3,6 @@ import { BaseSQLiteDatabase } from "drizzle-orm/sqlite-core"; import { and, eq, sql } from "drizzle-orm"; import { sqliteTableToSql } from "./sqliteTableToSql"; import { buildTable } from "./buildTable"; -import { Store as StoreConfig } from "@latticexyz/store"; import { debug } from "./debug"; import { getTableName } from "./getTableName"; import { chainState, mudStoreTables } from "./internalTables"; @@ -17,13 +16,12 @@ import { KeySchema, decodeKey, decodeValueArgs } from "@latticexyz/protocol-pars // TODO: upgrade drizzle and use async sqlite interface for consistency -export async function sqliteStorage({ +export async function sqliteStorage({ database, publicClient, }: { database: BaseSQLiteDatabase<"sync", void>; publicClient: PublicClient; - config?: config; }): Promise { const chainId = publicClient.chain?.id ?? (await publicClient.getChainId()); diff --git a/packages/store-sync/src/sqlite/syncToSqlite.ts b/packages/store-sync/src/sqlite/syncToSqlite.ts index 406e5eb98c..51fa391daa 100644 --- a/packages/store-sync/src/sqlite/syncToSqlite.ts +++ b/packages/store-sync/src/sqlite/syncToSqlite.ts @@ -1,10 +1,9 @@ -import { Store as StoreConfig } from "@latticexyz/store"; import { BaseSQLiteDatabase } from "drizzle-orm/sqlite-core"; import { SyncOptions, SyncResult } from "../common"; import { sqliteStorage } from "./sqliteStorage"; import { createStoreSync } from "../createStoreSync"; -export type SyncToSqliteOptions = SyncOptions & { +export type SyncToSqliteOptions = SyncOptions & { /** * [SQLite database object from Drizzle][0]. * @@ -25,16 +24,14 @@ export type SyncToSqliteResult = SyncResult & { * @param {SyncToSqliteOptions} options See `SyncToSqliteOptions`. * @returns A function to unsubscribe from the block stream, effectively stopping the indexer. */ -export async function syncToSqlite({ - config, +export async function syncToSqlite({ database, publicClient, startSync = true, ...syncOptions -}: SyncToSqliteOptions): Promise { +}: SyncToSqliteOptions): Promise { const storeSync = await createStoreSync({ - storageAdapter: await sqliteStorage({ database, publicClient, config }), - config, + storageAdapter: await sqliteStorage({ database, publicClient }), publicClient, ...syncOptions, }); diff --git a/packages/store-sync/src/zustand/syncToZustand.ts b/packages/store-sync/src/zustand/syncToZustand.ts index a814b45c04..65f2e2c024 100644 --- a/packages/store-sync/src/zustand/syncToZustand.ts +++ b/packages/store-sync/src/zustand/syncToZustand.ts @@ -10,10 +10,7 @@ import { Tables } from "@latticexyz/config"; import { merge } from "@ark/util"; import { configToTables } from "../configToTables"; -export type SyncToZustandOptions = Omit< - SyncOptions, - "address" | "config" -> & { +export type SyncToZustandOptions = SyncOptions & { // require address for now to keep the data model + retrieval simpler address: Address; config: config; From cbc15b847da2768edd94c7a7f8c9022e94b2d301 Mon Sep 17 00:00:00 2001 From: Kevin Ingersoll Date: Fri, 20 Sep 2024 22:03:49 +0100 Subject: [PATCH 09/15] more logs --- packages/store-sync/src/createStoreSync.ts | 22 +++++++++++++--------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/packages/store-sync/src/createStoreSync.ts b/packages/store-sync/src/createStoreSync.ts index 3c175919c5..e6847f7a03 100644 --- a/packages/store-sync/src/createStoreSync.ts +++ b/packages/store-sync/src/createStoreSync.ts @@ -205,11 +205,13 @@ export async function createStoreSync({ // such a way that reapplying the same logs, even if the order changes, will mean that the storage adapter // is kept up to date. async function applyOptimisticLogs(): Promise { + // order logs and group by block const blocks = groupLogsByBlockNumber(optimisticLogs).filter( (block) => block.logs.length > 0 && (lastBlockNumberProcessed == null || block.blockNumber > lastBlockNumberProcessed), ); for (const block of blocks) { + debug("applying optimistic logs for block", block.blockNumber); await storageAdapter(block); } optimisticLogs = blocks.flatMap((block) => block.logs); @@ -305,18 +307,20 @@ export async function createStoreSync({ const lastBlock = blocks[0]; debug("fetching tx receipt after seeing block", lastBlock.blockNumber); const receipt = await publicClient.getTransactionReceipt({ hash: tx }); + debug("got receipt", receipt.status); if (receipt.status === "success") { const logs = parseEventLogs({ abi: storeEventsAbi, logs: receipt.logs }); - optimisticLogs = [...optimisticLogs, ...logs]; - await applyOptimisticLogs(); - } - if (lastBlock.blockNumber >= receipt.blockNumber) { - return { - status: receipt.status, - blockNumber: receipt.blockNumber, - transactionHash: receipt.transactionHash, - }; + if (logs.length) { + debug("applying", logs.length, "optimistic logs"); + optimisticLogs = [...optimisticLogs, ...logs]; + await applyOptimisticLogs(); + } } + return { + status: receipt.status, + blockNumber: receipt.blockNumber, + transactionHash: receipt.transactionHash, + }; } catch (e) { const error = e as GetTransactionReceiptErrorType; if (error.name === "TransactionReceiptNotFoundError") { From 7f78d1023c7d2f7d6edc9586117589357d3ceaeb Mon Sep 17 00:00:00 2001 From: Kevin Ingersoll Date: Sat, 21 Sep 2024 08:59:11 +0100 Subject: [PATCH 10/15] pause stream while applying optimistic logs --- packages/store-sync/src/createStoreSync.ts | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/packages/store-sync/src/createStoreSync.ts b/packages/store-sync/src/createStoreSync.ts index e6847f7a03..e8251858a2 100644 --- a/packages/store-sync/src/createStoreSync.ts +++ b/packages/store-sync/src/createStoreSync.ts @@ -218,8 +218,15 @@ export async function createStoreSync({ return blocks; } + let applyingOptimisticLogs: Promise | undefined; + const storedBlock$ = combineLatest([startBlock$, latestBlockNumber$]).pipe( map(([startBlock, endBlock]) => ({ startBlock, endBlock })), + concatMap(async (range) => { + // wait for any prior pending optimistic logs, so we don't have data conflicts + await applyingOptimisticLogs; + return range; + }), tap((range) => { startBlock = range.startBlock; endBlock = range.endBlock; @@ -243,6 +250,7 @@ export async function createStoreSync({ tap(({ blockNumber, logs }) => { debug("stored", logs.length, "logs for block", blockNumber); lastBlockNumberProcessed = blockNumber; + applyingOptimisticLogs = applyOptimisticLogs(); if (startBlock != null && endBlock != null) { if (blockNumber < endBlock) { @@ -266,10 +274,6 @@ export async function createStoreSync({ } } }), - concatMap(async (block) => { - await applyOptimisticLogs(); - return block; - }), share(), ); @@ -313,7 +317,8 @@ export async function createStoreSync({ if (logs.length) { debug("applying", logs.length, "optimistic logs"); optimisticLogs = [...optimisticLogs, ...logs]; - await applyOptimisticLogs(); + applyingOptimisticLogs = applyOptimisticLogs(); + await applyingOptimisticLogs; } } return { From 580b8426d8621da2c4301c2634e0ea0a287943d8 Mon Sep 17 00:00:00 2001 From: Kevin Ingersoll Date: Sun, 22 Sep 2024 13:11:19 +0100 Subject: [PATCH 11/15] wip reworking optimistic logs --- packages/store-sync/src/createStoreSync.ts | 64 +++++++++++++++------- 1 file changed, 45 insertions(+), 19 deletions(-) diff --git a/packages/store-sync/src/createStoreSync.ts b/packages/store-sync/src/createStoreSync.ts index e8251858a2..be3beeed82 100644 --- a/packages/store-sync/src/createStoreSync.ts +++ b/packages/store-sync/src/createStoreSync.ts @@ -27,6 +27,8 @@ import { combineLatest, mergeMap, BehaviorSubject, + switchMap, + ignoreElements, } from "rxjs"; import { debug as parentDebug } from "./debug"; import { SyncStep } from "./SyncStep"; @@ -198,33 +200,53 @@ export async function createStoreSync({ let startBlock: bigint | null = null; let endBlock: bigint | null = null; let lastBlockNumberProcessed: bigint | null = null; - let optimisticLogs: readonly StoreEventsLog[] = []; // For chains that provide guaranteed receipts ahead of block mining, we can apply the logs immediately. // This works because, once the block is mined, the same logs will be applied. Store events are defined in // such a way that reapplying the same logs, even if the order changes, will mean that the storage adapter // is kept up to date. - async function applyOptimisticLogs(): Promise { - // order logs and group by block - const blocks = groupLogsByBlockNumber(optimisticLogs).filter( - (block) => - block.logs.length > 0 && (lastBlockNumberProcessed == null || block.blockNumber > lastBlockNumberProcessed), + + const optimisticLogs$ = new BehaviorSubject([]); + function pushOptimisticLogs(logs: readonly StoreEventsLog[]): void { + optimisticLogs$.next( + [...optimisticLogs$.value, ...logs].filter( + (log) => lastBlockNumberProcessed == null || log.blockNumber > lastBlockNumberProcessed, + ), ); - for (const block of blocks) { - debug("applying optimistic logs for block", block.blockNumber); - await storageAdapter(block); - } - optimisticLogs = blocks.flatMap((block) => block.logs); - return blocks; } - let applyingOptimisticLogs: Promise | undefined; + const isStoringOptimisticLogs$ = optimisticLogs$.pipe( + switchMap((logs) => + concat( + of(true), + of(logs).pipe( + concatMap(async (logs) => { + if (!logs.length) return; + debug("applying", logs.length, "optimistic logs"); + const blocks = groupLogsByBlockNumber( + logs.filter((log) => lastBlockNumberProcessed == null || log.blockNumber > lastBlockNumberProcessed), + ).filter((block) => block.logs.length); + for (const block of blocks) { + debug("applying optimistic logs for block", block.blockNumber); + await storageAdapter(block); + } + }), + ignoreElements(), + ), + of(false), + ), + ), + ); + const isOptimisticIdle$ = isStoringOptimisticLogs$.pipe( + filter((isStoring) => isStoring === false), + shareReplay(1), + ); const storedBlock$ = combineLatest([startBlock$, latestBlockNumber$]).pipe( map(([startBlock, endBlock]) => ({ startBlock, endBlock })), concatMap(async (range) => { // wait for any prior pending optimistic logs, so we don't have data conflicts - await applyingOptimisticLogs; + await firstValueFrom(isOptimisticIdle$); return range; }), tap((range) => { @@ -250,7 +272,6 @@ export async function createStoreSync({ tap(({ blockNumber, logs }) => { debug("stored", logs.length, "logs for block", blockNumber); lastBlockNumberProcessed = blockNumber; - applyingOptimisticLogs = applyOptimisticLogs(); if (startBlock != null && endBlock != null) { if (blockNumber < endBlock) { @@ -274,6 +295,12 @@ export async function createStoreSync({ } } }), + concatMap(async (block) => { + // reapply optimistic logs + pushOptimisticLogs([]); + await firstValueFrom(isOptimisticIdle$); + return block; + }), share(), ); @@ -316,9 +343,9 @@ export async function createStoreSync({ const logs = parseEventLogs({ abi: storeEventsAbi, logs: receipt.logs }); if (logs.length) { debug("applying", logs.length, "optimistic logs"); - optimisticLogs = [...optimisticLogs, ...logs]; - applyingOptimisticLogs = applyOptimisticLogs(); - await applyingOptimisticLogs; + await firstValueFrom(isOptimisticIdle$); + pushOptimisticLogs(logs); + await firstValueFrom(isOptimisticIdle$); } } return { @@ -334,7 +361,6 @@ export async function createStoreSync({ throw error; } }), - tap((result) => debug("has tx?", tx, result)), ); return await firstValueFrom(hasTransaction$.pipe(filter(isDefined))); From 89e73058f62998d4c0dee74ae09689274f7c05e5 Mon Sep 17 00:00:00 2001 From: Kevin Ingersoll Date: Mon, 23 Sep 2024 11:50:35 +0100 Subject: [PATCH 12/15] rework optimistic logs --- packages/store-sync/src/createStoreSync.ts | 91 ++++++++++------------ 1 file changed, 42 insertions(+), 49 deletions(-) diff --git a/packages/store-sync/src/createStoreSync.ts b/packages/store-sync/src/createStoreSync.ts index be3beeed82..d5ac926100 100644 --- a/packages/store-sync/src/createStoreSync.ts +++ b/packages/store-sync/src/createStoreSync.ts @@ -27,8 +27,10 @@ import { combineLatest, mergeMap, BehaviorSubject, - switchMap, ignoreElements, + last, + first, + defaultIfEmpty, } from "rxjs"; import { debug as parentDebug } from "./debug"; import { SyncStep } from "./SyncStep"; @@ -206,49 +208,24 @@ export async function createStoreSync({ // such a way that reapplying the same logs, even if the order changes, will mean that the storage adapter // is kept up to date. - const optimisticLogs$ = new BehaviorSubject([]); - function pushOptimisticLogs(logs: readonly StoreEventsLog[]): void { - optimisticLogs$.next( - [...optimisticLogs$.value, ...logs].filter( - (log) => lastBlockNumberProcessed == null || log.blockNumber > lastBlockNumberProcessed, - ), - ); + let optimisticLogs: readonly StoreEventsLog[] = []; + async function applyOptimisticLogs(blockNumber: bigint): Promise { + const logs = optimisticLogs.filter((log) => log.blockNumber > blockNumber); + if (logs.length) { + debug("applying", logs.length, "optimistic logs"); + const blocks = groupLogsByBlockNumber(logs).filter((block) => block.logs.length); + for (const block of blocks) { + debug("applying optimistic logs for block", block.blockNumber); + await storageAdapter(block); + } + } + optimisticLogs = logs; } - const isStoringOptimisticLogs$ = optimisticLogs$.pipe( - switchMap((logs) => - concat( - of(true), - of(logs).pipe( - concatMap(async (logs) => { - if (!logs.length) return; - debug("applying", logs.length, "optimistic logs"); - const blocks = groupLogsByBlockNumber( - logs.filter((log) => lastBlockNumberProcessed == null || log.blockNumber > lastBlockNumberProcessed), - ).filter((block) => block.logs.length); - for (const block of blocks) { - debug("applying optimistic logs for block", block.blockNumber); - await storageAdapter(block); - } - }), - ignoreElements(), - ), - of(false), - ), - ), - ); - const isOptimisticIdle$ = isStoringOptimisticLogs$.pipe( - filter((isStoring) => isStoring === false), - shareReplay(1), - ); + const storageAdapterLock$ = new BehaviorSubject(false); const storedBlock$ = combineLatest([startBlock$, latestBlockNumber$]).pipe( map(([startBlock, endBlock]) => ({ startBlock, endBlock })), - concatMap(async (range) => { - // wait for any prior pending optimistic logs, so we don't have data conflicts - await firstValueFrom(isOptimisticIdle$); - return range; - }), tap((range) => { startBlock = range.startBlock; endBlock = range.endBlock; @@ -266,8 +243,26 @@ export async function createStoreSync({ storageAdapter, logFilter, }); + const storedBlock$ = from(storedBlocks); - return from(storedBlocks); + return concat( + storageAdapterLock$.pipe( + first((lock) => lock === false), + tap(() => storageAdapterLock$.next(true)), + ignoreElements(), + ), + storedBlock$, + storedBlock$.pipe( + defaultIfEmpty(null), + last(), + concatMap(async (block) => { + if (block == null) return; + await applyOptimisticLogs(block.blockNumber); + }), + tap(() => storageAdapterLock$.next(false)), + ignoreElements(), + ), + ); }), tap(({ blockNumber, logs }) => { debug("stored", logs.length, "logs for block", blockNumber); @@ -295,12 +290,6 @@ export async function createStoreSync({ } } }), - concatMap(async (block) => { - // reapply optimistic logs - pushOptimisticLogs([]); - await firstValueFrom(isOptimisticIdle$); - return block; - }), share(), ); @@ -343,9 +332,13 @@ export async function createStoreSync({ const logs = parseEventLogs({ abi: storeEventsAbi, logs: receipt.logs }); if (logs.length) { debug("applying", logs.length, "optimistic logs"); - await firstValueFrom(isOptimisticIdle$); - pushOptimisticLogs(logs); - await firstValueFrom(isOptimisticIdle$); + // wait for lock to clear + await firstValueFrom(storageAdapterLock$.pipe(filter((lock) => lock === false))); + + storageAdapterLock$.next(true); + optimisticLogs = [...optimisticLogs, ...logs]; + await applyOptimisticLogs(lastBlock.blockNumber); + storageAdapterLock$.next(false); } } return { From 2cff36acf6634ff6ef224d79a7478114d5d5b9c2 Mon Sep 17 00:00:00 2001 From: Kevin Ingersoll Date: Mon, 23 Sep 2024 11:51:01 +0100 Subject: [PATCH 13/15] cache waitForTx promises to reduce rpc and logs --- packages/common/src/LruMap.ts | 9 +++++++++ packages/store-sync/src/createStoreSync.ts | 15 +++++++++++---- 2 files changed, 20 insertions(+), 4 deletions(-) diff --git a/packages/common/src/LruMap.ts b/packages/common/src/LruMap.ts index 7d0d3b5f30..a1661857b1 100644 --- a/packages/common/src/LruMap.ts +++ b/packages/common/src/LruMap.ts @@ -12,6 +12,15 @@ export class LruMap extends Map { this.maxSize = size; } + override get(key: key): value | undefined { + const value = super.get(key); + if (this.has(key)) { + this.delete(key); + this.set(key, value as never); + } + return value; + } + override set(key: key, value: value): this { super.set(key, value); if (this.maxSize && this.size > this.maxSize) { diff --git a/packages/store-sync/src/createStoreSync.ts b/packages/store-sync/src/createStoreSync.ts index d5ac926100..6fff586545 100644 --- a/packages/store-sync/src/createStoreSync.ts +++ b/packages/store-sync/src/createStoreSync.ts @@ -37,6 +37,7 @@ import { SyncStep } from "./SyncStep"; import { bigIntMax, chunk, isDefined, waitForIdle } from "@latticexyz/common/utils"; import { getSnapshot } from "./getSnapshot"; import { fetchAndStoreLogs } from "./fetchAndStoreLogs"; +import { LruMap } from "@latticexyz/common"; const debug = parentDebug.extend("createStoreSync"); @@ -306,12 +307,14 @@ export async function createStoreSync({ ); // TODO: move to its own file so we can test it, have its own debug instance, etc. - async function waitForTransaction(tx: Hex): Promise { - debug("waiting for tx", tx); + const waitPromises = new LruMap>(1024); + function waitForTransaction(tx: Hex): Promise { + const existingPromise = waitPromises.get(tx); + if (existingPromise) return existingPromise; // This currently blocks for async call on each block processed // We could potentially speed this up a tiny bit by racing to see if 1) tx exists in processed block or 2) fetch tx receipt for latest block processed - const hasTransaction$ = recentBlocks$.pipe( + const receipt$ = recentBlocks$.pipe( // We use `mergeMap` instead of `concatMap` here to send the fetch request immediately when a new block range appears, // instead of sending the next request only when the previous one completed. mergeMap(async (blocks) => { @@ -354,9 +357,13 @@ export async function createStoreSync({ throw error; } }), + filter(isDefined), ); - return await firstValueFrom(hasTransaction$.pipe(filter(isDefined))); + debug("waiting for tx", tx); + const promise = firstValueFrom(receipt$); + waitPromises.set(tx, promise); + return promise; } return { From ad4eb366decb65c7f1b9cbb69efd3efc11e240bd Mon Sep 17 00:00:00 2001 From: Kevin Ingersoll Date: Mon, 23 Sep 2024 12:48:38 +0100 Subject: [PATCH 14/15] rearrange again --- packages/store-sync/src/createStoreSync.ts | 35 +++++++++++----------- 1 file changed, 17 insertions(+), 18 deletions(-) diff --git a/packages/store-sync/src/createStoreSync.ts b/packages/store-sync/src/createStoreSync.ts index 6fff586545..a72de4a7db 100644 --- a/packages/store-sync/src/createStoreSync.ts +++ b/packages/store-sync/src/createStoreSync.ts @@ -28,9 +28,7 @@ import { mergeMap, BehaviorSubject, ignoreElements, - last, first, - defaultIfEmpty, } from "rxjs"; import { debug as parentDebug } from "./debug"; import { SyncStep } from "./SyncStep"; @@ -244,7 +242,8 @@ export async function createStoreSync({ storageAdapter, logFilter, }); - const storedBlock$ = from(storedBlocks); + + const storedBlock$ = from(storedBlocks).pipe(share()); return concat( storageAdapterLock$.pipe( @@ -252,32 +251,33 @@ export async function createStoreSync({ tap(() => storageAdapterLock$.next(true)), ignoreElements(), ), - storedBlock$, storedBlock$.pipe( - defaultIfEmpty(null), - last(), - concatMap(async (block) => { - if (block == null) return; - await applyOptimisticLogs(block.blockNumber); + tap(({ blockNumber, logs }) => { + debug("stored", logs.length, "logs for block", blockNumber); + lastBlockNumberProcessed = blockNumber; + }), + ), + of(true).pipe( + concatMap(async () => { + if (lastBlockNumberProcessed != null) { + await applyOptimisticLogs(lastBlockNumberProcessed); + } }), tap(() => storageAdapterLock$.next(false)), ignoreElements(), ), ); }), - tap(({ blockNumber, logs }) => { - debug("stored", logs.length, "logs for block", blockNumber); - lastBlockNumberProcessed = blockNumber; - + tap(({ blockNumber }) => { if (startBlock != null && endBlock != null) { if (blockNumber < endBlock) { const totalBlocks = endBlock - startBlock; - const processedBlocks = lastBlockNumberProcessed - startBlock; + const processedBlocks = blockNumber - startBlock; onProgress?.({ step: SyncStep.RPC, percentage: Number((processedBlocks * 1000n) / totalBlocks) / 10, latestBlockNumber: endBlock, - lastBlockNumberProcessed, + lastBlockNumberProcessed: blockNumber, message: "Hydrating from RPC", }); } else { @@ -285,7 +285,7 @@ export async function createStoreSync({ step: SyncStep.LIVE, percentage: 100, latestBlockNumber: endBlock, - lastBlockNumberProcessed, + lastBlockNumberProcessed: blockNumber, message: "All caught up!", }); } @@ -334,10 +334,9 @@ export async function createStoreSync({ if (receipt.status === "success") { const logs = parseEventLogs({ abi: storeEventsAbi, logs: receipt.logs }); if (logs.length) { - debug("applying", logs.length, "optimistic logs"); // wait for lock to clear + // unclear what happens if two waitForTransaction calls are triggered simultaneously and both get released for the same lock emission? await firstValueFrom(storageAdapterLock$.pipe(filter((lock) => lock === false))); - storageAdapterLock$.next(true); optimisticLogs = [...optimisticLogs, ...logs]; await applyOptimisticLogs(lastBlock.blockNumber); From 2c74e64e18a7f77d6e06ffdbe8f5c753f4f65776 Mon Sep 17 00:00:00 2001 From: Kevin Ingersoll Date: Tue, 24 Sep 2024 01:54:11 -0700 Subject: [PATCH 15/15] Update packages/store-sync/src/createStoreSync.ts Co-authored-by: alvarius --- packages/store-sync/src/createStoreSync.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/store-sync/src/createStoreSync.ts b/packages/store-sync/src/createStoreSync.ts index a72de4a7db..a781b78428 100644 --- a/packages/store-sync/src/createStoreSync.ts +++ b/packages/store-sync/src/createStoreSync.ts @@ -204,7 +204,7 @@ export async function createStoreSync({ // For chains that provide guaranteed receipts ahead of block mining, we can apply the logs immediately. // This works because, once the block is mined, the same logs will be applied. Store events are defined in - // such a way that reapplying the same logs, even if the order changes, will mean that the storage adapter + // such a way that reapplying the same logs will mean that the storage adapter // is kept up to date. let optimisticLogs: readonly StoreEventsLog[] = [];