diff --git a/src/internal_events/statsd_sink.rs b/src/internal_events/statsd_sink.rs index d867d09472bad..917fb6fb83b4e 100644 --- a/src/internal_events/statsd_sink.rs +++ b/src/internal_events/statsd_sink.rs @@ -2,6 +2,10 @@ use metrics::counter; use vector_core::internal_event::InternalEvent; use crate::event::metric::{MetricKind, MetricValue}; +use crate::{ + emit, + internal_events::{ComponentEventsDropped, UNINTENTIONAL}, +}; use vector_common::internal_event::{error_stage, error_type}; #[derive(Debug)] @@ -12,8 +16,9 @@ pub struct StatsdInvalidMetricError<'a> { impl<'a> InternalEvent for StatsdInvalidMetricError<'a> { fn emit(self) { + let reason = "Invalid metric type received."; error!( - message = "Invalid metric received; dropping event.", + message = reason, error_code = "invalid_metric", error_type = error_type::ENCODER_FAILED, stage = error_stage::PROCESSING, @@ -29,5 +34,7 @@ impl<'a> InternalEvent for StatsdInvalidMetricError<'a> { ); // deprecated counter!("processing_errors_total", 1); + + emit!(ComponentEventsDropped:: { reason, count: 1 }); } } diff --git a/src/sinks/datadog/events/service.rs b/src/sinks/datadog/events/service.rs index 56c6040c48c53..5fd00cdc6936f 100644 --- a/src/sinks/datadog/events/service.rs +++ b/src/sinks/datadog/events/service.rs @@ -82,12 +82,10 @@ impl Service for DatadogEventsService { type Error = crate::Error; type Future = BoxFuture<'static, Result>; - // Emission of Error internal event is handled upstream by the caller fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { Poll::Ready(Ok(())) } - // Emission of Error internal event is handled upstream by the caller fn call(&mut self, req: DatadogEventsRequest) -> Self::Future { let mut http_service = self.batch_http_service.clone(); diff --git a/src/sinks/datadog/logs/service.rs b/src/sinks/datadog/logs/service.rs index f1f4ac0ce1e04..42bd80100e193 100644 --- a/src/sinks/datadog/logs/service.rs +++ b/src/sinks/datadog/logs/service.rs @@ -107,12 +107,10 @@ impl Service for LogApiService { type Error = DatadogApiError; type Future = BoxFuture<'static, Result>; - // Emission of Error internal event is handled upstream by the caller fn poll_ready(&mut self, _cx: &mut Context) -> Poll> { Poll::Ready(Ok(())) } - // Emission of Error internal event is handled upstream by the caller fn call(&mut self, request: LogApiRequest) -> Self::Future { let mut client = self.client.clone(); let http_request = Request::post(&self.uri) diff --git a/src/sinks/datadog/metrics/service.rs b/src/sinks/datadog/metrics/service.rs index 7bda65ae35767..476a295b6f72e 100644 --- a/src/sinks/datadog/metrics/service.rs +++ b/src/sinks/datadog/metrics/service.rs @@ -167,14 +167,12 @@ impl Service for DatadogMetricsService { type Error = DatadogApiError; type Future = BoxFuture<'static, Result>; - // Emission of Error internal event is handled upstream by the caller fn poll_ready(&mut self, cx: &mut Context) -> Poll> { self.client .poll_ready(cx) .map_err(|error| DatadogApiError::HttpError { error }) } - // Emission of Error internal event is handled upstream by the caller fn call(&mut self, request: DatadogMetricsRequest) -> Self::Future { let client = self.client.clone(); let api_key = self.api_key.clone(); diff --git a/src/sinks/datadog/traces/service.rs b/src/sinks/datadog/traces/service.rs index 0decb401d4f4e..cf74205ac8d65 100644 --- a/src/sinks/datadog/traces/service.rs +++ b/src/sinks/datadog/traces/service.rs @@ -134,12 +134,10 @@ impl Service for TraceApiService { type Error = HttpError; type Future = BoxFuture<'static, Result>; - // Emission of Error internal event is handled upstream by the caller fn poll_ready(&mut self, cx: &mut Context) -> Poll> { self.client.poll_ready(cx) } - // Emission of Error internal event is handled upstream by the caller fn call(&mut self, request: TraceApiRequest) -> Self::Future { let client = self.client.clone(); let protocol = request.uri.scheme_str().unwrap_or("http").to_string(); diff --git a/src/sinks/influxdb/metrics.rs b/src/sinks/influxdb/metrics.rs index 472d064f01f2d..52c37e7fe763a 100644 --- a/src/sinks/influxdb/metrics.rs +++ b/src/sinks/influxdb/metrics.rs @@ -194,12 +194,10 @@ impl Service> for InfluxDbSvc { type Error = crate::Error; type Future = BoxFuture<'static, Result>; - // Emission of Error internal event is handled upstream by the caller fn poll_ready(&mut self, cx: &mut std::task::Context) -> Poll> { self.inner.poll_ready(cx) } - // Emission of Error internal event is handled upstream by the caller fn call(&mut self, items: Vec) -> Self::Future { let input = encode_events( self.protocol_version, diff --git a/src/sinks/prometheus/remote_write.rs b/src/sinks/prometheus/remote_write.rs index 657b548cd3ec1..db5373107bfe3 100644 --- a/src/sinks/prometheus/remote_write.rs +++ b/src/sinks/prometheus/remote_write.rs @@ -283,12 +283,10 @@ impl tower::Service, PartitionKey>> for RemoteW type Error = crate::Error; type Future = BoxFuture<'static, Result>; - // Emission of Error internal event is handled upstream by the caller fn poll_ready(&mut self, _task: &mut task::Context<'_>) -> task::Poll> { task::Poll::Ready(Ok(())) } - // Emission of Error internal event is handled upstream by the caller fn call(&mut self, buffer: PartitionInnerBuffer, PartitionKey>) -> Self::Future { let (events, key) = buffer.into_parts(); let body = self.encode_events(events); diff --git a/src/sinks/statsd.rs b/src/sinks/statsd.rs index 4b02a3c3bc76d..0983278c3140a 100644 --- a/src/sinks/statsd.rs +++ b/src/sinks/statsd.rs @@ -142,6 +142,8 @@ impl SinkConfig for StatsdSinkConfig { stream::iter({ let byte_size = event.size_of(); let mut bytes = BytesMut::new(); + + // Errors are handled by `Encoder`. encoder .encode(event, &mut bytes) .map(|_| Ok(EncodedEvent::new(bytes, byte_size)))