Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: replace path tuples with actual target paths #18139

Merged
merged 1 commit into from
Aug 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 2 additions & 7 deletions lib/codecs/src/decoding/format/bytes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,8 @@ impl Deserializer for BytesDeserializer {

#[cfg(test)]
mod tests {
use vector_core::config::log_schema;
use vrl::value::Value;

use super::*;
use vrl::value::Value;
pront marked this conversation as resolved.
Show resolved Hide resolved

#[test]
fn deserialize_bytes_legacy_namespace() {
Expand All @@ -105,10 +103,7 @@ mod tests {
{
let event = events.next().unwrap();
let log = event.as_log();
assert_eq!(
log[log_schema().message_key().unwrap().to_string()],
"foo".into()
);
assert_eq!(*log.get_message().unwrap(), "foo".into());
}

assert_eq!(events.next(), None);
Expand Down
24 changes: 12 additions & 12 deletions lib/vector-core/src/event/log_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,8 @@ impl LogEvent {
let mut log = LogEvent::default();
log.maybe_insert(log_schema().message_key_target_path(), msg.into());

if let Some(timestamp_key) = log_schema().timestamp_key() {
log.insert((PathPrefix::Event, timestamp_key), Utc::now());
if let Some(timestamp_key) = log_schema().timestamp_key_target_path() {
log.insert(timestamp_key, Utc::now());
}

log
Expand Down Expand Up @@ -495,8 +495,8 @@ impl LogEvent {
match self.namespace() {
LogNamespace::Vector => self.get_by_meaning("message"),
LogNamespace::Legacy => log_schema()
.message_key()
.and_then(|key| self.get((PathPrefix::Event, key))),
.message_key_target_path()
.and_then(|key| self.get(key)),
}
}

Expand All @@ -506,8 +506,8 @@ impl LogEvent {
match self.namespace() {
LogNamespace::Vector => self.get_by_meaning("timestamp"),
LogNamespace::Legacy => log_schema()
.timestamp_key()
.and_then(|key| self.get((PathPrefix::Event, key))),
.timestamp_key_target_path()
.and_then(|key| self.get(key)),
}
}

Expand All @@ -525,8 +525,8 @@ impl LogEvent {
match self.namespace() {
LogNamespace::Vector => self.get_by_meaning("host"),
LogNamespace::Legacy => log_schema()
.host_key()
.and_then(|key| self.get((PathPrefix::Event, key))),
.host_key_target_path()
.and_then(|key| self.get(key)),
}
}

Expand All @@ -536,8 +536,8 @@ impl LogEvent {
match self.namespace() {
LogNamespace::Vector => self.get(metadata_path!("vector", "source_type")),
LogNamespace::Legacy => log_schema()
.source_type_key()
.and_then(|key| self.get((PathPrefix::Event, key))),
.source_type_key_target_path()
.and_then(|key| self.get(key)),
}
}
}
Expand Down Expand Up @@ -568,8 +568,8 @@ mod test_utils {
fn from(message: Bytes) -> Self {
let mut log = LogEvent::default();
log.maybe_insert(log_schema().message_key_target_path(), message);
if let Some(timestamp_key) = log_schema().timestamp_key() {
log.insert((PathPrefix::Event, timestamp_key), Utc::now());
if let Some(timestamp_key) = log_schema().timestamp_key_target_path() {
log.insert(timestamp_key, Utc::now());
}
log
}
Expand Down
10 changes: 2 additions & 8 deletions src/sinks/clickhouse/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,10 +178,7 @@ async fn insert_events_unix_timestamps() {
format!(
"{}",
exp_event
.get((
lookup::PathPrefix::Event,
log_schema().timestamp_key().unwrap()
))
.get_timestamp()
.unwrap()
.as_timestamp()
.unwrap()
Expand Down Expand Up @@ -242,10 +239,7 @@ timestamp_format = "unix""#,
format!(
"{}",
exp_event
.get((
lookup::PathPrefix::Event,
log_schema().timestamp_key().unwrap()
))
.get_timestamp()
.unwrap()
.as_timestamp()
.unwrap()
Expand Down
24 changes: 14 additions & 10 deletions src/source_sender/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ mod errors;
use crate::config::{ComponentKey, OutputId};
use crate::schema::Definition;
pub use errors::{ClosedError, StreamSendError};
use lookup::PathPrefix;

pub(crate) const CHUNK_SIZE: usize = 1000;

Expand Down Expand Up @@ -356,18 +355,23 @@ impl Inner {
fn emit_lag_time(&self, event: EventRef<'_>, reference: i64) {
if let Some(lag_time_metric) = &self.lag_time {
let timestamp = match event {
EventRef::Log(log) => log_schema().timestamp_key().and_then(|timestamp_key| {
log.get((PathPrefix::Event, timestamp_key))
.and_then(get_timestamp_millis)
}),
EventRef::Log(log) => {
log_schema()
.timestamp_key_target_path()
.and_then(|timestamp_key| {
log.get(timestamp_key).and_then(get_timestamp_millis)
})
}
EventRef::Metric(metric) => metric
.timestamp()
.map(|timestamp| timestamp.timestamp_millis()),
EventRef::Trace(trace) => log_schema().timestamp_key().and_then(|timestamp_key| {
trace
.get((PathPrefix::Event, timestamp_key))
.and_then(get_timestamp_millis)
}),
EventRef::Trace(trace) => {
log_schema()
.timestamp_key_target_path()
.and_then(|timestamp_key| {
trace.get(timestamp_key).and_then(get_timestamp_millis)
})
}
};
if let Some(timestamp) = timestamp {
// This will truncate precision for values larger than 2**52, but at that point the user
Expand Down
10 changes: 2 additions & 8 deletions src/sources/amqp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -711,15 +711,9 @@ mod integration_test {

let log = events[0].as_log();
trace!("{:?}", log);
assert_eq!(
log[log_schema().message_key().unwrap().to_string()],
"my message".into()
);
assert_eq!(*log.get_message().unwrap(), "my message".into());
assert_eq!(log["routing"], routing_key.into());
assert_eq!(
log[log_schema().source_type_key().unwrap().to_string()],
"amqp".into()
);
assert_eq!(*log.get_source_type().unwrap(), "amqp".into());
let log_ts = log[log_schema().timestamp_key().unwrap().to_string()]
.as_timestamp()
.unwrap();
Expand Down
5 changes: 1 addition & 4 deletions src/sources/aws_sqs/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,10 +320,7 @@ mod tests {
events[0]
.clone()
.as_log()
.get((
lookup::PathPrefix::Event,
log_schema().timestamp_key().unwrap()
))
.get_timestamp()
.unwrap()
.to_string_lossy(),
now.to_rfc3339_opts(SecondsFormat::AutoSi, true)
Expand Down
42 changes: 9 additions & 33 deletions src/sources/datadog_agent/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use vrl::value::Kind;
use crate::schema::Definition;
use crate::{
common::datadog::{DatadogMetricType, DatadogPoint, DatadogSeriesMetric},
config::{log_schema, SourceConfig, SourceContext},
config::{SourceConfig, SourceContext},
event::{
into_event_stream,
metric::{MetricKind, MetricSketch, MetricValue},
Expand Down Expand Up @@ -238,10 +238,7 @@ async fn full_payload_v1() {
assert_eq!(log["ddsource"], "curl".into());
assert_eq!(log["ddtags"], "one,two,three".into());
assert!(event.metadata().datadog_api_key().is_none());
assert_eq!(
log[log_schema().source_type_key().unwrap().to_string()],
"datadog_agent".into()
);
assert_eq!(*log.get_source_type().unwrap(), "datadog_agent".into());
assert_eq!(
event.metadata().schema_definition(),
&test_logs_schema_definition()
Expand Down Expand Up @@ -303,10 +300,7 @@ async fn full_payload_v2() {
assert_eq!(log["ddsource"], "curl".into());
assert_eq!(log["ddtags"], "one,two,three".into());
assert!(event.metadata().datadog_api_key().is_none());
assert_eq!(
log[log_schema().source_type_key().unwrap().to_string()],
"datadog_agent".into()
);
assert_eq!(*log.get_source_type().unwrap(), "datadog_agent".into());
assert_eq!(
event.metadata().schema_definition(),
&test_logs_schema_definition()
Expand Down Expand Up @@ -368,10 +362,7 @@ async fn no_api_key() {
assert_eq!(log["ddsource"], "curl".into());
assert_eq!(log["ddtags"], "one,two,three".into());
assert!(event.metadata().datadog_api_key().is_none());
assert_eq!(
log[log_schema().source_type_key().unwrap().to_string()],
"datadog_agent".into()
);
assert_eq!(*log.get_source_type().unwrap(), "datadog_agent".into());
assert_eq!(
event.metadata().schema_definition(),
&test_logs_schema_definition()
Expand Down Expand Up @@ -432,10 +423,7 @@ async fn api_key_in_url() {
assert_eq!(log["service"], "vector".into());
assert_eq!(log["ddsource"], "curl".into());
assert_eq!(log["ddtags"], "one,two,three".into());
assert_eq!(
log[log_schema().source_type_key().unwrap().to_string()],
"datadog_agent".into()
);
assert_eq!(*log.get_source_type().unwrap(), "datadog_agent".into());
assert_eq!(
&event.metadata().datadog_api_key().as_ref().unwrap()[..],
"12345678abcdefgh12345678abcdefgh"
Expand Down Expand Up @@ -500,10 +488,7 @@ async fn api_key_in_query_params() {
assert_eq!(log["service"], "vector".into());
assert_eq!(log["ddsource"], "curl".into());
assert_eq!(log["ddtags"], "one,two,three".into());
assert_eq!(
log[log_schema().source_type_key().unwrap().to_string()],
"datadog_agent".into()
);
assert_eq!(*log.get_source_type().unwrap(), "datadog_agent".into());
assert_eq!(
&event.metadata().datadog_api_key().as_ref().unwrap()[..],
"12345678abcdefgh12345678abcdefgh"
Expand Down Expand Up @@ -574,10 +559,7 @@ async fn api_key_in_header() {
assert_eq!(log["service"], "vector".into());
assert_eq!(log["ddsource"], "curl".into());
assert_eq!(log["ddtags"], "one,two,three".into());
assert_eq!(
log[log_schema().source_type_key().unwrap().to_string()],
"datadog_agent".into()
);
assert_eq!(*log.get_source_type().unwrap(), "datadog_agent".into());
assert_eq!(
&event.metadata().datadog_api_key().as_ref().unwrap()[..],
"12345678abcdefgh12345678abcdefgh"
Expand Down Expand Up @@ -724,10 +706,7 @@ async fn ignores_api_key() {
assert_eq!(log["service"], "vector".into());
assert_eq!(log["ddsource"], "curl".into());
assert_eq!(log["ddtags"], "one,two,three".into());
assert_eq!(
log[log_schema().source_type_key().unwrap().to_string()],
"datadog_agent".into()
);
assert_eq!(*log.get_source_type().unwrap(), "datadog_agent".into());
assert!(event.metadata().datadog_api_key().is_none());
assert_eq!(
event.metadata().schema_definition(),
Expand Down Expand Up @@ -1419,10 +1398,7 @@ async fn split_outputs() {
assert_eq!(log["service"], "vector".into());
assert_eq!(log["ddsource"], "curl".into());
assert_eq!(log["ddtags"], "one,two,three".into());
assert_eq!(
log[log_schema().source_type_key().unwrap().to_string()],
"datadog_agent".into()
);
assert_eq!(*log.get_source_type().unwrap(), "datadog_agent".into());
assert_eq!(
&event.metadata().datadog_api_key().as_ref().unwrap()[..],
"12345678abcdefgh12345678abcdefgh"
Expand Down
20 changes: 4 additions & 16 deletions src/sources/docker_logs/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -440,10 +440,7 @@ mod integration_tests {
.unwrap()
.assert_valid_for_event(&events[0]);
let log = events[0].as_log();
assert_eq!(
log[log_schema().message_key().unwrap().to_string()],
message.into()
);
assert_eq!(*log.get_message().unwrap(), message.into());
assert_eq!(log[CONTAINER], id.into());
assert!(log.get(CREATED_AT).is_some());
assert_eq!(log[IMAGE], "busybox".into());
Expand Down Expand Up @@ -640,10 +637,7 @@ mod integration_tests {
.unwrap()
.assert_valid_for_event(&events[0]);
let log = events[0].as_log();
assert_eq!(
log[log_schema().message_key().unwrap().to_string()],
message.into()
);
assert_eq!(*log.get_message().unwrap(), message.into());
assert_eq!(log[CONTAINER], id.into());
assert!(log.get(CREATED_AT).is_some());
assert_eq!(log[IMAGE], "busybox".into());
Expand Down Expand Up @@ -778,10 +772,7 @@ mod integration_tests {
.unwrap()
.assert_valid_for_event(&events[0]);
let log = events[0].as_log();
assert_eq!(
log[log_schema().message_key().unwrap().to_string()],
message.into()
);
assert_eq!(*log.get_message().unwrap(), message.into());
assert_eq!(log[CONTAINER], id.into());
assert!(log.get(CREATED_AT).is_some());
assert_eq!(log[IMAGE], "busybox".into());
Expand Down Expand Up @@ -830,10 +821,7 @@ mod integration_tests {
.unwrap()
.assert_valid_for_event(&events[0]);
let log = events[0].as_log();
assert_eq!(
log[log_schema().message_key().unwrap().to_string()],
message.into()
);
assert_eq!(*log.get_message().unwrap(), message.into());
})
.await;
}
Expand Down
Loading