Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement streaming versions of Dataframe.collect methods #789

Merged
merged 23 commits into from
Jul 30, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions datafusion/src/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use crate::logical_plan::{
};
use std::sync::Arc;

use crate::physical_plan::SendableRecordBatchStream;
use async_trait::async_trait;

/// DataFrame represents a logical set of rows with the same named columns.
Expand Down Expand Up @@ -222,6 +223,21 @@ pub trait DataFrame: Send + Sync {
/// ```
async fn collect(&self) -> Result<Vec<RecordBatch>>;

/// Executes this DataFrame and returns a stream over a single partition.
///
/// Where `collect` hands back a `Vec<RecordBatch>`, this method hands back a
/// `SendableRecordBatchStream` from which the caller pulls the record batches.
///
/// ```
/// # use datafusion::prelude::*;
/// # use datafusion::error::Result;
/// # #[tokio::main]
/// # async fn main() -> Result<()> {
/// let mut ctx = ExecutionContext::new();
/// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?;
/// let stream = df.execute_stream().await?;
/// # Ok(())
/// # }
/// ```
async fn execute_stream(&self) -> Result<SendableRecordBatchStream>;

/// Executes this DataFrame and collects all results into a vector of vector of RecordBatch
/// maintaining the input partitioning.
///
Expand All @@ -238,6 +254,21 @@ pub trait DataFrame: Send + Sync {
/// ```
async fn collect_partitioned(&self) -> Result<Vec<Vec<RecordBatch>>>;

/// Executes this DataFrame and returns one stream per partition.
///
/// Where `collect_partitioned` hands back a `Vec<Vec<RecordBatch>>`, this method
/// hands back a `Vec<SendableRecordBatchStream>`.
///
/// ```
/// # use datafusion::prelude::*;
/// # use datafusion::error::Result;
/// # #[tokio::main]
/// # async fn main() -> Result<()> {
/// let mut ctx = ExecutionContext::new();
/// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?;
/// let streams = df.execute_stream_partitioned().await?;
/// # Ok(())
/// # }
/// ```
async fn execute_stream_partitioned(&self) -> Result<Vec<SendableRecordBatchStream>>;

/// Returns the schema describing the output of this DataFrame in terms of columns returned,
/// where each column has a name, data type, and nullability attribute.

Expand Down
44 changes: 32 additions & 12 deletions datafusion/src/execution/dataframe_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ use crate::{
physical_plan::{collect, collect_partitioned},
};

use crate::physical_plan::{
execute_stream, execute_stream_partitioned, ExecutionPlan, SendableRecordBatchStream,
};
use async_trait::async_trait;

/// Implementation of DataFrame API
Expand All @@ -47,6 +50,14 @@ impl DataFrameImpl {
plan: plan.clone(),
}
}

/// Optimize the logical plan held by this DataFrame and turn it into a physical plan
async fn create_physical_plan(&self) -> Result<Arc<dyn ExecutionPlan>> {
    // Work on a private copy of the context state; the lock guard is a temporary
    // that is released at the end of this statement.
    let state_snapshot = self.ctx_state.lock().unwrap().clone();
    let shared_state = Arc::new(Mutex::new(state_snapshot));
    let exec_ctx = ExecutionContext::from(shared_state);

    let optimized = exec_ctx.optimize(&self.plan)?;
    exec_ctx.create_physical_plan(&optimized)
}
}

#[async_trait]
Expand Down Expand Up @@ -138,26 +149,35 @@ impl DataFrame for DataFrameImpl {
self.plan.clone()
}

// Convert the logical plan represented by this DataFrame into a physical plan and
// execute it
/// Convert the logical plan represented by this DataFrame into a physical plan and
/// execute it, collecting all resulting batches into memory
async fn collect(&self) -> Result<Vec<RecordBatch>> {
let state = self.ctx_state.lock().unwrap().clone();
let ctx = ExecutionContext::from(Arc::new(Mutex::new(state)));
let plan = ctx.optimize(&self.plan)?;
let plan = ctx.create_physical_plan(&plan)?;
let plan = self.create_physical_plan().await?;
Ok(collect(plan).await?)
}

// Convert the logical plan represented by this DataFrame into a physical plan and
// execute it
/// Build the physical plan for this DataFrame's logical plan and run it,
/// handing back a stream over a single partition
async fn execute_stream(&self) -> Result<SendableRecordBatchStream> {
    let physical_plan = self.create_physical_plan().await?;
    let stream = execute_stream(physical_plan).await;
    stream
}

/// Convert the logical plan represented by this DataFrame into a physical plan and
/// execute it, collecting all resulting batches into memory while maintaining
/// partitioning
async fn collect_partitioned(&self) -> Result<Vec<Vec<RecordBatch>>> {
let state = self.ctx_state.lock().unwrap().clone();
let ctx = ExecutionContext::from(Arc::new(Mutex::new(state)));
let plan = ctx.optimize(&self.plan)?;
let plan = ctx.create_physical_plan(&plan)?;
let plan = self.create_physical_plan().await?;
Ok(collect_partitioned(plan).await?)
}

/// Convert the logical plan represented by this DataFrame into a physical plan and
/// execute it, returning a stream for each partition
///
/// # Errors
///
/// Returns any error raised while building the physical plan or while starting
/// execution of the plan's partitions.
async fn execute_stream_partitioned(&self) -> Result<Vec<SendableRecordBatchStream>> {
    let plan = self.create_physical_plan().await?;
    // Return the result directly, as `execute_stream` does; re-wrapping it with
    // `Ok(..?)` adds nothing.
    execute_stream_partitioned(plan).await
}

/// Returns the schema from the logical plan
fn schema(&self) -> &DFSchema {
self.plan.schema()
Expand Down
92 changes: 67 additions & 25 deletions datafusion/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,14 @@

//! Traits for physical query plan, supporting parallel execution for partitioned relations.

use std::fmt;
use std::fmt::{Debug, Display};
use std::ops::Range;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll};
use std::{any::Any, pin::Pin};

use self::{
coalesce_partitions::CoalescePartitionsExec, display::DisplayableExecutionPlan,
};
Expand All @@ -35,12 +43,6 @@ use async_trait::async_trait;
pub use display::DisplayFormatType;
use futures::stream::Stream;
use hashbrown::HashMap;
use std::fmt;
use std::fmt::{Debug, Display};
use std::ops::Range;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::{any::Any, pin::Pin};

/// Trait for types that stream [arrow::record_batch::RecordBatch]
pub trait RecordBatchStream: Stream<Item = ArrowResult<RecordBatch>> {
Expand All @@ -54,6 +56,37 @@ pub trait RecordBatchStream: Stream<Item = ArrowResult<RecordBatch>> {
/// Type alias for a pinned, boxed [RecordBatchStream] trait object that is
/// also `Send + Sync`.
pub type SendableRecordBatchStream = Pin<Box<dyn RecordBatchStream + Send + Sync>>;

/// EmptyRecordBatchStream can be used to create a RecordBatchStream
/// that will produce no results
#[derive(Debug)]
pub struct EmptyRecordBatchStream {
    /// Schema reported by the stream even though it yields no batches
    schema: SchemaRef,
}

impl EmptyRecordBatchStream {
    /// Create an empty RecordBatchStream that reports the given `schema`
    pub fn new(schema: SchemaRef) -> Self {
        Self { schema }
    }
}

impl RecordBatchStream for EmptyRecordBatchStream {
    /// Returns the schema supplied at construction time
    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }
}

impl Stream for EmptyRecordBatchStream {
    type Item = ArrowResult<RecordBatch>;

    /// Always reports end-of-stream immediately; the waker in `_cx` is never
    /// registered because there is nothing to wait for.
    fn poll_next(
        self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        Poll::Ready(None)
    }

    /// The stream is known to be empty, so report an exact length of zero
    /// instead of the default unbounded hint `(0, None)`.
    fn size_hint(&self) -> (usize, Option<usize>) {
        (0, Some(0))
    }
}

/// SQL metric type
#[derive(Debug, Clone)]
pub enum MetricType {
Expand Down Expand Up @@ -313,18 +346,23 @@ pub fn plan_metrics(plan: Arc<dyn ExecutionPlan>) -> HashMap<String, SQLMetric>

/// Run the [ExecutionPlan] as a single stream and gather every batch it yields
/// into memory
pub async fn collect(plan: Arc<dyn ExecutionPlan>) -> Result<Vec<RecordBatch>> {
    // Obtain the single output stream first, then drain it.
    common::collect(execute_stream(plan).await?).await
}

/// Execute the [ExecutionPlan] and return a single stream of results
pub async fn execute_stream(
plan: Arc<dyn ExecutionPlan>,
) -> Result<SendableRecordBatchStream> {
match plan.output_partitioning().partition_count() {
0 => Ok(vec![]),
1 => {
let it = plan.execute(0).await?;
common::collect(it).await
}
0 => Ok(Box::pin(EmptyRecordBatchStream::new(plan.schema()))),
1 => plan.execute(0).await,
_ => {
// merge into a single partition
let plan = CoalescePartitionsExec::new(plan.clone());
// CoalescePartitionsExec must produce a single partition
assert_eq!(1, plan.output_partitioning().partition_count());
common::collect(plan.execute(0).await?).await
plan.execute(0).await
}
}
}
Expand All @@ -333,20 +371,24 @@ pub async fn collect(plan: Arc<dyn ExecutionPlan>) -> Result<Vec<RecordBatch>> {
pub async fn collect_partitioned(
plan: Arc<dyn ExecutionPlan>,
) -> Result<Vec<Vec<RecordBatch>>> {
match plan.output_partitioning().partition_count() {
0 => Ok(vec![]),
1 => {
let it = plan.execute(0).await?;
Ok(vec![common::collect(it).await?])
}
_ => {
let mut partitions = vec![];
for i in 0..plan.output_partitioning().partition_count() {
partitions.push(common::collect(plan.execute(i).await?).await?)
}
Ok(partitions)
}
let streams = execute_stream_partitioned(plan).await?;
let mut batches = Vec::with_capacity(streams.len());
for stream in streams {
batches.push(common::collect(stream).await?);
}
Ok(batches)
}

/// Execute every output partition of the [ExecutionPlan], returning the resulting
/// streams in partition order (one entry per partition)
pub async fn execute_stream_partitioned(
    plan: Arc<dyn ExecutionPlan>,
) -> Result<Vec<SendableRecordBatchStream>> {
    let partition_count = plan.output_partitioning().partition_count();
    let mut partition_streams = Vec::with_capacity(partition_count);
    for partition in 0..partition_count {
        // Partitions are started one after another; the first error aborts the loop.
        let stream = plan.execute(partition).await?;
        partition_streams.push(stream);
    }
    Ok(partition_streams)
}

/// Partitioning schemes supported by operators.
Expand Down