diff --git a/src/batching.rs b/src/batching.rs
index 6c7695f..bc4ce11 100644
--- a/src/batching.rs
+++ b/src/batching.rs
@@ -1,4 +1,50 @@
 //! Append records batching stream.
+//!
+//! [`StreamClient::append_session`] accepts a stream of [`AppendInput`]s, which
+//! requires the user to batch records into [`AppendRecordBatch`]es. This module
+//! provides a way to smartly batch [`AppendRecord`]s based on size limits and
+//! linger duration.
+//!
+//! The stream enforces the provided fencing token (if any) and auto-increments
+//! the matching sequence number for concurrency control.
+//!
+//! # Example usage
+//!
+//! ```no_run
+//! # use streamstore::client::*;
+//! # use streamstore::types::*;
+//! # use streamstore::batching::*;
+//! # use std::time::Duration;
+//! # let config = ClientConfig::new("token");
+//! # let basin: BasinName = "my-basin".parse().unwrap();
+//! # let stream_client = StreamClient::new(config, basin, "stream");
+//! # let fencing_token = FencingToken::generate(16).unwrap();
+//! let append_records_stream = futures::stream::iter([
+//!     AppendRecord::new("hello").unwrap(),
+//!     AppendRecord::new("bye").unwrap(),
+//!     // ...
+//! ]);
+//!
+//! let batching_opts = AppendRecordsBatchingOpts::new()
+//!     .with_max_batch_records(100)
+//!     .with_linger(Duration::from_millis(100))
+//!     .with_fencing_token(Some(fencing_token))
+//!     .with_match_seq_num(Some(10));
+//!
+//! let batching_stream = AppendRecordsBatchingStream::new(
+//!     append_records_stream,
+//!     batching_opts,
+//! );
+//!
+//! # let _ = async move {
+//! let ack_stream = stream_client.append_session(batching_stream).await?;
+//! # return Ok::<(), ClientError>(()); };
+//! ```
+//!
+//! [`StreamClient::append_session`]: crate::client::StreamClient::append_session
+//! [`AppendInput`]: crate::types::AppendInput
+//! [`AppendRecordBatch`]: crate::types::AppendRecordBatch
+//! [`AppendRecord`]: crate::types::AppendRecord
 
 use std::{
     pin::Pin,
@@ -10,7 +56,7 @@ use futures::{Stream, StreamExt};
 
 use crate::types;
 
-/// Options to configure append records batching scheme.
+/// Options to configure the batching scheme for [`AppendRecordsBatchingStream`].
 #[derive(Debug, Clone)]
 pub struct AppendRecordsBatchingOpts {
     max_batch_records: usize,
@@ -95,8 +141,14 @@ impl AppendRecordsBatchingOpts {
     }
 }
 
-/// Wrapper stream that takes a stream of append records and batches them
-/// together to send as an `AppendOutput`.
+/// Wrap a stream of [`AppendRecord`]s as a stream of [`AppendInput`]s by
+/// smartly batching records together based on the batching options provided
+/// by [`AppendRecordsBatchingOpts`].
+///
+/// See the module-level documentation for detailed usage.
+///
+/// [`AppendRecord`]: crate::types::AppendRecord
+/// [`AppendInput`]: crate::types::AppendInput
 pub struct AppendRecordsBatchingStream(Pin<Box<dyn Stream<Item = types::AppendInput> + Send>>);
 
 impl AppendRecordsBatchingStream {
diff --git a/src/client.rs b/src/client.rs
index 0ffac4d..86827bb 100644
--- a/src/client.rs
+++ b/src/client.rs
@@ -115,7 +115,7 @@ pub enum AppendRetryPolicy {
 }
 
 impl S2Endpoints {
-    /// Create S2 endpoints for the specified cloud.
+    /// Get S2 endpoints for the specified cloud.
     pub fn for_cloud(cloud: S2Cloud) -> Self {
         Self {
             account: format!("{cloud}.s2.dev")
@@ -129,7 +129,7 @@ impl S2Endpoints {
         }
     }
 
-    /// Create S2 endpoints for the specified cell.
+    /// Get S2 endpoints for the specified cell.
     pub fn for_cell(
         cloud: S2Cloud,
         cell_id: impl Into<String>,
     ) -> Result {
         Ok(Self {
@@ -141,7 +141,7 @@ impl S2Endpoints {
         })
     }
 
-    /// Create S2 endpoints from environment variables.
+    /// Get S2 endpoints from environment variables.
     ///
     /// The following environment variables are used:
     /// - `S2_CLOUD`: Valid S2 cloud name. Defaults to AWS.
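
To round out the endpoint-selection story, here is a minimal sketch of how these constructors could feed into a client configuration. The `S2Cloud::Aws` variant, the fallibility of `from_env`, and the `with_endpoints` builder method are assumptions inferred from the doc comments above, not APIs confirmed by this diff:

```rust
use streamstore::client::{ClientConfig, S2Cloud, S2Endpoints};

fn example_config() -> ClientConfig {
    // Pin the endpoints to a specific cloud. `S2Cloud::Aws` is assumed to
    // exist based on "Defaults to AWS" in the doc comment above.
    let endpoints = S2Endpoints::for_cloud(S2Cloud::Aws);

    // Alternatively, resolve endpoints from `S2_CLOUD` and related
    // environment variables, assuming `from_env` is fallible:
    // let endpoints = S2Endpoints::from_env().expect("valid S2 endpoint env vars");

    // `with_endpoints` is a hypothetical builder method; only
    // `ClientConfig::new` appears in the module example above.
    ClientConfig::new("auth-token").with_endpoints(endpoints)
}
```

If the SDK exposes a different hook for custom endpoints, only the last line of the sketch changes; the choice between `for_cloud`, `for_cell`, and `from_env` stays the same.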