
Commit

feat: replace tuples with &OwnedTargetPath wherever possible (vectordotdev#18097)

* feat: replace tuples with &OwnedTargetPath wherever possible

* one more replacement

* remove unused imports
pront authored Jul 27, 2023
1 parent 065eecb commit 28f5c23
Showing 19 changed files with 67 additions and 112 deletions.
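
The pattern applied across these files is the same everywhere: call sites that used to pair `lookup::PathPrefix::Event` with an `Option<&OwnedValuePath>` (or build a `(PathPrefix, path)` tuple inline) now pass the `&OwnedTargetPath` returned by the new `*_target_path()` accessors on the log schema. A condensed before/after sketch, taken from the call sites below; it assumes a `LogEvent` named `log` and the `vector-core`/`lookup` APIs used in this repository, so it is illustrative rather than standalone:

```rust
// Before: the event prefix and the value path were stitched together by hand.
log.insert(
    (
        lookup::PathPrefix::Event,
        log_schema().timestamp_key().unwrap(),
    ),
    Utc::now(),
);

// After: the schema accessor returns a full &OwnedTargetPath, so the prefix
// travels with the path and the tuple disappears at the call site.
log.insert(log_schema().timestamp_key_target_path().unwrap(), Utc::now());
```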
10 changes: 2 additions & 8 deletions benches/template.rs
@@ -13,10 +13,7 @@ fn bench_elasticsearch_index(c: &mut Criterion) {
     let index = Template::try_from("index-%Y.%m.%d").unwrap();
     let mut event = Event::Log(LogEvent::from("hello world"));
     event.as_mut_log().insert(
-        (
-            lookup::PathPrefix::Event,
-            log_schema().timestamp_key().unwrap(),
-        ),
+        log_schema().timestamp_key_target_path().unwrap(),
         Utc::now(),
     );
 
@@ -31,10 +28,7 @@ fn bench_elasticsearch_index(c: &mut Criterion) {
     let index = Template::try_from("index").unwrap();
     let mut event = Event::Log(LogEvent::from("hello world"));
     event.as_mut_log().insert(
-        (
-            lookup::PathPrefix::Event,
-            log_schema().timestamp_key().unwrap(),
-        ),
+        log_schema().timestamp_key_target_path().unwrap(),
         Utc::now(),
     );
 
3 changes: 1 addition & 2 deletions lib/codecs/src/decoding/format/bytes.rs
@@ -8,7 +8,6 @@ use vector_core::{
     event::{Event, LogEvent},
     schema,
 };
-use vrl::path::PathPrefix;
 use vrl::value::Kind;
 
 use super::Deserializer;
@@ -63,7 +62,7 @@ impl BytesDeserializer {
             LogNamespace::Vector => log_namespace.new_log_from_data(bytes),
             LogNamespace::Legacy => {
                 let mut log = LogEvent::default();
-                log.maybe_insert(PathPrefix::Event, log_schema().message_key(), bytes);
+                log.maybe_insert(log_schema().message_key_target_path(), bytes);
                 log
             }
         }
15 changes: 6 additions & 9 deletions lib/codecs/src/decoding/format/gelf.rs
@@ -1,7 +1,7 @@
 use bytes::Bytes;
 use chrono::{DateTime, NaiveDateTime, Utc};
 use derivative::Derivative;
-use lookup::{event_path, owned_value_path, PathPrefix};
+use lookup::{event_path, owned_value_path};
 use serde::{Deserialize, Serialize};
 use smallvec::{smallvec, SmallVec};
 use std::collections::HashMap;
@@ -130,20 +130,17 @@ impl GelfDeserializer {
             log.insert(FULL_MESSAGE, full_message.to_string());
         }
 
-        if let Some(timestamp_key) = log_schema().timestamp_key() {
+        if let Some(timestamp_key) = log_schema().timestamp_key_target_path() {
             if let Some(timestamp) = parsed.timestamp {
                 let naive = NaiveDateTime::from_timestamp_opt(
                     f64::trunc(timestamp) as i64,
                     f64::fract(timestamp) as u32,
                 )
                 .expect("invalid timestamp");
-                log.insert(
-                    (PathPrefix::Event, timestamp_key),
-                    DateTime::<Utc>::from_utc(naive, Utc),
-                );
+                log.insert(timestamp_key, DateTime::<Utc>::from_utc(naive, Utc));
             // per GELF spec- add timestamp if not provided
             } else {
-                log.insert((PathPrefix::Event, timestamp_key), Utc::now());
+                log.insert(timestamp_key, Utc::now());
             }
         }
 
@@ -293,7 +290,7 @@ mod tests {
             Some(&Value::Bytes(Bytes::from_static(b"example.org")))
         );
         assert_eq!(
-            log.get((PathPrefix::Event, log_schema().message_key().unwrap())),
+            log.get(log_schema().message_key_target_path().unwrap()),
             Some(&Value::Bytes(Bytes::from_static(
                 b"A short message that helps you identify what is going on"
             )))
@@ -348,7 +345,7 @@ mod tests {
         let events = deserialize_gelf_input(&input).unwrap();
         assert_eq!(events.len(), 1);
         let log = events[0].as_log();
-        assert!(log.contains((PathPrefix::Event, log_schema().message_key().unwrap())));
+        assert!(log.contains(log_schema().message_key_target_path().unwrap()));
     }
 
     // filter out id
9 changes: 4 additions & 5 deletions lib/codecs/src/decoding/format/json.rs
@@ -3,7 +3,6 @@ use std::convert::TryInto;
 use bytes::Bytes;
 use chrono::Utc;
 use derivative::Derivative;
-use lookup::PathPrefix;
 use smallvec::{smallvec, SmallVec};
 use vector_config::configurable_component;
 use vector_core::{
@@ -133,11 +132,11 @@ impl Deserializer for JsonDeserializer {
             LogNamespace::Legacy => {
                 let timestamp = Utc::now();
 
-                if let Some(timestamp_key) = log_schema().timestamp_key() {
+                if let Some(timestamp_key) = log_schema().timestamp_key_target_path() {
                     for event in &mut events {
                         let log = event.as_mut_log();
-                        if !log.contains((PathPrefix::Event, timestamp_key)) {
-                            log.insert((PathPrefix::Event, timestamp_key), timestamp);
+                        if !log.contains(timestamp_key) {
+                            log.insert(timestamp_key, timestamp);
                         }
                     }
                 }
@@ -218,7 +217,7 @@ mod tests {
         let log = event.as_log();
         assert_eq!(log["bar"], 456.into());
         assert_eq!(
-            log.get((PathPrefix::Event, log_schema().timestamp_key().unwrap()))
+            log.get(log_schema().timestamp_key_target_path().unwrap())
                 .is_some(),
             namespace == LogNamespace::Legacy
         );
8 changes: 3 additions & 5 deletions lib/codecs/src/decoding/format/syslog.rs
@@ -1,7 +1,7 @@
 use bytes::Bytes;
 use chrono::{DateTime, Datelike, Utc};
 use derivative::Derivative;
-use lookup::{event_path, owned_value_path, OwnedTargetPath, OwnedValuePath, PathPrefix};
+use lookup::{event_path, owned_value_path, OwnedTargetPath, OwnedValuePath};
 use smallvec::{smallvec, SmallVec};
 use std::borrow::Cow;
 use std::collections::BTreeMap;
@@ -428,7 +428,7 @@ fn insert_fields_from_syslog(
 ) {
     match log_namespace {
         LogNamespace::Legacy => {
-            log.maybe_insert(PathPrefix::Event, log_schema().message_key(), parsed.msg);
+            log.maybe_insert(log_schema().message_key_target_path(), parsed.msg);
         }
         LogNamespace::Vector => {
             log.insert(event_path!("message"), parsed.msg);
@@ -439,9 +439,7 @@ fn insert_fields_from_syslog(
         let timestamp = DateTime::<Utc>::from(timestamp);
         match log_namespace {
             LogNamespace::Legacy => {
-                if let Some(timestamp_key) = log_schema().timestamp_key() {
-                    log.insert((PathPrefix::Event, timestamp_key), timestamp);
-                }
+                log.maybe_insert(log_schema().timestamp_key_target_path(), timestamp);
             }
             LogNamespace::Vector => {
                 log.insert(event_path!("timestamp"), timestamp);
3 changes: 1 addition & 2 deletions lib/opentelemetry-proto/src/convert.rs
@@ -7,7 +7,6 @@ use vector_core::{
     config::{log_schema, LegacyKey, LogNamespace},
     event::{Event, LogEvent},
 };
-use vrl::path::PathPrefix;
 use vrl::value::Value;
 
 use super::proto::{
@@ -95,7 +94,7 @@ impl ResourceLog {
             LogNamespace::Legacy => {
                 let mut log = LogEvent::default();
                 if let Some(v) = self.log_record.body.and_then(|av| av.value) {
-                    log.maybe_insert(PathPrefix::Event, log_schema().message_key(), v);
+                    log.maybe_insert(log_schema().message_key_target_path(), v);
                 }
                 log
             }
15 changes: 5 additions & 10 deletions lib/vector-core/src/event/log_event.rs
@@ -20,7 +20,7 @@ use vector_common::{
     request_metadata::GetEventCountTags,
     EventDataEq,
 };
-use vrl::path::{OwnedTargetPath, OwnedValuePath};
+use vrl::path::OwnedTargetPath;
 
 use super::{
     estimated_json_encoded_size_of::EstimatedJsonEncodedSizeOf,
@@ -160,7 +160,7 @@ impl LogEvent {
     /// valid for `LogNamespace::Legacy`
     pub fn from_str_legacy(msg: impl Into<String>) -> Self {
         let mut log = LogEvent::default();
-        log.maybe_insert(PathPrefix::Event, log_schema().message_key(), msg.into());
+        log.maybe_insert(log_schema().message_key_target_path(), msg.into());
 
         if let Some(timestamp_key) = log_schema().timestamp_key() {
             log.insert((PathPrefix::Event, timestamp_key), Utc::now());
@@ -356,14 +356,9 @@ impl LogEvent {
         }
     }
 
-    pub fn maybe_insert(
-        &mut self,
-        prefix: PathPrefix,
-        path: Option<&OwnedValuePath>,
-        value: impl Into<Value>,
-    ) {
+    pub fn maybe_insert<'a>(&mut self, path: Option<impl TargetPath<'a>>, value: impl Into<Value>) {
         if let Some(path) = path {
-            self.insert((prefix, path), value);
+            self.insert(path, value);
         }
     }
 
@@ -572,7 +567,7 @@ mod test_utils {
     impl From<Bytes> for LogEvent {
         fn from(message: Bytes) -> Self {
            let mut log = LogEvent::default();
-            log.maybe_insert(PathPrefix::Event, log_schema().message_key(), message);
+            log.maybe_insert(log_schema().message_key_target_path(), message);
            if let Some(timestamp_key) = log_schema().timestamp_key() {
                log.insert((PathPrefix::Event, timestamp_key), Utc::now());
            }
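
For reference, the `maybe_insert` helper in lib/vector-core/src/event/log_event.rs above changes from taking a `PathPrefix` plus an `Option<&OwnedValuePath>` to taking a single `Option<impl TargetPath<'a>>`, so callers can forward the optional target path straight from the schema accessors. A minimal usage sketch, assuming a `LogEvent` named `log` in scope; it mirrors the call sites updated in this commit rather than standing alone:

```rust
// Old shape: prefix and optional value path passed separately.
// log.maybe_insert(PathPrefix::Event, log_schema().message_key(), bytes);

// New shape: one optional target path, typically from the schema accessors;
// the insert is simply skipped when the schema has no such key configured.
log.maybe_insert(log_schema().message_key_target_path(), bytes);
log.maybe_insert(log_schema().timestamp_key_target_path(), Utc::now());
```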
2 changes: 1 addition & 1 deletion lib/vector-core/src/event/vrl_target.rs
@@ -54,7 +54,7 @@ pub struct TargetIter<T> {
 
 fn create_log_event(value: Value, metadata: EventMetadata) -> LogEvent {
     let mut log = LogEvent::new_with_metadata(metadata);
-    log.maybe_insert(PathPrefix::Event, log_schema().message_key(), value);
+    log.maybe_insert(log_schema().message_key_target_path(), value);
     log
 }
 
5 changes: 1 addition & 4 deletions src/sinks/humio/logs.rs
@@ -269,10 +269,7 @@ mod integration_tests {
         );
 
         let ts = Utc.timestamp_nanos(Utc::now().timestamp_millis() * 1_000_000 + 132_456);
-        event.insert(
-            (PathPrefix::Event, log_schema().timestamp_key().unwrap()),
-            ts,
-        );
+        event.insert(log_schema().timestamp_key_target_path().unwrap(), ts);
 
         run_and_assert_sink_compliance(sink, stream::once(ready(event)), &HTTP_SINK_TAGS).await;
 
9 changes: 4 additions & 5 deletions src/sinks/loki/integration_tests.rs
@@ -10,7 +10,6 @@ use vector_core::{
     config::LogNamespace,
     event::{BatchNotifier, BatchStatus, Event, LogEvent},
 };
-use vrl::path::PathPrefix;
 use vrl::value::{kind::Collection, Kind};
 
 use super::config::{LokiConfig, OutOfOrderAction};
@@ -328,7 +327,7 @@ async fn many_streams() {
         let index = (i % 5) * 2;
         let message = lines[index]
             .as_log()
-            .get((PathPrefix::Event, log_schema().message_key().unwrap()))
+            .get(log_schema().message_key_target_path().unwrap())
             .unwrap()
             .to_string_lossy();
         assert_eq!(output, &message);
@@ -338,7 +337,7 @@ async fn many_streams() {
         let index = ((i % 5) * 2) + 1;
         let message = lines[index]
            .as_log()
-            .get((PathPrefix::Event, log_schema().message_key().unwrap()))
+            .get(log_schema().message_key_target_path().unwrap())
            .unwrap()
            .to_string_lossy();
        assert_eq!(output, &message);
@@ -385,7 +384,7 @@ async fn interpolate_stream_key() {
    for (i, output) in outputs.iter().enumerate() {
        let message = lines[i]
            .as_log()
-            .get((PathPrefix::Event, log_schema().message_key().unwrap()))
+            .get(log_schema().message_key_target_path().unwrap())
            .unwrap()
            .to_string_lossy();
        assert_eq!(output, &message);
@@ -638,7 +637,7 @@ async fn test_out_of_order_events(
        assert_eq!(
            &expected[i]
                .as_log()
-                .get((PathPrefix::Event, log_schema().message_key().unwrap()))
+                .get(log_schema().message_key_target_path().unwrap())
                .unwrap()
                .to_string_lossy(),
            output,
3 changes: 1 addition & 2 deletions src/sources/aws_sqs/integration_tests.rs
@@ -7,7 +7,6 @@ use aws_sdk_sqs::output::CreateQueueOutput;
 use aws_types::region::Region;
 use futures::StreamExt;
 use tokio::time::timeout;
-use vrl::path::PathPrefix;
 
 use crate::{
     aws::{auth::AwsAuthentication, region::RegionOrEndpoint},
@@ -110,7 +109,7 @@ pub(crate) async fn test() {
     for event in events {
         let message = event
             .as_log()
-            .get((PathPrefix::Event, log_schema().message_key().unwrap()))
+            .get(log_schema().message_key_target_path().unwrap())
             .unwrap()
             .to_string_lossy();
         if !expected_messages.remove(message.as_ref()) {
10 changes: 4 additions & 6 deletions src/sources/aws_sqs/source.rs
@@ -219,14 +219,12 @@ async fn delete_messages(client: SqsClient, receipts: Vec<String>, queue_url: St
 
 #[cfg(test)]
 mod tests {
-    use crate::codecs::DecodingConfig;
-    use chrono::SecondsFormat;
-    use lookup::path;
-    use vrl::path::PathPrefix;
-
     use super::*;
+    use crate::codecs::DecodingConfig;
     use crate::config::{log_schema, SourceConfig};
     use crate::sources::aws_sqs::AwsSqsConfig;
+    use chrono::SecondsFormat;
+    use lookup::path;
 
     #[tokio::test]
     async fn test_decode_vector_namespace() {
@@ -313,7 +311,7 @@ mod tests {
             events[0]
                 .clone()
                 .as_log()
-                .get((PathPrefix::Event, log_schema().message_key().unwrap()))
+                .get(log_schema().message_key_target_path().unwrap())
                 .unwrap()
                 .to_string_lossy(),
             message
2 changes: 1 addition & 1 deletion src/sources/docker_logs/tests.rs
@@ -970,7 +970,7 @@ mod integration_tests {
 
         event
             .into_log()
-            .remove((PathPrefix::Event, log_schema().message_key().unwrap()))
+            .remove(log_schema().message_key_target_path().unwrap())
             .unwrap()
             .to_string_lossy()
             .into_owned()
6 changes: 2 additions & 4 deletions src/sources/fluent/mod.rs
@@ -9,7 +9,7 @@ use chrono::Utc;
 use codecs::{BytesDeserializerConfig, StreamDecodingError};
 use flate2::read::MultiGzDecoder;
 use lookup::lookup_v2::parse_value_path;
-use lookup::{metadata_path, owned_value_path, path, OwnedValuePath, PathPrefix};
+use lookup::{metadata_path, owned_value_path, path, OwnedValuePath};
 use rmp_serde::{decode, Deserializer, Serializer};
 use serde::{Deserialize, Serialize};
 use smallvec::{smallvec, SmallVec};
@@ -599,9 +599,7 @@ impl From<FluentEvent<'_>> for LogEvent {
                 log.insert(metadata_path!("vector", "ingest_timestamp"), Utc::now());
             }
             LogNamespace::Legacy => {
-                if let Some(timestamp_key) = log_schema().timestamp_key() {
-                    log.insert((PathPrefix::Event, timestamp_key), timestamp);
-                }
+                log.maybe_insert(log_schema().timestamp_key_target_path(), timestamp);
             }
         }
 
8 changes: 3 additions & 5 deletions src/sources/journald.rs
@@ -12,7 +12,7 @@ use bytes::Bytes;
 use chrono::{TimeZone, Utc};
 use codecs::{decoding::BoxedFramingError, CharacterDelimitedDecoder};
 use futures::{poll, stream::BoxStream, task::Poll, StreamExt};
-use lookup::{metadata_path, owned_value_path, path, PathPrefix};
+use lookup::{metadata_path, owned_value_path, path};
 use nix::{
     sys::signal::{kill, Signal},
     unistd::Pid,
@@ -741,9 +741,7 @@ fn enrich_log_event(log: &mut LogEvent, log_namespace: LogNamespace) {
         }
         LogNamespace::Legacy => {
             if let Some(ts) = timestamp {
-                if let Some(timestamp_key) = log_schema().timestamp_key() {
-                    log.insert((PathPrefix::Event, timestamp_key), ts);
-                }
+                log.maybe_insert(log_schema().timestamp_key_target_path(), ts);
             }
         }
     }
@@ -784,7 +782,7 @@ fn create_log_event_from_record(
     let mut log = LogEvent::from_iter(record).with_batch_notifier_option(batch);
 
     if let Some(message) = log.remove(MESSAGE) {
-        log.maybe_insert(PathPrefix::Event, log_schema().message_key(), message);
+        log.maybe_insert(log_schema().message_key_target_path(), message);
     }
 
     log