Skip to content

Commit

Permalink
fix snippet
Browse files Browse the repository at this point in the history
  • Loading branch information
comphead committed Oct 31, 2023
1 parent 63242a0 commit 320cf24
Showing 1 changed file with 15 additions and 15 deletions.
30 changes: 15 additions & 15 deletions datafusion/physical-plan/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,24 +237,24 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {

/// Start plan execution for `partition`, returning a [`SendableRecordBatchStream`] of [`RecordBatch`]es.
///
///
/// Often its needed to call [`.await`](https://doc.rust-lang.org/std/keyword.await.html) just after the stream async combinators.
/// To call it within the sync method please refer the snippet below. The snippet can also be used to help output/debug the node execution on batch level
///
/// ```no_run
/// use datafusion_common::DataFusionError;
///
/// fn print_plan_exec() -> Result<(), DataFusionError> {
/// let stream = node.execute(0, task_ctx)?;
/// futures::stream::once(async move {
/// while let Some(batch) = stream.next().await {
/// The snippet below modifies the async stream in sync method, sorting the underlying data with external sorter which returns [`SendableRecordBatchStream`], and can also be used to help output/debug the node execution on batch level.
///
/// ```ignore
/// fn sort_stream(input: SendableRecordBatchStream, schema: SchemaRef) -> Result<SendableRecordBatchStream> {
/// Ok(Box::pin(RecordBatchStreamAdapter::new(
/// self.schema(),
/// futures::stream::once(async move {
/// while let Some(batch) = input.next().await {
/// let batch = batch?;
/// println!("{}", batch);
/// return Ok(());
/// // Debug/Output
/// dbg!(&batch);
/// sorter.insert_batch(batch).await?;
/// }
/// Ok(())
/// });
/// Ok(())
/// sorter.sort()
/// })
/// .try_flatten(),
/// )))
/// }
/// ```
fn execute(
Expand Down

0 comments on commit 320cf24

Please sign in to comment.