Skip to content

Commit

Permalink
Improve DataFrame functional tests (#8630)
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb authored Dec 23, 2023
1 parent 7443f30 commit 69e5382
Showing 1 changed file with 82 additions and 138 deletions.
220 changes: 82 additions & 138 deletions datafusion/core/src/dataframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1356,15 +1356,30 @@ mod tests {

use arrow::array::{self, Int32Array};
use arrow::datatypes::DataType;
use datafusion_common::{Constraint, Constraints, ScalarValue};
use datafusion_common::{Constraint, Constraints};
use datafusion_expr::{
avg, cast, count, count_distinct, create_udf, expr, lit, max, min, sum,
BinaryExpr, BuiltInWindowFunction, Operator, ScalarFunctionImplementation,
Volatility, WindowFrame, WindowFunction,
BuiltInWindowFunction, ScalarFunctionImplementation, Volatility, WindowFrame,
WindowFunction,
};
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_plan::get_plan_string;

/// Builds the physical plan for `df` and asserts that its string
/// representation (as returned by `get_plan_string`) equals `expected`.
///
/// Panics if the physical plan cannot be created, or if the rendered plan
/// differs from `expected`.
async fn assert_physical_plan(df: &DataFrame, expected: Vec<&str>) {
    // `df` is only borrowed here, so plan creation runs on a clone.
    let plan = df.clone().create_physical_plan().await;
    let plan = plan.expect("Error creating physical plan");

    // Render the plan so it can be compared against `expected`.
    let actual = get_plan_string(&plan);
    assert_eq!(
        expected, actual,
        "\n**Optimized Plan Mismatch\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
    );
}

pub fn table_with_constraints() -> Arc<dyn TableProvider> {
let dual_schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Int32, false),
Expand Down Expand Up @@ -1587,47 +1602,36 @@ mod tests {
let config = SessionConfig::new().with_target_partitions(1);
let ctx = SessionContext::new_with_config(config);

let table1 = table_with_constraints();
let df = ctx.read_table(table1)?;
let col_id = Expr::Column(datafusion_common::Column {
relation: None,
name: "id".to_string(),
});
let col_name = Expr::Column(datafusion_common::Column {
relation: None,
name: "name".to_string(),
});
let df = ctx.read_table(table_with_constraints())?;

// group by contains id column
let group_expr = vec![col_id.clone()];
// GROUP BY id
let group_expr = vec![col("id")];
let aggr_expr = vec![];
let df = df.aggregate(group_expr, aggr_expr)?;

// expr list contains id, name
let expr_list = vec![col_id, col_name];
let df = df.select(expr_list)?;
let physical_plan = df.clone().create_physical_plan().await?;
let expected = vec![
"AggregateExec: mode=Single, gby=[id@0 as id, name@1 as name], aggr=[]",
" MemoryExec: partitions=1, partition_sizes=[1]",
];
// Get string representation of the plan
let actual = get_plan_string(&physical_plan);
assert_eq!(
expected, actual,
"\n**Optimized Plan Mismatch\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
);
// Since id and name are functionally dependent, we can use name among the expressions
// even if it is not part of the group by expression.
let df_results = collect(physical_plan, ctx.task_ctx()).await?;
// Since id and name are functionally dependent, we can use name among the
// expressions even if it is not part of the group by expression, and can
// select the "name" column even though it wasn't explicitly grouped
let df = df.select(vec![col("id"), col("name")])?;
assert_physical_plan(
&df,
vec![
"AggregateExec: mode=Single, gby=[id@0 as id, name@1 as name], aggr=[]",
" MemoryExec: partitions=1, partition_sizes=[1]",
],
)
.await;

let df_results = df.collect().await?;

#[rustfmt::skip]
assert_batches_sorted_eq!(
["+----+------+",
assert_batches_sorted_eq!([
"+----+------+",
"| id | name |",
"+----+------+",
"| 1 | a |",
"+----+------+",],
"+----+------+"
],
&df_results
);

Expand All @@ -1640,57 +1644,31 @@ mod tests {
let config = SessionConfig::new().with_target_partitions(1);
let ctx = SessionContext::new_with_config(config);

let table1 = table_with_constraints();
let df = ctx.read_table(table1)?;
let col_id = Expr::Column(datafusion_common::Column {
relation: None,
name: "id".to_string(),
});
let col_name = Expr::Column(datafusion_common::Column {
relation: None,
name: "name".to_string(),
});
let df = ctx.read_table(table_with_constraints())?;

// group by contains id column
let group_expr = vec![col_id.clone()];
// GROUP BY id
let group_expr = vec![col("id")];
let aggr_expr = vec![];
let df = df.aggregate(group_expr, aggr_expr)?;

let condition1 = Expr::BinaryExpr(BinaryExpr::new(
Box::new(col_id.clone()),
Operator::Eq,
Box::new(Expr::Literal(ScalarValue::Int32(Some(1)))),
));
let condition2 = Expr::BinaryExpr(BinaryExpr::new(
Box::new(col_name),
Operator::Eq,
Box::new(Expr::Literal(ScalarValue::Utf8(Some("a".to_string())))),
));
// Predicate refers to id, and name fields
let predicate = Expr::BinaryExpr(BinaryExpr::new(
Box::new(condition1),
Operator::And,
Box::new(condition2),
));
// Predicate refers to id, and name fields:
// id = 1 AND name = 'a'
let predicate = col("id").eq(lit(1i32)).and(col("name").eq(lit("a")));
let df = df.filter(predicate)?;
let physical_plan = df.clone().create_physical_plan().await?;

let expected = vec![
assert_physical_plan(
&df,
vec![
"CoalesceBatchesExec: target_batch_size=8192",
" FilterExec: id@0 = 1 AND name@1 = a",
" AggregateExec: mode=Single, gby=[id@0 as id, name@1 as name], aggr=[]",
" MemoryExec: partitions=1, partition_sizes=[1]",
];
// Get string representation of the plan
let actual = get_plan_string(&physical_plan);
assert_eq!(
expected, actual,
"\n**Optimized Plan Mismatch\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
);
],
)
.await;

// Since id and name are functionally dependent, we can use name among the expressions
// even if it is not part of the group by expression.
let df_results = collect(physical_plan, ctx.task_ctx()).await?;
let df_results = df.collect().await?;

#[rustfmt::skip]
assert_batches_sorted_eq!(
Expand All @@ -1711,53 +1689,35 @@ mod tests {
let config = SessionConfig::new().with_target_partitions(1);
let ctx = SessionContext::new_with_config(config);

let table1 = table_with_constraints();
let df = ctx.read_table(table1)?;
let col_id = Expr::Column(datafusion_common::Column {
relation: None,
name: "id".to_string(),
});
let col_name = Expr::Column(datafusion_common::Column {
relation: None,
name: "name".to_string(),
});
let df = ctx.read_table(table_with_constraints())?;

// group by contains id column
let group_expr = vec![col_id.clone()];
// GROUP BY id
let group_expr = vec![col("id")];
let aggr_expr = vec![];
// group by id,
let df = df.aggregate(group_expr, aggr_expr)?;

let condition1 = Expr::BinaryExpr(BinaryExpr::new(
Box::new(col_id.clone()),
Operator::Eq,
Box::new(Expr::Literal(ScalarValue::Int32(Some(1)))),
));
// Predicate refers to id field
let predicate = condition1;
// id=0
// id = 1
let predicate = col("id").eq(lit(1i32));
let df = df.filter(predicate)?;
// Select expression refers to id, and name columns.
// id, name
let df = df.select(vec![col_id.clone(), col_name.clone()])?;
let physical_plan = df.clone().create_physical_plan().await?;

let expected = vec![
let df = df.select(vec![col("id"), col("name")])?;
assert_physical_plan(
&df,
vec![
"CoalesceBatchesExec: target_batch_size=8192",
" FilterExec: id@0 = 1",
" AggregateExec: mode=Single, gby=[id@0 as id, name@1 as name], aggr=[]",
" MemoryExec: partitions=1, partition_sizes=[1]",
];
// Get string representation of the plan
let actual = get_plan_string(&physical_plan);
assert_eq!(
expected, actual,
"\n**Optimized Plan Mismatch\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
);
],
)
.await;

// Since id and name are functionally dependent, we can use name among the expressions
// even if it is not part of the group by expression.
let df_results = collect(physical_plan, ctx.task_ctx()).await?;
let df_results = df.collect().await?;

#[rustfmt::skip]
assert_batches_sorted_eq!(
Expand All @@ -1778,51 +1738,35 @@ mod tests {
let config = SessionConfig::new().with_target_partitions(1);
let ctx = SessionContext::new_with_config(config);

let table1 = table_with_constraints();
let df = ctx.read_table(table1)?;
let col_id = Expr::Column(datafusion_common::Column {
relation: None,
name: "id".to_string(),
});
let df = ctx.read_table(table_with_constraints())?;

// group by contains id column
let group_expr = vec![col_id.clone()];
// GROUP BY id
let group_expr = vec![col("id")];
let aggr_expr = vec![];
// group by id,
let df = df.aggregate(group_expr, aggr_expr)?;

let condition1 = Expr::BinaryExpr(BinaryExpr::new(
Box::new(col_id.clone()),
Operator::Eq,
Box::new(Expr::Literal(ScalarValue::Int32(Some(1)))),
));
// Predicate refers to id field
let predicate = condition1;
// id=1
// id = 1
let predicate = col("id").eq(lit(1i32));
let df = df.filter(predicate)?;
// Select expression refers to id column.
// id
let df = df.select(vec![col_id.clone()])?;
let physical_plan = df.clone().create_physical_plan().await?;
let df = df.select(vec![col("id")])?;

// In this case aggregate shouldn't be expanded, since these
// columns are not used.
let expected = vec![
"CoalesceBatchesExec: target_batch_size=8192",
" FilterExec: id@0 = 1",
" AggregateExec: mode=Single, gby=[id@0 as id], aggr=[]",
" MemoryExec: partitions=1, partition_sizes=[1]",
];
// Get string representation of the plan
let actual = get_plan_string(&physical_plan);
assert_eq!(
expected, actual,
"\n**Optimized Plan Mismatch\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
);
assert_physical_plan(
&df,
vec![
"CoalesceBatchesExec: target_batch_size=8192",
" FilterExec: id@0 = 1",
" AggregateExec: mode=Single, gby=[id@0 as id], aggr=[]",
" MemoryExec: partitions=1, partition_sizes=[1]",
],
)
.await;

// Since id and name are functionally dependent, we can use name among the expressions
// even if it is not part of the group by expression.
let df_results = collect(physical_plan, ctx.task_ctx()).await?;
let df_results = df.collect().await?;

#[rustfmt::skip]
assert_batches_sorted_eq!(
Expand Down

0 comments on commit 69e5382

Please sign in to comment.