Reorder the physical plan optimizer rules, extract GlobalSortSelection, make Repartition optional #4714
Conversation
… to be a separate rule, GlobalSortSelection
Hi @alamb, @Dandandan, @mingmwang, could you help review this PR?
Thank you @yahoNanJing -- the basic idea in this PR looks great to me. 👍
I am concerned about two things:
- Adding repartition before sort preserving merge -- I think we can handle this by updating the test to use all the optimizer rules
- Potential conflicts (logical) with "Unnecessary SortExec removal rule from Physical Plan" #4691 from @mustafasrepo, as they both potentially change Sorts in the physical plan.
The only thing I think is required prior to merge, however, is item 1.
@@ -133,6 +133,10 @@ pub const OPT_PREFER_HASH_JOIN: &str = "datafusion.optimizer.prefer_hash_join";
pub const OPT_HASH_JOIN_SINGLE_PARTITION_THRESHOLD: &str =
    "datafusion.optimizer.hash_join_single_partition_threshold";

/// Configuration option "datafusion.execution.round_robin_repartition"
👍
@@ -57,7 +57,16 @@ impl PhysicalOptimizerRule for CoalesceBatches {
// See https://github.com/apache/arrow-datafusion/issues/139
let wrap_in_coalesce = plan_any.downcast_ref::<FilterExec>().is_some()
    || plan_any.downcast_ref::<HashJoinExec>().is_some()
    || plan_any.downcast_ref::<RepartitionExec>().is_some();
// Don't need to add CoalesceBatchesExec after a round robin RepartitionExec
👍 makes sense to me -- to confirm the rationale for this change is that RoundRobinRepartition simply shuffles batches around to different partitions, but doesn't actually change (grow or shrink) the actual RecordBatches
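To illustrate the idea, here is a minimal sketch (not the exact code added in this PR; the helper name is made up, and `plan_any` stands for the result of `plan.as_any()` as in the quoted hunk): a RepartitionExec only needs a CoalesceBatchesExec on top when it can actually produce small batches, which round-robin repartitioning never does since it only routes whole input batches to output partitions.

```rust
use std::any::Any;

use datafusion::physical_plan::repartition::RepartitionExec;
use datafusion::physical_plan::Partitioning;

/// Hypothetical helper (illustration only): returns true only if `plan_any`
/// is a RepartitionExec that may actually shrink batches, i.e. a hash
/// repartition. A round-robin repartition just forwards whole batches to
/// different partitions, so coalescing after it buys nothing.
fn repartition_needs_coalesce(plan_any: &dyn Any) -> bool {
    plan_any
        .downcast_ref::<RepartitionExec>()
        .map(|exec| !matches!(exec.partitioning(), Partitioning::RoundRobinBatch(_)))
        .unwrap_or(false)
}
```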
/// - and there's some limit which can be pushed down to each of its input partitions
/// then [SortPreservingMergeExec] with local sort with a limit pushed down will be preferred;
/// Otherwise, the normal global sort [SortExec] will be used.
/// Later more intelligent statistics-based decision can also be introduced.
See also #4691
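A rough before/after sketch of the choice the rule makes when a limit can be pushed down (illustrative only; the column name, limit, and exact explain-plan formatting are made up and follow the style of the expected-plan strings used in the tests):

```rust
fn main() {
    // Shape produced by the plain global sort path.
    let single_global_sort = vec![
        "SortExec: [c1@0 ASC]",                // one sort over all the data
        "  CoalescePartitionsExec",
        "    ...multi-partition input...",
    ];
    // Shape preferred by GlobalSortSelection when the limit can be pushed
    // down into each input partition.
    let local_sort_plus_merge = vec![
        "SortPreservingMergeExec: [c1@0 ASC]", // merge the pre-sorted partitions
        "  SortExec: [c1@0 ASC]",              // per-partition sort, limit pushed down
        "    ...multi-partition input...",
    ];
    println!("{single_global_sort:#?}\n{local_sort_plus_merge:#?}");
}
```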
@@ -161,34 +161,14 @@ fn optimize_partitions(
// leaf node - don't replace children
plan
} else {
let can_reorder_children =
    match (plan.relies_on_input_order(), plan.maintains_input_order()) {
I believe this is the last use of maintains_input_order -- so after this PR is merged, I will try and create a PR that removes this from the trait -- it is redundant with the plan.output_order, I think.
@@ -2201,8 +2201,8 @@ async fn right_semi_join() -> Result<()> {
"SortExec: [t1_id@0 ASC NULLS LAST]",
" CoalescePartitionsExec",
" ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_int@2 as t1_int]",
" CoalesceBatchesExec: target_batch_size=4096",
" RepartitionExec: partitioning=RoundRobinBatch(2)",
" RepartitionExec: partitioning=RoundRobinBatch(2)",
these changes seem ok to me (no better or worse)
let ctx = SessionContext::new();
// We need to specify the target partition number.
// Otherwise, the default value used may vary on different environment
// with different cpu core number, which may cause the UT failure.
👍
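For reference, a minimal sketch of pinning the partition count in a test (the value 2 is arbitrary; API names are those available around the time of this PR):

```rust
use datafusion::prelude::{SessionConfig, SessionContext};

fn main() {
    // Pin target_partitions so the optimized plan (and therefore the expected
    // plan strings asserted by the test) does not depend on how many CPU
    // cores the machine running the tests has.
    let config = SessionConfig::new().with_target_partitions(2);
    let _ctx = SessionContext::with_config(config);
}
```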
@@ -1649,9 +1652,10 @@ async fn test_window_agg_sort() -> Result<()> {
let expected = {
vec![
"ProjectionExec: expr=[c9@3 as c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@0 as sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c8 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@1 as sum2]",
" WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]",
" RepartitionExec: partitioning=RoundRobinBatch(2)",
These Repartitions don't seem particularly useful to me (they repartition right before a projection), but I also don't think they hurt either.
The CI check https://github.com/apache/arrow-datafusion/actions/runs/3764447841/jobs/6399000363 appears to be failing:
@alamb, I don't think there is any logical conflict with #4691. I remember @mustafasrepo and I discussing this PR from that angle before and nothing came up. So unless something changed in the meantime, everybody should be OK.
Thanks @alamb, @Dandandan and @ozankabak. The UTs have been updated and the conflicts resolved. Could you take another look?
@@ -120,6 +120,7 @@ datafusion.execution.target_partitions 7
datafusion.execution.time_zone +00:00
datafusion.explain.logical_plan_only false
datafusion.explain.physical_plan_only false
datafusion.optimizer.enable_round_robin_repartition true
👍
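As a usage note, a sketch of turning the new option off, e.g. for a distributed engine such as Ballista that manages partitioning itself (the key is the one listed in the table above; the exact setter API may differ between DataFusion versions):

```rust
use datafusion::prelude::{SessionConfig, SessionContext};

fn main() {
    // Disable the round-robin repartitioning introduced by the Repartition
    // rule via the generic config setter (assumed to accept this key).
    let config = SessionConfig::new()
        .set_bool("datafusion.optimizer.enable_round_robin_repartition", false);
    let _ctx = SessionContext::with_config(config);
}
```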
Benchmark runs are scheduled for baseline = 981a9bb and contender = 899c86a. 899c86a is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
Which issue does this PR close?
Closes #4678.
Rationale for this change
According to #4678, this change is done by the following steps:
- Extract the global sort selection logic from BasicEnforcement to be a separate rule, GlobalSortSelection.
- Make Repartition optional.

The reason for this ordering is as follows (see the sketch after this list):
- Repartition: in order to increase parallelism, it changes the output partitioning of some operators in the plan tree, which influences other rules, so it should run as early as possible. The reason to make it optional is that it is not used by the distributed engine, Ballista, and it conflicts with parts of BasicEnforcement, since it introduces additional repartitioning while BasicEnforcement aims at reducing unnecessary repartitioning.
- GlobalSortSelection: since it currently depends on the partition number to decide whether to change a single-node sort into a parallel local sort plus merge, it should run after Repartition. Since it changes the output ordering of some operators, it should run before JoinSelection and BasicEnforcement, which may depend on that ordering.
- JoinSelection: based on statistics, it changes the Auto mode into a concrete join implementation, such as collect-left, hash join, or a future sort-merge join, which influences how BasicEnforcement decides whether to add additional repartitioning and local sorts to meet the distribution and ordering requirements. Therefore, it should run before BasicEnforcement.
- BasicEnforcement: before this rule runs, the whole plan tree should already be determined.
- CoalesceBatches: it does not influence the distribution or ordering of the plan tree, so to avoid affecting other rules it should run last.
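To make the intended ordering easy to scan, a conceptual sketch follows (illustrative only; the function is not part of this PR, and the real rule registration lives elsewhere in DataFusion):

```rust
/// Conceptual only: the intended run order of the physical plan optimizer
/// rules discussed above, listed by name.
fn physical_rule_order() -> Vec<&'static str> {
    vec![
        "Repartition",         // optional; run first to raise parallelism
        "GlobalSortSelection", // needs the partition count chosen by Repartition
        "JoinSelection",       // resolves the Auto join mode using statistics
        "BasicEnforcement",    // enforces remaining distribution/ordering needs
        "CoalesceBatches",     // last; affects neither distribution nor ordering
    ]
}
```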
What changes are included in this PR?
- Extract the global sort selection logic from BasicEnforcement to be a separate rule, GlobalSortSelection.
- Run CoalesceBatches after the Repartition.
- Ensure the data produced by EmptyExec is aligned with the field number of its schema.
- Repartition only considers the ordering influence for the final output and will not consider the intermediate data ordering, especially the local ordering, which should be dealt with by BasicEnforcement.
.Are these changes tested?
Are there any user-facing changes?