Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add validation to RecordBatch for non-nullable fields containing null values #1890

Merged
merged 8 commits into from
Jun 18, 2022
Merged
2 changes: 1 addition & 1 deletion arrow/src/array/array_binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1806,7 +1806,7 @@ mod tests {
)]
fn fixed_size_binary_array_all_null_in_batch_with_schema() {
let schema =
Schema::new(vec![Field::new("a", DataType::FixedSizeBinary(2), false)]);
Schema::new(vec![Field::new("a", DataType::FixedSizeBinary(2), true)]);

let none_option: Option<[u8; 2]> = None;
let item = FixedSizeBinaryArray::try_from_sparse_iter(
Expand Down
2 changes: 1 addition & 1 deletion arrow/src/csv/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1439,7 +1439,7 @@ mod tests {
fn test_nulls() {
let schema = Schema::new(vec![
Field::new("c_int", DataType::UInt64, false),
Field::new("c_float", DataType::Float32, false),
Field::new("c_float", DataType::Float32, true),
Field::new("c_string", DataType::Utf8, false),
]);

Expand Down
4 changes: 2 additions & 2 deletions arrow/src/ipc/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -962,7 +962,7 @@ mod tests {

#[test]
fn test_write_file() {
let schema = Schema::new(vec![Field::new("field1", DataType::UInt32, false)]);
let schema = Schema::new(vec![Field::new("field1", DataType::UInt32, true)]);
let values: Vec<Option<u32>> = vec![
Some(999),
None,
Expand Down Expand Up @@ -1011,7 +1011,7 @@ mod tests {
let schema = Schema::new(vec![
Field::new("nulls", DataType::Null, true),
Field::new("int32s", DataType::Int32, false),
Field::new("nulls2", DataType::Null, false),
Field::new("nulls2", DataType::Null, true),
Field::new("f64s", DataType::Float64, false),
]);
let array1 = NullArray::new(32);
Expand Down
11 changes: 5 additions & 6 deletions arrow/src/json/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
//! let schema = Schema::new(vec![
//! Field::new("a", DataType::Float64, false),
//! Field::new("b", DataType::Float64, false),
//! Field::new("c", DataType::Float64, false),
//! Field::new("c", DataType::Float64, true),
//! ]);
//!
//! let file = File::open("test/data/basic.json").unwrap();
Expand Down Expand Up @@ -1869,7 +1869,7 @@ mod tests {
#[test]
fn test_json_basic_schema() {
let schema = Schema::new(vec![
Field::new("a", DataType::Int32, false),
Field::new("a", DataType::Int32, true),
Field::new("b", DataType::Float32, false),
Field::new("c", DataType::Boolean, false),
Field::new("d", DataType::Utf8, false),
Expand Down Expand Up @@ -1917,8 +1917,7 @@ mod tests {

#[test]
fn test_json_format_strings_for_date() {
let schema =
Arc::new(Schema::new(vec![Field::new("e", DataType::Date32, false)]));
let schema = Arc::new(Schema::new(vec![Field::new("e", DataType::Date32, true)]));
let e = schema.column_with_name("e").unwrap();
assert_eq!(&DataType::Date32, e.1.data_type());
let mut fmts = HashMap::new();
Expand Down Expand Up @@ -1952,7 +1951,7 @@ mod tests {
// Implicit: omitting fields from a schema
// Explicit: supplying a vec of fields to take
let schema = Schema::new(vec![
Field::new("a", DataType::Int32, false),
Field::new("a", DataType::Int32, true),
Field::new("b", DataType::Float32, false),
Field::new("c", DataType::Boolean, false),
]);
Expand All @@ -1964,7 +1963,7 @@ mod tests {
);
let reader_schema = reader.schema();
let expected_schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Int32, false),
Field::new("a", DataType::Int32, true),
Field::new("c", DataType::Boolean, false),
]));
assert_eq!(reader_schema, expected_schema);
Expand Down
38 changes: 19 additions & 19 deletions arrow/src/json/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -879,11 +879,11 @@ mod tests {
let arr_names = StringArray::from(vec![Some("a"), Some("b")]);

let schema = Schema::new(vec![
Field::new("nanos", arr_nanos.data_type().clone(), false),
Field::new("micros", arr_micros.data_type().clone(), false),
Field::new("millis", arr_millis.data_type().clone(), false),
Field::new("secs", arr_secs.data_type().clone(), false),
Field::new("name", arr_names.data_type().clone(), false),
Field::new("nanos", arr_nanos.data_type().clone(), true),
Field::new("micros", arr_micros.data_type().clone(), true),
Field::new("millis", arr_millis.data_type().clone(), true),
Field::new("secs", arr_secs.data_type().clone(), true),
Field::new("name", arr_names.data_type().clone(), true),
]);
let schema = Arc::new(schema);

Expand Down Expand Up @@ -929,8 +929,8 @@ mod tests {
let arr_names = StringArray::from(vec![Some("a"), Some("b")]);

let schema = Schema::new(vec![
Field::new("date32", arr_date32.data_type().clone(), false),
Field::new("date64", arr_date64.data_type().clone(), false),
Field::new("date32", arr_date32.data_type().clone(), true),
Field::new("date64", arr_date64.data_type().clone(), true),
Field::new("name", arr_names.data_type().clone(), false),
]);
let schema = Arc::new(schema);
Expand Down Expand Up @@ -968,11 +968,11 @@ mod tests {
let arr_names = StringArray::from(vec![Some("a"), Some("b")]);

let schema = Schema::new(vec![
Field::new("time32sec", arr_time32sec.data_type().clone(), false),
Field::new("time32msec", arr_time32msec.data_type().clone(), false),
Field::new("time64usec", arr_time64usec.data_type().clone(), false),
Field::new("time64nsec", arr_time64nsec.data_type().clone(), false),
Field::new("name", arr_names.data_type().clone(), false),
Field::new("time32sec", arr_time32sec.data_type().clone(), true),
Field::new("time32msec", arr_time32msec.data_type().clone(), true),
Field::new("time64usec", arr_time64usec.data_type().clone(), true),
Field::new("time64nsec", arr_time64nsec.data_type().clone(), true),
Field::new("name", arr_names.data_type().clone(), true),
]);
let schema = Arc::new(schema);

Expand Down Expand Up @@ -1011,11 +1011,11 @@ mod tests {
let arr_names = StringArray::from(vec![Some("a"), Some("b")]);

let schema = Schema::new(vec![
Field::new("duration_sec", arr_durationsec.data_type().clone(), false),
Field::new("duration_msec", arr_durationmsec.data_type().clone(), false),
Field::new("duration_usec", arr_durationusec.data_type().clone(), false),
Field::new("duration_nsec", arr_durationnsec.data_type().clone(), false),
Field::new("name", arr_names.data_type().clone(), false),
Field::new("duration_sec", arr_durationsec.data_type().clone(), true),
Field::new("duration_msec", arr_durationmsec.data_type().clone(), true),
Field::new("duration_usec", arr_durationusec.data_type().clone(), true),
Field::new("duration_nsec", arr_durationnsec.data_type().clone(), true),
Field::new("name", arr_names.data_type().clone(), true),
]);
let schema = Arc::new(schema);

Expand Down Expand Up @@ -1159,7 +1159,7 @@ mod tests {
DataType::List(Box::new(list_inner_type.clone())),
false,
);
let field_c2 = Field::new("c2", DataType::Utf8, false);
let field_c2 = Field::new("c2", DataType::Utf8, true);
let schema = Schema::new(vec![field_c1.clone(), field_c2]);

// list column rows: [[1, 2], [3]], [], [[4, 5, 6]]
Expand Down Expand Up @@ -1444,7 +1444,7 @@ mod tests {

let map = MapArray::from(map_data);

let map_field = Field::new("map", map_data_type, false);
let map_field = Field::new("map", map_data_type, true);
let schema = Arc::new(Schema::new(vec![map_field]));

let batch = RecordBatch::try_new(schema, vec![Arc::new(map)]).unwrap();
Expand Down
19 changes: 19 additions & 0 deletions arrow/src/record_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,15 @@ impl RecordBatch {
)
})?;

for (c, f) in columns.iter().zip(&schema.fields) {
if !f.is_nullable() && c.null_count() > 0 {
return Err(ArrowError::InvalidArgumentError(format!(
"Column '{}' is declared as non-nullable but contains null values",
f.name()
)));
}
}

if columns.iter().any(|c| c.len() != row_count) {
let err = match options.row_count {
Some(_) => {
Expand Down Expand Up @@ -979,4 +988,14 @@ mod tests {
assert_ne!(a, b);
assert_eq!(b, RecordBatch::new_empty(schema))
}

#[test]
fn test_nulls_in_non_nullable_field() {
let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
let maybe_batch = RecordBatch::try_new(
schema,
vec![Arc::new(Int32Array::from(vec![Some(1), None]))],
);
assert_eq!("Invalid argument error: Column 'a' is declared as non-nullable but contains null values", format!("{}", maybe_batch.err().unwrap()));
}
}