From 21ebaf2143132dd54d9034986cc471d139448fc6 Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Mon, 21 Oct 2024 14:05:28 +0200 Subject: [PATCH 1/3] Fix function name typo --- datafusion/physical-plan/src/execution_plan.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs index a89e265ad2f8..30144d4223c2 100644 --- a/datafusion/physical-plan/src/execution_plan.rs +++ b/datafusion/physical-plan/src/execution_plan.rs @@ -852,7 +852,7 @@ pub fn execute_input_stream( Ok(Box::pin(RecordBatchStreamAdapter::new( sink_schema, input_stream - .map(move |batch| check_not_null_contraits(batch?, &risky_columns)), + .map(move |batch| check_not_null_constraints(batch?, &risky_columns)), ))) } } @@ -872,7 +872,7 @@ pub fn execute_input_stream( /// This function iterates over the specified column indices and ensures that none /// of the columns contain null values. If any column contains null values, an error /// is returned. -pub fn check_not_null_contraits( +pub fn check_not_null_constraints( batch: RecordBatch, column_indices: &Vec<usize>, ) -> Result<RecordBatch> { From 726c295d2aa625121979fadfae91ee2fc84ddd81 Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Mon, 21 Oct 2024 14:31:39 +0200 Subject: [PATCH 2/3] Fix check_not_null_constraints null detection `check_not_null_constraints` (previously misspelled `check_not_null_contraits`) checked for nulls using `Array::null_count`, which reports only physical nulls and therefore does not return the real (logical) null count, e.g. for dictionary, run-end encoded, or Null-typed arrays. 
--- Cargo.toml | 1 + datafusion/physical-plan/Cargo.toml | 1 + .../physical-plan/src/execution_plan.rs | 134 +++++++++++++++++- 3 files changed, 131 insertions(+), 5 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 63bfb7fce413..ca13deceb3df 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -86,6 +86,7 @@ arrow-ipc = { version = "53.1.0", default-features = false, features = [ arrow-ord = { version = "53.1.0", default-features = false } arrow-schema = { version = "53.1.0", default-features = false } arrow-string = { version = "53.1.0", default-features = false } +assertor = "0.0.3" async-trait = "0.1.73" bigdecimal = "=0.4.1" bytes = "1.4" diff --git a/datafusion/physical-plan/Cargo.toml b/datafusion/physical-plan/Cargo.toml index 7fcd719539ec..a30c93fbc08c 100644 --- a/datafusion/physical-plan/Cargo.toml +++ b/datafusion/physical-plan/Cargo.toml @@ -68,6 +68,7 @@ rand = { workspace = true } tokio = { workspace = true } [dev-dependencies] +assertor = { workspace = true } datafusion-functions-aggregate = { workspace = true } rstest = { workspace = true } rstest_reuse = "0.7.0" diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs index 30144d4223c2..7a6430d1b280 100644 --- a/datafusion/physical-plan/src/execution_plan.rs +++ b/datafusion/physical-plan/src/execution_plan.rs @@ -21,6 +21,7 @@ use std::sync::Arc; use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; +use arrow_array::Array; use futures::stream::{StreamExt, TryStreamExt}; use tokio::task::JoinSet; @@ -885,7 +886,13 @@ pub fn check_not_null_constraints( ); } - if batch.column(index).null_count() > 0 { + if batch + .column(index) + .logical_nulls() + .map(|nulls| nulls.null_count()) + .unwrap_or_default() + > 0 + { return exec_err!( "Invalid batch column at '{}' has null but schema specifies non-nullable", index @@ -920,11 +927,13 @@ pub enum CardinalityEffect { #[cfg(test)] mod tests { use super::*; + use 
arrow_array::{DictionaryArray, Int32Array, NullArray, RunArray}; + use arrow_schema::{DataType, Field, Schema, SchemaRef}; + use assertor::assert_that; + use assertor::StringAssertion; use std::any::Any; use std::sync::Arc; - use arrow_schema::{Schema, SchemaRef}; - use datafusion_common::{Result, Statistics}; use datafusion_execution::{SendableRecordBatchStream, TaskContext}; @@ -1068,6 +1077,121 @@ mod tests { fn use_execution_plan_as_trait_object(plan: &dyn ExecutionPlan) { let _ = plan.name(); } -} -// pub mod test; + #[test] + fn test_check_not_null_constraints_accept_non_null() -> Result<()> { + check_not_null_constraints( + RecordBatch::try_new( + Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)])), + vec![Arc::new(Int32Array::from(vec![Some(1), Some(2), Some(3)]))], + )?, + &vec![0], + )?; + Ok(()) + } + + #[test] + fn test_check_not_null_constraints_reject_null() -> Result<()> { + let result = check_not_null_constraints( + RecordBatch::try_new( + Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)])), + vec![Arc::new(Int32Array::from(vec![Some(1), None, Some(3)]))], + )?, + &vec![0], + ); + assert!(result.is_err()); + assert_that!(result.err().unwrap().message().as_ref()).starts_with( + "Invalid batch column at '0' has null but schema specifies non-nullable", + ); + Ok(()) + } + + #[test] + fn test_check_not_null_constraints_with_run_end_array() -> Result<()> { + // some null value inside REE array + let run_ends = Int32Array::from(vec![1, 2, 3, 4]); + let values = Int32Array::from(vec![Some(0), None, Some(1), None]); + let run_end_array = RunArray::try_new(&run_ends, &values)?; + let result = check_not_null_constraints( + RecordBatch::try_new( + Arc::new(Schema::new(vec![Field::new( + "a", + run_end_array.data_type().to_owned(), + true, + )])), + vec![Arc::new(run_end_array)], + )?, + &vec![0], + ); + assert!(result.is_err()); + assert_that!(result.err().unwrap().message().as_ref()).starts_with( + "Invalid batch column at '0' 
has null but schema specifies non-nullable", + ); + Ok(()) + } + + #[test] + fn test_check_not_null_constraints_with_dictionary_array_with_null() -> Result<()> { + let values = Arc::new(Int32Array::from(vec![Some(1), None, Some(3), Some(4)])); + let keys = Int32Array::from(vec![0, 1, 2, 3]); + let dictionary = DictionaryArray::new(keys, values); + let result = check_not_null_constraints( + RecordBatch::try_new( + Arc::new(Schema::new(vec![Field::new( + "a", + dictionary.data_type().to_owned(), + true, + )])), + vec![Arc::new(dictionary)], + )?, + &vec![0], + ); + assert!(result.is_err()); + assert_that!(result.err().unwrap().message().as_ref()).starts_with( + "Invalid batch column at '0' has null but schema specifies non-nullable", + ); + Ok(()) + } + + #[test] + fn test_check_not_null_constraints_with_dictionary_masking_null() -> Result<()> { + // some null value marked out by dictionary array + let values = Arc::new(Int32Array::from(vec![ + Some(1), + None, // this null value is masked by dictionary keys + Some(3), + Some(4), + ])); + let keys = Int32Array::from(vec![0, /*1,*/ 2, 3]); + let dictionary = DictionaryArray::new(keys, values); + check_not_null_constraints( + RecordBatch::try_new( + Arc::new(Schema::new(vec![Field::new( + "a", + dictionary.data_type().to_owned(), + true, + )])), + vec![Arc::new(dictionary)], + )?, + &vec![0], + )?; + Ok(()) + } + + #[test] + fn test_check_not_null_constraints_on_null_type() -> Result<()> { + // null value of Null type + let result = check_not_null_constraints( + RecordBatch::try_new( + Arc::new(Schema::new(vec![Field::new("a", DataType::Null, true)])), + vec![Arc::new(NullArray::new(3))], + )?, + &vec![0], + ); + assert!(result.is_err()); + assert_that!(result.err().unwrap().message().as_ref()).starts_with( + "Invalid batch column at '0' has null but schema specifies non-nullable", + ); + Ok(()) + } +} From 31d6aecd550bf129a29c3609661461894b169e4b Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Tue, 22 Oct 2024 
08:47:24 +0200 Subject: [PATCH 3/3] Drop assertor dependency --- Cargo.toml | 1 - datafusion/physical-plan/Cargo.toml | 1 - .../physical-plan/src/execution_plan.rs | 25 ++++++++++++++----- 3 files changed, 19 insertions(+), 8 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index ca13deceb3df..63bfb7fce413 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -86,7 +86,6 @@ arrow-ipc = { version = "53.1.0", default-features = false, features = [ arrow-ord = { version = "53.1.0", default-features = false } arrow-schema = { version = "53.1.0", default-features = false } arrow-string = { version = "53.1.0", default-features = false } -assertor = "0.0.3" async-trait = "0.1.73" bigdecimal = "=0.4.1" bytes = "1.4" diff --git a/datafusion/physical-plan/Cargo.toml b/datafusion/physical-plan/Cargo.toml index a30c93fbc08c..7fcd719539ec 100644 --- a/datafusion/physical-plan/Cargo.toml +++ b/datafusion/physical-plan/Cargo.toml @@ -68,7 +68,6 @@ rand = { workspace = true } tokio = { workspace = true } [dev-dependencies] -assertor = { workspace = true } datafusion-functions-aggregate = { workspace = true } rstest = { workspace = true } rstest_reuse = "0.7.0" diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs index 7a6430d1b280..e6484452d43e 100644 --- a/datafusion/physical-plan/src/execution_plan.rs +++ b/datafusion/physical-plan/src/execution_plan.rs @@ -929,8 +929,6 @@ mod tests { use super::*; use arrow_array::{DictionaryArray, Int32Array, NullArray, RunArray}; use arrow_schema::{DataType, Field, Schema, SchemaRef}; - use assertor::assert_that; - use assertor::StringAssertion; use std::any::Any; use std::sync::Arc; @@ -1100,7 +1098,8 @@ mod tests { &vec![0], ); assert!(result.is_err()); - assert_that!(result.err().unwrap().message().as_ref()).starts_with( + assert_starts_with( + result.err().unwrap().message().as_ref(), "Invalid batch column at '0' has null but schema specifies non-nullable", ); Ok(()) @@ -1124,7 +1123,8 @@ mod 
tests { &vec![0], ); assert!(result.is_err()); - assert_that!(result.err().unwrap().message().as_ref()).starts_with( + assert_starts_with( + result.err().unwrap().message().as_ref(), "Invalid batch column at '0' has null but schema specifies non-nullable", ); Ok(()) @@ -1147,7 +1147,8 @@ mod tests { &vec![0], ); assert!(result.is_err()); - assert_that!(result.err().unwrap().message().as_ref()).starts_with( + assert_starts_with( + result.err().unwrap().message().as_ref(), "Invalid batch column at '0' has null but schema specifies non-nullable", ); Ok(()) @@ -1189,9 +1190,21 @@ mod tests { &vec![0], ); assert!(result.is_err()); - assert_that!(result.err().unwrap().message().as_ref()).starts_with( + assert_starts_with( + result.err().unwrap().message().as_ref(), "Invalid batch column at '0' has null but schema specifies non-nullable", ); Ok(()) } + + fn assert_starts_with(actual: impl AsRef<str>, expected_prefix: impl AsRef<str>) { + let actual = actual.as_ref(); + let expected_prefix = expected_prefix.as_ref(); + assert!( + actual.starts_with(expected_prefix), + "Expected '{}' to start with '{}'", + actual, + expected_prefix + ); + } }