Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

move EmptyExec produce_one_row to be part of MemoryExec #8412

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/empty.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ impl TableProvider for EmptyTable {
// even though there is no data, projections apply
let projected_schema = project_schema(&self.schema, projection)?;
Ok(Arc::new(
EmptyExec::new(false, projected_schema).with_partitions(self.partitions),
EmptyExec::new(projected_schema).with_partitions(self.partitions),
))
}
}
4 changes: 2 additions & 2 deletions datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -673,7 +673,7 @@ impl TableProvider for ListingTable {
if partitioned_file_lists.is_empty() {
let schema = self.schema();
let projected_schema = project_schema(&schema, projection)?;
return Ok(Arc::new(EmptyExec::new(false, projected_schema)));
return Ok(Arc::new(EmptyExec::new(projected_schema)));
}

// extract types of partition columns
Expand Down Expand Up @@ -701,7 +701,7 @@ impl TableProvider for ListingTable {
let object_store_url = if let Some(url) = self.table_paths.get(0) {
url.object_store()
} else {
return Ok(Arc::new(EmptyExec::new(false, Arc::new(Schema::empty()))));
return Ok(Arc::new(EmptyExec::new(Arc::new(Schema::empty()))));
};
// create the execution plan
self.options
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use super::optimizer::PhysicalOptimizerRule;
use crate::config::ConfigOptions;
use crate::error::Result;
use crate::physical_plan::aggregates::AggregateExec;
use crate::physical_plan::empty::EmptyExec;
use crate::physical_plan::memory::MemoryExec;
use crate::physical_plan::projection::ProjectionExec;
use crate::physical_plan::{expressions, AggregateExpr, ExecutionPlan, Statistics};
use crate::scalar::ScalarValue;
Expand Down Expand Up @@ -82,7 +82,7 @@ impl PhysicalOptimizerRule for AggregateStatistics {
// input can be entirely removed
Ok(Arc::new(ProjectionExec::try_new(
projections,
Arc::new(EmptyExec::new(true, plan.schema())),
Arc::new(MemoryExec::try_new_with_dummy_row(plan.schema(), 1)?),
)?))
} else {
plan.map_children(|child| self.optimize(child, _config))
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/src/physical_optimizer/join_selection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1623,12 +1623,12 @@ mod hash_join_tests {

let children = vec![
PipelineStatePropagator {
plan: Arc::new(EmptyExec::new(false, Arc::new(Schema::empty()))),
plan: Arc::new(EmptyExec::new(Arc::new(Schema::empty()))),
unbounded: left_unbounded,
children: vec![],
},
PipelineStatePropagator {
plan: Arc::new(EmptyExec::new(false, Arc::new(Schema::empty()))),
plan: Arc::new(EmptyExec::new(Arc::new(Schema::empty()))),
unbounded: right_unbounded,
children: vec![],
},
Expand Down
11 changes: 8 additions & 3 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1203,12 +1203,17 @@ impl DefaultPhysicalPlanner {
}
LogicalPlan::Subquery(_) => todo!(),
LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row,
produce_one_row: false,
schema,
}) => Ok(Arc::new(EmptyExec::new(
*produce_one_row,
SchemaRef::new(schema.as_ref().to_owned().into()),
))),
LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: true,
schema,
}) => Ok(Arc::new(
MemoryExec::try_new_with_dummy_row(SchemaRef::new(schema.as_ref().to_owned().into()), 1)?
)),
LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => {
self.create_initial_plan(input, session_state).await
}
Expand Down Expand Up @@ -2774,7 +2779,7 @@ mod tests {

digraph {
1[shape=box label="ProjectionExec: expr=[id@0 + 2 as employee.id + Int32(2)]", tooltip=""]
2[shape=box label="EmptyExec: produce_one_row=false", tooltip=""]
2[shape=box label="EmptyExec", tooltip=""]
1 -> 2 [arrowhead=none, arrowtail=normal, dir=back]
}
// End DataFusion GraphViz Plan
Expand Down
10 changes: 5 additions & 5 deletions datafusion/core/tests/custom_sources.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ use datafusion::execution::context::{SessionContext, SessionState, TaskContext};
use datafusion::logical_expr::{
col, Expr, LogicalPlan, LogicalPlanBuilder, TableScan, UNNAMED_TABLE,
};
use datafusion::physical_plan::empty::EmptyExec;
use datafusion::physical_plan::expressions::PhysicalSortExpr;
use datafusion::physical_plan::{
collect, ColumnStatistics, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning,
Expand All @@ -42,6 +41,7 @@ use datafusion_common::project_schema;
use datafusion_common::stats::Precision;

use async_trait::async_trait;
use datafusion_physical_plan::memory::MemoryExec;
use futures::stream::Stream;

/// Also run all tests that are found in the `custom_sources_cases` directory
Expand Down Expand Up @@ -258,7 +258,7 @@ async fn optimizers_catch_all_statistics() {

// when the optimization kicks in, the source is replaced by a MemoryExec
assert!(
contains_empty_exec(Arc::clone(&physical_plan)),
contains_memory_exec(Arc::clone(&physical_plan)),
"Expected aggregate_statistics optimizations missing: {physical_plan:?}"
);

Expand All @@ -283,12 +283,12 @@ async fn optimizers_catch_all_statistics() {
assert_eq!(format!("{:?}", actual[0]), format!("{expected:?}"));
}

fn contains_empty_exec(plan: Arc<dyn ExecutionPlan>) -> bool {
if plan.as_any().is::<EmptyExec>() {
fn contains_memory_exec(plan: Arc<dyn ExecutionPlan>) -> bool {
if plan.as_any().is::<MemoryExec>() {
true
} else if plan.children().len() != 1 {
false
} else {
contains_empty_exec(Arc::clone(&plan.children()[0]))
contains_memory_exec(Arc::clone(&plan.children()[0]))
}
}
4 changes: 2 additions & 2 deletions datafusion/core/tests/sql/explain_analyze.rs
Original file line number Diff line number Diff line change
Expand Up @@ -575,7 +575,7 @@ async fn explain_analyze_runs_optimizers() {

// This happens as an optimization pass where count(*) can be
// answered using statistics only.
let expected = "EmptyExec: produce_one_row=true";
let expected = "MemoryExec: partitions=1, partition_sizes=[1]";

let sql = "EXPLAIN SELECT count(*) from alltypes_plain";
let actual = execute_to_batches(&ctx, sql).await;
Expand Down Expand Up @@ -806,7 +806,7 @@ async fn explain_physical_plan_only() {
let expected = vec![vec![
"physical_plan",
"ProjectionExec: expr=[2 as COUNT(*)]\
\n EmptyExec: produce_one_row=true\
\n MemoryExec: partitions=1, partition_sizes=[1]\
\n",
]];
assert_eq!(expected, actual);
Expand Down
6 changes: 3 additions & 3 deletions datafusion/optimizer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ Looking at the `EXPLAIN` output we can see that the optimizer has effectively re
| logical_plan | Projection: Int64(3) AS Int64(1) + Int64(2) |
| | EmptyRelation |
| physical_plan | ProjectionExec: expr=[3 as Int64(1) + Int64(2)] |
| | EmptyExec: produce_one_row=true |
| | MemoryExec: partitions=1, partition_sizes=[1] |
| | |
+---------------+-------------------------------------------------+
```
Expand Down Expand Up @@ -318,15 +318,15 @@ In the following example, the `type_coercion` and `simplify_expressions` passes
| logical_plan | Projection: Utf8("3.2") AS foo |
| | EmptyRelation |
| initial_physical_plan | ProjectionExec: expr=[3.2 as foo] |
| | EmptyExec: produce_one_row=true |
| | MemoryExec: partitions=1, partition_sizes=[1] |
| | |
| physical_plan after aggregate_statistics | SAME TEXT AS ABOVE |
| physical_plan after join_selection | SAME TEXT AS ABOVE |
| physical_plan after coalesce_batches | SAME TEXT AS ABOVE |
| physical_plan after repartition | SAME TEXT AS ABOVE |
| physical_plan after add_merge_exec | SAME TEXT AS ABOVE |
| physical_plan | ProjectionExec: expr=[3.2 as foo] |
| | EmptyExec: produce_one_row=true |
| | MemoryExec: partitions=1, partition_sizes=[1] |
| | |
+------------------------------------------------------------+---------------------------------------------------------------------------+
```
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-plan/src/display.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ impl<'a> DisplayableExecutionPlan<'a> {
/// ```dot
/// strict digraph dot_plan {
// 0[label="ProjectionExec: expr=[id@0 + 2 as employee.id + Int32(2)]",tooltip=""]
// 1[label="EmptyExec: produce_one_row=false",tooltip=""]
// 1[label="EmptyExec",tooltip=""]
// 0 -> 1
// }
/// ```
Expand Down
89 changes: 8 additions & 81 deletions datafusion/physical-plan/src/empty.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@ use super::expressions::PhysicalSortExpr;
use super::{common, DisplayAs, SendableRecordBatchStream, Statistics};
use crate::{memory::MemoryStream, DisplayFormatType, ExecutionPlan, Partitioning};

use arrow::array::{ArrayRef, NullArray};
use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef};
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use datafusion_common::{internal_err, DataFusionError, Result};
use datafusion_execution::TaskContext;
Expand All @@ -35,8 +34,6 @@ use log::trace;
/// Execution plan for empty relation (produces no rows)
#[derive(Debug)]
pub struct EmptyExec {
/// Specifies whether this exec produces a row or not
produce_one_row: bool,
/// The schema for the produced row
schema: SchemaRef,
/// Number of partitions
Expand All @@ -45,9 +42,8 @@ pub struct EmptyExec {

impl EmptyExec {
/// Create a new EmptyExec
pub fn new(produce_one_row: bool, schema: SchemaRef) -> Self {
pub fn new(schema: SchemaRef) -> Self {
EmptyExec {
produce_one_row,
schema,
partitions: 1,
}
Expand All @@ -59,36 +55,8 @@ impl EmptyExec {
self
}

/// Specifies whether this exec produces a row or not
pub fn produce_one_row(&self) -> bool {
self.produce_one_row
}

fn data(&self) -> Result<Vec<RecordBatch>> {
let batch = if self.produce_one_row {
let n_field = self.schema.fields.len();
// hack for https://github.com/apache/arrow-datafusion/pull/3242
let n_field = if n_field == 0 { 1 } else { n_field };
vec![RecordBatch::try_new(
Arc::new(Schema::new(
(0..n_field)
.map(|i| {
Field::new(format!("placeholder_{i}"), DataType::Null, true)
})
.collect::<Fields>(),
)),
(0..n_field)
.map(|_i| {
let ret: ArrayRef = Arc::new(NullArray::new(1));
ret
})
.collect(),
)?]
} else {
vec![]
};

Ok(batch)
Ok(vec![])
}
}

Expand All @@ -100,7 +68,7 @@ impl DisplayAs for EmptyExec {
) -> std::fmt::Result {
match t {
DisplayFormatType::Default | DisplayFormatType::Verbose => {
write!(f, "EmptyExec: produce_one_row={}", self.produce_one_row)
write!(f, "EmptyExec")
}
}
}
Expand Down Expand Up @@ -133,10 +101,7 @@ impl ExecutionPlan for EmptyExec {
self: Arc<Self>,
_: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
Ok(Arc::new(EmptyExec::new(
self.produce_one_row,
self.schema.clone(),
)))
Ok(Arc::new(EmptyExec::new(self.schema.clone())))
}

fn execute(
Expand Down Expand Up @@ -184,7 +149,7 @@ mod tests {
let task_ctx = Arc::new(TaskContext::default());
let schema = test::aggr_test_schema();

let empty = EmptyExec::new(false, schema.clone());
let empty = EmptyExec::new(schema.clone());
assert_eq!(empty.schema(), schema);

// we should have no results
Expand All @@ -198,16 +163,11 @@ mod tests {
#[test]
fn with_new_children() -> Result<()> {
let schema = test::aggr_test_schema();
let empty = Arc::new(EmptyExec::new(false, schema.clone()));
let empty_with_row = Arc::new(EmptyExec::new(true, schema));
let empty = Arc::new(EmptyExec::new(schema.clone()));

let empty2 = with_new_children_if_necessary(empty.clone(), vec![])?.into();
assert_eq!(empty.schema(), empty2.schema());

let empty_with_row_2 =
with_new_children_if_necessary(empty_with_row.clone(), vec![])?.into();
assert_eq!(empty_with_row.schema(), empty_with_row_2.schema());

let too_many_kids = vec![empty2];
assert!(
with_new_children_if_necessary(empty, too_many_kids).is_err(),
Expand All @@ -220,44 +180,11 @@ mod tests {
async fn invalid_execute() -> Result<()> {
let task_ctx = Arc::new(TaskContext::default());
let schema = test::aggr_test_schema();
let empty = EmptyExec::new(false, schema);
let empty = EmptyExec::new(schema);

// ask for the wrong partition
assert!(empty.execute(1, task_ctx.clone()).is_err());
assert!(empty.execute(20, task_ctx).is_err());
Ok(())
}

#[tokio::test]
async fn produce_one_row() -> Result<()> {
let task_ctx = Arc::new(TaskContext::default());
let schema = test::aggr_test_schema();
let empty = EmptyExec::new(true, schema);

let iter = empty.execute(0, task_ctx)?;
let batches = common::collect(iter).await?;

// should have one item
assert_eq!(batches.len(), 1);

Ok(())
}

#[tokio::test]
async fn produce_one_row_multiple_partition() -> Result<()> {
let task_ctx = Arc::new(TaskContext::default());
let schema = test::aggr_test_schema();
let partitions = 3;
let empty = EmptyExec::new(true, schema).with_partitions(partitions);

for n in 0..partitions {
let iter = empty.execute(n, task_ctx.clone())?;
let batches = common::collect(iter).await?;

// should have one item
assert_eq!(batches.len(), 1);
}

Ok(())
}
}
Loading
Loading