Cleanup the usage of round-robin repartitioning (#8794)
* Clean up

* Add sqllogictest

* fix

* Fix

* Enable enable_round_robin_repartition for enforce_distribution tests

* Fix

* Try to stabilize test

* Update datafusion/sqllogictest/test_files/repartition.slt

Co-authored-by: Andrew Lamb <[email protected]>

* Update datafusion/sqllogictest/test_files/repartition.slt

Co-authored-by: Andrew Lamb <[email protected]>

* Fix scratch space

* Fix test

* Update datafusion/core/src/physical_optimizer/enforce_distribution.rs

Co-authored-by: Andrew Lamb <[email protected]>

---------

Co-authored-by: Andrew Lamb <[email protected]>
viirya and alamb authored Jan 11, 2024
1 parent 36877f3 commit 1f13607
Showing 2 changed files with 93 additions and 28 deletions.
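The options this change touches are ordinary DataFusion configuration fields. As a minimal sketch, assuming the ConfigOptions struct used by the updated test code in the diff below, the same setup can be built programmatically:

use datafusion::config::ConfigOptions;

fn test_config(target_partitions: usize) -> ConfigOptions {
    // Mirror the enforce_distribution test setup: choose a target partition
    // count and keep round-robin repartitioning enabled so the optimizer may
    // insert RoundRobinBatch RepartitionExec nodes.
    let mut config = ConfigOptions::new();
    config.execution.target_partitions = target_partitions;
    config.optimizer.enable_round_robin_repartition = true;
    config.optimizer.repartition_file_scans = false;
    config.optimizer.repartition_file_min_size = 1024;
    config
}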
48 changes: 20 additions & 28 deletions datafusion/core/src/physical_optimizer/enforce_distribution.rs
@@ -925,7 +925,6 @@ fn add_hash_on_top(
mut input: DistributionContext,
hash_exprs: Vec<Arc<dyn PhysicalExpr>>,
n_target: usize,
repartition_beneficial_stats: bool,
) -> Result<DistributionContext> {
// Early return if hash repartition is unnecessary
if n_target == 1 {
@@ -950,12 +949,6 @@ fn add_hash_on_top(
// requirements.
// - Usage of order preserving variants is not desirable (per the flag
// `config.optimizer.prefer_existing_sort`).
if repartition_beneficial_stats {
// Since hashing benefits from partitioning, add a round-robin repartition
// before it:
input = add_roundrobin_on_top(input, n_target)?;
}

let partitioning = Partitioning::Hash(hash_exprs, n_target);
let repartition = RepartitionExec::try_new(input.plan.clone(), partitioning)?
.with_preserve_order();
@@ -1208,6 +1201,12 @@ fn ensure_distribution(
true
};

let add_roundrobin = enable_round_robin
// Operator benefits from partitioning (e.g. filter):
&& (would_benefit && repartition_beneficial_stats)
// Unless partitioning increases the partition count, it is not beneficial:
&& child.plan.output_partitioning().partition_count() < target_partitions;

// When `repartition_file_scans` is set, attempt to increase
// parallelism at the source.
if repartition_file_scans && repartition_beneficial_stats {
@@ -1218,33 +1217,26 @@ fn ensure_distribution(
}
}

if enable_round_robin
// Operator benefits from partitioning (e.g. filter):
&& (would_benefit && repartition_beneficial_stats)
// Unless partitioning doesn't increase the partition count, it is not beneficial:
&& child.plan.output_partitioning().partition_count() < target_partitions
{
// Increase parallelism by adding round-robin repartitioning
// on top of the operator. Note that we only do this if the
// partition count is not already equal to the desired partition
// count.
child = add_roundrobin_on_top(child, target_partitions)?;
}

// Satisfy the distribution requirement if it is unmet.
match requirement {
Distribution::SinglePartition => {
child = add_spm_on_top(child);
}
Distribution::HashPartitioned(exprs) => {
child = add_hash_on_top(
child,
exprs.to_vec(),
target_partitions,
repartition_beneficial_stats,
)?;
if add_roundrobin {
// Add round-robin repartitioning on top of the operator
// to increase parallelism.
child = add_roundrobin_on_top(child, target_partitions)?;
}
child = add_hash_on_top(child, exprs.to_vec(), target_partitions)?;
}
Distribution::UnspecifiedDistribution => {
if add_roundrobin {
// Add round-robin repartitioning on top of the operator
// to increase parallelism.
child = add_roundrobin_on_top(child, target_partitions)?;
}
}
Distribution::UnspecifiedDistribution => {}
};

// There is an ordering requirement of the operator:
@@ -1908,7 +1900,7 @@ pub(crate) mod tests {
let distribution_context = DistributionContext::new(plan);
let mut config = ConfigOptions::new();
config.execution.target_partitions = target_partitions;
config.optimizer.enable_round_robin_repartition = false;
config.optimizer.enable_round_robin_repartition = true;
config.optimizer.repartition_file_scans = false;
config.optimizer.repartition_file_min_size = 1024;
config.optimizer.prefer_existing_sort = prefer_existing_sort;
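The net effect of the Rust change above is that the decision to add a round-robin repartition is computed once as add_roundrobin and reused for both the HashPartitioned and UnspecifiedDistribution branches, instead of being threaded into add_hash_on_top through the removed repartition_beneficial_stats parameter. A minimal standalone sketch of that predicate follows; should_add_roundrobin is a hypothetical helper name, since the real code computes the value inline in ensure_distribution:

// Hypothetical standalone form of the add_roundrobin predicate that
// ensure_distribution now computes inline.
fn should_add_roundrobin(
    enable_round_robin: bool,
    would_benefit: bool,
    repartition_beneficial_stats: bool,
    current_partitions: usize,
    target_partitions: usize,
) -> bool {
    enable_round_robin
        // Operator benefits from partitioning (e.g. filter) and statistics
        // suggest repartitioning pays off:
        && (would_benefit && repartition_beneficial_stats)
        // Only worthwhile if it actually increases the partition count:
        && current_partitions < target_partitions
}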
73 changes: 73 additions & 0 deletions datafusion/sqllogictest/test_files/repartition.slt
@@ -0,0 +1,73 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at

# http://www.apache.org/licenses/LICENSE-2.0

# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

##########
# Tests for repartitioning
##########

# Set 4 partitions for deterministic output plans
statement ok
set datafusion.execution.target_partitions = 4;

statement ok
COPY (VALUES (1, 2), (2, 5), (3, 2), (4, 5), (5, 0)) TO 'test_files/scratch/repartition/parquet_table/2.parquet'
(FORMAT PARQUET, SINGLE_FILE_OUTPUT true);

statement ok
CREATE EXTERNAL TABLE parquet_table(column1 int, column2 int)
STORED AS PARQUET
LOCATION 'test_files/scratch/repartition/parquet_table/';

# enable round robin repartitioning
statement ok
set datafusion.optimizer.enable_round_robin_repartition = true;

query TT
EXPLAIN SELECT column1, SUM(column2) FROM parquet_table GROUP BY column1;
----
logical_plan
Aggregate: groupBy=[[parquet_table.column1]], aggr=[[SUM(CAST(parquet_table.column2 AS Int64))]]
--TableScan: parquet_table projection=[column1, column2]
physical_plan
AggregateExec: mode=FinalPartitioned, gby=[column1@0 as column1], aggr=[SUM(parquet_table.column2)]
--CoalesceBatchesExec: target_batch_size=8192
----RepartitionExec: partitioning=Hash([column1@0], 4), input_partitions=4
------AggregateExec: mode=Partial, gby=[column1@0 as column1], aggr=[SUM(parquet_table.column2)]
--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
----------ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition/parquet_table/2.parquet]]}, projection=[column1, column2]

# disable round robin repartitioning
statement ok
set datafusion.optimizer.enable_round_robin_repartition = false;

query TT
EXPLAIN SELECT column1, SUM(column2) FROM parquet_table GROUP BY column1;
----
logical_plan
Aggregate: groupBy=[[parquet_table.column1]], aggr=[[SUM(CAST(parquet_table.column2 AS Int64))]]
--TableScan: parquet_table projection=[column1, column2]
physical_plan
AggregateExec: mode=FinalPartitioned, gby=[column1@0 as column1], aggr=[SUM(parquet_table.column2)]
--CoalesceBatchesExec: target_batch_size=8192
----RepartitionExec: partitioning=Hash([column1@0], 4), input_partitions=1
------AggregateExec: mode=Partial, gby=[column1@0 as column1], aggr=[SUM(parquet_table.column2)]
--------ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition/parquet_table/2.parquet]]}, projection=[column1, column2]


# Cleanup
statement ok
DROP TABLE parquet_table;
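The behaviour exercised by the sqllogictest above can also be observed from Rust. A rough sketch, assuming the SessionConfig / SessionContext API of a DataFusion release from around this commit; the in-memory table, its name, and its values are illustrative stand-ins for the Parquet file used by the test:

use datafusion::error::Result;
use datafusion::prelude::{SessionConfig, SessionContext};

#[tokio::main]
async fn main() -> Result<()> {
    // Match the sqllogictest setup: 4 target partitions, round-robin enabled.
    let mut config = SessionConfig::new().with_target_partitions(4);
    config.options_mut().optimizer.enable_round_robin_repartition = true;
    let ctx = SessionContext::new_with_config(config);

    // Illustrative in-memory table instead of the Parquet file in the test.
    ctx.sql("CREATE TABLE t AS VALUES (1, 2), (2, 5), (3, 2), (4, 5), (5, 0)")
        .await?
        .collect()
        .await?;

    // With round-robin enabled, the physical plan is expected to contain a
    // RepartitionExec with RoundRobinBatch partitioning below the partial
    // aggregate; setting the option to false should remove it.
    ctx.sql("EXPLAIN SELECT column1, SUM(column2) FROM t GROUP BY column1")
        .await?
        .show()
        .await?;
    Ok(())
}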
