Skip to content

Commit

Permalink
Test + workaround for SanityCheck plan
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb authored and wiedld committed Jan 27, 2025
1 parent a25f24f commit 42498e4
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 49 deletions.
10 changes: 10 additions & 0 deletions datafusion/physical-optimizer/src/sanity_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ use datafusion_physical_expr::intervals::utils::{check_support, is_datatype_supp
use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion_physical_plan::joins::SymmetricHashJoinExec;
use datafusion_physical_plan::{get_plan_string, ExecutionPlanProperties};
use datafusion_physical_plan::sorts::sort::SortExec;
use datafusion_physical_plan::union::UnionExec;

use crate::PhysicalOptimizerRule;
use datafusion_physical_expr_common::sort_expr::format_physical_sort_requirement_list;
Expand Down Expand Up @@ -135,6 +137,14 @@ pub fn check_plan_sanity(
plan.required_input_ordering(),
plan.required_input_distribution(),
) {
// TEMP HACK WORKAROUND https://github.com/apache/datafusion/issues/11492
if child.as_any().downcast_ref::<UnionExec>().is_some() {
continue;
}
if child.as_any().downcast_ref::<SortExec>().is_some() {
continue;
}

let child_eq_props = child.equivalence_properties();
if let Some(sort_req) = sort_req {
if !child_eq_props.ordering_satisfy_requirement(&sort_req) {
Expand Down
76 changes: 27 additions & 49 deletions datafusion/sqllogictest/test_files/union.slt
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,9 @@ physical_plan
# Clean up after the test
########

statement ok
drop table t

statement ok
drop table t1;

Expand Down Expand Up @@ -778,61 +781,36 @@ select make_array(make_array(1)) x UNION ALL SELECT make_array(arrow_cast(make_a
[[-1]]
[[1]]

###
# Test for https://github.com/apache/datafusion/issues/11492
###

# Input data is
# a,b,c
# 1,2,3

statement ok
CREATE EXTERNAL TABLE aggregate_test_100 (
c1 VARCHAR NOT NULL,
c2 TINYINT NOT NULL,
c3 SMALLINT NOT NULL,
c4 SMALLINT,
c5 INT,
c6 BIGINT NOT NULL,
c7 SMALLINT NOT NULL,
c8 INT NOT NULL,
c9 BIGINT UNSIGNED NOT NULL,
c10 VARCHAR NOT NULL,
c11 FLOAT NOT NULL,
c12 DOUBLE NOT NULL,
c13 VARCHAR NOT NULL
CREATE EXTERNAL TABLE t (
a INT,
b INT,
c INT
)
STORED AS CSV
LOCATION '../../testing/data/csv/aggregate_test_100.csv'
LOCATION '../core/tests/data/example.csv'
WITH ORDER (a ASC)
OPTIONS ('format.has_header' 'true');

statement ok
set datafusion.execution.batch_size = 2;
query T
SELECT (SELECT a from t ORDER BY a) UNION ALL (SELECT 'bar' as a from t) ORDER BY a;
----
1
bar

# Constant value tracking across union
query TT
explain
SELECT * FROM(
(
SELECT * FROM aggregate_test_100 WHERE c1='a'
)
UNION ALL
(
SELECT * FROM aggregate_test_100 WHERE c1='a'
))
ORDER BY c1
query I
SELECT (SELECT a from t ORDER BY a) UNION ALL (SELECT NULL as a from t) ORDER BY a;
----
logical_plan
01)Sort: aggregate_test_100.c1 ASC NULLS LAST
02)--Union
03)----Filter: aggregate_test_100.c1 = Utf8("a")
04)------TableScan: aggregate_test_100 projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13], partial_filters=[aggregate_test_100.c1 = Utf8("a")]
05)----Filter: aggregate_test_100.c1 = Utf8("a")
06)------TableScan: aggregate_test_100 projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13], partial_filters=[aggregate_test_100.c1 = Utf8("a")]
physical_plan
01)CoalescePartitionsExec
02)--UnionExec
03)----CoalesceBatchesExec: target_batch_size=2
04)------FilterExec: c1@0 = a
05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
06)----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13], has_header=true
07)----CoalesceBatchesExec: target_batch_size=2
08)------FilterExec: c1@0 = a
09)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
10)----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13], has_header=true
1
NULL

# Clean up after the test
statement ok
drop table aggregate_test_100;
drop table t

0 comments on commit 42498e4

Please sign in to comment.