[Part2] Partition and Sort Enforcement, ExecutionPlan enhancement (#4043)

* [Part2] Partition and Sort Enforcement, ExecutionPlan enhancement

* Fix hash join output_partitioning

* fix

* fix format

* Resolve review comments

* tiny fix

* UT to verify hash join output_partitioning

* fix comments
mingmwang authored Nov 6, 2022
1 parent 238e179 commit b7a3331
Showing 36 changed files with 1,537 additions and 234 deletions.
162 changes: 162 additions & 0 deletions datafusion/core/src/dataframe.rs
@@ -842,6 +842,8 @@ mod tests {
use super::*;
use crate::execution::options::{CsvReadOptions, ParquetReadOptions};
use crate::physical_plan::ColumnarValue;
use crate::physical_plan::Partitioning;
use crate::physical_plan::PhysicalExpr;
use crate::test_util;
use crate::test_util::parquet_test_data;
use crate::{assert_batches_sorted_eq, execution::context::SessionContext};
@@ -851,6 +853,7 @@ mod tests {
avg, cast, count, count_distinct, create_udf, lit, max, min, sum,
BuiltInWindowFunction, ScalarFunctionImplementation, Volatility, WindowFunction,
};
use datafusion_physical_expr::expressions::Column;

#[tokio::test]
async fn select_columns() -> Result<()> {
@@ -1515,4 +1518,163 @@ mod tests {

Ok(())
}

#[tokio::test]
async fn partition_aware_union() -> Result<()> {
let left = test_table().await?.select_columns(&["c1", "c2"])?;
let right = test_table_with_name("c2")
.await?
.select_columns(&["c1", "c3"])?
.with_column_renamed("c2.c1", "c2_c1")?;

let left_rows = left.collect().await?;
let right_rows = right.collect().await?;
let join1 =
left.join(right.clone(), JoinType::Inner, &["c1"], &["c2_c1"], None)?;
let join2 = left.join(right, JoinType::Inner, &["c1"], &["c2_c1"], None)?;

let union = join1.union(join2)?;

let union_rows = union.collect().await?;

assert_eq!(100, left_rows.iter().map(|x| x.num_rows()).sum::<usize>());
assert_eq!(100, right_rows.iter().map(|x| x.num_rows()).sum::<usize>());
assert_eq!(4016, union_rows.iter().map(|x| x.num_rows()).sum::<usize>());

let physical_plan = union.create_physical_plan().await?;
let default_partition_count =
SessionContext::new().copied_config().target_partitions;

// For a partition-aware union, the output partition count should not change.
assert_eq!(
physical_plan.output_partitioning().partition_count(),
default_partition_count
);
// For a partition-aware union, the output partitioning is the same as each of the union's inputs
for child in physical_plan.children() {
assert_eq!(
physical_plan.output_partitioning(),
child.output_partitioning()
);
}

Ok(())
}

#[tokio::test]
async fn non_partition_aware_union() -> Result<()> {
let left = test_table().await?.select_columns(&["c1", "c2"])?;
let right = test_table_with_name("c2")
.await?
.select_columns(&["c1", "c2"])?
.with_column_renamed("c2.c1", "c2_c1")?
.with_column_renamed("c2.c2", "c2_c2")?;

let left_rows = left.collect().await?;
let right_rows = right.collect().await?;
let join1 = left.join(
right.clone(),
JoinType::Inner,
&["c1", "c2"],
&["c2_c1", "c2_c2"],
None,
)?;

// join key ordering is different
let join2 = left.join(
right,
JoinType::Inner,
&["c2", "c1"],
&["c2_c2", "c2_c1"],
None,
)?;

let union = join1.union(join2)?;

let union_rows = union.collect().await?;

assert_eq!(100, left_rows.iter().map(|x| x.num_rows()).sum::<usize>());
assert_eq!(100, right_rows.iter().map(|x| x.num_rows()).sum::<usize>());
assert_eq!(916, union_rows.iter().map(|x| x.num_rows()).sum::<usize>());

let physical_plan = union.create_physical_plan().await?;
let default_partition_count =
SessionContext::new().copied_config().target_partitions;

// For a non-partition-aware union, the output partition count is the sum of all the inputs' partition counts
assert!(matches!(
physical_plan.output_partitioning(),
Partitioning::UnknownPartitioning(partition_count) if partition_count == default_partition_count * 2));
Ok(())
}

#[tokio::test]
async fn verify_join_output_partitioning() -> Result<()> {
let left = test_table().await?.select_columns(&["c1", "c2"])?;
let right = test_table_with_name("c2")
.await?
.select_columns(&["c1", "c2"])?
.with_column_renamed("c2.c1", "c2_c1")?
.with_column_renamed("c2.c2", "c2_c2")?;

let all_join_types = vec![
JoinType::Inner,
JoinType::Left,
JoinType::Right,
JoinType::Full,
JoinType::LeftSemi,
JoinType::RightSemi,
JoinType::LeftAnti,
JoinType::RightAnti,
];

let default_partition_count =
SessionContext::new().copied_config().target_partitions;

for join_type in all_join_types {
let join = left.join(
right.clone(),
join_type,
&["c1", "c2"],
&["c2_c1", "c2_c2"],
None,
)?;
let physical_plan = join.create_physical_plan().await?;
let out_partitioning = physical_plan.output_partitioning();
let join_schema = physical_plan.schema();

match join_type {
JoinType::Inner
| JoinType::Left
| JoinType::LeftSemi
| JoinType::LeftAnti => {
let left_exprs: Vec<Arc<dyn PhysicalExpr>> = vec![
Arc::new(Column::new_with_schema("c1", &join_schema).unwrap()),
Arc::new(Column::new_with_schema("c2", &join_schema).unwrap()),
];
assert_eq!(
out_partitioning,
Partitioning::Hash(left_exprs, default_partition_count)
);
}
JoinType::Right | JoinType::RightSemi | JoinType::RightAnti => {
let right_exprs: Vec<Arc<dyn PhysicalExpr>> = vec![
Arc::new(Column::new_with_schema("c2_c1", &join_schema).unwrap()),
Arc::new(Column::new_with_schema("c2_c2", &join_schema).unwrap()),
];
assert_eq!(
out_partitioning,
Partitioning::Hash(right_exprs, default_partition_count)
);
}
JoinType::Full => {
assert!(matches!(
out_partitioning,
Partitioning::UnknownPartitioning(partition_count) if partition_count == default_partition_count));
}
}
}

Ok(())
}
}
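The join tests above pin down how a hash join's output partitioning follows the join type: left-driven joins keep the left child's hash partitioning, right-driven joins keep the right child's, and a full join can guarantee neither. Below is a self-contained sketch of that rule using toy stand-ins for DataFusion's Partitioning and JoinType types; the real implementation also re-maps the key columns through the join's output schema, which this sketch omits.

// Toy stand-ins, not DataFusion's types; mirrors the assertions in
// verify_join_output_partitioning, minus the output-schema re-mapping.
#[derive(Debug, PartialEq)]
enum Partitioning {
    Hash(Vec<String>, usize), // hash-distributed on these columns
    Unknown(usize),           // partition count known, scheme unknown
}

#[derive(Clone, Copy)]
enum JoinType {
    Inner,
    Left,
    Right,
    Full,
    LeftSemi,
    RightSemi,
    LeftAnti,
    RightAnti,
}

fn join_output_partitioning(
    join_type: JoinType,
    left: Partitioning,
    right: Partitioning,
) -> Partitioning {
    match join_type {
        // Left-driven joins keep the left child's partitioning.
        JoinType::Inner | JoinType::Left | JoinType::LeftSemi | JoinType::LeftAnti => left,
        // Right-driven joins keep the right child's partitioning.
        JoinType::Right | JoinType::RightSemi | JoinType::RightAnti => right,
        // A full join preserves neither side's hash distribution.
        JoinType::Full => {
            let n = match left {
                Partitioning::Hash(_, n) | Partitioning::Unknown(n) => n,
            };
            Partitioning::Unknown(n)
        }
    }
}

fn main() {
    let hash_left = Partitioning::Hash(vec!["c1".into(), "c2".into()], 8);
    let hash_right = Partitioning::Hash(vec!["c2_c1".into(), "c2_c2".into()], 8);
    assert_eq!(
        join_output_partitioning(JoinType::Right, hash_left, hash_right),
        Partitioning::Hash(vec!["c2_c1".into(), "c2_c2".into()], 8)
    );
    assert_eq!(
        join_output_partitioning(
            JoinType::Full,
            Partitioning::Hash(vec!["c1".into()], 8),
            Partitioning::Unknown(8),
        ),
        Partitioning::Unknown(8)
    );
}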
35 changes: 14 additions & 21 deletions datafusion/core/src/physical_optimizer/merge_exec.rs
@@ -52,27 +52,20 @@ impl PhysicalOptimizerRule for AddCoalescePartitionsExec {
.iter()
.map(|child| self.optimize(child.clone(), _config))
.collect::<Result<Vec<_>>>()?;
match plan.required_child_distribution() {
Distribution::UnspecifiedDistribution => {
with_new_children_if_necessary(plan, children)
}
Distribution::HashPartitioned(_) => {
with_new_children_if_necessary(plan, children)
}
Distribution::SinglePartition => with_new_children_if_necessary(
plan,
children
.iter()
.map(|child| {
if child.output_partitioning().partition_count() == 1 {
child.clone()
} else {
Arc::new(CoalescePartitionsExec::new(child.clone()))
}
})
.collect(),
),
}
assert_eq!(children.len(), plan.required_input_distribution().len());
let new_children = children
.into_iter()
.zip(plan.required_input_distribution())
.map(|(child, dist)| match dist {
Distribution::SinglePartition
if child.output_partitioning().partition_count() > 1 =>
{
Arc::new(CoalescePartitionsExec::new(child.clone()))
}
_ => child,
})
.collect::<Vec<_>>();
with_new_children_if_necessary(plan, new_children)
}
}
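The rewritten rule depends on the new per-child contract introduced by this commit: required_input_distribution returns one Distribution per child, and the rule zips the children with their requirements, wrapping only the children that need coalescing. Here is a minimal self-contained model of that zip-and-wrap pattern; the Plan and Distribution types and the enforce function are toy stand-ins, not DataFusion's.

#[derive(Clone, Copy)]
enum Distribution {
    Unspecified,
    SinglePartition,
}

#[derive(Debug, PartialEq)]
enum Plan {
    Leaf { partitions: usize },
    Coalesce(Box<Plan>),
}

impl Plan {
    fn partition_count(&self) -> usize {
        match self {
            Plan::Leaf { partitions } => *partitions,
            // A coalesce node always produces a single partition.
            Plan::Coalesce(_) => 1,
        }
    }
}

// One requirement per child, zipped pairwise; this is the shape of the new
// required_input_distribution contract.
fn enforce(children: Vec<Plan>, required: Vec<Distribution>) -> Vec<Plan> {
    assert_eq!(children.len(), required.len());
    children
        .into_iter()
        .zip(required)
        .map(|(child, dist)| match dist {
            Distribution::SinglePartition if child.partition_count() > 1 => {
                Plan::Coalesce(Box::new(child))
            }
            _ => child,
        })
        .collect()
}

fn main() {
    let out = enforce(
        vec![Plan::Leaf { partitions: 8 }, Plan::Leaf { partitions: 8 }],
        vec![Distribution::SinglePartition, Distribution::Unspecified],
    );
    // Only the child whose requirement is SinglePartition gets wrapped.
    assert_eq!(out[0], Plan::Coalesce(Box::new(Plan::Leaf { partitions: 8 })));
    assert_eq!(out[1], Plan::Leaf { partitions: 8 });
}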

63 changes: 54 additions & 9 deletions datafusion/core/src/physical_plan/aggregates/mod.rs
@@ -37,6 +37,7 @@ use datafusion_physical_expr::{
expressions, AggregateExpr, PhysicalExpr, PhysicalSortExpr,
};
use std::any::Any;
use std::collections::HashMap;

use std::sync::Arc;

@@ -45,9 +46,11 @@ mod no_grouping;
mod row_hash;

use crate::physical_plan::aggregates::row_hash::GroupedHashAggregateStreamV2;
use crate::physical_plan::EquivalenceProperties;
pub use datafusion_expr::AggregateFunction;
use datafusion_physical_expr::aggregate::row_accumulator::RowAccumulator;
pub use datafusion_physical_expr::expressions::create_aggregate_expr;
use datafusion_physical_expr::normalize_out_expr_with_alias_schema;
use datafusion_row::{row_supported, RowType};

/// Hash aggregate modes
@@ -163,6 +166,9 @@ pub struct AggregateExec {
/// same as input.schema() but for the final aggregate it will be the same as the input
/// to the partial aggregate
input_schema: SchemaRef,
/// The alias map used to normalize output expressions, such as Partitioning and PhysicalSortExpr
/// The keys are columns from the input schema and the values are the corresponding columns from the output schema
alias_map: HashMap<Column, Vec<Column>>,
/// Execution Metrics
metrics: ExecutionPlanMetricsSet,
}
@@ -186,13 +192,26 @@ impl AggregateExec {

let schema = Arc::new(schema);

let mut alias_map: HashMap<Column, Vec<Column>> = HashMap::new();
for (expression, name) in group_by.expr.iter() {
if let Some(column) = expression.as_any().downcast_ref::<Column>() {
let new_col_idx = schema.index_of(name)?;
// When the column name is the same but the index differs, treat it as an alias
if (column.name() != name) || (column.index() != new_col_idx) {
let entry = alias_map.entry(column.clone()).or_insert_with(Vec::new);
entry.push(Column::new(name, new_col_idx));
}
};
}

Ok(AggregateExec {
mode,
group_by,
aggr_expr,
input,
schema,
input_schema,
alias_map,
metrics: ExecutionPlanMetricsSet::new(),
})
}
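A worked example of the alias map built in try_new above, with a plain (name, index) tuple standing in for the physical Column type. For a query like SELECT c1 AS x, count(c2) FROM t GROUP BY c1, the aggregate's output schema names the group column x at index 0, so the input column ("c1", 0) maps to the output column ("x", 0); the index_of closure below is a stand-in for schema.index_of.

use std::collections::HashMap;

// (name, index): a stand-in for DataFusion's physical Column.
type Col = (String, usize);

fn main() {
    // Group expressions paired with their output names, as in group_by.expr.
    let group_expr: Vec<(Col, &str)> = vec![(("c1".to_string(), 0), "x")];
    // In the aggregate's output schema, "x" is the first column.
    let index_of = |_name: &str| -> usize { 0 };

    let mut alias_map: HashMap<Col, Vec<Col>> = HashMap::new();
    for (column, name) in &group_expr {
        let new_idx = index_of(name);
        // Same test as in try_new: a differing name or index means the
        // output column is an alias of the input column.
        if column.0 != *name || column.1 != new_idx {
            alias_map
                .entry(column.clone())
                .or_default()
                .push((name.to_string(), new_idx));
        }
    }

    assert_eq!(
        alias_map[&("c1".to_string(), 0)],
        vec![("x".to_string(), 0)]
    );
}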
@@ -255,25 +274,51 @@ impl ExecutionPlan for AggregateExec {

/// Get the output partitioning of this plan
fn output_partitioning(&self) -> Partitioning {
self.input.output_partitioning()
match &self.mode {
AggregateMode::Partial => {
// Partial aggregation does not change the output partitioning, but it must respect aliases
let input_partition = self.input.output_partitioning();
match input_partition {
Partitioning::Hash(exprs, part) => {
let normalized_exprs = exprs
.into_iter()
.map(|expr| {
normalize_out_expr_with_alias_schema(
expr,
&self.alias_map,
&self.schema,
)
})
.collect::<Vec<_>>();
Partitioning::Hash(normalized_exprs, part)
}
_ => input_partition,
}
}
// The final aggregation's output partitioning is the same as its input's
_ => self.input.output_partitioning(),
}
}

fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
None
}

fn required_child_distribution(&self) -> Distribution {
fn required_input_distribution(&self) -> Vec<Distribution> {
match &self.mode {
AggregateMode::Partial => Distribution::UnspecifiedDistribution,
AggregateMode::FinalPartitioned => Distribution::HashPartitioned(
self.group_by.expr.iter().map(|x| x.0.clone()).collect(),
),
AggregateMode::Final => Distribution::SinglePartition,
AggregateMode::Partial => vec![Distribution::UnspecifiedDistribution],
AggregateMode::FinalPartitioned => {
vec![Distribution::HashPartitioned(self.output_group_expr())]
}
AggregateMode::Final => vec![Distribution::SinglePartition],
}
}

fn relies_on_input_order(&self) -> bool {
false
fn equivalence_properties(&self) -> EquivalenceProperties {
let mut input_equivalence_properties = self.input.equivalence_properties();
input_equivalence_properties.merge_properties_with_alias(&self.alias_map);
input_equivalence_properties.truncate_properties_not_in_schema(&self.schema);
input_equivalence_properties
}

fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
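To see what output_partitioning gains from the alias map, here is a sketch of the normalization step using the same (name, index) column stand-in as above. The normalize and normalize_partitioning helpers are ours; in the commit the per-expression rewrite is done by normalize_out_expr_with_alias_schema, which also consults the output schema rather than just the alias map.

use std::collections::HashMap;

type Col = (String, usize); // (name, index), as in the previous sketch

#[derive(Debug, PartialEq)]
enum Partitioning {
    Hash(Vec<Col>, usize),
    #[allow(dead_code)]
    Unknown(usize),
}

// Replace an input column with its first output alias, if one exists.
fn normalize(expr: Col, alias_map: &HashMap<Col, Vec<Col>>) -> Col {
    alias_map
        .get(&expr)
        .and_then(|aliases| aliases.first().cloned())
        .unwrap_or(expr)
}

// What output_partitioning does for a partial aggregate: keep the scheme and
// partition count, but rewrite each hash expression via the alias map.
fn normalize_partitioning(
    input: Partitioning,
    alias_map: &HashMap<Col, Vec<Col>>,
) -> Partitioning {
    match input {
        Partitioning::Hash(exprs, n) => Partitioning::Hash(
            exprs.into_iter().map(|e| normalize(e, alias_map)).collect(),
            n,
        ),
        other => other,
    }
}

fn main() {
    let mut alias_map: HashMap<Col, Vec<Col>> = HashMap::new();
    alias_map.insert(("c1".to_string(), 0), vec![("x".to_string(), 0)]);

    let out = normalize_partitioning(
        Partitioning::Hash(vec![("c1".to_string(), 0)], 8),
        &alias_map,
    );
    // Hash([c1@0], 8) has been rewritten to Hash([x@0], 8).
    assert_eq!(out, Partitioning::Hash(vec![("x".to_string(), 0)], 8));
}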
8 changes: 2 additions & 6 deletions datafusion/core/src/physical_plan/analyze.rs
@@ -72,8 +72,8 @@ impl ExecutionPlan for AnalyzeExec {
}

/// Specifies we want the input as a single stream
fn required_child_distribution(&self) -> Distribution {
Distribution::SinglePartition
fn required_input_distribution(&self) -> Vec<Distribution> {
vec![Distribution::SinglePartition]
}

/// Get the output partitioning of this plan
@@ -85,10 +85,6 @@
None
}

fn relies_on_input_order(&self) -> bool {
false
}

fn with_new_children(
self: Arc<Self>,
mut children: Vec<Arc<dyn ExecutionPlan>>,
8 changes: 4 additions & 4 deletions datafusion/core/src/physical_plan/coalesce_batches.rs
@@ -25,8 +25,8 @@ use std::task::{Context, Poll};

use crate::error::Result;
use crate::physical_plan::{
DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
SendableRecordBatchStream,
DisplayFormatType, EquivalenceProperties, ExecutionPlan, Partitioning,
RecordBatchStream, SendableRecordBatchStream,
};

use crate::execution::context::TaskContext;
@@ -100,8 +100,8 @@ impl ExecutionPlan for CoalesceBatchesExec {
None
}

fn relies_on_input_order(&self) -> bool {
false
fn equivalence_properties(&self) -> EquivalenceProperties {
self.input.equivalence_properties()
}

fn with_new_children(