
Commit

Merge remote-tracking branch 'apache/main' into alamb/enable_string_view_by_default2
alamb committed Oct 30, 2024
2 parents 3be00ac + 63e8e6a commit ffc4e9d
Showing 15 changed files with 327 additions and 21 deletions.
11 changes: 11 additions & 0 deletions datafusion/common/src/config.rs
@@ -268,6 +268,17 @@ config_namespace! {
/// Defaults to the number of CPU cores on the system
pub planning_concurrency: usize, default = num_cpus::get()

/// When set to true, skips verifying that the schema produced by
/// planning the input of `LogicalPlan::Aggregate` exactly matches the
/// schema of the input plan.
///
/// When set to false, if the schema does not match exactly
/// (including nullability and metadata), a planning error will be raised.
///
/// This is used to work around bugs in the planner that are now caught by
/// the new schema verification step.
pub skip_physical_aggregate_schema_check: bool, default = false

/// Specifies the reserved memory for each spillable sort operation to
/// facilitate an in-memory merge.
///
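The snippet below is a minimal usage sketch, not part of this diff: it assumes the usual `SessionConfig` / `SessionContext` APIs and shows how the new option could be enabled either programmatically or with a SET statement.

use datafusion::prelude::{SessionConfig, SessionContext};

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    // Opt out of the strict aggregate input schema check introduced here.
    let config = SessionConfig::new().set_bool(
        "datafusion.execution.skip_physical_aggregate_schema_check",
        true,
    );
    let ctx = SessionContext::new_with_config(config);

    // The same option can also be toggled at runtime via SQL.
    ctx.sql("SET datafusion.execution.skip_physical_aggregate_schema_check = true")
        .await?;
    Ok(())
}
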
8 changes: 5 additions & 3 deletions datafusion/core/src/physical_planner.rs
@@ -649,14 +649,16 @@ impl DefaultPhysicalPlanner {
aggr_expr,
..
}) => {
let options = session_state.config().options();
// Initially need to perform the aggregate and then merge the partitions
let input_exec = children.one()?;
let physical_input_schema = input_exec.schema();
let logical_input_schema = input.as_ref().schema();
let physical_input_schema_from_logical: Arc<Schema> =
logical_input_schema.as_ref().clone().into();
let physical_input_schema_from_logical = logical_input_schema.inner();

if physical_input_schema != physical_input_schema_from_logical {
if &physical_input_schema != physical_input_schema_from_logical
&& !options.execution.skip_physical_aggregate_schema_check
{
return internal_err!("Physical input schema should be the same as the one converted from logical input schema.");
}

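For context, a minimal standalone sketch of the check above (the helper name and signature are illustrative, not part of this diff): `DFSchema::inner` borrows the underlying `Arc<Schema>`, replacing the earlier clone-and-convert, and the error can now be bypassed through the new option.

use arrow::datatypes::SchemaRef;
use datafusion_common::{internal_err, DFSchema, Result};

// Illustrative helper mirroring the check performed when planning
// `LogicalPlan::Aggregate`; names are not from the commit itself.
fn check_aggregate_input_schema(
    logical_input_schema: &DFSchema,
    physical_input_schema: &SchemaRef,
    skip_check: bool,
) -> Result<()> {
    // `DFSchema::inner` returns a reference to the underlying Arc<Schema>,
    // so no clone or conversion is needed for the comparison.
    let physical_input_schema_from_logical = logical_input_schema.inner();
    if physical_input_schema != physical_input_schema_from_logical && !skip_check {
        return internal_err!(
            "Physical input schema should be the same as the one converted from logical input schema."
        );
    }
    Ok(())
}
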
5 changes: 1 addition & 4 deletions datafusion/core/tests/dataframe/mod.rs
@@ -114,10 +114,7 @@ async fn test_count_wildcard_on_where_in() -> Result<()> {
.await?
.aggregate(vec![], vec![count(wildcard())])?
.select(vec![count(wildcard())])?
.into_unoptimized_plan(),
// Usually, into_optimized_plan() should be used here, but due to
// https://github.com/apache/datafusion/issues/5771,
// subqueries in SQL cannot be optimized, resulting in differences in logical_plan. Therefore, into_unoptimized_plan() is temporarily used here.
.into_optimized_plan()?,
),
))?
.select(vec![col("a"), col("b")])?
55 changes: 52 additions & 3 deletions datafusion/physical-expr/src/expressions/not.rs
@@ -27,6 +27,7 @@ use crate::PhysicalExpr;
use arrow::datatypes::{DataType, Schema};
use arrow::record_batch::RecordBatch;
use datafusion_common::{cast::as_boolean_array, Result, ScalarValue};
use datafusion_expr::interval_arithmetic::Interval;
use datafusion_expr::ColumnarValue;

/// Not expression
@@ -100,6 +101,10 @@ impl PhysicalExpr for NotExpr {
Ok(Arc::new(NotExpr::new(Arc::clone(&children[0]))))
}

fn evaluate_bounds(&self, children: &[&Interval]) -> Result<Interval> {
children[0].not()
}

fn dyn_hash(&self, state: &mut dyn Hasher) {
let mut s = state;
self.hash(&mut s);
@@ -125,10 +130,11 @@ mod tests {
use super::*;
use crate::expressions::col;
use arrow::{array::BooleanArray, datatypes::*};
use std::sync::OnceLock;

#[test]
fn neg_op() -> Result<()> {
let schema = Schema::new(vec![Field::new("a", DataType::Boolean, true)]);
let schema = schema();

let expr = not(col("a", &schema)?)?;
assert_eq!(expr.data_type(&schema)?, DataType::Boolean);
@@ -137,8 +143,7 @@
let input = BooleanArray::from(vec![Some(true), None, Some(false)]);
let expected = &BooleanArray::from(vec![Some(false), None, Some(true)]);

let batch =
RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(input)])?;
let batch = RecordBatch::try_new(schema, vec![Arc::new(input)])?;

let result = expr
.evaluate(&batch)?
@@ -150,4 +155,48 @@

Ok(())
}

#[test]
fn test_evaluate_bounds() -> Result<()> {
// Note that `None` for boolean intervals is converted to `Some(false)`
// / `Some(true)` by `Interval::make`, so it is not explicitly tested
// here

// if the bounds are all booleans (false, true) so is the negation
assert_evaluate_bounds(
Interval::make(Some(false), Some(true))?,
Interval::make(Some(false), Some(true))?,
)?;
// (true, false) is not tested because it is not a valid interval (lower
// bound is greater than upper bound)
assert_evaluate_bounds(
Interval::make(Some(true), Some(true))?,
Interval::make(Some(false), Some(false))?,
)?;
assert_evaluate_bounds(
Interval::make(Some(false), Some(false))?,
Interval::make(Some(true), Some(true))?,
)?;
Ok(())
}

fn assert_evaluate_bounds(
interval: Interval,
expected_interval: Interval,
) -> Result<()> {
let not_expr = not(col("a", &schema())?)?;
assert_eq!(
not_expr.evaluate_bounds(&[&interval]).unwrap(),
expected_interval
);
Ok(())
}

fn schema() -> SchemaRef {
Arc::clone(SCHEMA.get_or_init(|| {
Arc::new(Schema::new(vec![Field::new("a", DataType::Boolean, true)]))
}))
}

static SCHEMA: OnceLock<SchemaRef> = OnceLock::new();
}
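
As an illustrative usage sketch (not part of this diff) of the new `evaluate_bounds` support, assuming the `col`/`not` helpers and interval API used above: if a boolean child is known to be true, `NOT child` is known to be false.

use arrow::datatypes::{DataType, Field, Schema};
use datafusion_common::Result;
use datafusion_expr::interval_arithmetic::Interval;
use datafusion_physical_expr::expressions::{col, not};
use datafusion_physical_expr::PhysicalExpr;

// Illustrative only: propagate bounds through `NOT a`.
fn bounds_of_not_a() -> Result<Interval> {
    let schema = Schema::new(vec![Field::new("a", DataType::Boolean, true)]);
    let not_a = not(col("a", &schema)?)?;
    // `a` is certainly true on this partition...
    let a_bounds = Interval::make(Some(true), Some(true))?;
    // ...so the result is the interval [false, false].
    not_a.evaluate_bounds(&[&a_bounds])
}
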
2 changes: 1 addition & 1 deletion datafusion/sqllogictest/test_files/aggregate.slt
@@ -6080,7 +6080,7 @@ ORDER BY k;
statement ok
CREATE TABLE t1(v1 int);

# issue: https://github.com/apache/datafusion/issues/12814
# issue: https://github.com/apache/datafusion/issues/12814
statement error DataFusion error: Error during planning: Aggregate functions are not allowed in the WHERE clause. Consider using HAVING instead
SELECT v1 FROM t1 WHERE ((count(v1) % 1) << 1) > 0;

2 changes: 1 addition & 1 deletion datafusion/sqllogictest/test_files/ddl.slt
@@ -804,4 +804,4 @@ query error DataFusion error: Schema error: No field named a\.
EXPLAIN CREATE TABLE t(a int) AS VALUES (a + a);

statement error DataFusion error: Schema error: No field named a\.
CREATE TABLE t(a int) AS SELECT x FROM (VALUES (a)) t(x) WHERE false;
CREATE TABLE t(a int) AS SELECT x FROM (VALUES (a)) t(x) WHERE false;
2 changes: 2 additions & 0 deletions datafusion/sqllogictest/test_files/information_schema.slt
@@ -210,6 +210,7 @@ datafusion.execution.parquet.writer_version 1.0
datafusion.execution.planning_concurrency 13
datafusion.execution.skip_partial_aggregation_probe_ratio_threshold 0.8
datafusion.execution.skip_partial_aggregation_probe_rows_threshold 100000
datafusion.execution.skip_physical_aggregate_schema_check false
datafusion.execution.soft_max_rows_per_output_file 50000000
datafusion.execution.sort_in_place_threshold_bytes 1048576
datafusion.execution.sort_spill_reservation_bytes 10485760
@@ -302,6 +303,7 @@ datafusion.execution.parquet.writer_version 1.0 (writing) Sets parquet writer ve
datafusion.execution.planning_concurrency 13 Fan-out during initial physical planning. This is mostly use to plan `UNION` children in parallel. Defaults to the number of CPU cores on the system
datafusion.execution.skip_partial_aggregation_probe_ratio_threshold 0.8 Aggregation ratio (number of distinct groups / number of input rows) threshold for skipping partial aggregation. If the value is greater then partial aggregation will skip aggregation for further input
datafusion.execution.skip_partial_aggregation_probe_rows_threshold 100000 Number of input rows partial aggregation partition should process, before aggregation ratio check and trying to switch to skipping aggregation mode
datafusion.execution.skip_physical_aggregate_schema_check false When set to true, skips verifying that the schema produced by planning the input of `LogicalPlan::Aggregate` exactly matches the schema of the input plan. When set to false, if the schema does not match exactly (including nullability and metadata), a planning error will be raised. This is used to work around bugs in the planner that are now caught by the new schema verification step.
datafusion.execution.soft_max_rows_per_output_file 50000000 Target number of rows in output files when writing multiple. This is a soft max, so it can be exceeded slightly. There also will be one file smaller than the limit if the total number of rows written is not roughly divisible by the soft max
datafusion.execution.sort_in_place_threshold_bytes 1048576 When sorting, below what size should data be concatenated and sorted in a single RecordBatch rather than sorted in batches and merged.
datafusion.execution.sort_spill_reservation_bytes 10485760 Specifies the reserved memory for each spillable sort operation to facilitate an in-memory merge. When a sort operation spills to disk, the in-memory data must be sorted and merged before being written to a file. This setting reserves a specific amount of memory for that in-memory sort/merge process. Note: This setting is irrelevant if the sort operation cannot spill (i.e., if there's no `DiskManager` configured).
2 changes: 2 additions & 0 deletions datafusion/sqllogictest/test_files/string/dictionary_utf8.slt
@@ -51,6 +51,8 @@ from test_basic_operator;
Andrew datafusion📊🔥 true false true false
Xiangpeng datafusion数据融合 false true false true
Raphael datafusionДатаФусион false false false false
under_score un iść core false false false false
percent pan Tadeusz ma iść w kąt false false false false
NULL NULL NULL NULL NULL NULL

#
2 changes: 2 additions & 0 deletions datafusion/sqllogictest/test_files/string/init_data.slt.part
@@ -20,6 +20,8 @@ create table test_source as values
('Andrew', 'X', 'datafusion📊🔥', '🔥'),
('Xiangpeng', 'Xiangpeng', 'datafusion数据融合', 'datafusion数据融合'),
('Raphael', 'R', 'datafusionДатаФусион', 'аФус'),
('under_score', 'un_____core', 'un iść core', 'chrząszcz na łące w 東京都'),
('percent', 'p%t', 'pan Tadeusz ma iść w kąt', 'Pan Tadeusz ma frunąć stąd w kąt'),
(NULL, 'R', NULL, '🔥');

# --------------------------------------
4 changes: 4 additions & 0 deletions datafusion/sqllogictest/test_files/string/large_string.slt
@@ -41,6 +41,8 @@ SELECT ascii_1, ascii_2, unicode_1, unicode_2 FROM test_basic_operator
Andrew X datafusion📊🔥 🔥
Xiangpeng Xiangpeng datafusion数据融合 datafusion数据融合
Raphael R datafusionДатаФусион аФус
under_score un_____core un iść core chrząszcz na łące w 東京都
percent p%t pan Tadeusz ma iść w kąt Pan Tadeusz ma frunąć stąd w kąt
NULL R NULL 🔥

# TODO: move it back to `string_query.slt.part` after fixing the issue
@@ -57,6 +59,8 @@
Andrew datafusion📊🔥 true false true false
Xiangpeng datafusion数据融合 false true false true
Raphael datafusionДатаФусион false false false false
under_score un iść core false false false false
percent pan Tadeusz ma iść w kąt false false false false
NULL NULL NULL NULL NULL NULL

#
102 changes: 102 additions & 0 deletions datafusion/sqllogictest/test_files/string/string.slt
@@ -48,13 +48,115 @@ from test_basic_operator;
Andrew datafusion📊🔥 true false true false
Xiangpeng datafusion数据融合 false true false true
Raphael datafusionДатаФусион false false false false
under_score un iść core false false false false
percent pan Tadeusz ma iść w kąt false false false false
NULL NULL NULL NULL NULL NULL

#
# common test for string-like functions and operators
#
include ./string_query.slt.part

# TODO support all String types in sql_like_to_expr and move this test to `string_query.slt.part`
# dynamic LIKE as filter
query TTT rowsort
SELECT ascii_1, 'is LIKE', ascii_2 FROM test_basic_operator WHERE ascii_1 LIKE ascii_2
UNION ALL
SELECT ascii_1, 'is NOT LIKE', ascii_2 FROM test_basic_operator WHERE ascii_1 NOT LIKE ascii_2
UNION ALL
SELECT unicode_1, 'is LIKE', ascii_2 FROM test_basic_operator WHERE unicode_1 LIKE ascii_2
UNION ALL
SELECT unicode_1, 'is NOT LIKE', ascii_2 FROM test_basic_operator WHERE unicode_1 NOT LIKE ascii_2
UNION ALL
SELECT unicode_2, 'is LIKE', ascii_2 FROM test_basic_operator WHERE unicode_2 LIKE ascii_2
UNION ALL
SELECT unicode_2, 'is NOT LIKE', ascii_2 FROM test_basic_operator WHERE unicode_2 NOT LIKE ascii_2
----
Andrew is NOT LIKE X
Pan Tadeusz ma frunąć stąd w kąt is NOT LIKE p%t
Raphael is NOT LIKE R
Xiangpeng is LIKE Xiangpeng
chrząszcz na łące w 東京都 is NOT LIKE un_____core
datafusionДатаФусион is NOT LIKE R
datafusion数据融合 is NOT LIKE Xiangpeng
datafusion数据融合 is NOT LIKE Xiangpeng
datafusion📊🔥 is NOT LIKE X
pan Tadeusz ma iść w kąt is LIKE p%t
percent is LIKE p%t
un iść core is LIKE un_____core
under_score is LIKE un_____core
аФус is NOT LIKE R
🔥 is NOT LIKE R
🔥 is NOT LIKE X

# TODO support all String types in sql_like_to_expr and move this test to `string_query.slt.part`
# dynamic LIKE as projection
query TTTTBBBB rowsort
SELECT
ascii_1, ascii_2, unicode_1, unicode_2,
(ascii_1 LIKE ascii_2) AS ascii_1_like_ascii_2,
(ascii_2 LIKE ascii_1) AS ascii_2_like_ascii_1,
(unicode_1 LIKE ascii_2) AS unicode_1_like_ascii_2,
(unicode_2 LIKE ascii_2) AS unicode_2_like_ascii_2
FROM test_basic_operator
----
Andrew X datafusion📊🔥 🔥 false false false false
NULL R NULL 🔥 NULL NULL NULL false
Raphael R datafusionДатаФусион аФус false false false false
Xiangpeng Xiangpeng datafusion数据融合 datafusion数据融合 true true false false
percent p%t pan Tadeusz ma iść w kąt Pan Tadeusz ma frunąć stąd w kąt true false true false
under_score un_____core un iść core chrząszcz na łące w 東京都 true false true false

# TODO support all String types in sql_like_to_expr and move this test to `string_query.slt.part`
# dynamic ILIKE as filter
query TTT rowsort
SELECT ascii_1, 'is ILIKE', ascii_2 FROM test_basic_operator WHERE ascii_1 ILIKE ascii_2
UNION ALL
SELECT ascii_1, 'is NOT ILIKE', ascii_2 FROM test_basic_operator WHERE ascii_1 NOT ILIKE ascii_2
UNION ALL
SELECT unicode_1, 'is ILIKE', ascii_2 FROM test_basic_operator WHERE unicode_1 ILIKE ascii_2
UNION ALL
SELECT unicode_1, 'is NOT ILIKE', ascii_2 FROM test_basic_operator WHERE unicode_1 NOT ILIKE ascii_2
UNION ALL
SELECT unicode_2, 'is ILIKE', ascii_2 FROM test_basic_operator WHERE unicode_2 ILIKE ascii_2
UNION ALL
SELECT unicode_2, 'is NOT ILIKE', ascii_2 FROM test_basic_operator WHERE unicode_2 NOT ILIKE ascii_2
----
Andrew is NOT ILIKE X
Pan Tadeusz ma frunąć stąd w kąt is ILIKE p%t
Raphael is NOT ILIKE R
Xiangpeng is ILIKE Xiangpeng
chrząszcz na łące w 東京都 is NOT ILIKE un_____core
datafusionДатаФусион is NOT ILIKE R
datafusion数据融合 is NOT ILIKE Xiangpeng
datafusion数据融合 is NOT ILIKE Xiangpeng
datafusion📊🔥 is NOT ILIKE X
pan Tadeusz ma iść w kąt is ILIKE p%t
percent is ILIKE p%t
un iść core is ILIKE un_____core
under_score is ILIKE un_____core
аФус is NOT ILIKE R
🔥 is NOT ILIKE R
🔥 is NOT ILIKE X

# TODO support all String types in sql_like_to_expr and move this test to `string_query.slt.part`
# dynamic ILIKE as projection
query TTTTBBBB rowsort
SELECT
ascii_1, ascii_2, unicode_1, unicode_2,
(ascii_1 ILIKE ascii_2) AS ascii_1_ilike_ascii_2,
(ascii_2 ILIKE ascii_1) AS ascii_2_ilike_ascii_1,
(unicode_1 ILIKE ascii_2) AS unicode_1_ilike_ascii_2,
(unicode_2 ILIKE ascii_2) AS unicode_2_ilike_ascii_2
FROM test_basic_operator
----
Andrew X datafusion📊🔥 🔥 false false false false
NULL R NULL 🔥 NULL NULL NULL false
Raphael R datafusionДатаФусион аФус false false false false
Xiangpeng Xiangpeng datafusion数据融合 datafusion数据融合 true true false false
percent p%t pan Tadeusz ma iść w kąt Pan Tadeusz ma frunąć stąd w kąt true false true true
under_score un_____core un iść core chrząszcz na łące w 東京都 true false true false

#
# Clean up
#
(The remaining changed files are not shown here.)
