Skip to content

Commit

Permalink
Split EmptyExec into PlaceholderRowExec (apache#8446)
Browse files Browse the repository at this point in the history
* add PlaceHolderRowExec

* Change produce_one_row=true calls to use PlaceHolderRowExec

* remove produce_one_row from EmptyExec, changes in proto serializer, working tests

* PlaceHolder => Placeholder

---------

Co-authored-by: Andrew Lamb <[email protected]>
  • Loading branch information
razeghi71 and alamb authored Dec 9, 2023
1 parent 62ee8fb commit d091b55
Show file tree
Hide file tree
Showing 23 changed files with 459 additions and 183 deletions.
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/empty.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ impl TableProvider for EmptyTable {
// even though there is no data, projections apply
let projected_schema = project_schema(&self.schema, projection)?;
Ok(Arc::new(
EmptyExec::new(false, projected_schema).with_partitions(self.partitions),
EmptyExec::new(projected_schema).with_partitions(self.partitions),
))
}
}
4 changes: 2 additions & 2 deletions datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -685,7 +685,7 @@ impl TableProvider for ListingTable {
if partitioned_file_lists.is_empty() {
let schema = self.schema();
let projected_schema = project_schema(&schema, projection)?;
return Ok(Arc::new(EmptyExec::new(false, projected_schema)));
return Ok(Arc::new(EmptyExec::new(projected_schema)));
}

// extract types of partition columns
Expand Down Expand Up @@ -713,7 +713,7 @@ impl TableProvider for ListingTable {
let object_store_url = if let Some(url) = self.table_paths.first() {
url.object_store()
} else {
return Ok(Arc::new(EmptyExec::new(false, Arc::new(Schema::empty()))));
return Ok(Arc::new(EmptyExec::new(Arc::new(Schema::empty()))));
};
// create the execution plan
self.options
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@ use super::optimizer::PhysicalOptimizerRule;
use crate::config::ConfigOptions;
use crate::error::Result;
use crate::physical_plan::aggregates::AggregateExec;
use crate::physical_plan::empty::EmptyExec;
use crate::physical_plan::projection::ProjectionExec;
use crate::physical_plan::{expressions, AggregateExpr, ExecutionPlan, Statistics};
use crate::scalar::ScalarValue;

use datafusion_common::stats::Precision;
use datafusion_common::tree_node::TreeNode;
use datafusion_expr::utils::COUNT_STAR_EXPANSION;
use datafusion_physical_plan::placeholder_row::PlaceholderRowExec;

/// Optimizer that uses available statistics for aggregate functions
#[derive(Default)]
Expand Down Expand Up @@ -82,7 +82,7 @@ impl PhysicalOptimizerRule for AggregateStatistics {
// input can be entirely removed
Ok(Arc::new(ProjectionExec::try_new(
projections,
Arc::new(EmptyExec::new(true, plan.schema())),
Arc::new(PlaceholderRowExec::new(plan.schema())),
)?))
} else {
plan.map_children(|child| self.optimize(child, _config))
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/src/physical_optimizer/join_selection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1623,12 +1623,12 @@ mod hash_join_tests {

let children = vec![
PipelineStatePropagator {
plan: Arc::new(EmptyExec::new(false, Arc::new(Schema::empty()))),
plan: Arc::new(EmptyExec::new(Arc::new(Schema::empty()))),
unbounded: left_unbounded,
children: vec![],
},
PipelineStatePropagator {
plan: Arc::new(EmptyExec::new(false, Arc::new(Schema::empty()))),
plan: Arc::new(EmptyExec::new(Arc::new(Schema::empty()))),
unbounded: right_unbounded,
children: vec![],
},
Expand Down
12 changes: 9 additions & 3 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ use datafusion_expr::{
WindowFrameBound, WriteOp,
};
use datafusion_physical_expr::expressions::Literal;
use datafusion_physical_plan::placeholder_row::PlaceholderRowExec;
use datafusion_sql::utils::window_expr_common_partition_keys;

use async_trait::async_trait;
Expand Down Expand Up @@ -1196,10 +1197,15 @@ impl DefaultPhysicalPlanner {
}
LogicalPlan::Subquery(_) => todo!(),
LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row,
produce_one_row: false,
schema,
}) => Ok(Arc::new(EmptyExec::new(
*produce_one_row,
SchemaRef::new(schema.as_ref().to_owned().into()),
))),
LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: true,
schema,
}) => Ok(Arc::new(PlaceholderRowExec::new(
SchemaRef::new(schema.as_ref().to_owned().into()),
))),
LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => {
Expand Down Expand Up @@ -2767,7 +2773,7 @@ mod tests {
digraph {
1[shape=box label="ProjectionExec: expr=[id@0 + 2 as employee.id + Int32(2)]", tooltip=""]
2[shape=box label="EmptyExec: produce_one_row=false", tooltip=""]
2[shape=box label="EmptyExec", tooltip=""]
1 -> 2 [arrowhead=none, arrowtail=normal, dir=back]
}
// End DataFusion GraphViz Plan
Expand Down
12 changes: 6 additions & 6 deletions datafusion/core/tests/custom_sources.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ use datafusion::execution::context::{SessionContext, SessionState, TaskContext};
use datafusion::logical_expr::{
col, Expr, LogicalPlan, LogicalPlanBuilder, TableScan, UNNAMED_TABLE,
};
use datafusion::physical_plan::empty::EmptyExec;
use datafusion::physical_plan::expressions::PhysicalSortExpr;
use datafusion::physical_plan::{
collect, ColumnStatistics, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning,
Expand All @@ -42,6 +41,7 @@ use datafusion_common::project_schema;
use datafusion_common::stats::Precision;

use async_trait::async_trait;
use datafusion_physical_plan::placeholder_row::PlaceholderRowExec;
use futures::stream::Stream;

/// Also run all tests that are found in the `custom_sources_cases` directory
Expand Down Expand Up @@ -256,9 +256,9 @@ async fn optimizers_catch_all_statistics() {

let physical_plan = df.create_physical_plan().await.unwrap();

// when the optimization kicks in, the source is replaced by an EmptyExec
// when the optimization kicks in, the source is replaced by an PlaceholderRowExec
assert!(
contains_empty_exec(Arc::clone(&physical_plan)),
contains_place_holder_exec(Arc::clone(&physical_plan)),
"Expected aggregate_statistics optimizations missing: {physical_plan:?}"
);

Expand All @@ -283,12 +283,12 @@ async fn optimizers_catch_all_statistics() {
assert_eq!(format!("{:?}", actual[0]), format!("{expected:?}"));
}

fn contains_empty_exec(plan: Arc<dyn ExecutionPlan>) -> bool {
if plan.as_any().is::<EmptyExec>() {
fn contains_place_holder_exec(plan: Arc<dyn ExecutionPlan>) -> bool {
if plan.as_any().is::<PlaceholderRowExec>() {
true
} else if plan.children().len() != 1 {
false
} else {
contains_empty_exec(Arc::clone(&plan.children()[0]))
contains_place_holder_exec(Arc::clone(&plan.children()[0]))
}
}
4 changes: 2 additions & 2 deletions datafusion/core/tests/sql/explain_analyze.rs
Original file line number Diff line number Diff line change
Expand Up @@ -575,7 +575,7 @@ async fn explain_analyze_runs_optimizers() {

// This happens as an optimization pass where count(*) can be
// answered using statistics only.
let expected = "EmptyExec: produce_one_row=true";
let expected = "PlaceholderRowExec";

let sql = "EXPLAIN SELECT count(*) from alltypes_plain";
let actual = execute_to_batches(&ctx, sql).await;
Expand Down Expand Up @@ -806,7 +806,7 @@ async fn explain_physical_plan_only() {
let expected = vec![vec![
"physical_plan",
"ProjectionExec: expr=[2 as COUNT(*)]\
\n EmptyExec: produce_one_row=true\
\n PlaceholderRowExec\
\n",
]];
assert_eq!(expected, actual);
Expand Down
2 changes: 1 addition & 1 deletion datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1208,7 +1208,7 @@ impl LogicalPlan {
self.with_new_exprs(new_exprs, &new_inputs_with_values)
}

/// Walk the logical plan, find any `PlaceHolder` tokens, and return a map of their IDs and DataTypes
/// Walk the logical plan, find any `Placeholder` tokens, and return a map of their IDs and DataTypes
pub fn get_parameter_types(
&self,
) -> Result<HashMap<String, Option<DataType>>, DataFusionError> {
Expand Down
6 changes: 3 additions & 3 deletions datafusion/optimizer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ Looking at the `EXPLAIN` output we can see that the optimizer has effectively re
| logical_plan | Projection: Int64(3) AS Int64(1) + Int64(2) |
| | EmptyRelation |
| physical_plan | ProjectionExec: expr=[3 as Int64(1) + Int64(2)] |
| | EmptyExec: produce_one_row=true |
| | PlaceholderRowExec |
| | |
+---------------+-------------------------------------------------+
```
Expand Down Expand Up @@ -318,15 +318,15 @@ In the following example, the `type_coercion` and `simplify_expressions` passes
| logical_plan | Projection: Utf8("3.2") AS foo |
| | EmptyRelation |
| initial_physical_plan | ProjectionExec: expr=[3.2 as foo] |
| | EmptyExec: produce_one_row=true |
| | PlaceholderRowExec |
| | |
| physical_plan after aggregate_statistics | SAME TEXT AS ABOVE |
| physical_plan after join_selection | SAME TEXT AS ABOVE |
| physical_plan after coalesce_batches | SAME TEXT AS ABOVE |
| physical_plan after repartition | SAME TEXT AS ABOVE |
| physical_plan after add_merge_exec | SAME TEXT AS ABOVE |
| physical_plan | ProjectionExec: expr=[3.2 as foo] |
| | EmptyExec: produce_one_row=true |
| | PlaceholderRowExec |
| | |
+------------------------------------------------------------+---------------------------------------------------------------------------+
```
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-plan/src/display.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ impl<'a> DisplayableExecutionPlan<'a> {
/// ```dot
/// strict digraph dot_plan {
// 0[label="ProjectionExec: expr=[id@0 + 2 as employee.id + Int32(2)]",tooltip=""]
// 1[label="EmptyExec: produce_one_row=false",tooltip=""]
// 1[label="EmptyExec",tooltip=""]
// 0 -> 1
// }
/// ```
Expand Down
93 changes: 10 additions & 83 deletions datafusion/physical-plan/src/empty.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

//! EmptyRelation execution plan
//! EmptyRelation with produce_one_row=false execution plan
use std::any::Any;
use std::sync::Arc;
Expand All @@ -24,19 +24,16 @@ use super::expressions::PhysicalSortExpr;
use super::{common, DisplayAs, SendableRecordBatchStream, Statistics};
use crate::{memory::MemoryStream, DisplayFormatType, ExecutionPlan, Partitioning};

use arrow::array::{ArrayRef, NullArray};
use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef};
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use datafusion_common::{internal_err, DataFusionError, Result};
use datafusion_execution::TaskContext;

use log::trace;

/// Execution plan for empty relation (produces no rows)
/// Execution plan for empty relation with produce_one_row=false
#[derive(Debug)]
pub struct EmptyExec {
/// Specifies whether this exec produces a row or not
produce_one_row: bool,
/// The schema for the produced row
schema: SchemaRef,
/// Number of partitions
Expand All @@ -45,9 +42,8 @@ pub struct EmptyExec {

impl EmptyExec {
/// Create a new EmptyExec
pub fn new(produce_one_row: bool, schema: SchemaRef) -> Self {
pub fn new(schema: SchemaRef) -> Self {
EmptyExec {
produce_one_row,
schema,
partitions: 1,
}
Expand All @@ -59,36 +55,8 @@ impl EmptyExec {
self
}

/// Specifies whether this exec produces a row or not
pub fn produce_one_row(&self) -> bool {
self.produce_one_row
}

fn data(&self) -> Result<Vec<RecordBatch>> {
let batch = if self.produce_one_row {
let n_field = self.schema.fields.len();
// hack for https://github.com/apache/arrow-datafusion/pull/3242
let n_field = if n_field == 0 { 1 } else { n_field };
vec![RecordBatch::try_new(
Arc::new(Schema::new(
(0..n_field)
.map(|i| {
Field::new(format!("placeholder_{i}"), DataType::Null, true)
})
.collect::<Fields>(),
)),
(0..n_field)
.map(|_i| {
let ret: ArrayRef = Arc::new(NullArray::new(1));
ret
})
.collect(),
)?]
} else {
vec![]
};

Ok(batch)
Ok(vec![])
}
}

Expand All @@ -100,7 +68,7 @@ impl DisplayAs for EmptyExec {
) -> std::fmt::Result {
match t {
DisplayFormatType::Default | DisplayFormatType::Verbose => {
write!(f, "EmptyExec: produce_one_row={}", self.produce_one_row)
write!(f, "EmptyExec")
}
}
}
Expand Down Expand Up @@ -133,10 +101,7 @@ impl ExecutionPlan for EmptyExec {
self: Arc<Self>,
_: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
Ok(Arc::new(EmptyExec::new(
self.produce_one_row,
self.schema.clone(),
)))
Ok(Arc::new(EmptyExec::new(self.schema.clone())))
}

fn execute(
Expand Down Expand Up @@ -184,7 +149,7 @@ mod tests {
let task_ctx = Arc::new(TaskContext::default());
let schema = test::aggr_test_schema();

let empty = EmptyExec::new(false, schema.clone());
let empty = EmptyExec::new(schema.clone());
assert_eq!(empty.schema(), schema);

// we should have no results
Expand All @@ -198,16 +163,11 @@ mod tests {
#[test]
fn with_new_children() -> Result<()> {
let schema = test::aggr_test_schema();
let empty = Arc::new(EmptyExec::new(false, schema.clone()));
let empty_with_row = Arc::new(EmptyExec::new(true, schema));
let empty = Arc::new(EmptyExec::new(schema.clone()));

let empty2 = with_new_children_if_necessary(empty.clone(), vec![])?.into();
assert_eq!(empty.schema(), empty2.schema());

let empty_with_row_2 =
with_new_children_if_necessary(empty_with_row.clone(), vec![])?.into();
assert_eq!(empty_with_row.schema(), empty_with_row_2.schema());

let too_many_kids = vec![empty2];
assert!(
with_new_children_if_necessary(empty, too_many_kids).is_err(),
Expand All @@ -220,44 +180,11 @@ mod tests {
async fn invalid_execute() -> Result<()> {
let task_ctx = Arc::new(TaskContext::default());
let schema = test::aggr_test_schema();
let empty = EmptyExec::new(false, schema);
let empty = EmptyExec::new(schema);

// ask for the wrong partition
assert!(empty.execute(1, task_ctx.clone()).is_err());
assert!(empty.execute(20, task_ctx).is_err());
Ok(())
}

#[tokio::test]
async fn produce_one_row() -> Result<()> {
let task_ctx = Arc::new(TaskContext::default());
let schema = test::aggr_test_schema();
let empty = EmptyExec::new(true, schema);

let iter = empty.execute(0, task_ctx)?;
let batches = common::collect(iter).await?;

// should have one item
assert_eq!(batches.len(), 1);

Ok(())
}

#[tokio::test]
async fn produce_one_row_multiple_partition() -> Result<()> {
let task_ctx = Arc::new(TaskContext::default());
let schema = test::aggr_test_schema();
let partitions = 3;
let empty = EmptyExec::new(true, schema).with_partitions(partitions);

for n in 0..partitions {
let iter = empty.execute(n, task_ctx.clone())?;
let batches = common::collect(iter).await?;

// should have one item
assert_eq!(batches.len(), 1);
}

Ok(())
}
}
1 change: 1 addition & 0 deletions datafusion/physical-plan/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ pub mod limit;
pub mod memory;
pub mod metrics;
mod ordering;
pub mod placeholder_row;
pub mod projection;
pub mod repartition;
pub mod sorts;
Expand Down
Loading

0 comments on commit d091b55

Please sign in to comment.