Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(file sink): supports input based on encoding type #21726

Merged
merged 5 commits into from
Dec 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
The file sink now supports any input event type that the configured encoding supports. It previously only supported log events.

authors: nionata
158 changes: 142 additions & 16 deletions src/sinks/file/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use vector_lib::{

use crate::{
codecs::{Encoder, EncodingConfigWithFraming, SinkType, Transformer},
config::{AcknowledgementsConfig, DataType, GenerateConfig, Input, SinkConfig, SinkContext},
config::{AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext},
event::{Event, EventStatus, Finalizable},
expiring_hash_map::ExpiringHashMap,
internal_events::{
Expand Down Expand Up @@ -194,7 +194,7 @@ impl SinkConfig for FileSinkConfig {
}

fn input(&self) -> Input {
Input::new(self.encoding.config().1.input_type() & DataType::Log)
Input::new(self.encoding.config().1.input_type())
}

fn acknowledgements(&self) -> &AcknowledgementsConfig {
Expand Down Expand Up @@ -443,17 +443,23 @@ impl StreamSink<Event> for FileSink {
mod tests {
use std::convert::TryInto;

use chrono::{SubsecRound, Utc};
use futures::{stream, SinkExt};
use similar_asserts::assert_eq;
use vector_lib::{event::LogEvent, sink::VectorSink};
use vector_lib::{
codecs::JsonSerializerConfig,
event::{LogEvent, TraceEvent},
sink::VectorSink,
};

use super::*;
use crate::{
config::log_schema,
test_util::{
components::{assert_sink_compliance, FILE_SINK_TAGS},
lines_from_file, lines_from_gzip_file, lines_from_zstd_file, random_events_with_stream,
random_lines_with_stream, temp_dir, temp_file, trace_init,
random_lines_with_stream, random_metrics_with_stream,
random_metrics_with_stream_timestamp, temp_dir, temp_file, trace_init,
},
};

Expand All @@ -463,7 +469,7 @@ mod tests {
}

#[tokio::test]
async fn single_partition() {
async fn log_single_partition() {
let template = temp_file();

let config = FileSinkConfig {
Expand All @@ -480,7 +486,7 @@ mod tests {

let (input, _events) = random_lines_with_stream(100, 64, None);

run_assert_log_sink(config, input.clone()).await;
run_assert_log_sink(&config, input.clone()).await;

let output = lines_from_file(template);
for (input, output) in input.into_iter().zip(output) {
Expand All @@ -489,7 +495,7 @@ mod tests {
}

#[tokio::test]
async fn single_partition_gzip() {
async fn log_single_partition_gzip() {
let template = temp_file();

let config = FileSinkConfig {
Expand All @@ -506,7 +512,7 @@ mod tests {

let (input, _) = random_lines_with_stream(100, 64, None);

run_assert_log_sink(config, input.clone()).await;
run_assert_log_sink(&config, input.clone()).await;

let output = lines_from_gzip_file(template);
for (input, output) in input.into_iter().zip(output) {
Expand All @@ -515,7 +521,7 @@ mod tests {
}

#[tokio::test]
async fn single_partition_zstd() {
async fn log_single_partition_zstd() {
let template = temp_file();

let config = FileSinkConfig {
Expand All @@ -532,7 +538,7 @@ mod tests {

let (input, _) = random_lines_with_stream(100, 64, None);

run_assert_log_sink(config, input.clone()).await;
run_assert_log_sink(&config, input.clone()).await;

let output = lines_from_zstd_file(template);
for (input, output) in input.into_iter().zip(output) {
Expand All @@ -541,7 +547,7 @@ mod tests {
}

#[tokio::test]
async fn many_partitions() {
async fn log_many_partitions() {
let directory = temp_dir();

let mut template = directory.to_string_lossy().to_string();
Expand Down Expand Up @@ -579,7 +585,7 @@ mod tests {
input[7].as_mut_log().insert("date", "2019-29-07");
input[7].as_mut_log().insert("level", "error");

run_assert_sink(config, input.clone().into_iter()).await;
run_assert_sink(&config, input.clone().into_iter()).await;

let output = [
lines_from_file(directory.join("warnings-2019-26-07.log")),
Expand Down Expand Up @@ -626,7 +632,7 @@ mod tests {
}

#[tokio::test]
async fn reopening() {
async fn log_reopening() {
trace_init();

let template = temp_file();
Expand Down Expand Up @@ -683,17 +689,137 @@ mod tests {
sink_handle.await.unwrap();
}

async fn run_assert_log_sink(config: FileSinkConfig, events: Vec<String>) {
#[tokio::test]
async fn metric_single_partition() {
let template = temp_file();

let config = FileSinkConfig {
path: template.clone().try_into().unwrap(),
idle_timeout: default_idle_timeout(),
encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
compression: Compression::None,
acknowledgements: Default::default(),
timezone: Default::default(),
internal_metrics: FileInternalMetricsConfig {
include_file_tag: true,
},
};

let (input, _events) = random_metrics_with_stream(100, None, None);

run_assert_sink(&config, input.clone().into_iter()).await;

let output = lines_from_file(template);
for (input, output) in input.into_iter().zip(output) {
let metric_name = input.as_metric().name();
assert!(output.contains(metric_name));
}
}

#[tokio::test]
async fn metric_many_partitions() {
let directory = temp_dir();

let format = "%Y-%m-%d-%H-%M-%S";
let mut template = directory.to_string_lossy().to_string();
template.push_str(&format!("/{}.log", format));

let config = FileSinkConfig {
path: template.try_into().unwrap(),
idle_timeout: default_idle_timeout(),
encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
compression: Compression::None,
acknowledgements: Default::default(),
timezone: Default::default(),
internal_metrics: FileInternalMetricsConfig {
include_file_tag: true,
},
};

let metric_count = 3;
let timestamp = Utc::now().trunc_subsecs(3);
let timestamp_offset = Duration::from_secs(1);

let (input, _events) = random_metrics_with_stream_timestamp(
metric_count,
None,
None,
timestamp,
timestamp_offset,
);

run_assert_sink(&config, input.clone().into_iter()).await;

let output = (0..metric_count).map(|index| {
let expected_timestamp = timestamp + (timestamp_offset * index as u32);
let expected_filename =
directory.join(format!("{}.log", expected_timestamp.format(format)));

lines_from_file(expected_filename)
});
for (input, output) in input.iter().zip(output) {
// The format will partition by second and metrics are a second apart.
assert_eq!(
output.len(),
1,
"Expected the output file to contain one metric"
);
let output = &output[0];

let metric_name = input.as_metric().name();
assert!(output.contains(metric_name));
}
}

#[tokio::test]
async fn trace_single_partition() {
let template = temp_file();

let config = FileSinkConfig {
path: template.clone().try_into().unwrap(),
idle_timeout: default_idle_timeout(),
encoding: (None::<FramingConfig>, JsonSerializerConfig::default()).into(),
compression: Compression::None,
acknowledgements: Default::default(),
timezone: Default::default(),
internal_metrics: FileInternalMetricsConfig {
include_file_tag: true,
},
};

let (input, _events) = random_lines_with_stream(100, 64, None);

run_assert_trace_sink(&config, input.clone()).await;

let output = lines_from_file(template);
for (input, output) in input.iter().zip(output) {
assert!(output.contains(input));
}
}

async fn run_assert_log_sink(config: &FileSinkConfig, events: Vec<String>) {
run_assert_sink(
config,
events.into_iter().map(LogEvent::from).map(Event::Log),
)
.await;
}

async fn run_assert_sink(config: FileSinkConfig, events: impl Iterator<Item = Event> + Send) {
async fn run_assert_trace_sink(config: &FileSinkConfig, events: Vec<String>) {
run_assert_sink(
config,
events
.into_iter()
.map(LogEvent::from)
.map(TraceEvent::from)
.map(Event::Trace),
)
.await;
}

async fn run_assert_sink(config: &FileSinkConfig, events: impl Iterator<Item = Event> + Send) {
assert_sink_compliance(&FILE_SINK_TAGS, async move {
let sink = FileSink::new(&config, SinkContext::default()).unwrap();
let sink = FileSink::new(config, SinkContext::default()).unwrap();
VectorSink::from_event_streamsink(sink)
.run(Box::pin(stream::iter(events.map(Into::into))))
.await
Expand Down
32 changes: 29 additions & 3 deletions src/test_util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::{
task::{ready, Context, Poll},
};

use chrono::{SubsecRound, Utc};
use chrono::{DateTime, SubsecRound, Utc};
use flate2::read::MultiGzDecoder;
use futures::{stream, task::noop_waker_ref, FutureExt, SinkExt, Stream, StreamExt, TryStreamExt};
use openssl::ssl::{SslConnector, SslFiletype, SslMethod, SslVerifyMode};
Expand Down Expand Up @@ -286,10 +286,36 @@ pub fn random_metrics_with_stream(
batch: Option<BatchNotifier>,
tags: Option<MetricTags>,
) -> (Vec<Event>, impl Stream<Item = EventArray>) {
Copy link
Contributor Author

@nionata nionata Nov 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function should behave exactly the same for existing callers, since the previously hardcoded timestamp parameters are now simply passed through to the new function.

let timestamp = Utc::now().trunc_subsecs(3);
random_metrics_with_stream_timestamp(
count,
batch,
tags,
Utc::now().trunc_subsecs(3),
std::time::Duration::from_secs(2),
)
}

/// Generates event metrics with the provided tags and timestamp.
///
/// # Parameters
/// - `count`: the number of metrics to generate
/// - `batch`: the batch notifier to use with the stream
/// - `tags`: the tags to apply to each metric event
/// - `timestamp`: the timestamp to use for each metric event
/// - `timestamp_offset`: the offset from the `timestamp` to use for each additional metric
///
/// # Returns
/// A tuple of the generated metric events and the stream of the generated events
pub fn random_metrics_with_stream_timestamp(
count: usize,
batch: Option<BatchNotifier>,
tags: Option<MetricTags>,
timestamp: DateTime<Utc>,
timestamp_offset: std::time::Duration,
) -> (Vec<Event>, impl Stream<Item = EventArray>) {
let events: Vec<_> = (0..count)
.map(|index| {
let ts = timestamp + (std::time::Duration::from_secs(2) * index as u32);
let ts = timestamp + (timestamp_offset * index as u32);
Event::Metric(
Metric::new(
format!("counter_{}", thread_rng().gen::<u32>()),
Expand Down
Loading