From 28f5c23aa84f70736fe5ef5132e274b3611cceb9 Mon Sep 17 00:00:00 2001 From: Pavlos Rontidis Date: Thu, 27 Jul 2023 13:06:22 -0400 Subject: [PATCH] feat: replace tuples with &OwnedTargetPath wherever possible (#18097) * feat: replace tuples with &OwnedTargetPath wherever possible * one more replacement * remove unused imports --- benches/template.rs | 10 ++----- lib/codecs/src/decoding/format/bytes.rs | 3 +- lib/codecs/src/decoding/format/gelf.rs | 15 ++++------ lib/codecs/src/decoding/format/json.rs | 9 +++--- lib/codecs/src/decoding/format/syslog.rs | 8 ++---- lib/opentelemetry-proto/src/convert.rs | 3 +- lib/vector-core/src/event/log_event.rs | 15 ++++------ lib/vector-core/src/event/vrl_target.rs | 2 +- src/sinks/humio/logs.rs | 5 +--- src/sinks/loki/integration_tests.rs | 9 +++--- src/sources/aws_sqs/integration_tests.rs | 3 +- src/sources/aws_sqs/source.rs | 10 +++---- src/sources/docker_logs/tests.rs | 2 +- src/sources/fluent/mod.rs | 6 ++-- src/sources/journald.rs | 8 ++---- src/sources/splunk_hec/mod.rs | 11 ++------ src/sources/syslog.rs | 16 +++++------ src/transforms/log_to_metric.rs | 36 ++++++++++-------------- src/transforms/metric_to_log.rs | 8 ++---- 19 files changed, 67 insertions(+), 112 deletions(-) diff --git a/benches/template.rs b/benches/template.rs index 949f0beeb52b5..008df426c98bc 100644 --- a/benches/template.rs +++ b/benches/template.rs @@ -13,10 +13,7 @@ fn bench_elasticsearch_index(c: &mut Criterion) { let index = Template::try_from("index-%Y.%m.%d").unwrap(); let mut event = Event::Log(LogEvent::from("hello world")); event.as_mut_log().insert( - ( - lookup::PathPrefix::Event, - log_schema().timestamp_key().unwrap(), - ), + log_schema().timestamp_key_target_path().unwrap(), Utc::now(), ); @@ -31,10 +28,7 @@ fn bench_elasticsearch_index(c: &mut Criterion) { let index = Template::try_from("index").unwrap(); let mut event = Event::Log(LogEvent::from("hello world")); event.as_mut_log().insert( - ( - lookup::PathPrefix::Event, - log_schema().timestamp_key().unwrap(), - ), + log_schema().timestamp_key_target_path().unwrap(), Utc::now(), ); diff --git a/lib/codecs/src/decoding/format/bytes.rs b/lib/codecs/src/decoding/format/bytes.rs index cb3d19f1f81d2..666b2dacbe530 100644 --- a/lib/codecs/src/decoding/format/bytes.rs +++ b/lib/codecs/src/decoding/format/bytes.rs @@ -8,7 +8,6 @@ use vector_core::{ event::{Event, LogEvent}, schema, }; -use vrl::path::PathPrefix; use vrl::value::Kind; use super::Deserializer; @@ -63,7 +62,7 @@ impl BytesDeserializer { LogNamespace::Vector => log_namespace.new_log_from_data(bytes), LogNamespace::Legacy => { let mut log = LogEvent::default(); - log.maybe_insert(PathPrefix::Event, log_schema().message_key(), bytes); + log.maybe_insert(log_schema().message_key_target_path(), bytes); log } } diff --git a/lib/codecs/src/decoding/format/gelf.rs b/lib/codecs/src/decoding/format/gelf.rs index f762eb0e151a5..7f9f17353b42a 100644 --- a/lib/codecs/src/decoding/format/gelf.rs +++ b/lib/codecs/src/decoding/format/gelf.rs @@ -1,7 +1,7 @@ use bytes::Bytes; use chrono::{DateTime, NaiveDateTime, Utc}; use derivative::Derivative; -use lookup::{event_path, owned_value_path, PathPrefix}; +use lookup::{event_path, owned_value_path}; use serde::{Deserialize, Serialize}; use smallvec::{smallvec, SmallVec}; use std::collections::HashMap; @@ -130,20 +130,17 @@ impl GelfDeserializer { log.insert(FULL_MESSAGE, full_message.to_string()); } - if let Some(timestamp_key) = log_schema().timestamp_key() { + if let Some(timestamp_key) = log_schema().timestamp_key_target_path() { if let Some(timestamp) = parsed.timestamp { let naive = NaiveDateTime::from_timestamp_opt( f64::trunc(timestamp) as i64, f64::fract(timestamp) as u32, ) .expect("invalid timestamp"); - log.insert( - (PathPrefix::Event, timestamp_key), - DateTime::::from_utc(naive, Utc), - ); + log.insert(timestamp_key, DateTime::::from_utc(naive, Utc)); // per GELF spec- add timestamp if not provided } else { - log.insert((PathPrefix::Event, timestamp_key), Utc::now()); + log.insert(timestamp_key, Utc::now()); } } @@ -293,7 +290,7 @@ mod tests { Some(&Value::Bytes(Bytes::from_static(b"example.org"))) ); assert_eq!( - log.get((PathPrefix::Event, log_schema().message_key().unwrap())), + log.get(log_schema().message_key_target_path().unwrap()), Some(&Value::Bytes(Bytes::from_static( b"A short message that helps you identify what is going on" ))) @@ -348,7 +345,7 @@ mod tests { let events = deserialize_gelf_input(&input).unwrap(); assert_eq!(events.len(), 1); let log = events[0].as_log(); - assert!(log.contains((PathPrefix::Event, log_schema().message_key().unwrap()))); + assert!(log.contains(log_schema().message_key_target_path().unwrap())); } // filter out id diff --git a/lib/codecs/src/decoding/format/json.rs b/lib/codecs/src/decoding/format/json.rs index 67e7bc624bbdf..0906f739b803a 100644 --- a/lib/codecs/src/decoding/format/json.rs +++ b/lib/codecs/src/decoding/format/json.rs @@ -3,7 +3,6 @@ use std::convert::TryInto; use bytes::Bytes; use chrono::Utc; use derivative::Derivative; -use lookup::PathPrefix; use smallvec::{smallvec, SmallVec}; use vector_config::configurable_component; use vector_core::{ @@ -133,11 +132,11 @@ impl Deserializer for JsonDeserializer { LogNamespace::Legacy => { let timestamp = Utc::now(); - if let Some(timestamp_key) = log_schema().timestamp_key() { + if let Some(timestamp_key) = log_schema().timestamp_key_target_path() { for event in &mut events { let log = event.as_mut_log(); - if !log.contains((PathPrefix::Event, timestamp_key)) { - log.insert((PathPrefix::Event, timestamp_key), timestamp); + if !log.contains(timestamp_key) { + log.insert(timestamp_key, timestamp); } } } @@ -218,7 +217,7 @@ mod tests { let log = event.as_log(); assert_eq!(log["bar"], 456.into()); assert_eq!( - log.get((PathPrefix::Event, log_schema().timestamp_key().unwrap())) + log.get(log_schema().timestamp_key_target_path().unwrap()) .is_some(), namespace == LogNamespace::Legacy ); diff --git a/lib/codecs/src/decoding/format/syslog.rs b/lib/codecs/src/decoding/format/syslog.rs index 23e412ad75ea2..83870e30cfaa0 100644 --- a/lib/codecs/src/decoding/format/syslog.rs +++ b/lib/codecs/src/decoding/format/syslog.rs @@ -1,7 +1,7 @@ use bytes::Bytes; use chrono::{DateTime, Datelike, Utc}; use derivative::Derivative; -use lookup::{event_path, owned_value_path, OwnedTargetPath, OwnedValuePath, PathPrefix}; +use lookup::{event_path, owned_value_path, OwnedTargetPath, OwnedValuePath}; use smallvec::{smallvec, SmallVec}; use std::borrow::Cow; use std::collections::BTreeMap; @@ -428,7 +428,7 @@ fn insert_fields_from_syslog( ) { match log_namespace { LogNamespace::Legacy => { - log.maybe_insert(PathPrefix::Event, log_schema().message_key(), parsed.msg); + log.maybe_insert(log_schema().message_key_target_path(), parsed.msg); } LogNamespace::Vector => { log.insert(event_path!("message"), parsed.msg); @@ -439,9 +439,7 @@ fn insert_fields_from_syslog( let timestamp = DateTime::::from(timestamp); match log_namespace { LogNamespace::Legacy => { - if let Some(timestamp_key) = log_schema().timestamp_key() { - log.insert((PathPrefix::Event, timestamp_key), timestamp); - } + log.maybe_insert(log_schema().timestamp_key_target_path(), timestamp); } LogNamespace::Vector => { log.insert(event_path!("timestamp"), timestamp); diff --git a/lib/opentelemetry-proto/src/convert.rs b/lib/opentelemetry-proto/src/convert.rs index 4b9f4804b5dcd..875be35d4dd9c 100644 --- a/lib/opentelemetry-proto/src/convert.rs +++ b/lib/opentelemetry-proto/src/convert.rs @@ -7,7 +7,6 @@ use vector_core::{ config::{log_schema, LegacyKey, LogNamespace}, event::{Event, LogEvent}, }; -use vrl::path::PathPrefix; use vrl::value::Value; use super::proto::{ @@ -95,7 +94,7 @@ impl ResourceLog { LogNamespace::Legacy => { let mut log = LogEvent::default(); if let Some(v) = self.log_record.body.and_then(|av| av.value) { - log.maybe_insert(PathPrefix::Event, log_schema().message_key(), v); + log.maybe_insert(log_schema().message_key_target_path(), v); } log } diff --git a/lib/vector-core/src/event/log_event.rs b/lib/vector-core/src/event/log_event.rs index 5e26e2542957e..11f627f4c1418 100644 --- a/lib/vector-core/src/event/log_event.rs +++ b/lib/vector-core/src/event/log_event.rs @@ -20,7 +20,7 @@ use vector_common::{ request_metadata::GetEventCountTags, EventDataEq, }; -use vrl::path::{OwnedTargetPath, OwnedValuePath}; +use vrl::path::OwnedTargetPath; use super::{ estimated_json_encoded_size_of::EstimatedJsonEncodedSizeOf, @@ -160,7 +160,7 @@ impl LogEvent { /// valid for `LogNamespace::Legacy` pub fn from_str_legacy(msg: impl Into) -> Self { let mut log = LogEvent::default(); - log.maybe_insert(PathPrefix::Event, log_schema().message_key(), msg.into()); + log.maybe_insert(log_schema().message_key_target_path(), msg.into()); if let Some(timestamp_key) = log_schema().timestamp_key() { log.insert((PathPrefix::Event, timestamp_key), Utc::now()); @@ -356,14 +356,9 @@ impl LogEvent { } } - pub fn maybe_insert( - &mut self, - prefix: PathPrefix, - path: Option<&OwnedValuePath>, - value: impl Into, - ) { + pub fn maybe_insert<'a>(&mut self, path: Option>, value: impl Into) { if let Some(path) = path { - self.insert((prefix, path), value); + self.insert(path, value); } } @@ -572,7 +567,7 @@ mod test_utils { impl From for LogEvent { fn from(message: Bytes) -> Self { let mut log = LogEvent::default(); - log.maybe_insert(PathPrefix::Event, log_schema().message_key(), message); + log.maybe_insert(log_schema().message_key_target_path(), message); if let Some(timestamp_key) = log_schema().timestamp_key() { log.insert((PathPrefix::Event, timestamp_key), Utc::now()); } diff --git a/lib/vector-core/src/event/vrl_target.rs b/lib/vector-core/src/event/vrl_target.rs index bbd2f6d1c9754..a3d039778b60c 100644 --- a/lib/vector-core/src/event/vrl_target.rs +++ b/lib/vector-core/src/event/vrl_target.rs @@ -54,7 +54,7 @@ pub struct TargetIter { fn create_log_event(value: Value, metadata: EventMetadata) -> LogEvent { let mut log = LogEvent::new_with_metadata(metadata); - log.maybe_insert(PathPrefix::Event, log_schema().message_key(), value); + log.maybe_insert(log_schema().message_key_target_path(), value); log } diff --git a/src/sinks/humio/logs.rs b/src/sinks/humio/logs.rs index fc76533a5830d..a1d48088deddc 100644 --- a/src/sinks/humio/logs.rs +++ b/src/sinks/humio/logs.rs @@ -269,10 +269,7 @@ mod integration_tests { ); let ts = Utc.timestamp_nanos(Utc::now().timestamp_millis() * 1_000_000 + 132_456); - event.insert( - (PathPrefix::Event, log_schema().timestamp_key().unwrap()), - ts, - ); + event.insert(log_schema().timestamp_key_target_path().unwrap(), ts); run_and_assert_sink_compliance(sink, stream::once(ready(event)), &HTTP_SINK_TAGS).await; diff --git a/src/sinks/loki/integration_tests.rs b/src/sinks/loki/integration_tests.rs index 8f4232de6fdcf..7f27cc5c19fb7 100644 --- a/src/sinks/loki/integration_tests.rs +++ b/src/sinks/loki/integration_tests.rs @@ -10,7 +10,6 @@ use vector_core::{ config::LogNamespace, event::{BatchNotifier, BatchStatus, Event, LogEvent}, }; -use vrl::path::PathPrefix; use vrl::value::{kind::Collection, Kind}; use super::config::{LokiConfig, OutOfOrderAction}; @@ -328,7 +327,7 @@ async fn many_streams() { let index = (i % 5) * 2; let message = lines[index] .as_log() - .get((PathPrefix::Event, log_schema().message_key().unwrap())) + .get(log_schema().message_key_target_path().unwrap()) .unwrap() .to_string_lossy(); assert_eq!(output, &message); @@ -338,7 +337,7 @@ async fn many_streams() { let index = ((i % 5) * 2) + 1; let message = lines[index] .as_log() - .get((PathPrefix::Event, log_schema().message_key().unwrap())) + .get(log_schema().message_key_target_path().unwrap()) .unwrap() .to_string_lossy(); assert_eq!(output, &message); @@ -385,7 +384,7 @@ async fn interpolate_stream_key() { for (i, output) in outputs.iter().enumerate() { let message = lines[i] .as_log() - .get((PathPrefix::Event, log_schema().message_key().unwrap())) + .get(log_schema().message_key_target_path().unwrap()) .unwrap() .to_string_lossy(); assert_eq!(output, &message); @@ -638,7 +637,7 @@ async fn test_out_of_order_events( assert_eq!( &expected[i] .as_log() - .get((PathPrefix::Event, log_schema().message_key().unwrap())) + .get(log_schema().message_key_target_path().unwrap()) .unwrap() .to_string_lossy(), output, diff --git a/src/sources/aws_sqs/integration_tests.rs b/src/sources/aws_sqs/integration_tests.rs index be2cc6d67e938..567ba62cfc383 100644 --- a/src/sources/aws_sqs/integration_tests.rs +++ b/src/sources/aws_sqs/integration_tests.rs @@ -7,7 +7,6 @@ use aws_sdk_sqs::output::CreateQueueOutput; use aws_types::region::Region; use futures::StreamExt; use tokio::time::timeout; -use vrl::path::PathPrefix; use crate::{ aws::{auth::AwsAuthentication, region::RegionOrEndpoint}, @@ -110,7 +109,7 @@ pub(crate) async fn test() { for event in events { let message = event .as_log() - .get((PathPrefix::Event, log_schema().message_key().unwrap())) + .get(log_schema().message_key_target_path().unwrap()) .unwrap() .to_string_lossy(); if !expected_messages.remove(message.as_ref()) { diff --git a/src/sources/aws_sqs/source.rs b/src/sources/aws_sqs/source.rs index 2f8f9634317b1..2e290a85a65ae 100644 --- a/src/sources/aws_sqs/source.rs +++ b/src/sources/aws_sqs/source.rs @@ -219,14 +219,12 @@ async fn delete_messages(client: SqsClient, receipts: Vec, queue_url: St #[cfg(test)] mod tests { - use crate::codecs::DecodingConfig; - use chrono::SecondsFormat; - use lookup::path; - use vrl::path::PathPrefix; - use super::*; + use crate::codecs::DecodingConfig; use crate::config::{log_schema, SourceConfig}; use crate::sources::aws_sqs::AwsSqsConfig; + use chrono::SecondsFormat; + use lookup::path; #[tokio::test] async fn test_decode_vector_namespace() { @@ -313,7 +311,7 @@ mod tests { events[0] .clone() .as_log() - .get((PathPrefix::Event, log_schema().message_key().unwrap())) + .get(log_schema().message_key_target_path().unwrap()) .unwrap() .to_string_lossy(), message diff --git a/src/sources/docker_logs/tests.rs b/src/sources/docker_logs/tests.rs index 11738dca029e9..7a7b928444fa1 100644 --- a/src/sources/docker_logs/tests.rs +++ b/src/sources/docker_logs/tests.rs @@ -970,7 +970,7 @@ mod integration_tests { event .into_log() - .remove((PathPrefix::Event, log_schema().message_key().unwrap())) + .remove(log_schema().message_key_target_path().unwrap()) .unwrap() .to_string_lossy() .into_owned() diff --git a/src/sources/fluent/mod.rs b/src/sources/fluent/mod.rs index 1980fd514ae1f..f6ae0955d15eb 100644 --- a/src/sources/fluent/mod.rs +++ b/src/sources/fluent/mod.rs @@ -9,7 +9,7 @@ use chrono::Utc; use codecs::{BytesDeserializerConfig, StreamDecodingError}; use flate2::read::MultiGzDecoder; use lookup::lookup_v2::parse_value_path; -use lookup::{metadata_path, owned_value_path, path, OwnedValuePath, PathPrefix}; +use lookup::{metadata_path, owned_value_path, path, OwnedValuePath}; use rmp_serde::{decode, Deserializer, Serializer}; use serde::{Deserialize, Serialize}; use smallvec::{smallvec, SmallVec}; @@ -599,9 +599,7 @@ impl From> for LogEvent { log.insert(metadata_path!("vector", "ingest_timestamp"), Utc::now()); } LogNamespace::Legacy => { - if let Some(timestamp_key) = log_schema().timestamp_key() { - log.insert((PathPrefix::Event, timestamp_key), timestamp); - } + log.maybe_insert(log_schema().timestamp_key_target_path(), timestamp); } } diff --git a/src/sources/journald.rs b/src/sources/journald.rs index 0b876da9b7a28..bd31188d79a0f 100644 --- a/src/sources/journald.rs +++ b/src/sources/journald.rs @@ -12,7 +12,7 @@ use bytes::Bytes; use chrono::{TimeZone, Utc}; use codecs::{decoding::BoxedFramingError, CharacterDelimitedDecoder}; use futures::{poll, stream::BoxStream, task::Poll, StreamExt}; -use lookup::{metadata_path, owned_value_path, path, PathPrefix}; +use lookup::{metadata_path, owned_value_path, path}; use nix::{ sys::signal::{kill, Signal}, unistd::Pid, @@ -741,9 +741,7 @@ fn enrich_log_event(log: &mut LogEvent, log_namespace: LogNamespace) { } LogNamespace::Legacy => { if let Some(ts) = timestamp { - if let Some(timestamp_key) = log_schema().timestamp_key() { - log.insert((PathPrefix::Event, timestamp_key), ts); - } + log.maybe_insert(log_schema().timestamp_key_target_path(), ts); } } } @@ -784,7 +782,7 @@ fn create_log_event_from_record( let mut log = LogEvent::from_iter(record).with_batch_notifier_option(batch); if let Some(message) = log.remove(MESSAGE) { - log.maybe_insert(PathPrefix::Event, log_schema().message_key(), message); + log.maybe_insert(log_schema().message_key_target_path(), message); } log diff --git a/src/sources/splunk_hec/mod.rs b/src/sources/splunk_hec/mod.rs index 3c957be4084a7..b6489ef8b22d3 100644 --- a/src/sources/splunk_hec/mod.rs +++ b/src/sources/splunk_hec/mod.rs @@ -25,7 +25,6 @@ use vector_core::{ schema::meaning, EstimatedJsonEncodedSizeOf, }; -use vrl::path::PathPrefix; use vrl::value::{kind::Collection, Kind}; use warp::{filters::BoxedFilter, path, reject::Rejection, reply::Response, Filter, Reply}; @@ -812,7 +811,7 @@ impl<'de, R: JsonRead<'de>> EventIterator<'de, R> { if string.is_empty() { return Err(ApiError::EmptyEventField { event: self.events }.into()); } - log.maybe_insert(PathPrefix::Event, log_schema().message_key(), string); + log.maybe_insert(log_schema().message_key_target_path(), string); } JsonValue::Object(mut object) => { if object.is_empty() { @@ -827,11 +826,7 @@ impl<'de, R: JsonRead<'de>> EventIterator<'de, R> { log.insert("line", line); } _ => { - log.maybe_insert( - PathPrefix::Event, - log_schema().message_key(), - line, - ); + log.maybe_insert(log_schema().message_key_target_path(), line); } } } @@ -1005,7 +1000,7 @@ fn raw_event( LogNamespace::Vector => LogEvent::from(message), LogNamespace::Legacy => { let mut log = LogEvent::default(); - log.maybe_insert(PathPrefix::Event, log_schema().message_key(), message); + log.maybe_insert(log_schema().message_key_target_path(), message); log } }; diff --git a/src/sources/syslog.rs b/src/sources/syslog.rs index 0d57a77903020..0150da7f34466 100644 --- a/src/sources/syslog.rs +++ b/src/sources/syslog.rs @@ -10,7 +10,7 @@ use codecs::{ }; use futures::StreamExt; use listenfd::ListenFd; -use lookup::{lookup_v2::OptionalValuePath, path, OwnedValuePath, PathPrefix}; +use lookup::{lookup_v2::OptionalValuePath, path, OwnedValuePath}; use smallvec::SmallVec; use tokio_util::udp::UdpFramed; use vector_config::configurable_component; @@ -423,8 +423,8 @@ fn enrich_syslog_event( .get("timestamp") .and_then(|timestamp| timestamp.as_timestamp().cloned()) .unwrap_or_else(Utc::now); - if let Some(timestamp_key) = log_schema().timestamp_key() { - log.insert((PathPrefix::Event, timestamp_key), timestamp); + if let Some(timestamp_key) = log_schema().timestamp_key_target_path() { + log.insert(timestamp_key, timestamp); } } @@ -810,7 +810,7 @@ mod test { .expect("invalid timestamp"), ); expected.insert( - (PathPrefix::Event, log_schema().source_type_key().unwrap()), + log_schema().source_type_key_target_path().unwrap(), "syslog", ); expected.insert("host", "74794bfb6795"); @@ -868,7 +868,7 @@ mod test { ); expected.insert("hostname", "74794bfb6795"); expected.insert( - (PathPrefix::Event, log_schema().source_type_key().unwrap()), + log_schema().source_type_key_target_path().unwrap(), "syslog", ); expected.insert("severity", "notice"); @@ -1014,7 +1014,7 @@ mod test { "74794bfb6795", ); expected.insert( - (PathPrefix::Event, log_schema().source_type_key().unwrap()), + log_schema().source_type_key_target_path().unwrap(), "syslog", ); expected.insert("hostname", "74794bfb6795"); @@ -1062,7 +1062,7 @@ mod test { expected_date, ); expected.insert( - (PathPrefix::Event, log_schema().source_type_key().unwrap()), + log_schema().source_type_key_target_path().unwrap(), "syslog", ); expected.insert("host", "74794bfb6795"); @@ -1102,7 +1102,7 @@ mod test { .expect("invalid timestamp"), ); expected.insert( - (PathPrefix::Event, log_schema().source_type_key().unwrap()), + log_schema().source_type_key_target_path().unwrap(), "syslog", ); expected.insert("host", "74794bfb6795"); diff --git a/src/transforms/log_to_metric.rs b/src/transforms/log_to_metric.rs index cb99cb186de8b..29c90fa370af9 100644 --- a/src/transforms/log_to_metric.rs +++ b/src/transforms/log_to_metric.rs @@ -408,15 +408,6 @@ impl FunctionTransform for LogToMetric { #[cfg(test)] mod tests { - use chrono::{offset::TimeZone, DateTime, Timelike, Utc}; - use lookup::PathPrefix; - use std::sync::Arc; - use std::time::Duration; - use tokio::sync::mpsc; - use tokio_stream::wrappers::ReceiverStream; - use vector_common::config::ComponentKey; - use vector_core::metric_tags; - use super::*; use crate::test_util::components::assert_transform_compliance; use crate::transforms::test::create_topology; @@ -427,6 +418,13 @@ mod tests { Event, LogEvent, }, }; + use chrono::{offset::TimeZone, DateTime, Timelike, Utc}; + use std::sync::Arc; + use std::time::Duration; + use tokio::sync::mpsc; + use tokio_stream::wrappers::ReceiverStream; + use vector_common::config::ComponentKey; + use vector_core::metric_tags; #[test] fn generate_config() { @@ -451,10 +449,8 @@ mod tests { fn create_event(key: &str, value: impl Into + std::fmt::Debug) -> Event { let mut log = Event::Log(LogEvent::from("i am a log")); log.as_mut_log().insert(key, value); - log.as_mut_log().insert( - (PathPrefix::Event, log_schema().timestamp_key().unwrap()), - ts(), - ); + log.as_mut_log() + .insert(log_schema().timestamp_key_target_path().unwrap(), ts()); log } @@ -837,10 +833,9 @@ mod tests { ); let mut event = Event::Log(LogEvent::from("i am a log")); - event.as_mut_log().insert( - (PathPrefix::Event, log_schema().timestamp_key().unwrap()), - ts(), - ); + event + .as_mut_log() + .insert(log_schema().timestamp_key_target_path().unwrap(), ts()); event.as_mut_log().insert("status", "42"); event.as_mut_log().insert("backtrace", "message"); let mut metadata = event.metadata().clone(); @@ -893,10 +888,9 @@ mod tests { ); let mut event = Event::Log(LogEvent::from("i am a log")); - event.as_mut_log().insert( - (PathPrefix::Event, log_schema().timestamp_key().unwrap()), - ts(), - ); + event + .as_mut_log() + .insert(log_schema().timestamp_key_target_path().unwrap(), ts()); event.as_mut_log().insert("status", "42"); event.as_mut_log().insert("backtrace", "message"); event.as_mut_log().insert("host", "local"); diff --git a/src/transforms/metric_to_log.rs b/src/transforms/metric_to_log.rs index a075573bd1a41..ed6b6d8c8abcd 100644 --- a/src/transforms/metric_to_log.rs +++ b/src/transforms/metric_to_log.rs @@ -309,17 +309,13 @@ impl MetricToLog { }) .unwrap_or_else(|| event::Value::Timestamp(Utc::now())); - if let Some(timestamp_key) = log_schema().timestamp_key() { - log.insert((PathPrefix::Event, timestamp_key), timestamp); - } + log.maybe_insert(log_schema().timestamp_key_target_path(), timestamp); if let Some(host_tag) = &self.host_tag { if let Some(host_value) = log.remove_prune(host_tag.to_string().as_str(), true) { - if let Some(host_key) = log_schema().host_key() { - log.insert((PathPrefix::Event, host_key), host_value); - } + log.maybe_insert(log_schema().host_key_target_path(), host_value); } } }