Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

LexRequirement as a struct, instead of a type #12583

Merged
merged 4 commits into from
Sep 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ use datafusion_physical_expr::{

use async_trait::async_trait;
use datafusion_catalog::Session;
use datafusion_physical_expr_common::sort_expr::LexRequirement;
use futures::{future, stream, StreamExt, TryStreamExt};
use itertools::Itertools;
use object_store::ObjectStore;
Expand Down Expand Up @@ -987,12 +988,12 @@ impl TableProvider for ListingTable {
))?
.clone();
// Converts Vec<Vec<SortExpr>> into type required by execution plan to specify its required input ordering
Some(
Some(LexRequirement::new(
ordering
.into_iter()
.map(PhysicalSortRequirement::from)
.collect::<Vec<_>>(),
)
))
} else {
None
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1272,7 +1272,7 @@ fn ensure_distribution(
// Make sure to satisfy ordering requirement:
child = add_sort_above_with_check(
child,
required_input_ordering.to_vec(),
required_input_ordering.clone(),
None,
);
}
Expand Down
13 changes: 7 additions & 6 deletions datafusion/core/src/physical_optimizer/projection_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ use datafusion_physical_expr::{
use datafusion_physical_plan::streaming::StreamingTableExec;
use datafusion_physical_plan::union::UnionExec;

use datafusion_physical_expr_common::sort_expr::LexRequirement;
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use itertools::Itertools;

Expand Down Expand Up @@ -334,10 +335,10 @@ fn try_swapping_with_output_req(
return Ok(None);
}

let mut updated_sort_reqs = vec![];
let mut updated_sort_reqs = LexRequirement::new(vec![]);
// None or empty_vec can be treated in the same way.
if let Some(reqs) = &output_req.required_input_ordering()[0] {
for req in reqs {
for req in &reqs.inner {
let Some(new_expr) = update_expr(&req.expr, projection.expr(), false)? else {
return Ok(None);
};
Expand Down Expand Up @@ -1995,7 +1996,7 @@ mod tests {
let csv = create_simple_csv_exec();
let sort_req: Arc<dyn ExecutionPlan> = Arc::new(OutputRequirementExec::new(
csv.clone(),
Some(vec![
Some(LexRequirement::new(vec![
PhysicalSortRequirement {
expr: Arc::new(Column::new("b", 1)),
options: Some(SortOptions::default()),
Expand All @@ -2008,7 +2009,7 @@ mod tests {
)),
options: Some(SortOptions::default()),
},
]),
])),
Distribution::HashPartitioned(vec![
Arc::new(Column::new("a", 0)),
Arc::new(Column::new("b", 1)),
Expand Down Expand Up @@ -2041,7 +2042,7 @@ mod tests {
];

assert_eq!(get_plan_string(&after_optimize), expected);
let expected_reqs = vec![
let expected_reqs = LexRequirement::new(vec![
PhysicalSortRequirement {
expr: Arc::new(Column::new("b", 2)),
options: Some(SortOptions::default()),
Expand All @@ -2054,7 +2055,7 @@ mod tests {
)),
options: Some(SortOptions::default()),
},
];
]);
assert_eq!(
after_optimize
.as_any()
Expand Down
23 changes: 15 additions & 8 deletions datafusion/core/src/physical_optimizer/sort_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,8 @@ fn pushdown_requirement_to_children(
let child_plan = plan.children().swap_remove(0);
match determine_children_requirement(parent_required, request_child, child_plan) {
RequirementsCompatibility::Satisfy => {
let req = (!request_child.is_empty()).then(|| request_child.to_vec());
let req = (!request_child.is_empty())
.then(|| LexRequirement::new(request_child.to_vec()));
Ok(Some(vec![req]))
}
RequirementsCompatibility::Compatible(adjusted) => Ok(Some(vec![adjusted])),
Expand All @@ -189,7 +190,9 @@ fn pushdown_requirement_to_children(
.requirements_compatible(parent_required, &sort_req)
{
debug_assert!(!parent_required.is_empty());
Ok(Some(vec![Some(parent_required.to_vec())]))
Ok(Some(vec![Some(LexRequirement::new(
parent_required.to_vec(),
))]))
} else {
Ok(None)
}
Expand All @@ -211,15 +214,17 @@ fn pushdown_requirement_to_children(
.eq_properties
.requirements_compatible(parent_required, &output_req)
{
let req = (!parent_required.is_empty()).then(|| parent_required.to_vec());
let req = (!parent_required.is_empty())
.then(|| LexRequirement::new(parent_required.to_vec()));
Ok(Some(vec![req]))
} else {
Ok(None)
}
} else if is_union(plan) {
// UnionExec does not have real sort requirements for its input. Here we change the adjusted_request_ordering to UnionExec's output ordering and
// propagate the sort requirements down to correct the unnecessary descendant SortExec under the UnionExec
let req = (!parent_required.is_empty()).then(|| parent_required.to_vec());
let req = (!parent_required.is_empty())
.then(|| LexRequirement::new(parent_required.to_vec()));
Ok(Some(vec![req; plan.children().len()]))
} else if let Some(smj) = plan.as_any().downcast_ref::<SortMergeJoinExec>() {
// If the current plan is SortMergeJoinExec
Expand Down Expand Up @@ -277,7 +282,8 @@ fn pushdown_requirement_to_children(
} else {
// Can push-down through SortPreservingMergeExec, because parent requirement is finer
// than SortPreservingMergeExec output ordering.
let req = (!parent_required.is_empty()).then(|| parent_required.to_vec());
let req = (!parent_required.is_empty())
.then(|| LexRequirement::new(parent_required.to_vec()));
Ok(Some(vec![req]))
}
} else {
Expand Down Expand Up @@ -331,7 +337,8 @@ fn determine_children_requirement(
{
// Parent requirements are more specific, adjust child's requirements
// and push down the new requirements:
let adjusted = (!parent_required.is_empty()).then(|| parent_required.to_vec());
let adjusted = (!parent_required.is_empty())
.then(|| LexRequirement::new(parent_required.to_vec()));
RequirementsCompatibility::Compatible(adjusted)
} else {
RequirementsCompatibility::NonCompatible
Expand Down Expand Up @@ -471,7 +478,7 @@ fn shift_right_required(
})
.collect::<Vec<_>>();
if new_right_required.len() == parent_required.len() {
Ok(new_right_required)
Ok(LexRequirement::new(new_right_required))
} else {
plan_err!(
"Expect to shift all the parent required column indexes for SortMergeJoin"
Expand Down Expand Up @@ -574,7 +581,7 @@ fn handle_custom_pushdown(
.iter()
.map(|&maintains_order| {
if maintains_order {
Some(updated_parent_req.clone())
Some(LexRequirement::new(updated_parent_req.clone()))
} else {
None
}
Expand Down
63 changes: 56 additions & 7 deletions datafusion/physical-expr-common/src/sort_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

use std::fmt::Display;
use std::hash::{Hash, Hasher};
use std::ops::Deref;
use std::sync::Arc;

use crate::physical_expr::PhysicalExpr;
Expand Down Expand Up @@ -222,11 +223,13 @@ impl PhysicalSortRequirement {
pub fn from_sort_exprs<'a>(
ordering: impl IntoIterator<Item = &'a PhysicalSortExpr>,
) -> LexRequirement {
ordering
.into_iter()
.cloned()
.map(PhysicalSortRequirement::from)
.collect()
LexRequirement::new(
ordering
.into_iter()
.cloned()
.map(PhysicalSortRequirement::from)
.collect(),
)
}

/// Converts an iterator of [`PhysicalSortRequirement`] into a Vec
Expand Down Expand Up @@ -264,9 +267,55 @@ pub type LexOrdering = Vec<PhysicalSortExpr>;
/// a reference to a lexicographical ordering.
pub type LexOrderingRef<'a> = &'a [PhysicalSortExpr];
Copy link
Contributor

@berkaysynnada berkaysynnada Sep 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps we could follow a similar pattern for LexOrderingRef, too.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Created a separate issue for this, #12591!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW having a real struct will make printing these structures much easier -- e.g. I had to do some awkward workarounds to implement Display for LexOrdering in #12590


///`LexRequirement` is an alias for the type `Vec<PhysicalSortRequirement>`, which
///`LexRequirement` is a struct containing a `Vec<PhysicalSortRequirement>`, which
/// represents a lexicographical ordering requirement.
pub type LexRequirement = Vec<PhysicalSortRequirement>;
#[derive(Debug, Default, Clone, PartialEq)]
pub struct LexRequirement {
/// The wrapped list of sort requirements.
pub inner: Vec<PhysicalSortRequirement>,
}

impl LexRequirement {
/// Creates a `LexRequirement` that wraps the given requirements.
pub fn new(inner: Vec<PhysicalSortRequirement>) -> Self {
Self { inner }
}

/// Returns an iterator over references to the wrapped requirements.
pub fn iter(&self) -> impl Iterator<Item = &PhysicalSortRequirement> {
self.inner.iter()
}

/// Appends a requirement to the end of the wrapped list.
pub fn push(&mut self, physical_sort_requirement: PhysicalSortRequirement) {
self.inner.push(physical_sort_requirement)
}
}

/// Lets a `LexRequirement` be borrowed as a slice of
/// `PhysicalSortRequirement`, so slice methods are available on it directly.
impl Deref for LexRequirement {
    type Target = [PhysicalSortRequirement];

    fn deref(&self) -> &[PhysicalSortRequirement] {
        // `&Vec<T>` coerces to `&[T]` at the return site.
        &self.inner
    }
}

impl FromIterator<PhysicalSortRequirement> for LexRequirement {
fn from_iter<T: IntoIterator<Item = PhysicalSortRequirement>>(iter: T) -> Self {
let mut lex_requirement = LexRequirement::new(vec![]);

for i in iter {
lex_requirement.inner.push(i);
}

lex_requirement
}
}

/// Consumes the `LexRequirement` and yields its requirements by value.
impl IntoIterator for LexRequirement {
    type Item = PhysicalSortRequirement;
    type IntoIter = std::vec::IntoIter<PhysicalSortRequirement>;

    fn into_iter(self) -> Self::IntoIter {
        let Self { inner } = self;
        inner.into_iter()
    }
}

///`LexRequirementRef` is an alias for the type `&[PhysicalSortRequirement]`, which
/// represents a reference to a lexicographical ordering requirement.
Expand Down
6 changes: 3 additions & 3 deletions datafusion/physical-expr/src/equivalence/class.rs
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,7 @@ impl EquivalenceGroup {
// Normalize the requirements:
let normalized_sort_reqs = self.normalize_sort_requirements(&sort_reqs);
// Convert sort requirements back to sort expressions:
PhysicalSortRequirement::to_sort_exprs(normalized_sort_reqs)
PhysicalSortRequirement::to_sort_exprs(normalized_sort_reqs.inner)
}

/// This function applies the `normalize_sort_requirement` function for all
Expand All @@ -428,12 +428,12 @@ impl EquivalenceGroup {
&self,
sort_reqs: LexRequirementRef,
) -> LexRequirement {
collapse_lex_req(
collapse_lex_req(LexRequirement::new(
sort_reqs
.iter()
.map(|sort_req| self.normalize_sort_requirement(sort_req.clone()))
.collect(),
)
))
}

/// Projects `expr` according to the given projection mapping.
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-expr/src/equivalence/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ pub fn collapse_lex_req(input: LexRequirement) -> LexRequirement {
output.push(item);
}
}
output
LexRequirement::new(output)
}

/// Adds the `offset` value to `Column` indices inside `expr`. This function is
Expand Down
5 changes: 3 additions & 2 deletions datafusion/physical-expr/src/equivalence/properties.rs
Original file line number Diff line number Diff line change
Expand Up @@ -515,8 +515,9 @@ impl EquivalenceProperties {
) -> Option<LexRequirement> {
let mut lhs = self.normalize_sort_requirements(req1);
let mut rhs = self.normalize_sort_requirements(req2);
lhs.iter_mut()
.zip(rhs.iter_mut())
lhs.inner
.iter_mut()
.zip(rhs.inner.iter_mut())
.all(|(lhs, rhs)| {
lhs.expr.eq(&rhs.expr)
&& match (lhs.options, rhs.options) {
Expand Down
18 changes: 10 additions & 8 deletions datafusion/physical-plan/src/aggregates/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -370,21 +370,23 @@ impl AggregateExec {
// prefix requirements with this section. In this case, aggregation will
// work more efficiently.
let indices = get_ordered_partition_by_indices(&groupby_exprs, &input);
let mut new_requirement = indices
.iter()
.map(|&idx| PhysicalSortRequirement {
expr: Arc::clone(&groupby_exprs[idx]),
options: None,
})
.collect::<Vec<_>>();
let mut new_requirement = LexRequirement::new(
indices
.iter()
.map(|&idx| PhysicalSortRequirement {
expr: Arc::clone(&groupby_exprs[idx]),
options: None,
})
.collect::<Vec<_>>(),
);

let req = get_finer_aggregate_exprs_requirement(
&mut aggr_expr,
&group_by,
input_eq_properties,
&mode,
)?;
new_requirement.extend(req);
new_requirement.inner.extend(req);
new_requirement = collapse_lex_req(new_requirement);

// If our aggregation has grouping sets then our base grouping exprs will
Expand Down
8 changes: 6 additions & 2 deletions datafusion/physical-plan/src/sorts/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -792,7 +792,9 @@ impl SortExec {
) -> PlanProperties {
// Determine execution mode:
let sort_satisfied = input.equivalence_properties().ordering_satisfy_requirement(
PhysicalSortRequirement::from_sort_exprs(sort_exprs.iter()).as_slice(),
PhysicalSortRequirement::from_sort_exprs(sort_exprs.iter())
.inner
.as_slice(),
);
let mode = match input.execution_mode() {
ExecutionMode::Unbounded if sort_satisfied => ExecutionMode::Unbounded,
Expand Down Expand Up @@ -895,7 +897,9 @@ impl ExecutionPlan for SortExec {
.input
.equivalence_properties()
.ordering_satisfy_requirement(
PhysicalSortRequirement::from_sort_exprs(self.expr.iter()).as_slice(),
PhysicalSortRequirement::from_sort_exprs(self.expr.iter())
.inner
.as_slice(),
);

match (sort_satisfied, self.fetch.as_ref()) {
Expand Down
Loading