From 1b5eb0d075579d2437b4329266ca37735e65ce41 Mon Sep 17 00:00:00 2001 From: Kevin Ingersoll Date: Wed, 29 Nov 2023 11:46:25 +0000 Subject: [PATCH] feat(store-sync,store-indexer): schemaless indexer (#1965) --- .changeset/poor-waves-occur.md | 27 + .changeset/seven-rice-dance.md | 7 + .changeset/wild-moose-smile.md | 9 + .gitignore | 2 + e2e/packages/sync-test/package.json | 2 +- e2e/packages/sync-test/vite.config.ts | 2 +- e2e/pnpm-lock.yaml | 262 +--- packages/common/src/utils/groupBy.test.ts | 32 + packages/common/src/utils/groupBy.ts | 12 + packages/common/src/utils/index.ts | 2 + packages/common/src/utils/unique.test.ts | 15 + packages/common/src/utils/unique.ts | 3 + .../store-indexer/bin/postgres-indexer.ts | 38 +- packages/store-indexer/package.json | 9 +- .../src/postgres/createQueryAdapter.ts | 97 +- .../store-indexer/src/postgres/getLogs.ts | 82 ++ .../src/sqlite/createQueryAdapter.ts | 4 + packages/store-sync/package.json | 4 + packages/store-sync/src/index.ts | 2 + .../store-sync/src/isTableRegistrationLog.ts | 3 + packages/store-sync/src/logToTable.ts | 3 + .../buildColumn.ts | 2 +- .../src/postgres-decoded/buildTable.test.ts | 98 ++ .../buildTable.ts | 16 +- .../src/postgres-decoded/cleanDatabase.ts | 27 + .../createStorageAdapter.test.ts} | 57 +- .../postgres-decoded/createStorageAdapter.ts | 147 ++ .../store-sync/src/postgres-decoded/debug.ts | 3 + .../src/postgres-decoded/getTables.ts | 43 + .../store-sync/src/postgres-decoded/index.ts | 5 + .../src/postgres-decoded/syncToPostgres.ts | 51 + .../src/postgres/buildInternalTables.ts | 32 - .../src/postgres/buildTable.test.ts | 1226 ----------------- .../store-sync/src/postgres/cleanDatabase.ts | 35 +- .../src/postgres/createStorageAdapter.test.ts | 86 ++ .../src/postgres/createStorageAdapter.ts | 222 +++ .../store-sync/src/postgres/getTableKey.ts | 8 - packages/store-sync/src/postgres/getTables.ts | 30 - packages/store-sync/src/postgres/index.ts | 9 +- .../src/postgres/postgresStorage.ts | 294 ---- 
.../store-sync/src/postgres/schemaVersion.ts | 4 - .../src/postgres/setupTables.test.ts | 24 +- .../store-sync/src/postgres/setupTables.ts | 33 +- .../store-sync/src/postgres/syncToPostgres.ts | 4 +- packages/store-sync/src/postgres/tables.ts | 44 + .../src/postgres/transformSchemaName.ts | 7 +- packages/store-sync/src/tableToLog.ts | 3 + .../store-sync/src/trpc-indexer/common.ts | 10 +- .../src/trpc-indexer/createAppRouter.ts | 22 + packages/store-sync/tsup.config.ts | 1 + pnpm-lock.yaml | 14 - 51 files changed, 1124 insertions(+), 2050 deletions(-) create mode 100644 .changeset/poor-waves-occur.md create mode 100644 .changeset/seven-rice-dance.md create mode 100644 .changeset/wild-moose-smile.md create mode 100644 packages/common/src/utils/groupBy.test.ts create mode 100644 packages/common/src/utils/groupBy.ts create mode 100644 packages/common/src/utils/unique.test.ts create mode 100644 packages/common/src/utils/unique.ts create mode 100644 packages/store-indexer/src/postgres/getLogs.ts rename packages/store-sync/src/{postgres => postgres-decoded}/buildColumn.ts (98%) create mode 100644 packages/store-sync/src/postgres-decoded/buildTable.test.ts rename packages/store-sync/src/{postgres => postgres-decoded}/buildTable.ts (78%) create mode 100644 packages/store-sync/src/postgres-decoded/cleanDatabase.ts rename packages/store-sync/src/{postgres/postgresStorage.test.ts => postgres-decoded/createStorageAdapter.test.ts} (63%) create mode 100644 packages/store-sync/src/postgres-decoded/createStorageAdapter.ts create mode 100644 packages/store-sync/src/postgres-decoded/debug.ts create mode 100644 packages/store-sync/src/postgres-decoded/getTables.ts create mode 100644 packages/store-sync/src/postgres-decoded/index.ts create mode 100644 packages/store-sync/src/postgres-decoded/syncToPostgres.ts delete mode 100644 packages/store-sync/src/postgres/buildInternalTables.ts delete mode 100644 packages/store-sync/src/postgres/buildTable.test.ts create mode 100644 
packages/store-sync/src/postgres/createStorageAdapter.test.ts create mode 100644 packages/store-sync/src/postgres/createStorageAdapter.ts delete mode 100644 packages/store-sync/src/postgres/getTableKey.ts delete mode 100644 packages/store-sync/src/postgres/getTables.ts delete mode 100644 packages/store-sync/src/postgres/postgresStorage.ts delete mode 100644 packages/store-sync/src/postgres/schemaVersion.ts create mode 100644 packages/store-sync/src/postgres/tables.ts diff --git a/.changeset/poor-waves-occur.md b/.changeset/poor-waves-occur.md new file mode 100644 index 0000000000..10b5d563f8 --- /dev/null +++ b/.changeset/poor-waves-occur.md @@ -0,0 +1,27 @@ +--- +"@latticexyz/common": minor +--- + +Added `unique` and `groupBy` array helpers to `@latticexyz/common/utils`. + +```ts +import { unique } from "@latticexyz/common/utils"; + +unique([1, 2, 1, 4, 3, 2]); +// [1, 2, 4, 3] +``` + +```ts +import { groupBy } from "@latticexyz/common/utils"; + +const records = [ + { type: "cat", name: "Bob" }, + { type: "cat", name: "Spot" }, + { type: "dog", name: "Rover" }, +]; +Object.fromEntries(groupBy(records, (record) => record.type)); +// { +// "cat": [{ type: "cat", name: "Bob" }, { type: "cat", name: "Spot" }], +// "dog: [{ type: "dog", name: "Rover" }] +// } +``` diff --git a/.changeset/seven-rice-dance.md b/.changeset/seven-rice-dance.md new file mode 100644 index 0000000000..22442dedd2 --- /dev/null +++ b/.changeset/seven-rice-dance.md @@ -0,0 +1,7 @@ +--- +"@latticexyz/store-indexer": minor +--- + +The `findAll` method is now considered deprecated in favor of a new `getLogs` method. This is only implemented in the Postgres indexer for now, with SQLite coming soon. The new `getLogs` method will be an easier and more robust data source to hydrate the client and other indexers and will allow us to add streaming updates from the indexer in the near future. 
+ +For backwards compatibility, `findAll` is now implemented on top of `getLogs`, with record key/value decoding done in memory at request time. This may not scale for large databases, so use wisely. diff --git a/.changeset/wild-moose-smile.md b/.changeset/wild-moose-smile.md new file mode 100644 index 0000000000..a9128187f2 --- /dev/null +++ b/.changeset/wild-moose-smile.md @@ -0,0 +1,9 @@ +--- +"@latticexyz/store-sync": major +--- + +`syncToPostgres` from `@latticexyz/store-sync/postgres` now uses a single table to store all records in their bytes form (`staticData`, `encodedLengths`, and `dynamicData`), more closely mirroring onchain state and enabling more scalability and stability for automatic indexing of many worlds. + +The previous behavior, where schemaful SQL tables are created and populated for each MUD table, has been moved to a separate `@latticexyz/store-sync/postgres-decoded` export bundle. This approach is considered less stable and is intended to be used for analytics purposes rather than hydrating clients. Some previous metadata columns on these tables have been removed in favor of the bytes records table as the source of truth for onchain state. + +This overhaul is considered breaking and we recommend starting a fresh database when syncing with either of these strategies. diff --git a/.gitignore b/.gitignore index 3f3141c070..1750b5a496 100644 --- a/.gitignore +++ b/.gitignore @@ -14,3 +14,5 @@ yarn-error.log # We don't want projects created from templates to ignore their lockfiles, but we don't # want to check them in here, so we'll ignore them from the root. 
templates/*/pnpm-lock.yaml + +.env diff --git a/e2e/packages/sync-test/package.json b/e2e/packages/sync-test/package.json index 1904d6471f..e6c5fdcc27 100644 --- a/e2e/packages/sync-test/package.json +++ b/e2e/packages/sync-test/package.json @@ -21,7 +21,7 @@ "chalk": "^5.2.0", "dotenv": "^16.0.3", "execa": "^7.1.1", - "jsdom": "^22.0.0", + "happy-dom": "^12.10.3", "typescript": "5.1.6", "viem": "1.14.0", "vite": "^4.2.1", diff --git a/e2e/packages/sync-test/vite.config.ts b/e2e/packages/sync-test/vite.config.ts index c6202f1992..9d916684ff 100644 --- a/e2e/packages/sync-test/vite.config.ts +++ b/e2e/packages/sync-test/vite.config.ts @@ -2,7 +2,7 @@ import { defineConfig } from "vitest/config"; export default defineConfig({ test: { - environment: "jsdom", + environment: "happy-dom", testTimeout: 1000 * 60 * 2, hookTimeout: 1000 * 60 * 2, singleThread: true, diff --git a/e2e/pnpm-lock.yaml b/e2e/pnpm-lock.yaml index 1711ffe418..27aa485a1f 100644 --- a/e2e/pnpm-lock.yaml +++ b/e2e/pnpm-lock.yaml @@ -83,7 +83,7 @@ importers: version: 4.3.5(@types/node@20.1.3) vitest: specifier: 0.31.4 - version: 0.31.4(jsdom@22.0.0) + version: 0.31.4(happy-dom@12.10.3) packages/contracts: devDependencies: @@ -125,7 +125,7 @@ importers: version: 4.3.5(@types/node@20.1.3) vitest: specifier: 0.31.4 - version: 0.31.4(jsdom@22.0.0) + version: 0.31.4(happy-dom@12.10.3) packages/sync-test: devDependencies: @@ -168,9 +168,9 @@ importers: execa: specifier: ^7.1.1 version: 7.1.1 - jsdom: - specifier: ^22.0.0 - version: 22.0.0 + happy-dom: + specifier: ^12.10.3 + version: 12.10.3 typescript: specifier: 5.1.6 version: 5.1.6 @@ -182,7 +182,7 @@ importers: version: 4.3.5(@types/node@20.1.3) vitest: specifier: ^0.31.0 - version: 0.31.4(jsdom@22.0.0) + version: 0.31.4(happy-dom@12.10.3) zod: specifier: ^3.22.2 version: 3.22.2 @@ -754,11 +754,6 @@ packages: '@noble/hashes': 1.3.2 '@scure/base': 1.1.1 - /@tootallnate/once@2.0.0: - resolution: {integrity: 
sha512-XCuKFP5PS55gnMVu3dty8KPatLqUoy/ZYzDzAGCQ8JNFCkLXzmI7vNHCR+XpbZaMWQK/vQubr7PkYq8g470J/A==} - engines: {node: '>= 10'} - dev: true - /@types/chai-subset@1.3.3: resolution: {integrity: sha512-frBecisrNGz+F4T6bcc+NLeolfiojh5FxW2klu669+8BARtyQv2C/GkNW6FUodVe4BroGMP/wER/YDGc7rEllw==} dependencies: @@ -833,10 +828,6 @@ packages: pretty-format: 27.5.1 dev: true - /abab@2.0.6: - resolution: {integrity: sha512-j2afSsaIENvHZN2B8GOpF566vZ5WVk5opAiMTvWgaQT8DkbOqsTfvNAvHoRGU2zzP8cPoqys+xHTRDWW8L+/BA==} - dev: true - /abitype@0.9.8(typescript@5.1.6)(zod@3.22.2): resolution: {integrity: sha512-puLifILdm+8sjyss4S+fsUN09obiT1g2YW6CtcQF+QDzxR0euzgEB29MZujC6zMk2a6SVmtttq1fc6+YFA7WYQ==} peerDependencies: @@ -866,15 +857,6 @@ packages: hasBin: true dev: true - /agent-base@6.0.2: - resolution: {integrity: sha512-RZNwNclF7+MS/8bDg70amg32dyeZGZxiDuQmZxKLAlQjr3jGyLx+4Kkk58UO7D2QdgFIQCovuSuZESne6RG6XQ==} - engines: {node: '>= 6.0.0'} - dependencies: - debug: 4.3.4 - transitivePeerDependencies: - - supports-color - dev: true - /ansi-regex@5.0.1: resolution: {integrity: sha512-quJQXlTSUGL2LH9SUXo8VwsY4soanhgo6LNSm84E1LBcE8s3O0wpdiRzyR9z/ZZJMlMWv37qOOb9pdJlMUEKFQ==} engines: {node: '>=8'} @@ -901,10 +883,6 @@ packages: tslib: 2.5.0 dev: false - /asynckit@0.4.0: - resolution: {integrity: sha512-Oei9OH4tRh0YqU3GxhX79dM/mwVgvbZJaSNaRk+bshkj0S5cfHcgYakreBjrHwatXKbz+IoIdYLxrKim2MjW0Q==} - dev: true - /balanced-match@1.0.2: resolution: {integrity: sha512-3oSeUO0TMV67hN1AmbXsK4yaqU7tjiHlbxRDZOpH0KW9+CeX4bRAaX0Anxt0tx2MrpRpWwQaPwIlISEJhYU5Pw==} dev: true @@ -980,13 +958,6 @@ packages: resolution: {integrity: sha512-dOy+3AuW3a2wNbZHIuMZpTcgjGuLU/uBL/ubcZF9OXbDo8ff4O8yVp5Bf0efS8uEoYo5q4Fx7dY9OgQGXgAsQA==} dev: false - /combined-stream@1.0.8: - resolution: {integrity: sha512-FQN4MRfuJeHf7cBbBMJFXhKSDq+2kAArBlmRBvcvFE5BB1HZKXtSFASDhdlz9zOYwxh8lDdnvmMOe/+5cdoEdg==} - engines: {node: '>= 0.8'} - dependencies: - delayed-stream: 1.0.0 - dev: true - /concat-map@0.0.1: resolution: {integrity: 
sha512-/Srv4dswyQNBfohGpz9o6Yb3Gz3SrUDqBH5rTuhGR7ahtlbYKnVxw2bCFMRljaA7EXHaXZ8wsHdodFvbkhKmqg==} dev: true @@ -1014,20 +985,8 @@ packages: which: 2.0.2 dev: true - /cssstyle@3.0.0: - resolution: {integrity: sha512-N4u2ABATi3Qplzf0hWbVCdjenim8F3ojEXpBDF5hBpjzW182MjNGLqfmQ0SkSPeQ+V86ZXgeH8aXj6kayd4jgg==} - engines: {node: '>=14'} - dependencies: - rrweb-cssom: 0.6.0 - dev: true - - /data-urls@4.0.0: - resolution: {integrity: sha512-/mMTei/JXPqvFqQtfyTowxmJVwr2PVAeCcDxyFf6LhoOu/09TX2OX3kb2wzi4DMXcfj4OItwDOnhl5oziPnT6g==} - engines: {node: '>=14'} - dependencies: - abab: 2.0.6 - whatwg-mimetype: 3.0.0 - whatwg-url: 12.0.1 + /css.escape@1.5.1: + resolution: {integrity: sha512-YUifsXXuknHlUsmlgyY0PKzgPOr7/FjCePfHNt0jxm83wHZi44VDMQ7/fGNkjY3/jV1MC+1CmZbaHzugyeRtpg==} dev: true /date-time@3.1.0: @@ -1048,10 +1007,6 @@ packages: dependencies: ms: 2.1.2 - /decimal.js@10.4.3: - resolution: {integrity: sha512-VBBaLc1MgL5XpzgIP7ny5Z6Nx3UrRkIViUkPUdtl9aya5amy3De1gsUUSB1g3+3sExYNjCAsAznmukyxCb1GRA==} - dev: true - /deep-eql@4.1.3: resolution: {integrity: sha512-WaEtAOpRA1MQ0eohqZjpGD8zdI0Ovsm8mmFhaDN8dvDZzyoUMcYDnf5Y6iu7HTXxf8JDS23qWa4a+hKCDyOPzw==} engines: {node: '>=6'} @@ -1059,18 +1014,6 @@ packages: type-detect: 4.0.8 dev: true - /delayed-stream@1.0.0: - resolution: {integrity: sha512-ZySD7Nf91aLB0RxL4KGrKHBXl7Eds1DAmEdcoVawXnLD7SDhpNgtuII2aAkg7a7QS41jxPSZ17p4VdGnMHk3MQ==} - engines: {node: '>=0.4.0'} - dev: true - - /domexception@4.0.0: - resolution: {integrity: sha512-A2is4PLG+eeSfoTMA95/s4pvAoSo2mKtiM5jlHkAVewmiO8ISFTFKZjH7UAM1Atli/OT/7JHOrJRJiMKUZKYBw==} - engines: {node: '>=12'} - dependencies: - webidl-conversions: 7.0.0 - dev: true - /dotenv@16.0.3: resolution: {integrity: sha512-7GO6HghkA5fYG9TYnNxi14/7K9f5occMlp3zXAuSxn7CKCxt9xbNWG7yF8hTCSUchlfWSe3uLmlPfigevRItzQ==} engines: {node: '>=12'} @@ -1195,15 +1138,6 @@ packages: optional: true dev: true - /form-data@4.0.0: - resolution: {integrity: 
sha512-ETEklSGi5t0QMZuiXoA/Q6vcnxcLQP5vdugSpuAyi6SVGi2clPPp+xgEhuMaHC+zGgn31Kd235W35f7Hykkaww==} - engines: {node: '>= 6'} - dependencies: - asynckit: 0.4.0 - combined-stream: 1.0.8 - mime-types: 2.1.35 - dev: true - /fs.realpath@1.0.0: resolution: {integrity: sha512-OO0pH2lK6a0hZnAdau5ItzHPI6pUlvI7jMVnxUQRtw4owF2wk8lOSabtGDCTP4Ggrg2MbGnWO9X8K1t4+fGMDw==} dev: true @@ -1256,22 +1190,15 @@ packages: resolution: {integrity: sha512-3MSOYFO5U9mPGikIYCzK0SaThypfGgS6bHqrUGXG3DPHCrb+txNqeEcns1W0lkGfk0rCyNXm7xB9rMxnCiZOoA==} dev: false - /html-encoding-sniffer@3.0.0: - resolution: {integrity: sha512-oWv4T4yJ52iKrufjnyZPkrN0CH3QnrUqdB6In1g5Fe1mia8GmF36gnfNySxoZtxD5+NmYw1EElVXiBk93UeskA==} - engines: {node: '>=12'} + /happy-dom@12.10.3: + resolution: {integrity: sha512-JzUXOh0wdNGY54oKng5hliuBkq/+aT1V3YpTM+lrN/GoLQTANZsMaIvmHiHe612rauHvPJnDZkZ+5GZR++1Abg==} dependencies: + css.escape: 1.5.1 + entities: 4.5.0 + iconv-lite: 0.6.3 + webidl-conversions: 7.0.0 whatwg-encoding: 2.0.0 - dev: true - - /http-proxy-agent@5.0.0: - resolution: {integrity: sha512-n2hY8YdoRE1i7r6M0w9DIw5GgZN0G25P8zLCRQ8rjXtTU3vsNFBI/vWK/UIeE6g5MUUz6avwAPXmL6Fy9D/90w==} - engines: {node: '>= 6'} - dependencies: - '@tootallnate/once': 2.0.0 - agent-base: 6.0.2 - debug: 4.3.4 - transitivePeerDependencies: - - supports-color + whatwg-mimetype: 3.0.0 dev: true /http-proxy@1.18.1: @@ -1285,16 +1212,6 @@ packages: - debug dev: true - /https-proxy-agent@5.0.1: - resolution: {integrity: sha512-dFcAjpTQFgoLMzC2VwU+C/CbS7uRL0lWmxDITmqm7C+7F0Odmj6s9l6alZc6AELXhrnggM2CeWSXHGOdX2YtwA==} - engines: {node: '>= 6'} - dependencies: - agent-base: 6.0.2 - debug: 4.3.4 - transitivePeerDependencies: - - supports-color - dev: true - /human-signals@4.3.1: resolution: {integrity: sha512-nZXjEF2nbo7lIw3mgYjItAfgQXog3OjJogSbKa2CQIIvSGWcKgeJnQlNXip6NglNzYH45nSRiEVimMvYL8DDqQ==} engines: {node: '>=14.18.0'} @@ -1328,10 +1245,6 @@ packages: engines: {node: '>=8'} dev: false - /is-potential-custom-element-name@1.0.1: - resolution: 
{integrity: sha512-bCYeRA2rVibKZd+s2625gGnGF/t7DSqDs4dP7CrLA1m7jKWz6pps0LpYLJN8Q64HtmPKJ1hrN3nzPNKFEKOUiQ==} - dev: true - /is-stream@3.0.0: resolution: {integrity: sha512-LnQR4bZ9IADDRSkvpqMGvt/tEJWclzklNgSw48V5EAaAeDd6qGvN8ei6k5p0tvxSR171VmGyHuTiAOfxAbr8kA==} engines: {node: ^12.20.0 || ^14.13.1 || >=16.0.0} @@ -1361,44 +1274,6 @@ packages: resolution: {integrity: sha512-RdJUflcE3cUzKiMqQgsCu06FPu9UdIJO0beYbPhHN4k6apgJtifcoCtT9bcxOpYBtpD2kCM6Sbzg4CausW/PKQ==} dev: false - /jsdom@22.0.0: - resolution: {integrity: sha512-p5ZTEb5h+O+iU02t0GfEjAnkdYPrQSkfuTSMkMYyIoMvUNEHsbG0bHHbfXIcfTqD2UfvjQX7mmgiFsyRwGscVw==} - engines: {node: '>=16'} - peerDependencies: - canvas: ^2.5.0 - peerDependenciesMeta: - canvas: - optional: true - dependencies: - abab: 2.0.6 - cssstyle: 3.0.0 - data-urls: 4.0.0 - decimal.js: 10.4.3 - domexception: 4.0.0 - form-data: 4.0.0 - html-encoding-sniffer: 3.0.0 - http-proxy-agent: 5.0.0 - https-proxy-agent: 5.0.1 - is-potential-custom-element-name: 1.0.1 - nwsapi: 2.2.4 - parse5: 7.1.2 - rrweb-cssom: 0.6.0 - saxes: 6.0.0 - symbol-tree: 3.2.4 - tough-cookie: 4.1.2 - w3c-xmlserializer: 4.0.0 - webidl-conversions: 7.0.0 - whatwg-encoding: 2.0.0 - whatwg-mimetype: 3.0.0 - whatwg-url: 12.0.1 - ws: 8.13.0 - xml-name-validator: 4.0.0 - transitivePeerDependencies: - - bufferutil - - supports-color - - utf-8-validate - dev: true - /jsonc-parser@3.2.0: resolution: {integrity: sha512-gfFQZrcTc8CnKXp6Y4/CBT3fTc0OVuDofpre4aEeEpSBPV5X5v4+Vmx+8snU7RLPrNHPKSgLxGo9YuQzz20o+w==} dev: true @@ -1461,18 +1336,6 @@ packages: resolution: {integrity: sha512-abv/qOcuPfk3URPfDzmZU1LKmuw8kT+0nIHvKrKgFrwifol/doWcdA4ZqsWQ8ENrFKkd67Mfpo/LovbIUsbt3w==} dev: true - /mime-db@1.52.0: - resolution: {integrity: sha512-sPU4uV7dYlvtWJxwwxHD0PuihVNiE7TyAbQ5SWxDCB9mUYvOgroQOwYQQOKPJ8CIbE+1ETVlOoK1UC2nU3gYvg==} - engines: {node: '>= 0.6'} - dev: true - - /mime-types@2.1.35: - resolution: {integrity: 
sha512-ZDY+bPm5zTTF+YpCrAU9nK0UgICYPT0QtT1NZWFv4s++TNkcgVaT0g6+4R2uI4MjQjzysHB1zxuWL50hzaeXiw==} - engines: {node: '>= 0.6'} - dependencies: - mime-db: 1.52.0 - dev: true - /mimic-fn@4.0.0: resolution: {integrity: sha512-vqiC06CuhBTUdZH+RYl8sFrL096vA45Ok5ISO6sE/Mr1jRbGH4Csnhi8f3wKVl7x8mO4Au7Ir9D3Oyv1VYMFJw==} engines: {node: '>=12'} @@ -1538,10 +1401,6 @@ packages: path-key: 4.0.0 dev: true - /nwsapi@2.2.4: - resolution: {integrity: sha512-NHj4rzRo0tQdijE9ZqAx6kYDcoRwYwSYzCA8MY3JzfxlrvEU0jhnhJT9BhqhJs7I/dKcrDm6TyulaRqZPIhN5g==} - dev: true - /observable-fns@0.6.1: resolution: {integrity: sha512-9gRK4+sRWzeN6AOewNBTLXir7Zl/i3GB6Yl26gK4flxz8BXVpD3kt8amREmWNb0mxYOGDotvE5a4N+PtGGKdkg==} dev: false @@ -1566,12 +1425,6 @@ packages: yocto-queue: 1.0.0 dev: true - /parse5@7.1.2: - resolution: {integrity: sha512-Czj1WaSVpaoj0wbhMzLmWD69anp2WH7FXMB9n1Sy8/ZFF9jolSQVMu1Ij5WIyGmcBmhk7EOndpO4mIpihVqAXw==} - dependencies: - entities: 4.5.0 - dev: true - /path-is-absolute@1.0.1: resolution: {integrity: sha512-AVbw3UJ2e9bq64vSaS9Am0fje1Pa8pbGqTTsmXfaIiMpnr5DlDhfJOuLj9Sf95ZPVDAUerDfEk88MPmPe7UCQg==} engines: {node: '>=0.10.0'} @@ -1660,19 +1513,6 @@ packages: resolution: {integrity: sha512-kppbvLUNJ4IOMZds9/4gz/rtT5OFiesy3XosLsgMKlF3vb6GA5Y3ptyDlzKLcOcUBW+zaY+RiMINTsgE+O6e+Q==} dev: false - /psl@1.9.0: - resolution: {integrity: sha512-E/ZsdU4HLs/68gYzgGTkMicWTLPdAftJLfJFlLUAAKZGkStNU72sZjT66SnMDVOfOWY/YAoiD7Jxa9iHvngcag==} - dev: true - - /punycode@2.3.0: - resolution: {integrity: sha512-rRV+zQD8tVFys26lAGR9WUuS4iUAngJScM+ZRSKtvl5tKeZ2t5bvdNFdNHBW9FWR4guGHlgmsZ1G7BSm2wTbuA==} - engines: {node: '>=6'} - dev: true - - /querystringify@2.2.0: - resolution: {integrity: sha512-FIqgj2EUvTa7R50u0rGsyTftzjYmv/a3hO345bZNrqabNqjtgiDMgmo4mkUjd+nzU5oF3dClKqFIPUKybUyqoQ==} - dev: true - /react-is@17.0.2: resolution: {integrity: sha512-w2GsyukL62IJnlaff/nRegPQR94C/XXamvMWmSHRJ4y7Ts/4ocGRmTHvOs8PSE6pB3dWOrD/nueuU5sduBsQ4w==} dev: true @@ -1712,10 +1552,6 @@ packages: fsevents: 2.3.2 dev: true - 
/rrweb-cssom@0.6.0: - resolution: {integrity: sha512-APM0Gt1KoXBz0iIkkdB/kfvGOwC4UuJFeG/c+yV7wSc7q96cG/kJ0HiYCnzivD9SB53cLV1MlHFNfOuPaadYSw==} - dev: true - /rxjs@7.5.5: resolution: {integrity: sha512-sy+H0pQofO95VDmFLzyaw9xNJU4KTRSwQIGM6+iG3SypAtCiLDzpeG8sJrNCWn2Up9km+KhkvTdbkrdy+yzZdw==} dependencies: @@ -1726,13 +1562,6 @@ packages: resolution: {integrity: sha512-YZo3K82SD7Riyi0E1EQPojLz7kpepnSQI9IyPbHHg1XXXevb5dJI7tpyN2ADxGcQbHG7vcyRHk0cbwqcQriUtg==} dev: true - /saxes@6.0.0: - resolution: {integrity: sha512-xAg7SOnEhrm5zI3puOOKyy1OMcMlIJZYNJY7xLBwSze0UjhPLnWfj2GF2EpT0jmzaJKIWKHLsaSSajf35bcYnA==} - engines: {node: '>=v12.22.7'} - dependencies: - xmlchars: 2.2.0 - dev: true - /semver@7.5.0: resolution: {integrity: sha512-+XC0AD/R7Q2mPSRuy2Id0+CGTZ98+8f+KvwirxOKIEyid+XSx6HbC63p+O4IndTHuX5Z+JxQ0TghCkO5Cg/2HA==} engines: {node: '>=10'} @@ -1813,10 +1642,6 @@ packages: acorn: 8.8.2 dev: true - /symbol-tree@3.2.4: - resolution: {integrity: sha512-9QNk5KwDF+Bvz+PyObkmSYjI5ksVUYtjW7AU22r2NKcfLJcXp96hkDWU3+XndOsUb+AQ9QhfzfCT2O+CNWT5Tw==} - dev: true - /threads@1.7.0: resolution: {integrity: sha512-Mx5NBSHX3sQYR6iI9VYbgHKBLisyB+xROCBGjjWm1O9wb9vfLxdaGtmT/KCjUqMsSNW6nERzCW3T6H43LqjDZQ==} dependencies: @@ -1857,23 +1682,6 @@ packages: engines: {node: '>=14.0.0'} dev: true - /tough-cookie@4.1.2: - resolution: {integrity: sha512-G9fqXWoYFZgTc2z8Q5zaHy/vJMjm+WV0AkAeHxVCQiEB1b+dGvWzFW6QV07cY5jQ5gRkeid2qIkzkxUnmoQZUQ==} - engines: {node: '>=6'} - dependencies: - psl: 1.9.0 - punycode: 2.3.0 - universalify: 0.2.0 - url-parse: 1.5.10 - dev: true - - /tr46@4.1.1: - resolution: {integrity: sha512-2lv/66T7e5yNyhAAC4NaKe5nVavzuGJQVVtRYLyQ2OI8tsJ61PMLlelehb0wi2Hx6+hT/OJUWZcw8MjlSRnxvw==} - engines: {node: '>=14'} - dependencies: - punycode: 2.3.0 - dev: true - /ts-error@1.0.6: resolution: {integrity: sha512-tLJxacIQUM82IR7JO1UUkKlYuUTmoY9HBJAmNWFzheSlDS5SPMcNIepejHJa4BpPQLAcbRhRf3GDJzyj6rbKvA==} dev: false @@ -1907,18 +1715,6 @@ packages: resolution: {integrity: 
sha512-TrY6DsjTQQgyS3E3dBaOXf0TpPD8u9FVrVYmKVegJuFw51n/YB9XPt+U6ydzFG5ZIN7+DIjPbNmXoBj9esYhgQ==} dev: true - /universalify@0.2.0: - resolution: {integrity: sha512-CJ1QgKmNg3CwvAv/kOFmtnEN05f0D/cn9QntgNOQlQF9dgvVTHj3t+8JPdjqawCHk7V/KA+fbUqzZ9XWhcqPUg==} - engines: {node: '>= 4.0.0'} - dev: true - - /url-parse@1.5.10: - resolution: {integrity: sha512-WypcfiRhfeUP9vvF0j6rw0J3hrWrw6iZv3+22h6iRMJ/8z1Tj6XfLP4DsUix5MhMPnXpiHDoKyoZ/bdCkwBCiQ==} - dependencies: - querystringify: 2.2.0 - requires-port: 1.0.0 - dev: true - /viem@1.14.0(typescript@5.1.6)(zod@3.22.2): resolution: {integrity: sha512-4d+4/H3lnbkSAbrpQ15i1nBA7hne06joLFy3L3m0ZpMc+g+Zr3D4nuSTyeiqbHAYs9m2P9Kjap0HlyGkehasgg==} peerDependencies: @@ -1996,7 +1792,7 @@ packages: fsevents: 2.3.2 dev: true - /vitest@0.31.4(jsdom@22.0.0): + /vitest@0.31.4(happy-dom@12.10.3): resolution: {integrity: sha512-GoV0VQPmWrUFOZSg3RpQAPN+LPmHg2/gxlMNJlyxJihkz6qReHDV6b0pPDcqFLNEPya4tWJ1pgwUNP9MLmUfvQ==} engines: {node: '>=v14.18.0'} hasBin: true @@ -2041,7 +1837,7 @@ packages: chai: 4.3.7 concordance: 5.0.4 debug: 4.3.4 - jsdom: 22.0.0 + happy-dom: 12.10.3 local-pkg: 0.4.3 magic-string: 0.30.0 pathe: 1.1.0 @@ -2062,13 +1858,6 @@ packages: - terser dev: true - /w3c-xmlserializer@4.0.0: - resolution: {integrity: sha512-d+BFHzbiCx6zGfz0HyQ6Rg69w9k19nviJspaj4yNscGjrHu94sVP+aRm75yEbCh+r2/yR+7q6hux9LVtbuTGBw==} - engines: {node: '>=14'} - dependencies: - xml-name-validator: 4.0.0 - dev: true - /webidl-conversions@7.0.0: resolution: {integrity: sha512-VwddBukDzu71offAQR975unBIGqfKZpM+8ZX6ySk8nYhVoo5CYaZyzt3YBvYtRtO+aoGlqxPg/B87NGVZ/fu6g==} engines: {node: '>=12'} @@ -2091,14 +1880,6 @@ packages: engines: {node: '>=12'} dev: true - /whatwg-url@12.0.1: - resolution: {integrity: sha512-Ed/LrqB8EPlGxjS+TrsXcpUond1mhccS3pchLhzSgPCnTimUCKj3IZE75pAs5m6heB2U2TMerKFUXheyHY+VDQ==} - engines: {node: '>=14'} - dependencies: - tr46: 4.1.1 - webidl-conversions: 7.0.0 - dev: true - /which@2.0.2: resolution: {integrity: 
sha512-BLI3Tl1TW3Pvl70l3yq3Y64i+awpwXqsGBYWkkqMtnbXgrMD+yj7rhW0kuEDxzJaYXGjEW5ogapKNMEKNMjibA==} engines: {node: '>= 8'} @@ -2141,15 +1922,6 @@ packages: utf-8-validate: optional: true - /xml-name-validator@4.0.0: - resolution: {integrity: sha512-ICP2e+jsHvAj2E2lIHxa5tjXRlKDJo4IdvPvCXbXQGdzSfmSpNVyIKMvoZHjDY9DP0zV17iI85o90vRFXNccRw==} - engines: {node: '>=12'} - dev: true - - /xmlchars@2.2.0: - resolution: {integrity: sha512-JZnDKK8B0RCDw84FNdDAIpZK+JuJw+s7Lz8nksI7SIuU3UXJJslUthsi+uWBUYOwPFwW7W7PRLRfUKpxjtjFCw==} - dev: true - /y18n@5.0.8: resolution: {integrity: sha512-0pfFzegeDWJHJIAmTLRP2DwHjdF5s7jo9tuztdQxAhINCdvS+3nGINqPd00AphqJR/0LhANUS6/+7SCb98YOfA==} engines: {node: '>=10'} diff --git a/packages/common/src/utils/groupBy.test.ts b/packages/common/src/utils/groupBy.test.ts new file mode 100644 index 0000000000..8145aba261 --- /dev/null +++ b/packages/common/src/utils/groupBy.test.ts @@ -0,0 +1,32 @@ +import { describe, expect, it } from "vitest"; +import { groupBy } from "./groupBy"; + +describe("groupBy", () => { + it("should group values by key", () => { + const records = [ + { type: "cat", name: "Bob" }, + { type: "cat", name: "Spot" }, + { type: "dog", name: "Rover" }, + ]; + expect(groupBy(records, (record) => record.type)).toMatchInlineSnapshot(` + Map { + "cat" => [ + { + "name": "Bob", + "type": "cat", + }, + { + "name": "Spot", + "type": "cat", + }, + ], + "dog" => [ + { + "name": "Rover", + "type": "dog", + }, + ], + } + `); + }); +}); diff --git a/packages/common/src/utils/groupBy.ts b/packages/common/src/utils/groupBy.ts new file mode 100644 index 0000000000..5a81d49e3f --- /dev/null +++ b/packages/common/src/utils/groupBy.ts @@ -0,0 +1,12 @@ +export function groupBy( + values: readonly value[], + getKey: (value: value) => key +): Map { + const map = new Map(); + for (const value of values) { + const key = getKey(value); + if (!map.has(key)) map.set(key, []); + (map.get(key) as value[]).push(value); + } + return map; +} diff --git 
a/packages/common/src/utils/index.ts b/packages/common/src/utils/index.ts index c11ae82e51..f1d4991554 100644 --- a/packages/common/src/utils/index.ts +++ b/packages/common/src/utils/index.ts @@ -4,11 +4,13 @@ export * from "./bigIntMin"; export * from "./bigIntSort"; export * from "./chunk"; export * from "./curry"; +export * from "./groupBy"; export * from "./identity"; export * from "./isDefined"; export * from "./isNotNull"; export * from "./iteratorToArray"; export * from "./mapObject"; +export * from "./unique"; export * from "./uniqueBy"; export * from "./wait"; export * from "./waitForIdle"; diff --git a/packages/common/src/utils/unique.test.ts b/packages/common/src/utils/unique.test.ts new file mode 100644 index 0000000000..67693e0d14 --- /dev/null +++ b/packages/common/src/utils/unique.test.ts @@ -0,0 +1,15 @@ +import { describe, expect, it } from "vitest"; +import { unique } from "./unique"; + +describe("unique", () => { + it("should return unique values", () => { + expect(unique([1, 2, 1, 4, 3, 2])).toMatchInlineSnapshot(` + [ + 1, + 2, + 4, + 3, + ] + `); + }); +}); diff --git a/packages/common/src/utils/unique.ts b/packages/common/src/utils/unique.ts new file mode 100644 index 0000000000..565cd7b03e --- /dev/null +++ b/packages/common/src/utils/unique.ts @@ -0,0 +1,3 @@ +export function unique(values: readonly value[]): readonly value[] { + return Array.from(new Set(values)); +} diff --git a/packages/store-indexer/bin/postgres-indexer.ts b/packages/store-indexer/bin/postgres-indexer.ts index b6a288d5d1..eac1c1a09b 100644 --- a/packages/store-indexer/bin/postgres-indexer.ts +++ b/packages/store-indexer/bin/postgres-indexer.ts @@ -7,7 +7,7 @@ import { isDefined } from "@latticexyz/common/utils"; import { combineLatest, filter, first } from "rxjs"; import { drizzle } from "drizzle-orm/postgres-js"; import postgres from "postgres"; -import { cleanDatabase, postgresStorage, schemaVersion } from "@latticexyz/store-sync/postgres"; +import { 
createStorageAdapter } from "@latticexyz/store-sync/postgres"; import { createStoreSync } from "@latticexyz/store-sync"; import { indexerEnvSchema, parseEnv } from "./parseEnv"; @@ -37,37 +37,29 @@ const publicClient = createPublicClient({ const chainId = await publicClient.getChainId(); const database = drizzle(postgres(env.DATABASE_URL)); -const { storageAdapter, internalTables } = await postgresStorage({ database, publicClient }); +const { storageAdapter, tables } = await createStorageAdapter({ database, publicClient }); let startBlock = env.START_BLOCK; // Resume from latest block stored in DB. This will throw if the DB doesn't exist yet, so we wrap in a try/catch and ignore the error. +// TODO: query if the DB exists instead of try/catch try { - const currentChainStates = await database + const chainState = await database .select() - .from(internalTables.chain) - .where(eq(internalTables.chain.chainId, chainId)) - .execute(); - // TODO: replace this type workaround with `noUncheckedIndexedAccess: true` when we can fix all the issues related (https://github.com/latticexyz/mud/issues/1212) - const currentChainState: (typeof currentChainStates)[number] | undefined = currentChainStates[0]; + .from(tables.chainTable) + .where(eq(tables.chainTable.chainId, chainId)) + .limit(1) + .execute() + // Get the first record in a way that returns a possible `undefined` + // TODO: move this to `.findFirst` after upgrading drizzle or `rows[0]` after enabling `noUncheckedIndexedAccess: true` + .then((rows) => rows.find(() => true)); - if (currentChainState != null) { - if (currentChainState.schemaVersion != schemaVersion) { - console.log( - "schema version changed from", - currentChainState.schemaVersion, - "to", - schemaVersion, - "cleaning database" - ); - await cleanDatabase(database); - } else if (currentChainState.lastUpdatedBlockNumber != null) { - console.log("resuming from block number", currentChainState.lastUpdatedBlockNumber + 1n); - startBlock = 
currentChainState.lastUpdatedBlockNumber + 1n; - } + if (chainState?.lastUpdatedBlockNumber != null) { + startBlock = chainState.lastUpdatedBlockNumber + 1n; + console.log("resuming from block number", startBlock); } } catch (error) { - // ignore errors, this is optional + // ignore errors for now } const { latestBlockNumber$, storedBlockLogs$ } = await createStoreSync({ diff --git a/packages/store-indexer/package.json b/packages/store-indexer/package.json index 051bc8fd3a..8eabd6181c 100644 --- a/packages/store-indexer/package.json +++ b/packages/store-indexer/package.json @@ -26,11 +26,11 @@ "dev": "tsup --watch", "lint": "eslint .", "start:postgres": "concurrently -n indexer,frontend -c cyan,magenta 'tsx bin/postgres-indexer' 'tsx bin/postgres-frontend'", - "start:postgres:local": "DEBUG=mud:store-sync:createStoreSync DATABASE_URL=postgres://127.0.0.1/postgres RPC_HTTP_URL=http://127.0.0.1:8545 pnpm start:postgres", - "start:postgres:testnet": "DEBUG=mud:store-sync:createStoreSync DATABASE_URL=postgres://127.0.0.1/postgres RPC_HTTP_URL=https://follower.testnet-chain.linfra.xyz pnpm start:postgres", + "start:postgres:local": "DATABASE_URL=postgres://127.0.0.1/postgres RPC_HTTP_URL=http://127.0.0.1:8545 pnpm start:postgres", + "start:postgres:testnet": "DATABASE_URL=postgres://127.0.0.1/postgres RPC_HTTP_URL=https://rpc.holesky.redstone.xyz pnpm start:postgres", "start:sqlite": "tsx bin/sqlite-indexer", - "start:sqlite:local": "DEBUG=mud:store-sync:createStoreSync SQLITE_FILENAME=anvil.db RPC_HTTP_URL=http://127.0.0.1:8545 pnpm start:sqlite", - "start:sqlite:testnet": "DEBUG=mud:store-sync:createStoreSync SQLITE_FILENAME=testnet.db RPC_HTTP_URL=https://follower.testnet-chain.linfra.xyz pnpm start:sqlite", + "start:sqlite:local": "SQLITE_FILENAME=anvil.db RPC_HTTP_URL=http://127.0.0.1:8545 pnpm start:sqlite", + "start:sqlite:testnet": "SQLITE_FILENAME=testnet.db RPC_HTTP_URL=https://rpc.holesky.redstone.xyz pnpm start:sqlite", "test": "tsc --noEmit --skipLibCheck", 
"test:ci": "pnpm run test" }, @@ -43,7 +43,6 @@ "@latticexyz/store-sync": "workspace:*", "@trpc/client": "10.34.0", "@trpc/server": "10.34.0", - "@wagmi/chains": "^0.2.22", "better-sqlite3": "^8.6.0", "debug": "^4.3.4", "dotenv": "^16.0.3", diff --git a/packages/store-indexer/src/postgres/createQueryAdapter.ts b/packages/store-indexer/src/postgres/createQueryAdapter.ts index aced9d193b..c38c21ffbb 100644 --- a/packages/store-indexer/src/postgres/createQueryAdapter.ts +++ b/packages/store-indexer/src/postgres/createQueryAdapter.ts @@ -1,10 +1,11 @@ -import { eq, getTableName } from "drizzle-orm"; +import { getAddress } from "viem"; import { PgDatabase } from "drizzle-orm/pg-core"; -import { buildTable, buildInternalTables, getTables } from "@latticexyz/store-sync/postgres"; +import { TableWithRecords, isTableRegistrationLog, logToTable, storeTables } from "@latticexyz/store-sync"; +import { decodeKey, decodeValueArgs } from "@latticexyz/protocol-parser"; import { QueryAdapter } from "@latticexyz/store-sync/trpc-indexer"; import { debug } from "../debug"; -import { getAddress } from "viem"; -import { decodeDynamicField } from "@latticexyz/protocol-parser"; +import { getLogs } from "./getLogs"; +import { groupBy } from "@latticexyz/common/utils"; /** * Creates a query adapter for the tRPC server/client to query data from Postgres. @@ -14,70 +15,40 @@ import { decodeDynamicField } from "@latticexyz/protocol-parser"; */ export async function createQueryAdapter(database: PgDatabase): Promise { const adapter: QueryAdapter = { - async findAll({ chainId, address, filters = [] }) { - // If _any_ filter has a table ID, this will filter down all data to just those tables. Which mean we can't yet mix table filters with key-only filters. 
- // TODO: improve this so we can express this in the query (need to be able to query data across tables more easily) - const tableIds = Array.from(new Set(filters.map((filter) => filter.tableId))); - const tables = (await getTables(database)) - .filter((table) => address == null || getAddress(address) === getAddress(table.address)) - .filter((table) => !tableIds.length || tableIds.includes(table.tableId)); + async getLogs(opts) { + return getLogs(database, opts); + }, + async findAll(opts) { + const filters = opts.filters ?? []; + const { blockNumber, logs } = await getLogs(database, { + ...opts, + // make sure we're always retrieving `store.Tables` table, so we can decode table values + filters: filters.length > 0 ? [...filters, { tableId: storeTables.Tables.tableId }] : [], + }); - const tablesWithRecords = await Promise.all( - tables.map(async (table) => { - const sqlTable = buildTable(table); - const records = await database - .select() - .from(sqlTable) - .where(eq(sqlTable.__isDeleted, false)) - .execute() - // Apparently sometimes we can have tables that exist in the internal table but no relation/schema set up, so queries fail. - // See https://github.com/latticexyz/mud/issues/1923 - // TODO: make a more robust fix for this - .catch((error) => { - console.error( - "Could not query for records, returning empty set for table", - getTableName(sqlTable), - error - ); - return []; - }); - const filteredRecords = !filters.length - ? 
records - : records.filter((record) => { - const keyTuple = decodeDynamicField("bytes32[]", record.__key); - return filters.some( - (filter) => - filter.tableId === table.tableId && - (filter.key0 == null || filter.key0 === keyTuple[0]) && - (filter.key1 == null || filter.key1 === keyTuple[1]) - ); - }); - return { - ...table, - records: filteredRecords.map((record) => ({ - key: Object.fromEntries(Object.entries(table.keySchema).map(([name]) => [name, record[name]])), - value: Object.fromEntries(Object.entries(table.valueSchema).map(([name]) => [name, record[name]])), - })), - }; - }) - ); + const tables = logs.filter(isTableRegistrationLog).map(logToTable); - const internalTables = buildInternalTables(); - const metadata = await database - .select() - .from(internalTables.chain) - .where(eq(internalTables.chain.chainId, chainId)) - .execute(); - const { lastUpdatedBlockNumber } = metadata[0] ?? {}; + const logsByTable = groupBy(logs, (log) => `${getAddress(log.address)}:${log.args.tableId}`); - const result = { - blockNumber: lastUpdatedBlockNumber ?? null, - tables: tablesWithRecords, - }; + const tablesWithRecords: TableWithRecords[] = tables.map((table) => { + const tableLogs = logsByTable.get(`${getAddress(table.address)}:${table.tableId}`) ?? 
[]; + const records = tableLogs.map((log) => ({ + key: decodeKey(table.keySchema, log.args.keyTuple), + value: decodeValueArgs(table.valueSchema, log.args), + })); + + return { + ...table, + records, + }; + }); - debug("findAll", chainId, address, result); + debug("findAll: decoded %d logs across %d tables", logs.length, tables.length); - return result; + return { + blockNumber, + tables: tablesWithRecords, + }; }, }; return adapter; diff --git a/packages/store-indexer/src/postgres/getLogs.ts b/packages/store-indexer/src/postgres/getLogs.ts new file mode 100644 index 0000000000..0edd55362a --- /dev/null +++ b/packages/store-indexer/src/postgres/getLogs.ts @@ -0,0 +1,82 @@ +import { PgDatabase } from "drizzle-orm/pg-core"; +import { Hex } from "viem"; +import { StorageAdapterLog, SyncFilter } from "@latticexyz/store-sync"; +import { tables } from "@latticexyz/store-sync/postgres"; +import { and, eq, or } from "drizzle-orm"; +import { decodeDynamicField } from "@latticexyz/protocol-parser"; +import { bigIntMax } from "@latticexyz/common/utils"; + +export async function getLogs( + database: PgDatabase, + { + chainId, + address, + filters = [], + }: { + readonly chainId: number; + readonly address?: Hex; + readonly filters?: readonly SyncFilter[]; + } +): Promise<{ blockNumber: bigint; logs: (StorageAdapterLog & { eventName: "Store_SetRecord" })[] }> { + const conditions = filters.length + ? filters.map((filter) => + and( + address != null ? eq(tables.recordsTable.address, address) : undefined, + eq(tables.recordsTable.tableId, filter.tableId), + filter.key0 != null ? eq(tables.recordsTable.key0, filter.key0) : undefined, + filter.key1 != null ? eq(tables.recordsTable.key1, filter.key1) : undefined + ) + ) + : address != null + ? [eq(tables.recordsTable.address, address)] + : []; + + // Query for the block number that the indexer (i.e. chain) is at, in case the + // indexer is further along in the chain than a given store/table's last updated + // block number. 
We'll then take the highest block number between the indexer's + // chain state and all the records in the query (in case the records updated + // between these queries). Using just the highest block number from the queries + // could potentially signal to the client an older-than-necessary block number, + // for stores/tables that haven't seen recent activity. + // TODO: move the block number query into the records query for atomicity so we don't have to merge them here + const chainState = await database + .select() + .from(tables.chainTable) + .where(eq(tables.chainTable.chainId, chainId)) + .limit(1) + .execute() + // Get the first record in a way that returns a possible `undefined` + // TODO: move this to `.findFirst` after upgrading drizzle or `rows[0]` after enabling `noUncheckedIndexedAccess: true` + .then((rows) => rows.find(() => true)); + const indexerBlockNumber = chainState?.lastUpdatedBlockNumber ?? 0n; + + const records = await database + .select() + .from(tables.recordsTable) + .where(or(...conditions)); + + const blockNumber = records.reduce( + (max, record) => bigIntMax(max, record.lastUpdatedBlockNumber ?? 0n), + indexerBlockNumber + ); + + const logs = records + // TODO: add this to the query, assuming we can optimize with an index + .filter((record) => !record.isDeleted) + .map( + (record) => + ({ + address: record.address, + eventName: "Store_SetRecord", + args: { + tableId: record.tableId, + keyTuple: decodeDynamicField("bytes32[]", record.keyBytes), + staticData: record.staticData ?? "0x", + encodedLengths: record.encodedLengths ?? "0x", + dynamicData: record.dynamicData ?? 
"0x", + }, + } as const) + ); + + return { blockNumber, logs }; +} diff --git a/packages/store-indexer/src/sqlite/createQueryAdapter.ts b/packages/store-indexer/src/sqlite/createQueryAdapter.ts index aee529e4f1..8d68d5f7df 100644 --- a/packages/store-indexer/src/sqlite/createQueryAdapter.ts +++ b/packages/store-indexer/src/sqlite/createQueryAdapter.ts @@ -14,6 +14,10 @@ import { decodeDynamicField } from "@latticexyz/protocol-parser"; */ export async function createQueryAdapter(database: BaseSQLiteDatabase<"sync", any>): Promise { const adapter: QueryAdapter = { + async getLogs(opts) { + // TODO + throw new Error("Not implemented"); + }, async findAll({ chainId, address, filters = [] }) { // If _any_ filter has a table ID, this will filter down all data to just those tables. Which mean we can't yet mix table filters with key-only filters. // TODO: improve this so we can express this in the query (need to be able to query data across tables more easily) diff --git a/packages/store-sync/package.json b/packages/store-sync/package.json index abd1c2ea5d..e85e8b6481 100644 --- a/packages/store-sync/package.json +++ b/packages/store-sync/package.json @@ -12,6 +12,7 @@ "exports": { ".": "./dist/index.js", "./postgres": "./dist/postgres/index.js", + "./postgres-decoded": "./dist/postgres-decoded/index.js", "./recs": "./dist/recs/index.js", "./sqlite": "./dist/sqlite/index.js", "./trpc-indexer": "./dist/trpc-indexer/index.js", @@ -25,6 +26,9 @@ "postgres": [ "./src/postgres/index.ts" ], + "postgres-decoded": [ + "./src/postgres-decoded/index.ts" + ], "recs": [ "./src/recs/index.ts" ], diff --git a/packages/store-sync/src/index.ts b/packages/store-sync/src/index.ts index 2e0771488d..deb192e47d 100644 --- a/packages/store-sync/src/index.ts +++ b/packages/store-sync/src/index.ts @@ -1,3 +1,5 @@ export * from "./common"; export * from "./createStoreSync"; export * from "./SyncStep"; +export * from "./isTableRegistrationLog"; +export * from "./logToTable"; diff --git 
a/packages/store-sync/src/isTableRegistrationLog.ts b/packages/store-sync/src/isTableRegistrationLog.ts index c71ad1f3e4..8f6b6148d6 100644 --- a/packages/store-sync/src/isTableRegistrationLog.ts +++ b/packages/store-sync/src/isTableRegistrationLog.ts @@ -1,5 +1,8 @@ import { StorageAdapterLog, storeTables } from "./common"; +/** + * @internal + */ export function isTableRegistrationLog( log: StorageAdapterLog ): log is StorageAdapterLog & { eventName: "Store_SetRecord" } { diff --git a/packages/store-sync/src/logToTable.ts b/packages/store-sync/src/logToTable.ts index 0572c6f3f0..0d50dce3aa 100644 --- a/packages/store-sync/src/logToTable.ts +++ b/packages/store-sync/src/logToTable.ts @@ -3,6 +3,9 @@ import { Hex, concatHex, decodeAbiParameters, parseAbiParameters } from "viem"; import { StorageAdapterLog, Table, schemasTable } from "./common"; import { hexToResource } from "@latticexyz/common"; +/** + * @internal + */ export function logToTable(log: StorageAdapterLog & { eventName: "Store_SetRecord" }): Table { const [tableId, ...otherKeys] = log.args.keyTuple; if (otherKeys.length) { diff --git a/packages/store-sync/src/postgres/buildColumn.ts b/packages/store-sync/src/postgres-decoded/buildColumn.ts similarity index 98% rename from packages/store-sync/src/postgres/buildColumn.ts rename to packages/store-sync/src/postgres-decoded/buildColumn.ts index d9ffa6db99..71e57bd770 100644 --- a/packages/store-sync/src/postgres/buildColumn.ts +++ b/packages/store-sync/src/postgres-decoded/buildColumn.ts @@ -1,7 +1,7 @@ import { boolean, text } from "drizzle-orm/pg-core"; import { SchemaAbiType } from "@latticexyz/schema-type"; import { assertExhaustive } from "@latticexyz/common/utils"; -import { asAddress, asBigInt, asHex, asJson, asNumber } from "./columnTypes"; +import { asAddress, asBigInt, asHex, asJson, asNumber } from "../postgres/columnTypes"; // eslint-disable-next-line @typescript-eslint/explicit-function-return-type export function buildColumn(name: string, 
schemaAbiType: SchemaAbiType) { diff --git a/packages/store-sync/src/postgres-decoded/buildTable.test.ts b/packages/store-sync/src/postgres-decoded/buildTable.test.ts new file mode 100644 index 0000000000..634cf6bbbe --- /dev/null +++ b/packages/store-sync/src/postgres-decoded/buildTable.test.ts @@ -0,0 +1,98 @@ +import { describe, it, expect } from "vitest"; +import { buildTable } from "./buildTable"; +import { getTableColumns } from "drizzle-orm"; +import { getTableConfig } from "drizzle-orm/pg-core"; +import { mapObject } from "@latticexyz/common/utils"; + +describe("buildTable", () => { + it("should create table from schema", async () => { + const table = buildTable({ + address: "0xffffffffffffffffffffffffffffffffffffffff", + namespace: "test", + name: "users", + keySchema: { x: "uint32", y: "uint32" }, + valueSchema: { name: "string", addr: "address" }, + }); + + expect(getTableConfig(table).schema).toMatch(/^test_\d+__0xFFfFfFffFFfffFFfFFfFFFFFffFFFffffFfFFFfF__test$/); + expect(getTableConfig(table).name).toMatchInlineSnapshot('"users"'); + expect( + mapObject(getTableColumns(table), (column) => ({ + name: column.name, + dataType: column.dataType, + sqlName: column.sqlName, + })) + ).toMatchInlineSnapshot(` + { + "__keyBytes": { + "dataType": "custom", + "name": "__key_bytes", + "sqlName": "bytea", + }, + "__lastUpdatedBlockNumber": { + "dataType": "custom", + "name": "__last_updated_block_number", + "sqlName": "numeric", + }, + "addr": { + "dataType": "custom", + "name": "addr", + "sqlName": "bytea", + }, + "name": { + "dataType": "string", + "name": "name", + "sqlName": undefined, + }, + "x": { + "dataType": "custom", + "name": "x", + "sqlName": "integer", + }, + "y": { + "dataType": "custom", + "name": "y", + "sqlName": "integer", + }, + } + `); + }); + + it("can create a singleton table", async () => { + const table = buildTable({ + address: "0xffffffffffffffffffffffffffffffffffffffff", + namespace: "test", + name: "users", + keySchema: {}, + 
valueSchema: { addrs: "address[]" }, + }); + + expect(getTableConfig(table).schema).toMatch(/^test_\d+__0xFFfFfFffFFfffFFfFFfFFFFFffFFFffffFfFFFfF__test$/); + expect(getTableConfig(table).name).toMatchInlineSnapshot('"users"'); + expect( + mapObject(getTableColumns(table), (column) => ({ + name: column.name, + dataType: column.dataType, + sqlName: column.sqlName, + })) + ).toMatchInlineSnapshot(` + { + "__keyBytes": { + "dataType": "custom", + "name": "__key_bytes", + "sqlName": "bytea", + }, + "__lastUpdatedBlockNumber": { + "dataType": "custom", + "name": "__last_updated_block_number", + "sqlName": "numeric", + }, + "addrs": { + "dataType": "custom", + "name": "addrs", + "sqlName": "text", + }, + } + `); + }); +}); diff --git a/packages/store-sync/src/postgres/buildTable.ts b/packages/store-sync/src/postgres-decoded/buildTable.ts similarity index 78% rename from packages/store-sync/src/postgres/buildTable.ts rename to packages/store-sync/src/postgres-decoded/buildTable.ts index 2fb7feb499..131f5c3d58 100644 --- a/packages/store-sync/src/postgres/buildTable.ts +++ b/packages/store-sync/src/postgres-decoded/buildTable.ts @@ -1,18 +1,13 @@ import { PgColumnBuilderBase, PgTableWithColumns, pgSchema } from "drizzle-orm/pg-core"; -import { buildColumn } from "./buildColumn"; import { Address, getAddress } from "viem"; -import { transformSchemaName } from "./transformSchemaName"; import { KeySchema, ValueSchema } from "@latticexyz/protocol-parser"; +import { asBigInt, asHex } from "../postgres/columnTypes"; +import { transformSchemaName } from "../postgres/transformSchemaName"; +import { buildColumn } from "./buildColumn"; -// TODO: convert camel case to snake case for DB storage? 
export const metaColumns = { - __key: buildColumn("__key", "bytes").primaryKey(), - __staticData: buildColumn("__staticData", "bytes"), - __encodedLengths: buildColumn("__encodedLengths", "bytes"), - __dynamicData: buildColumn("__dynamicData", "bytes"), - __lastUpdatedBlockNumber: buildColumn("__lastUpdatedBlockNumber", "uint256").notNull(), - // TODO: last updated block hash? - __isDeleted: buildColumn("__isDeleted", "bool").notNull(), + __keyBytes: asHex("__key_bytes").primaryKey(), + __lastUpdatedBlockNumber: asBigInt("__last_updated_block_number", "numeric"), } as const satisfies Record; type PgTableFromSchema = PgTableWithColumns<{ @@ -62,7 +57,6 @@ export function buildTable): Promise { + const sqlTables = (await getTables(db)).map(buildTable); + + const schemaNames = unique(sqlTables.map((sqlTable) => getTableConfig(sqlTable).schema).filter(isDefined)); + + for (const schemaName of schemaNames) { + try { + debug(`dropping namespace ${schemaName} and all of its tables`); + await db.execute(sql.raw(pgDialect.schema.dropSchema(schemaName).ifExists().cascade().compile().sql)); + } catch (error) { + debug(`failed to drop namespace ${schemaName}`, error); + } + } + + await cleanBytesDatabase(db); +} diff --git a/packages/store-sync/src/postgres/postgresStorage.test.ts b/packages/store-sync/src/postgres-decoded/createStorageAdapter.test.ts similarity index 63% rename from packages/store-sync/src/postgres/postgresStorage.test.ts rename to packages/store-sync/src/postgres-decoded/createStorageAdapter.test.ts index 846fcf3f65..a0e67060aa 100644 --- a/packages/store-sync/src/postgres/postgresStorage.test.ts +++ b/packages/store-sync/src/postgres-decoded/createStorageAdapter.test.ts @@ -1,17 +1,17 @@ -import { beforeEach, describe, expect, it, vi } from "vitest"; +import { beforeEach, describe, expect, it } from "vitest"; import { DefaultLogger, eq } from "drizzle-orm"; import { drizzle } from "drizzle-orm/postgres-js"; import postgres from "postgres"; import { Hex, 
RpcLog, createPublicClient, decodeEventLog, formatLog, http } from "viem"; import { foundry } from "viem/chains"; -import * as transformSchemaNameExports from "./transformSchemaName"; import { getTables } from "./getTables"; -import { PostgresStorageAdapter, postgresStorage } from "./postgresStorage"; +import { PostgresStorageAdapter, createStorageAdapter } from "./createStorageAdapter"; import { buildTable } from "./buildTable"; import { groupLogsByBlockNumber } from "@latticexyz/block-logs-stream"; import { storeEventsAbi } from "@latticexyz/store"; import { StoreEventsLog } from "../common"; import worldRpcLogs from "../../../../test-data/world-logs.json"; +import { resourceToHex } from "@latticexyz/common"; const blocks = groupLogsByBlockNumber( worldRpcLogs.map((log) => { @@ -25,11 +25,7 @@ const blocks = groupLogsByBlockNumber( }) ); -vi.spyOn(transformSchemaNameExports, "transformSchemaName").mockImplementation( - (schemaName) => `${process.pid}_${process.env.VITEST_POOL_ID}__${schemaName}` -); - -describe("postgresStorage", async () => { +describe("createStorageAdapter", async () => { const db = drizzle(postgres(process.env.DATABASE_URL!), { logger: new DefaultLogger(), }); @@ -42,7 +38,7 @@ describe("postgresStorage", async () => { let storageAdapter: PostgresStorageAdapter; beforeEach(async () => { - storageAdapter = await postgresStorage({ database: db, publicClient }); + storageAdapter = await createStorageAdapter({ database: db, publicClient }); return storageAdapter.cleanUp; }); @@ -51,13 +47,11 @@ describe("postgresStorage", async () => { await storageAdapter.storageAdapter(block); } - expect(await db.select().from(storageAdapter.internalTables.chain)).toMatchInlineSnapshot(` + expect(await db.select().from(storageAdapter.tables.chainTable)).toMatchInlineSnapshot(` [ { "chainId": 31337, - "lastError": null, "lastUpdatedBlockNumber": 12n, - "schemaVersion": 1, }, ] `); @@ -65,23 +59,26 @@ describe("postgresStorage", async () => { expect( await db 
.select() - .from(storageAdapter.internalTables.tables) - .where(eq(storageAdapter.internalTables.tables.name, "NumberList")) + .from(storageAdapter.tables.recordsTable) + .where( + eq( + storageAdapter.tables.recordsTable.tableId, + resourceToHex({ type: "table", namespace: "", name: "NumberList" }) + ) + ) ).toMatchInlineSnapshot(` [ { "address": "0x6E9474e9c83676B9A71133FF96Db43E7AA0a4342", - "key": "0x6E9474e9c83676B9A71133FF96Db43E7AA0a4342::NumberList", - "keySchema": {}, - "lastError": null, + "dynamicData": "0x000001a400000045", + "encodedLengths": "0x0000000000000000000000000000000000000000000000000800000000000008", + "isDeleted": false, + "key0": null, + "key1": null, + "keyBytes": "0x", "lastUpdatedBlockNumber": 12n, - "name": "NumberList", - "namespace": "", - "schemaVersion": 1, + "staticData": null, "tableId": "0x746200000000000000000000000000004e756d6265724c697374000000000000", - "valueSchema": { - "value": "uint32[]", - }, }, ] `); @@ -91,13 +88,9 @@ describe("postgresStorage", async () => { [ { "address": "0x6E9474e9c83676B9A71133FF96Db43E7AA0a4342", - "key": "0x6E9474e9c83676B9A71133FF96Db43E7AA0a4342::NumberList", "keySchema": {}, - "lastError": null, - "lastUpdatedBlockNumber": 12n, "name": "NumberList", "namespace": "", - "schemaVersion": 1, "tableId": "0x746200000000000000000000000000004e756d6265724c697374000000000000", "valueSchema": { "value": "uint32[]", @@ -110,15 +103,11 @@ describe("postgresStorage", async () => { expect(await db.select().from(sqlTable)).toMatchInlineSnapshot(` [ { - "__dynamicData": "0x000001a400000045", - "__encodedLengths": "0x0000000000000000000000000000000000000000000000000800000000000008", - "__isDeleted": false, - "__key": "0x", + "__keyBytes": "0x", "__lastUpdatedBlockNumber": 12n, - "__staticData": null, "value": [ - 420, - 69, + 420, + 69, ], }, ] diff --git a/packages/store-sync/src/postgres-decoded/createStorageAdapter.ts b/packages/store-sync/src/postgres-decoded/createStorageAdapter.ts new file mode 100644
index 0000000000..fe5208a4dc --- /dev/null +++ b/packages/store-sync/src/postgres-decoded/createStorageAdapter.ts @@ -0,0 +1,147 @@ +import { Hex, PublicClient, concatHex, getAddress } from "viem"; +import { PgDatabase, QueryResultHKT } from "drizzle-orm/pg-core"; +import { and, eq } from "drizzle-orm"; +import { buildTable } from "./buildTable"; +import { StoreConfig } from "@latticexyz/store"; +import { debug } from "./debug"; +import { StorageAdapter, StorageAdapterBlock } from "../common"; +import { isTableRegistrationLog } from "../isTableRegistrationLog"; +import { logToTable } from "../logToTable"; +import { decodeKey, decodeValueArgs } from "@latticexyz/protocol-parser"; +import { tables as internalTables } from "../postgres/tables"; +import { createStorageAdapter as createBytesStorageAdapter } from "../postgres/createStorageAdapter"; +import { setupTables } from "../postgres/setupTables"; +import { getTables } from "./getTables"; +import { hexToResource } from "@latticexyz/common"; + +// Currently assumes one DB per chain ID + +export type PostgresStorageAdapter = { + storageAdapter: StorageAdapter; + tables: typeof internalTables; + cleanUp: () => Promise; +}; + +export async function createStorageAdapter({ + database, + publicClient, + config, +}: { + database: PgDatabase; + publicClient: PublicClient; + config?: TConfig; +}): Promise { + const bytesStorageAdapter = await createBytesStorageAdapter({ database, publicClient, config }); + const cleanUp: (() => Promise)[] = []; + + async function postgresStorageAdapter({ blockNumber, logs }: StorageAdapterBlock): Promise { + await bytesStorageAdapter.storageAdapter({ blockNumber, logs }); + + const newTables = logs.filter(isTableRegistrationLog).map(logToTable); + const newSqlTables = newTables.map(buildTable); + cleanUp.push(await setupTables(database, newSqlTables)); + + // TODO: cache these inside `getTables`? 
+ const tables = await getTables( + database, + logs.map((log) => ({ address: log.address, tableId: log.args.tableId })) + ); + + // TODO: check if DB schema/table was created? + + // This is currently parallelized per world (each world has its own database). + // This may need to change if we decide to put multiple worlds into one DB (e.g. a namespace per world, but all under one DB). + // If so, we'll probably want to wrap the entire block worth of operations in a transaction. + + await database.transaction(async (tx) => { + for (const log of logs) { + const table = tables.find( + (table) => getAddress(table.address) === getAddress(log.address) && table.tableId === log.args.tableId + ); + if (!table) { + const { namespace, name } = hexToResource(log.args.tableId); + debug(`table registration record for ${namespace}:${name} not found, skipping log`, log); + continue; + } + + const sqlTable = buildTable(table); + const keyBytes = concatHex(log.args.keyTuple as Hex[]); + const key = decodeKey(table.keySchema, log.args.keyTuple); + + if ( + log.eventName === "Store_SetRecord" || + log.eventName === "Store_SpliceStaticData" || + log.eventName === "Store_SpliceDynamicData" + ) { + const record = await database + .select() + .from(internalTables.recordsTable) + .where( + and( + eq(internalTables.recordsTable.address, log.address), + eq(internalTables.recordsTable.tableId, log.args.tableId), + eq(internalTables.recordsTable.keyBytes, keyBytes) + ) + ) + .limit(1) + // Get the first record in a way that returns a possible `undefined` + // TODO: move this to `.findFirst` after upgrading drizzle or `rows[0]` after enabling `noUncheckedIndexedAccess: true` + .then((rows) => rows.find(() => true)); + if (!record) { + const { namespace, name } = hexToResource(log.args.tableId); + debug(`no record found for ${log.args.keyTuple} in table ${namespace}:${name}, skipping log`, log); + continue; + } + + const value = decodeValueArgs(table.valueSchema, { + staticData: 
record.staticData ?? "0x", + encodedLengths: record.encodedLengths ?? "0x", + dynamicData: record.dynamicData ?? "0x", + }); + + debug("upserting record", { + namespace: table.namespace, + name: table.name, + key, + }); + + await tx + .insert(sqlTable) + .values({ + __keyBytes: keyBytes, + __lastUpdatedBlockNumber: blockNumber, + ...key, + ...value, + }) + .onConflictDoUpdate({ + target: sqlTable.__keyBytes, + set: { + __lastUpdatedBlockNumber: blockNumber, + ...value, + }, + }) + .execute(); + } else if (log.eventName === "Store_DeleteRecord") { + debug("deleting record", { + namespace: table.namespace, + name: table.name, + key, + }); + + await tx.delete(sqlTable).where(eq(sqlTable.__keyBytes, keyBytes)).execute(); + } + } + }); + } + + return { + storageAdapter: postgresStorageAdapter, + tables: internalTables, + cleanUp: async (): Promise => { + for (const fn of cleanUp) { + await fn(); + } + await bytesStorageAdapter.cleanUp(); + }, + }; +} diff --git a/packages/store-sync/src/postgres-decoded/debug.ts b/packages/store-sync/src/postgres-decoded/debug.ts new file mode 100644 index 0000000000..607afe7505 --- /dev/null +++ b/packages/store-sync/src/postgres-decoded/debug.ts @@ -0,0 +1,3 @@ +import { debug as parentDebug } from "../debug"; + +export const debug = parentDebug.extend("postgres-decoded"); diff --git a/packages/store-sync/src/postgres-decoded/getTables.ts b/packages/store-sync/src/postgres-decoded/getTables.ts new file mode 100644 index 0000000000..b3ba2bd1ea --- /dev/null +++ b/packages/store-sync/src/postgres-decoded/getTables.ts @@ -0,0 +1,43 @@ +import { PgDatabase } from "drizzle-orm/pg-core"; +import { and, eq, or } from "drizzle-orm"; +import { Table, storeTables } from "../common"; +import { tables as internalTables } from "../postgres/tables"; +import { Hex } from "viem"; +import { decodeDynamicField } from "@latticexyz/protocol-parser"; +import { logToTable } from "../logToTable"; + +export async function getTables( + db: PgDatabase,
filters: { address: Hex | null; tableId: Hex | null }[] = [] +): Promise { + const conditions = filters.map((filter) => + and( + filter.address != null ? eq(internalTables.recordsTable.address, filter.address) : undefined, + filter.tableId != null ? eq(internalTables.recordsTable.key0, filter.tableId) : undefined + ) + ); + + const records = await db + .select() + .from(internalTables.recordsTable) + .where(and(eq(internalTables.recordsTable.tableId, storeTables.Tables.tableId), or(...conditions))); + + const logs = records.map( + (record) => + ({ + address: record.address, + eventName: "Store_SetRecord", + args: { + tableId: record.tableId, + keyTuple: decodeDynamicField("bytes32[]", record.keyBytes), + staticData: record.staticData ?? "0x", + encodedLengths: record.encodedLengths ?? "0x", + dynamicData: record.dynamicData ?? "0x", + }, + } as const) + ); + + const tables = logs.map(logToTable); + + return tables; +} diff --git a/packages/store-sync/src/postgres-decoded/index.ts b/packages/store-sync/src/postgres-decoded/index.ts new file mode 100644 index 0000000000..76ed5fcfb7 --- /dev/null +++ b/packages/store-sync/src/postgres-decoded/index.ts @@ -0,0 +1,5 @@ +export * from "./buildTable"; +export * from "./cleanDatabase"; +export * from "./createStorageAdapter"; +export * from "./getTables"; +export * from "./syncToPostgres"; diff --git a/packages/store-sync/src/postgres-decoded/syncToPostgres.ts b/packages/store-sync/src/postgres-decoded/syncToPostgres.ts new file mode 100644 index 0000000000..b4c98627e7 --- /dev/null +++ b/packages/store-sync/src/postgres-decoded/syncToPostgres.ts @@ -0,0 +1,51 @@ +import { StoreConfig } from "@latticexyz/store"; +import { PgDatabase } from "drizzle-orm/pg-core"; +import { SyncOptions, SyncResult } from "../common"; +import { createStorageAdapter } from "./createStorageAdapter"; +import { createStoreSync } from "../createStoreSync"; + +type SyncToPostgresOptions = SyncOptions & { + /** + * [Postgres database object from 
Drizzle][0]. + * + * [0]: https://orm.drizzle.team/docs/installation-and-db-connection/postgresql/postgresjs + */ + database: PgDatabase; + startSync?: boolean; +}; + +type SyncToPostgresResult = SyncResult & { + stopSync: () => void; +}; + +/** + * Creates an indexer to process and store blockchain events. + * + * @param {CreateIndexerOptions} options See `CreateIndexerOptions`. + * @returns A function to unsubscribe from the block stream, effectively stopping the indexer. + */ +export async function syncToPostgres({ + config, + database, + publicClient, + startSync = true, + ...syncOptions +}: SyncToPostgresOptions): Promise { + const { storageAdapter } = await createStorageAdapter({ database, publicClient, config }); + const storeSync = await createStoreSync({ + storageAdapter, + config, + publicClient, + ...syncOptions, + }); + + const sub = startSync ? storeSync.storedBlockLogs$.subscribe() : null; + const stopSync = (): void => { + sub?.unsubscribe(); + }; + + return { + ...storeSync, + stopSync, + }; +} diff --git a/packages/store-sync/src/postgres/buildInternalTables.ts b/packages/store-sync/src/postgres/buildInternalTables.ts deleted file mode 100644 index bd0b7372e0..0000000000 --- a/packages/store-sync/src/postgres/buildInternalTables.ts +++ /dev/null @@ -1,32 +0,0 @@ -import { integer, pgSchema, text } from "drizzle-orm/pg-core"; -import { transformSchemaName } from "./transformSchemaName"; -import { asAddress, asBigInt, asHex, asJson, asNumber } from "./columnTypes"; -import { KeySchema, ValueSchema } from "@latticexyz/protocol-parser"; - -// eslint-disable-next-line @typescript-eslint/explicit-function-return-type -export function buildInternalTables() { - const schema = pgSchema(transformSchemaName("__mud_internal")); - return { - chain: schema.table("chain", { - // TODO: change schema version to varchar/text? 
- schemaVersion: integer("schema_version").notNull().primaryKey(), - chainId: asNumber("chain_id", "bigint").notNull().primaryKey(), - lastUpdatedBlockNumber: asBigInt("last_updated_block_number", "numeric"), - // TODO: last block hash? - lastError: text("last_error"), - }), - tables: schema.table("tables", { - schemaVersion: integer("schema_version").primaryKey(), - key: text("key").notNull().primaryKey(), - address: asAddress("address").notNull(), - tableId: asHex("table_id").notNull(), - namespace: text("namespace").notNull(), - name: text("name").notNull(), - keySchema: asJson("key_schema").notNull(), - valueSchema: asJson("value_schema").notNull(), - lastUpdatedBlockNumber: asBigInt("last_updated_block_number", "numeric"), - // TODO: last block hash? - lastError: text("last_error"), - }), - }; -} diff --git a/packages/store-sync/src/postgres/buildTable.test.ts b/packages/store-sync/src/postgres/buildTable.test.ts deleted file mode 100644 index b13b991393..0000000000 --- a/packages/store-sync/src/postgres/buildTable.test.ts +++ /dev/null @@ -1,1226 +0,0 @@ -import { describe, it, expect } from "vitest"; -import { buildTable } from "./buildTable"; - -describe("buildTable", () => { - it("should create table from schema", async () => { - const table = buildTable({ - address: "0xffffffffffffffffffffffffffffffffffffffff", - namespace: "test", - name: "users", - keySchema: { x: "uint32", y: "uint32" }, - valueSchema: { name: "string", addr: "address" }, - }); - - expect(table).toMatchInlineSnapshot(` - PgTable { - "__dynamicData": PgCustomColumn { - "columnType": "PgCustomColumn", - "config": { - "columnType": "PgCustomColumn", - "customTypeParams": { - "dataType": [Function], - "fromDriver": [Function], - "toDriver": [Function], - }, - "dataType": "custom", - "default": undefined, - "fieldConfig": undefined, - "hasDefault": false, - "isUnique": false, - "name": "__dynamicData", - "notNull": false, - "primaryKey": false, - "uniqueName": "users___dynamicData_unique", 
- "uniqueType": undefined, - }, - "dataType": "custom", - "default": undefined, - "defaultFn": undefined, - "enumValues": undefined, - "hasDefault": false, - "isUnique": false, - "mapFrom": [Function], - "mapTo": [Function], - "name": "__dynamicData", - "notNull": false, - "primary": false, - "sqlName": "bytea", - "table": [Circular], - "uniqueName": "users___dynamicData_unique", - "uniqueType": undefined, - }, - "__encodedLengths": PgCustomColumn { - "columnType": "PgCustomColumn", - "config": { - "columnType": "PgCustomColumn", - "customTypeParams": { - "dataType": [Function], - "fromDriver": [Function], - "toDriver": [Function], - }, - "dataType": "custom", - "default": undefined, - "fieldConfig": undefined, - "hasDefault": false, - "isUnique": false, - "name": "__encodedLengths", - "notNull": false, - "primaryKey": false, - "uniqueName": "users___encodedLengths_unique", - "uniqueType": undefined, - }, - "dataType": "custom", - "default": undefined, - "defaultFn": undefined, - "enumValues": undefined, - "hasDefault": false, - "isUnique": false, - "mapFrom": [Function], - "mapTo": [Function], - "name": "__encodedLengths", - "notNull": false, - "primary": false, - "sqlName": "bytea", - "table": [Circular], - "uniqueName": "users___encodedLengths_unique", - "uniqueType": undefined, - }, - "__isDeleted": PgBoolean { - "columnType": "PgBoolean", - "config": { - "columnType": "PgBoolean", - "dataType": "boolean", - "default": undefined, - "hasDefault": false, - "isUnique": false, - "name": "__isDeleted", - "notNull": true, - "primaryKey": false, - "uniqueName": "users___isDeleted_unique", - "uniqueType": undefined, - }, - "dataType": "boolean", - "default": undefined, - "defaultFn": undefined, - "enumValues": undefined, - "hasDefault": false, - "isUnique": false, - "name": "__isDeleted", - "notNull": true, - "primary": false, - "table": [Circular], - "uniqueName": "users___isDeleted_unique", - "uniqueType": undefined, - }, - "__key": PgCustomColumn { - "columnType": 
"PgCustomColumn", - "config": { - "columnType": "PgCustomColumn", - "customTypeParams": { - "dataType": [Function], - "fromDriver": [Function], - "toDriver": [Function], - }, - "dataType": "custom", - "default": undefined, - "fieldConfig": undefined, - "hasDefault": false, - "isUnique": false, - "name": "__key", - "notNull": true, - "primaryKey": true, - "uniqueName": "users___key_unique", - "uniqueType": undefined, - }, - "dataType": "custom", - "default": undefined, - "defaultFn": undefined, - "enumValues": undefined, - "hasDefault": false, - "isUnique": false, - "mapFrom": [Function], - "mapTo": [Function], - "name": "__key", - "notNull": true, - "primary": true, - "sqlName": "bytea", - "table": [Circular], - "uniqueName": "users___key_unique", - "uniqueType": undefined, - }, - "__lastUpdatedBlockNumber": PgCustomColumn { - "columnType": "PgCustomColumn", - "config": { - "columnType": "PgCustomColumn", - "customTypeParams": { - "dataType": [Function], - "fromDriver": [Function], - "toDriver": [Function], - }, - "dataType": "custom", - "default": undefined, - "fieldConfig": undefined, - "hasDefault": false, - "isUnique": false, - "name": "__lastUpdatedBlockNumber", - "notNull": true, - "primaryKey": false, - "uniqueName": "users___lastUpdatedBlockNumber_unique", - "uniqueType": undefined, - }, - "dataType": "custom", - "default": undefined, - "defaultFn": undefined, - "enumValues": undefined, - "hasDefault": false, - "isUnique": false, - "mapFrom": [Function], - "mapTo": [Function], - "name": "__lastUpdatedBlockNumber", - "notNull": true, - "primary": false, - "sqlName": "numeric", - "table": [Circular], - "uniqueName": "users___lastUpdatedBlockNumber_unique", - "uniqueType": undefined, - }, - "__staticData": PgCustomColumn { - "columnType": "PgCustomColumn", - "config": { - "columnType": "PgCustomColumn", - "customTypeParams": { - "dataType": [Function], - "fromDriver": [Function], - "toDriver": [Function], - }, - "dataType": "custom", - "default": undefined, - 
"fieldConfig": undefined, - "hasDefault": false, - "isUnique": false, - "name": "__staticData", - "notNull": false, - "primaryKey": false, - "uniqueName": "users___staticData_unique", - "uniqueType": undefined, - }, - "dataType": "custom", - "default": undefined, - "defaultFn": undefined, - "enumValues": undefined, - "hasDefault": false, - "isUnique": false, - "mapFrom": [Function], - "mapTo": [Function], - "name": "__staticData", - "notNull": false, - "primary": false, - "sqlName": "bytea", - "table": [Circular], - "uniqueName": "users___staticData_unique", - "uniqueType": undefined, - }, - "addr": PgCustomColumn { - "columnType": "PgCustomColumn", - "config": { - "columnType": "PgCustomColumn", - "customTypeParams": { - "dataType": [Function], - "fromDriver": [Function], - "toDriver": [Function], - }, - "dataType": "custom", - "default": undefined, - "fieldConfig": undefined, - "hasDefault": false, - "isUnique": false, - "name": "addr", - "notNull": true, - "primaryKey": false, - "uniqueName": "users_addr_unique", - "uniqueType": undefined, - }, - "dataType": "custom", - "default": undefined, - "defaultFn": undefined, - "enumValues": undefined, - "hasDefault": false, - "isUnique": false, - "mapFrom": [Function], - "mapTo": [Function], - "name": "addr", - "notNull": true, - "primary": false, - "sqlName": "bytea", - "table": [Circular], - "uniqueName": "users_addr_unique", - "uniqueType": undefined, - }, - "name": PgText { - "columnType": "PgText", - "config": { - "columnType": "PgText", - "dataType": "string", - "default": undefined, - "enumValues": undefined, - "hasDefault": false, - "isUnique": false, - "name": "name", - "notNull": true, - "primaryKey": false, - "uniqueName": "users_name_unique", - "uniqueType": undefined, - }, - "dataType": "string", - "default": undefined, - "defaultFn": undefined, - "enumValues": undefined, - "hasDefault": false, - "isUnique": false, - "name": "name", - "notNull": true, - "primary": false, - "table": [Circular], - 
"uniqueName": "users_name_unique", - "uniqueType": undefined, - }, - "x": PgCustomColumn { - "columnType": "PgCustomColumn", - "config": { - "columnType": "PgCustomColumn", - "customTypeParams": { - "dataType": [Function], - "fromDriver": [Function], - "toDriver": [Function], - }, - "dataType": "custom", - "default": undefined, - "fieldConfig": undefined, - "hasDefault": false, - "isUnique": false, - "name": "x", - "notNull": true, - "primaryKey": false, - "uniqueName": "users_x_unique", - "uniqueType": undefined, - }, - "dataType": "custom", - "default": undefined, - "defaultFn": undefined, - "enumValues": undefined, - "hasDefault": false, - "isUnique": false, - "mapFrom": [Function], - "mapTo": [Function], - "name": "x", - "notNull": true, - "primary": false, - "sqlName": "integer", - "table": [Circular], - "uniqueName": "users_x_unique", - "uniqueType": undefined, - }, - "y": PgCustomColumn { - "columnType": "PgCustomColumn", - "config": { - "columnType": "PgCustomColumn", - "customTypeParams": { - "dataType": [Function], - "fromDriver": [Function], - "toDriver": [Function], - }, - "dataType": "custom", - "default": undefined, - "fieldConfig": undefined, - "hasDefault": false, - "isUnique": false, - "name": "y", - "notNull": true, - "primaryKey": false, - "uniqueName": "users_y_unique", - "uniqueType": undefined, - }, - "dataType": "custom", - "default": undefined, - "defaultFn": undefined, - "enumValues": undefined, - "hasDefault": false, - "isUnique": false, - "mapFrom": [Function], - "mapTo": [Function], - "name": "y", - "notNull": true, - "primary": false, - "sqlName": "integer", - "table": [Circular], - "uniqueName": "users_y_unique", - "uniqueType": undefined, - }, - Symbol(drizzle:Name): "users", - Symbol(drizzle:OriginalName): "users", - Symbol(drizzle:Schema): "0xFFfFfFffFFfffFFfFFfFFFFFffFFFffffFfFFFfF__test", - Symbol(drizzle:Columns): { - "__dynamicData": PgCustomColumn { - "columnType": "PgCustomColumn", - "config": { - "columnType": 
"PgCustomColumn", - "customTypeParams": { - "dataType": [Function], - "fromDriver": [Function], - "toDriver": [Function], - }, - "dataType": "custom", - "default": undefined, - "fieldConfig": undefined, - "hasDefault": false, - "isUnique": false, - "name": "__dynamicData", - "notNull": false, - "primaryKey": false, - "uniqueName": "users___dynamicData_unique", - "uniqueType": undefined, - }, - "dataType": "custom", - "default": undefined, - "defaultFn": undefined, - "enumValues": undefined, - "hasDefault": false, - "isUnique": false, - "mapFrom": [Function], - "mapTo": [Function], - "name": "__dynamicData", - "notNull": false, - "primary": false, - "sqlName": "bytea", - "table": [Circular], - "uniqueName": "users___dynamicData_unique", - "uniqueType": undefined, - }, - "__encodedLengths": PgCustomColumn { - "columnType": "PgCustomColumn", - "config": { - "columnType": "PgCustomColumn", - "customTypeParams": { - "dataType": [Function], - "fromDriver": [Function], - "toDriver": [Function], - }, - "dataType": "custom", - "default": undefined, - "fieldConfig": undefined, - "hasDefault": false, - "isUnique": false, - "name": "__encodedLengths", - "notNull": false, - "primaryKey": false, - "uniqueName": "users___encodedLengths_unique", - "uniqueType": undefined, - }, - "dataType": "custom", - "default": undefined, - "defaultFn": undefined, - "enumValues": undefined, - "hasDefault": false, - "isUnique": false, - "mapFrom": [Function], - "mapTo": [Function], - "name": "__encodedLengths", - "notNull": false, - "primary": false, - "sqlName": "bytea", - "table": [Circular], - "uniqueName": "users___encodedLengths_unique", - "uniqueType": undefined, - }, - "__isDeleted": PgBoolean { - "columnType": "PgBoolean", - "config": { - "columnType": "PgBoolean", - "dataType": "boolean", - "default": undefined, - "hasDefault": false, - "isUnique": false, - "name": "__isDeleted", - "notNull": true, - "primaryKey": false, - "uniqueName": "users___isDeleted_unique", - "uniqueType": 
undefined, - }, - "dataType": "boolean", - "default": undefined, - "defaultFn": undefined, - "enumValues": undefined, - "hasDefault": false, - "isUnique": false, - "name": "__isDeleted", - "notNull": true, - "primary": false, - "table": [Circular], - "uniqueName": "users___isDeleted_unique", - "uniqueType": undefined, - }, - "__key": PgCustomColumn { - "columnType": "PgCustomColumn", - "config": { - "columnType": "PgCustomColumn", - "customTypeParams": { - "dataType": [Function], - "fromDriver": [Function], - "toDriver": [Function], - }, - "dataType": "custom", - "default": undefined, - "fieldConfig": undefined, - "hasDefault": false, - "isUnique": false, - "name": "__key", - "notNull": true, - "primaryKey": true, - "uniqueName": "users___key_unique", - "uniqueType": undefined, - }, - "dataType": "custom", - "default": undefined, - "defaultFn": undefined, - "enumValues": undefined, - "hasDefault": false, - "isUnique": false, - "mapFrom": [Function], - "mapTo": [Function], - "name": "__key", - "notNull": true, - "primary": true, - "sqlName": "bytea", - "table": [Circular], - "uniqueName": "users___key_unique", - "uniqueType": undefined, - }, - "__lastUpdatedBlockNumber": PgCustomColumn { - "columnType": "PgCustomColumn", - "config": { - "columnType": "PgCustomColumn", - "customTypeParams": { - "dataType": [Function], - "fromDriver": [Function], - "toDriver": [Function], - }, - "dataType": "custom", - "default": undefined, - "fieldConfig": undefined, - "hasDefault": false, - "isUnique": false, - "name": "__lastUpdatedBlockNumber", - "notNull": true, - "primaryKey": false, - "uniqueName": "users___lastUpdatedBlockNumber_unique", - "uniqueType": undefined, - }, - "dataType": "custom", - "default": undefined, - "defaultFn": undefined, - "enumValues": undefined, - "hasDefault": false, - "isUnique": false, - "mapFrom": [Function], - "mapTo": [Function], - "name": "__lastUpdatedBlockNumber", - "notNull": true, - "primary": false, - "sqlName": "numeric", - "table": 
[Circular], - "uniqueName": "users___lastUpdatedBlockNumber_unique", - "uniqueType": undefined, - }, - "__staticData": PgCustomColumn { - "columnType": "PgCustomColumn", - "config": { - "columnType": "PgCustomColumn", - "customTypeParams": { - "dataType": [Function], - "fromDriver": [Function], - "toDriver": [Function], - }, - "dataType": "custom", - "default": undefined, - "fieldConfig": undefined, - "hasDefault": false, - "isUnique": false, - "name": "__staticData", - "notNull": false, - "primaryKey": false, - "uniqueName": "users___staticData_unique", - "uniqueType": undefined, - }, - "dataType": "custom", - "default": undefined, - "defaultFn": undefined, - "enumValues": undefined, - "hasDefault": false, - "isUnique": false, - "mapFrom": [Function], - "mapTo": [Function], - "name": "__staticData", - "notNull": false, - "primary": false, - "sqlName": "bytea", - "table": [Circular], - "uniqueName": "users___staticData_unique", - "uniqueType": undefined, - }, - "addr": PgCustomColumn { - "columnType": "PgCustomColumn", - "config": { - "columnType": "PgCustomColumn", - "customTypeParams": { - "dataType": [Function], - "fromDriver": [Function], - "toDriver": [Function], - }, - "dataType": "custom", - "default": undefined, - "fieldConfig": undefined, - "hasDefault": false, - "isUnique": false, - "name": "addr", - "notNull": true, - "primaryKey": false, - "uniqueName": "users_addr_unique", - "uniqueType": undefined, - }, - "dataType": "custom", - "default": undefined, - "defaultFn": undefined, - "enumValues": undefined, - "hasDefault": false, - "isUnique": false, - "mapFrom": [Function], - "mapTo": [Function], - "name": "addr", - "notNull": true, - "primary": false, - "sqlName": "bytea", - "table": [Circular], - "uniqueName": "users_addr_unique", - "uniqueType": undefined, - }, - "name": PgText { - "columnType": "PgText", - "config": { - "columnType": "PgText", - "dataType": "string", - "default": undefined, - "enumValues": undefined, - "hasDefault": false, - 
"isUnique": false, - "name": "name", - "notNull": true, - "primaryKey": false, - "uniqueName": "users_name_unique", - "uniqueType": undefined, - }, - "dataType": "string", - "default": undefined, - "defaultFn": undefined, - "enumValues": undefined, - "hasDefault": false, - "isUnique": false, - "name": "name", - "notNull": true, - "primary": false, - "table": [Circular], - "uniqueName": "users_name_unique", - "uniqueType": undefined, - }, - "x": PgCustomColumn { - "columnType": "PgCustomColumn", - "config": { - "columnType": "PgCustomColumn", - "customTypeParams": { - "dataType": [Function], - "fromDriver": [Function], - "toDriver": [Function], - }, - "dataType": "custom", - "default": undefined, - "fieldConfig": undefined, - "hasDefault": false, - "isUnique": false, - "name": "x", - "notNull": true, - "primaryKey": false, - "uniqueName": "users_x_unique", - "uniqueType": undefined, - }, - "dataType": "custom", - "default": undefined, - "defaultFn": undefined, - "enumValues": undefined, - "hasDefault": false, - "isUnique": false, - "mapFrom": [Function], - "mapTo": [Function], - "name": "x", - "notNull": true, - "primary": false, - "sqlName": "integer", - "table": [Circular], - "uniqueName": "users_x_unique", - "uniqueType": undefined, - }, - "y": PgCustomColumn { - "columnType": "PgCustomColumn", - "config": { - "columnType": "PgCustomColumn", - "customTypeParams": { - "dataType": [Function], - "fromDriver": [Function], - "toDriver": [Function], - }, - "dataType": "custom", - "default": undefined, - "fieldConfig": undefined, - "hasDefault": false, - "isUnique": false, - "name": "y", - "notNull": true, - "primaryKey": false, - "uniqueName": "users_y_unique", - "uniqueType": undefined, - }, - "dataType": "custom", - "default": undefined, - "defaultFn": undefined, - "enumValues": undefined, - "hasDefault": false, - "isUnique": false, - "mapFrom": [Function], - "mapTo": [Function], - "name": "y", - "notNull": true, - "primary": false, - "sqlName": "integer", - "table": 
[Circular], - "uniqueName": "users_y_unique", - "uniqueType": undefined, - }, - }, - Symbol(drizzle:BaseName): "users", - Symbol(drizzle:IsAlias): false, - Symbol(drizzle:ExtraConfigBuilder): undefined, - Symbol(drizzle:IsDrizzleTable): true, - Symbol(drizzle:PgInlineForeignKeys): [], - } - `); - }); - - it("can create a singleton table", async () => { - const table = buildTable({ - address: "0xffffffffffffffffffffffffffffffffffffffff", - namespace: "test", - name: "users", - keySchema: {}, - valueSchema: { addrs: "address[]" }, - }); - - expect(table).toMatchInlineSnapshot(` - PgTable { - "__dynamicData": PgCustomColumn { - "columnType": "PgCustomColumn", - "config": { - "columnType": "PgCustomColumn", - "customTypeParams": { - "dataType": [Function], - "fromDriver": [Function], - "toDriver": [Function], - }, - "dataType": "custom", - "default": undefined, - "fieldConfig": undefined, - "hasDefault": false, - "isUnique": false, - "name": "__dynamicData", - "notNull": false, - "primaryKey": false, - "uniqueName": "users___dynamicData_unique", - "uniqueType": undefined, - }, - "dataType": "custom", - "default": undefined, - "defaultFn": undefined, - "enumValues": undefined, - "hasDefault": false, - "isUnique": false, - "mapFrom": [Function], - "mapTo": [Function], - "name": "__dynamicData", - "notNull": false, - "primary": false, - "sqlName": "bytea", - "table": [Circular], - "uniqueName": "users___dynamicData_unique", - "uniqueType": undefined, - }, - "__encodedLengths": PgCustomColumn { - "columnType": "PgCustomColumn", - "config": { - "columnType": "PgCustomColumn", - "customTypeParams": { - "dataType": [Function], - "fromDriver": [Function], - "toDriver": [Function], - }, - "dataType": "custom", - "default": undefined, - "fieldConfig": undefined, - "hasDefault": false, - "isUnique": false, - "name": "__encodedLengths", - "notNull": false, - "primaryKey": false, - "uniqueName": "users___encodedLengths_unique", - "uniqueType": undefined, - }, - "dataType": 
"custom", - "default": undefined, - "defaultFn": undefined, - "enumValues": undefined, - "hasDefault": false, - "isUnique": false, - "mapFrom": [Function], - "mapTo": [Function], - "name": "__encodedLengths", - "notNull": false, - "primary": false, - "sqlName": "bytea", - "table": [Circular], - "uniqueName": "users___encodedLengths_unique", - "uniqueType": undefined, - }, - "__isDeleted": PgBoolean { - "columnType": "PgBoolean", - "config": { - "columnType": "PgBoolean", - "dataType": "boolean", - "default": undefined, - "hasDefault": false, - "isUnique": false, - "name": "__isDeleted", - "notNull": true, - "primaryKey": false, - "uniqueName": "users___isDeleted_unique", - "uniqueType": undefined, - }, - "dataType": "boolean", - "default": undefined, - "defaultFn": undefined, - "enumValues": undefined, - "hasDefault": false, - "isUnique": false, - "name": "__isDeleted", - "notNull": true, - "primary": false, - "table": [Circular], - "uniqueName": "users___isDeleted_unique", - "uniqueType": undefined, - }, - "__key": PgCustomColumn { - "columnType": "PgCustomColumn", - "config": { - "columnType": "PgCustomColumn", - "customTypeParams": { - "dataType": [Function], - "fromDriver": [Function], - "toDriver": [Function], - }, - "dataType": "custom", - "default": undefined, - "fieldConfig": undefined, - "hasDefault": false, - "isUnique": false, - "name": "__key", - "notNull": true, - "primaryKey": true, - "uniqueName": "users___key_unique", - "uniqueType": undefined, - }, - "dataType": "custom", - "default": undefined, - "defaultFn": undefined, - "enumValues": undefined, - "hasDefault": false, - "isUnique": false, - "mapFrom": [Function], - "mapTo": [Function], - "name": "__key", - "notNull": true, - "primary": true, - "sqlName": "bytea", - "table": [Circular], - "uniqueName": "users___key_unique", - "uniqueType": undefined, - }, - "__lastUpdatedBlockNumber": PgCustomColumn { - "columnType": "PgCustomColumn", - "config": { - "columnType": "PgCustomColumn", - 
"customTypeParams": { - "dataType": [Function], - "fromDriver": [Function], - "toDriver": [Function], - }, - "dataType": "custom", - "default": undefined, - "fieldConfig": undefined, - "hasDefault": false, - "isUnique": false, - "name": "__lastUpdatedBlockNumber", - "notNull": true, - "primaryKey": false, - "uniqueName": "users___lastUpdatedBlockNumber_unique", - "uniqueType": undefined, - }, - "dataType": "custom", - "default": undefined, - "defaultFn": undefined, - "enumValues": undefined, - "hasDefault": false, - "isUnique": false, - "mapFrom": [Function], - "mapTo": [Function], - "name": "__lastUpdatedBlockNumber", - "notNull": true, - "primary": false, - "sqlName": "numeric", - "table": [Circular], - "uniqueName": "users___lastUpdatedBlockNumber_unique", - "uniqueType": undefined, - }, - "__staticData": PgCustomColumn { - "columnType": "PgCustomColumn", - "config": { - "columnType": "PgCustomColumn", - "customTypeParams": { - "dataType": [Function], - "fromDriver": [Function], - "toDriver": [Function], - }, - "dataType": "custom", - "default": undefined, - "fieldConfig": undefined, - "hasDefault": false, - "isUnique": false, - "name": "__staticData", - "notNull": false, - "primaryKey": false, - "uniqueName": "users___staticData_unique", - "uniqueType": undefined, - }, - "dataType": "custom", - "default": undefined, - "defaultFn": undefined, - "enumValues": undefined, - "hasDefault": false, - "isUnique": false, - "mapFrom": [Function], - "mapTo": [Function], - "name": "__staticData", - "notNull": false, - "primary": false, - "sqlName": "bytea", - "table": [Circular], - "uniqueName": "users___staticData_unique", - "uniqueType": undefined, - }, - "addrs": PgCustomColumn { - "columnType": "PgCustomColumn", - "config": { - "columnType": "PgCustomColumn", - "customTypeParams": { - "dataType": [Function], - "fromDriver": [Function], - "toDriver": [Function], - }, - "dataType": "custom", - "default": undefined, - "fieldConfig": undefined, - "hasDefault": false, - 
"isUnique": false, - "name": "addrs", - "notNull": true, - "primaryKey": false, - "uniqueName": "users_addrs_unique", - "uniqueType": undefined, - }, - "dataType": "custom", - "default": undefined, - "defaultFn": undefined, - "enumValues": undefined, - "hasDefault": false, - "isUnique": false, - "mapFrom": [Function], - "mapTo": [Function], - "name": "addrs", - "notNull": true, - "primary": false, - "sqlName": "text", - "table": [Circular], - "uniqueName": "users_addrs_unique", - "uniqueType": undefined, - }, - Symbol(drizzle:Name): "users", - Symbol(drizzle:OriginalName): "users", - Symbol(drizzle:Schema): "0xFFfFfFffFFfffFFfFFfFFFFFffFFFffffFfFFFfF__test", - Symbol(drizzle:Columns): { - "__dynamicData": PgCustomColumn { - "columnType": "PgCustomColumn", - "config": { - "columnType": "PgCustomColumn", - "customTypeParams": { - "dataType": [Function], - "fromDriver": [Function], - "toDriver": [Function], - }, - "dataType": "custom", - "default": undefined, - "fieldConfig": undefined, - "hasDefault": false, - "isUnique": false, - "name": "__dynamicData", - "notNull": false, - "primaryKey": false, - "uniqueName": "users___dynamicData_unique", - "uniqueType": undefined, - }, - "dataType": "custom", - "default": undefined, - "defaultFn": undefined, - "enumValues": undefined, - "hasDefault": false, - "isUnique": false, - "mapFrom": [Function], - "mapTo": [Function], - "name": "__dynamicData", - "notNull": false, - "primary": false, - "sqlName": "bytea", - "table": [Circular], - "uniqueName": "users___dynamicData_unique", - "uniqueType": undefined, - }, - "__encodedLengths": PgCustomColumn { - "columnType": "PgCustomColumn", - "config": { - "columnType": "PgCustomColumn", - "customTypeParams": { - "dataType": [Function], - "fromDriver": [Function], - "toDriver": [Function], - }, - "dataType": "custom", - "default": undefined, - "fieldConfig": undefined, - "hasDefault": false, - "isUnique": false, - "name": "__encodedLengths", - "notNull": false, - "primaryKey": false, - 
"uniqueName": "users___encodedLengths_unique", - "uniqueType": undefined, - }, - "dataType": "custom", - "default": undefined, - "defaultFn": undefined, - "enumValues": undefined, - "hasDefault": false, - "isUnique": false, - "mapFrom": [Function], - "mapTo": [Function], - "name": "__encodedLengths", - "notNull": false, - "primary": false, - "sqlName": "bytea", - "table": [Circular], - "uniqueName": "users___encodedLengths_unique", - "uniqueType": undefined, - }, - "__isDeleted": PgBoolean { - "columnType": "PgBoolean", - "config": { - "columnType": "PgBoolean", - "dataType": "boolean", - "default": undefined, - "hasDefault": false, - "isUnique": false, - "name": "__isDeleted", - "notNull": true, - "primaryKey": false, - "uniqueName": "users___isDeleted_unique", - "uniqueType": undefined, - }, - "dataType": "boolean", - "default": undefined, - "defaultFn": undefined, - "enumValues": undefined, - "hasDefault": false, - "isUnique": false, - "name": "__isDeleted", - "notNull": true, - "primary": false, - "table": [Circular], - "uniqueName": "users___isDeleted_unique", - "uniqueType": undefined, - }, - "__key": PgCustomColumn { - "columnType": "PgCustomColumn", - "config": { - "columnType": "PgCustomColumn", - "customTypeParams": { - "dataType": [Function], - "fromDriver": [Function], - "toDriver": [Function], - }, - "dataType": "custom", - "default": undefined, - "fieldConfig": undefined, - "hasDefault": false, - "isUnique": false, - "name": "__key", - "notNull": true, - "primaryKey": true, - "uniqueName": "users___key_unique", - "uniqueType": undefined, - }, - "dataType": "custom", - "default": undefined, - "defaultFn": undefined, - "enumValues": undefined, - "hasDefault": false, - "isUnique": false, - "mapFrom": [Function], - "mapTo": [Function], - "name": "__key", - "notNull": true, - "primary": true, - "sqlName": "bytea", - "table": [Circular], - "uniqueName": "users___key_unique", - "uniqueType": undefined, - }, - "__lastUpdatedBlockNumber": PgCustomColumn { - 
"columnType": "PgCustomColumn", - "config": { - "columnType": "PgCustomColumn", - "customTypeParams": { - "dataType": [Function], - "fromDriver": [Function], - "toDriver": [Function], - }, - "dataType": "custom", - "default": undefined, - "fieldConfig": undefined, - "hasDefault": false, - "isUnique": false, - "name": "__lastUpdatedBlockNumber", - "notNull": true, - "primaryKey": false, - "uniqueName": "users___lastUpdatedBlockNumber_unique", - "uniqueType": undefined, - }, - "dataType": "custom", - "default": undefined, - "defaultFn": undefined, - "enumValues": undefined, - "hasDefault": false, - "isUnique": false, - "mapFrom": [Function], - "mapTo": [Function], - "name": "__lastUpdatedBlockNumber", - "notNull": true, - "primary": false, - "sqlName": "numeric", - "table": [Circular], - "uniqueName": "users___lastUpdatedBlockNumber_unique", - "uniqueType": undefined, - }, - "__staticData": PgCustomColumn { - "columnType": "PgCustomColumn", - "config": { - "columnType": "PgCustomColumn", - "customTypeParams": { - "dataType": [Function], - "fromDriver": [Function], - "toDriver": [Function], - }, - "dataType": "custom", - "default": undefined, - "fieldConfig": undefined, - "hasDefault": false, - "isUnique": false, - "name": "__staticData", - "notNull": false, - "primaryKey": false, - "uniqueName": "users___staticData_unique", - "uniqueType": undefined, - }, - "dataType": "custom", - "default": undefined, - "defaultFn": undefined, - "enumValues": undefined, - "hasDefault": false, - "isUnique": false, - "mapFrom": [Function], - "mapTo": [Function], - "name": "__staticData", - "notNull": false, - "primary": false, - "sqlName": "bytea", - "table": [Circular], - "uniqueName": "users___staticData_unique", - "uniqueType": undefined, - }, - "addrs": PgCustomColumn { - "columnType": "PgCustomColumn", - "config": { - "columnType": "PgCustomColumn", - "customTypeParams": { - "dataType": [Function], - "fromDriver": [Function], - "toDriver": [Function], - }, - "dataType": "custom", 
- "default": undefined, - "fieldConfig": undefined, - "hasDefault": false, - "isUnique": false, - "name": "addrs", - "notNull": true, - "primaryKey": false, - "uniqueName": "users_addrs_unique", - "uniqueType": undefined, - }, - "dataType": "custom", - "default": undefined, - "defaultFn": undefined, - "enumValues": undefined, - "hasDefault": false, - "isUnique": false, - "mapFrom": [Function], - "mapTo": [Function], - "name": "addrs", - "notNull": true, - "primary": false, - "sqlName": "text", - "table": [Circular], - "uniqueName": "users_addrs_unique", - "uniqueType": undefined, - }, - }, - Symbol(drizzle:BaseName): "users", - Symbol(drizzle:IsAlias): false, - Symbol(drizzle:ExtraConfigBuilder): undefined, - Symbol(drizzle:IsDrizzleTable): true, - Symbol(drizzle:PgInlineForeignKeys): [], - } - `); - }); -}); diff --git a/packages/store-sync/src/postgres/cleanDatabase.ts b/packages/store-sync/src/postgres/cleanDatabase.ts index 26d22ad31c..e63e897b28 100644 --- a/packages/store-sync/src/postgres/cleanDatabase.ts +++ b/packages/store-sync/src/postgres/cleanDatabase.ts @@ -1,34 +1,23 @@ import { PgDatabase, getTableConfig } from "drizzle-orm/pg-core"; -import { buildInternalTables } from "./buildInternalTables"; -import { getTables } from "./getTables"; -import { buildTable } from "./buildTable"; -import { isDefined } from "@latticexyz/common/utils"; +import { tables } from "./tables"; import { debug } from "./debug"; import { sql } from "drizzle-orm"; import { pgDialect } from "./pgDialect"; +import { isDefined, unique } from "@latticexyz/common/utils"; // This intentionally just cleans up known schemas/tables/rows. We could drop the database but that's scary. 
export async function cleanDatabase(db: PgDatabase): Promise { - const internalTables = buildInternalTables(); - // TODO: check if internalTables schema matches, delete if not + const schemaNames = unique( + Object.values(tables) + .map((table) => getTableConfig(table).schema) + .filter(isDefined) + ); - const tables = (await getTables(db)).map(buildTable); - - const schemaNames = [...new Set(tables.map((table) => getTableConfig(table).schema))].filter(isDefined); - - for (const schemaName of schemaNames) { - try { - debug(`dropping namespace ${schemaName} and all of its tables`); - await db.execute(sql.raw(pgDialect.schema.dropSchema(schemaName).ifExists().cascade().compile().sql)); - } catch (error) { - debug(`failed to drop namespace ${schemaName}`, error); + await db.transaction(async (tx) => { + for (const schemaName of schemaNames) { + debug(`dropping schema ${schemaName} and all of its tables`); + await tx.execute(sql.raw(pgDialect.schema.dropSchema(schemaName).ifExists().cascade().compile().sql)); } - } - - for (const internalTable of Object.values(internalTables)) { - const tableConfig = getTableConfig(internalTable); - debug(`deleting all rows from ${tableConfig.schema}.${tableConfig.name}`); - await db.delete(internalTable); - } + }); } diff --git a/packages/store-sync/src/postgres/createStorageAdapter.test.ts b/packages/store-sync/src/postgres/createStorageAdapter.test.ts new file mode 100644 index 0000000000..554c1315ee --- /dev/null +++ b/packages/store-sync/src/postgres/createStorageAdapter.test.ts @@ -0,0 +1,86 @@ +import { beforeEach, describe, expect, it } from "vitest"; +import { DefaultLogger, eq } from "drizzle-orm"; +import { drizzle } from "drizzle-orm/postgres-js"; +import postgres from "postgres"; +import { Hex, RpcLog, createPublicClient, decodeEventLog, formatLog, http } from "viem"; +import { foundry } from "viem/chains"; +import { PostgresStorageAdapter, createStorageAdapter } from "./createStorageAdapter"; +import { 
groupLogsByBlockNumber } from "@latticexyz/block-logs-stream"; +import { storeEventsAbi } from "@latticexyz/store"; +import { StoreEventsLog } from "../common"; +import worldRpcLogs from "../../../../test-data/world-logs.json"; +import { resourceToHex } from "@latticexyz/common"; + +const blocks = groupLogsByBlockNumber( + worldRpcLogs.map((log) => { + const { eventName, args } = decodeEventLog({ + abi: storeEventsAbi, + data: log.data as Hex, + topics: log.topics as [Hex, ...Hex[]], + strict: true, + }); + return formatLog(log as any as RpcLog, { args, eventName: eventName as string }) as StoreEventsLog; + }) +); + +describe("createStorageAdapter", async () => { + const db = drizzle(postgres(process.env.DATABASE_URL!), { + logger: new DefaultLogger(), + }); + + const publicClient = createPublicClient({ + chain: foundry, + transport: http(), + }); + + let storageAdapter: PostgresStorageAdapter; + + beforeEach(async () => { + storageAdapter = await createStorageAdapter({ database: db, publicClient }); + return storageAdapter.cleanUp; + }); + + it("should create tables and data from block log", async () => { + for (const block of blocks) { + await storageAdapter.storageAdapter(block); + } + + expect(await db.select().from(storageAdapter.tables.chainTable)).toMatchInlineSnapshot(` + [ + { + "chainId": 31337, + "lastUpdatedBlockNumber": 12n, + }, + ] + `); + + expect( + await db + .select() + .from(storageAdapter.tables.recordsTable) + .where( + eq( + storageAdapter.tables.recordsTable.tableId, + resourceToHex({ type: "table", namespace: "", name: "NumberList" }) + ) + ) + ).toMatchInlineSnapshot(` + [ + { + "address": "0x6E9474e9c83676B9A71133FF96Db43E7AA0a4342", + "dynamicData": "0x000001a400000045", + "encodedLengths": "0x0000000000000000000000000000000000000000000000000800000000000008", + "isDeleted": false, + "key0": null, + "key1": null, + "keyBytes": "0x", + "lastUpdatedBlockNumber": 12n, + "staticData": null, + "tableId": 
"0x746200000000000000000000000000004e756d6265724c697374000000000000", + }, + ] + `); + + await storageAdapter.cleanUp(); + }); +}); diff --git a/packages/store-sync/src/postgres/createStorageAdapter.ts b/packages/store-sync/src/postgres/createStorageAdapter.ts new file mode 100644 index 0000000000..da587559d4 --- /dev/null +++ b/packages/store-sync/src/postgres/createStorageAdapter.ts @@ -0,0 +1,222 @@ +import { PublicClient, encodePacked, size } from "viem"; +import { PgDatabase, QueryResultHKT } from "drizzle-orm/pg-core"; +import { and, eq } from "drizzle-orm"; +import { StoreConfig } from "@latticexyz/store"; +import { debug } from "./debug"; +import { tables } from "./tables"; +import { spliceHex } from "@latticexyz/common"; +import { setupTables } from "./setupTables"; +import { StorageAdapter, StorageAdapterBlock } from "../common"; + +// Currently assumes one DB per chain ID + +export type PostgresStorageAdapter = { + storageAdapter: StorageAdapter; + tables: typeof tables; + cleanUp: () => Promise; +}; + +export async function createStorageAdapter({ + database, + publicClient, +}: { + database: PgDatabase; + publicClient: PublicClient; + config?: TConfig; +}): Promise { + const cleanUp: (() => Promise)[] = []; + + const chainId = publicClient.chain?.id ?? 
(await publicClient.getChainId()); + + cleanUp.push(await setupTables(database, Object.values(tables))); + + async function postgresStorageAdapter({ blockNumber, logs }: StorageAdapterBlock): Promise { + await database.transaction(async (tx) => { + for (const log of logs) { + const keyBytes = encodePacked(["bytes32[]"], [log.args.keyTuple]); + + if (log.eventName === "Store_SetRecord") { + debug("upserting record", { + address: log.address, + tableId: log.args.tableId, + keyTuple: log.args.keyTuple, + }); + + await tx + .insert(tables.recordsTable) + .values({ + address: log.address, + tableId: log.args.tableId, + keyBytes, + key0: log.args.keyTuple[0], + key1: log.args.keyTuple[1], + staticData: log.args.staticData, + encodedLengths: log.args.encodedLengths, + dynamicData: log.args.dynamicData, + lastUpdatedBlockNumber: blockNumber, + isDeleted: false, + }) + .onConflictDoUpdate({ + target: [tables.recordsTable.address, tables.recordsTable.tableId, tables.recordsTable.keyBytes], + set: { + staticData: log.args.staticData, + encodedLengths: log.args.encodedLengths, + dynamicData: log.args.dynamicData, + lastUpdatedBlockNumber: blockNumber, + isDeleted: false, + }, + }) + .execute(); + } else if (log.eventName === "Store_SpliceStaticData") { + // TODO: replace this operation with SQL `overlay()` (https://www.postgresql.org/docs/9.3/functions-binarystring.html) + + const previousValue = await tx + .select({ staticData: tables.recordsTable.staticData }) + .from(tables.recordsTable) + .where( + and( + eq(tables.recordsTable.address, log.address), + eq(tables.recordsTable.tableId, log.args.tableId), + eq(tables.recordsTable.keyBytes, keyBytes) + ) + ) + .limit(1) + .execute() + // Get the first record in a way that returns a possible `undefined` + // TODO: move this to `.findFirst` after upgrading drizzle or `rows[0]` after enabling `noUncheckedIndexedAccess: true` + .then((rows) => rows.find(() => true)); + + const previousStaticData = previousValue?.staticData ?? 
"0x"; + const newStaticData = spliceHex(previousStaticData, log.args.start, size(log.args.data), log.args.data); + + debug("upserting record via splice static", { + address: log.address, + tableId: log.args.tableId, + keyTuple: log.args.keyTuple, + }); + + await tx + .insert(tables.recordsTable) + .values({ + address: log.address, + tableId: log.args.tableId, + keyBytes, + key0: log.args.keyTuple[0], + key1: log.args.keyTuple[1], + staticData: newStaticData, + lastUpdatedBlockNumber: blockNumber, + isDeleted: false, + }) + .onConflictDoUpdate({ + target: [tables.recordsTable.address, tables.recordsTable.tableId, tables.recordsTable.keyBytes], + set: { + staticData: newStaticData, + lastUpdatedBlockNumber: blockNumber, + isDeleted: false, + }, + }) + .execute(); + } else if (log.eventName === "Store_SpliceDynamicData") { + // TODO: replace this operation with SQL `overlay()` (https://www.postgresql.org/docs/9.3/functions-binarystring.html) + + const previousValue = await tx + .select({ dynamicData: tables.recordsTable.dynamicData }) + .from(tables.recordsTable) + .where( + and( + eq(tables.recordsTable.address, log.address), + eq(tables.recordsTable.tableId, log.args.tableId), + eq(tables.recordsTable.keyBytes, keyBytes) + ) + ) + .limit(1) + .execute() + // Get the first record in a way that returns a possible `undefined` + // TODO: move this to `.findFirst` after upgrading drizzle or `rows[0]` after enabling `noUncheckedIndexedAccess: true` + .then((rows) => rows.find(() => true)); + + const previousDynamicData = previousValue?.dynamicData ?? 
"0x"; + const newDynamicData = spliceHex(previousDynamicData, log.args.start, log.args.deleteCount, log.args.data); + + debug("upserting record via splice dynamic", { + address: log.address, + tableId: log.args.tableId, + keyTuple: log.args.keyTuple, + }); + + await tx + .insert(tables.recordsTable) + .values({ + address: log.address, + tableId: log.args.tableId, + keyBytes, + key0: log.args.keyTuple[0], + key1: log.args.keyTuple[1], + encodedLengths: log.args.encodedLengths, + dynamicData: newDynamicData, + lastUpdatedBlockNumber: blockNumber, + isDeleted: false, + }) + .onConflictDoUpdate({ + target: [tables.recordsTable.address, tables.recordsTable.tableId, tables.recordsTable.keyBytes], + set: { + encodedLengths: log.args.encodedLengths, + dynamicData: newDynamicData, + lastUpdatedBlockNumber: blockNumber, + isDeleted: false, + }, + }) + .execute(); + } else if (log.eventName === "Store_DeleteRecord") { + debug("deleting record", { + address: log.address, + tableId: log.args.tableId, + keyTuple: log.args.keyTuple, + }); + + await tx + .update(tables.recordsTable) + .set({ + staticData: null, + encodedLengths: null, + dynamicData: null, + lastUpdatedBlockNumber: blockNumber, + isDeleted: true, + }) + .where( + and( + eq(tables.recordsTable.address, log.address), + eq(tables.recordsTable.tableId, log.args.tableId), + eq(tables.recordsTable.keyBytes, keyBytes) + ) + ) + .execute(); + } + } + + await tx + .insert(tables.chainTable) + .values({ + chainId, + lastUpdatedBlockNumber: blockNumber, + }) + .onConflictDoUpdate({ + target: [tables.chainTable.chainId], + set: { + lastUpdatedBlockNumber: blockNumber, + }, + }) + .execute(); + }); + } + + return { + storageAdapter: postgresStorageAdapter, + tables, + cleanUp: async (): Promise => { + for (const fn of cleanUp) { + await fn(); + } + }, + }; +} diff --git a/packages/store-sync/src/postgres/getTableKey.ts b/packages/store-sync/src/postgres/getTableKey.ts deleted file mode 100644 index c1d08a0c29..0000000000 --- 
a/packages/store-sync/src/postgres/getTableKey.ts +++ /dev/null @@ -1,8 +0,0 @@ -import { getAddress } from "viem"; -import { Table } from "../common"; -import { hexToResource } from "@latticexyz/common"; - -export function getTableKey({ address, tableId }: Pick): string { - const { namespace, name } = hexToResource(tableId); - return `${getAddress(address)}:${namespace}:${name}`; -} diff --git a/packages/store-sync/src/postgres/getTables.ts b/packages/store-sync/src/postgres/getTables.ts deleted file mode 100644 index b218ec2f03..0000000000 --- a/packages/store-sync/src/postgres/getTables.ts +++ /dev/null @@ -1,30 +0,0 @@ -import { PgDatabase } from "drizzle-orm/pg-core"; -import { getTableName, inArray } from "drizzle-orm"; -import { Table } from "../common"; -import { buildInternalTables } from "./buildInternalTables"; -import { buildTable } from "./buildTable"; -import { debug } from "./debug"; -import { isDefined } from "@latticexyz/common/utils"; - -export async function getTables(db: PgDatabase, keys: string[] = []): Promise { - const internalTables = buildInternalTables(); - - const tables = await db - .select() - .from(internalTables.tables) - .where(keys.length ? 
inArray(internalTables.tables.key, [...new Set(keys)]) : undefined); - - const validTables = await Promise.all( - tables.map(async (table) => { - const sqlTable = buildTable(table); - try { - await db.select({ key: sqlTable.__key }).from(sqlTable).limit(1); - return table; - } catch (error) { - debug("Could not query table, skipping", getTableName(sqlTable), error); - } - }) - ); - - return validTables.filter(isDefined); -} diff --git a/packages/store-sync/src/postgres/index.ts b/packages/store-sync/src/postgres/index.ts index dd1f7c115f..05a54b4890 100644 --- a/packages/store-sync/src/postgres/index.ts +++ b/packages/store-sync/src/postgres/index.ts @@ -1,9 +1,6 @@ -export * from "./buildTable"; export * from "./cleanDatabase"; -export * from "./getTables"; -export * from "./buildInternalTables"; -export * from "./schemaVersion"; -export * from "./postgresStorage"; +export * from "./columnTypes"; +export * from "./createStorageAdapter"; export * from "./setupTables"; export * from "./syncToPostgres"; -export * from "./columnTypes"; +export * from "./tables"; diff --git a/packages/store-sync/src/postgres/postgresStorage.ts b/packages/store-sync/src/postgres/postgresStorage.ts deleted file mode 100644 index 111894aada..0000000000 --- a/packages/store-sync/src/postgres/postgresStorage.ts +++ /dev/null @@ -1,294 +0,0 @@ -import { Hex, PublicClient, concatHex, size } from "viem"; -import { PgDatabase, QueryResultHKT } from "drizzle-orm/pg-core"; -import { eq, getTableName, inArray } from "drizzle-orm"; -import { buildTable } from "./buildTable"; -import { StoreConfig } from "@latticexyz/store"; -import { debug } from "./debug"; -import { buildInternalTables } from "./buildInternalTables"; -import { getTables } from "./getTables"; -import { schemaVersion } from "./schemaVersion"; -import { hexToResource, spliceHex } from "@latticexyz/common"; -import { setupTables } from "./setupTables"; -import { getTableKey } from "./getTableKey"; -import { StorageAdapter, 
StorageAdapterBlock } from "../common"; -import { isTableRegistrationLog } from "../isTableRegistrationLog"; -import { logToTable } from "../logToTable"; -import { decodeKey, decodeValueArgs } from "@latticexyz/protocol-parser"; - -// Currently assumes one DB per chain ID - -export type PostgresStorageAdapter = { - storageAdapter: StorageAdapter; - internalTables: ReturnType; - cleanUp: () => Promise; -}; - -export async function postgresStorage({ - database, - publicClient, -}: { - database: PgDatabase; - publicClient: PublicClient; - config?: TConfig; -}): Promise { - const cleanUp: (() => Promise)[] = []; - - const chainId = publicClient.chain?.id ?? (await publicClient.getChainId()); - - const internalTables = buildInternalTables(); - cleanUp.push(await setupTables(database, Object.values(internalTables))); - - async function postgresStorageAdapter({ blockNumber, logs }: StorageAdapterBlock): Promise { - const newTables = logs.filter(isTableRegistrationLog).map(logToTable); - const newSqlTables = newTables.map(buildTable); - - cleanUp.push(await setupTables(database, newSqlTables)); - - await database.transaction(async (tx) => { - for (const table of newTables) { - await tx - .insert(internalTables.tables) - .values({ - schemaVersion, - key: getTableKey(table), - ...table, - lastUpdatedBlockNumber: blockNumber, - }) - .onConflictDoNothing() - .execute(); - } - }); - - const tables = await getTables( - database, - logs.map((log) => getTableKey({ address: log.address, tableId: log.args.tableId })) - ); - - // This is currently parallelized per world (each world has its own database). - // This may need to change if we decide to put multiple worlds into one DB (e.g. a namespace per world, but all under one DB). - // If so, we'll probably want to wrap the entire block worth of operations in a transaction. 
- - await database.transaction(async (tx) => { - const tablesWithOperations = tables.filter((table) => - logs.some((log) => getTableKey({ address: log.address, tableId: log.args.tableId }) === getTableKey(table)) - ); - if (tablesWithOperations.length) { - await tx - .update(internalTables.tables) - .set({ lastUpdatedBlockNumber: blockNumber }) - .where(inArray(internalTables.tables.key, [...new Set(tablesWithOperations.map(getTableKey))])) - .execute(); - } - - for (const log of logs) { - const table = tables.find( - (table) => getTableKey(table) === getTableKey({ address: log.address, tableId: log.args.tableId }) - ); - if (!table) { - const { namespace, name } = hexToResource(log.args.tableId); - debug(`table ${namespace}:${name} not found, skipping log`, log); - continue; - } - - const sqlTable = buildTable(table); - const uniqueKey = concatHex(log.args.keyTuple as Hex[]); - const key = decodeKey(table.keySchema, log.args.keyTuple); - - debug(log.eventName, log); - - if (log.eventName === "Store_SetRecord") { - const value = decodeValueArgs(table.valueSchema, log.args); - debug("upserting record", { - namespace: table.namespace, - name: table.name, - key, - value, - }); - await tx - .insert(sqlTable) - .values({ - __key: uniqueKey, - __staticData: log.args.staticData, - __encodedLengths: log.args.encodedLengths, - __dynamicData: log.args.dynamicData, - __lastUpdatedBlockNumber: blockNumber, - __isDeleted: false, - ...key, - ...value, - }) - .onConflictDoUpdate({ - target: sqlTable.__key, - set: { - __staticData: log.args.staticData, - __encodedLengths: log.args.encodedLengths, - __dynamicData: log.args.dynamicData, - __lastUpdatedBlockNumber: blockNumber, - __isDeleted: false, - ...value, - }, - }) - .execute(); - } else if (log.eventName === "Store_SpliceStaticData") { - // TODO: verify that this returns what we expect (doesn't error/undefined on no record) - const previousValue = await tx - .select() - .from(sqlTable) - .where(eq(sqlTable.__key, uniqueKey)) - 
.execute() - .then( - (rows) => rows[0], - (error) => (error instanceof Error ? error : new Error(String(error))) - ); - if (previousValue instanceof Error) { - // https://github.com/latticexyz/mud/issues/1923 - debug( - "Could not query previous value for splice static data, skipping update", - getTableName(sqlTable), - uniqueKey, - previousValue - ); - continue; - } - const previousStaticData = (previousValue?.__staticData as Hex) ?? "0x"; - const newStaticData = spliceHex(previousStaticData, log.args.start, size(log.args.data), log.args.data); - const newValue = decodeValueArgs(table.valueSchema, { - staticData: newStaticData, - encodedLengths: (previousValue?.__encodedLengths as Hex) ?? "0x", - dynamicData: (previousValue?.__dynamicData as Hex) ?? "0x", - }); - debug("upserting record via splice static", { - namespace: table.namespace, - name: table.name, - key, - previousStaticData, - newStaticData, - previousValue, - newValue, - }); - await tx - .insert(sqlTable) - .values({ - __key: uniqueKey, - __staticData: newStaticData, - __lastUpdatedBlockNumber: blockNumber, - __isDeleted: false, - ...key, - ...newValue, - }) - .onConflictDoUpdate({ - target: sqlTable.__key, - set: { - __staticData: newStaticData, - __lastUpdatedBlockNumber: blockNumber, - __isDeleted: false, - ...newValue, - }, - }) - .execute(); - } else if (log.eventName === "Store_SpliceDynamicData") { - // TODO: verify that this returns what we expect (doesn't error/undefined on no record) - const previousValue = await tx - .select() - .from(sqlTable) - .where(eq(sqlTable.__key, uniqueKey)) - .execute() - .then( - (rows) => rows[0], - (error) => (error instanceof Error ? 
error : new Error(String(error))) - ); - if (previousValue instanceof Error) { - // https://github.com/latticexyz/mud/issues/1923 - debug( - "Could not query previous value for splice dynamic data, skipping update", - getTableName(sqlTable), - uniqueKey, - previousValue - ); - continue; - } - const previousDynamicData = (previousValue?.__dynamicData as Hex) ?? "0x"; - const newDynamicData = spliceHex(previousDynamicData, log.args.start, log.args.deleteCount, log.args.data); - const newValue = decodeValueArgs(table.valueSchema, { - staticData: (previousValue?.__staticData as Hex) ?? "0x", - // TODO: handle unchanged encoded lengths - encodedLengths: log.args.encodedLengths, - dynamicData: newDynamicData, - }); - debug("upserting record via splice dynamic", { - namespace: table.namespace, - name: table.name, - key, - previousDynamicData, - newDynamicData, - previousValue, - newValue, - }); - await tx - .insert(sqlTable) - .values({ - __key: uniqueKey, - // TODO: handle unchanged encoded lengths - __encodedLengths: log.args.encodedLengths, - __dynamicData: newDynamicData, - __lastUpdatedBlockNumber: blockNumber, - __isDeleted: false, - ...key, - ...newValue, - }) - .onConflictDoUpdate({ - target: sqlTable.__key, - set: { - // TODO: handle unchanged encoded lengths - __encodedLengths: log.args.encodedLengths, - __dynamicData: newDynamicData, - __lastUpdatedBlockNumber: blockNumber, - __isDeleted: false, - ...newValue, - }, - }) - .execute(); - } else if (log.eventName === "Store_DeleteRecord") { - // TODO: should we upsert so we at least have a DB record of when a thing was created/deleted within the same block? 
- debug("deleting record", { - namespace: table.namespace, - name: table.name, - key, - }); - await tx - .update(sqlTable) - .set({ - __lastUpdatedBlockNumber: blockNumber, - __isDeleted: true, - }) - .where(eq(sqlTable.__key, uniqueKey)) - .execute(); - } - } - - await tx - .insert(internalTables.chain) - .values({ - schemaVersion, - chainId, - lastUpdatedBlockNumber: blockNumber, - }) - .onConflictDoUpdate({ - target: [internalTables.chain.schemaVersion, internalTables.chain.chainId], - set: { - lastUpdatedBlockNumber: blockNumber, - }, - }) - .execute(); - }); - } - - return { - storageAdapter: postgresStorageAdapter, - internalTables, - cleanUp: async (): Promise => { - for (const fn of cleanUp) { - await fn(); - } - }, - }; -} diff --git a/packages/store-sync/src/postgres/schemaVersion.ts b/packages/store-sync/src/postgres/schemaVersion.ts deleted file mode 100644 index 397579c6b3..0000000000 --- a/packages/store-sync/src/postgres/schemaVersion.ts +++ /dev/null @@ -1,4 +0,0 @@ -// When this is incremented, it forces all indexers to reindex from scratch the next time they start up. -// Only use this when the schemas change, until we get proper schema migrations. 
-// TODO: instead of this, detect schema changes and drop/recreate tables as needed -export const schemaVersion = 1; diff --git a/packages/store-sync/src/postgres/setupTables.test.ts b/packages/store-sync/src/postgres/setupTables.test.ts index 93ab5b7dc8..c9a6c11add 100644 --- a/packages/store-sync/src/postgres/setupTables.test.ts +++ b/packages/store-sync/src/postgres/setupTables.test.ts @@ -1,19 +1,13 @@ -import { beforeEach, describe, expect, it, vi } from "vitest"; -import { buildInternalTables } from "./buildInternalTables"; +import { beforeEach, describe, expect, it } from "vitest"; +import { tables } from "./tables"; import { PgDatabase, QueryResultHKT } from "drizzle-orm/pg-core"; import { DefaultLogger } from "drizzle-orm"; import { drizzle } from "drizzle-orm/postgres-js"; import postgres from "postgres"; import { setupTables } from "./setupTables"; -import * as transformSchemaNameExports from "./transformSchemaName"; - -vi.spyOn(transformSchemaNameExports, "transformSchemaName").mockImplementation( - (schemaName) => `${process.pid}_${process.env.VITEST_POOL_ID}__${schemaName}` -); describe("setupTables", async () => { let db: PgDatabase; - const internalTables = buildInternalTables(); beforeEach(async () => { db = drizzle(postgres(process.env.DATABASE_URL!), { @@ -23,24 +17,20 @@ describe("setupTables", async () => { describe("before running", () => { it("should be missing schemas", async () => { - await expect(db.select().from(internalTables.chain)).rejects.toThrow( - /relation "\w+mud_internal.chain" does not exist/ - ); - await expect(db.select().from(internalTables.tables)).rejects.toThrow( - /relation "\w+mud_internal.tables" does not exist/ - ); + await expect(db.select().from(tables.chainTable)).rejects.toThrow(/relation "\w+mud.chain" does not exist/); + await expect(db.select().from(tables.recordsTable)).rejects.toThrow(/relation "\w+mud.records" does not exist/); }); }); describe("after running", () => { beforeEach(async () => { - const cleanUp 
= await setupTables(db, Object.values(internalTables)); + const cleanUp = await setupTables(db, Object.values(tables)); return cleanUp; }); it("should have schemas", async () => { - expect(await db.select().from(internalTables.chain)).toMatchInlineSnapshot("[]"); - expect(await db.select().from(internalTables.tables)).toMatchInlineSnapshot("[]"); + expect(await db.select().from(tables.chainTable)).toMatchInlineSnapshot("[]"); + expect(await db.select().from(tables.recordsTable)).toMatchInlineSnapshot("[]"); }); }); }); diff --git a/packages/store-sync/src/postgres/setupTables.ts b/packages/store-sync/src/postgres/setupTables.ts index d15f92ce54..94a2ae3762 100644 --- a/packages/store-sync/src/postgres/setupTables.ts +++ b/packages/store-sync/src/postgres/setupTables.ts @@ -1,7 +1,7 @@ import { AnyPgColumn, PgTableWithColumns, PgDatabase, getTableConfig } from "drizzle-orm/pg-core"; import { getTableColumns, sql } from "drizzle-orm"; import { ColumnDataType } from "kysely"; -import { isDefined } from "@latticexyz/common/utils"; +import { isDefined, unique } from "@latticexyz/common/utils"; import { debug } from "./debug"; import { pgDialect } from "./pgDialect"; @@ -9,10 +9,7 @@ export async function setupTables( db: PgDatabase, tables: PgTableWithColumns[] ): Promise<() => Promise> { - // TODO: add table to internal tables here - // TODO: look up table schema and check if it matches expected schema, drop if not - - const schemaNames = [...new Set(tables.map((table) => getTableConfig(table).schema).filter(isDefined))]; + const schemaNames = unique(tables.map((table) => getTableConfig(table).schema).filter(isDefined)); await db.transaction(async (tx) => { for (const schemaName of schemaNames) { @@ -39,13 +36,33 @@ export async function setupTables( }); } - const primaryKeys = columns.filter((column) => column.primary).map((column) => column.name); - if (primaryKeys.length) { - query = query.addPrimaryKeyConstraint(`${tableConfig.name}__pk`, primaryKeys as any); + 
const primaryKeyColumns = columns.filter((column) => column.primary).map((column) => column.name); + if (primaryKeyColumns.length) { + query = query.addPrimaryKeyConstraint( + `${tableConfig.name}_${primaryKeyColumns.join("_")}_pk`, + primaryKeyColumns as any + ); + } + + for (const pk of tableConfig.primaryKeys) { + query = query.addPrimaryKeyConstraint(pk.getName(), pk.columns.map((col) => col.name) as any); } debug(`creating table ${tableConfig.name} in namespace ${tableConfig.schema}`); await tx.execute(sql.raw(query.compile().sql)); + + for (const index of tableConfig.indexes) { + const columnNames = index.config.columns.map((col) => col.name); + let query = scopedDb.schema + .createIndex(index.config.name ?? `${tableConfig.name}_${columnNames.join("_")}_index`) + .on(tableConfig.name) + .columns(columnNames) + .ifNotExists(); + if (index.config.unique) { + query = query.unique(); + } + await tx.execute(sql.raw(query.compile().sql)); + } } }); diff --git a/packages/store-sync/src/postgres/syncToPostgres.ts b/packages/store-sync/src/postgres/syncToPostgres.ts index 10ea4cf2af..b4c98627e7 100644 --- a/packages/store-sync/src/postgres/syncToPostgres.ts +++ b/packages/store-sync/src/postgres/syncToPostgres.ts @@ -1,7 +1,7 @@ import { StoreConfig } from "@latticexyz/store"; import { PgDatabase } from "drizzle-orm/pg-core"; import { SyncOptions, SyncResult } from "../common"; -import { postgresStorage } from "./postgresStorage"; +import { createStorageAdapter } from "./createStorageAdapter"; import { createStoreSync } from "../createStoreSync"; type SyncToPostgresOptions = SyncOptions & { @@ -31,7 +31,7 @@ export async function syncToPostgres( startSync = true, ...syncOptions }: SyncToPostgresOptions): Promise { - const { storageAdapter } = await postgresStorage({ database, publicClient, config }); + const { storageAdapter } = await createStorageAdapter({ database, publicClient, config }); const storeSync = await createStoreSync({ storageAdapter, config, diff --git 
a/packages/store-sync/src/postgres/tables.ts b/packages/store-sync/src/postgres/tables.ts new file mode 100644 index 0000000000..40ec19a7ca --- /dev/null +++ b/packages/store-sync/src/postgres/tables.ts @@ -0,0 +1,44 @@ +import { boolean, index, pgSchema, primaryKey } from "drizzle-orm/pg-core"; +import { transformSchemaName } from "./transformSchemaName"; +import { asAddress, asBigInt, asHex, asNumber } from "./columnTypes"; + +const schemaName = transformSchemaName("mud"); + +/** + * Singleton table for the state of the chain we're indexing + */ +const chainTable = pgSchema(schemaName).table("chain", { + chainId: asNumber("chain_id", "bigint").notNull().primaryKey(), + lastUpdatedBlockNumber: asBigInt("last_updated_block_number", "numeric"), +}); + +const recordsTable = pgSchema(schemaName).table( + "records", + { + address: asAddress("address").notNull(), + tableId: asHex("table_id").notNull(), + /** + * `keyBytes` is equivalent to `abi.encodePacked(bytes32[] keyTuple)` + */ + keyBytes: asHex("key_bytes").notNull(), + key0: asHex("key0"), + key1: asHex("key1"), + staticData: asHex("static_data"), + encodedLengths: asHex("encoded_lengths"), + dynamicData: asHex("dynamic_data"), + isDeleted: boolean("is_deleted"), + lastUpdatedBlockNumber: asBigInt("last_updated_block_number", "numeric"), + }, + (table) => ({ + pk: primaryKey(table.address, table.tableId, table.keyBytes), + key0Index: index("key0_index").on(table.address, table.tableId, table.key0), + key1Index: index("key1_index").on(table.address, table.tableId, table.key1), + // TODO: add indices for querying without table ID + // TODO: add indices for querying multiple keys + }) +); + +export const tables = { + chainTable, + recordsTable, +}; diff --git a/packages/store-sync/src/postgres/transformSchemaName.ts b/packages/store-sync/src/postgres/transformSchemaName.ts index 63693eafbb..050997a042 100644 --- a/packages/store-sync/src/postgres/transformSchemaName.ts +++ 
b/packages/store-sync/src/postgres/transformSchemaName.ts @@ -1,4 +1,9 @@ -// This is overridden in tests to better parallelize against the same database +/** + * Helps parallelize creating/altering tables in tests + */ export function transformSchemaName(schemaName: string): string { + if (process.env.NODE_ENV === "test") { + return `test_${process.env.VITEST_POOL_ID}__${schemaName}`; + } return schemaName; } diff --git a/packages/store-sync/src/tableToLog.ts b/packages/store-sync/src/tableToLog.ts index 67e13f4b4b..ec1670101e 100644 --- a/packages/store-sync/src/tableToLog.ts +++ b/packages/store-sync/src/tableToLog.ts @@ -9,6 +9,9 @@ import { encodeAbiParameters, parseAbiParameters } from "viem"; import { StorageAdapterLog, Table, storeTables } from "./common"; import { flattenSchema } from "./flattenSchema"; +/** + * @internal + */ export function tableToLog(table: Table): StorageAdapterLog & { eventName: "Store_SetRecord" } { return { eventName: "Store_SetRecord", diff --git a/packages/store-sync/src/trpc-indexer/common.ts b/packages/store-sync/src/trpc-indexer/common.ts index 731c940362..eee07715a3 100644 --- a/packages/store-sync/src/trpc-indexer/common.ts +++ b/packages/store-sync/src/trpc-indexer/common.ts @@ -1,9 +1,17 @@ import { Hex } from "viem"; -import { SyncFilter, TableWithRecords } from "../common"; +import { StorageAdapterBlock, SyncFilter, TableWithRecords } from "../common"; export type QueryAdapter = { + /** + * @deprecated + */ findAll: (opts: { chainId: number; address?: Hex; filters?: SyncFilter[] }) => Promise<{ blockNumber: bigint | null; tables: TableWithRecords[]; }>; + getLogs: (opts: { + readonly chainId: number; + readonly address?: Hex; + readonly filters?: readonly SyncFilter[]; + }) => Promise; }; diff --git a/packages/store-sync/src/trpc-indexer/createAppRouter.ts b/packages/store-sync/src/trpc-indexer/createAppRouter.ts index 4d9272e61d..9090c6fa1c 100644 --- a/packages/store-sync/src/trpc-indexer/createAppRouter.ts +++ 
b/packages/store-sync/src/trpc-indexer/createAppRouter.ts @@ -11,6 +11,28 @@ export function createAppRouter() { }); return t.router({ + getLogs: t.procedure + .input( + z.object({ + chainId: z.number(), + address: z.string().refine(isHex).optional(), + filters: z + .array( + z.object({ + tableId: z.string().refine(isHex), + key0: z.string().refine(isHex).optional(), + key1: z.string().refine(isHex).optional(), + }) + ) + .optional(), + }) + ) + .query(async (opts): ReturnType => { + const { queryAdapter } = opts.ctx; + const { chainId, address, filters } = opts.input; + return queryAdapter.getLogs({ chainId, address, filters }); + }), + findAll: t.procedure .input( z.object({ diff --git a/packages/store-sync/tsup.config.ts b/packages/store-sync/tsup.config.ts index e2345f3a5b..15719d50b0 100644 --- a/packages/store-sync/tsup.config.ts +++ b/packages/store-sync/tsup.config.ts @@ -5,6 +5,7 @@ export default defineConfig({ "src/index.ts", "src/sqlite/index.ts", "src/postgres/index.ts", + "src/postgres-decoded/index.ts", "src/recs/index.ts", "src/trpc-indexer/index.ts", "src/zustand/index.ts", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index a275a971d9..20e7b4c2cd 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -845,9 +845,6 @@ importers: '@trpc/server': specifier: 10.34.0 version: 10.34.0 - '@wagmi/chains': - specifier: ^0.2.22 - version: 0.2.22(typescript@5.1.6) better-sqlite3: specifier: ^8.6.0 version: 8.6.0 @@ -3681,17 +3678,6 @@ packages: pretty-format: 27.5.1 dev: true - /@wagmi/chains@0.2.22(typescript@5.1.6): - resolution: {integrity: sha512-TdiOzJT6TO1JrztRNjTA5Quz+UmQlbvWFG8N41u9tta0boHA1JCAzGGvU6KuIcOmJfRJkKOUIt67wlbopCpVHg==} - peerDependencies: - typescript: '>=4.9.4' - peerDependenciesMeta: - typescript: - optional: true - dependencies: - typescript: 5.1.6 - dev: false - /abab@2.0.6: resolution: {integrity: sha512-j2afSsaIENvHZN2B8GOpF566vZ5WVk5opAiMTvWgaQT8DkbOqsTfvNAvHoRGU2zzP8cPoqys+xHTRDWW8L+/BA==} dev: true