From 52a8036722ab5cd4ed92d8916b89d85d6447f8c0 Mon Sep 17 00:00:00 2001 From: neuronull Date: Wed, 19 Jul 2023 14:57:47 -0600 Subject: [PATCH] fix(component validation): make tests deterministic through absolute comparisons instead of bounds checks (#17956) ### Changes - Fixes a bug which was occasionally circumvented by comparing against a lower bound for some of the _bytes metrics- it seems the offered reasoning for the lower bound (inability to synchronize with the component's internal logic) was inaccurate. - The bug itself was that we were converting the `EventData` read from the yaml file, into a proper Event (and thus generating metadata for it), in two places- first when sending the event as input in the input runner, and again in the validator to construct the expected result. Because the event construction adds a timestamp, and because our determination of timestamp precision is based on whether there are trailing zeros that can be trimmed, this created a scenario where the actual event input to the component under test, and the event used to calculate the expected metrics, had a timestamp that varied in its number of bytes. This of course did not happen all of the time, which meant that the tests sometimes passed and sometimes failed. - The fix for this is to create the proper Event immediately after parsing the EventData from the yaml file (thereby only creating it once). - Moves the calculation of expected metrics, out of the Validators and up to the input/output runners. This removes the need for a lot of logic that was previously being done in the Validators. - Increases the static sleep we have when initializing the runner topology from 1s to 2s, and additionally increases the number of internal metric ticks to wait for before shutting down the telemetry from 2 ticks to 3. I found through experimentation that occasionally we would receive no events. 
The two attributes were increased independently and it was only through increasing both that the pass rate became 100%. - Extracts duplicated code in the sources validator functions into helper functions. ### Testing Done The tests were run in a loop, stopping on the first failure. This method was used to calibrate the waits we have on the topology as mentioned above. Before the current settings, the maximum sequential passes was about 100. With the current settings, the loop was manually stopped at about 1.2k iterations. --- src/components/validation/mod.rs | 13 + src/components/validation/resources/event.rs | 111 ++++- src/components/validation/resources/http.rs | 69 +--- src/components/validation/resources/mod.rs | 6 +- src/components/validation/runner/io.rs | 31 +- src/components/validation/runner/mod.rs | 171 ++++++-- src/components/validation/runner/telemetry.rs | 49 +-- .../validators/component_spec/mod.rs | 32 +- .../validators/component_spec/sources.rs | 383 +++++------------- src/components/validation/validators/mod.rs | 5 +- 10 files changed, 426 insertions(+), 444 deletions(-) diff --git a/src/components/validation/mod.rs b/src/components/validation/mod.rs index 7457cd6548ad9..90723ff416a50 100644 --- a/src/components/validation/mod.rs +++ b/src/components/validation/mod.rs @@ -170,6 +170,19 @@ macro_rules! register_validatable_component { }; } +/// Input and Output runners populate this structure as they send and receive events. +/// The structure is passed into the validator to use as the expected values for the +/// metrics that the components under test actually output. 
+#[derive(Default)] +pub struct RunnerMetrics { + pub received_events_total: u64, + pub received_event_bytes_total: u64, + pub received_bytes_total: u64, + pub sent_bytes_total: u64, // a reciprocal for received_bytes_total + pub sent_event_bytes_total: u64, + pub sent_events_total: u64, +} + #[cfg(all(test, feature = "component-validation-tests"))] mod tests { use std::{ diff --git a/src/components/validation/resources/event.rs b/src/components/validation/resources/event.rs index 21c38e58bfad5..1f91acfbe660d 100644 --- a/src/components/validation/resources/event.rs +++ b/src/components/validation/resources/event.rs @@ -1,10 +1,20 @@ +use bytes::BytesMut; use serde::Deserialize; +use snafu::Snafu; +use tokio_util::codec::Encoder as _; + +use crate::codecs::Encoder; +use codecs::{ + encoding, JsonSerializer, LengthDelimitedEncoder, LogfmtSerializer, MetricTagValues, + NewlineDelimitedEncoder, +}; use vector_core::event::{Event, LogEvent}; -/// An event used in a test case. +/// A test case event for deserialization from yaml file. +/// This is an intermediary step to TestEvent. #[derive(Clone, Debug, Deserialize)] #[serde(untagged)] -pub enum TestEvent { +pub enum RawTestEvent { /// The event is used, as-is, without modification. Passthrough(EventData), @@ -20,15 +30,6 @@ pub enum TestEvent { Modified { modified: bool, event: EventData }, } -impl TestEvent { - pub fn into_event(self) -> Event { - match self { - Self::Passthrough(event) => event.into_event(), - Self::Modified { event, .. } => event.into_event(), - } - } -} - #[derive(Clone, Debug, Deserialize)] #[serde(untagged)] pub enum EventData { @@ -44,3 +45,91 @@ impl EventData { } } } + +/// An event used in a test case. +/// It is important to have created the event with all fields, immediately after deserializing from the +/// test case definition yaml file. This ensures that the event data we are using in the expected/actual +/// metrics collection is based on the same event. 
Namely, one issue that can arise from creating the event +/// from the event data twice (once for the expected and once for actual), it can result in a timestamp in +/// the event which may or may not have the same millisecond precision as it's counterpart. +#[derive(Clone, Debug, Deserialize)] +#[serde(from = "RawTestEvent")] +#[serde(untagged)] +pub enum TestEvent { + /// The event is used, as-is, without modification. + Passthrough(Event), + + /// The event is potentially modified by the external resource. + /// + /// The modification made is dependent on the external resource, but this mode is made available + /// for when a test case wants to exercise the failure path, but cannot cause a failure simply + /// by constructing the event in a certain way i.e. adding an invalid field, or removing a + /// required field, or using an invalid field value, and so on. + /// + /// For transforms and sinks, generally, the only way to cause an error is if the event itself + /// is malformed in some way, which can be achieved without this test event variant. + Modified { modified: bool, event: Event }, +} + +impl TestEvent { + #[allow(clippy::missing_const_for_fn)] // const cannot run destructor + pub fn into_event(self) -> Event { + match self { + Self::Passthrough(event) => event, + Self::Modified { event, .. } => event, + } + } +} + +#[derive(Clone, Debug, Eq, PartialEq, Snafu)] +pub enum RawTestEventParseError {} + +impl From for TestEvent { + fn from(other: RawTestEvent) -> Self { + match other { + RawTestEvent::Passthrough(event_data) => { + TestEvent::Passthrough(event_data.into_event()) + } + RawTestEvent::Modified { modified, event } => TestEvent::Modified { + modified, + event: event.into_event(), + }, + } + } +} + +pub fn encode_test_event( + encoder: &mut Encoder, + buf: &mut BytesMut, + event: TestEvent, +) { + match event { + TestEvent::Passthrough(event) => { + // Encode the event normally. 
+ encoder + .encode(event, buf) + .expect("should not fail to encode input event"); + } + TestEvent::Modified { event, .. } => { + // This is a little fragile, but we check what serializer this encoder uses, and based + // on `Serializer::supports_json`, we choose an opposing codec. For example, if the + // encoder supports JSON, we'll use a serializer that doesn't support JSON, and vise + // versa. + let mut alt_encoder = if encoder.serializer().supports_json() { + Encoder::::new( + LengthDelimitedEncoder::new().into(), + LogfmtSerializer::new().into(), + ) + } else { + Encoder::::new( + NewlineDelimitedEncoder::new().into(), + JsonSerializer::new(MetricTagValues::default()).into(), + ) + }; + + alt_encoder + .encode(event, buf) + .expect("should not fail to encode input event"); + } + } +} diff --git a/src/components/validation/resources/http.rs b/src/components/validation/resources/http.rs index 8fa066b6f3f4a..44e2b7ba7579b 100644 --- a/src/components/validation/resources/http.rs +++ b/src/components/validation/resources/http.rs @@ -1,5 +1,6 @@ use std::{ collections::VecDeque, + future::Future, net::{IpAddr, SocketAddr}, str::FromStr, sync::Arc, @@ -11,26 +12,18 @@ use axum::{ Router, }; use bytes::BytesMut; -use codecs::{ - encoding, JsonSerializer, LengthDelimitedEncoder, LogfmtSerializer, MetricTagValues, - NewlineDelimitedEncoder, -}; use http::{Method, Request, StatusCode, Uri}; use hyper::{Body, Client, Server}; -use std::future::Future; use tokio::{ select, sync::{mpsc, oneshot, Mutex, Notify}, }; -use tokio_util::codec::{Decoder, Encoder as _}; -use vector_core::event::Event; +use tokio_util::codec::Decoder; -use crate::{ - codecs::Encoder, - components::validation::sync::{Configuring, TaskCoordinator}, -}; +use crate::components::validation::sync::{Configuring, TaskCoordinator}; +use vector_core::event::Event; -use super::{ResourceCodec, ResourceDirection, TestEvent}; +use super::{encode_test_event, ResourceCodec, ResourceDirection, TestEvent}; /// An 
HTTP resource. #[derive(Clone)] @@ -67,7 +60,7 @@ impl HttpResourceConfig { self, direction: ResourceDirection, codec: ResourceCodec, - output_tx: mpsc::Sender, + output_tx: mpsc::Sender>, task_coordinator: &TaskCoordinator, ) { match direction { @@ -230,7 +223,7 @@ fn spawn_input_http_client( fn spawn_output_http_server( config: HttpResourceConfig, codec: ResourceCodec, - output_tx: mpsc::Sender, + output_tx: mpsc::Sender>, task_coordinator: &TaskCoordinator, ) { // This HTTP server will wait for events to be sent by a sink, and collect them and send them on @@ -252,12 +245,10 @@ fn spawn_output_http_server( loop { match decoder.decode_eof(&mut body) { Ok(Some((events, _byte_size))) => { - for event in events { - output_tx - .send(event) - .await - .expect("should not fail to send output event"); - } + output_tx + .send(events.to_vec()) + .await + .expect("should not fail to send output event"); } Ok(None) => return StatusCode::OK.into_response(), Err(_) => return StatusCode::INTERNAL_SERVER_ERROR.into_response(), @@ -290,7 +281,7 @@ fn spawn_output_http_server( fn spawn_output_http_client( _config: HttpResourceConfig, _codec: ResourceCodec, - _output_tx: mpsc::Sender, + _output_tx: mpsc::Sender>, _task_coordinator: &TaskCoordinator, ) { // TODO: The `prometheus_exporter` sink is the only sink that exposes an HTTP server which must be @@ -400,39 +391,3 @@ fn socketaddr_from_uri(uri: &Uri) -> SocketAddr { SocketAddr::from((uri_host, uri_port)) } - -pub fn encode_test_event( - encoder: &mut Encoder, - buf: &mut BytesMut, - event: TestEvent, -) { - match event { - TestEvent::Passthrough(event) => { - // Encode the event normally. - encoder - .encode(event.into_event(), buf) - .expect("should not fail to encode input event"); - } - TestEvent::Modified { event, .. } => { - // This is a little fragile, but we check what serializer this encoder uses, and based - // on `Serializer::supports_json`, we choose an opposing codec. 
For example, if the - // encoder supports JSON, we'll use a serializer that doesn't support JSON, and vise - // versa. - let mut alt_encoder = if encoder.serializer().supports_json() { - Encoder::::new( - LengthDelimitedEncoder::new().into(), - LogfmtSerializer::new().into(), - ) - } else { - Encoder::::new( - NewlineDelimitedEncoder::new().into(), - JsonSerializer::new(MetricTagValues::default()).into(), - ) - }; - - alt_encoder - .encode(event.into_event(), buf) - .expect("should not fail to encode input event"); - } - } -} diff --git a/src/components/validation/resources/mod.rs b/src/components/validation/resources/mod.rs index 2b9fc3c542ccb..a22d6fc324dbd 100644 --- a/src/components/validation/resources/mod.rs +++ b/src/components/validation/resources/mod.rs @@ -13,8 +13,8 @@ use vector_core::{config::DataType, event::Event}; use crate::codecs::{Decoder, DecodingConfig, Encoder, EncodingConfig, EncodingConfigWithFraming}; -pub use self::event::{EventData, TestEvent}; -pub use self::http::{encode_test_event, HttpResourceConfig}; +pub use self::event::{encode_test_event, TestEvent}; +pub use self::http::HttpResourceConfig; use super::sync::{Configuring, TaskCoordinator}; @@ -308,7 +308,7 @@ impl ExternalResource { /// Spawns this resource for use as an output for a sink. 
pub fn spawn_as_output( self, - output_tx: mpsc::Sender, + output_tx: mpsc::Sender>, task_coordinator: &TaskCoordinator, ) { match self.definition { diff --git a/src/components/validation/runner/io.rs b/src/components/validation/runner/io.rs index c454ba433c847..55e4fca1eaecf 100644 --- a/src/components/validation/runner/io.rs +++ b/src/components/validation/runner/io.rs @@ -27,11 +27,11 @@ use crate::{ #[derive(Clone)] pub struct EventForwardService { - tx: mpsc::Sender, + tx: mpsc::Sender>, } -impl From> for EventForwardService { - fn from(tx: mpsc::Sender) -> Self { +impl From>> for EventForwardService { + fn from(tx: mpsc::Sender>) -> Self { Self { tx } } } @@ -42,14 +42,17 @@ impl VectorService for EventForwardService { &self, request: tonic::Request, ) -> Result, Status> { - let events = request.into_inner().events.into_iter().map(Event::from); - - for event in events { - self.tx - .send(event) - .await - .expect("event forward rx should not close first"); - } + let events = request + .into_inner() + .events + .into_iter() + .map(Event::from) + .collect(); + + self.tx + .send(events) + .await + .expect("event forward rx should not close first"); Ok(tonic::Response::new(PushEventsResponse {})) } @@ -74,7 +77,7 @@ pub struct InputEdge { pub struct OutputEdge { listen_addr: GrpcAddress, service: VectorServer, - rx: mpsc::Receiver, + rx: mpsc::Receiver>, } impl InputEdge { @@ -129,7 +132,7 @@ impl OutputEdge { pub fn spawn_output_server( self, task_coordinator: &TaskCoordinator, - ) -> mpsc::Receiver { + ) -> mpsc::Receiver> { spawn_grpc_server(self.listen_addr, self.service, task_coordinator); self.rx } @@ -184,5 +187,5 @@ pub fn spawn_grpc_server( pub struct ControlledEdges { pub input: Option>, - pub output: Option>, + pub output: Option>>, } diff --git a/src/components/validation/runner/mod.rs b/src/components/validation/runner/mod.rs index 2c2a066b17806..b4bcd72de75bf 100644 --- a/src/components/validation/runner/mod.rs +++ 
b/src/components/validation/runner/mod.rs @@ -2,18 +2,34 @@ pub mod config; mod io; mod telemetry; -use std::{collections::HashMap, path::PathBuf, time::Duration}; +use std::{ + collections::HashMap, + path::PathBuf, + sync::{Arc, Mutex}, + time::Duration, +}; + +use bytes::BytesMut; +use tokio::{ + runtime::Builder, + select, + sync::mpsc::{self, Receiver, Sender}, + task::JoinHandle, +}; +use tokio_util::codec::Encoder as _; -use tokio::{runtime::Builder, select, sync::mpsc}; -use vector_core::event::Event; +use codecs::encoding; +use vector_core::{event::Event, EstimatedJsonEncodedSizeOf}; use crate::{ - components::validation::TestCase, + codecs::Encoder, + components::validation::{RunnerMetrics, TestCase}, config::{ConfigBuilder, ConfigDiff}, topology, }; use super::{ + encode_test_event, sync::{Configuring, TaskCoordinator}, ComponentType, TestCaseExpectation, TestEvent, ValidationConfiguration, Validator, }; @@ -73,7 +89,7 @@ pub enum RunnerOutput { /// external resource pulls output events from the sink. /// /// Only sinks have external inputs. - External(mpsc::Receiver), + External(mpsc::Receiver>), /// The component uses a "controlled" edge for its output. /// @@ -93,8 +109,8 @@ impl RunnerOutput { /// this function will panic, as one or the other must be provided. pub fn into_receiver( self, - controlled_edge: Option>, - ) -> mpsc::Receiver { + controlled_edge: Option>>, + ) -> mpsc::Receiver> { match (self, controlled_edge) { (Self::External(_), Some(_)) => panic!("Runner output declared as external resource, but controlled output edge was also specified."), (Self::Controlled, None) => panic!("Runner output declared as controlled, but no controlled output edge was specified."), @@ -216,6 +232,11 @@ impl Runner { debug!("Component topology configuration built and telemetry collector spawned."); + // Create the data structure that the input and output runners will use to store + // their received/sent metrics. 
This is then shared with the Validator for comparison + // against the actual metrics output by the component under test. + let runner_metrics = Arc::new(Mutex::new(RunnerMetrics::default())); + // After that, we'll build the external resource necessary for this component, if any. // Once that's done, we build the input event/output event sender and receiver based on // whatever we spawned for an external resource. @@ -226,13 +247,13 @@ impl Runner { // For example, if we're validating a source, we would have added a filler sink for our // controlled output edge, which means we then need a server task listening for the // events sent by that sink. - let (runner_input, runner_output) = build_external_resource( + let (runner_input, runner_output, maybe_runner_encoder) = build_external_resource( &self.configuration, &input_task_coordinator, &output_task_coordinator, ); let input_tx = runner_input.into_sender(controlled_edges.input); - let mut output_rx = runner_output.into_receiver(controlled_edges.output); + let output_rx = runner_output.into_receiver(controlled_edges.output); debug!("External resource (if any) and controlled edges built and spawned."); // Now with any external resource spawned, as well as any tasks for handling controlled @@ -271,25 +292,20 @@ impl Runner { // like the aforementioned unit tests, switch to any improved mechanism we come up with // in the future to make these tests more deterministic and waste less time waiting // around if we can avoid it. 
- tokio::time::sleep(Duration::from_secs(1)).await; - - let input_events = test_case.events.clone(); - let input_driver = tokio::spawn(async move { - for input_event in input_events { - input_tx - .send(input_event) - .await - .expect("input channel should not be closed"); - } - }); + tokio::time::sleep(Duration::from_secs(2)).await; - let output_driver = tokio::spawn(async move { - let mut output_events = Vec::new(); - while let Some(output_event) = output_rx.recv().await { - output_events.push(output_event); - } - output_events - }); + let input_driver = spawn_input_driver( + test_case.events.clone(), + input_tx, + &runner_metrics, + maybe_runner_encoder.as_ref().cloned(), + ); + + let output_driver = spawn_output_driver( + output_rx, + &runner_metrics, + maybe_runner_encoder.as_ref().cloned(), + ); // At this point, the component topology is running, and all input/output/telemetry // tasks are running as well. Our input driver should be sending (or will have already @@ -334,12 +350,12 @@ impl Runner { .values() .map(|validator| { validator.check_validation( - self.configuration.clone(), component_type, expectation, &input_events, &output_events, &telemetry_events, + &runner_metrics.lock().unwrap(), ) }) .collect(); @@ -397,9 +413,12 @@ fn build_external_resource( configuration: &ValidationConfiguration, input_task_coordinator: &TaskCoordinator, output_task_coordinator: &TaskCoordinator, -) -> (RunnerInput, RunnerOutput) { +) -> (RunnerInput, RunnerOutput, Option>) { let component_type = configuration.component_type(); let maybe_external_resource = configuration.external_resource(); + let maybe_encoder = maybe_external_resource + .as_ref() + .map(|resource| resource.codec.into_encoder()); match component_type { ComponentType::Source => { // As an external resource for a source, we create a channel that the validation runner @@ -411,11 +430,15 @@ fn build_external_resource( maybe_external_resource.expect("a source must always have an external resource"); 
resource.spawn_as_input(rx, input_task_coordinator); - (RunnerInput::External(tx), RunnerOutput::Controlled) + ( + RunnerInput::External(tx), + RunnerOutput::Controlled, + maybe_encoder, + ) } ComponentType::Transform => { // Transforms have no external resources. - (RunnerInput::Controlled, RunnerOutput::Controlled) + (RunnerInput::Controlled, RunnerOutput::Controlled, None) } ComponentType::Sink => { // As an external resource for a sink, we create a channel that the validation runner @@ -427,7 +450,11 @@ fn build_external_resource( maybe_external_resource.expect("a sink must always have an external resource"); resource.spawn_as_output(tx, output_task_coordinator); - (RunnerInput::Controlled, RunnerOutput::External(rx)) + ( + RunnerInput::Controlled, + RunnerOutput::External(rx), + maybe_encoder, + ) } } } @@ -483,6 +510,86 @@ fn spawn_component_topology( }); } +fn spawn_input_driver( + input_events: Vec, + input_tx: Sender, + runner_metrics: &Arc>, + mut maybe_encoder: Option>, +) -> JoinHandle<()> { + let input_runner_metrics = Arc::clone(runner_metrics); + + tokio::spawn(async move { + for input_event in input_events { + input_tx + .send(input_event.clone()) + .await + .expect("input channel should not be closed"); + + // Update the runner metrics for the sent event. This will later + // be used in the Validators, as the "expected" case. 
+ let mut input_runner_metrics = input_runner_metrics.lock().unwrap(); + + if let Some(encoder) = maybe_encoder.as_mut() { + let mut buffer = BytesMut::new(); + encode_test_event(encoder, &mut buffer, input_event.clone()); + + input_runner_metrics.sent_bytes_total += buffer.len() as u64; + } + + let (modified, event) = match input_event { + TestEvent::Passthrough(event) => (false, event), + TestEvent::Modified { modified, event } => (modified, event), + }; + + // account for failure case + if !modified { + input_runner_metrics.sent_events_total += 1; + + input_runner_metrics.sent_event_bytes_total += + vec![event].estimated_json_encoded_size_of().get() as u64; + } + } + }) +} + +fn spawn_output_driver( + mut output_rx: Receiver>, + runner_metrics: &Arc>, + maybe_encoder: Option>, +) -> JoinHandle> { + let output_runner_metrics = Arc::clone(runner_metrics); + + tokio::spawn(async move { + let mut output_events = Vec::new(); + while let Some(events) = output_rx.recv().await { + output_events.extend(events.clone()); + + // Update the runner metrics for the received event. This will later + // be used in the Validators, as the "expected" case. + let mut output_runner_metrics = output_runner_metrics.lock().unwrap(); + + for output_event in events { + output_runner_metrics.received_events_total += 1; + output_runner_metrics.received_event_bytes_total += vec![output_event.clone()] + .estimated_json_encoded_size_of() + .get() + as u64; + + if let Some(encoder) = maybe_encoder.as_ref() { + let mut buffer = BytesMut::new(); + encoder + .clone() + .encode(output_event, &mut buffer) + .expect("should not fail to encode output event"); + + output_runner_metrics.received_bytes_total += buffer.len() as u64; + } + } + } + output_events + }) +} + fn initialize_test_environment() { // Make sure our metrics recorder is installed and in test mode. 
This is necessary for // proper internal telemetry collect when running the component topology, even though it's diff --git a/src/components/validation/runner/telemetry.rs b/src/components/validation/runner/telemetry.rs index 9ef813838f1dc..76e8c29f62a9f 100644 --- a/src/components/validation/runner/telemetry.rs +++ b/src/components/validation/runner/telemetry.rs @@ -23,14 +23,13 @@ const INTERNAL_LOGS_KEY: &str = "_telemetry_logs"; const INTERNAL_METRICS_KEY: &str = "_telemetry_metrics"; const VECTOR_SINK_KEY: &str = "_telemetry_out"; -// The metrics event to monitor for before shutting down a telemetry collector. -const INTERNAL_METRICS_SHUTDOWN_EVENT: &str = "component_received_events_total"; +const SHUTDOWN_TICKS: u8 = 3; /// Telemetry collector for a component under validation. pub struct Telemetry { listen_addr: GrpcAddress, service: VectorServer, - rx: mpsc::Receiver, + rx: mpsc::Receiver>, } impl Telemetry { @@ -98,47 +97,49 @@ impl Telemetry { select! { _ = telemetry_shutdown_handle.wait() => { // After we receive the shutdown signal, we need to wait - // for two event emissions from the internal_metrics + // for two batches of event emissions from the internal_metrics // source. This is to ensure that we've received all the // events from the components that we're testing. // // We need exactly two because the internal_metrics // source does not emit component events until after the // component_received_events_total metric has been - // emitted. Thus, two events ensure that all component + // emitted. Thus, two batches ensure that all component // events have been emitted. 
debug!("Telemetry: waiting for final internal_metrics events before shutting down."); - let mut events_seen = 0; - let current_time = chrono::Utc::now(); + let mut batches_received = 0; + + let timeout = tokio::time::sleep(Duration::from_secs(5)); + tokio::pin!(timeout); loop { - match &rx.recv().await { - None => break 'outer, - Some(telemetry_event) => { - telemetry_events.push(telemetry_event.clone()); - if let Event::Metric(metric) = telemetry_event { - if let Some(tags) = metric.tags() { - if metric.name() == INTERNAL_METRICS_SHUTDOWN_EVENT && - tags.get("component_name") == Some(INTERNAL_LOGS_KEY) && - metric.data().timestamp().unwrap() > ¤t_time { - debug!("Telemetry: processed one component_received_events_total event."); - - events_seen += 1; - if events_seen == 2 { - break 'outer; - } + select! { + d = rx.recv() => { + match d { + None => break, + Some(telemetry_event_batch) => { + telemetry_events.extend(telemetry_event_batch); + debug!("Telemetry: processed one batch of internal_metrics."); + batches_received += 1; + if batches_received == SHUTDOWN_TICKS { + break; } } } - } + }, + _ = &mut timeout => break, } } + if batches_received != SHUTDOWN_TICKS { + panic!("Did not receive {SHUTDOWN_TICKS} events while waiting for shutdown! 
Only received {batches_received}!"); + } + break 'outer; }, maybe_telemetry_event = rx.recv() => match maybe_telemetry_event { None => break, - Some(telemetry_event) => telemetry_events.push(telemetry_event), + Some(telemetry_event_batch) => telemetry_events.extend(telemetry_event_batch), }, } } diff --git a/src/components/validation/validators/component_spec/mod.rs b/src/components/validation/validators/component_spec/mod.rs index e566403d95bf9..36aa2d4452b05 100644 --- a/src/components/validation/validators/component_spec/mod.rs +++ b/src/components/validation/validators/component_spec/mod.rs @@ -2,13 +2,11 @@ mod sources; use vector_core::event::{Event, Metric}; -use crate::components::validation::{ - ComponentType, TestCaseExpectation, TestEvent, ValidationConfiguration, -}; +use crate::components::validation::{ComponentType, RunnerMetrics, TestCaseExpectation, TestEvent}; use super::Validator; -use self::sources::{validate_sources, SourceMetrics}; +use self::sources::{validate_sources, SourceMetricType}; /// Validates that the component meets the requirements of the [Component Specification][component_spec]. 
/// @@ -28,12 +26,12 @@ impl Validator for ComponentSpecValidator { fn check_validation( &self, - configuration: ValidationConfiguration, component_type: ComponentType, expectation: TestCaseExpectation, inputs: &[TestEvent], outputs: &[Event], telemetry_events: &[Event], + runner_metrics: &RunnerMetrics, ) -> Result, Vec> { for input in inputs { debug!("Validator observed input event: {:?}", input); @@ -84,13 +82,7 @@ impl Validator for ComponentSpecValidator { format!("received {} telemetry events", telemetry_events.len()), ]; - let out = validate_telemetry( - configuration, - component_type, - inputs, - outputs, - telemetry_events, - )?; + let out = validate_telemetry(component_type, telemetry_events, runner_metrics)?; run_out.extend(out); Ok(run_out) @@ -98,18 +90,16 @@ impl Validator for ComponentSpecValidator { } fn validate_telemetry( - configuration: ValidationConfiguration, component_type: ComponentType, - inputs: &[TestEvent], - outputs: &[Event], telemetry_events: &[Event], + runner_metrics: &RunnerMetrics, ) -> Result, Vec> { let mut out: Vec = Vec::new(); let mut errs: Vec = Vec::new(); match component_type { ComponentType::Source => { - let result = validate_sources(&configuration, inputs, outputs, telemetry_events); + let result = validate_sources(telemetry_events, runner_metrics); match result { Ok(o) => out.extend(o), Err(e) => errs.extend(e), @@ -128,9 +118,9 @@ fn validate_telemetry( fn filter_events_by_metric_and_component<'a>( telemetry_events: &'a [Event], - metric: SourceMetrics, + metric: &SourceMetricType, component_name: &'a str, -) -> Result, Vec> { +) -> Vec<&'a Metric> { let metrics: Vec<&Metric> = telemetry_events .iter() .flat_map(|e| { @@ -155,9 +145,5 @@ fn filter_events_by_metric_and_component<'a>( debug!("{}: {} metrics found.", metric.to_string(), metrics.len(),); - if metrics.is_empty() { - return Err(vec![format!("{}: no metrics were emitted.", metric)]); - } - - Ok(metrics) + metrics } diff --git 
a/src/components/validation/validators/component_spec/sources.rs b/src/components/validation/validators/component_spec/sources.rs index c25b217a399e4..6c1a3dca88399 100644 --- a/src/components/validation/validators/component_spec/sources.rs +++ b/src/components/validation/validators/component_spec/sources.rs @@ -1,17 +1,14 @@ use std::fmt::{Display, Formatter}; -use bytes::BytesMut; -use vector_common::json_size::JsonSize; use vector_core::event::{Event, MetricKind}; -use vector_core::EstimatedJsonEncodedSizeOf; -use crate::components::validation::{encode_test_event, TestEvent, ValidationConfiguration}; +use crate::components::validation::RunnerMetrics; use super::filter_events_by_metric_and_component; const TEST_SOURCE_NAME: &str = "test_source"; -pub enum SourceMetrics { +pub enum SourceMetricType { EventsReceived, EventsReceivedBytes, ReceivedBytesTotal, @@ -19,23 +16,27 @@ pub enum SourceMetrics { SentEventBytesTotal, } -impl SourceMetrics { +impl SourceMetricType { const fn name(&self) -> &'static str { match self { - SourceMetrics::EventsReceived => "component_received_events_total", - SourceMetrics::EventsReceivedBytes => "component_received_event_bytes_total", - SourceMetrics::ReceivedBytesTotal => "component_received_bytes_total", - SourceMetrics::SentEventsTotal => "component_sent_events_total", - SourceMetrics::SentEventBytesTotal => "component_sent_event_bytes_total", + SourceMetricType::EventsReceived => "component_received_events_total", + SourceMetricType::EventsReceivedBytes => "component_received_event_bytes_total", + SourceMetricType::ReceivedBytesTotal => "component_received_bytes_total", + SourceMetricType::SentEventsTotal => "component_sent_events_total", + SourceMetricType::SentEventBytesTotal => "component_sent_event_bytes_total", } } } +impl Display for SourceMetricType { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.name()) + } +} + pub fn validate_sources( - configuration: &ValidationConfiguration, - 
inputs: &[TestEvent], - outputs: &[Event], telemetry_events: &[Event], + runner_metrics: &RunnerMetrics, ) -> Result, Vec> { let mut out: Vec = Vec::new(); let mut errs: Vec = Vec::new(); @@ -49,7 +50,7 @@ pub fn validate_sources( ]; for v in validations.iter() { - match v(configuration, inputs, outputs, telemetry_events) { + match v(telemetry_events, runner_metrics) { Err(e) => errs.extend(e), Ok(m) => out.extend(m), } @@ -62,63 +63,54 @@ pub fn validate_sources( } } -impl Display for SourceMetrics { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!(f, "{}", self.name()) - } -} +fn sum_counters( + metric_name: &SourceMetricType, + metrics: &[&vector_core::event::Metric], +) -> Result> { + let mut sum: f64 = 0.0; + let mut errs = Vec::new(); -fn validate_component_received_events_total( - _configuration: &ValidationConfiguration, - inputs: &[TestEvent], - _outputs: &[Event], - telemetry_events: &[Event], -) -> Result, Vec> { - let mut errs: Vec = Vec::new(); - - let metrics = filter_events_by_metric_and_component( - telemetry_events, - SourceMetrics::EventsReceived, - TEST_SOURCE_NAME, - )?; - - let mut events = 0; for m in metrics { match m.value() { vector_core::event::MetricValue::Counter { value } => { if let MetricKind::Absolute = m.data().kind { - events = *value as i32 + sum = *value; } else { - events += *value as i32 + sum += *value; } } - _ => errs.push(format!( - "{}: metric value is not a counter", - SourceMetrics::EventsReceived, - )), + _ => errs.push(format!("{}: metric value is not a counter", metric_name,)), } } - let expected_events = inputs.iter().fold(0, |acc, i| { - if let TestEvent::Passthrough(_) = i { - return acc + 1; - } - acc - }); + if errs.is_empty() { + Ok(sum as u64) + } else { + Err(errs) + } +} + +fn validate_events_total( + telemetry_events: &[Event], + metric_type: &SourceMetricType, + expected_events: u64, +) -> Result, Vec> { + let mut errs: Vec = Vec::new(); + + let metrics = + 
filter_events_by_metric_and_component(telemetry_events, metric_type, TEST_SOURCE_NAME); + + let actual_events = sum_counters(metric_type, &metrics)?; debug!( "{}: {} events, {} expected events.", - SourceMetrics::EventsReceived, - events, - expected_events, + metric_type, actual_events, expected_events, ); - if events != expected_events { + if actual_events != expected_events { errs.push(format!( "{}: expected {} events, but received {}", - SourceMetrics::EventsReceived, - expected_events, - events + metric_type, expected_events, actual_events )); } @@ -126,66 +118,30 @@ fn validate_component_received_events_total( return Err(errs); } - Ok(vec![format!( - "{}: {}", - SourceMetrics::EventsReceived, - events, - )]) + Ok(vec![format!("{}: {}", metric_type, actual_events)]) } -fn validate_component_received_event_bytes_total( - _configuration: &ValidationConfiguration, - inputs: &[TestEvent], - _outputs: &[Event], +fn validate_bytes_total( telemetry_events: &[Event], + metric_type: &SourceMetricType, + expected_bytes: u64, ) -> Result, Vec> { let mut errs: Vec = Vec::new(); - let metrics = filter_events_by_metric_and_component( - telemetry_events, - SourceMetrics::EventsReceivedBytes, - TEST_SOURCE_NAME, - )?; + let metrics = + filter_events_by_metric_and_component(telemetry_events, metric_type, TEST_SOURCE_NAME); - let mut metric_bytes: f64 = 0.0; - for m in metrics { - match m.value() { - vector_core::event::MetricValue::Counter { value } => { - if let MetricKind::Absolute = m.data().kind { - metric_bytes = *value - } else { - metric_bytes += value - } - } - _ => errs.push(format!( - "{}: metric value is not a counter", - SourceMetrics::EventsReceivedBytes, - )), - } - } - - let expected_bytes = inputs.iter().fold(JsonSize::new(0), |acc, i| { - if let TestEvent::Passthrough(_) = i { - let size = vec![i.clone().into_event()].estimated_json_encoded_size_of(); - return acc + size; - } - - acc - }); + let actual_bytes = sum_counters(metric_type, &metrics)?; debug!( "{}: 
{} bytes, {} expected bytes.", - SourceMetrics::EventsReceivedBytes, - metric_bytes, - expected_bytes, + metric_type, actual_bytes, expected_bytes, ); - if JsonSize::new(metric_bytes as usize) != expected_bytes { + if actual_bytes != expected_bytes { errs.push(format!( "{}: expected {} bytes, but received {}", - SourceMetrics::EventsReceivedBytes, - expected_bytes, - metric_bytes + metric_type, expected_bytes, actual_bytes )); } @@ -193,209 +149,80 @@ fn validate_component_received_event_bytes_total( return Err(errs); } - Ok(vec![format!( - "{}: {}", - SourceMetrics::EventsReceivedBytes, - metric_bytes, - )]) + Ok(vec![format!("{}: {}", metric_type, actual_bytes)]) } -fn validate_component_received_bytes_total( - configuration: &ValidationConfiguration, - inputs: &[TestEvent], - _outputs: &[Event], +fn validate_component_received_events_total( telemetry_events: &[Event], + runner_metrics: &RunnerMetrics, ) -> Result, Vec> { - let mut errs: Vec = Vec::new(); + // The reciprocal metric for events received is events sent, + // so the expected value is what the input runner sent. 
+ let expected_events = runner_metrics.sent_events_total; - let metrics = filter_events_by_metric_and_component( + validate_events_total( telemetry_events, - SourceMetrics::ReceivedBytesTotal, - TEST_SOURCE_NAME, - )?; - - let mut metric_bytes: f64 = 0.0; - for m in metrics { - match m.value() { - vector_core::event::MetricValue::Counter { value } => { - if let MetricKind::Absolute = m.data().kind { - metric_bytes = *value - } else { - metric_bytes += value - } - } - _ => errs.push(format!( - "{}: metric value is not a counter", - SourceMetrics::ReceivedBytesTotal, - )), - } - } + &SourceMetricType::EventsReceived, + expected_events, + ) +} - let mut expected_bytes = 0; - if let Some(c) = &configuration.external_resource { - let mut encoder = c.codec.into_encoder(); - for i in inputs { - let mut buffer = BytesMut::new(); - encode_test_event(&mut encoder, &mut buffer, i.clone()); - expected_bytes += buffer.len() - } - } +fn validate_component_received_event_bytes_total( + telemetry_events: &[Event], + runner_metrics: &RunnerMetrics, +) -> Result, Vec> { + // The reciprocal metric for received_event_bytes is sent_event_bytes, + // so the expected value is what the input runner sent. + let expected_bytes = runner_metrics.sent_event_bytes_total; - debug!( - "{}: {} bytes, expected at least {} bytes.", - SourceMetrics::ReceivedBytesTotal, - metric_bytes, + validate_bytes_total( + telemetry_events, + &SourceMetricType::EventsReceivedBytes, expected_bytes, - ); - - // We'll just establish a lower bound because we can't guarantee that the - // source will receive an exact number of bytes, since we can't synchronize - // with its internal logic. For example, some sources push or pull metrics - // on a schedule (http_client). 
- if metric_bytes < expected_bytes as f64 { - errs.push(format!( - "{}: expected at least {} bytes, but received {}", - SourceMetrics::ReceivedBytesTotal, - expected_bytes, - metric_bytes - )); - } + ) +} - if !errs.is_empty() { - return Err(errs); - } +fn validate_component_received_bytes_total( + telemetry_events: &[Event], + runner_metrics: &RunnerMetrics, +) -> Result, Vec> { + // The reciprocal metric for received_bytes is sent_bytes, + // so the expected value is what the input runner sent. + let expected_bytes = runner_metrics.sent_bytes_total; - Ok(vec![format!( - "{}: {}", - SourceMetrics::ReceivedBytesTotal, - metric_bytes, - )]) + validate_bytes_total( + telemetry_events, + &SourceMetricType::ReceivedBytesTotal, + expected_bytes, + ) } fn validate_component_sent_events_total( - _configuration: &ValidationConfiguration, - inputs: &[TestEvent], - _outputs: &[Event], telemetry_events: &[Event], + runner_metrics: &RunnerMetrics, ) -> Result, Vec> { - let mut errs: Vec = Vec::new(); + // The reciprocal metric for events sent is events received, + // so the expected value is what the output runner received. 
+ let expected_events = runner_metrics.received_events_total; - let metrics = filter_events_by_metric_and_component( + validate_events_total( telemetry_events, - SourceMetrics::SentEventsTotal, - TEST_SOURCE_NAME, - )?; - - let mut events = 0; - for m in metrics { - match m.value() { - vector_core::event::MetricValue::Counter { value } => { - if let MetricKind::Absolute = m.data().kind { - events = *value as i32 - } else { - events += *value as i32 - } - } - _ => errs.push(format!( - "{}: metric value is not a counter", - SourceMetrics::SentEventsTotal, - )), - } - } - - let expected_events = inputs.iter().fold(0, |acc, i| { - if let TestEvent::Passthrough(_) = i { - return acc + 1; - } - acc - }); - - debug!( - "{}: {} events, {} expected events.", - SourceMetrics::SentEventsTotal, - events, + &SourceMetricType::SentEventsTotal, expected_events, - ); - - if events != expected_events { - errs.push(format!( - "{}: expected {} events, but received {}", - SourceMetrics::SentEventsTotal, - inputs.len(), - events - )); - } - - if !errs.is_empty() { - return Err(errs); - } - - Ok(vec![format!( - "{}: {}", - SourceMetrics::SentEventsTotal, - events, - )]) + ) } fn validate_component_sent_event_bytes_total( - _configuration: &ValidationConfiguration, - _inputs: &[TestEvent], - outputs: &[Event], telemetry_events: &[Event], + runner_metrics: &RunnerMetrics, ) -> Result, Vec> { - let mut errs: Vec = Vec::new(); + // The reciprocal metric for sent_event_bytes is received_event_bytes, + // so the expected value is what the output runner received. 
+ let expected_bytes = runner_metrics.received_event_bytes_total; - let metrics = filter_events_by_metric_and_component( + validate_bytes_total( telemetry_events, - SourceMetrics::SentEventBytesTotal, - TEST_SOURCE_NAME, - )?; - - let mut metric_bytes: f64 = 0.0; - for m in metrics { - match m.value() { - vector_core::event::MetricValue::Counter { value } => { - if let MetricKind::Absolute = m.data().kind { - metric_bytes = *value - } else { - metric_bytes += value - } - } - _ => errs.push(format!( - "{}: metric value is not a counter", - SourceMetrics::SentEventBytesTotal, - )), - } - } - - let mut expected_bytes = JsonSize::zero(); - for e in outputs { - expected_bytes += vec![e].estimated_json_encoded_size_of(); - } - - debug!( - "{}: {} bytes, {} expected bytes.", - SourceMetrics::SentEventBytesTotal, - metric_bytes, + &SourceMetricType::SentEventBytesTotal, expected_bytes, - ); - - if JsonSize::new(metric_bytes as usize) != expected_bytes { - errs.push(format!( - "{}: expected {} bytes, but received {}.", - SourceMetrics::SentEventBytesTotal, - expected_bytes, - metric_bytes - )); - } - - if !errs.is_empty() { - return Err(errs); - } - - Ok(vec![format!( - "{}: {}", - SourceMetrics::SentEventBytesTotal, - metric_bytes, - )]) + ) } diff --git a/src/components/validation/validators/mod.rs b/src/components/validation/validators/mod.rs index 8cb4c8945b16b..e563488d7f416 100644 --- a/src/components/validation/validators/mod.rs +++ b/src/components/validation/validators/mod.rs @@ -1,9 +1,10 @@ mod component_spec; + pub use self::component_spec::ComponentSpecValidator; use vector_core::event::Event; -use super::{ComponentType, TestCaseExpectation, TestEvent, ValidationConfiguration}; +use super::{ComponentType, RunnerMetrics, TestCaseExpectation, TestEvent}; /// A component validator. /// @@ -19,12 +20,12 @@ pub trait Validator { /// provided as well. 
fn check_validation( &self, - configuration: ValidationConfiguration, component_type: ComponentType, expectation: TestCaseExpectation, inputs: &[TestEvent], outputs: &[Event], telemetry_events: &[Event], + runner_metrics: &RunnerMetrics, ) -> Result<Vec<String>, Vec<String>>; }