Skip to content

Commit

Permalink
chore(sinks): Update PartitionBatcher to use BatchConfig (#18792)
Browse files Browse the repository at this point in the history
* Update PartitionBatcher to use BatchConfig

Signed-off-by: Stephen Wakely <[email protected]>

* Remove zorkwonk

Signed-off-by: Stephen Wakely <[email protected]>

* Make into fns as fns instead

Signed-off-by: Stephen Wakely <[email protected]>

* Spelling

Signed-off-by: Stephen Wakely <[email protected]>

* Don't box the closure

Signed-off-by: Stephen Wakely <[email protected]>

* Clippy

Signed-off-by: Stephen Wakely <[email protected]>

* Only insert timeout if we add the batch to the list

Signed-off-by: Stephen Wakely <[email protected]>

* Allow the timer to remove an item

Signed-off-by: Stephen Wakely <[email protected]>

* Update stackdriver metrics batch settings config

Signed-off-by: Stephen Wakely <[email protected]>

* Dont panic when timer references a batch that no longer exists

Signed-off-by: Stephen Wakely <[email protected]>

---------

Signed-off-by: Stephen Wakely <[email protected]>
  • Loading branch information
StephenWakely authored Oct 16, 2023
1 parent 2deeba1 commit 4a7d0c3
Show file tree
Hide file tree
Showing 29 changed files with 152 additions and 231 deletions.
9 changes: 3 additions & 6 deletions lib/vector-core/src/stream/batcher/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,10 +129,7 @@ mod test {
NonZeroUsize::new(10000).unwrap(),
NonZeroUsize::new(2).unwrap(),
);
let batcher = Batcher::new(
stream,
settings.into_item_size_config(|x: &u32| *x as usize),
);
let batcher = Batcher::new(stream, settings.as_item_size_config(|x: &u32| *x as usize));
let batches: Vec<_> = batcher.collect().await;
assert_eq!(batches, vec![vec![1, 2], vec![3],]);
}
Expand All @@ -146,7 +143,7 @@ mod test {
NonZeroUsize::new(5).unwrap(),
NonZeroUsize::new(100).unwrap(),
)
.into_item_size_config(|x: &u32| *x as usize),
.as_item_size_config(|x: &u32| *x as usize),
);
let batches: Vec<_> = batcher.collect().await;
assert_eq!(
Expand Down Expand Up @@ -176,7 +173,7 @@ mod test {
NonZeroUsize::new(5).unwrap(),
NonZeroUsize::new(100).unwrap(),
)
.into_item_size_config(|x: &u32| *x as usize),
.as_item_size_config(|x: &u32| *x as usize),
);

tokio::pin!(batcher);
Expand Down
Loading

0 comments on commit 4a7d0c3

Please sign in to comment.