Skip to content

Commit

Permalink
feat: add methodfrom_env to prometheus exporter builder (#605)
Browse files Browse the repository at this point in the history
* feat: add method`from_env` to promethes exporter builder

Fixes #293

Allows configuring the `PrometheusExporter` using environment variables defined in the semantic conventions of the Otel specification. Currently, supports setting the host and port for prometheus in the builder and will fall back to the defaults defined in the otel specification.

* addressed feedback in favor of impl `Default` over `from_env`

* fixes clippy lint issues after upgrading to rust 1.54

* convert port from String to u16

An error is logged to the global OpenTelemetry error handling when we fail to parse the port from an OS Environment variable
  • Loading branch information
LanceEa authored Aug 3, 2021
1 parent d3bbfd2 commit 3a639bf
Show file tree
Hide file tree
Showing 10 changed files with 165 additions and 17 deletions.
2 changes: 1 addition & 1 deletion opentelemetry-datadog/src/exporter/model/v05.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ fn encode_traces(
let mut encoded = Vec::new();
rmp::encode::write_array_len(&mut encoded, traces.len() as u32)?;

let service_interned = interner.intern(&service_name);
let service_interned = interner.intern(service_name);

for trace in traces.into_iter() {
rmp::encode::write_array_len(&mut encoded, trace.len() as u32)?;
Expand Down
4 changes: 2 additions & 2 deletions opentelemetry-otlp/src/transform/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ pub(crate) mod tonic {
fn from(number: NumberWithKind<'a>) -> Self {
match &number.1 {
NumberKind::I64 | NumberKind::U64 => {
number_data_point::Value::AsInt(number.0.to_i64(&number.1))
number_data_point::Value::AsInt(number.0.to_i64(number.1))
}
NumberKind::F64 => number_data_point::Value::AsDouble(number.0.to_f64(&number.1)),
NumberKind::F64 => number_data_point::Value::AsDouble(number.0.to_f64(number.1)),
}
}
}
Expand Down
12 changes: 12 additions & 0 deletions opentelemetry-prometheus/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,15 @@
# Changelog

## v0.9.0

### Added

- Adds `Default` implementation to `ExporterBuilder` based on the otel specification environment variables #242

### Deprecated

- `PrometheusExporter::new()` is deprecated in favor of using `ExporterBuilder`

## v0.8.0

### Changed
Expand All @@ -15,9 +25,11 @@
## v0.6.0

### Added

- Add sanitization of prometheus label names #462

### Changed

- Update to opentelemetry v0.13.0
- Update prometheus dependency #485

Expand Down
144 changes: 140 additions & 4 deletions opentelemetry-prometheus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ use opentelemetry::{
metrics::{registry::RegistryMeterProvider, MetricsError, NumberKind},
Key, Value,
};
use std::env;
use std::num::ParseIntError;
use std::sync::{Arc, Mutex};
use std::time::Duration;

Expand All @@ -92,13 +94,25 @@ const DEFAULT_CACHE_PERIOD: Duration = Duration::from_secs(0);

const EXPORT_KIND_SELECTOR: ExportKindSelector = ExportKindSelector::Cumulative;

/// Default host used by the Prometheus Exporter when env variable not found
const DEFAULT_EXPORTER_HOST: &str = "0.0.0.0";

/// Default port used by the Prometheus Exporter when env variable not found
const DEFAULT_EXPORTER_PORT: u16 = 9464;

/// The hostname for the Promtheus Exporter
const ENV_EXPORTER_HOST: &str = "OTEL_EXPORTER_PROMETHEUS_HOST";

/// The port for the Prometheus Exporter
const ENV_EXPORTER_PORT: &str = "OTEL_EXPORTER_PROMETHEUS_PORT";

/// Create a new prometheus exporter builder.
pub fn exporter() -> ExporterBuilder {
ExporterBuilder::default()
}

/// Configuration for the prometheus exporter.
#[derive(Debug, Default)]
#[derive(Debug)]
pub struct ExporterBuilder {
/// The OpenTelemetry `Resource` associated with all Meters
/// created by the pull controller.
Expand All @@ -122,6 +136,45 @@ pub struct ExporterBuilder {
///
/// If not set a new empty `Registry` is created.
registry: Option<prometheus::Registry>,

/// The host used by the prometheus exporter
///
/// If not set it will be defaulted to all addresses "0.0.0.0"
host: Option<String>,

/// The port used by the prometheus exporter
///
/// If not set it will be defaulted to port 9464
port: Option<u16>,
}

impl Default for ExporterBuilder {
fn default() -> Self {
let port: Option<u16> = match env::var(ENV_EXPORTER_PORT) {
Err(_) => None,
Ok(p_str) => p_str
.parse()
.map_err(|err: ParseIntError| {
let err_msg = format!(
"Unable to parse environment variable {}=\"{}\" - {}. Falling back to default port {}. ",
ENV_EXPORTER_PORT, p_str, err, DEFAULT_EXPORTER_PORT
);
global::handle_error(global::Error::Other(err_msg));
err
})
.ok(),
};

ExporterBuilder {
resource: None,
cache_period: None,
default_histogram_boundaries: None,
default_summary_quantiles: None,
registry: None,
host: env::var(ENV_EXPORTER_HOST).ok().filter(|s| !s.is_empty()),
port,
}
}
}

impl ExporterBuilder {
Expand Down Expand Up @@ -160,6 +213,22 @@ impl ExporterBuilder {
}
}

/// Set the host for the prometheus exporter
pub fn with_host(self, host: String) -> Self {
ExporterBuilder {
host: Some(host),
..self
}
}

/// Set the port for the prometheus exporter
pub fn with_port(self, port: u16) -> Self {
ExporterBuilder {
port: Some(port),
..self
}
}

/// Set the prometheus registry to be used by this exporter
pub fn with_registry(self, registry: prometheus::Registry) -> Self {
ExporterBuilder {
Expand Down Expand Up @@ -189,12 +258,25 @@ impl ExporterBuilder {

global::set_meter_provider(controller.provider());

PrometheusExporter::new(
let host = self
.host
.unwrap_or_else(|| DEFAULT_EXPORTER_HOST.to_string());
let port = self.port.unwrap_or(DEFAULT_EXPORTER_PORT);

let controller = Arc::new(Mutex::new(controller));
let collector = Collector::with_controller(controller.clone());
registry
.register(Box::new(collector))
.map_err(|e| MetricsError::Other(e.to_string()))?;

Ok(PrometheusExporter {
registry,
controller,
default_summary_quantiles,
default_histogram_boundaries,
)
host,
port,
})
}

/// Sets up a complete export pipeline with the recommended setup, using the
Expand All @@ -218,15 +300,23 @@ pub struct PrometheusExporter {
controller: Arc<Mutex<PullController>>,
default_summary_quantiles: Vec<f64>,
default_histogram_boundaries: Vec<f64>,
host: String,
port: u16,
}

impl PrometheusExporter {
#[deprecated(
since = "0.9.0",
note = "Please use the ExporterBuilder to initialize a PrometheusExporter"
)]
/// Create a new prometheus exporter
pub fn new(
registry: prometheus::Registry,
controller: PullController,
default_summary_quantiles: Vec<f64>,
default_histogram_boundaries: Vec<f64>,
host: String,
port: u16,
) -> Result<Self, MetricsError> {
let controller = Arc::new(Mutex::new(controller));
let collector = Collector::with_controller(controller.clone());
Expand All @@ -239,6 +329,8 @@ impl PrometheusExporter {
controller,
default_summary_quantiles,
default_histogram_boundaries,
host,
port,
})
}

Expand All @@ -254,6 +346,16 @@ impl PrometheusExporter {
.map_err(Into::into)
.map(|locked| locked.provider())
}

/// Get the exporters host for prometheus.
pub fn host(&self) -> &str {
self.host.as_str()
}

/// Get the exporters port for prometheus.
pub fn port(&self) -> u16 {
self.port
}
}

#[derive(Debug)]
Expand Down Expand Up @@ -288,7 +390,7 @@ impl prometheus::core::Collector for Collector {
let number_kind = record.descriptor().number_kind();
let instrument_kind = record.descriptor().instrument_kind();

let desc = get_metric_desc(&record);
let desc = get_metric_desc(record);
let labels = get_metric_labels(record);

if let Some(hist) = agg.as_any().downcast_ref::<HistogramAggregator>() {
Expand Down Expand Up @@ -458,3 +560,37 @@ fn get_metric_desc(record: &Record<'_>) -> PrometheusMetricDesc {
.unwrap_or_else(|| desc.name().to_string());
PrometheusMetricDesc { name, help }
}

#[cfg(test)]
mod tests {
use std::env;

use super::*;

#[test]
fn test_exporter_builder_default() {
env::remove_var(ENV_EXPORTER_HOST);
env::remove_var(ENV_EXPORTER_PORT);
let exporter = ExporterBuilder::default().init();
assert_eq!(exporter.host(), "0.0.0.0");
assert_eq!(exporter.port(), 9464);

env::set_var(ENV_EXPORTER_HOST, "prometheus-test");
env::set_var(ENV_EXPORTER_PORT, "9000");
let exporter = ExporterBuilder::default().init();
assert_eq!(exporter.host(), "prometheus-test");
assert_eq!(exporter.port(), 9000);

env::set_var(ENV_EXPORTER_HOST, "");
env::set_var(ENV_EXPORTER_PORT, "");
let exporter = ExporterBuilder::default().init();
assert_eq!(exporter.host(), "0.0.0.0");
assert_eq!(exporter.port(), 9464);

env::set_var(ENV_EXPORTER_HOST, "");
env::set_var(ENV_EXPORTER_PORT, "not_a_number");
let exporter = ExporterBuilder::default().init();
assert_eq!(exporter.host(), "0.0.0.0");
assert_eq!(exporter.port(), 9464);
}
}
6 changes: 3 additions & 3 deletions opentelemetry/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,9 @@ pub enum Array {
impl fmt::Display for Array {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Array::Bool(values) => display_array_str(&values, fmt),
Array::I64(values) => display_array_str(&values, fmt),
Array::F64(values) => display_array_str(&values, fmt),
Array::Bool(values) => display_array_str(values, fmt),
Array::I64(values) => display_array_str(values, fmt),
Array::F64(values) => display_array_str(values, fmt),
Array::String(values) => {
write!(fmt, "[")?;
for (i, t) in values.iter().enumerate() {
Expand Down
2 changes: 1 addition & 1 deletion opentelemetry/src/labels/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ impl<'a, A: Iterator<Item = (&'a Key, &'a Value)>, B: Iterator<Item = (&'a Key,
type Item = (&'a Key, &'a Value);
fn next(&mut self) -> Option<Self::Item> {
let which = match (self.a.peek(), self.b.peek()) {
(Some(a), Some(b)) => Some(a.0.cmp(&b.0)),
(Some(a), Some(b)) => Some(a.0.cmp(b.0)),
(Some(_), None) => Some(Ordering::Less),
(None, Some(_)) => Some(Ordering::Greater),
(None, None) => None,
Expand Down
2 changes: 1 addition & 1 deletion opentelemetry/src/sdk/metrics/aggregators/ddsketch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ impl Inner {
};
(-self.log_gamma(positive_num.to_f64(kind)).ceil()) as i64 - self.offset
} else if num.to_f64(kind) > self.key_epsilon {
self.log_gamma(num.to_f64(&kind)).ceil() as i64 + self.offset
self.log_gamma(num.to_f64(kind)).ceil() as i64 + self.offset
} else {
0i64
}
Expand Down
2 changes: 1 addition & 1 deletion opentelemetry/src/sdk/metrics/aggregators/histogram.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::sync::{Arc, RwLock};
/// Create a new histogram for the given descriptor with the given boundaries
pub fn histogram(desc: &Descriptor, boundaries: &[f64]) -> HistogramAggregator {
let mut sorted_boundaries = boundaries.to_owned();
sorted_boundaries.sort_by(|a, b| a.partial_cmp(&b).unwrap());
sorted_boundaries.sort_by(|a, b| a.partial_cmp(b).unwrap());
let state = State::empty(&sorted_boundaries);

HistogramAggregator {
Expand Down
4 changes: 2 additions & 2 deletions opentelemetry/src/sdk/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ impl AccumulatorCore {
} else {
// Having no updates since last collection, try to remove if
// there are no bound handles
if Arc::strong_count(&value) == 1 {
if Arc::strong_count(value) == 1 {
// There's a potential race between loading collected count and
// loading the strong count in this function. Since this is the
// last we'll see of this record, checkpoint.
Expand Down Expand Up @@ -245,7 +245,7 @@ impl AccumulatorCore {
record.instrument.descriptor(),
&record.labels,
&self.resource,
&checkpoint,
checkpoint,
);
if let Err(err) = locked_processor.process(accumulation) {
global::handle_error(err);
Expand Down
4 changes: 2 additions & 2 deletions opentelemetry/src/sdk/metrics/processors/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ impl<'a> LockedProcessor for BasicLockedProcessor<'a> {
if let Some(current) = self.parent.aggregation_selector().aggregator_for(desc) {
value.current = current;
value.current_owned = true;
tmp.synchronized_move(&value.current, &desc)?;
tmp.synchronized_move(&value.current, desc)?;
}
}

Expand All @@ -141,7 +141,7 @@ impl<'a> LockedProcessor for BasicLockedProcessor<'a> {
let stateful = self
.parent
.export_selector
.export_kind_for(&desc)
.export_kind_for(desc)
.memory_required(desc.instrument_kind());

let mut delta = None;
Expand Down

0 comments on commit 3a639bf

Please sign in to comment.