Skip to content

Commit

Permalink
chore(flush on shutdown): validate s3 sink flushes (#17667)
Browse files Browse the repository at this point in the history
<!--
**Your PR title must conform to the conventional commit spec!**

  <type>(<scope>)!: <description>

  * `type` = chore, enhancement, feat, fix, docs
  * `!` = OPTIONAL: signals a breaking change
* `scope` = Optional when `type` is "chore" or "docs", available scopes
https://github.com/vectordotdev/vector/blob/master/.github/semantic.yml#L20
  * `description` = short description of the change

Examples:

  * enhancement(file source): Add `sort` option to sort discovered files
  * feat(new source): Initial `statsd` source
  * fix(file source): Fix a bug discovering new files
  * chore(external docs): Clarify `batch_size` option
-->

Adding regression test for related issue:
#11405
  • Loading branch information
DominicBurkart authored Jun 16, 2023
1 parent bebac21 commit c21f892
Showing 1 changed file with 74 additions and 0 deletions.
74 changes: 74 additions & 0 deletions src/sinks/aws_s3/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,80 @@ async fn s3_healthchecks_invalid_bucket() {
.is_err());
}

#[tokio::test]
async fn s3_flush_on_exhaustion() {
let cx = SinkContext::new_test();

let bucket = uuid::Uuid::new_v4().to_string();
create_bucket(&bucket, false).await;

// batch size of ten events, timeout of ten seconds
let config = {
let mut batch = BatchConfig::default();
batch.max_events = Some(10);
batch.timeout_secs = Some(10.0);

S3SinkConfig {
bucket: bucket.to_string(),
key_prefix: random_string(10) + "/date=%F",
filename_time_format: default_filename_time_format(),
filename_append_uuid: true,
filename_extension: None,
options: S3Options::default(),
region: RegionOrEndpoint::with_both("minio", s3_address()),
encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
compression: Compression::None,
batch,
request: TowerRequestConfig::default(),
tls: Default::default(),
auth: Default::default(),
acknowledgements: Default::default(),
}
};
let prefix = config.key_prefix.clone();
let service = config.create_service(&cx.globals.proxy).await.unwrap();
let sink = config.build_processor(service).unwrap();

let (lines, _events) = random_lines_with_stream(100, 2, None); // only generate two events (less than batch size)

let events = lines.clone().into_iter().enumerate().map(|(i, line)| {
let mut e = LogEvent::from(line);
let i = if i < 10 {
1
} else if i < 20 {
2
} else {
3
};
e.insert("i", i.to_string());
Event::from(e)
});

// Here, we validate that the s3 sink flushes when its source stream is exhausted
// by giving it a number of inputs less than the batch size, verifying that the
// outputs for the in-flight batch are flushed. By timing out in 3 seconds with a
// flush period of ten seconds, we verify that the flush is triggered *at stream
// completion* and not because of periodic flushing.
assert!(tokio::time::timeout(
Duration::from_secs(3),
run_and_assert_sink_compliance(sink, stream::iter(events), &AWS_SINK_TAGS)
)
.await
.is_ok());

let keys = get_keys(&bucket, prefix).await;
assert_eq!(keys.len(), 1);

let mut response_lines: Vec<String> = Vec::new();
let mut key_stream = stream::iter(keys);
while let Some(key) = key_stream.next().await {
let obj = get_object(&bucket, key).await;
response_lines.append(&mut get_lines(obj).await);
}

assert_eq!(lines, response_lines); // if all events are received, and lines.len() < batch size, then a flush was performed.
}

async fn client() -> S3Client {
let auth = AwsAuthentication::test_auth();
let region = RegionOrEndpoint::with_both("minio", s3_address());
Expand Down

0 comments on commit c21f892

Please sign in to comment.