Implement streaming versions of Dataframe.collect methods (#789)
andygrove authored Jul 30, 2021
1 parent 7dde5b1 commit d637871
Showing 3 changed files with 130 additions and 37 deletions.
31 changes: 31 additions & 0 deletions datafusion/src/dataframe.rs
@@ -24,6 +24,7 @@ use crate::logical_plan::{
};
use std::sync::Arc;

use crate::physical_plan::SendableRecordBatchStream;
use async_trait::async_trait;

/// DataFrame represents a logical set of rows with the same named columns.
@@ -222,6 +223,21 @@ pub trait DataFrame: Send + Sync {
/// ```
async fn collect(&self) -> Result<Vec<RecordBatch>>;

/// Executes this DataFrame and returns a stream over a single partition
///
/// ```
/// # use datafusion::prelude::*;
/// # use datafusion::error::Result;
/// # #[tokio::main]
/// # async fn main() -> Result<()> {
/// let mut ctx = ExecutionContext::new();
/// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?;
/// let stream = df.execute_stream().await?;
/// # Ok(())
/// # }
/// ```
async fn execute_stream(&self) -> Result<SendableRecordBatchStream>;

/// Executes this DataFrame and collects all results into a vector of vector of RecordBatch
/// maintaining the input partitioning.
///
@@ -238,6 +254,21 @@ pub trait DataFrame: Send + Sync {
/// ```
async fn collect_partitioned(&self) -> Result<Vec<Vec<RecordBatch>>>;

/// Executes this DataFrame and returns one stream per partition.
///
/// ```
/// # use datafusion::prelude::*;
/// # use datafusion::error::Result;
/// # #[tokio::main]
/// # async fn main() -> Result<()> {
/// let mut ctx = ExecutionContext::new();
/// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?;
/// let streams = df.execute_stream_partitioned().await?;
/// # Ok(())
/// # }
/// ```
async fn execute_stream_partitioned(&self) -> Result<Vec<SendableRecordBatchStream>>;

/// Returns the schema describing the output of this DataFrame in terms of columns returned,
/// where each column has a name, data type, and nullability attribute.
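
As a usage sketch for the new execute_stream trait method above (not part of this commit), the returned SendableRecordBatchStream can be consumed incrementally with the futures crate's StreamExt; the CSV path and Tokio runtime setup are assumptions carried over from the doc examples:

use datafusion::error::Result;
use datafusion::prelude::*;
use futures::StreamExt;

#[tokio::main]
async fn main() -> Result<()> {
    let mut ctx = ExecutionContext::new();
    let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?;
    // Batches arrive one at a time instead of being buffered by collect().
    let mut stream = df.execute_stream().await?;
    while let Some(batch) = stream.next().await {
        println!("got a batch with {} rows", batch?.num_rows());
    }
    Ok(())
}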
44 changes: 32 additions & 12 deletions datafusion/src/execution/dataframe_impl.rs
@@ -31,6 +31,9 @@ use crate::{
physical_plan::{collect, collect_partitioned},
};

use crate::physical_plan::{
execute_stream, execute_stream_partitioned, ExecutionPlan, SendableRecordBatchStream,
};
use async_trait::async_trait;

/// Implementation of DataFrame API
@@ -47,6 +50,14 @@ impl DataFrameImpl {
plan: plan.clone(),
}
}

/// Create a physical plan
async fn create_physical_plan(&self) -> Result<Arc<dyn ExecutionPlan>> {
let state = self.ctx_state.lock().unwrap().clone();
let ctx = ExecutionContext::from(Arc::new(Mutex::new(state)));
let plan = ctx.optimize(&self.plan)?;
ctx.create_physical_plan(&plan)
}
}

#[async_trait]
@@ -138,26 +149,35 @@ impl DataFrame for DataFrameImpl {
self.plan.clone()
}

// Convert the logical plan represented by this DataFrame into a physical plan and
// execute it
/// Convert the logical plan represented by this DataFrame into a physical plan and
/// execute it, collecting all resulting batches into memory
async fn collect(&self) -> Result<Vec<RecordBatch>> {
let state = self.ctx_state.lock().unwrap().clone();
let ctx = ExecutionContext::from(Arc::new(Mutex::new(state)));
let plan = ctx.optimize(&self.plan)?;
let plan = ctx.create_physical_plan(&plan)?;
let plan = self.create_physical_plan().await?;
Ok(collect(plan).await?)
}

// Convert the logical plan represented by this DataFrame into a physical plan and
// execute it
/// Convert the logical plan represented by this DataFrame into a physical plan and
/// execute it, returning a stream over a single partition
async fn execute_stream(&self) -> Result<SendableRecordBatchStream> {
let plan = self.create_physical_plan().await?;
execute_stream(plan).await
}

/// Convert the logical plan represented by this DataFrame into a physical plan and
/// execute it, collecting all resulting batches into memory while maintaining
/// partitioning
async fn collect_partitioned(&self) -> Result<Vec<Vec<RecordBatch>>> {
let state = self.ctx_state.lock().unwrap().clone();
let ctx = ExecutionContext::from(Arc::new(Mutex::new(state)));
let plan = ctx.optimize(&self.plan)?;
let plan = ctx.create_physical_plan(&plan)?;
let plan = self.create_physical_plan().await?;
Ok(collect_partitioned(plan).await?)
}

/// Convert the logical plan represented by this DataFrame into a physical plan and
/// execute it, returning a stream for each partition
async fn execute_stream_partitioned(&self) -> Result<Vec<SendableRecordBatchStream>> {
let plan = self.create_physical_plan().await?;
Ok(execute_stream_partitioned(plan).await?)
}

/// Returns the schema from the logical plan
fn schema(&self) -> &DFSchema {
self.plan.schema()
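
A companion sketch (again not part of the diff): because SendableRecordBatchStream is Send, each stream returned by execute_stream_partitioned can be driven on its own Tokio task. DataFusionError, the CSV path, and the runtime setup are assumptions mirroring the doc examples:

use datafusion::error::{DataFusionError, Result};
use datafusion::prelude::*;
use futures::StreamExt;

#[tokio::main]
async fn main() -> Result<()> {
    let mut ctx = ExecutionContext::new();
    let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?;
    // One stream per output partition, each consumed concurrently.
    let streams = df.execute_stream_partitioned().await?;
    let mut tasks = vec![];
    for (i, mut stream) in streams.into_iter().enumerate() {
        tasks.push(tokio::spawn(async move {
            let mut rows = 0;
            while let Some(batch) = stream.next().await {
                rows += batch?.num_rows();
            }
            Ok::<_, DataFusionError>((i, rows))
        }));
    }
    for task in tasks {
        let (i, rows) = task.await.expect("join error")?;
        println!("partition {} produced {} rows", i, rows);
    }
    Ok(())
}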
92 changes: 67 additions & 25 deletions datafusion/src/physical_plan/mod.rs
@@ -17,6 +17,14 @@

//! Traits for physical query plan, supporting parallel execution for partitioned relations.
use std::fmt;
use std::fmt::{Debug, Display};
use std::ops::Range;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll};
use std::{any::Any, pin::Pin};

use self::{
coalesce_partitions::CoalescePartitionsExec, display::DisplayableExecutionPlan,
};
@@ -35,12 +43,6 @@ use async_trait::async_trait;
pub use display::DisplayFormatType;
use futures::stream::Stream;
use hashbrown::HashMap;
use std::fmt;
use std::fmt::{Debug, Display};
use std::ops::Range;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::{any::Any, pin::Pin};

/// Trait for types that stream [arrow::record_batch::RecordBatch]
pub trait RecordBatchStream: Stream<Item = ArrowResult<RecordBatch>> {
@@ -54,6 +56,37 @@ pub trait RecordBatchStream: Stream<Item = ArrowResult<RecordBatch>> {
/// Trait for a stream of record batches.
pub type SendableRecordBatchStream = Pin<Box<dyn RecordBatchStream + Send + Sync>>;

/// EmptyRecordBatchStream can be used to create a RecordBatchStream
/// that will produce no results
pub struct EmptyRecordBatchStream {
/// Schema
schema: SchemaRef,
}

impl EmptyRecordBatchStream {
/// Create an empty RecordBatchStream
pub fn new(schema: SchemaRef) -> Self {
Self { schema }
}
}

impl RecordBatchStream for EmptyRecordBatchStream {
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
}

impl Stream for EmptyRecordBatchStream {
type Item = ArrowResult<RecordBatch>;

fn poll_next(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
Poll::Ready(None)
}
}
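
As an aside (not part of the diff), a minimal sketch of EmptyRecordBatchStream's behavior: it is exhausted immediately, which is what the zero-partition branch of execute_stream below relies on. The ad-hoc Int32 schema is an assumption for illustration, and the arrow types come via DataFusion's re-exported arrow crate:

use std::sync::Arc;

use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::physical_plan::{EmptyRecordBatchStream, SendableRecordBatchStream};
use futures::StreamExt;

#[tokio::main]
async fn main() {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let mut stream: SendableRecordBatchStream =
        Box::pin(EmptyRecordBatchStream::new(schema));
    // poll_next returns Ready(None) right away, so the stream yields nothing.
    assert!(stream.next().await.is_none());
}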

/// SQL metric type
#[derive(Debug, Clone)]
pub enum MetricType {
@@ -313,18 +346,23 @@ pub fn plan_metrics(plan: Arc<dyn ExecutionPlan>) -> HashMap<String, SQLMetric>

/// Execute the [ExecutionPlan] and collect the results in memory
pub async fn collect(plan: Arc<dyn ExecutionPlan>) -> Result<Vec<RecordBatch>> {
let stream = execute_stream(plan).await?;
common::collect(stream).await
}

/// Execute the [ExecutionPlan] and return a single stream of results
pub async fn execute_stream(
plan: Arc<dyn ExecutionPlan>,
) -> Result<SendableRecordBatchStream> {
match plan.output_partitioning().partition_count() {
0 => Ok(vec![]),
1 => {
let it = plan.execute(0).await?;
common::collect(it).await
}
0 => Ok(Box::pin(EmptyRecordBatchStream::new(plan.schema()))),
1 => plan.execute(0).await,
_ => {
// merge into a single partition
let plan = CoalescePartitionsExec::new(plan.clone());
// CoalescePartitionsExec must produce a single partition
assert_eq!(1, plan.output_partitioning().partition_count());
common::collect(plan.execute(0).await?).await
plan.execute(0).await
}
}
}
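
For illustration only (not part of the commit), the multi-partition branch above can be exercised against a two-partition MemoryExec; the schema and batches here are made up for the example:

use std::sync::Arc;

use datafusion::arrow::array::Int32Array;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::Result;
use datafusion::physical_plan::memory::MemoryExec;
use datafusion::physical_plan::{common, execute_stream};

#[tokio::main]
async fn main() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
    )?;
    // Two input partitions are coalesced into a single output stream.
    let plan = Arc::new(MemoryExec::try_new(
        &[vec![batch.clone()], vec![batch]],
        schema,
        None,
    )?);
    let stream = execute_stream(plan).await?;
    let batches = common::collect(stream).await?;
    assert_eq!(2, batches.len());
    Ok(())
}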
@@ -333,20 +371,24 @@ pub async fn collect(plan: Arc<dyn ExecutionPlan>) -> Result<Vec<RecordBatch>> {
pub async fn collect_partitioned(
plan: Arc<dyn ExecutionPlan>,
) -> Result<Vec<Vec<RecordBatch>>> {
match plan.output_partitioning().partition_count() {
0 => Ok(vec![]),
1 => {
let it = plan.execute(0).await?;
Ok(vec![common::collect(it).await?])
}
_ => {
let mut partitions = vec![];
for i in 0..plan.output_partitioning().partition_count() {
partitions.push(common::collect(plan.execute(i).await?).await?)
}
Ok(partitions)
}
let streams = execute_stream_partitioned(plan).await?;
let mut batches = Vec::with_capacity(streams.len());
for stream in streams {
batches.push(common::collect(stream).await?);
}
Ok(batches)
}

/// Execute the [ExecutionPlan] and return a vec with one stream per output partition
pub async fn execute_stream_partitioned(
plan: Arc<dyn ExecutionPlan>,
) -> Result<Vec<SendableRecordBatchStream>> {
let num_partitions = plan.output_partitioning().partition_count();
let mut streams = Vec::with_capacity(num_partitions);
for i in 0..num_partitions {
streams.push(plan.execute(i).await?);
}
Ok(streams)
}

/// Partitioning schemes supported by operators.
