From a333c5563630a38301ba632a712f90a96ca61e02 Mon Sep 17 00:00:00 2001 From: Jesse Szwedko Date: Wed, 12 May 2021 10:03:09 -0400 Subject: [PATCH] chore: temporarily revert 7325 Revert "enhancement(datadog source, http source): Add acknowledgement support (#7325)" This appears to be causing CI to hang. I figured reverting was better than letting it pile up. This reverts commit 838bdb227c42ee42aee5f70e45706d5f0ae6d29a. Signed-off-by: Jesse Szwedko --- lib/vector-core/src/event/finalization.rs | 25 +-- lib/vector-core/src/event/log_event.rs | 4 - lib/vector-core/src/event/metadata.rs | 5 - lib/vector-core/src/event/metric.rs | 6 +- lib/vector-core/src/event/mod.rs | 9 - src/pipeline.rs | 22 +- src/sources/datadog/logs.rs | 128 ++++------- src/sources/heroku_logs.rs | 2 +- src/sources/http.rs | 262 ++++++---------------- src/sources/prometheus/remote_write.rs | 8 +- src/sources/util/body_decoding.rs | 23 +- src/sources/util/http.rs | 41 +--- src/test_util/mod.rs | 16 -- 13 files changed, 138 insertions(+), 413 deletions(-) diff --git a/lib/vector-core/src/event/finalization.rs b/lib/vector-core/src/event/finalization.rs index 7cd6bda5c812f..5b2de2eab7077 100644 --- a/lib/vector-core/src/event/finalization.rs +++ b/lib/vector-core/src/event/finalization.rs @@ -2,7 +2,6 @@ use atomig::{Atom, AtomInteger, Atomic, Ordering}; use serde::{Deserialize, Serialize}; -use std::iter::{self, ExactSizeIterator}; use std::{mem, sync::Arc}; use tokio::sync::oneshot; @@ -30,33 +29,19 @@ impl EventFinalizers { Self(vec![Arc::new(finalizer)].into()) } - /// Add a single finalizer to this array. - pub fn add(&mut self, finalizer: EventFinalizer) { - self.add_generic(iter::once(Arc::new(finalizer))); - } - /// Merge the given list of finalizers into this array. pub fn merge(&mut self, other: Self) { - // Box<[T]> is missing IntoIterator; this just adds a `capacity` value - let other: Vec<_> = other.0.into(); - self.add_generic(other.into_iter()); - } - - fn add_generic(&mut self, items: I) - where - I: ExactSizeIterator>, - { - if self.0.is_empty() { - self.0 = items.collect::>().into(); - } else if items.len() > 0 { + if !other.0.is_empty() { // This requires a bit of extra work both to avoid cloning // the actual elements and because `self.0` cannot be // mutated in place. let finalizers = mem::replace(&mut self.0, vec![].into()); let mut result: Vec<_> = finalizers.into(); // This is the only step that may cause a (re)allocation. - result.reserve_exact(items.len()); - for entry in items { + result.reserve_exact(other.0.len()); + // Box<[T]> is missing IntoIterator + let other: Vec<_> = other.0.into(); + for entry in other { // Deduplicate by hand, assume the list is trivially small if !result.iter().any(|existing| Arc::ptr_eq(existing, &entry)) { result.push(entry); diff --git a/lib/vector-core/src/event/log_event.rs b/lib/vector-core/src/event/log_event.rs index 8721a9010eb81..74fc763ecab1c 100644 --- a/lib/vector-core/src/event/log_event.rs +++ b/lib/vector-core/src/event/log_event.rs @@ -75,10 +75,6 @@ impl LogEvent { Self::from_parts(fields, metadata.with_finalizer(EventFinalizer::new(batch))) } - pub fn add_finalizer(&mut self, finalizer: EventFinalizer) { - self.metadata.add_finalizer(finalizer); - } - #[instrument(level = "trace", skip(self, key), fields(key = %key.as_ref()))] pub fn get(&self, key: impl AsRef) -> Option<&Value> { util::log::get(self.as_map(), key.as_ref()) diff --git a/lib/vector-core/src/event/metadata.rs b/lib/vector-core/src/event/metadata.rs index bf17f54bd863b..15739db17050f 100644 --- a/lib/vector-core/src/event/metadata.rs +++ b/lib/vector-core/src/event/metadata.rs @@ -33,11 +33,6 @@ impl EventMetadata { pub fn update_sources(&mut self) { self.finalizers.update_sources(); } - - /// Add a new finalizer to the array - pub fn add_finalizer(&mut self, finalizer: EventFinalizer) { - self.finalizers.add(finalizer); - } } impl EventDataEq for EventMetadata { diff --git a/lib/vector-core/src/event/metric.rs b/lib/vector-core/src/event/metric.rs index 348ba7039b55d..9e2236753e914 100644 --- a/lib/vector-core/src/event/metric.rs +++ b/lib/vector-core/src/event/metric.rs @@ -1,4 +1,4 @@ -use super::{EventFinalizer, EventMetadata}; +use super::EventMetadata; use crate::metrics::Handle; use chrono::{DateTime, Utc}; use derive_is_enum_variant::is_enum_variant; @@ -275,10 +275,6 @@ impl Metric { self } - pub fn add_finalizer(&mut self, finalizer: EventFinalizer) { - self.metadata.add_finalizer(finalizer); - } - pub fn with_tags(mut self, tags: Option) -> Self { self.series.tags = tags; self diff --git a/lib/vector-core/src/event/mod.rs b/lib/vector-core/src/event/mod.rs index d916c3ec02eed..816941ffbd9ab 100644 --- a/lib/vector-core/src/event/mod.rs +++ b/lib/vector-core/src/event/mod.rs @@ -12,7 +12,6 @@ use shared::EventDataEq; use std::collections::{BTreeMap, HashMap}; use std::convert::{TryFrom, TryInto}; use std::fmt::Debug; -use std::sync::Arc; use tracing::field::{Field, Visit}; pub use util::log::PathComponent; pub use util::log::PathIter; @@ -139,14 +138,6 @@ impl Event { Self::Metric(metric) => metric.metadata_mut(), } } - - pub fn add_batch_notifier(&mut self, batch: Arc) { - let finalizer = EventFinalizer::new(batch); - match self { - Self::Log(log) => log.add_finalizer(finalizer), - Self::Metric(metric) => metric.add_finalizer(finalizer), - } - } } impl EventDataEq for Event { diff --git a/src/pipeline.rs b/src/pipeline.rs index 2747ab91605a1..430f1299e04cf 100644 --- a/src/pipeline.rs +++ b/src/pipeline.rs @@ -1,11 +1,6 @@ -use crate::{internal_events::EventOut, transforms::FunctionTransform}; +use crate::{event::Event, internal_events::EventOut, transforms::FunctionTransform}; use futures::{channel::mpsc, task::Poll, Sink}; -#[cfg(test)] -use futures::{Stream, StreamExt}; use std::{collections::VecDeque, fmt, pin::Pin, task::Context}; -use vector_core::event::Event; -#[cfg(test)] -use vector_core::event::EventStatus; #[derive(Debug)] pub struct ClosedError; @@ -109,21 +104,6 @@ impl Pipeline { Self::new_with_buffer(100, vec![]) } - #[cfg(test)] - pub fn new_test_finalize(status: EventStatus) -> (Self, impl Stream + Unpin) { - let (pipe, recv) = Self::new_with_buffer(100, vec![]); - // In a source test pipeline, there is no sink to acknowledge - // events, so we have to add a map to the receiver to handle the - // finalization. - let recv = recv.map(move |mut event| { - let metadata = event.metadata_mut(); - metadata.update_status(status); - metadata.update_sources(); - event - }); - (pipe, recv) - } - pub fn new_with_buffer( n: usize, inlines: Vec>, diff --git a/src/sources/datadog/logs.rs b/src/sources/datadog/logs.rs index 6c2957a66dfee..c15f6d125e479 100644 --- a/src/sources/datadog/logs.rs +++ b/src/sources/datadog/logs.rs @@ -79,7 +79,7 @@ impl SourceConfig for DatadogLogsConfig { struct DatadogLogsSource {} impl HttpSource for DatadogLogsSource { - fn build_events( + fn build_event( &self, body: Bytes, header_map: HeaderMap, @@ -100,7 +100,7 @@ impl HttpSource for DatadogLogsSource { decode_body(body, Encoding::Json).map(|mut events| { // Add source type & Datadog API key let key = log_schema().source_type_key(); - for event in &mut events { + for event in events.iter_mut() { let log = event.as_mut_log(); log.try_insert(key, Bytes::from("datadog_logs")); if let Some(k) = &api_key { @@ -128,11 +128,11 @@ mod tests { use crate::{ config::{log_schema, SourceConfig, SourceContext}, - event::{Event, EventStatus}, - test_util::{next_addr, spawn_collect_n, trace_init, wait_for_tcp}, + event::Event, + test_util::{collect_n, next_addr, trace_init, wait_for_tcp}, Pipeline, }; - use futures::Stream; + use futures::channel::mpsc; use http::HeaderMap; use pretty_assertions::assert_eq; use std::net::SocketAddr; @@ -142,8 +142,8 @@ mod tests { crate::test_util::test_generate_config::(); } - async fn source(status: EventStatus) -> (impl Stream, SocketAddr) { - let (sender, recv) = Pipeline::new_test_finalize(status); + async fn source() -> (mpsc::Receiver, SocketAddr) { + let (sender, recv) = Pipeline::new_test(); let address = next_addr(); tokio::spawn(async move { DatadogLogsConfig { @@ -181,26 +181,20 @@ mod tests { #[tokio::test] async fn no_api_key() { trace_init(); - let (rx, addr) = source(EventStatus::Delivered).await; + let (rx, addr) = source().await; - let mut events = spawn_collect_n( - async move { - assert_eq!( - 200, - send_with_path( - addr, - r#"[{"message":"foo", "timestamp": 123}]"#, - HeaderMap::new(), - "/v1/input/" - ) - .await - ); - }, - rx, - 1, - ) - .await; + assert_eq!( + 200, + send_with_path( + addr, + r#"[{"message":"foo", "timestamp": 123}]"#, + HeaderMap::new(), + "/v1/input/" + ) + .await + ); + let mut events = collect_n(rx, 1).await; { let event = events.remove(0); let log = event.as_log(); @@ -214,26 +208,20 @@ mod tests { #[tokio::test] async fn api_key_in_url() { trace_init(); - let (rx, addr) = source(EventStatus::Delivered).await; + let (rx, addr) = source().await; - let mut events = spawn_collect_n( - async move { - assert_eq!( - 200, - send_with_path( - addr, - r#"[{"message":"bar", "timestamp": 456}]"#, - HeaderMap::new(), - "/v1/input/12345678abcdefgh12345678abcdefgh" - ) - .await - ); - }, - rx, - 1, - ) - .await; + assert_eq!( + 200, + send_with_path( + addr, + r#"[{"message":"bar", "timestamp": 456}]"#, + HeaderMap::new(), + "/v1/input/12345678abcdefgh12345678abcdefgh" + ) + .await + ); + let mut events = collect_n(rx, 1).await; { let event = events.remove(0); let log = event.as_log(); @@ -247,7 +235,7 @@ mod tests { #[tokio::test] async fn api_key_in_header() { trace_init(); - let (rx, addr) = source(EventStatus::Delivered).await; + let (rx, addr) = source().await; let mut headers = HeaderMap::new(); headers.insert( @@ -255,24 +243,18 @@ mod tests { "12345678abcdefgh12345678abcdefgh".parse().unwrap(), ); - let mut events = spawn_collect_n( - async move { - assert_eq!( - 200, - send_with_path( - addr, - r#"[{"message":"baz", "timestamp": 789}]"#, - headers, - "/v1/input/" - ) - .await - ); - }, - rx, - 1, - ) - .await; + assert_eq!( + 200, + send_with_path( + addr, + r#"[{"message":"baz", "timestamp": 789}]"#, + headers, + "/v1/input/" + ) + .await + ); + let mut events = collect_n(rx, 1).await; { let event = events.remove(0); let log = event.as_log(); @@ -282,28 +264,4 @@ mod tests { assert_eq!(log[log_schema().source_type_key()], "datadog_logs".into()); } } - - #[tokio::test] - async fn delivery_failure() { - trace_init(); - let (rx, addr) = source(EventStatus::Failed).await; - - spawn_collect_n( - async move { - assert_eq!( - 400, - send_with_path( - addr, - r#"[{"message":"foo", "timestamp": 123}]"#, - HeaderMap::new(), - "/v1/input/" - ) - .await - ); - }, - rx, - 1, - ) - .await; - } } diff --git a/src/sources/heroku_logs.rs b/src/sources/heroku_logs.rs index 369448a1a58f8..4af02c95aa74b 100644 --- a/src/sources/heroku_logs.rs +++ b/src/sources/heroku_logs.rs @@ -55,7 +55,7 @@ struct LogplexSource { } impl HttpSource for LogplexSource { - fn build_events( + fn build_event( &self, body: Bytes, header_map: HeaderMap, diff --git a/src/sources/http.rs b/src/sources/http.rs index f0df3cc8d17ca..7594a5b30378d 100644 --- a/src/sources/http.rs +++ b/src/sources/http.rs @@ -72,7 +72,7 @@ struct SimpleHttpSource { } impl HttpSource for SimpleHttpSource { - fn build_events( + fn build_event( &self, body: Bytes, header_map: HeaderMap, @@ -86,7 +86,7 @@ impl HttpSource for SimpleHttpSource { .map(|mut events| { // Add source type let key = log_schema().source_type_key(); - for event in &mut events { + for event in events.iter_mut() { event.as_mut_log().try_insert(key, Bytes::from("http")); } events @@ -162,15 +162,15 @@ mod tests { use super::{Encoding, SimpleHttpConfig}; use crate::{ config::{log_schema, SourceConfig, SourceContext}, - event::{Event, EventStatus, Value}, - test_util::{next_addr, spawn_collect_n, trace_init, wait_for_tcp}, + event::{Event, Value}, + test_util::{collect_n, next_addr, trace_init, wait_for_tcp}, Pipeline, }; use flate2::{ write::{DeflateEncoder, GzEncoder}, Compression, }; - use futures::Stream; + use futures::channel::mpsc; use http::HeaderMap; use pretty_assertions::assert_eq; use std::collections::BTreeMap; @@ -189,9 +189,8 @@ mod tests { path_key: &str, path: &str, strict_path: bool, - status: EventStatus, - ) -> (impl Stream, SocketAddr) { - let (sender, recv) = Pipeline::new_test_finalize(status); + ) -> (mpsc::Receiver, SocketAddr) { + let (sender, recv) = Pipeline::new_test(); let address = next_addr(); let path = path.to_owned(); let path_key = path_key.to_owned(); @@ -274,33 +273,17 @@ mod tests { .as_u16() } - async fn spawn_ok_collect_n( - send: impl std::future::Future + Send + 'static, - rx: impl Stream + Unpin, - n: usize, - ) -> Vec { - spawn_collect_n(async move { assert_eq!(200, send.await) }, rx, n).await - } - #[tokio::test] async fn http_multiline_text() { trace_init(); let body = "test body\n\ntest body 2"; - let (rx, addr) = source( - Encoding::default(), - vec![], - vec![], - "http_path", - "/", - true, - EventStatus::Delivered, - ) - .await; + let (rx, addr) = source(Encoding::default(), vec![], vec![], "http_path", "/", true).await; - let mut events = spawn_ok_collect_n(send(addr, body), rx, 2).await; + assert_eq!(200, send(addr, body).await); + let mut events = collect_n(rx, 2).await; { let event = events.remove(0); let log = event.as_log(); @@ -326,19 +309,11 @@ mod tests { //same as above test but with a newline at the end let body = "test body\n\ntest body 2\n"; - let (rx, addr) = source( - Encoding::default(), - vec![], - vec![], - "http_path", - "/", - true, - EventStatus::Delivered, - ) - .await; + let (rx, addr) = source(Encoding::default(), vec![], vec![], "http_path", "/", true).await; - let mut events = spawn_ok_collect_n(send(addr, body), rx, 2).await; + assert_eq!(200, send(addr, body).await); + let mut events = collect_n(rx, 2).await; { let event = events.remove(0); let log = event.as_log(); @@ -361,30 +336,15 @@ mod tests { async fn http_json_parsing() { trace_init(); - let (rx, addr) = source( - Encoding::Json, - vec![], - vec![], - "http_path", - "/", - true, - EventStatus::Delivered, - ) - .await; + let (rx, addr) = source(Encoding::Json, vec![], vec![], "http_path", "/", true).await; - let mut events = spawn_collect_n( - async move { - assert_eq!(400, send(addr, "{").await); //malformed - assert_eq!(400, send(addr, r#"{"key"}"#).await); //key without value + assert_eq!(400, send(addr, "{").await); //malformed + assert_eq!(400, send(addr, r#"{"key"}"#).await); //key without value - assert_eq!(200, send(addr, "{}").await); //can be one object or array of objects - assert_eq!(200, send(addr, "[{},{},{}]").await); - }, - rx, - 2, - ) - .await; + assert_eq!(200, send(addr, "{}").await); //can be one object or array of objects + assert_eq!(200, send(addr, "[{},{},{}]").await); + let mut events = collect_n(rx, 2).await; assert!(events .remove(1) .as_log() @@ -401,27 +361,12 @@ mod tests { async fn http_json_values() { trace_init(); - let (rx, addr) = source( - Encoding::Json, - vec![], - vec![], - "http_path", - "/", - true, - EventStatus::Delivered, - ) - .await; + let (rx, addr) = source(Encoding::Json, vec![], vec![], "http_path", "/", true).await; - let mut events = spawn_collect_n( - async move { - assert_eq!(200, send(addr, r#"[{"key":"value"}]"#).await); - assert_eq!(200, send(addr, r#"{"key2":"value2"}"#).await); - }, - rx, - 2, - ) - .await; + assert_eq!(200, send(addr, r#"[{"key":"value"}]"#).await); + assert_eq!(200, send(addr, r#"{"key2":"value2"}"#).await); + let mut events = collect_n(rx, 2).await; { let event = events.remove(0); let log = event.as_log(); @@ -444,30 +389,15 @@ mod tests { async fn http_json_dotted_keys() { trace_init(); - let (rx, addr) = source( - Encoding::Json, - vec![], - vec![], - "http_path", - "/", - true, - EventStatus::Delivered, - ) - .await; + let (rx, addr) = source(Encoding::Json, vec![], vec![], "http_path", "/", true).await; - let mut events = spawn_collect_n( - async move { - assert_eq!(200, send(addr, r#"[{"dotted.key":"value"}]"#).await); - assert_eq!( - 200, - send(addr, r#"{"nested":{"dotted.key2":"value2"}}"#).await - ); - }, - rx, - 2, - ) - .await; + assert_eq!(200, send(addr, r#"[{"dotted.key":"value"}]"#).await); + assert_eq!( + 200, + send(addr, r#"{"nested":{"dotted.key2":"value2"}}"#).await + ); + let mut events = collect_n(rx, 2).await; { let event = events.remove(0); let log = event.as_log(); @@ -486,31 +416,16 @@ mod tests { async fn http_ndjson() { trace_init(); - let (rx, addr) = source( - Encoding::Ndjson, - vec![], - vec![], - "http_path", - "/", - true, - EventStatus::Delivered, - ) - .await; + let (rx, addr) = source(Encoding::Ndjson, vec![], vec![], "http_path", "/", true).await; - let mut events = spawn_collect_n( - async move { - assert_eq!(400, send(addr, r#"[{"key":"value"}]"#).await); //one object per line - - assert_eq!( - 200, - send(addr, "{\"key1\":\"value1\"}\n\n{\"key2\":\"value2\"}").await - ); - }, - rx, - 2, - ) - .await; + assert_eq!(400, send(addr, r#"[{"key":"value"}]"#).await); //one object per line + assert_eq!( + 200, + send(addr, "{\"key1\":\"value1\"}\n\n{\"key2\":\"value2\"}").await + ); + + let mut events = collect_n(rx, 2).await; { let event = events.remove(0); let log = event.as_log(); @@ -548,17 +463,15 @@ mod tests { "http_path", "/", true, - EventStatus::Delivered, ) .await; - let mut events = spawn_ok_collect_n( - send_with_headers(addr, "{\"key1\":\"value1\"}", headers), - rx, - 1, - ) - .await; + assert_eq!( + 200, + send_with_headers(addr, "{\"key1\":\"value1\"}", headers).await + ); + let mut events = collect_n(rx, 1).await; { let event = events.remove(0); let log = event.as_log(); @@ -586,17 +499,15 @@ mod tests { "http_path", "/", true, - EventStatus::Delivered, ) .await; - let mut events = spawn_ok_collect_n( - send_with_query(addr, "{\"key1\":\"value1\"}", "source=staging®ion=gb"), - rx, - 1, - ) - .await; + assert_eq!( + 200, + send_with_query(addr, "{\"key1\":\"value1\"}", "source=staging®ion=gb").await + ); + let mut events = collect_n(rx, 1).await; { let event = events.remove(0); let log = event.as_log(); @@ -627,19 +538,11 @@ mod tests { let mut headers = HeaderMap::new(); headers.insert("Content-Encoding", "gzip, deflate".parse().unwrap()); - let (rx, addr) = source( - Encoding::default(), - vec![], - vec![], - "http_path", - "/", - true, - EventStatus::Delivered, - ) - .await; + let (rx, addr) = source(Encoding::default(), vec![], vec![], "http_path", "/", true).await; - let mut events = spawn_ok_collect_n(send_bytes(addr, body, headers), rx, 1).await; + assert_eq!(200, send_bytes(addr, body, headers).await); + let mut events = collect_n(rx, 1).await; { let event = events.remove(0); let log = event.as_log(); @@ -660,17 +563,15 @@ mod tests { "vector_http_path", "/event/path", true, - EventStatus::Delivered, ) .await; - let mut events = spawn_ok_collect_n( - send_with_path(addr, "{\"key1\":\"value1\"}", "/event/path"), - rx, - 1, - ) - .await; + assert_eq!( + 200, + send_with_path(addr, "{\"key1\":\"value1\"}", "/event/path").await + ); + let mut events = collect_n(rx, 1).await; { let event = events.remove(0); let log = event.as_log(); @@ -691,26 +592,19 @@ mod tests { "vector_http_path", "/event", false, - EventStatus::Delivered, ) .await; - let mut events = spawn_collect_n( - async move { - assert_eq!( - 200, - send_with_path(addr, "{\"key1\":\"value1\"}", "/event/path1").await - ); - assert_eq!( - 200, - send_with_path(addr, "{\"key2\":\"value2\"}", "/event/path2").await - ); - }, - rx, - 2, - ) - .await; + assert_eq!( + 200, + send_with_path(addr, "{\"key1\":\"value1\"}", "/event/path1").await + ); + assert_eq!( + 200, + send_with_path(addr, "{\"key2\":\"value2\"}", "/event/path2").await + ); + let mut events = collect_n(rx, 2).await; { let event = events.remove(0); let log = event.as_log(); @@ -739,7 +633,6 @@ mod tests { "vector_http_path", "/", true, - EventStatus::Delivered, ) .await; @@ -748,29 +641,4 @@ mod tests { send_with_path(addr, "{\"key1\":\"value1\"}", "/event/path").await ); } - - #[tokio::test] - async fn http_delivery_failure() { - trace_init(); - - let (rx, addr) = source( - Encoding::default(), - vec![], - vec![], - "http_path", - "/", - true, - EventStatus::Failed, - ) - .await; - - spawn_collect_n( - async move { - assert_eq!(400, send(addr, "test body\n").await); - }, - rx, - 1, - ) - .await; - } } diff --git a/src/sources/prometheus/remote_write.rs b/src/sources/prometheus/remote_write.rs index 98b5e51731085..d7c39f6cd6f96 100644 --- a/src/sources/prometheus/remote_write.rs +++ b/src/sources/prometheus/remote_write.rs @@ -91,7 +91,7 @@ impl RemoteWriteSource { } impl HttpSource for RemoteWriteSource { - fn build_events( + fn build_event( &self, mut body: Bytes, header_map: HeaderMap, @@ -107,10 +107,10 @@ impl HttpSource for RemoteWriteSource { { body = decode(&Some("snappy".to_string()), body)?; } - let events = self.decode_body(body)?; - let count = events.len(); + let result = self.decode_body(body)?; + let count = result.len(); emit!(PrometheusRemoteWriteReceived { count }); - Ok(events) + Ok(result) } } diff --git a/src/sources/util/body_decoding.rs b/src/sources/util/body_decoding.rs index 4c7468e51a53f..ce2d71bd447bf 100644 --- a/src/sources/util/body_decoding.rs +++ b/src/sources/util/body_decoding.rs @@ -1,8 +1,4 @@ -use crate::{ - config::log_schema, - event::{Event, LogEvent}, - sources::util::http::ErrorMessage, -}; +use crate::{config::log_schema, event::Event, sources::util::http::ErrorMessage}; use bytes::{Bytes, BytesMut}; use chrono::Utc; use codec::BytesDelimitedCodec; @@ -48,13 +44,13 @@ fn body_to_lines(buf: Bytes) -> impl Iterator pub fn decode_body(body: Bytes, enc: Encoding) -> Result, ErrorMessage> { match enc { Encoding::Text => body_to_lines(body) - .map(|r| Ok(LogEvent::from(r?).into())) + .map(|r| Ok(Event::from(r?))) .collect::>(), Encoding::Ndjson => body_to_lines(body) .map(|j| { let parsed_json = serde_json::from_slice(&j?) .map_err(|error| json_error(format!("Error parsing Ndjson: {:?}", error)))?; - json_parse_object(parsed_json).map(Into::into) + json_parse_object(parsed_json) }) .collect::>(), Encoding::Json => { @@ -66,15 +62,16 @@ pub fn decode_body(body: Bytes, enc: Encoding) -> Result, ErrorMessag } #[cfg(any(feature = "sources-http", feature = "sources-datadog"))] -fn json_parse_object(value: JsonValue) -> Result { +fn json_parse_object(value: JsonValue) -> Result { + let mut event = Event::new_empty_log(); + let log = event.as_mut_log(); + log.insert(log_schema().timestamp_key(), Utc::now()); // Add timestamp match value { JsonValue::Object(map) => { - let mut log = LogEvent::default(); - log.insert(log_schema().timestamp_key(), Utc::now()); // Add timestamp for (k, v) in map { log.insert_flat(k, v); } - Ok(log) + Ok(event) } _ => Err(json_error(format!( "Expected Object, got {}", @@ -88,11 +85,11 @@ fn json_parse_array_of_object(value: JsonValue) -> Result, ErrorMessa match value { JsonValue::Array(v) => v .into_iter() - .map(|object| json_parse_object(object).map(Into::into)) + .map(json_parse_object) .collect::>(), JsonValue::Object(map) => { //treat like an array of one object - Ok(vec![json_parse_object(JsonValue::Object(map))?.into()]) + Ok(vec![json_parse_object(JsonValue::Object(map))?]) } _ => Err(json_error(format!( "Expected Array or Object, got {}.", diff --git a/src/sources/util/http.rs b/src/sources/util/http.rs index 967b0878503de..8a31516c67261 100644 --- a/src/sources/util/http.rs +++ b/src/sources/util/http.rs @@ -1,5 +1,5 @@ use crate::{ - event::{BatchNotifier, BatchStatus, Event}, + event::Event, internal_events::{HttpBadRequest, HttpDecompressError, HttpEventsReceived}, shutdown::ShutdownSignal, tls::{MaybeTlsSettings, TlsConfig}, @@ -12,9 +12,7 @@ use futures::{FutureExt, SinkExt, StreamExt, TryFutureExt}; use headers::{Authorization, HeaderMapExt}; use serde::{Deserialize, Serialize}; use snap::raw::Decoder as SnappyDecoder; -use std::{ - collections::HashMap, convert::TryFrom, error::Error, fmt, io::Read, net::SocketAddr, sync::Arc, -}; +use std::{collections::HashMap, convert::TryFrom, error::Error, fmt, io::Read, net::SocketAddr}; use tracing_futures::Instrument; use warp::{ filters::{path::FullPath, path::Tail, BoxedFilter}, @@ -177,7 +175,7 @@ fn handle_decode_error(encoding: &str, error: impl std::error::Error) -> ErrorMe #[async_trait] pub trait HttpSource: Clone + Send + Sync + 'static { - fn build_events( + fn build_event( &self, body: Bytes, header_map: HeaderMap, @@ -238,25 +236,18 @@ pub trait HttpSource: Clone + Send + Sync + 'static { .is_valid(&auth_header) .and_then(|()| decode(&encoding_header, body)) .and_then(|body| { - let body_len = body.len(); - self.build_events(body, headers, query_parameters, path.as_str()) - .map(|events|(events, body_len)) + let body_len=body.len(); + self.build_event(body, headers, query_parameters, path.as_str()) + .map(|events| (events, body_len)) }); async move { match events { - Ok((mut events, body_size)) => { + Ok((events,body_size)) => { emit!(HttpEventsReceived { events_count: events.len(), byte_size: body_size, }); - - let (batch, receiver) = BatchNotifier::new_with_receiver(); - for event in &mut events { - event.add_batch_notifier(Arc::clone(&batch)); - } - drop(batch); - out.send_all(&mut futures::stream::iter(events).map(Ok)) .map_err(move |error: crate::pipeline::ClosedError| { // can only fail if receiving end disconnected, so we are shutting down, @@ -265,23 +256,7 @@ pub trait HttpSource: Clone + Send + Sync + 'static { error!(message = "Tried to send the following event.", %error); warp::reject::custom(RejectShuttingDown) }) - .and_then(|_| async move { - match receiver.await.unwrap_or(BatchStatus::Delivered) { - BatchStatus::Delivered => Ok(warp::reply()), - BatchStatus::Errored => Err(warp::reject::custom( - ErrorMessage::new( - StatusCode::INTERNAL_SERVER_ERROR, - "Error delivering contents to sink".into(), - ), - )), - BatchStatus::Failed => Err(warp::reject::custom( - ErrorMessage::new( - StatusCode::BAD_REQUEST, - "Contents failed to deliver to sink".into(), - ), - )), - } - }) + .map_ok(|_| warp::reply()) .await } Err(error) => { diff --git a/src/test_util/mod.rs b/src/test_util/mod.rs index 60be039dd8a3c..a486051f2c6a8 100644 --- a/src/test_util/mod.rs +++ b/src/test_util/mod.rs @@ -545,19 +545,3 @@ pub async fn start_topology( .await .unwrap() } - -/// Collect the first `n` events from a stream while a future is spawned -/// in the background. This is used for tests where the collect has to -/// happen concurrent with the sending process (ie the stream is -/// handling finalization, which is required for the future to receive -/// an acknowledgement). -pub async fn spawn_collect_n(future: F, stream: S, n: usize) -> Vec -where - F: Future + Send + 'static, - S: Stream + Unpin, -{ - let sender = tokio::spawn(future); - let events = collect_n(stream, n).await; - sender.await.expect("Failed to send data"); - events -}