[Part2] Partition and Sort Enforcement, ExecutionPlan enhancement #4043

Merged (10 commits) on Nov 6, 2022
162 changes: 162 additions & 0 deletions datafusion/core/src/dataframe.rs
@@ -842,6 +842,8 @@ mod tests {
use super::*;
use crate::execution::options::{CsvReadOptions, ParquetReadOptions};
use crate::physical_plan::ColumnarValue;
use crate::physical_plan::Partitioning;
use crate::physical_plan::PhysicalExpr;
use crate::test_util;
use crate::test_util::parquet_test_data;
use crate::{assert_batches_sorted_eq, execution::context::SessionContext};
@@ -851,6 +853,7 @@
avg, cast, count, count_distinct, create_udf, lit, max, min, sum,
BuiltInWindowFunction, ScalarFunctionImplementation, Volatility, WindowFunction,
};
use datafusion_physical_expr::expressions::Column;

#[tokio::test]
async fn select_columns() -> Result<()> {
@@ -1515,4 +1518,163 @@ mod tests {

Ok(())
}

#[tokio::test]
async fn partition_aware_union() -> Result<()> {
let left = test_table().await?.select_columns(&["c1", "c2"])?;
let right = test_table_with_name("c2")
.await?
.select_columns(&["c1", "c3"])?
.with_column_renamed("c2.c1", "c2_c1")?;

let left_rows = left.collect().await?;
let right_rows = right.collect().await?;
let join1 =
left.join(right.clone(), JoinType::Inner, &["c1"], &["c2_c1"], None)?;
let join2 = left.join(right, JoinType::Inner, &["c1"], &["c2_c1"], None)?;

let union = join1.union(join2)?;

let union_rows = union.collect().await?;

assert_eq!(100, left_rows.iter().map(|x| x.num_rows()).sum::<usize>());
assert_eq!(100, right_rows.iter().map(|x| x.num_rows()).sum::<usize>());
assert_eq!(4016, union_rows.iter().map(|x| x.num_rows()).sum::<usize>());

let physical_plan = union.create_physical_plan().await?;
let default_partition_count =
SessionContext::new().copied_config().target_partitions;

// For a partition-aware union, the output partition count should not change.
assert_eq!(
physical_plan.output_partitioning().partition_count(),
default_partition_count
);
// For a partition-aware union, the output partitioning is the same as that of the union's inputs
for child in physical_plan.children() {
assert_eq!(
physical_plan.output_partitioning(),
child.output_partitioning()
);
}

Ok(())
}

#[tokio::test]
async fn non_partition_aware_union() -> Result<()> {
let left = test_table().await?.select_columns(&["c1", "c2"])?;
let right = test_table_with_name("c2")
.await?
.select_columns(&["c1", "c2"])?
.with_column_renamed("c2.c1", "c2_c1")?
.with_column_renamed("c2.c2", "c2_c2")?;

let left_rows = left.collect().await?;
let right_rows = right.collect().await?;
let join1 = left.join(
right.clone(),
JoinType::Inner,
&["c1", "c2"],
&["c2_c1", "c2_c2"],
None,
)?;

// join key ordering is different
let join2 = left.join(
right,
JoinType::Inner,
&["c2", "c1"],
&["c2_c2", "c2_c1"],
None,
)?;

let union = join1.union(join2)?;

let union_rows = union.collect().await?;

assert_eq!(100, left_rows.iter().map(|x| x.num_rows()).sum::<usize>());
assert_eq!(100, right_rows.iter().map(|x| x.num_rows()).sum::<usize>());
assert_eq!(916, union_rows.iter().map(|x| x.num_rows()).sum::<usize>());

let physical_plan = union.create_physical_plan().await?;
let default_partition_count =
SessionContext::new().copied_config().target_partitions;

// For a non-partition-aware union, the output partition count should be the sum of the partition counts of all inputs
assert!(matches!(
physical_plan.output_partitioning(),
Partitioning::UnknownPartitioning(partition_count) if partition_count == default_partition_count * 2));
Ok(())
}

#[tokio::test]
async fn verify_join_output_partitioning() -> Result<()> {
let left = test_table().await?.select_columns(&["c1", "c2"])?;
let right = test_table_with_name("c2")
.await?
.select_columns(&["c1", "c2"])?
.with_column_renamed("c2.c1", "c2_c1")?
.with_column_renamed("c2.c2", "c2_c2")?;

let all_join_types = vec![
JoinType::Inner,
JoinType::Left,
JoinType::Right,
JoinType::Full,
JoinType::LeftSemi,
JoinType::RightSemi,
JoinType::LeftAnti,
JoinType::RightAnti,
];

let default_partition_count =
SessionContext::new().copied_config().target_partitions;

for join_type in all_join_types {
let join = left.join(
right.clone(),
join_type,
&["c1", "c2"],
&["c2_c1", "c2_c2"],
None,
)?;
let physical_plan = join.create_physical_plan().await?;
let out_partitioning = physical_plan.output_partitioning();
let join_schema = physical_plan.schema();

match join_type {
JoinType::Inner
| JoinType::Left
| JoinType::LeftSemi
| JoinType::LeftAnti => {
let left_exprs: Vec<Arc<dyn PhysicalExpr>> = vec![
Arc::new(Column::new_with_schema("c1", &join_schema).unwrap()),
Arc::new(Column::new_with_schema("c2", &join_schema).unwrap()),
];
assert_eq!(
out_partitioning,
Partitioning::Hash(left_exprs, default_partition_count)
);
}
JoinType::Right | JoinType::RightSemi | JoinType::RightAnti => {
let right_exprs: Vec<Arc<dyn PhysicalExpr>> = vec![
Arc::new(Column::new_with_schema("c2_c1", &join_schema).unwrap()),
Arc::new(Column::new_with_schema("c2_c2", &join_schema).unwrap()),
];
assert_eq!(
out_partitioning,
Partitioning::Hash(right_exprs, default_partition_count)
);
}
JoinType::Full => {
assert!(matches!(
out_partitioning,
Partitioning::UnknownPartitioning(partition_count) if partition_count == default_partition_count));
}
}
}

Ok(())
}
}
35 changes: 14 additions & 21 deletions datafusion/core/src/physical_optimizer/merge_exec.rs
@@ -52,27 +52,20 @@ impl PhysicalOptimizerRule for AddCoalescePartitionsExec {
.iter()
.map(|child| self.optimize(child.clone(), _config))
.collect::<Result<Vec<_>>>()?;
match plan.required_child_distribution() {
Distribution::UnspecifiedDistribution => {
with_new_children_if_necessary(plan, children)
}
Distribution::HashPartitioned(_) => {
with_new_children_if_necessary(plan, children)
}
Distribution::SinglePartition => with_new_children_if_necessary(
plan,
children
.iter()
.map(|child| {
if child.output_partitioning().partition_count() == 1 {
child.clone()
} else {
Arc::new(CoalescePartitionsExec::new(child.clone()))
}
})
.collect(),
),
}
assert_eq!(children.len(), plan.required_input_distribution().len());
let new_children = children
.into_iter()
.zip(plan.required_input_distribution())
.map(|(child, dist)| match dist {
Distribution::SinglePartition
if child.output_partitioning().partition_count() > 1 =>
{
Arc::new(CoalescePartitionsExec::new(child.clone()))
}
_ => child,
})
.collect::<Vec<_>>();
with_new_children_if_necessary(plan, new_children)
}
}

63 changes: 54 additions & 9 deletions datafusion/core/src/physical_plan/aggregates/mod.rs
@@ -37,6 +37,7 @@ use datafusion_physical_expr::{
expressions, AggregateExpr, PhysicalExpr, PhysicalSortExpr,
};
use std::any::Any;
use std::collections::HashMap;

use std::sync::Arc;

@@ -45,9 +46,11 @@
mod row_hash;

use crate::physical_plan::aggregates::row_hash::GroupedHashAggregateStreamV2;
use crate::physical_plan::EquivalenceProperties;
pub use datafusion_expr::AggregateFunction;
use datafusion_physical_expr::aggregate::row_accumulator::RowAccumulator;
pub use datafusion_physical_expr::expressions::create_aggregate_expr;
use datafusion_physical_expr::normalize_out_expr_with_alias_schema;
use datafusion_row::{row_supported, RowType};

/// Hash aggregate modes
@@ -163,6 +166,9 @@ pub struct AggregateExec {
/// same as input.schema() but for the final aggregate it will be the same as the input
/// to the partial aggregate
input_schema: SchemaRef,
/// The alias map used to normalize out expressions like Partitioning and PhysicalSortExpr
/// The key is the column from the input schema and the values are the columns from the output schema
alias_map: HashMap<Column, Vec<Column>>,
/// Execution Metrics
metrics: ExecutionPlanMetricsSet,
}
@@ -186,13 +192,26 @@ impl AggregateExec {

let schema = Arc::new(schema);

let mut alias_map: HashMap<Column, Vec<Column>> = HashMap::new();
Contributor:
Can you explain what this code is for? It doesn't seem correct to me, as I don't understand the circumstances under which the output would be different 🤔
It seems like in this case the input logical plan may have been incorrect?

Contributor Author:
This handles the case where there are aliases in the group expressions; in that case we cannot derive the output partitioning directly from the input/child and need to take the aliases into consideration. This is similar to ProjectionExec.
For example, if the input has the output partitioning 'a' and the ProjectionExec or AggregateExec applies the alias 'a as a1', the output partitioning of the ProjectionExec or AggregateExec should be 'a1'. ProjectionExec and AggregateExec never change the real data distribution, but they need to respect the alias.
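
A minimal, self-contained sketch of the aliasing idea described above, using plain strings in place of DataFusion's Column and PhysicalExpr types; the alias-map shape and the normalize helper here are illustrative assumptions, not DataFusion's actual API. In the diff below, the real normalization is done by normalize_out_expr_with_alias_schema over the hash-partitioning expressions.

use std::collections::HashMap;

// Simplified stand-in: a plan that is hash-partitioned on a list of column names.
// If an input column is aliased in the output schema, the partitioning columns are
// rewritten to use the output name; the data distribution itself is unchanged.
fn normalize_partitioning_columns(
    columns: Vec<String>,
    alias_map: &HashMap<String, Vec<String>>,
) -> Vec<String> {
    columns
        .into_iter()
        .map(|col| match alias_map.get(&col) {
            Some(aliases) => aliases[0].clone(),
            None => col,
        })
        .collect()
}

fn main() {
    // The input is hash-partitioned on "a"; the aggregate groups by "a AS a1".
    let alias_map = HashMap::from([("a".to_string(), vec!["a1".to_string()])]);
    let input_partitioning = vec!["a".to_string()];
    // The reported output partitioning should be Hash(["a1"], n), not Hash(["a"], n).
    assert_eq!(
        normalize_partitioning_columns(input_partitioning, &alias_map),
        vec!["a1".to_string()]
    );
}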

for (expression, name) in group_by.expr.iter() {
if let Some(column) = expression.as_any().downcast_ref::<Column>() {
let new_col_idx = schema.index_of(name)?;
// Treat the expression as an alias when either the column name or the column index differs
if (column.name() != name) || (column.index() != new_col_idx) {
let entry = alias_map.entry(column.clone()).or_insert_with(Vec::new);
entry.push(Column::new(name, new_col_idx));
}
};
}

Ok(AggregateExec {
mode,
group_by,
aggr_expr,
input,
schema,
input_schema,
alias_map,
metrics: ExecutionPlanMetricsSet::new(),
})
}
@@ -255,25 +274,51 @@ impl ExecutionPlan for AggregateExec {

/// Get the output partitioning of this plan
fn output_partitioning(&self) -> Partitioning {
self.input.output_partitioning()
match &self.mode {
AggregateMode::Partial => {
// Partial aggregation does not change the output partitioning, but it needs to respect the alias
let input_partition = self.input.output_partitioning();
match input_partition {
Partitioning::Hash(exprs, part) => {
let normalized_exprs = exprs
.into_iter()
.map(|expr| {
normalize_out_expr_with_alias_schema(
expr,
&self.alias_map,
&self.schema,
)
})
.collect::<Vec<_>>();
Partitioning::Hash(normalized_exprs, part)
}
_ => input_partition,
}
}
// Final Aggregation's output partitioning is the same as its real input
_ => self.input.output_partitioning(),
}
}

fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
None
}

fn required_child_distribution(&self) -> Distribution {
fn required_input_distribution(&self) -> Vec<Distribution> {
match &self.mode {
AggregateMode::Partial => Distribution::UnspecifiedDistribution,
AggregateMode::FinalPartitioned => Distribution::HashPartitioned(
self.group_by.expr.iter().map(|x| x.0.clone()).collect(),
),
AggregateMode::Final => Distribution::SinglePartition,
AggregateMode::Partial => vec![Distribution::UnspecifiedDistribution],
AggregateMode::FinalPartitioned => {
vec![Distribution::HashPartitioned(self.output_group_expr())]
}
AggregateMode::Final => vec![Distribution::SinglePartition],
}
}

fn relies_on_input_order(&self) -> bool {
false
fn equivalence_properties(&self) -> EquivalenceProperties {
let mut input_equivalence_properties = self.input.equivalence_properties();
input_equivalence_properties.merge_properties_with_alias(&self.alias_map);
input_equivalence_properties.truncate_properties_not_in_schema(&self.schema);
input_equivalence_properties
}

fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
8 changes: 2 additions & 6 deletions datafusion/core/src/physical_plan/analyze.rs
@@ -72,8 +72,8 @@ impl ExecutionPlan for AnalyzeExec {
}

/// Specifies we want the input as a single stream
fn required_child_distribution(&self) -> Distribution {
Distribution::SinglePartition
fn required_input_distribution(&self) -> Vec<Distribution> {
vec![Distribution::SinglePartition]
}

/// Get the output partitioning of this plan
@@ -85,10 +85,6 @@ impl ExecutionPlan for AnalyzeExec {
None
}

fn relies_on_input_order(&self) -> bool {
false
}

fn with_new_children(
self: Arc<Self>,
mut children: Vec<Arc<dyn ExecutionPlan>>,
8 changes: 4 additions & 4 deletions datafusion/core/src/physical_plan/coalesce_batches.rs
@@ -25,8 +25,8 @@ use std::task::{Context, Poll};

use crate::error::Result;
use crate::physical_plan::{
DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
SendableRecordBatchStream,
DisplayFormatType, EquivalenceProperties, ExecutionPlan, Partitioning,
RecordBatchStream, SendableRecordBatchStream,
};

use crate::execution::context::TaskContext;
@@ -100,8 +100,8 @@ impl ExecutionPlan for CoalesceBatchesExec {
None
}

fn relies_on_input_order(&self) -> bool {
false
fn equivalence_properties(&self) -> EquivalenceProperties {
self.input.equivalence_properties()
}

fn with_new_children(