fix: Support Substrait's compound names also for window functions #11163

Merged
merged 6 commits into from
Jul 1, 2024

Blizzara
Contributor

Which issue does this PR close?

#10653 fixed the name mismatch (substrait's sum:int32 vs DF's sum) for scalar functions, and #10842 fixed it for aggregates. This fixes it for windows as well.

Closes #.

Rationale for this change

What changes are included in this PR?

The Substrait standard defines function names as compound signatures that include the argument types the function takes (for example, sum:int32). DataFusion doesn't yet produce names in that form, but we should still be able to consume them. That has already been fixed for scalar and aggregate functions; this PR fixes it for window functions as well.
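
To illustrate the name handling described above, here is a minimal sketch of dropping the argument-type signature from a compound Substrait name. The helper name `strip_signature` is hypothetical and chosen only for illustration; the PR calls a helper named `substrait_fun_name`, whose body is not shown in this conversation and may differ.

```rust
/// Hypothetical helper (not taken from the PR): returns the bare function
/// name from a compound name such as "sum:int32".
/// Names that contain no ':' are returned unchanged.
fn strip_signature(name: &str) -> &str {
    match name.split_once(':') {
        // Keep everything before the first ':' and drop the signature part.
        Some((base, _signature)) => base,
        None => name,
    }
}
```

With this sketch, both `strip_signature("sum:int32")` and `strip_signature("sum")` evaluate to `"sum"`, so a plain-name lookup can serve either form.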

Also, I took the liberty to refactor/harmonize/clean up the function handling across scalar/aggregate/window. There should be no functional changes, but hopefully the code is a bit easier to read now.

Are these changes tested?

Tested with existing unit tests

Are there any user-facing changes?

Fixes a typo in one public function, though I don't expect anyone to be using that outside of consumer.rs. from_substriat_func_args -> from_substrait_func_args

@@ -128,28 +122,6 @@ pub fn name_to_op(name: &str) -> Result<Operator> {
}
}

fn scalar_function_type_from_str(
Contributor Author

Going from name -> ScalarFunctionType -> actual function felt like unnecessary indirection; we can just skip the ScalarFunctionType step.

Contributor

I agree this is cleaner -- I think the old code is left over from the time when we had BuiltInScalarFunction

@@ -972,7 +944,7 @@ pub async fn from_substrait_rex_vec(
}

/// Convert Substrait FunctionArguments to DataFusion Exprs
-pub async fn from_substriat_func_args(
+pub async fn from_substrait_func_args(
Contributor Author

Fixed the spelling. This would break consumers if someone is using this function, though I don't know why anyone would be. Still, it's not a necessary change, so I'm happy to revert if that'd be better.

Contributor

I think it is a nice improvement

args.push(arg_expr?.as_ref().clone());
}
let args =
from_substrait_func_args(ctx, &f.arguments, input_schema, extensions).await?;
Contributor Author

the code of from_substrait_func_args was duplicated inline here and for scalar functions

vec![Expr::Literal(ScalarValue::Int64(Some(1)))]
} else {
args
};
Contributor Author

this way args doesn't need to be mutable
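
A small sketch of the pattern mentioned in this comment: rebinding `args` to the result of an `if` expression, so no `mut` binding is needed. The function name, the `i64` element type, and the "empty list" condition are assumptions for illustration; the real code works on DataFusion `Expr` values and its condition is not visible in the hunk above.

```rust
/// Illustrative only: returns `args`, or a single default value of 1
/// when the list is empty (the condition is an assumption).
fn args_or_default(args: Vec<i64>) -> Vec<i64> {
    // Shadowing `args` with the value of the `if` expression avoids
    // declaring it as `let mut args` and pushing into it later.
    let args = if args.is_empty() { vec![1] } else { args };
    // `args` is immutable from here on.
    args
}
```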

})?;

// Convert function arguments from Substrait to DataFusion
async fn decode_arguments(
Contributor Author

this was the same as from_substrait_func_args

"Aggregated function not found: function reference = {:?}",
let Some(fn_name) = extensions.get(&f.function_reference) else {
return plan_err!(
"Scalar function not found: function reference = {:?}",
Contributor Author

just aligning this check for all three function types

}
);
};
let fn_name = substrait_fun_name(fn_name);
Contributor Author

this and below is coming from scalar_function_type_from_str

Ok(Arc::new(Expr::BinaryExpr(BinaryExpr {
left: Box::new(args[0].to_owned()),
op,
right: Box::new(args[1].to_owned()),
Contributor Author

There is one small change here: previously we checked the types of these args more explicitly, and now that check only happens in from_substrait_func_args. I think that should be fine.

};
let fn_name = substrait_fun_name(fn_name);
Contributor Author

this was the line I originally wanted to add to fix the issue

@@ -1275,8 +1198,8 @@ pub async fn from_substrait_rex(
WindowFrameUnits::Range
};
Ok(Arc::new(Expr::WindowFunction(expr::WindowFunction {
fun: fun?.unwrap(),
Contributor Author

this made it hard to debug, since we'd panic at the unwrap without any context on why
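
As a rough sketch of the "error out early with context" approach, mirroring the `let Some(..) else` check shown in the hunks above: the function name `lookup_function_name`, the `HashMap<u32, String>` extension table, the `String` error type, and the message text are assumptions for illustration, not the consumer's actual types.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for the extension lookup: maps a function
/// reference (anchor) to the registered function name.
fn lookup_function_name(
    extensions: &HashMap<u32, String>,
    function_reference: u32,
) -> Result<&str, String> {
    // `let ... else` returns early with a descriptive error instead of
    // panicking later at an `unwrap()` with no context.
    let Some(name) = extensions.get(&function_reference) else {
        return Err(format!(
            "Window function not found: function reference = {function_reference:?}"
        ));
    };
    Ok(name.as_str())
}
```

A caller that receives the `Err` variant sees which function reference was missing, which is the context a bare `unwrap()` would not provide.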

@Blizzara Blizzara marked this pull request as ready for review June 28, 2024 14:56
Contributor

@alamb alamb left a comment

Thanks @Blizzara -- I think this code is a nice cleanup

The only thing I didn't see was any test coverage for the new code -- without tests it is likely that we would break this functionality in some future refactor.

Could you please add some test coverage (or perhaps correct me if I missed it in this PR)?

Thanks again

@Blizzara
Contributor Author

Blizzara commented Jul 1, 2024

> Could you please add some test coverage (or perhaps correct me if I missed it in this PR)

That was indeed missing; I had planned to add it but forgot 😅 Added in a08e62b. That also surfaced another issue in consuming the bounds type, which I fixed as well.

// If the plan does not specify the bounds type, then we use a simple logic to determine the units
// If there is no `ORDER BY`, then by default, the frame counts each row from the lower up to upper boundary
// If there is `ORDER BY`, then by default, each frame is a range starting from unbounded preceding to current row
if order_by.is_empty() {
Contributor Author

I dunno how well this logic works in reality, but I guess there is some logic to it - sorting is less necessary for a RANGE bound while a ROWS bound without sort is quite meaningless. Anyways we can easily keep it around for the unspecified case for backwards compatibility.

Contributor

It might be better to simply return an error here. It looks to me like the substrait spec doesn't explicitly allow for unspecified -- I think the fact this field may not be set is because of how protobuf encodes the fields.

https://github.com/substrait-io/substrait/blob/7dbbf0468083d932a61b9c720700bd6083558fa9/proto/substrait/algebra.proto#L1038-L1039

Contributor Author

Hm, UNSPECIFIED is a proper option in the Substrait spec, see https://github.com/substrait-io/substrait/blob/7dbbf0468083d932a61b9c720700bd6083558fa9/proto/substrait/algebra.proto#L1059. I don't have a strong opinion here (I don't think the plans we produce would ever have UNSPECIFIED), so I'm happy to leave it like this or to make it return a not_impl_err. Do you have a preference?

Contributor

@alamb alamb Jul 1, 2024

I would personally suggest a not_impl_err to avoid silently ignored errors, but we can do it as a follow-on PR (or never)
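
The two options discussed in this thread can be sketched side by side. This is an illustration under stated assumptions: `BoundsType`, `FrameUnits`, and both function names are hypothetical stand-ins (not the Substrait or DataFusion types), and the strict variant returns a plain `String` error where the real code would presumably use `not_impl_err!`.

```rust
/// Hypothetical mirror of the bounds type carried by a plan.
#[derive(Debug, PartialEq)]
enum BoundsType {
    Unspecified,
    Rows,
    Range,
}

/// Hypothetical mirror of the resulting window frame units.
#[derive(Debug, PartialEq)]
enum FrameUnits {
    Rows,
    Range,
}

/// Option kept in the PR: fall back on the ORDER BY heuristic when the
/// plan does not specify a bounds type.
fn frame_units_with_fallback(bounds: &BoundsType, has_order_by: bool) -> FrameUnits {
    match bounds {
        BoundsType::Rows => FrameUnits::Rows,
        BoundsType::Range => FrameUnits::Range,
        // No ORDER BY: count rows. With ORDER BY: treat the frame as a range.
        BoundsType::Unspecified => {
            if has_order_by {
                FrameUnits::Range
            } else {
                FrameUnits::Rows
            }
        }
    }
}

/// Option suggested in review: reject an unspecified bounds type so the
/// gap is reported instead of being silently papered over.
fn frame_units_strict(bounds: &BoundsType) -> Result<FrameUnits, String> {
    match bounds {
        BoundsType::Rows => Ok(FrameUnits::Rows),
        BoundsType::Range => Ok(FrameUnits::Range),
        BoundsType::Unspecified => {
            Err("Unspecified window bounds type is not implemented".to_string())
        }
    }
}
```

The fallback keeps older plans working, while the strict variant trades that compatibility for an explicit error.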

Contributor

@alamb alamb left a comment

Looks good to me. Thanks @Blizzara

@@ -0,0 +1,153 @@
{
Contributor

👍

@alamb alamb merged commit e40c8a8 into apache:main Jul 1, 2024
23 checks passed
@alamb
Contributor

alamb commented Jul 1, 2024

Thanks @Blizzara

@Blizzara Blizzara deleted the avo/window-functions branch July 1, 2024 16:31
findepi pushed a commit to findepi/datafusion that referenced this pull request Jul 16, 2024
…ache#11163)

* simplify and deduplicate scalar/aggregate/window function handling

* simplify window function handling and error out faster if function is not found

* fix window function name

* simplify scalar function handling

* fix

* add a test and fix consuming bound types