From 57fddea3c9b0450ed4b29d347bee8998671dd1dc Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 19 Dec 2024 22:08:44 -0500 Subject: [PATCH] Improve comments more --- datafusion/physical-plan/src/sorts/merge.rs | 28 +++++++++++-------- .../src/sorts/sort_preserving_merge.rs | 12 ++++---- .../src/sorts/streaming_merge.rs | 3 ++ 3 files changed, 26 insertions(+), 17 deletions(-) diff --git a/datafusion/physical-plan/src/sorts/merge.rs b/datafusion/physical-plan/src/sorts/merge.rs index 458c1c29c0cf..258e234b35c7 100644 --- a/datafusion/physical-plan/src/sorts/merge.rs +++ b/datafusion/physical-plan/src/sorts/merge.rs @@ -99,19 +99,25 @@ pub(crate) struct SortPreservingMergeStream { /// Configuration parameter to enable round-robin selection of tied winners of loser tree. /// - /// To address the issue of unbalanced polling between partitions due to tie-breakers being based - /// on partition index, especially in cases of low cardinality, we are making changes to the winner - /// selection mechanism. Previously, partitions with smaller indices were consistently chosen as the winners, - /// leading to an uneven distribution of polling. This caused upstream operator buffers for the other partitions - /// to grow excessively, as they continued receiving data without consuming it. + /// This option controls the tie-breaker strategy and attempts to avoid the + /// issue of unbalanced polling between partitions /// - /// For example, an upstream operator like a repartition execution would keep sending data to certain partitions, - /// but those partitions wouldn't consume the data if they weren't selected as winners. This resulted in inefficient buffer usage. + /// If `true`, when multiple partitions have the same value, the partition + /// that has the fewest poll counts is selected. This strategy ensures that + /// multiple partitions with the same value are chosen equally, distributing + /// the polling load in a round-robin fashion. This approach balances the + /// workload more effectively across partitions and avoids excessive buffer + /// growth. /// - /// To resolve this, we are modifying the tie-breaking logic. Instead of always choosing the partition with the smallest index, - /// we now select the partition that has the fewest poll counts for the same value. - /// This ensures that multiple partitions with the same value are chosen equally, distributing the polling load in a round-robin fashion. - /// This approach balances the workload more effectively across partitions and avoids excessive buffer growth. + /// if `false`, partitions with smaller indices are consistently chosen as + /// the winners, which can lead to an uneven distribution of polling and potentially + /// causing upstream operator buffers for the other partitions to grow + /// excessively, as they continued receiving data without consuming it. + /// + /// For example, an upstream operator like `RepartitonExec` execution would + /// keep sending data to certain partitions, but those partitions wouldn't + /// consume the data if they weren't selected as winners. This resulted in + /// inefficient buffer usage. enable_round_robin_tie_breaker: bool, /// Flag indicating whether we are in the mode of round-robin diff --git a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs index a52f39182599..258762235159 100644 --- a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs +++ b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs @@ -120,13 +120,13 @@ impl SortPreservingMergeExec { /// Sets the selection strategy of tied winners of the loser tree algorithm /// - /// When true (the default) equal output rows are placed in the merged - /// stream when ready, which is faster but not stable (can vary from - /// run to run). + /// If true (the default) equal output rows are placed in the merged stream + /// in round robin fashion. This approach consumes input streams at more + /// even rates when there are many rows with the same sort key. /// - /// If false, equal output rows are placed in the merged stream in the order - /// of the inputs, resulting in potentially slower execution but in a stable - /// output order. + /// If false, equal output rows are always placed in the merged stream in + /// the order of the inputs, resulting in potentially slower execution but a + /// stable output order. pub fn with_round_robin_repartition( mut self, enable_round_robin_repartition: bool, diff --git a/datafusion/physical-plan/src/sorts/streaming_merge.rs b/datafusion/physical-plan/src/sorts/streaming_merge.rs index 2178cc012a10..aa245a0ac2ca 100644 --- a/datafusion/physical-plan/src/sorts/streaming_merge.rs +++ b/datafusion/physical-plan/src/sorts/streaming_merge.rs @@ -19,6 +19,7 @@ //! This is an order-preserving merge. use crate::metrics::BaselineMetrics; +use crate::sorts::sort_preserving_merge::SortPreservingMergeExec; use crate::sorts::{ merge::SortPreservingMergeStream, stream::{FieldCursorStream, RowCursorStream}, @@ -120,6 +121,8 @@ impl<'a> StreamingMergeBuilder<'a> { self } + /// See [SortPreservingMergeExec::with_round_robin_repartition] for more + /// information. pub fn with_round_robin_tie_breaker( mut self, enable_round_robin_tie_breaker: bool,