Skip to content

Commit

Permalink
docs: batching module Rust docs (#119)
Browse files Browse the repository at this point in the history
Signed-off-by: Vaibhav Rabber <[email protected]>
  • Loading branch information
vrongmeal authored Dec 18, 2024
1 parent 9c725e0 commit 43cce8d
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 6 deletions.
58 changes: 55 additions & 3 deletions src/batching.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,50 @@
//! Append records batching stream.
//!
//! [`StreamClient::append_session`] accepts a stream of [`AppendInput`]s which
//! requires a user to batch records into [`AppendRecordBatch`]es. This module
//! provides a way to smartly batch [`AppendRecord`]s based on size limits and
//! linger duration.
//!
//! The stream enforces the provided fencing token (if any) and auto-increments
//! matching sequence number for concurrency control.
//!
//! # Example usage
//!
//! ```no_run
//! # use streamstore::client::*;
//! # use streamstore::types::*;
//! # use streamstore::batching::*;
//! # use std::time::Duration;
//! # let config = ClientConfig::new("token");
//! # let basin: BasinName = "my-basin".parse().unwrap();
//! # let stream_client = StreamClient::new(config, basin, "stream");
//! # let fencing_token = FencingToken::generate(16).unwrap();
//! let append_records_stream = futures::stream::iter([
//! AppendRecord::new("hello").unwrap(),
//! AppendRecord::new("bye").unwrap(),
//! // ...
//! ]);
//!
//! let batching_opts = AppendRecordsBatchingOpts::new()
//! .with_max_batch_records(100)
//! .with_linger(Duration::from_millis(100))
//! .with_fencing_token(Some(fencing_token))
//! .with_match_seq_num(Some(10));
//!
//! let batching_stream = AppendRecordsBatchingStream::new(
//! append_records_stream,
//! batching_opts,
//! );
//!
//! # let _ = async move {
//! let ack_stream = stream_client.append_session(batching_stream).await?;
//! # return Ok::<(), ClientError>(()); };
//! ```
//!
//! [`StreamClient::append_session`]: crate::client::StreamClient::append_session
//! [`AppendInput`]: crate::types::AppendInput
//! [`AppendRecordBatch`]: crate::types::AppendRecordBatch
//! [`AppendRecord`]: crate::types::AppendRecord
use std::{
pin::Pin,
Expand All @@ -10,7 +56,7 @@ use futures::{Stream, StreamExt};

use crate::types;

/// Options to configure append records batching scheme.
/// Options to configure batching scheme for [`AppendRecordsBatchingStream`].
#[derive(Debug, Clone)]
pub struct AppendRecordsBatchingOpts {
max_batch_records: usize,
Expand Down Expand Up @@ -95,8 +141,14 @@ impl AppendRecordsBatchingOpts {
}
}

/// Wrapper stream that takes a stream of append records and batches them
/// together to send as an `AppendOutput`.
/// Wrap a stream of [`AppendRecord`]s as a stream of [`AppendInput`]s by
/// smartly batching records together based on batching options provided by
/// [`AppendRecordsBatchingOpts`].
///
/// See the module level documentation for detailed usage.
///
/// [`AppendRecord`]: crate::types::AppendRecord
/// [`AppendInput`]: crate::types::AppendInput
pub struct AppendRecordsBatchingStream(Pin<Box<dyn Stream<Item = types::AppendInput> + Send>>);

impl AppendRecordsBatchingStream {
Expand Down
6 changes: 3 additions & 3 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ pub enum AppendRetryPolicy {
}

impl S2Endpoints {
/// Create S2 endpoints for the specified cloud.
/// Get S2 endpoints for the specified cloud.
pub fn for_cloud(cloud: S2Cloud) -> Self {
Self {
account: format!("{cloud}.s2.dev")
Expand All @@ -129,7 +129,7 @@ impl S2Endpoints {
}
}

/// Create S2 endpoints for the specified cell.
/// Get S2 endpoints for the specified cell.
pub fn for_cell(
cloud: S2Cloud,
cell_id: impl Into<String>,
Expand All @@ -141,7 +141,7 @@ impl S2Endpoints {
})
}

/// Create S2 endpoints from environment variables.
/// Get S2 endpoints from environment variables.
///
/// The following environment variables are used:
/// - `S2_CLOUD`: Valid S2 cloud name. Defaults to AWS.
Expand Down

0 comments on commit 43cce8d

Please sign in to comment.