diff --git a/src/internal_events/parser.rs b/src/internal_events/parser.rs index 64dac33c6e1e9..d7a25e8515c25 100644 --- a/src/internal_events/parser.rs +++ b/src/internal_events/parser.rs @@ -3,6 +3,10 @@ use std::borrow::Cow; use metrics::counter; use vector_core::internal_event::InternalEvent; +use crate::{ + emit, + internal_events::{ComponentEventsDropped, UNINTENTIONAL}, +}; use vector_common::internal_event::{error_stage, error_type}; fn truncate_string_at(s: &str, maxlen: usize) -> Cow { @@ -51,8 +55,9 @@ pub struct ParserMissingFieldError<'a> { impl InternalEvent for ParserMissingFieldError<'_> { fn emit(self) { + let reason = "Field does not exist."; error!( - message = "Field does not exist.", + message = reason, field = %self.field, error_code = "field_not_found", error_type = error_type::CONDITION_FAILED, @@ -68,6 +73,8 @@ impl InternalEvent for ParserMissingFieldError<'_> { ); // deprecated counter!("processing_errors_total", 1, "error_type" => "missing_field"); + + emit!(ComponentEventsDropped:: { count: 1, reason }); } } diff --git a/src/sinks/datadog/events/sink.rs b/src/sinks/datadog/events/sink.rs index 4ee87c2fc7204..214ce8293ae56 100644 --- a/src/sinks/datadog/events/sink.rs +++ b/src/sinks/datadog/events/sink.rs @@ -3,14 +3,18 @@ use std::{fmt, num::NonZeroUsize}; use async_trait::async_trait; use futures::{stream::BoxStream, StreamExt}; use tower::Service; +use vector_config::NamedComponent; use vector_core::stream::DriverResponse; use crate::{ config::log_schema, event::Event, - internal_events::ParserMissingFieldError, + internal_events::{ParserMissingFieldError, SinkRequestBuildError}, sinks::{ - datadog::events::request_builder::{DatadogEventsRequest, DatadogEventsRequestBuilder}, + datadog::events::{ + request_builder::{DatadogEventsRequest, DatadogEventsRequestBuilder}, + DatadogEventsConfig, + }, util::{SinkBuilderExt, StreamSink}, }, }; @@ -34,8 +38,11 @@ where .request_builder(concurrency_limit, DatadogEventsRequestBuilder::new()) .filter_map(|request| async move { match request { - Err(e) => { - error!("Failed to build DatadogEvents request: {:?}.", e); + Err(error) => { + emit!(SinkRequestBuildError { + name: DatadogEventsConfig::NAME, + error + }); None } Ok(req) => Some(req), diff --git a/src/sinks/datadog/events/tests.rs b/src/sinks/datadog/events/tests.rs index 34fb93672eaee..2d7531b2ca2ba 100644 --- a/src/sinks/datadog/events/tests.rs +++ b/src/sinks/datadog/events/tests.rs @@ -17,7 +17,7 @@ use crate::{ event::EventArray, sinks::util::test::{build_test_server_status, load_sink}, test_util::{ - components::{self, HTTP_SINK_TAGS}, + components::{self, COMPONENT_ERROR_TAGS, HTTP_SINK_TAGS}, next_addr, random_lines_with_stream, }, }; @@ -63,10 +63,10 @@ async fn start_test( let (batch, mut receiver) = BatchNotifier::new_with_receiver(); let (expected, events) = random_events_with_stream(100, 10, Some(batch)); - components::init_test(); - sink.run(events).await.unwrap(); if batch_status == BatchStatus::Delivered { - components::SINK_TESTS.assert(&HTTP_SINK_TAGS); + components::run_and_assert_sink_compliance(sink, events, &HTTP_SINK_TAGS).await; + } else { + components::run_and_assert_sink_error(sink, events, &COMPONENT_ERROR_TAGS).await; } assert_eq!(receiver.try_recv(), Ok(batch_status));