Fix Parquet reader for null list (#1448)
* Fix Parquet reader for null list

* Test on forked parquet-testing

* For review comments

* Fix clippy
viirya authored Mar 22, 2022
1 parent d02425d commit e778c10
Showing 4 changed files with 54 additions and 22 deletions.
2 changes: 1 addition & 1 deletion parquet-testing
44 changes: 24 additions & 20 deletions parquet/src/arrow/array_reader.rs
@@ -26,10 +26,10 @@ use std::vec::Vec;
 
 use arrow::array::{
     new_empty_array, Array, ArrayData, ArrayDataBuilder, ArrayRef, BinaryArray,
-    BinaryBuilder, BooleanArray, BooleanBufferBuilder, BooleanBuilder,
+    BinaryBuilder, BooleanArray, BooleanBufferBuilder, BooleanBuilder, DecimalArray,
     FixedSizeBinaryArray, FixedSizeBinaryBuilder, GenericListArray, Int16BufferBuilder,
-    Int32Array, Int64Array, MapArray, OffsetSizeTrait, PrimitiveArray, PrimitiveBuilder,
-    StringArray, StringBuilder, StructArray, DecimalArray,
+    Int32Array, Int64Array, MapArray, NullArray, OffsetSizeTrait, PrimitiveArray,
+    PrimitiveBuilder, StringArray, StringBuilder, StructArray,
 };
 use arrow::buffer::{Buffer, MutableBuffer};
 use arrow::datatypes::{
@@ -430,14 +430,16 @@ where
             }
             ArrowType::Decimal(p, s) => {
                 let array = match array.data_type() {
-                    ArrowType::Int32 => array.as_any()
+                    ArrowType::Int32 => array
+                        .as_any()
                         .downcast_ref::<Int32Array>()
                         .unwrap()
                         .iter()
                         .map(|v| v.map(|v| v.into()))
                         .collect::<DecimalArray>(),
 
-                    ArrowType::Int64 => array.as_any()
+                    ArrowType::Int64 => array
+                        .as_any()
                         .downcast_ref::<Int64Array>()
                         .unwrap()
                         .iter()
@@ -885,6 +887,7 @@ fn remove_indices(
             Ok(Arc::new(StructArray::from((new_columns, valid.finish()))))
         }
     }
+    ArrowType::Null => Ok(Arc::new(NullArray::new(arr.len()))),
    _ => Err(ParquetError::General(format!(
        "ListArray of type List({:?}) is not supported by array_reader",
        item_type
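
As context for the new arm (this sketch is not part of the diff): a minimal example of the "list with Null item type" shape the reader must now produce, written against the arrow APIs of this era (the names `items` and `offsets` are illustrative; newer arrow releases replace `Array::data()` with `to_data()`):

use arrow::array::{Array, ArrayData, ListArray, NullArray};
use arrow::buffer::Buffer;
use arrow::datatypes::{DataType, Field};

fn main() {
    // Item values for the list: three elements of the Null type, the kind of
    // child array the new ArrowType::Null arm returns.
    let items = NullArray::new(3);

    // Offsets [0, 3]: a single list slot spanning all three null items.
    let offsets = Buffer::from_slice_ref(&[0i32, 3]);

    let list_type = DataType::List(Box::new(Field::new("item", DataType::Null, true)));
    let data = ArrayData::builder(list_type)
        .len(1)
        .add_buffer(offsets)
        .add_child_data(items.data().clone())
        .build()
        .unwrap();

    let list = ListArray::from(data);
    assert_eq!(list.len(), 1);
    assert_eq!(list.value(0).len(), 3);
}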
@@ -924,7 +927,7 @@ impl<OffsetSize: OffsetSizeTrait> ArrayReader for ListArrayReader<OffsetSize> {
         && (rep_levels.len() == next_batch_array.len()))
     {
         return Err(ArrowError(
-            "Expected item_reader def_levels and rep_levels to be same length as batch".to_string(),
+            format!("Expected item_reader def_levels {} and rep_levels {} to be same length as batch {}", def_levels.len(), rep_levels.len(), next_batch_array.len()),
         ));
     }
 
@@ -964,6 +967,7 @@ impl<OffsetSize: OffsetSizeTrait> ArrayReader for ListArrayReader<OffsetSize> {
             cur_offset += OffsetSize::one();
         }
     });
+
     offsets.push(cur_offset);
 
     let num_bytes = bit_util::ceil(offsets.len(), 8);
@@ -1767,15 +1771,13 @@ impl<'a> ArrayReaderBuilder {
         )),
         PhysicalType::INT96 => {
             // get the optional timezone information from arrow type
-            let timezone = arrow_type
-                .as_ref()
-                .and_then(|data_type| {
-                    if let ArrowType::Timestamp(_, tz) = data_type {
-                        tz.clone()
-                    } else {
-                        None
-                    }
-                });
+            let timezone = arrow_type.as_ref().and_then(|data_type| {
+                if let ArrowType::Timestamp(_, tz) = data_type {
+                    tz.clone()
+                } else {
+                    None
+                }
+            });
             let converter = Int96Converter::new(Int96ArrayConverter { timezone });
             Ok(Box::new(ComplexObjectArrayReader::<
                 Int96Type,
@@ -1983,13 +1985,15 @@ impl<'a> ArrayReaderBuilder {
             if i == 1 {
                 field = self.arrow_schema.field_with_name(part).ok();
             } else if let Some(f) = field {
-                if let ArrowType::Struct(fields) = f.data_type() {
-                    field = fields.iter().find(|f| f.name() == part)
-                } else {
-                    field = None
+                match f.data_type() {
+                    ArrowType::Struct(fields) => {
+                        field = fields.iter().find(|f| f.name() == part)
+                    }
+                    ArrowType::List(list_field) => field = Some(list_field.as_ref()),
+                    _ => field = None,
                 }
             } else {
-                field = None
+                field = None;
             }
         }
         field
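
The lookup above resolves a dotted Parquet column path one segment at a time; the change teaches it to fall through List fields instead of giving up. A hypothetical standalone distillation of that loop, against the arrow datatypes of this era (`find_leaf` is illustrative, not a crate API):

use arrow::datatypes::{DataType, Field};

// Walk a column path such as "col.list.item" through an Arrow field:
// descend into Struct members by name; for a List, ignore the synthetic
// path segment and continue with the list's single child field.
fn find_leaf<'a>(root: Option<&'a Field>, parts: &[&str]) -> Option<&'a Field> {
    let mut field = root;
    for &part in &parts[1..] {
        field = match field?.data_type() {
            DataType::Struct(fields) => fields.iter().find(|f| f.name() == part),
            DataType::List(list_field) => Some(list_field.as_ref()),
            _ => None,
        };
    }
    field
}

For a path like ["col1", "list", "item"] where col1 is a List, the new List arm yields the element field (here carrying the Null item type) instead of None, so the reader can recover the list's item type from the Arrow schema.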
28 changes: 28 additions & 0 deletions parquet/src/arrow/arrow_reader.rs
@@ -1210,4 +1210,32 @@ mod tests {
         assert_eq!(get_dict(&batches[3]), get_dict(&batches[4]));
         assert_eq!(get_dict(&batches[4]), get_dict(&batches[5]));
     }
+
+    #[test]
+    fn test_read_null_list() {
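+        // null_list.parquet comes from the parquet-testing submodule bump in
+        // this commit: a single row whose list value is valid but empty, with
+        // item type Null.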
+        let testdata = arrow::util::test_util::parquet_test_data();
+        let path = format!("{}/null_list.parquet", testdata);
+        let parquet_file_reader =
+            SerializedFileReader::try_from(File::open(&path).unwrap()).unwrap();
+        let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(parquet_file_reader));
+        let mut record_batch_reader = arrow_reader
+            .get_record_reader(60)
+            .expect("Failed to read into array!");
+
+        let batch = record_batch_reader.next().unwrap().unwrap();
+        assert_eq!(batch.num_rows(), 1);
+        assert_eq!(batch.num_columns(), 1);
+        assert_eq!(batch.column(0).len(), 1);
+
+        let list = batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<ListArray>()
+            .unwrap();
+        assert_eq!(list.len(), 1);
+        assert!(list.is_valid(0));
+
+        let val = list.value(0);
+        assert_eq!(val.len(), 0);
+    }
 }
2 changes: 1 addition & 1 deletion testing
Submodule testing updated 67 files
+ data/arrow-ipc-file/clusterfuzz-testcase-arrow-ipc-file-fuzz-5873085270589440
+ data/arrow-ipc-file/clusterfuzz-testcase-minimized-arrow-ipc-file-fuzz-5480145071243264
+ data/arrow-ipc-file/clusterfuzz-testcase-minimized-arrow-ipc-file-fuzz-5577412021190656
+ data/arrow-ipc-file/clusterfuzz-testcase-minimized-arrow-ipc-file-fuzz-5749190446153728
+ data/arrow-ipc-file/clusterfuzz-testcase-minimized-arrow-ipc-file-fuzz-5864855240835072.fuzz
+ data/arrow-ipc-file/clusterfuzz-testcase-minimized-arrow-ipc-file-fuzz-6023524637081600
+ data/arrow-ipc-file/clusterfuzz-testcase-minimized-arrow-ipc-file-fuzz-6177196536889344
+ data/arrow-ipc-file/clusterfuzz-testcase-minimized-arrow-ipc-file-fuzz-6318558565498880.fuzz
+ data/arrow-ipc-stream/clusterfuzz-testcase-arrow-ipc-file-fuzz-5298734406172672
+ data/arrow-ipc-stream/clusterfuzz-testcase-arrow-ipc-file-fuzz-5502930036326400
+ data/arrow-ipc-stream/clusterfuzz-testcase-arrow-ipc-file-fuzz-6065820480962560.fuzz
+ data/arrow-ipc-stream/clusterfuzz-testcase-arrow-ipc-file-fuzz-6537416932982784
+ data/arrow-ipc-stream/clusterfuzz-testcase-arrow-ipc-file-fuzz-6598997234548736
+ data/arrow-ipc-stream/clusterfuzz-testcase-arrow-ipc-stream-fuzz-4895056843112448
+ data/arrow-ipc-stream/clusterfuzz-testcase-minimized-arrow-ipc-file-fuzz-6674891504484352
+ data/arrow-ipc-stream/clusterfuzz-testcase-minimized-arrow-ipc-stream-fuzz-4757582821064704
+ data/arrow-ipc-stream/clusterfuzz-testcase-minimized-arrow-ipc-stream-fuzz-4961281405222912
+ data/arrow-ipc-stream/clusterfuzz-testcase-minimized-arrow-ipc-stream-fuzz-5281967462023168
+ data/arrow-ipc-stream/clusterfuzz-testcase-minimized-arrow-ipc-stream-fuzz-6589380504977408.fuzz
+37 −0 data/avro/README.md
+ data/avro/alltypes_dictionary.avro
+ data/avro/alltypes_plain.avro
+ data/avro/alltypes_plain.snappy.avro
+ data/avro/binary.avro
+ data/avro/datapage_v2.snappy.avro
+ data/avro/dict-page-offset-zero.avro
+ data/avro/fixed_length_decimal.avro
+ data/avro/fixed_length_decimal_legacy.avro
+ data/avro/int32_decimal.avro
+ data/avro/int64_decimal.avro
+ data/avro/list_columns.avro
+ data/avro/nested_lists.snappy.avro
+ data/avro/nonnullable.impala.avro
+ data/avro/nullable.impala.avro
+ data/avro/nulls.snappy.avro
+ data/avro/repeated_no_annotation.avro
+ data/avro/single_nan.avro
+ data/parquet/fuzzing/clusterfuzz-testcase-5913005913407488
+ data/parquet/fuzzing/clusterfuzz-testcase-6606237035003904
+ data/parquet/fuzzing/clusterfuzz-testcase-dictbitwidth-4680774947569664
+ data/parquet/fuzzing/clusterfuzz-testcase-dictbitwidth-5882232959270912
+ data/parquet/fuzzing/clusterfuzz-testcase-minimized-parquet-arrow-fuzz-4738122420715520
+ data/parquet/fuzzing/clusterfuzz-testcase-minimized-parquet-arrow-fuzz-4866999088447488
+ data/parquet/fuzzing/clusterfuzz-testcase-minimized-parquet-arrow-fuzz-4938338763669504
+ data/parquet/fuzzing/clusterfuzz-testcase-minimized-parquet-arrow-fuzz-5004902418481152
+ data/parquet/fuzzing/clusterfuzz-testcase-minimized-parquet-arrow-fuzz-5103039558582272
+ data/parquet/fuzzing/clusterfuzz-testcase-minimized-parquet-arrow-fuzz-5106889906585600
+ data/parquet/fuzzing/clusterfuzz-testcase-minimized-parquet-arrow-fuzz-5152654819459072.fuzz
+ data/parquet/fuzzing/clusterfuzz-testcase-minimized-parquet-arrow-fuzz-5251250357141504
+ data/parquet/fuzzing/clusterfuzz-testcase-minimized-parquet-arrow-fuzz-5385788188131328
+ data/parquet/fuzzing/clusterfuzz-testcase-minimized-parquet-arrow-fuzz-5798108001337344
+ data/parquet/fuzzing/clusterfuzz-testcase-minimized-parquet-arrow-fuzz-5841507574743040
+ data/parquet/fuzzing/clusterfuzz-testcase-minimized-parquet-arrow-fuzz-5915095763386368
+ data/parquet/fuzzing/clusterfuzz-testcase-minimized-parquet-arrow-fuzz-6122962147737600.fuzz
+ data/parquet/fuzzing/clusterfuzz-testcase-minimized-parquet-arrow-fuzz-6125206807642112.fuzz
+ data/parquet/fuzzing/clusterfuzz-testcase-minimized-parquet-arrow-fuzz-6289584196026368
+ data/parquet/fuzzing/clusterfuzz-testcase-minimized-parquet-arrow-fuzz-6358005443592192
+ data/parquet/fuzzing/clusterfuzz-testcase-minimized-parquet-arrow-fuzz-6539993600884736
+ data/parquet/fuzzing/clusterfuzz-testcase-minimized-parquet-arrow-fuzz-6696667471020032
+ data/parquet/fuzzing/clusterfuzz-testcase-parquet-arrow-fuzz-5004902418481152
+ data/parquet/fuzzing/clusterfuzz-testcase-parquet-arrow-fuzz-5415048864989184
+ data/parquet/fuzzing/clusterfuzz-testcase-parquet-arrow-fuzz-5973249794637824
+ data/parquet/fuzzing/clusterfuzz-testcase-parquet-arrow-fuzz-6196357887557632.fuzz
+ data/parquet/fuzzing/clusterfuzz-testcase-parquet-arrow-fuzz-6702965604876288
+ data/parquet/fuzzing/crash-61d6204d481340860da54e30f1937b67234ad0f7
+ data/parquet/fuzzing/crash-649c71a618ae2fd80cec177a9676eb3e280fc1fa
+ data/parquet/fuzzing/crash-9840a7b1a0d24996069f6ee0779bbe9875e8aca3
