Skip to content

Commit

Permalink
Improve internal opentelemetry logging - directly using tracing mcros (
Browse files Browse the repository at this point in the history
…#2152)

Co-authored-by: Cijo Thomas <[email protected]>
  • Loading branch information
lalitb and cijothomas authored Sep 30, 2024
1 parent 8bdd189 commit 81a95e3
Show file tree
Hide file tree
Showing 7 changed files with 99 additions and 10 deletions.
4 changes: 2 additions & 2 deletions examples/self-diagnostics/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@ publish = false

[dependencies]
opentelemetry = { path = "../../opentelemetry" }
opentelemetry_sdk = { path = "../../opentelemetry-sdk", features = ["rt-tokio"]}
opentelemetry_sdk = { path = "../../opentelemetry-sdk", features = ["rt-tokio", "experimental-internal-logs"]}
opentelemetry-stdout = { path = "../../opentelemetry-stdout"}
opentelemetry-appender-tracing = { path = "../../opentelemetry-appender-tracing"}
tokio = { workspace = true, features = ["full"] }
tracing = { workspace = true, features = ["std"]}
tracing-core = { workspace = true }
tracing-subscriber = { version = "0.3.18", features = ["env-filter","registry", "std"]}
opentelemetry-otlp = { path = "../../opentelemetry-otlp", features = ["http-proto", "reqwest-client", "logs"] }
opentelemetry-otlp = { path = "../../opentelemetry-otlp", features = ["http-proto", "reqwest-client", "logs", "experimental-internal-logs"] }
once_cell ={ version = "1.19.0"}
ctrlc = "3.4"
35 changes: 30 additions & 5 deletions examples/self-diagnostics/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ use opentelemetry::global::{self, set_error_handler, Error as OtelError};
use opentelemetry::KeyValue;
use opentelemetry_appender_tracing::layer;
use opentelemetry_otlp::WithExportConfig;
use tracing_subscriber::filter::{EnvFilter, LevelFilter};
use tracing_subscriber::fmt;
use tracing_subscriber::prelude::*;
use tracing_subscriber::EnvFilter;

use std::error::Error;
use tracing::error;
Expand Down Expand Up @@ -59,6 +60,7 @@ fn init_logger_provider() -> opentelemetry_sdk::logs::LoggerProvider {
)
.install_batch(opentelemetry_sdk::runtime::Tokio)
.unwrap();
let cloned_provider = provider.clone();

// Add a tracing filter to filter events from crates used by opentelemetry-otlp.
// The filter levels are set as follows:
Expand All @@ -72,11 +74,34 @@ fn init_logger_provider() -> opentelemetry_sdk::logs::LoggerProvider {
.add_directive("hyper=error".parse().unwrap())
.add_directive("tonic=error".parse().unwrap())
.add_directive("reqwest=error".parse().unwrap());
let cloned_provider = provider.clone();
let layer = layer::OpenTelemetryTracingBridge::new(&cloned_provider);

// Configuring the formatting layer specifically for OpenTelemetry internal logs.
// These logs starts with "opentelemetry" prefix in target. This allows specific logs
// from the OpenTelemetry-related components to be filtered and handled separately
// from the application logs

let opentelemetry_filter = tracing_subscriber::filter::filter_fn(|metadata| {
metadata.target().starts_with("opentelemetry")
});

let fmt_opentelemetry_layer = fmt::layer()
.with_filter(LevelFilter::DEBUG)
.with_filter(opentelemetry_filter);

// Configures the appender tracing layer, filtering out OpenTelemetry internal logs
// to prevent infinite logging loops.

let non_opentelemetry_filter = tracing_subscriber::filter::filter_fn(|metadata| {
!metadata.target().starts_with("opentelemetry")
});

let otel_layer = layer::OpenTelemetryTracingBridge::new(&cloned_provider)
.with_filter(non_opentelemetry_filter.clone());

tracing_subscriber::registry()
.with(filter)
.with(layer)
.with(fmt_opentelemetry_layer)
.with(fmt::layer().with_filter(filter))
.with(otel_layer)
.init();
provider
}
Expand Down
2 changes: 2 additions & 0 deletions opentelemetry-otlp/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ opentelemetry = { version = "0.25", default-features = false, path = "../opentel
opentelemetry_sdk = { version = "0.25", default-features = false, path = "../opentelemetry-sdk" }
opentelemetry-http = { version = "0.25", path = "../opentelemetry-http", optional = true }
opentelemetry-proto = { version = "0.25", path = "../opentelemetry-proto", default-features = false }
tracing = {workspace = true, optional = true}

prost = { workspace = true, optional = true }
tonic = { workspace = true, optional = true }
Expand All @@ -57,6 +58,7 @@ trace = ["opentelemetry/trace", "opentelemetry_sdk/trace", "opentelemetry-proto/
metrics = ["opentelemetry/metrics", "opentelemetry_sdk/metrics", "opentelemetry-proto/metrics"]
logs = ["opentelemetry/logs", "opentelemetry_sdk/logs", "opentelemetry-proto/logs"]
populate-logs-event-name = ["opentelemetry-proto/populate-logs-event-name"]
experimental-internal-logs = ["tracing"]

# add ons
serialize = ["serde", "serde_json"]
Expand Down
24 changes: 23 additions & 1 deletion opentelemetry-otlp/src/metric.rs
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,29 @@ impl TemporalitySelector for MetricsExporter {
#[async_trait]
impl PushMetricsExporter for MetricsExporter {
async fn export(&self, metrics: &mut ResourceMetrics) -> Result<()> {
self.client.export(metrics).await
#[cfg(feature = "experimental-internal-logs")]
tracing::debug!(
name = "export_metrics",
target = "opentelemetry-otlp",
metrics_count = metrics
.scope_metrics
.iter()
.map(|scope| scope.metrics.len())
.sum::<usize>(),
status = "started"
);
let result = self.client.export(metrics).await;
#[cfg(feature = "experimental-internal-logs")]
tracing::debug!(
name = "export_metrics",
target = "opentelemetry-otlp",
status = if result.is_ok() {
"completed"
} else {
"failed"
}
);
result
}

async fn force_flush(&self) -> Result<()> {
Expand Down
2 changes: 2 additions & 0 deletions opentelemetry-sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ url = { workspace = true, optional = true }
tokio = { workspace = true, features = ["rt", "time"], optional = true }
tokio-stream = { workspace = true, optional = true }
http = { workspace = true, optional = true }
tracing = {workspace = true, optional = true}

[package.metadata.docs.rs]
all-features = true
Expand All @@ -51,6 +52,7 @@ testing = ["opentelemetry/testing", "trace", "metrics", "logs", "rt-async-std",
rt-tokio = ["tokio", "tokio-stream"]
rt-tokio-current-thread = ["tokio", "tokio-stream"]
rt-async-std = ["async-std"]
experimental-internal-logs = ["tracing"]

[[bench]]
name = "context"
Expand Down
13 changes: 13 additions & 0 deletions opentelemetry-sdk/src/logs/log_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,13 @@ impl LogProcessor for SimpleLogProcessor {
return;
}

#[cfg(feature = "experimental-internal-logs")]
tracing::debug!(
name: "simple_log_processor_emit",
target: "opentelemetry-sdk",
event_name = record.event_name
);

let result = self
.exporter
.lock()
Expand Down Expand Up @@ -217,6 +224,12 @@ impl<R: RuntimeChannel> BatchLogProcessor<R> {
// Log has finished, add to buffer of pending logs.
BatchMessage::ExportLog(log) => {
logs.push(log);
#[cfg(feature = "experimental-internal-logs")]
tracing::debug!(
name: "batch_log_processor_record_count",
target: "opentelemetry-sdk",
current_batch_size = logs.len()
);

if logs.len() == config.max_export_batch_size {
let result = export_with_timeout(
Expand Down
29 changes: 27 additions & 2 deletions opentelemetry-sdk/src/metrics/periodic_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,8 @@ struct PeriodicReaderWorker<RT: Runtime> {

impl<RT: Runtime> PeriodicReaderWorker<RT> {
async fn collect_and_export(&mut self) -> Result<()> {
#[cfg(feature = "experimental-internal-logs")]
tracing::debug!(name: "metrics_collect_and_export", target: "opentelemetry-sdk", status = "started");
self.reader.collect(&mut self.rm)?;
if self.rm.scope_metrics.is_empty() {
// No metrics to export.
Expand All @@ -246,25 +248,48 @@ impl<RT: Runtime> PeriodicReaderWorker<RT> {
pin_mut!(timeout);

match future::select(export, timeout).await {
Either::Left((res, _)) => res, // return the status of export.
Either::Right(_) => Err(MetricsError::Other("export timed out".into())),
Either::Left((res, _)) => {
#[cfg(feature = "experimental-internal-logs")]
tracing::debug!(
name: "collect_and_export",
target: "opentelemetry-sdk",
status = "completed",
result = ?res
);
res // return the status of export.
}
Either::Right(_) => {
#[cfg(feature = "experimental-internal-logs")]
tracing::error!(
name = "collect_and_export",
target = "opentelemetry-sdk",
status = "timed_out"
);
Err(MetricsError::Other("export timed out".into()))
}
}
}

async fn process_message(&mut self, message: Message) -> bool {
match message {
Message::Export => {
#[cfg(feature = "experimental-internal-logs")]
tracing::debug!(name: "process_message", target: "opentelemetry-sdk", message_type = "export");
if let Err(err) = self.collect_and_export().await {
global::handle_error(err)
}
}
Message::Flush(ch) => {
#[cfg(feature = "experimental-internal-logs")]
tracing::debug!(name: "process_message", target: "opentelemetry-sdk", message_type = "flush");
let res = self.collect_and_export().await;
if ch.send(res).is_err() {
global::handle_error(MetricsError::Other("flush channel closed".into()))
}
}
Message::Shutdown(ch) => {
#[cfg(feature = "experimental-internal-logs")]
tracing::debug!(name: "process_message", target: "opentelemetry-sdk", message_type = "shutdown");
let res = self.collect_and_export().await;
let _ = self.reader.exporter.shutdown();
if ch.send(res).is_err() {
Expand Down

0 comments on commit 81a95e3

Please sign in to comment.