Skip to content

Commit

Permalink
Improve comments more
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Dec 20, 2024
1 parent 16e315a commit 57fddea
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 17 deletions.
28 changes: 17 additions & 11 deletions datafusion/physical-plan/src/sorts/merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,19 +99,25 @@ pub(crate) struct SortPreservingMergeStream<C: CursorValues> {

/// Configuration parameter to enable round-robin selection of tied winners of loser tree.
///
/// To address the issue of unbalanced polling between partitions due to tie-breakers being based
/// on partition index, especially in cases of low cardinality, we are making changes to the winner
/// selection mechanism. Previously, partitions with smaller indices were consistently chosen as the winners,
/// leading to an uneven distribution of polling. This caused upstream operator buffers for the other partitions
/// to grow excessively, as they continued receiving data without consuming it.
/// This option controls the tie-breaker strategy and attempts to avoid the
/// issue of unbalanced polling between partitions
///
/// For example, an upstream operator like a repartition execution would keep sending data to certain partitions,
/// but those partitions wouldn't consume the data if they weren't selected as winners. This resulted in inefficient buffer usage.
/// If `true`, when multiple partitions have the same value, the partition
/// that has the fewest poll counts is selected. This strategy ensures that
/// multiple partitions with the same value are chosen equally, distributing
/// the polling load in a round-robin fashion. This approach balances the
/// workload more effectively across partitions and avoids excessive buffer
/// growth.
///
/// To resolve this, we are modifying the tie-breaking logic. Instead of always choosing the partition with the smallest index,
/// we now select the partition that has the fewest poll counts for the same value.
/// This ensures that multiple partitions with the same value are chosen equally, distributing the polling load in a round-robin fashion.
/// This approach balances the workload more effectively across partitions and avoids excessive buffer growth.
/// if `false`, partitions with smaller indices are consistently chosen as
/// the winners, which can lead to an uneven distribution of polling and potentially
/// causing upstream operator buffers for the other partitions to grow
/// excessively, as they continued receiving data without consuming it.
///
/// For example, an upstream operator like `RepartitonExec` execution would
/// keep sending data to certain partitions, but those partitions wouldn't
/// consume the data if they weren't selected as winners. This resulted in
/// inefficient buffer usage.
enable_round_robin_tie_breaker: bool,

/// Flag indicating whether we are in the mode of round-robin
Expand Down
12 changes: 6 additions & 6 deletions datafusion/physical-plan/src/sorts/sort_preserving_merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,13 +120,13 @@ impl SortPreservingMergeExec {

/// Sets the selection strategy of tied winners of the loser tree algorithm
///
/// When true (the default) equal output rows are placed in the merged
/// stream when ready, which is faster but not stable (can vary from
/// run to run).
/// If true (the default) equal output rows are placed in the merged stream
/// in round robin fashion. This approach consumes input streams at more
/// even rates when there are many rows with the same sort key.
///
/// If false, equal output rows are placed in the merged stream in the order
/// of the inputs, resulting in potentially slower execution but in a stable
/// output order.
/// If false, equal output rows are always placed in the merged stream in
/// the order of the inputs, resulting in potentially slower execution but a
/// stable output order.
pub fn with_round_robin_repartition(
mut self,
enable_round_robin_repartition: bool,
Expand Down
3 changes: 3 additions & 0 deletions datafusion/physical-plan/src/sorts/streaming_merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
//! This is an order-preserving merge.
use crate::metrics::BaselineMetrics;
use crate::sorts::sort_preserving_merge::SortPreservingMergeExec;
use crate::sorts::{
merge::SortPreservingMergeStream,
stream::{FieldCursorStream, RowCursorStream},
Expand Down Expand Up @@ -120,6 +121,8 @@ impl<'a> StreamingMergeBuilder<'a> {
self
}

/// See [SortPreservingMergeExec::with_round_robin_repartition] for more
/// information.
pub fn with_round_robin_tie_breaker(
mut self,
enable_round_robin_tie_breaker: bool,
Expand Down

0 comments on commit 57fddea

Please sign in to comment.