Skip to content

Commit

Permalink
Preserve all of the valid orderings during merging. (#8169)
Browse files Browse the repository at this point in the history
* Preserve all of the valid orderings during merging.

* Update datafusion/physical-expr/src/equivalence.rs

Co-authored-by: Mehmet Ozan Kabak <[email protected]>

* Address reviews

---------

Co-authored-by: Mehmet Ozan Kabak <[email protected]>
  • Loading branch information
mustafasrepo and ozankabak authored Nov 15, 2023
1 parent abb2ae7 commit 6ecb6cd
Show file tree
Hide file tree
Showing 5 changed files with 335 additions and 23 deletions.
276 changes: 263 additions & 13 deletions datafusion/core/tests/fuzz_cases/sort_preserving_repartition_fuzz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,272 @@

#[cfg(test)]
mod sp_repartition_fuzz_tests {
use arrow::compute::concat_batches;
use arrow_array::{ArrayRef, Int64Array, RecordBatch};
use arrow_schema::SortOptions;
use datafusion::physical_plan::memory::MemoryExec;
use datafusion::physical_plan::repartition::RepartitionExec;
use datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use datafusion::physical_plan::{collect, ExecutionPlan, Partitioning};
use datafusion::prelude::SessionContext;
use datafusion_execution::config::SessionConfig;
use datafusion_physical_expr::expressions::col;
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr};
use rand::rngs::StdRng;
use rand::{Rng, SeedableRng};
use std::sync::Arc;

use arrow::compute::{concat_batches, lexsort, SortColumn};
use arrow_array::{ArrayRef, Int64Array, RecordBatch, UInt64Array};
use arrow_schema::{DataType, Field, Schema, SchemaRef, SortOptions};

use datafusion::physical_plan::{
collect,
memory::MemoryExec,
metrics::{BaselineMetrics, ExecutionPlanMetricsSet},
repartition::RepartitionExec,
sorts::sort_preserving_merge::SortPreservingMergeExec,
sorts::streaming_merge::streaming_merge,
stream::RecordBatchStreamAdapter,
ExecutionPlan, Partitioning,
};
use datafusion::prelude::SessionContext;
use datafusion_common::Result;
use datafusion_execution::{
config::SessionConfig, memory_pool::MemoryConsumer, SendableRecordBatchStream,
};
use datafusion_physical_expr::{
expressions::{col, Column},
EquivalenceProperties, PhysicalExpr, PhysicalSortExpr,
};
use test_utils::add_empty_batches;

use itertools::izip;
use rand::{rngs::StdRng, seq::SliceRandom, Rng, SeedableRng};

// Generate a schema which consists of 6 columns (a, b, c, d, e, f)
fn create_test_schema() -> Result<SchemaRef> {
let a = Field::new("a", DataType::Int32, true);
let b = Field::new("b", DataType::Int32, true);
let c = Field::new("c", DataType::Int32, true);
let d = Field::new("d", DataType::Int32, true);
let e = Field::new("e", DataType::Int32, true);
let f = Field::new("f", DataType::Int32, true);
let schema = Arc::new(Schema::new(vec![a, b, c, d, e, f]));

Ok(schema)
}

/// Construct a schema with random ordering
/// among column a, b, c, d
/// where
/// Column [a=f] (e.g they are aliases).
/// Column e is constant.
fn create_random_schema(seed: u64) -> Result<(SchemaRef, EquivalenceProperties)> {
let test_schema = create_test_schema()?;
let col_a = &col("a", &test_schema)?;
let col_b = &col("b", &test_schema)?;
let col_c = &col("c", &test_schema)?;
let col_d = &col("d", &test_schema)?;
let col_e = &col("e", &test_schema)?;
let col_f = &col("f", &test_schema)?;
let col_exprs = [col_a, col_b, col_c, col_d, col_e, col_f];

let mut eq_properties = EquivalenceProperties::new(test_schema.clone());
// Define a and f are aliases
eq_properties.add_equal_conditions(col_a, col_f);
// Column e has constant value.
eq_properties = eq_properties.add_constants([col_e.clone()]);

// Randomly order columns for sorting
let mut rng = StdRng::seed_from_u64(seed);
let mut remaining_exprs = col_exprs[0..4].to_vec(); // only a, b, c, d are sorted

let options_asc = SortOptions {
descending: false,
nulls_first: false,
};

while !remaining_exprs.is_empty() {
let n_sort_expr = rng.gen_range(0..remaining_exprs.len() + 1);
remaining_exprs.shuffle(&mut rng);

let ordering = remaining_exprs
.drain(0..n_sort_expr)
.map(|expr| PhysicalSortExpr {
expr: expr.clone(),
options: options_asc,
})
.collect();

eq_properties.add_new_orderings([ordering]);
}

Ok((test_schema, eq_properties))
}

// Returns the already-generated array for any member of the given
// equivalence class, if one exists. Members of a class must share the same
// data, so the first populated entry serves as the representative for all.
fn get_representative_arr(
    eq_group: &[Arc<dyn PhysicalExpr>],
    existing_vec: &[Option<ArrayRef>],
    schema: SchemaRef,
) -> Option<ArrayRef> {
    eq_group.iter().find_map(|expr| {
        let column = expr.as_any().downcast_ref::<Column>().unwrap();
        let (idx, _field) = schema.column_with_name(column.name()).unwrap();
        existing_vec[idx].clone()
    })
}

// Generate a table that satisfies the given equivalence properties; i.e.
// equivalences, ordering equivalences, and constants.
//
// Columns are filled in three passes (constants, then orderings, then
// equivalence groups), and any column untouched by all three passes gets
// independent random data at the end. The pass order matters: it determines
// which rng draws feed which column, and the equivalence-group pass reuses
// an already-filled member array (via `get_representative_arr`) so all
// members of a class end up identical.
fn generate_table_for_eq_properties(
    eq_properties: &EquivalenceProperties,
    n_elem: usize,           // number of rows in the generated batch
    n_distinct: usize,       // random values are drawn from 0..n_distinct
) -> Result<RecordBatch> {
    // Fixed seed: the generated table is deterministic across runs.
    let mut rng = StdRng::seed_from_u64(23);

    let schema = eq_properties.schema();
    // One slot per column; `None` means "not generated yet".
    let mut schema_vec = vec![None; schema.fields.len()];

    // Utility closure to generate random array
    let mut generate_random_array = |num_elems: usize, max_val: usize| -> ArrayRef {
        let values: Vec<u64> = (0..num_elems)
            .map(|_| rng.gen_range(0..max_val) as u64)
            .collect();
        Arc::new(UInt64Array::from_iter_values(values))
    };

    // Fill constant columns (all-zero arrays satisfy "single value" trivially).
    for constant in eq_properties.constants() {
        let col = constant.as_any().downcast_ref::<Column>().unwrap();
        let (idx, _field) = schema.column_with_name(col.name()).unwrap();
        let arr =
            Arc::new(UInt64Array::from_iter_values(vec![0; n_elem])) as ArrayRef;
        schema_vec[idx] = Some(arr);
    }

    // Fill columns based on ordering equivalences: draw random data for every
    // column of the ordering, then lexicographically co-sort those columns so
    // the ordering actually holds in the generated table.
    for ordering in eq_properties.oeq_class().iter() {
        let (sort_columns, indices): (Vec<_>, Vec<_>) = ordering
            .iter()
            .map(|PhysicalSortExpr { expr, options }| {
                let col = expr.as_any().downcast_ref::<Column>().unwrap();
                let (idx, _field) = schema.column_with_name(col.name()).unwrap();
                let arr = generate_random_array(n_elem, n_distinct);
                (
                    SortColumn {
                        values: arr,
                        options: Some(*options),
                    },
                    idx,
                )
            })
            .unzip();

        let sort_arrs = arrow::compute::lexsort(&sort_columns, None)?;
        for (idx, arr) in izip!(indices, sort_arrs) {
            schema_vec[idx] = Some(arr);
        }
    }

    // Fill columns based on equivalence groups: every member of a class gets
    // the same array — an existing one if some member was already filled
    // above, otherwise a freshly generated random array.
    for eq_group in eq_properties.eq_group().iter() {
        let representative_array =
            get_representative_arr(eq_group, &schema_vec, schema.clone())
                .unwrap_or_else(|| generate_random_array(n_elem, n_distinct));

        for expr in eq_group {
            let col = expr.as_any().downcast_ref::<Column>().unwrap();
            let (idx, _field) = schema.column_with_name(col.name()).unwrap();
            schema_vec[idx] = Some(representative_array.clone());
        }
    }

    let res: Vec<_> = schema_vec
        .into_iter()
        .zip(schema.fields.iter())
        .map(|(elem, field)| {
            (
                field.name(),
                // Generate random values for columns that do not occur in any of the groups (equivalence, ordering equivalence, constants)
                elem.unwrap_or_else(|| generate_random_array(n_elem, n_distinct)),
            )
        })
        .collect();

    Ok(RecordBatch::try_from_iter(res)?)
}

// This test checks whether sort preserving merge preserves all of the valid
// orderings. If the input has orderings [a ASC, b ASC] and [c ASC, d ASC],
// then after the merge both [a ASC, b ASC] and [c ASC, d ASC] must still be
// valid orderings of the output.
#[tokio::test]
async fn stream_merge_multi_order_preserve() -> Result<()> {
    use datafusion::physical_plan::common::collect;

    // Number of input partitions fed into the merge.
    const N_PARTITION: usize = 8;
    // Rows per partition.
    const N_ELEM: usize = 25;
    // Distinct values used when generating random column data.
    const N_DISTINCT: usize = 5;
    // Number of random schemas (seeds) to exercise.
    const N_DIFF_SCHEMA: usize = 20;

    for seed in 0..N_DIFF_SCHEMA {
        // Create a schema with random equivalence properties
        let (_test_schema, eq_properties) = create_random_schema(seed as u64)?;
        let table_data_with_properties =
            generate_table_for_eq_properties(&eq_properties, N_ELEM, N_DISTINCT)?;
        let schema = table_data_with_properties.schema();
        // Every partition streams an identical copy of the generated batch.
        let streams: Vec<SendableRecordBatchStream> = (0..N_PARTITION)
            .map(|_idx| {
                let batch = table_data_with_properties.clone();
                Box::pin(RecordBatchStreamAdapter::new(
                    schema.clone(),
                    futures::stream::once(async { Ok(batch) }),
                )) as SendableRecordBatchStream
            })
            .collect::<Vec<_>>();

        // Returns concatenated version of the all available orderings
        let exprs = eq_properties
            .oeq_class()
            .output_ordering()
            .unwrap_or_default();

        let context = SessionContext::new().task_ctx();
        let mem_reservation =
            MemoryConsumer::new("test".to_string()).register(context.memory_pool());

        // Internally SortPreservingMergeExec uses this function for merging.
        let res = streaming_merge(
            streams,
            schema,
            &exprs,
            BaselineMetrics::new(&ExecutionPlanMetricsSet::new(), 0),
            1,
            None,
            mem_reservation,
        )?;
        let res = collect(res).await?;
        // Contains the merged result.
        let res = concat_batches(&res[0].schema(), &res)?;

        // Verify every ordering in the equivalence class still holds by
        // comparing each ordering's columns against their lexsorted version.
        for ordering in eq_properties.oeq_class().iter() {
            let err_msg = format!("error in eq properties: {:?}", eq_properties);
            let sort_columns = ordering
                .iter()
                .map(|sort_expr| sort_expr.evaluate_to_sort_column(&res))
                .collect::<Result<Vec<_>>>()?;
            let orig_columns = sort_columns
                .iter()
                .map(|sort_column| sort_column.values.clone())
                .collect::<Vec<_>>();
            let sorted_columns = lexsort(&sort_columns, None)?;

            // Make sure after merging ordering is still valid.
            assert_eq!(orig_columns.len(), sorted_columns.len(), "{}", err_msg);
            assert!(
                izip!(orig_columns.into_iter(), sorted_columns.into_iter())
                    .all(|(lhs, rhs)| { lhs == rhs }),
                "{}",
                err_msg
            )
        }
    }
    Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 8)]
async fn sort_preserving_repartition_test() {
let seed_start = 0;
Expand Down
31 changes: 26 additions & 5 deletions datafusion/physical-expr/src/equivalence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ impl EquivalenceGroup {
}

/// Returns an iterator over the equivalence classes in this group.
/// Made `pub` so external code (e.g. tests) can inspect the classes.
pub fn iter(&self) -> impl Iterator<Item = &EquivalenceClass> {
    self.classes.iter()
}

Expand Down Expand Up @@ -551,7 +551,7 @@ impl EquivalenceGroup {

/// This function constructs a duplicate-free `LexOrderingReq` by filtering out
/// duplicate entries that have same physical expression inside. For example,
/// `vec![a Some(Asc), a Some(Desc)]` collapses to `vec![a Some(Asc)]`.
/// `vec![a Some(ASC), a Some(DESC)]` collapses to `vec![a Some(ASC)]`.
pub fn collapse_lex_req(input: LexRequirement) -> LexRequirement {
let mut output = Vec::<PhysicalSortRequirement>::new();
for item in input {
Expand All @@ -562,6 +562,19 @@ pub fn collapse_lex_req(input: LexRequirement) -> LexRequirement {
output
}

/// Constructs a duplicate-free `LexOrdering`: only the first occurrence of
/// each physical expression is kept. For example, `vec![a ASC, a DESC]`
/// collapses to `vec![a ASC]`.
pub fn collapse_lex_ordering(input: LexOrdering) -> LexOrdering {
    input
        .into_iter()
        .fold(Vec::<PhysicalSortExpr>::new(), |mut acc, sort_expr| {
            // Keep the entry only if its expression was not seen before.
            if acc.iter().all(|seen| !seen.expr.eq(&sort_expr.expr)) {
                acc.push(sort_expr);
            }
            acc
        })
}

/// An `OrderingEquivalenceClass` object keeps track of different alternative
/// orderings than can describe a schema. For example, consider the following table:
///
Expand Down Expand Up @@ -667,10 +680,13 @@ impl OrderingEquivalenceClass {
}
}

/// Returns the concatenation of all the orderings. This enables merge
/// operations to preserve all equivalent orderings simultaneously.
pub fn output_ordering(&self) -> Option<LexOrdering> {
    // Chain every ordering together, then drop repeated expressions so the
    // result is a well-formed lexicographic ordering.
    let concatenated: Vec<_> = self.orderings.iter().flatten().cloned().collect();
    let collapsed = collapse_lex_ordering(concatenated);
    if collapsed.is_empty() {
        None
    } else {
        Some(collapsed)
    }
}

// Append orderings in `other` to all existing orderings in this equivalence
Expand Down Expand Up @@ -825,6 +841,11 @@ impl EquivalenceProperties {
&self.eq_group
}

/// Returns a reference to the constant expressions, i.e. expressions whose
/// value is the same for every row (see `add_constants`).
pub fn constants(&self) -> &[Arc<dyn PhysicalExpr>] {
    &self.constants
}

/// Returns the normalized version of the ordering equivalence class within.
/// Normalization removes constants and duplicates as well as standardizing
/// expressions according to the equivalence group within.
Expand Down
3 changes: 0 additions & 3 deletions datafusion/physical-plan/src/repartition/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -472,9 +472,6 @@ impl ExecutionPlan for RepartitionExec {
if !self.maintains_input_order()[0] {
result.clear_orderings();
}
if self.preserve_order {
result = result.with_reorder(self.sort_exprs().unwrap_or_default().to_vec())
}
result
}

Expand Down
3 changes: 1 addition & 2 deletions datafusion/physical-plan/src/sorts/sort_preserving_merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,7 @@ impl ExecutionPlan for SortPreservingMergeExec {
}

fn equivalence_properties(&self) -> EquivalenceProperties {
let output_oeq = self.input.equivalence_properties();
output_oeq.with_reorder(self.expr.to_vec())
self.input.equivalence_properties()
}

fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
Expand Down
Loading

0 comments on commit 6ecb6cd

Please sign in to comment.