Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
pront committed Oct 5, 2023
1 parent 553966e commit ea3573c
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 55 deletions.
74 changes: 32 additions & 42 deletions src/sources/http_server.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::{collections::HashMap, net::SocketAddr};

use bytes::{Bytes, BytesMut};
use chrono::{DateTime, Utc};
use chrono::Utc;
use http::{StatusCode, Uri};
use http_serde;
use tokio_util::codec::Decoder as _;
Expand All @@ -15,7 +15,6 @@ use codecs::{
};
use lookup::{lookup_v2::OptionalValuePath, owned_value_path, path};
use vector_config::configurable_component;
use vector_core::event::LogEvent;
use vector_core::{
config::{DataType, LegacyKey, LogNamespace},
schema::Definition,
Expand Down Expand Up @@ -386,39 +385,8 @@ struct SimpleHttpSource {
}

impl HttpSource for SimpleHttpSource {
fn enrich_log(
&self,
log: &mut LogEvent,
now: DateTime<Utc>,
request_path: &str,
headers_config: &HeaderMap,
) {
// add request_path to each event
self.log_namespace.insert_source_metadata(
SimpleHttpConfig::NAME,
log,
self.path_key.path.as_ref().map(LegacyKey::InsertIfEmpty),
path!("path"),
request_path.to_owned(),
);

// add each header to each event
for header_name in &self.headers {
let value = headers_config.get(header_name).map(HeaderValue::as_bytes);

self.log_namespace.insert_source_metadata(
SimpleHttpConfig::NAME,
log,
Some(LegacyKey::InsertIfEmpty(path!(header_name))),
path!("headers", header_name),
Value::from(value.map(Bytes::copy_from_slice)),
);
}

self.log_namespace
.insert_standard_vector_source_metadata(log, SimpleHttpConfig::NAME, now);
}
/// Enriches the passed in events with metadata for the `request_path` and for each of the headers.
/// Enriches the log events with metadata for the `request_path` and for each of the headers.
/// Non-log events are skipped.
fn enrich_events(
&self,
events: &mut [Event],
Expand All @@ -429,14 +397,36 @@ impl HttpSource for SimpleHttpSource {
let now = Utc::now();
for event in events.iter_mut() {
match event {
Event::Log(log_event) => {
self.enrich_log(log_event, now, request_path, headers_config);
}
Event::Metric(_) => {
// TODO discuss with reviewer what we want to do for metric and trace events
continue;
Event::Log(log) => {
// add request_path to each event
self.log_namespace.insert_source_metadata(
SimpleHttpConfig::NAME,
log,
self.path_key.path.as_ref().map(LegacyKey::InsertIfEmpty),
path!("path"),
request_path.to_owned(),
);

// add each header to each event
for header_name in &self.headers {
let value = headers_config.get(header_name).map(HeaderValue::as_bytes);

self.log_namespace.insert_source_metadata(
SimpleHttpConfig::NAME,
log,
Some(LegacyKey::InsertIfEmpty(path!(header_name))),
path!("headers", header_name),
Value::from(value.map(Bytes::copy_from_slice)),
);
}

self.log_namespace.insert_standard_vector_source_metadata(
log,
SimpleHttpConfig::NAME,
now,
);
}
Event::Trace(_) => {
_ => {
continue;
}
}
Expand Down
22 changes: 9 additions & 13 deletions src/sources/util/http/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,15 @@ pub fn add_query_parameters(
for query_parameter_name in query_parameters_config {
let value = query_parameters.get(query_parameter_name);
for event in events.iter_mut() {
let log = if let Event::Log(ref mut log_event) = event {
log_event
} else {
continue;
};

log_namespace.insert_source_metadata(
source_name,
log,
Some(LegacyKey::Overwrite(path!(query_parameter_name))),
path!("query_parameters"),
crate::event::Value::from(value.map(String::to_owned)),
);
if let Event::Log(log) = event {
log_namespace.insert_source_metadata(
source_name,
log,
Some(LegacyKey::Overwrite(path!(query_parameter_name))),
path!("query_parameters"),
crate::event::Value::from(value.map(String::to_owned)),
);
}
}
}
}

0 comments on commit ea3573c

Please sign in to comment.