Skip to content

Commit

Permalink
Merge branch 'master' into feature/LOG-18882
Browse files Browse the repository at this point in the history
This pulls in recent additions from the master branch.
  • Loading branch information
darinspivey committed Jan 3, 2024
2 parents c05f969 + 9728bcd commit d217387
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 5 deletions.
19 changes: 19 additions & 0 deletions MEZMO_CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,22 @@
## [1.32.1](https://github.com/answerbook/vector/compare/v1.32.0...v1.32.1) (2023-12-22)


### Bug Fixes

* Use buffer ref to account for event size in transforms [9842893](https://github.com/answerbook/vector/commit/9842893dd0db567c2604a64c181dee3833fce55b) - Jorge Bay [LOG-18897](https://logdna.atlassian.net/browse/LOG-18897)

# [1.32.0](https://github.com/answerbook/vector/compare/v1.31.0...v1.32.0) (2023-12-21)


### Features

* **s3-sink**: file consolidation off default [fb46e73](https://github.com/answerbook/vector/commit/fb46e7359b442466069e1fc89d24254821b2a869) - dominic-mcallister-logdna [LOG-18535](https://logdna.atlassian.net/browse/LOG-18535)


### Miscellaneous

* Merge pull request #378 from answerbook/dominic/LOG-18535-defaultoff [af67c9e](https://github.com/answerbook/vector/commit/af67c9e1af1bc4bb6d5add3ea88888709f500f38) - GitHub [LOG-18535](https://logdna.atlassian.net/browse/LOG-18535)

# [1.31.0](https://github.com/answerbook/vector/compare/v1.30.0...v1.31.0) (2023-12-20)


Expand Down
8 changes: 5 additions & 3 deletions lib/vector-core/src/transform/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,14 +272,16 @@ impl TransformOutputs {
usage_tracker: &dyn OutputUsageTracker,
) -> Result<(), Box<dyn error::Error + Send + Sync>> {
if let Some(primary) = self.primary_output.as_mut() {
let send_buf = buf.primary_buffer.as_mut().expect("mismatched outputs");
Self::send_single_buffer(send_buf, primary).await?;

// Use the reference of the buffer FIRST to get the original value
// to calculate counts/sizes
let usage_profile = buf.primary_buffer.as_ref().map_or(Default::default(), |o| {
o.0.iter()
.map(|a| usage_tracker.get_size_and_profile(a))
.sum()
});

let send_buf = buf.primary_buffer.as_mut().expect("mismatched outputs");
Self::send_single_buffer(send_buf, primary).await?;
// We only want to track the primary transform output.
// Named outputs are for stuff like route/swimlanes that we don't want to track atm.
// We only want to capture the traffic of the remap transform after the node representing
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "vector",
"version": "1.31.0",
"version": "1.32.1",
"description": "Vector is a high-performance, end-to-end (agent & aggregator) observability data pipeline",
"repository": {
"type": "git",
Expand Down
10 changes: 10 additions & 0 deletions src/sinks/aws_s3/file_consolidator_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,11 @@ impl FileConsolidatorAsync {
}

pub fn start(&mut self) -> bool {
// default situation so the config isn't enabled
if !self.file_consolidation_config.enabled {
return false;
}

if self.join_handle.is_some() {
info!(
message =
Expand Down Expand Up @@ -192,6 +197,11 @@ impl FileConsolidatorAsync {
}

pub fn stop(&mut self) -> bool {
// default situation so the config isn't enabled
if !self.file_consolidation_config.enabled {
return false;
}

info!(
message = "Triggering shutdown for S3 file consolidation",
bucket = self.bucket,
Expand Down
18 changes: 17 additions & 1 deletion src/sinks/aws_s3/integration_tests_mezmo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ async fn s3_message_objects_not_reshaped_because_of_env() {
}

#[tokio::test]
async fn s3_file_consolidator_run() {
async fn s3_file_consolidator_enabled_run() {
let _cx = SinkContext::new_test();
let bucket = uuid::Uuid::new_v4().to_string();

Expand Down Expand Up @@ -179,6 +179,22 @@ async fn s3_file_consolidator_run() {
assert_eq!(stopped, true, "stopped true");
}

#[tokio::test]
async fn s3_file_consolidator_disabled_run() {
let _cx = SinkContext::new_test();

// testing the default scenario where the consolidator is disabled
let mut fc: FileConsolidatorAsync = Default::default();

let started = fc.start();
assert!(!started, "started false");

thread::sleep(time::Duration::from_millis(1000));

let stopped = fc.stop();
assert!(!stopped, "stopped false");
}

#[tokio::test]
async fn s3_file_consolidation_process_no_files() {
let _cx = SinkContext::new_test();
Expand Down

0 comments on commit d217387

Please sign in to comment.