Skip to content

Commit

Permalink
fix(amqp sink): remove duplicate events (vectordotdev#18932)
Browse files Browse the repository at this point in the history
  • Loading branch information
dsmith3197 authored Oct 25, 2023
1 parent ffed6f7 commit a916605
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 106 deletions.
88 changes: 0 additions & 88 deletions src/internal_events/amqp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,91 +88,3 @@ pub mod source {
}
}
}

#[cfg(feature = "sinks-amqp")]
pub mod sink {
use crate::emit;
use metrics::counter;
use vector_lib::internal_event::InternalEvent;
use vector_lib::internal_event::{
error_stage, error_type, ComponentEventsDropped, UNINTENTIONAL,
};

#[derive(Debug)]
pub struct AmqpDeliveryError<'a> {
pub error: &'a lapin::Error,
}

impl InternalEvent for AmqpDeliveryError<'_> {
fn emit(self) {
const DELIVER_REASON: &str = "Unable to deliver.";

error!(message = DELIVER_REASON,
error = ?self.error,
error_type = error_type::REQUEST_FAILED,
stage = error_stage::SENDING,
internal_log_rate_limit = true,
);
counter!(
"component_errors_total", 1,
"error_type" => error_type::REQUEST_FAILED,
"stage" => error_stage::SENDING,
);
emit!(ComponentEventsDropped::<UNINTENTIONAL> {
count: 1,
reason: DELIVER_REASON
});
}
}

#[derive(Debug)]
pub struct AmqpAcknowledgementError<'a> {
pub error: &'a lapin::Error,
}

impl InternalEvent for AmqpAcknowledgementError<'_> {
fn emit(self) {
const ACK_REASON: &str = "Acknowledgement failed.";

error!(message = ACK_REASON,
error = ?self.error,
error_type = error_type::REQUEST_FAILED,
stage = error_stage::SENDING,
internal_log_rate_limit = true,
);
counter!(
"component_errors_total", 1,
"error_type" => error_type::REQUEST_FAILED,
"stage" => error_stage::SENDING,
);
emit!(ComponentEventsDropped::<UNINTENTIONAL> {
count: 1,
reason: ACK_REASON
});
}
}

#[derive(Debug)]
pub struct AmqpNackError;

impl InternalEvent for AmqpNackError {
fn emit(self) {
const DELIVER_REASON: &str = "Received Negative Acknowledgement from AMQP broker.";
error!(
message = DELIVER_REASON,
error_type = error_type::ACKNOWLEDGMENT_FAILED,
stage = error_stage::SENDING,
internal_log_rate_limit = true,
);
counter!(
"component_errors_total", 1,
"error_type" => error_type::ACKNOWLEDGMENT_FAILED,
"stage" => error_stage::SENDING,
);
emit!(ComponentEventsDropped::<UNINTENTIONAL> {
count: 1,
reason: DELIVER_REASON
});
}
}
}
22 changes: 4 additions & 18 deletions src/sinks/amqp/service.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
//! The main tower service that takes the request created by the request builder
//! and sends it to `AMQP`.
use crate::{
internal_events::sink::{AmqpAcknowledgementError, AmqpDeliveryError, AmqpNackError},
sinks::prelude::*,
};
use crate::sinks::prelude::*;
use bytes::Bytes;
use futures::future::BoxFuture;
use lapin::{options::BasicPublishOptions, BasicProperties};
Expand Down Expand Up @@ -125,25 +122,14 @@ impl Service<AmqpRequest> for AmqpService {

match fut {
Ok(result) => match result.await {
Ok(lapin::publisher_confirm::Confirmation::Nack(_)) => {
emit!(AmqpNackError);
Err(AmqpError::Nack)
}
Err(error) => {
// TODO: In due course the caller could emit these on error.
emit!(AmqpAcknowledgementError { error: &error });
Err(AmqpError::AcknowledgementFailed { error })
}
Ok(lapin::publisher_confirm::Confirmation::Nack(_)) => Err(AmqpError::Nack),
Err(error) => Err(AmqpError::AcknowledgementFailed { error }),
Ok(_) => Ok(AmqpResponse {
json_size: req.metadata.into_events_estimated_json_encoded_byte_size(),
byte_size,
}),
},
Err(error) => {
// TODO: In due course the caller could emit these on error.
emit!(AmqpDeliveryError { error: &error });
Err(AmqpError::DeliveryFailed { error })
}
Err(error) => Err(AmqpError::DeliveryFailed { error }),
}
})
}
Expand Down

0 comments on commit a916605

Please sign in to comment.