diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs index 4b8a9c5b7d79..2ae4a7c21a9c 100644 --- a/datafusion/core/src/dataframe/mod.rs +++ b/datafusion/core/src/dataframe/mod.rs @@ -1356,15 +1356,30 @@ mod tests { use arrow::array::{self, Int32Array}; use arrow::datatypes::DataType; - use datafusion_common::{Constraint, Constraints, ScalarValue}; + use datafusion_common::{Constraint, Constraints}; use datafusion_expr::{ avg, cast, count, count_distinct, create_udf, expr, lit, max, min, sum, - BinaryExpr, BuiltInWindowFunction, Operator, ScalarFunctionImplementation, - Volatility, WindowFrame, WindowFunction, + BuiltInWindowFunction, ScalarFunctionImplementation, Volatility, WindowFrame, + WindowFunction, }; use datafusion_physical_expr::expressions::Column; use datafusion_physical_plan::get_plan_string; + // Get string representation of the plan + async fn assert_physical_plan(df: &DataFrame, expected: Vec<&str>) { + let physical_plan = df + .clone() + .create_physical_plan() + .await + .expect("Error creating physical plan"); + + let actual = get_plan_string(&physical_plan); + assert_eq!( + expected, actual, + "\n**Optimized Plan Mismatch\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n" + ); + } + pub fn table_with_constraints() -> Arc { let dual_schema = Arc::new(Schema::new(vec![ Field::new("id", DataType::Int32, false), @@ -1587,47 +1602,36 @@ mod tests { let config = SessionConfig::new().with_target_partitions(1); let ctx = SessionContext::new_with_config(config); - let table1 = table_with_constraints(); - let df = ctx.read_table(table1)?; - let col_id = Expr::Column(datafusion_common::Column { - relation: None, - name: "id".to_string(), - }); - let col_name = Expr::Column(datafusion_common::Column { - relation: None, - name: "name".to_string(), - }); + let df = ctx.read_table(table_with_constraints())?; - // group by contains id column - let group_expr = vec![col_id.clone()]; + // GROUP BY id + let group_expr = vec![col("id")]; let aggr_expr = vec![]; let df = df.aggregate(group_expr, aggr_expr)?; - // expr list contains id, name - let expr_list = vec![col_id, col_name]; - let df = df.select(expr_list)?; - let physical_plan = df.clone().create_physical_plan().await?; - let expected = vec![ - "AggregateExec: mode=Single, gby=[id@0 as id, name@1 as name], aggr=[]", - " MemoryExec: partitions=1, partition_sizes=[1]", - ]; - // Get string representation of the plan - let actual = get_plan_string(&physical_plan); - assert_eq!( - expected, actual, - "\n**Optimized Plan Mismatch\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n" - ); - // Since id and name are functionally dependant, we can use name among expression - // even if it is not part of the group by expression. - let df_results = collect(physical_plan, ctx.task_ctx()).await?; + // Since id and name are functionally dependant, we can use name among + // expression even if it is not part of the group by expression and can + // select "name" column even though it wasn't explicitly grouped + let df = df.select(vec![col("id"), col("name")])?; + assert_physical_plan( + &df, + vec![ + "AggregateExec: mode=Single, gby=[id@0 as id, name@1 as name], aggr=[]", + " MemoryExec: partitions=1, partition_sizes=[1]", + ], + ) + .await; + + let df_results = df.collect().await?; #[rustfmt::skip] - assert_batches_sorted_eq!( - ["+----+------+", + assert_batches_sorted_eq!([ + "+----+------+", "| id | name |", "+----+------+", "| 1 | a |", - "+----+------+",], + "+----+------+" + ], &df_results ); @@ -1640,57 +1644,31 @@ mod tests { let config = SessionConfig::new().with_target_partitions(1); let ctx = SessionContext::new_with_config(config); - let table1 = table_with_constraints(); - let df = ctx.read_table(table1)?; - let col_id = Expr::Column(datafusion_common::Column { - relation: None, - name: "id".to_string(), - }); - let col_name = Expr::Column(datafusion_common::Column { - relation: None, - name: "name".to_string(), - }); + let df = ctx.read_table(table_with_constraints())?; - // group by contains id column - let group_expr = vec![col_id.clone()]; + // GROUP BY id + let group_expr = vec![col("id")]; let aggr_expr = vec![]; let df = df.aggregate(group_expr, aggr_expr)?; - let condition1 = Expr::BinaryExpr(BinaryExpr::new( - Box::new(col_id.clone()), - Operator::Eq, - Box::new(Expr::Literal(ScalarValue::Int32(Some(1)))), - )); - let condition2 = Expr::BinaryExpr(BinaryExpr::new( - Box::new(col_name), - Operator::Eq, - Box::new(Expr::Literal(ScalarValue::Utf8(Some("a".to_string())))), - )); - // Predicate refers to id, and name fields - let predicate = Expr::BinaryExpr(BinaryExpr::new( - Box::new(condition1), - Operator::And, - Box::new(condition2), - )); + // Predicate refers to id, and name fields: + // id = 1 AND name = 'a' + let predicate = col("id").eq(lit(1i32)).and(col("name").eq(lit("a"))); let df = df.filter(predicate)?; - let physical_plan = df.clone().create_physical_plan().await?; - - let expected = vec![ + assert_physical_plan( + &df, + vec![ "CoalesceBatchesExec: target_batch_size=8192", " FilterExec: id@0 = 1 AND name@1 = a", " AggregateExec: mode=Single, gby=[id@0 as id, name@1 as name], aggr=[]", " MemoryExec: partitions=1, partition_sizes=[1]", - ]; - // Get string representation of the plan - let actual = get_plan_string(&physical_plan); - assert_eq!( - expected, actual, - "\n**Optimized Plan Mismatch\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n" - ); + ], + ) + .await; // Since id and name are functionally dependant, we can use name among expression // even if it is not part of the group by expression. - let df_results = collect(physical_plan, ctx.task_ctx()).await?; + let df_results = df.collect().await?; #[rustfmt::skip] assert_batches_sorted_eq!( @@ -1711,53 +1689,35 @@ mod tests { let config = SessionConfig::new().with_target_partitions(1); let ctx = SessionContext::new_with_config(config); - let table1 = table_with_constraints(); - let df = ctx.read_table(table1)?; - let col_id = Expr::Column(datafusion_common::Column { - relation: None, - name: "id".to_string(), - }); - let col_name = Expr::Column(datafusion_common::Column { - relation: None, - name: "name".to_string(), - }); + let df = ctx.read_table(table_with_constraints())?; - // group by contains id column - let group_expr = vec![col_id.clone()]; + // GROUP BY id + let group_expr = vec![col("id")]; let aggr_expr = vec![]; // group by id, let df = df.aggregate(group_expr, aggr_expr)?; - let condition1 = Expr::BinaryExpr(BinaryExpr::new( - Box::new(col_id.clone()), - Operator::Eq, - Box::new(Expr::Literal(ScalarValue::Int32(Some(1)))), - )); // Predicate refers to id field - let predicate = condition1; - // id=0 + // id = 1 + let predicate = col("id").eq(lit(1i32)); let df = df.filter(predicate)?; // Select expression refers to id, and name columns. // id, name - let df = df.select(vec![col_id.clone(), col_name.clone()])?; - let physical_plan = df.clone().create_physical_plan().await?; - - let expected = vec![ + let df = df.select(vec![col("id"), col("name")])?; + assert_physical_plan( + &df, + vec![ "CoalesceBatchesExec: target_batch_size=8192", " FilterExec: id@0 = 1", " AggregateExec: mode=Single, gby=[id@0 as id, name@1 as name], aggr=[]", " MemoryExec: partitions=1, partition_sizes=[1]", - ]; - // Get string representation of the plan - let actual = get_plan_string(&physical_plan); - assert_eq!( - expected, actual, - "\n**Optimized Plan Mismatch\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n" - ); + ], + ) + .await; // Since id and name are functionally dependant, we can use name among expression // even if it is not part of the group by expression. - let df_results = collect(physical_plan, ctx.task_ctx()).await?; + let df_results = df.collect().await?; #[rustfmt::skip] assert_batches_sorted_eq!( @@ -1778,51 +1738,35 @@ mod tests { let config = SessionConfig::new().with_target_partitions(1); let ctx = SessionContext::new_with_config(config); - let table1 = table_with_constraints(); - let df = ctx.read_table(table1)?; - let col_id = Expr::Column(datafusion_common::Column { - relation: None, - name: "id".to_string(), - }); + let df = ctx.read_table(table_with_constraints())?; - // group by contains id column - let group_expr = vec![col_id.clone()]; + // GROUP BY id + let group_expr = vec![col("id")]; let aggr_expr = vec![]; - // group by id, let df = df.aggregate(group_expr, aggr_expr)?; - let condition1 = Expr::BinaryExpr(BinaryExpr::new( - Box::new(col_id.clone()), - Operator::Eq, - Box::new(Expr::Literal(ScalarValue::Int32(Some(1)))), - )); // Predicate refers to id field - let predicate = condition1; - // id=1 + // id = 1 + let predicate = col("id").eq(lit(1i32)); let df = df.filter(predicate)?; // Select expression refers to id column. // id - let df = df.select(vec![col_id.clone()])?; - let physical_plan = df.clone().create_physical_plan().await?; + let df = df.select(vec![col("id")])?; // In this case aggregate shouldn't be expanded, since these // columns are not used. - let expected = vec![ - "CoalesceBatchesExec: target_batch_size=8192", - " FilterExec: id@0 = 1", - " AggregateExec: mode=Single, gby=[id@0 as id], aggr=[]", - " MemoryExec: partitions=1, partition_sizes=[1]", - ]; - // Get string representation of the plan - let actual = get_plan_string(&physical_plan); - assert_eq!( - expected, actual, - "\n**Optimized Plan Mismatch\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n" - ); + assert_physical_plan( + &df, + vec![ + "CoalesceBatchesExec: target_batch_size=8192", + " FilterExec: id@0 = 1", + " AggregateExec: mode=Single, gby=[id@0 as id], aggr=[]", + " MemoryExec: partitions=1, partition_sizes=[1]", + ], + ) + .await; - // Since id and name are functionally dependant, we can use name among expression - // even if it is not part of the group by expression. - let df_results = collect(physical_plan, ctx.task_ctx()).await?; + let df_results = df.collect().await?; #[rustfmt::skip] assert_batches_sorted_eq!(