Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Ballista] Make PhysicalAggregateExprNode have repeated PhysicalExprNode #2184

Merged
merged 1 commit into from
Apr 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions ballista/rust/client/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -693,4 +693,52 @@ mod tests {
.collect::<Vec<&str>>()
);
}

/// End-to-end check that `approx_percentile_cont` can be planned and executed
/// through a standalone Ballista context, comparing the pretty-printed result
/// table against a fixed expectation.
#[tokio::test]
#[cfg(feature = "standalone")]
async fn test_percentile_func() {
    use crate::context::BallistaContext;
    use ballista_core::config::{
        BallistaConfigBuilder, BALLISTA_WITH_INFORMATION_SCHEMA,
    };
    use datafusion::arrow::util::pretty::pretty_format_batches;
    use datafusion::prelude::ParquetReadOptions;

    // Build the configuration and spin up the standalone context.
    let ballista_config = BallistaConfigBuilder::default()
        .set(BALLISTA_WITH_INFORMATION_SCHEMA, "true")
        .build()
        .unwrap();
    let ctx = BallistaContext::standalone(&ballista_config, 1)
        .await
        .unwrap();

    // Register the parquet fixture under the table name `test`.
    let parquet_path = format!(
        "{}/alltypes_plain.parquet",
        datafusion::test_util::parquet_test_data()
    );
    ctx.register_parquet("test", &parquet_path, ParquetReadOptions::default())
        .await
        .unwrap();

    // Run the aggregate query and gather every resulting record batch.
    let batches = ctx
        .sql("select approx_percentile_cont(\"double_col\", 0.5) from test")
        .await
        .unwrap()
        .collect()
        .await
        .unwrap();

    let expected = vec![
        "+----------------------------------------------------+",
        "| APPROXPERCENTILECONT(test.double_col,Float64(0.5)) |",
        "+----------------------------------------------------+",
        "| 7.574999999999999                                  |",
        "+----------------------------------------------------+",
    ];

    // Render the batches as a text table and compare it line by line.
    let rendered = pretty_format_batches(&*batches).unwrap().to_string();
    let actual: Vec<&str> = rendered.trim().lines().collect();
    assert_eq!(expected, actual);
}
}
2 changes: 1 addition & 1 deletion ballista/rust/core/proto/ballista.proto
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ message PhysicalScalarUdfNode {

message PhysicalAggregateExprNode {
datafusion.AggregateFunction aggr_function = 1;
PhysicalExprNode expr = 2;
repeated PhysicalExprNode expr = 2;
}

message PhysicalWindowExprNode {
Expand Down
8 changes: 3 additions & 5 deletions ballista/rust/core/src/serde/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -372,15 +372,13 @@ impl AsExecutionPlan for PhysicalPlanNode {
},
)?;

let input_phy_expr = agg_node.expr.as_ref()
.map(|e| parse_physical_expr(e.as_ref(), registry))
.transpose()?
.ok_or_else(|| proto_error("missing aggregate expression".to_string()))?;
let input_phy_expr: Vec<Arc<dyn PhysicalExpr>> = agg_node.expr.iter()
.map(|e| parse_physical_expr(e, registry).unwrap()).collect();

Ok(create_aggregate_expr(
&aggr_function.into(),
false,
&[input_phy_expr],
input_phy_expr.as_slice(),
&physical_schema,
name.to_string(),
)?)
Expand Down
6 changes: 3 additions & 3 deletions ballista/rust/core/src/serde/physical_plan/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,10 +142,10 @@ impl TryInto<protobuf::PhysicalExprNode> for Arc<dyn AggregateExpr> {
.collect::<Result<Vec<_>, BallistaError>>()?;
Ok(protobuf::PhysicalExprNode {
expr_type: Some(protobuf::physical_expr_node::ExprType::AggregateExpr(
Box::new(protobuf::PhysicalAggregateExprNode {
protobuf::PhysicalAggregateExprNode {
aggr_function,
expr: Some(Box::new(expressions[0].clone())),
}),
expr: expressions,
},
)),
})
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ pub fn is_approx_percentile_cont_supported_arg_type(arg_type: &DataType) -> bool
pub struct ApproxPercentileCont {
name: String,
input_data_type: DataType,
expr: Arc<dyn PhysicalExpr>,
expr: Vec<Arc<dyn PhysicalExpr>>,
percentile: f64,
}

Expand Down Expand Up @@ -103,7 +103,7 @@ impl ApproxPercentileCont {
name: name.into(),
input_data_type,
// The physical expr to evaluate during accumulation
expr: expr[0].clone(),
expr,
percentile,
})
}
Expand Down Expand Up @@ -181,7 +181,7 @@ impl AggregateExpr for ApproxPercentileCont {
}

fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
vec![self.expr.clone()]
self.expr.clone()
}

fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
Expand Down Expand Up @@ -314,11 +314,6 @@ impl Accumulator for ApproxPercentileAccumulator {
}

fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
debug_assert_eq!(
values.len(),
1,
"invalid number of values in batch percentile update"
);
let values = &values[0];
let unsorted_values =
ApproxPercentileAccumulator::convert_to_ordered_float(values)?;
Expand Down