Skip to content

Commit

Permalink
Minor: Add ConstExpr::from, use in places
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Jul 5, 2024
1 parent 9355f4a commit ad8d1eb
Show file tree
Hide file tree
Showing 8 changed files with 47 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ mod sp_repartition_fuzz_tests {
// Define a and f are aliases
eq_properties.add_equal_conditions(col_a, col_f)?;
// Column e has constant value.
eq_properties = eq_properties.add_constants([ConstExpr::new(col_e.clone())]);
eq_properties = eq_properties.add_constants([ConstExpr::from(col_e)]);

// Randomly order columns for sorting
let mut rng = StdRng::seed_from_u64(seed);
Expand Down
28 changes: 28 additions & 0 deletions datafusion/physical-expr/src/equivalence/class.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,28 @@ use datafusion_common::JoinType;
/// - `across_partitions`: A boolean flag indicating whether the constant expression is
/// valid across partitions. If set to `true`, the constant expression has same value for all partitions.
/// If set to `false`, the constant expression may have different values for different partitions.
///
/// # Example
///
/// ```rust
/// # use datafusion_physical_expr::ConstExpr;
/// # use datafusion_physical_expr_common::expressions::lit;
/// let col = lit(5);
/// // Create a constant expression from a physical expression ref
/// let const_expr = ConstExpr::from(&col);
/// // create a constant expression from a physical expression
/// let const_expr = ConstExpr::from(col);
/// ```
pub struct ConstExpr {
expr: Arc<dyn PhysicalExpr>,
across_partitions: bool,
}

impl ConstExpr {
/// Create a new constant expression from a physical expression.
///
/// Note you can also use `ConstExpr::from` to create a constant expression
/// from a reference as well
pub fn new(expr: Arc<dyn PhysicalExpr>) -> Self {
Self {
expr,
Expand Down Expand Up @@ -85,6 +101,18 @@ impl ConstExpr {
}
}

impl From<Arc<dyn PhysicalExpr>> for ConstExpr {
fn from(expr: Arc<dyn PhysicalExpr>) -> Self {
Self::new(expr)
}
}

impl From<&Arc<dyn PhysicalExpr>> for ConstExpr {
fn from(expr: &Arc<dyn PhysicalExpr>) -> Self {
Self::new(Arc::clone(expr))
}
}

/// Checks whether `expr` is among in the `const_exprs`.
pub fn const_exprs_contains(
const_exprs: &[ConstExpr],
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-expr/src/equivalence/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ mod tests {
// Define a and f are aliases
eq_properties.add_equal_conditions(col_a, col_f)?;
// Column e has constant value.
eq_properties = eq_properties.add_constants([ConstExpr::new(Arc::clone(col_e))]);
eq_properties = eq_properties.add_constants([ConstExpr::from(col_e)]);

// Randomly order columns for sorting
let mut rng = StdRng::seed_from_u64(seed);
Expand Down
6 changes: 3 additions & 3 deletions datafusion/physical-expr/src/equivalence/ordering.rs
Original file line number Diff line number Diff line change
Expand Up @@ -556,9 +556,9 @@ mod tests {
let eq_group = EquivalenceGroup::new(eq_group);
eq_properties.add_equivalence_group(eq_group);

let constants = constants.into_iter().map(|expr| {
ConstExpr::new(Arc::clone(expr)).with_across_partitions(true)
});
let constants = constants
.into_iter()
.map(|expr| ConstExpr::from(expr).with_across_partitions(true));
eq_properties = eq_properties.add_constants(constants);

let reqs = convert_to_sort_exprs(&reqs);
Expand Down
22 changes: 10 additions & 12 deletions datafusion/physical-expr/src/equivalence/properties.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,13 +213,13 @@ impl EquivalenceProperties {
// Left expression is constant, add right as constant
if !const_exprs_contains(&self.constants, right) {
self.constants
.push(ConstExpr::new(Arc::clone(right)).with_across_partitions(true));
.push(ConstExpr::from(right).with_across_partitions(true));
}
} else if self.is_expr_constant(right) {
// Right expression is constant, add left as constant
if !const_exprs_contains(&self.constants, left) {
self.constants
.push(ConstExpr::new(Arc::clone(left)).with_across_partitions(true));
.push(ConstExpr::from(left).with_across_partitions(true));
}
}

Expand Down Expand Up @@ -300,7 +300,7 @@ impl EquivalenceProperties {
{
if !const_exprs_contains(&self.constants, &expr) {
let const_expr =
ConstExpr::new(expr).with_across_partitions(across_partitions);
ConstExpr::from(expr).with_across_partitions(across_partitions);
self.constants.push(const_expr);
}
}
Expand Down Expand Up @@ -404,7 +404,7 @@ impl EquivalenceProperties {
// we add column `a` as constant to the algorithm state. This enables us
// to deduce that `(b + c) ASC` is satisfied, given `a` is constant.
eq_properties = eq_properties
.add_constants(std::iter::once(ConstExpr::new(normalized_req.expr)));
.add_constants(std::iter::once(ConstExpr::from(normalized_req.expr)));
}
true
}
Expand Down Expand Up @@ -832,9 +832,8 @@ impl EquivalenceProperties {
&& !const_exprs_contains(&projected_constants, target)
{
// Expression evaluates to single value
projected_constants.push(
ConstExpr::new(Arc::clone(target)).with_across_partitions(true),
);
projected_constants
.push(ConstExpr::from(target).with_across_partitions(true));
}
}
projected_constants
Expand Down Expand Up @@ -927,8 +926,8 @@ impl EquivalenceProperties {
// Note that these expressions are not properly "constants". This is just
// an implementation strategy confined to this function.
for (PhysicalSortExpr { expr, .. }, idx) in &ordered_exprs {
eq_properties = eq_properties
.add_constants(std::iter::once(ConstExpr::new(Arc::clone(expr))));
eq_properties =
eq_properties.add_constants(std::iter::once(ConstExpr::from(expr)));
search_indices.shift_remove(idx);
}
// Add new ordered section to the state.
Expand Down Expand Up @@ -2147,8 +2146,7 @@ mod tests {
let col_h = &col("h", &test_schema)?;

// Add column h as constant
eq_properties =
eq_properties.add_constants(vec![ConstExpr::new(Arc::clone(col_h))]);
eq_properties = eq_properties.add_constants(vec![ConstExpr::from(col_h)]);

let test_cases = vec![
// TEST CASE 1
Expand Down Expand Up @@ -2458,7 +2456,7 @@ mod tests {
for case in cases {
let mut properties = base_properties
.clone()
.add_constants(case.constants.into_iter().map(ConstExpr::new));
.add_constants(case.constants.into_iter().map(ConstExpr::from));
for [left, right] in &case.equal_conditions {
properties.add_equal_conditions(left, right)?
}
Expand Down
6 changes: 2 additions & 4 deletions datafusion/physical-plan/src/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,13 +173,11 @@ impl FilterExec {
// Filter evaluates to single value for all partitions
if input_eqs.is_expr_constant(binary.left()) {
res_constants.push(
ConstExpr::new(binary.right().clone())
.with_across_partitions(true),
ConstExpr::from(binary.right()).with_across_partitions(true),
)
} else if input_eqs.is_expr_constant(binary.right()) {
res_constants.push(
ConstExpr::new(binary.left().clone())
.with_across_partitions(true),
ConstExpr::from(binary.left()).with_across_partitions(true),
)
}
}
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-plan/src/union.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ fn calculate_union_eq_properties(
// TODO: Check whether constant expressions evaluates the same value or not for each partition
let across_partitions = false;
return Some(
ConstExpr::new(meet_constant.owned_expr())
ConstExpr::from(meet_constant.owned_expr())
.with_across_partitions(across_partitions),
);
}
Expand Down
4 changes: 1 addition & 3 deletions datafusion/physical-plan/src/windows/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -598,9 +598,7 @@ pub fn get_window_mode(
options: None,
}));
// Treat partition by exprs as constant. During analysis of requirements are satisfied.
let const_exprs = partitionby_exprs
.iter()
.map(|expr| ConstExpr::new(expr.clone()));
let const_exprs = partitionby_exprs.iter().map(ConstExpr::from);
let partition_by_eqs = input_eqs.add_constants(const_exprs);
let order_by_reqs = PhysicalSortRequirement::from_sort_exprs(orderby_keys);
let reverse_order_by_reqs =
Expand Down

0 comments on commit ad8d1eb

Please sign in to comment.