Commit

more changes

pront committed Aug 16, 2023
1 parent 80608e3 commit c674027
Showing 21 changed files with 87 additions and 86 deletions.
7 changes: 7 additions & 0 deletions lib/vector-lookup/src/lookup_v2/mod.rs
@@ -38,6 +38,13 @@ impl<'a> ValuePath<'a> for &'a ConfigValuePath {
}
}

// #[cfg(any(test, feature = "test"))]
impl From<&str> for ConfigValuePath {
fn from(path: &str) -> Self {
ConfigValuePath::try_from(path.to_string()).unwrap()
}
}
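
With this impl in place, test code can build a `ConfigValuePath` straight from a string literal, which is what the updated assertions in `src/codecs/encoding/config.rs` below rely on. A minimal sketch of the two equivalent spellings, assuming the `lookup` crate alias used elsewhere in this diff (note the `unwrap`: an invalid path panics, which is presumably why the commented-out `cfg` gate above was considered):

use lookup::lookup_v2::ConfigValuePath;

#[test]
fn config_value_path_from_str() {
    // Concise test-only form enabled by the new From<&str> impl.
    let short: ConfigValuePath = "a.b[0]".into();
    // Explicit form that surfaces the parse error instead of panicking.
    let long = ConfigValuePath::try_from("a.b[0]".to_string()).expect("valid path");
    assert_eq!(short, long);
}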

/// A wrapper around `OwnedTargetPath` that allows it to be used in Vector config
/// with the prefix defaulting to `PathPrefix::Event`
#[configurable_component]
15 changes: 3 additions & 12 deletions src/codecs/encoding/config.rs
@@ -172,10 +172,7 @@ mod test {
transformer.only_fields(),
&Some(vec![ConfigValuePath(parse_value_path("a.b[0]").unwrap())])
);
assert_eq!(
transformer.except_fields(),
&Some(vec!["ignore_me".to_owned()])
);
assert_eq!(transformer.except_fields(), &Some(vec!["ignore_me".into()]));
assert_eq!(transformer.timestamp_format(), &Some(TimestampFormat::Unix));
}

@@ -207,10 +204,7 @@ mod test {
transformer.only_fields(),
&Some(vec![ConfigValuePath(parse_value_path("a.b[0]").unwrap())])
);
assert_eq!(
transformer.except_fields(),
&Some(vec!["ignore_me".to_owned()])
);
assert_eq!(transformer.except_fields(), &Some(vec!["ignore_me".into()]));
assert_eq!(transformer.timestamp_format(), &Some(TimestampFormat::Unix));
}

@@ -239,10 +233,7 @@ mod test {
transformer.only_fields(),
&Some(vec![ConfigValuePath(parse_value_path("a.b[0]").unwrap())])
);
assert_eq!(
transformer.except_fields(),
&Some(vec!["ignore_me".to_owned()])
);
assert_eq!(transformer.except_fields(), &Some(vec!["ignore_me".into()]));
assert_eq!(transformer.timestamp_format(), &Some(TimestampFormat::Unix));
}
}
5 changes: 3 additions & 2 deletions src/internal_events/kafka.rs
@@ -1,5 +1,6 @@
use metrics::{counter, gauge};
use vector_core::{internal_event::InternalEvent, update_counter};
use vrl::path::OwnedTargetPath;

use vector_common::{
internal_event::{error_stage, error_type},
@@ -161,7 +162,7 @@ impl InternalEvent for KafkaStatisticsReceived<'_> {
}

pub struct KafkaHeaderExtractionError<'a> {
pub header_field: &'a str,
pub header_field: &'a OwnedTargetPath,
}

impl InternalEvent for KafkaHeaderExtractionError<'_> {
@@ -171,7 +172,7 @@ impl InternalEvent for KafkaHeaderExtractionError<'_> {
error_code = "extracting_header",
error_type = error_type::PARSER_FAILED,
stage = error_stage::RECEIVING,
header_field = self.header_field,
header_field = self.header_field.to_string(),
internal_log_rate_limit = true,
);
counter!(
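
`KafkaHeaderExtractionError` now borrows an `OwnedTargetPath` and only renders it with `to_string()` when the log line is emitted. A hedged sketch of a call site (hypothetical; `emit!` is Vector's internal-event macro, and the path would normally come from the source's config):

use vrl::path::OwnedTargetPath;

// Parse the path once, then borrow it for each emitted error event.
let header_field = OwnedTargetPath::try_from("headers".to_string()).expect("valid path");
emit!(KafkaHeaderExtractionError {
    header_field: &header_field,
});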
27 changes: 11 additions & 16 deletions src/sinks/elasticsearch/tests.rs
@@ -12,7 +12,6 @@ use crate::{
},
template::Template,
};
use lookup::owned_value_path;

// helper to unwrap template strings for tests only
fn parse_template(input: &str) -> Template {
@@ -53,7 +52,7 @@ async fn sets_create_action_when_configured() {
.request_builder
.encoder
.encode_input(
vec![process_log(log, &es.mode, &None, &config.encoding).unwrap()],
vec![process_log(log, &es.mode, None, &config.encoding).unwrap()],
&mut encoded,
)
.unwrap();
@@ -129,7 +128,7 @@ async fn encode_datastream_mode() {
.request_builder
.encoder
.encode_input(
vec![process_log(log, &es.mode, &None, &config.encoding).unwrap()],
vec![process_log(log, &es.mode, None, &config.encoding).unwrap()],
&mut encoded,
)
.unwrap();
@@ -187,7 +186,7 @@ async fn encode_datastream_mode_no_routing() {
.request_builder
.encoder
.encode_input(
vec![process_log(log, &es.mode, &None, &config.encoding).unwrap()],
vec![process_log(log, &es.mode, None, &config.encoding).unwrap()],
&mut encoded,
)
.unwrap();
@@ -223,7 +222,7 @@ async fn handle_metrics() {
es.request_builder
.encoder
.encode_input(
vec![process_log(log, &es.mode, &None, &config.encoding).unwrap()],
vec![process_log(log, &es.mode, None, &config.encoding).unwrap()],
&mut encoded,
)
.unwrap();
@@ -339,7 +338,7 @@ async fn encode_datastream_mode_no_sync() {
.request_builder
.encoder
.encode_input(
vec![process_log(log, &es.mode, &None, &config.encoding).unwrap()],
vec![process_log(log, &es.mode, None, &config.encoding).unwrap()],
&mut encoded,
)
.unwrap();
@@ -358,12 +357,8 @@ async fn allows_using_except_fields() {
index: parse_template("{{ idx }}"),
..Default::default()
},
encoding: Transformer::new(
None,
Some(vec!["idx".to_string(), "timestamp".to_string()]),
None,
)
.unwrap(),
encoding: Transformer::new(None, Some(vec!["idx".into(), "timestamp".into()]), None)
.unwrap(),
endpoints: vec![String::from("https://example.com")],
api_version: ElasticsearchApiVersion::V6,
..Default::default()
@@ -379,7 +374,7 @@ async fn allows_using_except_fields() {
.request_builder
.encoder
.encode_input(
vec![process_log(log, &es.mode, &None, &config.encoding).unwrap()],
vec![process_log(log, &es.mode, None, &config.encoding).unwrap()],
&mut encoded,
)
.unwrap();
@@ -398,7 +393,7 @@ async fn allows_using_only_fields() {
index: parse_template("{{ idx }}"),
..Default::default()
},
encoding: Transformer::new(Some(vec![owned_value_path!("foo")]), None, None).unwrap(),
encoding: Transformer::new(Some(vec!["foo".into()]), None, None).unwrap(),
endpoints: vec![String::from("https://example.com")],
api_version: ElasticsearchApiVersion::V6,
..Default::default()
@@ -414,7 +409,7 @@ async fn allows_using_only_fields() {
.request_builder
.encoder
.encode_input(
vec![process_log(log, &es.mode, &None, &config.encoding).unwrap()],
vec![process_log(log, &es.mode, None, &config.encoding).unwrap()],
&mut encoded,
)
.unwrap();
@@ -540,7 +535,7 @@ async fn datastream_index_name() {
),
);

let processed_event = process_log(log, &es.mode, &None, &config.encoding).unwrap();
let processed_event = process_log(log, &es.mode, None, &config.encoding).unwrap();
assert_eq!(processed_event.index, test_case.want, "{test_case:?}");
}
}
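
Every `process_log` call in this file swaps `&None` for a bare `None`, the usual sign that the parameter moved from `&Option<T>` to `Option<&T>`. A self-contained sketch of why the latter is kinder to callers (names are illustrative, not the sink's real signature):

// &Option<T> forces callers to materialize a `&None`; Option<&T> lets them
// pass `None` directly and borrow only when a value exists.
fn old_style(id_key: &Option<String>) -> usize {
    id_key.as_ref().map_or(0, |s| s.len())
}

fn new_style(id_key: Option<&String>) -> usize {
    id_key.map_or(0, |s| s.len())
}

#[test]
fn equivalent_behavior() {
    let key = "doc_id".to_string();
    assert_eq!(old_style(&Some(key.clone())), new_style(Some(&key)));
    assert_eq!(old_style(&None), new_style(None));
}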
4 changes: 2 additions & 2 deletions src/sinks/humio/metrics.rs
@@ -3,7 +3,7 @@ use codecs::JsonSerializerConfig;
use futures::StreamExt;
use futures_util::stream::BoxStream;
use indoc::indoc;
use lookup::lookup_v2::OptionalValuePath;
use lookup::lookup_v2::{ConfigValuePath, OptionalValuePath};
use vector_common::sensitive_string::SensitiveString;
use vector_config::configurable_component;
use vector_core::sink::StreamSink;
@@ -97,7 +97,7 @@ pub struct HumioMetricsConfig {
///
/// [humio_data_format]: https://docs.humio.com/integrations/data-shippers/hec/#format-of-data
#[serde(default)]
indexed_fields: Vec<String>,
indexed_fields: Vec<ConfigValuePath>,

/// Optional name of the repository to ingest into.
///
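
Typing `indexed_fields` as `Vec<ConfigValuePath>` moves path validation to config load time; a malformed entry becomes a deserialization error rather than a silent per-event miss. A short sketch using the `TryFrom<String>` impl seen earlier in this diff:

use lookup::lookup_v2::ConfigValuePath;

#[test]
fn indexed_fields_are_parsed_paths() {
    // Each entry is a validated event path, not a raw string.
    let fields: Vec<ConfigValuePath> = vec![
        ConfigValuePath::try_from("host".to_string()).expect("valid path"),
        ConfigValuePath::try_from("message.level".to_string()).expect("valid path"),
    ];
    assert_eq!(fields.len(), 2);
}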
9 changes: 6 additions & 3 deletions src/sinks/kafka/config.rs
@@ -2,6 +2,7 @@ use std::{collections::HashMap, time::Duration};

use codecs::JsonSerializerConfig;
use futures::FutureExt;
use lookup::lookup_v2::ConfigTargetPath;
use rdkafka::ClientConfig;
use serde_with::serde_as;
use vector_config::configurable_component;
@@ -53,7 +54,9 @@ pub struct KafkaSinkConfig {
/// no key.
#[configurable(metadata(docs::advanced))]
#[configurable(metadata(docs::examples = "user_id"))]
pub key_field: Option<String>,
#[configurable(metadata(docs::examples = ".my_topic"))]
#[configurable(metadata(docs::examples = "%my_topic"))]
pub key_field: Option<ConfigTargetPath>,

#[configurable(derived)]
pub encoding: EncodingConfig,
@@ -108,7 +111,7 @@ pub struct KafkaSinkConfig {
#[configurable(metadata(docs::advanced))]
#[serde(alias = "headers_field")] // accidentally released as `headers_field` in 0.18
#[configurable(metadata(docs::examples = "headers"))]
pub headers_key: Option<String>,
pub headers_key: Option<ConfigTargetPath>,

#[configurable(derived)]
#[serde(
@@ -249,7 +252,7 @@ impl GenerateConfig for KafkaSinkConfig {
toml::Value::try_from(Self {
bootstrap_servers: "10.14.22.123:9092,10.14.23.332:9092".to_owned(),
topic: Template::try_from("topic-1234".to_owned()).unwrap(),
key_field: Some("user_id".to_owned()),
key_field: Some(ConfigTargetPath::try_from("user_id".to_owned()).unwrap()),
encoding: JsonSerializerConfig::default().into(),
batch: Default::default(),
compression: KafkaCompression::None,
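
`key_field` and `headers_key` now deserialize as `ConfigTargetPath`, so event paths (`.my_topic`) and metadata paths (`%my_topic`) are both validated when the config loads; `sink.rs` below then unwraps the newtype to its inner `OwnedTargetPath` via `.0`. A sketch under those assumptions:

use lookup::lookup_v2::ConfigTargetPath;
use vrl::path::OwnedTargetPath;

#[test]
fn key_field_parses_at_load() {
    // Validated once at deserialization, not per event.
    let key_field = ConfigTargetPath::try_from("user_id".to_owned()).expect("valid target path");
    // The sink keeps only the inner OwnedTargetPath (the `.0` seen in sink.rs).
    let _owned: OwnedTargetPath = key_field.0;
}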
31 changes: 15 additions & 16 deletions src/sinks/kafka/request_builder.rs
@@ -3,6 +3,7 @@ use std::num::NonZeroUsize;
use bytes::{Bytes, BytesMut};
use rdkafka::message::{Header, OwnedHeaders};
use tokio_util::codec::Encoder as _;
use vrl::path::OwnedTargetPath;

use crate::{
codecs::{Encoder, Transformer},
@@ -16,8 +17,8 @@ use crate::{
};

pub struct KafkaRequestBuilder {
pub key_field: Option<String>,
pub headers_key: Option<String>,
pub key_field: Option<OwnedTargetPath>,
pub headers_key: Option<OwnedTargetPath>,
pub topic_template: Template,
pub transformer: Transformer,
pub encoder: Encoder<()>,
@@ -39,9 +40,9 @@ impl KafkaRequestBuilder {

let metadata = KafkaRequestMetadata {
finalizers: event.take_finalizers(),
key: get_key(&event, &self.key_field),
key: get_key(&event, self.key_field.as_ref()),
timestamp_millis: get_timestamp_millis(&event),
headers: get_headers(&event, &self.headers_key),
headers: get_headers(&event, self.headers_key.as_ref()),
topic,
};
self.transformer.transform(&mut event);
Expand All @@ -64,14 +65,12 @@ impl KafkaRequestBuilder {
}
}

fn get_key(event: &Event, key_field: &Option<String>) -> Option<Bytes> {
key_field.as_ref().and_then(|key_field| match event {
Event::Log(log) => log
.get(key_field.as_str())
.map(|value| value.coerce_to_bytes()),
fn get_key(event: &Event, key_field: Option<&OwnedTargetPath>) -> Option<Bytes> {
key_field.and_then(|key_field| match event {
Event::Log(log) => log.get(key_field).map(|value| value.coerce_to_bytes()),
Event::Metric(metric) => metric
.tags()
.and_then(|tags| tags.get(key_field))
.and_then(|tags| tags.get(key_field.to_string().as_str()))
.map(|value| value.to_owned().into()),
_ => None,
})
@@ -86,10 +85,10 @@ fn get_timestamp_millis(event: &Event) -> Option<i64> {
.map(|ts| ts.timestamp_millis())
}

fn get_headers(event: &Event, headers_key: &Option<String>) -> Option<OwnedHeaders> {
headers_key.as_ref().and_then(|headers_key| {
fn get_headers(event: &Event, headers_key: Option<&OwnedTargetPath>) -> Option<OwnedHeaders> {
headers_key.and_then(|headers_key| {
if let Event::Log(log) = event {
if let Some(headers) = log.get(headers_key.as_str()) {
if let Some(headers) = log.get(headers_key) {
match headers {
Value::Object(headers_map) => {
let mut owned_headers = OwnedHeaders::new_with_capacity(headers_map.len());
@@ -131,15 +130,15 @@ mod tests {

#[test]
fn kafka_get_headers() {
let headers_key = "headers";
let headers_key = OwnedTargetPath::try_from("headers".to_string()).unwrap();
let mut header_values = BTreeMap::new();
header_values.insert("a-key".to_string(), Value::Bytes(Bytes::from("a-value")));
header_values.insert("b-key".to_string(), Value::Bytes(Bytes::from("b-value")));

let mut event = Event::Log(LogEvent::from("hello"));
event.as_mut_log().insert(headers_key, header_values);
event.as_mut_log().insert(&headers_key, header_values);

let headers = get_headers(&event, &Some(headers_key.to_string())).unwrap();
let headers = get_headers(&event, Some(&headers_key)).unwrap();
assert_eq!(headers.get(0).key, "a-key");
assert_eq!(headers.get(0).value.unwrap(), "a-value".as_bytes());
assert_eq!(headers.get(1).key, "b-key");
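
`get_key` and `get_headers` now accept `Option<&OwnedTargetPath>`: log events are queried with the structured path directly, while metric tags still go through the string form. A sketch of the log branch, mirroring `kafka_get_headers` above and assuming the same module scope:

use bytes::Bytes;
use vrl::path::OwnedTargetPath;

#[test]
fn kafka_get_key_from_log() {
    let key_field = OwnedTargetPath::try_from("user_id".to_string()).unwrap();
    let mut event = Event::Log(LogEvent::from("hello"));
    event.as_mut_log().insert(&key_field, "abc123");

    // Structured lookup; no per-event string re-parsing.
    assert_eq!(get_key(&event, Some(&key_field)), Some(Bytes::from("abc123")));
}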
9 changes: 5 additions & 4 deletions src/sinks/kafka/sink.rs
@@ -8,6 +8,7 @@ use rdkafka::{
use snafu::{ResultExt, Snafu};
use tokio::time::Duration;
use tower::limit::ConcurrencyLimit;
use vrl::path::OwnedTargetPath;

use super::config::{KafkaRole, KafkaSinkConfig};
use crate::{
Expand All @@ -32,8 +33,8 @@ pub struct KafkaSink {
encoder: Encoder<()>,
service: KafkaService,
topic: Template,
key_field: Option<String>,
headers_key: Option<String>,
key_field: Option<OwnedTargetPath>,
headers_key: Option<OwnedTargetPath>,
}

pub(crate) fn create_producer(
@@ -54,12 +55,12 @@ impl KafkaSink {
let encoder = Encoder::<()>::new(serializer);

Ok(KafkaSink {
headers_key: config.headers_key,
headers_key: config.headers_key.map(|key| key.0),
transformer,
encoder,
service: KafkaService::new(producer),
topic: config.topic,
key_field: config.key_field,
key_field: config.key_field.map(|key| key.0),
})
}

5 changes: 3 additions & 2 deletions src/sinks/kafka/tests.rs
@@ -13,6 +13,7 @@ mod integration_test {
use bytes::Bytes;
use codecs::TextSerializerConfig;
use futures::StreamExt;
use lookup::lookup_v2::ConfigTargetPath;
use rdkafka::{
consumer::{BaseConsumer, Consumer},
message::Headers,
@@ -302,7 +303,7 @@
}

let topic = format!("test-{}", random_string(10));
let headers_key = "headers_key".to_string();
let headers_key = ConfigTargetPath::try_from("headers_key".to_string()).unwrap();
let kafka_auth = KafkaAuthConfig { sasl, tls };
let config = KafkaSinkConfig {
bootstrap_servers: server.clone(),
@@ -335,7 +336,7 @@
Value::Bytes(Bytes::from(header_1_value)),
);
events.iter_logs_mut().for_each(move |log| {
log.insert(headers_key.as_str(), header_values.clone());
log.insert(&headers_key, header_values.clone());
});
events
});
5 changes: 4 additions & 1 deletion src/sinks/loki/sink.rs
@@ -5,6 +5,7 @@ use once_cell::sync::Lazy;
use regex::Regex;
use snafu::Snafu;
use tokio_util::codec::Encoder as _;
use vrl::path::parse_target_path;

use super::{
config::{LokiConfig, OutOfOrderAction},
@@ -239,7 +240,7 @@ impl EventEncoder {
for template in self.labels.values() {
if let Some(fields) = template.get_fields() {
for field in fields {
event.as_mut_log().remove(field.as_str());
if let Ok(path) = parse_target_path(field.as_str()) {
event.as_mut_log().remove(&path);
}
}
}
}
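
Label fields are now run through `parse_target_path` before removal, so a field that is not a valid path is skipped instead of aborting the encode. A small sketch of the parse step in isolation (the invalid-input case is an assumption about the parser's behavior):

use vrl::path::parse_target_path;

#[test]
fn label_fields_parse_before_removal() {
    // A typical templated field parses into a structured target path.
    assert!(parse_target_path("kubernetes.pod_labels.app").is_ok());
    // Garbage fails to parse and, in the encoder above, is simply skipped.
    assert!(parse_target_path("not a valid !! path").is_err());
}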