
[Part2] Partition and Sort Enforcement, ExecutionPlan enhancement #4043

Merged
merged 10 commits into from
Nov 6, 2022

Conversation

@mingmwang (Contributor) commented Oct 31, 2022

Which issue does this PR close?

Partially Closes #3854.
Closes #3653
Closes #3400
Closes #189

You can see the entire work in #3855

Rationale for this change

What changes are included in this PR?

  1. Add method required_input_ordering() to the ExecutionPlan trait to specify ordering requirements
  2. Fix output_partitioning(), output_ordering(), and required_input_distribution() in a couple of trait implementations
  3. Add method equivalence_properties() to the ExecutionPlan trait to discover the equivalence properties in the physical plan tree
  4. Support partition-aware UnionExec
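The trait-level shape of additions 1-3 can be sketched as follows. This is a simplified, hypothetical mock-up (toy SortExpr, Distribution, and EquivalenceProperties types, not DataFusion's actual definitions); it only illustrates that the new methods come with permissive defaults sized to the number of children.

```rust
// Hypothetical stand-in types -- not DataFusion's real ones.
#[derive(Debug, Clone, PartialEq)]
pub struct SortExpr(pub String);

#[derive(Debug, Clone, PartialEq)]
pub enum Distribution {
    UnspecifiedDistribution,
}

#[derive(Debug, Clone, PartialEq, Default)]
pub struct EquivalenceProperties(pub Vec<Vec<String>>);

pub trait ExecutionPlan {
    fn children(&self) -> Vec<Box<dyn ExecutionPlan>>;

    /// Ordering required of each child's output; None means no requirement.
    fn required_input_ordering(&self) -> Vec<Option<Vec<SortExpr>>> {
        vec![None; self.children().len()]
    }

    /// Distribution required of each child's output.
    fn required_input_distribution(&self) -> Vec<Distribution> {
        vec![Distribution::UnspecifiedDistribution; self.children().len()]
    }

    /// Equivalence classes known to hold for this node's output.
    fn equivalence_properties(&self) -> EquivalenceProperties {
        EquivalenceProperties::default()
    }
}
```

A leaf node with no children naturally gets empty requirement vectors from these defaults, which is also a question raised later in this review.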

Are there any user-facing changes?

@mingmwang (Contributor, Author)

@alamb @andygrove @Dandandan @isidentical @yahoNanJing
Please take a look.

@github-actions bot added the core (Core DataFusion crate) and physical-expr (Physical Expressions) labels Oct 31, 2022
@alamb (Contributor) commented Oct 31, 2022

Thanks @mingmwang -- I will review this carefully tomorrow

@alamb (Contributor) commented Nov 1, 2022

I am sorry -- I ran out of time today -- I will try to find time tomorrow

@alamb (Contributor) commented Nov 1, 2022

@liukun4515 and @Ted-Jiang perhaps you have some time to help review this as well

@alamb (Contributor) left a comment

Looking very impressive @mingmwang -- thank you very much

My biggest question is how are the changes to distribution tested? I see code that verifies partitioning (or rather not partitioning) with UnionExec but there are changes made to all the other physical operators.

For example what about tests for WindowAggregate and outer joins and sort merge join?

I saw tests for some of the functions for operating on EquivalenceProperties 👍 but not all of them.

I left some style questions about encapsulating EquivalenceProperties that might also help

So TLDR is I think the changes to the physical operators need more tests.

Maybe you could break out the equivalence class code into a separate PR?

datafusion/core/src/dataframe.rs
None,
)?;

// join key ordering is different
Contributor:

👍

}

// TODO check the output ordering of CrossJoin
Contributor:

is this still a todo?

Contributor (Author):

Yeah, I'm not sure whether our CrossJoin implementation can preserve the ordering of the right side or not.

///
/// For example, split "a1 = a2 AND b1 <= b2 AND c1 != c2" into ["a1 = a2", "b1 <= b2", "c1 != c2"]
///
pub fn split_predicate(predicate: &Arc<dyn PhysicalExpr>) -> Vec<&Arc<dyn PhysicalExpr>> {
Contributor:

This is called split_conjunction in the logical optimizer -- perhaps it could be called the same thing in the physical layer. The logical expr implementation also avoids creating quite as many Vecs

https://github.com/apache/arrow-datafusion/blob/345234550712173477e7807ba2cf67dd2ffb9ed5/datafusion/optimizer/src/utils.rs#L58-L78
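The suggested rewrite can be sketched over a toy expression type (a hypothetical Expr enum standing in for Arc<dyn PhysicalExpr>); a single output Vec is threaded through the recursion, mirroring how the logical optimizer's split_conjunction avoids allocating intermediate Vecs per level:

```rust
// Hypothetical simplified expression tree, not DataFusion's PhysicalExpr.
#[derive(Debug, PartialEq)]
pub enum Expr {
    And(Box<Expr>, Box<Expr>),
    Pred(&'static str), // a leaf predicate such as "a1 = a2"
}

/// Push every conjunct of `expr` (in left-to-right order) onto `out`,
/// reusing one Vec for the whole recursion.
pub fn split_conjunction<'a>(expr: &'a Expr, out: &mut Vec<&'a Expr>) {
    match expr {
        Expr::And(left, right) => {
            split_conjunction(left, out);
            split_conjunction(right, out);
        }
        other => out.push(other),
    }
}
```

For example, "a1 = a2 AND b1 <= b2 AND c1 != c2" splits into three leaf predicates.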

Contributor (Author):

Sure.

Comment on lines 106 to 116
if contains_first && !contains_second {
prop.insert(new_condition.1.clone());
idx1 = idx as i32;
} else if !contains_first && contains_second {
prop.insert(new_condition.0.clone());
idx2 = idx as i32;
} else if contains_first && contains_second {
idx1 = idx as i32;
idx2 = idx as i32;
break;
}
Contributor:

You could also use a match statement here and let the compiler check that all important cases are covered:

Suggested change
if contains_first && !contains_second {
prop.insert(new_condition.1.clone());
idx1 = idx as i32;
} else if !contains_first && contains_second {
prop.insert(new_condition.0.clone());
idx2 = idx as i32;
} else if contains_first && contains_second {
idx1 = idx as i32;
idx2 = idx as i32;
break;
}
match (contains_first, contains_second) {
(true, false) => {
prop.insert(new_condition.1.clone());
idx1 = idx as i32;
}
(false, true) => {
prop.insert(new_condition.0.clone());
idx2 = idx as i32;
}
(true, true) => {
idx1 = idx as i32;
idx2 = idx as i32;
break;
}
(false, false) => {}
}

@@ -96,12 +96,15 @@ impl ExecutionPlan for CoalesceBatchesExec {
self.input.output_partitioning()
}

// Depends on how the CoalesceBatches was implemented, it is possible to keep
Contributor:

There is also SortPreservingMerge that can be used to preserve order but there are tradeoffs there (specifically it takes more effort to keep the sort order than it does to append batches together)

@@ -231,6 +246,38 @@ impl RecordBatchStream for FilterExecStream {
}
}

/// Return the equals Column-Pairs and Non-equals Column-Pairs
fn collect_columns_from_predicate(predicate: &Arc<dyn PhysicalExpr>) -> EqualAndNonEqual {
Contributor:

Perhaps this would be better in utils.rs

Contributor (Author):

Since this is only used by FilterExec, I would prefer to keep this as a private func in filter.rs

Distribution::SinglePartition
fn required_input_distribution(&self) -> Vec<Distribution> {
if self.partition_keys.is_empty() {
warn!("No partition defined for WindowAggExec!!!");
Contributor:

I don't know why this would generate a warning -- can't this occur with a query like SELECT ROW_NUMBER OVER () from foo (as in an empty over clause)?

@mingmwang (Contributor, Author) commented Nov 2, 2022

Yes, this is a valid case, but the SQL might run very slowly without any Partition By clause because it collapses to Distribution::SinglePartition. I can remove the warning if we think it is useless. There is one optimization we can do here in the future, after we add Range Partitioning (I can work on this maybe next month): when there is no Partition By clause but only an Order By, then depending on the window funcs, for some cases we can make the required_input_distribution be SortDistribution, so that the WindowAggExec can still run in parallel.

Contributor:

I would recommend removing the warning because it isn't clear to me what a user / administrator of the system would do in this case and so the warning will end up as spam in the logs I think.

Perhaps we can just change it to debug!

Contributor (Author):

Sure

} else {
Distribution::UnspecifiedDistribution
//TODO support PartitionCollections if there is no common partition columns in the window_expr
vec![Distribution::HashPartitioned(self.partition_keys.clone())]
Contributor:

👍 I agree this sounds good

eq_properties: &[EquivalenceProperties],
) -> Arc<dyn PhysicalExpr> {
let mut normalized = expr.clone();
if let Some(column) = expr.as_any().downcast_ref::<Column>() {
@alamb (Contributor) commented Nov 2, 2022

Does this need to recursively rewrite exprs?

Like what if expr was A + B and you had an equivalence class with B = C

Wouldn't you have to rewrite A + B into A + C? But I don't see this code recursing.

This kind of rewrite could be tested as well I think
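The concern above can be illustrated with a toy recursive rewrite (hypothetical Expr type and String-based equivalence classes, not DataFusion's types). Each class's first member is treated as its representative: with a class containing c and b, normalizing a + b should yield a + c, which requires recursing into the Add node rather than only matching top-level columns.

```rust
// Hypothetical stand-ins for Column and PhysicalExpr.
#[derive(Debug, Clone, PartialEq)]
pub enum Expr {
    Column(String),
    Add(Box<Expr>, Box<Expr>),
}

/// Replace every column with its class representative, recursing into
/// compound expressions so `a + b` becomes `a + c` when b ~ c.
pub fn normalize(expr: &Expr, classes: &[Vec<String>]) -> Expr {
    match expr {
        Expr::Column(name) => {
            for class in classes {
                if class.contains(name) {
                    // First member of the class is the representative.
                    return Expr::Column(class[0].clone());
                }
            }
            expr.clone()
        }
        Expr::Add(l, r) => Expr::Add(
            Box::new(normalize(l, classes)),
            Box::new(normalize(r, classes)),
        ),
    }
}
```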

Contributor (Author):

Yes, rewriting recursively is safer. Currently the equal join conditions are just Columns, and for AggregateExec, the output_group_expr are also Columns. For WindowAggExec, does DataFusion support Partition By with complex exprs?

Contributor:

does DataFusion support Partition by complex exprs ?

Yes I think so:

DataFusion CLI v13.0.0
❯ create table foo as values (1,2), (3,4), (3,2), (2,1), (null, 0);

❯ select first_value(column1) over (partition by (column2%2) order by column2) from foo;
+--------------------------+
| FIRST_VALUE(foo.column1) |
+--------------------------+
| 2                        |
|                          |
|                          |
|                          |
|                          |
+--------------------------+

}

/// Combine the new equal condition with the existing equivalence properties.
pub fn combine_equivalence_properties(
Contributor:

Good interface design. It can be leveraged by both the Join and Filter

}
}

pub fn remove_equivalence_properties(
Contributor:

Why does the eq_properties contain the non-equal columns?

Contributor (Author):

I will remove the related logic.

let matches = eq_properties.get_mut(match_idx as usize).unwrap();
matches.remove(remove_condition.0);
matches.remove(remove_condition.1);
if matches.len() <= 1 {
Contributor:

This logic may not be correct. For example, suppose there are originally two equivalence classes, left side (l1, l2) and right side (r1, r2); after combine_equivalence_properties they become one equivalence class, (l1, l2, r1, r2). Then we come to remove_equivalence_properties with the remove condition (l1, r1).
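A toy sketch of the problem (plain BTreeSets rather than the PR's EquivalentClass type): once (l1, l2) and (r1, r2) are merged on the condition (l1, r1), naively removing that pair from the merged class leaves (l2, r2), which is neither of the original classes, so the original split is unrecoverable.

```rust
use std::collections::BTreeSet;

/// Union two equivalence classes, as when an equijoin condition links them.
fn combine(a: &BTreeSet<&'static str>, b: &BTreeSet<&'static str>) -> BTreeSet<&'static str> {
    a.union(b).cloned().collect()
}

/// Naively drop both columns of the remove condition from a merged class.
fn remove(class: &BTreeSet<&'static str>, cond: (&str, &str)) -> BTreeSet<&'static str> {
    let mut out = class.clone();
    out.remove(cond.0);
    out.remove(cond.1);
    out
}
```

The result {l2, r2} falsely asserts an equivalence between l2 and r2 that the original classes never implied.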

Contributor (Author):

I agree it is confusing. I will remove the remove_equivalence_properties related logic.

/// The output ordering
output_ordering: Option<Vec<PhysicalSortExpr>>,
/// The alias map used to normalize out expressions like Partitioning and PhysicalSortExpr
alias_map: HashMap<Column, Vec<Column>>,
Contributor:

Better to add comments to indicate what the key & value stand for.

In my understanding, the key is a column in the input schema of this Projection operator, while the values are columns in the output schema of this Projection operator.

}

pub fn merge_equivalence_properties_with_alias(
eq_properties: &mut Vec<EquivalenceProperties>,
Contributor:

Here the eq_properties is the EquivalenceProperties of an input of the current operator.

The goal of this function is to construct a new EquivalenceProperties for the current operator.

for (_idx, prop) in eq_properties.iter_mut().enumerate() {
if prop.contains(column) {
for col in columns {
prop.insert(col.clone());
@yahoNanJing (Contributor) commented Nov 3, 2022

Although it can be corrected by truncate_equivalence_properties_not_in_schema, I still think it's better to construct a new one directly rather than do the merge based on the input EquivalenceProperties

Distribution::UnspecifiedDistribution
/// Specifies the data distribution requirements for all the
/// children for this operator, By default it's [[Distribution::UnspecifiedDistribution]] for each child,
fn required_input_distribution(&self) -> Vec<Distribution> {
Contributor:

Maybe we can just use
vec![Distribution::UnspecifiedDistribution; self.children().len()]

Contributor (Author):

@alamb
What do you think: for leaf nodes, should we return an empty vec![] here, or
vec![Distribution::UnspecifiedDistribution]?

Contributor:

I personally think @yahoNanJing 's suggestion of

vec![Distribution::UnspecifiedDistribution; self.children().len()]

would make the intent clearer

Contributor (Author):

Sure, I will change it in the following PR.

)
})
.unzip();
vec![
Contributor:

Currently it only supports the exactly matched case. Is it possible to support a partial matching case?

@mingmwang (Contributor, Author) commented Nov 3, 2022

It is possible, but this PR will not include it. Originally I planned to implement such optimizations in Phase 2 with more dynamic enforcement rules, but that risks introducing skewed joins, and currently we do not have a good way to handle skewed joins.

JoinType::RightSemi | JoinType::RightAnti => {
self.right.output_partitioning()
}
JoinType::Left
Contributor:

Should these cases exist when the partition mode is CollectLeft?

@mingmwang (Contributor, Author)

@alamb @yahoNanJing
Please take a look again.

@mingmwang (Contributor, Author)

retest please

}

/// Equivalent Class is a set of Columns that are known to have the same value in all tuples in a relation
/// Equivalence Class is generated by equality predicates, typically equijoin conditions and equality conditions in filters.
Contributor:

I like this abstraction and the comments.

/// equality predicates in Join or Filter
pub fn add_equal_conditions(&mut self, new_conditions: (&Column, &Column)) {
let mut idx1: Option<usize> = None;
let mut idx2: Option<usize> = None;
Contributor:

An Option is much more correct now.
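Combining the Option indices with the exhaustive match suggested earlier in this review, a simplified self-contained version might look like the following. This is a sketch only: Vec<String> classes stand in for the PR's EquivalentClass, and the merge behavior is an assumption, not the PR's exact code.

```rust
/// Sketch of add_equal_conditions: Option<usize> tracks which existing
/// class matched each side of the new equality, and a match on the pair
/// decides whether to extend a class, merge two classes, or start a new one.
fn add_equal_conditions(classes: &mut Vec<Vec<String>>, cond: (&str, &str)) {
    let mut idx1: Option<usize> = None;
    let mut idx2: Option<usize> = None;
    for (idx, class) in classes.iter_mut().enumerate() {
        let contains_first = class.iter().any(|c| c == cond.0);
        let contains_second = class.iter().any(|c| c == cond.1);
        match (contains_first, contains_second) {
            (true, false) => {
                class.push(cond.1.to_string());
                idx1 = Some(idx);
            }
            (false, true) => {
                class.push(cond.0.to_string());
                idx2 = Some(idx);
            }
            (true, true) => {
                idx1 = Some(idx);
                idx2 = Some(idx);
                break;
            }
            (false, false) => {}
        }
    }
    match (idx1, idx2) {
        // The new condition bridged two distinct classes: merge them.
        (Some(i), Some(j)) if i != j => {
            let merged = classes.remove(i.max(j));
            let keep = i.min(j);
            for col in merged {
                if !classes[keep].contains(&col) {
                    classes[keep].push(col);
                }
            }
        }
        // Neither column was known yet: the equality starts a new class.
        (None, None) => classes.push(vec![cond.0.to_string(), cond.1.to_string()]),
        // Exactly one class matched (or both sides hit the same class): done.
        _ => {}
    }
}
```

Unlike the sentinel -1 indices in the earlier snippet, the (None, None) and merge cases here are spelled out and checked by the compiler.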

@@ -472,7 +508,10 @@ pub enum Distribution {
HashPartitioned(Vec<Arc<dyn PhysicalExpr>>),
Contributor:

Maybe we need to add partition number and schema to the HashPartitioned in the future.

@yahoNanJing (Contributor) left a comment

LGTM

@mingmwang (Contributor, Author)

Should we make the EquivalenceProperties schema aware?

/// Equivalence Properties is a vec of EquivalentClass.
#[derive(Debug, Default, Clone)]
pub struct EquivalenceProperties {
    classes: Vec<EquivalentClass>,
    schema: SchemaRef,
}

@yahoNanJing (Contributor)

Should we make the EquivalenceProperties schema aware?

It would be great to add this schema constraint. Then we can avoid the ambiguity in
https://github.com/apache/arrow-datafusion/blob/e945c37d25cb173d03929084bcd8aac31f71580e/datafusion/core/src/physical_plan/projection.rs#L202-L209

@alamb (Contributor) left a comment

Thanks @mingmwang -- I agree this PR is ready to merge as is.

It would be great to file tickets to track your follow-on work (like the TODO about cross joins, etc.)

Thanks for getting this great stuff into DataFusion

Arc::new(Column::new_with_schema("c1", &join_schema).unwrap()),
Arc::new(Column::new_with_schema("c2", &join_schema).unwrap()),
];
assert_eq!(
Contributor:

👍


@@ -194,6 +258,73 @@ impl ExecutionPlan for UnionExec {
}
}

/// CombinedRecordBatchStream can be used to combine a Vec of SendableRecordBatchStreams into one
Contributor:

I feel there was already a piece of code that does this -- maybe @tustvold can remind me 🤔

@yahoNanJing (Contributor)

Hi @alamb, should we merge this PR first so that @mingmwang will be able to continue the part 3 of this unnecessary shuffling optimization?

@alamb (Contributor) commented Nov 6, 2022

Hi @alamb, should we merge this PR first so that @mingmwang will be able to continue the part 3 of this unnecessary shuffling optimization?

Yes absolutely!

@alamb alamb merged commit b7a3331 into apache:master Nov 6, 2022
@alamb (Contributor) commented Nov 6, 2022

Sorry for the delay

@ursabot commented Nov 6, 2022

Benchmark runs are scheduled for baseline = 238e179 and contender = b7a3331. b7a3331 is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
Conbench compare runs links:
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on ec2-t3-xlarge-us-east-2] ec2-t3-xlarge-us-east-2
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on test-mac-arm] test-mac-arm
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on ursa-i9-9960x] ursa-i9-9960x
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on ursa-thinkcentre-m75q] ursa-thinkcentre-m75q
Buildkite builds:
Supported benchmarks:
ec2-t3-xlarge-us-east-2: Supported benchmark langs: Python, R. Runs only benchmarks with cloud = True
test-mac-arm: Supported benchmark langs: C++, Python, R
ursa-i9-9960x: Supported benchmark langs: Python, R, JavaScript
ursa-thinkcentre-m75q: Supported benchmark langs: C++, Java

Labels
core Core DataFusion crate physical-expr Physical Expressions
Projects
None yet
4 participants