Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enhancement(file sink): Adhere to instrumentation spec #14302

Merged
merged 2 commits into from
Sep 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 18 additions & 6 deletions src/internal_events/file.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
use std::borrow::Cow;

use bytes::Bytes;
use metrics::{counter, gauge};
use std::borrow::Cow;
use vector_core::internal_event::InternalEvent;

use crate::{
emit,
internal_events::{ComponentEventsDropped, UNINTENTIONAL},
};

#[cfg(any(feature = "sources-file", feature = "sources-kubernetes_logs"))]
pub use self::source::*;
use super::prelude::{error_stage, error_type};
Expand Down Expand Up @@ -42,17 +45,19 @@ impl InternalEvent for FileBytesSent<'_> {
}

#[derive(Debug)]
pub struct FileIoError<'a> {
pub struct FileIoError<'a, P> {
pub error: std::io::Error,
pub code: &'static str,
pub message: &'static str,
pub path: Option<&'a Bytes>,
pub path: &'a P,
pub dropped_events: u64,
}

impl<'a> InternalEvent for FileIoError<'a> {
impl<'a, P: std::fmt::Debug> InternalEvent for FileIoError<'a, P> {
fn emit(self) {
error!(
message = %self.message,
path = ?self.path,
error = %self.error,
error_code = %self.code,
error_type = error_type::IO_FAILED,
Expand All @@ -65,6 +70,13 @@ impl<'a> InternalEvent for FileIoError<'a> {
"error_type" => error_type::IO_FAILED,
"stage" => error_stage::SENDING,
);

if self.dropped_events > 0 {
emit!(ComponentEventsDropped::<UNINTENTIONAL> {
count: self.dropped_events,
reason: self.message,
});
}
}
}

Expand Down
8 changes: 4 additions & 4 deletions src/internal_events/kubernetes_logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ impl InternalEvent for KubernetesLogsEventNamespaceAnnotationError<'_> {
error_code = ANNOTATION_FAILED,
error_type = error_type::READER_FAILED,
stage = error_stage::PROCESSING,
rate_limit_secs = 10,
internal_log_rate_secs = 10,
);
counter!(
"component_errors_total", 1,
Expand All @@ -110,7 +110,7 @@ impl InternalEvent for KubernetesLogsEventNodeAnnotationError<'_> {
error_code = ANNOTATION_FAILED,
error_type = error_type::READER_FAILED,
stage = error_stage::PROCESSING,
rate_limit_secs = 10,
internal_log_rate_secs = 10,
);
counter!(
"component_errors_total", 1,
Expand Down Expand Up @@ -149,7 +149,7 @@ impl InternalEvent for KubernetesLogsDockerFormatParseError<'_> {
error = %self.error,
error_type = error_type::PARSER_FAILED,
stage = error_stage::PROCESSING,
rate_limit_secs = 10,
internal_log_rate_secs = 10,
);
counter!(
"component_errors_total", 1,
Expand All @@ -176,7 +176,7 @@ impl<E: std::fmt::Display> InternalEvent for KubernetesLifecycleError<E> {
error_code = KUBERNETES_LIFECYCLE,
error_type = error_type::READER_FAILED,
stage = error_stage::PROCESSING,
rate_limit_secs = 10,
internal_log_rate_secs = 10,
);
counter!(
"component_errors_total", 1,
Expand Down
4 changes: 2 additions & 2 deletions src/internal_events/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ impl<'a> InternalEvent for RedisSendEventError<'a> {
error_code = %self.error_code,
error_type = error_type::WRITER_FAILED,
stage = error_stage::SENDING,
rate_limit_secs = 10,
internal_log_rate_secs = 10,
);
counter!(
"component_errors_total", 1,
Expand Down Expand Up @@ -61,7 +61,7 @@ impl InternalEvent for RedisReceiveEventError {
error_code = %self.error_code,
error_type = error_type::READER_FAILED,
stage = error_stage::SENDING,
rate_limit_secs = 10,
internal_log_rate_secs = 10,
);
counter!(
"component_errors_total", 1,
Expand Down
4 changes: 2 additions & 2 deletions src/internal_events/statsd_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ impl<'a> InternalEvent for StatsdInvalidRecordError<'a> {
error_type = error_type::PARSER_FAILED,
stage = error_stage::PROCESSING,
bytes = %String::from_utf8_lossy(&self.bytes),
rate_limit_secs = 10,
internal_log_rate_secs = 10,
);
counter!(
"component_errors_total", 1,
Expand Down Expand Up @@ -83,7 +83,7 @@ impl<T: std::fmt::Debug + std::fmt::Display> InternalEvent for StatsdSocketError
error_code = %error_code,
error_type = error_type::CONNECTION_FAILED,
stage = error_stage::RECEIVING,
rate_limit_secs = 10,
internal_log_rate_secs = 10,
);
counter!(
"component_errors_total", 1,
Expand Down
19 changes: 14 additions & 5 deletions src/sinks/file/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,8 @@ impl FileSink {
error,
code: "failed_closing_file",
message: "Failed to close file.",
path: Some(path),
path,
dropped_events: 0,
});
} else{
trace!(message = "Successfully closed file.", path = ?path);
Expand All @@ -263,7 +264,13 @@ impl FileSink {
// We got an expired file. All we really want is to
// flush and close it.
if let Err(error) = expired_file.close().await {
error!(message = "Failed to close file.", path = ?path, %error);
emit!(FileIoError {
error,
code: "failed_closing_file",
message: "Failed to close file.",
path: &path,
dropped_events: 0,
});
}
drop(expired_file); // ignore close error
emit!(FileOpen {
Expand All @@ -284,7 +291,7 @@ impl FileSink {
None => {
// We weren't able to find the path to use for the
// file.
// This is already logged at `partition_event`, so
// The error is already handled at `partition_event`, so
// here we just skip the event.
event.metadata().update_status(EventStatus::Errored);
return;
Expand All @@ -309,7 +316,8 @@ impl FileSink {
code: "failed_opening_file",
message: "Unable to open the file.",
error,
path: Some(&path),
path: &path,
dropped_events: 1,
});
event.metadata().update_status(EventStatus::Errored);
return;
Expand Down Expand Up @@ -347,7 +355,8 @@ impl FileSink {
code: "failed_writing_file",
message: "Failed to write the file.",
error,
path: Some(&path),
path: &path,
dropped_events: 1,
});
}
}
Expand Down