From 701b2823d1015488d103bb69b29f92bd24026da6 Mon Sep 17 00:00:00 2001 From: Pavlos Rontidis Date: Tue, 1 Aug 2023 17:59:30 -0400 Subject: [PATCH] chore: replace path tuples with actual target paths --- lib/codecs/src/decoding/format/bytes.rs | 9 +- lib/vector-core/src/event/log_event.rs | 24 ++--- src/sinks/clickhouse/integration_tests.rs | 10 +-- src/source_sender/mod.rs | 24 ++--- src/sources/amqp.rs | 10 +-- src/sources/aws_sqs/source.rs | 5 +- src/sources/datadog_agent/tests.rs | 42 ++------- src/sources/docker_logs/tests.rs | 20 +---- src/sources/exec/mod.rs | 87 ++++-------------- src/sources/file.rs | 28 ++---- src/sources/heroku_logs.rs | 53 +++-------- src/sources/http_client/integration_tests.rs | 4 +- src/sources/http_server.rs | 91 ++++--------------- src/sources/splunk_hec/mod.rs | 95 +++----------------- src/template.rs | 29 +++--- src/test_util/mock/transforms/basic.rs | 8 +- src/topology/test/mod.rs | 7 +- 17 files changed, 137 insertions(+), 409 deletions(-) diff --git a/lib/codecs/src/decoding/format/bytes.rs b/lib/codecs/src/decoding/format/bytes.rs index 1c4e7cbc6ca06..06a97b67c2950 100644 --- a/lib/codecs/src/decoding/format/bytes.rs +++ b/lib/codecs/src/decoding/format/bytes.rs @@ -89,10 +89,8 @@ impl Deserializer for BytesDeserializer { #[cfg(test)] mod tests { - use vector_core::config::log_schema; - use vrl::value::Value; - use super::*; + use vrl::value::Value; #[test] fn deserialize_bytes_legacy_namespace() { @@ -105,10 +103,7 @@ mod tests { { let event = events.next().unwrap(); let log = event.as_log(); - assert_eq!( - log[log_schema().message_key().unwrap().to_string()], - "foo".into() - ); + assert_eq!(*log.get_message().unwrap(), "foo".into()); } assert_eq!(events.next(), None); diff --git a/lib/vector-core/src/event/log_event.rs b/lib/vector-core/src/event/log_event.rs index 11f627f4c1418..4eee59b608bc5 100644 --- a/lib/vector-core/src/event/log_event.rs +++ b/lib/vector-core/src/event/log_event.rs @@ -162,8 +162,8 @@ impl LogEvent { let mut log = LogEvent::default(); log.maybe_insert(log_schema().message_key_target_path(), msg.into()); - if let Some(timestamp_key) = log_schema().timestamp_key() { - log.insert((PathPrefix::Event, timestamp_key), Utc::now()); + if let Some(timestamp_key) = log_schema().timestamp_key_target_path() { + log.insert(timestamp_key, Utc::now()); } log @@ -495,8 +495,8 @@ impl LogEvent { match self.namespace() { LogNamespace::Vector => self.get_by_meaning("message"), LogNamespace::Legacy => log_schema() - .message_key() - .and_then(|key| self.get((PathPrefix::Event, key))), + .message_key_target_path() + .and_then(|key| self.get(key)), } } @@ -506,8 +506,8 @@ impl LogEvent { match self.namespace() { LogNamespace::Vector => self.get_by_meaning("timestamp"), LogNamespace::Legacy => log_schema() - .timestamp_key() - .and_then(|key| self.get((PathPrefix::Event, key))), + .timestamp_key_target_path() + .and_then(|key| self.get(key)), } } @@ -525,8 +525,8 @@ impl LogEvent { match self.namespace() { LogNamespace::Vector => self.get_by_meaning("host"), LogNamespace::Legacy => log_schema() - .host_key() - .and_then(|key| self.get((PathPrefix::Event, key))), + .host_key_target_path() + .and_then(|key| self.get(key)), } } @@ -536,8 +536,8 @@ impl LogEvent { match self.namespace() { LogNamespace::Vector => self.get(metadata_path!("vector", "source_type")), LogNamespace::Legacy => log_schema() - .source_type_key() - .and_then(|key| self.get((PathPrefix::Event, key))), + .source_type_key_target_path() + .and_then(|key| self.get(key)), } } } @@ -568,8 +568,8 @@ mod test_utils { fn from(message: Bytes) -> Self { let mut log = LogEvent::default(); log.maybe_insert(log_schema().message_key_target_path(), message); - if let Some(timestamp_key) = log_schema().timestamp_key() { - log.insert((PathPrefix::Event, timestamp_key), Utc::now()); + if let Some(timestamp_key) = log_schema().timestamp_key_target_path() { + log.insert(timestamp_key, Utc::now()); } log } diff --git a/src/sinks/clickhouse/integration_tests.rs b/src/sinks/clickhouse/integration_tests.rs index 0e18a7bd2a375..2408a892999f2 100644 --- a/src/sinks/clickhouse/integration_tests.rs +++ b/src/sinks/clickhouse/integration_tests.rs @@ -178,10 +178,7 @@ async fn insert_events_unix_timestamps() { format!( "{}", exp_event - .get(( - lookup::PathPrefix::Event, - log_schema().timestamp_key().unwrap() - )) + .get_timestamp() .unwrap() .as_timestamp() .unwrap() @@ -242,10 +239,7 @@ timestamp_format = "unix""#, format!( "{}", exp_event - .get(( - lookup::PathPrefix::Event, - log_schema().timestamp_key().unwrap() - )) + .get_timestamp() .unwrap() .as_timestamp() .unwrap() diff --git a/src/source_sender/mod.rs b/src/source_sender/mod.rs index 2869152f45e54..10e92916d3d62 100644 --- a/src/source_sender/mod.rs +++ b/src/source_sender/mod.rs @@ -23,7 +23,6 @@ mod errors; use crate::config::{ComponentKey, OutputId}; use crate::schema::Definition; pub use errors::{ClosedError, StreamSendError}; -use lookup::PathPrefix; pub(crate) const CHUNK_SIZE: usize = 1000; @@ -356,18 +355,23 @@ impl Inner { fn emit_lag_time(&self, event: EventRef<'_>, reference: i64) { if let Some(lag_time_metric) = &self.lag_time { let timestamp = match event { - EventRef::Log(log) => log_schema().timestamp_key().and_then(|timestamp_key| { - log.get((PathPrefix::Event, timestamp_key)) - .and_then(get_timestamp_millis) - }), + EventRef::Log(log) => { + log_schema() + .timestamp_key_target_path() + .and_then(|timestamp_key| { + log.get(timestamp_key).and_then(get_timestamp_millis) + }) + } EventRef::Metric(metric) => metric .timestamp() .map(|timestamp| timestamp.timestamp_millis()), - EventRef::Trace(trace) => log_schema().timestamp_key().and_then(|timestamp_key| { - trace - .get((PathPrefix::Event, timestamp_key)) - .and_then(get_timestamp_millis) - }), + EventRef::Trace(trace) => { + log_schema() + .timestamp_key_target_path() + .and_then(|timestamp_key| { + trace.get(timestamp_key).and_then(get_timestamp_millis) + }) + } }; if let Some(timestamp) = timestamp { // This will truncate precision for values larger than 2**52, but at that point the user diff --git a/src/sources/amqp.rs b/src/sources/amqp.rs index 99346ead6e790..578584808f451 100644 --- a/src/sources/amqp.rs +++ b/src/sources/amqp.rs @@ -711,15 +711,9 @@ mod integration_test { let log = events[0].as_log(); trace!("{:?}", log); - assert_eq!( - log[log_schema().message_key().unwrap().to_string()], - "my message".into() - ); + assert_eq!(*log.get_message().unwrap(), "my message".into()); assert_eq!(log["routing"], routing_key.into()); - assert_eq!( - log[log_schema().source_type_key().unwrap().to_string()], - "amqp".into() - ); + assert_eq!(*log.get_source_type().unwrap(), "amqp".into()); let log_ts = log[log_schema().timestamp_key().unwrap().to_string()] .as_timestamp() .unwrap(); diff --git a/src/sources/aws_sqs/source.rs b/src/sources/aws_sqs/source.rs index 2e290a85a65ae..1e7659a30cec0 100644 --- a/src/sources/aws_sqs/source.rs +++ b/src/sources/aws_sqs/source.rs @@ -320,10 +320,7 @@ mod tests { events[0] .clone() .as_log() - .get(( - lookup::PathPrefix::Event, - log_schema().timestamp_key().unwrap() - )) + .get_timestamp() .unwrap() .to_string_lossy(), now.to_rfc3339_opts(SecondsFormat::AutoSi, true) diff --git a/src/sources/datadog_agent/tests.rs b/src/sources/datadog_agent/tests.rs index 992a35a1d4078..3cc56a038bb36 100644 --- a/src/sources/datadog_agent/tests.rs +++ b/src/sources/datadog_agent/tests.rs @@ -30,7 +30,7 @@ use vrl::value::Kind; use crate::schema::Definition; use crate::{ common::datadog::{DatadogMetricType, DatadogPoint, DatadogSeriesMetric}, - config::{log_schema, SourceConfig, SourceContext}, + config::{SourceConfig, SourceContext}, event::{ into_event_stream, metric::{MetricKind, MetricSketch, MetricValue}, @@ -238,10 +238,7 @@ async fn full_payload_v1() { assert_eq!(log["ddsource"], "curl".into()); assert_eq!(log["ddtags"], "one,two,three".into()); assert!(event.metadata().datadog_api_key().is_none()); - assert_eq!( - log[log_schema().source_type_key().unwrap().to_string()], - "datadog_agent".into() - ); + assert_eq!(*log.get_source_type().unwrap(), "datadog_agent".into()); assert_eq!( event.metadata().schema_definition(), &test_logs_schema_definition() @@ -303,10 +300,7 @@ async fn full_payload_v2() { assert_eq!(log["ddsource"], "curl".into()); assert_eq!(log["ddtags"], "one,two,three".into()); assert!(event.metadata().datadog_api_key().is_none()); - assert_eq!( - log[log_schema().source_type_key().unwrap().to_string()], - "datadog_agent".into() - ); + assert_eq!(*log.get_source_type().unwrap(), "datadog_agent".into()); assert_eq!( event.metadata().schema_definition(), &test_logs_schema_definition() @@ -368,10 +362,7 @@ async fn no_api_key() { assert_eq!(log["ddsource"], "curl".into()); assert_eq!(log["ddtags"], "one,two,three".into()); assert!(event.metadata().datadog_api_key().is_none()); - assert_eq!( - log[log_schema().source_type_key().unwrap().to_string()], - "datadog_agent".into() - ); + assert_eq!(*log.get_source_type().unwrap(), "datadog_agent".into()); assert_eq!( event.metadata().schema_definition(), &test_logs_schema_definition() @@ -432,10 +423,7 @@ async fn api_key_in_url() { assert_eq!(log["service"], "vector".into()); assert_eq!(log["ddsource"], "curl".into()); assert_eq!(log["ddtags"], "one,two,three".into()); - assert_eq!( - log[log_schema().source_type_key().unwrap().to_string()], - "datadog_agent".into() - ); + assert_eq!(*log.get_source_type().unwrap(), "datadog_agent".into()); assert_eq!( &event.metadata().datadog_api_key().as_ref().unwrap()[..], "12345678abcdefgh12345678abcdefgh" @@ -500,10 +488,7 @@ async fn api_key_in_query_params() { assert_eq!(log["service"], "vector".into()); assert_eq!(log["ddsource"], "curl".into()); assert_eq!(log["ddtags"], "one,two,three".into()); - assert_eq!( - log[log_schema().source_type_key().unwrap().to_string()], - "datadog_agent".into() - ); + assert_eq!(*log.get_source_type().unwrap(), "datadog_agent".into()); assert_eq!( &event.metadata().datadog_api_key().as_ref().unwrap()[..], "12345678abcdefgh12345678abcdefgh" @@ -574,10 +559,7 @@ async fn api_key_in_header() { assert_eq!(log["service"], "vector".into()); assert_eq!(log["ddsource"], "curl".into()); assert_eq!(log["ddtags"], "one,two,three".into()); - assert_eq!( - log[log_schema().source_type_key().unwrap().to_string()], - "datadog_agent".into() - ); + assert_eq!(*log.get_source_type().unwrap(), "datadog_agent".into()); assert_eq!( &event.metadata().datadog_api_key().as_ref().unwrap()[..], "12345678abcdefgh12345678abcdefgh" @@ -724,10 +706,7 @@ async fn ignores_api_key() { assert_eq!(log["service"], "vector".into()); assert_eq!(log["ddsource"], "curl".into()); assert_eq!(log["ddtags"], "one,two,three".into()); - assert_eq!( - log[log_schema().source_type_key().unwrap().to_string()], - "datadog_agent".into() - ); + assert_eq!(*log.get_source_type().unwrap(), "datadog_agent".into()); assert!(event.metadata().datadog_api_key().is_none()); assert_eq!( event.metadata().schema_definition(), @@ -1419,10 +1398,7 @@ async fn split_outputs() { assert_eq!(log["service"], "vector".into()); assert_eq!(log["ddsource"], "curl".into()); assert_eq!(log["ddtags"], "one,two,three".into()); - assert_eq!( - log[log_schema().source_type_key().unwrap().to_string()], - "datadog_agent".into() - ); + assert_eq!(*log.get_source_type().unwrap(), "datadog_agent".into()); assert_eq!( &event.metadata().datadog_api_key().as_ref().unwrap()[..], "12345678abcdefgh12345678abcdefgh" diff --git a/src/sources/docker_logs/tests.rs b/src/sources/docker_logs/tests.rs index 7a7b928444fa1..f68e3580920c5 100644 --- a/src/sources/docker_logs/tests.rs +++ b/src/sources/docker_logs/tests.rs @@ -440,10 +440,7 @@ mod integration_tests { .unwrap() .assert_valid_for_event(&events[0]); let log = events[0].as_log(); - assert_eq!( - log[log_schema().message_key().unwrap().to_string()], - message.into() - ); + assert_eq!(*log.get_message().unwrap(), message.into()); assert_eq!(log[CONTAINER], id.into()); assert!(log.get(CREATED_AT).is_some()); assert_eq!(log[IMAGE], "busybox".into()); @@ -640,10 +637,7 @@ mod integration_tests { .unwrap() .assert_valid_for_event(&events[0]); let log = events[0].as_log(); - assert_eq!( - log[log_schema().message_key().unwrap().to_string()], - message.into() - ); + assert_eq!(*log.get_message().unwrap(), message.into()); assert_eq!(log[CONTAINER], id.into()); assert!(log.get(CREATED_AT).is_some()); assert_eq!(log[IMAGE], "busybox".into()); @@ -778,10 +772,7 @@ mod integration_tests { .unwrap() .assert_valid_for_event(&events[0]); let log = events[0].as_log(); - assert_eq!( - log[log_schema().message_key().unwrap().to_string()], - message.into() - ); + assert_eq!(*log.get_message().unwrap(), message.into()); assert_eq!(log[CONTAINER], id.into()); assert!(log.get(CREATED_AT).is_some()); assert_eq!(log[IMAGE], "busybox".into()); @@ -830,10 +821,7 @@ mod integration_tests { .unwrap() .assert_valid_for_event(&events[0]); let log = events[0].as_log(); - assert_eq!( - log[log_schema().message_key().unwrap().to_string()], - message.into() - ); + assert_eq!(*log.get_message().unwrap(), message.into()); }) .await; } diff --git a/src/sources/exec/mod.rs b/src/sources/exec/mod.rs index dbb64b53f90fa..cbf2ac1d4b331 100644 --- a/src/sources/exec/mod.rs +++ b/src/sources/exec/mod.rs @@ -723,6 +723,8 @@ fn spawn_reader_thread( #[cfg(test)] mod tests { + use super::*; + use crate::{event::LogEvent, test_util::trace_init}; use bytes::Bytes; use std::io::Cursor; use vector_core::event::EventMetadata; @@ -731,11 +733,6 @@ mod tests { #[cfg(unix)] use futures::task::Poll; - use super::*; - use crate::config::log_schema; - - use crate::{event::LogEvent, test_util::trace_init}; - #[test] fn test_generate_config() { crate::test_util::test_generate_config::(); @@ -759,27 +756,13 @@ mod tests { ); let log = event.as_log(); - assert_eq!( - log[log_schema().host_key().unwrap().to_string().as_str()], - "Some.Machine".into() - ); + assert_eq!(*log.get_host().unwrap(), "Some.Machine".into()); assert_eq!(log[STREAM_KEY], STDOUT.into()); assert_eq!(log[PID_KEY], (8888_i64).into()); assert_eq!(log[COMMAND_KEY], config.command.into()); - assert_eq!( - log[log_schema().message_key().unwrap().to_string()], - "hello world".into() - ); - assert_eq!( - log[log_schema().source_type_key().unwrap().to_string()], - "exec".into() - ); - assert!(log - .get(( - lookup::PathPrefix::Event, - log_schema().timestamp_key().unwrap() - )) - .is_some()); + assert_eq!(*log.get_message().unwrap(), "hello world".into()); + assert_eq!(*log.get_source_type().unwrap(), "exec".into()); + assert!(log.get_timestamp().is_some()); } #[test] @@ -849,27 +832,13 @@ mod tests { ); let log = event.as_log(); - assert_eq!( - log[log_schema().host_key().unwrap().to_string().as_str()], - "Some.Machine".into() - ); + assert_eq!(*log.get_host().unwrap(), "Some.Machine".into()); assert_eq!(log[STREAM_KEY], STDOUT.into()); assert_eq!(log[PID_KEY], (8888_i64).into()); assert_eq!(log[COMMAND_KEY], config.command.into()); - assert_eq!( - log[log_schema().message_key().unwrap().to_string()], - "hello world".into() - ); - assert_eq!( - log[log_schema().source_type_key().unwrap().to_string()], - "exec".into() - ); - assert!(log - .get(( - lookup::PathPrefix::Event, - log_schema().timestamp_key().unwrap() - )) - .is_some()); + assert_eq!(*log.get_message().unwrap(), "hello world".into()); + assert_eq!(*log.get_source_type().unwrap(), "exec".into()); + assert!(log.get_timestamp().is_some()); } #[test] @@ -970,7 +939,7 @@ mod tests { assert_eq!(events.len(), 1); let log = events[0].as_log(); assert_eq!( - log[log_schema().message_key().unwrap().to_string()], + *log.get_message().unwrap(), Bytes::from("hello world").into() ); assert_eq!(origin, STDOUT); @@ -982,7 +951,7 @@ mod tests { assert_eq!(events.len(), 1); let log = events[0].as_log(); assert_eq!( - log[log_schema().message_key().unwrap().to_string()], + *log.get_message().unwrap(), Bytes::from("hello rocket 🚀").into() ); assert_eq!(origin, STDOUT); @@ -1062,25 +1031,11 @@ mod tests { let log = event.as_log(); assert_eq!(log[COMMAND_KEY], config.command.clone().into()); assert_eq!(log[STREAM_KEY], STDOUT.into()); - assert_eq!( - log[log_schema().source_type_key().unwrap().to_string()], - "exec".into() - ); - assert_eq!( - log[log_schema().message_key().unwrap().to_string()], - "Hello World!".into() - ); - assert_eq!( - log[log_schema().host_key().unwrap().to_string().as_str()], - "Some.Machine".into() - ); + assert_eq!(*log.get_source_type().unwrap(), "exec".into()); + assert_eq!(*log.get_message().unwrap(), "Hello World!".into()); + assert_eq!(*log.get_host().unwrap(), "Some.Machine".into()); assert!(log.get(PID_KEY).is_some()); - assert!(log - .get(( - lookup::PathPrefix::Event, - log_schema().timestamp_key().unwrap() - )) - .is_some()); + assert!(log.get_timestamp().is_some()); assert_eq!(8, log.all_fields().unwrap().count()); } else { @@ -1131,20 +1086,14 @@ mod tests { if let Poll::Ready(Some(event)) = futures::poll!(rx.next()) { let log = event.as_log(); - assert_eq!( - log[log_schema().message_key().unwrap().to_string()], - "signal received".into() - ); + assert_eq!(*log.get_message().unwrap(), "signal received".into()); } else { panic!("Expected to receive event"); } if let Poll::Ready(Some(event)) = futures::poll!(rx.next()) { let log = event.as_log(); - assert_eq!( - log[log_schema().message_key().unwrap().to_string()], - "slept".into() - ); + assert_eq!(*log.get_message().unwrap(), "slept".into()); } else { panic!("Expected to receive event"); } diff --git a/src/sources/file.rs b/src/sources/file.rs index ae4a2a79a6b01..bb1cc2301fa36 100644 --- a/src/sources/file.rs +++ b/src/sources/file.rs @@ -1036,14 +1036,8 @@ mod tests { assert_eq!(log["file"], "some_file.rs".into()); assert_eq!(log["host"], "Some.Machine".into()); assert_eq!(log["offset"], 0.into()); - assert_eq!( - log[log_schema().message_key().unwrap().to_string()], - "hello world".into() - ); - assert_eq!( - log[log_schema().source_type_key().unwrap().to_string()], - "file".into() - ); + assert_eq!(*log.get_message().unwrap(), "hello world".into()); + assert_eq!(*log.get_source_type().unwrap(), "file".into()); assert!(log[log_schema().timestamp_key().unwrap().to_string()].is_timestamp()); } @@ -1064,14 +1058,8 @@ mod tests { assert_eq!(log["file_path"], "some_file.rs".into()); assert_eq!(log["hostname"], "Some.Machine".into()); assert_eq!(log["off"], 0.into()); - assert_eq!( - log[log_schema().message_key().unwrap().to_string()], - "hello world".into() - ); - assert_eq!( - log[log_schema().source_type_key().unwrap().to_string()], - "file".into() - ); + assert_eq!(*log.get_message().unwrap(), "hello world".into()); + assert_eq!(*log.get_source_type().unwrap(), "file".into()); assert!(log[log_schema().timestamp_key().unwrap().to_string()].is_timestamp()); } @@ -2336,11 +2324,7 @@ mod tests { received .into_iter() .map(Event::into_log) - .map(|log| { - log[log_schema().message_key().unwrap().to_string()] - .to_string_lossy() - .into_owned() - }) + .map(|log| log.get_message().unwrap().to_string_lossy().into_owned()) .collect() } @@ -2348,7 +2332,7 @@ mod tests { received .into_iter() .map(Event::into_log) - .map(|log| log[log_schema().message_key().unwrap().to_string()].clone()) + .map(|log| log.get_message().unwrap().clone()) .collect() } } diff --git a/src/sources/heroku_logs.rs b/src/sources/heroku_logs.rs index cbfa33ce7c6e3..fe55fad2037d1 100644 --- a/src/sources/heroku_logs.rs +++ b/src/sources/heroku_logs.rs @@ -521,7 +521,7 @@ mod tests { let log = event.as_log(); assert_eq!( - log[log_schema().message_key().unwrap().to_string()], + *log.get_message().unwrap(), r#"at=info method=GET path="/cart_link" host=lumberjack-store.timber.io request_id=05726858-c44e-4f94-9a20-37df73be9006 fwd="73.75.38.87" dyno=web.1 connect=1ms service=22ms status=304 bytes=656 protocol=http"#.into() ); assert_eq!( @@ -531,8 +531,8 @@ mod tests { .unwrap() .into() ); - assert_eq!(log[log_schema().host_key().unwrap().to_string()], "host".into()); - assert_eq!(log[log_schema().source_type_key().unwrap().to_string()], "heroku_logs".into()); + assert_eq!(*log.get_host().unwrap(), "host".into()); + assert_eq!(*log.get_source_type().unwrap(), "heroku_logs".into()); assert_eq!(log["appname"], "lumberjack-store".into()); assert_eq!(log["absent"], Value::Null); }).await; @@ -606,10 +606,7 @@ mod tests { let events = super::line_to_events(Default::default(), log_namespace, body.into()); let log = events[0].as_log(); - assert_eq!( - log[log_schema().message_key().unwrap().to_string()], - "foo bar baz".into() - ); + assert_eq!(*log.get_message().unwrap(), "foo bar baz".into()); assert_eq!( log[log_schema().timestamp_key().unwrap().to_string()], "2020-01-08T22:33:57.353034+00:00" @@ -617,14 +614,8 @@ mod tests { .unwrap() .into() ); - assert_eq!( - log[log_schema().host_key().unwrap().to_string().as_str()], - "host".into() - ); - assert_eq!( - log[log_schema().source_type_key().unwrap().to_string()], - "heroku_logs".into() - ); + assert_eq!(*log.get_host().unwrap(), "host".into()); + assert_eq!(*log.get_source_type().unwrap(), "heroku_logs".into()); } #[test] @@ -634,20 +625,9 @@ mod tests { let events = super::line_to_events(Default::default(), log_namespace, body.into()); let log = events[0].as_log(); - assert_eq!( - log[log_schema().message_key().unwrap().to_string()], - "what am i doing here".into() - ); - assert!(log - .get(( - lookup::PathPrefix::Event, - log_schema().timestamp_key().unwrap() - )) - .is_some()); - assert_eq!( - log[log_schema().source_type_key().unwrap().to_string()], - "heroku_logs".into() - ); + assert_eq!(*log.get_message().unwrap(), "what am i doing here".into()); + assert!(log.get_timestamp().is_some()); + assert_eq!(*log.get_source_type().unwrap(), "heroku_logs".into()); } #[test] @@ -657,10 +637,7 @@ mod tests { let events = super::line_to_events(Default::default(), log_namespace, body.into()); let log = events[0].as_log(); - assert_eq!( - log[log_schema().message_key().unwrap().to_string()], - "i'm not that long".into() - ); + assert_eq!(*log.get_message().unwrap(), "i'm not that long".into()); assert_eq!( log[log_schema().timestamp_key().unwrap().to_string()], "2020-01-08T22:33:57.353034+00:00" @@ -668,14 +645,8 @@ mod tests { .unwrap() .into() ); - assert_eq!( - log[log_schema().host_key().unwrap().to_string().as_str()], - "host".into() - ); - assert_eq!( - log[log_schema().source_type_key().unwrap().to_string()], - "heroku_logs".into() - ); + assert_eq!(*log.get_host().unwrap(), "host".into()); + assert_eq!(*log.get_source_type().unwrap(), "heroku_logs".into()); } #[test] diff --git a/src/sources/http_client/integration_tests.rs b/src/sources/http_client/integration_tests.rs index 495941ed25d9e..68e4842ba99d5 100644 --- a/src/sources/http_client/integration_tests.rs +++ b/src/sources/http_client/integration_tests.rs @@ -86,7 +86,7 @@ async fn collected_logs_bytes() { // panics if not log event let log = events[0].as_log(); assert_eq!( - log[log_schema().source_type_key().unwrap().to_string()], + *log.get_source_type().unwrap(), HttpClientConfig::NAME.into() ); } @@ -111,7 +111,7 @@ async fn collected_logs_json() { // panics if not log event let log = events[0].as_log(); assert_eq!( - log[log_schema().source_type_key().unwrap().to_string()], + *log.get_source_type().unwrap(), HttpClientConfig::NAME.into() ); } diff --git a/src/sources/http_server.rs b/src/sources/http_server.rs index 354e1c4bb4525..be3b349df94c0 100644 --- a/src/sources/http_server.rs +++ b/src/sources/http_server.rs @@ -654,18 +654,10 @@ mod tests { { let event = events.remove(0); let log = event.as_log(); + assert_eq!(*log.get_message().unwrap(), "test body".into()); + assert!(log.get_timestamp().is_some()); assert_eq!( - log[log_schema().message_key().unwrap().to_string()], - "test body".into() - ); - assert!(log - .get(( - lookup::PathPrefix::Event, - log_schema().timestamp_key().unwrap() - )) - .is_some()); - assert_eq!( - log[log_schema().source_type_key().unwrap().to_string()], + *log.get_source_type().unwrap(), SimpleHttpConfig::NAME.into() ); assert_eq!(log["http_path"], "/".into()); @@ -674,10 +666,7 @@ mod tests { { let event = events.remove(0); let log = event.as_log(); - assert_eq!( - log[log_schema().message_key().unwrap().to_string()], - "test body 2".into() - ); + assert_eq!(*log.get_message().unwrap(), "test body 2".into()); assert_event_metadata(log).await; } } @@ -709,19 +698,13 @@ mod tests { { let event = events.remove(0); let log = event.as_log(); - assert_eq!( - log[log_schema().message_key().unwrap().to_string()], - "test body".into() - ); + assert_eq!(*log.get_message().unwrap(), "test body".into()); assert_event_metadata(log).await; } { let event = events.remove(0); let log = event.as_log(); - assert_eq!( - log[log_schema().message_key().unwrap().to_string()], - "test body 2".into() - ); + assert_eq!(*log.get_message().unwrap(), "test body 2".into()); assert_event_metadata(log).await; } } @@ -754,10 +737,7 @@ mod tests { { let event = events.remove(0); let log = event.as_log(); - assert_eq!( - log[log_schema().message_key().unwrap().to_string()], - "foo\nbar".into() - ); + assert_eq!(*log.get_message().unwrap(), "foo\nbar".into()); assert_event_metadata(log).await; } } @@ -794,22 +774,8 @@ mod tests { }) .await; - assert!(events - .remove(1) - .as_log() - .get(( - lookup::PathPrefix::Event, - log_schema().timestamp_key().unwrap() - )) - .is_some()); - assert!(events - .remove(0) - .as_log() - .get(( - lookup::PathPrefix::Event, - log_schema().timestamp_key().unwrap() - )) - .is_some()); + assert!(events.remove(1).as_log().get_timestamp().is_some()); + assert!(events.remove(0).as_log().get_timestamp().is_some()); } #[tokio::test] @@ -967,12 +933,7 @@ mod tests { } async fn assert_event_metadata(log: &LogEvent) { - assert!(log - .get(( - lookup::PathPrefix::Event, - log_schema().timestamp_key().unwrap() - )) - .is_some()); + assert!(log.get_timestamp().is_some()); let source_type_key_value = log .get(( @@ -1109,10 +1070,7 @@ mod tests { { let event = events.remove(0); let log = event.as_log(); - assert_eq!( - log[log_schema().message_key().unwrap().to_string()], - "test body".into() - ); + assert_eq!(*log.get_message().unwrap(), "test body".into()); assert_event_metadata(log).await; } } @@ -1148,14 +1106,9 @@ mod tests { let log = event.as_log(); assert_eq!(log["key1"], "value1".into()); assert_eq!(log["vector_http_path"], "/event/path".into()); - assert!(log - .get(( - lookup::PathPrefix::Event, - log_schema().timestamp_key().unwrap() - )) - .is_some()); + assert!(log.get_timestamp().is_some()); assert_eq!( - log[log_schema().source_type_key().unwrap().to_string()], + *log.get_source_type().unwrap(), SimpleHttpConfig::NAME.into() ); } @@ -1201,14 +1154,9 @@ mod tests { let log = event.as_log(); assert_eq!(log["key1"], "value1".into()); assert_eq!(log["vector_http_path"], "/event/path1".into()); - assert!(log - .get(( - lookup::PathPrefix::Event, - log_schema().timestamp_key().unwrap() - )) - .is_some()); + assert!(log.get_timestamp().is_some()); assert_eq!( - log[log_schema().source_type_key().unwrap().to_string()], + *log.get_source_type().unwrap(), SimpleHttpConfig::NAME.into() ); } @@ -1217,14 +1165,9 @@ mod tests { let log = event.as_log(); assert_eq!(log["key2"], "value2".into()); assert_eq!(log["vector_http_path"], "/event/path2".into()); - assert!(log - .get(( - lookup::PathPrefix::Event, - log_schema().timestamp_key().unwrap() - )) - .is_some()); + assert!(log.get_timestamp().is_some()); assert_eq!( - log[log_schema().source_type_key().unwrap().to_string()], + *log.get_source_type().unwrap(), SimpleHttpConfig::NAME.into() ); } diff --git a/src/sources/splunk_hec/mod.rs b/src/sources/splunk_hec/mod.rs index b6489ef8b22d3..78deae6cc60fc 100644 --- a/src/sources/splunk_hec/mod.rs +++ b/src/sources/splunk_hec/mod.rs @@ -1434,13 +1434,7 @@ mod tests { event.as_log()[log_schema().message_key().unwrap().to_string()], message.into() ); - assert!(event - .as_log() - .get(( - lookup::PathPrefix::Event, - log_schema().timestamp_key().unwrap() - )) - .is_some()); + assert!(event.as_log().get_timestamp().is_some()); assert_eq!( event.as_log()[log_schema().source_type_key().unwrap().to_string()], "splunk_hec".into() @@ -1464,13 +1458,7 @@ mod tests { event.as_log()[log_schema().message_key().unwrap().to_string()], message.into() ); - assert!(event - .as_log() - .get(( - lookup::PathPrefix::Event, - log_schema().timestamp_key().unwrap() - )) - .is_some()); + assert!(event.as_log().get_timestamp().is_some()); assert_eq!( event.as_log()[log_schema().source_type_key().unwrap().to_string()], "splunk_hec".into() @@ -1498,13 +1486,7 @@ mod tests { event.as_log()[log_schema().message_key().unwrap().to_string()], msg.into() ); - assert!(event - .as_log() - .get(( - lookup::PathPrefix::Event, - log_schema().timestamp_key().unwrap() - )) - .is_some()); + assert!(event.as_log().get_timestamp().is_some()); assert_eq!( event.as_log()[log_schema().source_type_key().unwrap().to_string()], "splunk_hec".into() @@ -1529,13 +1511,7 @@ mod tests { event.as_log()[log_schema().message_key().unwrap().to_string()], message.into() ); - assert!(event - .as_log() - .get(( - lookup::PathPrefix::Event, - log_schema().timestamp_key().unwrap() - )) - .is_some()); + assert!(event.as_log().get_timestamp().is_some()); assert_eq!( event.as_log()[log_schema().source_type_key().unwrap().to_string()], "splunk_hec".into() @@ -1563,13 +1539,7 @@ mod tests { event.as_log()[log_schema().message_key().unwrap().to_string()], msg.into() ); - assert!(event - .as_log() - .get(( - lookup::PathPrefix::Event, - log_schema().timestamp_key().unwrap() - )) - .is_some()); + assert!(event.as_log().get_timestamp().is_some()); assert_eq!( event.as_log()[log_schema().source_type_key().unwrap().to_string()], "splunk_hec".into() @@ -1595,12 +1565,7 @@ mod tests { let event = collect_n(source, 1).await.remove(0).into_log(); assert_eq!(event["greeting"], "hello".into()); assert_eq!(event["name"], "bob".into()); - assert!(event - .get(( - lookup::PathPrefix::Event, - log_schema().timestamp_key().unwrap() - )) - .is_some()); + assert!(event.get_timestamp().is_some()); assert_eq!( event[log_schema().source_type_key().unwrap().to_string()], "splunk_hec".into() @@ -1665,13 +1630,7 @@ mod tests { message.into() ); assert_eq!(event.as_log()[&super::CHANNEL], "channel".into()); - assert!(event - .as_log() - .get(( - lookup::PathPrefix::Event, - log_schema().timestamp_key().unwrap() - )) - .is_some()); + assert!(event.as_log().get_timestamp().is_some()); assert_eq!( event.as_log()[log_schema().source_type_key().unwrap().to_string()], "splunk_hec".into() @@ -1695,13 +1654,7 @@ mod tests { "root".into() ); assert_eq!(event.as_log()[&super::CHANNEL], "channel".into()); - assert!(event - .as_log() - .get(( - lookup::PathPrefix::Event, - log_schema().timestamp_key().unwrap() - )) - .is_some()); + assert!(event.as_log().get_timestamp().is_some()); assert_eq!( event.as_log()[log_schema().source_type_key().unwrap().to_string()], "splunk_hec".into() @@ -1949,13 +1902,7 @@ mod tests { message.into() ); assert_eq!(event.as_log()[&super::CHANNEL], "channel".into()); - assert!(event - .as_log() - .get(( - lookup::PathPrefix::Event, - log_schema().timestamp_key().unwrap() - )) - .is_some()); + assert!(event.as_log().get_timestamp().is_some()); assert_eq!( event.as_log()[log_schema().source_type_key().unwrap().to_string()], "splunk_hec".into() @@ -2035,13 +1982,7 @@ mod tests { event.as_log()[log_schema().message_key().unwrap().to_string()], "first".into() ); - assert!(event - .as_log() - .get(( - lookup::PathPrefix::Event, - log_schema().timestamp_key().unwrap() - )) - .is_some()); + assert!(event.as_log().get_timestamp().is_some()); assert_eq!( event.as_log()[log_schema().source_type_key().unwrap().to_string()], "splunk_hec".into() @@ -2068,13 +2009,7 @@ mod tests { event.as_log()[log_schema().message_key().unwrap().to_string()], "first".into() ); - assert!(event - .as_log() - .get(( - lookup::PathPrefix::Event, - log_schema().timestamp_key().unwrap() - )) - .is_some()); + assert!(event.as_log().get_timestamp().is_some()); assert_eq!( event.as_log()[log_schema().source_type_key().unwrap().to_string()], "splunk_hec".into() @@ -2099,13 +2034,7 @@ mod tests { event.as_log()[log_schema().message_key().unwrap().to_string()], "first".into() ); - assert!(event - .as_log() - .get(( - lookup::PathPrefix::Event, - log_schema().timestamp_key().unwrap() - )) - .is_some()); + assert!(event.as_log().get_timestamp().is_some()); assert_eq!( event.as_log()[log_schema().source_type_key().unwrap().to_string()], "splunk_hec".into() diff --git a/src/template.rs b/src/template.rs index 8b7fd823b97b0..b4230f351b12e 100644 --- a/src/template.rs +++ b/src/template.rs @@ -7,7 +7,6 @@ use chrono::{ Utc, }; use lookup::lookup_v2::parse_target_path; -use lookup::PathPrefix; use once_cell::sync::Lazy; use regex::Regex; use snafu::Snafu; @@ -343,18 +342,24 @@ fn render_metric_field<'a>(key: &str, metric: &'a Metric) -> Option<&'a str> { fn render_timestamp(items: &ParsedStrftime, event: EventRef<'_>) -> String { match event { - EventRef::Log(log) => log_schema().timestamp_key().and_then(|timestamp_key| { - log.get((PathPrefix::Event, timestamp_key)) - .and_then(Value::as_timestamp) - .copied() - }), + EventRef::Log(log) => log_schema() + .timestamp_key_target_path() + .and_then(|timestamp_key| { + log.get(timestamp_key) + .and_then(Value::as_timestamp) + .copied() + }), EventRef::Metric(metric) => metric.timestamp(), - EventRef::Trace(trace) => log_schema().timestamp_key().and_then(|timestamp_key| { - trace - .get((PathPrefix::Event, timestamp_key)) - .and_then(Value::as_timestamp) - .copied() - }), + EventRef::Trace(trace) => { + log_schema() + .timestamp_key_target_path() + .and_then(|timestamp_key| { + trace + .get(timestamp_key) + .and_then(Value::as_timestamp) + .copied() + }) + } } .unwrap_or_else(Utc::now) .format_with_items(items.as_items()) diff --git a/src/test_util/mock/transforms/basic.rs b/src/test_util/mock/transforms/basic.rs index 1d9b65a5b27e3..e7906351706f9 100644 --- a/src/test_util/mock/transforms/basic.rs +++ b/src/test_util/mock/transforms/basic.rs @@ -12,7 +12,6 @@ use vector_core::{ schema, transform::{FunctionTransform, OutputBuffer, Transform}, }; -use vrl::path::PathPrefix; use vrl::value::Value; use crate::config::{OutputId, TransformConfig, TransformContext}; @@ -76,11 +75,10 @@ impl FunctionTransform for BasicTransform { fn transform(&mut self, output: &mut OutputBuffer, mut event: Event) { match &mut event { Event::Log(log) => { - if let Some(message_key) = crate::config::log_schema().message_key() { - let target_path = (PathPrefix::Event, message_key); - let mut v = log.get(target_path).unwrap().to_string_lossy().into_owned(); + if let Some(message_key) = crate::config::log_schema().message_key_target_path() { + let mut v = log.get(message_key).unwrap().to_string_lossy().into_owned(); v.push_str(&self.suffix); - log.insert(target_path, Value::from(v)); + log.insert(message_key, Value::from(v)); } } Event::Metric(metric) => { diff --git a/src/topology/test/mod.rs b/src/topology/test/mod.rs index d772a7162bf66..26283229384b1 100644 --- a/src/topology/test/mod.rs +++ b/src/topology/test/mod.rs @@ -29,7 +29,6 @@ use tokio::{ use vector_buffers::{BufferConfig, BufferType, WhenFull}; use vector_common::config::ComponentKey; use vector_core::config::OutputId; -use vrl::path::PathPrefix; mod backpressure; mod compliance; @@ -68,10 +67,12 @@ fn basic_config_with_sink_failing_healthcheck() -> Config { } fn into_message(event: Event) -> String { - let message_key = crate::config::log_schema().message_key().unwrap(); + let message_key = crate::config::log_schema() + .message_key_target_path() + .unwrap(); event .as_log() - .get((PathPrefix::Event, message_key)) + .get(message_key) .unwrap() .to_string_lossy() .into_owned()