Skip to content

Commit

Permalink
Add partition by constructs in window functions and modify logical …
Browse files Browse the repository at this point in the history
…planning (#501)

* closing up type checks

* add fmt
  • Loading branch information
jimexist authored Jun 9, 2021
1 parent 42f908e commit d5bca0e
Show file tree
Hide file tree
Showing 9 changed files with 280 additions and 84 deletions.
2 changes: 1 addition & 1 deletion ballista/rust/core/proto/ballista.proto
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ message WindowExprNode {
// udaf = 3
}
LogicalExprNode expr = 4;
// repeated LogicalExprNode partition_by = 5;
repeated LogicalExprNode partition_by = 5;
repeated LogicalExprNode order_by = 6;
// repeated LogicalExprNode filter = 7;
oneof window_frame {
Expand Down
8 changes: 8 additions & 0 deletions ballista/rust/core/src/serde/logical_plan/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -910,6 +910,12 @@ impl TryInto<Expr> for &protobuf::LogicalExprNode {
.window_function
.as_ref()
.ok_or_else(|| proto_error("Received empty window function"))?;
let partition_by = expr
.partition_by
.iter()
.map(|e| e.try_into())
.into_iter()
.collect::<Result<Vec<_>, _>>()?;
let order_by = expr
.order_by
.iter()
Expand Down Expand Up @@ -940,6 +946,7 @@ impl TryInto<Expr> for &protobuf::LogicalExprNode {
AggregateFunction::from(aggr_function),
),
args: vec![parse_required_expr(&expr.expr)?],
partition_by,
order_by,
window_frame,
})
Expand All @@ -960,6 +967,7 @@ impl TryInto<Expr> for &protobuf::LogicalExprNode {
BuiltInWindowFunction::from(built_in_function),
),
args: vec![parse_required_expr(&expr.expr)?],
partition_by,
order_by,
window_frame,
})
Expand Down
6 changes: 6 additions & 0 deletions ballista/rust/core/src/serde/logical_plan/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1006,6 +1006,7 @@ impl TryInto<protobuf::LogicalExprNode> for &Expr {
Expr::WindowFunction {
ref fun,
ref args,
ref partition_by,
ref order_by,
ref window_frame,
..
Expand All @@ -1023,6 +1024,10 @@ impl TryInto<protobuf::LogicalExprNode> for &Expr {
}
};
let arg = &args[0];
let partition_by = partition_by
.iter()
.map(|e| e.try_into())
.collect::<Result<Vec<_>, _>>()?;
let order_by = order_by
.iter()
.map(|e| e.try_into())
Expand All @@ -1035,6 +1040,7 @@ impl TryInto<protobuf::LogicalExprNode> for &Expr {
let window_expr = Box::new(protobuf::WindowExprNode {
expr: Some(Box::new(arg.try_into()?)),
window_function: Some(window_function),
partition_by,
order_by,
window_frame,
});
Expand Down
8 changes: 8 additions & 0 deletions ballista/rust/core/src/serde/physical_plan/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,9 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
Expr::WindowFunction {
fun,
args,
partition_by,
order_by,
window_frame,
..
} => {
let arg = df_planner
Expand All @@ -248,9 +250,15 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
.map_err(|e| {
BallistaError::General(format!("{:?}", e))
})?;
if !partition_by.is_empty() {
return Err(BallistaError::NotImplemented("Window function with partition by is not yet implemented".to_owned()));
}
if !order_by.is_empty() {
return Err(BallistaError::NotImplemented("Window function with order by is not yet implemented".to_owned()));
}
if window_frame.is_some() {
return Err(BallistaError::NotImplemented("Window function with window frame is not yet implemented".to_owned()));
}
let window_expr = create_window_expr(
&fun,
&[arg],
Expand Down
14 changes: 13 additions & 1 deletion datafusion/src/logical_plan/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,8 @@ pub enum Expr {
fun: window_functions::WindowFunction,
/// List of expressions to feed to the functions as arguments
args: Vec<Expr>,
/// List of partition by expressions
partition_by: Vec<Expr>,
/// List of order by expressions
order_by: Vec<Expr>,
/// Window frame
Expand Down Expand Up @@ -588,10 +590,18 @@ impl Expr {
Expr::ScalarUDF { args, .. } => args
.iter()
.try_fold(visitor, |visitor, arg| arg.accept(visitor)),
Expr::WindowFunction { args, order_by, .. } => {
Expr::WindowFunction {
args,
partition_by,
order_by,
..
} => {
let visitor = args
.iter()
.try_fold(visitor, |visitor, arg| arg.accept(visitor))?;
let visitor = partition_by
.iter()
.try_fold(visitor, |visitor, arg| arg.accept(visitor))?;
let visitor = order_by
.iter()
.try_fold(visitor, |visitor, arg| arg.accept(visitor))?;
Expand Down Expand Up @@ -733,11 +743,13 @@ impl Expr {
Expr::WindowFunction {
args,
fun,
partition_by,
order_by,
window_frame,
} => Expr::WindowFunction {
args: rewrite_vec(args, rewriter)?,
fun,
partition_by: rewrite_vec(partition_by, rewriter)?,
order_by: rewrite_vec(order_by, rewriter)?,
window_frame,
},
Expand Down
6 changes: 1 addition & 5 deletions datafusion/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -687,11 +687,7 @@ impl LogicalPlan {
LogicalPlan::Window {
ref window_expr, ..
} => {
write!(
f,
"WindowAggr: windowExpr=[{:?}] partitionBy=[]",
window_expr
)
write!(f, "WindowAggr: windowExpr=[{:?}]", window_expr)
}
LogicalPlan::Aggregate {
ref group_expr,
Expand Down
46 changes: 38 additions & 8 deletions datafusion/src/optimizer/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use crate::{

const CASE_EXPR_MARKER: &str = "__DATAFUSION_CASE_EXPR__";
const CASE_ELSE_MARKER: &str = "__DATAFUSION_CASE_ELSE__";
const WINDOW_PARTITION_MARKER: &str = "__DATAFUSION_WINDOW_PARTITION__";
const WINDOW_SORT_MARKER: &str = "__DATAFUSION_WINDOW_SORT__";

/// Recursively walk a list of expression trees, collecting the unique set of column
Expand Down Expand Up @@ -258,9 +259,16 @@ pub fn expr_sub_expressions(expr: &Expr) -> Result<Vec<Expr>> {
Expr::IsNotNull(e) => Ok(vec![e.as_ref().to_owned()]),
Expr::ScalarFunction { args, .. } => Ok(args.clone()),
Expr::ScalarUDF { args, .. } => Ok(args.clone()),
Expr::WindowFunction { args, order_by, .. } => {
Expr::WindowFunction {
args,
partition_by,
order_by,
..
} => {
let mut expr_list: Vec<Expr> = vec![];
expr_list.extend(args.clone());
expr_list.push(lit(WINDOW_PARTITION_MARKER));
expr_list.extend(partition_by.clone());
expr_list.push(lit(WINDOW_SORT_MARKER));
expr_list.extend(order_by.clone());
Ok(expr_list)
Expand Down Expand Up @@ -340,7 +348,20 @@ pub fn rewrite_expression(expr: &Expr, expressions: &[Expr]) -> Result<Expr> {
Expr::WindowFunction {
fun, window_frame, ..
} => {
let index = expressions
let partition_index = expressions
.iter()
.position(|expr| {
matches!(expr, Expr::Literal(ScalarValue::Utf8(Some(str)))
if str == WINDOW_PARTITION_MARKER)
})
.ok_or_else(|| {
DataFusionError::Internal(
"Ill-formed window function expressions: unexpected marker"
.to_owned(),
)
})?;

let sort_index = expressions
.iter()
.position(|expr| {
matches!(expr, Expr::Literal(ScalarValue::Utf8(Some(str)))
Expand All @@ -351,12 +372,21 @@ pub fn rewrite_expression(expr: &Expr, expressions: &[Expr]) -> Result<Expr> {
"Ill-formed window function expressions".to_owned(),
)
})?;
Ok(Expr::WindowFunction {
fun: fun.clone(),
args: expressions[..index].to_vec(),
order_by: expressions[index + 1..].to_vec(),
window_frame: *window_frame,
})

if partition_index >= sort_index {
Err(DataFusionError::Internal(
"Ill-formed window function expressions: partition index too large"
.to_owned(),
))
} else {
Ok(Expr::WindowFunction {
fun: fun.clone(),
args: expressions[..partition_index].to_vec(),
partition_by: expressions[partition_index + 1..sort_index].to_vec(),
order_by: expressions[sort_index + 1..].to_vec(),
window_frame: *window_frame,
})
}
}
Expr::AggregateFunction { fun, distinct, .. } => Ok(Expr::AggregateFunction {
fun: fun.clone(),
Expand Down
Loading

0 comments on commit d5bca0e

Please sign in to comment.