From 2d7e2dff0ea4dcf3ba545f072725adfb6ace0a0b Mon Sep 17 00:00:00 2001 From: Bruce Guenter Date: Wed, 6 Oct 2021 20:38:16 -0600 Subject: [PATCH 1/5] chore(observability, sinks): Add component tests to HttpSink sinks Signed-off-by: Bruce Guenter --- src/sinks/clickhouse.rs | 23 +++++------ src/sinks/datadog/events.rs | 12 +++++- src/sinks/elasticsearch/mod.rs | 31 ++++++++------- src/sinks/gcp/pubsub.rs | 4 +- src/sinks/http.rs | 8 ++-- src/sinks/influxdb/logs.rs | 11 +++++- src/sinks/logdna.rs | 6 ++- src/sinks/loki.rs | 69 +++++++++++---------------------- src/sinks/splunk_hec/logs.rs | 21 +++++----- src/sinks/splunk_hec/metrics.rs | 7 +++- src/test_util/components.rs | 41 +++++++++++++++++++- 11 files changed, 135 insertions(+), 98 deletions(-) diff --git a/src/sinks/clickhouse.rs b/src/sinks/clickhouse.rs index 8843ae1842a92..2791bb04bcdbe 100644 --- a/src/sinks/clickhouse.rs +++ b/src/sinks/clickhouse.rs @@ -281,7 +281,10 @@ mod integration_tests { use crate::{ config::{log_schema, SinkConfig, SinkContext}, sinks::util::encoding::TimestampFormat, - test_util::{random_string, trace_init}, + test_util::{ + components::{self, SINK_TESTS}, + random_string, trace_init, + }, }; use futures::{future, stream}; use serde_json::Value; @@ -298,6 +301,8 @@ mod integration_tests { use vector_core::event::{BatchNotifier, BatchStatus, BatchStatusReceiver, Event, LogEvent}; use warp::Filter; + const SINK_TAGS: [&str; 1] = ["endpoint"]; + #[tokio::test] async fn insert_events() { trace_init(); @@ -335,9 +340,7 @@ mod integration_tests { .as_mut_log() .insert("items", vec!["item1", "item2"]); - sink.run(stream::once(ready(input_event.clone()))) - .await - .unwrap(); + components::run_sink_event(sink, input_event.clone(), &SINK_TAGS).await; let output = client.select_all(&table).await; assert_eq!(1, output.rows); @@ -381,9 +384,7 @@ mod integration_tests { let (mut input_event, mut receiver) = make_event(); input_event.as_mut_log().insert("unknown", "mysteries"); - sink.run(stream::once(ready(input_event.clone()))) - .await - .unwrap(); + components::run_sink_event(sink, input_event.clone(), &SINK_TAGS).await; let output = client.select_all(&table).await; assert_eq!(1, output.rows); @@ -434,9 +435,7 @@ mod integration_tests { let (mut input_event, _receiver) = make_event(); - sink.run(stream::once(future::ready(input_event.clone()))) - .await - .unwrap(); + components::run_sink_event(sink, input_event.clone(), &SINK_TAGS).await; let output = client.select_all(&table).await; assert_eq!(1, output.rows); @@ -493,9 +492,7 @@ timestamp_format = "unix""#, let (mut input_event, _receiver) = make_event(); - sink.run(stream::once(future::ready(input_event.clone()))) - .await - .unwrap(); + components::run_sink_event(sink, input_event.clone(), &SINK_TAGS).await; let output = client.select_all(&table).await; assert_eq!(1, output.rows); diff --git a/src/sinks/datadog/events.rs b/src/sinks/datadog/events.rs index be2f6d0a0de0c..dbb41c4e1d87a 100644 --- a/src/sinks/datadog/events.rs +++ b/src/sinks/datadog/events.rs @@ -268,7 +268,7 @@ mod tests { use crate::{ config::SinkConfig, sinks::util::test::{build_test_server_status, load_sink}, - test_util::{next_addr, random_lines_with_stream}, + test_util::{components, next_addr, random_lines_with_stream}, }; use bytes::Bytes; use futures::{ @@ -281,6 +281,8 @@ mod tests { use pretty_assertions::assert_eq; use vector_core::event::{BatchNotifier, BatchStatus}; + const SINK_TAGS: [&str; 1] = ["endpoint"]; + #[test] fn generate_config() { crate::test_util::test_generate_config::(); @@ -325,7 +327,11 @@ mod tests { let (batch, mut receiver) = BatchNotifier::new_with_receiver(); let (expected, events) = random_events_with_stream(100, 10, Some(batch)); - let _ = sink.run(events).await.unwrap(); + components::init(); + sink.run(events).await.unwrap(); + if batch_status == BatchStatus::Delivered { + components::SINK_TESTS.assert(&SINK_TAGS); + } assert_eq!(receiver.try_recv(), Ok(batch_status)); @@ -392,7 +398,9 @@ mod tests { Ok(e) }); + components::init(); let _ = sink.into_sink().send_all(&mut events).await.unwrap(); + components::SINK_TESTS.assert(&SINK_TAGS); let output = rx.take(expected.len()).collect::>().await; for (i, val) in output.iter().enumerate() { diff --git a/src/sinks/elasticsearch/mod.rs b/src/sinks/elasticsearch/mod.rs index 4a011a1b1a11a..cd0fd735d8e34 100644 --- a/src/sinks/elasticsearch/mod.rs +++ b/src/sinks/elasticsearch/mod.rs @@ -1107,7 +1107,7 @@ mod integration_tests { config::{ProxyConfig, SinkConfig, SinkContext}, http::HttpClient, sinks::HealthcheckError, - test_util::{random_events_with_stream, random_string, trace_init}, + test_util::{components, random_events_with_stream, random_string, trace_init}, tls::{self, TlsOptions}, }; use chrono::Utc; @@ -1118,6 +1118,8 @@ mod integration_tests { use std::{fs::File, future::ready, io::Read}; use vector_core::event::{BatchNotifier, BatchStatus, LogEvent}; + const SINK_TAGS: [&str; 1] = ["endpoint"]; + impl ElasticSearchCommon { async fn flush_request(&self) -> crate::Result<()> { let url = format!("{}/_flush", self.base_url) @@ -1229,9 +1231,7 @@ mod integration_tests { let timestamp = input_event[crate::config::log_schema().timestamp_key()].clone(); - sink.run(stream::once(ready(input_event.into()))) - .await - .unwrap(); + components::run_sink_event(sink, input_event.into(), &SINK_TAGS).await; assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered)); @@ -1446,17 +1446,20 @@ mod integration_tests { if break_events { // Break all but the first event to simulate some kind of partial failure let mut doit = false; - sink.run(events.map(move |mut event| { - if doit { - event.as_mut_log().insert("_type", 1); - } - doit = true; - event - })) - .await - .expect("Sending events failed"); + components::run_sink( + sink, + events.map(move |mut event| { + if doit { + event.as_mut_log().insert("_type", 1); + } + doit = true; + event + }), + &SINK_TAGS, + ) + .await; } else { - sink.run(events).await.expect("Sending events failed"); + components::run_sink(sink, events, &SINK_TAGS).await; } assert_eq!(receiver.try_recv(), Ok(batch_status)); diff --git a/src/sinks/gcp/pubsub.rs b/src/sinks/gcp/pubsub.rs index a3daeda858c39..599a0b6122634 100644 --- a/src/sinks/gcp/pubsub.rs +++ b/src/sinks/gcp/pubsub.rs @@ -221,7 +221,7 @@ mod tests { #[cfg(feature = "gcp-pubsub-integration-tests")] mod integration_tests { use super::*; - use crate::test_util::{random_events_with_stream, random_string, trace_init}; + use crate::test_util::{components, random_events_with_stream, random_string, trace_init}; use reqwest::{Client, Method, Response}; use serde_json::{json, Value}; use vector_core::event::{BatchNotifier, BatchStatus}; @@ -255,7 +255,7 @@ mod integration_tests { let (batch, mut receiver) = BatchNotifier::new_with_receiver(); let (input, events) = random_events_with_stream(100, 100, Some(batch)); - sink.run(events).await.expect("Sending events failed"); + components::run_sink(sink, events, &["endpoint"]); assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered)); let response = pull_messages(&subscription, 1000).await; diff --git a/src/sinks/http.rs b/src/sinks/http.rs index a1e2bc0f8bf74..150ea9e952e04 100644 --- a/src/sinks/http.rs +++ b/src/sinks/http.rs @@ -312,7 +312,7 @@ mod tests { test::{build_test_server, build_test_server_generic, build_test_server_status}, }, }, - test_util::{next_addr, random_lines_with_stream}, + test_util::{components, next_addr, random_lines_with_stream}, }; use bytes::{Buf, Bytes}; use flate2::read::MultiGzDecoder; @@ -664,13 +664,11 @@ mod tests { let (in_addr, sink) = build_sink(extra_config).await; let (rx, trigger, server) = build_test_server(in_addr); + tokio::spawn(server); let (batch, mut receiver) = BatchNotifier::new_with_receiver(); let (input_lines, events) = random_lines_with_stream(100, num_lines, Some(batch)); - let pump = sink.run(events); - - tokio::spawn(server); - pump.await.unwrap(); + components::run_sink(sink, events, &["endpoint"]).await; drop(trigger); assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered)); diff --git a/src/sinks/influxdb/logs.rs b/src/sinks/influxdb/logs.rs index e87f16caaca5e..8be9b3e73020c 100644 --- a/src/sinks/influxdb/logs.rs +++ b/src/sinks/influxdb/logs.rs @@ -240,7 +240,7 @@ mod tests { http::HttpSink, test::{build_test_server_status, load_sink}, }, - test_util::next_addr, + test_util::{components, next_addr}, }; use chrono::{offset::TimeZone, Utc}; use futures::{channel::mpsc, stream, StreamExt}; @@ -248,6 +248,8 @@ mod tests { use indoc::indoc; use vector_core::event::{BatchNotifier, BatchStatus, Event, LogEvent}; + const SINK_TAGS: [&str; 1] = ["endpoint"]; + type Receiver = mpsc::Receiver<(Parts, bytes::Bytes)>; #[test] @@ -591,7 +593,11 @@ mod tests { } drop(batch); + components::init(); sink.run(stream::iter(events)).await.unwrap(); + if batch_status == BatchStatus::Delivered { + components::SINK_TESTS.assert(&SINK_TAGS); + } assert_eq!(receiver.try_recv(), Ok(batch_status)); @@ -663,6 +669,7 @@ mod integration_tests { test_util::{onboarding_v2, BUCKET, ORG, TOKEN}, InfluxDb2Settings, }, + test_util::components, }; use chrono::Utc; use futures::stream; @@ -708,7 +715,7 @@ mod integration_tests { let events = vec![Event::Log(event1), Event::Log(event2)]; - sink.run(stream::iter(events)).await.unwrap(); + components::run_sink(sink, stream::iter(events), &SINK_TAGS).await; assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered)); diff --git a/src/sinks/logdna.rs b/src/sinks/logdna.rs index 9f9cb7e361042..ecb4934a70cac 100644 --- a/src/sinks/logdna.rs +++ b/src/sinks/logdna.rs @@ -296,7 +296,7 @@ mod tests { use crate::{ config::SinkConfig, sinks::util::test::{build_test_server_status, load_sink}, - test_util::{next_addr, random_lines, trace_init}, + test_util::{components, next_addr, random_lines, trace_init}, }; use futures::{channel::mpsc, stream, StreamExt}; use http::{request::Parts, StatusCode}; @@ -401,7 +401,11 @@ mod tests { } drop(batch); + components::init(); sink.run(stream::iter(events)).await.unwrap(); + if batch_status == BatchStatus::Delivered { + components::SINK_TESTS.assert(&["endpoint"]); + } assert_eq!(receiver.try_recv(), Ok(batch_status)); diff --git a/src/sinks/loki.rs b/src/sinks/loki.rs index 1fd4b8cfbb237..2ab1123b74367 100644 --- a/src/sinks/loki.rs +++ b/src/sinks/loki.rs @@ -528,8 +528,11 @@ mod tests { mod integration_tests { use super::*; use crate::{ - config::SinkConfig, sinks::util::test::load_sink, sinks::VectorSink, template::Template, - test_util::random_lines, + config::SinkConfig, + sinks::util::test::load_sink, + sinks::VectorSink, + template::Template, + test_util::{components, random_lines}, }; use bytes::Bytes; use chrono::{DateTime, Duration, Utc}; @@ -537,6 +540,8 @@ mod integration_tests { use std::convert::TryFrom; use vector_core::event::{BatchNotifier, BatchStatus, Event, LogEvent}; + const SINK_TAGS: [&str; 1] = ["endpoint"]; + async fn build_sink(encoding: &str) -> (uuid::Uuid, VectorSink) { let stream = uuid::Uuid::new_v4(); @@ -566,6 +571,13 @@ mod integration_tests { (stream, sink) } + fn add_batch_notifier(events: &[Event], batch: BatchNotifier) -> Vec { + events + .iter() + .map(|event| event.clone().into_log().with_batch_notifier(&batch).into()) + .collect() + } + #[tokio::test] async fn text() { let (stream, sink) = build_sink("text").await; @@ -577,11 +589,7 @@ mod integration_tests { .clone() .into_iter() .map(move |line| Event::from(LogEvent::from(line).with_batch_notifier(&batch))); - let _ = sink - .into_sink() - .send_all(&mut stream::iter(events).map(Ok)) - .await - .unwrap(); + components::sink_send_all(sink, events, &SINK_TAGS); assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered)); let (_, outputs) = fetch_stream(stream.to_string(), "default").await; @@ -600,13 +608,7 @@ mod integration_tests { .map(Event::from) .collect::>(); let (batch, mut receiver) = BatchNotifier::new_with_receiver(); - let _ = sink - .into_sink() - .send_all(&mut stream::iter(events.clone().into_iter().map( - move |event| Ok(event.into_log().with_batch_notifier(&batch).into()), - ))) - .await - .unwrap(); + components::sink_send_all(sink, add_batch_notifier(&events, batch), &SINK_TAGS).await; assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered)); let (_, outputs) = fetch_stream(stream.to_string(), "default").await; @@ -632,13 +634,7 @@ mod integration_tests { }) .collect::>(); let (batch, mut receiver) = BatchNotifier::new_with_receiver(); - let _ = sink - .into_sink() - .send_all(&mut stream::iter(events.clone().into_iter().map( - move |event| Ok(event.into_log().with_batch_notifier(&batch).into()), - ))) - .await - .unwrap(); + components::sink_send_all(sink, add_batch_notifier(&events, batch), &SINK_TAGS).await; assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered)); let (_, outputs) = fetch_stream(stream.to_string(), "default").await; @@ -658,13 +654,7 @@ mod integration_tests { .map(Event::from) .collect::>(); let (batch, mut receiver) = BatchNotifier::new_with_receiver(); - let _ = sink - .into_sink() - .send_all(&mut stream::iter(events.clone().into_iter().map( - move |event| Ok(event.into_log().with_batch_notifier(&batch).into()), - ))) - .await - .unwrap(); + components::sink_send_all(sink, add_batch_notifier(&events, batch), &SINK_TAGS).await; assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered)); let (_, outputs) = fetch_stream(stream.to_string(), "default").await; @@ -711,11 +701,7 @@ mod integration_tests { } } - let _ = sink - .into_sink() - .send_all(&mut stream::iter(events).map(Ok)) - .await - .unwrap(); + components::sink_send_all(sink, events, &SINK_TAGS).await; let (_, outputs1) = fetch_stream(stream1.to_string(), "default").await; let (_, outputs2) = fetch_stream(stream2.to_string(), "default").await; @@ -766,11 +752,7 @@ mod integration_tests { event.as_mut_log().insert("stream_key", "test_name"); } - let _ = sink - .into_sink() - .send_all(&mut stream::iter(events).map(Ok)) - .await - .unwrap(); + components::sink_send_all(sink, events, &SINK_TAGS).await; let (_, outputs) = fetch_stream(stream.to_string(), "default").await; @@ -823,11 +805,7 @@ mod integration_tests { } } - let _ = sink - .into_sink() - .send_all(&mut stream::iter(events).map(Ok)) - .await - .unwrap(); + components::sink_send_all(sink, events, &SINK_TAGS).await; let (_, outputs1) = fetch_stream(stream.to_string(), "tenant1").await; let (_, outputs2) = fetch_stream(stream.to_string(), "tenant2").await; @@ -967,10 +945,7 @@ mod integration_tests { config.batch.max_bytes = Some(4_000_000); let (sink, _) = config.build(cx).await.unwrap(); - sink.into_sink() - .send_all(&mut stream::iter(events.clone()).map(Ok)) - .await - .unwrap(); + components::sink_send_all(sink, events.clone(), &SINK_TAGS).await; let (timestamps, outputs) = fetch_stream(stream.to_string(), "default").await; assert_eq!(expected.len(), outputs.len()); diff --git a/src/sinks/splunk_hec/logs.rs b/src/sinks/splunk_hec/logs.rs index 98458d4b5bc75..dad1c0ca67dbd 100644 --- a/src/sinks/splunk_hec/logs.rs +++ b/src/sinks/splunk_hec/logs.rs @@ -350,7 +350,7 @@ mod integration_tests { use crate::{ config::{SinkConfig, SinkContext}, sinks::splunk_hec::conn::integration_test_helpers::get_token, - test_util::{random_lines_with_stream, random_string}, + test_util::{components, random_lines_with_stream, random_string}, }; use futures::stream; use serde_json::Value as JsonValue; @@ -362,6 +362,8 @@ mod integration_tests { const USERNAME: &str = "admin"; const PASSWORD: &str = "password"; + const SINK_TAGS: [&str; 1] = ["endpoint"]; + // It usually takes ~1 second for the event to show up in search, so poll until // we see it. async fn find_entry(message: &str) -> serde_json::value::Value { @@ -391,7 +393,7 @@ mod integration_tests { .with_batch_notifier(&batch) .into(); drop(batch); - sink.run(stream::once(ready(event))).await.unwrap(); + components::run_sink_event(sink, event, &SINK_TAGS).await; assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered)); let entry = find_entry(message.as_str()).await; @@ -429,7 +431,7 @@ mod integration_tests { let message = random_string(100); let event = Event::from(message.clone()); - sink.run(stream::once(ready(event))).await.unwrap(); + components::run_sink_event(sink, event, &SINK_TAGS).await; let entry = find_entry(message.as_str()).await; @@ -446,7 +448,7 @@ mod integration_tests { let message = random_string(100); let event = Event::from(message.clone()); - sink.run(stream::once(ready(event))).await.unwrap(); + components::run_sink_event(sink, event, &SINK_TAGS).await; let entry = find_entry(message.as_str()).await; @@ -466,7 +468,7 @@ mod integration_tests { let message = random_string(100); let mut event = Event::from(message.clone()); event.as_mut_log().insert("index_name", "custom_index"); - sink.run(stream::once(ready(event))).await.unwrap(); + components::run_sink_event(sink, event, &SINK_TAGS).await; let entry = find_entry(message.as_str()).await; @@ -482,6 +484,7 @@ mod integration_tests { let (sink, _) = config.build(cx).await.unwrap(); let (messages, events) = random_lines_with_stream(100, 10, None); + components::run_sink(sink, events, &SINK_TAGS).await; sink.run(events).await.unwrap(); let mut found_all = false; @@ -515,7 +518,7 @@ mod integration_tests { let message = random_string(100); let mut event = Event::from(message.clone()); event.as_mut_log().insert("asdf", "hello"); - sink.run(stream::once(ready(event))).await.unwrap(); + components::run_sink_event(sink, event, &SINK_TAGS).await; let entry = find_entry(message.as_str()).await; @@ -536,7 +539,7 @@ mod integration_tests { let mut event = Event::from(message.clone()); event.as_mut_log().insert("asdf", "hello"); event.as_mut_log().insert("host", "example.com:1234"); - sink.run(stream::once(ready(event))).await.unwrap(); + components::run_sink_event(sink, event, &SINK_TAGS).await; let entry = find_entry(message.as_str()).await; @@ -560,7 +563,7 @@ mod integration_tests { let message = random_string(100); let mut event = Event::from(message.clone()); event.as_mut_log().insert("asdf", "hello"); - sink.run(stream::once(ready(event))).await.unwrap(); + components::run_sink_event(sink, event, &SINK_TAGS).await; let entry = find_entry(message.as_str()).await; @@ -587,7 +590,7 @@ mod integration_tests { event.as_mut_log().insert("asdf", "hello"); event.as_mut_log().insert("host", "example.com:1234"); event.as_mut_log().insert("roast", "beef.example.com:1234"); - sink.run(stream::once(ready(event))).await.unwrap(); + components::run_sink_event(sink, event, &SINK_TAGS).await; let entry = find_entry(message.as_str()).await; diff --git a/src/sinks/splunk_hec/metrics.rs b/src/sinks/splunk_hec/metrics.rs index d9a1b2105e832..785ec40ab16e8 100644 --- a/src/sinks/splunk_hec/metrics.rs +++ b/src/sinks/splunk_hec/metrics.rs @@ -571,6 +571,7 @@ mod integration_tests { config::{SinkConfig, SinkContext}, event::{Metric, MetricKind}, sinks::splunk_hec::conn::integration_test_helpers::get_token, + test_util::components, }; use futures::stream; use serde_json::Value as JsonValue; @@ -581,6 +582,8 @@ mod integration_tests { const USERNAME: &str = "admin"; const PASSWORD: &str = "password"; + const SINK_TAGS: [&str; 1] = ["endpoint"]; + #[tokio::test] async fn splunk_insert_counter_metric() { let cx = SinkContext::new_test(); @@ -601,7 +604,7 @@ mod integration_tests { .with_batch_notifier(&batch) .into(); drop(batch); - sink.run(stream::once(ready(event))).await.unwrap(); + components::run_sink_event(sink, event, &SINK_TAGS).await; assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered)); assert!( @@ -633,7 +636,7 @@ mod integration_tests { .with_batch_notifier(&batch) .into(); drop(batch); - sink.run(stream::once(ready(event))).await.unwrap(); + components::run_sink_event(sink, event, &SINK_TAGS).await; assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered)); assert!( diff --git a/src/test_util/components.rs b/src/test_util/components.rs index 960ebcf2e5e2d..066ba739387b9 100644 --- a/src/test_util/components.rs +++ b/src/test_util/components.rs @@ -5,8 +5,10 @@ //! internal events and metrics, and testing that they fit the required //! patterns. -use crate::event::{Metric, MetricValue}; +use crate::event::{Event, Metric, MetricValue}; use crate::metrics::{self, Controller}; +use crate::sinks::VectorSink; +use futures::{stream, SinkExt, Stream, StreamExt}; use lazy_static::lazy_static; use std::cell::RefCell; use std::collections::HashSet; @@ -41,6 +43,17 @@ lazy_static! { "component_sent_event_bytes_total", ], }; + /// The component test specification for all sinks + pub static ref SINK_TESTS: ComponentTests = ComponentTests { + events: &["EventsSent", "BytesSent"], // EventsReceived is emitted in the topology + tagged_counters: &[ + "component_sent_bytes_total", + ], + untagged_counters: &[ + "component_sent_events_total", + "component_sent_event_bytes_total", + ], + }; } impl ComponentTests { @@ -145,3 +158,29 @@ impl ComponentTester { } } } + +/// Convenience wrapper for running sink tests +pub async fn run_sink(sink: VectorSink, events: S, tags: &[&str]) +where + S: Stream + Send, +{ + init(); + sink.run(events).await.expect("Running sink failed"); + SINK_TESTS.assert(tags); +} + +/// Convenience wrapper for running a sink with a single event +pub async fn run_sink_event(sink: VectorSink, event: Event, tags: &[&str]) { + init(); + run_sink(sink, stream::once(std::future::ready(event)), tags).await +} + +/// Convenience wrapper for running sinks with `send_all` +pub async fn sink_send_all(sink: VectorSink, events: Vec, tags: &[&str]) { + init(); + sink.into_sink() + .send_all(&mut stream::iter(events).map(Ok)) + .await + .expect("Sending events to sink failed"); + SINK_TESTS.assert(tags); +} From e59633f335de19de08fb0aa9c8dec18ac0e71312 Mon Sep 17 00:00:00 2001 From: Bruce Guenter Date: Fri, 8 Oct 2021 12:19:32 -0600 Subject: [PATCH 2/5] Fixups and more tests Signed-off-by: Bruce Guenter --- src/sinks/clickhouse.rs | 6 +----- src/sinks/datadog/events.rs | 4 +--- src/sinks/elasticsearch/mod.rs | 4 ++-- src/sinks/gcp/pubsub.rs | 2 +- src/sinks/humio/logs.rs | 15 ++++++++------- src/sinks/humio/metrics.rs | 4 ++-- src/sinks/influxdb/logs.rs | 4 ++-- src/sinks/loki.rs | 6 +++--- src/sinks/new_relic_logs.rs | 8 +++----- src/sinks/sematext/logs.rs | 4 ++-- src/sinks/splunk_hec/logs.rs | 1 - src/sinks/splunk_hec/metrics.rs | 2 -- src/test_util/components.rs | 20 ++++++++++++++++---- 13 files changed, 41 insertions(+), 39 deletions(-) diff --git a/src/sinks/clickhouse.rs b/src/sinks/clickhouse.rs index 2791bb04bcdbe..8cbbca8c09250 100644 --- a/src/sinks/clickhouse.rs +++ b/src/sinks/clickhouse.rs @@ -281,16 +281,12 @@ mod integration_tests { use crate::{ config::{log_schema, SinkConfig, SinkContext}, sinks::util::encoding::TimestampFormat, - test_util::{ - components::{self, SINK_TESTS}, - random_string, trace_init, - }, + test_util::{components, random_string, trace_init}, }; use futures::{future, stream}; use serde_json::Value; use std::{ convert::Infallible, - future::ready, net::SocketAddr, sync::{ atomic::{AtomicBool, Ordering}, diff --git a/src/sinks/datadog/events.rs b/src/sinks/datadog/events.rs index dbb41c4e1d87a..62d56ed869c27 100644 --- a/src/sinks/datadog/events.rs +++ b/src/sinks/datadog/events.rs @@ -398,9 +398,7 @@ mod tests { Ok(e) }); - components::init(); - let _ = sink.into_sink().send_all(&mut events).await.unwrap(); - components::SINK_TESTS.assert(&SINK_TAGS); + components::sink_send_stream(sink, events, &SINK_TAGS).await; let output = rx.take(expected.len()).collect::>().await; for (i, val) in output.iter().enumerate() { diff --git a/src/sinks/elasticsearch/mod.rs b/src/sinks/elasticsearch/mod.rs index cd0fd735d8e34..9f10c278f1775 100644 --- a/src/sinks/elasticsearch/mod.rs +++ b/src/sinks/elasticsearch/mod.rs @@ -1111,11 +1111,11 @@ mod integration_tests { tls::{self, TlsOptions}, }; use chrono::Utc; - use futures::{stream, StreamExt}; + use futures::StreamExt; use http::{Request, StatusCode}; use hyper::Body; use serde_json::{json, Value}; - use std::{fs::File, future::ready, io::Read}; + use std::{fs::File, io::Read}; use vector_core::event::{BatchNotifier, BatchStatus, LogEvent}; const SINK_TAGS: [&str; 1] = ["endpoint"]; diff --git a/src/sinks/gcp/pubsub.rs b/src/sinks/gcp/pubsub.rs index 599a0b6122634..ee4299785e233 100644 --- a/src/sinks/gcp/pubsub.rs +++ b/src/sinks/gcp/pubsub.rs @@ -255,7 +255,7 @@ mod integration_tests { let (batch, mut receiver) = BatchNotifier::new_with_receiver(); let (input, events) = random_events_with_stream(100, 100, Some(batch)); - components::run_sink(sink, events, &["endpoint"]); + components::run_sink(sink, events, &["endpoint"]).await; assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered)); let response = pull_messages(&subscription, 1000).await; diff --git a/src/sinks/humio/logs.rs b/src/sinks/humio/logs.rs index 4910e6bd65d31..7948e6c8c3506 100644 --- a/src/sinks/humio/logs.rs +++ b/src/sinks/humio/logs.rs @@ -150,17 +150,18 @@ mod integration_tests { config::{log_schema, SinkConfig, SinkContext}, event::Event, sinks::util::Compression, - test_util::random_string, + test_util::{components, random_string}, }; use chrono::Utc; - use futures::stream; use indoc::indoc; use serde_json::{json, Value as JsonValue}; - use std::{collections::HashMap, convert::TryFrom, future::ready}; + use std::{collections::HashMap, convert::TryFrom}; // matches humio container address const HOST: &str = "http://localhost:8080"; + const SINK_TAGS: [&str; 1] = ["endpoint"]; + #[tokio::test] async fn humio_insert_message() { let cx = SinkContext::new_test(); @@ -177,7 +178,7 @@ mod integration_tests { let log = event.as_mut_log(); log.insert(log_schema().host_key(), host.clone()); - sink.run(stream::once(ready(event))).await.unwrap(); + components::run_sink_event(sink, event, &SINK_TAGS).await; let entry = find_entry(repo.name.as_str(), message.as_str()).await; @@ -213,7 +214,7 @@ mod integration_tests { let message = random_string(100); let event = Event::from(message.clone()); - sink.run(stream::once(ready(event))).await.unwrap(); + components::run_sink_event(sink, event, &SINK_TAGS).await; let entry = find_entry(repo.name.as_str(), message.as_str()).await; @@ -246,7 +247,7 @@ mod integration_tests { .as_mut_log() .insert("@timestamp", Utc::now().to_rfc3339()); - sink.run(stream::once(ready(event))).await.unwrap(); + components::run_sink_event(sink, event, &SINK_TAGS).await; let entry = find_entry(repo.name.as_str(), message.as_str()).await; @@ -269,7 +270,7 @@ mod integration_tests { let message = random_string(100); let event = Event::from(message.clone()); - sink.run(stream::once(ready(event))).await.unwrap(); + components::run_sink_event(sink, event, &SINK_TAGS).await; let entry = find_entry(repo.name.as_str(), message.as_str()).await; diff --git a/src/sinks/humio/metrics.rs b/src/sinks/humio/metrics.rs index 18519d331bfba..3fe142a13ca9a 100644 --- a/src/sinks/humio/metrics.rs +++ b/src/sinks/humio/metrics.rs @@ -116,7 +116,7 @@ mod tests { Event, Metric, }, sinks::util::test::{build_test_server, load_sink}, - test_util, + test_util::{self, components}, }; use chrono::{offset::TimeZone, Utc}; use indoc::indoc; @@ -203,7 +203,7 @@ mod tests { ]; let len = metrics.len(); - let _ = sink.run(stream::iter(metrics)).await.unwrap(); + components::run_sink(sink, stream::iter(metrics), &["endpoint"]).await; let output = rx.take(len).collect::>().await; assert_eq!( diff --git a/src/sinks/influxdb/logs.rs b/src/sinks/influxdb/logs.rs index 8be9b3e73020c..a9d66124af0c8 100644 --- a/src/sinks/influxdb/logs.rs +++ b/src/sinks/influxdb/logs.rs @@ -248,7 +248,7 @@ mod tests { use indoc::indoc; use vector_core::event::{BatchNotifier, BatchStatus, Event, LogEvent}; - const SINK_TAGS: [&str; 1] = ["endpoint"]; + pub(super) const SINK_TAGS: [&str; 1] = ["endpoint"]; type Receiver = mpsc::Receiver<(Parts, bytes::Bytes)>; @@ -715,7 +715,7 @@ mod integration_tests { let events = vec![Event::Log(event1), Event::Log(event2)]; - components::run_sink(sink, stream::iter(events), &SINK_TAGS).await; + components::run_sink(sink, stream::iter(events), &tests::SINK_TAGS).await; assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered)); diff --git a/src/sinks/loki.rs b/src/sinks/loki.rs index 2ab1123b74367..9b65202c796d3 100644 --- a/src/sinks/loki.rs +++ b/src/sinks/loki.rs @@ -536,8 +536,8 @@ mod integration_tests { }; use bytes::Bytes; use chrono::{DateTime, Duration, Utc}; - use futures::{stream, StreamExt}; use std::convert::TryFrom; + use std::sync::Arc; use vector_core::event::{BatchNotifier, BatchStatus, Event, LogEvent}; const SINK_TAGS: [&str; 1] = ["endpoint"]; @@ -571,7 +571,7 @@ mod integration_tests { (stream, sink) } - fn add_batch_notifier(events: &[Event], batch: BatchNotifier) -> Vec { + fn add_batch_notifier(events: &[Event], batch: Arc) -> Vec { events .iter() .map(|event| event.clone().into_log().with_batch_notifier(&batch).into()) @@ -589,7 +589,7 @@ mod integration_tests { .clone() .into_iter() .map(move |line| Event::from(LogEvent::from(line).with_batch_notifier(&batch))); - components::sink_send_all(sink, events, &SINK_TAGS); + components::sink_send_all(sink, events, &SINK_TAGS).await; assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered)); let (_, outputs) = fetch_stream(stream.to_string(), "default").await; diff --git a/src/sinks/new_relic_logs.rs b/src/sinks/new_relic_logs.rs index ba1d4175e8f8a..6af7e486460f8 100644 --- a/src/sinks/new_relic_logs.rs +++ b/src/sinks/new_relic_logs.rs @@ -176,7 +176,7 @@ mod tests { encoding::EncodingConfiguration, service::RATE_LIMIT_NUM_DEFAULT, test::build_test_server, Concurrency, }, - test_util::next_addr, + test_util::{components, next_addr}, }; use bytes::Buf; use futures::{stream, StreamExt}; @@ -335,14 +335,12 @@ mod tests { let (sink, _healthcheck) = http_config.build(SinkContext::new_test()).await.unwrap(); let (rx, trigger, server) = build_test_server(in_addr); + tokio::spawn(server); let input_lines = (0..100).map(|i| format!("msg {}", i)).collect::>(); let events = stream::iter(input_lines.clone()).map(Event::from); - let pump = sink.run(events); - - tokio::spawn(server); - pump.await.unwrap(); + components::run_sink(sink, events, &["endpoint"]).await; drop(trigger); let output_lines = rx diff --git a/src/sinks/sematext/logs.rs b/src/sinks/sematext/logs.rs index 4f9e36b9e5f3e..a527fa947a37c 100644 --- a/src/sinks/sematext/logs.rs +++ b/src/sinks/sematext/logs.rs @@ -116,7 +116,7 @@ mod tests { use crate::{ config::SinkConfig, sinks::util::test::{build_test_server, load_sink}, - test_util::{next_addr, random_lines_with_stream}, + test_util::{components, next_addr, random_lines_with_stream}, }; use futures::StreamExt; use indoc::indoc; @@ -149,7 +149,7 @@ mod tests { tokio::spawn(server); let (expected, events) = random_lines_with_stream(100, 10, None); - sink.run(events).await.unwrap(); + components::run_sink(sink, events, &["endpoint"]).await; let output = rx.next().await.unwrap(); diff --git a/src/sinks/splunk_hec/logs.rs b/src/sinks/splunk_hec/logs.rs index dad1c0ca67dbd..843723c28f0a4 100644 --- a/src/sinks/splunk_hec/logs.rs +++ b/src/sinks/splunk_hec/logs.rs @@ -485,7 +485,6 @@ mod integration_tests { let (messages, events) = random_lines_with_stream(100, 10, None); components::run_sink(sink, events, &SINK_TAGS).await; - sink.run(events).await.unwrap(); let mut found_all = false; for _ in 0..20 { diff --git a/src/sinks/splunk_hec/metrics.rs b/src/sinks/splunk_hec/metrics.rs index 785ec40ab16e8..d077ec117548b 100644 --- a/src/sinks/splunk_hec/metrics.rs +++ b/src/sinks/splunk_hec/metrics.rs @@ -573,11 +573,9 @@ mod integration_tests { sinks::splunk_hec::conn::integration_test_helpers::get_token, test_util::components, }; - use futures::stream; use serde_json::Value as JsonValue; use shared::btreemap; use std::convert::TryFrom; - use std::future::ready; use vector_core::event::{BatchNotifier, BatchStatus}; const USERNAME: &str = "admin"; const PASSWORD: &str = "password"; diff --git a/src/test_util/components.rs b/src/test_util/components.rs index 066ba739387b9..74d16c224460c 100644 --- a/src/test_util/components.rs +++ b/src/test_util/components.rs @@ -8,7 +8,7 @@ use crate::event::{Event, Metric, MetricValue}; use crate::metrics::{self, Controller}; use crate::sinks::VectorSink; -use futures::{stream, SinkExt, Stream, StreamExt}; +use futures::{stream, SinkExt, Stream}; use lazy_static::lazy_static; use std::cell::RefCell; use std::collections::HashSet; @@ -176,11 +176,23 @@ pub async fn run_sink_event(sink: VectorSink, event: Event, tags: &[&str]) { } /// Convenience wrapper for running sinks with `send_all` -pub async fn sink_send_all(sink: VectorSink, events: Vec, tags: &[&str]) { +pub async fn sink_send_all(sink: VectorSink, events: I, tags: &[&str]) +where + I: IntoIterator, + I::IntoIter: Send, +{ + sink_send_stream(sink, stream::iter(events.into_iter().map(Ok)), tags).await +} + +/// Convenience wrapper for running sinks with a stream of events +pub async fn sink_send_stream(sink: VectorSink, mut events: S, tags: &[&str]) +where + S: Stream> + Send + Unpin, +{ init(); sink.into_sink() - .send_all(&mut stream::iter(events).map(Ok)) + .send_all(&mut events) .await - .expect("Sending events to sink failed"); + .expect("Sending event stream to sink failed"); SINK_TESTS.assert(tags); } From 917effd5be6d61469f1b916b3973f0094049a79b Mon Sep 17 00:00:00 2001 From: Bruce Guenter Date: Fri, 8 Oct 2021 14:06:39 -0600 Subject: [PATCH 3/5] Fix unneeded mut Signed-off-by: Bruce Guenter --- src/sinks/datadog/events.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/sinks/datadog/events.rs b/src/sinks/datadog/events.rs index 62d56ed869c27..dc3790f0cffef 100644 --- a/src/sinks/datadog/events.rs +++ b/src/sinks/datadog/events.rs @@ -391,7 +391,7 @@ mod tests { let (expected, events) = random_events_with_stream(100, 10, None); - let mut events = events.map(|mut e| { + let events = events.map(|mut e| { e.as_mut_log() .metadata_mut() .set_datadog_api_key(Some(Arc::from("from_metadata"))); From da7b81e983ad10bc47a517ecd2849905a338b16e Mon Sep 17 00:00:00 2001 From: Bruce Guenter Date: Wed, 13 Oct 2021 09:24:31 -0600 Subject: [PATCH 4/5] Centralize the HTTP sink tags array Signed-off-by: Bruce Guenter --- src/sinks/clickhouse.rs | 13 ++++++------- src/sinks/datadog/events.rs | 9 ++++----- src/sinks/elasticsearch/mod.rs | 11 +++++------ src/sinks/gcp/pubsub.rs | 5 +++-- src/sinks/http.rs | 4 ++-- src/sinks/humio/logs.rs | 12 +++++------- src/sinks/humio/metrics.rs | 4 ++-- src/sinks/influxdb/logs.rs | 10 ++++------ src/sinks/logdna.rs | 5 +++-- src/sinks/loki.rs | 20 +++++++++----------- src/sinks/new_relic_logs.rs | 4 ++-- src/sinks/sematext/logs.rs | 5 +++-- src/sinks/splunk_hec/logs.rs | 23 +++++++++++------------ src/sinks/splunk_hec/metrics.rs | 8 +++----- src/test_util/components.rs | 3 +++ 15 files changed, 65 insertions(+), 71 deletions(-) diff --git a/src/sinks/clickhouse.rs b/src/sinks/clickhouse.rs index 8cbbca8c09250..974e674a316d0 100644 --- a/src/sinks/clickhouse.rs +++ b/src/sinks/clickhouse.rs @@ -281,7 +281,8 @@ mod integration_tests { use crate::{ config::{log_schema, SinkConfig, SinkContext}, sinks::util::encoding::TimestampFormat, - test_util::{components, random_string, trace_init}, + test_util::components::{self, HTTP_SINK_TAGS}, + test_util::{random_string, trace_init}, }; use futures::{future, stream}; use serde_json::Value; @@ -297,8 +298,6 @@ mod integration_tests { use vector_core::event::{BatchNotifier, BatchStatus, BatchStatusReceiver, Event, LogEvent}; use warp::Filter; - const SINK_TAGS: [&str; 1] = ["endpoint"]; - #[tokio::test] async fn insert_events() { trace_init(); @@ -336,7 +335,7 @@ mod integration_tests { .as_mut_log() .insert("items", vec!["item1", "item2"]); - components::run_sink_event(sink, input_event.clone(), &SINK_TAGS).await; + components::run_sink_event(sink, input_event.clone(), &HTTP_SINK_TAGS).await; let output = client.select_all(&table).await; assert_eq!(1, output.rows); @@ -380,7 +379,7 @@ mod integration_tests { let (mut input_event, mut receiver) = make_event(); input_event.as_mut_log().insert("unknown", "mysteries"); - components::run_sink_event(sink, input_event.clone(), &SINK_TAGS).await; + components::run_sink_event(sink, input_event.clone(), &HTTP_SINK_TAGS).await; let output = client.select_all(&table).await; assert_eq!(1, output.rows); @@ -431,7 +430,7 @@ mod integration_tests { let (mut input_event, _receiver) = make_event(); - components::run_sink_event(sink, input_event.clone(), &SINK_TAGS).await; + components::run_sink_event(sink, input_event.clone(), &HTTP_SINK_TAGS).await; let output = client.select_all(&table).await; assert_eq!(1, output.rows); @@ -488,7 +487,7 @@ timestamp_format = "unix""#, let (mut input_event, _receiver) = make_event(); - components::run_sink_event(sink, input_event.clone(), &SINK_TAGS).await; + components::run_sink_event(sink, input_event.clone(), &HTTP_SINK_TAGS).await; let output = client.select_all(&table).await; assert_eq!(1, output.rows); diff --git a/src/sinks/datadog/events.rs b/src/sinks/datadog/events.rs index dc3790f0cffef..333794b1408e4 100644 --- a/src/sinks/datadog/events.rs +++ b/src/sinks/datadog/events.rs @@ -268,7 +268,8 @@ mod tests { use crate::{ config::SinkConfig, sinks::util::test::{build_test_server_status, load_sink}, - test_util::{components, next_addr, random_lines_with_stream}, + test_util::components::{self, HTTP_SINK_TAGS}, + test_util::{next_addr, random_lines_with_stream}, }; use bytes::Bytes; use futures::{ @@ -281,8 +282,6 @@ mod tests { use pretty_assertions::assert_eq; use vector_core::event::{BatchNotifier, BatchStatus}; - const SINK_TAGS: [&str; 1] = ["endpoint"]; - #[test] fn generate_config() { crate::test_util::test_generate_config::(); @@ -330,7 +329,7 @@ mod tests { components::init(); sink.run(events).await.unwrap(); if batch_status == BatchStatus::Delivered { - components::SINK_TESTS.assert(&SINK_TAGS); + components::SINK_TESTS.assert(&HTTP_SINK_TAGS); } assert_eq!(receiver.try_recv(), Ok(batch_status)); @@ -398,7 +397,7 @@ mod tests { Ok(e) }); - components::sink_send_stream(sink, events, &SINK_TAGS).await; + components::sink_send_stream(sink, events, &HTTP_SINK_TAGS).await; let output = rx.take(expected.len()).collect::>().await; for (i, val) in output.iter().enumerate() { diff --git a/src/sinks/elasticsearch/mod.rs b/src/sinks/elasticsearch/mod.rs index 9f10c278f1775..5dce633782323 100644 --- a/src/sinks/elasticsearch/mod.rs +++ b/src/sinks/elasticsearch/mod.rs @@ -1107,7 +1107,8 @@ mod integration_tests { config::{ProxyConfig, SinkConfig, SinkContext}, http::HttpClient, sinks::HealthcheckError, - test_util::{components, random_events_with_stream, random_string, trace_init}, + test_util::components::{self, HTTP_SINK_TAGS}, + test_util::{random_events_with_stream, random_string, trace_init}, tls::{self, TlsOptions}, }; use chrono::Utc; @@ -1118,8 +1119,6 @@ mod integration_tests { use std::{fs::File, io::Read}; use vector_core::event::{BatchNotifier, BatchStatus, LogEvent}; - const SINK_TAGS: [&str; 1] = ["endpoint"]; - impl ElasticSearchCommon { async fn flush_request(&self) -> crate::Result<()> { let url = format!("{}/_flush", self.base_url) @@ -1231,7 +1230,7 @@ mod integration_tests { let timestamp = input_event[crate::config::log_schema().timestamp_key()].clone(); - components::run_sink_event(sink, input_event.into(), &SINK_TAGS).await; + components::run_sink_event(sink, input_event.into(), &HTTP_SINK_TAGS).await; assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered)); @@ -1455,11 +1454,11 @@ mod integration_tests { doit = true; event }), - &SINK_TAGS, + &HTTP_SINK_TAGS, ) .await; } else { - components::run_sink(sink, events, &SINK_TAGS).await; + components::run_sink(sink, events, &HTTP_SINK_TAGS).await; } assert_eq!(receiver.try_recv(), Ok(batch_status)); diff --git a/src/sinks/gcp/pubsub.rs b/src/sinks/gcp/pubsub.rs index ee4299785e233..21db250bbd2ff 100644 --- a/src/sinks/gcp/pubsub.rs +++ b/src/sinks/gcp/pubsub.rs @@ -221,7 +221,8 @@ mod tests { #[cfg(feature = "gcp-pubsub-integration-tests")] mod integration_tests { use super::*; - use crate::test_util::{components, random_events_with_stream, random_string, trace_init}; + use crate::test_util::components::{self, HTTP_SINK_TAGS}; + use crate::test_util::{random_events_with_stream, random_string, trace_init}; use reqwest::{Client, Method, Response}; use serde_json::{json, Value}; use vector_core::event::{BatchNotifier, BatchStatus}; @@ -255,7 +256,7 @@ mod integration_tests { let (batch, mut receiver) = BatchNotifier::new_with_receiver(); let (input, events) = random_events_with_stream(100, 100, Some(batch)); - components::run_sink(sink, events, &["endpoint"]).await; + components::run_sink(sink, events, &HTTP_SINK_TAGS).await; assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered)); let response = pull_messages(&subscription, 1000).await; diff --git a/src/sinks/http.rs b/src/sinks/http.rs index 150ea9e952e04..26f9dbef7e693 100644 --- a/src/sinks/http.rs +++ b/src/sinks/http.rs @@ -312,7 +312,7 @@ mod tests { test::{build_test_server, build_test_server_generic, build_test_server_status}, }, }, - test_util::{components, next_addr, random_lines_with_stream}, + test_util::{components, components::HTTP_SINK_TAGS, next_addr, random_lines_with_stream}, }; use bytes::{Buf, Bytes}; use flate2::read::MultiGzDecoder; @@ -668,7 +668,7 @@ mod tests { let (batch, mut receiver) = BatchNotifier::new_with_receiver(); let (input_lines, events) = random_lines_with_stream(100, num_lines, Some(batch)); - components::run_sink(sink, events, &["endpoint"]).await; + components::run_sink(sink, events, &HTTP_SINK_TAGS).await; drop(trigger); assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered)); diff --git a/src/sinks/humio/logs.rs b/src/sinks/humio/logs.rs index 7948e6c8c3506..315151eb9ff79 100644 --- a/src/sinks/humio/logs.rs +++ b/src/sinks/humio/logs.rs @@ -150,7 +150,7 @@ mod integration_tests { config::{log_schema, SinkConfig, SinkContext}, event::Event, sinks::util::Compression, - test_util::{components, random_string}, + test_util::{components, components::HTTP_SINK_TAGS, random_string}, }; use chrono::Utc; use indoc::indoc; @@ -160,8 +160,6 @@ mod integration_tests { // matches humio container address const HOST: &str = "http://localhost:8080"; - const SINK_TAGS: [&str; 1] = ["endpoint"]; - #[tokio::test] async fn humio_insert_message() { let cx = SinkContext::new_test(); @@ -178,7 +176,7 @@ mod integration_tests { let log = event.as_mut_log(); log.insert(log_schema().host_key(), host.clone()); - components::run_sink_event(sink, event, &SINK_TAGS).await; + components::run_sink_event(sink, event, &HTTP_SINK_TAGS).await; let entry = find_entry(repo.name.as_str(), message.as_str()).await; @@ -214,7 +212,7 @@ mod integration_tests { let message = random_string(100); let event = Event::from(message.clone()); - components::run_sink_event(sink, event, &SINK_TAGS).await; + components::run_sink_event(sink, event, &HTTP_SINK_TAGS).await; let entry = find_entry(repo.name.as_str(), message.as_str()).await; @@ -247,7 +245,7 @@ mod integration_tests { .as_mut_log() .insert("@timestamp", Utc::now().to_rfc3339()); - components::run_sink_event(sink, event, &SINK_TAGS).await; + components::run_sink_event(sink, event, &HTTP_SINK_TAGS).await; let entry = find_entry(repo.name.as_str(), message.as_str()).await; @@ -270,7 +268,7 @@ mod integration_tests { let message = random_string(100); let event = Event::from(message.clone()); - components::run_sink_event(sink, event, &SINK_TAGS).await; + components::run_sink_event(sink, event, &HTTP_SINK_TAGS).await; let entry = find_entry(repo.name.as_str(), message.as_str()).await; diff --git a/src/sinks/humio/metrics.rs b/src/sinks/humio/metrics.rs index 3fe142a13ca9a..f126d718bc10b 100644 --- a/src/sinks/humio/metrics.rs +++ b/src/sinks/humio/metrics.rs @@ -116,7 +116,7 @@ mod tests { Event, Metric, }, sinks::util::test::{build_test_server, load_sink}, - test_util::{self, components}, + test_util::{self, components, components::HTTP_SINK_TAGS}, }; use chrono::{offset::TimeZone, Utc}; use indoc::indoc; @@ -203,7 +203,7 @@ mod tests { ]; let len = metrics.len(); - components::run_sink(sink, stream::iter(metrics), &["endpoint"]).await; + components::run_sink(sink, stream::iter(metrics), &HTTP_SINK_TAGS).await; let output = rx.take(len).collect::>().await; assert_eq!( diff --git a/src/sinks/influxdb/logs.rs b/src/sinks/influxdb/logs.rs index a9d66124af0c8..e3588ca984da4 100644 --- a/src/sinks/influxdb/logs.rs +++ b/src/sinks/influxdb/logs.rs @@ -240,7 +240,7 @@ mod tests { http::HttpSink, test::{build_test_server_status, load_sink}, }, - test_util::{components, next_addr}, + test_util::{components, components::HTTP_SINK_TAGS, next_addr}, }; use chrono::{offset::TimeZone, Utc}; use futures::{channel::mpsc, stream, StreamExt}; @@ -248,8 +248,6 @@ mod tests { use indoc::indoc; use vector_core::event::{BatchNotifier, BatchStatus, Event, LogEvent}; - pub(super) const SINK_TAGS: [&str; 1] = ["endpoint"]; - type Receiver = mpsc::Receiver<(Parts, bytes::Bytes)>; #[test] @@ -596,7 +594,7 @@ mod tests { components::init(); sink.run(stream::iter(events)).await.unwrap(); if batch_status == BatchStatus::Delivered { - components::SINK_TESTS.assert(&SINK_TAGS); + components::SINK_TESTS.assert(&HTTP_SINK_TAGS); } assert_eq!(receiver.try_recv(), Ok(batch_status)); @@ -669,7 +667,7 @@ mod integration_tests { test_util::{onboarding_v2, BUCKET, ORG, TOKEN}, InfluxDb2Settings, }, - test_util::components, + test_util::components::{self, HTTP_SINK_TAGS}, }; use chrono::Utc; use futures::stream; @@ -715,7 +713,7 @@ mod integration_tests { let events = vec![Event::Log(event1), Event::Log(event2)]; - components::run_sink(sink, stream::iter(events), &tests::SINK_TAGS).await; + components::run_sink(sink, stream::iter(events), &HTTP_SINK_TAGS).await; assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered)); diff --git a/src/sinks/logdna.rs b/src/sinks/logdna.rs index ecb4934a70cac..546b63429b79d 100644 --- a/src/sinks/logdna.rs +++ b/src/sinks/logdna.rs @@ -296,7 +296,8 @@ mod tests { use crate::{ config::SinkConfig, sinks::util::test::{build_test_server_status, load_sink}, - test_util::{components, next_addr, random_lines, trace_init}, + test_util::components::{self, HTTP_SINK_TAGS}, + test_util::{next_addr, random_lines, trace_init}, }; use futures::{channel::mpsc, stream, StreamExt}; use http::{request::Parts, StatusCode}; @@ -404,7 +405,7 @@ mod tests { components::init(); sink.run(stream::iter(events)).await.unwrap(); if batch_status == BatchStatus::Delivered { - components::SINK_TESTS.assert(&["endpoint"]); + components::SINK_TESTS.assert(&HTTP_SINK_TAGS); } assert_eq!(receiver.try_recv(), Ok(batch_status)); diff --git a/src/sinks/loki.rs b/src/sinks/loki.rs index 9b65202c796d3..e1d76ab1afaaa 100644 --- a/src/sinks/loki.rs +++ b/src/sinks/loki.rs @@ -532,7 +532,7 @@ mod integration_tests { sinks::util::test::load_sink, sinks::VectorSink, template::Template, - test_util::{components, random_lines}, + test_util::{components, components::HTTP_SINK_TAGS, random_lines}, }; use bytes::Bytes; use chrono::{DateTime, Duration, Utc}; @@ -540,8 +540,6 @@ mod integration_tests { use std::sync::Arc; use vector_core::event::{BatchNotifier, BatchStatus, Event, LogEvent}; - const SINK_TAGS: [&str; 1] = ["endpoint"]; - async fn build_sink(encoding: &str) -> (uuid::Uuid, VectorSink) { let stream = uuid::Uuid::new_v4(); @@ -589,7 +587,7 @@ mod integration_tests { .clone() .into_iter() .map(move |line| Event::from(LogEvent::from(line).with_batch_notifier(&batch))); - components::sink_send_all(sink, events, &SINK_TAGS).await; + components::sink_send_all(sink, events, &HTTP_SINK_TAGS).await; assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered)); let (_, outputs) = fetch_stream(stream.to_string(), "default").await; @@ -608,7 +606,7 @@ mod integration_tests { .map(Event::from) .collect::>(); let (batch, mut receiver) = BatchNotifier::new_with_receiver(); - components::sink_send_all(sink, add_batch_notifier(&events, batch), &SINK_TAGS).await; + components::sink_send_all(sink, add_batch_notifier(&events, batch), &HTTP_SINK_TAGS).await; assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered)); let (_, outputs) = fetch_stream(stream.to_string(), "default").await; @@ -634,7 +632,7 @@ mod integration_tests { }) .collect::>(); let (batch, mut receiver) = BatchNotifier::new_with_receiver(); - components::sink_send_all(sink, add_batch_notifier(&events, batch), &SINK_TAGS).await; + components::sink_send_all(sink, add_batch_notifier(&events, batch), &HTTP_SINK_TAGS).await; assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered)); let (_, outputs) = fetch_stream(stream.to_string(), "default").await; @@ -654,7 +652,7 @@ mod integration_tests { .map(Event::from) .collect::>(); let (batch, mut receiver) = BatchNotifier::new_with_receiver(); - components::sink_send_all(sink, add_batch_notifier(&events, batch), &SINK_TAGS).await; + components::sink_send_all(sink, add_batch_notifier(&events, batch), &HTTP_SINK_TAGS).await; assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered)); let (_, outputs) = fetch_stream(stream.to_string(), "default").await; @@ -701,7 +699,7 @@ mod integration_tests { } } - components::sink_send_all(sink, events, &SINK_TAGS).await; + components::sink_send_all(sink, events, &HTTP_SINK_TAGS).await; let (_, outputs1) = fetch_stream(stream1.to_string(), "default").await; let (_, outputs2) = fetch_stream(stream2.to_string(), "default").await; @@ -752,7 +750,7 @@ mod integration_tests { event.as_mut_log().insert("stream_key", "test_name"); } - components::sink_send_all(sink, events, &SINK_TAGS).await; + components::sink_send_all(sink, events, &HTTP_SINK_TAGS).await; let (_, outputs) = fetch_stream(stream.to_string(), "default").await; @@ -805,7 +803,7 @@ mod integration_tests { } } - components::sink_send_all(sink, events, &SINK_TAGS).await; + components::sink_send_all(sink, events, &HTTP_SINK_TAGS).await; let (_, outputs1) = fetch_stream(stream.to_string(), "tenant1").await; let (_, outputs2) = fetch_stream(stream.to_string(), "tenant2").await; @@ -945,7 +943,7 @@ mod integration_tests { config.batch.max_bytes = Some(4_000_000); let (sink, _) = config.build(cx).await.unwrap(); - components::sink_send_all(sink, events.clone(), &SINK_TAGS).await; + components::sink_send_all(sink, events.clone(), &HTTP_SINK_TAGS).await; let (timestamps, outputs) = fetch_stream(stream.to_string(), "default").await; assert_eq!(expected.len(), outputs.len()); diff --git a/src/sinks/new_relic_logs.rs b/src/sinks/new_relic_logs.rs index 6af7e486460f8..cc231f965357d 100644 --- a/src/sinks/new_relic_logs.rs +++ b/src/sinks/new_relic_logs.rs @@ -176,7 +176,7 @@ mod tests { encoding::EncodingConfiguration, service::RATE_LIMIT_NUM_DEFAULT, test::build_test_server, Concurrency, }, - test_util::{components, next_addr}, + test_util::{components, components::HTTP_SINK_TAGS, next_addr}, }; use bytes::Buf; use futures::{stream, StreamExt}; @@ -340,7 +340,7 @@ mod tests { let input_lines = (0..100).map(|i| format!("msg {}", i)).collect::>(); let events = stream::iter(input_lines.clone()).map(Event::from); - components::run_sink(sink, events, &["endpoint"]).await; + components::run_sink(sink, events, &HTTP_SINK_TAGS).await; drop(trigger); let output_lines = rx diff --git a/src/sinks/sematext/logs.rs b/src/sinks/sematext/logs.rs index a527fa947a37c..43451b7f927d5 100644 --- a/src/sinks/sematext/logs.rs +++ b/src/sinks/sematext/logs.rs @@ -116,7 +116,8 @@ mod tests { use crate::{ config::SinkConfig, sinks::util::test::{build_test_server, load_sink}, - test_util::{components, next_addr, random_lines_with_stream}, + test_util::components::{self, HTTP_SINK_TAGS}, + test_util::{next_addr, random_lines_with_stream}, }; use futures::StreamExt; use indoc::indoc; @@ -149,7 +150,7 @@ mod tests { tokio::spawn(server); let (expected, events) = random_lines_with_stream(100, 10, None); - components::run_sink(sink, events, &["endpoint"]).await; + components::run_sink(sink, events, &HTTP_SINK_TAGS).await; let output = rx.next().await.unwrap(); diff --git a/src/sinks/splunk_hec/logs.rs b/src/sinks/splunk_hec/logs.rs index 843723c28f0a4..6808c6ab18334 100644 --- a/src/sinks/splunk_hec/logs.rs +++ b/src/sinks/splunk_hec/logs.rs @@ -350,7 +350,8 @@ mod integration_tests { use crate::{ config::{SinkConfig, SinkContext}, sinks::splunk_hec::conn::integration_test_helpers::get_token, - test_util::{components, random_lines_with_stream, random_string}, + test_util::components::{self, HTTP_SINK_TAGS}, + test_util::{random_lines_with_stream, random_string}, }; use futures::stream; use serde_json::Value as JsonValue; @@ -362,8 +363,6 @@ mod integration_tests { const USERNAME: &str = "admin"; const PASSWORD: &str = "password"; - const SINK_TAGS: [&str; 1] = ["endpoint"]; - // It usually takes ~1 second for the event to show up in search, so poll until // we see it. async fn find_entry(message: &str) -> serde_json::value::Value { @@ -393,7 +392,7 @@ mod integration_tests { .with_batch_notifier(&batch) .into(); drop(batch); - components::run_sink_event(sink, event, &SINK_TAGS).await; + components::run_sink_event(sink, event, &HTTP_SINK_TAGS).await; assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered)); let entry = find_entry(message.as_str()).await; @@ -431,7 +430,7 @@ mod integration_tests { let message = random_string(100); let event = Event::from(message.clone()); - components::run_sink_event(sink, event, &SINK_TAGS).await; + components::run_sink_event(sink, event, &HTTP_SINK_TAGS).await; let entry = find_entry(message.as_str()).await; @@ -448,7 +447,7 @@ mod integration_tests { let message = random_string(100); let event = Event::from(message.clone()); - components::run_sink_event(sink, event, &SINK_TAGS).await; + components::run_sink_event(sink, event, &HTTP_SINK_TAGS).await; let entry = find_entry(message.as_str()).await; @@ -468,7 +467,7 @@ mod integration_tests { let message = random_string(100); let mut event = Event::from(message.clone()); event.as_mut_log().insert("index_name", "custom_index"); - components::run_sink_event(sink, event, &SINK_TAGS).await; + components::run_sink_event(sink, event, &HTTP_SINK_TAGS).await; let entry = find_entry(message.as_str()).await; @@ -484,7 +483,7 @@ mod integration_tests { let (sink, _) = config.build(cx).await.unwrap(); let (messages, events) = random_lines_with_stream(100, 10, None); - components::run_sink(sink, events, &SINK_TAGS).await; + components::run_sink(sink, events, &HTTP_SINK_TAGS).await; let mut found_all = false; for _ in 0..20 { @@ -517,7 +516,7 @@ mod integration_tests { let message = random_string(100); let mut event = Event::from(message.clone()); event.as_mut_log().insert("asdf", "hello"); - components::run_sink_event(sink, event, &SINK_TAGS).await; + components::run_sink_event(sink, event, &HTTP_SINK_TAGS).await; let entry = find_entry(message.as_str()).await; @@ -538,7 +537,7 @@ mod integration_tests { let mut event = Event::from(message.clone()); event.as_mut_log().insert("asdf", "hello"); event.as_mut_log().insert("host", "example.com:1234"); - components::run_sink_event(sink, event, &SINK_TAGS).await; + components::run_sink_event(sink, event, &HTTP_SINK_TAGS).await; let entry = find_entry(message.as_str()).await; @@ -562,7 +561,7 @@ mod integration_tests { let message = random_string(100); let mut event = Event::from(message.clone()); event.as_mut_log().insert("asdf", "hello"); - components::run_sink_event(sink, event, &SINK_TAGS).await; + components::run_sink_event(sink, event, &HTTP_SINK_TAGS).await; let entry = find_entry(message.as_str()).await; @@ -589,7 +588,7 @@ mod integration_tests { event.as_mut_log().insert("asdf", "hello"); event.as_mut_log().insert("host", "example.com:1234"); event.as_mut_log().insert("roast", "beef.example.com:1234"); - components::run_sink_event(sink, event, &SINK_TAGS).await; + components::run_sink_event(sink, event, &HTTP_SINK_TAGS).await; let entry = find_entry(message.as_str()).await; diff --git a/src/sinks/splunk_hec/metrics.rs b/src/sinks/splunk_hec/metrics.rs index d077ec117548b..1f85bbad8f79c 100644 --- a/src/sinks/splunk_hec/metrics.rs +++ b/src/sinks/splunk_hec/metrics.rs @@ -571,7 +571,7 @@ mod integration_tests { config::{SinkConfig, SinkContext}, event::{Metric, MetricKind}, sinks::splunk_hec::conn::integration_test_helpers::get_token, - test_util::components, + test_util::components::{self, HTTP_SINK_TAGS}, }; use serde_json::Value as JsonValue; use shared::btreemap; @@ -580,8 +580,6 @@ mod integration_tests { const USERNAME: &str = "admin"; const PASSWORD: &str = "password"; - const SINK_TAGS: [&str; 1] = ["endpoint"]; - #[tokio::test] async fn splunk_insert_counter_metric() { let cx = SinkContext::new_test(); @@ -602,7 +600,7 @@ mod integration_tests { .with_batch_notifier(&batch) .into(); drop(batch); - components::run_sink_event(sink, event, &SINK_TAGS).await; + components::run_sink_event(sink, event, &HTTP_SINK_TAGS).await; assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered)); assert!( @@ -634,7 +632,7 @@ mod integration_tests { .with_batch_notifier(&batch) .into(); drop(batch); - components::run_sink_event(sink, event, &SINK_TAGS).await; + components::run_sink_event(sink, event, &HTTP_SINK_TAGS).await; assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered)); assert!( diff --git a/src/test_util/components.rs b/src/test_util/components.rs index c5377b78ed855..e600189c53e57 100644 --- a/src/test_util/components.rs +++ b/src/test_util/components.rs @@ -22,6 +22,9 @@ thread_local!( /// The standard set of tags for all `TcpSource`-based sources. pub const TCP_SOURCE_TAGS: [&str; 2] = ["peer_addr", "protocol"]; +/// The standard set of tags for all `HttpSink`-based sinks. +pub const HTTP_SINK_TAGS: [&str; 1] = ["endpoint"]; + /// This struct is used to describe a set of component tests. pub struct ComponentTests { /// The list of event (suffixes) that must be emitted by the component From dd80b6404f72d566190c8ea5a279950728b77d58 Mon Sep 17 00:00:00 2001 From: Bruce Guenter Date: Wed, 13 Oct 2021 11:52:34 -0600 Subject: [PATCH 5/5] Fix missing increment in `FinalizersBatch` Signed-off-by: Bruce Guenter --- src/sinks/util/batch.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/sinks/util/batch.rs b/src/sinks/util/batch.rs index 5c256120d01f1..8f5e1fec99118 100644 --- a/src/sinks/util/batch.rs +++ b/src/sinks/util/batch.rs @@ -284,6 +284,7 @@ impl Batch for FinalizersBatch { match self.inner.push(item) { PushResult::Ok(full) => { self.finalizers.merge(finalizers); + self.count += 1; self.byte_size += byte_size; PushResult::Ok(full) }