Commit bffd3b7

more changes

pront committed Aug 17, 2023
1 parent ebcefef
Showing 24 changed files with 116 additions and 119 deletions.
7 changes: 7 additions & 0 deletions lib/vector-lookup/src/lookup_v2/mod.rs
@@ -38,6 +38,13 @@ impl<'a> ValuePath<'a> for &'a ConfigValuePath {
    }
}

+#[cfg(any(test, feature = "test"))]
+impl From<&str> for ConfigValuePath {
+    fn from(path: &str) -> Self {
+        ConfigValuePath::try_from(path.to_string()).unwrap()
+    }
+}
+
/// A wrapper around `OwnedTargetPath` that allows it to be used in Vector config
/// with prefix default to `PathPrefix::Event`
#[configurable_component]
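In practice, tests can now build a `ConfigValuePath` from a string literal with `.into()`. A minimal sketch of the equivalence (assuming `parse_value_path` is re-exported from `lookup::lookup_v2` and that `ConfigValuePath` derives the usual `PartialEq`/`Debug` traits):

    #[cfg(test)]
    mod sketch {
        use lookup::lookup_v2::{parse_value_path, ConfigValuePath};

        #[test]
        fn from_str_matches_explicit_parse() {
            // Test-gated convenience conversion added above...
            let via_into: ConfigValuePath = "a.b[0]".into();
            // ...wrapping the explicit, fallible construction.
            let explicit = ConfigValuePath(parse_value_path("a.b[0]").unwrap());
            assert_eq!(via_into, explicit);
        }
    }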
15 changes: 3 additions & 12 deletions src/codecs/encoding/config.rs
@@ -172,10 +172,7 @@ mod test {
            transformer.only_fields(),
            &Some(vec![ConfigValuePath(parse_value_path("a.b[0]").unwrap())])
        );
-        assert_eq!(
-            transformer.except_fields(),
-            &Some(vec!["ignore_me".to_owned()])
-        );
+        assert_eq!(transformer.except_fields(), &Some(vec!["ignore_me".into()]));
        assert_eq!(transformer.timestamp_format(), &Some(TimestampFormat::Unix));
    }

@@ -207,10 +204,7 @@ mod test {
            transformer.only_fields(),
            &Some(vec![ConfigValuePath(parse_value_path("a.b[0]").unwrap())])
        );
-        assert_eq!(
-            transformer.except_fields(),
-            &Some(vec!["ignore_me".to_owned()])
-        );
+        assert_eq!(transformer.except_fields(), &Some(vec!["ignore_me".into()]));
        assert_eq!(transformer.timestamp_format(), &Some(TimestampFormat::Unix));
    }

@@ -239,10 +233,7 @@ mod test {
            transformer.only_fields(),
            &Some(vec![ConfigValuePath(parse_value_path("a.b[0]").unwrap())])
        );
-        assert_eq!(
-            transformer.except_fields(),
-            &Some(vec!["ignore_me".to_owned()])
-        );
+        assert_eq!(transformer.except_fields(), &Some(vec!["ignore_me".into()]));
        assert_eq!(transformer.timestamp_format(), &Some(TimestampFormat::Unix));
    }
}
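All three tests above deserialize the same shape of encoder config; as a TOML sketch (keys and values taken from the assertions; "unix" is the expected TOML spelling of `TimestampFormat::Unix`):

    only_fields = ["a.b[0]"]
    except_fields = ["ignore_me"]
    timestamp_format = "unix"

The change is that `except_fields` entries now deserialize into `ConfigValuePath` (hence `"ignore_me".into()`) instead of plain `String`s.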
26 changes: 13 additions & 13 deletions src/codecs/encoding/transformer.rs
@@ -51,10 +51,10 @@ impl<'de> Deserialize<'de> for Transformer {
        Self::new(
            inner
                .only_fields
-                .map(|v| v.iter().map(|p| ConfigValuePath { 0: p.clone() }).collect()),
+                .map(|v| v.iter().map(|p| ConfigValuePath(p.clone())).collect()),
            inner
                .except_fields
-                .map(|v| v.iter().map(|p| ConfigValuePath { 0: p.clone() }).collect()),
+                .map(|v| v.iter().map(|p| ConfigValuePath(p.clone())).collect()),
            inner.timestamp_format,
        )
        .map_err(serde::de::Error::custom)
@@ -156,18 +156,18 @@ impl Transformer {

    fn apply_except_fields(&self, log: &mut LogEvent) {
        if let Some(except_fields) = self.except_fields.as_ref() {
+            let service_path = log
+                .metadata()
+                .schema_definition()
+                .meaning_path(meaning::SERVICE);
+
            for field in except_fields {
-                let value = log.remove((PathPrefix::Event, field));
+                let value_path = &field.0;
+                let value = log.remove((PathPrefix::Event, value_path));

-                let service_path = log
-                    .metadata()
-                    .schema_definition()
-                    .meaning_path(meaning::SERVICE);
                // If we are removing the service field we need to store this in a `dropped_fields` list as we may need to
                // refer to this later when emitting metrics.
-                if let Some(v) = value {
-                    if matches!(service_path, Some(target_path) if target_path.path == field.0) {
+                if let (Some(v), Some(service_path)) = (value, service_path) {
+                    if service_path.path == *value_path {
                        log.metadata_mut()
                            .add_dropped_field(meaning::SERVICE.to_string(), v);
                    }
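For orientation, a sketch of the behavior this method implements, seen from the caller's side (call shape and assertions mirror the tests below; the `crate::...` import paths are assumed):

    use crate::codecs::Transformer;
    use crate::event::{Event, LogEvent};

    let transformer: Transformer =
        toml::from_str(r#"except_fields = ["password", "a.b"]"#).unwrap();

    let mut log = LogEvent::default();
    log.insert("password", "hunter2");
    log.insert("a.b", 1); // nested field `b` under `a`
    log.insert("message", "hello");

    // Listed paths are removed; if one of them is the schema's `service`
    // field, its value is also recorded as a dropped field for metrics.
    let mut event = Event::Log(log);
    transformer.transform(&mut event);

    assert!(!event.as_mut_log().contains("password"));
    assert!(!event.as_mut_log().contains("a.b"));
    assert!(event.as_mut_log().contains("message"));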
@@ -274,7 +274,7 @@ mod tests {
    #[test]
    fn deserialize_and_transform_except() {
        let transformer: Transformer =
-            toml::from_str(r#"except_fields = ["a.b.c", "b", "c[0].y", "d\\.z", "e"]"#).unwrap();
+            toml::from_str(r#"except_fields = ["a.b.c", "b", "c[0].y", "d.z", "e"]"#).unwrap();
        let mut log = LogEvent::default();
        {
            log.insert("a", 1);
@@ -285,7 +285,7 @@
log.insert("b[1].x", 1);
log.insert("c[0].x", 1);
log.insert("c[0].y", 1);
log.insert("d\\.z", 1);
log.insert("d.z", 1);
log.insert("e.a", 1);
log.insert("e.b", 1);
}
@@ -295,7 +295,7 @@
        assert!(!event.as_mut_log().contains("b"));
        assert!(!event.as_mut_log().contains("b[1].x"));
        assert!(!event.as_mut_log().contains("c[0].y"));
-        assert!(!event.as_mut_log().contains("d\\.z"));
+        assert!(!event.as_mut_log().contains("d.z"));
        assert!(!event.as_mut_log().contains("e.a"));

        assert!(event.as_mut_log().contains("a.b.d"));
5 changes: 3 additions & 2 deletions src/internal_events/kafka.rs
@@ -1,5 +1,6 @@
use metrics::{counter, gauge};
use vector_core::{internal_event::InternalEvent, update_counter};
+use vrl::path::OwnedTargetPath;

use vector_common::{
    internal_event::{error_stage, error_type},
@@ -161,7 +162,7 @@ impl InternalEvent for KafkaStatisticsReceived<'_> {
}

pub struct KafkaHeaderExtractionError<'a> {
-    pub header_field: &'a str,
+    pub header_field: &'a OwnedTargetPath,
}

impl InternalEvent for KafkaHeaderExtractionError<'_> {
@@ -171,7 +172,7 @@ impl InternalEvent for KafkaHeaderExtractionError<'_> {
            error_code = "extracting_header",
            error_type = error_type::PARSER_FAILED,
            stage = error_stage::RECEIVING,
-            header_field = self.header_field,
+            header_field = self.header_field.to_string(),
            internal_log_rate_limit = true,
        );
        counter!(
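A short sketch of why the explicit `.to_string()` is needed now that `header_field` is a typed path (display format assumed from `vrl::path`: event paths render with a leading `.`, metadata paths with `%`):

    use vrl::path::OwnedTargetPath;

    fn main() {
        // The tracing field needs a displayable value, so the typed path is
        // rendered back to its string form.
        let header_field = OwnedTargetPath::try_from("headers".to_string()).unwrap();
        println!("header_field = {header_field}"); // e.g. `.headers`
    }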
27 changes: 11 additions & 16 deletions src/sinks/elasticsearch/tests.rs
@@ -12,7 +12,6 @@ use crate::{
    },
    template::Template,
};
-use lookup::owned_value_path;

// helper to unwrap template strings for tests only
fn parse_template(input: &str) -> Template {
@@ -53,7 +52,7 @@ async fn sets_create_action_when_configured() {
        .request_builder
        .encoder
        .encode_input(
-            vec![process_log(log, &es.mode, &None, &config.encoding).unwrap()],
+            vec![process_log(log, &es.mode, None, &config.encoding).unwrap()],
            &mut encoded,
        )
        .unwrap();
@@ -129,7 +128,7 @@ async fn encode_datastream_mode() {
        .request_builder
        .encoder
        .encode_input(
-            vec![process_log(log, &es.mode, &None, &config.encoding).unwrap()],
+            vec![process_log(log, &es.mode, None, &config.encoding).unwrap()],
            &mut encoded,
        )
        .unwrap();
@@ -187,7 +186,7 @@ async fn encode_datastream_mode_no_routing() {
        .request_builder
        .encoder
        .encode_input(
-            vec![process_log(log, &es.mode, &None, &config.encoding).unwrap()],
+            vec![process_log(log, &es.mode, None, &config.encoding).unwrap()],
            &mut encoded,
        )
        .unwrap();
@@ -223,7 +222,7 @@ async fn handle_metrics() {
    es.request_builder
        .encoder
        .encode_input(
-            vec![process_log(log, &es.mode, &None, &config.encoding).unwrap()],
+            vec![process_log(log, &es.mode, None, &config.encoding).unwrap()],
            &mut encoded,
        )
        .unwrap();
@@ -339,7 +338,7 @@ async fn encode_datastream_mode_no_sync() {
        .request_builder
        .encoder
        .encode_input(
-            vec![process_log(log, &es.mode, &None, &config.encoding).unwrap()],
+            vec![process_log(log, &es.mode, None, &config.encoding).unwrap()],
            &mut encoded,
        )
        .unwrap();
@@ -358,12 +357,8 @@ async fn allows_using_except_fields() {
            index: parse_template("{{ idx }}"),
            ..Default::default()
        },
-        encoding: Transformer::new(
-            None,
-            Some(vec!["idx".to_string(), "timestamp".to_string()]),
-            None,
-        )
-        .unwrap(),
+        encoding: Transformer::new(None, Some(vec!["idx".into(), "timestamp".into()]), None)
+            .unwrap(),
        endpoints: vec![String::from("https://example.com")],
        api_version: ElasticsearchApiVersion::V6,
        ..Default::default()
@@ -379,7 +374,7 @@
        .request_builder
        .encoder
        .encode_input(
-            vec![process_log(log, &es.mode, &None, &config.encoding).unwrap()],
+            vec![process_log(log, &es.mode, None, &config.encoding).unwrap()],
            &mut encoded,
        )
        .unwrap();
@@ -398,7 +393,7 @@ async fn allows_using_only_fields() {
            index: parse_template("{{ idx }}"),
            ..Default::default()
        },
-        encoding: Transformer::new(Some(vec![owned_value_path!("foo")]), None, None).unwrap(),
+        encoding: Transformer::new(Some(vec!["foo".into()]), None, None).unwrap(),
        endpoints: vec![String::from("https://example.com")],
        api_version: ElasticsearchApiVersion::V6,
        ..Default::default()
@@ -414,7 +409,7 @@
        .request_builder
        .encoder
        .encode_input(
-            vec![process_log(log, &es.mode, &None, &config.encoding).unwrap()],
+            vec![process_log(log, &es.mode, None, &config.encoding).unwrap()],
            &mut encoded,
        )
        .unwrap();
@@ -540,7 +535,7 @@ async fn datastream_index_name() {
            ),
        );

-        let processed_event = process_log(log, &es.mode, &None, &config.encoding).unwrap();
+        let processed_event = process_log(log, &es.mode, None, &config.encoding).unwrap();
        assert_eq!(processed_event.index, test_case.want, "{test_case:?}");
    }
}
4 changes: 2 additions & 2 deletions src/sinks/humio/metrics.rs
@@ -3,7 +3,7 @@ use codecs::JsonSerializerConfig;
use futures::StreamExt;
use futures_util::stream::BoxStream;
use indoc::indoc;
-use lookup::lookup_v2::OptionalValuePath;
+use lookup::lookup_v2::{ConfigValuePath, OptionalValuePath};
use vector_common::sensitive_string::SensitiveString;
use vector_config::configurable_component;
use vector_core::sink::StreamSink;
@@ -97,7 +97,7 @@ pub struct HumioMetricsConfig {
    ///
    /// [humio_data_format]: https://docs.humio.com/integrations/data-shippers/hec/#format-of-data
    #[serde(default)]
-    indexed_fields: Vec<String>,
+    indexed_fields: Vec<ConfigValuePath>,

    /// Optional name of the repository to ingest into.
    ///
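A config sketch for the affected option (sink name and values illustrative; entries that used to be plain strings are now parsed as value paths):

    [sinks.humio]
    type = "humio_metrics"
    inputs = ["my_metrics"]
    token = "${HUMIO_TOKEN}"
    # Each entry is parsed as a value path, so nested fields work too.
    indexed_fields = ["host", "pod_labels.app"]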
9 changes: 6 additions & 3 deletions src/sinks/kafka/config.rs
@@ -2,6 +2,7 @@ use std::{collections::HashMap, time::Duration};

use codecs::JsonSerializerConfig;
use futures::FutureExt;
+use lookup::lookup_v2::ConfigTargetPath;
use rdkafka::ClientConfig;
use serde_with::serde_as;
use vector_config::configurable_component;
@@ -53,7 +54,9 @@ pub struct KafkaSinkConfig {
    /// no key.
    #[configurable(metadata(docs::advanced))]
    #[configurable(metadata(docs::examples = "user_id"))]
-    pub key_field: Option<String>,
+    #[configurable(metadata(docs::examples = ".my_topic"))]
+    #[configurable(metadata(docs::examples = "%my_topic"))]
+    pub key_field: Option<ConfigTargetPath>,

    #[configurable(derived)]
    pub encoding: EncodingConfig,
@@ -108,7 +111,7 @@ pub struct KafkaSinkConfig {
    #[configurable(metadata(docs::advanced))]
    #[serde(alias = "headers_field")] // accidentally released as `headers_field` in 0.18
    #[configurable(metadata(docs::examples = "headers"))]
-    pub headers_key: Option<String>,
+    pub headers_key: Option<ConfigTargetPath>,

    #[configurable(derived)]
    #[serde(
@@ -249,7 +252,7 @@ impl GenerateConfig for KafkaSinkConfig {
        toml::Value::try_from(Self {
            bootstrap_servers: "10.14.22.123:9092,10.14.23.332:9092".to_owned(),
            topic: Template::try_from("topic-1234".to_owned()).unwrap(),
-            key_field: Some("user_id".to_owned()),
+            key_field: Some(ConfigTargetPath::try_from("user_id".to_owned()).unwrap()),
            encoding: JsonSerializerConfig::default().into(),
            batch: Default::default(),
            compression: KafkaCompression::None,
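As a config sketch, `key_field` and `headers_key` now accept target-path syntax instead of bare field names (sink name and values illustrative; a bare name like "user_id" still parses and defaults to an event path):

    [sinks.my_kafka]
    type = "kafka"
    inputs = ["in"]
    bootstrap_servers = "10.14.22.123:9092"
    topic = "topic-1234"
    key_field = ".user_id"     # event path
    # key_field = "%my_key"    # or a metadata path
    headers_key = "headers"
    encoding.codec = "json"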
31 changes: 15 additions & 16 deletions src/sinks/kafka/request_builder.rs
@@ -3,6 +3,7 @@ use std::num::NonZeroUsize;
use bytes::{Bytes, BytesMut};
use rdkafka::message::{Header, OwnedHeaders};
use tokio_util::codec::Encoder as _;
+use vrl::path::OwnedTargetPath;

use crate::{
    codecs::{Encoder, Transformer},
@@ -16,8 +17,8 @@ use crate::{
};

pub struct KafkaRequestBuilder {
-    pub key_field: Option<String>,
-    pub headers_key: Option<String>,
+    pub key_field: Option<OwnedTargetPath>,
+    pub headers_key: Option<OwnedTargetPath>,
    pub topic_template: Template,
    pub transformer: Transformer,
    pub encoder: Encoder<()>,
@@ -39,9 +40,9 @@ impl KafkaRequestBuilder {

        let metadata = KafkaRequestMetadata {
            finalizers: event.take_finalizers(),
-            key: get_key(&event, &self.key_field),
+            key: get_key(&event, self.key_field.as_ref()),
            timestamp_millis: get_timestamp_millis(&event),
-            headers: get_headers(&event, &self.headers_key),
+            headers: get_headers(&event, self.headers_key.as_ref()),
            topic,
        };
        self.transformer.transform(&mut event);
@@ -64,14 +65,12 @@ impl KafkaRequestBuilder {
    }
}

-fn get_key(event: &Event, key_field: &Option<String>) -> Option<Bytes> {
-    key_field.as_ref().and_then(|key_field| match event {
-        Event::Log(log) => log
-            .get(key_field.as_str())
-            .map(|value| value.coerce_to_bytes()),
+fn get_key(event: &Event, key_field: Option<&OwnedTargetPath>) -> Option<Bytes> {
+    key_field.and_then(|key_field| match event {
+        Event::Log(log) => log.get(key_field).map(|value| value.coerce_to_bytes()),
        Event::Metric(metric) => metric
            .tags()
-            .and_then(|tags| tags.get(key_field))
+            .and_then(|tags| tags.get(key_field.to_string().as_str()))
            .map(|value| value.to_owned().into()),
        _ => None,
    })
@@ -86,10 +85,10 @@ fn get_timestamp_millis(event: &Event) -> Option<i64> {
        .map(|ts| ts.timestamp_millis())
}

-fn get_headers(event: &Event, headers_key: &Option<String>) -> Option<OwnedHeaders> {
-    headers_key.as_ref().and_then(|headers_key| {
+fn get_headers(event: &Event, headers_key: Option<&OwnedTargetPath>) -> Option<OwnedHeaders> {
+    headers_key.and_then(|headers_key| {
        if let Event::Log(log) = event {
-            if let Some(headers) = log.get(headers_key.as_str()) {
+            if let Some(headers) = log.get(headers_key) {
                match headers {
                    Value::Object(headers_map) => {
                        let mut owned_headers = OwnedHeaders::new_with_capacity(headers_map.len());
@@ -131,15 +130,15 @@ mod tests {

    #[test]
    fn kafka_get_headers() {
-        let headers_key = "headers";
+        let headers_key = OwnedTargetPath::try_from("headers".to_string()).unwrap();
        let mut header_values = BTreeMap::new();
        header_values.insert("a-key".to_string(), Value::Bytes(Bytes::from("a-value")));
        header_values.insert("b-key".to_string(), Value::Bytes(Bytes::from("b-value")));

        let mut event = Event::Log(LogEvent::from("hello"));
-        event.as_mut_log().insert(headers_key, header_values);
+        event.as_mut_log().insert(&headers_key, header_values);

-        let headers = get_headers(&event, &Some(headers_key.to_string())).unwrap();
+        let headers = get_headers(&event, Some(&headers_key)).unwrap();
        assert_eq!(headers.get(0).key, "a-key");
        assert_eq!(headers.get(0).value.unwrap(), "a-value".as_bytes());
        assert_eq!(headers.get(1).key, "b-key");
(Diff truncated: remaining changed files not shown.)