feat: Pre-validate append record batch (#72)
Resolves: #61

---------

Signed-off-by: Vaibhav Rabber <[email protected]>
vrongmeal authored Nov 20, 2024
1 parent d9ee1e8 commit c51d541
Showing 4 changed files with 237 additions and 100 deletions.
8 changes: 4 additions & 4 deletions examples/basic.rs
@@ -5,9 +5,9 @@ use streamstore::{
     batching::AppendRecordsBatchingStream,
     client::{Client, ClientConfig, ClientError, HostEndpoints},
     types::{
-        AppendInput, AppendRecord, BasinName, CreateBasinRequest, CreateStreamRequest,
-        DeleteBasinRequest, DeleteStreamRequest, ListBasinsRequest, ListStreamsRequest,
-        ReadSessionRequest,
+        AppendInput, AppendRecord, AppendRecordBatch, BasinName, CreateBasinRequest,
+        CreateStreamRequest, DeleteBasinRequest, DeleteStreamRequest, ListBasinsRequest,
+        ListStreamsRequest, ReadSessionRequest,
     },
 };
@@ -114,7 +114,7 @@ async fn main() {
         AppendRecord::new(b"bye world"),
     ];

-    let append_input = AppendInput::new(records.clone());
+    let append_input = AppendInput::new(AppendRecordBatch::try_from_iter(records.clone()).unwrap());

     match stream_client.append(append_input.clone()).await {
         Ok(resp) => {
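With this change, batch limits are enforced when the AppendRecordBatch is constructed, before any append is attempted. Below is a minimal sketch of handling that validation instead of unwrapping (it assumes try_from_iter returns a Result, as the .unwrap() above implies; the exact error type is not shown in this diff):

    use streamstore::types::{AppendInput, AppendRecord, AppendRecordBatch};

    fn build_input(records: Vec<AppendRecord>) -> Option<AppendInput> {
        // Pre-validation happens here: a batch exceeding the record-count or
        // metered-size limits is rejected before any request is sent.
        match AppendRecordBatch::try_from_iter(records) {
            Ok(batch) => Some(AppendInput::new(batch)),
            Err(_) => {
                eprintln!("records do not fit within batch limits");
                None
            }
        }
    }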
156 changes: 81 additions & 75 deletions src/batching.rs
@@ -4,16 +4,16 @@ use std::{
     time::Duration,
 };

-use bytesize::ByteSize;
 use futures::{Stream, StreamExt};

-use crate::types::{self, MeteredSize as _};
+use crate::types;

 /// Options to configure append records batching scheme.
 #[derive(Debug, Clone)]
 pub struct AppendRecordsBatchingOpts {
     max_batch_records: usize,
-    max_batch_size: ByteSize,
+    #[cfg(test)]
+    max_batch_size: bytesize::ByteSize,
     match_seq_num: Option<u64>,
     fencing_token: Option<Vec<u8>>,
     linger_duration: Duration,
@@ -23,7 +23,8 @@ impl Default for AppendRecordsBatchingOpts {
     fn default() -> Self {
         Self {
             max_batch_records: 1000,
-            max_batch_size: ByteSize::mib(1),
+            #[cfg(test)]
+            max_batch_size: bytesize::ByteSize::mib(1),
             match_seq_num: None,
             fencing_token: None,
             linger_duration: Duration::from_millis(5),
@@ -38,31 +39,36 @@ impl AppendRecordsBatchingOpts {
     }

     /// Maximum number of records in a batch.
-    pub fn with_max_batch_records(self, max_batch_records: usize) -> Self {
-        assert!(
-            max_batch_records > 0 && max_batch_records <= 1000,
-            "max_batch_records should be between (0, 1000]"
-        );
-
-        Self {
-            max_batch_records,
-            ..self
-        }
+    pub fn with_max_batch_records(self, max_batch_records: usize) -> Result<Self, String> {
+        if max_batch_records == 0
+            || max_batch_records > types::AppendRecordBatch::MAX_BATCH_CAPACITY
+        {
+            Err("Batch capacity must be between 1 and 1000".to_string())
+        } else {
+            Ok(Self {
+                max_batch_records,
+                ..self
+            })
+        }
     }

     /// Maximum size of a batch in bytes.
-    pub fn with_max_batch_size(self, max_batch_size: impl Into<ByteSize>) -> Self {
+    #[cfg(test)]
+    pub fn with_max_batch_size(
+        self,
+        max_batch_size: impl Into<bytesize::ByteSize>,
+    ) -> Result<Self, String> {
         let max_batch_size = max_batch_size.into();
-
-        assert!(
-            max_batch_size > ByteSize(0) && max_batch_size <= ByteSize::mib(1),
-            "max_batch_size should be between (0, 1] MiB"
-        );
-
-        Self {
-            max_batch_size,
-            ..self
-        }
+        if max_batch_size == bytesize::ByteSize(0)
+            || max_batch_size > types::AppendRecordBatch::MAX_METERED_SIZE
+        {
+            Err("Batch size must be between 1 byte and 1000 MiB".to_string())
+        } else {
+            Ok(Self {
+                max_batch_size,
+                ..self
+            })
+        }
     }
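The options builders are now fallible: with_max_batch_records returns Result<Self, String> instead of panicking via assert!, and with_max_batch_size is only compiled for tests. A minimal sketch of the new configuration flow (outside of tests, only the record-count cap is configurable):

    use streamstore::batching::AppendRecordsBatchingOpts;

    fn main() {
        // Accepted: validated against types::AppendRecordBatch::MAX_BATCH_CAPACITY.
        let opts = AppendRecordsBatchingOpts::new()
            .with_max_batch_records(500)
            .expect("500 is within limits");

        // Rejected: out-of-range values surface as Err instead of a panic.
        assert!(AppendRecordsBatchingOpts::new()
            .with_max_batch_records(0)
            .is_err());

        let _ = opts;
    }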

@@ -113,6 +119,7 @@ impl AppendRecordsBatchingStream {

 impl Stream for AppendRecordsBatchingStream {
     type Item = types::AppendInput;
+
     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
         self.0.poll_next_unpin(cx)
     }
@@ -158,17 +165,9 @@ where
                };
            }

-            if !batch_builder.is_empty() {
-                yield batch_builder.flush();
+            if let Some(input) = batch_builder.flush() {
+                yield input;
            }
-
-            // Now that we have flushed (if required), the batch builder should
-            // definitely not be full. It might not be empty since the peeked
-            // record might have been pushed into the batch.
-            assert!(
-                !batch_builder.is_full(),
-                "dangling peeked record does not fit into size limits",
-            );
        }
    }
 }
@@ -177,78 +176,81 @@ struct BatchBuilder<'a> {
     opts: &'a AppendRecordsBatchingOpts,
     peeked_record: Option<types::AppendRecord>,
     next_match_seq_num: Option<u64>,
-    batch: Vec<types::AppendRecord>,
-    batch_size: ByteSize,
+    batch: types::AppendRecordBatch,
 }

 impl<'a> BatchBuilder<'a> {
     pub fn new<'b: 'a>(opts: &'b AppendRecordsBatchingOpts) -> Self {
         Self {
             peeked_record: None,
             next_match_seq_num: opts.match_seq_num,
-            batch: Vec::with_capacity(opts.max_batch_records),
-            batch_size: ByteSize(0),
+            batch: Self::new_batch(opts),
             opts,
         }
     }

+    #[cfg(not(test))]
+    fn new_batch(opts: &AppendRecordsBatchingOpts) -> types::AppendRecordBatch {
+        types::AppendRecordBatch::with_max_capacity(opts.max_batch_records)
+            .expect("previously validated max capacity")
+    }
+
+    #[cfg(test)]
+    fn new_batch(opts: &AppendRecordsBatchingOpts) -> types::AppendRecordBatch {
+        types::AppendRecordBatch::with_max_capacity_and_size(
+            opts.max_batch_records,
+            opts.max_batch_size,
+        )
+        .expect("previously validated max capacity and size parameters")
+    }
+
     pub fn push(&mut self, record: impl Into<types::AppendRecord>) {
         assert!(!self.is_full());
-        let record = record.into();
-        let record_size = record.metered_size();
-        if self.batch_size + record_size > self.opts.max_batch_size {
+        if let Err(record) = self.batch.push(record) {
             let ret = self.peeked_record.replace(record);
             assert!(ret.is_none());
-        } else {
-            self.batch_size += record_size;
-            self.batch.push(record);
         }
     }

     pub fn is_empty(&self) -> bool {
-        if self.batch.is_empty() {
-            assert_eq!(self.batch_size, ByteSize(0));
-            true
-        } else {
-            false
-        }
+        self.batch.is_empty()
     }

     pub fn len(&self) -> usize {
         self.batch.len()
     }

     pub fn is_full(&self) -> bool {
-        assert!(self.batch.len() <= self.opts.max_batch_records);
-        self.batch.len() == self.opts.max_batch_records || self.peeked_record.is_some()
+        self.batch.is_full() || self.peeked_record.is_some()
     }

-    pub fn flush(&mut self) -> types::AppendInput {
-        assert!(!self.is_empty());
-
-        let match_seq_num = self.next_match_seq_num;
-        if let Some(next_match_seq_num) = self.next_match_seq_num.as_mut() {
-            *next_match_seq_num += self.batch.len() as u64;
-        }
+    pub fn flush(&mut self) -> Option<types::AppendInput> {
+        let ret = if self.batch.is_empty() {
+            None
+        } else {
+            let match_seq_num = self.next_match_seq_num;
+            if let Some(next_match_seq_num) = self.next_match_seq_num.as_mut() {
+                *next_match_seq_num += self.batch.len() as u64;
+            }

-        // Reset the inner batch, batch_size and push back the peeked record
-        // into the batch.
-        let records = {
-            self.batch_size = ByteSize(0);
-            std::mem::replace(
-                &mut self.batch,
-                Vec::with_capacity(self.opts.max_batch_records),
-            )
+            let records = std::mem::replace(&mut self.batch, Self::new_batch(self.opts));
+            Some(types::AppendInput {
+                records,
+                match_seq_num,
+                fencing_token: self.opts.fencing_token.clone(),
+            })
         };

         if let Some(record) = self.peeked_record.take() {
             self.push(record);
         }

-        types::AppendInput {
-            records,
-            match_seq_num,
-            fencing_token: self.opts.fencing_token.clone(),
-        }
+        // If the peeked record could not be moved into the batch, it doesn't fit size limits.
+        assert!(
+            self.peeked_record.is_none(),
+            "dangling peeked record does not fit into size limits"
+        );
+
+        ret
     }
 }
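Since flush now returns Option<types::AppendInput>, the stream yields a trailing batch only when one is actually pending, and the peeked-record invariant is asserted after the batch is rebuilt. A short usage sketch of the public stream (a tokio runtime and the futures crate are assumed, as in the tests below):

    use std::time::Duration;

    use futures::StreamExt;
    use streamstore::{
        batching::{AppendRecordsBatchingOpts, AppendRecordsBatchingStream},
        types::AppendRecord,
    };

    #[tokio::main]
    async fn main() {
        let records = futures::stream::iter([
            AppendRecord::new(b"hello world"),
            AppendRecord::new(b"bye world"),
        ]);

        let opts = AppendRecordsBatchingOpts::new()
            .with_max_batch_records(1) // force one record per AppendInput
            .unwrap()
            .with_linger(Duration::ZERO);

        // Two records with a capacity of one yield two batches; an empty
        // trailing flush no longer panics, it simply yields nothing.
        let mut batches = AppendRecordsBatchingStream::new(records, opts);
        let mut count = 0;
        while batches.next().await.is_some() {
            count += 1;
        }
        assert_eq!(count, 2);
    }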

@@ -280,10 +282,10 @@ mod tests {

        let mut opts = AppendRecordsBatchingOpts::new().with_linger(Duration::ZERO);
        if let Some(max_batch_records) = max_batch_records {
-            opts = opts.with_max_batch_records(max_batch_records);
+            opts = opts.with_max_batch_records(max_batch_records).unwrap();
        }
        if let Some(max_batch_size) = max_batch_size {
-            opts = opts.with_max_batch_size(max_batch_size);
+            opts = opts.with_max_batch_size(max_batch_size).unwrap();
        }

        let batch_stream = AppendRecordsBatchingStream::new(stream, opts);
@@ -314,7 +316,9 @@ mod tests {
            AppendRecordsBatchingOpts::new()
                .with_linger(Duration::from_secs(2))
                .with_max_batch_records(3)
-                .with_max_batch_size(ByteSize::b(40)),
+                .unwrap()
+                .with_max_batch_size(ByteSize::b(40))
+                .unwrap(),
        );

        batch_stream
@@ -398,7 +402,9 @@ mod tests {

        let mut batch_stream = AppendRecordsBatchingStream::new(
            stream,
-            AppendRecordsBatchingOpts::new().with_max_batch_size(ByteSize::b(1)),
+            AppendRecordsBatchingOpts::new()
+                .with_max_batch_size(ByteSize::b(1))
+                .unwrap(),
        );

        let _ = batch_stream.next().await;
7 changes: 2 additions & 5 deletions src/service/stream.rs
@@ -205,7 +205,7 @@ impl ServiceRequest for AppendServiceRequest {

     fn prepare_request(&mut self) -> Result<tonic::Request<Self::ApiRequest>, types::ConvertError> {
         Ok(api::AppendRequest {
-            input: Some(self.req.clone().try_into_api_type(self.stream.clone())?),
+            input: Some(self.req.clone().into_api_type(self.stream.clone())),
         }
         .into_request())
     }
@@ -299,10 +299,7 @@ impl StreamingRequest for AppendSessionStreamingRequest {

     fn prepare_request_item(&self, req: Self::RequestItem) -> Self::ApiRequestItem {
         api::AppendSessionRequest {
-            input: Some(
-                req.try_into_api_type(&self.stream)
-                    .expect("append input batch should be valid"),
-            ),
+            input: Some(req.into_api_type(&self.stream)),
         }
     }
 }
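The move from try_into_api_type to into_api_type is the payoff of pre-validation: an AppendInput can now only be built from an already-validated AppendRecordBatch, so conversion to the wire type cannot fail and both the ? and the expect disappear. A hypothetical, crate-agnostic sketch of this pattern (all names below are illustrative, not this repository's API):

    // Illustrative only: validate on construction so later conversion is infallible.
    struct ValidatedBatch(Vec<Vec<u8>>);

    impl ValidatedBatch {
        const MAX_RECORDS: usize = 1000;

        fn try_new(records: Vec<Vec<u8>>) -> Result<Self, String> {
            if records.is_empty() || records.len() > Self::MAX_RECORDS {
                Err("batch must hold between 1 and 1000 records".to_string())
            } else {
                Ok(Self(records))
            }
        }

        // No Result and no expect(): the invariant was established in try_new.
        fn into_wire(self) -> Vec<Vec<u8>> {
            self.0
        }
    }

    fn main() {
        let batch = ValidatedBatch::try_new(vec![b"hello".to_vec()]).expect("valid batch");
        let _wire = batch.into_wire();
    }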
