Skip to content

Commit

Permalink
feat: disable vrl 'string_path' feature
Browse files Browse the repository at this point in the history
  • Loading branch information
pront committed Aug 11, 2023
1 parent 79d3a3f commit 243f6a9
Show file tree
Hide file tree
Showing 17 changed files with 98 additions and 78 deletions.
4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ members = [
]

[workspace.dependencies]
vrl = { version = "0.6.0", features = ["cli", "test", "test_framework", "arbitrary"] }
vrl = { version = "0.6.0", default-features = false, features = ["cli", "test", "test_framework", "arbitrary", "compiler", "value", "diagnostic", "path", "parser", "stdlib", "datadog", "core"] }

[dependencies]
vrl.workspace = true
Expand Down Expand Up @@ -368,6 +368,8 @@ tokio-test = "0.4.2"
tokio = { version = "1.30.0", features = ["test-util"] }
tower-test = "0.4.0"
vector-core = { path = "lib/vector-core", default-features = false, features = ["vrl", "test"] }
vrl = { version = "0.6.0", features = ["cli", "test", "test_framework", "arbitrary"] }

wiremock = "0.5.19"
zstd = { version = "0.12.4", default-features = false }

Expand Down
2 changes: 1 addition & 1 deletion lib/codecs/src/decoding/format/gelf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ mod tests {
let events = deserialize_gelf_input(&input).unwrap();
assert_eq!(events.len(), 1);
let log = events[0].as_log();
assert!(!log.contains("_id"));
assert!(!log.contains(event_path!("_id")));
}
}

Expand Down
5 changes: 4 additions & 1 deletion lib/codecs/src/encoding/format/gelf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,10 @@ fn to_gelf_event(log: LogEvent) -> vector_common::Result<LogEvent> {
coerce_field_names_and_values(log).map(|(mut log, missing_prefix)| {
// rename additional fields that were flagged as missing the underscore prefix
for field in missing_prefix {
log.rename_key(event_path!(field.as_str()), format!("_{}", &field).as_str());
log.rename_key(
event_path!(field.as_str()),
event_path!(format!("_{}", &field).as_str()),
);
}
log
})
Expand Down
7 changes: 4 additions & 3 deletions src/api/schema/events/log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::borrow::Cow;
use async_graphql::Object;
use chrono::{DateTime, Utc};
use vector_common::encode_logfmt;
use vrl::event_path;

use super::EventEncodingType;
use crate::{event, topology::TapOutput};
Expand All @@ -19,11 +20,11 @@ impl Log {
}

pub fn get_message(&self) -> Option<Cow<'_, str>> {
Some(self.event.get("message")?.to_string_lossy())
Some(self.event.get(event_path!("message"))?.to_string_lossy())
}

pub fn get_timestamp(&self) -> Option<&DateTime<Utc>> {
self.event.get("timestamp")?.as_timestamp()
self.event.get(event_path!("timestamp"))?.as_timestamp()
}
}

Expand Down Expand Up @@ -69,7 +70,7 @@ impl Log {

/// Get JSON field data on the log event, by field name
async fn json(&self, field: String) -> Option<String> {
self.event.get(field.as_str()).map(|field| {
self.event.get(event_path!(field.as_str())).map(|field| {
serde_json::to_string(field)
.expect("JSON serialization of trace event field failed. Please report.")
})
Expand Down
14 changes: 7 additions & 7 deletions src/sinks/datadog/events/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,34 +50,34 @@ where
async fn ensure_required_fields(event: Event) -> Option<Event> {
let mut log = event.into_log();

if !log.contains("title") {
if !log.contains(event_path!("title")) {
emit!(ParserMissingFieldError::<DROP_EVENT> { field: "title" });
return None;
}

if !log.contains("text") {
if !log.contains(event_path!("text")) {
let message_path = log
.message_path()
.expect("message is required (make sure the \"message\" semantic meaning is set)")
.clone();
log.rename_key(&message_path, event_path!("text"));
}

if !log.contains("host") {
if !log.contains(event_path!("host")) {
if let Some(host_path) = log.host_path().cloned().as_ref() {
log.rename_key(host_path, event_path!("host"));
}
}

if !log.contains("date_happened") {
if !log.contains(event_path!("date_happened")) {
if let Some(timestamp_path) = log.timestamp_path().cloned().as_ref() {
log.rename_key(timestamp_path, "date_happened");
log.rename_key(timestamp_path, event_path!("date_happened"));
}
}

if !log.contains("source_type_name") {
if !log.contains(event_path!("source_type_name")) {
if let Some(source_type_path) = log.source_type_path().cloned().as_ref() {
log.rename_key(source_type_path, "source_type_name");
log.rename_key(source_type_path, event_path!("source_type_name"));
}
}

Expand Down
7 changes: 4 additions & 3 deletions src/sinks/mezmo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use http::{Request, StatusCode, Uri};
use serde_json::json;
use vector_common::sensitive_string::SensitiveString;
use vector_config::configurable_component;
use vrl::event_path;
use vrl::value::{Kind, Value};

use crate::{
Expand Down Expand Up @@ -269,15 +270,15 @@ impl HttpEventEncoder<PartitionInnerBuffer<serde_json::Value, PartitionKey>> for
map.insert("line".to_string(), json!(line));
map.insert("timestamp".to_string(), json!(timestamp));

if let Some(env) = log.remove("env") {
if let Some(env) = log.remove(event_path!("env")) {
map.insert("env".to_string(), json!(env));
}

if let Some(app) = log.remove("app") {
if let Some(app) = log.remove(event_path!("app")) {
map.insert("app".to_string(), json!(app));
}

if let Some(file) = log.remove("file") {
if let Some(file) = log.remove(event_path!("file")) {
map.insert("file".to_string(), json!(file));
}

Expand Down
5 changes: 3 additions & 2 deletions src/sinks/sematext/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use futures::stream::{BoxStream, StreamExt};
use indoc::indoc;
use vector_common::sensitive_string::SensitiveString;
use vector_config::configurable_component;
use vrl::event_path;

use super::Region;
use crate::{
Expand Down Expand Up @@ -144,11 +145,11 @@ fn map_timestamp(mut events: EventArray) -> EventArray {
EventArray::Logs(logs) => {
for log in logs {
if let Some(path) = log.timestamp_path().cloned().as_ref() {
log.rename_key(path, "@timestamp");
log.rename_key(path, event_path!("@timestamp"));
}

if let Some(path) = log.host_path().cloned().as_ref() {
log.rename_key(path, "os.host");
log.rename_key(path, event_path!("os.host"));
}
}
}
Expand Down
15 changes: 6 additions & 9 deletions src/sources/amqp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use codecs::decoding::{DeserializerConfig, FramingConfig};
use futures::{FutureExt, StreamExt};
use futures_util::Stream;
use lapin::{acker::Acker, message::Delivery, Channel};
use lookup::{lookup_v2::OptionalValuePath, metadata_path, owned_value_path, path, PathPrefix};
use lookup::{lookup_v2::OptionalValuePath, metadata_path, owned_value_path, path};
use snafu::Snafu;
use std::{io::Cursor, pin::Pin};
use tokio_util::codec::FramedRead;
Expand Down Expand Up @@ -253,7 +253,7 @@ fn populate_event(
.path
.as_ref()
.map(LegacyKey::InsertIfEmpty),
"routing",
path!("routing"),
keys.routing.to_string(),
);

Expand All @@ -264,15 +264,15 @@ fn populate_event(
.path
.as_ref()
.map(LegacyKey::InsertIfEmpty),
"exchange",
path!("exchange"),
keys.exchange.to_string(),
);

log_namespace.insert_source_metadata(
AmqpSourceConfig::NAME,
log,
keys.offset_key.path.as_ref().map(LegacyKey::InsertIfEmpty),
"offset",
path!("offset"),
keys.delivery_tag,
);

Expand All @@ -298,11 +298,8 @@ fn populate_event(
log.insert(metadata_path!("vector", "ingest_timestamp"), Utc::now());
}
LogNamespace::Legacy => {
if let Some(timestamp_key) = log_schema().timestamp_key() {
log.try_insert(
(PathPrefix::Event, timestamp_key),
timestamp.unwrap_or_else(Utc::now),
);
if let Some(timestamp_key) = log_schema().timestamp_key_target_path() {
log.try_insert(timestamp_key, timestamp.unwrap_or_else(Utc::now));
}
}
};
Expand Down
27 changes: 14 additions & 13 deletions src/sources/docker_logs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use vector_common::internal_event::{
};
use vector_config::configurable_component;
use vector_core::config::{LegacyKey, LogNamespace};
use vrl::event_path;
use vrl::value::{kind::Collection, Kind};

use super::util::MultilineConfig;
Expand Down Expand Up @@ -1270,21 +1271,24 @@ fn line_agg_adapter(
) -> impl Stream<Item = LogEvent> {
let line_agg_in = inner.map(move |mut log| {
let message_value = match log_namespace {
LogNamespace::Vector => log.remove(".").expect("`.` must exist in the event"),
LogNamespace::Vector => log
.remove(event_path!())
.expect("`.` must exist in the event"),
LogNamespace::Legacy => log
.remove((
PathPrefix::Event,
.remove(
log_schema()
.message_key()
.message_key_target_path()
.expect("global log_schema.message_key to be valid path"),
))
)
.expect("`message` must exist in the event"),
};
let stream_value = match log_namespace {
LogNamespace::Vector => log
.get(metadata_path!(DockerLogsConfig::NAME, STREAM))
.expect("`docker_logs.stream` must exist in the metadata"),
LogNamespace::Legacy => log.get(STREAM).expect("stream must exist in the event"),
LogNamespace::Legacy => log
.get(event_path!(STREAM))
.expect("stream must exist in the event"),
};

let stream = stream_value.coerce_to_bytes();
Expand All @@ -1294,14 +1298,11 @@ fn line_agg_adapter(
let line_agg_out = LineAgg::<_, Bytes, LogEvent>::new(line_agg_in, logic);
line_agg_out.map(move |(_, message, mut log)| {
match log_namespace {
LogNamespace::Vector => log.insert(".", message),
LogNamespace::Vector => log.insert(event_path!(), message),
LogNamespace::Legacy => log.insert(
(
PathPrefix::Event,
log_schema()
.message_key()
.expect("global log_schema.message_key to be valid path"),
),
log_schema()
.message_key_target_path()
.expect("global log_schema.message_key to be valid path"),
message,
),
};
Expand Down
9 changes: 5 additions & 4 deletions src/sources/gcp_pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use vector_common::internal_event::{
use vector_common::{byte_size_of::ByteSizeOf, finalizer::UnorderedFinalizer};
use vector_config::configurable_component;
use vector_core::config::{LegacyKey, LogNamespace};
use vrl::path;
use vrl::value::{kind::Collection, Kind};

use crate::{
Expand Down Expand Up @@ -687,15 +688,15 @@ impl PubsubSource {
log_namespace.insert_source_metadata(
PubsubConfig::NAME,
log,
Some(LegacyKey::Overwrite("message_id")),
"message_id",
Some(LegacyKey::Overwrite(path!("message_id"))),
path!("message_id"),
message.message_id.clone(),
);
log_namespace.insert_source_metadata(
PubsubConfig::NAME,
log,
Some(LegacyKey::Overwrite("attributes")),
"attributes",
Some(LegacyKey::Overwrite(path!("attributes"))),
path!("attributes"),
attributes.clone(),
)
}
Expand Down
2 changes: 1 addition & 1 deletion src/sources/nats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ async fn nats_source(
NatsSourceConfig::NAME,
log,
legacy_subject_key_field,
"subject",
&owned_value_path!("subject"),
msg.subject.as_str(),
)
}
Expand Down
10 changes: 5 additions & 5 deletions src/sources/splunk_hec/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -676,7 +676,7 @@ impl<'de, R: JsonRead<'de>> EventIterator<'de, R> {
self.log_namespace.insert_vector_metadata(
&mut log,
log_schema().source_type_key(),
lookup::path!("source_type"),
&owned_value_path!("source_type"),
SplunkConfig::NAME,
);

Expand All @@ -685,15 +685,15 @@ impl<'de, R: JsonRead<'de>> EventIterator<'de, R> {
self.log_namespace.insert_source_metadata(
SplunkConfig::NAME,
&mut log,
Some(LegacyKey::Overwrite(CHANNEL)),
Some(LegacyKey::Overwrite(&owned_value_path!(CHANNEL))),
CHANNEL,
guid,
);
} else if let Some(guid) = self.channel.as_ref() {
self.log_namespace.insert_source_metadata(
SplunkConfig::NAME,
&mut log,
Some(LegacyKey::Overwrite(CHANNEL)),
Some(LegacyKey::Overwrite(&owned_value_path!(CHANNEL))),
CHANNEL,
guid.clone(),
);
Expand All @@ -705,7 +705,7 @@ impl<'de, R: JsonRead<'de>> EventIterator<'de, R> {
self.log_namespace.insert_source_metadata(
SplunkConfig::NAME,
&mut log,
Some(LegacyKey::Overwrite(key.as_str())),
Some(LegacyKey::Overwrite(&owned_value_path!(key.as_str()))),
key.as_str(),
value,
);
Expand Down Expand Up @@ -1009,7 +1009,7 @@ fn raw_event(
log_namespace.insert_source_metadata(
SplunkConfig::NAME,
&mut log,
Some(LegacyKey::Overwrite(CHANNEL)),
Some(LegacyKey::Overwrite(&owned_value_path!(CHANNEL))),
CHANNEL,
channel,
);
Expand Down
7 changes: 4 additions & 3 deletions src/sources/syslog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use smallvec::SmallVec;
use tokio_util::udp::UdpFramed;
use vector_config::configurable_component;
use vector_core::config::{LegacyKey, LogNamespace};
use vrl::event_path;

#[cfg(unix)]
use crate::sources::util::build_unix_stream_source;
Expand Down Expand Up @@ -394,14 +395,14 @@ fn enrich_syslog_event(
log_namespace.insert_source_metadata(
SyslogConfig::NAME,
log,
Some(LegacyKey::Overwrite("source_ip")),
Some(LegacyKey::Overwrite(path!("source_ip"))),
path!("source_ip"),
default_host.clone(),
);
}

let parsed_hostname = log
.get("hostname")
.get(event_path!("hostname"))
.map(|hostname| hostname.coerce_to_bytes());

if let Some(parsed_host) = parsed_hostname.or(default_host) {
Expand All @@ -420,7 +421,7 @@ fn enrich_syslog_event(

if log_namespace == LogNamespace::Legacy {
let timestamp = log
.get("timestamp")
.get(event_path!("timestamp"))
.and_then(|timestamp| timestamp.as_timestamp().cloned())
.unwrap_or_else(Utc::now);
log.maybe_insert(log_schema().timestamp_key_target_path(), timestamp);
Expand Down
Loading

0 comments on commit 243f6a9

Please sign in to comment.