From d637871ce969102681616ea113cf84288ff6c252 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Fri, 30 Jul 2021 10:30:39 -0700
Subject: [PATCH] Implement streaming versions of Dataframe.collect methods
 (#789)

---
 datafusion/src/dataframe.rs                | 31 ++++++++
 datafusion/src/execution/dataframe_impl.rs | 44 ++++++++---
 datafusion/src/physical_plan/mod.rs        | 92 ++++++++++++++++------
 3 files changed, 130 insertions(+), 37 deletions(-)

diff --git a/datafusion/src/dataframe.rs b/datafusion/src/dataframe.rs
index 507a79861cd5..1d4cffdf89d4 100644
--- a/datafusion/src/dataframe.rs
+++ b/datafusion/src/dataframe.rs
@@ -24,6 +24,7 @@ use crate::logical_plan::{
 };
 use std::sync::Arc;
 
+use crate::physical_plan::SendableRecordBatchStream;
 use async_trait::async_trait;
 
 /// DataFrame represents a logical set of rows with the same named columns.
@@ -222,6 +223,21 @@ pub trait DataFrame: Send + Sync {
     /// ```
     async fn collect(&self) -> Result<Vec<RecordBatch>>;
 
+    /// Executes this DataFrame and returns a stream over a single partition
+    ///
+    /// ```
+    /// # use datafusion::prelude::*;
+    /// # use datafusion::error::Result;
+    /// # #[tokio::main]
+    /// # async fn main() -> Result<()> {
+    /// let mut ctx = ExecutionContext::new();
+    /// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?;
+    /// let stream = df.execute_stream().await?;
+    /// # Ok(())
+    /// # }
+    /// ```
+    async fn execute_stream(&self) -> Result<SendableRecordBatchStream>;
+
     /// Executes this DataFrame and collects all results into a vector of vector of RecordBatch
     /// maintaining the input partitioning.
     ///
@@ -238,6 +254,21 @@
     /// ```
     async fn collect_partitioned(&self) -> Result<Vec<Vec<RecordBatch>>>;
 
+    /// Executes this DataFrame and returns one stream per partition.
+    ///
+    /// ```
+    /// # use datafusion::prelude::*;
+    /// # use datafusion::error::Result;
+    /// # #[tokio::main]
+    /// # async fn main() -> Result<()> {
+    /// let mut ctx = ExecutionContext::new();
+    /// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?;
+    /// let batches = df.execute_stream_partitioned().await?;
+    /// # Ok(())
+    /// # }
+    /// ```
+    async fn execute_stream_partitioned(&self) -> Result<Vec<SendableRecordBatchStream>>;
+
     /// Returns the schema describing the output of this DataFrame in terms of columns returned,
     /// where each column has a name, data type, and nullability attribute.
     ///
diff --git a/datafusion/src/execution/dataframe_impl.rs b/datafusion/src/execution/dataframe_impl.rs
index 451c4c7ba502..1c0094b711d6 100644
--- a/datafusion/src/execution/dataframe_impl.rs
+++ b/datafusion/src/execution/dataframe_impl.rs
@@ -31,6 +31,9 @@ use crate::{
     physical_plan::{collect, collect_partitioned},
 };
 
+use crate::physical_plan::{
+    execute_stream, execute_stream_partitioned, ExecutionPlan, SendableRecordBatchStream,
+};
 use async_trait::async_trait;
 
 /// Implementation of DataFrame API
@@ -47,6 +50,14 @@ impl DataFrameImpl {
             plan: plan.clone(),
         }
     }
+
+    /// Create a physical plan
+    async fn create_physical_plan(&self) -> Result<Arc<dyn ExecutionPlan>> {
+        let state = self.ctx_state.lock().unwrap().clone();
+        let ctx = ExecutionContext::from(Arc::new(Mutex::new(state)));
+        let plan = ctx.optimize(&self.plan)?;
+        ctx.create_physical_plan(&plan)
+    }
 }
 
 #[async_trait]
@@ -138,26 +149,35 @@ impl DataFrame for DataFrameImpl {
         self.plan.clone()
     }
 
-    // Convert the logical plan represented by this DataFrame into a physical plan and
-    // execute it
+    /// Convert the logical plan represented by this DataFrame into a physical plan and
+    /// execute it, collecting all resulting batches into memory
     async fn collect(&self) -> Result<Vec<RecordBatch>> {
-        let state = self.ctx_state.lock().unwrap().clone();
-        let ctx = ExecutionContext::from(Arc::new(Mutex::new(state)));
-        let plan = ctx.optimize(&self.plan)?;
-        let plan = ctx.create_physical_plan(&plan)?;
+        let plan = self.create_physical_plan().await?;
         Ok(collect(plan).await?)
     }
 
-    // Convert the logical plan represented by this DataFrame into a physical plan and
-    // execute it
+    /// Convert the logical plan represented by this DataFrame into a physical plan and
+    /// execute it, returning a stream over a single partition
+    async fn execute_stream(&self) -> Result<SendableRecordBatchStream> {
+        let plan = self.create_physical_plan().await?;
+        execute_stream(plan).await
+    }
+
+    /// Convert the logical plan represented by this DataFrame into a physical plan and
+    /// execute it, collecting all resulting batches into memory while maintaining
+    /// partitioning
     async fn collect_partitioned(&self) -> Result<Vec<Vec<RecordBatch>>> {
-        let state = self.ctx_state.lock().unwrap().clone();
-        let ctx = ExecutionContext::from(Arc::new(Mutex::new(state)));
-        let plan = ctx.optimize(&self.plan)?;
-        let plan = ctx.create_physical_plan(&plan)?;
+        let plan = self.create_physical_plan().await?;
         Ok(collect_partitioned(plan).await?)
     }
 
+    /// Convert the logical plan represented by this DataFrame into a physical plan and
+    /// execute it, returning a stream for each partition
+    async fn execute_stream_partitioned(&self) -> Result<Vec<SendableRecordBatchStream>> {
+        let plan = self.create_physical_plan().await?;
+        Ok(execute_stream_partitioned(plan).await?)
+    }
+
     /// Returns the schema from the logical plan
     fn schema(&self) -> &DFSchema {
         self.plan.schema()
diff --git a/datafusion/src/physical_plan/mod.rs b/datafusion/src/physical_plan/mod.rs
index b3c0dd63e9ed..86bceb11b475 100644
--- a/datafusion/src/physical_plan/mod.rs
+++ b/datafusion/src/physical_plan/mod.rs
@@ -17,6 +17,14 @@
 
 //! Traits for physical query plan, supporting parallel execution for partitioned relations.
 
+use std::fmt;
+use std::fmt::{Debug, Display};
+use std::ops::Range;
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::Arc;
+use std::task::{Context, Poll};
+use std::{any::Any, pin::Pin};
+
 use self::{
     coalesce_partitions::CoalescePartitionsExec, display::DisplayableExecutionPlan,
 };
@@ -35,12 +43,6 @@ use async_trait::async_trait;
 pub use display::DisplayFormatType;
 use futures::stream::Stream;
 use hashbrown::HashMap;
-use std::fmt;
-use std::fmt::{Debug, Display};
-use std::ops::Range;
-use std::sync::atomic::{AtomicUsize, Ordering};
-use std::sync::Arc;
-use std::{any::Any, pin::Pin};
 
 /// Trait for types that stream [arrow::record_batch::RecordBatch]
 pub trait RecordBatchStream: Stream<Item = ArrowResult<RecordBatch>> {
@@ -54,6 +56,37 @@ pub trait RecordBatchStream: Stream<Item = ArrowResult<RecordBatch>> {
 /// Trait for a stream of record batches.
 pub type SendableRecordBatchStream = Pin<Box<dyn RecordBatchStream + Send + Sync>>;
 
+/// EmptyRecordBatchStream can be used to create a RecordBatchStream
+/// that will produce no results
+pub struct EmptyRecordBatchStream {
+    /// Schema
+    schema: SchemaRef,
+}
+
+impl EmptyRecordBatchStream {
+    /// Create an empty RecordBatchStream
+    pub fn new(schema: SchemaRef) -> Self {
+        Self { schema }
+    }
+}
+
+impl RecordBatchStream for EmptyRecordBatchStream {
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+}
+
+impl Stream for EmptyRecordBatchStream {
+    type Item = ArrowResult<RecordBatch>;
+
+    fn poll_next(
+        self: Pin<&mut Self>,
+        _cx: &mut Context<'_>,
+    ) -> Poll<Option<Self::Item>> {
+        Poll::Ready(None)
+    }
+}
+
 /// SQL metric type
 #[derive(Debug, Clone)]
 pub enum MetricType {
@@ -313,18 +346,23 @@ pub fn plan_metrics(plan: Arc<dyn ExecutionPlan>) -> HashMap<String, SQLMetric>
 /// Execute the [ExecutionPlan] and collect the results in memory
 pub async fn collect(plan: Arc<dyn ExecutionPlan>) -> Result<Vec<RecordBatch>> {
+    let stream = execute_stream(plan).await?;
+    common::collect(stream).await
+}
+
+/// Execute the [ExecutionPlan] and return a single stream of results
+pub async fn execute_stream(
+    plan: Arc<dyn ExecutionPlan>,
+) -> Result<SendableRecordBatchStream> {
     match plan.output_partitioning().partition_count() {
-        0 => Ok(vec![]),
-        1 => {
-            let it = plan.execute(0).await?;
-            common::collect(it).await
-        }
+        0 => Ok(Box::pin(EmptyRecordBatchStream::new(plan.schema()))),
+        1 => plan.execute(0).await,
         _ => {
             // merge into a single partition
             let plan = CoalescePartitionsExec::new(plan.clone());
             // CoalescePartitionsExec must produce a single partition
             assert_eq!(1, plan.output_partitioning().partition_count());
-            common::collect(plan.execute(0).await?).await
+            plan.execute(0).await
         }
     }
 }
 
@@ -333,20 +371,24 @@ pub async fn collect(plan: Arc<dyn ExecutionPlan>) -> Result<Vec<RecordBatch>>
 pub async fn collect_partitioned(
     plan: Arc<dyn ExecutionPlan>,
 ) -> Result<Vec<Vec<RecordBatch>>> {
-    match plan.output_partitioning().partition_count() {
-        0 => Ok(vec![]),
-        1 => {
-            let it = plan.execute(0).await?;
-            Ok(vec![common::collect(it).await?])
-        }
-        _ => {
-            let mut partitions = vec![];
-            for i in 0..plan.output_partitioning().partition_count() {
-                partitions.push(common::collect(plan.execute(i).await?).await?)
-            }
-            Ok(partitions)
-        }
+    let streams = execute_stream_partitioned(plan).await?;
+    let mut batches = Vec::with_capacity(streams.len());
+    for stream in streams {
+        batches.push(common::collect(stream).await?);
+    }
+    Ok(batches)
+}
+
+/// Execute the [ExecutionPlan] and return a vec with one stream per output partition
+pub async fn execute_stream_partitioned(
+    plan: Arc<dyn ExecutionPlan>,
+) -> Result<Vec<SendableRecordBatchStream>> {
+    let num_partitions = plan.output_partitioning().partition_count();
+    let mut streams = Vec::with_capacity(num_partitions);
+    for i in 0..num_partitions {
+        streams.push(plan.execute(i).await?);
     }
+    Ok(streams)
 }
 
 /// Partitioning schemes supported by operators.
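For context, here is a minimal usage sketch (not part of the patch) of how the new `execute_stream` API added above might be consumed by a caller. It assumes the DataFusion API at the time of this commit plus the `tokio` and `futures` crates as dependencies, and it reuses the hypothetical `tests/example.csv` path from the doc examples in the patch; `execute_stream_partitioned` can be consumed the same way, one stream per partition.

```rust
use datafusion::error::Result;
use datafusion::prelude::*;
use futures::StreamExt;

#[tokio::main]
async fn main() -> Result<()> {
    // Illustrative only: register a CSV-backed DataFrame, as in the doc examples.
    let mut ctx = ExecutionContext::new();
    let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?;

    // Pull record batches incrementally instead of buffering the whole result
    // set in memory, which is the point of execute_stream() over collect().
    let mut stream = df.execute_stream().await?;
    while let Some(batch) = stream.next().await {
        // Each stream item is an arrow Result<RecordBatch>; `?` converts the
        // ArrowError into a DataFusionError.
        println!("received a batch with {} rows", batch?.num_rows());
    }
    Ok(())
}
```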