Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: temporarily revert 7325 #7426

Merged
merged 1 commit into from
May 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 5 additions & 20 deletions lib/vector-core/src/event/finalization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

use atomig::{Atom, AtomInteger, Atomic, Ordering};
use serde::{Deserialize, Serialize};
use std::iter::{self, ExactSizeIterator};
use std::{mem, sync::Arc};
use tokio::sync::oneshot;

Expand Down Expand Up @@ -30,33 +29,19 @@ impl EventFinalizers {
Self(vec![Arc::new(finalizer)].into())
}

/// Add a single finalizer to this array.
pub fn add(&mut self, finalizer: EventFinalizer) {
self.add_generic(iter::once(Arc::new(finalizer)));
}

/// Merge the given list of finalizers into this array.
pub fn merge(&mut self, other: Self) {
// Box<[T]> is missing IntoIterator; this just adds a `capacity` value
let other: Vec<_> = other.0.into();
self.add_generic(other.into_iter());
}

fn add_generic<I>(&mut self, items: I)
where
I: ExactSizeIterator<Item = Arc<EventFinalizer>>,
{
if self.0.is_empty() {
self.0 = items.collect::<Vec<_>>().into();
} else if items.len() > 0 {
if !other.0.is_empty() {
// This requires a bit of extra work both to avoid cloning
// the actual elements and because `self.0` cannot be
// mutated in place.
let finalizers = mem::replace(&mut self.0, vec![].into());
let mut result: Vec<_> = finalizers.into();
// This is the only step that may cause a (re)allocation.
result.reserve_exact(items.len());
for entry in items {
result.reserve_exact(other.0.len());
// Box<[T]> is missing IntoIterator
let other: Vec<_> = other.0.into();
for entry in other {
// Deduplicate by hand, assume the list is trivially small
if !result.iter().any(|existing| Arc::ptr_eq(existing, &entry)) {
result.push(entry);
Expand Down
4 changes: 0 additions & 4 deletions lib/vector-core/src/event/log_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,6 @@ impl LogEvent {
Self::from_parts(fields, metadata.with_finalizer(EventFinalizer::new(batch)))
}

pub fn add_finalizer(&mut self, finalizer: EventFinalizer) {
self.metadata.add_finalizer(finalizer);
}

#[instrument(level = "trace", skip(self, key), fields(key = %key.as_ref()))]
pub fn get(&self, key: impl AsRef<str>) -> Option<&Value> {
util::log::get(self.as_map(), key.as_ref())
Expand Down
5 changes: 0 additions & 5 deletions lib/vector-core/src/event/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,6 @@ impl EventMetadata {
pub fn update_sources(&mut self) {
self.finalizers.update_sources();
}

/// Add a new finalizer to the array
pub fn add_finalizer(&mut self, finalizer: EventFinalizer) {
self.finalizers.add(finalizer);
}
}

impl EventDataEq for EventMetadata {
Expand Down
6 changes: 1 addition & 5 deletions lib/vector-core/src/event/metric.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::{EventFinalizer, EventMetadata};
use super::EventMetadata;
use crate::metrics::Handle;
use chrono::{DateTime, Utc};
use derive_is_enum_variant::is_enum_variant;
Expand Down Expand Up @@ -275,10 +275,6 @@ impl Metric {
self
}

pub fn add_finalizer(&mut self, finalizer: EventFinalizer) {
self.metadata.add_finalizer(finalizer);
}

pub fn with_tags(mut self, tags: Option<MetricTags>) -> Self {
self.series.tags = tags;
self
Expand Down
9 changes: 0 additions & 9 deletions lib/vector-core/src/event/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ use shared::EventDataEq;
use std::collections::{BTreeMap, HashMap};
use std::convert::{TryFrom, TryInto};
use std::fmt::Debug;
use std::sync::Arc;
use tracing::field::{Field, Visit};
pub use util::log::PathComponent;
pub use util::log::PathIter;
Expand Down Expand Up @@ -139,14 +138,6 @@ impl Event {
Self::Metric(metric) => metric.metadata_mut(),
}
}

pub fn add_batch_notifier(&mut self, batch: Arc<BatchNotifier>) {
let finalizer = EventFinalizer::new(batch);
match self {
Self::Log(log) => log.add_finalizer(finalizer),
Self::Metric(metric) => metric.add_finalizer(finalizer),
}
}
}

impl EventDataEq for Event {
Expand Down
22 changes: 1 addition & 21 deletions src/pipeline.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,6 @@
use crate::{internal_events::EventOut, transforms::FunctionTransform};
use crate::{event::Event, internal_events::EventOut, transforms::FunctionTransform};
use futures::{channel::mpsc, task::Poll, Sink};
#[cfg(test)]
use futures::{Stream, StreamExt};
use std::{collections::VecDeque, fmt, pin::Pin, task::Context};
use vector_core::event::Event;
#[cfg(test)]
use vector_core::event::EventStatus;

#[derive(Debug)]
pub struct ClosedError;
Expand Down Expand Up @@ -109,21 +104,6 @@ impl Pipeline {
Self::new_with_buffer(100, vec![])
}

#[cfg(test)]
pub fn new_test_finalize(status: EventStatus) -> (Self, impl Stream<Item = Event> + Unpin) {
let (pipe, recv) = Self::new_with_buffer(100, vec![]);
// In a source test pipeline, there is no sink to acknowledge
// events, so we have to add a map to the receiver to handle the
// finalization.
let recv = recv.map(move |mut event| {
let metadata = event.metadata_mut();
metadata.update_status(status);
metadata.update_sources();
event
});
(pipe, recv)
}

pub fn new_with_buffer(
n: usize,
inlines: Vec<Box<dyn FunctionTransform>>,
Expand Down
128 changes: 43 additions & 85 deletions src/sources/datadog/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ impl SourceConfig for DatadogLogsConfig {
struct DatadogLogsSource {}

impl HttpSource for DatadogLogsSource {
fn build_events(
fn build_event(
&self,
body: Bytes,
header_map: HeaderMap,
Expand All @@ -100,7 +100,7 @@ impl HttpSource for DatadogLogsSource {
decode_body(body, Encoding::Json).map(|mut events| {
// Add source type & Datadog API key
let key = log_schema().source_type_key();
for event in &mut events {
for event in events.iter_mut() {
let log = event.as_mut_log();
log.try_insert(key, Bytes::from("datadog_logs"));
if let Some(k) = &api_key {
Expand Down Expand Up @@ -128,11 +128,11 @@ mod tests {

use crate::{
config::{log_schema, SourceConfig, SourceContext},
event::{Event, EventStatus},
test_util::{next_addr, spawn_collect_n, trace_init, wait_for_tcp},
event::Event,
test_util::{collect_n, next_addr, trace_init, wait_for_tcp},
Pipeline,
};
use futures::Stream;
use futures::channel::mpsc;
use http::HeaderMap;
use pretty_assertions::assert_eq;
use std::net::SocketAddr;
Expand All @@ -142,8 +142,8 @@ mod tests {
crate::test_util::test_generate_config::<DatadogLogsConfig>();
}

async fn source(status: EventStatus) -> (impl Stream<Item = Event>, SocketAddr) {
let (sender, recv) = Pipeline::new_test_finalize(status);
async fn source() -> (mpsc::Receiver<Event>, SocketAddr) {
let (sender, recv) = Pipeline::new_test();
let address = next_addr();
tokio::spawn(async move {
DatadogLogsConfig {
Expand Down Expand Up @@ -181,26 +181,20 @@ mod tests {
#[tokio::test]
async fn no_api_key() {
trace_init();
let (rx, addr) = source(EventStatus::Delivered).await;
let (rx, addr) = source().await;

let mut events = spawn_collect_n(
async move {
assert_eq!(
200,
send_with_path(
addr,
r#"[{"message":"foo", "timestamp": 123}]"#,
HeaderMap::new(),
"/v1/input/"
)
.await
);
},
rx,
1,
)
.await;
assert_eq!(
200,
send_with_path(
addr,
r#"[{"message":"foo", "timestamp": 123}]"#,
HeaderMap::new(),
"/v1/input/"
)
.await
);

let mut events = collect_n(rx, 1).await;
{
let event = events.remove(0);
let log = event.as_log();
Expand All @@ -214,26 +208,20 @@ mod tests {
#[tokio::test]
async fn api_key_in_url() {
trace_init();
let (rx, addr) = source(EventStatus::Delivered).await;
let (rx, addr) = source().await;

let mut events = spawn_collect_n(
async move {
assert_eq!(
200,
send_with_path(
addr,
r#"[{"message":"bar", "timestamp": 456}]"#,
HeaderMap::new(),
"/v1/input/12345678abcdefgh12345678abcdefgh"
)
.await
);
},
rx,
1,
)
.await;
assert_eq!(
200,
send_with_path(
addr,
r#"[{"message":"bar", "timestamp": 456}]"#,
HeaderMap::new(),
"/v1/input/12345678abcdefgh12345678abcdefgh"
)
.await
);

let mut events = collect_n(rx, 1).await;
{
let event = events.remove(0);
let log = event.as_log();
Expand All @@ -247,32 +235,26 @@ mod tests {
#[tokio::test]
async fn api_key_in_header() {
trace_init();
let (rx, addr) = source(EventStatus::Delivered).await;
let (rx, addr) = source().await;

let mut headers = HeaderMap::new();
headers.insert(
"dd-api-key",
"12345678abcdefgh12345678abcdefgh".parse().unwrap(),
);

let mut events = spawn_collect_n(
async move {
assert_eq!(
200,
send_with_path(
addr,
r#"[{"message":"baz", "timestamp": 789}]"#,
headers,
"/v1/input/"
)
.await
);
},
rx,
1,
)
.await;
assert_eq!(
200,
send_with_path(
addr,
r#"[{"message":"baz", "timestamp": 789}]"#,
headers,
"/v1/input/"
)
.await
);

let mut events = collect_n(rx, 1).await;
{
let event = events.remove(0);
let log = event.as_log();
Expand All @@ -282,28 +264,4 @@ mod tests {
assert_eq!(log[log_schema().source_type_key()], "datadog_logs".into());
}
}

#[tokio::test]
async fn delivery_failure() {
trace_init();
let (rx, addr) = source(EventStatus::Failed).await;

spawn_collect_n(
async move {
assert_eq!(
400,
send_with_path(
addr,
r#"[{"message":"foo", "timestamp": 123}]"#,
HeaderMap::new(),
"/v1/input/"
)
.await
);
},
rx,
1,
)
.await;
}
}
2 changes: 1 addition & 1 deletion src/sources/heroku_logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ struct LogplexSource {
}

impl HttpSource for LogplexSource {
fn build_events(
fn build_event(
&self,
body: Bytes,
header_map: HeaderMap,
Expand Down
Loading