remove duplicated logic b/w DataFrame API and SQL planning
jiangzhx committed Apr 14, 2023
1 parent ebb8390 commit 692bfac
Showing 8 changed files with 418 additions and 61 deletions.
1 change: 1 addition & 0 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default.

4 changes: 2 additions & 2 deletions datafusion/common/src/dfschema.rs
@@ -634,9 +634,9 @@ impl ExprSchema for DFSchema {
 #[derive(Debug, Clone, PartialEq, Eq, Hash)]
 pub struct DFField {
     /// Optional qualifier (usually a table or relation name)
-    qualifier: Option<OwnedTableReference>,
+    pub qualifier: Option<OwnedTableReference>,
     /// Arrow field definition
-    field: FieldRef,
+    pub field: FieldRef,
 }
 
 impl DFField {
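The two fields of DFField are now public. A minimal sketch of what that enables for downstream code (a hypothetical helper, not part of this commit), reading qualifier and field directly instead of going through accessor methods:

    use datafusion_common::DFField;

    // Hypothetical helper: with `qualifier` and `field` now `pub`, both members
    // of a DFField can be read directly. Debug is derived on DFField, and both
    // Option<OwnedTableReference> and FieldRef implement Debug as well.
    fn describe_field(f: &DFField) {
        println!("qualifier = {:?}, field = {:?}", f.qualifier, f.field);
    }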
242 changes: 219 additions & 23 deletions datafusion/core/tests/dataframe.rs
@@ -32,24 +32,179 @@ use datafusion::error::Result;
 use datafusion::execution::context::SessionContext;
 use datafusion::prelude::JoinType;
 use datafusion::prelude::{CsvReadOptions, ParquetReadOptions};
+use datafusion::test_util::parquet_test_data;
 use datafusion::{assert_batches_eq, assert_batches_sorted_eq};
+use datafusion_common::ScalarValue;
 use datafusion_expr::expr::{GroupingSet, Sort};
-use datafusion_expr::{avg, col, count, lit, max, sum, Expr, ExprSchemable};
+use datafusion_expr::utils::COUNT_STAR_EXPANSION;
+use datafusion_expr::Expr::{ScalarSubquery, Wildcard};
+use datafusion_expr::{
+    avg, col, count, expr, lit, max, sum, AggregateFunction, Expr, ExprSchemable,
+    Subquery, WindowFrame, WindowFrameBound, WindowFrameUnits, WindowFunction,
+};
 
 #[tokio::test]
-async fn count_wildcard() -> Result<()> {
-    let ctx = SessionContext::new();
-    let testdata = datafusion::test_util::parquet_test_data();
+async fn test_count_wildcard_on_sort() -> Result<()> {
+    let ctx = create_join_context()?;
 
-    ctx.register_parquet(
-        "alltypes_tiny_pages",
-        &format!("{testdata}/alltypes_tiny_pages.parquet"),
-        ParquetReadOptions::default(),
-    )
-    .await?;
+    let sql_results = ctx
+        .sql("select b,count(*) from t1 group by b order by count(*)")
+        .await?
+        .explain(false, false)?
+        .collect()
+        .await?;
 
+    let df_results = ctx
+        .table("t1")
+        .await?
+        .aggregate(vec![col("b")], vec![count(Wildcard)])?
+        .sort(vec![count(Wildcard).sort(true, false)])?
+        .explain(false, false)?
+        .collect()
+        .await?;
+    // make sure the SQL plan is the same as the DataFrame plan
+    assert_eq!(
+        pretty_format_batches(&sql_results)?.to_string(),
+        pretty_format_batches(&df_results)?.to_string()
+    );
+    Ok(())
+}
+
+#[tokio::test]
+async fn test_count_wildcard_on_where_in() -> Result<()> {
+    let ctx = create_join_context()?;
+    let sql_results = ctx
+        .sql("SELECT a,b FROM t1 WHERE a in (SELECT count(*) FROM t2)")
+        .await?
+        .explain(false, false)?
+        .collect()
+        .await?;
+
+    // In the same SessionContext, AliasGenerator will increase the subquery_alias id by 1
+    // https://github.com/apache/arrow-datafusion/blame/cf45eb9020092943b96653d70fafb143cc362e19/datafusion/optimizer/src/alias.rs#L40-L43
+    // To compare the difference between the SQL and DataFrame logical plans, we need to create a new SessionContext here
+    let ctx = create_join_context()?;
+    let df_results = ctx
+        .table("t1")
+        .await?
+        .filter(Expr::InSubquery {
+            expr: Box::new(col("a")),
+            subquery: Subquery {
+                subquery: Arc::new(
+                    ctx.table("t2")
+                        .await?
+                        .aggregate(vec![], vec![count(Expr::Wildcard)])?
+                        .select(vec![count(Expr::Wildcard)])?
+                        .into_unoptimized_plan(),
+                    // Usually, into_optimized_plan() should be used here, but due to
+                    // https://github.com/apache/arrow-datafusion/issues/5771,
+                    // subqueries in SQL cannot be optimized, resulting in differences in the logical_plan. Therefore, into_unoptimized_plan() is used here temporarily.
+                ),
+                outer_ref_columns: vec![],
+            },
+            negated: false,
+        })?
+        .select(vec![col("a"), col("b")])?
+        .explain(false, false)?
+        .collect()
+        .await?;
+
+    // make sure the SQL plan is the same as the DataFrame plan
+    assert_eq!(
+        pretty_format_batches(&sql_results)?.to_string(),
+        pretty_format_batches(&df_results)?.to_string()
+    );
+
+    Ok(())
+}
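Note that the DataFrame side above builds Expr::InSubquery by hand. Assuming the in_subquery helper in datafusion_expr is available in this version, the same predicate could likely be written more compactly; a sketch under that assumption, not what the commit uses:

    use std::sync::Arc;
    use datafusion::dataframe::DataFrame;
    use datafusion::error::Result;
    use datafusion::execution::context::SessionContext;
    use datafusion_expr::{col, count, in_subquery, Expr};

    // Sketch: the same `a IN (SELECT count(*) FROM t2)` predicate, built with
    // the `in_subquery` expression helper instead of a hand-rolled Expr::InSubquery.
    async fn filter_by_count_subquery(ctx: &SessionContext) -> Result<DataFrame> {
        let subquery = Arc::new(
            ctx.table("t2")
                .await?
                .aggregate(vec![], vec![count(Expr::Wildcard)])?
                .select(vec![count(Expr::Wildcard)])?
                .into_unoptimized_plan(),
        );
        ctx.table("t1").await?.filter(in_subquery(col("a"), subquery))
    }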

+#[tokio::test]
+async fn test_count_wildcard_on_where_exist() -> Result<()> {
+    let ctx = create_join_context()?;
+    let sql_results = ctx
+        .sql("SELECT a, b FROM t1 WHERE EXISTS (SELECT count(*) FROM t2)")
+        .await?
+        .explain(false, false)?
+        .collect()
+        .await?;
+    let df_results = ctx
+        .table("t1")
+        .await?
+        .filter(Expr::Exists {
+            subquery: Subquery {
+                subquery: Arc::new(
+                    ctx.table("t2")
+                        .await?
+                        .aggregate(vec![], vec![count(Expr::Wildcard)])?
+                        .select(vec![count(Expr::Wildcard)])?
+                        .into_unoptimized_plan(),
+                    // Usually, into_optimized_plan() should be used here, but due to
+                    // https://github.com/apache/arrow-datafusion/issues/5771,
+                    // subqueries in SQL cannot be optimized, resulting in differences in the logical_plan. Therefore, into_unoptimized_plan() is used here temporarily.
+                ),
+                outer_ref_columns: vec![],
+            },
+            negated: false,
+        })?
+        .select(vec![col("a"), col("b")])?
+        .explain(false, false)?
+        .collect()
+        .await?;
+
+    // make sure the SQL plan is the same as the DataFrame plan
+    assert_eq!(
+        pretty_format_batches(&sql_results)?.to_string(),
+        pretty_format_batches(&df_results)?.to_string()
+    );
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn test_count_wildcard_on_window() -> Result<()> {
+    let ctx = create_join_context()?;
+
+    let sql_results = ctx
+        .sql("select COUNT(*) OVER(ORDER BY a DESC RANGE BETWEEN 6 PRECEDING AND 2 FOLLOWING) from t1")
+        .await?
+        .explain(false, false)?
+        .collect()
+        .await?;
+
+    let df_results = ctx
+        .table("t1")
+        .await?
+        .select(vec![Expr::WindowFunction(expr::WindowFunction::new(
+            WindowFunction::AggregateFunction(AggregateFunction::Count),
+            vec![Expr::Wildcard],
+            vec![],
+            vec![Expr::Sort(Sort::new(Box::new(col("a")), false, true))],
+            WindowFrame {
+                units: WindowFrameUnits::Range,
+                start_bound: WindowFrameBound::Preceding(ScalarValue::UInt32(Some(6))),
+                end_bound: WindowFrameBound::Following(ScalarValue::UInt32(Some(2))),
+            },
+        ))])?
+        .explain(false, false)?
+        .collect()
+        .await?;
+
+    // make sure the SQL plan is the same as the DataFrame plan
+    assert_eq!(
+        pretty_format_batches(&df_results)?.to_string(),
+        pretty_format_batches(&sql_results)?.to_string()
+    );
+
+    Ok(())
+}

+#[tokio::test]
+async fn test_count_wildcard_on_aggregate() -> Result<()> {
+    let ctx = create_join_context()?;
+    register_alltypes_tiny_pages_parquet(&ctx).await?;
 
     let sql_results = ctx
-        .sql("select count(*) from alltypes_tiny_pages")
+        .sql("select count(*) from t1")
         .await?
         .select(vec![count(Expr::Wildcard)])?
         .explain(false, false)?
@@ -58,7 +58,7 @@ async fn count_wildcard() -> Result<()> {
 
     // add `.select(vec![count(Expr::Wildcard)])?` to make sure we can analyze all nodes instead of just the top node.
     let df_results = ctx
-        .table("alltypes_tiny_pages")
+        .table("t1")
         .await?
         .aggregate(vec![], vec![count(Expr::Wildcard)])?
         .select(vec![count(Expr::Wildcard)])?
@@ -72,24 +72,54 @@ async fn count_wildcard() -> Result<()> {
         pretty_format_batches(&df_results)?.to_string()
     );
 
-    let results = ctx
-        .table("alltypes_tiny_pages")
+    Ok(())
+}
+
+#[tokio::test]
+async fn test_count_wildcard_on_where_scalar_subquery() -> Result<()> {
+    let ctx = create_join_context()?;
+
+    let sql_results = ctx
+        .sql("select a,b from t1 where (select count(*) from t2 where t1.a = t2.a)>0;")
         .await?
-        .aggregate(vec![], vec![count(Expr::Wildcard)])?
+        .explain(false, false)?
         .collect()
         .await?;
 
-    let expected = vec![
-        "+-----------------+",
-        "| COUNT(UInt8(1)) |",
-        "+-----------------+",
-        "| 7300            |",
-        "+-----------------+",
-    ];
-    assert_batches_sorted_eq!(expected, &results);
+    // In the same SessionContext, AliasGenerator will increase the subquery_alias id by 1
+    // https://github.com/apache/arrow-datafusion/blame/cf45eb9020092943b96653d70fafb143cc362e19/datafusion/optimizer/src/alias.rs#L40-L43
+    // To compare the difference between the SQL and DataFrame logical plans, we need to create a new SessionContext here
+    let ctx = create_join_context()?;
+    let df_results = ctx
+        .table("t1")
+        .await?
+        .filter(
+            ScalarSubquery(datafusion_expr::Subquery {
+                subquery: Arc::new(
+                    ctx.table("t2")
+                        .await?
+                        .filter(col("t1.a").eq(col("t2.a")))?
+                        .aggregate(vec![], vec![count(lit(COUNT_STAR_EXPANSION))])?
+                        .select(vec![count(lit(COUNT_STAR_EXPANSION))])?
+                        .into_unoptimized_plan(),
+                ),
+                outer_ref_columns: vec![],
+            })
+            .gt(lit(ScalarValue::UInt8(Some(0)))),
+        )?
+        .select(vec![col("t1.a"), col("t1.b")])?
+        .explain(false, false)?
+        .collect()
+        .await?;
+
+    // make sure the SQL plan is the same as the DataFrame plan
+    assert_eq!(
+        pretty_format_batches(&sql_results)?.to_string(),
+        pretty_format_batches(&df_results)?.to_string()
+    );
 
     Ok(())
 }

 #[tokio::test]
 async fn describe() -> Result<()> {
     let ctx = SessionContext::new();
@@ -1047,3 +1232,14 @@ async fn table_with_nested_types(n: usize) -> Result<DataFrame> {
     ctx.register_batch("shapes", batch)?;
     ctx.table("shapes").await
 }
+
+pub async fn register_alltypes_tiny_pages_parquet(ctx: &SessionContext) -> Result<()> {
+    let testdata = parquet_test_data();
+    ctx.register_parquet(
+        "alltypes_tiny_pages",
+        &format!("{testdata}/alltypes_tiny_pages.parquet"),
+        ParquetReadOptions::default(),
+    )
+    .await?;
+    Ok(())
+}
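All of the new tests follow the same pattern: run a query once through SQL and once through the DataFrame API, EXPLAIN both, and assert that the printed plans match. A minimal sketch of that pattern as a reusable helper (hypothetical, not part of the commit; it only uses calls already exercised by the tests above):

    use datafusion::arrow::util::pretty::pretty_format_batches;
    use datafusion::dataframe::DataFrame;
    use datafusion::error::Result;
    use datafusion::execution::context::SessionContext;

    // EXPLAIN the SQL query and the equivalent DataFrame, then compare the
    // formatted plan output line for line.
    async fn assert_same_plan(ctx: &SessionContext, sql: &str, df: DataFrame) -> Result<()> {
        let sql_results = ctx.sql(sql).await?.explain(false, false)?.collect().await?;
        let df_results = df.explain(false, false)?.collect().await?;
        assert_eq!(
            pretty_format_batches(&sql_results)?.to_string(),
            pretty_format_batches(&df_results)?.to_string()
        );
        Ok(())
    }

With such a helper each test would only need to construct the DataFrame side; the SQL string documents the intended semantics.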
9 changes: 0 additions & 9 deletions datafusion/core/tests/sql/mod.rs
@@ -1132,15 +1132,6 @@ async fn plan_and_collect(ctx: &SessionContext, sql: &str) -> Result<Vec<RecordBatch>> {
 /// Execute query and return results as a Vec of RecordBatches
 async fn execute_to_batches(ctx: &SessionContext, sql: &str) -> Vec<RecordBatch> {
     let df = ctx.sql(sql).await.unwrap();
-
-    // We are not really interested in the direct output of optimized_logical_plan
-    // since the physical plan construction already optimizes the given logical plan
-    // and we want to avoid double-optimization as a consequence. So we just construct
-    // it here to make sure that it doesn't fail at this step and get the optimized
-    // schema (to assert later that the logical and optimized schemas are the same).
-    let optimized = df.clone().into_optimized_plan().unwrap();
-    assert_eq!(df.logical_plan().schema(), optimized.schema());
-
     df.collect().await.unwrap()
 }
