diff --git a/src/internal_events/amqp.rs b/src/internal_events/amqp.rs index 4577753d83590..fa5c46db4a18d 100644 --- a/src/internal_events/amqp.rs +++ b/src/internal_events/amqp.rs @@ -88,91 +88,3 @@ pub mod source { } } } - -#[cfg(feature = "sinks-amqp")] -pub mod sink { - use crate::emit; - use metrics::counter; - use vector_lib::internal_event::InternalEvent; - use vector_lib::internal_event::{ - error_stage, error_type, ComponentEventsDropped, UNINTENTIONAL, - }; - - #[derive(Debug)] - pub struct AmqpDeliveryError<'a> { - pub error: &'a lapin::Error, - } - - impl InternalEvent for AmqpDeliveryError<'_> { - fn emit(self) { - const DELIVER_REASON: &str = "Unable to deliver."; - - error!(message = DELIVER_REASON, - error = ?self.error, - error_type = error_type::REQUEST_FAILED, - stage = error_stage::SENDING, - internal_log_rate_limit = true, - ); - counter!( - "component_errors_total", 1, - "error_type" => error_type::REQUEST_FAILED, - "stage" => error_stage::SENDING, - ); - emit!(ComponentEventsDropped:: { - count: 1, - reason: DELIVER_REASON - }); - } - } - - #[derive(Debug)] - pub struct AmqpAcknowledgementError<'a> { - pub error: &'a lapin::Error, - } - - impl InternalEvent for AmqpAcknowledgementError<'_> { - fn emit(self) { - const ACK_REASON: &str = "Acknowledgement failed."; - - error!(message = ACK_REASON, - error = ?self.error, - error_type = error_type::REQUEST_FAILED, - stage = error_stage::SENDING, - internal_log_rate_limit = true, - ); - counter!( - "component_errors_total", 1, - "error_type" => error_type::REQUEST_FAILED, - "stage" => error_stage::SENDING, - ); - emit!(ComponentEventsDropped:: { - count: 1, - reason: ACK_REASON - }); - } - } - - #[derive(Debug)] - pub struct AmqpNackError; - - impl InternalEvent for AmqpNackError { - fn emit(self) { - const DELIVER_REASON: &str = "Received Negative Acknowledgement from AMQP broker."; - error!( - message = DELIVER_REASON, - error_type = error_type::ACKNOWLEDGMENT_FAILED, - stage = error_stage::SENDING, - internal_log_rate_limit = true, - ); - counter!( - "component_errors_total", 1, - "error_type" => error_type::ACKNOWLEDGMENT_FAILED, - "stage" => error_stage::SENDING, - ); - emit!(ComponentEventsDropped:: { - count: 1, - reason: DELIVER_REASON - }); - } - } -} diff --git a/src/sinks/amqp/service.rs b/src/sinks/amqp/service.rs index 36e2845afa821..babc502b9c193 100644 --- a/src/sinks/amqp/service.rs +++ b/src/sinks/amqp/service.rs @@ -1,9 +1,6 @@ //! The main tower service that takes the request created by the request builder //! and sends it to `AMQP`. -use crate::{ - internal_events::sink::{AmqpAcknowledgementError, AmqpDeliveryError, AmqpNackError}, - sinks::prelude::*, -}; +use crate::sinks::prelude::*; use bytes::Bytes; use futures::future::BoxFuture; use lapin::{options::BasicPublishOptions, BasicProperties}; @@ -125,25 +122,14 @@ impl Service for AmqpService { match fut { Ok(result) => match result.await { - Ok(lapin::publisher_confirm::Confirmation::Nack(_)) => { - emit!(AmqpNackError); - Err(AmqpError::Nack) - } - Err(error) => { - // TODO: In due course the caller could emit these on error. - emit!(AmqpAcknowledgementError { error: &error }); - Err(AmqpError::AcknowledgementFailed { error }) - } + Ok(lapin::publisher_confirm::Confirmation::Nack(_)) => Err(AmqpError::Nack), + Err(error) => Err(AmqpError::AcknowledgementFailed { error }), Ok(_) => Ok(AmqpResponse { json_size: req.metadata.into_events_estimated_json_encoded_byte_size(), byte_size, }), }, - Err(error) => { - // TODO: In due course the caller could emit these on error. - emit!(AmqpDeliveryError { error: &error }); - Err(AmqpError::DeliveryFailed { error }) - } + Err(error) => Err(AmqpError::DeliveryFailed { error }), } }) }