Skip to content

Commit

Permalink
Revert "chore: remove panics in datafusion-common::scalar (apache#7901)"
Browse files Browse the repository at this point in the history
This reverts commit e642cc2.
  • Loading branch information
alamb committed Nov 12, 2023
1 parent 6fe00ce commit f3fca2b
Show file tree
Hide file tree
Showing 54 changed files with 427 additions and 723 deletions.
2 changes: 1 addition & 1 deletion datafusion/common/src/pyarrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ impl FromPyArrow for ScalarValue {

impl ToPyArrow for ScalarValue {
fn to_pyarrow(&self, py: Python) -> PyResult<PyObject> {
let array = self.to_array()?;
let array = self.to_array();
// convert to pyarrow array using C data interface
let pyarray = array.to_data().to_pyarrow(py)?;
let pyscalar = pyarray.call_method1(py, "__getitem__", (0,))?;
Expand Down
598 changes: 247 additions & 351 deletions datafusion/common/src/scalar.rs

Large diffs are not rendered by default.

10 changes: 1 addition & 9 deletions datafusion/core/benches/scalar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,7 @@ fn criterion_benchmark(c: &mut Criterion) {
c.bench_function("to_array_of_size 100000", |b| {
let scalar = ScalarValue::Int32(Some(100));

b.iter(|| {
assert_eq!(
scalar
.to_array_of_size(100000)
.expect("Failed to convert to array of size")
.null_count(),
0
)
})
b.iter(|| assert_eq!(scalar.to_array_of_size(100000).null_count(), 0))
});
}

Expand Down
5 changes: 1 addition & 4 deletions datafusion/core/src/datasource/listing/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,10 +276,7 @@ async fn prune_partitions(
// Applies `filter` to `batch` returning `None` on error
let do_filter = |filter| -> Option<ArrayRef> {
let expr = create_physical_expr(filter, &df_schema, &schema, &props).ok()?;
expr.evaluate(&batch)
.ok()?
.into_array(partitions.len())
.ok()
Some(expr.evaluate(&batch).ok()?.into_array(partitions.len()))
};

// Compute the conjunction of the filters, ignoring errors
Expand Down
12 changes: 6 additions & 6 deletions datafusion/core/src/datasource/physical_plan/file_scan_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ impl PartitionColumnProjector {
&mut self.key_buffer_cache,
partition_value.as_ref(),
file_batch.num_rows(),
)?,
),
)
}

Expand Down Expand Up @@ -396,11 +396,11 @@ fn create_dict_array<T>(
dict_val: &ScalarValue,
len: usize,
data_type: DataType,
) -> Result<ArrayRef>
) -> ArrayRef
where
T: ArrowNativeType,
{
let dict_vals = dict_val.to_array()?;
let dict_vals = dict_val.to_array();

let sliced_key_buffer = buffer_gen.get_buffer(len);

Expand All @@ -409,16 +409,16 @@ where
.len(len)
.add_buffer(sliced_key_buffer);
builder = builder.add_child_data(dict_vals.to_data());
Ok(Arc::new(DictionaryArray::<UInt16Type>::from(
Arc::new(DictionaryArray::<UInt16Type>::from(
builder.build().unwrap(),
)))
))
}

fn create_output_array(
key_buffer_cache: &mut ZeroBufferGenerators,
val: &ScalarValue,
len: usize,
) -> Result<ArrayRef> {
) -> ArrayRef {
if let ScalarValue::Dictionary(key_type, dict_val) = &val {
match key_type.as_ref() {
DataType::Int8 => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ impl ArrowPredicate for DatafusionArrowPredicate {
match self
.physical_expr
.evaluate(&batch)
.and_then(|v| v.into_array(batch.num_rows()))
.map(|v| v.into_array(batch.num_rows()))
{
Ok(array) => {
let bool_arr = as_boolean_array(&array)?.clone();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@ macro_rules! get_min_max_values {
.flatten()
// column either didn't have statistics at all or didn't have min/max values
.or_else(|| Some(null_scalar.clone()))
.and_then(|s| s.to_array().ok())
.map(|s| s.to_array())
}}
}

Expand All @@ -425,7 +425,7 @@ macro_rules! get_null_count_values {
},
);

value.to_array().ok()
Some(value.to_array())
}};
}

Expand Down
14 changes: 5 additions & 9 deletions datafusion/expr/src/columnar_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
use arrow::array::ArrayRef;
use arrow::array::NullArray;
use arrow::datatypes::DataType;
use datafusion_common::{Result, ScalarValue};
use datafusion_common::ScalarValue;
use std::sync::Arc;

/// Represents the result of evaluating an expression: either a single
Expand All @@ -47,15 +47,11 @@ impl ColumnarValue {

/// Convert a columnar value into an ArrayRef. [`Self::Scalar`] is
/// converted by repeating the same scalar multiple times.
///
/// # Errors
///
/// Errors if `self` is a Scalar that fails to be converted into an array of size
pub fn into_array(self, num_rows: usize) -> Result<ArrayRef> {
Ok(match self {
pub fn into_array(self, num_rows: usize) -> ArrayRef {
match self {
ColumnarValue::Array(array) => array,
ColumnarValue::Scalar(scalar) => scalar.to_array_of_size(num_rows)?,
})
ColumnarValue::Scalar(scalar) => scalar.to_array_of_size(num_rows),
}
}

/// null columnar values are implemented as a null array in order to pass batch
Expand Down
2 changes: 1 addition & 1 deletion datafusion/expr/src/window_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ impl WindowAggState {
}

pub fn new(out_type: &DataType) -> Result<Self> {
let empty_out_col = ScalarValue::try_from(out_type)?.to_array_of_size(0)?;
let empty_out_col = ScalarValue::try_from(out_type)?.to_array_of_size(0);
Ok(Self {
window_frame_range: Range { start: 0, end: 0 },
window_frame_ctx: None,
Expand Down
8 changes: 2 additions & 6 deletions datafusion/optimizer/src/unwrap_cast_in_comparison.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1089,12 +1089,8 @@ mod tests {
// Verify that calling the arrow
// cast kernel yields the same results
// input array
let literal_array = literal
.to_array_of_size(1)
.expect("Failed to convert to array of size");
let expected_array = expected_value
.to_array_of_size(1)
.expect("Failed to convert to array of size");
let literal_array = literal.to_array_of_size(1);
let expected_array = expected_value.to_array_of_size(1);
let cast_array = cast_with_options(
&literal_array,
&target_type,
Expand Down
12 changes: 4 additions & 8 deletions datafusion/physical-expr/src/aggregate/correlation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -505,17 +505,13 @@ mod tests {

let values1 = expr1
.iter()
.map(|e| {
e.evaluate(batch1)
.and_then(|v| v.into_array(batch1.num_rows()))
})
.map(|e| e.evaluate(batch1))
.map(|r| r.map(|v| v.into_array(batch1.num_rows())))
.collect::<Result<Vec<_>>>()?;
let values2 = expr2
.iter()
.map(|e| {
e.evaluate(batch2)
.and_then(|v| v.into_array(batch2.num_rows()))
})
.map(|e| e.evaluate(batch2))
.map(|r| r.map(|v| v.into_array(batch2.num_rows())))
.collect::<Result<Vec<_>>>()?;
accum1.update_batch(&values1)?;
accum2.update_batch(&values2)?;
Expand Down
12 changes: 4 additions & 8 deletions datafusion/physical-expr/src/aggregate/covariance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -754,17 +754,13 @@ mod tests {

let values1 = expr1
.iter()
.map(|e| {
e.evaluate(batch1)
.and_then(|v| v.into_array(batch1.num_rows()))
})
.map(|e| e.evaluate(batch1))
.map(|r| r.map(|v| v.into_array(batch1.num_rows())))
.collect::<Result<Vec<_>>>()?;
let values2 = expr2
.iter()
.map(|e| {
e.evaluate(batch2)
.and_then(|v| v.into_array(batch2.num_rows()))
})
.map(|e| e.evaluate(batch2))
.map(|r| r.map(|v| v.into_array(batch2.num_rows())))
.collect::<Result<Vec<_>>>()?;
accum1.update_batch(&values1)?;
accum2.update_batch(&values2)?;
Expand Down
10 changes: 2 additions & 8 deletions datafusion/physical-expr/src/aggregate/first_last.rs
Original file line number Diff line number Diff line change
Expand Up @@ -587,10 +587,7 @@ mod tests {
let mut states = vec![];

for idx in 0..state1.len() {
states.push(concat(&[
&state1[idx].to_array()?,
&state2[idx].to_array()?,
])?);
states.push(concat(&[&state1[idx].to_array(), &state2[idx].to_array()])?);
}

let mut first_accumulator =
Expand All @@ -617,10 +614,7 @@ mod tests {
let mut states = vec![];

for idx in 0..state1.len() {
states.push(concat(&[
&state1[idx].to_array()?,
&state2[idx].to_array()?,
])?);
states.push(concat(&[&state1[idx].to_array(), &state2[idx].to_array()])?);
}

let mut last_accumulator =
Expand Down
12 changes: 4 additions & 8 deletions datafusion/physical-expr/src/aggregate/stddev.rs
Original file line number Diff line number Diff line change
Expand Up @@ -445,17 +445,13 @@ mod tests {

let values1 = expr1
.iter()
.map(|e| {
e.evaluate(batch1)
.and_then(|v| v.into_array(batch1.num_rows()))
})
.map(|e| e.evaluate(batch1))
.map(|r| r.map(|v| v.into_array(batch1.num_rows())))
.collect::<Result<Vec<_>>>()?;
let values2 = expr2
.iter()
.map(|e| {
e.evaluate(batch2)
.and_then(|v| v.into_array(batch2.num_rows()))
})
.map(|e| e.evaluate(batch2))
.map(|r| r.map(|v| v.into_array(batch2.num_rows())))
.collect::<Result<Vec<_>>>()?;
accum1.update_batch(&values1)?;
accum2.update_batch(&values2)?;
Expand Down
4 changes: 2 additions & 2 deletions datafusion/physical-expr/src/aggregate/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,11 @@ use std::sync::Arc;
pub fn get_accum_scalar_values_as_arrays(
accum: &dyn Accumulator,
) -> Result<Vec<ArrayRef>> {
accum
Ok(accum
.state()?
.iter()
.map(|s| s.to_array_of_size(1))
.collect::<Result<Vec<_>>>()
.collect::<Vec<_>>())
}

/// Computes averages for `Decimal128`/`Decimal256` values, checking for overflow
Expand Down
12 changes: 4 additions & 8 deletions datafusion/physical-expr/src/aggregate/variance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -519,17 +519,13 @@ mod tests {

let values1 = expr1
.iter()
.map(|e| {
e.evaluate(batch1)
.and_then(|v| v.into_array(batch1.num_rows()))
})
.map(|e| e.evaluate(batch1))
.map(|r| r.map(|v| v.into_array(batch1.num_rows())))
.collect::<Result<Vec<_>>>()?;
let values2 = expr2
.iter()
.map(|e| {
e.evaluate(batch2)
.and_then(|v| v.into_array(batch2.num_rows()))
})
.map(|e| e.evaluate(batch2))
.map(|r| r.map(|v| v.into_array(batch2.num_rows())))
.collect::<Result<Vec<_>>>()?;
accum1.update_batch(&values1)?;
accum2.update_batch(&values2)?;
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-expr/src/conditional_expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ pub fn coalesce(args: &[ColumnarValue]) -> Result<ColumnarValue> {
if value.is_null() {
continue;
} else {
let last_value = value.to_array_of_size(size)?;
let last_value = value.to_array_of_size(size);
current_value =
zip(&remainder, &last_value, current_value.as_ref())?;
break;
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-expr/src/datetime_expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -852,7 +852,7 @@ pub fn date_part(args: &[ColumnarValue]) -> Result<ColumnarValue> {

let array = match array {
ColumnarValue::Array(array) => array.clone(),
ColumnarValue::Scalar(scalar) => scalar.to_array()?,
ColumnarValue::Scalar(scalar) => scalar.to_array(),
};

let arr = match date_part.to_lowercase().as_str() {
Expand Down
Loading

0 comments on commit f3fca2b

Please sign in to comment.