Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test: fencing_token and match_seq_num for AppendRecordsBatchStream #77

Merged
merged 1 commit into from
Nov 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 54 additions & 11 deletions src/batching.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,17 +69,17 @@ impl AppendRecordsBatchingOpts {
/// Enforce that the sequence number issued to the first record matches.
///
/// This is incremented automatically for each batch.
pub fn with_match_seq_num(self, match_seq_num: impl Into<u64>) -> Self {
pub fn with_match_seq_num(self, match_seq_num: Option<u64>) -> Self {
Self {
match_seq_num: Some(match_seq_num.into()),
match_seq_num,
..self
}
}

/// Enforce a fencing token.
pub fn with_fencing_token(self, fencing_token: impl Into<Vec<u8>>) -> Self {
pub fn with_fencing_token<T: Into<Vec<u8>>>(self, fencing_token: Option<T>) -> Self {
Self {
fencing_token: Some(fencing_token.into()),
fencing_token: fencing_token.map(Into::into),
..self
}
}
Expand Down Expand Up @@ -199,7 +199,7 @@ impl<'a> BatchBuilder<'a> {
pub fn push(&mut self, record: impl Into<types::AppendRecord>) {
if let Err(record) = self.batch.push(record) {
let ret = self.peeked_record.replace(record);
assert!(ret.is_none());
assert_eq!(ret, None);
}
}

Expand Down Expand Up @@ -237,8 +237,8 @@ impl<'a> BatchBuilder<'a> {
}

// If the peeked record could not be moved into the batch, it doesn't fit size limits.
assert!(
self.peeked_record.is_none(),
assert_eq!(
self.peeked_record, None,
"dangling peeked record does not fit into size limits"
);

Expand All @@ -257,15 +257,15 @@ mod tests {
use tokio_stream::wrappers::UnboundedReceiverStream;

use super::{AppendRecordsBatchingOpts, AppendRecordsBatchingStream};
use crate::types;
use crate::types::{self, AppendInput, AppendRecordBatch};

#[rstest]
#[case(Some(2), None)]
#[case(None, Some(ByteSize::b(30)))]
#[case(Some(2), Some(ByteSize::b(100)))]
#[case(Some(10), Some(ByteSize::b(30)))]
#[tokio::test]
async fn test_append_record_stream_batching(
async fn test_append_record_batching_mechanics(
#[case] max_batch_records: Option<usize>,
#[case] max_batch_size: Option<ByteSize>,
) {
Expand Down Expand Up @@ -298,7 +298,7 @@ mod tests {
}

#[tokio::test(start_paused = true)]
async fn test_append_record_stream_linger() {
async fn test_append_record_batching_linger() {
let (stream_tx, stream_rx) = mpsc::unbounded_channel::<types::AppendRecord>();
let mut i = 0;

Expand Down Expand Up @@ -386,7 +386,7 @@ mod tests {

#[tokio::test]
#[should_panic]
async fn test_append_record_stream_panic_size_limits() {
async fn test_append_record_batching_panic_size_limits() {
let stream =
futures::stream::iter([types::AppendRecord::new("too long to fit into size limits")]);

Expand All @@ -397,4 +397,47 @@ mod tests {

let _ = batch_stream.next().await;
}

#[tokio::test]
async fn test_append_record_batching_append_input_opts() {
let test_record = types::AppendRecord::new("a");

let total_records = 12;
let test_records = (0..total_records)
.map(|_| test_record.clone())
.collect::<Vec<_>>();

let expected_fencing_token = "hello".as_bytes();
let mut expected_match_seq_num = 10;

let num_batch_records = 3;

let batch_stream = AppendRecordsBatchingStream::new(
futures::stream::iter(test_records),
AppendRecordsBatchingOpts::new()
.with_max_batch_records(num_batch_records)
.with_fencing_token(Some(expected_fencing_token))
.with_match_seq_num(Some(expected_match_seq_num)),
);

let batches = batch_stream.collect::<Vec<_>>().await;

assert_eq!(batches.len(), total_records / num_batch_records);

let expected_batch =
AppendRecordBatch::try_from_iter((0..num_batch_records).map(|_| test_record.clone()))
.unwrap();

for input in batches {
let AppendInput {
records,
match_seq_num,
fencing_token,
} = input;
assert_eq!(records, expected_batch);
assert_eq!(fencing_token.as_deref(), Some(expected_fencing_token));
assert_eq!(match_seq_num, Some(expected_match_seq_num));
expected_match_seq_num += num_batch_records as u64;
}
}
}
17 changes: 15 additions & 2 deletions src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -743,7 +743,7 @@ impl From<api::CheckTailResponse> for u64 {
}

#[sync_docs]
#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Header {
pub name: Vec<u8>,
pub value: Vec<u8>,
Expand Down Expand Up @@ -808,7 +808,7 @@ impl CommandRecord {
}

#[sync_docs]
#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AppendRecord {
pub headers: Vec<Header>,
pub body: Vec<u8>,
Expand Down Expand Up @@ -863,6 +863,19 @@ pub struct AppendRecordBatch {
max_size: ByteSize,
}

impl PartialEq for AppendRecordBatch {
fn eq(&self, other: &Self) -> bool {
if self.records.eq(&other.records) {
assert_eq!(self.metered_size, other.metered_size);
true
} else {
false
}
}
}

impl Eq for AppendRecordBatch {}

impl Default for AppendRecordBatch {
fn default() -> Self {
Self::new()
Expand Down