From 2df0488df4f4cd3774411e1e258d56903d93f3da Mon Sep 17 00:00:00 2001 From: kyle criddle Date: Wed, 7 Sep 2022 11:41:07 -0600 Subject: [PATCH 1/5] address both influxdb sinks- TODOs as blocked --- src/internal_events/influxdb.rs | 37 +++++++++++++++++++++++++++++++++ src/internal_events/mod.rs | 4 ++++ src/sinks/influxdb/logs.rs | 28 ++++++++++++++++++------- src/sinks/influxdb/metrics.rs | 13 ++++++++++-- 4 files changed, 73 insertions(+), 9 deletions(-) create mode 100644 src/internal_events/influxdb.rs diff --git a/src/internal_events/influxdb.rs b/src/internal_events/influxdb.rs new file mode 100644 index 0000000000000..51fca3140d9ee --- /dev/null +++ b/src/internal_events/influxdb.rs @@ -0,0 +1,37 @@ +use crate::{ + emit, + internal_events::{ComponentEventsDropped, UNINTENTIONAL}, +}; +use metrics::counter; +use vector_core::internal_event::InternalEvent; + +use super::prelude::{error_stage, error_type}; + +#[derive(Debug)] +pub struct InfluxdbEncodingError { + pub error_message: &'static str, + pub count: u64, +} + +impl InternalEvent for InfluxdbEncodingError { + fn emit(self) { + let reason = "Failed to encode event."; + error!( + message = reason, + error = %self.error_message, + error_type = error_type::ENCODER_FAILED, + stage = error_stage::PROCESSING, + internal_log_rate_secs = 10, + ); + counter!( + "component_errors_total", 1, + "error_type" => error_type::ENCODER_FAILED, + "stage" => error_stage::PROCESSING, + ); + + emit!(ComponentEventsDropped:: { + count: self.count, + reason + }); + } +} diff --git a/src/internal_events/mod.rs b/src/internal_events/mod.rs index e964ea8cace0e..da6007836dbae 100644 --- a/src/internal_events/mod.rs +++ b/src/internal_events/mod.rs @@ -57,6 +57,8 @@ mod http; pub mod http_client; #[cfg(feature = "sources-utils-http-scrape")] mod http_scrape; +#[cfg(feature = "sinks-influxdb")] +mod influxdb; #[cfg(feature = "sources-internal_logs")] mod internal_logs; #[cfg(feature = "sources-internal_metrics")] @@ -191,6 +193,8 @@ pub(crate) use self::host_metrics::*; pub(crate) use self::http::*; #[cfg(feature = "sources-utils-http-scrape")] pub(crate) use self::http_scrape::*; +#[cfg(feature = "sinks-influxdb")] +pub(crate) use self::influxdb::*; #[cfg(feature = "sources-internal_logs")] pub(crate) use self::internal_logs::*; #[cfg(feature = "sources-internal_metrics")] diff --git a/src/sinks/influxdb/logs.rs b/src/sinks/influxdb/logs.rs index 7a1d1740a65b1..ba36c85386ebb 100644 --- a/src/sinks/influxdb/logs.rs +++ b/src/sinks/influxdb/logs.rs @@ -11,6 +11,7 @@ use crate::{ config::{log_schema, AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext}, event::{Event, Value}, http::HttpClient, + internal_events::InfluxdbEncodingError, sinks::{ influxdb::{ encode_timestamp, healthcheck, influx_line_protocol, influxdb_settings, Field, @@ -210,7 +211,7 @@ impl HttpEventEncoder for InfluxDbLogsEncoder { }); let mut output = BytesMut::new(); - if let Err(error) = influx_line_protocol( + if let Err(error_message) = influx_line_protocol( self.protocol_version, &self.measurement, Some(tags), @@ -218,7 +219,10 @@ impl HttpEventEncoder for InfluxDbLogsEncoder { timestamp, &mut output, ) { - warn!(message = "Failed to encode event; dropping event.", %error, internal_log_rate_secs = 30); + emit!(InfluxdbEncodingError { + error_message, + count: 1 + }); return None; }; @@ -296,7 +300,7 @@ fn to_field(value: &Value) -> Field { #[cfg(test)] mod tests { use chrono::{offset::TimeZone, Utc}; - use futures::{channel::mpsc, StreamExt}; + use futures::{channel::mpsc, stream, StreamExt}; use http::{request::Parts, StatusCode}; use indoc::indoc; use vector_core::event::{BatchNotifier, BatchStatus, Event, LogEvent}; @@ -307,7 +311,10 @@ mod tests { influxdb::test_util::{assert_fields, split_line_protocol, ts}, util::test::{build_test_server_status, load_sink}, }, - test_util::{components, components::HTTP_SINK_TAGS, next_addr}, + test_util::{ + components::{run_and_assert_sink_compliance, HTTP_SINK_TAGS}, + next_addr, + }, }; type Receiver = mpsc::Receiver<(Parts, bytes::Bytes)>; @@ -681,12 +688,19 @@ mod tests { } drop(batch); - components::init_test(); - sink.run_events(events).await.unwrap(); if batch_status == BatchStatus::Delivered { - components::SINK_TESTS.assert(&HTTP_SINK_TAGS); + run_and_assert_sink_compliance(sink, stream::iter(events), &HTTP_SINK_TAGS).await; + } else { + // TODO use assert_sink_error + sink.run_events(events).await.unwrap(); } + // components::init_test(); + // sink.run_events(events).await.unwrap(); + // if batch_status == BatchStatus::Delivered { + // components::SINK_TESTS.assert(&HTTP_SINK_TAGS); + // } + assert_eq!(receiver.try_recv(), Ok(batch_status)); rx diff --git a/src/sinks/influxdb/metrics.rs b/src/sinks/influxdb/metrics.rs index dc05e8bfce7dd..472d064f01f2d 100644 --- a/src/sinks/influxdb/metrics.rs +++ b/src/sinks/influxdb/metrics.rs @@ -21,6 +21,7 @@ use crate::{ Event, }, http::HttpClient, + internal_events::InfluxdbEncodingError, sinks::{ influxdb::{ encode_timestamp, healthcheck, influx_line_protocol, influxdb_settings, Field, @@ -193,10 +194,12 @@ impl Service> for InfluxDbSvc { type Error = crate::Error; type Future = BoxFuture<'static, Result>; + // Emission of Error internal event is handled upstream by the caller fn poll_ready(&mut self, cx: &mut std::task::Context) -> Poll> { self.inner.poll_ready(cx) } + // Emission of Error internal event is handled upstream by the caller fn call(&mut self, items: Vec) -> Self::Future { let input = encode_events( self.protocol_version, @@ -273,6 +276,8 @@ fn encode_events( quantiles: &[f64], ) -> BytesMut { let mut output = BytesMut::new(); + let count = events.len() as u64; + for event in events.into_iter() { let fullname = encode_namespace(event.namespace().or(default_namespace), '.', event.name()); let ts = encode_timestamp(event.timestamp()); @@ -281,7 +286,8 @@ fn encode_events( let mut unwrapped_tags = tags.unwrap_or_default(); unwrapped_tags.insert("metric_type".to_owned(), metric_type.to_owned()); - if let Err(error) = influx_line_protocol( + + if let Err(error_message) = influx_line_protocol( protocol_version, &fullname, Some(unwrapped_tags), @@ -289,7 +295,10 @@ fn encode_events( ts, &mut output, ) { - warn!(message = "Failed to encode event; dropping event.", %error, internal_log_rate_secs = 30); + emit!(InfluxdbEncodingError { + error_message, + count, + }); }; } From 920cf3c4770683ede724b68dc4daaac5a653fca9 Mon Sep 17 00:00:00 2001 From: kyle criddle Date: Wed, 7 Sep 2022 16:22:41 -0600 Subject: [PATCH 2/5] add important TODO --- src/sinks/influxdb/metrics.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/sinks/influxdb/metrics.rs b/src/sinks/influxdb/metrics.rs index 472d064f01f2d..a85fecf163649 100644 --- a/src/sinks/influxdb/metrics.rs +++ b/src/sinks/influxdb/metrics.rs @@ -183,6 +183,9 @@ impl InfluxDbSvc { .map(|metric| Ok(EncodedEvent::new(metric, byte_size))) }) }) + + // TODO emit an Error event below, and see if we get a double emission when + // implementing an error path test that uses the test helper .sink_map_err(|error| error!(message = "Fatal influxdb sink error.", %error)); Ok(VectorSink::from_event_sink(sink)) From 6fb5effbdd20e6fc598e56a24c31148785804e70 Mon Sep 17 00:00:00 2001 From: kyle criddle Date: Thu, 8 Sep 2022 17:08:14 -0600 Subject: [PATCH 3/5] emit error internal event and compliance test --- src/internal_events/http.rs | 23 +++++++++++++++++++++++ src/sinks/influxdb/logs.rs | 14 +++++--------- src/sinks/influxdb/metrics.rs | 3 --- src/sinks/util/http.rs | 7 ++++++- 4 files changed, 34 insertions(+), 13 deletions(-) diff --git a/src/internal_events/http.rs b/src/internal_events/http.rs index 4c99988bd0a3c..91b3e9969852c 100644 --- a/src/internal_events/http.rs +++ b/src/internal_events/http.rs @@ -130,6 +130,29 @@ impl<'a> InternalEvent for HttpDecompressError<'a> { } } +pub struct HttpRequestError<'a> { + pub status_code: http::StatusCode, + pub endpoint: &'a str, +} + +impl<'a> InternalEvent for HttpRequestError<'a> { + fn emit(self) { + error!( + message = "HTTP call failed.", + endpoint = %self.endpoint, + status_code = %self.status_code, + error_type = error_type::REQUEST_FAILED, + stage = error_stage::SENDING, + internal_log_rate_secs = 10 + ); + counter!( + "component_errors_total", 1, + "error_type" => error_type::REQUEST_FAILED, + "stage" => error_stage::SENDING, + ); + } +} + pub struct HttpInternalError { pub message: &'static str, } diff --git a/src/sinks/influxdb/logs.rs b/src/sinks/influxdb/logs.rs index ba36c85386ebb..c076e714c2c6b 100644 --- a/src/sinks/influxdb/logs.rs +++ b/src/sinks/influxdb/logs.rs @@ -312,7 +312,10 @@ mod tests { util::test::{build_test_server_status, load_sink}, }, test_util::{ - components::{run_and_assert_sink_compliance, HTTP_SINK_TAGS}, + components::{ + run_and_assert_sink_compliance, run_and_assert_sink_error, HTTP_SINK_TAGS, + SOURCE_ERROR_TAGS, + }, next_addr, }, }; @@ -691,16 +694,9 @@ mod tests { if batch_status == BatchStatus::Delivered { run_and_assert_sink_compliance(sink, stream::iter(events), &HTTP_SINK_TAGS).await; } else { - // TODO use assert_sink_error - sink.run_events(events).await.unwrap(); + run_and_assert_sink_error(sink, stream::iter(events), &SOURCE_ERROR_TAGS).await; } - // components::init_test(); - // sink.run_events(events).await.unwrap(); - // if batch_status == BatchStatus::Delivered { - // components::SINK_TESTS.assert(&HTTP_SINK_TAGS); - // } - assert_eq!(receiver.try_recv(), Ok(batch_status)); rx diff --git a/src/sinks/influxdb/metrics.rs b/src/sinks/influxdb/metrics.rs index a85fecf163649..472d064f01f2d 100644 --- a/src/sinks/influxdb/metrics.rs +++ b/src/sinks/influxdb/metrics.rs @@ -183,9 +183,6 @@ impl InfluxDbSvc { .map(|metric| Ok(EncodedEvent::new(metric, byte_size))) }) }) - - // TODO emit an Error event below, and see if we get a double emission when - // implementing an error path test that uses the test helper .sink_map_err(|error| error!(message = "Fatal influxdb sink error.", %error)); Ok(VectorSink::from_event_sink(sink)) diff --git a/src/sinks/util/http.rs b/src/sinks/util/http.rs index be713cd7fa985..2c1a1698555db 100644 --- a/src/sinks/util/http.rs +++ b/src/sinks/util/http.rs @@ -27,7 +27,7 @@ use super::{ use crate::{ event::Event, http::{HttpClient, HttpError}, - internal_events::EndpointBytesSent, + internal_events::{EndpointBytesSent, HttpRequestError}, }; pub trait HttpEventEncoder { @@ -394,6 +394,11 @@ where protocol: &protocol, endpoint: &endpoint }); + } else { + emit!(HttpRequestError { + status_code: response.status(), + endpoint: &endpoint + }); } let (parts, body) = response.into_parts(); From 5874d69f841af1a29e4fed266ad8ac5a9ab4c1e9 Mon Sep 17 00:00:00 2001 From: kyle criddle Date: Fri, 9 Sep 2022 11:37:45 -0600 Subject: [PATCH 4/5] merge cleanup --- src/internal_events/http.rs | 23 ----------------------- src/internal_events/influxdb.rs | 3 +-- src/sinks/influxdb/logs.rs | 6 +++--- src/sinks/util/http.rs | 7 +------ 4 files changed, 5 insertions(+), 34 deletions(-) diff --git a/src/internal_events/http.rs b/src/internal_events/http.rs index dfce7948c4f24..dd0763a152a66 100644 --- a/src/internal_events/http.rs +++ b/src/internal_events/http.rs @@ -130,29 +130,6 @@ impl<'a> InternalEvent for HttpDecompressError<'a> { } } -pub struct HttpRequestError<'a> { - pub status_code: http::StatusCode, - pub endpoint: &'a str, -} - -impl<'a> InternalEvent for HttpRequestError<'a> { - fn emit(self) { - error!( - message = "HTTP call failed.", - endpoint = %self.endpoint, - status_code = %self.status_code, - error_type = error_type::REQUEST_FAILED, - stage = error_stage::SENDING, - internal_log_rate_secs = 10 - ); - counter!( - "component_errors_total", 1, - "error_type" => error_type::REQUEST_FAILED, - "stage" => error_stage::SENDING, - ); - } -} - pub struct HttpInternalError { pub message: &'static str, } diff --git a/src/internal_events/influxdb.rs b/src/internal_events/influxdb.rs index 51fca3140d9ee..eae676759beb2 100644 --- a/src/internal_events/influxdb.rs +++ b/src/internal_events/influxdb.rs @@ -3,10 +3,9 @@ use crate::{ internal_events::{ComponentEventsDropped, UNINTENTIONAL}, }; use metrics::counter; +use vector_common::internal_event::{error_stage, error_type}; use vector_core::internal_event::InternalEvent; -use super::prelude::{error_stage, error_type}; - #[derive(Debug)] pub struct InfluxdbEncodingError { pub error_message: &'static str, diff --git a/src/sinks/influxdb/logs.rs b/src/sinks/influxdb/logs.rs index c076e714c2c6b..22701ddfcf769 100644 --- a/src/sinks/influxdb/logs.rs +++ b/src/sinks/influxdb/logs.rs @@ -313,8 +313,8 @@ mod tests { }, test_util::{ components::{ - run_and_assert_sink_compliance, run_and_assert_sink_error, HTTP_SINK_TAGS, - SOURCE_ERROR_TAGS, + run_and_assert_sink_compliance, run_and_assert_sink_error, COMPONENT_ERROR_TAGS, + HTTP_SINK_TAGS, }, next_addr, }, @@ -694,7 +694,7 @@ mod tests { if batch_status == BatchStatus::Delivered { run_and_assert_sink_compliance(sink, stream::iter(events), &HTTP_SINK_TAGS).await; } else { - run_and_assert_sink_error(sink, stream::iter(events), &SOURCE_ERROR_TAGS).await; + run_and_assert_sink_error(sink, stream::iter(events), &COMPONENT_ERROR_TAGS).await; } assert_eq!(receiver.try_recv(), Ok(batch_status)); diff --git a/src/sinks/util/http.rs b/src/sinks/util/http.rs index 2c1a1698555db..be713cd7fa985 100644 --- a/src/sinks/util/http.rs +++ b/src/sinks/util/http.rs @@ -27,7 +27,7 @@ use super::{ use crate::{ event::Event, http::{HttpClient, HttpError}, - internal_events::{EndpointBytesSent, HttpRequestError}, + internal_events::EndpointBytesSent, }; pub trait HttpEventEncoder { @@ -394,11 +394,6 @@ where protocol: &protocol, endpoint: &endpoint }); - } else { - emit!(HttpRequestError { - status_code: response.status(), - endpoint: &endpoint - }); } let (parts, body) = response.into_parts(); From 66c8d440d2d1a7fb9c94b3284c64787a4d1567bd Mon Sep 17 00:00:00 2001 From: kyle criddle Date: Fri, 9 Sep 2022 15:32:08 -0600 Subject: [PATCH 5/5] component fix --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 27d9d98ee612a..903010dc5ed14 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -777,7 +777,7 @@ nats-integration-tests = ["sinks-nats", "sources-nats"] nginx-integration-tests = ["sources-nginx_metrics"] opentelemetry-integration-tests = ["sources-opentelemetry"] postgresql_metrics-integration-tests = ["sources-postgresql_metrics"] -prometheus-integration-tests = ["sinks-prometheus", "sources-prometheus"] +prometheus-integration-tests = ["sinks-prometheus", "sources-prometheus", "sinks-influxdb"] pulsar-integration-tests = ["sinks-pulsar"] redis-integration-tests = ["sinks-redis", "sources-redis"] splunk-integration-tests = ["sinks-splunk_hec"]