
Fix Parquet reader for null lists #1448

Merged (4 commits) on Mar 22, 2022

Changes from all commits:
2 changes: 1 addition & 1 deletion parquet-testing
44 changes: 24 additions & 20 deletions parquet/src/arrow/array_reader.rs
@@ -26,10 +26,10 @@ use std::vec::Vec;
 
 use arrow::array::{
     new_empty_array, Array, ArrayData, ArrayDataBuilder, ArrayRef, BinaryArray,
-    BinaryBuilder, BooleanArray, BooleanBufferBuilder, BooleanBuilder,
+    BinaryBuilder, BooleanArray, BooleanBufferBuilder, BooleanBuilder, DecimalArray,
     FixedSizeBinaryArray, FixedSizeBinaryBuilder, GenericListArray, Int16BufferBuilder,
-    Int32Array, Int64Array, MapArray, OffsetSizeTrait, PrimitiveArray, PrimitiveBuilder,
-    StringArray, StringBuilder, StructArray, DecimalArray,
+    Int32Array, Int64Array, MapArray, NullArray, OffsetSizeTrait, PrimitiveArray,
+    PrimitiveBuilder, StringArray, StringBuilder, StructArray,
 };
 use arrow::buffer::{Buffer, MutableBuffer};
 use arrow::datatypes::{
@@ -430,14 +430,16 @@ where
                 }
                 ArrowType::Decimal(p, s) => {
                     let array = match array.data_type() {
-                        ArrowType::Int32 => array.as_any()
+                        ArrowType::Int32 => array
+                            .as_any()
                             .downcast_ref::<Int32Array>()
                             .unwrap()
                             .iter()
                             .map(|v| v.map(|v| v.into()))
                             .collect::<DecimalArray>(),
 
-                        ArrowType::Int64 => array.as_any()
+                        ArrowType::Int64 => array
+                            .as_any()
                             .downcast_ref::<Int64Array>()
                             .unwrap()
                             .iter()
@@ -885,6 +887,7 @@ fn remove_indices(
             Ok(Arc::new(StructArray::from((new_columns, valid.finish()))))
         }
     }
+    ArrowType::Null => Ok(Arc::new(NullArray::new(arr.len()))),
     _ => Err(ParquetError::General(format!(
         "ListArray of type List({:?}) is not supported by array_reader",
         item_type
@@ -924,7 +927,7 @@ impl<OffsetSize: OffsetSizeTrait> ArrayReader for ListArrayReader<OffsetSize> {
             && (rep_levels.len() == next_batch_array.len()))
         {
             return Err(ArrowError(
-                "Expected item_reader def_levels and rep_levels to be same length as batch".to_string(),
+                format!("Expected item_reader def_levels {} and rep_levels {} to be same length as batch {}", def_levels.len(), rep_levels.len(), next_batch_array.len()),
[Member Author] Purely for debugging purposes. I feel it is clear.

));
}

@@ -964,6 +967,7 @@ impl<OffsetSize: OffsetSizeTrait> ArrayReader for ListArrayReader<OffsetSize> {
                 cur_offset += OffsetSize::one();
             }
         });
+
         offsets.push(cur_offset);
 
         let num_bytes = bit_util::ceil(offsets.len(), 8);
@@ -1767,15 +1771,13 @@ impl<'a> ArrayReaderBuilder {
             )),
             PhysicalType::INT96 => {
                 // get the optional timezone information from arrow type
-                let timezone = arrow_type
-                    .as_ref()
-                    .and_then(|data_type| {
-                        if let ArrowType::Timestamp(_, tz) = data_type {
-                            tz.clone()
-                        } else {
-                            None
-                        }
-                    });
+                let timezone = arrow_type.as_ref().and_then(|data_type| {
+                    if let ArrowType::Timestamp(_, tz) = data_type {
+                        tz.clone()
+                    } else {
+                        None
+                    }
+                });
                 let converter = Int96Converter::new(Int96ArrayConverter { timezone });
                 Ok(Box::new(ComplexObjectArrayReader::<
                     Int96Type,
@@ -1983,13 +1985,15 @@ impl<'a> ArrayReaderBuilder {
                 if i == 1 {
                     field = self.arrow_schema.field_with_name(part).ok();
[Contributor] Not part of this PR, but I wonder why this doesn't just initialize field to be self.arrow_schema.field_with_name(parts[1]).ok() and then do skip(2), given it has already done the length check above...

                 } else if let Some(f) = field {
-                    if let ArrowType::Struct(fields) = f.data_type() {
-                        field = fields.iter().find(|f| f.name() == part)
-                    } else {
-                        field = None
+                    match f.data_type() {
+                        ArrowType::Struct(fields) => {
+                            field = fields.iter().find(|f| f.name() == part)
+                        }
+                        ArrowType::List(list_field) => field = Some(list_field.as_ref()),
+                        _ => field = None,
+                    }
                 } else {
-                    field = None
+                    field = None;
                 }
             }
             field
28 changes: 28 additions & 0 deletions parquet/src/arrow/arrow_reader.rs
@@ -1210,4 +1210,32 @@ mod tests {
         assert_eq!(get_dict(&batches[3]), get_dict(&batches[4]));
         assert_eq!(get_dict(&batches[4]), get_dict(&batches[5]));
     }
+
+    #[test]
+    fn test_read_null_list() {
+        let testdata = arrow::util::test_util::parquet_test_data();
+        let path = format!("{}/null_list.parquet", testdata);
Review thread:

[Member Author, @viirya, Mar 15, 2022] I downloaded the test parquet file from @novemberkilo's parquet-testing fork to test it locally. Not sure how we can put the file there?

[Contributor, @novemberkilo] You could point to my parquet-testing fork, like I have done here: novemberkilo@e5952ae#diff-fe7afb5c9c916e521401d3fcfb4277d5071798c3baf83baf11d6071742823584

[Member Author] Okay, I think it works for CI purposes in the PR. Not sure if we can merge into master like that. Anyway, let me point to your fork first to test it. Thanks.

[Member] @novemberkilo could you open a PR against parquet-testing and add your test file there?

[Member Author] Yeah, I think it is a good idea.

[Contributor, @novemberkilo] @viirya fwiw I did something like:

  • delete the parquet-testing submodule
  • git submodule add -b topic/null-list-reader [email protected]:novemberkilo/parquet-testing

[Member Author] Oh, I see. Thanks. I added the file manually to test it before. It also seems updating .gitmodules doesn't work for CI here. We may need to wait for the test file to be merged into parquet-testing.

[Contributor, @tustvold] I will have a play with creating a test that generates a parquet file, so that we can get this PR in without waiting for parquet-testing. I will also file a ticket to start a discussion on a faster way to get parquet test files checked in, without relying on an upstream repo.

[Member Author] Thanks @tustvold. The parquet file was merged. It'd be great to get parquet test files checked in faster.

+        let parquet_file_reader =
+            SerializedFileReader::try_from(File::open(&path).unwrap()).unwrap();
+        let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(parquet_file_reader));
+        let mut record_batch_reader = arrow_reader
+            .get_record_reader(60)
+            .expect("Failed to read into array!");
+
+        let batch = record_batch_reader.next().unwrap().unwrap();
+        assert_eq!(batch.num_rows(), 1);
+        assert_eq!(batch.num_columns(), 1);
+        assert_eq!(batch.column(0).len(), 1);
+
+        let list = batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<ListArray>()
+            .unwrap();
+        assert_eq!(list.len(), 1);
+        assert!(list.is_valid(0));
+
+        let val = list.value(0);
+        assert_eq!(val.len(), 0);
+    }
 }
2 changes: 1 addition & 1 deletion testing
Submodule testing updated 67 files
+ data/arrow-ipc-file/clusterfuzz-testcase-arrow-ipc-file-fuzz-5873085270589440
+ data/arrow-ipc-file/clusterfuzz-testcase-minimized-arrow-ipc-file-fuzz-5480145071243264
+ data/arrow-ipc-file/clusterfuzz-testcase-minimized-arrow-ipc-file-fuzz-5577412021190656
+ data/arrow-ipc-file/clusterfuzz-testcase-minimized-arrow-ipc-file-fuzz-5749190446153728
+ data/arrow-ipc-file/clusterfuzz-testcase-minimized-arrow-ipc-file-fuzz-5864855240835072.fuzz
+ data/arrow-ipc-file/clusterfuzz-testcase-minimized-arrow-ipc-file-fuzz-6023524637081600
+ data/arrow-ipc-file/clusterfuzz-testcase-minimized-arrow-ipc-file-fuzz-6177196536889344
+ data/arrow-ipc-file/clusterfuzz-testcase-minimized-arrow-ipc-file-fuzz-6318558565498880.fuzz
+ data/arrow-ipc-stream/clusterfuzz-testcase-arrow-ipc-file-fuzz-5298734406172672
+ data/arrow-ipc-stream/clusterfuzz-testcase-arrow-ipc-file-fuzz-5502930036326400
+ data/arrow-ipc-stream/clusterfuzz-testcase-arrow-ipc-file-fuzz-6065820480962560.fuzz
+ data/arrow-ipc-stream/clusterfuzz-testcase-arrow-ipc-file-fuzz-6537416932982784
+ data/arrow-ipc-stream/clusterfuzz-testcase-arrow-ipc-file-fuzz-6598997234548736
+ data/arrow-ipc-stream/clusterfuzz-testcase-arrow-ipc-stream-fuzz-4895056843112448
+ data/arrow-ipc-stream/clusterfuzz-testcase-minimized-arrow-ipc-file-fuzz-6674891504484352
+ data/arrow-ipc-stream/clusterfuzz-testcase-minimized-arrow-ipc-stream-fuzz-4757582821064704
+ data/arrow-ipc-stream/clusterfuzz-testcase-minimized-arrow-ipc-stream-fuzz-4961281405222912
+ data/arrow-ipc-stream/clusterfuzz-testcase-minimized-arrow-ipc-stream-fuzz-5281967462023168
+ data/arrow-ipc-stream/clusterfuzz-testcase-minimized-arrow-ipc-stream-fuzz-6589380504977408.fuzz
+37 −0 data/avro/README.md
+ data/avro/alltypes_dictionary.avro
+ data/avro/alltypes_plain.avro
+ data/avro/alltypes_plain.snappy.avro
+ data/avro/binary.avro
+ data/avro/datapage_v2.snappy.avro
+ data/avro/dict-page-offset-zero.avro
+ data/avro/fixed_length_decimal.avro
+ data/avro/fixed_length_decimal_legacy.avro
+ data/avro/int32_decimal.avro
+ data/avro/int64_decimal.avro
+ data/avro/list_columns.avro
+ data/avro/nested_lists.snappy.avro
+ data/avro/nonnullable.impala.avro
+ data/avro/nullable.impala.avro
+ data/avro/nulls.snappy.avro
+ data/avro/repeated_no_annotation.avro
+ data/avro/single_nan.avro
+ data/parquet/fuzzing/clusterfuzz-testcase-5913005913407488
+ data/parquet/fuzzing/clusterfuzz-testcase-6606237035003904
+ data/parquet/fuzzing/clusterfuzz-testcase-dictbitwidth-4680774947569664
+ data/parquet/fuzzing/clusterfuzz-testcase-dictbitwidth-5882232959270912
+ data/parquet/fuzzing/clusterfuzz-testcase-minimized-parquet-arrow-fuzz-4738122420715520
+ data/parquet/fuzzing/clusterfuzz-testcase-minimized-parquet-arrow-fuzz-4866999088447488
+ data/parquet/fuzzing/clusterfuzz-testcase-minimized-parquet-arrow-fuzz-4938338763669504
+ data/parquet/fuzzing/clusterfuzz-testcase-minimized-parquet-arrow-fuzz-5004902418481152
+ data/parquet/fuzzing/clusterfuzz-testcase-minimized-parquet-arrow-fuzz-5103039558582272
+ data/parquet/fuzzing/clusterfuzz-testcase-minimized-parquet-arrow-fuzz-5106889906585600
+ data/parquet/fuzzing/clusterfuzz-testcase-minimized-parquet-arrow-fuzz-5152654819459072.fuzz
+ data/parquet/fuzzing/clusterfuzz-testcase-minimized-parquet-arrow-fuzz-5251250357141504
+ data/parquet/fuzzing/clusterfuzz-testcase-minimized-parquet-arrow-fuzz-5385788188131328
+ data/parquet/fuzzing/clusterfuzz-testcase-minimized-parquet-arrow-fuzz-5798108001337344
+ data/parquet/fuzzing/clusterfuzz-testcase-minimized-parquet-arrow-fuzz-5841507574743040
+ data/parquet/fuzzing/clusterfuzz-testcase-minimized-parquet-arrow-fuzz-5915095763386368
+ data/parquet/fuzzing/clusterfuzz-testcase-minimized-parquet-arrow-fuzz-6122962147737600.fuzz
+ data/parquet/fuzzing/clusterfuzz-testcase-minimized-parquet-arrow-fuzz-6125206807642112.fuzz
+ data/parquet/fuzzing/clusterfuzz-testcase-minimized-parquet-arrow-fuzz-6289584196026368
+ data/parquet/fuzzing/clusterfuzz-testcase-minimized-parquet-arrow-fuzz-6358005443592192
+ data/parquet/fuzzing/clusterfuzz-testcase-minimized-parquet-arrow-fuzz-6539993600884736
+ data/parquet/fuzzing/clusterfuzz-testcase-minimized-parquet-arrow-fuzz-6696667471020032
+ data/parquet/fuzzing/clusterfuzz-testcase-parquet-arrow-fuzz-5004902418481152
+ data/parquet/fuzzing/clusterfuzz-testcase-parquet-arrow-fuzz-5415048864989184
+ data/parquet/fuzzing/clusterfuzz-testcase-parquet-arrow-fuzz-5973249794637824
+ data/parquet/fuzzing/clusterfuzz-testcase-parquet-arrow-fuzz-6196357887557632.fuzz
+ data/parquet/fuzzing/clusterfuzz-testcase-parquet-arrow-fuzz-6702965604876288
+ data/parquet/fuzzing/crash-61d6204d481340860da54e30f1937b67234ad0f7
+ data/parquet/fuzzing/crash-649c71a618ae2fd80cec177a9676eb3e280fc1fa
+ data/parquet/fuzzing/crash-9840a7b1a0d24996069f6ee0779bbe9875e8aca3