Skip to content

Commit

Permalink
Properly return a scalar value from a UDF when only input is scalar
Browse files Browse the repository at this point in the history
Fixes #699
  • Loading branch information
mwylde committed Oct 10, 2024
1 parent 655d388 commit 2f05a4b
Showing 1 changed file with 11 additions and 2 deletions.
13 changes: 11 additions & 2 deletions crates/arroyo-udf/arroyo-udf-host/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
mod test;

use anyhow::{anyhow, bail};
use arrow::array::{make_array, Array, ArrayData, ArrayRef, UInt64Array};
use arrow::array::{make_array, Array, ArrayData, ArrayRef, Datum, UInt64Array};
use arrow::datatypes::DataType;
use arrow::ffi::from_ffi;
use arroyo_udf_common::async_udf::{DrainResult, SendableFfiAsyncUdfHandle};
Expand Down Expand Up @@ -389,6 +389,9 @@ impl ScalarUDFImpl for SyncUdfDylib {
})
.max()
.unwrap();

let all_scalars = args.iter()
.all(|c| matches!(c, ColumnarValue::Scalar(_)));

let args = args
.iter()
Expand All @@ -402,7 +405,13 @@ impl ScalarUDFImpl for SyncUdfDylib {
match result {
RunResult::Ok(FfiArraySchema(array, schema)) => {
let result_array = unsafe { from_ffi(array, &schema).unwrap() };
Ok(ColumnarValue::Array(make_array(result_array)))

let array = make_array(result_array);
if all_scalars {
Ok(ColumnarValue::Scalar(ScalarValue::try_from_array(&array, 0)?))
} else {
Ok(ColumnarValue::Array(array))
}
}
RunResult::Err => {
panic!("panic in UDF {}", self.name);
Expand Down

0 comments on commit 2f05a4b

Please sign in to comment.