Skip to content

Commit

Permalink
fix(http_server source): panic when http server receives metric events (
Browse files Browse the repository at this point in the history
#18781)

* fix: panic when http server receives metric events

* refactor
  • Loading branch information
pront authored Oct 10, 2023
1 parent 67c4beb commit 9a621e3
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 58 deletions.
106 changes: 55 additions & 51 deletions src/sources/http_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,23 @@ use std::{collections::HashMap, net::SocketAddr};

use bytes::{Bytes, BytesMut};
use chrono::Utc;
use http::{StatusCode, Uri};
use http_serde;
use tokio_util::codec::Decoder as _;
use vrl::value::{kind::Collection, Kind};
use warp::http::{HeaderMap, HeaderValue};

use codecs::{
decoding::{DeserializerConfig, FramingConfig},
BytesDecoderConfig, BytesDeserializerConfig, JsonDeserializerConfig,
NewlineDelimitedDecoderConfig,
};

use http::{StatusCode, Uri};
use http_serde;
use lookup::{lookup_v2::OptionalValuePath, owned_value_path, path};
use tokio_util::codec::Decoder as _;
use vector_config::configurable_component;
use vector_core::{
config::{DataType, LegacyKey, LogNamespace},
schema::Definition,
};
use vrl::value::{kind::Collection, Kind};
use warp::http::{HeaderMap, HeaderValue};

use crate::{
codecs::{Decoder, DecodingConfig},
Expand Down Expand Up @@ -385,37 +385,50 @@ struct SimpleHttpSource {
}

impl HttpSource for SimpleHttpSource {
/// Enriches the passed in events with metadata for the `request_path` and for each of the headers.
/// Enriches the log events with metadata for the `request_path` and for each of the headers.
/// Non-log events are skipped.
fn enrich_events(
&self,
events: &mut [Event],
request_path: &str,
headers_config: &HeaderMap,
query_parameters: &HashMap<String, String>,
) {
let now = Utc::now();
for event in events.iter_mut() {
let log = event.as_mut_log();

// add request_path to each event
self.log_namespace.insert_source_metadata(
SimpleHttpConfig::NAME,
log,
self.path_key.path.as_ref().map(LegacyKey::InsertIfEmpty),
path!("path"),
request_path.to_owned(),
);

// add each header to each event
for header_name in &self.headers {
let value = headers_config.get(header_name).map(HeaderValue::as_bytes);
match event {
Event::Log(log) => {
// add request_path to each event
self.log_namespace.insert_source_metadata(
SimpleHttpConfig::NAME,
log,
self.path_key.path.as_ref().map(LegacyKey::InsertIfEmpty),
path!("path"),
request_path.to_owned(),
);

self.log_namespace.insert_source_metadata(
SimpleHttpConfig::NAME,
log,
Some(LegacyKey::InsertIfEmpty(path!(header_name))),
path!("headers", header_name),
Value::from(value.map(Bytes::copy_from_slice)),
);
// add each header to each event
for header_name in &self.headers {
let value = headers_config.get(header_name).map(HeaderValue::as_bytes);

self.log_namespace.insert_source_metadata(
SimpleHttpConfig::NAME,
log,
Some(LegacyKey::InsertIfEmpty(path!(header_name))),
path!("headers", header_name),
Value::from(value.map(Bytes::copy_from_slice)),
);
}

self.log_namespace.insert_standard_vector_source_metadata(
log,
SimpleHttpConfig::NAME,
now,
);
}
_ => {
continue;
}
}
}

Expand All @@ -426,17 +439,6 @@ impl HttpSource for SimpleHttpSource {
self.log_namespace,
SimpleHttpConfig::NAME,
);

let now = Utc::now();
for event in events {
let log = event.as_mut_log();

self.log_namespace.insert_standard_vector_source_metadata(
log,
SimpleHttpConfig::NAME,
now,
);
}
}

fn build_events(
Expand Down Expand Up @@ -474,29 +476,29 @@ impl HttpSource for SimpleHttpSource {

#[cfg(test)]
mod tests {
use lookup::{event_path, owned_value_path, OwnedTargetPath};
use std::str::FromStr;
use std::{collections::BTreeMap, io::Write, net::SocketAddr};
use vector_core::config::LogNamespace;
use vector_core::event::LogEvent;
use vector_core::schema::Definition;
use vrl::value::kind::Collection;
use vrl::value::Kind;

use codecs::{
decoding::{DeserializerConfig, FramingConfig},
BytesDecoderConfig, JsonDeserializerConfig,
};
use flate2::{
write::{GzEncoder, ZlibEncoder},
Compression,
};
use futures::Stream;
use http::{HeaderMap, Method, StatusCode};
use lookup::lookup_v2::OptionalValuePath;
use similar_asserts::assert_eq;
use vrl::value::kind::Collection;
use vrl::value::Kind;

use codecs::{
decoding::{DeserializerConfig, FramingConfig},
BytesDecoderConfig, JsonDeserializerConfig,
};
use lookup::lookup_v2::OptionalValuePath;
use lookup::{event_path, owned_value_path, OwnedTargetPath};
use vector_core::config::LogNamespace;
use vector_core::event::LogEvent;
use vector_core::schema::Definition;

use super::{remove_duplicates, SimpleHttpConfig};
use crate::sources::http_server::HttpMethod;
use crate::{
config::{log_schema, SourceConfig, SourceContext},
Expand All @@ -508,6 +510,8 @@ mod tests {
SourceSender,
};

use super::{remove_duplicates, SimpleHttpConfig};

#[test]
fn generate_config() {
crate::test_util::test_generate_config::<SimpleHttpConfig>();
Expand Down
16 changes: 9 additions & 7 deletions src/sources/util/http/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@ pub fn add_query_parameters(
for query_parameter_name in query_parameters_config {
let value = query_parameters.get(query_parameter_name);
for event in events.iter_mut() {
log_namespace.insert_source_metadata(
source_name,
event.as_mut_log(),
Some(LegacyKey::Overwrite(path!(query_parameter_name))),
path!("query_parameters"),
crate::event::Value::from(value.map(String::to_owned)),
);
if let Event::Log(log) = event {
log_namespace.insert_source_metadata(
source_name,
log,
Some(LegacyKey::Overwrite(path!(query_parameter_name))),
path!("query_parameters"),
crate::event::Value::from(value.map(String::to_owned)),
);
}
}
}
}

0 comments on commit 9a621e3

Please sign in to comment.