diff --git a/Cargo.toml b/Cargo.toml index 27d9d98ee612a..903010dc5ed14 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -777,7 +777,7 @@ nats-integration-tests = ["sinks-nats", "sources-nats"] nginx-integration-tests = ["sources-nginx_metrics"] opentelemetry-integration-tests = ["sources-opentelemetry"] postgresql_metrics-integration-tests = ["sources-postgresql_metrics"] -prometheus-integration-tests = ["sinks-prometheus", "sources-prometheus"] +prometheus-integration-tests = ["sinks-prometheus", "sources-prometheus", "sinks-influxdb"] pulsar-integration-tests = ["sinks-pulsar"] redis-integration-tests = ["sinks-redis", "sources-redis"] splunk-integration-tests = ["sinks-splunk_hec"] diff --git a/src/internal_events/influxdb.rs b/src/internal_events/influxdb.rs new file mode 100644 index 0000000000000..eae676759beb2 --- /dev/null +++ b/src/internal_events/influxdb.rs @@ -0,0 +1,36 @@ +use crate::{ + emit, + internal_events::{ComponentEventsDropped, UNINTENTIONAL}, +}; +use metrics::counter; +use vector_common::internal_event::{error_stage, error_type}; +use vector_core::internal_event::InternalEvent; + +#[derive(Debug)] +pub struct InfluxdbEncodingError { + pub error_message: &'static str, + pub count: u64, +} + +impl InternalEvent for InfluxdbEncodingError { + fn emit(self) { + let reason = "Failed to encode event."; + error!( + message = reason, + error = %self.error_message, + error_type = error_type::ENCODER_FAILED, + stage = error_stage::PROCESSING, + internal_log_rate_secs = 10, + ); + counter!( + "component_errors_total", 1, + "error_type" => error_type::ENCODER_FAILED, + "stage" => error_stage::PROCESSING, + ); + + emit!(ComponentEventsDropped:: { + count: self.count, + reason + }); + } +} diff --git a/src/internal_events/mod.rs b/src/internal_events/mod.rs index e964ea8cace0e..da6007836dbae 100644 --- a/src/internal_events/mod.rs +++ b/src/internal_events/mod.rs @@ -57,6 +57,8 @@ mod http; pub mod http_client; #[cfg(feature = "sources-utils-http-scrape")] mod http_scrape; +#[cfg(feature = "sinks-influxdb")] +mod influxdb; #[cfg(feature = "sources-internal_logs")] mod internal_logs; #[cfg(feature = "sources-internal_metrics")] @@ -191,6 +193,8 @@ pub(crate) use self::host_metrics::*; pub(crate) use self::http::*; #[cfg(feature = "sources-utils-http-scrape")] pub(crate) use self::http_scrape::*; +#[cfg(feature = "sinks-influxdb")] +pub(crate) use self::influxdb::*; #[cfg(feature = "sources-internal_logs")] pub(crate) use self::internal_logs::*; #[cfg(feature = "sources-internal_metrics")] diff --git a/src/sinks/influxdb/logs.rs b/src/sinks/influxdb/logs.rs index 7a1d1740a65b1..22701ddfcf769 100644 --- a/src/sinks/influxdb/logs.rs +++ b/src/sinks/influxdb/logs.rs @@ -11,6 +11,7 @@ use crate::{ config::{log_schema, AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext}, event::{Event, Value}, http::HttpClient, + internal_events::InfluxdbEncodingError, sinks::{ influxdb::{ encode_timestamp, healthcheck, influx_line_protocol, influxdb_settings, Field, @@ -210,7 +211,7 @@ impl HttpEventEncoder for InfluxDbLogsEncoder { }); let mut output = BytesMut::new(); - if let Err(error) = influx_line_protocol( + if let Err(error_message) = influx_line_protocol( self.protocol_version, &self.measurement, Some(tags), @@ -218,7 +219,10 @@ impl HttpEventEncoder for InfluxDbLogsEncoder { timestamp, &mut output, ) { - warn!(message = "Failed to encode event; dropping event.", %error, internal_log_rate_secs = 30); + emit!(InfluxdbEncodingError { + error_message, + count: 1 + }); return None; }; @@ -296,7 +300,7 @@ fn to_field(value: &Value) -> Field { #[cfg(test)] mod tests { use chrono::{offset::TimeZone, Utc}; - use futures::{channel::mpsc, StreamExt}; + use futures::{channel::mpsc, stream, StreamExt}; use http::{request::Parts, StatusCode}; use indoc::indoc; use vector_core::event::{BatchNotifier, BatchStatus, Event, LogEvent}; @@ -307,7 +311,13 @@ mod tests { influxdb::test_util::{assert_fields, split_line_protocol, ts}, util::test::{build_test_server_status, load_sink}, }, - test_util::{components, components::HTTP_SINK_TAGS, next_addr}, + test_util::{ + components::{ + run_and_assert_sink_compliance, run_and_assert_sink_error, COMPONENT_ERROR_TAGS, + HTTP_SINK_TAGS, + }, + next_addr, + }, }; type Receiver = mpsc::Receiver<(Parts, bytes::Bytes)>; @@ -681,10 +691,10 @@ mod tests { } drop(batch); - components::init_test(); - sink.run_events(events).await.unwrap(); if batch_status == BatchStatus::Delivered { - components::SINK_TESTS.assert(&HTTP_SINK_TAGS); + run_and_assert_sink_compliance(sink, stream::iter(events), &HTTP_SINK_TAGS).await; + } else { + run_and_assert_sink_error(sink, stream::iter(events), &COMPONENT_ERROR_TAGS).await; } assert_eq!(receiver.try_recv(), Ok(batch_status)); diff --git a/src/sinks/influxdb/metrics.rs b/src/sinks/influxdb/metrics.rs index dc05e8bfce7dd..472d064f01f2d 100644 --- a/src/sinks/influxdb/metrics.rs +++ b/src/sinks/influxdb/metrics.rs @@ -21,6 +21,7 @@ use crate::{ Event, }, http::HttpClient, + internal_events::InfluxdbEncodingError, sinks::{ influxdb::{ encode_timestamp, healthcheck, influx_line_protocol, influxdb_settings, Field, @@ -193,10 +194,12 @@ impl Service> for InfluxDbSvc { type Error = crate::Error; type Future = BoxFuture<'static, Result>; + // Emission of Error internal event is handled upstream by the caller fn poll_ready(&mut self, cx: &mut std::task::Context) -> Poll> { self.inner.poll_ready(cx) } + // Emission of Error internal event is handled upstream by the caller fn call(&mut self, items: Vec) -> Self::Future { let input = encode_events( self.protocol_version, @@ -273,6 +276,8 @@ fn encode_events( quantiles: &[f64], ) -> BytesMut { let mut output = BytesMut::new(); + let count = events.len() as u64; + for event in events.into_iter() { let fullname = encode_namespace(event.namespace().or(default_namespace), '.', event.name()); let ts = encode_timestamp(event.timestamp()); @@ -281,7 +286,8 @@ fn encode_events( let mut unwrapped_tags = tags.unwrap_or_default(); unwrapped_tags.insert("metric_type".to_owned(), metric_type.to_owned()); - if let Err(error) = influx_line_protocol( + + if let Err(error_message) = influx_line_protocol( protocol_version, &fullname, Some(unwrapped_tags), @@ -289,7 +295,10 @@ fn encode_events( ts, &mut output, ) { - warn!(message = "Failed to encode event; dropping event.", %error, internal_log_rate_secs = 30); + emit!(InfluxdbEncodingError { + error_message, + count, + }); }; }