Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Better Equivalence (ordering and exact equivalence) Propagation through ProjectionExec #8484

Merged
merged 50 commits into from
Dec 11, 2023
Merged
Changes from 1 commit
Commits
Show all changes
50 commits
Select commit Hold shift + click to select a range
f990947
Discover ordering of complex expressions in group by and window parti…
mustafasrepo Nov 6, 2023
ce982f0
Remove unnecessary tests
mustafasrepo Nov 6, 2023
8dc081f
Update comments
mustafasrepo Nov 6, 2023
8560c87
Minor changes
mustafasrepo Nov 6, 2023
c31ec8c
Better projection support for complex expressions
mustafasrepo Nov 6, 2023
49b1504
Fix failing test
mustafasrepo Nov 10, 2023
4ce3b90
Simplifications
mustafasrepo Nov 10, 2023
78206ba
Simplifications
mustafasrepo Nov 10, 2023
429096a
Add is end flag
mustafasrepo Nov 10, 2023
a4a3d7e
Simplifications
mustafasrepo Nov 14, 2023
f6bc4d9
Simplifications
mustafasrepo Nov 14, 2023
20ade77
Simplifications
mustafasrepo Nov 14, 2023
2b7a43c
Merge branch 'apache_main' into complex_exprs_ordering_support
mustafasrepo Nov 15, 2023
fffde03
Merge branch 'apache_main' into complex_exprs_ordering_support
mustafasrepo Nov 16, 2023
66d05d9
Minor changes
mustafasrepo Nov 16, 2023
b78b9d7
Minor changes
mustafasrepo Nov 16, 2023
20b5f08
Minor changes
mustafasrepo Nov 16, 2023
3e1a2b9
All tests pass
mustafasrepo Nov 16, 2023
9be89e2
Change implementation of find_longest_permutation
mustafasrepo Nov 16, 2023
20e96d0
Minor changes
mustafasrepo Nov 16, 2023
2400a9b
Minor changes
mustafasrepo Nov 16, 2023
0cd8427
Merge branch 'apache_main' into complex_exprs_ordering_support
mustafasrepo Nov 17, 2023
7a1e030
Complex exprs requirement support (#215)
mustafasrepo Nov 17, 2023
63c1ee5
Merge branch 'complex_exprs_requirement' into complex_exprs_projection
mustafasrepo Nov 17, 2023
d39f5b2
Minor changes
mustafasrepo Nov 17, 2023
1120488
Remove unused methods
mustafasrepo Nov 17, 2023
d2a2667
Minor changes
mustafasrepo Nov 17, 2023
7604c13
Add random projection test
mustafasrepo Nov 17, 2023
792c8e9
Minor changes
mustafasrepo Nov 17, 2023
b35d343
Minor changes
mustafasrepo Nov 17, 2023
ad3b2be
Merge branch 'complex_exprs_requirement' into complex_exprs_projection
mustafasrepo Nov 17, 2023
464c0a0
Simplifications
mustafasrepo Nov 17, 2023
cee9018
Add comments
mustafasrepo Nov 17, 2023
4659904
Simplifications
mustafasrepo Nov 17, 2023
9f32fb7
Minor changes
mustafasrepo Nov 17, 2023
0f5e7ec
Simplifications
mustafasrepo Nov 17, 2023
6e2f7ba
Add new orderings
mustafasrepo Nov 17, 2023
3a4bb76
Minor changes
mustafasrepo Nov 17, 2023
39ff152
Add comments
mustafasrepo Nov 17, 2023
2b8dffc
Add new tests
mustafasrepo Nov 20, 2023
9afd4b5
Change project ordering implementation
mustafasrepo Nov 20, 2023
e429e89
Change ordering projection implementation
mustafasrepo Nov 21, 2023
a48a953
Merge branch 'apache_main' into complex_exprs_projection
mustafasrepo Nov 23, 2023
f38a9c4
Remove leftover code
mustafasrepo Nov 23, 2023
5e86fe6
Add new test
mustafasrepo Nov 23, 2023
66856ae
Minor changes
mustafasrepo Nov 23, 2023
c2a617e
Review Part 1
metesynnada Nov 24, 2023
68586e8
simplifications
mustafasrepo Nov 24, 2023
1508964
Review
ozankabak Dec 8, 2023
96f7d3a
Update join suffix implementation
mustafasrepo Dec 11, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Remove unnecessary tests
mustafasrepo committed Nov 6, 2023
commit ce982f0ae3cfa4e18f63d74fabcab9ff42bce757
313 changes: 0 additions & 313 deletions datafusion/core/src/physical_optimizer/enforce_sorting.rs
Original file line number Diff line number Diff line change
@@ -2278,316 +2278,3 @@ mod tests {
Ok(())
}
}

// #[cfg(test)]
// mod tmp_tests {
// use crate::assert_batches_eq;
// use crate::physical_optimizer::utils::get_plan_string;
// use crate::physical_plan::{collect, displayable, ExecutionPlan};
// use crate::prelude::SessionContext;
// use arrow::util::pretty::print_batches;
// use datafusion_common::Result;
// use datafusion_execution::config::SessionConfig;
// use std::sync::Arc;
//
// fn print_plan(plan: &Arc<dyn ExecutionPlan>) -> Result<()> {
// let formatted = displayable(plan.as_ref()).indent(true).to_string();
// let actual: Vec<&str> = formatted.trim().lines().collect();
// println!("{:#?}", actual);
// Ok(())
// }
//
// #[tokio::test]
// async fn test_query() -> Result<()> {
// let config = SessionConfig::new().with_target_partitions(2);
// let ctx = SessionContext::new_with_config(config);
//
// ctx.sql(
// "CREATE unbounded EXTERNAL TABLE csv_with_timestamps (
// name VARCHAR,
// ts TIMESTAMP
// )
// STORED AS CSV
// WITH ORDER (ts DESC)
// LOCATION '../core/tests/data/timestamps.csv'",
// )
// .await?;
//
// let sql = "SELECT date_bin('15 minutes', ts)
// FROM csv_with_timestamps
// GROUP BY (date_bin('15 minutes', ts))
// LIMIT 5";
//
// let msg = format!("Creating logical plan for '{sql}'");
// let dataframe = ctx.sql(sql).await.expect(&msg);
// let physical_plan = dataframe.create_physical_plan().await?;
// print_plan(&physical_plan)?;
// let batches = collect(physical_plan.clone(), ctx.task_ctx()).await?;
// print_batches(&batches)?;
//
// let expected = vec![
// "GlobalLimitExec: skip=0, fetch=5",
// " CoalescePartitionsExec",
// " AggregateExec: mode=FinalPartitioned, gby=[date_bin(Utf8(\"15 minutes\"),csv_with_timestamps.ts)@0 as date_bin(Utf8(\"15 minutes\"),csv_with_timestamps.ts)], aggr=[], ordering_mode=Sorted",
// " CoalesceBatchesExec: target_batch_size=8192",
// " SortPreservingRepartitionExec: partitioning=Hash([date_bin(Utf8(\"15 minutes\"),csv_with_timestamps.ts)@0], 2), input_partitions=2, sort_exprs=date_bin(Utf8(\"15 minutes\"),csv_with_timestamps.ts)@0 DESC",
// " AggregateExec: mode=Partial, gby=[date_bin(900000000000, ts@0) as date_bin(Utf8(\"15 minutes\"),csv_with_timestamps.ts)], aggr=[], ordering_mode=Sorted",
// " RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
// " CsvExec: file_groups={1 group: [[Users/akurmustafa/projects/synnada/arrow-datafusion-synnada/datafusion/core/tests/data/timestamps.csv]]}, projection=[ts], infinite_source=true, output_ordering=[ts@0 DESC], has_header=false",
// ];
// // Get string representation of the plan
// let actual = get_plan_string(&physical_plan);
// assert_eq!(
// expected, actual,
// "\n**Optimized Plan Mismatch\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
// );
//
// let expected = [
// "+------------+",
// "| nth_value1 |",
// "+------------+",
// "| 2 |",
// "| 2 |",
// "| 2 |",
// "| 2 |",
// "| 2 |",
// "+------------+",
// ];
// assert_batches_eq!(expected, &batches);
// Ok(())
// }
//
// #[tokio::test]
// async fn test_query3() -> Result<()> {
// let config = SessionConfig::new().with_target_partitions(1);
// let ctx = SessionContext::new_with_config(config);
//
// ctx.sql(
// "CREATE EXTERNAL TABLE csv_with_timestamps (
// name VARCHAR,
// ts TIMESTAMP
// )
// STORED AS CSV
// WITH ORDER (ts DESC)
// LOCATION '../core/tests/data/timestamps.csv'",
// )
// .await?;
//
// let sql = " SELECT datebin FROM(
// SELECT date_bin('15 minutes', ts) as datebin
// FROM csv_with_timestamps)
// GROUP BY datebin;
// ";
//
// let msg = format!("Creating logical plan for '{sql}'");
// let dataframe = ctx.sql(sql).await.expect(&msg);
// let physical_plan = dataframe.create_physical_plan().await?;
// print_plan(&physical_plan)?;
// let batches = collect(physical_plan.clone(), ctx.task_ctx()).await?;
// print_batches(&batches)?;
//
// let expected = vec![
// "ProjectionExec: expr=[NTH_VALUE(annotated_data_finite2.d,Int64(2)) ORDER BY [annotated_data_finite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 as nth_value1]",
// " GlobalLimitExec: skip=0, fetch=5",
// " BoundedWindowAggExec: wdw=[NTH_VALUE(annotated_data_finite2.d,Int64(2)) ORDER BY [annotated_data_finite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: \"NTH_VALUE(annotated_data_finite2.d,Int64(2)) ORDER BY [annotated_data_finite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW\", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(NULL)), end_bound: CurrentRow }], mode=[Sorted]",
// " CsvExec: file_groups={1 group: [[Users/akurmustafa/projects/synnada/arrow-datafusion-synnada/datafusion/core/tests/data/window_2.csv]]}, projection=[a, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
// ];
// // Get string representation of the plan
// let actual = get_plan_string(&physical_plan);
// assert_eq!(
// expected, actual,
// "\n**Optimized Plan Mismatch\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
// );
//
// let expected = [
// "+------------+",
// "| nth_value1 |",
// "+------------+",
// "| 2 |",
// "| 2 |",
// "| 2 |",
// "| 2 |",
// "| 2 |",
// "+------------+",
// ];
// assert_batches_eq!(expected, &batches);
// Ok(())
// }
//
// #[tokio::test]
// async fn test_query2() -> Result<()> {
// let config = SessionConfig::new().with_target_partitions(1);
// let ctx = SessionContext::new_with_config(config);
//
// ctx.sql(
// "CREATE EXTERNAL TABLE csv_with_timestamps (
// name VARCHAR,
// ts TIMESTAMP
// )
// STORED AS CSV
// WITH ORDER (ts DESC)
// LOCATION '../core/tests/data/timestamps.csv'",
// )
// .await?;
//
// let sql = "SELECT ts + INTERVAL '15 minutes'
// FROM csv_with_timestamps
// GROUP BY (ts + INTERVAL '15 minutes')
// LIMIT 5";
//
// let msg = format!("Creating logical plan for '{sql}'");
// let dataframe = ctx.sql(sql).await.expect(&msg);
// let physical_plan = dataframe.create_physical_plan().await?;
// print_plan(&physical_plan)?;
// let batches = collect(physical_plan.clone(), ctx.task_ctx()).await?;
// print_batches(&batches)?;
//
// let expected = vec![
// "ProjectionExec: expr=[NTH_VALUE(annotated_data_finite2.d,Int64(2)) ORDER BY [annotated_data_finite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 as nth_value1]",
// " GlobalLimitExec: skip=0, fetch=5",
// " BoundedWindowAggExec: wdw=[NTH_VALUE(annotated_data_finite2.d,Int64(2)) ORDER BY [annotated_data_finite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: \"NTH_VALUE(annotated_data_finite2.d,Int64(2)) ORDER BY [annotated_data_finite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW\", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(NULL)), end_bound: CurrentRow }], mode=[Sorted]",
// " CsvExec: file_groups={1 group: [[Users/akurmustafa/projects/synnada/arrow-datafusion-synnada/datafusion/core/tests/data/window_2.csv]]}, projection=[a, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
// ];
// // Get string representation of the plan
// let actual = get_plan_string(&physical_plan);
// assert_eq!(
// expected, actual,
// "\n**Optimized Plan Mismatch\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
// );
//
// let expected = [
// "+------------+",
// "| nth_value1 |",
// "+------------+",
// "| 2 |",
// "| 2 |",
// "| 2 |",
// "| 2 |",
// "| 2 |",
// "+------------+",
// ];
// assert_batches_eq!(expected, &batches);
// Ok(())
// }
//
// #[tokio::test]
// async fn test_query4() -> Result<()> {
// let config = SessionConfig::new().with_target_partitions(1);
// let ctx = SessionContext::new_with_config(config);
//
// ctx.sql("CREATE TABLE tab0(col0 INTEGER, col1 INTEGER, col2 INTEGER)")
// .await?;
//
// let sql = "SELECT l.col0, LAST_VALUE(r.col1 ORDER BY r.col0) as last_col1
// FROM tab0 as l
// JOIN tab0 as r
// ON l.col0 = r.col0
// GROUP BY l.col0, l.col1, l.col2
// ORDER BY l.col0";
//
// let msg = format!("Creating logical plan for '{sql}'");
// let dataframe = ctx.sql(sql).await.expect(&msg);
// let physical_plan = dataframe.create_physical_plan().await?;
// print_plan(&physical_plan)?;
// let batches = collect(physical_plan.clone(), ctx.task_ctx()).await?;
// print_batches(&batches)?;
//
// let expected = vec![
// "ProjectionExec: expr=[NTH_VALUE(annotated_data_finite2.d,Int64(2)) ORDER BY [annotated_data_finite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 as nth_value1]",
// " GlobalLimitExec: skip=0, fetch=5",
// " BoundedWindowAggExec: wdw=[NTH_VALUE(annotated_data_finite2.d,Int64(2)) ORDER BY [annotated_data_finite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: \"NTH_VALUE(annotated_data_finite2.d,Int64(2)) ORDER BY [annotated_data_finite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW\", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(NULL)), end_bound: CurrentRow }], mode=[Sorted]",
// " CsvExec: file_groups={1 group: [[Users/akurmustafa/projects/synnada/arrow-datafusion-synnada/datafusion/core/tests/data/window_2.csv]]}, projection=[a, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
// ];
// // Get string representation of the plan
// let actual = get_plan_string(&physical_plan);
// assert_eq!(
// expected, actual,
// "\n**Optimized Plan Mismatch\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
// );
//
// let expected = [
// "+------------+",
// "| nth_value1 |",
// "+------------+",
// "| 2 |",
// "| 2 |",
// "| 2 |",
// "| 2 |",
// "| 2 |",
// "+------------+",
// ];
// assert_batches_eq!(expected, &batches);
// Ok(())
// }
//
// #[tokio::test]
// async fn test_query5() -> Result<()> {
// let config = SessionConfig::new().with_target_partitions(1);
// let ctx = SessionContext::new_with_config(config);
//
// ctx.sql(
// "CREATE EXTERNAL TABLE aggregate_test_100 (
// c1 VARCHAR NOT NULL,
// c2 TINYINT NOT NULL,
// c3 SMALLINT NOT NULL,
// c4 SMALLINT,
// c5 INT,
// c6 BIGINT NOT NULL,
// c7 SMALLINT NOT NULL,
// c8 INT NOT NULL,
// c9 BIGINT UNSIGNED NOT NULL,
// c10 VARCHAR NOT NULL,
// c11 FLOAT NOT NULL,
// c12 DOUBLE NOT NULL,
// c13 VARCHAR NOT NULL
// )
// STORED AS CSV
// WITH HEADER ROW
// LOCATION '../../testing/data/csv/aggregate_test_100.csv'",
// )
// .await?;
//
// let sql = "SELECT c3,
// SUM(c9) OVER(ORDER BY c3+c4 DESC, c9 DESC, c2 ASC) as sum1,
// SUM(c9) OVER(ORDER BY c3+c4 ASC, c9 ASC ) as sum2
// FROM aggregate_test_100
// LIMIT 5";
//
// let msg = format!("Creating logical plan for '{sql}'");
// let dataframe = ctx.sql(sql).await.expect(&msg);
// let physical_plan = dataframe.create_physical_plan().await?;
// print_plan(&physical_plan)?;
// let batches = collect(physical_plan.clone(), ctx.task_ctx()).await?;
// print_batches(&batches)?;
//
// let expected = vec![
// "ProjectionExec: expr=[c3@0 as c3, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@4 as sum2]",
// " GlobalLimitExec: skip=0, fetch=5",
// " WindowAggExec: wdw=[SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: \"SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: CurrentRow, end_bound: Following(Int16(NULL)) }]",
// " ProjectionExec: expr=[c3@1 as c3, c4@2 as c4, c9@3 as c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@4 as SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]",
// " BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: \"SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int16(NULL)), end_bound: CurrentRow }], mode=[Sorted]",
// " SortExec: expr=[c3@1 + c4@2 DESC,c9@3 DESC,c2@0 ASC NULLS LAST]",
// " CsvExec: file_groups={1 group: [[Users/akurmustafa/projects/synnada/arrow-datafusion-synnada/testing/data/csv/aggregate_test_100.csv]]}, projection=[c2, c3, c4, c9], has_header=true",
// ];
// // Get string representation of the plan
// let actual = get_plan_string(&physical_plan);
// assert_eq!(
// expected, actual,
// "\n**Optimized Plan Mismatch\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
// );
//
// let expected = [
// "+------------+",
// "| nth_value1 |",
// "+------------+",
// "| 2 |",
// "| 2 |",
// "| 2 |",
// "| 2 |",
// "| 2 |",
// "+------------+",
// ];
// assert_batches_eq!(expected, &batches);
// Ok(())
// }
// }