diff --git a/src/internal_events/file.rs b/src/internal_events/file.rs index b776249598d86..b98f2bd980981 100644 --- a/src/internal_events/file.rs +++ b/src/internal_events/file.rs @@ -1,9 +1,12 @@ -use std::borrow::Cow; - -use bytes::Bytes; use metrics::{counter, gauge}; +use std::borrow::Cow; use vector_core::internal_event::InternalEvent; +use crate::{ + emit, + internal_events::{ComponentEventsDropped, UNINTENTIONAL}, +}; + #[cfg(any(feature = "sources-file", feature = "sources-kubernetes_logs"))] pub use self::source::*; use super::prelude::{error_stage, error_type}; @@ -42,17 +45,19 @@ impl InternalEvent for FileBytesSent<'_> { } #[derive(Debug)] -pub struct FileIoError<'a> { +pub struct FileIoError<'a, P> { pub error: std::io::Error, pub code: &'static str, pub message: &'static str, - pub path: Option<&'a Bytes>, + pub path: &'a P, + pub dropped_events: u64, } -impl<'a> InternalEvent for FileIoError<'a> { +impl<'a, P: std::fmt::Debug> InternalEvent for FileIoError<'a, P> { fn emit(self) { error!( message = %self.message, + path = ?self.path, error = %self.error, error_code = %self.code, error_type = error_type::IO_FAILED, @@ -65,6 +70,13 @@ impl<'a> InternalEvent for FileIoError<'a> { "error_type" => error_type::IO_FAILED, "stage" => error_stage::SENDING, ); + + if self.dropped_events > 0 { + emit!(ComponentEventsDropped:: { + count: self.dropped_events, + reason: self.message, + }); + } } } diff --git a/src/internal_events/kubernetes_logs.rs b/src/internal_events/kubernetes_logs.rs index 503f770efd097..3586390474674 100644 --- a/src/internal_events/kubernetes_logs.rs +++ b/src/internal_events/kubernetes_logs.rs @@ -85,7 +85,7 @@ impl InternalEvent for KubernetesLogsEventNamespaceAnnotationError<'_> { error_code = ANNOTATION_FAILED, error_type = error_type::READER_FAILED, stage = error_stage::PROCESSING, - rate_limit_secs = 10, + internal_log_rate_secs = 10, ); counter!( "component_errors_total", 1, @@ -110,7 +110,7 @@ impl InternalEvent for KubernetesLogsEventNodeAnnotationError<'_> { error_code = ANNOTATION_FAILED, error_type = error_type::READER_FAILED, stage = error_stage::PROCESSING, - rate_limit_secs = 10, + internal_log_rate_secs = 10, ); counter!( "component_errors_total", 1, @@ -149,7 +149,7 @@ impl InternalEvent for KubernetesLogsDockerFormatParseError<'_> { error = %self.error, error_type = error_type::PARSER_FAILED, stage = error_stage::PROCESSING, - rate_limit_secs = 10, + internal_log_rate_secs = 10, ); counter!( "component_errors_total", 1, @@ -176,7 +176,7 @@ impl InternalEvent for KubernetesLifecycleError { error_code = KUBERNETES_LIFECYCLE, error_type = error_type::READER_FAILED, stage = error_stage::PROCESSING, - rate_limit_secs = 10, + internal_log_rate_secs = 10, ); counter!( "component_errors_total", 1, diff --git a/src/internal_events/redis.rs b/src/internal_events/redis.rs index ce0a8bcd207b8..c2dcbf5d87de3 100644 --- a/src/internal_events/redis.rs +++ b/src/internal_events/redis.rs @@ -27,7 +27,7 @@ impl<'a> InternalEvent for RedisSendEventError<'a> { error_code = %self.error_code, error_type = error_type::WRITER_FAILED, stage = error_stage::SENDING, - rate_limit_secs = 10, + internal_log_rate_secs = 10, ); counter!( "component_errors_total", 1, @@ -61,7 +61,7 @@ impl InternalEvent for RedisReceiveEventError { error_code = %self.error_code, error_type = error_type::READER_FAILED, stage = error_stage::SENDING, - rate_limit_secs = 10, + internal_log_rate_secs = 10, ); counter!( "component_errors_total", 1, diff --git a/src/internal_events/statsd_source.rs b/src/internal_events/statsd_source.rs index 86ba943afe1cd..c3186bff31011 100644 --- a/src/internal_events/statsd_source.rs +++ b/src/internal_events/statsd_source.rs @@ -21,7 +21,7 @@ impl<'a> InternalEvent for StatsdInvalidRecordError<'a> { error_type = error_type::PARSER_FAILED, stage = error_stage::PROCESSING, bytes = %String::from_utf8_lossy(&self.bytes), - rate_limit_secs = 10, + internal_log_rate_secs = 10, ); counter!( "component_errors_total", 1, @@ -83,7 +83,7 @@ impl InternalEvent for StatsdSocketError error_code = %error_code, error_type = error_type::CONNECTION_FAILED, stage = error_stage::RECEIVING, - rate_limit_secs = 10, + internal_log_rate_secs = 10, ); counter!( "component_errors_total", 1, diff --git a/src/sinks/file/mod.rs b/src/sinks/file/mod.rs index f99311b97eafc..248de6df54bcf 100644 --- a/src/sinks/file/mod.rs +++ b/src/sinks/file/mod.rs @@ -239,7 +239,8 @@ impl FileSink { error, code: "failed_closing_file", message: "Failed to close file.", - path: Some(path), + path, + dropped_events: 0, }); } else{ trace!(message = "Successfully closed file.", path = ?path); @@ -263,7 +264,13 @@ impl FileSink { // We got an expired file. All we really want is to // flush and close it. if let Err(error) = expired_file.close().await { - error!(message = "Failed to close file.", path = ?path, %error); + emit!(FileIoError { + error, + code: "failed_closing_file", + message: "Failed to close file.", + path: &path, + dropped_events: 0, + }); } drop(expired_file); // ignore close error emit!(FileOpen { @@ -284,7 +291,7 @@ impl FileSink { None => { // We weren't able to find the path to use for the // file. - // This is already logged at `partition_event`, so + // The error is already handled at `partition_event`, so // here we just skip the event. event.metadata().update_status(EventStatus::Errored); return; @@ -309,7 +316,8 @@ impl FileSink { code: "failed_opening_file", message: "Unable to open the file.", error, - path: Some(&path), + path: &path, + dropped_events: 1, }); event.metadata().update_status(EventStatus::Errored); return; @@ -347,7 +355,8 @@ impl FileSink { code: "failed_writing_file", message: "Failed to write the file.", error, - path: Some(&path), + path: &path, + dropped_events: 1, }); } }