From d1b76cfba88769125ebe5b7c0513568451c82c70 Mon Sep 17 00:00:00 2001 From: Renee Tso <8248583+rtso@users.noreply.github.com> Date: Wed, 14 Aug 2024 13:00:00 -0700 Subject: [PATCH] [EI-444] Migrate events processor to sdk (#470) * setup * import events processor * make it work * comments * use db config enum * comments * make it work with ending_version * cargo * dockerfile * comments * lint * bump sdk * chain id check * logging * bump version that fixes pollable async step * custom channel size * bump sdk --- rust/Cargo.lock | 902 ++++++++++- rust/Cargo.toml | 19 +- rust/Dockerfile | 3 + rust/processor/Cargo.toml | 1 + rust/processor/src/lib.rs | 2 +- rust/sdk-processor/Cargo.toml | 49 + rust/sdk-processor/src/config/db_config.rs | 50 + .../src/config/indexer_processor_config.rs | 40 + rust/sdk-processor/src/config/mod.rs | 3 + .../src/config/processor_config.rs | 75 + rust/sdk-processor/src/db/common/mod.rs | 1 + .../db/common/models/events_models/events.rs | 79 + .../src/db/common/models/events_models/mod.rs | 4 + .../sdk-processor/src/db/common/models/mod.rs | 2 + .../src/db/common/models/processor_status.rs | 41 + rust/sdk-processor/src/db/mod.rs | 1 + rust/sdk-processor/src/lib.rs | 5 + rust/sdk-processor/src/main.rs | 28 + .../src/processors/events_processor.rs | 143 ++ rust/sdk-processor/src/processors/mod.rs | 1 + rust/sdk-processor/src/schema.rs | 1328 +++++++++++++++++ .../latest_processed_version_tracker.rs | 186 +++ rust/sdk-processor/src/steps/common/mod.rs | 1 + .../events_processor/events_extractor.rs | 86 ++ .../steps/events_processor/events_storer.rs | 108 ++ .../src/steps/events_processor/mod.rs | 5 + rust/sdk-processor/src/steps/mod.rs | 2 + rust/sdk-processor/src/utils/chain_id.rs | 42 + rust/sdk-processor/src/utils/database.rs | 329 ++++ rust/sdk-processor/src/utils/mod.rs | 3 + .../src/utils/starting_version.rs | 52 + rust/server-framework/src/lib.rs | 4 + 32 files changed, 3519 insertions(+), 76 deletions(-) create mode 100644 rust/sdk-processor/Cargo.toml create mode 100644 rust/sdk-processor/src/config/db_config.rs create mode 100644 rust/sdk-processor/src/config/indexer_processor_config.rs create mode 100644 rust/sdk-processor/src/config/mod.rs create mode 100644 rust/sdk-processor/src/config/processor_config.rs create mode 100644 rust/sdk-processor/src/db/common/mod.rs create mode 100644 rust/sdk-processor/src/db/common/models/events_models/events.rs create mode 100644 rust/sdk-processor/src/db/common/models/events_models/mod.rs create mode 100644 rust/sdk-processor/src/db/common/models/mod.rs create mode 100644 rust/sdk-processor/src/db/common/models/processor_status.rs create mode 100644 rust/sdk-processor/src/db/mod.rs create mode 100644 rust/sdk-processor/src/lib.rs create mode 100644 rust/sdk-processor/src/main.rs create mode 100644 rust/sdk-processor/src/processors/events_processor.rs create mode 100644 rust/sdk-processor/src/processors/mod.rs create mode 100644 rust/sdk-processor/src/schema.rs create mode 100644 rust/sdk-processor/src/steps/common/latest_processed_version_tracker.rs create mode 100644 rust/sdk-processor/src/steps/common/mod.rs create mode 100644 rust/sdk-processor/src/steps/events_processor/events_extractor.rs create mode 100644 rust/sdk-processor/src/steps/events_processor/events_storer.rs create mode 100644 rust/sdk-processor/src/steps/events_processor/mod.rs create mode 100644 rust/sdk-processor/src/steps/mod.rs create mode 100644 rust/sdk-processor/src/utils/chain_id.rs create mode 100644 rust/sdk-processor/src/utils/database.rs create mode 100644 rust/sdk-processor/src/utils/mod.rs create mode 100644 rust/sdk-processor/src/utils/starting_version.rs diff --git a/rust/Cargo.lock b/rust/Cargo.lock index 0b6b51ac1..1963f8d2f 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -134,13 +134,97 @@ checksum = "34fde25430d87a9388dadbe6e34d7f72a462c8b43ac8d309b42b0a8505d7e2a5" [[package]] name = "anyhow" -version = "1.0.71" +version = "1.0.86" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9c7d0618f0e0b7e8ff11427422b64564d5fb0be1940354bfe2e0529b18a9d9b8" +checksum = "b3d1d046238990b9cf5bcde22a3fb3584ee5cf65fb2765f454ed428c7a0063da" + +[[package]] +name = "aptos-indexer-processor-sdk" +version = "0.1.0" +source = "git+https://github.com/aptos-labs/aptos-indexer-processor-sdk.git?rev=e1e1bdd9349f0a68c9fc53b7e2cebda9e2ce92b7#e1e1bdd9349f0a68c9fc53b7e2cebda9e2ce92b7" +dependencies = [ + "anyhow", + "aptos-indexer-transaction-stream", + "aptos-protos", + "async-trait", + "bcs", + "bigdecimal", + "chrono", + "derive_builder", + "futures", + "hex", + "instrumented-channel", + "kanal", + "mockall", + "num_cpus", + "once_cell", + "petgraph", + "prometheus", + "prometheus-client", + "serde", + "serde_json", + "thiserror", + "tiny-keccak", + "tokio", + "tracing", + "url", +] + +[[package]] +name = "aptos-indexer-processor-sdk-server-framework" +version = "1.0.0" +source = "git+https://github.com/aptos-labs/aptos-indexer-processor-sdk.git?rev=e1e1bdd9349f0a68c9fc53b7e2cebda9e2ce92b7#e1e1bdd9349f0a68c9fc53b7e2cebda9e2ce92b7" +dependencies = [ + "anyhow", + "aptos-indexer-processor-sdk", + "aptos-system-utils", + "async-trait", + "autometrics", + "axum 0.7.5", + "backtrace", + "clap", + "instrumented-channel", + "prometheus-client", + "serde", + "serde_yaml", + "tempfile", + "tokio", + "toml", + "tracing", + "tracing-subscriber", +] + +[[package]] +name = "aptos-indexer-transaction-stream" +version = "0.1.0" +source = "git+https://github.com/aptos-labs/aptos-indexer-processor-sdk.git?rev=e1e1bdd9349f0a68c9fc53b7e2cebda9e2ce92b7#e1e1bdd9349f0a68c9fc53b7e2cebda9e2ce92b7" +dependencies = [ + "anyhow", + "aptos-moving-average 0.1.0 (git+https://github.com/aptos-labs/aptos-indexer-processor-sdk.git?rev=e1e1bdd9349f0a68c9fc53b7e2cebda9e2ce92b7)", + "aptos-protos", + "chrono", + "futures-util", + "once_cell", + "prometheus", + "prost 0.12.3", + "serde", + "tokio", + "tonic 0.11.0", + "tracing", + "url", +] + +[[package]] +name = "aptos-moving-average" +version = "0.1.0" +dependencies = [ + "chrono", +] [[package]] name = "aptos-moving-average" version = "0.1.0" +source = "git+https://github.com/aptos-labs/aptos-indexer-processor-sdk.git?rev=e1e1bdd9349f0a68c9fc53b7e2cebda9e2ce92b7#e1e1bdd9349f0a68c9fc53b7e2cebda9e2ce92b7" dependencies = [ "chrono", ] @@ -178,8 +262,8 @@ dependencies = [ "anyhow", "aptos-profiler", "async-mutex", - "http", - "hyper", + "http 0.2.9", + "hyper 0.14.27", "lazy_static", "mime", "pprof", @@ -240,9 +324,9 @@ dependencies = [ [[package]] name = "async-trait" -version = "0.1.71" +version = "0.1.81" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a564d521dd56509c4c47480d00b80ee55f7e385ae48db5744c67ad50c92d2ebf" +checksum = "6e0c28dcc82d7c8ead5cb13beb15405b57b8546e93215673ff8ca0349a028107" dependencies = [ "proc-macro2", "quote", @@ -255,6 +339,39 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" +[[package]] +name = "autometrics" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "10eaae539e7319a3813dc8cd53776a7128bdd6d82067275c12586f5a0fce9137" +dependencies = [ + "autometrics-macros", + "cfg_aliases", + "http 1.1.0", + "linkme", + "metrics-exporter-prometheus", + "once_cell", + "opentelemetry-prometheus", + "opentelemetry_sdk", + "prometheus", + "prometheus-client", + "spez", + "thiserror", +] + +[[package]] +name = "autometrics-macros" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fdf7c9ebfee6425011c65788c746adf80fac99ba38957ba1cdb824b593cfc993" +dependencies = [ + "percent-encoding", + "proc-macro2", + "quote", + "regex", + "syn 2.0.48", +] + [[package]] name = "axum" version = "0.6.18" @@ -262,13 +379,42 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f8175979259124331c1d7bf6586ee7e0da434155e4b2d48ec2c8386281d8df39" dependencies = [ "async-trait", - "axum-core", + "axum-core 0.3.4", "bitflags 1.3.2", "bytes", "futures-util", - "http", - "http-body", - "hyper", + "http 0.2.9", + "http-body 0.4.5", + "hyper 0.14.27", + "itoa", + "matchit", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "rustversion", + "serde", + "sync_wrapper 0.1.2", + "tower", + "tower-layer", + "tower-service", +] + +[[package]] +name = "axum" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3a6c9af12842a67734c9a2e355436e5d03b22383ed60cf13cd0c18fbfe3dcbcf" +dependencies = [ + "async-trait", + "axum-core 0.4.3", + "bytes", + "futures-util", + "http 1.1.0", + "http-body 1.0.1", + "http-body-util", + "hyper 1.4.1", + "hyper-util", "itoa", "matchit", "memchr", @@ -277,10 +423,15 @@ dependencies = [ "pin-project-lite", "rustversion", "serde", - "sync_wrapper", + "serde_json", + "serde_path_to_error", + "serde_urlencoded", + "sync_wrapper 1.0.1", + "tokio", "tower", "tower-layer", "tower-service", + "tracing", ] [[package]] @@ -292,12 +443,33 @@ dependencies = [ "async-trait", "bytes", "futures-util", - "http", - "http-body", + "http 0.2.9", + "http-body 0.4.5", + "mime", + "rustversion", + "tower-layer", + "tower-service", +] + +[[package]] +name = "axum-core" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a15c63fd72d41492dc4f497196f5da1fb04fb7529e631d73630d1b491e47a2e3" +dependencies = [ + "async-trait", + "bytes", + "futures-util", + "http 1.1.0", + "http-body 1.0.1", + "http-body-util", "mime", + "pin-project-lite", "rustversion", + "sync_wrapper 0.1.2", "tower-layer", "tower-service", + "tracing", ] [[package]] @@ -474,6 +646,12 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "cfg_aliases" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fd16c4719339c4530435d38e511904438d07cce7950afa3718a84ac36c10e89e" + [[package]] name = "chrono" version = "0.4.38" @@ -509,7 +687,7 @@ dependencies = [ "anstream", "anstyle", "clap_lex", - "strsim", + "strsim 0.10.0", ] [[package]] @@ -642,6 +820,25 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "crossbeam-deque" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "613f8cc01fe9cf1a3eb3d7f488fd2fa8388403e97039e2f73692932e291a770d" +dependencies = [ + "crossbeam-epoch", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-epoch" +version = "0.9.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.19" @@ -684,6 +881,41 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "darling" +version = "0.20.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6f63b86c8a8826a49b8c21f08a2d07338eec8d900540f8630dc76284be802989" +dependencies = [ + "darling_core", + "darling_macro", +] + +[[package]] +name = "darling_core" +version = "0.20.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "95133861a8032aaea082871032f5815eb9e98cef03fa916ab4500513994df9e5" +dependencies = [ + "fnv", + "ident_case", + "proc-macro2", + "quote", + "strsim 0.11.1", + "syn 2.0.48", +] + +[[package]] +name = "darling_macro" +version = "0.20.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d336a2a514f6ccccaa3e09b02d41d35330c07ddf03a62165fcec10bb561c7806" +dependencies = [ + "darling_core", + "quote", + "syn 2.0.48", +] + [[package]] name = "debugid" version = "0.8.0" @@ -693,6 +925,17 @@ dependencies = [ "uuid", ] +[[package]] +name = "delegate" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4e018fccbeeb50ff26562ece792ed06659b9c2dae79ece77c4456bb10d9bf79b" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.48", +] + [[package]] name = "der" version = "0.5.1" @@ -714,6 +957,37 @@ dependencies = [ "serde", ] +[[package]] +name = "derive_builder" +version = "0.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0350b5cb0331628a5916d6c5c0b72e97393b8b6b03b47a9284f4e7f5a405ffd7" +dependencies = [ + "derive_builder_macro", +] + +[[package]] +name = "derive_builder_core" +version = "0.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d48cda787f839151732d396ac69e3473923d54312c070ee21e9effcaa8ca0b1d" +dependencies = [ + "darling", + "proc-macro2", + "quote", + "syn 2.0.48", +] + +[[package]] +name = "derive_builder_macro" +version = "0.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "206868b8242f27cecce124c19fd88157fbd0dd334df2587f36417bafbc85097b" +dependencies = [ + "derive_builder_core", + "syn 2.0.48", +] + [[package]] name = "diesel" version = "2.1.4" @@ -799,6 +1073,18 @@ dependencies = [ "subtle", ] +[[package]] +name = "downcast" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1435fa1053d8b2fbbe9be7e97eca7f33d37b28409959813daefc1446a14247f1" + +[[package]] +name = "dtoa" +version = "1.0.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dcbb2bf8e87535c23f7a8a321e364ce21462d0ff10cb6407820e8e96dfff6653" + [[package]] name = "dw" version = "0.2.0" @@ -932,6 +1218,12 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8fcfdc7a0362c9f4444381a9e697c79d435fe65b52a37466fc2c1184cee9edc6" +[[package]] +name = "fixedbitset" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80" + [[package]] name = "flate2" version = "1.0.26" @@ -992,13 +1284,19 @@ checksum = "aa9a19cbb55df58761df49b23516a86d432839add4af60fc256da840f66ed35b" [[package]] name = "form_urlencoded" -version = "1.2.0" +version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a62bc1cf6f830c2ec14a513a9fb124d0a213a629668a4186f329db21fe045652" +checksum = "e13624c2627564efccf4934284bdd98cbaa14e79b0b5a141218e507b3a823456" dependencies = [ "percent-encoding", ] +[[package]] +name = "fragile" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c2141d6d6c8512188a7891b4b01590a45f6dac67afb4f255c4124dbb86d4eaa" + [[package]] name = "futures" version = "0.3.30" @@ -1115,6 +1413,12 @@ version = "0.27.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b6c80984affa11d98d1b88b66ac8853f143217b399d3c74116778ff8fdb4ed2e" +[[package]] +name = "glob" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" + [[package]] name = "google-cloud-auth" version = "0.12.0" @@ -1144,7 +1448,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f8bdaaa4bc036e8318274d1b25f0f2265b3e95418b765fd1ea1c7ef938fd69bd" dependencies = [ "google-cloud-token", - "http", + "http 0.2.9", "thiserror", "tokio", "tokio-retry", @@ -1244,7 +1548,7 @@ dependencies = [ "futures-core", "futures-sink", "futures-util", - "http", + "http 0.2.9", "indexmap 2.0.0", "slab", "tokio", @@ -1271,9 +1575,18 @@ checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" [[package]] name = "hashbrown" -version = "0.14.0" +version = "0.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2c6201b9ff9fd90a5a3bac2e56a830d0caa509576f0e503818ee82c181b3437a" +checksum = "33ff8ae62cd3a9102e5637afc8452c55acf3844001bd5374e0b0bd7b6616c038" +dependencies = [ + "ahash", +] + +[[package]] +name = "hashbrown" +version = "0.14.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" [[package]] name = "headers" @@ -1285,7 +1598,7 @@ dependencies = [ "bitflags 1.3.2", "bytes", "headers-core", - "http", + "http 0.2.9", "httpdate", "mime", "sha1", @@ -1297,7 +1610,7 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e7f66481bfee273957b1f20485a4ff3362987f85b2c236580d81b4eb7a326429" dependencies = [ - "http", + "http 0.2.9", ] [[package]] @@ -1347,6 +1660,17 @@ dependencies = [ "itoa", ] +[[package]] +name = "http" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "21b9ddb458710bc376481b842f5da65cdf31522de232c1ca8146abce2a358258" +dependencies = [ + "bytes", + "fnv", + "itoa", +] + [[package]] name = "http-body" version = "0.4.5" @@ -1354,7 +1678,30 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d5f38f16d184e36f2408a55281cd658ecbd3ca05cce6d6510a176eca393e26d1" dependencies = [ "bytes", - "http", + "http 0.2.9", + "pin-project-lite", +] + +[[package]] +name = "http-body" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1efedce1fb8e6913f23e0c92de8e62cd5b772a67e7b3946df930a62566c93184" +dependencies = [ + "bytes", + "http 1.1.0", +] + +[[package]] +name = "http-body-util" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "793429d76616a256bcb62c2a2ec2bed781c8307e797e2598c50010f2bee2544f" +dependencies = [ + "bytes", + "futures-util", + "http 1.1.0", + "http-body 1.0.1", "pin-project-lite", ] @@ -1381,8 +1728,8 @@ dependencies = [ "futures-core", "futures-util", "h2", - "http", - "http-body", + "http 0.2.9", + "http-body 0.4.5", "httparse", "httpdate", "itoa", @@ -1394,13 +1741,32 @@ dependencies = [ "want", ] +[[package]] +name = "hyper" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "50dfd22e0e76d0f662d429a5f80fcaf3855009297eab6a0a9f8543834744ba05" +dependencies = [ + "bytes", + "futures-channel", + "futures-util", + "http 1.1.0", + "http-body 1.0.1", + "httparse", + "httpdate", + "itoa", + "pin-project-lite", + "smallvec", + "tokio", +] + [[package]] name = "hyper-timeout" version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1" dependencies = [ - "hyper", + "hyper 0.14.27", "pin-project-lite", "tokio", "tokio-io-timeout", @@ -1413,12 +1779,27 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905" dependencies = [ "bytes", - "hyper", + "hyper 0.14.27", "native-tls", "tokio", "tokio-native-tls", ] +[[package]] +name = "hyper-util" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3ab92f4f49ee4fb4f997c784b7a2e0fa70050211e0b6a287f898c3c9785ca956" +dependencies = [ + "bytes", + "futures-util", + "http 1.1.0", + "http-body 1.0.1", + "hyper 1.4.1", + "pin-project-lite", + "tokio", +] + [[package]] name = "iana-time-zone" version = "0.1.57" @@ -1442,6 +1823,12 @@ dependencies = [ "cc", ] +[[package]] +name = "ident_case" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39" + [[package]] name = "idna" version = "0.2.3" @@ -1465,9 +1852,9 @@ dependencies = [ [[package]] name = "idna" -version = "0.4.0" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7d20d6b07bfbc108882d88ed8e37d39636dcc260e15e30c45e6ba089610b917c" +checksum = "634d9b1461af396cad843f47fdba5597a4f9e6ddd4bfb6ff5d85028c25cb12f6" dependencies = [ "unicode-bidi", "unicode-normalization", @@ -1509,7 +1896,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d5477fe2230a79769d8dc68e0eabf5437907c0457a5614a9e8dddb67f65eb65d" dependencies = [ "equivalent", - "hashbrown 0.14.0", + "hashbrown 0.14.5", ] [[package]] @@ -1539,6 +1926,19 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "instrumented-channel" +version = "0.1.0" +source = "git+https://github.com/aptos-labs/aptos-indexer-processor-sdk.git?rev=e1e1bdd9349f0a68c9fc53b7e2cebda9e2ce92b7#e1e1bdd9349f0a68c9fc53b7e2cebda9e2ce92b7" +dependencies = [ + "delegate", + "derive_builder", + "kanal", + "once_cell", + "prometheus", + "prometheus-client", +] + [[package]] name = "integer-encoding" version = "3.0.4" @@ -1695,6 +2095,26 @@ version = "0.5.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0717cef1bc8b636c6e1c1bbdefc09e6322da8a9321966e8928ef80d20f7f770f" +[[package]] +name = "linkme" +version = "0.3.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ccb76662d78edc9f9bf56360d6919bdacc8b7761227727e5082f128eeb90bbf5" +dependencies = [ + "linkme-impl", +] + +[[package]] +name = "linkme-impl" +version = "0.3.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8dccda732e04fa3baf2e17cf835bfe2601c7c2edafd64417c627dabae3a8cda" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.48", +] + [[package]] name = "linux-raw-sys" version = "0.3.8" @@ -1732,6 +2152,15 @@ dependencies = [ "twox-hash", ] +[[package]] +name = "mach2" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19b955cdeb2a02b9117f121ce63aa52d08ade45de53e48fe6a38b39c10f6f709" +dependencies = [ + "libc", +] + [[package]] name = "matchers" version = "0.1.0" @@ -1778,6 +2207,57 @@ dependencies = [ "libc", ] +[[package]] +name = "metrics" +version = "0.21.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fde3af1a009ed76a778cb84fdef9e7dbbdf5775ae3e4cc1f434a6a307f6f76c5" +dependencies = [ + "ahash", + "metrics-macros", + "portable-atomic", +] + +[[package]] +name = "metrics-exporter-prometheus" +version = "0.12.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d4fa7ce7c4862db464a37b0b31d89bca874562f034bd7993895572783d02950" +dependencies = [ + "base64 0.21.2", + "indexmap 1.9.3", + "metrics", + "metrics-util", + "quanta", + "thiserror", +] + +[[package]] +name = "metrics-macros" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38b4faf00617defe497754acde3024865bc143d44a86799b24e191ecff91354f" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.48", +] + +[[package]] +name = "metrics-util" +version = "0.15.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4de2ed6e491ed114b40b732e4d1659a9d53992ebd87490c44a6ffe23739d973e" +dependencies = [ + "crossbeam-epoch", + "crossbeam-utils", + "hashbrown 0.13.1", + "metrics", + "num_cpus", + "quanta", + "sketches-ddsketch", +] + [[package]] name = "migrations_internals" version = "2.1.0" @@ -1835,6 +2315,33 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "mockall" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "43766c2b5203b10de348ffe19f7e54564b64f3d6018ff7648d1e2d6d3a0f0a48" +dependencies = [ + "cfg-if", + "downcast", + "fragile", + "lazy_static", + "mockall_derive", + "predicates", + "predicates-tree", +] + +[[package]] +name = "mockall_derive" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af7cbce79ec385a1d4f54baa90a76401eb15d9cab93685f62e7e9f942aa00ae2" +dependencies = [ + "cfg-if", + "proc-macro2", + "quote", + "syn 2.0.48", +] + [[package]] name = "multer" version = "2.1.0" @@ -1844,7 +2351,7 @@ dependencies = [ "bytes", "encoding_rs", "futures-util", - "http", + "http 0.2.9", "httparse", "log", "memchr", @@ -2020,9 +2527,9 @@ dependencies = [ [[package]] name = "once_cell" -version = "1.18.0" +version = "1.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dd8b5dd2ae5ed71462c540258bedcb51965123ad7e7ccf4b9a8cafaa4a63576d" +checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" [[package]] name = "opaque-debug" @@ -2074,6 +2581,52 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "opentelemetry" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e32339a5dc40459130b3bd269e9892439f55b33e772d2a9d402a789baaf4e8a" +dependencies = [ + "futures-core", + "futures-sink", + "indexmap 2.0.0", + "js-sys", + "once_cell", + "pin-project-lite", + "thiserror", + "urlencoding", +] + +[[package]] +name = "opentelemetry-prometheus" +version = "0.14.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6f8f082da115b0dcb250829e3ed0b8792b8f963a1ad42466e48422fbe6a079bd" +dependencies = [ + "once_cell", + "opentelemetry", + "opentelemetry_sdk", + "prometheus", + "protobuf", +] + +[[package]] +name = "opentelemetry_sdk" +version = "0.21.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2f16aec8a98a457a52664d69e0091bac3a0abd18ead9b641cb00202ba4e0efe4" +dependencies = [ + "async-trait", + "futures-channel", + "futures-executor", + "futures-util", + "glob", + "once_cell", + "opentelemetry", + "ordered-float 4.2.1", + "thiserror", +] + [[package]] name = "ordered-float" version = "2.10.1" @@ -2083,6 +2636,15 @@ dependencies = [ "num-traits", ] +[[package]] +name = "ordered-float" +version = "4.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19ff2cf528c6c03d9ed653d6c4ce1dc0582dc4af309790ad92f07c1cd551b0be" +dependencies = [ + "num-traits", +] + [[package]] name = "overload" version = "0.1.1" @@ -2123,7 +2685,7 @@ dependencies = [ "chrono", "futures", "half", - "hashbrown 0.14.0", + "hashbrown 0.14.5", "lz4_flex", "num", "num-bigint", @@ -2182,9 +2744,19 @@ dependencies = [ [[package]] name = "percent-encoding" -version = "2.3.0" +version = "2.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" + +[[package]] +name = "petgraph" +version = "0.6.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9b2a4787296e9989611394c33f193f676704af1686e70b8f8033ab5ba9a35a94" +checksum = "b4c5cc86750666a3ed20bdaf5ca2a0344f9c67674cae0515bec2da16fbaa47db" +dependencies = [ + "fixedbitset", + "indexmap 2.0.0", +] [[package]] name = "phf" @@ -2264,6 +2836,12 @@ version = "0.3.27" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "26072860ba924cbfa98ea39c8c19b4dd6a4a25423dbdf219c1eca91aa0cf6964" +[[package]] +name = "portable-atomic" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7170ef9988bc169ba16dd36a7fa041e5c4cbeb6a35b76d4c03daded371eae7c0" + [[package]] name = "postgres-native-tls" version = "0.5.0" @@ -2350,6 +2928,32 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "predicates" +version = "3.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68b87bfd4605926cdfefc1c3b5f8fe560e3feca9d5552cf68c466d3d8236c7e8" +dependencies = [ + "anstyle", + "predicates-core", +] + +[[package]] +name = "predicates-core" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b794032607612e7abeb4db69adb4e33590fa6cf1149e95fd7cb00e634b92f174" + +[[package]] +name = "predicates-tree" +version = "1.0.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "368ba315fb8c5052ab692e68a0eefec6ec57b23a36959c14496f0b0df2c0cecf" +dependencies = [ + "predicates-core", + "termtree", +] + [[package]] name = "proc-macro2" version = "1.0.76" @@ -2367,7 +2971,8 @@ dependencies = [ "allocative", "allocative_derive", "anyhow", - "aptos-moving-average", + "aptos-indexer-processor-sdk", + "aptos-moving-average 0.1.0", "aptos-protos", "async-trait", "bcs", @@ -2387,7 +2992,7 @@ dependencies = [ "google-cloud-pubsub", "google-cloud-storage", "hex", - "hyper", + "hyper 0.14.27", "itertools 0.12.1", "jemallocator", "kanal", @@ -2419,18 +3024,42 @@ dependencies = [ [[package]] name = "prometheus" -version = "0.13.3" +version = "0.13.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "449811d15fbdf5ceb5c1144416066429cf82316e2ec8ce0c1f6f8a02e7bbcf8c" +checksum = "3d33c28a30771f7f96db69893f78b857f7450d7e0237e9c8fc6427a81bae7ed1" dependencies = [ "cfg-if", "fnv", "lazy_static", "memchr", "parking_lot", + "protobuf", "thiserror", ] +[[package]] +name = "prometheus-client" +version = "0.22.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "504ee9ff529add891127c4827eb481bd69dc0ebc72e9a682e187db4caa60c3ca" +dependencies = [ + "dtoa", + "itoa", + "parking_lot", + "prometheus-client-derive-encode", +] + +[[package]] +name = "prometheus-client-derive-encode" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "440f724eba9f6996b75d63681b0a92b06947f1457076d503a4d2e2c8f56442b8" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.48", +] + [[package]] name = "prost" version = "0.11.9" @@ -2527,6 +3156,22 @@ dependencies = [ "psl-types", ] +[[package]] +name = "quanta" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a17e662a7a8291a865152364c20c7abc5e60486ab2001e8ec10b24862de0b9ab" +dependencies = [ + "crossbeam-utils", + "libc", + "mach2", + "once_cell", + "raw-cpuid", + "wasi", + "web-sys", + "winapi", +] + [[package]] name = "quick-xml" version = "0.26.0" @@ -2575,6 +3220,35 @@ dependencies = [ "getrandom", ] +[[package]] +name = "raw-cpuid" +version = "10.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c297679cb867470fa8c9f67dbba74a78d78e3e98d7cf2b08d6d71540f797332" +dependencies = [ + "bitflags 1.3.2", +] + +[[package]] +name = "rayon" +version = "1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b418a60154510ca1a002a752ca9714984e21e4241e804d32555251faf8b78ffa" +dependencies = [ + "either", + "rayon-core", +] + +[[package]] +name = "rayon-core" +version = "1.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1465873a3dfdaa8ae7cb14b4383657caab0b3e8a0aa9ae8e04b044854c8dfce2" +dependencies = [ + "crossbeam-deque", + "crossbeam-utils", +] + [[package]] name = "redox_syscall" version = "0.3.5" @@ -2642,9 +3316,9 @@ dependencies = [ "futures-core", "futures-util", "h2", - "http", - "http-body", - "hyper", + "http 0.2.9", + "http-body 0.4.5", + "hyper 0.14.27", "hyper-tls", "ipnet", "js-sys", @@ -2950,6 +3624,45 @@ dependencies = [ "untrusted 0.7.1", ] +[[package]] +name = "sdk-processor" +version = "0.1.0" +dependencies = [ + "ahash", + "anyhow", + "aptos-indexer-processor-sdk", + "aptos-indexer-processor-sdk-server-framework", + "async-trait", + "bcs", + "bigdecimal", + "chrono", + "clap", + "diesel", + "diesel-async", + "diesel_migrations", + "field_count", + "futures", + "futures-util", + "hex", + "jemallocator", + "kanal", + "lazy_static", + "native-tls", + "num_cpus", + "postgres-native-tls", + "processor", + "rayon", + "serde", + "serde_json", + "sha2 0.9.9", + "strum", + "tiny-keccak", + "tokio", + "tokio-postgres", + "tracing", + "url", +] + [[package]] name = "security-framework" version = "2.9.1" @@ -2981,18 +3694,18 @@ checksum = "a3f0bf26fd526d2a95683cd0f87bf103b8539e2ca1ef48ce002d67aad59aa0b4" [[package]] name = "serde" -version = "1.0.193" +version = "1.0.204" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "25dd9975e68d0cb5aa1120c288333fc98731bd1dd12f561e468ea4728c042b89" +checksum = "bc76f558e0cbb2a839d37354c575f1dc3fdc6546b5be373ba43d95f231bf7c12" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.193" +version = "1.0.204" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "43576ca501357b9b071ac53cdc7da8ef0cbd9493d8df094cd821777ea6e894d3" +checksum = "e0cd7e117be63d3c3678776753929474f3b04a43a080c744d6b0ae2a8c28e222" dependencies = [ "proc-macro2", "quote", @@ -3011,6 +3724,16 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_path_to_error" +version = "0.1.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af99884400da37c88f5e9146b7f1fd0fbcae8f6eec4e9da38b67d05486f814a6" +dependencies = [ + "itoa", + "serde", +] + [[package]] name = "serde_spanned" version = "0.6.3" @@ -3147,6 +3870,12 @@ version = "0.3.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "38b58827f4464d87d377d175e90bf58eb00fd8716ff0a62f80356b5e61555d0d" +[[package]] +name = "sketches-ddsketch" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85636c14b73d81f541e525f585c0a2109e6744e1565b5c1668e31c70c10ed65c" + [[package]] name = "slab" version = "0.4.8" @@ -3158,9 +3887,9 @@ dependencies = [ [[package]] name = "smallvec" -version = "1.11.0" +version = "1.13.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "62bb4feee49fdd9f707ef802e22365a35de4b7b299de4763d44bfea899442ff9" +checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67" [[package]] name = "socket2" @@ -3182,6 +3911,17 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "spez" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c87e960f4dca2788eeb86bbdde8dd246be8948790b7618d656e68f9b720a86e8" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.48", +] + [[package]] name = "spin" version = "0.5.2" @@ -3239,6 +3979,12 @@ version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623" +[[package]] +name = "strsim" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" + [[package]] name = "strum" version = "0.24.1" @@ -3318,6 +4064,12 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" +[[package]] +name = "sync_wrapper" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a7065abeca94b6a8a577f9bd45aa0867a2238b74e8eb67cf10d492bc39351394" + [[package]] name = "tempfile" version = "3.6.0" @@ -3332,20 +4084,26 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "termtree" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3369f5ac52d5eb6ab48c6b4ffdc8efbcad6b89c765749064ba298f2c68a16a76" + [[package]] name = "thiserror" -version = "1.0.56" +version = "1.0.63" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d54378c645627613241d077a3a79db965db602882668f9136ac42af9ecb730ad" +checksum = "c0342370b38b6a11b6cc11d6a805569958d54cfa061a29969c3b5ce2ea405724" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.56" +version = "1.0.63" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fa0faa943b50f3db30a20aa7e265dbc66076993efed8463e8de414e5d06d3471" +checksum = "a4558b58466b9ad7ca0f102865eccc95938dca1a74a856f2b57b6629050da261" dependencies = [ "proc-macro2", "quote", @@ -3370,7 +4128,7 @@ checksum = "7e54bc85fc7faa8bc175c4bab5b92ba8d9a3ce893d0e9f42cc455c8ab16a9e09" dependencies = [ "byteorder", "integer-encoding", - "ordered-float", + "ordered-float 2.10.1", ] [[package]] @@ -3430,9 +4188,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.35.1" +version = "1.38.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c89b4efa943be685f629b149f53829423f8f5531ea21249408e8e2f8671ec104" +checksum = "eb2caba9f80616f438e09748d5acda951967e1ea58508ef53d9c6402485a46df" dependencies = [ "backtrace", "bytes", @@ -3459,9 +4217,9 @@ dependencies = [ [[package]] name = "tokio-macros" -version = "2.2.0" +version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b" +checksum = "5f5ae998a069d4b5aba8ee9dad856af7d520c3699e6159b185c2acd48155d39a" dependencies = [ "proc-macro2", "quote", @@ -3626,16 +4384,16 @@ checksum = "3082666a3a6433f7f511c7192923fa1fe07c69332d3c6a2e6bb040b569199d5a" dependencies = [ "async-stream", "async-trait", - "axum", + "axum 0.6.18", "base64 0.21.2", "bytes", "flate2", "futures-core", "futures-util", "h2", - "http", - "http-body", - "hyper", + "http 0.2.9", + "http-body 0.4.5", + "hyper 0.14.27", "hyper-timeout", "percent-encoding", "pin-project", @@ -3659,14 +4417,14 @@ checksum = "76c4eb7a4e9ef9d4763600161f12f5070b92a578e1b634db88a6887844c91a13" dependencies = [ "async-stream", "async-trait", - "axum", + "axum 0.6.18", "base64 0.21.2", "bytes", "flate2", "h2", - "http", - "http-body", - "hyper", + "http 0.2.9", + "http-body 0.4.5", + "hyper 0.14.27", "hyper-timeout", "percent-encoding", "pin-project", @@ -3807,7 +4565,7 @@ dependencies = [ "base64 0.13.1", "byteorder", "bytes", - "http", + "http 0.2.9", "httparse", "log", "rand", @@ -3883,12 +4641,12 @@ checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" [[package]] name = "url" -version = "2.4.0" +version = "2.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "50bff7831e19200a85b17131d085c25d7811bc4e186efdaf54bbd132994a88cb" +checksum = "22784dbdf76fdde8af1aeda5622b546b422b6fc585325248a2bf9f5e41e94d6c" dependencies = [ "form_urlencoded", - "idna 0.4.0", + "idna 0.5.0", "percent-encoding", "serde", ] @@ -3954,8 +4712,8 @@ dependencies = [ "futures-channel", "futures-util", "headers", - "http", - "hyper", + "http 0.2.9", + "hyper 0.14.27", "log", "mime", "mime_guess", diff --git a/rust/Cargo.toml b/rust/Cargo.toml index 12e7584cf..56e75b0b0 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -1,7 +1,13 @@ [workspace] resolver = "2" -members = ["indexer-metrics", "moving-average", "processor", "server-framework"] +members = [ + "indexer-metrics", + "moving-average", + "processor", + "sdk-processor", + "server-framework", +] [workspace.package] authors = ["Aptos Labs "] @@ -16,9 +22,12 @@ rust-version = "1.75" processor = { path = "processor" } server-framework = { path = "server-framework" } aptos-moving-average = { path = "moving-average" } +sdk-processor = { path = "sdk-processor" } ahash = { version = "0.8.7", features = ["serde"] } -anyhow = "1.0.62" +anyhow = "1.0.86" +aptos-indexer-processor-sdk = { git = "https://github.com/aptos-labs/aptos-indexer-processor-sdk.git", rev = "e1e1bdd9349f0a68c9fc53b7e2cebda9e2ce92b7" } +aptos-indexer-processor-sdk-server-framework = { git = "https://github.com/aptos-labs/aptos-indexer-processor-sdk.git", rev = "e1e1bdd9349f0a68c9fc53b7e2cebda9e2ce92b7" } aptos-protos = { git = "https://github.com/aptos-labs/aptos-core.git", rev = "5c48aee129b5a141be2792ffa3d9bd0a1a61c9cb" } aptos-system-utils = { git = "https://github.com/aptos-labs/aptos-core.git", rev = "4541add3fd29826ec57f22658ca286d2d6134b93" } async-trait = "0.1.53" @@ -73,6 +82,7 @@ pbjson = "0.5.1" prometheus = { version = "0.13.0", default-features = false } prost = { version = "0.12.3", features = ["no-recursion-limit"] } prost-types = "0.12.3" +rayon = "1.10.0" regex = "1.5.5" reqwest = { version = "0.11.20", features = [ "blocking", @@ -111,7 +121,10 @@ postgres-native-tls = "0.5.0" tokio-postgres = "0.7.10" # Parquet support -parquet = { version = "52.0.0", default-features = false, features = ["async", "lz4"] } +parquet = { version = "52.0.0", default-features = false, features = [ + "async", + "lz4", +] } num = "0.4.0" google-cloud-storage = "0.13.0" hyper = { version = "0.14.18", features = ["full"] } diff --git a/rust/Dockerfile b/rust/Dockerfile index fbf528cdd..c08a3890c 100644 --- a/rust/Dockerfile +++ b/rust/Dockerfile @@ -14,6 +14,8 @@ RUN cargo build --locked --release -p processor RUN cp target/release/processor /usr/local/bin RUN cargo build --locked --release -p indexer-metrics RUN cp target/release/indexer-metrics /usr/local/bin +RUN cargo build --locked --release -p sdk-processor +RUN cp target/release/sdk-processor /usr/local/bin # add build info ARG GIT_TAG @@ -29,6 +31,7 @@ FROM debian:bullseye-slim COPY --from=builder /usr/local/bin/processor /usr/local/bin COPY --from=builder /usr/local/bin/indexer-metrics /usr/local/bin +COPY --from=builder /usr/local/bin/sdk-processor /usr/local/bin RUN --mount=type=cache,target=/var/cache/apt,sharing=locked \ --mount=type=cache,target=/var/lib/apt,sharing=locked \ diff --git a/rust/processor/Cargo.toml b/rust/processor/Cargo.toml index 1fbb57cbd..4e748c8f7 100644 --- a/rust/processor/Cargo.toml +++ b/rust/processor/Cargo.toml @@ -17,6 +17,7 @@ ahash = { workspace = true } allocative = { workspace = true } allocative_derive = { workspace = true } anyhow = { workspace = true } +aptos-indexer-processor-sdk = { workspace = true } aptos-moving-average = { workspace = true } aptos-protos = { workspace = true } async-trait = { workspace = true } diff --git a/rust/processor/src/lib.rs b/rust/processor/src/lib.rs index 8dcb43fa3..8b4c4d201 100644 --- a/rust/processor/src/lib.rs +++ b/rust/processor/src/lib.rs @@ -20,7 +20,7 @@ pub use config::IndexerGrpcProcessorConfig; pub mod bq_analytics; mod config; -mod db; +pub mod db; pub mod gap_detectors; pub mod grpc_stream; pub mod processors; diff --git a/rust/sdk-processor/Cargo.toml b/rust/sdk-processor/Cargo.toml new file mode 100644 index 000000000..620336f3b --- /dev/null +++ b/rust/sdk-processor/Cargo.toml @@ -0,0 +1,49 @@ +[package] +name = "sdk-processor" +version = "0.1.0" +edition = "2021" +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +ahash = { workspace = true } +anyhow = { workspace = true } +aptos-indexer-processor-sdk = { workspace = true } +aptos-indexer-processor-sdk-server-framework = { workspace = true } +async-trait = { workspace = true } +bcs = { workspace = true } +bigdecimal = { workspace = true } +chrono = { workspace = true } +clap = { workspace = true } +diesel = { workspace = true } +diesel-async = { workspace = true } +diesel_migrations = { workspace = true } +field_count = { workspace = true } +futures = { workspace = true } +futures-util = { workspace = true } +hex = { workspace = true } +jemallocator = { workspace = true } +kanal = { workspace = true } +lazy_static = { workspace = true } +num_cpus = { workspace = true } +processor = { workspace = true } +rayon = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +sha2 = { workspace = true } +strum = { workspace = true } +tiny-keccak = { workspace = true } +tokio = { workspace = true } +tracing = { workspace = true } +url = { workspace = true } + +# Postgres SSL support +native-tls = { workspace = true } +postgres-native-tls = { workspace = true } +tokio-postgres = { workspace = true } + +[features] +libpq = ["diesel/postgres"] +# When using the default features we enable the diesel/postgres feature. We configure +# it in a feature so the CLI can opt out, since it cannot tolerate the libpq dep. +# Recall that features should always be additive. +default = ["libpq"] diff --git a/rust/sdk-processor/src/config/db_config.rs b/rust/sdk-processor/src/config/db_config.rs new file mode 100644 index 000000000..05c0b2632 --- /dev/null +++ b/rust/sdk-processor/src/config/db_config.rs @@ -0,0 +1,50 @@ +use serde::{Deserialize, Serialize}; + +/// This enum captures the configs for all the different db storages that are defined. +/// The configs for each db storage should only contain configuration specific to that +/// type. +#[derive(Clone, Debug, Deserialize, Serialize, strum::IntoStaticStr, strum::EnumDiscriminants)] +#[serde(tag = "type", rename_all = "snake_case")] +// What is all this strum stuff? Let me explain. +// +// Previously we had consts called NAME in each module and a function called `name` on +// the ProcessorTrait. As such it was possible for this name to not match the snake case +// representation of the struct name. By using strum we can have a single source for +// processor names derived from the enum variants themselves. +// +// That's what this strum_discriminants stuff is, it uses macro magic to generate the +// ProcessorName enum based on ProcessorConfig. The rest of the derives configure this +// generation logic, e.g. to make sure we use snake_case. +#[strum(serialize_all = "snake_case")] +#[strum_discriminants( + derive( + Deserialize, + Serialize, + strum::EnumVariantNames, + strum::IntoStaticStr, + strum::Display, + clap::ValueEnum + ), + name(DbTypeName), + clap(rename_all = "snake_case"), + serde(rename_all = "snake_case"), + strum(serialize_all = "snake_case") +)] +pub enum DbConfig { + PostgresConfig(PostgresConfig), +} + +#[derive(Clone, Debug, Deserialize, Serialize)] +#[serde(deny_unknown_fields)] +pub struct PostgresConfig { + pub connection_string: String, + // Size of the pool for writes/reads to the DB. Limits maximum number of queries in flight + #[serde(default = "PostgresConfig::default_db_pool_size")] + pub db_pool_size: u32, +} + +impl PostgresConfig { + pub const fn default_db_pool_size() -> u32 { + 150 + } +} diff --git a/rust/sdk-processor/src/config/indexer_processor_config.rs b/rust/sdk-processor/src/config/indexer_processor_config.rs new file mode 100644 index 000000000..361ec56b3 --- /dev/null +++ b/rust/sdk-processor/src/config/indexer_processor_config.rs @@ -0,0 +1,40 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use super::{db_config::DbConfig, processor_config::ProcessorConfig}; +use crate::processors::events_processor::EventsProcessor; +use anyhow::Result; +use aptos_indexer_processor_sdk::aptos_indexer_transaction_stream::TransactionStreamConfig; +use aptos_indexer_processor_sdk_server_framework::RunnableConfig; +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Debug, Deserialize, Serialize)] +#[serde(deny_unknown_fields)] +pub struct IndexerProcessorConfig { + pub processor_config: ProcessorConfig, + pub transaction_stream_config: TransactionStreamConfig, + pub db_config: DbConfig, +} + +#[async_trait::async_trait] +impl RunnableConfig for IndexerProcessorConfig { + async fn run(&self) -> Result<()> { + match self.processor_config { + ProcessorConfig::EventsProcessor(_) => { + let events_processor = EventsProcessor::new(self.clone()).await?; + events_processor.run_processor().await + }, + } + } + + fn get_server_name(&self) -> String { + // Get the part before the first _ and trim to 12 characters. + let before_underscore = self + .processor_config + .name() + .split('_') + .next() + .unwrap_or("unknown"); + before_underscore[..before_underscore.len().min(12)].to_string() + } +} diff --git a/rust/sdk-processor/src/config/mod.rs b/rust/sdk-processor/src/config/mod.rs new file mode 100644 index 000000000..cbbfceeb7 --- /dev/null +++ b/rust/sdk-processor/src/config/mod.rs @@ -0,0 +1,3 @@ +pub mod db_config; +pub mod indexer_processor_config; +pub mod processor_config; diff --git a/rust/sdk-processor/src/config/processor_config.rs b/rust/sdk-processor/src/config/processor_config.rs new file mode 100644 index 000000000..24e2be86b --- /dev/null +++ b/rust/sdk-processor/src/config/processor_config.rs @@ -0,0 +1,75 @@ +use crate::processors::events_processor::EventsProcessorConfig; +use serde::{Deserialize, Serialize}; + +/// This enum captures the configs for all the different processors that are defined. +/// The configs for each processor should only contain configuration specific to that +/// processor. For configuration that is common to all processors, put it in +/// IndexerGrpcProcessorConfig. +#[derive(Clone, Debug, Deserialize, Serialize, strum::IntoStaticStr, strum::EnumDiscriminants)] +#[serde(tag = "type", rename_all = "snake_case")] +// What is all this strum stuff? Let me explain. +// +// Previously we had consts called NAME in each module and a function called `name` on +// the ProcessorTrait. As such it was possible for this name to not match the snake case +// representation of the struct name. By using strum we can have a single source for +// processor names derived from the enum variants themselves. +// +// That's what this strum_discriminants stuff is, it uses macro magic to generate the +// ProcessorName enum based on ProcessorConfig. The rest of the derives configure this +// generation logic, e.g. to make sure we use snake_case. +#[strum(serialize_all = "snake_case")] +#[strum_discriminants( + derive( + Deserialize, + Serialize, + strum::EnumVariantNames, + strum::IntoStaticStr, + strum::Display, + clap::ValueEnum + ), + name(ProcessorName), + clap(rename_all = "snake_case"), + serde(rename_all = "snake_case"), + strum(serialize_all = "snake_case") +)] +pub enum ProcessorConfig { + EventsProcessor(EventsProcessorConfig), +} + +impl ProcessorConfig { + /// Get the name of the processor config as a static str. This is a convenience + /// method to access the derived functionality implemented by strum::IntoStaticStr. + pub fn name(&self) -> &'static str { + self.into() + } +} +#[derive(Debug)] +// To ensure that the variants of ProcessorConfig and Processor line up, in the testing +// build path we derive EnumDiscriminants on this enum as well and make sure the two +// sets of variants match up in `test_processor_names_complete`. +#[cfg_attr( + test, + derive(strum::EnumDiscriminants), + strum_discriminants( + derive(strum::EnumVariantNames), + name(ProcessorDiscriminants), + strum(serialize_all = "snake_case") + ) +)] +pub enum Processor { + EventsProcessor, +} + +#[cfg(test)] +mod test { + use super::*; + use strum::VariantNames; + + /// This test exists to make sure that when a new processor is added, it is added + /// to both Processor and ProcessorConfig. To make sure this passes, make sure the + /// variants are in the same order (lexicographical) and the names match. + #[test] + fn test_processor_names_complete() { + assert_eq!(ProcessorName::VARIANTS, ProcessorDiscriminants::VARIANTS); + } +} diff --git a/rust/sdk-processor/src/db/common/mod.rs b/rust/sdk-processor/src/db/common/mod.rs new file mode 100644 index 000000000..c446ac883 --- /dev/null +++ b/rust/sdk-processor/src/db/common/mod.rs @@ -0,0 +1 @@ +pub mod models; diff --git a/rust/sdk-processor/src/db/common/models/events_models/events.rs b/rust/sdk-processor/src/db/common/models/events_models/events.rs new file mode 100644 index 000000000..fa7699d50 --- /dev/null +++ b/rust/sdk-processor/src/db/common/models/events_models/events.rs @@ -0,0 +1,79 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +#![allow(clippy::extra_unused_lifetimes)] + +// Copied from processor crate. The only difference is the protos are imported from the SDK +// instead of the aptos-protos crate. +use aptos_indexer_processor_sdk::aptos_protos::transaction::v1::Event as EventPB; +use diesel::{Identifiable, Insertable}; +use field_count::FieldCount; +use processor::{ + schema::events, + utils::util::{standardize_address, truncate_str}, +}; +use serde::{Deserialize, Serialize}; + +// p99 currently is 303 so using 300 as a safe max length +const EVENT_TYPE_MAX_LENGTH: usize = 300; + +#[derive(Clone, Debug, Deserialize, FieldCount, Identifiable, Insertable, Serialize)] +#[diesel(primary_key(transaction_version, event_index))] +#[diesel(table_name = events)] +pub struct Event { + pub sequence_number: i64, + pub creation_number: i64, + pub account_address: String, + pub transaction_version: i64, + pub transaction_block_height: i64, + pub type_: String, + pub data: serde_json::Value, + pub event_index: i64, + pub indexed_type: String, +} + +impl Event { + pub fn from_event( + event: &EventPB, + transaction_version: i64, + transaction_block_height: i64, + event_index: i64, + ) -> Self { + let t: &str = event.type_str.as_ref(); + Event { + account_address: standardize_address( + event.key.as_ref().unwrap().account_address.as_str(), + ), + creation_number: event.key.as_ref().unwrap().creation_number as i64, + sequence_number: event.sequence_number as i64, + transaction_version, + transaction_block_height, + type_: t.to_string(), + data: serde_json::from_str(event.data.as_str()).unwrap(), + event_index, + indexed_type: truncate_str(t, EVENT_TYPE_MAX_LENGTH), + } + } + + pub fn from_events( + events: &[EventPB], + transaction_version: i64, + transaction_block_height: i64, + ) -> Vec { + events + .iter() + .enumerate() + .map(|(index, event)| { + Self::from_event( + event, + transaction_version, + transaction_block_height, + index as i64, + ) + }) + .collect::>() + } +} + +// Prevent conflicts with other things named `Event` +pub type EventModel = Event; diff --git a/rust/sdk-processor/src/db/common/models/events_models/mod.rs b/rust/sdk-processor/src/db/common/models/events_models/mod.rs new file mode 100644 index 000000000..9d363699e --- /dev/null +++ b/rust/sdk-processor/src/db/common/models/events_models/mod.rs @@ -0,0 +1,4 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +pub mod events; diff --git a/rust/sdk-processor/src/db/common/models/mod.rs b/rust/sdk-processor/src/db/common/models/mod.rs new file mode 100644 index 000000000..0ed33bb3c --- /dev/null +++ b/rust/sdk-processor/src/db/common/models/mod.rs @@ -0,0 +1,2 @@ +pub mod events_models; +pub mod processor_status; diff --git a/rust/sdk-processor/src/db/common/models/processor_status.rs b/rust/sdk-processor/src/db/common/models/processor_status.rs new file mode 100644 index 000000000..e9332a222 --- /dev/null +++ b/rust/sdk-processor/src/db/common/models/processor_status.rs @@ -0,0 +1,41 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +#![allow(clippy::extra_unused_lifetimes)] + +use crate::utils::database::DbPoolConnection; +use diesel::{AsChangeset, ExpressionMethods, Insertable, OptionalExtension, QueryDsl, Queryable}; +use diesel_async::RunQueryDsl; +use processor::schema::processor_status; + +#[derive(AsChangeset, Debug, Insertable)] +#[diesel(table_name = processor_status)] +/// Only tracking the latest version successfully processed +pub struct ProcessorStatus { + pub processor: String, + pub last_success_version: i64, + pub last_transaction_timestamp: Option, +} + +#[derive(AsChangeset, Debug, Queryable)] +#[diesel(table_name = processor_status)] +/// Only tracking the latest version successfully processed +pub struct ProcessorStatusQuery { + pub processor: String, + pub last_success_version: i64, + pub last_updated: chrono::NaiveDateTime, + pub last_transaction_timestamp: Option, +} + +impl ProcessorStatusQuery { + pub async fn get_by_processor( + processor_name: &str, + conn: &mut DbPoolConnection<'_>, + ) -> diesel::QueryResult> { + processor_status::table + .filter(processor_status::processor.eq(processor_name)) + .first::(conn) + .await + .optional() + } +} diff --git a/rust/sdk-processor/src/db/mod.rs b/rust/sdk-processor/src/db/mod.rs new file mode 100644 index 000000000..34994bf5a --- /dev/null +++ b/rust/sdk-processor/src/db/mod.rs @@ -0,0 +1 @@ +pub mod common; diff --git a/rust/sdk-processor/src/lib.rs b/rust/sdk-processor/src/lib.rs new file mode 100644 index 000000000..c614c73ae --- /dev/null +++ b/rust/sdk-processor/src/lib.rs @@ -0,0 +1,5 @@ +pub mod config; +mod db; +pub mod processors; +pub mod steps; +pub mod utils; diff --git a/rust/sdk-processor/src/main.rs b/rust/sdk-processor/src/main.rs new file mode 100644 index 000000000..4825b8858 --- /dev/null +++ b/rust/sdk-processor/src/main.rs @@ -0,0 +1,28 @@ +use anyhow::Result; +use aptos_indexer_processor_sdk_server_framework::ServerArgs; +use clap::Parser; +use sdk_processor::config::indexer_processor_config::IndexerProcessorConfig; + +#[cfg(unix)] +#[global_allocator] +static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc; + +const RUNTIME_WORKER_MULTIPLIER: usize = 2; + +fn main() -> Result<()> { + let num_cpus = num_cpus::get(); + let worker_threads = (num_cpus * RUNTIME_WORKER_MULTIPLIER).max(16); + + let mut builder = tokio::runtime::Builder::new_multi_thread(); + builder + .disable_lifo_slot() + .enable_all() + .worker_threads(worker_threads) + .build() + .unwrap() + .block_on(async { + let args = ServerArgs::parse(); + args.run::(tokio::runtime::Handle::current()) + .await + }) +} diff --git a/rust/sdk-processor/src/processors/events_processor.rs b/rust/sdk-processor/src/processors/events_processor.rs new file mode 100644 index 000000000..379b01358 --- /dev/null +++ b/rust/sdk-processor/src/processors/events_processor.rs @@ -0,0 +1,143 @@ +use crate::{ + config::{ + db_config::DbConfig, indexer_processor_config::IndexerProcessorConfig, + processor_config::ProcessorConfig, + }, + steps::{ + common::latest_processed_version_tracker::LatestVersionProcessedTracker, + events_processor::{EventsExtractor, EventsStorer}, + }, + utils::{ + chain_id::check_or_update_chain_id, + database::{new_db_pool, run_migrations, ArcDbPool}, + starting_version::get_starting_version, + }, +}; +use ahash::AHashMap; +use anyhow::Result; +use aptos_indexer_processor_sdk::{ + aptos_indexer_transaction_stream::{TransactionStream, TransactionStreamConfig}, + builder::ProcessorBuilder, + common_steps::TransactionStreamStep, + traits::IntoRunnableStep, +}; +use serde::{Deserialize, Serialize}; +use tracing::{debug, info}; + +#[derive(Clone, Debug, Deserialize, Serialize)] +#[serde(deny_unknown_fields)] +pub struct EventsProcessorConfig { + // Number of rows to insert, per chunk, for each DB table. Default per table is ~32,768 (2**16/2) + #[serde(default = "AHashMap::new")] + pub per_table_chunk_sizes: AHashMap, + // Size of channel between steps + #[serde(default = "EventsProcessorConfig::default_channel_size")] + pub channel_size: usize, +} + +impl EventsProcessorConfig { + pub const fn default_channel_size() -> usize { + 10 + } +} + +pub struct EventsProcessor { + pub config: IndexerProcessorConfig, + pub db_pool: ArcDbPool, +} + +impl EventsProcessor { + pub async fn new(config: IndexerProcessorConfig) -> Result { + match config.db_config { + DbConfig::PostgresConfig(ref postgres_config) => { + let conn_pool = new_db_pool( + &postgres_config.connection_string, + Some(postgres_config.db_pool_size), + ) + .await + .map_err(|e| { + anyhow::anyhow!( + "Failed to create connection pool for PostgresConfig: {:?}", + e + ) + })?; + + Ok(Self { + config, + db_pool: conn_pool, + }) + }, + } + } + + pub async fn run_processor(self) -> Result<()> { + let processor_name = self.config.processor_config.name(); + + // (Optional) Run migrations + match self.config.db_config { + DbConfig::PostgresConfig(ref postgres_config) => { + run_migrations( + postgres_config.connection_string.clone(), + self.db_pool.clone(), + ) + .await; + }, + } + + // (Optional) Merge the starting version from config and the latest processed version from the DB + let starting_version = get_starting_version(&self.config, self.db_pool.clone()).await?; + + // (Optional) Check and update the ledger chain id to ensure we're indexing the correct chain + let grpc_chain_id = TransactionStream::new(self.config.transaction_stream_config.clone()) + .await? + .get_chain_id() + .await?; + check_or_update_chain_id(grpc_chain_id as i64, self.db_pool.clone()).await?; + + let ProcessorConfig::EventsProcessor(events_processor_config) = + self.config.processor_config; + let channel_size = events_processor_config.channel_size; + + // Define processor steps + let transaction_stream = TransactionStreamStep::new(TransactionStreamConfig { + starting_version: Some(starting_version), + ..self.config.transaction_stream_config + }) + .await?; + let events_extractor = EventsExtractor {}; + let events_storer = EventsStorer::new(self.db_pool.clone(), events_processor_config); + let version_tracker = LatestVersionProcessedTracker::new( + self.db_pool.clone(), + starting_version, + processor_name.to_string(), + ); + + // Connect processor steps together + let (_, buffer_receiver) = ProcessorBuilder::new_with_inputless_first_step( + transaction_stream.into_runnable_step(), + ) + .connect_to(events_extractor.into_runnable_step(), channel_size) + .connect_to(events_storer.into_runnable_step(), channel_size) + .connect_to(version_tracker.into_runnable_step(), channel_size) + .end_and_return_output_receiver(channel_size); + + // (Optional) Parse the results + loop { + match buffer_receiver.recv().await { + Ok(txn_context) => { + if txn_context.data.is_empty() { + continue; + } + debug!( + "Finished processing events from versions [{:?}, {:?}]", + txn_context.start_version, txn_context.end_version, + ); + }, + Err(e) => { + info!("No more transactions in channel: {:?}", e); + break Ok(()); + }, + } + } + } +} diff --git a/rust/sdk-processor/src/processors/mod.rs b/rust/sdk-processor/src/processors/mod.rs new file mode 100644 index 000000000..110ce858e --- /dev/null +++ b/rust/sdk-processor/src/processors/mod.rs @@ -0,0 +1 @@ +pub mod events_processor; diff --git a/rust/sdk-processor/src/schema.rs b/rust/sdk-processor/src/schema.rs new file mode 100644 index 000000000..b50808303 --- /dev/null +++ b/rust/sdk-processor/src/schema.rs @@ -0,0 +1,1328 @@ +// @generated automatically by Diesel CLI. + +diesel::table! { + account_transactions (account_address, transaction_version) { + transaction_version -> Int8, + #[max_length = 66] + account_address -> Varchar, + inserted_at -> Timestamp, + } +} + +diesel::table! { + ans_lookup (transaction_version, write_set_change_index) { + transaction_version -> Int8, + write_set_change_index -> Int8, + #[max_length = 64] + domain -> Varchar, + #[max_length = 64] + subdomain -> Varchar, + #[max_length = 66] + registered_address -> Nullable, + expiration_timestamp -> Nullable, + #[max_length = 140] + token_name -> Varchar, + is_deleted -> Bool, + inserted_at -> Timestamp, + } +} + +diesel::table! { + ans_lookup_v2 (transaction_version, write_set_change_index) { + transaction_version -> Int8, + write_set_change_index -> Int8, + #[max_length = 64] + domain -> Varchar, + #[max_length = 64] + subdomain -> Varchar, + #[max_length = 10] + token_standard -> Varchar, + #[max_length = 66] + registered_address -> Nullable, + expiration_timestamp -> Nullable, + #[max_length = 140] + token_name -> Varchar, + is_deleted -> Bool, + inserted_at -> Timestamp, + subdomain_expiration_policy -> Nullable, + } +} + +diesel::table! { + ans_primary_name (transaction_version, write_set_change_index) { + transaction_version -> Int8, + write_set_change_index -> Int8, + #[max_length = 66] + registered_address -> Varchar, + #[max_length = 64] + domain -> Nullable, + #[max_length = 64] + subdomain -> Nullable, + #[max_length = 140] + token_name -> Nullable, + is_deleted -> Bool, + inserted_at -> Timestamp, + } +} + +diesel::table! { + ans_primary_name_v2 (transaction_version, write_set_change_index) { + transaction_version -> Int8, + write_set_change_index -> Int8, + #[max_length = 66] + registered_address -> Varchar, + #[max_length = 64] + domain -> Nullable, + #[max_length = 64] + subdomain -> Nullable, + #[max_length = 10] + token_standard -> Varchar, + #[max_length = 140] + token_name -> Nullable, + is_deleted -> Bool, + inserted_at -> Timestamp, + } +} + +diesel::table! { + block_metadata_transactions (version) { + version -> Int8, + block_height -> Int8, + #[max_length = 66] + id -> Varchar, + round -> Int8, + epoch -> Int8, + previous_block_votes_bitvec -> Jsonb, + #[max_length = 66] + proposer -> Varchar, + failed_proposer_indices -> Jsonb, + timestamp -> Timestamp, + inserted_at -> Timestamp, + } +} + +diesel::table! { + coin_activities (transaction_version, event_account_address, event_creation_number, event_sequence_number) { + transaction_version -> Int8, + #[max_length = 66] + event_account_address -> Varchar, + event_creation_number -> Int8, + event_sequence_number -> Int8, + #[max_length = 66] + owner_address -> Varchar, + #[max_length = 5000] + coin_type -> Varchar, + amount -> Numeric, + #[max_length = 200] + activity_type -> Varchar, + is_gas_fee -> Bool, + is_transaction_success -> Bool, + #[max_length = 1000] + entry_function_id_str -> Nullable, + block_height -> Int8, + transaction_timestamp -> Timestamp, + inserted_at -> Timestamp, + event_index -> Nullable, + #[max_length = 66] + gas_fee_payer_address -> Nullable, + storage_refund_amount -> Numeric, + } +} + +diesel::table! { + coin_balances (transaction_version, owner_address, coin_type_hash) { + transaction_version -> Int8, + #[max_length = 66] + owner_address -> Varchar, + #[max_length = 64] + coin_type_hash -> Varchar, + #[max_length = 5000] + coin_type -> Varchar, + amount -> Numeric, + transaction_timestamp -> Timestamp, + inserted_at -> Timestamp, + } +} + +diesel::table! { + coin_infos (coin_type_hash) { + #[max_length = 64] + coin_type_hash -> Varchar, + #[max_length = 5000] + coin_type -> Varchar, + transaction_version_created -> Int8, + #[max_length = 66] + creator_address -> Varchar, + #[max_length = 32] + name -> Varchar, + #[max_length = 10] + symbol -> Varchar, + decimals -> Int4, + transaction_created_timestamp -> Timestamp, + inserted_at -> Timestamp, + #[max_length = 66] + supply_aggregator_table_handle -> Nullable, + supply_aggregator_table_key -> Nullable, + } +} + +diesel::table! { + coin_supply (transaction_version, coin_type_hash) { + transaction_version -> Int8, + #[max_length = 64] + coin_type_hash -> Varchar, + #[max_length = 5000] + coin_type -> Varchar, + supply -> Numeric, + transaction_timestamp -> Timestamp, + transaction_epoch -> Int8, + inserted_at -> Timestamp, + } +} + +diesel::table! { + collection_datas (collection_data_id_hash, transaction_version) { + #[max_length = 64] + collection_data_id_hash -> Varchar, + transaction_version -> Int8, + #[max_length = 66] + creator_address -> Varchar, + #[max_length = 128] + collection_name -> Varchar, + description -> Text, + #[max_length = 512] + metadata_uri -> Varchar, + supply -> Numeric, + maximum -> Numeric, + maximum_mutable -> Bool, + uri_mutable -> Bool, + description_mutable -> Bool, + inserted_at -> Timestamp, + #[max_length = 66] + table_handle -> Varchar, + transaction_timestamp -> Timestamp, + } +} + +diesel::table! { + collections_v2 (transaction_version, write_set_change_index) { + transaction_version -> Int8, + write_set_change_index -> Int8, + #[max_length = 66] + collection_id -> Varchar, + #[max_length = 66] + creator_address -> Varchar, + #[max_length = 128] + collection_name -> Varchar, + description -> Text, + #[max_length = 512] + uri -> Varchar, + current_supply -> Numeric, + max_supply -> Nullable, + total_minted_v2 -> Nullable, + mutable_description -> Nullable, + mutable_uri -> Nullable, + #[max_length = 66] + table_handle_v1 -> Nullable, + #[max_length = 10] + token_standard -> Varchar, + transaction_timestamp -> Timestamp, + inserted_at -> Timestamp, + } +} + +diesel::table! { + current_ans_lookup (domain, subdomain) { + #[max_length = 64] + domain -> Varchar, + #[max_length = 64] + subdomain -> Varchar, + #[max_length = 66] + registered_address -> Nullable, + expiration_timestamp -> Timestamp, + last_transaction_version -> Int8, + inserted_at -> Timestamp, + #[max_length = 140] + token_name -> Varchar, + is_deleted -> Bool, + } +} + +diesel::table! { + current_ans_lookup_v2 (domain, subdomain, token_standard) { + #[max_length = 64] + domain -> Varchar, + #[max_length = 64] + subdomain -> Varchar, + #[max_length = 10] + token_standard -> Varchar, + #[max_length = 140] + token_name -> Nullable, + #[max_length = 66] + registered_address -> Nullable, + expiration_timestamp -> Timestamp, + last_transaction_version -> Int8, + is_deleted -> Bool, + inserted_at -> Timestamp, + subdomain_expiration_policy -> Nullable, + } +} + +diesel::table! { + current_ans_primary_name (registered_address) { + #[max_length = 66] + registered_address -> Varchar, + #[max_length = 64] + domain -> Nullable, + #[max_length = 64] + subdomain -> Nullable, + #[max_length = 140] + token_name -> Nullable, + is_deleted -> Bool, + last_transaction_version -> Int8, + inserted_at -> Timestamp, + } +} + +diesel::table! { + current_ans_primary_name_v2 (registered_address, token_standard) { + #[max_length = 66] + registered_address -> Varchar, + #[max_length = 10] + token_standard -> Varchar, + #[max_length = 64] + domain -> Nullable, + #[max_length = 64] + subdomain -> Nullable, + #[max_length = 140] + token_name -> Nullable, + is_deleted -> Bool, + last_transaction_version -> Int8, + inserted_at -> Timestamp, + } +} + +diesel::table! { + current_coin_balances (owner_address, coin_type_hash) { + #[max_length = 66] + owner_address -> Varchar, + #[max_length = 64] + coin_type_hash -> Varchar, + #[max_length = 5000] + coin_type -> Varchar, + amount -> Numeric, + last_transaction_version -> Int8, + last_transaction_timestamp -> Timestamp, + inserted_at -> Timestamp, + } +} + +diesel::table! { + current_collection_datas (collection_data_id_hash) { + #[max_length = 64] + collection_data_id_hash -> Varchar, + #[max_length = 66] + creator_address -> Varchar, + #[max_length = 128] + collection_name -> Varchar, + description -> Text, + #[max_length = 512] + metadata_uri -> Varchar, + supply -> Numeric, + maximum -> Numeric, + maximum_mutable -> Bool, + uri_mutable -> Bool, + description_mutable -> Bool, + last_transaction_version -> Int8, + inserted_at -> Timestamp, + #[max_length = 66] + table_handle -> Varchar, + last_transaction_timestamp -> Timestamp, + } +} + +diesel::table! { + current_collections_v2 (collection_id) { + #[max_length = 66] + collection_id -> Varchar, + #[max_length = 66] + creator_address -> Varchar, + #[max_length = 128] + collection_name -> Varchar, + description -> Text, + #[max_length = 512] + uri -> Varchar, + current_supply -> Numeric, + max_supply -> Nullable, + total_minted_v2 -> Nullable, + mutable_description -> Nullable, + mutable_uri -> Nullable, + #[max_length = 66] + table_handle_v1 -> Nullable, + #[max_length = 10] + token_standard -> Varchar, + last_transaction_version -> Int8, + last_transaction_timestamp -> Timestamp, + inserted_at -> Timestamp, + } +} + +diesel::table! { + current_delegated_staking_pool_balances (staking_pool_address) { + #[max_length = 66] + staking_pool_address -> Varchar, + total_coins -> Numeric, + total_shares -> Numeric, + last_transaction_version -> Int8, + inserted_at -> Timestamp, + operator_commission_percentage -> Numeric, + #[max_length = 66] + inactive_table_handle -> Varchar, + #[max_length = 66] + active_table_handle -> Varchar, + } +} + +diesel::table! { + current_delegated_voter (delegation_pool_address, delegator_address) { + #[max_length = 66] + delegation_pool_address -> Varchar, + #[max_length = 66] + delegator_address -> Varchar, + #[max_length = 66] + table_handle -> Nullable, + #[max_length = 66] + voter -> Nullable, + #[max_length = 66] + pending_voter -> Nullable, + last_transaction_version -> Int8, + last_transaction_timestamp -> Timestamp, + inserted_at -> Timestamp, + } +} + +diesel::table! { + current_delegator_balances (delegator_address, pool_address, pool_type, table_handle) { + #[max_length = 66] + delegator_address -> Varchar, + #[max_length = 66] + pool_address -> Varchar, + #[max_length = 100] + pool_type -> Varchar, + #[max_length = 66] + table_handle -> Varchar, + last_transaction_version -> Int8, + inserted_at -> Timestamp, + shares -> Numeric, + #[max_length = 66] + parent_table_handle -> Varchar, + } +} + +diesel::table! { + current_fungible_asset_balances (storage_id) { + #[max_length = 66] + storage_id -> Varchar, + #[max_length = 66] + owner_address -> Varchar, + #[max_length = 1000] + asset_type -> Varchar, + is_primary -> Bool, + is_frozen -> Bool, + amount -> Numeric, + last_transaction_timestamp -> Timestamp, + last_transaction_version -> Int8, + #[max_length = 10] + token_standard -> Varchar, + inserted_at -> Timestamp, + } +} + +diesel::table! { + current_objects (object_address) { + #[max_length = 66] + object_address -> Varchar, + #[max_length = 66] + owner_address -> Varchar, + #[max_length = 66] + state_key_hash -> Varchar, + allow_ungated_transfer -> Bool, + last_guid_creation_num -> Numeric, + last_transaction_version -> Int8, + is_deleted -> Bool, + inserted_at -> Timestamp, + } +} + +diesel::table! { + current_staking_pool_voter (staking_pool_address) { + #[max_length = 66] + staking_pool_address -> Varchar, + #[max_length = 66] + voter_address -> Varchar, + last_transaction_version -> Int8, + inserted_at -> Timestamp, + #[max_length = 66] + operator_address -> Varchar, + } +} + +diesel::table! { + current_table_items (table_handle, key_hash) { + #[max_length = 66] + table_handle -> Varchar, + #[max_length = 64] + key_hash -> Varchar, + key -> Text, + decoded_key -> Jsonb, + decoded_value -> Nullable, + is_deleted -> Bool, + last_transaction_version -> Int8, + inserted_at -> Timestamp, + } +} + +diesel::table! { + current_token_datas (token_data_id_hash) { + #[max_length = 64] + token_data_id_hash -> Varchar, + #[max_length = 66] + creator_address -> Varchar, + #[max_length = 128] + collection_name -> Varchar, + #[max_length = 128] + name -> Varchar, + maximum -> Numeric, + supply -> Numeric, + largest_property_version -> Numeric, + #[max_length = 512] + metadata_uri -> Varchar, + #[max_length = 66] + payee_address -> Varchar, + royalty_points_numerator -> Numeric, + royalty_points_denominator -> Numeric, + maximum_mutable -> Bool, + uri_mutable -> Bool, + description_mutable -> Bool, + properties_mutable -> Bool, + royalty_mutable -> Bool, + default_properties -> Jsonb, + last_transaction_version -> Int8, + inserted_at -> Timestamp, + #[max_length = 64] + collection_data_id_hash -> Varchar, + last_transaction_timestamp -> Timestamp, + description -> Text, + } +} + +diesel::table! { + current_token_datas_v2 (token_data_id) { + #[max_length = 66] + token_data_id -> Varchar, + #[max_length = 66] + collection_id -> Varchar, + #[max_length = 128] + token_name -> Varchar, + maximum -> Nullable, + supply -> Nullable, + largest_property_version_v1 -> Nullable, + #[max_length = 512] + token_uri -> Varchar, + description -> Text, + token_properties -> Jsonb, + #[max_length = 10] + token_standard -> Varchar, + is_fungible_v2 -> Nullable, + last_transaction_version -> Int8, + last_transaction_timestamp -> Timestamp, + inserted_at -> Timestamp, + decimals -> Nullable, + is_deleted_v2 -> Nullable, + } +} + +diesel::table! { + current_token_ownerships (token_data_id_hash, property_version, owner_address) { + #[max_length = 64] + token_data_id_hash -> Varchar, + property_version -> Numeric, + #[max_length = 66] + owner_address -> Varchar, + #[max_length = 66] + creator_address -> Varchar, + #[max_length = 128] + collection_name -> Varchar, + #[max_length = 128] + name -> Varchar, + amount -> Numeric, + token_properties -> Jsonb, + last_transaction_version -> Int8, + inserted_at -> Timestamp, + #[max_length = 64] + collection_data_id_hash -> Varchar, + table_type -> Text, + last_transaction_timestamp -> Timestamp, + } +} + +diesel::table! { + current_token_ownerships_v2 (token_data_id, property_version_v1, owner_address, storage_id) { + #[max_length = 66] + token_data_id -> Varchar, + property_version_v1 -> Numeric, + #[max_length = 66] + owner_address -> Varchar, + #[max_length = 66] + storage_id -> Varchar, + amount -> Numeric, + #[max_length = 66] + table_type_v1 -> Nullable, + token_properties_mutated_v1 -> Nullable, + is_soulbound_v2 -> Nullable, + #[max_length = 10] + token_standard -> Varchar, + is_fungible_v2 -> Nullable, + last_transaction_version -> Int8, + last_transaction_timestamp -> Timestamp, + inserted_at -> Timestamp, + non_transferrable_by_owner -> Nullable, + } +} + +diesel::table! { + current_token_pending_claims (token_data_id_hash, property_version, from_address, to_address) { + #[max_length = 64] + token_data_id_hash -> Varchar, + property_version -> Numeric, + #[max_length = 66] + from_address -> Varchar, + #[max_length = 66] + to_address -> Varchar, + #[max_length = 64] + collection_data_id_hash -> Varchar, + #[max_length = 66] + creator_address -> Varchar, + #[max_length = 128] + collection_name -> Varchar, + #[max_length = 128] + name -> Varchar, + amount -> Numeric, + #[max_length = 66] + table_handle -> Varchar, + last_transaction_version -> Int8, + inserted_at -> Timestamp, + last_transaction_timestamp -> Timestamp, + #[max_length = 66] + token_data_id -> Varchar, + #[max_length = 66] + collection_id -> Varchar, + } +} + +diesel::table! { + current_token_v2_metadata (object_address, resource_type) { + #[max_length = 66] + object_address -> Varchar, + #[max_length = 128] + resource_type -> Varchar, + data -> Jsonb, + #[max_length = 66] + state_key_hash -> Varchar, + last_transaction_version -> Int8, + inserted_at -> Timestamp, + } +} + +diesel::table! { + current_unified_fungible_asset_balances (storage_id) { + #[max_length = 66] + storage_id -> Varchar, + #[max_length = 66] + owner_address -> Varchar, + #[max_length = 66] + asset_type -> Varchar, + #[max_length = 1000] + coin_type -> Nullable, + is_primary -> Nullable, + is_frozen -> Bool, + amount_v1 -> Nullable, + amount_v2 -> Nullable, + amount -> Nullable, + last_transaction_version_v1 -> Nullable, + last_transaction_version_v2 -> Nullable, + last_transaction_version -> Nullable, + last_transaction_timestamp_v1 -> Nullable, + last_transaction_timestamp_v2 -> Nullable, + last_transaction_timestamp -> Nullable, + inserted_at -> Timestamp, + } +} + +diesel::table! { + delegated_staking_activities (transaction_version, event_index) { + transaction_version -> Int8, + event_index -> Int8, + #[max_length = 66] + delegator_address -> Varchar, + #[max_length = 66] + pool_address -> Varchar, + event_type -> Text, + amount -> Numeric, + inserted_at -> Timestamp, + } +} + +diesel::table! { + delegated_staking_pool_balances (transaction_version, staking_pool_address) { + transaction_version -> Int8, + #[max_length = 66] + staking_pool_address -> Varchar, + total_coins -> Numeric, + total_shares -> Numeric, + inserted_at -> Timestamp, + operator_commission_percentage -> Numeric, + #[max_length = 66] + inactive_table_handle -> Varchar, + #[max_length = 66] + active_table_handle -> Varchar, + } +} + +diesel::table! { + delegated_staking_pools (staking_pool_address) { + #[max_length = 66] + staking_pool_address -> Varchar, + first_transaction_version -> Int8, + inserted_at -> Timestamp, + } +} + +diesel::table! { + delegator_balances (transaction_version, write_set_change_index) { + transaction_version -> Int8, + write_set_change_index -> Int8, + #[max_length = 66] + delegator_address -> Varchar, + #[max_length = 66] + pool_address -> Varchar, + #[max_length = 100] + pool_type -> Varchar, + #[max_length = 66] + table_handle -> Varchar, + shares -> Numeric, + #[max_length = 66] + parent_table_handle -> Varchar, + inserted_at -> Timestamp, + } +} + +diesel::table! { + event_size_info (transaction_version, index) { + transaction_version -> Int8, + index -> Int8, + type_tag_bytes -> Int8, + total_bytes -> Int8, + inserted_at -> Timestamp, + } +} + +diesel::table! { + events (transaction_version, event_index) { + sequence_number -> Int8, + creation_number -> Int8, + #[max_length = 66] + account_address -> Varchar, + transaction_version -> Int8, + transaction_block_height -> Int8, + #[sql_name = "type"] + type_ -> Text, + data -> Jsonb, + inserted_at -> Timestamp, + event_index -> Int8, + #[max_length = 300] + indexed_type -> Varchar, + } +} + +diesel::table! { + fungible_asset_activities (transaction_version, event_index) { + transaction_version -> Int8, + event_index -> Int8, + #[max_length = 66] + owner_address -> Varchar, + #[max_length = 66] + storage_id -> Varchar, + #[max_length = 1000] + asset_type -> Varchar, + is_frozen -> Nullable, + amount -> Nullable, + #[sql_name = "type"] + type_ -> Varchar, + is_gas_fee -> Bool, + #[max_length = 66] + gas_fee_payer_address -> Nullable, + is_transaction_success -> Bool, + #[max_length = 1000] + entry_function_id_str -> Nullable, + block_height -> Int8, + #[max_length = 10] + token_standard -> Varchar, + transaction_timestamp -> Timestamp, + inserted_at -> Timestamp, + storage_refund_amount -> Numeric, + } +} + +diesel::table! { + fungible_asset_balances (transaction_version, write_set_change_index) { + transaction_version -> Int8, + write_set_change_index -> Int8, + #[max_length = 66] + storage_id -> Varchar, + #[max_length = 66] + owner_address -> Varchar, + #[max_length = 1000] + asset_type -> Varchar, + is_primary -> Bool, + is_frozen -> Bool, + amount -> Numeric, + transaction_timestamp -> Timestamp, + #[max_length = 10] + token_standard -> Varchar, + inserted_at -> Timestamp, + } +} + +diesel::table! { + fungible_asset_metadata (asset_type) { + #[max_length = 1000] + asset_type -> Varchar, + #[max_length = 66] + creator_address -> Varchar, + #[max_length = 32] + name -> Varchar, + #[max_length = 10] + symbol -> Varchar, + decimals -> Int4, + #[max_length = 512] + icon_uri -> Nullable, + #[max_length = 512] + project_uri -> Nullable, + last_transaction_version -> Int8, + last_transaction_timestamp -> Timestamp, + #[max_length = 66] + supply_aggregator_table_handle_v1 -> Nullable, + supply_aggregator_table_key_v1 -> Nullable, + #[max_length = 10] + token_standard -> Varchar, + inserted_at -> Timestamp, + is_token_v2 -> Nullable, + supply_v2 -> Nullable, + maximum_v2 -> Nullable, + } +} + +diesel::table! { + indexer_status (db) { + #[max_length = 50] + db -> Varchar, + is_indexer_up -> Bool, + inserted_at -> Timestamp, + } +} + +diesel::table! { + ledger_infos (chain_id) { + chain_id -> Int8, + } +} + +diesel::table! { + move_modules (transaction_version, write_set_change_index) { + transaction_version -> Int8, + write_set_change_index -> Int8, + transaction_block_height -> Int8, + name -> Text, + #[max_length = 66] + address -> Varchar, + bytecode -> Nullable, + friends -> Nullable, + exposed_functions -> Nullable, + structs -> Nullable, + is_deleted -> Bool, + inserted_at -> Timestamp, + } +} + +diesel::table! { + move_resources (transaction_version, write_set_change_index) { + transaction_version -> Int8, + write_set_change_index -> Int8, + transaction_block_height -> Int8, + name -> Text, + #[max_length = 66] + address -> Varchar, + #[sql_name = "type"] + type_ -> Text, + module -> Text, + generic_type_params -> Nullable, + data -> Nullable, + is_deleted -> Bool, + inserted_at -> Timestamp, + #[max_length = 66] + state_key_hash -> Varchar, + } +} + +diesel::table! { + nft_points (transaction_version) { + transaction_version -> Int8, + #[max_length = 66] + owner_address -> Varchar, + token_name -> Text, + point_type -> Text, + amount -> Numeric, + transaction_timestamp -> Timestamp, + inserted_at -> Timestamp, + } +} + +diesel::table! { + objects (transaction_version, write_set_change_index) { + transaction_version -> Int8, + write_set_change_index -> Int8, + #[max_length = 66] + object_address -> Varchar, + #[max_length = 66] + owner_address -> Varchar, + #[max_length = 66] + state_key_hash -> Varchar, + guid_creation_num -> Numeric, + allow_ungated_transfer -> Bool, + is_deleted -> Bool, + inserted_at -> Timestamp, + } +} + +diesel::table! { + processor_status (processor) { + #[max_length = 50] + processor -> Varchar, + last_success_version -> Int8, + last_updated -> Timestamp, + last_transaction_timestamp -> Nullable, + } +} + +diesel::table! { + proposal_votes (transaction_version, proposal_id, voter_address) { + transaction_version -> Int8, + proposal_id -> Int8, + #[max_length = 66] + voter_address -> Varchar, + #[max_length = 66] + staking_pool_address -> Varchar, + num_votes -> Numeric, + should_pass -> Bool, + transaction_timestamp -> Timestamp, + inserted_at -> Timestamp, + } +} + +diesel::table! { + signatures (transaction_version, multi_agent_index, multi_sig_index, is_sender_primary) { + transaction_version -> Int8, + multi_agent_index -> Int8, + multi_sig_index -> Int8, + transaction_block_height -> Int8, + #[max_length = 66] + signer -> Varchar, + is_sender_primary -> Bool, + #[sql_name = "type"] + type_ -> Varchar, + #[max_length = 136] + public_key -> Varchar, + signature -> Text, + threshold -> Int8, + public_key_indices -> Jsonb, + inserted_at -> Timestamp, + } +} + +diesel::table! { + spam_assets (asset) { + #[max_length = 1100] + asset -> Varchar, + is_spam -> Bool, + last_updated -> Timestamp, + } +} + +diesel::table! { + table_items (transaction_version, write_set_change_index) { + key -> Text, + transaction_version -> Int8, + write_set_change_index -> Int8, + transaction_block_height -> Int8, + #[max_length = 66] + table_handle -> Varchar, + decoded_key -> Jsonb, + decoded_value -> Nullable, + is_deleted -> Bool, + inserted_at -> Timestamp, + } +} + +diesel::table! { + table_metadatas (handle) { + #[max_length = 66] + handle -> Varchar, + key_type -> Text, + value_type -> Text, + inserted_at -> Timestamp, + } +} + +diesel::table! { + token_activities (transaction_version, event_account_address, event_creation_number, event_sequence_number) { + transaction_version -> Int8, + #[max_length = 66] + event_account_address -> Varchar, + event_creation_number -> Int8, + event_sequence_number -> Int8, + #[max_length = 64] + collection_data_id_hash -> Varchar, + #[max_length = 64] + token_data_id_hash -> Varchar, + property_version -> Numeric, + #[max_length = 66] + creator_address -> Varchar, + #[max_length = 128] + collection_name -> Varchar, + #[max_length = 128] + name -> Varchar, + #[max_length = 50] + transfer_type -> Varchar, + #[max_length = 66] + from_address -> Nullable, + #[max_length = 66] + to_address -> Nullable, + token_amount -> Numeric, + coin_type -> Nullable, + coin_amount -> Nullable, + inserted_at -> Timestamp, + transaction_timestamp -> Timestamp, + event_index -> Nullable, + } +} + +diesel::table! { + token_activities_v2 (transaction_version, event_index) { + transaction_version -> Int8, + event_index -> Int8, + #[max_length = 66] + event_account_address -> Varchar, + #[max_length = 66] + token_data_id -> Varchar, + property_version_v1 -> Numeric, + #[sql_name = "type"] + type_ -> Varchar, + #[max_length = 66] + from_address -> Nullable, + #[max_length = 66] + to_address -> Nullable, + token_amount -> Numeric, + before_value -> Nullable, + after_value -> Nullable, + #[max_length = 1000] + entry_function_id_str -> Nullable, + #[max_length = 10] + token_standard -> Varchar, + is_fungible_v2 -> Nullable, + transaction_timestamp -> Timestamp, + inserted_at -> Timestamp, + } +} + +diesel::table! { + token_datas (token_data_id_hash, transaction_version) { + #[max_length = 64] + token_data_id_hash -> Varchar, + transaction_version -> Int8, + #[max_length = 66] + creator_address -> Varchar, + #[max_length = 128] + collection_name -> Varchar, + #[max_length = 128] + name -> Varchar, + maximum -> Numeric, + supply -> Numeric, + largest_property_version -> Numeric, + #[max_length = 512] + metadata_uri -> Varchar, + #[max_length = 66] + payee_address -> Varchar, + royalty_points_numerator -> Numeric, + royalty_points_denominator -> Numeric, + maximum_mutable -> Bool, + uri_mutable -> Bool, + description_mutable -> Bool, + properties_mutable -> Bool, + royalty_mutable -> Bool, + default_properties -> Jsonb, + inserted_at -> Timestamp, + #[max_length = 64] + collection_data_id_hash -> Varchar, + transaction_timestamp -> Timestamp, + description -> Text, + } +} + +diesel::table! { + token_datas_v2 (transaction_version, write_set_change_index) { + transaction_version -> Int8, + write_set_change_index -> Int8, + #[max_length = 66] + token_data_id -> Varchar, + #[max_length = 66] + collection_id -> Varchar, + #[max_length = 128] + token_name -> Varchar, + maximum -> Nullable, + supply -> Nullable, + largest_property_version_v1 -> Nullable, + #[max_length = 512] + token_uri -> Varchar, + token_properties -> Jsonb, + description -> Text, + #[max_length = 10] + token_standard -> Varchar, + is_fungible_v2 -> Nullable, + transaction_timestamp -> Timestamp, + inserted_at -> Timestamp, + decimals -> Nullable, + is_deleted_v2 -> Nullable, + } +} + +diesel::table! { + token_ownerships (token_data_id_hash, property_version, transaction_version, table_handle) { + #[max_length = 64] + token_data_id_hash -> Varchar, + property_version -> Numeric, + transaction_version -> Int8, + #[max_length = 66] + table_handle -> Varchar, + #[max_length = 66] + creator_address -> Varchar, + #[max_length = 128] + collection_name -> Varchar, + #[max_length = 128] + name -> Varchar, + #[max_length = 66] + owner_address -> Nullable, + amount -> Numeric, + table_type -> Nullable, + inserted_at -> Timestamp, + #[max_length = 64] + collection_data_id_hash -> Varchar, + transaction_timestamp -> Timestamp, + } +} + +diesel::table! { + token_ownerships_v2 (transaction_version, write_set_change_index) { + transaction_version -> Int8, + write_set_change_index -> Int8, + #[max_length = 66] + token_data_id -> Varchar, + property_version_v1 -> Numeric, + #[max_length = 66] + owner_address -> Nullable, + #[max_length = 66] + storage_id -> Varchar, + amount -> Numeric, + #[max_length = 66] + table_type_v1 -> Nullable, + token_properties_mutated_v1 -> Nullable, + is_soulbound_v2 -> Nullable, + #[max_length = 10] + token_standard -> Varchar, + is_fungible_v2 -> Nullable, + transaction_timestamp -> Timestamp, + inserted_at -> Timestamp, + non_transferrable_by_owner -> Nullable, + } +} + +diesel::table! { + tokens (token_data_id_hash, property_version, transaction_version) { + #[max_length = 64] + token_data_id_hash -> Varchar, + property_version -> Numeric, + transaction_version -> Int8, + #[max_length = 66] + creator_address -> Varchar, + #[max_length = 128] + collection_name -> Varchar, + #[max_length = 128] + name -> Varchar, + token_properties -> Jsonb, + inserted_at -> Timestamp, + #[max_length = 64] + collection_data_id_hash -> Varchar, + transaction_timestamp -> Timestamp, + } +} + +diesel::table! { + transaction_size_info (transaction_version) { + transaction_version -> Int8, + size_bytes -> Int8, + inserted_at -> Timestamp, + } +} + +diesel::table! { + transactions (version) { + version -> Int8, + block_height -> Int8, + #[max_length = 66] + hash -> Varchar, + #[sql_name = "type"] + type_ -> Varchar, + payload -> Nullable, + #[max_length = 66] + state_change_hash -> Varchar, + #[max_length = 66] + event_root_hash -> Varchar, + #[max_length = 66] + state_checkpoint_hash -> Nullable, + gas_used -> Numeric, + success -> Bool, + vm_status -> Text, + #[max_length = 66] + accumulator_root_hash -> Varchar, + num_events -> Int8, + num_write_set_changes -> Int8, + inserted_at -> Timestamp, + epoch -> Int8, + #[max_length = 50] + payload_type -> Nullable, + } +} + +diesel::table! { + user_transactions (version) { + version -> Int8, + block_height -> Int8, + #[max_length = 50] + parent_signature_type -> Varchar, + #[max_length = 66] + sender -> Varchar, + sequence_number -> Int8, + max_gas_amount -> Numeric, + expiration_timestamp_secs -> Timestamp, + gas_unit_price -> Numeric, + timestamp -> Timestamp, + #[max_length = 1000] + entry_function_id_str -> Varchar, + inserted_at -> Timestamp, + epoch -> Int8, + } +} + +diesel::table! { + write_set_changes (transaction_version, index) { + transaction_version -> Int8, + index -> Int8, + #[max_length = 66] + hash -> Varchar, + transaction_block_height -> Int8, + #[sql_name = "type"] + type_ -> Text, + #[max_length = 66] + address -> Varchar, + inserted_at -> Timestamp, + } +} + +diesel::table! { + write_set_size_info (transaction_version, index) { + transaction_version -> Int8, + index -> Int8, + key_bytes -> Int8, + value_bytes -> Int8, + inserted_at -> Timestamp, + } +} + +diesel::allow_tables_to_appear_in_same_query!( + account_transactions, + ans_lookup, + ans_lookup_v2, + ans_primary_name, + ans_primary_name_v2, + block_metadata_transactions, + coin_activities, + coin_balances, + coin_infos, + coin_supply, + collection_datas, + collections_v2, + current_ans_lookup, + current_ans_lookup_v2, + current_ans_primary_name, + current_ans_primary_name_v2, + current_coin_balances, + current_collection_datas, + current_collections_v2, + current_delegated_staking_pool_balances, + current_delegated_voter, + current_delegator_balances, + current_fungible_asset_balances, + current_objects, + current_staking_pool_voter, + current_table_items, + current_token_datas, + current_token_datas_v2, + current_token_ownerships, + current_token_ownerships_v2, + current_token_pending_claims, + current_token_v2_metadata, + current_unified_fungible_asset_balances, + delegated_staking_activities, + delegated_staking_pool_balances, + delegated_staking_pools, + delegator_balances, + event_size_info, + events, + fungible_asset_activities, + fungible_asset_balances, + fungible_asset_metadata, + indexer_status, + ledger_infos, + move_modules, + move_resources, + nft_points, + objects, + processor_status, + proposal_votes, + signatures, + spam_assets, + table_items, + table_metadatas, + token_activities, + token_activities_v2, + token_datas, + token_datas_v2, + token_ownerships, + token_ownerships_v2, + tokens, + transaction_size_info, + transactions, + user_transactions, + write_set_changes, + write_set_size_info, +); diff --git a/rust/sdk-processor/src/steps/common/latest_processed_version_tracker.rs b/rust/sdk-processor/src/steps/common/latest_processed_version_tracker.rs new file mode 100644 index 000000000..7e8b09546 --- /dev/null +++ b/rust/sdk-processor/src/steps/common/latest_processed_version_tracker.rs @@ -0,0 +1,186 @@ +use crate::utils::database::{execute_with_better_error, ArcDbPool}; +use ahash::AHashMap; +use anyhow::Result; +use aptos_indexer_processor_sdk::{ + traits::{ + pollable_async_step::PollableAsyncRunType, NamedStep, PollableAsyncStep, Processable, + }, + types::transaction_context::TransactionContext, + utils::{errors::ProcessorError, time::parse_timestamp}, +}; +use async_trait::async_trait; +use diesel::{upsert::excluded, ExpressionMethods}; +use processor::{db::common::models::processor_status::ProcessorStatus, schema::processor_status}; +use tracing::info; + +const UPDATE_PROCESSOR_STATUS_SECS: u64 = 1; + +pub struct LatestVersionProcessedTracker +where + Self: Sized + Send + 'static, + T: Send + 'static, +{ + conn_pool: ArcDbPool, + tracker_name: String, + // Next version to process that we expect. + next_version: u64, + // Last successful batch of sequentially processed transactions. Includes metadata to write to storage. + last_success_batch: Option>, + // Tracks all the versions that have been processed out of order. + seen_versions: AHashMap>, +} + +impl LatestVersionProcessedTracker +where + Self: Sized + Send + 'static, + T: Send + 'static, +{ + pub fn new(conn_pool: ArcDbPool, starting_version: u64, tracker_name: String) -> Self { + Self { + conn_pool, + tracker_name, + next_version: starting_version, + last_success_batch: None, + seen_versions: AHashMap::new(), + } + } + + fn update_last_success_batch(&mut self, current_batch: TransactionContext) { + let mut new_prev_batch = current_batch; + // While there are batches in seen_versions that are in order, update the new_prev_batch to the next batch. + while let Some(next_version) = self.seen_versions.remove(&(new_prev_batch.end_version + 1)) + { + new_prev_batch = next_version; + } + self.next_version = new_prev_batch.end_version + 1; + self.last_success_batch = Some(new_prev_batch); + } + + async fn save_processor_status(&mut self) -> Result<(), ProcessorError> { + // Update the processor status + if let Some(last_success_batch) = self.last_success_batch.as_ref() { + let end_timestamp = last_success_batch + .end_transaction_timestamp + .as_ref() + .map(|t| parse_timestamp(t, last_success_batch.end_version as i64)) + .map(|t| t.naive_utc()); + let status = ProcessorStatus { + processor: self.tracker_name.clone(), + last_success_version: last_success_batch.end_version as i64, + last_transaction_timestamp: end_timestamp, + }; + execute_with_better_error( + self.conn_pool.clone(), + diesel::insert_into(processor_status::table) + .values(&status) + .on_conflict(processor_status::processor) + .do_update() + .set(( + processor_status::last_success_version + .eq(excluded(processor_status::last_success_version)), + processor_status::last_updated.eq(excluded(processor_status::last_updated)), + processor_status::last_transaction_timestamp + .eq(excluded(processor_status::last_transaction_timestamp)), + )), + Some(" WHERE processor_status.last_success_version <= EXCLUDED.last_success_version "), + ).await.map_err(|e| ProcessorError::DBStoreError { + message: format!("Failed to update processor status: {}", e), + })?; + } + Ok(()) + } +} + +#[async_trait] +impl Processable for LatestVersionProcessedTracker +where + Self: Sized + Send + 'static, + T: Send + 'static, +{ + type Input = T; + type Output = T; + type RunType = PollableAsyncRunType; + + async fn process( + &mut self, + current_batch: TransactionContext, + ) -> Result>, ProcessorError> { + // info!( + // start_version = current_batch.start_version, + // end_version = current_batch.end_version, + // step_name = self.name(), + // "Processing versions" + // ); + // If there's a gap in the next_version and current_version, save the current_version to seen_versions for + // later processing. + if self.next_version != current_batch.start_version { + info!( + expected_next_version = self.next_version, + step = self.name(), + batch_version = current_batch.start_version, + "Gap detected", + ); + self.seen_versions + .insert(current_batch.start_version, TransactionContext { + data: vec![], // No data is needed for tracking. This is to avoid clone. + start_version: current_batch.start_version, + end_version: current_batch.end_version, + start_transaction_timestamp: current_batch.start_transaction_timestamp.clone(), + end_transaction_timestamp: current_batch.end_transaction_timestamp.clone(), + total_size_in_bytes: current_batch.total_size_in_bytes, + }); + } else { + // info!("No gap detected"); + // If the current_batch is the next expected version, update the last success batch + self.update_last_success_batch(TransactionContext { + data: vec![], // No data is needed for tracking. This is to avoid clone. + start_version: current_batch.start_version, + end_version: current_batch.end_version, + start_transaction_timestamp: current_batch.start_transaction_timestamp.clone(), + end_transaction_timestamp: current_batch.end_transaction_timestamp.clone(), + total_size_in_bytes: current_batch.total_size_in_bytes, + }); + } + // Pass through + Ok(Some(current_batch)) + } + + async fn cleanup( + &mut self, + ) -> Result>>, ProcessorError> { + // If processing or polling ends, save the last successful batch to the database. + self.save_processor_status().await?; + Ok(None) + } +} + +#[async_trait] +impl PollableAsyncStep for LatestVersionProcessedTracker +where + Self: Sized + Send + Sync + 'static, + T: Send + 'static, +{ + fn poll_interval(&self) -> std::time::Duration { + std::time::Duration::from_secs(UPDATE_PROCESSOR_STATUS_SECS) + } + + async fn poll(&mut self) -> Result>>, ProcessorError> { + // TODO: Add metrics for gap count + self.save_processor_status().await?; + // Nothing should be returned + Ok(None) + } +} + +impl NamedStep for LatestVersionProcessedTracker +where + Self: Sized + Send + 'static, + T: Send + 'static, +{ + fn name(&self) -> String { + format!( + "LatestVersionProcessedTracker: {}", + std::any::type_name::() + ) + } +} diff --git a/rust/sdk-processor/src/steps/common/mod.rs b/rust/sdk-processor/src/steps/common/mod.rs new file mode 100644 index 000000000..b0479bbe3 --- /dev/null +++ b/rust/sdk-processor/src/steps/common/mod.rs @@ -0,0 +1 @@ +pub mod latest_processed_version_tracker; diff --git a/rust/sdk-processor/src/steps/events_processor/events_extractor.rs b/rust/sdk-processor/src/steps/events_processor/events_extractor.rs new file mode 100644 index 000000000..7aec68b29 --- /dev/null +++ b/rust/sdk-processor/src/steps/events_processor/events_extractor.rs @@ -0,0 +1,86 @@ +use crate::db::common::models::events_models::events::EventModel; +use aptos_indexer_processor_sdk::{ + aptos_protos::transaction::v1::{transaction::TxnData, Transaction}, + traits::{async_step::AsyncRunType, AsyncStep, NamedStep, Processable}, + types::transaction_context::TransactionContext, + utils::errors::ProcessorError, +}; +use async_trait::async_trait; +use rayon::prelude::*; +use tracing::warn; + +pub const MIN_TRANSACTIONS_PER_RAYON_JOB: usize = 64; + +pub struct EventsExtractor +where + Self: Sized + Send + 'static, {} + +#[async_trait] +impl Processable for EventsExtractor { + type Input = Transaction; + type Output = EventModel; + type RunType = AsyncRunType; + + async fn process( + &mut self, + item: TransactionContext, + ) -> Result>, ProcessorError> { + // info!( + // start_version = item.start_version, + // end_version = item.end_version, + // step_name = self.name(), + // "Processing versions", + // ); + let events = item + .data + .par_iter() + .with_min_len(MIN_TRANSACTIONS_PER_RAYON_JOB) + .map(|txn| { + let mut events = vec![]; + let txn_version = txn.version as i64; + let block_height = txn.block_height as i64; + let txn_data = match txn.txn_data.as_ref() { + Some(data) => data, + None => { + warn!( + transaction_version = txn_version, + "Transaction data doesn't exist" + ); + // PROCESSOR_UNKNOWN_TYPE_COUNT + // .with_label_values(&["EventsProcessor"]) + // .inc(); + return vec![]; + }, + }; + let default = vec![]; + let raw_events = match txn_data { + TxnData::BlockMetadata(tx_inner) => &tx_inner.events, + TxnData::Genesis(tx_inner) => &tx_inner.events, + TxnData::User(tx_inner) => &tx_inner.events, + _ => &default, + }; + + let txn_events = EventModel::from_events(raw_events, txn_version, block_height); + events.extend(txn_events); + events + }) + .flatten() + .collect::>(); + Ok(Some(TransactionContext { + data: events, + start_version: item.start_version, + end_version: item.end_version, + start_transaction_timestamp: item.start_transaction_timestamp, + end_transaction_timestamp: item.end_transaction_timestamp, + total_size_in_bytes: item.total_size_in_bytes, + })) + } +} + +impl AsyncStep for EventsExtractor {} + +impl NamedStep for EventsExtractor { + fn name(&self) -> String { + "EventsExtractor".to_string() + } +} diff --git a/rust/sdk-processor/src/steps/events_processor/events_storer.rs b/rust/sdk-processor/src/steps/events_processor/events_storer.rs new file mode 100644 index 000000000..cda8ccf76 --- /dev/null +++ b/rust/sdk-processor/src/steps/events_processor/events_storer.rs @@ -0,0 +1,108 @@ +use crate::{ + db::common::models::events_models::events::EventModel, + processors::events_processor::EventsProcessorConfig, + utils::database::{execute_in_chunks, get_config_table_chunk_size, ArcDbPool}, +}; +use ahash::AHashMap; +use anyhow::Result; +use aptos_indexer_processor_sdk::{ + traits::{async_step::AsyncRunType, AsyncStep, NamedStep, Processable}, + types::transaction_context::TransactionContext, + utils::errors::ProcessorError, +}; +use async_trait::async_trait; +use diesel::{ + pg::{upsert::excluded, Pg}, + query_builder::QueryFragment, + ExpressionMethods, +}; +use processor::schema; +use tracing::debug; + +pub struct EventsStorer +where + Self: Sized + Send + 'static, +{ + conn_pool: ArcDbPool, + processor_config: EventsProcessorConfig, +} + +impl EventsStorer { + pub fn new(conn_pool: ArcDbPool, processor_config: EventsProcessorConfig) -> Self { + Self { + conn_pool, + processor_config, + } + } +} + +fn insert_events_query( + items_to_insert: Vec, +) -> ( + impl QueryFragment + diesel::query_builder::QueryId + Send, + Option<&'static str>, +) { + use schema::events::dsl::*; + ( + diesel::insert_into(schema::events::table) + .values(items_to_insert) + .on_conflict((transaction_version, event_index)) + .do_update() + .set(( + inserted_at.eq(excluded(inserted_at)), + indexed_type.eq(excluded(indexed_type)), + )), + None, + ) +} + +#[async_trait] +impl Processable for EventsStorer { + type Input = EventModel; + type Output = EventModel; + type RunType = AsyncRunType; + + async fn process( + &mut self, + events: TransactionContext, + ) -> Result>, ProcessorError> { + // tracing::info!( + // start_version = events.start_version, + // end_version = events.end_version, + // step_name = self.name(), + // "Processing versions", + // ); + let per_table_chunk_sizes: AHashMap = + self.processor_config.per_table_chunk_sizes.clone(); + let execute_res = execute_in_chunks( + self.conn_pool.clone(), + insert_events_query, + &events.data, + get_config_table_chunk_size::("events", &per_table_chunk_sizes), + ) + .await; + match execute_res { + Ok(_) => { + debug!( + "Events version [{}, {}] stored successfully", + events.start_version, events.end_version + ); + Ok(Some(events)) + }, + Err(e) => Err(ProcessorError::DBStoreError { + message: format!( + "Failed to store events versions {} to {}: {:?}", + events.start_version, events.end_version, e, + ), + }), + } + } +} + +impl AsyncStep for EventsStorer {} + +impl NamedStep for EventsStorer { + fn name(&self) -> String { + "EventsStorer".to_string() + } +} diff --git a/rust/sdk-processor/src/steps/events_processor/mod.rs b/rust/sdk-processor/src/steps/events_processor/mod.rs new file mode 100644 index 000000000..0c9727f85 --- /dev/null +++ b/rust/sdk-processor/src/steps/events_processor/mod.rs @@ -0,0 +1,5 @@ +pub mod events_extractor; +pub mod events_storer; + +pub use events_extractor::EventsExtractor; +pub use events_storer::EventsStorer; diff --git a/rust/sdk-processor/src/steps/mod.rs b/rust/sdk-processor/src/steps/mod.rs new file mode 100644 index 000000000..0d9f7cf7a --- /dev/null +++ b/rust/sdk-processor/src/steps/mod.rs @@ -0,0 +1,2 @@ +pub mod common; +pub mod events_processor; diff --git a/rust/sdk-processor/src/utils/chain_id.rs b/rust/sdk-processor/src/utils/chain_id.rs new file mode 100644 index 000000000..db19f691e --- /dev/null +++ b/rust/sdk-processor/src/utils/chain_id.rs @@ -0,0 +1,42 @@ +use super::database::ArcDbPool; +use crate::utils::database::execute_with_better_error_conn; +use anyhow::{Context, Result}; +use processor::{db::common::models::ledger_info::LedgerInfo, schema::ledger_infos}; +use tracing::info; + +/// Verify the chain id from GRPC against the database. +pub async fn check_or_update_chain_id(grpc_chain_id: i64, db_pool: ArcDbPool) -> Result { + info!("Checking if chain id is correct"); + let mut conn = db_pool.get().await?; + + let maybe_existing_chain_id = LedgerInfo::get(&mut conn).await?.map(|li| li.chain_id); + + match maybe_existing_chain_id { + Some(chain_id) => { + anyhow::ensure!(chain_id == grpc_chain_id, "Wrong chain detected! Trying to index chain {} now but existing data is for chain {}", grpc_chain_id, chain_id); + info!( + chain_id = chain_id, + "Chain id matches! Continue to index...", + ); + Ok(chain_id as u64) + }, + None => { + info!( + chain_id = grpc_chain_id, + "Adding chain id to db, continue to index..." + ); + execute_with_better_error_conn( + &mut conn, + diesel::insert_into(ledger_infos::table) + .values(LedgerInfo { + chain_id: grpc_chain_id, + }) + .on_conflict_do_nothing(), + None, + ) + .await + .context("Error updating chain_id!") + .map(|_| grpc_chain_id as u64) + }, + } +} diff --git a/rust/sdk-processor/src/utils/database.rs b/rust/sdk-processor/src/utils/database.rs new file mode 100644 index 000000000..dd9de0c54 --- /dev/null +++ b/rust/sdk-processor/src/utils/database.rs @@ -0,0 +1,329 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +//! Database-related functions +#![allow(clippy::extra_unused_lifetimes)] + +use ahash::AHashMap; +use aptos_indexer_processor_sdk::utils::convert::remove_null_bytes; +use diesel::{ + query_builder::{AstPass, Query, QueryFragment, QueryId}, + ConnectionResult, QueryResult, +}; +use diesel_async::{ + pooled_connection::{ + bb8::{Pool, PooledConnection}, + AsyncDieselConnectionManager, ManagerConfig, PoolError, + }, + AsyncPgConnection, RunQueryDsl, +}; +use diesel_migrations::{embed_migrations, EmbeddedMigrations, MigrationHarness}; +use futures_util::{future::BoxFuture, FutureExt}; +use std::sync::Arc; + +pub type Backend = diesel::pg::Pg; + +pub type MyDbConnection = AsyncPgConnection; +pub type DbPool = Pool; +pub type ArcDbPool = Arc; +pub type DbPoolConnection<'a> = PooledConnection<'a, MyDbConnection>; + +pub const MIGRATIONS: EmbeddedMigrations = + embed_migrations!("../processor/src/db/postgres/migrations"); + +pub const DEFAULT_MAX_POOL_SIZE: u32 = 150; + +#[derive(QueryId)] +/// Using this will append a where clause at the end of the string upsert function, e.g. +/// INSERT INTO ... ON CONFLICT DO UPDATE SET ... WHERE "transaction_version" = excluded."transaction_version" +/// This is needed when we want to maintain a table with only the latest state +pub struct UpsertFilterLatestTransactionQuery { + query: T, + where_clause: Option<&'static str>, +} + +// the max is actually u16::MAX but we see that when the size is too big we get an overflow error so reducing it a bit +pub const MAX_DIESEL_PARAM_SIZE: usize = (u16::MAX / 2) as usize; + +/// This function will clean the data for postgres. Currently it has support for removing +/// null bytes from strings but in the future we will add more functionality. +pub fn clean_data_for_db serde::Deserialize<'de>>( + items: Vec, + should_remove_null_bytes: bool, +) -> Vec { + if should_remove_null_bytes { + items.iter().map(remove_null_bytes).collect() + } else { + items + } +} + +fn establish_connection(database_url: &str) -> BoxFuture> { + use native_tls::{Certificate, TlsConnector}; + use postgres_native_tls::MakeTlsConnector; + + (async move { + let (url, cert_path) = parse_and_clean_db_url(database_url); + let cert = std::fs::read(cert_path.unwrap()).expect("Could not read certificate"); + + let cert = Certificate::from_pem(&cert).expect("Could not parse certificate"); + let connector = TlsConnector::builder() + .danger_accept_invalid_certs(true) + .add_root_certificate(cert) + .build() + .expect("Could not build TLS connector"); + let connector = MakeTlsConnector::new(connector); + + let (client, connection) = tokio_postgres::connect(&url, connector) + .await + .expect("Could not connect to database"); + tokio::spawn(async move { + if let Err(e) = connection.await { + eprintln!("connection error: {}", e); + } + }); + AsyncPgConnection::try_from(client).await + }) + .boxed() +} + +fn parse_and_clean_db_url(url: &str) -> (String, Option) { + let mut db_url = url::Url::parse(url).expect("Could not parse database url"); + let mut cert_path = None; + + let mut query = "".to_string(); + db_url.query_pairs().for_each(|(k, v)| { + if k == "sslrootcert" { + cert_path = Some(v.parse().unwrap()); + } else { + query.push_str(&format!("{}={}&", k, v)); + } + }); + db_url.set_query(Some(&query)); + + (db_url.to_string(), cert_path) +} + +pub async fn new_db_pool( + database_url: &str, + max_pool_size: Option, +) -> Result { + let (_url, cert_path) = parse_and_clean_db_url(database_url); + + let config = if cert_path.is_some() { + let mut config = ManagerConfig::::default(); + config.custom_setup = Box::new(|conn| Box::pin(establish_connection(conn))); + AsyncDieselConnectionManager::::new_with_config(database_url, config) + } else { + AsyncDieselConnectionManager::::new(database_url) + }; + let pool = Pool::builder() + .max_size(max_pool_size.unwrap_or(DEFAULT_MAX_POOL_SIZE)) + .build(config) + .await?; + Ok(Arc::new(pool)) +} + +pub async fn execute_in_chunks( + conn: ArcDbPool, + build_query: fn(Vec) -> (U, Option<&'static str>), + items_to_insert: &[T], + chunk_size: usize, +) -> Result<(), diesel::result::Error> +where + U: QueryFragment + diesel::query_builder::QueryId + Send + 'static, + T: serde::Serialize + for<'de> serde::Deserialize<'de> + Clone + Send + 'static, +{ + let tasks = items_to_insert + .chunks(chunk_size) + .map(|chunk| { + let conn = conn.clone(); + let items = chunk.to_vec(); + tokio::spawn(async move { + let (query, additional_where_clause) = build_query(items.clone()); + execute_or_retry_cleaned(conn, build_query, items, query, additional_where_clause) + .await + }) + }) + .collect::>(); + + let results = futures_util::future::try_join_all(tasks) + .await + .expect("Task panicked executing in chunks"); + for res in results { + res? + } + + Ok(()) +} + +pub async fn execute_with_better_error( + pool: ArcDbPool, + query: U, + mut additional_where_clause: Option<&'static str>, +) -> QueryResult +where + U: QueryFragment + diesel::query_builder::QueryId + Send, +{ + let original_query = diesel::debug_query::(&query).to_string(); + // This is needed because if we don't insert any row, then diesel makes a call like this + // SELECT 1 FROM TABLE WHERE 1=0 + if original_query.to_lowercase().contains("where") { + additional_where_clause = None; + } + let final_query = UpsertFilterLatestTransactionQuery { + query, + where_clause: additional_where_clause, + }; + let debug_string = diesel::debug_query::(&final_query).to_string(); + tracing::debug!("Executing query: {:?}", debug_string); + let conn = &mut pool.get().await.map_err(|e| { + tracing::warn!("Error getting connection from pool: {:?}", e); + diesel::result::Error::DatabaseError( + diesel::result::DatabaseErrorKind::UnableToSendCommand, + Box::new(e.to_string()), + ) + })?; + let res = final_query.execute(conn).await; + if let Err(ref e) = res { + tracing::warn!("Error running query: {:?}\n{:?}", e, debug_string); + } + res +} + +/// Returns the entry for the config hashmap, or the default field count for the insert +/// Given diesel has a limit of how many parameters can be inserted in a single operation (u16::MAX), +/// we default to chunk an array of items based on how many columns are in the table. +pub fn get_config_table_chunk_size( + table_name: &str, + per_table_chunk_sizes: &AHashMap, +) -> usize { + per_table_chunk_sizes + .get(table_name) + .copied() + .unwrap_or_else(|| MAX_DIESEL_PARAM_SIZE / T::field_count()) +} + +pub async fn execute_with_better_error_conn( + conn: &mut MyDbConnection, + query: U, + mut additional_where_clause: Option<&'static str>, +) -> QueryResult +where + U: QueryFragment + diesel::query_builder::QueryId + Send, +{ + let original_query = diesel::debug_query::(&query).to_string(); + // This is needed because if we don't insert any row, then diesel makes a call like this + // SELECT 1 FROM TABLE WHERE 1=0 + if original_query.to_lowercase().contains("where") { + additional_where_clause = None; + } + let final_query = UpsertFilterLatestTransactionQuery { + query, + where_clause: additional_where_clause, + }; + let debug_string = diesel::debug_query::(&final_query).to_string(); + tracing::debug!("Executing query: {:?}", debug_string); + let res = final_query.execute(conn).await; + if let Err(ref e) = res { + tracing::warn!("Error running query: {:?}\n{:?}", e, debug_string); + } + res +} + +async fn execute_or_retry_cleaned( + conn: ArcDbPool, + build_query: fn(Vec) -> (U, Option<&'static str>), + items: Vec, + query: U, + additional_where_clause: Option<&'static str>, +) -> Result<(), diesel::result::Error> +where + U: QueryFragment + diesel::query_builder::QueryId + Send, + T: serde::Serialize + for<'de> serde::Deserialize<'de> + Clone, +{ + match execute_with_better_error(conn.clone(), query, additional_where_clause).await { + Ok(_) => {}, + Err(_) => { + let cleaned_items = clean_data_for_db(items, true); + let (cleaned_query, additional_where_clause) = build_query(cleaned_items); + match execute_with_better_error(conn.clone(), cleaned_query, additional_where_clause) + .await + { + Ok(_) => {}, + Err(e) => { + return Err(e); + }, + } + }, + } + Ok(()) +} + +pub fn run_pending_migrations(conn: &mut impl MigrationHarness) { + conn.run_pending_migrations(MIGRATIONS) + .expect("[Parser] Migrations failed!"); +} + +// For the normal processor build we just use standard Diesel with the postgres +// feature enabled (which uses libpq under the hood, hence why we named the feature +// this way). +#[cfg(feature = "libpq")] +pub async fn run_migrations(postgres_connection_string: String, _conn_pool: ArcDbPool) { + use diesel::{Connection, PgConnection}; + + tracing::info!("Running migrations: {:?}", postgres_connection_string); + let migration_time = std::time::Instant::now(); + let mut conn = + PgConnection::establish(&postgres_connection_string).expect("migrations failed!"); + run_pending_migrations(&mut conn); + tracing::info!( + duration_in_secs = migration_time.elapsed().as_secs_f64(), + "[Parser] Finished migrations" + ); +} + +// If the libpq feature isn't enabled, we use diesel async instead. This is used by +// the CLI for the local testnet, where we cannot tolerate the libpq dependency. +#[cfg(not(feature = "libpq"))] +pub async fn run_migrations(postgres_connection_string: String, conn_pool: ArcDbPool) { + use diesel_async::async_connection_wrapper::AsyncConnectionWrapper; + + tracing::info!("Running migrations: {:?}", postgres_connection_string); + let conn = conn_pool + // We need to use this since AsyncConnectionWrapper doesn't know how to + // work with a pooled connection. + .dedicated_connection() + .await + .expect("[Parser] Failed to get connection"); + // We use spawn_blocking since run_pending_migrations is a blocking function. + tokio::task::spawn_blocking(move || { + // This lets us use the connection like a normal diesel connection. See more: + // https://docs.rs/diesel-async/latest/diesel_async/async_connection_wrapper/type.AsyncConnectionWrapper.html + let mut conn: AsyncConnectionWrapper = + AsyncConnectionWrapper::from(conn); + run_pending_migrations(&mut conn); + }) + .await + .expect("[Parser] Failed to run migrations"); +} + +/// Section below is required to modify the query. +impl Query for UpsertFilterLatestTransactionQuery { + type SqlType = T::SqlType; +} + +//impl RunQueryDsl for UpsertFilterLatestTransactionQuery {} + +impl QueryFragment for UpsertFilterLatestTransactionQuery +where + T: QueryFragment, +{ + fn walk_ast<'b>(&'b self, mut out: AstPass<'_, 'b, Backend>) -> QueryResult<()> { + self.query.walk_ast(out.reborrow())?; + if let Some(w) = self.where_clause { + out.push_sql(w); + } + Ok(()) + } +} diff --git a/rust/sdk-processor/src/utils/mod.rs b/rust/sdk-processor/src/utils/mod.rs new file mode 100644 index 000000000..b829042b8 --- /dev/null +++ b/rust/sdk-processor/src/utils/mod.rs @@ -0,0 +1,3 @@ +pub mod chain_id; +pub mod database; +pub mod starting_version; diff --git a/rust/sdk-processor/src/utils/starting_version.rs b/rust/sdk-processor/src/utils/starting_version.rs new file mode 100644 index 000000000..9b87fd9d5 --- /dev/null +++ b/rust/sdk-processor/src/utils/starting_version.rs @@ -0,0 +1,52 @@ +use super::database::ArcDbPool; +use crate::config::indexer_processor_config::IndexerProcessorConfig; +use anyhow::{Context, Result}; +use processor::db::common::models::processor_status::ProcessorStatusQuery; + +pub async fn get_starting_version( + indexer_processor_config: &IndexerProcessorConfig, + conn_pool: ArcDbPool, +) -> Result { + // If starting_version is set in TransactionStreamConfig, use that + if indexer_processor_config + .transaction_stream_config + .starting_version + .is_some() + { + return Ok(indexer_processor_config + .transaction_stream_config + .starting_version + .unwrap()); + } + + // If it's not set, check if the DB has latest_processed_version set and use that + let latest_processed_version_from_db = + get_latest_processed_version_from_db(indexer_processor_config, conn_pool) + .await + .context("Failed to get latest processed version from DB")?; + if let Some(latest_processed_version_tracker) = latest_processed_version_from_db { + // Add +1 to start from the version after the last successful version + return Ok(latest_processed_version_tracker + 1); + } + + // If latest_processed_version is not stored in DB, return the default 0 + Ok(0) +} + +/// Gets the start version for the processor. If not found, start from 0. +pub async fn get_latest_processed_version_from_db( + indexer_processor_config: &IndexerProcessorConfig, + conn_pool: ArcDbPool, +) -> Result> { + let mut conn = conn_pool.get().await?; + + match ProcessorStatusQuery::get_by_processor( + indexer_processor_config.processor_config.name(), + &mut conn, + ) + .await? + { + Some(status) => Ok(Some(status.last_success_version as u64)), + None => Ok(None), + } +} diff --git a/rust/server-framework/src/lib.rs b/rust/server-framework/src/lib.rs index 6e9a6101c..bf4ea8198 100644 --- a/rust/server-framework/src/lib.rs +++ b/rust/server-framework/src/lib.rs @@ -9,6 +9,7 @@ use prometheus::{Encoder, TextEncoder}; use serde::{de::DeserializeOwned, Deserialize, Serialize}; #[cfg(target_os = "linux")] use std::convert::Infallible; +// TODO: remove deprecated lint when new clippy nightly is released #[allow(deprecated)] use std::{fs::File, io::Read, panic::PanicInfo, path::PathBuf, process}; use tokio::runtime::Handle; @@ -113,12 +114,15 @@ pub struct CrashInfo { /// details/backtrace and then exit. #[allow(deprecated)] pub fn setup_panic_handler() { + // TODO: remove deprecated lint when new clippy nightly is released + #[allow(deprecated)] std::panic::set_hook(Box::new(move |pi: &PanicInfo<'_>| { handle_panic(pi); })); } // Formats and logs panic information +// TODO: remove deprecated lint when new clippy nightly is released #[allow(deprecated)] fn handle_panic(panic_info: &PanicInfo<'_>) { // The Display formatter for a PanicInfo contains the message, payload and location.