Skip to content

Commit

Permalink
fix(new_relic sink): Multiple fixes related to metrics (vectordotdev#…
Browse files Browse the repository at this point in the history
…18151)

* Convert tags into attributes.

* Decompose metric into parts to reuse tags instead of cloning.

* Remove unnecessary blocks.

* Set metric type.

* Fix field name typo.

* Remove unnecessary brackets.

* Cargo fmt.

* Update src/sinks/new_relic/model.rs

Co-authored-by: Bruce Guenter <[email protected]>

* Update src/sinks/new_relic/model.rs

Co-authored-by: Bruce Guenter <[email protected]>

* Fix typo.

* Add tests for newly supported metrics.

* Simplify metric type cases.

* Cargo fmt.

* Update to handle new tag model

New Relic only accepts key/value tags so we use `iter_single()` here.

Signed-off-by: Jesse Szwedko <[email protected]>

* PR feedback

Signed-off-by: Jesse Szwedko <[email protected]>

* refactor

* add dropped event metrics

* track unsupported metric types as well

* feedback

* more feedback

---------

Signed-off-by: Jesse Szwedko <[email protected]>
Co-authored-by: Andreu <[email protected]>
Co-authored-by: Bruce Guenter <[email protected]>
Co-authored-by: Doug Smith <[email protected]>
  • Loading branch information
4 people authored Sep 12, 2023
1 parent c74a469 commit 953e305
Show file tree
Hide file tree
Showing 2 changed files with 186 additions and 71 deletions.
224 changes: 156 additions & 68 deletions src/sinks/new_relic/model.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
use std::{collections::HashMap, convert::TryFrom, fmt::Debug, time::SystemTime};
use std::{
collections::{BTreeMap, HashMap},
convert::TryFrom,
fmt::Debug,
time::SystemTime,
};

use chrono::{DateTime, Utc};
use ordered_float::NotNan;
use serde::{Deserialize, Serialize};
use vector_common::internal_event::{ComponentEventsDropped, INTENTIONAL, UNINTENTIONAL};
use vrl::event_path;

use super::NewRelicSinkError;
use crate::event::{Event, MetricValue, Value};
use crate::event::{Event, MetricKind, MetricValue, Value};

#[derive(Debug)]
pub enum NewRelicApiModel {
Expand All @@ -22,30 +28,9 @@ type DataStore = HashMap<String, Vec<KeyValData>>;
pub struct MetricsApiModel(pub Vec<DataStore>);

impl MetricsApiModel {
pub fn new(metric_array: Vec<(Value, Value, Value)>) -> Self {
let mut metric_data_array = vec![];
for (m_name, m_value, m_timestamp) in metric_array {
let mut metric_data = KeyValData::new();
metric_data.insert("name".to_owned(), m_name);
metric_data.insert("value".to_owned(), m_value);
match m_timestamp {
Value::Timestamp(ts) => {
metric_data.insert("timestamp".to_owned(), Value::from(ts.timestamp()));
}
Value::Integer(i) => {
metric_data.insert("timestamp".to_owned(), Value::from(i));
}
_ => {
metric_data.insert(
"timestamp".to_owned(),
Value::from(DateTime::<Utc>::from(SystemTime::now()).timestamp()),
);
}
}
metric_data_array.push(metric_data);
}
pub fn new(metric_array: Vec<KeyValData>) -> Self {
let mut metric_store = DataStore::new();
metric_store.insert("metrics".to_owned(), metric_data_array);
metric_store.insert("metrics".to_owned(), metric_array);
Self(vec![metric_store])
}
}
Expand All @@ -54,43 +39,107 @@ impl TryFrom<Vec<Event>> for MetricsApiModel {
type Error = NewRelicSinkError;

fn try_from(buf_events: Vec<Event>) -> Result<Self, Self::Error> {
let mut metric_array = vec![];

for buf_event in buf_events {
if let Event::Metric(metric) = buf_event {
// Future improvement: put metric type. If type = count, NR metric model requires an interval.ms field, that is not provided by the Vector Metric model.
match metric.value() {
MetricValue::Gauge { value } => {
metric_array.push((
Value::from(metric.name().to_owned()),
Value::from(
NotNan::new(*value).map_err(|_| {
NewRelicSinkError::new("NaN value not supported")
})?,
),
Value::from(metric.timestamp()),
));
}
MetricValue::Counter { value } => {
metric_array.push((
Value::from(metric.name().to_owned()),
Value::from(
NotNan::new(*value).map_err(|_| {
NewRelicSinkError::new("NaN value not supported")
})?,
),
Value::from(metric.timestamp()),
));
let mut num_non_metric_events = 0;
let mut num_missing_interval = 0;
let mut num_nan_value = 0;
let mut num_unsupported_metric_type = 0;

let metric_array: Vec<_> = buf_events
.into_iter()
.filter_map(|event| {
let Some(metric) = event.try_into_metric() else {
num_non_metric_events += 1;
return None;
};

// Generate Value::Object() from BTreeMap<String, String>
let (series, data, _) = metric.into_parts();

let mut metric_data = KeyValData::new();

// We only handle gauge and counter metrics
// Extract value & type and set type-related attributes
let (value, metric_type) = match (data.value, &data.kind) {
(MetricValue::Counter { value }, MetricKind::Incremental) => {
let Some(interval_ms) = data.time.interval_ms else {
// Incremental counter without an interval is worthless, skip this metric
num_missing_interval += 1;
return None;
};
metric_data.insert(
"interval.ms".to_owned(),
Value::from(interval_ms.get() as i64),
);
(value, "count")
}
(MetricValue::Counter { value }, MetricKind::Absolute) => (value, "gauge"),
(MetricValue::Gauge { value }, _) => (value, "gauge"),
_ => {
// Unrecognized metric type
// Unsupported metric type
num_unsupported_metric_type += 1;
return None;
}
};

// Set name, type, value, timestamp, and attributes
metric_data.insert("name".to_owned(), Value::from(series.name.name));
metric_data.insert("type".to_owned(), Value::from(metric_type));
let Some(value) = NotNan::new(value).ok() else {
num_nan_value += 1;
return None;
};
metric_data.insert("value".to_owned(), Value::from(value));
metric_data.insert(
"timestamp".to_owned(),
Value::from(
data.time
.timestamp
.unwrap_or_else(|| DateTime::<Utc>::from(SystemTime::now()))
.timestamp(),
),
);
if let Some(tags) = series.tags {
metric_data.insert(
"attributes".to_owned(),
Value::from(
tags.iter_single()
.map(|(key, value)| (key.to_string(), Value::from(value)))
.collect::<BTreeMap<_, _>>(),
),
);
}
}

Some(metric_data)
})
.collect();

if num_non_metric_events > 0 {
emit!(ComponentEventsDropped::<INTENTIONAL> {
count: num_non_metric_events,
reason: "non-metric event"
});
}
if num_unsupported_metric_type > 0 {
emit!(ComponentEventsDropped::<INTENTIONAL> {
count: num_unsupported_metric_type,
reason: "unsupported metric type"
});
}
if num_nan_value > 0 {
emit!(ComponentEventsDropped::<UNINTENTIONAL> {
count: num_nan_value,
reason: "NaN value not supported"
});
}
if num_missing_interval > 0 {
emit!(ComponentEventsDropped::<UNINTENTIONAL> {
count: num_missing_interval,
reason: "incremental counter missing interval"
});
}

if !metric_array.is_empty() {
Ok(MetricsApiModel::new(metric_array))
Ok(Self::new(metric_array))
} else {
Err(NewRelicSinkError::new("No valid metrics to generate"))
}
Expand All @@ -110,9 +159,17 @@ impl TryFrom<Vec<Event>> for EventsApiModel {
type Error = NewRelicSinkError;

fn try_from(buf_events: Vec<Event>) -> Result<Self, Self::Error> {
let mut events_array = vec![];
for buf_event in buf_events {
if let Event::Log(log) = buf_event {
let mut num_non_log_events = 0;
let mut num_nan_value = 0;

let events_array: Vec<HashMap<String, Value>> = buf_events
.into_iter()
.filter_map(|event| {
let Some(log) = event.try_into_log() else {
num_non_log_events += 1;
return None;
};

let mut event_model = KeyValData::new();
for (k, v) in log.convert_to_fields() {
event_model.insert(k, v.clone());
Expand All @@ -133,8 +190,9 @@ impl TryFrom<Vec<Event>> for EventsApiModel {
if let Some(f) = n.as_f64() {
event_model.insert(
k,
Value::from(NotNan::new(f).map_err(|_| {
NewRelicSinkError::new("NaN value not supported")
Value::from(NotNan::new(f).ok().or_else(|| {
num_nan_value += 1;
None
})?),
);
} else {
Expand All @@ -144,7 +202,9 @@ impl TryFrom<Vec<Event>> for EventsApiModel {
serde_json::Value::Bool(b) => {
event_model.insert(k, Value::from(b));
}
_ => {}
_ => {
// Note that arrays and nested objects are silently dropped.
}
}
}
event_model.remove("message");
Expand All @@ -156,8 +216,21 @@ impl TryFrom<Vec<Event>> for EventsApiModel {
.insert("eventType".to_owned(), Value::from("VectorSink".to_owned()));
}

events_array.push(event_model);
}
Some(event_model)
})
.collect();

if num_non_log_events > 0 {
emit!(ComponentEventsDropped::<INTENTIONAL> {
count: num_non_log_events,
reason: "non-log event"
});
}
if num_nan_value > 0 {
emit!(ComponentEventsDropped::<UNINTENTIONAL> {
count: num_nan_value,
reason: "NaN value not supported"
});
}

if !events_array.is_empty() {
Expand All @@ -183,9 +256,16 @@ impl TryFrom<Vec<Event>> for LogsApiModel {
type Error = NewRelicSinkError;

fn try_from(buf_events: Vec<Event>) -> Result<Self, Self::Error> {
let mut logs_array = vec![];
for buf_event in buf_events {
if let Event::Log(log) = buf_event {
let mut num_non_log_events = 0;

let logs_array: Vec<HashMap<String, Value>> = buf_events
.into_iter()
.filter_map(|event| {
let Some(log) = event.try_into_log() else {
num_non_log_events += 1;
return None;
};

let mut log_model = KeyValData::new();
for (k, v) in log.convert_to_fields() {
log_model.insert(k, v.clone());
Expand All @@ -196,8 +276,16 @@ impl TryFrom<Vec<Event>> for LogsApiModel {
Value::from("log from vector".to_owned()),
);
}
logs_array.push(log_model);
}

Some(log_model)
})
.collect();

if num_non_log_events > 0 {
emit!(ComponentEventsDropped::<INTENTIONAL> {
count: num_non_log_events,
reason: "non-log event"
});
}

if !logs_array.is_empty() {
Expand Down
33 changes: 30 additions & 3 deletions src/sinks/new_relic/tests.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{collections::HashMap, convert::TryFrom, time::SystemTime};
use std::{collections::HashMap, convert::TryFrom, num::NonZeroU32, time::SystemTime};

use chrono::{DateTime, Utc};
use futures::{future::ready, stream};
Expand Down Expand Up @@ -211,7 +211,7 @@ fn generate_metric_api_model() {
MetricsApiModel::try_from(vec![event]).expect("Failed mapping metrics into API model");
let metrics = model.0[0]
.get("metrics")
.expect("Logs data store not present");
.expect("Metric data store not present");

assert_eq!(metrics.len(), 1);
assert!(metrics[0].get("name").is_some());
Expand All @@ -235,7 +235,7 @@ fn generate_metric_api_model() {
MetricsApiModel::try_from(vec![event]).expect("Failed mapping metrics into API model");
let metrics = model.0[0]
.get("metrics")
.expect("Logs data store not present");
.expect("Metric data store not present");

assert_eq!(metrics.len(), 1);
assert!(metrics[0].get("name").is_some());
Expand All @@ -246,4 +246,31 @@ fn generate_metric_api_model() {
assert!(metrics[0].get("value").is_some());
assert_eq!(metrics[0].get("value").unwrap(), &Value::from(100.0));
assert!(metrics[0].get("timestamp").is_some());

// Incremental counter
let m = Metric::new(
"my_metric",
MetricKind::Incremental,
MetricValue::Counter { value: 100.0 },
)
.with_timestamp(Some(DateTime::<Utc>::from(SystemTime::now())))
.with_interval_ms(NonZeroU32::new(1000));
let event = Event::Metric(m);
let model =
MetricsApiModel::try_from(vec![event]).expect("Failed mapping metrics into API model");
let metrics = model.0[0]
.get("metrics")
.expect("Metric data store not present");

assert_eq!(metrics.len(), 1);
assert!(metrics[0].get("name").is_some());
assert_eq!(
metrics[0].get("name").unwrap().to_string_lossy(),
"my_metric".to_owned()
);
assert!(metrics[0].get("value").is_some());
assert_eq!(metrics[0].get("value").unwrap(), &Value::from(100.0));
assert!(metrics[0].get("timestamp").is_some());
assert!(metrics[0].get("interval.ms").is_some());
assert_eq!(metrics[0].get("interval.ms").unwrap(), &Value::from(1000));
}

0 comments on commit 953e305

Please sign in to comment.