Skip to content

Commit

Permalink
move EmptyExec produce_one_row to be part of MemoryExec
Browse files Browse the repository at this point in the history
  • Loading branch information
razeghi71 committed Dec 4, 2023
1 parent 26196e6 commit 0a8d519
Show file tree
Hide file tree
Showing 12 changed files with 118 additions and 135 deletions.
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/empty.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ impl TableProvider for EmptyTable {
// even though there is no data, projections apply
let projected_schema = project_schema(&self.schema, projection)?;
Ok(Arc::new(
EmptyExec::new(false, projected_schema).with_partitions(self.partitions),
EmptyExec::new(projected_schema).with_partitions(self.partitions),
))
}
}
4 changes: 2 additions & 2 deletions datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -673,7 +673,7 @@ impl TableProvider for ListingTable {
if partitioned_file_lists.is_empty() {
let schema = self.schema();
let projected_schema = project_schema(&schema, projection)?;
return Ok(Arc::new(EmptyExec::new(false, projected_schema)));
return Ok(Arc::new(EmptyExec::new(projected_schema)));
}

// extract types of partition columns
Expand Down Expand Up @@ -701,7 +701,7 @@ impl TableProvider for ListingTable {
let object_store_url = if let Some(url) = self.table_paths.get(0) {
url.object_store()
} else {
return Ok(Arc::new(EmptyExec::new(false, Arc::new(Schema::empty()))));
return Ok(Arc::new(EmptyExec::new(Arc::new(Schema::empty()))));
};
// create the execution plan
self.options
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use super::optimizer::PhysicalOptimizerRule;
use crate::config::ConfigOptions;
use crate::error::Result;
use crate::physical_plan::aggregates::AggregateExec;
use crate::physical_plan::empty::EmptyExec;
use crate::physical_plan::memory::MemoryExec;
use crate::physical_plan::projection::ProjectionExec;
use crate::physical_plan::{expressions, AggregateExpr, ExecutionPlan, Statistics};
use crate::scalar::ScalarValue;
Expand Down Expand Up @@ -82,7 +82,7 @@ impl PhysicalOptimizerRule for AggregateStatistics {
// input can be entirely removed
Ok(Arc::new(ProjectionExec::try_new(
projections,
Arc::new(EmptyExec::new(true, plan.schema())),
Arc::new(MemoryExec::try_new_with_dummy_row(plan.schema(), 1)?),
)?))
} else {
plan.map_children(|child| self.optimize(child, _config))
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/src/physical_optimizer/join_selection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1623,12 +1623,12 @@ mod hash_join_tests {

let children = vec![
PipelineStatePropagator {
plan: Arc::new(EmptyExec::new(false, Arc::new(Schema::empty()))),
plan: Arc::new(EmptyExec::new(Arc::new(Schema::empty()))),
unbounded: left_unbounded,
children: vec![],
},
PipelineStatePropagator {
plan: Arc::new(EmptyExec::new(false, Arc::new(Schema::empty()))),
plan: Arc::new(EmptyExec::new(Arc::new(Schema::empty()))),
unbounded: right_unbounded,
children: vec![],
},
Expand Down
9 changes: 7 additions & 2 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1203,12 +1203,17 @@ impl DefaultPhysicalPlanner {
}
LogicalPlan::Subquery(_) => todo!(),
LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row,
produce_one_row: false,
schema,
}) => Ok(Arc::new(EmptyExec::new(
*produce_one_row,
SchemaRef::new(schema.as_ref().to_owned().into()),
))),
LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: true,
schema,
}) => Ok(Arc::new(
MemoryExec::try_new_with_dummy_row(SchemaRef::new(schema.as_ref().to_owned().into()), 1)?
)),
LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => {
self.create_initial_plan(input, session_state).await
}
Expand Down
89 changes: 8 additions & 81 deletions datafusion/physical-plan/src/empty.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@ use super::expressions::PhysicalSortExpr;
use super::{common, DisplayAs, SendableRecordBatchStream, Statistics};
use crate::{memory::MemoryStream, DisplayFormatType, ExecutionPlan, Partitioning};

use arrow::array::{ArrayRef, NullArray};
use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef};
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use datafusion_common::{internal_err, DataFusionError, Result};
use datafusion_execution::TaskContext;
Expand All @@ -35,8 +34,6 @@ use log::trace;
/// Execution plan for empty relation (produces no rows)
#[derive(Debug)]
pub struct EmptyExec {
/// Specifies whether this exec produces a row or not
produce_one_row: bool,
/// The schema reported by this plan (no rows are produced)
schema: SchemaRef,
/// Number of partitions
Expand All @@ -45,9 +42,8 @@ pub struct EmptyExec {

impl EmptyExec {
/// Create a new EmptyExec
pub fn new(produce_one_row: bool, schema: SchemaRef) -> Self {
pub fn new(schema: SchemaRef) -> Self {
EmptyExec {
produce_one_row,
schema,
partitions: 1,
}
Expand All @@ -59,36 +55,8 @@ impl EmptyExec {
self
}

/// Specifies whether this exec produces a row or not
pub fn produce_one_row(&self) -> bool {
self.produce_one_row
}

fn data(&self) -> Result<Vec<RecordBatch>> {
let batch = if self.produce_one_row {
let n_field = self.schema.fields.len();
// hack for https://github.com/apache/arrow-datafusion/pull/3242
let n_field = if n_field == 0 { 1 } else { n_field };
vec![RecordBatch::try_new(
Arc::new(Schema::new(
(0..n_field)
.map(|i| {
Field::new(format!("placeholder_{i}"), DataType::Null, true)
})
.collect::<Fields>(),
)),
(0..n_field)
.map(|_i| {
let ret: ArrayRef = Arc::new(NullArray::new(1));
ret
})
.collect(),
)?]
} else {
vec![]
};

Ok(batch)
Ok(vec![])
}
}

Expand All @@ -100,7 +68,7 @@ impl DisplayAs for EmptyExec {
) -> std::fmt::Result {
match t {
DisplayFormatType::Default | DisplayFormatType::Verbose => {
write!(f, "EmptyExec: produce_one_row={}", self.produce_one_row)
write!(f, "EmptyExec")
}
}
}
Expand Down Expand Up @@ -133,10 +101,7 @@ impl ExecutionPlan for EmptyExec {
self: Arc<Self>,
_: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
Ok(Arc::new(EmptyExec::new(
self.produce_one_row,
self.schema.clone(),
)))
Ok(Arc::new(EmptyExec::new(self.schema.clone())))
}

fn execute(
Expand Down Expand Up @@ -184,7 +149,7 @@ mod tests {
let task_ctx = Arc::new(TaskContext::default());
let schema = test::aggr_test_schema();

let empty = EmptyExec::new(false, schema.clone());
let empty = EmptyExec::new(schema.clone());
assert_eq!(empty.schema(), schema);

// we should have no results
Expand All @@ -198,16 +163,11 @@ mod tests {
#[test]
fn with_new_children() -> Result<()> {
let schema = test::aggr_test_schema();
let empty = Arc::new(EmptyExec::new(false, schema.clone()));
let empty_with_row = Arc::new(EmptyExec::new(true, schema));
let empty = Arc::new(EmptyExec::new(schema.clone()));

let empty2 = with_new_children_if_necessary(empty.clone(), vec![])?.into();
assert_eq!(empty.schema(), empty2.schema());

let empty_with_row_2 =
with_new_children_if_necessary(empty_with_row.clone(), vec![])?.into();
assert_eq!(empty_with_row.schema(), empty_with_row_2.schema());

let too_many_kids = vec![empty2];
assert!(
with_new_children_if_necessary(empty, too_many_kids).is_err(),
Expand All @@ -220,44 +180,11 @@ mod tests {
async fn invalid_execute() -> Result<()> {
let task_ctx = Arc::new(TaskContext::default());
let schema = test::aggr_test_schema();
let empty = EmptyExec::new(false, schema);
let empty = EmptyExec::new(schema);

// ask for the wrong partition
assert!(empty.execute(1, task_ctx.clone()).is_err());
assert!(empty.execute(20, task_ctx).is_err());
Ok(())
}

/// Executing partition 0 of an `EmptyExec` built with `produce_one_row = true`
/// yields exactly one batch.
#[tokio::test]
async fn produce_one_row() -> Result<()> {
    let exec = EmptyExec::new(true, test::aggr_test_schema());

    let stream = exec.execute(0, Arc::new(TaskContext::default()))?;
    let collected = common::collect(stream).await?;

    // should have one item
    assert_eq!(collected.len(), 1);

    Ok(())
}

/// With `produce_one_row = true` and three partitions, every partition of the
/// `EmptyExec` yields exactly one batch when executed.
#[tokio::test]
async fn produce_one_row_multiple_partition() -> Result<()> {
    let partition_count = 3;
    let exec =
        EmptyExec::new(true, test::aggr_test_schema()).with_partitions(partition_count);
    let ctx = Arc::new(TaskContext::default());

    for partition in 0..partition_count {
        let stream = exec.execute(partition, Arc::clone(&ctx))?;
        let collected = common::collect(stream).await?;

        // should have one item
        assert_eq!(collected.len(), 1);
    }

    Ok(())
}
}
58 changes: 58 additions & 0 deletions datafusion/physical-plan/src/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ use super::{
SendableRecordBatchStream, Statistics,
};

use arrow::array::{ArrayRef, NullArray};
use arrow::datatypes::SchemaRef;
use arrow::datatypes::{DataType, Field, Fields, Schema};
use arrow::record_batch::RecordBatch;
use datafusion_common::{internal_err, project_schema, DataFusionError, Result};
use datafusion_execution::TaskContext;
Expand Down Expand Up @@ -177,6 +179,26 @@ impl MemoryExec {
})
}

/// Create a `MemoryExec` in which each of the `partitions` partitions holds a
/// single one-row batch made entirely of null placeholder columns.
///
/// The batch is not laid out according to `schema`: it carries one nullable
/// `DataType::Null` column named `placeholder_{i}` per field of `schema`, or
/// exactly one such column when `schema` has no fields. The supplied `schema`
/// is passed on unchanged to [`Self::try_new`], with no projection.
///
/// # Errors
///
/// Propagates any error from building the placeholder batch or from
/// [`Self::try_new`].
pub fn try_new_with_dummy_row(schema: SchemaRef, partitions: usize) -> Result<Self> {
    // hack for https://github.com/apache/arrow-datafusion/pull/3242
    // (an empty schema still gets one placeholder column)
    let column_count = schema.fields.len().max(1);

    let placeholder_fields: Fields = (0..column_count)
        .map(|idx| Field::new(format!("placeholder_{idx}"), DataType::Null, true))
        .collect();
    let placeholder_columns: Vec<ArrayRef> = (0..column_count)
        .map(|_| Arc::new(NullArray::new(1)) as ArrayRef)
        .collect();

    let dummy_batch = RecordBatch::try_new(
        Arc::new(Schema::new(placeholder_fields)),
        placeholder_columns,
    )?;

    // Every partition gets its own copy of the same single-batch list.
    let per_partition = vec![vec![dummy_batch]; partitions];
    Self::try_new(&per_partition, schema, None)
}

pub fn partitions(&self) -> &[Vec<RecordBatch>] {
&self.partitions
}
Expand Down Expand Up @@ -280,11 +302,14 @@ mod tests {

use crate::memory::MemoryExec;
use crate::ExecutionPlan;
use crate::{common, test};

use arrow_schema::{DataType, Field, Schema, SortOptions};
use datafusion_physical_expr::expressions::col;
use datafusion_physical_expr::PhysicalSortExpr;

use datafusion_execution::TaskContext;

#[test]
fn test_memory_order_eq() -> datafusion_common::Result<()> {
let schema = Arc::new(Schema::new(vec![
Expand Down Expand Up @@ -316,4 +341,37 @@ mod tests {
assert!(eq_properties.oeq_class().contains(&expected_order_eq));
Ok(())
}

/// A dummy-row `MemoryExec` built with a single partition yields exactly one
/// batch when partition 0 is executed.
#[tokio::test]
async fn dummy_row() -> datafusion_common::Result<()> {
    let exec: MemoryExec =
        MemoryExec::try_new_with_dummy_row(test::aggr_test_schema(), 1)?;

    let stream = exec.execute(0, Arc::new(TaskContext::default()))?;
    let collected = common::collect(stream).await?;

    // should have one item
    assert_eq!(collected.len(), 1);

    Ok(())
}

/// A dummy-row `MemoryExec` built with three partitions yields exactly one
/// batch from each partition.
#[tokio::test]
async fn dummy_row_multiple_partition() -> datafusion_common::Result<()> {
    let partition_count = 3;
    let exec: MemoryExec =
        MemoryExec::try_new_with_dummy_row(test::aggr_test_schema(), partition_count)?;
    let ctx = Arc::new(TaskContext::default());

    for partition in 0..partition_count {
        let stream = exec.execute(partition, Arc::clone(&ctx))?;
        let collected = common::collect(stream).await?;

        // should have one item
        assert_eq!(collected.len(), 1);
    }

    Ok(())
}
}
1 change: 0 additions & 1 deletion datafusion/proto/proto/datafusion.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1484,7 +1484,6 @@ message JoinOn {
}

message EmptyExecNode {
bool produce_one_row = 1;
Schema schema = 2;
}

Expand Down
12 changes: 0 additions & 12 deletions datafusion/proto/src/generated/pbjson.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 0 additions & 2 deletions datafusion/proto/src/generated/prost.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 0a8d519

Please sign in to comment.