Reorder the physical plan optimizer rules
kyotoYaho committed Dec 23, 2022
1 parent 4850383 commit a2bebe6
Showing 7 changed files with 84 additions and 65 deletions.
53 changes: 36 additions & 17 deletions datafusion/core/src/execution/context.rs
@@ -1579,12 +1579,43 @@ impl SessionState {
.register_catalog(config.default_catalog.clone(), default_catalog);
}

let mut physical_optimizers: Vec<Arc<dyn PhysicalOptimizerRule + Sync + Send>> = vec![
Arc::new(AggregateStatistics::new()),
Arc::new(GlobalSortSelection::new()),
Arc::new(JoinSelection::new()),
];
// We need to take care of the rule ordering here: the rules may influence each other.
let mut physical_optimizers: Vec<Arc<dyn PhysicalOptimizerRule + Sync + Send>> =
    vec![Arc::new(AggregateStatistics::new())];
// - To increase parallelism, the Repartition rule changes the output partitioning
//   of some operators in the plan tree, which influences other rules.
//   Therefore, it should be run as early as possible.
// - The rule is made optional because:
//   - it is not used by the distributed engine, Ballista;
//   - it conflicts with some parts of the BasicEnforcement, since it introduces
//     additional repartitioning while the BasicEnforcement aims at reducing
//     unnecessary repartitioning.
if config
.config_options
.read()
.get_bool(OPT_ENABLE_ROUND_ROBIN_REPARTITION)
.unwrap_or_default()
{
physical_optimizers.push(Arc::new(Repartition::new()));
}
// - Currently it depends on the partition number to decide whether to change a
//   single-node sort into a parallel local sort and merge. Therefore, it should
//   be run after the Repartition.
// - Since it will change the output ordering of some operators, it should be run
//   before JoinSelection and BasicEnforcement, which may depend on that ordering.
physical_optimizers.push(Arc::new(GlobalSortSelection::new()));
// Statistics-based join selection will change the Auto mode to a real join
// implementation, like collect-left, hash join, or the future sort-merge join,
// which will influence the BasicEnforcement's decision on whether to add
// additional repartitioning and local sorts to meet the distribution and
// ordering requirements. Therefore, it should be run before BasicEnforcement.
physical_optimizers.push(Arc::new(JoinSelection::new()));
// It adds the essential repartition and local sort operators to satisfy the
// required distribution and ordering.
// Please make sure that the whole plan tree is determined before this rule runs.
physical_optimizers.push(Arc::new(BasicEnforcement::new()));
// It will not influence the distribution and ordering of the whole plan tree.
// Therefore, to avoid influencing other rules, it should be run last.
if config
.config_options
.read()
@@ -1601,18 +1632,6 @@ impl SessionState {
.unwrap(),
)));
}
// It's for increasing the parallelism by introducing round-robin repartitioning
if config
.config_options
.read()
.get_bool(OPT_ENABLE_ROUND_ROBIN_REPARTITION)
.unwrap_or_default()
{
physical_optimizers.push(Arc::new(Repartition::new()));
}
// The Repartition rule could introduce additional RepartitionExec with RoundRobin partitioning.
// To make sure that SinglePartition is satisfied, run the BasicEnforcement again;
// originally an AddCoalescePartitionsExec was used here.
physical_optimizers.push(Arc::new(BasicEnforcement::new()));

SessionState {
session_id,
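The net effect of this hunk is a fixed rule order. Below is a minimal, self-contained sketch of that order; the `Rule` trait, the `stub_rule!` macro, and `build_rules` are stand-ins invented for illustration (not the DataFusion API), and the two booleans model the `OPT_ENABLE_ROUND_ROBIN_REPARTITION` check and the coalesce-batches check whose config key is elided above.

```rust
// Sketch only: stand-in types, not the real DataFusion API.
trait Rule {
    fn name(&self) -> &'static str;
}

macro_rules! stub_rule {
    ($t:ident) => {
        struct $t;
        impl Rule for $t {
            fn name(&self) -> &'static str {
                stringify!($t)
            }
        }
    };
}

stub_rule!(AggregateStatistics);
stub_rule!(Repartition);
stub_rule!(GlobalSortSelection);
stub_rule!(JoinSelection);
stub_rule!(BasicEnforcement);
stub_rule!(CoalesceBatches);

fn build_rules(round_robin: bool, coalesce: bool) -> Vec<Box<dyn Rule>> {
    let mut rules: Vec<Box<dyn Rule>> = vec![Box::new(AggregateStatistics)];
    // Changes output partitioning, so it runs as early as possible.
    if round_robin {
        rules.push(Box::new(Repartition));
    }
    // Reads the partition count produced above; changes output ordering,
    // so it must precede JoinSelection and BasicEnforcement.
    rules.push(Box::new(GlobalSortSelection));
    // Picks the concrete join implementation that BasicEnforcement inspects.
    rules.push(Box::new(JoinSelection));
    // Adds the repartition/local-sort operators the plan still requires.
    rules.push(Box::new(BasicEnforcement));
    // Touches neither distribution nor ordering, so it can safely run last.
    if coalesce {
        rules.push(Box::new(CoalesceBatches));
    }
    rules
}

fn main() {
    let order: Vec<_> = build_rules(true, true).iter().map(|r| r.name()).collect();
    println!("{}", order.join(" -> "));
}
```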
13 changes: 11 additions & 2 deletions datafusion/core/src/physical_optimizer/coalesce_batches.rs
@@ -23,7 +23,7 @@ use crate::{
physical_optimizer::PhysicalOptimizerRule,
physical_plan::{
coalesce_batches::CoalesceBatchesExec, filter::FilterExec, joins::HashJoinExec,
repartition::RepartitionExec, rewrite::TreeNodeRewritable,
repartition::RepartitionExec, rewrite::TreeNodeRewritable, Partitioning,
},
};
use std::sync::Arc;
@@ -57,7 +57,16 @@ impl PhysicalOptimizerRule for CoalesceBatches {
// See https://github.com/apache/arrow-datafusion/issues/139
let wrap_in_coalesce = plan_any.downcast_ref::<FilterExec>().is_some()
|| plan_any.downcast_ref::<HashJoinExec>().is_some()
|| plan_any.downcast_ref::<RepartitionExec>().is_some();
// No need to add CoalesceBatchesExec after a round-robin RepartitionExec
|| plan_any
.downcast_ref::<RepartitionExec>()
.map(|repart_exec| {
!matches!(
repart_exec.partitioning().clone(),
Partitioning::RoundRobinBatch(_)
)
})
.unwrap_or(false);
if wrap_in_coalesce {
Ok(Some(Arc::new(CoalesceBatchesExec::new(
plan.clone(),
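The condition added above can be read in isolation: downcast to `RepartitionExec` and wrap only when the partitioning is not round-robin, since `RoundRobinBatch` forwards whole input batches and so cannot produce the small batches that coalescing is meant to fix. A standalone sketch with stand-in types (the enum and structs here are simplified placeholders for the DataFusion ones):

```rust
use std::any::Any;

// Stand-ins for the DataFusion types referenced in the diff.
#[derive(Clone, Debug)]
enum Partitioning {
    RoundRobinBatch(usize),
    Hash(usize),
}

struct RepartitionExec {
    partitioning: Partitioning,
}

struct FilterExec;

/// Mirror of the new condition: wrap a RepartitionExec in CoalesceBatchesExec
/// only when its partitioning is not round-robin.
fn wrap_repartition(plan: &dyn Any) -> bool {
    plan.downcast_ref::<RepartitionExec>()
        .map(|r| !matches!(r.partitioning, Partitioning::RoundRobinBatch(_)))
        .unwrap_or(false)
}

fn main() {
    let round_robin = RepartitionExec { partitioning: Partitioning::RoundRobinBatch(8) };
    let hash = RepartitionExec { partitioning: Partitioning::Hash(8) };
    assert!(!wrap_repartition(&round_robin));
    assert!(wrap_repartition(&hash));
    // A non-repartition node falls through to `unwrap_or(false)`.
    assert!(!wrap_repartition(&FilterExec));
    println!("ok");
}
```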
@@ -48,16 +48,14 @@ impl PhysicalOptimizerRule for GlobalSortSelection {
fn optimize(
&self,
plan: Arc<dyn ExecutionPlan>,
config: &SessionConfig,
_config: &SessionConfig,
) -> Result<Arc<dyn ExecutionPlan>> {
plan.transform_up(&|plan| {
Ok(plan
.as_any()
.downcast_ref::<SortExec>()
.and_then(|sort_exec| {
// Temporarily use the config target partition number to pass the unit tests.
// Later it will be changed to use the input's output partition number.
if config.target_partitions() > 1
if sort_exec.input().output_partitioning().partition_count() > 1
&& sort_exec.fetch().is_some()
// It's already preserving the partitioning so that it can be regarded as a local sort
&& !sort_exec.preserve_partitioning()
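The rewritten condition asks the sort's input, rather than the session config, how many partitions it actually produces. A hedged sketch of the decision follows; `SortInfo` and its fields are invented stand-ins for the `SortExec` accessors used above, and the rationale in the comment is an inference from the diff rather than a statement of the rule's full behavior.

```rust
// Stand-in for the pieces of SortExec that the rule inspects.
struct SortInfo {
    input_partition_count: usize,
    fetch: Option<usize>,
    preserve_partitioning: bool,
}

/// Mirror of the new condition: turn a single global sort into a parallel
/// local sort + merge only when the input is actually partitioned, the sort
/// has a fetch (limit), and it is not already acting as a local
/// (partition-preserving) sort.
fn select_parallel_sort(s: &SortInfo) -> bool {
    s.input_partition_count > 1 && s.fetch.is_some() && !s.preserve_partitioning
}

fn main() {
    let sort = SortInfo {
        input_partition_count: 8,
        fetch: Some(100),
        preserve_partitioning: false,
    };
    assert!(select_parallel_sort(&sort));
    println!("ok");
}
```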
49 changes: 20 additions & 29 deletions datafusion/core/src/physical_optimizer/repartition.rs
@@ -143,7 +143,7 @@ impl Repartition {
/// is an intervening node that does not `maintain_input_order`
///
/// if `can_reorder` is false, it means that the output of this node
/// can not be reordered as something upstream is relying on that order
/// can not be reordered as the final output is relying on that order
///
/// If `would_benefit` is false, the upstream operator doesn't
/// benefit from additional repartitioning
@@ -161,34 +161,14 @@ fn optimize_partitions(
// leaf node - don't replace children
plan
} else {
let can_reorder_children =
match (plan.relies_on_input_order(), plan.maintains_input_order()) {
(true, _) => {
// `plan` itself relies on the order of its
// children, so don't reorder them!
false
}
(false, false) => {
// `plan` may reorder the input itself, so no need
// to preserve the order of any children
true
}
(false, true) => {
// `plan` will maintain the order, so we can only
// repartition children if it is ok to reorder the
// output of this node
can_reorder
}
};

let children = plan
.children()
.iter()
.map(|child| {
optimize_partitions(
target_partitions,
child.clone(),
can_reorder_children,
can_reorder || child.output_ordering().is_none(),
plan.benefits_from_input_partitioning(),
)
})
@@ -197,7 +177,7 @@
};

// decide if we should bother trying to repartition the output of this plan
let could_repartition = match new_plan.output_partitioning() {
let mut could_repartition = match new_plan.output_partitioning() {
// Apply when the underlying node has less concurrency than `self.target_partitions`
RoundRobinBatch(x) => x < target_partitions,
UnknownPartitioning(x) => x < target_partitions,
@@ -206,6 +186,13 @@
Hash(_, _) => false,
};

// No need to apply when the returned row count is not greater than 1
let stats = new_plan.statistics();
if stats.is_exact {
could_repartition = could_repartition
&& stats.num_rows.map(|num_rows| num_rows > 1).unwrap_or(true);
}

if would_benefit && could_repartition && can_reorder {
Ok(Arc::new(RepartitionExec::try_new(
new_plan,
Expand All @@ -226,7 +213,12 @@ impl PhysicalOptimizerRule for Repartition {
if config.target_partitions() == 1 {
Ok(plan)
} else {
optimize_partitions(config.target_partitions(), plan, false, false)
optimize_partitions(
config.target_partitions(),
plan.clone(),
plan.output_ordering().is_none(),
false,
)
}
}
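Two gates now guard the insertion of a round-robin `RepartitionExec`: the existing concurrency check, and a new statistics check that skips plans known to return at most one row. A self-contained sketch combining them is below; the enum and structs are stand-ins for the DataFusion types, and `could_repartition` as a free function is an illustration of the logic, not the actual code layout.

```rust
// Stand-ins for the DataFusion types used by optimize_partitions.
enum Partitioning {
    RoundRobinBatch(usize),
    UnknownPartitioning(usize),
    Hash(usize),
}

struct Statistics {
    is_exact: bool,
    num_rows: Option<usize>,
}

fn could_repartition(p: &Partitioning, target_partitions: usize, stats: &Statistics) -> bool {
    let mut could = match p {
        // Apply when the node has less concurrency than the target.
        Partitioning::RoundRobinBatch(x) | Partitioning::UnknownPartitioning(x) => {
            *x < target_partitions
        }
        // Adding round-robin on top of hash partitioning would destroy it.
        Partitioning::Hash(_) => false,
    };
    // New gate: with exact statistics, repartitioning a <= 1 row result is useless.
    if stats.is_exact {
        could = could && stats.num_rows.map(|n| n > 1).unwrap_or(true);
    }
    could
}

fn main() {
    let single_row = Statistics { is_exact: true, num_rows: Some(1) };
    let unknown = Statistics { is_exact: false, num_rows: None };
    assert!(!could_repartition(&Partitioning::RoundRobinBatch(1), 10, &single_row));
    assert!(could_repartition(&Partitioning::RoundRobinBatch(1), 10, &unknown));
    println!("ok");
}
```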

@@ -546,12 +538,12 @@ mod tests {
}

#[test]
fn repartition_ignores_sort_preserving_merge() -> Result<()> {
fn repartition_with_preserving_merge() -> Result<()> {
let plan = sort_preserving_merge_exec(parquet_exec());

let expected = &[
"SortPreservingMergeExec: [c1@0 ASC]",
// Expect no repartition of SortPreservingMergeExec
"RepartitionExec: partitioning=RoundRobinBatch(10)",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
];

@@ -560,14 +552,13 @@
}

#[test]
fn repartition_does_not_repartition_transitively() -> Result<()> {
fn repartition_transitively() -> Result<()> {
let plan = sort_preserving_merge_exec(projection_exec(parquet_exec()));

let expected = &[
"SortPreservingMergeExec: [c1@0 ASC]",
// Expect no repartition of SortPreservingMergeExec
// even though there is a projection exec between it
"ProjectionExec: expr=[c1@0 as c1]",
"RepartitionExec: partitioning=RoundRobinBatch(10)",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
];

10 changes: 5 additions & 5 deletions datafusion/core/tests/sql/joins.rs
@@ -754,7 +754,7 @@ async fn cross_join() {

assert_eq!(4 * 4, actual.len());

let sql = "SELECT t1_id, t1_name, t2_name FROM t1 CROSS JOIN t2";
let sql = "SELECT t1_id, t1_name, t2_name FROM t1 CROSS JOIN t2 ORDER BY t1_id";

let actual = execute(&ctx, sql).await;
assert_eq!(4 * 4, actual.len());
@@ -2201,8 +2201,8 @@ async fn right_semi_join() -> Result<()> {
"SortExec: [t1_id@0 ASC NULLS LAST]",
" CoalescePartitionsExec",
" ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_int@2 as t1_int]",
" CoalesceBatchesExec: target_batch_size=4096",
" RepartitionExec: partitioning=RoundRobinBatch(2)",
" RepartitionExec: partitioning=RoundRobinBatch(2)",
" CoalesceBatchesExec: target_batch_size=4096",
" HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(Column { name: \"t2_id\", index: 0 }, Column { name: \"t1_id\", index: 0 })], filter=BinaryExpr { left: Column { name: \"t2_name\", index: 1 }, op: NotEq, right: Column { name: \"t1_name\", index: 0 } }",
" MemoryExec: partitions=1, partition_sizes=[1]",
" MemoryExec: partitions=1, partition_sizes=[1]",
@@ -2539,8 +2539,8 @@ async fn left_side_expr_key_inner_join() -> Result<()> {
vec![
"ProjectionExec: expr=[t1_id@0 as t1_id, t2_id@2 as t2_id, t1_name@1 as t1_name]",
" ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t2_id@3 as t2_id]",
" CoalesceBatchesExec: target_batch_size=4096",
" RepartitionExec: partitioning=RoundRobinBatch(2)",
" RepartitionExec: partitioning=RoundRobinBatch(2)",
" CoalesceBatchesExec: target_batch_size=4096",
" HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(Column { name: \"t1.t1_id + Int64(11)\", index: 2 }, Column { name: \"t2_id\", index: 0 })]",
" CoalescePartitionsExec",
" ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_id@0 + CAST(11 AS UInt32) as t1.t1_id + Int64(11)]",
4 changes: 2 additions & 2 deletions datafusion/core/tests/sql/union.rs
@@ -86,7 +86,7 @@ async fn union_schemas() -> Result<()> {
SessionContext::with_config(SessionConfig::new().with_information_schema(true));

let result = ctx
.sql("SELECT 1 A UNION ALL SELECT 2")
.sql("SELECT 1 A UNION ALL SELECT 2 order by 1")
.await
.unwrap()
.collect()
@@ -105,7 +105,7 @@
assert_batches_eq!(expected, &result);

let result = ctx
.sql("SELECT 1 UNION SELECT 2")
.sql("SELECT 1 UNION SELECT 2 order by 1")
.await
.unwrap()
.collect()
14 changes: 8 additions & 6 deletions datafusion/core/tests/sql/window.rs
@@ -1649,9 +1649,10 @@ async fn test_window_agg_sort() -> Result<()> {
let expected = {
vec![
"ProjectionExec: expr=[c9@3 as c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@0 as sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c8 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@1 as sum2]",
" WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]",
" RepartitionExec: partitioning=RoundRobinBatch(16)",
" WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]",
" SortExec: [c9@1 ASC NULLS LAST,c8@0 ASC NULLS LAST]",
" WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]",
" SortExec: [c9@1 ASC NULLS LAST,c8@0 ASC NULLS LAST]",
]
};

@@ -1681,10 +1682,11 @@ async fn over_order_by_sort_keys_sorting_prefix_compacting() -> Result<()> {
let expected = {
vec![
"ProjectionExec: expr=[c2@3 as c2, MAX(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@1 as MAX(aggregate_test_100.c9), SUM(aggregate_test_100.c9) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@0 as SUM(aggregate_test_100.c9), MIN(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c2 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 as MIN(aggregate_test_100.c9)]",
" WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)) }]",
" WindowAggExec: wdw=[MAX(aggregate_test_100.c9): Ok(Field { name: \"MAX(aggregate_test_100.c9)\", data_type: UInt32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]",
" WindowAggExec: wdw=[MIN(aggregate_test_100.c9): Ok(Field { name: \"MIN(aggregate_test_100.c9)\", data_type: UInt32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]",
" SortExec: [c2@0 ASC NULLS LAST,c9@1 ASC NULLS LAST]"
" RepartitionExec: partitioning=RoundRobinBatch(2)",
" WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)) }]",
" WindowAggExec: wdw=[MAX(aggregate_test_100.c9): Ok(Field { name: \"MAX(aggregate_test_100.c9)\", data_type: UInt32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]",
" WindowAggExec: wdw=[MIN(aggregate_test_100.c9): Ok(Field { name: \"MIN(aggregate_test_100.c9)\", data_type: UInt32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]",
" SortExec: [c2@0 ASC NULLS LAST,c9@1 ASC NULLS LAST]"
]
};

