diff --git a/examples/aws-xray/src/client.rs b/examples/aws-xray/src/client.rs index 723c283940..533fa8192f 100644 --- a/examples/aws-xray/src/client.rs +++ b/examples/aws-xray/src/client.rs @@ -16,11 +16,11 @@ fn init_tracer() -> (sdktrace::Tracer, stdout::Uninstall) { // For the demonstration, use `Sampler::AlwaysOn` sampler to sample all traces. In a production // application, use `Sampler::ParentBased` or `Sampler::TraceIdRatioBased` with a desired ratio. stdout::new_pipeline() - .with_trace_config(sdktrace::Config { - default_sampler: Box::new(sdktrace::Sampler::AlwaysOn), - id_generator: Box::new(sdktrace::XrayIdGenerator::default()), - ..Default::default() - }) + .with_trace_config( + sdktrace::config() + .with_default_sampler(sdktrace::Sampler::AlwaysOn) + .with_id_generator(sdktrace::XrayIdGenerator::default()), + ) .install() } diff --git a/examples/aws-xray/src/server.rs b/examples/aws-xray/src/server.rs index ffea82c0e9..235688f8d5 100644 --- a/examples/aws-xray/src/server.rs +++ b/examples/aws-xray/src/server.rs @@ -37,11 +37,11 @@ fn init_tracer() -> (sdktrace::Tracer, stdout::Uninstall) { // For the demonstration, use `Sampler::AlwaysOn` sampler to sample all traces. In a production // application, use `Sampler::ParentBased` or `Sampler::TraceIdRatioBased` with a desired ratio. stdout::new_pipeline() - .with_trace_config(sdktrace::Config { - default_sampler: Box::new(sdktrace::Sampler::AlwaysOn), - id_generator: Box::new(sdktrace::XrayIdGenerator::default()), - ..Default::default() - }) + .with_trace_config( + sdktrace::config() + .with_default_sampler(sdktrace::Sampler::AlwaysOn) + .with_id_generator(sdktrace::XrayIdGenerator::default()), + ) .install() } diff --git a/examples/http/src/client.rs b/examples/http/src/client.rs index a8832829ff..5b0ce2c3fb 100644 --- a/examples/http/src/client.rs +++ b/examples/http/src/client.rs @@ -3,7 +3,7 @@ use opentelemetry::global; use opentelemetry::sdk::export::trace::stdout; use opentelemetry::sdk::{ propagation::TraceContextPropagator, - trace::{Config, Sampler}, + trace::{self, Sampler}, }; use opentelemetry::{ trace::{TraceContextExt, Tracer}, @@ -17,10 +17,7 @@ fn init_tracer() -> (impl Tracer, stdout::Uninstall) { // For the demonstration, use `Sampler::AlwaysOn` sampler to sample all traces. In a production // application, use `Sampler::ParentBased` or `Sampler::TraceIdRatioBased` with a desired ratio. stdout::new_pipeline() - .with_trace_config(Config { - default_sampler: Box::new(Sampler::AlwaysOn), - ..Default::default() - }) + .with_trace_config(trace::config().with_default_sampler(Sampler::AlwaysOn)) .install() } diff --git a/examples/http/src/server.rs b/examples/http/src/server.rs index 6d3a483b69..4051ca596b 100644 --- a/examples/http/src/server.rs +++ b/examples/http/src/server.rs @@ -5,7 +5,7 @@ use opentelemetry::{ sdk::export::trace::stdout, sdk::{ propagation::TraceContextPropagator, - trace::{Config, Sampler}, + trace::{self, Sampler}, }, trace::{Span, Tracer}, }; @@ -29,10 +29,7 @@ fn init_tracer() -> (impl Tracer, stdout::Uninstall) { // For the demonstration, use `Sampler::AlwaysOn` sampler to sample all traces. In a production // application, use `Sampler::ParentBased` or `Sampler::TraceIdRatioBased` with a desired ratio. stdout::new_pipeline() - .with_trace_config(Config { - default_sampler: Box::new(Sampler::AlwaysOn), - ..Default::default() - }) + .with_trace_config(trace::config().with_default_sampler(Sampler::AlwaysOn)) .install() } diff --git a/opentelemetry-jaeger/README.md b/opentelemetry-jaeger/README.md index 58f56bc37e..d5d683117b 100644 --- a/opentelemetry-jaeger/README.md +++ b/opentelemetry-jaeger/README.md @@ -77,30 +77,12 @@ opentelemetry-jaeger = { version = "*", features = ["tokio"] } ### Jaeger Exporter From Environment Variables -The jaeger pipeline builder can be configured dynamically via the [`from_env`] -method. All variables are optional, a full list of accepted options can be found -in the [jaeger variables spec]. +The jaeger pipeline builder can be configured dynamically via environment +variables. All variables are optional, a full list of accepted options can be +found in the [jaeger variables spec]. -[`from_env`]: https://docs.rs/opentelemetry-jaeger/latest/opentelemetry_jaeger/struct.PipelineBuilder.html#method.from_env [jaeger variables spec]: https://github.com/open-telemetry/opentelemetry-specification/blob/master/specification/sdk-environment-variables.md#jaeger-exporter -```rust -use opentelemetry::global; -use opentelemetry::trace::Tracer; - -fn main() -> Result<(), Box> { - global::set_text_map_propagator(opentelemetry_jaeger::Propagator::new()); - // export OTEL_SERVICE_NAME=my-service-name - let (tracer, _uninstall) = opentelemetry_jaeger::new_pipeline().from_env().install()?; - - tracer.in_span("doing_work", |cx| { - // Traced app logic here... - }); - - Ok(()) -} -``` - ### Jaeger Collector Example If you want to skip the agent and submit spans directly to a Jaeger collector, @@ -163,7 +145,6 @@ use opentelemetry::KeyValue; fn main() -> Result<(), Box> { global::set_text_map_propagator(opentelemetry_jaeger::Propagator::new()); let (tracer, _uninstall) = opentelemetry_jaeger::new_pipeline() - .from_env() .with_agent_endpoint("localhost:6831") .with_service_name("my_app") .with_tags(vec![KeyValue::new("process_key", "process_value")]) diff --git a/opentelemetry-jaeger/src/exporter/mod.rs b/opentelemetry-jaeger/src/exporter/mod.rs index 2f7c4d1d08..de547a4823 100644 --- a/opentelemetry-jaeger/src/exporter/mod.rs +++ b/opentelemetry-jaeger/src/exporter/mod.rs @@ -135,7 +135,7 @@ pub struct PipelineBuilder { impl Default for PipelineBuilder { /// Return the default Exporter Builder. fn default() -> Self { - PipelineBuilder { + let builder_defaults = PipelineBuilder { agent_endpoint: vec![DEFAULT_AGENT_ENDPOINT.parse().unwrap()], #[cfg(any(feature = "collector_client", feature = "wasm_collector_client"))] collector_endpoint: None, @@ -151,21 +151,14 @@ impl Default for PipelineBuilder { tags: Vec::new(), }, config: None, - } + }; + + // Override above defaults with env vars if set + env::assign_attrs(builder_defaults) } } impl PipelineBuilder { - /// Assign builder attributes from environment variables. - /// - /// See the [jaeger variable spec] for full list. - /// - /// [jaeger variable spec]: https://github.com/open-telemetry/opentelemetry-specification/blob/master/specification/sdk-environment-variables.md#jaeger-exporter - #[allow(clippy::wrong_self_convention)] - pub fn from_env(self) -> Self { - env::assign_attrs(self) - } - /// Assign the agent endpoint. pub fn with_agent_endpoint(self, agent_endpoint: T) -> Self { PipelineBuilder { diff --git a/opentelemetry-jaeger/src/lib.rs b/opentelemetry-jaeger/src/lib.rs index 569829ea67..9673991fbe 100644 --- a/opentelemetry-jaeger/src/lib.rs +++ b/opentelemetry-jaeger/src/lib.rs @@ -53,30 +53,12 @@ //! //! ### Jaeger Exporter From Environment Variables //! -//! The jaeger pipeline builder can be configured dynamically via the -//! [`from_env`] method. All variables are optinal, a full list of accepted -//! options can be found in the [jaeger variables spec]. +//! The jaeger pipeline builder can be configured dynamically via environment +//! variables. All variables are optional, a full list of accepted options can +//! be found in the [jaeger variables spec]. //! -//! [`from_env`]: struct.PipelineBuilder.html#method.from_env //! [jaeger variables spec]: https://github.com/open-telemetry/opentelemetry-specification/blob/master/specification/sdk-environment-variables.md#jaeger-exporter //! -//! ```no_run -//! use opentelemetry::trace::{Tracer, TraceError}; -//! use opentelemetry::global; -//! -//! fn main() -> Result<(), TraceError> { -//! global::set_text_map_propagator(opentelemetry_jaeger::Propagator::new()); -//! // export OTEL_SERVICE_NAME=my-service-name -//! let (tracer, _uninstall) = opentelemetry_jaeger::new_pipeline().from_env().install()?; -//! -//! tracer.in_span("doing_work", |cx| { -//! // Traced app logic here... -//! }); -//! -//! Ok(()) -//! } -//! ``` -//! //! ### Jaeger Collector Example //! //! If you want to skip the agent and submit spans directly to a Jaeger collector, @@ -129,7 +111,6 @@ //! fn main() -> Result<(), TraceError> { //! global::set_text_map_propagator(opentelemetry_jaeger::Propagator::new()); //! let (tracer, _uninstall) = opentelemetry_jaeger::new_pipeline() -//! .from_env() //! .with_agent_endpoint("localhost:6831") //! .with_service_name("my_app") //! .with_tags(vec![KeyValue::new("process_key", "process_value")]) diff --git a/opentelemetry-otlp/src/lib.rs b/opentelemetry-otlp/src/lib.rs index c718307e33..3cd6ca0804 100644 --- a/opentelemetry-otlp/src/lib.rs +++ b/opentelemetry-otlp/src/lib.rs @@ -394,7 +394,7 @@ mod tests { }; #[test] - fn test_pipeline_builder_from_otlp_env() { + fn test_pipeline_builder_from_env() { std::env::set_var(OTEL_EXPORTER_OTLP_ENDPOINT, "https://otlp_endpoint:4317"); std::env::set_var(OTEL_EXPORTER_OTLP_TIMEOUT, "bad_timeout"); @@ -416,10 +416,8 @@ mod tests { std::env::remove_var(OTEL_EXPORTER_OTLP_TIMEOUT); assert!(std::env::var(OTEL_EXPORTER_OTLP_ENDPOINT).is_err()); assert!(std::env::var(OTEL_EXPORTER_OTLP_TIMEOUT).is_err()); - } - #[test] - fn test_pipeline_builder_from_otlp_traces_env() { + // test from traces env var std::env::set_var( OTEL_EXPORTER_OTLP_TRACES_ENDPOINT, "https://otlp_traces_endpoint:4317", diff --git a/opentelemetry-semantic-conventions/src/resource.rs b/opentelemetry-semantic-conventions/src/resource.rs index be8abcd6a4..98d67b9f15 100644 --- a/opentelemetry-semantic-conventions/src/resource.rs +++ b/opentelemetry-semantic-conventions/src/resource.rs @@ -10,16 +10,12 @@ //! ```rust,no_run //! use opentelemetry::sdk; //! use opentelemetry_semantic_conventions as semcov; -//! use std::sync::Arc; //! //! let _tracer = opentelemetry::sdk::export::trace::stdout::new_pipeline() -//! .with_trace_config(sdk::trace::Config { -//! resource: Arc::new(sdk::Resource::new(vec![ -//! semcov::resource::SERVICE_NAME.string("my-service"), -//! semcov::resource::SERVICE_NAMESPACE.string("my-namespace"), -//! ])), -//! ..sdk::trace::Config::default() -//! }) +//! .with_trace_config(sdk::trace::config().with_resource(sdk::Resource::new(vec![ +//! semcov::resource::SERVICE_NAME.string("my-service"), +//! semcov::resource::SERVICE_NAMESPACE.string("my-namespace"), +//! ]))) //! .install(); //! ``` diff --git a/opentelemetry/src/sdk/resource.rs b/opentelemetry/src/sdk/resource.rs index e0be2b5418..0f1d93da4f 100644 --- a/opentelemetry/src/sdk/resource.rs +++ b/opentelemetry/src/sdk/resource.rs @@ -15,6 +15,7 @@ //! [`TracerProvider`]: ../../api/trace/provider/trait.TracerProvider.html #[cfg(feature = "metrics")] use crate::labels; +use crate::sdk::EnvResourceDetector; use crate::{Key, KeyValue, Value}; #[cfg(feature = "serialize")] use serde::{Deserialize, Serialize}; @@ -25,18 +26,34 @@ use std::time::Duration; /// /// Items are sorted by their key, and are only overwritten if the value is an empty string. #[cfg_attr(feature = "serialize", derive(Deserialize, Serialize))] -#[derive(Clone, Debug, Default, PartialEq)] +#[derive(Clone, Debug, PartialEq)] pub struct Resource { attrs: BTreeMap, } +impl Default for Resource { + fn default() -> Self { + Self::from_detectors( + Duration::from_secs(0), + vec![Box::new(EnvResourceDetector::new())], + ) + } +} + impl Resource { + /// Creates an empty resource. + pub fn empty() -> Self { + Self { + attrs: Default::default(), + } + } + /// Create a new `Resource` from key value pairs. /// /// Values are de-duplicated by key, and the first key-value pair with a non-empty string value /// will be retained pub fn new>(kvs: T) -> Self { - let mut resource = Resource::default(); + let mut resource = Resource::empty(); for kv in kvs.into_iter() { resource.insert(kv); @@ -49,7 +66,7 @@ impl Resource { /// /// timeout will be applied to each detector. pub fn from_detectors(timeout: Duration, detectors: Vec>) -> Self { - let mut resource = Resource::default(); + let mut resource = Resource::empty(); for detector in detectors { let detected_res = detector.detect(timeout); for (key, value) in detected_res.into_iter() { @@ -72,7 +89,7 @@ impl Resource { return self.clone(); } - let mut resource = Resource::default(); + let mut resource = Resource::empty(); // attrs from self must be added first so they have priority for (k, v) in self.attrs.iter() { diff --git a/opentelemetry/src/sdk/trace/config.rs b/opentelemetry/src/sdk/trace/config.rs index cb04efda30..e405eca5f9 100644 --- a/opentelemetry/src/sdk/trace/config.rs +++ b/opentelemetry/src/sdk/trace/config.rs @@ -3,6 +3,8 @@ //! Configuration represents the global tracing configuration, overrides //! can be set for the default OpenTelemetry limits and Sampler. use crate::{sdk, sdk::trace::Sampler, trace::IdGenerator}; +use std::env; +use std::str::FromStr; use std::sync::Arc; /// Default trace configuration @@ -71,13 +73,36 @@ impl Config { impl Default for Config { /// Create default global sdk configuration. fn default() -> Self { - Config { + let mut config = Config { default_sampler: Box::new(Sampler::ParentBased(Box::new(Sampler::AlwaysOn))), id_generator: Box::new(sdk::trace::IdGenerator::default()), max_events_per_span: 128, - max_attributes_per_span: 32, - max_links_per_span: 32, + max_attributes_per_span: 128, + max_links_per_span: 128, resource: Arc::new(sdk::Resource::default()), + }; + + if let Some(max_attributes_per_span) = env::var("OTEL_SPAN_ATTRIBUTE_COUNT_LIMIT") + .ok() + .and_then(|count_limit| u32::from_str(&count_limit).ok()) + { + config.max_attributes_per_span = max_attributes_per_span; + } + + if let Some(max_events_per_span) = env::var("OTEL_SPAN_EVENT_COUNT_LIMIT") + .ok() + .and_then(|max_events| u32::from_str(&max_events).ok()) + { + config.max_events_per_span = max_events_per_span; } + + if let Some(max_links_per_span) = env::var("OTEL_SPAN_LINK_COUNT_LIMIT") + .ok() + .and_then(|max_links| u32::from_str(&max_links).ok()) + { + config.max_links_per_span = max_links_per_span; + } + + config } } diff --git a/opentelemetry/src/sdk/trace/id_generator/aws.rs b/opentelemetry/src/sdk/trace/id_generator/aws.rs index 65581fa69e..76b65d8016 100644 --- a/opentelemetry/src/sdk/trace/id_generator/aws.rs +++ b/opentelemetry/src/sdk/trace/id_generator/aws.rs @@ -24,14 +24,11 @@ use std::time::{Duration, UNIX_EPOCH}; /// /// ``` /// use opentelemetry::trace::NoopSpanExporter; -/// use opentelemetry::sdk::trace::{Config, TracerProvider, XrayIdGenerator}; +/// use opentelemetry::sdk::trace::{self, TracerProvider, XrayIdGenerator}; /// /// let _provider: TracerProvider = TracerProvider::builder() /// .with_simple_exporter(NoopSpanExporter::new()) -/// .with_config(Config { -/// id_generator: Box::new(XrayIdGenerator::default()), -/// ..Default::default() -/// }) +/// .with_config(trace::config().with_id_generator(XrayIdGenerator::default())) /// .build(); /// ``` /// diff --git a/opentelemetry/src/sdk/trace/provider.rs b/opentelemetry/src/sdk/trace/provider.rs index d8e0e748d4..c4573acda7 100644 --- a/opentelemetry/src/sdk/trace/provider.rs +++ b/opentelemetry/src/sdk/trace/provider.rs @@ -116,11 +116,11 @@ impl Builder { // drop. We cannot assume we are in a multi-threaded tokio runtime here, so use // `spawn_blocking` to avoid blocking the main thread. let spawn = |fut| tokio::task::spawn_blocking(|| futures::executor::block_on(fut)); - let batch = sdk::trace::BatchSpanProcessor::from_env( + let batch = sdk::trace::BatchSpanProcessor::builder( exporter, spawn, - crate::util::tokio_interval_stream, tokio::time::sleep, + crate::util::tokio_interval_stream, ); self.with_batch_exporter(batch.build()) } @@ -129,11 +129,11 @@ impl Builder { #[cfg(all(feature = "async-std", not(feature = "tokio")))] #[cfg_attr(docsrs, doc(cfg(feature = "async-std")))] pub fn with_exporter(self, exporter: T) -> Self { - let batch = sdk::trace::BatchSpanProcessor::from_env( + let batch = sdk::trace::BatchSpanProcessor::builder( exporter, async_std::task::spawn, - async_std::stream::interval, async_std::task::sleep, + async_std::stream::interval, ); self.with_batch_exporter(batch.build()) } diff --git a/opentelemetry/src/sdk/trace/span_processor.rs b/opentelemetry/src/sdk/trace/span_processor.rs index 203ba12ffc..61c3026aa3 100644 --- a/opentelemetry/src/sdk/trace/span_processor.rs +++ b/opentelemetry/src/sdk/trace/span_processor.rs @@ -33,13 +33,6 @@ //! //! [`is_recording`]: ../span/trait.Span.html#method.is_recording //! [`TracerProvider`]: ../provider/trait.TracerProvider.html -use std::{fmt, str::FromStr, sync::Mutex, time}; - -use futures::{ - channel::mpsc, channel::oneshot, executor, future::BoxFuture, future::Either, pin_mut, Future, - FutureExt, Stream, StreamExt, -}; - use crate::global; use crate::sdk::trace::Span; use crate::{ @@ -47,23 +40,29 @@ use crate::{ trace::{TraceError, TraceResult}, Context, }; +use futures::{ + channel::mpsc, channel::oneshot, executor, future::BoxFuture, future::Either, pin_mut, Future, + FutureExt, Stream, StreamExt, +}; +use std::env; +use std::{fmt, str::FromStr, sync::Mutex, time::Duration}; -/// Delay interval between two consecutive exports, default to be 5000. -const OTEL_BSP_SCHEDULE_DELAY_MILLIS: &str = "OTEL_BSP_SCHEDULE_DELAY_MILLIS"; +/// Delay interval between two consecutive exports. +const OTEL_BSP_SCHEDULE_DELAY: &str = "OTEL_BSP_SCHEDULE_DELAY"; /// Default delay interval between two consecutive exports. -const OTEL_BSP_SCHEDULE_DELAY_MILLIS_DEFAULT: u64 = 5000; -/// Maximum queue size, default to be 2048 +const OTEL_BSP_SCHEDULE_DELAY_DEFAULT: u64 = 5_000; +/// Maximum queue size const OTEL_BSP_MAX_QUEUE_SIZE: &str = "OTEL_BSP_MAX_QUEUE_SIZE"; /// Default maximum queue size -const OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT: usize = 2048; -/// Maximum batch size, must be less than or equal to OTEL_BSP_MAX_QUEUE_SIZE, default to be 512 +const OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT: usize = 2_048; +/// Maximum batch size, must be less than or equal to OTEL_BSP_MAX_QUEUE_SIZE const OTEL_BSP_MAX_EXPORT_BATCH_SIZE: &str = "OTEL_BSP_MAX_EXPORT_BATCH_SIZE"; /// Default maximum batch size const OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT: usize = 512; -/// Maximum allowed time to export data -const OTEL_BSP_EXPORT_TIMEOUT_MILLIS: &str = "OTEL_BSP_EXPORT_TIMEOUT_MILLIS"; -/// Default maximum allowed time to export data -const OTEL_BSP_EXPORT_TIMEOUT_MILLIS_DEFAULT: u64 = 30000; +/// Maximum allowed time to export data. +const OTEL_BSP_EXPORT_TIMEOUT: &str = "OTEL_BSP_EXPORT_TIMEOUT"; +/// Default maximum allowed time to export data. +const OTEL_BSP_EXPORT_TIMEOUT_DEFAULT: u64 = 30_000; /// `SpanProcessor` is an interface which allows hooks for span start and end /// method invocations. The span processors are invoked only when is_recording @@ -258,9 +257,9 @@ impl BatchSpanProcessor { where S: Fn(BoxFuture<'static, ()>) -> SH, SH: Future + Send + Sync + 'static, - I: Fn(time::Duration) -> IS, + I: Fn(Duration) -> IS, IS: Stream + Send + 'static, - D: (Fn(time::Duration) -> DS) + Send + Sync + 'static, + D: (Fn(Duration) -> DS) + Send + Sync + 'static, DS: Future + 'static + Send + Sync, { let (message_sender, message_receiver) = mpsc::channel(config.max_queue_size); @@ -362,8 +361,8 @@ impl BatchSpanProcessor { E: SpanExporter, S: Fn(BoxFuture<'static, ()>) -> SH, SH: Future + Send + Sync + 'static, - I: Fn(time::Duration) -> IO, - D: (Fn(time::Duration) -> DS) + Send + Sync + 'static, + I: Fn(Duration) -> IO, + D: (Fn(Duration) -> DS) + Send + Sync + 'static, DS: Future + 'static + Send + Sync, { BatchSpanProcessorBuilder { @@ -371,82 +370,19 @@ impl BatchSpanProcessor { spawn, interval, delay, - config: Default::default(), - } - } - - /// Create a new batch processor builder and set the config value based on environment variables. - /// - /// If the value in environment variables is illegal, will fall back to use default value. - /// - /// Note that export batch size should be less than or equals to max queue size. - /// If export batch size is larger than max queue size, we will lower to be the same as max - /// queue size - pub fn from_env( - exporter: E, - spawn: S, - interval: I, - delay: D, - ) -> BatchSpanProcessorBuilder - where - E: SpanExporter, - S: Fn(BoxFuture<'static, ()>) -> SH, - SH: Future + Send + Sync + 'static, - I: Fn(time::Duration) -> IO, - D: (Fn(time::Duration) -> DS) + Send + Sync + 'static, - DS: Future + 'static + Send + Sync, - { - let mut config = BatchConfig::default(); - let schedule_delay = std::env::var(OTEL_BSP_SCHEDULE_DELAY_MILLIS) - .map(|delay| u64::from_str(&delay).unwrap_or(OTEL_BSP_SCHEDULE_DELAY_MILLIS_DEFAULT)) - .unwrap_or(OTEL_BSP_SCHEDULE_DELAY_MILLIS_DEFAULT); - config.scheduled_delay = time::Duration::from_millis(schedule_delay); - - let max_queue_size = std::env::var(OTEL_BSP_MAX_QUEUE_SIZE) - .map(|queue_size| { - usize::from_str(&queue_size).unwrap_or(OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT) - }) - .unwrap_or(OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT); - config.max_queue_size = max_queue_size; - - let max_export_batch_size = std::env::var(OTEL_BSP_MAX_EXPORT_BATCH_SIZE) - .map(|batch_size| { - usize::from_str(&batch_size).unwrap_or(OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT) - }) - .unwrap_or(OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT); - // max export batch size must be less or equal to max queue size. - // we set max export batch size to max queue size if it's larger than max queue size. - if max_export_batch_size > max_queue_size { - config.max_export_batch_size = max_queue_size; - } else { - config.max_export_batch_size = max_export_batch_size; - } - - let max_export_time_out = std::env::var(OTEL_BSP_EXPORT_TIMEOUT_MILLIS) - .map(|timeout| { - u64::from_str(&timeout).unwrap_or(OTEL_BSP_EXPORT_TIMEOUT_MILLIS_DEFAULT) - }) - .unwrap_or(OTEL_BSP_EXPORT_TIMEOUT_MILLIS_DEFAULT); - config.max_export_timeout = time::Duration::from_millis(max_export_time_out); - - BatchSpanProcessorBuilder { - config, - exporter, - spawn, - delay, - interval, + config: BatchConfig::default(), } } } async fn export_with_timeout( - time_out: time::Duration, + time_out: Duration, exporter: &mut E, delay: &D, batch: Vec, ) -> ExportResult where - D: (Fn(time::Duration) -> DS) + Send + Sync + 'static, + D: (Fn(Duration) -> DS) + Send + Sync + 'static, DS: Future + 'static + Send + Sync, E: SpanExporter + ?Sized, { @@ -469,7 +405,7 @@ pub struct BatchConfig { /// The delay interval in milliseconds between two consecutive processing /// of batches. The default value is 5 seconds. - scheduled_delay: time::Duration, + scheduled_delay: Duration, /// The maximum number of spans to process in a single batch. If there are /// more than one batch worth of spans then it processes multiple batches @@ -478,17 +414,55 @@ pub struct BatchConfig { max_export_batch_size: usize, /// The maximum duration to export a batch of data. - max_export_timeout: time::Duration, + max_export_timeout: Duration, } impl Default for BatchConfig { fn default() -> Self { - BatchConfig { + let mut config = BatchConfig { max_queue_size: OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT, - scheduled_delay: time::Duration::from_millis(OTEL_BSP_SCHEDULE_DELAY_MILLIS_DEFAULT), + scheduled_delay: Duration::from_millis(OTEL_BSP_SCHEDULE_DELAY_DEFAULT), max_export_batch_size: OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT, - max_export_timeout: time::Duration::from_millis(OTEL_BSP_EXPORT_TIMEOUT_MILLIS_DEFAULT), + max_export_timeout: Duration::from_millis(OTEL_BSP_EXPORT_TIMEOUT_DEFAULT), + }; + + if let Some(max_queue_size) = env::var(OTEL_BSP_MAX_QUEUE_SIZE) + .ok() + .and_then(|queue_size| usize::from_str(&queue_size).ok()) + { + config.max_queue_size = max_queue_size; } + + if let Some(scheduled_delay) = env::var(OTEL_BSP_SCHEDULE_DELAY) + .ok() + .or_else(|| env::var("OTEL_BSP_SCHEDULE_DELAY_MILLIS").ok()) + .and_then(|delay| u64::from_str(&delay).ok()) + { + config.scheduled_delay = Duration::from_millis(scheduled_delay); + } + + if let Some(max_export_batch_size) = env::var(OTEL_BSP_MAX_EXPORT_BATCH_SIZE) + .ok() + .and_then(|batch_size| usize::from_str(&batch_size).ok()) + { + config.max_export_batch_size = max_export_batch_size; + } + + // max export batch size must be less or equal to max queue size. + // we set max export batch size to max queue size if it's larger than max queue size. + if config.max_export_batch_size > config.max_queue_size { + config.max_export_batch_size = config.max_queue_size; + } + + if let Some(max_export_timeout) = env::var(OTEL_BSP_EXPORT_TIMEOUT) + .ok() + .or_else(|| env::var("OTEL_BSP_EXPORT_TIMEOUT_MILLIS").ok()) + .and_then(|timeout| u64::from_str(&timeout).ok()) + { + config.max_export_timeout = Duration::from_millis(max_export_timeout); + } + + config } } @@ -509,9 +483,9 @@ where E: SpanExporter + 'static, S: Fn(BoxFuture<'static, ()>) -> SH, SH: Future + Send + Sync + 'static, - I: Fn(time::Duration) -> IS, + I: Fn(Duration) -> IS, IS: Stream + Send + 'static, - D: (Fn(time::Duration) -> DS) + Send + Sync + 'static, + D: (Fn(Duration) -> DS) + Send + Sync + 'static, DS: Future + 'static + Send + Sync, { /// Set max queue size for batches @@ -523,7 +497,7 @@ where } /// Set scheduled delay for batches - pub fn with_scheduled_delay(self, delay: time::Duration) -> Self { + pub fn with_scheduled_delay(self, delay: Duration) -> Self { let mut config = self.config; config.scheduled_delay = delay; @@ -531,7 +505,7 @@ where } /// Set max timeout for exporting. - pub fn with_max_timeout(self, timeout: time::Duration) -> Self { + pub fn with_max_timeout(self, timeout: Duration) -> Self { let mut config = self.config; config.max_export_timeout = timeout; @@ -567,13 +541,11 @@ where #[cfg(all(test, feature = "testing", feature = "trace"))] mod tests { use std::fmt::Debug; - use std::time; use std::time::Duration; use async_trait::async_trait; use crate::sdk::export::trace::{stdout, ExportResult, SpanData, SpanExporter}; - use crate::sdk::trace::span_processor::OTEL_BSP_EXPORT_TIMEOUT_MILLIS; use crate::sdk::trace::BatchConfig; use crate::testing::trace::{ new_test_export_span_data, new_test_exporter, new_tokio_test_exporter, @@ -583,9 +555,9 @@ mod tests { use futures::Future; use super::{ - BatchSpanProcessor, SimpleSpanProcessor, SpanProcessor, OTEL_BSP_MAX_EXPORT_BATCH_SIZE, - OTEL_BSP_MAX_QUEUE_SIZE, OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT, OTEL_BSP_SCHEDULE_DELAY_MILLIS, - OTEL_BSP_SCHEDULE_DELAY_MILLIS_DEFAULT, + BatchSpanProcessor, SimpleSpanProcessor, SpanProcessor, OTEL_BSP_EXPORT_TIMEOUT, + OTEL_BSP_MAX_EXPORT_BATCH_SIZE, OTEL_BSP_MAX_QUEUE_SIZE, OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT, + OTEL_BSP_SCHEDULE_DELAY, OTEL_BSP_SCHEDULE_DELAY_DEFAULT, }; #[test] @@ -605,22 +577,22 @@ mod tests { } #[test] - fn test_build_batch_span_processor_from_env() { + fn test_build_batch_span_processor_builder() { std::env::set_var(OTEL_BSP_MAX_EXPORT_BATCH_SIZE, "500"); - std::env::set_var(OTEL_BSP_EXPORT_TIMEOUT_MILLIS, "2046"); - std::env::set_var(OTEL_BSP_SCHEDULE_DELAY_MILLIS, "I am not number"); + std::env::set_var(OTEL_BSP_EXPORT_TIMEOUT, "2046"); + std::env::set_var(OTEL_BSP_SCHEDULE_DELAY, "I am not number"); - let mut builder = BatchSpanProcessor::from_env( + let mut builder = BatchSpanProcessor::builder( stdout::Exporter::new(std::io::stdout(), true), tokio::spawn, - tokio_interval_stream, tokio::time::sleep, + tokio_interval_stream, ); // export batch size cannot exceed max queue size assert_eq!(builder.config.max_export_batch_size, 500); assert_eq!( builder.config.scheduled_delay, - time::Duration::from_millis(OTEL_BSP_SCHEDULE_DELAY_MILLIS_DEFAULT) + Duration::from_millis(OTEL_BSP_SCHEDULE_DELAY_DEFAULT) ); assert_eq!( builder.config.max_queue_size, @@ -628,15 +600,15 @@ mod tests { ); assert_eq!( builder.config.max_export_timeout, - time::Duration::from_millis(2046) + Duration::from_millis(2046) ); std::env::set_var(OTEL_BSP_MAX_QUEUE_SIZE, "120"); - builder = BatchSpanProcessor::from_env( + builder = BatchSpanProcessor::builder( stdout::Exporter::new(std::io::stdout(), true), tokio::spawn, - tokio_interval_stream, tokio::time::sleep, + tokio_interval_stream, ); assert_eq!(builder.config.max_export_batch_size, 120); @@ -681,13 +653,13 @@ mod tests { } struct BlockingExporter { - delay_for: time::Duration, + delay_for: Duration, delay_fn: D, } impl Debug for BlockingExporter where - D: Fn(time::Duration) -> DS + 'static + Send + Sync, + D: Fn(Duration) -> DS + 'static + Send + Sync, DS: Future + Send + Sync + 'static, { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { @@ -698,7 +670,7 @@ mod tests { #[async_trait] impl SpanExporter for BlockingExporter where - D: Fn(time::Duration) -> DS + 'static + Send + Sync, + D: Fn(Duration) -> DS + 'static + Send + Sync, DS: Future + Send + Sync + 'static, { async fn export(&mut self, _batch: Vec) -> ExportResult { @@ -745,12 +717,12 @@ mod tests { #[cfg(feature = "async-std")] async fn timeout_test_std_async(time_out: bool) { let config = BatchConfig { - max_export_timeout: time::Duration::from_millis(if time_out { 5 } else { 60 }), + max_export_timeout: Duration::from_millis(if time_out { 5 } else { 60 }), scheduled_delay: Duration::from_secs(60 * 60 * 24), // set the tick to 24 hours so we know the span must be exported via force_flush ..Default::default() }; let exporter = BlockingExporter { - delay_for: time::Duration::from_millis(if !time_out { 5 } else { 60 }), + delay_for: Duration::from_millis(if !time_out { 5 } else { 60 }), delay_fn: async_std::task::sleep, }; let mut processor = BatchSpanProcessor::new( @@ -775,12 +747,12 @@ mod tests { // otherwise the exporter should be able to export within time out duration. async fn timeout_test_tokio(time_out: bool) { let config = BatchConfig { - max_export_timeout: time::Duration::from_millis(if time_out { 5 } else { 60 }), + max_export_timeout: Duration::from_millis(if time_out { 5 } else { 60 }), scheduled_delay: Duration::from_secs(60 * 60 * 24), // set the tick to 24 hours so we know the span must be exported via force_flush, ..Default::default() }; let exporter = BlockingExporter { - delay_for: time::Duration::from_millis(if !time_out { 5 } else { 60 }), + delay_for: Duration::from_millis(if !time_out { 5 } else { 60 }), delay_fn: tokio::time::sleep, }; let spawn = |fut| tokio::task::spawn_blocking(|| futures::executor::block_on(fut)); @@ -791,7 +763,7 @@ mod tests { tokio::time::sleep, config, ); - tokio::time::sleep(time::Duration::from_secs(1)).await; // skip the first + tokio::time::sleep(Duration::from_secs(1)).await; // skip the first processor.on_end(new_test_export_span_data()); let flush_res = processor.force_flush(); if time_out {