Skip to content

Commit

Permalink
chore: assertions instead of errors for batch capacity & size
Browse files Browse the repository at this point in the history
  • Loading branch information
shikhar committed Nov 21, 2024
1 parent a69a608 commit 68d8dce
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 54 deletions.
62 changes: 26 additions & 36 deletions src/batching.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ use std::{
time::Duration,
};

#[cfg(test)]
use bytesize::ByteSize;
use futures::{Stream, StreamExt};

use crate::types;
Expand All @@ -13,7 +15,7 @@ use crate::types;
pub struct AppendRecordsBatchingOpts {
max_batch_records: usize,
#[cfg(test)]
max_batch_size: bytesize::ByteSize,
max_batch_size: ByteSize,
match_seq_num: Option<u64>,
fencing_token: Option<Vec<u8>>,
linger_duration: Duration,
Expand All @@ -24,7 +26,7 @@ impl Default for AppendRecordsBatchingOpts {
Self {
max_batch_records: 1000,
#[cfg(test)]
max_batch_size: bytesize::ByteSize::mib(1),
max_batch_size: ByteSize::mib(1),
match_seq_num: None,
fencing_token: None,
linger_duration: Duration::from_millis(5),
Expand All @@ -39,36 +41,30 @@ impl AppendRecordsBatchingOpts {
}

/// Maximum number of records in a batch.
pub fn with_max_batch_records(self, max_batch_records: usize) -> Result<Self, String> {
if max_batch_records == 0
|| max_batch_records > types::AppendRecordBatch::MAX_BATCH_CAPACITY
{
Err("Batch capacity must be between 1 and 1000".to_string())
} else {
Ok(Self {
max_batch_records,
..self
})
pub fn with_max_batch_records(self, max_batch_records: usize) -> Self {
assert!(
max_batch_records > 0
&& max_batch_records <= types::AppendRecordBatch::MAX_BATCH_CAPACITY,
"Batch capacity must be between 1 and 1000"
);
Self {
max_batch_records,
..self
}
}

/// Maximum size of a batch in bytes.
#[cfg(test)]
pub fn with_max_batch_size(
self,
max_batch_size: impl Into<bytesize::ByteSize>,
) -> Result<Self, String> {
pub fn with_max_batch_size(self, max_batch_size: impl Into<ByteSize>) -> Self {
let max_batch_size = max_batch_size.into();

if max_batch_size == bytesize::ByteSize(0)
|| max_batch_size > types::AppendRecordBatch::MAX_METERED_SIZE
{
Err("Batch size must be between 1 byte and 1000 MiB".to_string())
} else {
Ok(Self {
max_batch_size,
..self
})
assert!(
max_batch_size > ByteSize::b(0)
&& max_batch_size <= types::AppendRecordBatch::MAX_METERED_SIZE,
"Batch capacity must be between 1 byte and 1 MiB"
);
Self {
max_batch_size,
..self
}
}

Expand Down Expand Up @@ -192,7 +188,6 @@ impl<'a> BatchBuilder<'a> {
#[cfg(not(test))]
fn new_batch(opts: &AppendRecordsBatchingOpts) -> types::AppendRecordBatch {
types::AppendRecordBatch::with_max_capacity(opts.max_batch_records)
.expect("previously validated max capacity")
}

#[cfg(test)]
Expand All @@ -201,7 +196,6 @@ impl<'a> BatchBuilder<'a> {
opts.max_batch_records,
opts.max_batch_size,
)
.expect("previously validated max capacity and size parameters")
}

pub fn push(&mut self, record: impl Into<types::AppendRecord>) {
Expand Down Expand Up @@ -282,10 +276,10 @@ mod tests {

let mut opts = AppendRecordsBatchingOpts::new().with_linger(Duration::ZERO);
if let Some(max_batch_records) = max_batch_records {
opts = opts.with_max_batch_records(max_batch_records).unwrap();
opts = opts.with_max_batch_records(max_batch_records);
}
if let Some(max_batch_size) = max_batch_size {
opts = opts.with_max_batch_size(max_batch_size).unwrap();
opts = opts.with_max_batch_size(max_batch_size);
}

let batch_stream = AppendRecordsBatchingStream::new(stream, opts);
Expand Down Expand Up @@ -316,9 +310,7 @@ mod tests {
AppendRecordsBatchingOpts::new()
.with_linger(Duration::from_secs(2))
.with_max_batch_records(3)
.unwrap()
.with_max_batch_size(ByteSize::b(40))
.unwrap(),
.with_max_batch_size(ByteSize::b(40)),
);

batch_stream
Expand Down Expand Up @@ -402,9 +394,7 @@ mod tests {

let mut batch_stream = AppendRecordsBatchingStream::new(
stream,
AppendRecordsBatchingOpts::new()
.with_max_batch_size(ByteSize::b(1))
.unwrap(),
AppendRecordsBatchingOpts::new().with_max_batch_size(ByteSize::b(1)),
);

let _ = batch_stream.next().await;
Expand Down
31 changes: 13 additions & 18 deletions src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -875,39 +875,34 @@ impl AppendRecordBatch {

pub fn new() -> Self {
Self::with_max_capacity_and_size_inner(Self::MAX_BATCH_CAPACITY, Self::MAX_METERED_SIZE)
.expect("valid default max capacity and size")
}

pub fn with_max_capacity(max_capacity: usize) -> Result<Self, ConvertError> {
pub fn with_max_capacity(max_capacity: usize) -> Self {
Self::with_max_capacity_and_size_inner(max_capacity, Self::MAX_METERED_SIZE)
}

#[cfg(test)]
pub fn with_max_capacity_and_size(
max_capacity: usize,
max_size: ByteSize,
) -> Result<Self, ConvertError> {
pub fn with_max_capacity_and_size(max_capacity: usize, max_size: ByteSize) -> Self {
Self::with_max_capacity_and_size_inner(max_capacity, max_size)
}

fn with_max_capacity_and_size_inner(
max_capacity: usize,
max_size: ByteSize,
) -> Result<Self, ConvertError> {
if max_capacity == 0 || max_capacity > Self::MAX_BATCH_CAPACITY {
return Err("Batch capacity must be between 1 and 1000".into());
}
fn with_max_capacity_and_size_inner(max_capacity: usize, max_size: ByteSize) -> Self {
assert!(
max_capacity > 0 && max_capacity <= Self::MAX_BATCH_CAPACITY,
"Batch capacity must be between 1 and 1000"
);

if max_size == ByteSize(0) || max_size > Self::MAX_METERED_SIZE {
return Err("Batch size must be between 1 byte and 1000 MiB".into());
}
assert!(
max_size > ByteSize(0) || max_size <= Self::MAX_METERED_SIZE,
"Batch size must be between 1 byte and 1 MiB"
);

Ok(Self {
Self {
records: Vec::with_capacity(max_capacity),
metered_size: ByteSize(0),
max_capacity,
max_size,
})
}
}

pub fn try_from_iter<R, T>(iter: T) -> Result<Self, (Self, Vec<AppendRecord>)>
Expand Down

0 comments on commit 68d8dce

Please sign in to comment.