diff --git a/src/components/validation/mod.rs b/src/components/validation/mod.rs index 7457cd6548ad9..90723ff416a50 100644 --- a/src/components/validation/mod.rs +++ b/src/components/validation/mod.rs @@ -170,6 +170,19 @@ macro_rules! register_validatable_component { }; } +/// Input and Output runners populate this structure as they send and receive events. +/// The structure is passed into the validator to use as the expected values for the +/// metrics that the components under test actually output. +#[derive(Default)] +pub struct RunnerMetrics { + pub received_events_total: u64, + pub received_event_bytes_total: u64, + pub received_bytes_total: u64, + pub sent_bytes_total: u64, // a reciprocal for received_bytes_total + pub sent_event_bytes_total: u64, + pub sent_events_total: u64, +} + #[cfg(all(test, feature = "component-validation-tests"))] mod tests { use std::{ diff --git a/src/components/validation/resources/event.rs b/src/components/validation/resources/event.rs index 21c38e58bfad5..1f91acfbe660d 100644 --- a/src/components/validation/resources/event.rs +++ b/src/components/validation/resources/event.rs @@ -1,10 +1,20 @@ +use bytes::BytesMut; use serde::Deserialize; +use snafu::Snafu; +use tokio_util::codec::Encoder as _; + +use crate::codecs::Encoder; +use codecs::{ + encoding, JsonSerializer, LengthDelimitedEncoder, LogfmtSerializer, MetricTagValues, + NewlineDelimitedEncoder, +}; use vector_core::event::{Event, LogEvent}; -/// An event used in a test case. +/// A test case event for deserialization from yaml file. +/// This is an intermediary step to TestEvent. #[derive(Clone, Debug, Deserialize)] #[serde(untagged)] -pub enum TestEvent { +pub enum RawTestEvent { /// The event is used, as-is, without modification. 
Passthrough(EventData), @@ -20,15 +30,6 @@ pub enum TestEvent { Modified { modified: bool, event: EventData }, } -impl TestEvent { - pub fn into_event(self) -> Event { - match self { - Self::Passthrough(event) => event.into_event(), - Self::Modified { event, .. } => event.into_event(), - } - } -} - #[derive(Clone, Debug, Deserialize)] #[serde(untagged)] pub enum EventData { @@ -44,3 +45,91 @@ impl EventData { } } } + +/// An event used in a test case. +/// It is important to have created the event with all fields, immediately after deserializing from the +/// test case definition yaml file. This ensures that the event data we are using in the expected/actual +/// metrics collection is based on the same event. Namely, creating the event +/// from the event data twice (once for the expected and once for actual) can result in a timestamp in +/// the event which may or may not have the same millisecond precision as its counterpart. +#[derive(Clone, Debug, Deserialize)] +#[serde(from = "RawTestEvent")] +#[serde(untagged)] +pub enum TestEvent { + /// The event is used, as-is, without modification. + Passthrough(Event), + + /// The event is potentially modified by the external resource. + /// + /// The modification made is dependent on the external resource, but this mode is made available + /// for when a test case wants to exercise the failure path, but cannot cause a failure simply + /// by constructing the event in a certain way i.e. adding an invalid field, or removing a + /// required field, or using an invalid field value, and so on. + /// + /// For transforms and sinks, generally, the only way to cause an error is if the event itself + /// is malformed in some way, which can be achieved without this test event variant.
+ Modified { modified: bool, event: Event }, +} + +impl TestEvent { + #[allow(clippy::missing_const_for_fn)] // const cannot run destructor + pub fn into_event(self) -> Event { + match self { + Self::Passthrough(event) => event, + Self::Modified { event, .. } => event, + } + } +} + +#[derive(Clone, Debug, Eq, PartialEq, Snafu)] +pub enum RawTestEventParseError {} + +impl From for TestEvent { + fn from(other: RawTestEvent) -> Self { + match other { + RawTestEvent::Passthrough(event_data) => { + TestEvent::Passthrough(event_data.into_event()) + } + RawTestEvent::Modified { modified, event } => TestEvent::Modified { + modified, + event: event.into_event(), + }, + } + } +} + +pub fn encode_test_event( + encoder: &mut Encoder, + buf: &mut BytesMut, + event: TestEvent, +) { + match event { + TestEvent::Passthrough(event) => { + // Encode the event normally. + encoder + .encode(event, buf) + .expect("should not fail to encode input event"); + } + TestEvent::Modified { event, .. } => { + // This is a little fragile, but we check what serializer this encoder uses, and based + // on `Serializer::supports_json`, we choose an opposing codec. For example, if the + // encoder supports JSON, we'll use a serializer that doesn't support JSON, and vice + // versa.
+ let mut alt_encoder = if encoder.serializer().supports_json() { + Encoder::::new( + LengthDelimitedEncoder::new().into(), + LogfmtSerializer::new().into(), + ) + } else { + Encoder::::new( + NewlineDelimitedEncoder::new().into(), + JsonSerializer::new(MetricTagValues::default()).into(), + ) + }; + + alt_encoder + .encode(event, buf) + .expect("should not fail to encode input event"); + } + } +} diff --git a/src/components/validation/resources/http.rs b/src/components/validation/resources/http.rs index 8fa066b6f3f4a..44e2b7ba7579b 100644 --- a/src/components/validation/resources/http.rs +++ b/src/components/validation/resources/http.rs @@ -1,5 +1,6 @@ use std::{ collections::VecDeque, + future::Future, net::{IpAddr, SocketAddr}, str::FromStr, sync::Arc, @@ -11,26 +12,18 @@ use axum::{ Router, }; use bytes::BytesMut; -use codecs::{ - encoding, JsonSerializer, LengthDelimitedEncoder, LogfmtSerializer, MetricTagValues, - NewlineDelimitedEncoder, -}; use http::{Method, Request, StatusCode, Uri}; use hyper::{Body, Client, Server}; -use std::future::Future; use tokio::{ select, sync::{mpsc, oneshot, Mutex, Notify}, }; -use tokio_util::codec::{Decoder, Encoder as _}; -use vector_core::event::Event; +use tokio_util::codec::Decoder; -use crate::{ - codecs::Encoder, - components::validation::sync::{Configuring, TaskCoordinator}, -}; +use crate::components::validation::sync::{Configuring, TaskCoordinator}; +use vector_core::event::Event; -use super::{ResourceCodec, ResourceDirection, TestEvent}; +use super::{encode_test_event, ResourceCodec, ResourceDirection, TestEvent}; /// An HTTP resource. 
#[derive(Clone)] @@ -67,7 +60,7 @@ impl HttpResourceConfig { self, direction: ResourceDirection, codec: ResourceCodec, - output_tx: mpsc::Sender, + output_tx: mpsc::Sender>, task_coordinator: &TaskCoordinator, ) { match direction { @@ -230,7 +223,7 @@ fn spawn_input_http_client( fn spawn_output_http_server( config: HttpResourceConfig, codec: ResourceCodec, - output_tx: mpsc::Sender, + output_tx: mpsc::Sender>, task_coordinator: &TaskCoordinator, ) { // This HTTP server will wait for events to be sent by a sink, and collect them and send them on @@ -252,12 +245,10 @@ fn spawn_output_http_server( loop { match decoder.decode_eof(&mut body) { Ok(Some((events, _byte_size))) => { - for event in events { - output_tx - .send(event) - .await - .expect("should not fail to send output event"); - } + output_tx + .send(events.to_vec()) + .await + .expect("should not fail to send output event"); } Ok(None) => return StatusCode::OK.into_response(), Err(_) => return StatusCode::INTERNAL_SERVER_ERROR.into_response(), @@ -290,7 +281,7 @@ fn spawn_output_http_server( fn spawn_output_http_client( _config: HttpResourceConfig, _codec: ResourceCodec, - _output_tx: mpsc::Sender, + _output_tx: mpsc::Sender>, _task_coordinator: &TaskCoordinator, ) { // TODO: The `prometheus_exporter` sink is the only sink that exposes an HTTP server which must be @@ -400,39 +391,3 @@ fn socketaddr_from_uri(uri: &Uri) -> SocketAddr { SocketAddr::from((uri_host, uri_port)) } - -pub fn encode_test_event( - encoder: &mut Encoder, - buf: &mut BytesMut, - event: TestEvent, -) { - match event { - TestEvent::Passthrough(event) => { - // Encode the event normally. - encoder - .encode(event.into_event(), buf) - .expect("should not fail to encode input event"); - } - TestEvent::Modified { event, .. } => { - // This is a little fragile, but we check what serializer this encoder uses, and based - // on `Serializer::supports_json`, we choose an opposing codec. 
For example, if the - // encoder supports JSON, we'll use a serializer that doesn't support JSON, and vise - // versa. - let mut alt_encoder = if encoder.serializer().supports_json() { - Encoder::::new( - LengthDelimitedEncoder::new().into(), - LogfmtSerializer::new().into(), - ) - } else { - Encoder::::new( - NewlineDelimitedEncoder::new().into(), - JsonSerializer::new(MetricTagValues::default()).into(), - ) - }; - - alt_encoder - .encode(event.into_event(), buf) - .expect("should not fail to encode input event"); - } - } -} diff --git a/src/components/validation/resources/mod.rs b/src/components/validation/resources/mod.rs index 2b9fc3c542ccb..a22d6fc324dbd 100644 --- a/src/components/validation/resources/mod.rs +++ b/src/components/validation/resources/mod.rs @@ -13,8 +13,8 @@ use vector_core::{config::DataType, event::Event}; use crate::codecs::{Decoder, DecodingConfig, Encoder, EncodingConfig, EncodingConfigWithFraming}; -pub use self::event::{EventData, TestEvent}; -pub use self::http::{encode_test_event, HttpResourceConfig}; +pub use self::event::{encode_test_event, TestEvent}; +pub use self::http::HttpResourceConfig; use super::sync::{Configuring, TaskCoordinator}; @@ -308,7 +308,7 @@ impl ExternalResource { /// Spawns this resource for use as an output for a sink. 
pub fn spawn_as_output( self, - output_tx: mpsc::Sender, + output_tx: mpsc::Sender>, task_coordinator: &TaskCoordinator, ) { match self.definition { diff --git a/src/components/validation/runner/io.rs b/src/components/validation/runner/io.rs index c454ba433c847..55e4fca1eaecf 100644 --- a/src/components/validation/runner/io.rs +++ b/src/components/validation/runner/io.rs @@ -27,11 +27,11 @@ use crate::{ #[derive(Clone)] pub struct EventForwardService { - tx: mpsc::Sender, + tx: mpsc::Sender>, } -impl From> for EventForwardService { - fn from(tx: mpsc::Sender) -> Self { +impl From>> for EventForwardService { + fn from(tx: mpsc::Sender>) -> Self { Self { tx } } } @@ -42,14 +42,17 @@ impl VectorService for EventForwardService { &self, request: tonic::Request, ) -> Result, Status> { - let events = request.into_inner().events.into_iter().map(Event::from); - - for event in events { - self.tx - .send(event) - .await - .expect("event forward rx should not close first"); - } + let events = request + .into_inner() + .events + .into_iter() + .map(Event::from) + .collect(); + + self.tx + .send(events) + .await + .expect("event forward rx should not close first"); Ok(tonic::Response::new(PushEventsResponse {})) } @@ -74,7 +77,7 @@ pub struct InputEdge { pub struct OutputEdge { listen_addr: GrpcAddress, service: VectorServer, - rx: mpsc::Receiver, + rx: mpsc::Receiver>, } impl InputEdge { @@ -129,7 +132,7 @@ impl OutputEdge { pub fn spawn_output_server( self, task_coordinator: &TaskCoordinator, - ) -> mpsc::Receiver { + ) -> mpsc::Receiver> { spawn_grpc_server(self.listen_addr, self.service, task_coordinator); self.rx } @@ -184,5 +187,5 @@ pub fn spawn_grpc_server( pub struct ControlledEdges { pub input: Option>, - pub output: Option>, + pub output: Option>>, } diff --git a/src/components/validation/runner/mod.rs b/src/components/validation/runner/mod.rs index 2c2a066b17806..b4bcd72de75bf 100644 --- a/src/components/validation/runner/mod.rs +++ 
b/src/components/validation/runner/mod.rs @@ -2,18 +2,34 @@ pub mod config; mod io; mod telemetry; -use std::{collections::HashMap, path::PathBuf, time::Duration}; +use std::{ + collections::HashMap, + path::PathBuf, + sync::{Arc, Mutex}, + time::Duration, +}; + +use bytes::BytesMut; +use tokio::{ + runtime::Builder, + select, + sync::mpsc::{self, Receiver, Sender}, + task::JoinHandle, +}; +use tokio_util::codec::Encoder as _; -use tokio::{runtime::Builder, select, sync::mpsc}; -use vector_core::event::Event; +use codecs::encoding; +use vector_core::{event::Event, EstimatedJsonEncodedSizeOf}; use crate::{ - components::validation::TestCase, + codecs::Encoder, + components::validation::{RunnerMetrics, TestCase}, config::{ConfigBuilder, ConfigDiff}, topology, }; use super::{ + encode_test_event, sync::{Configuring, TaskCoordinator}, ComponentType, TestCaseExpectation, TestEvent, ValidationConfiguration, Validator, }; @@ -73,7 +89,7 @@ pub enum RunnerOutput { /// external resource pulls output events from the sink. /// /// Only sinks have external inputs. - External(mpsc::Receiver), + External(mpsc::Receiver>), /// The component uses a "controlled" edge for its output. /// @@ -93,8 +109,8 @@ impl RunnerOutput { /// this function will panic, as one or the other must be provided. pub fn into_receiver( self, - controlled_edge: Option>, - ) -> mpsc::Receiver { + controlled_edge: Option>>, + ) -> mpsc::Receiver> { match (self, controlled_edge) { (Self::External(_), Some(_)) => panic!("Runner output declared as external resource, but controlled output edge was also specified."), (Self::Controlled, None) => panic!("Runner output declared as controlled, but no controlled output edge was specified."), @@ -216,6 +232,11 @@ impl Runner { debug!("Component topology configuration built and telemetry collector spawned."); + // Create the data structure that the input and output runners will use to store + // their received/sent metrics. 
This is then shared with the Validator for comparison + // against the actual metrics output by the component under test. + let runner_metrics = Arc::new(Mutex::new(RunnerMetrics::default())); + // After that, we'll build the external resource necessary for this component, if any. // Once that's done, we build the input event/output event sender and receiver based on // whatever we spawned for an external resource. @@ -226,13 +247,13 @@ impl Runner { // For example, if we're validating a source, we would have added a filler sink for our // controlled output edge, which means we then need a server task listening for the // events sent by that sink. - let (runner_input, runner_output) = build_external_resource( + let (runner_input, runner_output, maybe_runner_encoder) = build_external_resource( &self.configuration, &input_task_coordinator, &output_task_coordinator, ); let input_tx = runner_input.into_sender(controlled_edges.input); - let mut output_rx = runner_output.into_receiver(controlled_edges.output); + let output_rx = runner_output.into_receiver(controlled_edges.output); debug!("External resource (if any) and controlled edges built and spawned."); // Now with any external resource spawned, as well as any tasks for handling controlled @@ -271,25 +292,20 @@ impl Runner { // like the aforementioned unit tests, switch to any improved mechanism we come up with // in the future to make these tests more deterministic and waste less time waiting // around if we can avoid it. 
- tokio::time::sleep(Duration::from_secs(1)).await; - - let input_events = test_case.events.clone(); - let input_driver = tokio::spawn(async move { - for input_event in input_events { - input_tx - .send(input_event) - .await - .expect("input channel should not be closed"); - } - }); + tokio::time::sleep(Duration::from_secs(2)).await; - let output_driver = tokio::spawn(async move { - let mut output_events = Vec::new(); - while let Some(output_event) = output_rx.recv().await { - output_events.push(output_event); - } - output_events - }); + let input_driver = spawn_input_driver( + test_case.events.clone(), + input_tx, + &runner_metrics, + maybe_runner_encoder.as_ref().cloned(), + ); + + let output_driver = spawn_output_driver( + output_rx, + &runner_metrics, + maybe_runner_encoder.as_ref().cloned(), + ); // At this point, the component topology is running, and all input/output/telemetry // tasks are running as well. Our input driver should be sending (or will have already @@ -334,12 +350,12 @@ impl Runner { .values() .map(|validator| { validator.check_validation( - self.configuration.clone(), component_type, expectation, &input_events, &output_events, &telemetry_events, + &runner_metrics.lock().unwrap(), ) }) .collect(); @@ -397,9 +413,12 @@ fn build_external_resource( configuration: &ValidationConfiguration, input_task_coordinator: &TaskCoordinator, output_task_coordinator: &TaskCoordinator, -) -> (RunnerInput, RunnerOutput) { +) -> (RunnerInput, RunnerOutput, Option>) { let component_type = configuration.component_type(); let maybe_external_resource = configuration.external_resource(); + let maybe_encoder = maybe_external_resource + .as_ref() + .map(|resource| resource.codec.into_encoder()); match component_type { ComponentType::Source => { // As an external resource for a source, we create a channel that the validation runner @@ -411,11 +430,15 @@ fn build_external_resource( maybe_external_resource.expect("a source must always have an external resource"); 
resource.spawn_as_input(rx, input_task_coordinator); - (RunnerInput::External(tx), RunnerOutput::Controlled) + ( + RunnerInput::External(tx), + RunnerOutput::Controlled, + maybe_encoder, + ) } ComponentType::Transform => { // Transforms have no external resources. - (RunnerInput::Controlled, RunnerOutput::Controlled) + (RunnerInput::Controlled, RunnerOutput::Controlled, None) } ComponentType::Sink => { // As an external resource for a sink, we create a channel that the validation runner @@ -427,7 +450,11 @@ fn build_external_resource( maybe_external_resource.expect("a sink must always have an external resource"); resource.spawn_as_output(tx, output_task_coordinator); - (RunnerInput::Controlled, RunnerOutput::External(rx)) + ( + RunnerInput::Controlled, + RunnerOutput::External(rx), + maybe_encoder, + ) } } } @@ -483,6 +510,86 @@ fn spawn_component_topology( }); } +fn spawn_input_driver( + input_events: Vec, + input_tx: Sender, + runner_metrics: &Arc>, + mut maybe_encoder: Option>, +) -> JoinHandle<()> { + let input_runner_metrics = Arc::clone(runner_metrics); + + tokio::spawn(async move { + for input_event in input_events { + input_tx + .send(input_event.clone()) + .await + .expect("input channel should not be closed"); + + // Update the runner metrics for the sent event. This will later + // be used in the Validators, as the "expected" case. 
+ let mut input_runner_metrics = input_runner_metrics.lock().unwrap(); + + if let Some(encoder) = maybe_encoder.as_mut() { + let mut buffer = BytesMut::new(); + encode_test_event(encoder, &mut buffer, input_event.clone()); + + input_runner_metrics.sent_bytes_total += buffer.len() as u64; + } + + let (modified, event) = match input_event { + TestEvent::Passthrough(event) => (false, event), + TestEvent::Modified { modified, event } => (modified, event), + }; + + // account for failure case + if !modified { + input_runner_metrics.sent_events_total += 1; + + input_runner_metrics.sent_event_bytes_total += + vec![event].estimated_json_encoded_size_of().get() as u64; + } + } + }) +} + +fn spawn_output_driver( + mut output_rx: Receiver>, + runner_metrics: &Arc>, + maybe_encoder: Option>, +) -> JoinHandle> { + let output_runner_metrics = Arc::clone(runner_metrics); + + tokio::spawn(async move { + let mut output_events = Vec::new(); + while let Some(events) = output_rx.recv().await { + output_events.extend(events.clone()); + + // Update the runner metrics for the received event. This will later + // be used in the Validators, as the "expected" case. + let mut output_runner_metrics = output_runner_metrics.lock().unwrap(); + + for output_event in events { + output_runner_metrics.received_events_total += 1; + output_runner_metrics.received_event_bytes_total += vec![output_event.clone()] + .estimated_json_encoded_size_of() + .get() + as u64; + + if let Some(encoder) = maybe_encoder.as_ref() { + let mut buffer = BytesMut::new(); + encoder + .clone() + .encode(output_event, &mut buffer) + .expect("should not fail to encode output event"); + + output_runner_metrics.received_bytes_total += buffer.len() as u64; + } + } + } + output_events + }) +} + fn initialize_test_environment() { // Make sure our metrics recorder is installed and in test mode. 
This is necessary for // proper internal telemetry collect when running the component topology, even though it's diff --git a/src/components/validation/runner/telemetry.rs b/src/components/validation/runner/telemetry.rs index 9ef813838f1dc..76e8c29f62a9f 100644 --- a/src/components/validation/runner/telemetry.rs +++ b/src/components/validation/runner/telemetry.rs @@ -23,14 +23,13 @@ const INTERNAL_LOGS_KEY: &str = "_telemetry_logs"; const INTERNAL_METRICS_KEY: &str = "_telemetry_metrics"; const VECTOR_SINK_KEY: &str = "_telemetry_out"; -// The metrics event to monitor for before shutting down a telemetry collector. -const INTERNAL_METRICS_SHUTDOWN_EVENT: &str = "component_received_events_total"; +const SHUTDOWN_TICKS: u8 = 3; /// Telemetry collector for a component under validation. pub struct Telemetry { listen_addr: GrpcAddress, service: VectorServer, - rx: mpsc::Receiver, + rx: mpsc::Receiver>, } impl Telemetry { @@ -98,47 +97,49 @@ impl Telemetry { select! { _ = telemetry_shutdown_handle.wait() => { // After we receive the shutdown signal, we need to wait - // for two event emissions from the internal_metrics + // for two batches of event emissions from the internal_metrics // source. This is to ensure that we've received all the // events from the components that we're testing. // // We need exactly two because the internal_metrics // source does not emit component events until after the // component_received_events_total metric has been - // emitted. Thus, two events ensure that all component + // emitted. Thus, two batches ensure that all component // events have been emitted. 
debug!("Telemetry: waiting for final internal_metrics events before shutting down."); - let mut events_seen = 0; - let current_time = chrono::Utc::now(); + let mut batches_received = 0; + + let timeout = tokio::time::sleep(Duration::from_secs(5)); + tokio::pin!(timeout); loop { - match &rx.recv().await { - None => break 'outer, - Some(telemetry_event) => { - telemetry_events.push(telemetry_event.clone()); - if let Event::Metric(metric) = telemetry_event { - if let Some(tags) = metric.tags() { - if metric.name() == INTERNAL_METRICS_SHUTDOWN_EVENT && - tags.get("component_name") == Some(INTERNAL_LOGS_KEY) && - metric.data().timestamp().unwrap() > &current_time { - debug!("Telemetry: processed one component_received_events_total event."); - - events_seen += 1; - if events_seen == 2 { - break 'outer; - } + select! { + d = rx.recv() => { + match d { + None => break, + Some(telemetry_event_batch) => { + telemetry_events.extend(telemetry_event_batch); + debug!("Telemetry: processed one batch of internal_metrics."); + batches_received += 1; + if batches_received == SHUTDOWN_TICKS { + break; } } } - } + }, + _ = &mut timeout => break, } } + if batches_received != SHUTDOWN_TICKS { + panic!("Did not receive {SHUTDOWN_TICKS} events while waiting for shutdown!
Only received {batches_received}!"); + } + break 'outer; }, maybe_telemetry_event = rx.recv() => match maybe_telemetry_event { None => break, - Some(telemetry_event) => telemetry_events.push(telemetry_event), + Some(telemetry_event_batch) => telemetry_events.extend(telemetry_event_batch), }, } } diff --git a/src/components/validation/validators/component_spec/mod.rs b/src/components/validation/validators/component_spec/mod.rs index e566403d95bf9..36aa2d4452b05 100644 --- a/src/components/validation/validators/component_spec/mod.rs +++ b/src/components/validation/validators/component_spec/mod.rs @@ -2,13 +2,11 @@ mod sources; use vector_core::event::{Event, Metric}; -use crate::components::validation::{ - ComponentType, TestCaseExpectation, TestEvent, ValidationConfiguration, -}; +use crate::components::validation::{ComponentType, RunnerMetrics, TestCaseExpectation, TestEvent}; use super::Validator; -use self::sources::{validate_sources, SourceMetrics}; +use self::sources::{validate_sources, SourceMetricType}; /// Validates that the component meets the requirements of the [Component Specification][component_spec]. 
/// @@ -28,12 +26,12 @@ impl Validator for ComponentSpecValidator { fn check_validation( &self, - configuration: ValidationConfiguration, component_type: ComponentType, expectation: TestCaseExpectation, inputs: &[TestEvent], outputs: &[Event], telemetry_events: &[Event], + runner_metrics: &RunnerMetrics, ) -> Result<Vec<String>, Vec<String>> { for input in inputs { debug!("Validator observed input event: {:?}", input); @@ -84,13 +82,7 @@ impl Validator for ComponentSpecValidator { format!("received {} telemetry events", telemetry_events.len()), ]; - let out = validate_telemetry( - configuration, - component_type, - inputs, - outputs, - telemetry_events, - )?; + let out = validate_telemetry(component_type, telemetry_events, runner_metrics)?; run_out.extend(out); Ok(run_out) @@ -98,18 +90,16 @@ impl Validator for ComponentSpecValidator { } fn validate_telemetry( - configuration: ValidationConfiguration, component_type: ComponentType, - inputs: &[TestEvent], - outputs: &[Event], telemetry_events: &[Event], + runner_metrics: &RunnerMetrics, ) -> Result<Vec<String>, Vec<String>> { let mut out: Vec<String> = Vec::new(); let mut errs: Vec<String> = Vec::new(); match component_type { ComponentType::Source => { - let result = validate_sources(&configuration, inputs, outputs, telemetry_events); + let result = validate_sources(telemetry_events, runner_metrics); match result { Ok(o) => out.extend(o), Err(e) => errs.extend(e), @@ -128,9 +118,9 @@ fn validate_telemetry( fn filter_events_by_metric_and_component<'a>( telemetry_events: &'a [Event], - metric: SourceMetrics, + metric: &SourceMetricType, component_name: &'a str, -) -> Result<Vec<&'a Metric>, Vec<String>> { +) -> Vec<&'a Metric> { let metrics: Vec<&Metric> = telemetry_events .iter() .flat_map(|e| { @@ -155,9 +145,5 @@ fn filter_events_by_metric_and_component<'a>( debug!("{}: {} metrics found.", metric.to_string(), metrics.len(),); - if metrics.is_empty() { - return Err(vec![format!("{}: no metrics were emitted.", metric)]); - } - - Ok(metrics) + metrics } diff --git 
a/src/components/validation/validators/component_spec/sources.rs b/src/components/validation/validators/component_spec/sources.rs index c25b217a399e4..6c1a3dca88399 100644 --- a/src/components/validation/validators/component_spec/sources.rs +++ b/src/components/validation/validators/component_spec/sources.rs @@ -1,17 +1,14 @@ use std::fmt::{Display, Formatter}; -use bytes::BytesMut; -use vector_common::json_size::JsonSize; use vector_core::event::{Event, MetricKind}; -use vector_core::EstimatedJsonEncodedSizeOf; -use crate::components::validation::{encode_test_event, TestEvent, ValidationConfiguration}; +use crate::components::validation::RunnerMetrics; use super::filter_events_by_metric_and_component; const TEST_SOURCE_NAME: &str = "test_source"; -pub enum SourceMetrics { +pub enum SourceMetricType { EventsReceived, EventsReceivedBytes, ReceivedBytesTotal, @@ -19,23 +16,27 @@ pub enum SourceMetrics { SentEventBytesTotal, } -impl SourceMetrics { +impl SourceMetricType { const fn name(&self) -> &'static str { match self { - SourceMetrics::EventsReceived => "component_received_events_total", - SourceMetrics::EventsReceivedBytes => "component_received_event_bytes_total", - SourceMetrics::ReceivedBytesTotal => "component_received_bytes_total", - SourceMetrics::SentEventsTotal => "component_sent_events_total", - SourceMetrics::SentEventBytesTotal => "component_sent_event_bytes_total", + SourceMetricType::EventsReceived => "component_received_events_total", + SourceMetricType::EventsReceivedBytes => "component_received_event_bytes_total", + SourceMetricType::ReceivedBytesTotal => "component_received_bytes_total", + SourceMetricType::SentEventsTotal => "component_sent_events_total", + SourceMetricType::SentEventBytesTotal => "component_sent_event_bytes_total", } } } +impl Display for SourceMetricType { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.name()) + } +} + pub fn validate_sources( - configuration: &ValidationConfiguration, - 
inputs: &[TestEvent], - outputs: &[Event], telemetry_events: &[Event], + runner_metrics: &RunnerMetrics, ) -> Result<Vec<String>, Vec<String>> { let mut out: Vec<String> = Vec::new(); let mut errs: Vec<String> = Vec::new(); @@ -49,7 +50,7 @@ pub fn validate_sources( ]; for v in validations.iter() { - match v(configuration, inputs, outputs, telemetry_events) { + match v(telemetry_events, runner_metrics) { Err(e) => errs.extend(e), Ok(m) => out.extend(m), } @@ -62,63 +63,54 @@ pub fn validate_sources( } } -impl Display for SourceMetrics { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!(f, "{}", self.name()) - } -} +fn sum_counters( + metric_name: &SourceMetricType, + metrics: &[&vector_core::event::Metric], +) -> Result<u64, Vec<String>> { + let mut sum: f64 = 0.0; + let mut errs = Vec::new(); -fn validate_component_received_events_total( - _configuration: &ValidationConfiguration, - inputs: &[TestEvent], - _outputs: &[Event], - telemetry_events: &[Event], -) -> Result<Vec<String>, Vec<String>> { - let mut errs: Vec<String> = Vec::new(); - - let metrics = filter_events_by_metric_and_component( - telemetry_events, - SourceMetrics::EventsReceived, - TEST_SOURCE_NAME, - )?; - - let mut events = 0; for m in metrics { match m.value() { vector_core::event::MetricValue::Counter { value } => { if let MetricKind::Absolute = m.data().kind { - events = *value as i32 + sum = *value; } else { - events += *value as i32 + sum += *value; } } - _ => errs.push(format!( - "{}: metric value is not a counter", - SourceMetrics::EventsReceived, - )), + _ => errs.push(format!("{}: metric value is not a counter", metric_name,)), } } - let expected_events = inputs.iter().fold(0, |acc, i| { - if let TestEvent::Passthrough(_) = i { - return acc + 1; - } - acc - }); + if errs.is_empty() { + Ok(sum as u64) + } else { + Err(errs) + } +} + +fn validate_events_total( + telemetry_events: &[Event], + metric_type: &SourceMetricType, + expected_events: u64, +) -> Result<Vec<String>, Vec<String>> { + let mut errs: Vec<String> = Vec::new(); + + let metrics = + 
filter_events_by_metric_and_component(telemetry_events, metric_type, TEST_SOURCE_NAME); + + let actual_events = sum_counters(metric_type, &metrics)?; debug!( "{}: {} events, {} expected events.", - SourceMetrics::EventsReceived, - events, - expected_events, + metric_type, actual_events, expected_events, ); - if events != expected_events { + if actual_events != expected_events { errs.push(format!( "{}: expected {} events, but received {}", - SourceMetrics::EventsReceived, - expected_events, - events + metric_type, expected_events, actual_events )); } @@ -126,66 +118,30 @@ fn validate_component_received_events_total( return Err(errs); } - Ok(vec![format!( - "{}: {}", - SourceMetrics::EventsReceived, - events, - )]) + Ok(vec![format!("{}: {}", metric_type, actual_events)]) } -fn validate_component_received_event_bytes_total( - _configuration: &ValidationConfiguration, - inputs: &[TestEvent], - _outputs: &[Event], +fn validate_bytes_total( telemetry_events: &[Event], + metric_type: &SourceMetricType, + expected_bytes: u64, ) -> Result<Vec<String>, Vec<String>> { let mut errs: Vec<String> = Vec::new(); - let metrics = filter_events_by_metric_and_component( - telemetry_events, - SourceMetrics::EventsReceivedBytes, - TEST_SOURCE_NAME, - )?; + let metrics = + filter_events_by_metric_and_component(telemetry_events, metric_type, TEST_SOURCE_NAME); - let mut metric_bytes: f64 = 0.0; - for m in metrics { - match m.value() { - vector_core::event::MetricValue::Counter { value } => { - if let MetricKind::Absolute = m.data().kind { - metric_bytes = *value - } else { - metric_bytes += value - } - } - _ => errs.push(format!( - "{}: metric value is not a counter", - SourceMetrics::EventsReceivedBytes, - )), - } - } - - let expected_bytes = inputs.iter().fold(JsonSize::new(0), |acc, i| { - if let TestEvent::Passthrough(_) = i { - let size = vec![i.clone().into_event()].estimated_json_encoded_size_of(); - return acc + size; - } - - acc - }); + let actual_bytes = sum_counters(metric_type, &metrics)?; debug!( "{}: 
{} bytes, {} expected bytes.", - SourceMetrics::EventsReceivedBytes, - metric_bytes, - expected_bytes, + metric_type, actual_bytes, expected_bytes, ); - if JsonSize::new(metric_bytes as usize) != expected_bytes { + if actual_bytes != expected_bytes { errs.push(format!( "{}: expected {} bytes, but received {}", - SourceMetrics::EventsReceivedBytes, - expected_bytes, - metric_bytes + metric_type, expected_bytes, actual_bytes )); } @@ -193,209 +149,80 @@ fn validate_component_received_event_bytes_total( return Err(errs); } - Ok(vec![format!( - "{}: {}", - SourceMetrics::EventsReceivedBytes, - metric_bytes, - )]) + Ok(vec![format!("{}: {}", metric_type, actual_bytes)]) } -fn validate_component_received_bytes_total( - configuration: &ValidationConfiguration, - inputs: &[TestEvent], - _outputs: &[Event], +fn validate_component_received_events_total( telemetry_events: &[Event], + runner_metrics: &RunnerMetrics, ) -> Result<Vec<String>, Vec<String>> { - let mut errs: Vec<String> = Vec::new(); + // The reciprocal metric for events received is events sent, + // so the expected value is what the input runner sent. 
+ let expected_events = runner_metrics.sent_events_total; - let metrics = filter_events_by_metric_and_component( + validate_events_total( telemetry_events, - SourceMetrics::ReceivedBytesTotal, - TEST_SOURCE_NAME, - )?; - - let mut metric_bytes: f64 = 0.0; - for m in metrics { - match m.value() { - vector_core::event::MetricValue::Counter { value } => { - if let MetricKind::Absolute = m.data().kind { - metric_bytes = *value - } else { - metric_bytes += value - } - } - _ => errs.push(format!( - "{}: metric value is not a counter", - SourceMetrics::ReceivedBytesTotal, - )), - } - } + &SourceMetricType::EventsReceived, + expected_events, + ) +} - let mut expected_bytes = 0; - if let Some(c) = &configuration.external_resource { - let mut encoder = c.codec.into_encoder(); - for i in inputs { - let mut buffer = BytesMut::new(); - encode_test_event(&mut encoder, &mut buffer, i.clone()); - expected_bytes += buffer.len() - } - } +fn validate_component_received_event_bytes_total( + telemetry_events: &[Event], + runner_metrics: &RunnerMetrics, +) -> Result<Vec<String>, Vec<String>> { + // The reciprocal metric for received_event_bytes is sent_event_bytes, + // so the expected value is what the input runner sent. + let expected_bytes = runner_metrics.sent_event_bytes_total; - debug!( - "{}: {} bytes, expected at least {} bytes.", - SourceMetrics::ReceivedBytesTotal, - metric_bytes, + validate_bytes_total( + telemetry_events, + &SourceMetricType::EventsReceivedBytes, expected_bytes, - ); - - // We'll just establish a lower bound because we can't guarantee that the - // source will receive an exact number of bytes, since we can't synchronize - // with its internal logic. For example, some sources push or pull metrics - // on a schedule (http_client). 
- if metric_bytes < expected_bytes as f64 { - errs.push(format!( - "{}: expected at least {} bytes, but received {}", - SourceMetrics::ReceivedBytesTotal, - expected_bytes, - metric_bytes - )); - } + ) +} - if !errs.is_empty() { - return Err(errs); - } +fn validate_component_received_bytes_total( + telemetry_events: &[Event], + runner_metrics: &RunnerMetrics, +) -> Result<Vec<String>, Vec<String>> { + // The reciprocal metric for received_bytes is sent_bytes, + // so the expected value is what the input runner sent. + let expected_bytes = runner_metrics.sent_bytes_total; - Ok(vec![format!( - "{}: {}", - SourceMetrics::ReceivedBytesTotal, - metric_bytes, - )]) + validate_bytes_total( + telemetry_events, + &SourceMetricType::ReceivedBytesTotal, + expected_bytes, + ) } fn validate_component_sent_events_total( - _configuration: &ValidationConfiguration, - inputs: &[TestEvent], - _outputs: &[Event], telemetry_events: &[Event], + runner_metrics: &RunnerMetrics, ) -> Result<Vec<String>, Vec<String>> { - let mut errs: Vec<String> = Vec::new(); + // The reciprocal metric for events sent is events received, + // so the expected value is what the output runner received. 
+ let expected_events = runner_metrics.received_events_total; - let metrics = filter_events_by_metric_and_component( + validate_events_total( telemetry_events, - SourceMetrics::SentEventsTotal, - TEST_SOURCE_NAME, - )?; - - let mut events = 0; - for m in metrics { - match m.value() { - vector_core::event::MetricValue::Counter { value } => { - if let MetricKind::Absolute = m.data().kind { - events = *value as i32 - } else { - events += *value as i32 - } - } - _ => errs.push(format!( - "{}: metric value is not a counter", - SourceMetrics::SentEventsTotal, - )), - } - } - - let expected_events = inputs.iter().fold(0, |acc, i| { - if let TestEvent::Passthrough(_) = i { - return acc + 1; - } - acc - }); - - debug!( - "{}: {} events, {} expected events.", - SourceMetrics::SentEventsTotal, - events, + &SourceMetricType::SentEventsTotal, expected_events, - ); - - if events != expected_events { - errs.push(format!( - "{}: expected {} events, but received {}", - SourceMetrics::SentEventsTotal, - inputs.len(), - events - )); - } - - if !errs.is_empty() { - return Err(errs); - } - - Ok(vec![format!( - "{}: {}", - SourceMetrics::SentEventsTotal, - events, - )]) + ) } fn validate_component_sent_event_bytes_total( - _configuration: &ValidationConfiguration, - _inputs: &[TestEvent], - outputs: &[Event], telemetry_events: &[Event], + runner_metrics: &RunnerMetrics, ) -> Result<Vec<String>, Vec<String>> { - let mut errs: Vec<String> = Vec::new(); + // The reciprocal metric for sent_event_bytes is received_event_bytes, + // so the expected value is what the output runner received. 
+ let expected_bytes = runner_metrics.received_event_bytes_total; - let metrics = filter_events_by_metric_and_component( + validate_bytes_total( telemetry_events, - SourceMetrics::SentEventBytesTotal, - TEST_SOURCE_NAME, - )?; - - let mut metric_bytes: f64 = 0.0; - for m in metrics { - match m.value() { - vector_core::event::MetricValue::Counter { value } => { - if let MetricKind::Absolute = m.data().kind { - metric_bytes = *value - } else { - metric_bytes += value - } - } - _ => errs.push(format!( - "{}: metric value is not a counter", - SourceMetrics::SentEventBytesTotal, - )), - } - } - - let mut expected_bytes = JsonSize::zero(); - for e in outputs { - expected_bytes += vec![e].estimated_json_encoded_size_of(); - } - - debug!( - "{}: {} bytes, {} expected bytes.", - SourceMetrics::SentEventBytesTotal, - metric_bytes, + &SourceMetricType::SentEventBytesTotal, expected_bytes, - ); - - if JsonSize::new(metric_bytes as usize) != expected_bytes { - errs.push(format!( - "{}: expected {} bytes, but received {}.", - SourceMetrics::SentEventBytesTotal, - expected_bytes, - metric_bytes - )); - } - - if !errs.is_empty() { - return Err(errs); - } - - Ok(vec![format!( - "{}: {}", - SourceMetrics::SentEventBytesTotal, - metric_bytes, - )]) + ) } diff --git a/src/components/validation/validators/mod.rs b/src/components/validation/validators/mod.rs index 8cb4c8945b16b..e563488d7f416 100644 --- a/src/components/validation/validators/mod.rs +++ b/src/components/validation/validators/mod.rs @@ -1,9 +1,10 @@ mod component_spec; + pub use self::component_spec::ComponentSpecValidator; use vector_core::event::Event; -use super::{ComponentType, TestCaseExpectation, TestEvent, ValidationConfiguration}; +use super::{ComponentType, RunnerMetrics, TestCaseExpectation, TestEvent}; /// A component validator. /// @@ -19,12 +20,12 @@ pub trait Validator { /// provided as well. 
fn check_validation( &self, - configuration: ValidationConfiguration, component_type: ComponentType, expectation: TestCaseExpectation, inputs: &[TestEvent], outputs: &[Event], telemetry_events: &[Event], + runner_metrics: &RunnerMetrics, ) -> Result<Vec<String>, Vec<String>>; }