Skip to content

Commit

Permalink
fix in_list conversion in from_proto.rs
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangli20 committed Jul 1, 2024
1 parent 0d5745a commit 79a0f13
Showing 1 changed file with 38 additions and 12 deletions.
50 changes: 38 additions & 12 deletions native-engine/blaze-serde/src/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use std::{
};

use arrow::{
array::RecordBatch,
compute::SortOptions,
datatypes::{Field, FieldRef, SchemaRef},
};
Expand All @@ -33,22 +34,23 @@ use datafusion::{
},
error::DataFusionError,
execution::context::ExecutionProps,
logical_expr::{BuiltinScalarFunction, Operator},
logical_expr::{BuiltinScalarFunction, ColumnarValue, Operator},
physical_expr::{
expressions::{LikeExpr, SCAndExpr, SCOrExpr},
expressions::{in_list, LikeExpr, SCAndExpr, SCOrExpr},
functions, ScalarFunctionExpr,
},
physical_plan::{
expressions as phys_expr,
expressions::{
BinaryExpr, CaseExpr, CastExpr, Column, InListExpr, IsNotNullExpr, IsNullExpr, Literal,
BinaryExpr, CaseExpr, CastExpr, Column, IsNotNullExpr, IsNullExpr, Literal,
NegativeExpr, NotExpr, PhysicalSortExpr,
},
joins::utils::{ColumnIndex, JoinFilter},
union::UnionExec,
ColumnStatistics, ExecutionPlan, Partitioning, PhysicalExpr, Statistics,
},
};
use datafusion_ext_commons::downcast_any;
use datafusion_ext_exprs::{
cast::TryCastExpr, get_indexed_field::GetIndexedFieldExpr, get_map_value::GetMapValueExpr,
named_struct::NamedStructExpr, row_num::RowNumExpr,
Expand Down Expand Up @@ -926,15 +928,39 @@ fn try_parse_physical_expr(
ExprType::Negative(e) => Arc::new(NegativeExpr::new(
try_parse_physical_expr_box_required(&e.expr, input_schema)?,
)),
ExprType::InList(e) => Arc::new(InListExpr::new(
try_parse_physical_expr_box_required(&e.expr, input_schema)?,
e.list
.iter()
.map(|x| try_parse_physical_expr(x, input_schema))
.collect::<Result<Vec<_>, _>>()?,
e.negated,
None,
)),
ExprType::InList(e) => {
let expr = try_parse_physical_expr_box_required(&e.expr, input_schema)
.and_then(|expr| Ok(bind(expr, input_schema)?))?; // materialize expr.data_type
let dt = expr.data_type(input_schema)?;
in_list(
bind(expr, input_schema)?,
e.list
.iter()
.map(|x| {
Ok::<_, PlanSerDeError>({
match try_parse_physical_expr(x, input_schema)? {
// cast list values to expr type
e if downcast_any!(e, Literal).is_ok()
&& e.data_type(input_schema)? != dt =>
{
match TryCastExpr::new(e, dt.clone()).evaluate(
&RecordBatch::new_empty(input_schema.clone()),
)? {
ColumnarValue::Scalar(scalar) => {
Arc::new(Literal::new(scalar))
}
ColumnarValue::Array(_) => unreachable!(),
}
}
other => other,
}
})
})
.collect::<Result<Vec<_>, _>>()?,
&e.negated,
&input_schema,
)?
}
ExprType::Case(e) => Arc::new(CaseExpr::try_new(
e.expr
.as_ref()
Expand Down

0 comments on commit 79a0f13

Please sign in to comment.