From 68e31b7f89b115d09eaafd81c9722d16ad4d6f15 Mon Sep 17 00:00:00 2001 From: Giems <109511301+Giems@users.noreply.github.com> Date: Tue, 30 Jul 2024 14:42:47 +0200 Subject: [PATCH 1/3] inject odin dependency into sui indexer --- .cargo/config | 3 + Cargo.lock | 416 +++++++++++++++++++++++++++++++--- Cargo.toml | 3 + crates/sui-indexer/Cargo.toml | 16 +- 4 files changed, 406 insertions(+), 32 deletions(-) diff --git a/.cargo/config b/.cargo/config index f79d78b2050dd..ae82ace2b5c8d 100644 --- a/.cargo/config +++ b/.cargo/config @@ -41,3 +41,6 @@ mysql-clippy = [ [build] rustflags = ["-C", "force-frame-pointers=yes", "-C", "force-unwind-tables=yes"] + +[net] +git-fetch-with-cli = true \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index bac5e68573f27..a8162edf3ed13 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -164,7 +164,7 @@ dependencies = [ "async-trait", "bincode", "bytes", - "ed25519", + "ed25519 1.5.3", "futures", "hex", "http", @@ -585,6 +585,39 @@ dependencies = [ "futures-lite", ] +[[package]] +name = "async-nats" +version = "0.35.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ab8df97cb8fc4a884af29ab383e9292ea0939cfcdd7d2a17179086dc6c427e7f" +dependencies = [ + "base64 0.22.1", + "bytes", + "futures", + "memchr", + "nkeys", + "nuid", + "once_cell", + "portable-atomic 1.7.0", + "rand 0.8.5", + "regex", + "ring 0.17.8", + "rustls-native-certs 0.7.1", + "rustls-pemfile 2.1.2", + "rustls-webpki 0.102.6", + "serde", + "serde_json", + "serde_nanos", + "serde_repr", + "thiserror", + "time", + "tokio", + "tokio-rustls 0.26.0", + "tracing", + "tryhard", + "url", +] + [[package]] name = "async-recursion" version = "1.0.4" @@ -1169,7 +1202,7 @@ dependencies = [ "hyper", "pin-project-lite", "rustls 0.21.12", - "rustls-pemfile", + "rustls-pemfile 1.0.2", "tokio", "tokio-rustls 0.24.0", "tower-service", @@ -1405,6 +1438,27 @@ version = "0.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "349f9b6a179ed607305526ca489b34ad0a41aed5f7980fa90eb03160b69598fb" +[[package]] +name = "bitcode" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "48bc1c27654127a24c476d40198746860ef56475f41a601bfa5c4d0f832968f0" +dependencies = [ + "bitcode_derive", + "bytemuck", +] + +[[package]] +name = "bitcode_derive" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a539389a13af092cd345a2b47ae7dec12deb306d660b2223d25cd3419b253ebe" +dependencies = [ + "proc-macro2 1.0.78", + "quote 1.0.35", + "syn 2.0.48", +] + [[package]] name = "bitcoin-private" version = "0.1.0" @@ -1612,7 +1666,7 @@ checksum = "b45ea9b00a7b3f2988e9a65ad3917e62123c38dba709b666506207be96d1790b" dependencies = [ "memchr", "once_cell", - "regex-automata", + "regex-automata 0.1.10", "serde", ] @@ -1711,7 +1765,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e10ca87c81aaa3a949dbbe2b5e6c2c45dbc94ba4897e45ea31ff9ec5087be3dc" dependencies = [ "cached_proc_macro_types", - "darling", + "darling 0.14.2", "proc-macro2 1.0.78", "quote 1.0.35", "syn 1.0.107", @@ -2424,6 +2478,32 @@ dependencies = [ "cipher", ] +[[package]] +name = "curve25519-dalek" +version = "4.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97fb8b7c4503de7d6ae7b42ab72a5a59857b4c937ec27a3d4539dba95b5ab2be" +dependencies = [ + "cfg-if", + "cpufeatures", + "curve25519-dalek-derive", + "digest 0.10.7", + "fiat-crypto", + "rustc_version", + "subtle", +] + +[[package]] +name = "curve25519-dalek-derive" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f46882e17999c6cc590af592290432be3bce0428cb0d5f8b6715e4dc7b383eb3" +dependencies = [ + "proc-macro2 1.0.78", + "quote 1.0.35", + "syn 2.0.48", +] + [[package]] name = "curve25519-dalek-ng" version = "4.1.1" @@ -2487,8 +2567,18 @@ version = "0.14.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b0dd3cd20dc6b5a876612a6e5accfe7f3dd883db6d07acfbf14c128f61550dfa" dependencies = [ - "darling_core", - "darling_macro", + "darling_core 0.14.2", + "darling_macro 0.14.2", +] + +[[package]] +name = "darling" +version = "0.20.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6f63b86c8a8826a49b8c21f08a2d07338eec8d900540f8630dc76284be802989" +dependencies = [ + "darling_core 0.20.10", + "darling_macro 0.20.10", ] [[package]] @@ -2505,17 +2595,42 @@ dependencies = [ "syn 1.0.107", ] +[[package]] +name = "darling_core" +version = "0.20.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "95133861a8032aaea082871032f5815eb9e98cef03fa916ab4500513994df9e5" +dependencies = [ + "fnv", + "ident_case", + "proc-macro2 1.0.78", + "quote 1.0.35", + "strsim 0.11.1", + "syn 2.0.48", +] + [[package]] name = "darling_macro" version = "0.14.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7618812407e9402654622dd402b0a89dff9ba93badd6540781526117b92aab7e" dependencies = [ - "darling_core", + "darling_core 0.14.2", "quote 1.0.35", "syn 1.0.107", ] +[[package]] +name = "darling_macro" +version = "0.20.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d336a2a514f6ccccaa3e09b02d41d35330c07ddf03a62165fcec10bb561c7806" +dependencies = [ + "darling_core 0.20.10", + "quote 1.0.35", + "syn 2.0.48", +] + [[package]] name = "dashmap" version = "5.5.3" @@ -2668,7 +2783,7 @@ version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c11bdc11a0c47bc7d37d582b5285da6849c96681023680b906673c5707af7b0f" dependencies = [ - "darling", + "darling 0.14.2", "proc-macro2 1.0.78", "quote 1.0.35", "syn 1.0.107", @@ -2872,6 +2987,12 @@ version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fea41bba32d969b513997752735605054bc0dfa92b4c56bf1189f2e174be7a10" +[[package]] +name = "dotenvy" +version = "0.15.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1aaf95b3e5c8f23aa320147307562d361db0ae0d51242340f558153b4eb2439b" + [[package]] name = "downcast" version = "0.11.0" @@ -2932,6 +3053,15 @@ dependencies = [ "zeroize", ] +[[package]] +name = "ed25519" +version = "2.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "115531babc129696a58c64a4fef0a8bf9e9698629fb97e9e40767d235cfbcd53" +dependencies = [ + "signature 2.0.0", +] + [[package]] name = "ed25519-consensus" version = "2.1.0" @@ -2947,6 +3077,19 @@ dependencies = [ "zeroize", ] +[[package]] +name = "ed25519-dalek" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4a3daa8e81a3963a60642bcc1f90a670680bd4a77535faa384e9d1c79d620871" +dependencies = [ + "curve25519-dalek", + "ed25519 2.2.3", + "sha2 0.10.6", + "signature 2.0.0", + "subtle", +] + [[package]] name = "either" version = "1.8.0" @@ -3604,6 +3747,12 @@ dependencies = [ "syn 1.0.107", ] +[[package]] +name = "fiat-crypto" +version = "0.2.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "28dea519a9695b9977216879a3ebfddf92f1c08c05d984f8996aecd6ecdc811d" + [[package]] name = "filetime" version = "0.2.22" @@ -4273,7 +4422,7 @@ dependencies = [ "hyper", "log", "rustls 0.20.7", - "rustls-native-certs", + "rustls-native-certs 0.6.2", "tokio", "tokio-rustls 0.23.4", "webpki-roots 0.22.6", @@ -4289,7 +4438,7 @@ dependencies = [ "hyper", "log", "rustls 0.21.12", - "rustls-native-certs", + "rustls-native-certs 0.6.2", "tokio", "tokio-rustls 0.24.0", "webpki-roots 0.23.1", @@ -4459,7 +4608,7 @@ checksum = "4295cbb7573c16d310e99e713cf9e75101eb190ab31fccd35f2d2691b4352b19" dependencies = [ "console", "number_prefix", - "portable-atomic", + "portable-atomic 0.3.19", "unicode-width", ] @@ -4667,7 +4816,7 @@ dependencies = [ "jsonrpsee-core", "jsonrpsee-types", "pin-project", - "rustls-native-certs", + "rustls-native-certs 0.6.2", "soketto", "thiserror", "tokio", @@ -5017,7 +5166,7 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558" dependencies = [ - "regex-automata", + "regex-automata 0.1.10", ] [[package]] @@ -5890,7 +6039,7 @@ name = "msim-macros" version = "0.1.0" source = "git+https://github.com/MystenLabs/mysten-sim.git?rev=3a490f1f1035cc6b7845f6a82d697ce47b067fd9#3a490f1f1035cc6b7845f6a82d697ce47b067fd9" dependencies = [ - "darling", + "darling 0.14.2", "proc-macro2 1.0.78", "quote 1.0.35", "syn 1.0.107", @@ -6431,6 +6580,21 @@ dependencies = [ "libc", ] +[[package]] +name = "nkeys" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2de02c883c178998da8d0c9816a88ef7ef5c58314dd1585c97a4a5679f3ab337" +dependencies = [ + "data-encoding", + "ed25519 2.2.3", + "ed25519-dalek", + "getrandom 0.2.15", + "log", + "rand 0.8.5", + "signatory", +] + [[package]] name = "no-std-compat" version = "0.4.1" @@ -6527,6 +6691,15 @@ dependencies = [ "winapi", ] +[[package]] +name = "nuid" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc895af95856f929163a0aa20c26a78d26bfdc839f51b9d5aa7a5b79e52b7e83" +dependencies = [ + "rand 0.8.5", +] + [[package]] name = "num" version = "0.4.1" @@ -6747,7 +6920,7 @@ dependencies = [ "rand 0.8.5", "reqwest", "ring 0.16.20", - "rustls-pemfile", + "rustls-pemfile 1.0.2", "serde", "serde_json", "snafu", @@ -6757,6 +6930,23 @@ dependencies = [ "walkdir", ] +[[package]] +name = "odin" +version = "0.1.0" +source = "git+ssh://git@github.com/nightly-labs/alexandria.git?branch=dependencies#a96959bb1d0656ed796a013518e5f799601ef176" +dependencies = [ + "async-nats", + "bitcode", + "futures", + "serde", + "serde_json", + "serde_with 3.8.1", + "structs", + "tokio", + "tracing", + "typeshare", +] + [[package]] name = "oid-registry" version = "0.6.1" @@ -7410,6 +7600,12 @@ version = "0.3.19" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "26f6a7b87c2e435a3241addceeeff740ff8b7e76b74c13bf9acb17fa454ea00b" +[[package]] +name = "portable-atomic" +version = "1.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da544ee218f0d287a911e9c99a39a8c9bc8fcad3cb8db5959940044ecfc67265" + [[package]] name = "powerfmt" version = "0.2.0" @@ -8123,13 +8319,14 @@ dependencies = [ [[package]] name = "regex" -version = "1.8.4" +version = "1.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d0ab3ca65655bb1e41f2a8c8cd662eb4fb035e67c3f78da1d61dffe89d07300f" +checksum = "12de2eff854e5fa4b1295edd650e227e9d8fb0c9e90b12e7f36d6a6811791a29" dependencies = [ "aho-corasick 1.0.2", "memchr", - "regex-syntax 0.7.2", + "regex-automata 0.3.7", + "regex-syntax 0.7.5", ] [[package]] @@ -8141,6 +8338,17 @@ dependencies = [ "regex-syntax 0.6.28", ] +[[package]] +name = "regex-automata" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49530408a136e16e5b486e883fbb6ba058e8e4e8ae6621a77b048b314336e629" +dependencies = [ + "aho-corasick 1.0.2", + "memchr", + "regex-syntax 0.7.5", +] + [[package]] name = "regex-syntax" version = "0.6.28" @@ -8149,9 +8357,9 @@ checksum = "456c603be3e8d448b072f410900c09faf164fbce2d480456f50eea6e25f9c848" [[package]] name = "regex-syntax" -version = "0.7.2" +version = "0.7.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "436b050e76ed2903236f032a59761c1eb99e1b0aead2c257922771dab1fc8c78" +checksum = "dbb5fb1acd8a1a18b3dd5be62d25485eb770e05afb408a9627d14d451bae12da" [[package]] name = "reqwest" @@ -8177,7 +8385,7 @@ dependencies = [ "percent-encoding", "pin-project-lite", "rustls 0.21.12", - "rustls-pemfile", + "rustls-pemfile 1.0.2", "serde", "serde_json", "serde_urlencoded", @@ -8430,6 +8638,20 @@ dependencies = [ "sct", ] +[[package]] +name = "rustls" +version = "0.23.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c58f8c84392efc0a126acce10fa59ff7b3d2ac06ab451a33f2741989b806b044" +dependencies = [ + "once_cell", + "ring 0.17.8", + "rustls-pki-types", + "rustls-webpki 0.102.6", + "subtle", + "zeroize", +] + [[package]] name = "rustls-native-certs" version = "0.6.2" @@ -8437,7 +8659,20 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0167bac7a9f490495f3c33013e7722b53cb087ecbe082fb0c6387c96f634ea50" dependencies = [ "openssl-probe", - "rustls-pemfile", + "rustls-pemfile 1.0.2", + "schannel", + "security-framework", +] + +[[package]] +name = "rustls-native-certs" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a88d6d420651b496bdd98684116959239430022a115c1240e6c3993be0b15fba" +dependencies = [ + "openssl-probe", + "rustls-pemfile 2.1.2", + "rustls-pki-types", "schannel", "security-framework", ] @@ -8451,6 +8686,22 @@ dependencies = [ "base64 0.21.2", ] +[[package]] +name = "rustls-pemfile" +version = "2.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29993a25686778eb88d4189742cd713c9bce943bc54251a33509dc63cbacf73d" +dependencies = [ + "base64 0.22.1", + "rustls-pki-types", +] + +[[package]] +name = "rustls-pki-types" +version = "1.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "976295e77ce332211c0d24d92c0e83e50f5c5f046d11082cea19f3df13a3562d" + [[package]] name = "rustls-webpki" version = "0.100.3" @@ -8471,6 +8722,17 @@ dependencies = [ "untrusted 0.9.0", ] +[[package]] +name = "rustls-webpki" +version = "0.102.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e6b52d4fda176fd835fdc55a835d4a89b8499cad995885a21149d5ad62f852e" +dependencies = [ + "ring 0.17.8", + "rustls-pki-types", + "untrusted 0.9.0", +] + [[package]] name = "rustversion" version = "1.0.11" @@ -8805,6 +9067,15 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_nanos" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a93142f0367a4cc53ae0fead1bcda39e85beccfad3dcd717656cacab94b12985" +dependencies = [ + "serde", +] + [[package]] name = "serde_path_to_error" version = "0.1.9" @@ -8816,13 +9087,13 @@ dependencies = [ [[package]] name = "serde_repr" -version = "0.1.10" +version = "0.1.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9a5ec9fa74a20ebbe5d9ac23dac1fc96ba0ecfe9f50f2843b52e537b10fbcb4e" +checksum = "6c64451ba24fc7a6a2d60fc75dd9c83c90903b19028d4eff35e88fc1e86564e9" dependencies = [ "proc-macro2 1.0.78", "quote 1.0.35", - "syn 1.0.107", + "syn 2.0.48", ] [[package]] @@ -8867,7 +9138,7 @@ dependencies = [ "indexmap 1.9.3", "serde", "serde_json", - "serde_with_macros", + "serde_with_macros 2.1.0", "time", ] @@ -8880,9 +9151,12 @@ dependencies = [ "base64 0.22.1", "chrono", "hex", + "indexmap 1.9.3", + "indexmap 2.1.0", "serde", "serde_derive", "serde_json", + "serde_with_macros 3.8.1", "time", ] @@ -8892,12 +9166,24 @@ version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3452b4c0f6c1e357f73fdb87cd1efabaa12acf328c7a528e252893baeb3f4aa" dependencies = [ - "darling", + "darling 0.14.2", "proc-macro2 1.0.78", "quote 1.0.35", "syn 1.0.107", ] +[[package]] +name = "serde_with_macros" +version = "3.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "65569b702f41443e8bc8bbb1c5779bd0450bbe723b56198980e80ec45780bce2" +dependencies = [ + "darling 0.20.10", + "proc-macro2 1.0.78", + "quote 1.0.35", + "syn 2.0.48", +] + [[package]] name = "serde_yaml" version = "0.8.26" @@ -9047,6 +9333,18 @@ dependencies = [ "libc", ] +[[package]] +name = "signatory" +version = "0.27.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c1e303f8205714074f6068773f0e29527e0453937fe837c9717d066635b65f31" +dependencies = [ + "pkcs8 0.10.2", + "rand_core 0.6.4", + "signature 2.0.0", + "zeroize", +] + [[package]] name = "signature" version = "1.6.4" @@ -9302,6 +9600,21 @@ version = "0.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" +[[package]] +name = "structs" +version = "0.1.0" +source = "git+ssh://git@github.com/nightly-labs/alexandria.git?branch=dependencies#a96959bb1d0656ed796a013518e5f799601ef176" +dependencies = [ + "bitcode", + "dotenvy", + "once_cell", + "rayon", + "serde", + "serde_json", + "serde_with 3.8.1", + "typeshare", +] + [[package]] name = "strum" version = "0.24.1" @@ -9892,6 +10205,7 @@ dependencies = [ "move-core-types", "mysten-metrics", "ntest", + "odin", "prometheus", "rayon", "regex", @@ -10674,7 +10988,7 @@ dependencies = [ "anyhow", "axum", "axum-server", - "ed25519", + "ed25519 1.5.3", "fastcrypto", "pkcs8 0.9.0", "rand 0.8.5", @@ -11090,7 +11404,7 @@ version = "3.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "57d187b450bfb5b7939f82f9747dc1ebb15a7a9c4a93cd304a41aece7149608b" dependencies = [ - "darling", + "darling 0.14.2", "if_chain", "lazy_static", "proc-macro2 1.0.78", @@ -11319,6 +11633,17 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-rustls" +version = "0.26.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c7bc40d0e5a97695bb96e27995cd3a08538541b0a846f65bba7a359f36700d4" +dependencies = [ + "rustls 0.23.12", + "rustls-pki-types", + "tokio", +] + [[package]] name = "tokio-stream" version = "0.1.14" @@ -11758,6 +12083,17 @@ version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3528ecfd12c466c6f163363caf2d02a71161dd5e1cc6ae7b34207ea2d42d81ed" +[[package]] +name = "tryhard" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c9f0a709784e86923586cff0d872dba54cd2d2e116b3bc57587d15737cfce9d" +dependencies = [ + "futures", + "pin-project-lite", + "tokio", +] + [[package]] name = "tui" version = "0.19.0" @@ -11864,6 +12200,28 @@ version = "1.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "497961ef93d974e23eb6f433eb5fe1b7930b659f06d12dec6fc44a8f554c0bba" +[[package]] +name = "typeshare" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "04f17399b76c2e743d58eac0635d7686e9c00f48cd4776f00695d9882a7d3187" +dependencies = [ + "chrono", + "serde", + "serde_json", + "typeshare-annotation", +] + +[[package]] +name = "typeshare-annotation" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a615d6c2764852a2e88a4f16e9ce1ea49bb776b5872956309e170d63a042a34f" +dependencies = [ + "quote 1.0.35", + "syn 2.0.48", +] + [[package]] name = "ucd-trie" version = "0.1.5" diff --git a/Cargo.toml b/Cargo.toml index 3b9f1635849d9..b5b205e7e8f42 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -696,6 +696,9 @@ sui-execution = { path = "sui-execution" } # move-vm-runtime = { path = "external-crates/move/move-vm/runtime" } # move-bytecode-verifier = { path = "external-crates/move/move-bytecode-verifier" } + +odin = { git = "ssh://git@github.com/nightly-labs/alexandria.git", branch = "dependencies", package = "odin" } + # suiop dependencies docker-api = "0.12.2" field_names = "0.2.0" diff --git a/crates/sui-indexer/Cargo.toml b/crates/sui-indexer/Cargo.toml index 5bfd59fcbc4de..721e457c6d466 100644 --- a/crates/sui-indexer/Cargo.toml +++ b/crates/sui-indexer/Cargo.toml @@ -7,6 +7,8 @@ publish = false edition = "2021" [dependencies] +odin = { workspace = true, version = "0.1.0" } + anyhow.workspace = true async-trait.workspace = true axum.workspace = true @@ -25,7 +27,7 @@ prometheus.workspace = true serde.workspace = true serde_json.workspace = true rayon.workspace = true -regex.workspace = true +regex = "1.9.1" tempfile.workspace = true thiserror.workspace = true tracing.workspace = true @@ -61,8 +63,16 @@ downcast = "0.11.0" [features] pg_integration = [] default = ["postgres-feature"] -postgres-feature = ["diesel/postgres", "diesel/postgres_backend", "diesel-derive-enum/postgres"] -mysql-feature = ["diesel/mysql", "diesel/mysql_backend", "diesel-derive-enum/mysql"] +postgres-feature = [ + "diesel/postgres", + "diesel/postgres_backend", + "diesel-derive-enum/postgres", +] +mysql-feature = [ + "diesel/mysql", + "diesel/mysql_backend", + "diesel-derive-enum/mysql", +] [dev-dependencies] sui-keys.workspace = true From b6a70adccc2cb7abbd6c2c6f98acdb6d22aba485 Mon Sep 17 00:00:00 2001 From: Giems <109511301+Giems@users.noreply.github.com> Date: Wed, 31 Jul 2024 09:40:31 +0200 Subject: [PATCH 2/3] inject nats queue sender into a worker --- Cargo.lock | 7 +- Cargo.toml | 2 +- crates/sui-data-ingestion-core/Cargo.toml | 2 + .../sui-data-ingestion-core/src/executor.rs | 2 + crates/sui-indexer/src/db.rs | 3 + .../src/handlers/checkpoint_handler.rs | 6 ++ crates/sui-indexer/src/indexer.rs | 21 ++++- crates/sui-indexer/src/main.rs | 15 +++- crates/sui-indexer/src/test_utils.rs | 5 ++ crates/sui-types/Cargo.toml | 3 + crates/sui-types/src/lib.rs | 1 + crates/sui-types/src/nats_queue.rs | 86 +++++++++++++++++++ 12 files changed, 147 insertions(+), 6 deletions(-) create mode 100644 crates/sui-types/src/nats_queue.rs diff --git a/Cargo.lock b/Cargo.lock index a8162edf3ed13..5f494f372b874 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6933,7 +6933,7 @@ dependencies = [ [[package]] name = "odin" version = "0.1.0" -source = "git+ssh://git@github.com/nightly-labs/alexandria.git?branch=dependencies#a96959bb1d0656ed796a013518e5f799601ef176" +source = "git+ssh://git@github.com/nightly-labs/alexandria.git?branch=sui-indexer-nats-update#6be6c61597198176244fd364110a77661e0ceb64" dependencies = [ "async-nats", "bitcode", @@ -9603,7 +9603,7 @@ checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" [[package]] name = "structs" version = "0.1.0" -source = "git+ssh://git@github.com/nightly-labs/alexandria.git?branch=dependencies#a96959bb1d0656ed796a013518e5f799601ef176" +source = "git+ssh://git@github.com/nightly-labs/alexandria.git?branch=sui-indexer-nats-update#6be6c61597198176244fd364110a77661e0ceb64" dependencies = [ "bitcode", "dotenvy", @@ -10060,6 +10060,7 @@ dependencies = [ "mysten-metrics", "notify", "object_store", + "odin", "prometheus", "rand 0.8.5", "serde", @@ -11075,6 +11076,7 @@ dependencies = [ "num-bigint 0.4.4", "num-traits", "num_enum 0.6.1", + "odin", "once_cell", "parking_lot 0.12.1", "prometheus", @@ -11099,6 +11101,7 @@ dependencies = [ "sui-sdk 0.0.0", "tap", "thiserror", + "tokio", "tonic 0.11.0", "tracing", "typed-store-error", diff --git a/Cargo.toml b/Cargo.toml index b5b205e7e8f42..6bb8b40950f0b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -697,7 +697,7 @@ sui-execution = { path = "sui-execution" } # move-bytecode-verifier = { path = "external-crates/move/move-bytecode-verifier" } -odin = { git = "ssh://git@github.com/nightly-labs/alexandria.git", branch = "dependencies", package = "odin" } +odin = { git = "ssh://git@github.com/nightly-labs/alexandria.git", branch = "sui-indexer-nats-update", package = "odin" } # suiop dependencies docker-api = "0.12.2" diff --git a/crates/sui-data-ingestion-core/Cargo.toml b/crates/sui-data-ingestion-core/Cargo.toml index 2c14170d76f4a..6b9aa4474f9af 100644 --- a/crates/sui-data-ingestion-core/Cargo.toml +++ b/crates/sui-data-ingestion-core/Cargo.toml @@ -7,6 +7,8 @@ edition = "2021" version = "0.1.0" [dependencies] +odin = { workspace = true, version = "0.1.0" } + anyhow.workspace = true async-trait.workspace = true backoff.workspace = true diff --git a/crates/sui-data-ingestion-core/src/executor.rs b/crates/sui-data-ingestion-core/src/executor.rs index 4ad9cde1213ec..927425e41becb 100644 --- a/crates/sui-data-ingestion-core/src/executor.rs +++ b/crates/sui-data-ingestion-core/src/executor.rs @@ -16,6 +16,7 @@ use std::path::PathBuf; use std::pin::Pin; use sui_types::full_checkpoint_content::CheckpointData; use sui_types::messages_checkpoint::CheckpointSequenceNumber; +use sui_types::nats_queue::NatsQueueSender; use tokio::sync::mpsc; use tokio::sync::oneshot; @@ -115,6 +116,7 @@ pub async fn setup_single_workflow( oneshot::Sender<()>, )> { let (exit_sender, exit_receiver) = oneshot::channel(); + // let metrics = DataIngestionMetrics::new(&Registry::new()); let progress_store = ShimProgressStore(initial_checkpoint_number); // let mut executor = IndexerExecutor::new(progress_store, 1, metrics); diff --git a/crates/sui-indexer/src/db.rs b/crates/sui-indexer/src/db.rs index 99b6df729463b..4dfc178df8b13 100644 --- a/crates/sui-indexer/src/db.rs +++ b/crates/sui-indexer/src/db.rs @@ -204,6 +204,7 @@ pub mod setup_postgres { use diesel_migrations::{embed_migrations, EmbeddedMigrations, MigrationHarness}; use prometheus::Registry; use secrecy::ExposeSecret; + use sui_types::nats_queue::NatsQueueSender; use tracing::{error, info}; const MIGRATIONS: EmbeddedMigrations = embed_migrations!("migrations/pg"); @@ -257,6 +258,7 @@ pub mod setup_postgres { pub async fn setup( indexer_config: IndexerConfig, registry: Registry, + queue_sender: NatsQueueSender, ) -> Result<(), IndexerError> { let db_url_secret = indexer_config.get_db_url().map_err(|e| { IndexerError::PgPoolConnectionError(format!( @@ -318,6 +320,7 @@ pub mod setup_postgres { &indexer_config, store, indexer_metrics, + queue_sender, ) .await; } else if indexer_config.rpc_server_worker { diff --git a/crates/sui-indexer/src/handlers/checkpoint_handler.rs b/crates/sui-indexer/src/handlers/checkpoint_handler.rs index b3897f2fe8935..73835975eb2c7 100644 --- a/crates/sui-indexer/src/handlers/checkpoint_handler.rs +++ b/crates/sui-indexer/src/handlers/checkpoint_handler.rs @@ -20,6 +20,7 @@ use sui_types::dynamic_field::DynamicFieldType; use sui_types::messages_checkpoint::{ CertifiedCheckpointSummary, CheckpointContents, CheckpointSequenceNumber, }; +use sui_types::nats_queue::NatsQueueSender; use sui_types::object::Object; use tokio_util::sync::CancellationToken; @@ -66,6 +67,7 @@ pub async fn new_handlers( metrics: IndexerMetrics, next_checkpoint_sequence_number: CheckpointSequenceNumber, cancel: CancellationToken, + queue_sender: NatsQueueSender, ) -> Result, IndexerError> where S: IndexerStore + Clone + Sync + Send + 'static, @@ -100,6 +102,7 @@ where metrics, indexed_checkpoint_sender, package_tx, + queue_sender, )) } @@ -111,6 +114,7 @@ pub struct CheckpointHandler { // they will be periodically GCed to avoid OOM. package_buffer: Arc>, package_resolver: Arc>>>, + nats_queue: NatsQueueSender, } #[async_trait] @@ -171,6 +175,7 @@ where metrics: IndexerMetrics, indexed_checkpoint_sender: mysten_metrics::metered_channel::Sender, package_tx: watch::Receiver>, + queue_sender: NatsQueueSender, ) -> Self { let package_buffer = IndexingPackageBuffer::start(package_tx); let pg_blocking_cp = Self::pg_blocking_cp(state.clone()).unwrap(); @@ -188,6 +193,7 @@ where indexed_checkpoint_sender, package_buffer, package_resolver, + nats_queue: queue_sender, } } diff --git a/crates/sui-indexer/src/indexer.rs b/crates/sui-indexer/src/indexer.rs index 5a8080fbb5ed4..f0b92f8a43057 100644 --- a/crates/sui-indexer/src/indexer.rs +++ b/crates/sui-indexer/src/indexer.rs @@ -3,10 +3,13 @@ use std::collections::HashMap; use std::env; +use std::sync::Arc; use anyhow::Result; use diesel::r2d2::R2D2Connection; +use odin::Odin; use prometheus::Registry; +use sui_types::nats_queue::NatsQueueSender; use tokio::sync::oneshot; use tokio_util::sync::CancellationToken; use tracing::info; @@ -46,6 +49,7 @@ impl Indexer { config: &IndexerConfig, store: S, metrics: IndexerMetrics, + queue_sender: NatsQueueSender, ) -> Result<(), IndexerError> { let snapshot_config = SnapshotLagConfig::default(); Indexer::start_writer_with_config::( @@ -54,6 +58,7 @@ impl Indexer { metrics, snapshot_config, CancellationToken::new(), + queue_sender, ) .await } @@ -67,6 +72,7 @@ impl Indexer { metrics: IndexerMetrics, snapshot_config: SnapshotLagConfig, cancel: CancellationToken, + queue_sender: NatsQueueSender, ) -> Result<(), IndexerError> { info!( "Sui Indexer Writer (version {:?}) started...", @@ -124,8 +130,19 @@ impl Indexer { 1, // DataIngestionMetrics::new(&Registry::new()), ); - let worker = - new_handlers::(store, metrics, primary_watermark, cancel.clone()).await?; + + // Set init checkpoint for primary worker to the latest checkpoint sequence number. + queue_sender.init_checkpoint = primary_watermark; + queue_sender.run().await; + + let worker = new_handlers::( + store, + metrics, + primary_watermark, + cancel.clone(), + queue_sender, + ) + .await?; let worker_pool = WorkerPool::new(worker, "primary".to_string(), download_queue_size); executor.register(worker_pool).await?; diff --git a/crates/sui-indexer/src/main.rs b/crates/sui-indexer/src/main.rs index 02a92f7a507b2..10e089cf5fd0c 100644 --- a/crates/sui-indexer/src/main.rs +++ b/crates/sui-indexer/src/main.rs @@ -1,7 +1,11 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 +use std::sync::Arc; + use clap::Parser; +use odin::{get_odin, Odin}; +use sui_types::nats_queue::nats_queue; use tracing::info; use sui_indexer::errors::IndexerError; @@ -34,8 +38,17 @@ async fn main() -> Result<(), IndexerError> { .unwrap(), indexer_config.rpc_client_url.as_str(), )?; + + let odin_connection: Arc = Arc::new(get_odin().await); + let queue_sender = nats_queue(odin_connection.clone()); + #[cfg(feature = "postgres-feature")] - sui_indexer::db::setup_postgres::setup(indexer_config.clone(), registry.clone()).await?; + sui_indexer::db::setup_postgres::setup( + indexer_config.clone(), + registry.clone(), + queue_sender.clone(), + ) + .await?; #[cfg(feature = "mysql-feature")] #[cfg(not(feature = "postgres-feature"))] diff --git a/crates/sui-indexer/src/test_utils.rs b/crates/sui-indexer/src/test_utils.rs index 421039555ca9c..995217d06c616 100644 --- a/crates/sui-indexer/src/test_utils.rs +++ b/crates/sui-indexer/src/test_utils.rs @@ -4,6 +4,7 @@ use diesel::connection::SimpleConnection; use mysten_metrics::init_metrics; use secrecy::ExposeSecret; +use sui_types::nats_queue::NatsQueueSender; use tokio::task::JoinHandle; use tokio_util::sync::CancellationToken; @@ -45,6 +46,7 @@ pub async fn start_test_indexer( rpc_url: String, reader_writer_config: ReaderWriterConfig, data_ingestion_path: PathBuf, + queue_sender: NatsQueueSender, ) -> (PgIndexerStore, JoinHandle>) { start_test_indexer_impl( db_url, @@ -53,6 +55,7 @@ pub async fn start_test_indexer( None, Some(data_ingestion_path), CancellationToken::new(), + queue_sender, ) .await } @@ -64,6 +67,7 @@ pub async fn start_test_indexer_impl( new_database: Option, data_ingestion_path: Option, cancel: CancellationToken, + queue_sender: NatsQueueSender, ) -> (PgIndexerStore, JoinHandle>) { // Reduce the connection pool size to 10 for testing // to prevent maxing out @@ -155,6 +159,7 @@ pub async fn start_test_indexer_impl( indexer_metrics, snapshot_config, cancel, + queue_sender, ) .await }) diff --git a/crates/sui-types/Cargo.toml b/crates/sui-types/Cargo.toml index 53d8707ce63f7..9408a697f19c5 100644 --- a/crates/sui-types/Cargo.toml +++ b/crates/sui-types/Cargo.toml @@ -7,6 +7,9 @@ publish = false edition = "2021" [dependencies] +odin = { workspace = true, version = "0.1.0" } +tokio.workspace = true + anemo.workspace = true anyhow.workspace = true bincode.workspace = true diff --git a/crates/sui-types/src/lib.rs b/crates/sui-types/src/lib.rs index 17de52e46d516..7fecba95167d3 100644 --- a/crates/sui-types/src/lib.rs +++ b/crates/sui-types/src/lib.rs @@ -67,6 +67,7 @@ pub mod mock_checkpoint_builder; pub mod move_package; pub mod multisig; pub mod multisig_legacy; +pub mod nats_queue; pub mod object; pub mod programmable_transaction_builder; pub mod quorum_driver_types; diff --git a/crates/sui-types/src/nats_queue.rs b/crates/sui-types/src/nats_queue.rs new file mode 100644 index 0000000000000..baff629cf7978 --- /dev/null +++ b/crates/sui-types/src/nats_queue.rs @@ -0,0 +1,86 @@ +// We want to make sure we send msg about block in correct order +// This task is responsible for gathering all the messages and sending them in correct order +// We will use a priority queue to store the messages and send them in correct order + +use odin::{sui_ws::SuiWsApiMsg, Odin}; +use std::{collections::BTreeMap, sync::Arc}; +use tokio::sync::{ + mpsc::{channel, Receiver, Sender}, + Mutex, +}; +use tracing::info; + +pub struct NatsQueueSender { + pub init_checkpoint: u64, + pub sender: Arc>, + pub receiver: Arc>>, + odin: Arc, +} + +pub fn nats_queue(odin: Arc) -> NatsQueueSender { + // Create sender and receiver + let (tx, rx) = channel::<(u64, SuiWsApiMsg)>(10_000); // 10k + + NatsQueueSender { + init_checkpoint: u64::MAX, + sender: Arc::new(tx), + receiver: Arc::new(Mutex::new(rx)), + odin, + } +} + +impl NatsQueueSender { + pub async fn run(&mut self) { + // Spawn task that will order the messages + let odin = self.odin.clone(); + let receiver = self.receiver.clone(); + let init_checkpoint = self.init_checkpoint; + + tokio::spawn(async move { + let mut next_index: u64 = init_checkpoint; // MAX means we have not received any message yet + + let mut receiver_lock = receiver.lock().await; + + // Log init checkpoint + info!("Nats queue init checkpoint: {}", init_checkpoint); + + //Cache if we get a message with a block number that is not in order + let mut bmap_checkpoints: BTreeMap> = BTreeMap::new(); + while let Some((checkpoint_seq_number, ws_update)) = receiver_lock.recv().await { + // Check if we have not received any message yet + if next_index == u64::MAX { + next_index = checkpoint_seq_number; + } + // Check if correct order + if checkpoint_seq_number == next_index { + // Send message + odin.publish_sui_ws_update(&ws_update).await; + + // Update next index + next_index = next_index + 1; + // Check if we have any cached messages + while let Some(next_checkpoint) = bmap_checkpoints.remove(&next_index) { + info!("Sending cached message with seq number {}", next_index); + + for ws_update in next_checkpoint.iter() { + odin.publish_sui_ws_update(&ws_update).await; + } + + // Update next index + next_index = next_index + 1; + } + } else { + info!( + "Received checkpoint with seq number {} but expected {}", + checkpoint_seq_number, next_index + ); + // Cache message + bmap_checkpoints + .entry(checkpoint_seq_number) + .or_insert(vec![]) + .push(ws_update); + } + } + }); + } +} From bf186853035106bc796850f4e8887b7e42eaa9b7 Mon Sep 17 00:00:00 2001 From: Giems <109511301+Giems@users.noreply.github.com> Date: Wed, 31 Jul 2024 14:10:40 +0200 Subject: [PATCH 3/3] balance changes --- Cargo.lock | 6 +- Cargo.toml | 2 +- .../src/handlers/checkpoint_handler.rs | 175 +++++++++++++++-- crates/sui-indexer/src/handlers/mod.rs | 33 +++- .../sui-indexer/src/handlers/tx_processor.rs | 48 +++++ crates/sui-indexer/src/indexer.rs | 2 +- crates/sui-indexer/src/main.rs | 8 +- crates/sui-indexer/src/types.rs | 33 ++++ .../sui-json-rpc-types/src/balance_changes.rs | 76 ++++++++ crates/sui-json-rpc/src/balance_changes.rs | 184 +++++++++++++++++- crates/sui-types/src/nats_queue.rs | 28 ++- 11 files changed, 560 insertions(+), 35 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5f494f372b874..068a39b9a20b9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6933,7 +6933,7 @@ dependencies = [ [[package]] name = "odin" version = "0.1.0" -source = "git+ssh://git@github.com/nightly-labs/alexandria.git?branch=sui-indexer-nats-update#6be6c61597198176244fd364110a77661e0ceb64" +source = "git+ssh://git@github.com/nightly-labs/alexandria.git?rev=ad438b1#ad438b1576469d76f9b2e7428a58fb27dfe1c561" dependencies = [ "async-nats", "bitcode", @@ -9603,7 +9603,7 @@ checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" [[package]] name = "structs" version = "0.1.0" -source = "git+ssh://git@github.com/nightly-labs/alexandria.git?branch=sui-indexer-nats-update#6be6c61597198176244fd364110a77661e0ceb64" +source = "git+ssh://git@github.com/nightly-labs/alexandria.git?rev=ad438b1#ad438b1576469d76f9b2e7428a58fb27dfe1c561" dependencies = [ "bitcode", "dotenvy", @@ -12136,7 +12136,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675" dependencies = [ "cfg-if", - "rand 0.8.5", + "rand 0.7.3", "static_assertions", ] diff --git a/Cargo.toml b/Cargo.toml index 6bb8b40950f0b..215c2265c0ac8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -697,7 +697,7 @@ sui-execution = { path = "sui-execution" } # move-bytecode-verifier = { path = "external-crates/move/move-bytecode-verifier" } -odin = { git = "ssh://git@github.com/nightly-labs/alexandria.git", branch = "sui-indexer-nats-update", package = "odin" } +odin = { git = "ssh://git@github.com/nightly-labs/alexandria.git", rev = "ad438b1", package = "odin" } # suiop dependencies docker-api = "0.12.2" diff --git a/crates/sui-indexer/src/handlers/checkpoint_handler.rs b/crates/sui-indexer/src/handlers/checkpoint_handler.rs index 73835975eb2c7..8c2ba76853c5d 100644 --- a/crates/sui-indexer/src/handlers/checkpoint_handler.rs +++ b/crates/sui-indexer/src/handlers/checkpoint_handler.rs @@ -3,12 +3,15 @@ use crate::handlers::committer::start_tx_checkpoint_commit_task; use crate::handlers::tx_processor::IndexingPackageBuffer; +use crate::handlers::CustomCheckpointDataToCommit; use crate::models::display::StoredDisplay; use async_trait::async_trait; use itertools::Itertools; use move_core_types::annotated_value::{MoveStructLayout, MoveTypeLayout}; use move_core_types::language_storage::{StructTag, TypeTag}; use mysten_metrics::{get_metrics, spawn_monitored_task}; +use odin::sui_ws::{CoinCreated, CoinMutated, CoinObjectUpdateStatus}; +use odin::sui_ws::{SuiWsApiMsg, TokenBalanceUpdate, TokenUpdate}; use std::collections::{BTreeMap, HashMap}; use std::sync::{Arc, Mutex}; use sui_package_resolver::{PackageStore, PackageStoreWithLruCache, Resolver}; @@ -20,17 +23,16 @@ use sui_types::dynamic_field::DynamicFieldType; use sui_types::messages_checkpoint::{ CertifiedCheckpointSummary, CheckpointContents, CheckpointSequenceNumber, }; -use sui_types::nats_queue::NatsQueueSender; +use sui_types::nats_queue::{NatsQueueSender, WsPayload}; use sui_types::object::Object; -use tokio_util::sync::CancellationToken; - use tokio::sync::watch; +use tokio_util::sync::CancellationToken; use diesel::r2d2::R2D2Connection; use std::collections::hash_map::Entry; use std::collections::HashSet; use sui_data_ingestion_core::Worker; -use sui_json_rpc_types::SuiMoveValue; +use sui_json_rpc_types::{ObjectStatus, SuiMoveValue}; use sui_types::base_types::SequenceNumber; use sui_types::effects::{TransactionEffects, TransactionEffectsAPI}; use sui_types::event::SystemEpochInfoEvent; @@ -50,8 +52,8 @@ use crate::db::ConnectionPool; use crate::store::package_resolver::{IndexerStorePackageResolver, InterimPackageResolver}; use crate::store::{IndexerStore, PgIndexerStore}; use crate::types::{ - IndexedCheckpoint, IndexedDeletedObject, IndexedEpochInfo, IndexedEvent, IndexedObject, - IndexedPackage, IndexedTransaction, IndexerResult, TransactionKind, TxIndex, + CustomIndexedTransaction, IndexedCheckpoint, IndexedDeletedObject, IndexedEpochInfo, + IndexedEvent, IndexedObject, IndexedPackage, IndexerResult, TransactionKind, TxIndex, }; use super::tx_processor::EpochEndIndexingObjectStore; @@ -151,7 +153,15 @@ where self.package_resolver.clone(), ) .await?; - self.indexed_checkpoint_sender.send(checkpoint_data).await?; + + // Send ws updates via nats + let nats_ws_payload = generate_ws_updates_from_checkpoint_data(&checkpoint_data); + self.nats_queue.sender.send(nats_ws_payload).await?; + + // Convert custom checkpoint data into original type + self.indexed_checkpoint_sender + .send(checkpoint_data.into()) + .await?; Ok(()) } @@ -280,7 +290,7 @@ where metrics: Arc, packages: Vec, package_resolver: Arc>, - ) -> Result { + ) -> Result { let checkpoint_seq = data.checkpoint_summary.sequence_number; info!(checkpoint_seq, "Indexing checkpoint data blob"); @@ -336,7 +346,7 @@ where checkpoint.sequence_number, time_now_ms, checkpoint.timestamp_ms ); - Ok(CheckpointDataToCommit { + Ok(CustomCheckpointDataToCommit { checkpoint, transactions: db_transactions, events: db_events, @@ -355,7 +365,7 @@ where checkpoint_contents: &CheckpointContents, metrics: &IndexerMetrics, ) -> IndexerResult<( - Vec, + Vec, Vec, Vec, BTreeMap, @@ -380,14 +390,14 @@ where let mut db_displays = BTreeMap::new(); let mut db_indices = Vec::new(); - for tx in transactions { + for checkpoint_tx in transactions { let CheckpointTransaction { transaction: sender_signed_data, effects: fx, events, input_objects, output_objects, - } = tx; + } = checkpoint_tx; // Unwrap safe - we checked they have equal length above let (tx_digest, tx_sequence_number) = tx_seq_num_iter.next().unwrap(); if tx_digest != *sender_signed_data.digest() { @@ -431,12 +441,30 @@ where .chain(output_objects.iter()) .collect::>(); + // Add out custom maps + let status_map = get_object_status_map(&fx); + let input_objects_to_owner = input_objects + .iter() + .map(|input_object| (input_object.id(), input_object.owner().clone())) + .collect::>(); + let output_objects_to_owner = output_objects + .iter() + .map(|output_object| (output_object.id(), output_object.owner().clone())) + .collect::>(); + let (balance_change, object_changes) = TxChangesProcessor::new(&objects, metrics.clone()) - .get_changes(tx, &fx, &tx_digest) + .custom_get_changes( + tx, + &fx, + &tx_digest, + status_map, + &input_objects_to_owner, + &output_objects_to_owner, + ) .await?; - let db_txn = IndexedTransaction { + let db_txn = CustomIndexedTransaction { tx_sequence_number, tx_digest, checkpoint_sequence_number: *checkpoint_summary.sequence_number(), @@ -720,6 +748,54 @@ where } } +pub fn get_object_status_map(effects: &TransactionEffects) -> HashMap { + let mut object_to_status: HashMap = HashMap::new(); + // Fill object_to_status objects + { + // Fill mutated objects + effects + .mutated() + .into_iter() + .map(|((id, _, _), _)| (id, ObjectStatus::Mutated)) + .collect::>() + .iter() + .for_each(|(k, v)| { + object_to_status.insert(*k, v.clone()); + }); + // Fill deleted objects + effects + .all_tombstones() + .into_iter() + .map(|(id, _)| (id, ObjectStatus::Deleted)) + .collect::>() + .iter() + .for_each(|(k, v)| { + object_to_status.insert(*k, v.clone()); + }); + // Fill created objects + effects + .created() + .into_iter() + .map(|((id, _, _), _)| (id, ObjectStatus::Created)) + .collect::>() + .iter() + .for_each(|(k, v)| { + object_to_status.insert(*k, v.clone()); + }); + // Fill created objects + effects + .unwrapped() + .into_iter() + .map(|((id, _, _), _)| (id, ObjectStatus::Created)) + .collect::>() + .iter() + .for_each(|(k, v)| { + object_to_status.insert(*k, v.clone()); + }); + } + object_to_status +} + async fn get_move_struct_layout_map( objects: &[Object], package_resolver: Arc>, @@ -881,3 +957,74 @@ fn try_create_dynamic_field_info( }, })) } + +pub fn generate_ws_updates_from_checkpoint_data( + checkpoint_data: &CustomCheckpointDataToCommit, +) -> WsPayload { + let block_number = checkpoint_data.checkpoint.sequence_number; + + let mut ws_balance_changes: HashMap = HashMap::new(); + + // transaction balance changes + for transaction in checkpoint_data.transactions.iter() { + for change in transaction.balance_change.iter() { + let user_address = match &change.owner.get_owner_address() { + Ok(address) => address.to_string(), + Err(_) => continue, + }; + let coin_type = change.coin_type.to_canonical_string(true); + + // Prepare ws update + let ws_update = ws_balance_changes + .entry(user_address.clone()) + .or_insert(TokenBalanceUpdate { + sui_address: user_address.clone(), + sequence_number: block_number, + changed_balances: HashMap::new(), + timestamp_ms: chrono::Utc::now().timestamp_millis() as u64, + }) + .changed_balances + .entry(coin_type.clone()) + .or_insert(TokenUpdate { + coin_type, + object_changes: HashMap::new(), + }); + match change.status { + // New coin is created + ObjectStatus::Created => { + // Update ws update + ws_update + .object_changes + .entry(change.object_id.clone()) + .or_insert(CoinObjectUpdateStatus::Created(CoinCreated { + amount: change.amount, + })); + } + ObjectStatus::Mutated => { + // Update ws update + ws_update + .object_changes + .entry(change.object_id.clone()) + .or_insert(CoinObjectUpdateStatus::Mutated(CoinMutated { + change: change.amount, + })); + } + ObjectStatus::Deleted => { + // Update ws update + ws_update + .object_changes + .entry(change.object_id.clone()) + .or_insert(CoinObjectUpdateStatus::Deleted); + } + } + } + } + + return ( + checkpoint_data.checkpoint.sequence_number, + ws_balance_changes + .into_values() + .map(|v| SuiWsApiMsg::TokenBalanceUpdate(v)) + .collect(), + ); +} diff --git a/crates/sui-indexer/src/handlers/mod.rs b/crates/sui-indexer/src/handlers/mod.rs index 587a13f22bb95..d5b570f813047 100644 --- a/crates/sui-indexer/src/handlers/mod.rs +++ b/crates/sui-indexer/src/handlers/mod.rs @@ -6,8 +6,8 @@ use std::collections::BTreeMap; use crate::{ models::display::StoredDisplay, types::{ - IndexedCheckpoint, IndexedDeletedObject, IndexedEpochInfo, IndexedEvent, IndexedObject, - IndexedPackage, IndexedTransaction, TxIndex, + CustomIndexedTransaction, IndexedCheckpoint, IndexedDeletedObject, IndexedEpochInfo, + IndexedEvent, IndexedObject, IndexedPackage, IndexedTransaction, TxIndex, }, }; @@ -29,6 +29,35 @@ pub struct CheckpointDataToCommit { pub epoch: Option, } +#[derive(Debug)] +pub struct CustomCheckpointDataToCommit { + pub checkpoint: IndexedCheckpoint, + pub transactions: Vec, + pub events: Vec, + pub tx_indices: Vec, + pub display_updates: BTreeMap, + pub object_changes: TransactionObjectChangesToCommit, + pub object_history_changes: TransactionObjectChangesToCommit, + pub packages: Vec, + pub epoch: Option, +} + +impl From for CheckpointDataToCommit { + fn from(data: CustomCheckpointDataToCommit) -> Self { + Self { + checkpoint: data.checkpoint, + transactions: data.transactions.into_iter().map(Into::into).collect(), + events: data.events, + tx_indices: data.tx_indices, + display_updates: data.display_updates, + object_changes: data.object_changes, + object_history_changes: data.object_history_changes, + packages: data.packages, + epoch: data.epoch, + } + } +} + #[derive(Clone, Debug)] pub struct TransactionObjectChangesToCommit { pub changed_objects: Vec, diff --git a/crates/sui-indexer/src/handlers/tx_processor.rs b/crates/sui-indexer/src/handlers/tx_processor.rs index 04d96d6eafd04..64f1278caf454 100644 --- a/crates/sui-indexer/src/handlers/tx_processor.rs +++ b/crates/sui-indexer/src/handlers/tx_processor.rs @@ -7,7 +7,10 @@ use async_trait::async_trait; use mysten_metrics::monitored_scope; use mysten_metrics::spawn_monitored_task; +use sui_json_rpc::get_balance_changes_with_status_from_effect; +use sui_json_rpc_types::ObjectStatus; use sui_rest_api::CheckpointData; +use sui_types::object::Owner; use tokio::sync::watch; use std::collections::HashMap; @@ -211,6 +214,51 @@ impl TxChangesProcessor { .await?; Ok((balance_change, object_change)) } + + pub(crate) async fn custom_get_changes( + &self, + tx: &TransactionData, + effects: &TransactionEffects, + tx_digest: &TransactionDigest, + status_map: HashMap, + input_objects_to_owner: &HashMap, + output_objects_to_owner: &HashMap, + ) -> IndexerResult<( + Vec, + Vec, + )> { + let _timer = self + .metrics + .indexing_tx_object_changes_latency + .start_timer(); + let object_change: Vec<_> = get_object_changes( + self, + tx.sender(), + effects.modified_at_versions(), + effects.all_changed_objects(), + effects.all_removed_objects(), + ) + .await? + .into_iter() + .map(IndexedObjectChange::from) + .collect(); + let balance_change = get_balance_changes_with_status_from_effect( + self, + effects, + tx.input_objects().unwrap_or_else(|e| { + panic!( + "Checkpointed tx {:?} has inavlid input objects: {e}", + tx_digest, + ) + }), + None, + status_map, + input_objects_to_owner, + output_objects_to_owner, + ) + .await?; + Ok((balance_change, object_change)) + } } #[async_trait] diff --git a/crates/sui-indexer/src/indexer.rs b/crates/sui-indexer/src/indexer.rs index f0b92f8a43057..182e1c7a05d4c 100644 --- a/crates/sui-indexer/src/indexer.rs +++ b/crates/sui-indexer/src/indexer.rs @@ -72,7 +72,7 @@ impl Indexer { metrics: IndexerMetrics, snapshot_config: SnapshotLagConfig, cancel: CancellationToken, - queue_sender: NatsQueueSender, + mut queue_sender: NatsQueueSender, ) -> Result<(), IndexerError> { info!( "Sui Indexer Writer (version {:?}) started...", diff --git a/crates/sui-indexer/src/main.rs b/crates/sui-indexer/src/main.rs index 10e089cf5fd0c..99b03257a0ee2 100644 --- a/crates/sui-indexer/src/main.rs +++ b/crates/sui-indexer/src/main.rs @@ -43,12 +43,8 @@ async fn main() -> Result<(), IndexerError> { let queue_sender = nats_queue(odin_connection.clone()); #[cfg(feature = "postgres-feature")] - sui_indexer::db::setup_postgres::setup( - indexer_config.clone(), - registry.clone(), - queue_sender.clone(), - ) - .await?; + sui_indexer::db::setup_postgres::setup(indexer_config.clone(), registry.clone(), queue_sender) + .await?; #[cfg(feature = "mysql-feature")] #[cfg(not(feature = "postgres-feature"))] diff --git a/crates/sui-indexer/src/types.rs b/crates/sui-indexer/src/types.rs index c29bbd6c626b8..5f83fcbd6057e 100644 --- a/crates/sui-indexer/src/types.rs +++ b/crates/sui-indexer/src/types.rs @@ -353,6 +353,39 @@ pub struct IndexedTransaction { pub successful_tx_num: u64, } +#[derive(Debug, Clone)] +pub struct CustomIndexedTransaction { + pub tx_sequence_number: u64, + pub tx_digest: TransactionDigest, + pub sender_signed_data: SenderSignedData, + pub effects: TransactionEffects, + pub checkpoint_sequence_number: u64, + pub timestamp_ms: u64, + pub object_changes: Vec, + pub balance_change: Vec, + pub events: Vec, + pub transaction_kind: TransactionKind, + pub successful_tx_num: u64, +} + +impl From for IndexedTransaction { + fn from(tx: CustomIndexedTransaction) -> Self { + Self { + tx_sequence_number: tx.tx_sequence_number, + tx_digest: tx.tx_digest, + sender_signed_data: tx.sender_signed_data, + effects: tx.effects, + checkpoint_sequence_number: tx.checkpoint_sequence_number, + timestamp_ms: tx.timestamp_ms, + object_changes: tx.object_changes, + balance_change: tx.balance_change.into_iter().map(|bc| bc.into()).collect(), + events: tx.events, + transaction_kind: tx.transaction_kind, + successful_tx_num: tx.successful_tx_num, + } + } +} + #[derive(Debug, Clone)] pub struct TxIndex { pub tx_sequence_number: u64, diff --git a/crates/sui-json-rpc-types/src/balance_changes.rs b/crates/sui-json-rpc-types/src/balance_changes.rs index 83b659f75b0b5..0368ba0170107 100644 --- a/crates/sui-json-rpc-types/src/balance_changes.rs +++ b/crates/sui-json-rpc-types/src/balance_changes.rs @@ -6,6 +6,7 @@ use serde::{Deserialize, Serialize}; use serde_with::serde_as; use serde_with::DisplayFromStr; use std::fmt::{Display, Formatter, Result}; +use sui_types::base_types::ObjectID; use sui_types::object::Owner; use sui_types::sui_serde::SuiTypeTag; @@ -34,3 +35,78 @@ impl Display for BalanceChange { ) } } + +#[serde_as] +#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, PartialEq, Eq)] +#[serde(rename_all = "camelCase")] +pub struct BalanceChangeWithStatus { + /// Owner of the balance change + pub owner: Owner, + #[schemars(with = "String")] + #[serde_as(as = "SuiTypeTag")] + pub coin_type: TypeTag, + /// The amount indicate the balance value changes, + /// negative amount means spending coin value and positive means receiving coin value. + #[schemars(with = "String")] + #[serde_as(as = "DisplayFromStr")] + pub amount: i128, + /// Our additional fields + pub object_id: String, + pub status: ObjectStatus, +} + +impl From for BalanceChange { + fn from(balance_change: BalanceChangeWithStatus) -> Self { + BalanceChange { + owner: balance_change.owner, + coin_type: balance_change.coin_type, + amount: balance_change.amount, + } + } +} + +impl Display for BalanceChangeWithStatus { + fn fmt(&self, f: &mut Formatter<'_>) -> Result { + write!( + f, + " ┌──\n │ Owner: {} \n │ CoinType: {} \n │ Amount: {}\n └──", + self.owner, self.coin_type, self.amount + ) + } +} + +#[derive(Debug, Clone, Deserialize, JsonSchema, Serialize, PartialEq, Eq)] +#[serde(rename_all = "camelCase")] +pub enum ObjectStatus { + Created, + Mutated, + Deleted, +} + +#[serde_as] +#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, PartialEq, Eq)] +#[serde(rename_all = "camelCase")] +pub struct CustomBalanceChange { + /// Owner of the balance change + pub owner: Owner, + #[schemars(with = "String")] + #[serde_as(as = "SuiTypeTag")] + pub coin_type: TypeTag, + /// The amount indicate the balance value changes, + /// negative amount means spending coin value and positive means receiving coin value. + #[schemars(with = "String")] + #[serde_as(as = "DisplayFromStr")] + pub amount: i128, + /// Our additional fields + pub object_id: ObjectID, +} + +impl Display for CustomBalanceChange { + fn fmt(&self, f: &mut Formatter<'_>) -> Result { + write!( + f, + " ┌──\n │ Owner: {} \n │ CoinType: {} \n | ObjectId: {} \n │ Amount: {}\n └──", + self.owner, self.coin_type, self.object_id, self.amount + ) + } +} diff --git a/crates/sui-json-rpc/src/balance_changes.rs b/crates/sui-json-rpc/src/balance_changes.rs index 91b4b3011edaf..c7abc73a6f40b 100644 --- a/crates/sui-json-rpc/src/balance_changes.rs +++ b/crates/sui-json-rpc/src/balance_changes.rs @@ -8,7 +8,9 @@ use async_trait::async_trait; use move_core_types::language_storage::TypeTag; use tokio::sync::RwLock; -use sui_json_rpc_types::BalanceChange; +use sui_json_rpc_types::{ + BalanceChange, BalanceChangeWithStatus, CustomBalanceChange, ObjectStatus, +}; use sui_types::base_types::{ObjectID, ObjectRef, SequenceNumber}; use sui_types::coin::Coin; use sui_types::digests::ObjectDigest; @@ -152,6 +154,186 @@ async fn fetch_coins, E>( Ok(all_mutated_coins) } +////////////////////////////////////////////////////// Custom Methods ////////////////////////////////////////////////////// +// Out own version as the function is called via different methods in the original code which i don't want to fix +pub async fn get_balance_changes_with_status_from_effect, E>( + object_provider: &P, + effects: &TransactionEffects, + input_objs: Vec, + mocked_coin: Option, + status_map: HashMap, + input_objects_to_owner: &HashMap, + output_objects_to_owner: &HashMap, +) -> Result, E> { + let ((object_id, _, _), gas_owner) = effects.gas_object(); + + // Only charge gas when tx fails, skip all object parsing + if effects.status() != &ExecutionStatus::Success { + return Ok(vec![BalanceChangeWithStatus { + owner: gas_owner, + coin_type: GAS::type_tag(), + amount: effects.gas_cost_summary().net_gas_usage().neg() as i128, + object_id: object_id.to_canonical_string(true), + status: ObjectStatus::Mutated, + }]); + } + + let all_mutated = effects + .all_changed_objects() + .into_iter() + .filter_map(|((id, version, digest), _, _)| { + if matches!(mocked_coin, Some(coin) if id == coin) { + return None; + } + Some((id, version, Some(digest))) + }) + .collect::>(); + + let input_objs_to_digest = input_objs + .iter() + .filter_map(|k| match k { + InputObjectKind::ImmOrOwnedMoveObject(o) => Some((o.0, o.2)), + InputObjectKind::MovePackage(_) | InputObjectKind::SharedMoveObject { .. } => None, + }) + .collect::>(); + let unwrapped_then_deleted = effects + .unwrapped_then_deleted() + .iter() + .map(|e| e.0) + .collect::>(); + let all_balance_changes = custom_get_balance_changes( + object_provider, + &effects + .modified_at_versions() + .into_iter() + .filter_map(|(id, version)| { + if matches!(mocked_coin, Some(coin) if id == coin) { + return None; + } + // We won't be able to get dynamic object from object provider today + if unwrapped_then_deleted.contains(&id) { + return None; + } + Some((id, version, input_objs_to_digest.get(&id).cloned())) + }) + .collect::>(), + &all_mutated, + ) + .await?; + + // merge duplicated balance changes on same object + let mut all_balance_changes_with_status = vec![]; + all_balance_changes.into_iter().for_each(|bc| { + let old_owner = input_objects_to_owner.get(&bc.object_id); + // Check if input owner changed + if old_owner.is_some() && old_owner != Some(&bc.owner) { + all_balance_changes_with_status.push(BalanceChangeWithStatus { + object_id: bc.object_id.to_canonical_string(true), + owner: bc.owner, + coin_type: bc.coin_type, + amount: bc.amount, + status: ObjectStatus::Created, + }); + return; + } + let old_owner = output_objects_to_owner.get(&bc.object_id); + // Check if output owner changed + if old_owner.is_some() && old_owner != Some(&bc.owner) { + all_balance_changes_with_status.push(BalanceChangeWithStatus { + object_id: bc.object_id.to_canonical_string(true), + owner: bc.owner, + coin_type: bc.coin_type, + amount: bc.amount, + status: ObjectStatus::Deleted, + }); + return; + } + all_balance_changes_with_status.push(BalanceChangeWithStatus { + object_id: bc.object_id.to_canonical_string(true), + owner: bc.owner, + coin_type: bc.coin_type, + amount: bc.amount, + status: status_map + .get(&bc.object_id) + .cloned() + .expect("Use of object not in transaction effects"), + }); + }); + + return Ok(all_balance_changes_with_status); +} + +pub async fn custom_get_balance_changes, E>( + object_provider: &P, + modified_at_version: &[(ObjectID, SequenceNumber, Option)], + all_mutated: &[(ObjectID, SequenceNumber, Option)], +) -> Result, E> { + // 1. subtract all input coins + let balances = custom_fetch_coins(object_provider, modified_at_version) + .await? + .into_iter() + .fold( + BTreeMap::<_, i128>::new(), + |mut acc, (owner, type_, object_id, amount)| { + *acc.entry((owner, type_, object_id)).or_default() -= amount as i128; + acc + }, + ); + // 2. add all mutated coins + let balances = custom_fetch_coins(object_provider, all_mutated) + .await? + .into_iter() + .fold(balances, |mut acc, (owner, type_, object_id, amount)| { + *acc.entry((owner, type_, object_id)).or_default() += amount as i128; + acc + }); + + Ok(balances + .into_iter() + .filter_map(|((owner, coin_type, object_id), amount)| { + Some(CustomBalanceChange { + object_id, + owner, + coin_type, + amount, + }) + }) + .collect()) +} + +async fn custom_fetch_coins, E>( + object_provider: &P, + objects: &[(ObjectID, SequenceNumber, Option)], +) -> Result, E> { + let mut all_mutated_coins = vec![]; + for (id, version, digest_opt) in objects { + // TODO: use multi get object + let o = object_provider.get_object(id, version).await?; + if let Some(type_) = o.type_() { + if type_.is_coin() { + if let Some(digest) = digest_opt { + assert_eq!( + *digest, + o.digest(), + "Object digest mismatch--got bad data from object_provider?" + ) + } + let [coin_type]: [TypeTag; 1] = + type_.clone().into_type_params().try_into().unwrap(); + all_mutated_coins.push(( + o.owner, + coin_type, + o.id(), + // // we know this is a coin, safe to unwrap + // HW // NB: THIS IS FUCKING MYSTEN LABS CODE IF THIS CRASHES FUCK EM + Coin::extract_balance_if_coin(&o).unwrap().unwrap(), + )) + } + } + } + Ok(all_mutated_coins) +} + #[async_trait] pub trait ObjectProvider { type Error; diff --git a/crates/sui-types/src/nats_queue.rs b/crates/sui-types/src/nats_queue.rs index baff629cf7978..4854c08387c5f 100644 --- a/crates/sui-types/src/nats_queue.rs +++ b/crates/sui-types/src/nats_queue.rs @@ -10,16 +10,18 @@ use tokio::sync::{ }; use tracing::info; +pub type WsPayload = (u64, Vec); + pub struct NatsQueueSender { pub init_checkpoint: u64, - pub sender: Arc>, - pub receiver: Arc>>, + pub sender: Arc>, + pub receiver: Arc>>, odin: Arc, } pub fn nats_queue(odin: Arc) -> NatsQueueSender { // Create sender and receiver - let (tx, rx) = channel::<(u64, SuiWsApiMsg)>(10_000); // 10k + let (tx, rx) = channel::(10_000); // 10k NatsQueueSender { init_checkpoint: u64::MAX, @@ -46,7 +48,7 @@ impl NatsQueueSender { //Cache if we get a message with a block number that is not in order let mut bmap_checkpoints: BTreeMap> = BTreeMap::new(); - while let Some((checkpoint_seq_number, ws_update)) = receiver_lock.recv().await { + while let Some((checkpoint_seq_number, ws_updates)) = receiver_lock.recv().await { // Check if we have not received any message yet if next_index == u64::MAX { next_index = checkpoint_seq_number; @@ -54,13 +56,25 @@ impl NatsQueueSender { // Check if correct order if checkpoint_seq_number == next_index { // Send message - odin.publish_sui_ws_update(&ws_update).await; + + info!( + "Sending: {} ws updates with seq number {}", + ws_updates.len(), + checkpoint_seq_number + ); + for ws_update in ws_updates.iter() { + odin.publish_sui_ws_update(&ws_update).await; + } // Update next index next_index = next_index + 1; // Check if we have any cached messages while let Some(next_checkpoint) = bmap_checkpoints.remove(&next_index) { - info!("Sending cached message with seq number {}", next_index); + info!( + "Sending: {} cached ws updates with seq number {}", + next_checkpoint.len(), + next_index + ); for ws_update in next_checkpoint.iter() { odin.publish_sui_ws_update(&ws_update).await; @@ -78,7 +92,7 @@ impl NatsQueueSender { bmap_checkpoints .entry(checkpoint_seq_number) .or_insert(vec![]) - .push(ws_update); + .extend(ws_updates); } } });