diff --git a/src/internal_events/journald.rs b/src/internal_events/journald.rs index 693ea2a0f5dd3..0a6a448cfd1ce 100644 --- a/src/internal_events/journald.rs +++ b/src/internal_events/journald.rs @@ -1,3 +1,4 @@ +use codecs::decoding::BoxedFramingError; use metrics::counter; use vector_core::internal_event::InternalEvent; @@ -27,3 +28,92 @@ impl InternalEvent for JournaldInvalidRecordError { counter!("invalid_record_bytes_total", self.text.len() as u64); // deprecated } } + +#[derive(Debug)] +pub struct JournaldStartJournalctlError { + pub error: crate::Error, +} + +impl InternalEvent for JournaldStartJournalctlError { + fn emit(self) { + error!( + message = "Error starting journalctl process.", + error = %self.error, + stage = error_stage::RECEIVING, + error_type = error_type::COMMAND_FAILED, + ); + counter!( + "component_errors_total", 1, + "stage" => error_stage::RECEIVING, + "error_type" => error_type::COMMAND_FAILED, + ); + } +} + +#[derive(Debug)] +pub struct JournaldReadError { + pub error: BoxedFramingError, +} + +impl InternalEvent for JournaldReadError { + fn emit(self) { + error!( + message = "Cound not read from journald.", + error = %self.error, + stage = error_stage::PROCESSING, + error_type = error_type::READER_FAILED, + ); + counter!( + "component_errors_total", + 1, + "stage" => error_stage::PROCESSING, + "error_type" => error_type::READER_FAILED, + ); + } +} + +#[derive(Debug)] +pub struct JournaldCheckpointSetError { + pub error: std::io::Error, + pub filename: String, +} + +impl InternalEvent for JournaldCheckpointSetError { + fn emit(self) { + error!( + message = "Could not set journald checkpoint.", + filename = ?self.filename, + error = %self.error, + stage = error_stage::PROCESSING, + error_type = error_type::IO_FAILED, + ); + counter!( + "component_errors_total", 1, + "stage" => error_stage::PROCESSING, + "error_type" => error_type::IO_FAILED, + ); + } +} + +#[derive(Debug)] +pub struct JournaldCheckpointFileOpenError { + pub error: std::io::Error, + pub path: String, +} + +impl InternalEvent for JournaldCheckpointFileOpenError { + fn emit(self) { + error!( + message = "Unable to open checkpoint file.", + path = ?self.path, + error = %self.error, + stage = error_stage::RECEIVING, + error_type = error_type::IO_FAILED, + ); + counter!( + "component_errors_total", 1, + "stage" => error_stage::RECEIVING, + "error_type" => error_type::IO_FAILED, + ); + } +} diff --git a/src/sources/journald.rs b/src/sources/journald.rs index 5a56ed4ad2e34..36e8ed7f209e9 100644 --- a/src/sources/journald.rs +++ b/src/sources/journald.rs @@ -38,7 +38,10 @@ use vector_core::config::LogNamespace; use crate::{ config::{log_schema, AcknowledgementsConfig, DataType, Output, SourceConfig, SourceContext}, event::{BatchNotifier, BatchStatus, BatchStatusReceiver, LogEvent, Value}, - internal_events::{JournaldInvalidRecordError, OldEventsReceived}, + internal_events::{ + JournaldCheckpointFileOpenError, JournaldCheckpointSetError, JournaldInvalidRecordError, + JournaldReadError, JournaldStartJournalctlError, OldEventsReceived, StreamClosedError, + }, serde::bool_or_struct, shutdown::ShutdownSignal, SourceSender, @@ -266,11 +269,14 @@ impl JournaldSource { let checkpointer = StatefulCheckpointer::new(self.checkpoint_path.clone()) .await .map_err(|error| { - error!( - message = "Unable to open checkpoint file.", - path = ?self.checkpoint_path, - %error, - ); + emit!(JournaldCheckpointFileOpenError { + error, + path: self + .checkpoint_path + .to_str() + .unwrap_or("unknown") + .to_string(), + }); })?; let checkpointer = SharedCheckpointer::new(checkpointer); @@ -307,7 +313,7 @@ impl JournaldSource { drop(running); } Err(error) => { - error!(message = "Error starting journalctl process.", %error); + emit!(JournaldStartJournalctlError { error }); } } @@ -396,10 +402,7 @@ impl<'a> Batch<'a> { false } Some(Err(error)) => { - error!( - message = "Could not read from journald source.", - %error, - ); + emit!(JournaldReadError { error }); false } Some(Ok(bytes)) => { @@ -443,8 +446,9 @@ impl<'a> Batch<'a> { } if !self.events.is_empty() { + let count = self.events.len(); emit!(OldEventsReceived { - count: self.events.len(), + count, byte_size: self.events.size_of(), }); @@ -455,7 +459,7 @@ impl<'a> Batch<'a> { } } Err(error) => { - error!(message = "Could not send journald log.", %error); + emit!(StreamClosedError { error, count }); // `out` channel is closed, don't restart journalctl. self.exiting = Some(false); } @@ -767,11 +771,15 @@ impl StatefulCheckpointer { async fn set(&mut self, token: impl Into) { let token = token.into(); if let Err(error) = self.checkpointer.set(&token).await { - error!( - message = "Could not set journald checkpoint.", - %error, - filename = ?self.checkpointer.filename, - ); + emit!(JournaldCheckpointSetError { + error, + filename: self + .checkpointer + .filename + .to_str() + .unwrap_or("unknown") + .to_string(), + }); } self.cursor = Some(token); }