From 24afd35562f01b56b8d8a81674afab217c0f7924 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Tue, 31 Oct 2023 18:20:28 +0000 Subject: [PATCH 1/2] Add implementation examples to ExecutionPlan::execute --- datafusion/physical-plan/src/lib.rs | 101 ++++++++++++++++++++++++++++ 1 file changed, 101 insertions(+) diff --git a/datafusion/physical-plan/src/lib.rs b/datafusion/physical-plan/src/lib.rs index 3ada2fa163fd..6df42cfe550d 100644 --- a/datafusion/physical-plan/src/lib.rs +++ b/datafusion/physical-plan/src/lib.rs @@ -236,6 +236,107 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { } /// Begin execution of `partition`, returning a stream of [`RecordBatch`]es. + /// + /// # Implementation Examples + /// + /// ## Return Precomputed Batch + /// + /// We can return a precomputed batch as a stream + /// + /// ``` + /// # use std::sync::Arc; + /// # use arrow_array::RecordBatch; + /// # use arrow_schema::SchemaRef; + /// # use datafusion_common::Result; + /// # use datafusion_execution::{SendableRecordBatchStream, TaskContext}; + /// # use datafusion_physical_plan::memory::MemoryStream; + /// # use datafusion_physical_plan::stream::RecordBatchStreamAdapter; + /// struct MyPlan { + /// batch: RecordBatch, + /// } + /// + /// impl MyPlan { + /// fn execute( + /// &self, + /// partition: usize, + /// context: Arc<TaskContext> + /// ) -> Result<SendableRecordBatchStream> { + /// let fut = futures::future::ready(Ok(self.batch.clone())); + /// let stream = futures::stream::once(fut); + /// Ok(Box::pin(RecordBatchStreamAdapter::new(self.batch.schema(), stream))) + /// } + /// } + /// ``` + /// + /// ## Async Compute Batch + /// + /// We can also lazily compute a RecordBatch when the returned stream is polled + /// + /// ``` + /// # use std::sync::Arc; + /// # use arrow_array::RecordBatch; + /// # use arrow_schema::SchemaRef; + /// # use datafusion_common::Result; + /// # use datafusion_execution::{SendableRecordBatchStream, TaskContext}; + /// # use 
datafusion_physical_plan::memory::MemoryStream; + /// # use datafusion_physical_plan::stream::RecordBatchStreamAdapter; + /// struct MyPlan { + /// schema: SchemaRef, + /// } + /// + /// async fn get_batch() -> Result<RecordBatch> { + /// todo!() + /// } + /// + /// impl MyPlan { + /// fn execute( + /// &self, + /// partition: usize, + /// context: Arc<TaskContext> + /// ) -> Result<SendableRecordBatchStream> { + /// let fut = get_batch(); + /// let stream = futures::stream::once(fut); + /// Ok(Box::pin(RecordBatchStreamAdapter::new(self.schema.clone(), stream))) + /// } + /// } + /// ``` + /// + /// ## Async Compute Batch Stream + /// + /// We can lazily compute a RecordBatch stream when the returned stream is polled + /// flattening the result into a single stream + /// + /// ``` + /// # use std::sync::Arc; + /// # use arrow_array::RecordBatch; + /// # use arrow_schema::SchemaRef; + /// # use futures::TryStreamExt; + /// # use datafusion_common::Result; + /// # use datafusion_execution::{SendableRecordBatchStream, TaskContext}; + /// # use datafusion_physical_plan::memory::MemoryStream; + /// # use datafusion_physical_plan::stream::RecordBatchStreamAdapter; + /// struct MyPlan { + /// schema: SchemaRef, + /// } + /// + /// async fn get_batch_stream() -> Result<SendableRecordBatchStream> { + /// todo!() + /// } + /// + /// impl MyPlan { + /// fn execute( + /// &self, + /// partition: usize, + /// context: Arc<TaskContext> + /// ) -> Result<SendableRecordBatchStream> { + /// // A future that yields a stream + /// let fut = get_batch_stream(); + /// // Use TryStreamExt::try_flatten to flatten the stream of streams + /// let stream = futures::stream::once(fut).try_flatten(); + /// Ok(Box::pin(RecordBatchStreamAdapter::new(self.schema.clone(), stream))) + /// } + /// } + /// ``` fn execute( &self, partition: usize, From 57b3de73fbf92e9a566756258141cd34f217abe5 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Tue, 31 Oct 2023 20:06:51 +0000 Subject: [PATCH 2/2] Review feedback --- datafusion/physical-plan/src/lib.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git 
a/datafusion/physical-plan/src/lib.rs b/datafusion/physical-plan/src/lib.rs index 6df42cfe550d..8ae2a8686674 100644 --- a/datafusion/physical-plan/src/lib.rs +++ b/datafusion/physical-plan/src/lib.rs @@ -337,6 +337,9 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { /// } /// } /// ``` + /// + /// See [`futures::stream::StreamExt`] and [`futures::stream::TryStreamExt`] for further + /// combinators that can be used with streams fn execute( &self, partition: usize,