Fix functions with Volatility::Volatile and parameters #13001

Merged: 1 commit, Oct 22, 2024


@agscpp commented Oct 18, 2024

Which issue does this PR close?

Closes #13000.

Rationale for this change

These changes make it possible to implement functions that produce a different result on each call, even when given the same input.

@github-actions bot added the logical-expr (Logical plan and expressions), physical-expr (Physical Expressions), and core (Core DataFusion crate) labels Oct 18, 2024
@agscpp force-pushed the fix_udf_function branch 2 times, most recently from ff4b90c to 4c8ebc4, October 18, 2024 14:22
Contributor

@alamb left a comment

Thank you @agscpp -- this makes a lot of sense and I think is quite close. I left some API comments for your consideration. Let me know!

Comment on lines 492 to 495
not_impl_err!(
"Function {} does not implement invoke_batch but called",
self.name()
)
Contributor

🤔 It seems like the ideal outcome would be for all ScalarUDFs to implement this method (as it covers invoke, invoke_no_args as well).

Would you be open to changing this so it uses a default implementation like this?

Suggested change
-not_impl_err!(
-    "Function {} does not implement invoke_batch but called",
-    self.name()
-)
+if args.is_empty() {
+    self.invoke_no_args(number_rows)
+} else {
+    self.invoke(args)
+}

Then the function implementation could decide what to do with that information
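
The default suggested above can be sketched outside DataFusion roughly as follows. This is a minimal model, not the real API: the `Value` enum and the `SimpleUdf` trait are hypothetical stand-ins for `ColumnarValue` and `ScalarUDFImpl`, and only the three method names come from this discussion. `Upper` relies on the default, while `RowNumberPrefix` overrides `invoke_batch` to make use of the row count.

```rust
// Hypothetical stand-in for DataFusion's ColumnarValue (an assumption, not the real type).
#[derive(Debug, Clone, PartialEq)]
enum Value {
    Scalar(String),
    Array(Vec<String>),
}

// Simplified stand-in for ScalarUDFImpl; only the method names come from the PR.
trait SimpleUdf {
    fn invoke(&self, _args: &[Value]) -> Result<Value, String> {
        Err("invoke is not implemented".to_string())
    }

    fn invoke_no_args(&self, _number_rows: usize) -> Result<Value, String> {
        Err("invoke_no_args is not implemented".to_string())
    }

    // Default: delegate to the older entry points so existing functions keep working.
    fn invoke_batch(&self, args: &[Value], number_rows: usize) -> Result<Value, String> {
        if args.is_empty() {
            self.invoke_no_args(number_rows)
        } else {
            self.invoke(args)
        }
    }
}

// An existing function that only implements `invoke`.
struct Upper;

impl SimpleUdf for Upper {
    fn invoke(&self, args: &[Value]) -> Result<Value, String> {
        match &args[0] {
            Value::Scalar(s) => Ok(Value::Scalar(s.to_uppercase())),
            Value::Array(v) => Ok(Value::Array(v.iter().map(|s| s.to_uppercase()).collect())),
        }
    }
}

// A function that overrides `invoke_batch` and uses the row count.
struct RowNumberPrefix;

impl SimpleUdf for RowNumberPrefix {
    fn invoke_batch(&self, args: &[Value], number_rows: usize) -> Result<Value, String> {
        let text = match args.first() {
            Some(Value::Scalar(s)) => s.clone(),
            _ => return Err("expected one scalar argument".to_string()),
        };
        Ok(Value::Array(
            (1..=number_rows).map(|i| format!("{i}. {text}")).collect(),
        ))
    }
}

fn main() {
    let arg = [Value::Scalar("test".to_string())];
    println!("{:?}", Upper.invoke_batch(&arg, 3));
    println!("{:?}", RowNumberPrefix.invoke_batch(&arg, 3));
}
```

With this shape, functions written against `invoke` or `invoke_no_args` need no changes, and only functions that care about the number of rows have to override `invoke_batch`.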

Contributor Author

done

"+-----------+", //
"| str |", //
"+-----------+", //
"| 1. test_1 |", //
Contributor

what is the meaning of the trailing // ?

Also, the repeated indexes (several rows with 1) seem to imply that invoke is run multiple times. Perhaps we could set target_partitions to 1 on the SessionContext so the data isn't repartitioned?

Contributor Author

I don't know what the // are for either, but I kept them because the tests above use this style.

Contributor Author

@agscpp Oct 21, 2024

The invoke_batch method is called only once. Inside the function itself, I number the rows separately for each distinct row content, so the index restarts for every distinct value:

1. test_1 
2. test_1 
3. test_1 
4. test_1 
1. test_2 
2. test_2 
3. test_2 
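
One way to read the comment above is that the test function keeps a separate counter per distinct input string. The sketch below models that reading with plain standard-library types; `add_index_per_value` is a hypothetical helper and is not the code from the PR.

```rust
use std::collections::HashMap;

// Hypothetical helper: prefixes each value with a counter that is kept
// separately for every distinct input string.
fn add_index_per_value(rows: &[&str]) -> Vec<String> {
    let mut counters: HashMap<&str, usize> = HashMap::new();
    rows.iter()
        .map(|row| {
            // Look up (or create) the counter for this particular content.
            let counter = counters.entry(*row).or_insert(0);
            *counter += 1;
            format!("{}. {}", counter, row)
        })
        .collect()
}

fn main() {
    let rows = ["test_1", "test_1", "test_2", "test_1", "test_2"];
    for line in add_index_per_value(&rows) {
        println!("{line}");
    }
}
```

Under this reading, an index of 1 appearing more than once in the output is expected even when the function is invoked a single time for the whole batch.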

@@ -143,7 +143,10 @@ impl PhysicalExpr for ScalarFunctionExpr {
     // evaluate the function
     let output = match self.args.is_empty() {
         true => self.fun.invoke_no_args(batch.num_rows()),
-        false => self.fun.invoke(&inputs),
+        false => match self.fun.signature().volatility {
Contributor

If you modified invoke_batch as above, we could change this code to simply call self.fun.invoke_batch() always
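
The call-site simplification can be illustrated with a small model. The `SimpleUdf` trait, the `PlusOne` function, and the two `evaluate_*` helpers below are hypothetical, and a single `i64` column stands in for the real argument types; the point is only that a default `invoke_batch` lets the caller drop its own branching.

```rust
// Simplified stand-in for a scalar UDF; the method names mirror the PR,
// the types are hypothetical.
trait SimpleUdf {
    fn invoke(&self, args: &[i64]) -> Vec<i64>;
    fn invoke_no_args(&self, number_rows: usize) -> Vec<i64>;

    // Default delegation, as proposed in the review.
    fn invoke_batch(&self, args: &[i64], number_rows: usize) -> Vec<i64> {
        if args.is_empty() {
            self.invoke_no_args(number_rows)
        } else {
            self.invoke(args)
        }
    }
}

struct PlusOne;

impl SimpleUdf for PlusOne {
    fn invoke(&self, args: &[i64]) -> Vec<i64> {
        args.iter().map(|v| v + 1).collect()
    }

    fn invoke_no_args(&self, number_rows: usize) -> Vec<i64> {
        vec![1; number_rows]
    }
}

// Call site before the change: the caller picks the entry point.
fn evaluate_old(fun: &dyn SimpleUdf, inputs: &[i64], num_rows: usize) -> Vec<i64> {
    match inputs.is_empty() {
        true => fun.invoke_no_args(num_rows),
        false => fun.invoke(inputs),
    }
}

// Call site after the change: a single entry point.
fn evaluate_new(fun: &dyn SimpleUdf, inputs: &[i64], num_rows: usize) -> Vec<i64> {
    fun.invoke_batch(inputs, num_rows)
}

fn main() {
    println!("{:?}", evaluate_old(&PlusOne, &[1, 2], 2));
    println!("{:?}", evaluate_new(&PlusOne, &[1, 2], 2));
    println!("{:?}", evaluate_new(&PlusOne, &[], 3));
}
```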

@agscpp agscpp requested a review from alamb October 21, 2024 14:11
Contributor

@alamb left a comment

Thank you @agscpp -- this looks great to me

@@ -467,7 +478,28 @@ pub trait ScalarUDFImpl: Debug + Send + Sync {
     /// to arrays, which will likely be simpler code, but be slower.
     ///
     /// [invoke_no_args]: ScalarUDFImpl::invoke_no_args
-    fn invoke(&self, _args: &[ColumnarValue]) -> Result<ColumnarValue>;
+    fn invoke(&self, _args: &[ColumnarValue]) -> Result<ColumnarValue> {
Contributor

Nice! -- as a follow on PR I think we should deprecate the other two functions (invoke_no_args and invoke) telling people to use invoke instead

Is this ok with you @jayzhan211 ?

Member

as a follow on PR I think we should deprecate the other two functions (invoke_no_args and invoke) telling people to use invoke instead

did you mean invoke_batch?

yes, it would be great to have only one invoke entry-point
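
As a rough illustration of what such a follow-up could look like, Rust's built-in `#[deprecated]` attribute can mark the older entry point while the default `invoke_batch` keeps delegating to it. The trait, the `i64` argument type, and the `Sum` and `Legacy` functions below are hypothetical; this is not the change that was eventually made in DataFusion.

```rust
// Hypothetical trait; only the method names come from the discussion.
trait SimpleUdf {
    #[deprecated(note = "implement and call invoke_batch instead")]
    fn invoke(&self, _args: &[i64]) -> i64 {
        unimplemented!("invoke is deprecated")
    }

    // The single entry point; by default it still falls back to `invoke`.
    #[allow(deprecated)]
    fn invoke_batch(&self, args: &[i64], _number_rows: usize) -> i64 {
        self.invoke(args)
    }
}

// New-style function: implements only `invoke_batch`.
struct Sum;

impl SimpleUdf for Sum {
    fn invoke_batch(&self, args: &[i64], _number_rows: usize) -> i64 {
        args.iter().sum()
    }
}

// Old-style function: still implements the deprecated method.
struct Legacy;

#[allow(deprecated)]
impl SimpleUdf for Legacy {
    fn invoke(&self, args: &[i64]) -> i64 {
        args.len() as i64
    }
}

fn main() {
    println!("{}", Sum.invoke_batch(&[1, 2, 3], 3));
    println!("{}", Legacy.invoke_batch(&[1, 2, 3], 3));
}
```

Callers that go through `invoke_batch` see no warning, while any remaining direct call to `invoke` is flagged by the compiler with the note text.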

assert_batches_eq!(expected, &result);

let result =
plan_and_collect(&ctx, "select add_index_to_string('test') AS str from t") // with fixed function parameters
Contributor

nice

Comment on lines 495 to 496
_args: &[ColumnarValue],
_number_rows: usize,
Member

These are not unused, let's remove leading _ from arg names

Comment on lines 491 to 492
/// The function should be used for signatures with [`datafusion_expr_common::signature::Volatility::Volatile`]
/// and with arguments.
Member

Only for these?

if yes => the function should be named appropriately (invoke_volatile)
if no => remove the comment

Contributor Author

I removed this comment. This function is always called in the current implementation.

Comment on lines +643 to +633
}
{
Member

Nit: consider splitting the test cases into separate test functions; this would give them descriptive names

Contributor Author

These tests are very similar, so they don't break down into two parts well.

Comment on lines 488 to 489
/// Volatile UDF that should be append a different value to each row
struct AddIndexToStringScalarUDF {
Member

The volatility is important, let's reflect it in the function name -- it's this function's main purpose, not an attribute it happens to have.

AddIndexToStringVolatileScalarUDF

Member

thank you!

@@ -483,6 +485,196 @@ async fn test_user_defined_functions_with_alias() -> Result<()> {
Ok(())
}

/// Volatile UDF that should be append a different value to each row
Member

Suggested change
-/// Volatile UDF that should be append a different value to each row
+/// Volatile UDF that should append a different value to each row
 #[derive(Debug)]

Comment on lines 505 to 514
impl std::fmt::Debug for AddIndexToStringScalarUDF {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
f.debug_struct("ScalarUDF")
.field("name", &self.name)
.field("signature", &self.signature)
.field("fun", &"<FUNC>")
.finish()
}
}

Member

Leverage #[derive(Debug)], to ensure all fields are part of debug.

Suggested change
-impl std::fmt::Debug for AddIndexToStringScalarUDF {
-    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
-        f.debug_struct("ScalarUDF")
-            .field("name", &self.name)
-            .field("signature", &self.signature)
-            .field("fun", &"<FUNC>")
-            .finish()
-    }
-}
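
For comparison, a derived `Debug` implementation prints the struct's real name and every field without any hand-written code. The struct below is a hypothetical stand-in for the test UDF: its field names and the `String` field types are placeholders, not the fields used in the PR.

```rust
// Hypothetical struct shaped like the test UDF; the fields are placeholders.
#[derive(Debug)]
struct AddIndexToStringVolatileScalarUDF {
    name: String,
    signature: String,
    return_type: String,
}

// Formats an example value with the derived Debug implementation.
fn describe() -> String {
    let udf = AddIndexToStringVolatileScalarUDF {
        name: "add_index_to_string".to_string(),
        signature: "exact(Utf8), volatile".to_string(),
        return_type: "Utf8".to_string(),
    };
    format!("{udf:?}")
}

fn main() {
    println!("{}", describe());
}
```

A field added to the struct later shows up in the debug output automatically, which a manual `debug_struct` chain does not guarantee.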

_ => unimplemented!(),
};
Ok(ColumnarValue::Array(
Arc::new(StringArray::from(answer)) as ArrayRef
Member

Suggested change
-Arc::new(StringArray::from(answer)) as ArrayRef
+Arc::new(StringArray::from(answer))

}
_ => unimplemented!(),
};
Ok(ColumnarValue::Array(
Member

Is it OK for this function to return an array even when it's invoked with only ColumnarValue::Scalar arguments?

Contributor Author

@agscpp Oct 22, 2024

Yes, that's fine. If you return a ColumnarValue::Scalar, all rows will have the same result.
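
The difference between the two return shapes can be modeled with a small sketch. The `Value` enum and `into_rows` helper are hypothetical simplifications of `ColumnarValue` and of how a result is expanded to one value per row; they are assumptions for illustration, not DataFusion code.

```rust
// Hypothetical stand-in for ColumnarValue with string data only.
#[derive(Debug, Clone, PartialEq)]
enum Value {
    Scalar(String),
    Array(Vec<String>),
}

// Expands a function result to one value per row: a scalar is repeated
// for every row, an array is taken as-is.
fn into_rows(value: Value, num_rows: usize) -> Vec<String> {
    match value {
        Value::Scalar(s) => vec![s; num_rows],
        Value::Array(values) => values,
    }
}

fn main() {
    // A scalar result gives every row the same value.
    println!("{:?}", into_rows(Value::Scalar("1. test".to_string()), 3));

    // An array result lets a volatile function give each row its own value.
    let per_row = Value::Array((1..=3).map(|i| format!("{i}. test")).collect());
    println!("{:?}", into_rows(per_row, 3));
}
```

This is why a volatile function that must differ per row returns an array even when all of its arguments are scalars.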

Member

I was thinking about #12922

@agscpp agscpp requested a review from findepi October 22, 2024 11:28
@alamb alamb merged commit ef1365a into apache:main Oct 22, 2024
24 checks passed
Contributor

@alamb commented Oct 22, 2024

Thank you @agscpp and @findepi 🚀

agscpp added a commit to agscpp/datafusion that referenced this pull request Oct 22, 2024
agscpp added a commit to tarantool/datafusion that referenced this pull request Oct 29, 2024
agscpp added a commit to tarantool/datafusion that referenced this pull request Oct 29, 2024
agscpp added a commit to tarantool/datafusion that referenced this pull request Oct 29, 2024
askalt pushed a commit to tarantool/datafusion that referenced this pull request Nov 21, 2024
0x501D pushed a commit to tarantool/datafusion that referenced this pull request Nov 21, 2024
Labels
core Core DataFusion crate logical-expr Logical plan and expressions physical-expr Physical Expressions
Development

Successfully merging this pull request may close these issues.

Volatility::Volatile does not work for functions with arguments
3 participants