From f5b2a29b2e8809f3783cb4f2db4dae3985a1009a Mon Sep 17 00:00:00 2001 From: kyle criddle Date: Fri, 26 Aug 2022 16:50:39 -0600 Subject: [PATCH 1/4] emit StreamClosed event --- src/sources/journald.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/sources/journald.rs b/src/sources/journald.rs index 2b96c82055c1b..dfee4db121243 100644 --- a/src/sources/journald.rs +++ b/src/sources/journald.rs @@ -42,6 +42,7 @@ use crate::{ event::{BatchNotifier, BatchStatus, BatchStatusReceiver, LogEvent, Value}, internal_events::{ JournaldInvalidRecordError, JournaldNegativeAcknowledgmentError, OldEventsReceived, + StreamClosedError, }, serde::bool_or_struct, shutdown::ShutdownSignal, @@ -447,8 +448,9 @@ impl<'a> Batch<'a> { } if !self.events.is_empty() { + let count = self.events.len(); emit!(OldEventsReceived { - count: self.events.len(), + count, byte_size: self.events.size_of(), }); @@ -459,7 +461,7 @@ impl<'a> Batch<'a> { } } Err(error) => { - error!(message = "Could not send journald log.", %error); + emit!(StreamClosedError { error, count }); // `out` channel is closed, don't restart journalctl. self.exiting = Some(false); } From 3ee91f10bb4b9d4c1cf03e7e9e22ef7783d8f18e Mon Sep 17 00:00:00 2001 From: kyle criddle Date: Mon, 29 Aug 2022 10:43:56 -0600 Subject: [PATCH 2/4] enhancement(journald source): emit Error events to comply with instrumentation spec --- src/internal_events/journald.rs | 70 +++++++++++++++++++++++++++++++++ src/sources/journald.rs | 46 +++++++++++++++------- 2 files changed, 101 insertions(+), 15 deletions(-) diff --git a/src/internal_events/journald.rs b/src/internal_events/journald.rs index 4b105fe99aff9..c3ed5d0b51793 100644 --- a/src/internal_events/journald.rs +++ b/src/internal_events/journald.rs @@ -3,6 +3,30 @@ use vector_core::internal_event::InternalEvent; use super::prelude::{error_stage, error_type}; +#[derive(Debug)] +pub struct JournaldError { + pub message: &'static str, + pub error: E, + pub error_type: &'static str, + pub error_stage: &'static str, +} + +impl InternalEvent for JournaldError { + fn emit(self) { + error!( + message = self.message, + error = %self.error, + stage = self.error_stage, + error_type = self.error_type, + ); + counter!( + "component_errors_total", 1, + "stage" => self.error_stage, + "error_type" => self.error_type, + ); + } +} + #[derive(Debug)] pub struct JournaldInvalidRecordError { pub error: serde_json::Error, @@ -49,3 +73,49 @@ impl InternalEvent for JournaldNegativeAcknowledgmentError<'_> { ); } } + +#[derive(Debug)] +pub struct JournaldCheckpointSetError { + pub error: std::io::Error, + pub filename: String, +} + +impl InternalEvent for JournaldCheckpointSetError { + fn emit(self) { + error!( + message = "Could not set journald checkpoint.", + filename = ?self.filename, + error = %self.error, + stage = error_stage::PROCESSING, + error_type = error_type::IO_FAILED, + ); + counter!( + "component_errors_total", 1, + "stage" => error_stage::PROCESSING, + "error_type" => error_type::IO_FAILED, + ); + } +} + +#[derive(Debug)] +pub struct JournaldCheckpointFileOpenError { + pub error: std::io::Error, + pub path: String, +} + +impl InternalEvent for JournaldCheckpointFileOpenError { + fn emit(self) { + error!( + message = "Unable to open checkpoint file.", + path = ?self.path, + error = %self.error, + stage = error_stage::RECEIVING, + error_type = error_type::IO_FAILED, + ); + counter!( + "component_errors_total", 1, + "stage" => error_stage::RECEIVING, + "error_type" => error_type::IO_FAILED, + ); + } +} diff --git a/src/sources/journald.rs b/src/sources/journald.rs index dfee4db121243..521953722c6a4 100644 --- a/src/sources/journald.rs +++ b/src/sources/journald.rs @@ -41,6 +41,8 @@ use crate::{ config::{log_schema, AcknowledgementsConfig, DataType, Output, SourceConfig, SourceContext}, event::{BatchNotifier, BatchStatus, BatchStatusReceiver, LogEvent, Value}, internal_events::{ + prelude::{error_stage, error_type}, + JournaldCheckpointFileOpenError, JournaldCheckpointSetError, JournaldError, JournaldInvalidRecordError, JournaldNegativeAcknowledgmentError, OldEventsReceived, StreamClosedError, }, @@ -271,11 +273,14 @@ impl JournaldSource { let checkpointer = StatefulCheckpointer::new(self.checkpoint_path.clone()) .await .map_err(|error| { - error!( - message = "Unable to open checkpoint file.", - path = ?self.checkpoint_path, - %error, - ); + emit!(JournaldCheckpointFileOpenError { + error, + path: self + .checkpoint_path + .to_str() + .unwrap_or("unknown") + .to_string(), + }); })?; let checkpointer = SharedCheckpointer::new(checkpointer); @@ -312,7 +317,12 @@ impl JournaldSource { drop(running); } Err(error) => { - error!(message = "Error starting journalctl process.", %error); + emit!(JournaldError { + message: "Error starting journalctl process.", + error, + error_type: error_type::COMMAND_FAILED, + error_stage: error_stage::RECEIVING, + }); } } @@ -401,10 +411,12 @@ impl<'a> Batch<'a> { false } Some(Err(error)) => { - error!( - message = "Could not read from journald source.", - %error, - ); + emit!(JournaldError { + message: "Could not read from journald source.", + error, + error_type: error_type::READER_FAILED, + error_stage: error_stage::PROCESSING, + }); false } Some(Ok(bytes)) => { @@ -790,11 +802,15 @@ impl StatefulCheckpointer { async fn set(&mut self, token: String) { if let Err(error) = self.checkpointer.set(&token).await { - error!( - message = "Could not set journald checkpoint.", - %error, - filename = ?self.checkpointer.filename, - ); + emit!(JournaldCheckpointSetError { + error, + filename: self + .checkpointer + .filename + .to_str() + .unwrap_or("unknown") + .to_string(), + }); } self.cursor = Some(token); } From 8d5299fad8d9a5af49cabce7df115d13f40d1e28 Mon Sep 17 00:00:00 2001 From: kyle criddle Date: Mon, 29 Aug 2022 11:29:08 -0600 Subject: [PATCH 3/4] explicit events --- src/internal_events/journald.rs | 68 +++++++++++++++++++++------------ src/sources/journald.rs | 21 +++------- 2 files changed, 49 insertions(+), 40 deletions(-) diff --git a/src/internal_events/journald.rs b/src/internal_events/journald.rs index c3ed5d0b51793..1709897a424cc 100644 --- a/src/internal_events/journald.rs +++ b/src/internal_events/journald.rs @@ -1,32 +1,9 @@ +use codecs::decoding::BoxedFramingError; use metrics::counter; use vector_core::internal_event::InternalEvent; use super::prelude::{error_stage, error_type}; -#[derive(Debug)] -pub struct JournaldError { - pub message: &'static str, - pub error: E, - pub error_type: &'static str, - pub error_stage: &'static str, -} - -impl InternalEvent for JournaldError { - fn emit(self) { - error!( - message = self.message, - error = %self.error, - stage = self.error_stage, - error_type = self.error_type, - ); - counter!( - "component_errors_total", 1, - "stage" => self.error_stage, - "error_type" => self.error_type, - ); - } -} - #[derive(Debug)] pub struct JournaldInvalidRecordError { pub error: serde_json::Error, @@ -74,6 +51,49 @@ impl InternalEvent for JournaldNegativeAcknowledgmentError<'_> { } } +#[derive(Debug)] +pub struct JournaldStartJournalctlError { + pub error: crate::Error, +} + +impl InternalEvent for JournaldStartJournalctlError { + fn emit(self) { + error!( + message = "Error starting journalctl process.", + error = %self.error, + stage = error_stage::RECEIVING, + error_type = error_type::COMMAND_FAILED, + ); + counter!( + "component_errors_total", 1, + "stage" => error_stage::RECEIVING, + "error_type" => error_type::COMMAND_FAILED, + ); + } +} + +#[derive(Debug)] +pub struct JournaldReadError { + pub error: BoxedFramingError, +} + +impl InternalEvent for JournaldReadError { + fn emit(self) { + error!( + message = "Cound not read from journald source.", + error = %self.error, + stage = error_stage::PROCESSING, + error_type = error_type::READER_FAILED, + ); + counter!( + "component_errors_total", + 1, + "stage" => error_stage::PROCESSING, + "error_type" => error_type::READER_FAILED, + ); + } +} + #[derive(Debug)] pub struct JournaldCheckpointSetError { pub error: std::io::Error, diff --git a/src/sources/journald.rs b/src/sources/journald.rs index 521953722c6a4..684651810f95d 100644 --- a/src/sources/journald.rs +++ b/src/sources/journald.rs @@ -41,10 +41,9 @@ use crate::{ config::{log_schema, AcknowledgementsConfig, DataType, Output, SourceConfig, SourceContext}, event::{BatchNotifier, BatchStatus, BatchStatusReceiver, LogEvent, Value}, internal_events::{ - prelude::{error_stage, error_type}, - JournaldCheckpointFileOpenError, JournaldCheckpointSetError, JournaldError, - JournaldInvalidRecordError, JournaldNegativeAcknowledgmentError, OldEventsReceived, - StreamClosedError, + JournaldCheckpointFileOpenError, JournaldCheckpointSetError, JournaldInvalidRecordError, + JournaldNegativeAcknowledgmentError, JournaldReadError, JournaldStartJournalctlError, + OldEventsReceived, StreamClosedError, }, serde::bool_or_struct, shutdown::ShutdownSignal, @@ -317,12 +316,7 @@ impl JournaldSource { drop(running); } Err(error) => { - emit!(JournaldError { - message: "Error starting journalctl process.", - error, - error_type: error_type::COMMAND_FAILED, - error_stage: error_stage::RECEIVING, - }); + emit!(JournaldStartJournalctlError { error }); } } @@ -411,12 +405,7 @@ impl<'a> Batch<'a> { false } Some(Err(error)) => { - emit!(JournaldError { - message: "Could not read from journald source.", - error, - error_type: error_type::READER_FAILED, - error_stage: error_stage::PROCESSING, - }); + emit!(JournaldReadError { error }); false } Some(Ok(bytes)) => { From 269ce1677ce3daadc9139a73bdfc9c91c5d29e18 Mon Sep 17 00:00:00 2001 From: Kyle Criddle Date: Mon, 29 Aug 2022 12:01:23 -0600 Subject: [PATCH 4/4] Update src/internal_events/journald.rs Co-authored-by: Spencer Gilbert --- src/internal_events/journald.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/internal_events/journald.rs b/src/internal_events/journald.rs index 1709897a424cc..f7f95b1c192bc 100644 --- a/src/internal_events/journald.rs +++ b/src/internal_events/journald.rs @@ -80,7 +80,7 @@ pub struct JournaldReadError { impl InternalEvent for JournaldReadError { fn emit(self) { error!( - message = "Cound not read from journald source.", + message = "Cound not read from journald.", error = %self.error, stage = error_stage::PROCESSING, error_type = error_type::READER_FAILED,