chore: temporarily revert #7325
Revert "enhancement(datadog source, http source): Add acknowledgement support (#7325)"

This appears to be causing CI to hang. I figured reverting was better
than letting hung builds pile up.

This reverts commit 838bdb2.
jszwedko committed May 12, 2021
1 parent 151dacf commit f1a58b4
Showing 13 changed files with 138 additions and 413 deletions.
25 changes: 5 additions & 20 deletions lib/vector-core/src/event/finalization.rs
@@ -2,7 +2,6 @@
 
 use atomig::{Atom, AtomInteger, Atomic, Ordering};
 use serde::{Deserialize, Serialize};
-use std::iter::{self, ExactSizeIterator};
 use std::{mem, sync::Arc};
 use tokio::sync::oneshot;
 
@@ -30,33 +29,19 @@ impl EventFinalizers {
         Self(vec![Arc::new(finalizer)].into())
     }
 
-    /// Add a single finalizer to this array.
-    pub fn add(&mut self, finalizer: EventFinalizer) {
-        self.add_generic(iter::once(Arc::new(finalizer)));
-    }
-
     /// Merge the given list of finalizers into this array.
     pub fn merge(&mut self, other: Self) {
-        // Box<[T]> is missing IntoIterator; this just adds a `capacity` value
-        let other: Vec<_> = other.0.into();
-        self.add_generic(other.into_iter());
-    }
-
-    fn add_generic<I>(&mut self, items: I)
-    where
-        I: ExactSizeIterator<Item = Arc<EventFinalizer>>,
-    {
-        if self.0.is_empty() {
-            self.0 = items.collect::<Vec<_>>().into();
-        } else if items.len() > 0 {
+        if !other.0.is_empty() {
             // This requires a bit of extra work both to avoid cloning
             // the actual elements and because `self.0` cannot be
             // mutated in place.
             let finalizers = mem::replace(&mut self.0, vec![].into());
             let mut result: Vec<_> = finalizers.into();
             // This is the only step that may cause a (re)allocation.
-            result.reserve_exact(items.len());
-            for entry in items {
+            result.reserve_exact(other.0.len());
+            // Box<[T]> is missing IntoIterator
+            let other: Vec<_> = other.0.into();
+            for entry in other {
                 // Deduplicate by hand, assume the list is trivially small
                 if !result.iter().any(|existing| Arc::ptr_eq(existing, &entry)) {
                     result.push(entry);
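For context on what this hunk restores: `merge` goes back to appending the other array's finalizers directly, deduplicating by pointer identity on the assumption that finalizer lists are trivially small. A minimal, self-contained sketch of that pattern, using plain `Arc` values as stand-ins for the crate's `Arc<EventFinalizer>`:

```rust
use std::sync::Arc;

/// Merge `other` into `dest`, skipping entries that already point at the
/// same allocation. Mirrors the shape of the restored `merge` above.
fn merge_dedup<T>(dest: &mut Vec<Arc<T>>, other: Vec<Arc<T>>) {
    // Reserving up front is the only step that may (re)allocate.
    dest.reserve_exact(other.len());
    for entry in other {
        // Deduplicate by hand with a linear scan; for a handful of
        // entries this is cheaper than hashing.
        if !dest.iter().any(|existing| Arc::ptr_eq(existing, &entry)) {
            dest.push(entry);
        }
    }
}

fn main() {
    let shared = Arc::new("finalizer");
    let mut dest = vec![Arc::clone(&shared)];
    merge_dedup(&mut dest, vec![Arc::clone(&shared), Arc::new("other")]);
    assert_eq!(dest.len(), 2); // the shared entry was not pushed twice
}
```

`Arc::ptr_eq` is the right test here because clones of one finalizer share a single allocation; comparing by value could incorrectly merge two distinct finalizers that happen to look alike.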
4 changes: 0 additions & 4 deletions lib/vector-core/src/event/log_event.rs
@@ -75,10 +75,6 @@ impl LogEvent {
         Self::from_parts(fields, metadata.with_finalizer(EventFinalizer::new(batch)))
     }
 
-    pub fn add_finalizer(&mut self, finalizer: EventFinalizer) {
-        self.metadata.add_finalizer(finalizer);
-    }
-
     #[instrument(level = "trace", skip(self, key), fields(key = %key.as_ref()))]
     pub fn get(&self, key: impl AsRef<str>) -> Option<&Value> {
         util::log::get(self.as_map(), key.as_ref())
5 changes: 0 additions & 5 deletions lib/vector-core/src/event/metadata.rs
@@ -33,11 +33,6 @@ impl EventMetadata {
     pub fn update_sources(&mut self) {
         self.finalizers.update_sources();
     }
-
-    /// Add a new finalizer to the array
-    pub fn add_finalizer(&mut self, finalizer: EventFinalizer) {
-        self.finalizers.add(finalizer);
-    }
 }
 
 impl EventDataEq for EventMetadata {
6 changes: 1 addition & 5 deletions lib/vector-core/src/event/metric.rs
@@ -1,4 +1,4 @@
-use super::{EventFinalizer, EventMetadata};
+use super::EventMetadata;
 use crate::metrics::Handle;
 use chrono::{DateTime, Utc};
 use derive_is_enum_variant::is_enum_variant;
@@ -275,10 +275,6 @@ impl Metric {
         self
     }
 
-    pub fn add_finalizer(&mut self, finalizer: EventFinalizer) {
-        self.metadata.add_finalizer(finalizer);
-    }
-
     pub fn with_tags(mut self, tags: Option<MetricTags>) -> Self {
         self.series.tags = tags;
         self
9 changes: 0 additions & 9 deletions lib/vector-core/src/event/mod.rs
@@ -12,7 +12,6 @@ use shared::EventDataEq;
 use std::collections::{BTreeMap, HashMap};
 use std::convert::{TryFrom, TryInto};
 use std::fmt::Debug;
-use std::sync::Arc;
 use tracing::field::{Field, Visit};
 pub use util::log::PathComponent;
 pub use util::log::PathIter;
@@ -139,14 +138,6 @@ impl Event {
             Self::Metric(metric) => metric.metadata_mut(),
         }
     }
-
-    pub fn add_batch_notifier(&mut self, batch: Arc<BatchNotifier>) {
-        let finalizer = EventFinalizer::new(batch);
-        match self {
-            Self::Log(log) => log.add_finalizer(finalizer),
-            Self::Metric(metric) => metric.add_finalizer(finalizer),
-        }
-    }
 }
 
 impl EventDataEq for Event {
22 changes: 1 addition & 21 deletions src/pipeline.rs
@@ -1,11 +1,6 @@
-use crate::{internal_events::EventOut, transforms::FunctionTransform};
+use crate::{event::Event, internal_events::EventOut, transforms::FunctionTransform};
 use futures::{channel::mpsc, task::Poll, Sink};
-#[cfg(test)]
-use futures::{Stream, StreamExt};
 use std::{collections::VecDeque, fmt, pin::Pin, task::Context};
-use vector_core::event::Event;
-#[cfg(test)]
-use vector_core::event::EventStatus;
 
 #[derive(Debug)]
 pub struct ClosedError;
@@ -109,21 +104,6 @@ impl Pipeline {
         Self::new_with_buffer(100, vec![])
     }
 
-    #[cfg(test)]
-    pub fn new_test_finalize(status: EventStatus) -> (Self, impl Stream<Item = Event> + Unpin) {
-        let (pipe, recv) = Self::new_with_buffer(100, vec![]);
-        // In a source test pipeline, there is no sink to acknowledge
-        // events, so we have to add a map to the receiver to handle the
-        // finalization.
-        let recv = recv.map(move |mut event| {
-            let metadata = event.metadata_mut();
-            metadata.update_status(status);
-            metadata.update_sources();
-            event
-        });
-        (pipe, recv)
-    }
-
     pub fn new_with_buffer(
         n: usize,
         inlines: Vec<Box<dyn FunctionTransform>>,
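The removed `new_test_finalize` helper existed because a source test pipeline has no sink to acknowledge events, so the receiver itself had to finalize them. At its core this is an ordinary `StreamExt::map` over the receiving stream; a rough, self-contained sketch of the pattern, with simplified stand-ins for the crate's `Event` and `EventStatus` types:

```rust
use futures::{channel::mpsc, SinkExt, StreamExt};

// Simplified stand-ins for the crate's event and status types.
#[derive(Clone, Copy, Debug, PartialEq)]
enum EventStatus {
    Delivered,
}

#[derive(Debug)]
struct Event {
    status: Option<EventStatus>,
}

#[tokio::main]
async fn main() {
    let (mut tx, rx) = mpsc::channel::<Event>(100);

    // As in the removed helper: with no sink to acknowledge events,
    // finalize each one as it comes off the receiver.
    let mut rx = rx.map(|mut event| {
        event.status = Some(EventStatus::Delivered);
        event
    });

    tx.send(Event { status: None }).await.unwrap();
    drop(tx);

    let event = rx.next().await.unwrap();
    assert_eq!(event.status, Some(EventStatus::Delivered));
}
```

Dropping this helper is what forces the datadog source tests below back onto `Pipeline::new_test` and `collect_n`.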
128 changes: 43 additions & 85 deletions src/sources/datadog/logs.rs
@@ -79,7 +79,7 @@ impl SourceConfig for DatadogLogsConfig {
 struct DatadogLogsSource {}
 
 impl HttpSource for DatadogLogsSource {
-    fn build_events(
+    fn build_event(
         &self,
         body: Bytes,
         header_map: HeaderMap,
@@ -100,7 +100,7 @@ impl HttpSource for DatadogLogsSource {
         decode_body(body, Encoding::Json).map(|mut events| {
             // Add source type & Datadog API key
             let key = log_schema().source_type_key();
-            for event in &mut events {
+            for event in events.iter_mut() {
                 let log = event.as_mut_log();
                 log.try_insert(key, Bytes::from("datadog_logs"));
                 if let Some(k) = &api_key {
@@ -128,11 +128,11 @@ mod tests {
 
     use crate::{
         config::{log_schema, SourceConfig, SourceContext},
-        event::{Event, EventStatus},
-        test_util::{next_addr, spawn_collect_n, trace_init, wait_for_tcp},
+        event::Event,
+        test_util::{collect_n, next_addr, trace_init, wait_for_tcp},
         Pipeline,
     };
-    use futures::Stream;
+    use futures::channel::mpsc;
     use http::HeaderMap;
     use pretty_assertions::assert_eq;
     use std::net::SocketAddr;
@@ -142,8 +142,8 @@ mod tests {
         crate::test_util::test_generate_config::<DatadogLogsConfig>();
     }
 
-    async fn source(status: EventStatus) -> (impl Stream<Item = Event>, SocketAddr) {
-        let (sender, recv) = Pipeline::new_test_finalize(status);
+    async fn source() -> (mpsc::Receiver<Event>, SocketAddr) {
+        let (sender, recv) = Pipeline::new_test();
         let address = next_addr();
         tokio::spawn(async move {
             DatadogLogsConfig {
@@ -181,26 +181,20 @@ mod tests {
     #[tokio::test]
     async fn no_api_key() {
         trace_init();
-        let (rx, addr) = source(EventStatus::Delivered).await;
+        let (rx, addr) = source().await;
 
-        let mut events = spawn_collect_n(
-            async move {
-                assert_eq!(
-                    200,
-                    send_with_path(
-                        addr,
-                        r#"[{"message":"foo", "timestamp": 123}]"#,
-                        HeaderMap::new(),
-                        "/v1/input/"
-                    )
-                    .await
-                );
-            },
-            rx,
-            1,
-        )
-        .await;
+        assert_eq!(
+            200,
+            send_with_path(
+                addr,
+                r#"[{"message":"foo", "timestamp": 123}]"#,
+                HeaderMap::new(),
+                "/v1/input/"
+            )
+            .await
+        );
+
+        let mut events = collect_n(rx, 1).await;
         {
             let event = events.remove(0);
             let log = event.as_log();
@@ -214,26 +208,20 @@ mod tests {
     #[tokio::test]
     async fn api_key_in_url() {
         trace_init();
-        let (rx, addr) = source(EventStatus::Delivered).await;
+        let (rx, addr) = source().await;
 
-        let mut events = spawn_collect_n(
-            async move {
-                assert_eq!(
-                    200,
-                    send_with_path(
-                        addr,
-                        r#"[{"message":"bar", "timestamp": 456}]"#,
-                        HeaderMap::new(),
-                        "/v1/input/12345678abcdefgh12345678abcdefgh"
-                    )
-                    .await
-                );
-            },
-            rx,
-            1,
-        )
-        .await;
+        assert_eq!(
+            200,
+            send_with_path(
+                addr,
+                r#"[{"message":"bar", "timestamp": 456}]"#,
+                HeaderMap::new(),
+                "/v1/input/12345678abcdefgh12345678abcdefgh"
+            )
+            .await
+        );
+
+        let mut events = collect_n(rx, 1).await;
         {
             let event = events.remove(0);
             let log = event.as_log();
@@ -247,32 +235,26 @@ mod tests {
     #[tokio::test]
     async fn api_key_in_header() {
         trace_init();
-        let (rx, addr) = source(EventStatus::Delivered).await;
+        let (rx, addr) = source().await;
 
         let mut headers = HeaderMap::new();
         headers.insert(
             "dd-api-key",
             "12345678abcdefgh12345678abcdefgh".parse().unwrap(),
         );
 
-        let mut events = spawn_collect_n(
-            async move {
-                assert_eq!(
-                    200,
-                    send_with_path(
-                        addr,
-                        r#"[{"message":"baz", "timestamp": 789}]"#,
-                        headers,
-                        "/v1/input/"
-                    )
-                    .await
-                );
-            },
-            rx,
-            1,
-        )
-        .await;
+        assert_eq!(
+            200,
+            send_with_path(
+                addr,
+                r#"[{"message":"baz", "timestamp": 789}]"#,
+                headers,
+                "/v1/input/"
+            )
+            .await
+        );
+
+        let mut events = collect_n(rx, 1).await;
         {
             let event = events.remove(0);
             let log = event.as_log();
@@ -282,28 +264,4 @@ mod tests {
             assert_eq!(log[log_schema().source_type_key()], "datadog_logs".into());
         }
     }
-
-    #[tokio::test]
-    async fn delivery_failure() {
-        trace_init();
-        let (rx, addr) = source(EventStatus::Failed).await;
-
-        spawn_collect_n(
-            async move {
-                assert_eq!(
-                    400,
-                    send_with_path(
-                        addr,
-                        r#"[{"message":"foo", "timestamp": 123}]"#,
-                        HeaderMap::new(),
-                        "/v1/input/"
-                    )
-                    .await
-                );
-            },
-            rx,
-            1,
-        )
-        .await;
-    }
 }
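The test rewrites above all follow one pattern. With acknowledgements, the HTTP response does not complete until the events are finalized, so the reverted tests had to drive the request and the collection concurrently via `spawn_collect_n`; without them, the response returns on its own and the restored tests can send first and collect afterwards. A self-contained sketch of the restored shape, where `collect_n` is a stand-in for `test_util::collect_n` (its real body may differ):

```rust
use futures::{channel::mpsc, SinkExt, Stream, StreamExt};

// Stand-in for test_util::collect_n: pull `n` items off a stream.
async fn collect_n<S: Stream + Unpin>(stream: S, n: usize) -> Vec<S::Item> {
    stream.take(n).collect().await
}

#[tokio::main]
async fn main() {
    let (mut tx, rx) = mpsc::channel::<u32>(100);

    // Without acknowledgements the "request" side completes immediately,
    // so the test can finish sending before it starts collecting.
    tx.send(42).await.unwrap();

    let events = collect_n(rx, 1).await;
    assert_eq!(events, vec![42]);
}
```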
2 changes: 1 addition & 1 deletion src/sources/heroku_logs.rs
@@ -55,7 +55,7 @@ struct LogplexSource {
 }
 
 impl HttpSource for LogplexSource {
-    fn build_events(
+    fn build_event(
         &self,
         body: Bytes,
         header_map: HeaderMap,
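This hunk and the datadog one above are the two implementors of the shared `HttpSource` trait, which is why both change in lockstep: the revert renames the trait method back from `build_events` to `build_event`. As a rough sketch of the trait's shape — the return type, the parameters beyond those visible in the diff, and the stand-in types are all assumptions here, not the crate's actual signature:

```rust
use bytes::Bytes;
use http::HeaderMap;

// Hypothetical stand-ins; the real types live in the vector crate.
#[derive(Debug)]
struct Event(String);
#[derive(Debug)]
struct ErrorMessage;

trait HttpSource {
    // Restored name; #7325 had renamed this to `build_events`.
    fn build_event(
        &self,
        body: Bytes,
        header_map: HeaderMap,
    ) -> Result<Vec<Event>, ErrorMessage>;
}

struct DemoSource;

impl HttpSource for DemoSource {
    fn build_event(
        &self,
        body: Bytes,
        _header_map: HeaderMap,
    ) -> Result<Vec<Event>, ErrorMessage> {
        Ok(vec![Event(String::from_utf8_lossy(&body).into_owned())])
    }
}

fn main() {
    let events = DemoSource
        .build_event(Bytes::from_static(b"hello"), HeaderMap::new())
        .unwrap();
    println!("{:?}", events);
}
```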
