From 0c08160135f428b25befe3b221616af8683a7d2d Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 16 Jun 2022 12:03:50 -0600 Subject: [PATCH 1/7] add validation to RecordBatch for non-nullable fields containing null values --- arrow/src/array/array_binary.rs | 2 +- arrow/src/csv/reader.rs | 2 +- arrow/src/json/reader.rs | 9 ++++---- arrow/src/json/writer.rs | 38 ++++++++++++++++----------------- arrow/src/record_batch.rs | 19 +++++++++++++++++ 5 files changed, 44 insertions(+), 26 deletions(-) diff --git a/arrow/src/array/array_binary.rs b/arrow/src/array/array_binary.rs index 481ea92d66c3..3efb25888be2 100644 --- a/arrow/src/array/array_binary.rs +++ b/arrow/src/array/array_binary.rs @@ -1806,7 +1806,7 @@ mod tests { )] fn fixed_size_binary_array_all_null_in_batch_with_schema() { let schema = - Schema::new(vec![Field::new("a", DataType::FixedSizeBinary(2), false)]); + Schema::new(vec![Field::new("a", DataType::FixedSizeBinary(2), true)]); let none_option: Option<[u8; 2]> = None; let item = FixedSizeBinaryArray::try_from_sparse_iter( diff --git a/arrow/src/csv/reader.rs b/arrow/src/csv/reader.rs index 21e107ee4c8e..639c0b42afea 100644 --- a/arrow/src/csv/reader.rs +++ b/arrow/src/csv/reader.rs @@ -1439,7 +1439,7 @@ mod tests { fn test_nulls() { let schema = Schema::new(vec![ Field::new("c_int", DataType::UInt64, false), - Field::new("c_float", DataType::Float32, false), + Field::new("c_float", DataType::Float32, true), Field::new("c_string", DataType::Utf8, false), ]); diff --git a/arrow/src/json/reader.rs b/arrow/src/json/reader.rs index e1fa54f8a644..f6aaddcd66e4 100644 --- a/arrow/src/json/reader.rs +++ b/arrow/src/json/reader.rs @@ -1869,7 +1869,7 @@ mod tests { #[test] fn test_json_basic_schema() { let schema = Schema::new(vec![ - Field::new("a", DataType::Int32, false), + Field::new("a", DataType::Int32, true), Field::new("b", DataType::Float32, false), Field::new("c", DataType::Boolean, false), Field::new("d", DataType::Utf8, false), @@ -1917,8 +1917,7 @@ mod tests { #[test] fn test_json_format_strings_for_date() { - let schema = - Arc::new(Schema::new(vec![Field::new("e", DataType::Date32, false)])); + let schema = Arc::new(Schema::new(vec![Field::new("e", DataType::Date32, true)])); let e = schema.column_with_name("e").unwrap(); assert_eq!(&DataType::Date32, e.1.data_type()); let mut fmts = HashMap::new(); @@ -1952,7 +1951,7 @@ mod tests { // Implicit: omitting fields from a schema // Explicit: supplying a vec of fields to take let schema = Schema::new(vec![ - Field::new("a", DataType::Int32, false), + Field::new("a", DataType::Int32, true), Field::new("b", DataType::Float32, false), Field::new("c", DataType::Boolean, false), ]); @@ -1964,7 +1963,7 @@ mod tests { ); let reader_schema = reader.schema(); let expected_schema = Arc::new(Schema::new(vec![ - Field::new("a", DataType::Int32, false), + Field::new("a", DataType::Int32, true), Field::new("c", DataType::Boolean, false), ])); assert_eq!(reader_schema, expected_schema); diff --git a/arrow/src/json/writer.rs b/arrow/src/json/writer.rs index 078382f57dd0..72a4d62525a1 100644 --- a/arrow/src/json/writer.rs +++ b/arrow/src/json/writer.rs @@ -879,11 +879,11 @@ mod tests { let arr_names = StringArray::from(vec![Some("a"), Some("b")]); let schema = Schema::new(vec![ - Field::new("nanos", arr_nanos.data_type().clone(), false), - Field::new("micros", arr_micros.data_type().clone(), false), - Field::new("millis", arr_millis.data_type().clone(), false), - Field::new("secs", arr_secs.data_type().clone(), false), - Field::new("name", arr_names.data_type().clone(), false), + Field::new("nanos", arr_nanos.data_type().clone(), true), + Field::new("micros", arr_micros.data_type().clone(), true), + Field::new("millis", arr_millis.data_type().clone(), true), + Field::new("secs", arr_secs.data_type().clone(), true), + Field::new("name", arr_names.data_type().clone(), true), ]); let schema = Arc::new(schema); @@ -929,8 +929,8 @@ mod tests { let arr_names = StringArray::from(vec![Some("a"), Some("b")]); let schema = Schema::new(vec![ - Field::new("date32", arr_date32.data_type().clone(), false), - Field::new("date64", arr_date64.data_type().clone(), false), + Field::new("date32", arr_date32.data_type().clone(), true), + Field::new("date64", arr_date64.data_type().clone(), true), Field::new("name", arr_names.data_type().clone(), false), ]); let schema = Arc::new(schema); @@ -968,11 +968,11 @@ mod tests { let arr_names = StringArray::from(vec![Some("a"), Some("b")]); let schema = Schema::new(vec![ - Field::new("time32sec", arr_time32sec.data_type().clone(), false), - Field::new("time32msec", arr_time32msec.data_type().clone(), false), - Field::new("time64usec", arr_time64usec.data_type().clone(), false), - Field::new("time64nsec", arr_time64nsec.data_type().clone(), false), - Field::new("name", arr_names.data_type().clone(), false), + Field::new("time32sec", arr_time32sec.data_type().clone(), true), + Field::new("time32msec", arr_time32msec.data_type().clone(), true), + Field::new("time64usec", arr_time64usec.data_type().clone(), true), + Field::new("time64nsec", arr_time64nsec.data_type().clone(), true), + Field::new("name", arr_names.data_type().clone(), true), ]); let schema = Arc::new(schema); @@ -1011,11 +1011,11 @@ mod tests { let arr_names = StringArray::from(vec![Some("a"), Some("b")]); let schema = Schema::new(vec![ - Field::new("duration_sec", arr_durationsec.data_type().clone(), false), - Field::new("duration_msec", arr_durationmsec.data_type().clone(), false), - Field::new("duration_usec", arr_durationusec.data_type().clone(), false), - Field::new("duration_nsec", arr_durationnsec.data_type().clone(), false), - Field::new("name", arr_names.data_type().clone(), false), + Field::new("duration_sec", arr_durationsec.data_type().clone(), true), + Field::new("duration_msec", arr_durationmsec.data_type().clone(), true), + Field::new("duration_usec", arr_durationusec.data_type().clone(), true), + Field::new("duration_nsec", arr_durationnsec.data_type().clone(), true), + Field::new("name", arr_names.data_type().clone(), true), ]); let schema = Arc::new(schema); @@ -1159,7 +1159,7 @@ mod tests { DataType::List(Box::new(list_inner_type.clone())), false, ); - let field_c2 = Field::new("c2", DataType::Utf8, false); + let field_c2 = Field::new("c2", DataType::Utf8, true); let schema = Schema::new(vec![field_c1.clone(), field_c2]); // list column rows: [[1, 2], [3]], [], [[4, 5, 6]] @@ -1444,7 +1444,7 @@ mod tests { let map = MapArray::from(map_data); - let map_field = Field::new("map", map_data_type, false); + let map_field = Field::new("map", map_data_type, true); let schema = Arc::new(Schema::new(vec![map_field])); let batch = RecordBatch::try_new(schema, vec![Arc::new(map)]).unwrap(); diff --git a/arrow/src/record_batch.rs b/arrow/src/record_batch.rs index ae8fae58f1af..3f3ec1b7c68f 100644 --- a/arrow/src/record_batch.rs +++ b/arrow/src/record_batch.rs @@ -138,6 +138,15 @@ impl RecordBatch { ) })?; + for (c, f) in columns.iter().zip(&schema.fields) { + if !f.is_nullable() && c.null_count() > 0 { + return Err(ArrowError::InvalidArgumentError(format!( + "Column '{}' is declared as non-nullable but contains null values", + f.name() + ))); + } + } + if columns.iter().any(|c| c.len() != row_count) { let err = match options.row_count { Some(_) => { @@ -979,4 +988,14 @@ mod tests { assert_ne!(a, b); assert_eq!(b, RecordBatch::new_empty(schema)) } + + #[test] + fn test_nulls_in_non_nullable_field() { + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])); + let maybe_batch = RecordBatch::try_new( + schema, + vec![Arc::new(Int32Array::from(vec![Some(1), None]))], + ); + assert_eq!("Invalid argument error: Column 'a' is declared as non-nullable but contains null values", format!("{}", maybe_batch.err().unwrap())); + } } From 4055c49aa09fb90fd1b448268e247ae1b978a88c Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 16 Jun 2022 12:25:43 -0600 Subject: [PATCH 2/7] Fix some IPC tests --- arrow/src/ipc/writer.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/arrow/src/ipc/writer.rs b/arrow/src/ipc/writer.rs index 120eb7ab9b7e..9551c4f1741c 100644 --- a/arrow/src/ipc/writer.rs +++ b/arrow/src/ipc/writer.rs @@ -962,7 +962,7 @@ mod tests { #[test] fn test_write_file() { - let schema = Schema::new(vec![Field::new("field1", DataType::UInt32, false)]); + let schema = Schema::new(vec![Field::new("field1", DataType::UInt32, true)]); let values: Vec> = vec![ Some(999), None, @@ -1011,7 +1011,7 @@ mod tests { let schema = Schema::new(vec![ Field::new("nulls", DataType::Null, true), Field::new("int32s", DataType::Int32, false), - Field::new("nulls2", DataType::Null, false), + Field::new("nulls2", DataType::Null, true), Field::new("f64s", DataType::Float64, false), ]); let array1 = NullArray::new(32); From d64f888446da742a4ebac3c52de993e557760a08 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 16 Jun 2022 12:35:44 -0600 Subject: [PATCH 3/7] add workaround for dictionary issue --- arrow/src/json/reader.rs | 2 +- arrow/src/record_batch.rs | 13 +++++++++---- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/arrow/src/json/reader.rs b/arrow/src/json/reader.rs index f6aaddcd66e4..3ac2566e7849 100644 --- a/arrow/src/json/reader.rs +++ b/arrow/src/json/reader.rs @@ -33,7 +33,7 @@ //! let schema = Schema::new(vec![ //! Field::new("a", DataType::Float64, false), //! Field::new("b", DataType::Float64, false), -//! Field::new("c", DataType::Float64, false), +//! Field::new("c", DataType::Float64, true), //! ]); //! //! let file = File::open("test/data/basic.json").unwrap(); diff --git a/arrow/src/record_batch.rs b/arrow/src/record_batch.rs index 3f3ec1b7c68f..dc145b2df6c3 100644 --- a/arrow/src/record_batch.rs +++ b/arrow/src/record_batch.rs @@ -140,10 +140,15 @@ impl RecordBatch { for (c, f) in columns.iter().zip(&schema.fields) { if !f.is_nullable() && c.null_count() > 0 { - return Err(ArrowError::InvalidArgumentError(format!( - "Column '{}' is declared as non-nullable but contains null values", - f.name() - ))); + if f.name().len() == 0 { + // hacky workaround for known issue with dictionary IPC encoding + // https://github.com/apache/arrow-rs/issues/1892 + } else { + return Err(ArrowError::InvalidArgumentError(format!( + "Column '{}' is declared as non-nullable but contains null values", + f.name() + ))); + } } } From c91f3551a4ef6c8282510a3f2c2314849c3953fa Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 16 Jun 2022 13:47:59 -0600 Subject: [PATCH 4/7] address feedback --- arrow/src/record_batch.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/arrow/src/record_batch.rs b/arrow/src/record_batch.rs index dc145b2df6c3..6c039f9b05f1 100644 --- a/arrow/src/record_batch.rs +++ b/arrow/src/record_batch.rs @@ -140,10 +140,9 @@ impl RecordBatch { for (c, f) in columns.iter().zip(&schema.fields) { if !f.is_nullable() && c.null_count() > 0 { - if f.name().len() == 0 { - // hacky workaround for known issue with dictionary IPC encoding - // https://github.com/apache/arrow-rs/issues/1892 - } else { + // hacky workaround for known issue with dictionary IPC encoding + // https://github.com/apache/arrow-rs/issues/1892 + if !f.name().is_empty() { return Err(ArrowError::InvalidArgumentError(format!( "Column '{}' is declared as non-nullable but contains null values", f.name() From 6495a7635ab0b27c5fd69ed33a473a094d8ba6b7 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 18 Jun 2022 07:19:37 -0600 Subject: [PATCH 5/7] upmerge --- arrow-flight/src/arrow.flight.protocol.rs | 42 +++++++++++------------ 1 file changed, 21 insertions(+), 21 deletions(-) diff --git a/arrow-flight/src/arrow.flight.protocol.rs b/arrow-flight/src/arrow.flight.protocol.rs index c76469b39ce7..f56dd0dd8869 100644 --- a/arrow-flight/src/arrow.flight.protocol.rs +++ b/arrow-flight/src/arrow.flight.protocol.rs @@ -279,9 +279,9 @@ pub mod flight_service_client { &mut self, request: impl tonic::IntoStreamingRequest, ) -> Result< - tonic::Response>, - tonic::Status, - > { + tonic::Response>, + tonic::Status, + > { self.inner .ready() .await @@ -308,9 +308,9 @@ pub mod flight_service_client { &mut self, request: impl tonic::IntoRequest, ) -> Result< - tonic::Response>, - tonic::Status, - > { + tonic::Response>, + tonic::Status, + > { self.inner .ready() .await @@ -389,9 +389,9 @@ pub mod flight_service_client { &mut self, request: impl tonic::IntoRequest, ) -> Result< - tonic::Response>, - tonic::Status, - > { + tonic::Response>, + tonic::Status, + > { self.inner .ready() .await @@ -418,9 +418,9 @@ pub mod flight_service_client { &mut self, request: impl tonic::IntoStreamingRequest, ) -> Result< - tonic::Response>, - tonic::Status, - > { + tonic::Response>, + tonic::Status, + > { self.inner .ready() .await @@ -446,9 +446,9 @@ pub mod flight_service_client { &mut self, request: impl tonic::IntoStreamingRequest, ) -> Result< - tonic::Response>, - tonic::Status, - > { + tonic::Response>, + tonic::Status, + > { self.inner .ready() .await @@ -475,9 +475,9 @@ pub mod flight_service_client { &mut self, request: impl tonic::IntoRequest, ) -> Result< - tonic::Response>, - tonic::Status, - > { + tonic::Response>, + tonic::Status, + > { self.inner .ready() .await @@ -501,9 +501,9 @@ pub mod flight_service_client { &mut self, request: impl tonic::IntoRequest, ) -> Result< - tonic::Response>, - tonic::Status, - > { + tonic::Response>, + tonic::Status, + > { self.inner .ready() .await From cada5e4ff25cfd176ba2db8096a9cc1424f50e18 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 18 Jun 2022 07:22:02 -0600 Subject: [PATCH 6/7] remove workaround for dictionary types --- arrow/src/record_batch.rs | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/arrow/src/record_batch.rs b/arrow/src/record_batch.rs index 6c039f9b05f1..3f3ec1b7c68f 100644 --- a/arrow/src/record_batch.rs +++ b/arrow/src/record_batch.rs @@ -140,14 +140,10 @@ impl RecordBatch { for (c, f) in columns.iter().zip(&schema.fields) { if !f.is_nullable() && c.null_count() > 0 { - // hacky workaround for known issue with dictionary IPC encoding - // https://github.com/apache/arrow-rs/issues/1892 - if !f.name().is_empty() { - return Err(ArrowError::InvalidArgumentError(format!( - "Column '{}' is declared as non-nullable but contains null values", - f.name() - ))); - } + return Err(ArrowError::InvalidArgumentError(format!( + "Column '{}' is declared as non-nullable but contains null values", + f.name() + ))); } } From c1c4c94aed8a700f3eff7d04a01ebd031ea4ff8c Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 18 Jun 2022 07:23:54 -0600 Subject: [PATCH 7/7] revert unrelated cargo fmt changes --- arrow-flight/src/arrow.flight.protocol.rs | 42 +++++++++++------------ 1 file changed, 21 insertions(+), 21 deletions(-) diff --git a/arrow-flight/src/arrow.flight.protocol.rs b/arrow-flight/src/arrow.flight.protocol.rs index f56dd0dd8869..c76469b39ce7 100644 --- a/arrow-flight/src/arrow.flight.protocol.rs +++ b/arrow-flight/src/arrow.flight.protocol.rs @@ -279,9 +279,9 @@ pub mod flight_service_client { &mut self, request: impl tonic::IntoStreamingRequest, ) -> Result< - tonic::Response>, - tonic::Status, - > { + tonic::Response>, + tonic::Status, + > { self.inner .ready() .await @@ -308,9 +308,9 @@ pub mod flight_service_client { &mut self, request: impl tonic::IntoRequest, ) -> Result< - tonic::Response>, - tonic::Status, - > { + tonic::Response>, + tonic::Status, + > { self.inner .ready() .await @@ -389,9 +389,9 @@ pub mod flight_service_client { &mut self, request: impl tonic::IntoRequest, ) -> Result< - tonic::Response>, - tonic::Status, - > { + tonic::Response>, + tonic::Status, + > { self.inner .ready() .await @@ -418,9 +418,9 @@ pub mod flight_service_client { &mut self, request: impl tonic::IntoStreamingRequest, ) -> Result< - tonic::Response>, - tonic::Status, - > { + tonic::Response>, + tonic::Status, + > { self.inner .ready() .await @@ -446,9 +446,9 @@ pub mod flight_service_client { &mut self, request: impl tonic::IntoStreamingRequest, ) -> Result< - tonic::Response>, - tonic::Status, - > { + tonic::Response>, + tonic::Status, + > { self.inner .ready() .await @@ -475,9 +475,9 @@ pub mod flight_service_client { &mut self, request: impl tonic::IntoRequest, ) -> Result< - tonic::Response>, - tonic::Status, - > { + tonic::Response>, + tonic::Status, + > { self.inner .ready() .await @@ -501,9 +501,9 @@ pub mod flight_service_client { &mut self, request: impl tonic::IntoRequest, ) -> Result< - tonic::Response>, - tonic::Status, - > { + tonic::Response>, + tonic::Status, + > { self.inner .ready() .await