Skip to content

Commit

Permalink
Minor: Improve comments in EnforceDistribution tests (apache#8474)
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb authored and appletreeisyellow committed Dec 15, 2023
1 parent 37734a1 commit ed11d64
Showing 1 changed file with 22 additions and 12 deletions.
34 changes: 22 additions & 12 deletions datafusion/core/src/physical_optimizer/enforce_distribution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ impl PhysicalOptimizerRule for EnforceDistribution {
/// 1) If the current plan is Partitioned HashJoin, SortMergeJoin, check whether the requirements can be satisfied by adjusting join keys ordering:
/// Requirements can not be satisfied, clear the current requirements, generate new requirements(to pushdown) based on the current join keys, return the unchanged plan.
/// Requirements is already satisfied, clear the current requirements, generate new requirements(to pushdown) based on the current join keys, return the unchanged plan.
/// Requirements can be satisfied by adjusting keys ordering, clear the current requiements, generate new requirements(to pushdown) based on the adjusted join keys, return the changed plan.
/// Requirements can be satisfied by adjusting keys ordering, clear the current requirements, generate new requirements(to pushdown) based on the adjusted join keys, return the changed plan.
///
/// 2) If the current plan is Aggregation, check whether the requirements can be satisfied by adjusting group by keys ordering:
/// Requirements can not be satisfied, clear all the requirements, return the unchanged plan.
Expand Down Expand Up @@ -928,7 +928,7 @@ fn add_roundrobin_on_top(
// If any of the following conditions is true
// - Preserving ordering is not helpful in terms of satisfying ordering requirements
// - Usage of order preserving variants is not desirable
// (determined by flag `config.optimizer.bounded_order_preserving_variants`)
// (determined by flag `config.optimizer.prefer_existing_sort`)
let partitioning = Partitioning::RoundRobinBatch(n_target);
let repartition =
RepartitionExec::try_new(input, partitioning)?.with_preserve_order();
Expand Down Expand Up @@ -996,7 +996,7 @@ fn add_hash_on_top(
// - Preserving ordering is not helpful in terms of satisfying ordering
// requirements.
// - Usage of order preserving variants is not desirable (per the flag
// `config.optimizer.bounded_order_preserving_variants`).
// `config.optimizer.prefer_existing_sort`).
let mut new_plan = if repartition_beneficial_stats {
// Since hashing benefits from partitioning, add a round-robin repartition
// before it:
Expand Down Expand Up @@ -1045,7 +1045,7 @@ fn add_spm_on_top(
// If any of the following conditions is true
// - Preserving ordering is not helpful in terms of satisfying ordering requirements
// - Usage of order preserving variants is not desirable
// (determined by flag `config.optimizer.bounded_order_preserving_variants`)
// (determined by flag `config.optimizer.prefer_existing_sort`)
let should_preserve_ordering = input.output_ordering().is_some();
let new_plan: Arc<dyn ExecutionPlan> = if should_preserve_ordering {
let existing_ordering = input.output_ordering().unwrap_or(&[]);
Expand Down Expand Up @@ -2026,15 +2026,15 @@ pub(crate) mod tests {
fn ensure_distribution_helper(
plan: Arc<dyn ExecutionPlan>,
target_partitions: usize,
bounded_order_preserving_variants: bool,
prefer_existing_sort: bool,
) -> Result<Arc<dyn ExecutionPlan>> {
let distribution_context = DistributionContext::new(plan);
let mut config = ConfigOptions::new();
config.execution.target_partitions = target_partitions;
config.optimizer.enable_round_robin_repartition = false;
config.optimizer.repartition_file_scans = false;
config.optimizer.repartition_file_min_size = 1024;
config.optimizer.prefer_existing_sort = bounded_order_preserving_variants;
config.optimizer.prefer_existing_sort = prefer_existing_sort;
ensure_distribution(distribution_context, &config).map(|item| item.into().plan)
}

Expand All @@ -2056,23 +2056,33 @@ pub(crate) mod tests {
}

/// Runs the repartition optimizer and asserts the plan against the expected
/// Arguments
/// * `EXPECTED_LINES` - Expected output plan
/// * `PLAN` - Input plan
/// * `FIRST_ENFORCE_DIST` -
/// true: (EnforceDistribution, EnforceDistribution, EnforceSorting)
/// false: else runs (EnforceSorting, EnforceDistribution, EnforceDistribution)
/// * `PREFER_EXISTING_SORT` (optional) - if true, will not repartition / resort data if it is already sorted
/// * `TARGET_PARTITIONS` (optional) - number of partitions to repartition to
/// * `REPARTITION_FILE_SCANS` (optional) - if true, will repartition file scans
/// * `REPARTITION_FILE_MIN_SIZE` (optional) - minimum file size to repartition
macro_rules! assert_optimized {
($EXPECTED_LINES: expr, $PLAN: expr, $FIRST_ENFORCE_DIST: expr) => {
assert_optimized!($EXPECTED_LINES, $PLAN, $FIRST_ENFORCE_DIST, false, 10, false, 1024);
};

($EXPECTED_LINES: expr, $PLAN: expr, $FIRST_ENFORCE_DIST: expr, $BOUNDED_ORDER_PRESERVING_VARIANTS: expr) => {
assert_optimized!($EXPECTED_LINES, $PLAN, $FIRST_ENFORCE_DIST, $BOUNDED_ORDER_PRESERVING_VARIANTS, 10, false, 1024);
($EXPECTED_LINES: expr, $PLAN: expr, $FIRST_ENFORCE_DIST: expr, $PREFER_EXISTING_SORT: expr) => {
assert_optimized!($EXPECTED_LINES, $PLAN, $FIRST_ENFORCE_DIST, $PREFER_EXISTING_SORT, 10, false, 1024);
};

($EXPECTED_LINES: expr, $PLAN: expr, $FIRST_ENFORCE_DIST: expr, $BOUNDED_ORDER_PRESERVING_VARIANTS: expr, $TARGET_PARTITIONS: expr, $REPARTITION_FILE_SCANS: expr, $REPARTITION_FILE_MIN_SIZE: expr) => {
($EXPECTED_LINES: expr, $PLAN: expr, $FIRST_ENFORCE_DIST: expr, $PREFER_EXISTING_SORT: expr, $TARGET_PARTITIONS: expr, $REPARTITION_FILE_SCANS: expr, $REPARTITION_FILE_MIN_SIZE: expr) => {
let expected_lines: Vec<&str> = $EXPECTED_LINES.iter().map(|s| *s).collect();

let mut config = ConfigOptions::new();
config.execution.target_partitions = $TARGET_PARTITIONS;
config.optimizer.repartition_file_scans = $REPARTITION_FILE_SCANS;
config.optimizer.repartition_file_min_size = $REPARTITION_FILE_MIN_SIZE;
config.optimizer.prefer_existing_sort = $BOUNDED_ORDER_PRESERVING_VARIANTS;
config.optimizer.prefer_existing_sort = $PREFER_EXISTING_SORT;

// NOTE: These tests verify the joint `EnforceDistribution` + `EnforceSorting` cascade
// because they were written prior to the separation of `BasicEnforcement` into
Expand Down Expand Up @@ -3294,7 +3304,7 @@ pub(crate) mod tests {
];
assert_optimized!(expected, exec, true);
// In this case preserving ordering through order preserving operators is not desirable
// (according to flag: bounded_order_preserving_variants)
// (according to flag: PREFER_EXISTING_SORT)
// hence in this case ordering lost during CoalescePartitionsExec and re-introduced with
// SortExec at the top.
let expected = &[
Expand Down Expand Up @@ -4341,7 +4351,7 @@ pub(crate) mod tests {
"ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC]",
];

// last flag sets config.optimizer.bounded_order_preserving_variants
// last flag sets config.optimizer.PREFER_EXISTING_SORT
assert_optimized!(expected, physical_plan.clone(), true, true);
assert_optimized!(expected, physical_plan, false, true);

Expand Down

0 comments on commit ed11d64

Please sign in to comment.