From 214f0ca9368a74164caa4aa5cc55320dfa49ee6a Mon Sep 17 00:00:00 2001 From: Piotr Gankiewicz Date: Wed, 16 Oct 2024 20:35:30 +0200 Subject: [PATCH] Add support for OpenTelemetry logs & traces, close #1289 (#1294) --- Cargo.lock | 196 +++++++++++++++++- bench/Cargo.toml | 2 +- bench/src/main.rs | 2 + cli/Cargo.toml | 4 +- configs/server.json | 12 ++ configs/server.toml | 23 +- examples/Cargo.toml | 2 +- examples/src/basic/consumer/main.rs | 8 +- examples/src/basic/producer/main.rs | 8 +- examples/src/getting-started/consumer/main.rs | 8 +- examples/src/getting-started/producer/main.rs | 8 +- .../src/message-envelope/consumer/main.rs | 8 +- .../src/message-envelope/producer/main.rs | 8 +- examples/src/message-headers/consumer/main.rs | 8 +- examples/src/message-headers/producer/main.rs | 8 +- examples/src/multi-tenant/consumer/main.rs | 8 +- examples/src/multi-tenant/producer/main.rs | 8 +- examples/src/new-sdk/consumer/main.rs | 8 +- examples/src/new-sdk/producer/main.rs | 8 +- integration/Cargo.toml | 2 +- integration/tests/examples/mod.rs | 2 + .../stream_size_validation_scenario.rs | 8 +- server/Cargo.toml | 15 +- .../create_consumer_group_handler.rs | 3 +- .../delete_consumer_group_handler.rs | 3 +- .../join_consumer_group_handler.rs | 3 +- .../leave_consumer_group_handler.rs | 3 +- .../messages/flush_unsaved_buffer_handler.rs | 3 +- .../partitions/create_partitions_handler.rs | 3 +- .../partitions/delete_partitions_handler.rs | 3 +- .../create_personal_access_token_handler.rs | 3 +- .../delete_personal_access_token_handler.rs | 3 +- ...ogin_with_personal_access_token_handler.rs | 3 +- .../handlers/streams/create_stream_handler.rs | 3 +- .../handlers/streams/delete_stream_handler.rs | 3 +- .../handlers/streams/purge_stream_handler.rs | 3 +- .../handlers/streams/update_stream_handler.rs | 3 +- .../handlers/topics/create_topic_handler.rs | 3 +- .../handlers/topics/delete_topic_handler.rs | 3 +- .../handlers/topics/purge_topic_handler.rs | 3 +- .../handlers/topics/update_topic_handler.rs | 3 +- .../handlers/users/change_password_handler.rs | 3 +- .../handlers/users/create_user_handler.rs | 3 +- .../handlers/users/delete_user_handler.rs | 3 +- .../handlers/users/login_user_handler.rs | 3 +- .../handlers/users/logout_user_handler.rs | 3 +- .../users/update_permissions_handler.rs | 3 +- .../handlers/users/update_user_handler.rs | 3 +- server/src/channels/commands/archive_state.rs | 3 +- .../commands/clean_personal_access_tokens.rs | 3 +- .../channels/commands/maintain_messages.rs | 3 +- server/src/channels/commands/save_messages.rs | 3 +- .../channels/commands/verify_heartbeats.rs | 3 +- server/src/configs/config_provider.rs | 10 +- server/src/configs/defaults.rs | 33 ++- server/src/configs/displays.rs | 37 +++- server/src/configs/server.rs | 43 ++++ server/src/configs/validators.rs | 38 +++- server/src/http/consumer_groups.rs | 3 + server/src/http/messages.rs | 2 + server/src/http/partitions.rs | 3 + server/src/http/personal_access_tokens.rs | 4 + server/src/http/streams.rs | 5 + server/src/http/topics.rs | 5 + server/src/http/users.rs | 8 + server/src/log/logger.rs | 151 ++++++++++++-- server/src/log/tokio_console.rs | 5 +- server/src/main.rs | 14 +- server/src/streaming/systems/stats.rs | 2 +- server/src/streaming/systems/system.rs | 5 +- tools/Cargo.toml | 2 +- tools/src/data-seeder/main.rs | 8 +- 72 files changed, 721 insertions(+), 111 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e08a7a976..aec6d1e16 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1335,6 +1335,15 @@ version = "1.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "60b1af1c220855b6ceac025d3f6ecdd2b7c4894bfe9cd9bda4fbb4bc7c0d4cf0" +[[package]] +name = "encoding_rs" +version = "0.8.34" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b45de904aa0b010bce2ab45264d0631681847fa7b6f2eaa7dab7619943bc4f59" +dependencies = [ + "cfg-if", +] + [[package]] name = "env_filter" version = "0.1.2" @@ -1936,6 +1945,22 @@ dependencies = [ "tokio-native-tls", ] +[[package]] +name = "hyper-tls" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70206fc6890eaca9fde8a0bf71caa2ddfc9fe045ac9e5c70df101a7dbde866e0" +dependencies = [ + "bytes", + "http-body-util", + "hyper 1.4.1", + "hyper-util", + "native-tls", + "tokio", + "tokio-native-tls", + "tower-service", +] + [[package]] name = "hyper-util" version = "0.1.7" @@ -2045,7 +2070,7 @@ dependencies = [ [[package]] name = "iggy-cli" -version = "0.8.0" +version = "0.8.1" dependencies = [ "ahash 0.8.11", "anyhow", @@ -2296,7 +2321,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4979f22fdb869068da03c9f7528f8297c6fd2606bc3a4affe42e6a823fdb8da4" dependencies = [ "cfg-if", - "windows-targets 0.48.5", + "windows-targets 0.52.6", ] [[package]] @@ -2692,6 +2717,105 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "opentelemetry" +version = "0.26.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "570074cc999d1a58184080966e5bd3bf3a9a4af650c3b05047c2621e7405cd17" +dependencies = [ + "futures-core", + "futures-sink", + "js-sys", + "once_cell", + "pin-project-lite", + "thiserror", +] + +[[package]] +name = "opentelemetry-appender-tracing" +version = "0.26.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "14f62d9a23c680ab91c74605f5006110768eb67600bb654937fef5c852fb8ec7" +dependencies = [ + "log", + "opentelemetry", + "tracing", + "tracing-core", + "tracing-subscriber", +] + +[[package]] +name = "opentelemetry-http" +version = "0.26.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6351496aeaa49d7c267fb480678d85d1cd30c5edb20b497c48c56f62a8c14b99" +dependencies = [ + "async-trait", + "bytes", + "http 1.1.0", + "opentelemetry", + "reqwest", +] + +[[package]] +name = "opentelemetry-otlp" +version = "0.26.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29e1f9c8b032d4f635c730c0efcf731d5e2530ea13fa8bef7939ddc8420696bd" +dependencies = [ + "async-trait", + "futures-core", + "http 1.1.0", + "opentelemetry", + "opentelemetry-http", + "opentelemetry-proto", + "opentelemetry_sdk", + "prost", + "reqwest", + "thiserror", + "tokio", + "tonic", +] + +[[package]] +name = "opentelemetry-proto" +version = "0.26.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c9d3968ce3aefdcca5c27e3c4ea4391b37547726a70893aab52d3de95d5f8b34" +dependencies = [ + "opentelemetry", + "opentelemetry_sdk", + "prost", + "tonic", +] + +[[package]] +name = "opentelemetry-semantic-conventions" +version = "0.26.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "db945c1eaea8ac6a9677185357480d215bb6999faa9f691d0c4d4d641eab7a09" + +[[package]] +name = "opentelemetry_sdk" +version = "0.26.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2c627d9f4c9cdc1f21a29ee4bfbd6028fcb8bcf2a857b43f3abdf72c9c862f3" +dependencies = [ + "async-trait", + "futures-channel", + "futures-executor", + "futures-util", + "glob", + "once_cell", + "opentelemetry", + "percent-encoding", + "rand", + "serde_json", + "thiserror", + "tokio", + "tokio-stream", +] + [[package]] name = "option-ext" version = "0.2.0" @@ -3308,18 +3432,23 @@ checksum = "f8f4955649ef5c38cc7f9e8aa41761d48fb9677197daea9984dc54f56aad5e63" dependencies = [ "base64 0.22.1", "bytes", + "encoding_rs", + "futures-channel", "futures-core", "futures-util", + "h2", "http 1.1.0", "http-body 1.0.1", "http-body-util", "hyper 1.4.1", "hyper-rustls", + "hyper-tls 0.6.0", "hyper-util", "ipnet", "js-sys", "log", "mime", + "native-tls", "once_cell", "percent-encoding", "pin-project-lite", @@ -3331,7 +3460,9 @@ dependencies = [ "serde_json", "serde_urlencoded", "sync_wrapper 1.0.1", + "system-configuration", "tokio", + "tokio-native-tls", "tokio-rustls", "tower-service", "url", @@ -3480,7 +3611,7 @@ dependencies = [ "hmac", "http 0.2.12", "hyper 0.14.30", - "hyper-tls", + "hyper-tls 0.5.0", "log", "maybe-async", "md5", @@ -3867,7 +3998,7 @@ dependencies = [ [[package]] name = "server" -version = "0.4.52" +version = "0.4.60" dependencies = [ "ahash 0.8.11", "anyhow", @@ -3893,9 +4024,15 @@ dependencies = [ "log", "moka", "openssl", + "opentelemetry", + "opentelemetry-appender-tracing", + "opentelemetry-otlp", + "opentelemetry-semantic-conventions", + "opentelemetry_sdk", "prometheus-client", "quinn", "rcgen", + "reqwest", "ring", "rmp-serde", "rust-s3", @@ -3908,7 +4045,7 @@ dependencies = [ "static-toml", "strip-ansi-escapes", "strum", - "sysinfo 0.31.4", + "sysinfo 0.32.0", "thiserror", "tikv-jemallocator", "tokio", @@ -3917,6 +4054,7 @@ dependencies = [ "tower-http", "tracing", "tracing-appender", + "tracing-opentelemetry", "tracing-subscriber", "ulid", "uuid", @@ -4166,9 +4304,9 @@ dependencies = [ [[package]] name = "sysinfo" -version = "0.31.4" +version = "0.32.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "355dbe4f8799b304b05e1b0f05fc59b2a18d36645cf169607da45bde2f69a1be" +checksum = "e3b5ae3f4f7d64646c46c4cae4e3f01d1c5d255c7406fdd7c7f999a94e488791" dependencies = [ "core-foundation-sys", "libc", @@ -4178,6 +4316,27 @@ dependencies = [ "windows 0.57.0", ] +[[package]] +name = "system-configuration" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c879d448e9d986b661742763247d3693ed13609438cf3d006f51f5368a5ba6b" +dependencies = [ + "bitflags 2.6.0", + "core-foundation", + "system-configuration-sys", +] + +[[package]] +name = "system-configuration-sys" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e1d1b10ced5ca923a1fcb8d03e96b8d3268065d724548c0211415ff6ac6bac4" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "tagptr" version = "0.2.0" @@ -4490,15 +4649,14 @@ dependencies = [ [[package]] name = "tower-http" -version = "0.5.2" +version = "0.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e9cd434a998747dd2c4276bc96ee2e0c7a2eadf3cae88e52be55a05fa9053f5" +checksum = "8437150ab6bbc8c5f0f519e3d5ed4aa883a83dd4cdd3d1b21f9482936046cb97" dependencies = [ "bitflags 2.6.0", "bytes", "http 1.1.0", "http-body 1.0.1", - "http-body-util", "pin-project-lite", "tower-layer", "tower-service", @@ -4573,6 +4731,24 @@ dependencies = [ "tracing-core", ] +[[package]] +name = "tracing-opentelemetry" +version = "0.27.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc58af5d3f6c5811462cabb3289aec0093f7338e367e5a33d28c0433b3c7360b" +dependencies = [ + "js-sys", + "once_cell", + "opentelemetry", + "opentelemetry_sdk", + "smallvec", + "tracing", + "tracing-core", + "tracing-log", + "tracing-subscriber", + "web-time", +] + [[package]] name = "tracing-subscriber" version = "0.3.18" diff --git a/bench/Cargo.toml b/bench/Cargo.toml index 26f8b8779..b10d45586 100644 --- a/bench/Cargo.toml +++ b/bench/Cargo.toml @@ -18,7 +18,7 @@ serde = { version = "1.0.210", features = ["derive"] } tokio = { version = "1.40.0", features = ["full"] } toml = "0.8.14" tracing = { version = "0.1.37" } -tracing-subscriber = { version = "0.3.16" } +tracing-subscriber = { version = "0.3.18", features = ["fmt", "env-filter"] } [[bin]] name = "iggy-bench" diff --git a/bench/src/main.rs b/bench/src/main.rs index 2f15a53ce..a4cbd81fb 100644 --- a/bench/src/main.rs +++ b/bench/src/main.rs @@ -12,6 +12,7 @@ use clap::Parser; use figlet_rs::FIGfont; use iggy::error::IggyError; use tracing::{error, info}; +use tracing_subscriber::EnvFilter; #[tokio::main] async fn main() -> Result<(), IggyError> { @@ -19,6 +20,7 @@ async fn main() -> Result<(), IggyError> { let figure = standard_font.convert("Iggy Bench"); println!("{}", figure.unwrap()); tracing_subscriber::fmt::Subscriber::builder() + .with_env_filter(EnvFilter::try_from_default_env().unwrap_or(EnvFilter::new("INFO"))) .with_ansi(true) .init(); diff --git a/cli/Cargo.toml b/cli/Cargo.toml index 1cc5489d6..add7f0d69 100644 --- a/cli/Cargo.toml +++ b/cli/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "iggy-cli" -version = "0.8.0" +version = "0.8.1" edition = "2021" authors = ["bartosz.ciesla@gmail.com"] repository = "https://github.com/iggy-rs/iggy" @@ -27,7 +27,7 @@ thiserror = "1.0.61" tokio = { version = "1.40.0", features = ["full"] } tracing = "0.1.37" tracing-appender = "0.2.2" -tracing-subscriber = { version = "0.3.17" } +tracing-subscriber = { version = "0.3.18", default-features = false, features = ["fmt", "env-filter"] } [[bin]] name = "iggy" diff --git a/configs/server.json b/configs/server.json index 85a29235a..548353f85 100644 --- a/configs/server.json +++ b/configs/server.json @@ -122,6 +122,18 @@ "enabled": false, "interval": "5 s" }, + "telemetry": { + "enabled": false, + "service_name": "iggy", + "logs": { + "transport": "grpc", + "endpoint": "http://localhost:7281/v1/logs" + }, + "traces": { + "transport": "grpc", + "endpoint": "http://localhost:7281/v1/traces" + } + }, "system": { "path": "local_data", "backup": { diff --git a/configs/server.toml b/configs/server.toml index b109b458a..d1f600042 100644 --- a/configs/server.toml +++ b/configs/server.toml @@ -272,6 +272,27 @@ enabled = false # Interval for expected client heartbeats interval = "5 s" +# OpenTelemetry configuration +[telemetry] +# Enables or disables telemetry. +enabled = false +# Service name for telemetry. +service_name = "iggy" + +# OpenTelemetry logs configuration +[telemetry.logs] +# Transport for sending logs. Options: "grpc", "http". +transport = "grpc" +# Endpoint for sending logs. +endpoint = "http://localhost:7281/v1/logs" + +# OpenTelemetry traces configuration +[telemetry.traces] +# Transport for sending traces. Options: "grpc", "http". +transport = "grpc" +# Endpoint for sending traces. +endpoint = "http://localhost:7281/v1/traces" + # System configuration. [system] # Base path for system data storage. @@ -441,4 +462,4 @@ expiry = "1 m" # Recovery configuration in case of lost data [system.recovery] # Controls whether streams/topics/partitions should be recreated if the expected data for existing state is missing (boolean). -recreate_missing_state = true \ No newline at end of file +recreate_missing_state = true diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 1c495e1ab..6563814a5 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -63,4 +63,4 @@ serde = { version = "1.0.210", features = ["derive", "rc"] } serde_json = "1.0.127" tokio = { version = "1.40.0", features = ["full"] } tracing = { version = "0.1.37" } -tracing-subscriber = { version = "0.3.16" } +tracing-subscriber = { version = "0.3.18", features = ["fmt", "env-filter"] } diff --git a/examples/src/basic/consumer/main.rs b/examples/src/basic/consumer/main.rs index d4f6ea046..627b6ccb1 100644 --- a/examples/src/basic/consumer/main.rs +++ b/examples/src/basic/consumer/main.rs @@ -9,11 +9,17 @@ use iggy_examples::shared::system; use std::error::Error; use std::sync::Arc; use tracing::info; +use tracing_subscriber::layer::SubscriberExt; +use tracing_subscriber::util::SubscriberInitExt; +use tracing_subscriber::{EnvFilter, Registry}; #[tokio::main] async fn main() -> Result<(), Box> { let args = Args::parse(); - tracing_subscriber::fmt::init(); + Registry::default() + .with(tracing_subscriber::fmt::layer()) + .with(EnvFilter::try_from_default_env().unwrap_or(EnvFilter::new("INFO"))) + .init(); info!( "Basic consumer has started, selected transport: {}", args.transport diff --git a/examples/src/basic/producer/main.rs b/examples/src/basic/producer/main.rs index 24976fe7e..25c32119d 100644 --- a/examples/src/basic/producer/main.rs +++ b/examples/src/basic/producer/main.rs @@ -10,11 +10,17 @@ use std::error::Error; use std::str::FromStr; use std::sync::Arc; use tracing::info; +use tracing_subscriber::layer::SubscriberExt; +use tracing_subscriber::util::SubscriberInitExt; +use tracing_subscriber::{EnvFilter, Registry}; #[tokio::main] async fn main() -> Result<(), Box> { let args = Args::parse(); - tracing_subscriber::fmt::init(); + Registry::default() + .with(tracing_subscriber::fmt::layer()) + .with(EnvFilter::try_from_default_env().unwrap_or(EnvFilter::new("INFO"))) + .init(); info!( "Basic producer has started, selected transport: {}", args.transport diff --git a/examples/src/getting-started/consumer/main.rs b/examples/src/getting-started/consumer/main.rs index 4896b9a7c..3b94f9884 100644 --- a/examples/src/getting-started/consumer/main.rs +++ b/examples/src/getting-started/consumer/main.rs @@ -10,6 +10,9 @@ use std::error::Error; use std::str::FromStr; use tokio::time::sleep; use tracing::info; +use tracing_subscriber::layer::SubscriberExt; +use tracing_subscriber::util::SubscriberInitExt; +use tracing_subscriber::{EnvFilter, Registry}; const STREAM_ID: u32 = 1; const TOPIC_ID: u32 = 1; @@ -18,7 +21,10 @@ const BATCHES_LIMIT: u32 = 5; #[tokio::main] async fn main() -> Result<(), Box> { - tracing_subscriber::fmt::init(); + Registry::default() + .with(tracing_subscriber::fmt::layer()) + .with(EnvFilter::try_from_default_env().unwrap_or(EnvFilter::new("INFO"))) + .init(); let client = IggyClientBuilder::new() .with_tcp() .with_server_address(get_tcp_server_addr()) diff --git a/examples/src/getting-started/producer/main.rs b/examples/src/getting-started/producer/main.rs index efbac9447..601ead7a4 100644 --- a/examples/src/getting-started/producer/main.rs +++ b/examples/src/getting-started/producer/main.rs @@ -11,6 +11,9 @@ use std::env; use std::error::Error; use std::str::FromStr; use tracing::{info, warn}; +use tracing_subscriber::layer::SubscriberExt; +use tracing_subscriber::util::SubscriberInitExt; +use tracing_subscriber::{EnvFilter, Registry}; const STREAM_ID: u32 = 1; const TOPIC_ID: u32 = 1; @@ -19,7 +22,10 @@ const BATCHES_LIMIT: u32 = 5; #[tokio::main] async fn main() -> Result<(), Box> { - tracing_subscriber::fmt::init(); + Registry::default() + .with(tracing_subscriber::fmt::layer()) + .with(EnvFilter::try_from_default_env().unwrap_or(EnvFilter::new("INFO"))) + .init(); let client = IggyClientBuilder::new() .with_tcp() diff --git a/examples/src/message-envelope/consumer/main.rs b/examples/src/message-envelope/consumer/main.rs index 2f3def7bf..4b23fb699 100644 --- a/examples/src/message-envelope/consumer/main.rs +++ b/examples/src/message-envelope/consumer/main.rs @@ -11,11 +11,17 @@ use iggy_examples::shared::system; use std::error::Error; use std::sync::Arc; use tracing::{info, warn}; +use tracing_subscriber::layer::SubscriberExt; +use tracing_subscriber::util::SubscriberInitExt; +use tracing_subscriber::{EnvFilter, Registry}; #[tokio::main] async fn main() -> Result<(), Box> { let args = Args::parse(); - tracing_subscriber::fmt::init(); + Registry::default() + .with(tracing_subscriber::fmt::layer()) + .with(EnvFilter::try_from_default_env().unwrap_or(EnvFilter::new("INFO"))) + .init(); info!( "Message envelope consumer has started, selected transport: {}", args.transport diff --git a/examples/src/message-envelope/producer/main.rs b/examples/src/message-envelope/producer/main.rs index 6e706b830..bafb1a494 100644 --- a/examples/src/message-envelope/producer/main.rs +++ b/examples/src/message-envelope/producer/main.rs @@ -12,11 +12,17 @@ use std::error::Error; use std::str::FromStr; use std::sync::Arc; use tracing::info; +use tracing_subscriber::layer::SubscriberExt; +use tracing_subscriber::util::SubscriberInitExt; +use tracing_subscriber::{EnvFilter, Registry}; #[tokio::main] async fn main() -> Result<(), Box> { let args = Args::parse(); - tracing_subscriber::fmt::init(); + Registry::default() + .with(tracing_subscriber::fmt::layer()) + .with(EnvFilter::try_from_default_env().unwrap_or(EnvFilter::new("INFO"))) + .init(); info!( "Message envelope producer has started, selected transport: {}", args.transport diff --git a/examples/src/message-headers/consumer/main.rs b/examples/src/message-headers/consumer/main.rs index 0416fc2c4..0f09b8291 100644 --- a/examples/src/message-headers/consumer/main.rs +++ b/examples/src/message-headers/consumer/main.rs @@ -12,11 +12,17 @@ use iggy_examples::shared::system; use std::error::Error; use std::sync::Arc; use tracing::{info, warn}; +use tracing_subscriber::layer::SubscriberExt; +use tracing_subscriber::util::SubscriberInitExt; +use tracing_subscriber::{EnvFilter, Registry}; #[tokio::main] async fn main() -> Result<(), Box> { let args = Args::parse(); - tracing_subscriber::fmt::init(); + Registry::default() + .with(tracing_subscriber::fmt::layer()) + .with(EnvFilter::try_from_default_env().unwrap_or(EnvFilter::new("INFO"))) + .init(); info!( "Message headers consumer has started, selected transport: {}", args.transport diff --git a/examples/src/message-headers/producer/main.rs b/examples/src/message-headers/producer/main.rs index aca79aa78..5d38381ce 100644 --- a/examples/src/message-headers/producer/main.rs +++ b/examples/src/message-headers/producer/main.rs @@ -15,11 +15,17 @@ use std::error::Error; use std::str::FromStr; use std::sync::Arc; use tracing::info; +use tracing_subscriber::layer::SubscriberExt; +use tracing_subscriber::util::SubscriberInitExt; +use tracing_subscriber::{EnvFilter, Registry}; #[tokio::main] async fn main() -> Result<(), Box> { let args = Args::parse(); - tracing_subscriber::fmt::init(); + Registry::default() + .with(tracing_subscriber::fmt::layer()) + .with(EnvFilter::try_from_default_env().unwrap_or(EnvFilter::new("INFO"))) + .init(); info!( "Message headers producer has started, selected transport: {}", args.transport diff --git a/examples/src/multi-tenant/consumer/main.rs b/examples/src/multi-tenant/consumer/main.rs index 97de8658e..d70a3c5af 100644 --- a/examples/src/multi-tenant/consumer/main.rs +++ b/examples/src/multi-tenant/consumer/main.rs @@ -19,6 +19,9 @@ use std::error::Error; use std::str::FromStr; use tokio::task::JoinHandle; use tracing::{error, info}; +use tracing_subscriber::layer::SubscriberExt; +use tracing_subscriber::util::SubscriberInitExt; +use tracing_subscriber::{EnvFilter, Registry}; const TOPICS: &[&str] = &["events", "logs", "notifications"]; const CONSUMER_GROUP: &str = "multi-tenant"; @@ -69,7 +72,10 @@ impl TenantConsumer { #[tokio::main] async fn main() -> anyhow::Result<(), Box> { let args = Args::parse(); - tracing_subscriber::fmt::init(); + Registry::default() + .with(tracing_subscriber::fmt::layer()) + .with(EnvFilter::try_from_default_env().unwrap_or(EnvFilter::new("INFO"))) + .init(); let tenants_count = env::var("TENANTS_COUNT") .unwrap_or_else(|_| 3.to_string()) .parse::() diff --git a/examples/src/multi-tenant/producer/main.rs b/examples/src/multi-tenant/producer/main.rs index cac1712a2..35a34d5cf 100644 --- a/examples/src/multi-tenant/producer/main.rs +++ b/examples/src/multi-tenant/producer/main.rs @@ -21,6 +21,9 @@ use std::error::Error; use std::str::FromStr; use tokio::task::JoinHandle; use tracing::{error, info}; +use tracing_subscriber::layer::SubscriberExt; +use tracing_subscriber::util::SubscriberInitExt; +use tracing_subscriber::{EnvFilter, Registry}; const TOPICS: &[&str] = &["events", "logs", "notifications"]; const PASSWORD: &str = "secret"; @@ -70,7 +73,10 @@ impl TenantProducer { #[tokio::main] async fn main() -> anyhow::Result<(), Box> { let args = Args::parse(); - tracing_subscriber::fmt::init(); + Registry::default() + .with(tracing_subscriber::fmt::layer()) + .with(EnvFilter::try_from_default_env().unwrap_or(EnvFilter::new("INFO"))) + .init(); let tenants_count = env::var("TENANTS_COUNT") .unwrap_or_else(|_| 3.to_string()) .parse::() diff --git a/examples/src/new-sdk/consumer/main.rs b/examples/src/new-sdk/consumer/main.rs index 526afc0bb..22ff0e7eb 100644 --- a/examples/src/new-sdk/consumer/main.rs +++ b/examples/src/new-sdk/consumer/main.rs @@ -17,11 +17,17 @@ use std::error::Error; use std::str::FromStr; use std::sync::Arc; use tracing::{error, info, warn}; +use tracing_subscriber::layer::SubscriberExt; +use tracing_subscriber::util::SubscriberInitExt; +use tracing_subscriber::{EnvFilter, Registry}; #[tokio::main] async fn main() -> anyhow::Result<(), Box> { let args = Args::parse(); - tracing_subscriber::fmt::init(); + Registry::default() + .with(tracing_subscriber::fmt::layer()) + .with(EnvFilter::try_from_default_env().unwrap_or(EnvFilter::new("INFO"))) + .init(); info!( "New SDK consumer has started, selected transport: {}", args.transport diff --git a/examples/src/new-sdk/producer/main.rs b/examples/src/new-sdk/producer/main.rs index ab385cb54..bee171f48 100644 --- a/examples/src/new-sdk/producer/main.rs +++ b/examples/src/new-sdk/producer/main.rs @@ -14,11 +14,17 @@ use std::error::Error; use std::str::FromStr; use std::sync::Arc; use tracing::info; +use tracing_subscriber::layer::SubscriberExt; +use tracing_subscriber::util::SubscriberInitExt; +use tracing_subscriber::{EnvFilter, Registry}; #[tokio::main] async fn main() -> anyhow::Result<(), Box> { let args = Args::parse(); - tracing_subscriber::fmt::init(); + Registry::default() + .with(tracing_subscriber::fmt::layer()) + .with(EnvFilter::try_from_default_env().unwrap_or(EnvFilter::new("INFO"))) + .init(); info!( "New SDK producer has started, selected transport: {}", args.transport diff --git a/integration/Cargo.toml b/integration/Cargo.toml index 5e937ffe0..1850d1621 100644 --- a/integration/Cargo.toml +++ b/integration/Cargo.toml @@ -25,7 +25,7 @@ serial_test = "3.1.1" server = { path = "../server" } tempfile = "3.10.1" tokio = { version = "1.40.0", features = ["full"] } -tracing-subscriber = "0.3.18" +tracing-subscriber = { version = "0.3.18", features = ["fmt", "env-filter"] } uuid = { version = "1.1.0", features = ["v7", "fast-rng", "zerocopy"] } xxhash-rust = { version = "0.8.12", features = ["xxh32"] } diff --git a/integration/tests/examples/mod.rs b/integration/tests/examples/mod.rs index b8739dce3..36c400096 100644 --- a/integration/tests/examples/mod.rs +++ b/integration/tests/examples/mod.rs @@ -152,7 +152,9 @@ impl<'a> IggyExampleTest<'a> { args.push("--message-batches-limit".into()); args.push("1".into()); + producer_cmd.envs(vec![("RUST_LOG", "info")]); producer_cmd.args(args.clone()); + consumer_cmd.envs(vec![("RUST_LOG", "info")]); consumer_cmd.args(args); let producer_handle = tokio::spawn(async move { diff --git a/integration/tests/server/scenarios/stream_size_validation_scenario.rs b/integration/tests/server/scenarios/stream_size_validation_scenario.rs index 1d742a41e..a2d489bcb 100644 --- a/integration/tests/server/scenarios/stream_size_validation_scenario.rs +++ b/integration/tests/server/scenarios/stream_size_validation_scenario.rs @@ -9,6 +9,9 @@ use iggy::utils::expiry::IggyExpiry; use iggy::utils::topic_size::MaxTopicSize; use integration::test_server::{assert_clean_system, login_root, ClientFactory}; use std::str::FromStr; +use tracing_subscriber::layer::SubscriberExt; +use tracing_subscriber::util::SubscriberInitExt; +use tracing_subscriber::{EnvFilter, Registry}; const S1_NAME: &str = "test-stream-1"; const T1_NAME: &str = "test-topic-1"; @@ -20,7 +23,10 @@ const MSGS_COUNT: u64 = 117; // number of messages in a single topic after one p const MSGS_SIZE: u64 = MSG_SIZE * MSGS_COUNT; // number of bytes in a single topic after one pass of appending pub async fn run(client_factory: &dyn ClientFactory) { - let _ = tracing_subscriber::fmt::try_init(); + let _ = Registry::default() + .with(tracing_subscriber::fmt::layer()) + .with(EnvFilter::try_from_default_env().unwrap_or(EnvFilter::new("INFO"))) + .try_init(); let client = create_client(client_factory).await; // 0. Ping server, login as root user and ensure that streams do not exist diff --git a/server/Cargo.toml b/server/Cargo.toml index 391832121..b14db14c8 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "server" -version = "0.4.52" +version = "0.4.60" edition = "2021" build = "src/build.rs" @@ -34,9 +34,15 @@ jsonwebtoken = "9.3.0" log = "0.4.20" moka = { version = "0.12.5", features = ["future"] } openssl = { version = "0.10.66", features = ["vendored"] } +opentelemetry = { version = "0.26.0", features = ["trace", "logs"] } +opentelemetry-appender-tracing = { version = "0.26.0", features = ["log"] } +opentelemetry-otlp = { version = "0.26.0", features = ["logs", "trace", "grpc-tonic", "http", "http-proto", "reqwest-client", "tokio"] } +opentelemetry-semantic-conventions = { version = "0.26.0" } +opentelemetry_sdk = { version = "0.26.0", features = ["rt-tokio", "logs", "trace", "tokio"] } prometheus-client = "0.22.2" quinn = { version = "0.11.5" } rcgen = "0.13.1" +reqwest = { version = "0.12.4", features = ["rustls-tls", "rustls-tls-no-provider"] } ring = "0.17.8" rmp-serde = "1.3.0" rust-s3 = { version = "0.34.0", features = ["default"] } @@ -49,19 +55,20 @@ sled = "0.34.7" static-toml = "1.2.0" strip-ansi-escapes = "0.2.0" strum = { version = "0.26.2", features = ["derive"] } -sysinfo = "0.31.4" +sysinfo = "0.32.0" thiserror = "1.0.61" tokio = { version = "1.40.0", features = ["full"] } tokio-native-tls = "0.3.1" toml = "0.8.14" -tower-http = { version = "0.5.2", features = [ +tower-http = { version = "0.6.1", features = [ "add-extension", "cors", "trace", ] } tracing = { version = "0.1.40" } tracing-appender = "0.2.3" -tracing-subscriber = { version = "0.3.18", features = ["fmt"] } +tracing-opentelemetry = { version = "0.27.0" } +tracing-subscriber = { version = "0.3.18", features = ["fmt", "env-filter"] } ulid = "1.1.2" uuid = { version = "1.1.0", features = ["v7", "fast-rng", "zerocopy"] } xxhash-rust = { version = "0.8.12", features = ["xxh32"] } diff --git a/server/src/binary/handlers/consumer_groups/create_consumer_group_handler.rs b/server/src/binary/handlers/consumer_groups/create_consumer_group_handler.rs index cf8e76a18..4c7b1ed46 100644 --- a/server/src/binary/handlers/consumer_groups/create_consumer_group_handler.rs +++ b/server/src/binary/handlers/consumer_groups/create_consumer_group_handler.rs @@ -6,8 +6,9 @@ use crate::streaming::systems::system::SharedSystem; use anyhow::Result; use iggy::consumer_groups::create_consumer_group::CreateConsumerGroup; use iggy::error::IggyError; -use tracing::debug; +use tracing::{debug, instrument}; +#[instrument(skip_all, fields(iggy_user_id = session.get_user_id(), iggy_client_id = session.client_id, iggy_stream_id = command.stream_id.as_string(), iggy_topic_id = command.topic_id.as_string()))] pub async fn handle( command: CreateConsumerGroup, sender: &mut dyn Sender, diff --git a/server/src/binary/handlers/consumer_groups/delete_consumer_group_handler.rs b/server/src/binary/handlers/consumer_groups/delete_consumer_group_handler.rs index d51f5bb3e..c33aa8f16 100644 --- a/server/src/binary/handlers/consumer_groups/delete_consumer_group_handler.rs +++ b/server/src/binary/handlers/consumer_groups/delete_consumer_group_handler.rs @@ -5,8 +5,9 @@ use crate::streaming::systems::system::SharedSystem; use anyhow::Result; use iggy::consumer_groups::delete_consumer_group::DeleteConsumerGroup; use iggy::error::IggyError; -use tracing::debug; +use tracing::{debug, instrument}; +#[instrument(skip_all, fields(iggy_user_id = session.get_user_id(), iggy_client_id = session.client_id, iggy_stream_id = command.stream_id.as_string(), iggy_topic_id = command.topic_id.as_string()))] pub async fn handle( command: DeleteConsumerGroup, sender: &mut dyn Sender, diff --git a/server/src/binary/handlers/consumer_groups/join_consumer_group_handler.rs b/server/src/binary/handlers/consumer_groups/join_consumer_group_handler.rs index a76caf85f..df5f89478 100644 --- a/server/src/binary/handlers/consumer_groups/join_consumer_group_handler.rs +++ b/server/src/binary/handlers/consumer_groups/join_consumer_group_handler.rs @@ -4,8 +4,9 @@ use crate::streaming::systems::system::SharedSystem; use anyhow::Result; use iggy::consumer_groups::join_consumer_group::JoinConsumerGroup; use iggy::error::IggyError; -use tracing::debug; +use tracing::{debug, instrument}; +#[instrument(skip_all, fields(iggy_user_id = session.get_user_id(), iggy_client_id = session.client_id, iggy_stream_id = command.stream_id.as_string(), iggy_topic_id = command.topic_id.as_string(), iggy_group_id = command.group_id.as_string()))] pub async fn handle( command: JoinConsumerGroup, sender: &mut dyn Sender, diff --git a/server/src/binary/handlers/consumer_groups/leave_consumer_group_handler.rs b/server/src/binary/handlers/consumer_groups/leave_consumer_group_handler.rs index d2585aab9..416cd4452 100644 --- a/server/src/binary/handlers/consumer_groups/leave_consumer_group_handler.rs +++ b/server/src/binary/handlers/consumer_groups/leave_consumer_group_handler.rs @@ -4,8 +4,9 @@ use crate::streaming::systems::system::SharedSystem; use anyhow::Result; use iggy::consumer_groups::leave_consumer_group::LeaveConsumerGroup; use iggy::error::IggyError; -use tracing::debug; +use tracing::{debug, instrument}; +#[instrument(skip_all, fields(iggy_user_id = session.get_user_id(), iggy_client_id = session.client_id, iggy_stream_id = command.stream_id.as_string(), iggy_topic_id = command.topic_id.as_string(), iggy_group_id = command.group_id.as_string()))] pub async fn handle( command: LeaveConsumerGroup, sender: &mut dyn Sender, diff --git a/server/src/binary/handlers/messages/flush_unsaved_buffer_handler.rs b/server/src/binary/handlers/messages/flush_unsaved_buffer_handler.rs index 1b75b0e83..85f6c4abd 100644 --- a/server/src/binary/handlers/messages/flush_unsaved_buffer_handler.rs +++ b/server/src/binary/handlers/messages/flush_unsaved_buffer_handler.rs @@ -4,8 +4,9 @@ use crate::streaming::systems::system::SharedSystem; use anyhow::Result; use iggy::error::IggyError; use iggy::messages::flush_unsaved_buffer::FlushUnsavedBuffer; -use tracing::debug; +use tracing::{debug, instrument}; +#[instrument(skip_all, fields(iggy_user_id = session.get_user_id(), iggy_client_id = session.client_id, iggy_stream_id = command.stream_id.as_string(), iggy_topic_id = command.topic_id.as_string(), iggy_partition_id = command.partition_id, iggy_fsync = command.fsync))] pub async fn handle( command: FlushUnsavedBuffer, sender: &mut dyn Sender, diff --git a/server/src/binary/handlers/partitions/create_partitions_handler.rs b/server/src/binary/handlers/partitions/create_partitions_handler.rs index 7f7af1794..0de42e406 100644 --- a/server/src/binary/handlers/partitions/create_partitions_handler.rs +++ b/server/src/binary/handlers/partitions/create_partitions_handler.rs @@ -5,8 +5,9 @@ use crate::streaming::systems::system::SharedSystem; use anyhow::Result; use iggy::error::IggyError; use iggy::partitions::create_partitions::CreatePartitions; -use tracing::debug; +use tracing::{debug, instrument}; +#[instrument(skip_all, fields(iggy_user_id = session.get_user_id(), iggy_client_id = session.client_id, iggy_stream_id = command.stream_id.as_string(), iggy_topic_id = command.topic_id.as_string()))] pub async fn handle( command: CreatePartitions, sender: &mut dyn Sender, diff --git a/server/src/binary/handlers/partitions/delete_partitions_handler.rs b/server/src/binary/handlers/partitions/delete_partitions_handler.rs index 5639a29df..b24e98b51 100644 --- a/server/src/binary/handlers/partitions/delete_partitions_handler.rs +++ b/server/src/binary/handlers/partitions/delete_partitions_handler.rs @@ -5,8 +5,9 @@ use crate::streaming::systems::system::SharedSystem; use anyhow::Result; use iggy::error::IggyError; use iggy::partitions::delete_partitions::DeletePartitions; -use tracing::debug; +use tracing::{debug, instrument}; +#[instrument(skip_all, fields(iggy_user_id = session.get_user_id(), iggy_client_id = session.client_id, iggy_stream_id = command.stream_id.as_string(), iggy_topic_id = command.topic_id.as_string()))] pub async fn handle( command: DeletePartitions, sender: &mut dyn Sender, diff --git a/server/src/binary/handlers/personal_access_tokens/create_personal_access_token_handler.rs b/server/src/binary/handlers/personal_access_tokens/create_personal_access_token_handler.rs index e9d193b51..ee9e37d06 100644 --- a/server/src/binary/handlers/personal_access_tokens/create_personal_access_token_handler.rs +++ b/server/src/binary/handlers/personal_access_tokens/create_personal_access_token_handler.rs @@ -8,8 +8,9 @@ use crate::streaming::systems::system::SharedSystem; use anyhow::Result; use iggy::error::IggyError; use iggy::personal_access_tokens::create_personal_access_token::CreatePersonalAccessToken; -use tracing::debug; +use tracing::{debug, instrument}; +#[instrument(skip_all, fields(iggy_user_id = session.get_user_id(), iggy_client_id = session.client_id))] pub async fn handle( command: CreatePersonalAccessToken, sender: &mut dyn Sender, diff --git a/server/src/binary/handlers/personal_access_tokens/delete_personal_access_token_handler.rs b/server/src/binary/handlers/personal_access_tokens/delete_personal_access_token_handler.rs index cf3a70823..47e273ec4 100644 --- a/server/src/binary/handlers/personal_access_tokens/delete_personal_access_token_handler.rs +++ b/server/src/binary/handlers/personal_access_tokens/delete_personal_access_token_handler.rs @@ -5,8 +5,9 @@ use crate::streaming::systems::system::SharedSystem; use anyhow::Result; use iggy::error::IggyError; use iggy::personal_access_tokens::delete_personal_access_token::DeletePersonalAccessToken; -use tracing::debug; +use tracing::{debug, instrument}; +#[instrument(skip_all, fields(iggy_user_id = session.get_user_id(), iggy_client_id = session.client_id))] pub async fn handle( command: DeletePersonalAccessToken, sender: &mut dyn Sender, diff --git a/server/src/binary/handlers/personal_access_tokens/login_with_personal_access_token_handler.rs b/server/src/binary/handlers/personal_access_tokens/login_with_personal_access_token_handler.rs index dbdc2bad6..f368fe2d2 100644 --- a/server/src/binary/handlers/personal_access_tokens/login_with_personal_access_token_handler.rs +++ b/server/src/binary/handlers/personal_access_tokens/login_with_personal_access_token_handler.rs @@ -5,8 +5,9 @@ use crate::streaming::systems::system::SharedSystem; use anyhow::Result; use iggy::error::IggyError; use iggy::personal_access_tokens::login_with_personal_access_token::LoginWithPersonalAccessToken; -use tracing::debug; +use tracing::{debug, instrument}; +#[instrument(skip_all, fields(iggy_user_id = session.get_user_id(), iggy_client_id = session.client_id))] pub async fn handle( command: LoginWithPersonalAccessToken, sender: &mut dyn Sender, diff --git a/server/src/binary/handlers/streams/create_stream_handler.rs b/server/src/binary/handlers/streams/create_stream_handler.rs index e9320ab80..57de3dbb6 100644 --- a/server/src/binary/handlers/streams/create_stream_handler.rs +++ b/server/src/binary/handlers/streams/create_stream_handler.rs @@ -6,8 +6,9 @@ use crate::streaming::systems::system::SharedSystem; use anyhow::Result; use iggy::error::IggyError; use iggy::streams::create_stream::CreateStream; -use tracing::debug; +use tracing::{debug, instrument}; +#[instrument(skip_all, fields(iggy_user_id = session.get_user_id(), iggy_client_id = session.client_id))] pub async fn handle( command: CreateStream, sender: &mut dyn Sender, diff --git a/server/src/binary/handlers/streams/delete_stream_handler.rs b/server/src/binary/handlers/streams/delete_stream_handler.rs index 5b3965593..d636f94b1 100644 --- a/server/src/binary/handlers/streams/delete_stream_handler.rs +++ b/server/src/binary/handlers/streams/delete_stream_handler.rs @@ -5,8 +5,9 @@ use crate::streaming::systems::system::SharedSystem; use anyhow::Result; use iggy::error::IggyError; use iggy::streams::delete_stream::DeleteStream; -use tracing::debug; +use tracing::{debug, instrument}; +#[instrument(skip_all, fields(iggy_user_id = session.get_user_id(), iggy_client_id = session.client_id, iggy_stream_id = command.stream_id.as_string()))] pub async fn handle( command: DeleteStream, sender: &mut dyn Sender, diff --git a/server/src/binary/handlers/streams/purge_stream_handler.rs b/server/src/binary/handlers/streams/purge_stream_handler.rs index cf256fe9d..eb3fb635c 100644 --- a/server/src/binary/handlers/streams/purge_stream_handler.rs +++ b/server/src/binary/handlers/streams/purge_stream_handler.rs @@ -5,8 +5,9 @@ use crate::streaming::systems::system::SharedSystem; use anyhow::Result; use iggy::error::IggyError; use iggy::streams::purge_stream::PurgeStream; -use tracing::debug; +use tracing::{debug, instrument}; +#[instrument(skip_all, fields(iggy_user_id = session.get_user_id(), iggy_client_id = session.client_id, iggy_stream_id = command.stream_id.as_string()))] pub async fn handle( command: PurgeStream, sender: &mut dyn Sender, diff --git a/server/src/binary/handlers/streams/update_stream_handler.rs b/server/src/binary/handlers/streams/update_stream_handler.rs index 5ef155bf2..a60d2ff2d 100644 --- a/server/src/binary/handlers/streams/update_stream_handler.rs +++ b/server/src/binary/handlers/streams/update_stream_handler.rs @@ -5,8 +5,9 @@ use crate::streaming::systems::system::SharedSystem; use anyhow::Result; use iggy::error::IggyError; use iggy::streams::update_stream::UpdateStream; -use tracing::debug; +use tracing::{debug, instrument}; +#[instrument(skip_all, fields(iggy_user_id = session.get_user_id(), iggy_client_id = session.client_id, iggy_stream_id = command.stream_id.as_string()))] pub async fn handle( command: UpdateStream, sender: &mut dyn Sender, diff --git a/server/src/binary/handlers/topics/create_topic_handler.rs b/server/src/binary/handlers/topics/create_topic_handler.rs index a6a62cd8f..afb15c567 100644 --- a/server/src/binary/handlers/topics/create_topic_handler.rs +++ b/server/src/binary/handlers/topics/create_topic_handler.rs @@ -6,8 +6,9 @@ use crate::streaming::systems::system::SharedSystem; use anyhow::Result; use iggy::error::IggyError; use iggy::topics::create_topic::CreateTopic; -use tracing::debug; +use tracing::{debug, instrument}; +#[instrument(skip_all, fields(iggy_user_id = session.get_user_id(), iggy_client_id = session.client_id, iggy_stream_id = command.stream_id.as_string()))] pub async fn handle( mut command: CreateTopic, sender: &mut dyn Sender, diff --git a/server/src/binary/handlers/topics/delete_topic_handler.rs b/server/src/binary/handlers/topics/delete_topic_handler.rs index 629426b96..3945f3604 100644 --- a/server/src/binary/handlers/topics/delete_topic_handler.rs +++ b/server/src/binary/handlers/topics/delete_topic_handler.rs @@ -5,8 +5,9 @@ use crate::streaming::systems::system::SharedSystem; use anyhow::Result; use iggy::error::IggyError; use iggy::topics::delete_topic::DeleteTopic; -use tracing::debug; +use tracing::{debug, instrument}; +#[instrument(skip_all, fields(iggy_user_id = session.get_user_id(), iggy_client_id = session.client_id, iggy_stream_id = command.stream_id.as_string(), iggy_topic_id = command.topic_id.as_string()))] pub async fn handle( command: DeleteTopic, sender: &mut dyn Sender, diff --git a/server/src/binary/handlers/topics/purge_topic_handler.rs b/server/src/binary/handlers/topics/purge_topic_handler.rs index ba4dcdeac..6c2aff4df 100644 --- a/server/src/binary/handlers/topics/purge_topic_handler.rs +++ b/server/src/binary/handlers/topics/purge_topic_handler.rs @@ -5,8 +5,9 @@ use crate::streaming::systems::system::SharedSystem; use anyhow::Result; use iggy::error::IggyError; use iggy::topics::purge_topic::PurgeTopic; -use tracing::debug; +use tracing::{debug, instrument}; +#[instrument(skip_all, fields(iggy_user_id = session.get_user_id(), iggy_client_id = session.client_id, iggy_stream_id = command.stream_id.as_string(), iggy_topic_id = command.topic_id.as_string()))] pub async fn handle( command: PurgeTopic, sender: &mut dyn Sender, diff --git a/server/src/binary/handlers/topics/update_topic_handler.rs b/server/src/binary/handlers/topics/update_topic_handler.rs index 5a6da44ad..33f5a62b2 100644 --- a/server/src/binary/handlers/topics/update_topic_handler.rs +++ b/server/src/binary/handlers/topics/update_topic_handler.rs @@ -5,8 +5,9 @@ use crate::streaming::systems::system::SharedSystem; use anyhow::Result; use iggy::error::IggyError; use iggy::topics::update_topic::UpdateTopic; -use tracing::debug; +use tracing::{debug, instrument}; +#[instrument(skip_all, fields(iggy_user_id = session.get_user_id(), iggy_client_id = session.client_id, iggy_stream_id = command.stream_id.as_string(), iggy_topic_id = command.topic_id.as_string()))] pub async fn handle( mut command: UpdateTopic, sender: &mut dyn Sender, diff --git a/server/src/binary/handlers/users/change_password_handler.rs b/server/src/binary/handlers/users/change_password_handler.rs index 6f1ea25ff..700d1ef83 100644 --- a/server/src/binary/handlers/users/change_password_handler.rs +++ b/server/src/binary/handlers/users/change_password_handler.rs @@ -6,8 +6,9 @@ use crate::streaming::utils::crypto; use anyhow::Result; use iggy::error::IggyError; use iggy::users::change_password::ChangePassword; -use tracing::debug; +use tracing::{debug, instrument}; +#[instrument(skip_all, fields(iggy_user_id = session.get_user_id(), iggy_client_id = session.client_id))] pub async fn handle( command: ChangePassword, sender: &mut dyn Sender, diff --git a/server/src/binary/handlers/users/create_user_handler.rs b/server/src/binary/handlers/users/create_user_handler.rs index 4aecc9545..b831d45d9 100644 --- a/server/src/binary/handlers/users/create_user_handler.rs +++ b/server/src/binary/handlers/users/create_user_handler.rs @@ -7,8 +7,9 @@ use crate::streaming::utils::crypto; use anyhow::Result; use iggy::error::IggyError; use iggy::users::create_user::CreateUser; -use tracing::debug; +use tracing::{debug, instrument}; +#[instrument(skip_all, fields(iggy_user_id = session.get_user_id(), iggy_client_id = session.client_id))] pub async fn handle( command: CreateUser, sender: &mut dyn Sender, diff --git a/server/src/binary/handlers/users/delete_user_handler.rs b/server/src/binary/handlers/users/delete_user_handler.rs index e0f24cda6..678a276f1 100644 --- a/server/src/binary/handlers/users/delete_user_handler.rs +++ b/server/src/binary/handlers/users/delete_user_handler.rs @@ -5,8 +5,9 @@ use crate::streaming::systems::system::SharedSystem; use anyhow::Result; use iggy::error::IggyError; use iggy::users::delete_user::DeleteUser; -use tracing::debug; +use tracing::{debug, instrument}; +#[instrument(skip_all, fields(iggy_user_id = session.get_user_id(), iggy_client_id = session.client_id))] pub async fn handle( command: DeleteUser, sender: &mut dyn Sender, diff --git a/server/src/binary/handlers/users/login_user_handler.rs b/server/src/binary/handlers/users/login_user_handler.rs index bdf789385..5c898831e 100644 --- a/server/src/binary/handlers/users/login_user_handler.rs +++ b/server/src/binary/handlers/users/login_user_handler.rs @@ -5,8 +5,9 @@ use crate::streaming::systems::system::SharedSystem; use anyhow::Result; use iggy::error::IggyError; use iggy::users::login_user::LoginUser; -use tracing::debug; +use tracing::{debug, instrument}; +#[instrument(skip_all, fields(iggy_user_id = session.get_user_id(), iggy_client_id = session.client_id))] pub async fn handle( command: LoginUser, sender: &mut dyn Sender, diff --git a/server/src/binary/handlers/users/logout_user_handler.rs b/server/src/binary/handlers/users/logout_user_handler.rs index b847d46a2..8867eb0f5 100644 --- a/server/src/binary/handlers/users/logout_user_handler.rs +++ b/server/src/binary/handlers/users/logout_user_handler.rs @@ -4,8 +4,9 @@ use crate::streaming::systems::system::SharedSystem; use anyhow::Result; use iggy::error::IggyError; use iggy::users::logout_user::LogoutUser; -use tracing::debug; +use tracing::{debug, instrument}; +#[instrument(skip_all, fields(iggy_user_id = session.get_user_id(), iggy_client_id = session.client_id))] pub async fn handle( command: LogoutUser, sender: &mut dyn Sender, diff --git a/server/src/binary/handlers/users/update_permissions_handler.rs b/server/src/binary/handlers/users/update_permissions_handler.rs index f2341c124..85dd29aa5 100644 --- a/server/src/binary/handlers/users/update_permissions_handler.rs +++ b/server/src/binary/handlers/users/update_permissions_handler.rs @@ -5,8 +5,9 @@ use crate::streaming::systems::system::SharedSystem; use anyhow::Result; use iggy::error::IggyError; use iggy::users::update_permissions::UpdatePermissions; -use tracing::debug; +use tracing::{debug, instrument}; +#[instrument(skip_all, fields(iggy_user_id = session.get_user_id(), iggy_client_id = session.client_id))] pub async fn handle( command: UpdatePermissions, sender: &mut dyn Sender, diff --git a/server/src/binary/handlers/users/update_user_handler.rs b/server/src/binary/handlers/users/update_user_handler.rs index 128a0fd10..712c9bc91 100644 --- a/server/src/binary/handlers/users/update_user_handler.rs +++ b/server/src/binary/handlers/users/update_user_handler.rs @@ -5,8 +5,9 @@ use crate::streaming::systems::system::SharedSystem; use anyhow::Result; use iggy::error::IggyError; use iggy::users::update_user::UpdateUser; -use tracing::debug; +use tracing::{debug, instrument}; +#[instrument(skip_all, fields(iggy_user_id = session.get_user_id(), iggy_client_id = session.client_id))] pub async fn handle( command: UpdateUser, sender: &mut dyn Sender, diff --git a/server/src/channels/commands/archive_state.rs b/server/src/channels/commands/archive_state.rs index f7a63b69d..836f80a25 100644 --- a/server/src/channels/commands/archive_state.rs +++ b/server/src/channels/commands/archive_state.rs @@ -6,7 +6,7 @@ use flume::Sender; use iggy::utils::duration::IggyDuration; use iggy::utils::timestamp::IggyTimestamp; use tokio::time; -use tracing::{error, info, warn}; +use tracing::{error, info, instrument, warn}; pub struct StateArchiver { enabled: bool, @@ -59,6 +59,7 @@ impl StateArchiver { #[async_trait] impl ServerCommand for ArchiveStateExecutor { + #[instrument(skip_all)] async fn execute(&mut self, system: &SharedSystem, command: ArchiveStateCommand) { let system = system.read().await; if system.archiver.is_none() { diff --git a/server/src/channels/commands/clean_personal_access_tokens.rs b/server/src/channels/commands/clean_personal_access_tokens.rs index 2589cecee..bb1b3fb73 100644 --- a/server/src/channels/commands/clean_personal_access_tokens.rs +++ b/server/src/channels/commands/clean_personal_access_tokens.rs @@ -6,7 +6,7 @@ use flume::Sender; use iggy::utils::duration::IggyDuration; use iggy::utils::timestamp::IggyTimestamp; use tokio::time; -use tracing::{debug, error, info}; +use tracing::{debug, error, info, instrument}; pub struct PersonalAccessTokenCleaner { enabled: bool, @@ -60,6 +60,7 @@ impl PersonalAccessTokenCleaner { #[async_trait] impl ServerCommand for CleanPersonalAccessTokensExecutor { + #[instrument(skip_all)] async fn execute(&mut self, system: &SharedSystem, _command: CleanPersonalAccessTokensCommand) { // TODO: System write lock, investigate if it's necessary. let mut system = system.write().await; diff --git a/server/src/channels/commands/maintain_messages.rs b/server/src/channels/commands/maintain_messages.rs index f8089f169..f68630747 100644 --- a/server/src/channels/commands/maintain_messages.rs +++ b/server/src/channels/commands/maintain_messages.rs @@ -12,7 +12,7 @@ use iggy::utils::duration::IggyDuration; use iggy::utils::timestamp::IggyTimestamp; use std::sync::Arc; use tokio::time; -use tracing::{debug, error, info}; +use tracing::{debug, error, info, instrument}; pub struct MessagesMaintainer { cleaner_enabled: bool, @@ -77,6 +77,7 @@ impl MessagesMaintainer { #[async_trait] impl ServerCommand for MaintainMessagesExecutor { + #[instrument(skip_all)] async fn execute(&mut self, system: &SharedSystem, command: MaintainMessagesCommand) { let system = system.read().await; let streams = system.get_streams(); diff --git a/server/src/channels/commands/save_messages.rs b/server/src/channels/commands/save_messages.rs index 88a0c190f..6c489a876 100644 --- a/server/src/channels/commands/save_messages.rs +++ b/server/src/channels/commands/save_messages.rs @@ -6,7 +6,7 @@ use async_trait::async_trait; use flume::{Receiver, Sender}; use iggy::utils::duration::IggyDuration; use tokio::time; -use tracing::{error, info, warn}; +use tracing::{error, info, instrument, warn}; pub struct MessagesSaver { enabled: bool, @@ -58,6 +58,7 @@ impl MessagesSaver { #[async_trait] impl ServerCommand for SaveMessagesExecutor { + #[instrument(skip_all)] async fn execute(&mut self, system: &SharedSystem, _command: SaveMessagesCommand) { let saved_messages_count = system.read().await.persist_messages().await; match saved_messages_count { diff --git a/server/src/channels/commands/verify_heartbeats.rs b/server/src/channels/commands/verify_heartbeats.rs index d47ac86ca..930b7f525 100644 --- a/server/src/channels/commands/verify_heartbeats.rs +++ b/server/src/channels/commands/verify_heartbeats.rs @@ -7,7 +7,7 @@ use iggy::locking::IggySharedMutFn; use iggy::utils::duration::IggyDuration; use iggy::utils::timestamp::IggyTimestamp; use tokio::time; -use tracing::{debug, error, info, warn}; +use tracing::{debug, error, info, instrument, warn}; const MAX_THRESHOLD: f64 = 1.2; @@ -65,6 +65,7 @@ impl VerifyHeartbeats { #[async_trait] impl ServerCommand for VerifyHeartbeatsExecutor { + #[instrument(skip_all)] async fn execute(&mut self, system: &SharedSystem, command: VerifyHeartbeatsCommand) { let system = system.read().await; let clients; diff --git a/server/src/configs/config_provider.rs b/server/src/configs/config_provider.rs index a2a95c608..6e28ee134 100644 --- a/server/src/configs/config_provider.rs +++ b/server/src/configs/config_provider.rs @@ -9,7 +9,7 @@ use figment::{ }; use std::{env, path::Path}; use toml::{map::Map, Value as TomlValue}; -use tracing::{debug, info}; +use tracing::debug; const DEFAULT_CONFIG_PROVIDER: &str = "file"; const DEFAULT_CONFIG_PATH: &str = "configs/server.toml"; @@ -197,7 +197,7 @@ impl Provider for CustomEnvProvider { value = "******".to_string(); } - info!("{env_key} value changed to: {value} from environment variable"); + println!("{env_key} value changed to: {value} from environment variable"); Self::insert_overridden_values_from_env( &source_dict, &mut new_dict, @@ -255,7 +255,7 @@ fn file_exists>(path: P) -> bool { #[async_trait] impl ConfigProvider for FileConfigProvider { async fn load_config(&self) -> Result { - info!("Loading config from path: '{}'...", self.path); + println!("Loading config from path: '{}'...", self.path); if !file_exists(&self.path) { return Err(ServerError::CannotLoadConfiguration(format!( @@ -280,8 +280,8 @@ impl ConfigProvider for FileConfigProvider { match config_result { Ok(config) => { - info!("Config loaded from path: '{}'", self.path); - info!("Using Config: {config}"); + println!("Config loaded from path: '{}'", self.path); + println!("Using Config: {config}"); Ok(config) } Err(figment_error) => Err(ServerError::CannotLoadConfiguration(format!( diff --git a/server/src/configs/defaults.rs b/server/src/configs/defaults.rs index 41391963f..688bd3d3e 100644 --- a/server/src/configs/defaults.rs +++ b/server/src/configs/defaults.rs @@ -5,7 +5,8 @@ use crate::configs::quic::{QuicCertificateConfig, QuicConfig}; use crate::configs::server::{ ArchiverConfig, DataMaintenanceConfig, HeartbeatConfig, MessageSaverConfig, MessagesMaintenanceConfig, PersonalAccessTokenCleanerConfig, PersonalAccessTokenConfig, - ServerConfig, StateMaintenanceConfig, + ServerConfig, StateMaintenanceConfig, TelemetryConfig, TelemetryLogsConfig, + TelemetryTracesConfig, }; use crate::configs::system::{ BackupConfig, CacheConfig, CompatibilityConfig, CompressionConfig, EncryptionConfig, @@ -31,6 +32,7 @@ impl Default for ServerConfig { quic: QuicConfig::default(), tcp: TcpConfig::default(), http: HttpConfig::default(), + telemetry: TelemetryConfig::default(), } } } @@ -449,3 +451,32 @@ impl Default for RecoveryConfig { } } } + +impl Default for TelemetryConfig { + fn default() -> TelemetryConfig { + TelemetryConfig { + enabled: SERVER_CONFIG.telemetry.enabled, + service_name: SERVER_CONFIG.telemetry.service_name.parse().unwrap(), + logs: TelemetryLogsConfig::default(), + traces: TelemetryTracesConfig::default(), + } + } +} + +impl Default for TelemetryLogsConfig { + fn default() -> TelemetryLogsConfig { + TelemetryLogsConfig { + transport: SERVER_CONFIG.telemetry.logs.transport.parse().unwrap(), + endpoint: SERVER_CONFIG.telemetry.logs.endpoint.parse().unwrap(), + } + } +} + +impl Default for TelemetryTracesConfig { + fn default() -> TelemetryTracesConfig { + TelemetryTracesConfig { + transport: SERVER_CONFIG.telemetry.traces.transport.parse().unwrap(), + endpoint: SERVER_CONFIG.telemetry.traces.endpoint.parse().unwrap(), + } + } +} diff --git a/server/src/configs/displays.rs b/server/src/configs/displays.rs index 01224a5f3..18818973d 100644 --- a/server/src/configs/displays.rs +++ b/server/src/configs/displays.rs @@ -1,7 +1,8 @@ use crate::configs::quic::{QuicCertificateConfig, QuicConfig}; use crate::configs::server::{ ArchiverConfig, DataMaintenanceConfig, DiskArchiverConfig, HeartbeatConfig, - MessagesMaintenanceConfig, S3ArchiverConfig, StateMaintenanceConfig, + MessagesMaintenanceConfig, S3ArchiverConfig, StateMaintenanceConfig, TelemetryConfig, + TelemetryLogsConfig, TelemetryTracesConfig, }; use crate::configs::system::MessageDeduplicationConfig; use crate::configs::{ @@ -185,8 +186,8 @@ impl Display for ServerConfig { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!( f, - "{{ data_maintenance: {}, message_saver: {}, heartbeat: {}, system: {}, quic: {}, tcp: {}, http: {} }}", - self.data_maintenance, self.message_saver, self.heartbeat, self.system, self.quic, self.tcp, self.http + "{{ data_maintenance: {}, message_saver: {}, heartbeat: {}, system: {}, quic: {}, tcp: {}, http: {}, telemetry: {} }}", + self.data_maintenance, self.message_saver, self.heartbeat, self.system, self.quic, self.tcp, self.http, self.telemetry ) } } @@ -305,6 +306,36 @@ impl Display for TcpTlsConfig { } } +impl Display for TelemetryConfig { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!( + f, + "{{ enabled: {}, service_name: {}, logs: {}, traces: {} }}", + self.enabled, self.service_name, self.logs, self.traces + ) + } +} + +impl Display for TelemetryLogsConfig { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!( + f, + "{{ transport: {}, endpoint: {} }}", + self.transport, self.endpoint + ) + } +} + +impl Display for TelemetryTracesConfig { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!( + f, + "{{ transport: {}, endpoint: {} }}", + self.transport, self.endpoint + ) + } +} + impl Display for SystemConfig { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!( diff --git a/server/src/configs/server.rs b/server/src/configs/server.rs index 0f4ed9e97..0fe24d5da 100644 --- a/server/src/configs/server.rs +++ b/server/src/configs/server.rs @@ -5,11 +5,13 @@ use crate::configs::quic::QuicConfig; use crate::configs::system::SystemConfig; use crate::configs::tcp::TcpConfig; use crate::server_error::ServerError; +use derive_more::Display; use iggy::utils::duration::IggyDuration; use iggy::validatable::Validatable; use serde::{Deserialize, Serialize}; use serde_with::serde_as; use serde_with::DisplayFromStr; +use std::str::FromStr; use std::sync::Arc; #[derive(Debug, Deserialize, Serialize, Clone)] @@ -22,6 +24,7 @@ pub struct ServerConfig { pub quic: QuicConfig, pub tcp: TcpConfig, pub http: HttpConfig, + pub telemetry: TelemetryConfig, } #[serde_as] @@ -104,6 +107,46 @@ pub struct HeartbeatConfig { pub interval: IggyDuration, } +#[derive(Debug, Deserialize, Serialize, Clone)] +pub struct TelemetryConfig { + pub enabled: bool, + pub service_name: String, + pub logs: TelemetryLogsConfig, + pub traces: TelemetryTracesConfig, +} + +#[derive(Debug, Deserialize, Serialize, Clone)] +pub struct TelemetryLogsConfig { + pub transport: TelemetryTransport, + pub endpoint: String, +} + +#[derive(Debug, Deserialize, Serialize, Clone)] +pub struct TelemetryTracesConfig { + pub transport: TelemetryTransport, + pub endpoint: String, +} + +#[derive(Debug, Serialize, Deserialize, PartialEq, Display, Copy, Clone)] +#[serde(rename_all = "lowercase")] +pub enum TelemetryTransport { + #[display("grpc")] + GRPC, + #[display("http")] + HTTP, +} + +impl FromStr for TelemetryTransport { + type Err = String; + fn from_str(s: &str) -> Result { + match s { + "grpc" => Ok(TelemetryTransport::GRPC), + "http" => Ok(TelemetryTransport::HTTP), + _ => Err(format!("Invalid telemetry transport: {s}")), + } + } +} + impl ServerConfig { pub async fn load(config_provider: &dyn ConfigProvider) -> Result { let server_config = config_provider.load_config().await?; diff --git a/server/src/configs/validators.rs b/server/src/configs/validators.rs index 75e7baf1a..3f0a1d19f 100644 --- a/server/src/configs/validators.rs +++ b/server/src/configs/validators.rs @@ -2,7 +2,7 @@ extern crate sysinfo; use super::server::{ ArchiverConfig, DataMaintenanceConfig, MessageSaverConfig, MessagesMaintenanceConfig, - StateMaintenanceConfig, + StateMaintenanceConfig, TelemetryConfig, }; use super::system::CompressionConfig; use crate::archiver::ArchiverKind; @@ -25,6 +25,7 @@ impl Validatable for ServerConfig { self.system.segment.validate()?; self.system.cache.validate()?; self.system.compression.validate()?; + self.telemetry.validate()?; let topic_size = match self.system.topic.max_size { MaxTopicSize::Custom(size) => Ok(size.as_bytes_u64()), @@ -75,14 +76,43 @@ impl Validatable for CompressionConfig { } } +impl Validatable for TelemetryConfig { + fn validate(&self) -> Result<(), ServerError> { + if !self.enabled { + return Ok(()); + } + + if self.service_name.trim().is_empty() { + return Err(ServerError::InvalidConfiguration( + "Telemetry service name cannot be empty.".into(), + )); + } + + if self.logs.endpoint.is_empty() { + return Err(ServerError::InvalidConfiguration( + "Telemetry logs endpoint cannot be empty.".into(), + )); + } + + if self.traces.endpoint.is_empty() { + return Err(ServerError::InvalidConfiguration( + "Telemetry traces endpoint cannot be empty.".into(), + )); + } + + Ok(()) + } +} + impl Validatable for CacheConfig { fn validate(&self) -> Result<(), ServerError> { let limit_bytes = self.size.clone().into(); let mut sys = System::new_all(); sys.refresh_all(); - sys.refresh_processes(ProcessesToUpdate::Some(&[ - Pid::from_u32(std::process::id()), - ])); + sys.refresh_processes( + ProcessesToUpdate::Some(&[Pid::from_u32(std::process::id())]), + true, + ); let total_memory = sys.total_memory(); let free_memory = sys.free_memory(); let cache_percentage = (limit_bytes as f64 / total_memory as f64) * 100.0; diff --git a/server/src/http/consumer_groups.rs b/server/src/http/consumer_groups.rs index 40d6fb9bf..2bbc45be0 100644 --- a/server/src/http/consumer_groups.rs +++ b/server/src/http/consumer_groups.rs @@ -14,6 +14,7 @@ use iggy::identifier::Identifier; use iggy::models::consumer_group::{ConsumerGroup, ConsumerGroupDetails}; use iggy::validatable::Validatable; use std::sync::Arc; +use tracing::instrument; pub fn router(state: Arc) -> Router { Router::new() @@ -70,6 +71,7 @@ async fn get_consumer_groups( Ok(Json(consumer_groups)) } +#[instrument(skip_all, fields(iggy_user_id = identity.user_id, iggy_stream_id = stream_id, iggy_topic_id = topic_id))] async fn create_consumer_group( State(state): State>, Extension(identity): Extension, @@ -104,6 +106,7 @@ async fn create_consumer_group( Ok((StatusCode::CREATED, Json(consumer_group_details))) } +#[instrument(skip_all, fields(iggy_user_id = identity.user_id, iggy_stream_id = stream_id, iggy_topic_id = topic_id, iggy_group_id = group_id))] async fn delete_consumer_group( State(state): State>, Extension(identity): Extension, diff --git a/server/src/http/messages.rs b/server/src/http/messages.rs index e29d687e5..6c0769531 100644 --- a/server/src/http/messages.rs +++ b/server/src/http/messages.rs @@ -15,6 +15,7 @@ use iggy::messages::send_messages::SendMessages; use iggy::models::messages::PolledMessages; use iggy::validatable::Validatable; use std::sync::Arc; +use tracing::instrument; pub fn router(state: Arc) -> Router { Router::new() @@ -87,6 +88,7 @@ async fn send_messages( Ok(StatusCode::CREATED) } +#[instrument(skip_all, fields(iggy_user_id = identity.user_id, iggy_stream_id = stream_id, iggy_topic_id = topic_id, iggy_partition_id = partition_id, iggy_fsync = fsync))] async fn flush_unsaved_buffer( State(state): State>, Extension(identity): Extension, diff --git a/server/src/http/partitions.rs b/server/src/http/partitions.rs index 391b2e4b6..8a73a8646 100644 --- a/server/src/http/partitions.rs +++ b/server/src/http/partitions.rs @@ -12,6 +12,7 @@ use iggy::partitions::create_partitions::CreatePartitions; use iggy::partitions::delete_partitions::DeletePartitions; use iggy::validatable::Validatable; use std::sync::Arc; +use tracing::instrument; pub fn router(state: Arc) -> Router { Router::new() @@ -22,6 +23,7 @@ pub fn router(state: Arc) -> Router { .with_state(state) } +#[instrument(skip_all, fields(iggy_user_id = identity.user_id, iggy_stream_id = stream_id, iggy_topic_id = topic_id))] async fn create_partitions( State(state): State>, Extension(identity): Extension, @@ -51,6 +53,7 @@ async fn create_partitions( Ok(StatusCode::CREATED) } +#[instrument(skip_all, fields(iggy_user_id = identity.user_id, iggy_stream_id = stream_id, iggy_topic_id = topic_id))] async fn delete_partitions( State(state): State>, Extension(identity): Extension, diff --git a/server/src/http/personal_access_tokens.rs b/server/src/http/personal_access_tokens.rs index da9f6ccef..6854ec0d9 100644 --- a/server/src/http/personal_access_tokens.rs +++ b/server/src/http/personal_access_tokens.rs @@ -18,6 +18,7 @@ use iggy::personal_access_tokens::delete_personal_access_token::DeletePersonalAc use iggy::personal_access_tokens::login_with_personal_access_token::LoginWithPersonalAccessToken; use iggy::validatable::Validatable; use std::sync::Arc; +use tracing::instrument; pub fn router(state: Arc) -> Router { Router::new() @@ -48,6 +49,7 @@ async fn get_personal_access_tokens( Ok(Json(personal_access_tokens)) } +#[instrument(skip_all, fields(iggy_user_id = identity.user_id))] async fn create_personal_access_token( State(state): State>, Extension(identity): Extension, @@ -81,6 +83,7 @@ async fn create_personal_access_token( Ok(Json(RawPersonalAccessToken { token })) } +#[instrument(skip_all, fields(iggy_user_id = identity.user_id))] async fn delete_personal_access_token( State(state): State>, Extension(identity): Extension, @@ -107,6 +110,7 @@ async fn delete_personal_access_token( Ok(StatusCode::NO_CONTENT) } +#[instrument(skip_all)] async fn login_with_personal_access_token( State(state): State>, Json(command): Json, diff --git a/server/src/http/streams.rs b/server/src/http/streams.rs index cae198c58..fbd2aad8c 100644 --- a/server/src/http/streams.rs +++ b/server/src/http/streams.rs @@ -17,6 +17,7 @@ use iggy::validatable::Validatable; use crate::state::command::EntryCommand; use std::sync::Arc; +use tracing::instrument; pub fn router(state: Arc) -> Router { Router::new() @@ -59,6 +60,7 @@ async fn get_streams( Ok(Json(streams)) } +#[instrument(skip_all, fields(iggy_user_id = identity.user_id))] async fn create_stream( State(state): State>, Extension(identity): Extension, @@ -86,6 +88,7 @@ async fn create_stream( Ok(response) } +#[instrument(skip_all, fields(iggy_user_id = identity.user_id, iggy_stream_id = stream_id))] async fn update_stream( State(state): State>, Extension(identity): Extension, @@ -113,6 +116,7 @@ async fn update_stream( Ok(StatusCode::NO_CONTENT) } +#[instrument(skip_all, fields(iggy_user_id = identity.user_id, iggy_stream_id = stream_id))] async fn delete_stream( State(state): State>, Extension(identity): Extension, @@ -140,6 +144,7 @@ async fn delete_stream( Ok(StatusCode::NO_CONTENT) } +#[instrument(skip_all, fields(iggy_user_id = identity.user_id, iggy_stream_id = stream_id))] async fn purge_stream( State(state): State>, Extension(identity): Extension, diff --git a/server/src/http/topics.rs b/server/src/http/topics.rs index e97f06544..b83c085eb 100644 --- a/server/src/http/topics.rs +++ b/server/src/http/topics.rs @@ -16,6 +16,7 @@ use iggy::topics::purge_topic::PurgeTopic; use iggy::topics::update_topic::UpdateTopic; use iggy::validatable::Validatable; use std::sync::Arc; +use tracing::instrument; pub fn router(state: Arc) -> Router { Router::new() @@ -70,6 +71,7 @@ async fn get_topics( Ok(Json(topics)) } +#[instrument(skip_all, fields(iggy_user_id = identity.user_id, iggy_stream_id = stream_id))] async fn create_topic( State(state): State>, Extension(identity): Extension, @@ -107,6 +109,7 @@ async fn create_topic( Ok(response) } +#[instrument(skip_all, fields(iggy_user_id = identity.user_id, iggy_stream_id = stream_id, iggy_topic_id = topic_id))] async fn update_topic( State(state): State>, Extension(identity): Extension, @@ -142,6 +145,7 @@ async fn update_topic( Ok(StatusCode::NO_CONTENT) } +#[instrument(skip_all, fields(iggy_user_id = identity.user_id, iggy_stream_id = stream_id, iggy_topic_id = topic_id))] async fn delete_topic( State(state): State>, Extension(identity): Extension, @@ -174,6 +178,7 @@ async fn delete_topic( Ok(StatusCode::NO_CONTENT) } +#[instrument(skip_all, fields(iggy_user_id = identity.user_id, iggy_stream_id = stream_id, iggy_topic_id = topic_id))] async fn purge_topic( State(state): State>, Extension(identity): Extension, diff --git a/server/src/http/users.rs b/server/src/http/users.rs index 5f5bec984..5a87a251a 100644 --- a/server/src/http/users.rs +++ b/server/src/http/users.rs @@ -22,6 +22,7 @@ use iggy::users::update_user::UpdateUser; use iggy::validatable::Validatable; use serde::Deserialize; use std::sync::Arc; +use tracing::instrument; pub fn router(state: Arc) -> Router { Router::new() @@ -69,6 +70,7 @@ async fn get_users( Ok(Json(users)) } +#[instrument(skip_all, fields(iggy_user_id = identity.user_id))] async fn create_user( State(state): State>, Extension(identity): Extension, @@ -108,6 +110,7 @@ async fn create_user( Ok(response) } +#[instrument(skip_all, fields(iggy_user_id = identity.user_id, iggy_updated_user_id = user_id))] async fn update_user( State(state): State>, Extension(identity): Extension, @@ -136,6 +139,7 @@ async fn update_user( Ok(StatusCode::NO_CONTENT) } +#[instrument(skip_all, fields(iggy_user_id = identity.user_id, iggy_updated_user_id = user_id))] async fn update_permissions( State(state): State>, Extension(identity): Extension, @@ -163,6 +167,7 @@ async fn update_permissions( Ok(StatusCode::NO_CONTENT) } +#[instrument(skip_all, fields(iggy_user_id = identity.user_id, iggy_updated_user_id = user_id))] async fn change_password( State(state): State>, Extension(identity): Extension, @@ -199,6 +204,7 @@ async fn change_password( Ok(StatusCode::NO_CONTENT) } +#[instrument(skip_all, fields(iggy_user_id = identity.user_id, iggy_deleted_user_id = user_id))] async fn delete_user( State(state): State>, Extension(identity): Extension, @@ -226,6 +232,7 @@ async fn delete_user( Ok(StatusCode::NO_CONTENT) } +#[instrument(skip_all)] async fn login_user( State(state): State>, Json(command): Json, @@ -239,6 +246,7 @@ async fn login_user( Ok(Json(map_generated_access_token_to_identity_info(tokens))) } +#[instrument(skip_all, fields(iggy_user_id = identity.user_id))] async fn logout_user( State(state): State>, Extension(identity): Extension, diff --git a/server/src/log/logger.rs b/server/src/log/logger.rs index 7a582b268..b9605d415 100644 --- a/server/src/log/logger.rs +++ b/server/src/log/logger.rs @@ -1,17 +1,30 @@ +use crate::configs::server::{TelemetryConfig, TelemetryTransport}; use crate::configs::system::LoggingConfig; use crate::server_error::ServerError; +use opentelemetry::logs::LoggerProvider as _; +use opentelemetry::trace::TracerProvider as _; +use opentelemetry::{global, KeyValue}; +use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge; +use opentelemetry_otlp::WithExportConfig; +use opentelemetry_sdk::propagation::TraceContextPropagator; +use opentelemetry_sdk::runtime::Tokio; +use opentelemetry_sdk::Resource; use std::io::{self, Write}; use std::path::PathBuf; use std::str::FromStr; use std::sync::{Arc, Mutex}; use tracing::{event, info, trace, Level}; use tracing_appender::non_blocking::WorkerGuard; +use tracing_opentelemetry::OpenTelemetryLayer; +use tracing_subscriber::layer::SubscriberExt; +use tracing_subscriber::util::SubscriberInitExt; use tracing_subscriber::{ - filter::LevelFilter, fmt, fmt::format::Format, fmt::MakeWriter, prelude::*, reload, - reload::Handle, Layer, Registry, + filter::LevelFilter, fmt, fmt::format::Format, fmt::MakeWriter, reload, reload::Handle, + EnvFilter, Layer, Registry, }; const IGGY_LOG_FILE_PREFIX: &str = "iggy-server.log"; +const VERSION: &str = env!("CARGO_PKG_VERSION"); // Writer that does nothing struct NullWriter; @@ -88,10 +101,12 @@ pub struct Logging { filtering_file_reload_handle: Option, early_logs_buffer: Arc>>, + + telemetry_config: TelemetryConfig, } impl Logging { - pub fn new() -> Self { + pub fn new(telemetry_config: TelemetryConfig) -> Self { Self { stdout_guard: None, stdout_reload_handle: None, @@ -100,6 +115,7 @@ impl Logging { filtering_stdout_reload_handle: None, filtering_file_reload_handle: None, early_logs_buffer: Arc::new(Mutex::new(vec![])), + telemetry_config, } } @@ -136,26 +152,102 @@ impl Logging { self.file_reload_handle = Some(file_layer_reload_handle); layers.push(file_layer.and_then(filtering_file_layer)); - let subscriber = tracing_subscriber::registry().with(layers); - - tracing::subscriber::set_global_default(subscriber) - .expect("Setting global default subscriber failed"); - - if option_env!("IGGY_CI_BUILD") == Some("true") { - let version = option_env!("CARGO_PKG_VERSION").unwrap_or("unknown"); - let hash = option_env!("VERGEN_GIT_SHA").unwrap_or("unknown"); - let built_at = option_env!("VERGEN_BUILD_TIMESTAMP").unwrap_or("unknown"); - let rust_version = option_env!("VERGEN_RUSTC_SEMVER").unwrap_or("unknown"); - let target = option_env!("VERGEN_CARGO_TARGET_TRIPLE").unwrap_or("unknown"); - info!( - "Version: {}, hash: {}, built at: {} using rust version: {} for target: {}", - version, hash, built_at, rust_version, target - ); - } else { - info!("It seems that you are a developer. Environment variable IGGY_CI_BUILD is not set to 'true', skipping build info print.") + if !self.telemetry_config.enabled { + // This is moment when we can start logging something and not worry about losing it. + Registry::default() + .with(layers) + .with(EnvFilter::try_from_default_env().unwrap_or(EnvFilter::new("INFO"))) + .init(); + Self::print_build_info(); + return; } - // This is moment when we can start logging something and not worry about losing it. + let service_name = self.telemetry_config.service_name.to_owned(); + let resource = Resource::new(vec![ + KeyValue::new( + opentelemetry_semantic_conventions::resource::SERVICE_NAME, + service_name.to_owned(), + ), + KeyValue::new( + opentelemetry_semantic_conventions::resource::SERVICE_VERSION, + VERSION, + ), + ]); + + let logger_provider = match self.telemetry_config.logs.transport { + TelemetryTransport::GRPC => opentelemetry_otlp::new_pipeline() + .logging() + .with_resource(resource.clone()) + .with_exporter( + opentelemetry_otlp::new_exporter() + .tonic() + .with_endpoint(self.telemetry_config.logs.endpoint.clone()), + ) + .install_batch(Tokio) + .expect("Failed to initialize gRPC logger."), + TelemetryTransport::HTTP => opentelemetry_otlp::new_pipeline() + .logging() + .with_resource(resource.clone()) + .with_exporter( + opentelemetry_otlp::new_exporter() + .http() + .with_http_client(reqwest::Client::new()) + .with_protocol(opentelemetry_otlp::Protocol::HttpBinary) + .with_endpoint(self.telemetry_config.logs.endpoint.clone()), + ) + .install_batch(Tokio) + .expect("Failed to initialize HTTP logger."), + }; + + let logger = logger_provider + .logger_builder(service_name.to_owned()) + .with_version(VERSION) + .build(); + + let tracer_provider = match self.telemetry_config.traces.transport { + TelemetryTransport::GRPC => opentelemetry_otlp::new_pipeline() + .tracing() + .with_exporter( + opentelemetry_otlp::new_exporter() + .tonic() + .with_endpoint(self.telemetry_config.traces.endpoint.clone()), + ) + .with_trace_config( + opentelemetry_sdk::trace::Config::default().with_resource(resource), + ) + .install_batch(Tokio) + .expect("Failed to initialize gRPC tracer."), + TelemetryTransport::HTTP => opentelemetry_otlp::new_pipeline() + .tracing() + .with_exporter( + opentelemetry_otlp::new_exporter() + .http() + .with_http_client(reqwest::Client::new()) + .with_protocol(opentelemetry_otlp::Protocol::HttpBinary) + .with_endpoint(self.telemetry_config.traces.endpoint.clone()), + ) + .with_trace_config( + opentelemetry_sdk::trace::Config::default().with_resource(resource), + ) + .install_batch(Tokio) + .expect("Failed to initialize HTTP tracer."), + }; + + let tracer = tracer_provider + .tracer_builder(service_name.to_owned()) + .with_version(VERSION) + .build(); + global::set_tracer_provider(tracer_provider.clone()); + global::set_text_map_propagator(TraceContextPropagator::new()); + global::shutdown_tracer_provider(); + + Registry::default() + .with(layers) + .with(OpenTelemetryTracingBridge::new(logger.provider())) + .with(OpenTelemetryLayer::new(tracer)) + .with(EnvFilter::try_from_default_env().unwrap_or(EnvFilter::new("INFO"))) + .init(); + Self::print_build_info(); } pub fn late_init( @@ -278,10 +370,25 @@ impl Logging { fn _install_log_rotation_handler(&self) { todo!("Implement log rotation handler based on size and retention time"); } + + fn print_build_info() { + if option_env!("IGGY_CI_BUILD") == Some("true") { + let hash = option_env!("VERGEN_GIT_SHA").unwrap_or("unknown"); + let built_at = option_env!("VERGEN_BUILD_TIMESTAMP").unwrap_or("unknown"); + let rust_version = option_env!("VERGEN_RUSTC_SEMVER").unwrap_or("unknown"); + let target = option_env!("VERGEN_CARGO_TARGET_TRIPLE").unwrap_or("unknown"); + info!( + "Version: {VERSION}, hash: {}, built at: {} using rust version: {} for target: {}", + hash, built_at, rust_version, target + ); + } else { + info!("It seems that you are a developer. Environment variable IGGY_CI_BUILD is not set to 'true', skipping build info print.") + } + } } impl Default for Logging { fn default() -> Self { - Self::new() + Self::new(TelemetryConfig::default()) } } diff --git a/server/src/log/tokio_console.rs b/server/src/log/tokio_console.rs index 3163bbce5..621f4b4e2 100644 --- a/server/src/log/tokio_console.rs +++ b/server/src/log/tokio_console.rs @@ -1,3 +1,4 @@ +use crate::configs::server::TelemetryConfig; use crate::configs::system::LoggingConfig; use crate::server_error::ServerError; use tracing_subscriber::prelude::*; @@ -5,7 +6,7 @@ use tracing_subscriber::prelude::*; pub struct Logging {} impl Logging { - pub fn new() -> Self { + pub fn new(_: TelemetryConfig) -> Self { Self {} } @@ -29,6 +30,6 @@ impl Logging { impl Default for Logging { fn default() -> Self { - Self::new() + Self::new(TelemetryConfig::default()) } } diff --git a/server/src/main.rs b/server/src/main.rs index 55cbcbbcb..54496e2b9 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -21,25 +21,25 @@ use server::server_error::ServerError; use server::streaming::systems::system::{SharedSystem, System}; use server::tcp::tcp_server; use tokio::time::Instant; -use tracing::info; +use tracing::{info, instrument}; #[tokio::main] +#[instrument(skip_all)] async fn main() -> Result<(), ServerError> { let startup_timestamp = Instant::now(); let standard_font = FIGfont::standard().unwrap(); let figure = standard_font.convert("Iggy Server"); println!("{}", figure.unwrap()); - let mut logging = Logging::new(); - logging.early_init(); - - // From this point on, we can use tracing macros to log messages. - let args = Args::parse(); - let config_provider = config_provider::resolve(&args.config_provider)?; let config = ServerConfig::load(config_provider.as_ref()).await?; + let mut logging = Logging::new(config.telemetry.clone()); + logging.early_init(); + + // From this point on, we can use tracing macros to log messages. + logging.late_init(config.system.get_system_path(), &config.system.logging)?; let system = SharedSystem::new(System::new( diff --git a/server/src/streaming/systems/stats.rs b/server/src/streaming/systems/stats.rs index 0273e2316..20475f7e7 100644 --- a/server/src/streaming/systems/stats.rs +++ b/server/src/streaming/systems/stats.rs @@ -28,7 +28,7 @@ impl System { let process_id = std::process::id(); sys.refresh_cpu_all(); sys.refresh_memory(); - sys.refresh_processes(ProcessesToUpdate::Some(&[Pid::from_u32(process_id)])); + sys.refresh_processes(ProcessesToUpdate::Some(&[Pid::from_u32(process_id)]), true); let total_cpu_usage = sys.global_cpu_usage(); let total_memory = sys.total_memory().into(); diff --git a/server/src/streaming/systems/system.rs b/server/src/streaming/systems/system.rs index f53d9ef8f..72df3f4c9 100644 --- a/server/src/streaming/systems/system.rs +++ b/server/src/streaming/systems/system.rs @@ -15,7 +15,7 @@ use std::path::Path; use std::sync::Arc; use tokio::fs::{create_dir, remove_dir_all}; use tokio::time::Instant; -use tracing::{info, trace}; +use tracing::{info, instrument, trace}; use crate::archiver::disk::DiskArchiver; use crate::archiver::s3::S3Archiver; @@ -174,6 +174,7 @@ impl System { } } + #[instrument(skip_all)] pub async fn init(&mut self) -> Result<(), IggyError> { let system_path = self.config.get_system_path(); if !Path::new(&system_path).exists() && create_dir(&system_path).await.is_err() { @@ -231,11 +232,13 @@ impl System { Ok(()) } + #[instrument(skip_all)] pub async fn shutdown(&mut self) -> Result<(), IggyError> { self.persist_messages().await?; Ok(()) } + #[instrument(skip_all)] pub async fn persist_messages(&self) -> Result { trace!("Saving buffered messages on disk..."); let mut saved_messages_number = 0; diff --git a/tools/Cargo.toml b/tools/Cargo.toml index 1d976fc9a..4a90da87b 100644 --- a/tools/Cargo.toml +++ b/tools/Cargo.toml @@ -14,4 +14,4 @@ iggy = { path = "../sdk" } rand = "0.8.5" tokio = { version = "1.40.0", features = ["full"] } tracing = { version = "0.1.37" } -tracing-subscriber = { version = "0.3.16" } +tracing-subscriber = { version = "0.3.18", features = ["fmt", "env-filter"] } diff --git a/tools/src/data-seeder/main.rs b/tools/src/data-seeder/main.rs index 64070fdb2..777922032 100644 --- a/tools/src/data-seeder/main.rs +++ b/tools/src/data-seeder/main.rs @@ -11,6 +11,9 @@ use iggy::utils::crypto::{Aes256GcmEncryptor, Encryptor}; use std::error::Error; use std::sync::Arc; use tracing::info; +use tracing_subscriber::layer::SubscriberExt; +use tracing_subscriber::util::SubscriberInitExt; +use tracing_subscriber::{EnvFilter, Registry}; #[derive(Parser, Debug, Clone)] #[command(author, version, about, long_about = None)] @@ -30,7 +33,10 @@ async fn main() -> Result<(), Box> { let args = DataSeederArgs::parse(); let iggy_args = Args::from(vec![args.iggy.clone()]); - tracing_subscriber::fmt::init(); + Registry::default() + .with(tracing_subscriber::fmt::layer()) + .with(EnvFilter::try_from_default_env().unwrap_or(EnvFilter::new("INFO"))) + .init(); let encryptor: Option> = match iggy_args.encryption_key.is_empty() { true => None, false => Some(Arc::new(