Skip to content

Commit

Permalink
Minor: Add doc example to RecordBatchStreamAdapter
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Dec 10, 2024
1 parent becd0c9 commit 770178a
Showing 1 changed file with 25 additions and 2 deletions.
27 changes: 25 additions & 2 deletions datafusion/physical-plan/src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,9 @@ impl RecordBatchReceiverStream {

pin_project! {
/// Combines a [`Stream`] with a [`SchemaRef`] implementing
/// [`RecordBatchStream`] for the combination
/// [`SendableRecordBatchStream`] for the combination
///
/// See [`Self::new`] for an example
pub struct RecordBatchStreamAdapter<S> {
schema: SchemaRef,

Expand All @@ -347,7 +349,28 @@ pin_project! {
}

impl<S> RecordBatchStreamAdapter<S> {
/// Creates a new [`RecordBatchStreamAdapter`] from the provided schema and stream
/// Creates a new [`RecordBatchStreamAdapter`] from the provided schema and stream.
///
/// Note to create a [`SendableRecordBatchStream`] you pun the result
///
/// # Example
/// ```
/// # use arrow::array::record_batch;
/// # use datafusion_execution::SendableRecordBatchStream;
/// # use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
/// // Create stream of Result<RecordBatch>
/// let batch = record_batch!(
/// ("a", Int32, [1, 2, 3]),
/// ("b", Float64, [Some(4.0), None, Some(5.0)])
/// ).expect("created batch");
/// let schema = batch.schema();
/// let stream = futures::stream::iter(vec![Ok(batch)]);
/// // Convert the stream to a SendableRecordBatchStream
/// let adapter = RecordBatchStreamAdapter::new(schema, stream);
/// // Now you can use the adapter as a SendableRecordBatchStream
/// let batch_stream: SendableRecordBatchStream = Box::pin(adapter);
/// // ...
/// ```
pub fn new(schema: SchemaRef, stream: S) -> Self {
Self { schema, stream }
}
Expand Down

0 comments on commit 770178a

Please sign in to comment.