feat!: new create_one ExpressionHandler API #662

Open · wants to merge 11 commits into base: main
326 changes: 325 additions & 1 deletion kernel/src/engine/arrow_expression.rs
@@ -526,6 +526,133 @@ impl ExpressionHandler for ArrowExpressionHandler {
output_type,
})
}

fn create_one(&self, schema: SchemaRef, values: &[Scalar]) -> DeltaResult<Box<dyn EngineData>> {
let mut array_transform = SingleRowArrayTransform::new(values);
let datatype = schema.into();
array_transform
.transform(&datatype) // FIXME
.unwrap(); // FIXME
let array = array_transform.finish()?;
let struct_array = array.as_any().downcast_ref::<StructArray>().unwrap(); // FIXME

let record_batch: RecordBatch = struct_array.into(); // FIXME will panic for top-level null
Ok(Box::new(ArrowEngineData::new(record_batch)))
}
}

/// [`SchemaTransform`] that transforms a [`Schema`] and an ordered list of leaf values (given as
/// a `Scalar` slice) into an Arrow array containing a single row built from those literals.
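///
/// For example (illustrative only): given the schema `{a: INTEGER, b: INTEGER}` and the scalars
/// `[1, 2]`, the transform visits each leaf in schema order, converts each scalar into a
/// one-element Arrow array, and assembles the children into the single-row struct array
/// `{a: 1, b: 2}`.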
struct SingleRowArrayTransform<'a> {
/// Leaf-node values to insert in schema order.
scalars: &'a [Scalar],
/// Index into `scalars` for the next leaf we encounter.
next_scalar_idx: usize,
/// A stack of built arrays. After visiting children, we pop them off to
/// build the parent container, then push the parent back on.
stack: Vec<ArrayRef>,
}

impl<'a> SingleRowArrayTransform<'a> {
fn new(scalars: &'a [Scalar]) -> Self {
Self {
scalars,
next_scalar_idx: 0,
stack: Vec::new(),
}
}

/// top of `stack` should be our single-row struct array
fn finish(mut self) -> DeltaResult<ArrayRef> {
match self.stack.pop() {
Some(array) => Ok(array),
None => Err(Error::generic("didn't build array")),
}
Collaborator:

Relating to the other FIXME about panicking:

Suggested change
match self.stack.pop() {
Some(array) => Ok(array),
None => Err(Error::generic("didn't build array")),
}
let Some(array) = self.stack.pop() else {
    return Err(Error::generic("didn't build array"));
};
let Some(array) = array.as_struct_opt() else {
    return Err(Error::generic("not a struct"));
};
Ok(array)

Collaborator Author:
I think as_struct_opt will return an &StructArray - and I want to avoid having to clone that

Collaborator Author:
just ended up checking array.data_type() though I wonder if it would be better to actually return an Arc<StructArray> instead of the trait object ArrayRef?
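
A rough sketch of the data_type() check described above (an assumed shape, not the exact code in this PR):

    fn finish(mut self) -> DeltaResult<ArrayRef> {
        let Some(array) = self.stack.pop() else {
            return Err(Error::generic("didn't build array"));
        };
        // Verify the top of the stack is a struct array without cloning or downcasting it.
        if !matches!(array.data_type(), arrow_schema::DataType::Struct(_)) {
            return Err(Error::generic("expected a single-row struct array"));
        }
        Ok(array)
    }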

}
}

use crate::schema::SchemaTransform;
use crate::schema::StructType;
use std::borrow::Cow;
impl<'a> SchemaTransform<'a> for SingleRowArrayTransform<'a> {
fn transform_primitive(&mut self, ptype: &'a PrimitiveType) -> Option<Cow<'a, PrimitiveType>> {
// check bounds
if self.next_scalar_idx >= self.scalars.len() {
return None; // TODO
}
let scalar = &self.scalars[self.next_scalar_idx];
self.next_scalar_idx += 1;

let arr = match scalar.to_array(1) {
Ok(a) => a,
Err(_e) => return None, // or handle error
};
self.stack.push(arr);

Some(Cow::Borrowed(ptype))
}

fn transform_struct(&mut self, stype: &'a StructType) -> Option<Cow<'a, StructType>> {
self.recurse_into_struct(stype)?;

let n_fields = stype.fields.len();
if self.stack.len() < n_fields {
return None; // FIXME
}

// pop in reverse
let mut child_arrays = Vec::with_capacity(n_fields);
for _ in 0..n_fields {
child_arrays.push(self.stack.pop().unwrap());
}
child_arrays.reverse();

let field_refs: Vec<StructField> = stype
.fields()
.map(|f| StructField::new(f.name.clone(), f.data_type.clone(), f.nullable))
.collect(); // FIXME

// let struct_dt = DataType::Struct(Box::new(StructType::new(field_refs.clone())));

let arrow_fields = field_refs
.iter()
.map(|f| ArrowField::try_from(f))
.try_collect()
.unwrap(); // FIXME
let struct_arr = Arc::new(StructArray::new(arrow_fields, child_arrays, None)) as ArrayRef;

self.stack.push(struct_arr);
Some(Cow::Borrowed(stype))
}

fn transform_struct_field(&mut self, field: &'a StructField) -> Option<Cow<'a, StructField>> {
self.recurse_into_struct_field(field)
}

// For arrays, do something similar:
fn transform_array(&mut self, atype: &'a ArrayType) -> Option<Cow<'a, ArrayType>> {
// Recurse into child
self.recurse_into_array(atype)?;

// Once child is done, pop the child's single-row array from the stack
let child_array = self.stack.pop().unwrap();

unimplemented!();

Some(Cow::Borrowed(atype))
}

fn transform_map(&mut self, mtype: &'a MapType) -> Option<Cow<'a, MapType>> {
self.recurse_into_map(mtype)?;

let value_arr = self.stack.pop().unwrap();
let key_arr = self.stack.pop().unwrap();

unimplemented!();

// build map, push
Some(Cow::Borrowed(mtype))
}
}

#[derive(Debug)]
@@ -568,7 +695,7 @@ impl ExpressionEvaluator for DefaultExpressionEvaluator {
mod tests {
use std::ops::{Add, Div, Mul, Sub};

-use arrow_array::{GenericStringArray, Int32Array};
+use arrow_array::{create_array, record_batch, GenericStringArray, Int32Array};
use arrow_buffer::ScalarBuffer;
use arrow_schema::{DataType, Field, Fields, Schema};

@@ -867,4 +994,201 @@ mod tests {
let expected = Arc::new(BooleanArray::from(vec![true, false]));
assert_eq!(results.as_ref(), expected.as_ref());
}

#[test]
fn test_create_one() {
let values: &[Scalar] = &[1.into(), 2.into(), 3.into()];
let schema = Arc::new(crate::schema::StructType::new([
StructField::new("a", DeltaDataTypes::INTEGER, true),
StructField::new("b", DeltaDataTypes::INTEGER, true),
StructField::new("c", DeltaDataTypes::INTEGER, true),
]));

let handler = ArrowExpressionHandler;
let actual = handler.create_one(schema, values).unwrap();
let expected =
record_batch!(("a", Int32, [1]), ("b", Int32, [2]), ("c", Int32, [3])).unwrap();
let actual_rb: RecordBatch = actual
.into_any()
.downcast::<ArrowEngineData>()
.unwrap()
.into();
assert_eq!(actual_rb, expected);
}

//#[test]
//fn test_create_one_string() {
// let expr = Expression::struct_from([Expression::literal("a")]);
// let schema = Arc::new(crate::schema::StructType::new([StructField::new(
// "col_1",
// DeltaDataTypes::STRING,
// true,
// )]));
//
// let handler = ArrowExpressionHandler;
// let actual = handler.create_one(schema, &expr).unwrap();
// let expected = record_batch!(("col_1", Utf8, ["a"])).unwrap();
// let actual_rb: RecordBatch = actual
// .into_any()
// .downcast::<ArrowEngineData>()
// .unwrap()
// .into();
// assert_eq!(actual_rb, expected);
//}
//
//#[test]
//fn test_create_one_null() {
// let expr = Expression::struct_from([Expression::null_literal(DeltaDataTypes::INTEGER)]);
// let schema = Arc::new(crate::schema::StructType::new([StructField::new(
// "col_1",
// DeltaDataTypes::INTEGER,
// true,
// )]));
//
// let handler = ArrowExpressionHandler;
// let actual = handler.create_one(schema, &expr).unwrap();
// let expected = record_batch!(("col_1", Int32, [None])).unwrap();
// let actual_rb: RecordBatch = actual
// .into_any()
// .downcast::<ArrowEngineData>()
// .unwrap()
// .into();
// assert_eq!(actual_rb, expected);
//}
//
//#[test]
//fn test_create_one_non_null() {
// let expr = Expression::struct_from([Expression::literal(1)]);
// let schema = Arc::new(crate::schema::StructType::new([StructField::new(
// "a",
// DeltaDataTypes::INTEGER,
// false,
// )]));
//
// let handler = ArrowExpressionHandler;
// let actual = handler.create_one(schema, &expr).unwrap();
// let expected_schema = Arc::new(arrow_schema::Schema::new(vec![Field::new(
// "a",
// DataType::Int32,
// false,
// )]));
// let expected =
// RecordBatch::try_new(expected_schema, vec![create_array!(Int32, [1])]).unwrap();
// let actual_rb: RecordBatch = actual
// .into_any()
// .downcast::<ArrowEngineData>()
// .unwrap()
// .into();
// assert_eq!(actual_rb, expected);
//}
//
//#[test]
//fn test_create_one_disallow_column_ref() {
// let expr = Expression::struct_from([column_expr!("a")]);
// let schema = Arc::new(crate::schema::StructType::new([StructField::new(
// "a",
// DeltaDataTypes::INTEGER,
// true,
// )]));
//
// let handler = ArrowExpressionHandler;
// let actual = handler.create_one(schema, &expr);
// assert!(actual.is_err());
//}
//
//#[test]
//fn test_create_one_disallow_operator() {
// let expr = Expression::struct_from([Expression::binary(
// BinaryOperator::Plus,
// Expression::literal(1),
// Expression::literal(2),
// )]);
// let schema = Arc::new(crate::schema::StructType::new([StructField::new(
// "a",
// DeltaDataTypes::INTEGER,
// true,
// )]));
//
// let handler = ArrowExpressionHandler;
// let actual = handler.create_one(schema, &expr);
// assert!(actual.is_err());
//}
//
//#[test]
//fn test_create_one_nested() {
// let expr = Expression::struct_from([Expression::struct_from([
// Expression::literal(1),
// Expression::literal(2),
// ])]);
// let schema = Arc::new(crate::schema::StructType::new([StructField::new(
// "a",
// DeltaDataTypes::struct_type([
// StructField::new("b", DeltaDataTypes::INTEGER, true),
// StructField::new("c", DeltaDataTypes::INTEGER, false),
// ]),
// false,
// )]));
//
// let handler = ArrowExpressionHandler;
// let actual = handler.create_one(schema, &expr).unwrap();
// let expected_schema = Arc::new(arrow_schema::Schema::new(vec![Field::new(
// "a",
// DataType::Struct(
// vec![
// Field::new("b", DataType::Int32, true),
// Field::new("c", DataType::Int32, false),
// ]
// .into(),
// ),
// false,
// )]));
// let expected = RecordBatch::try_new(
// expected_schema,
// vec![Arc::new(StructArray::from(vec![
// (
// Arc::new(Field::new("b", DataType::Int32, true)),
// create_array!(Int32, [1]) as ArrayRef,
// ),
// (
// Arc::new(Field::new("c", DataType::Int32, false)),
// create_array!(Int32, [2]) as ArrayRef,
// ),
// ]))],
// )
// .unwrap();
// let actual_rb: RecordBatch = actual
// .into_any()
// .downcast::<ArrowEngineData>()
// .unwrap()
// .into();
// assert_eq!(actual_rb, expected);
//
// // make the same but with literal struct instead of struct of literal
// let struct_data = StructData::try_new(
// vec![
// StructField::new("b", DeltaDataTypes::INTEGER, true),
// StructField::new("c", DeltaDataTypes::INTEGER, false),
// ],
// vec![Scalar::Integer(1), Scalar::Integer(2)],
// )
// .unwrap();
// let expr = Expression::struct_from([Expression::literal(Scalar::Struct(struct_data))]);
// let schema = Arc::new(crate::schema::StructType::new([StructField::new(
// "a",
// DeltaDataTypes::struct_type([
// StructField::new("b", DeltaDataTypes::INTEGER, true),
// StructField::new("c", DeltaDataTypes::INTEGER, false),
// ]),
// false,
// )]));
//
// let handler = ArrowExpressionHandler;
// let actual = handler.create_one(schema, &expr).unwrap();
// let actual_rb: RecordBatch = actual
// .into_any()
// .downcast::<ArrowEngineData>()
// .unwrap()
// .into();
// assert_eq!(actual_rb, expected);
//}
}
8 changes: 8 additions & 0 deletions kernel/src/lib.rs
@@ -105,6 +105,8 @@ pub use error::{DeltaResult, Error};
pub use expressions::{Expression, ExpressionRef};
pub use table::Table;

use expressions::Scalar;

#[cfg(any(
feature = "default-engine",
feature = "sync-engine",
@@ -337,6 +339,12 @@ pub trait ExpressionHandler: AsAny {
expression: Expression,
output_type: DataType,
) -> Arc<dyn ExpressionEvaluator>;

/// Create a single-row [`EngineData`] by applying the given schema to the leaf-values given in
/// `values`.
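///
/// For example (illustrative sketch): for a schema with integer fields `a`, `b`, and `c` and
/// `values == &[1.into(), 2.into(), 3.into()]`, this returns engine data containing the single
/// row `{a: 1, b: 2, c: 3}`.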
// Note: we stick with a Schema instead of a DataType here since it is more constrained; we
// can expand to DataType in the future.
fn create_one(&self, schema: SchemaRef, values: &[Scalar]) -> DeltaResult<Box<dyn EngineData>>;
}

/// Provides file system related functionalities to Delta Kernel.
5 changes: 5 additions & 0 deletions kernel/src/schema.rs
@@ -274,6 +274,11 @@ impl StructType {
self.fields.values()
}

pub fn len(&self) -> usize {
// O(1) for indexmap
self.fields.len()
}

/// Extracts the name and type of all leaf columns, in schema order. Caller should pass Some
/// `own_name` if this schema is embedded in a larger struct (e.g. `add.*`) and None if the
/// schema is a top-level result (e.g. `*`).