Fix Parquet reader for null lists #1448

Merged 4 commits on Mar 22, 2022
20 changes: 14 additions & 6 deletions parquet/src/arrow/array_reader.rs
@@ -28,7 +28,7 @@ use arrow::array::{
 new_empty_array, Array, ArrayData, ArrayDataBuilder, ArrayRef, BinaryArray,
 BinaryBuilder, BooleanArray, BooleanBufferBuilder, BooleanBuilder,
 FixedSizeBinaryArray, FixedSizeBinaryBuilder, GenericListArray, Int16BufferBuilder,
-Int32Array, Int64Array, MapArray, OffsetSizeTrait, PrimitiveArray, PrimitiveBuilder,
+Int32Array, Int64Array, MapArray, NullArray, OffsetSizeTrait, PrimitiveArray, PrimitiveBuilder,
 StringArray, StringBuilder, StructArray, DecimalArray,
 };
 use arrow::buffer::{Buffer, MutableBuffer};
@@ -885,6 +885,9 @@ fn remove_indices(
 Ok(Arc::new(StructArray::from((new_columns, valid.finish()))))
 }
 }
+ArrowType::Null => {
+    Ok(Arc::new(NullArray::new(arr.len() - indices.len())))
+}

Contributor: Probably beyond the scope of this PR, but it occurs to me that this remove_indices function is basically an inverse of the arrow take kernel. I wonder if we could flip the construction of null_list_indices to instead be the non-null positions, and just use a regular take kernel? I might have a play with this 🤔

Member Author: Yea, seems so.
_ => Err(ParquetError::General(format!(
"ListArray of type List({:?}) is not supported by array_reader",
item_type
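The take-kernel idea from the exchange above can be sketched with plain `Vec` stand-ins. This is only an illustration of the shape of the change: `complement` and `take` here are hypothetical helpers invented for this sketch, not arrow-rs APIs (the real code would build an arrow index array and call arrow's `take` compute kernel).

```rust
// Instead of removing the null positions (what remove_indices does),
// compute the complement -- the positions to KEEP -- and gather them.

/// Positions of `len` elements that are NOT in the sorted `null_indices` list.
fn complement(len: usize, null_indices: &[usize]) -> Vec<usize> {
    let mut kept = Vec::with_capacity(len - null_indices.len());
    let mut nulls = null_indices.iter().peekable();
    for i in 0..len {
        if nulls.peek() == Some(&&i) {
            nulls.next(); // skip a null position
        } else {
            kept.push(i);
        }
    }
    kept
}

/// Minimal gather: the role the arrow `take` kernel would play.
fn take<T: Copy>(values: &[T], indices: &[usize]) -> Vec<T> {
    indices.iter().map(|&i| values[i]).collect()
}

fn main() {
    let values = [10, 20, 30, 40, 50];
    let null_list_indices = [1, 3]; // positions remove_indices would drop
    let kept = complement(values.len(), &null_list_indices);
    assert_eq!(kept, vec![0, 2, 4]);
    assert_eq!(take(&values, &kept), vec![10, 30, 50]);
    println!("{:?}", take(&values, &kept));
}
```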
@@ -924,7 +927,7 @@ impl<OffsetSize: OffsetSizeTrait> ArrayReader for ListArrayReader<OffsetSize> {
&& (rep_levels.len() == next_batch_array.len()))
{
return Err(ArrowError(
-"Expected item_reader def_levels and rep_levels to be same length as batch".to_string(),
+format!("Expected item_reader def_levels {} and rep_levels {} to be same length as batch {}", def_levels.len(), rep_levels.len(), next_batch_array.len()),

Member Author: Purely for debugging purposes. I feel it is clear.
));
}

@@ -964,7 +967,10 @@ impl<OffsetSize: OffsetSizeTrait> ArrayReader for ListArrayReader<OffsetSize> {
cur_offset += OffsetSize::one();
}
});
-offsets.push(cur_offset);
+
+if !cur_offset.is_zero() {
+    offsets.push(cur_offset);
+}

Contributor: I'm not sure this is correct; I think it will convert an array containing no non-null values into an empty array?

let num_bytes = bit_util::ceil(offsets.len(), 8);
// TODO: A useful optimization is to use the null count to fill with
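Background for the reviewer's concern: an Arrow list array over n lists is encoded as n + 1 offsets into the child values, so the final offset must be pushed even when it is still zero (e.g. when every list is null). The sketch below is a simplified stand-in using plain `Vec`s, not the reader's actual code; `list_offsets` is a hypothetical helper for illustration.

```rust
// Build the offsets buffer for a nullable list column represented as
// Option<Vec<i32>> per row: always n + 1 offsets for n lists.
fn list_offsets(lists: &[Option<Vec<i32>>]) -> Vec<i32> {
    let mut offsets = Vec::with_capacity(lists.len() + 1);
    let mut cur = 0i32;
    offsets.push(cur);
    for list in lists {
        if let Some(values) = list {
            cur += values.len() as i32;
        }
        // a null (or empty) list contributes no values, so the offset repeats
        offsets.push(cur);
    }
    offsets
}

fn main() {
    // a single null list: offsets are [0, 0], not an empty Vec --
    // skipping the final zero push would misreport the array as empty
    assert_eq!(list_offsets(&[None]), vec![0, 0]);
    assert_eq!(
        list_offsets(&[Some(vec![1, 2]), None, Some(vec![3])]),
        vec![0, 2, 2, 3]
    );
    println!("{:?}", list_offsets(&[None]));
}
```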
@@ -1984,12 +1990,14 @@ impl<'a> ArrayReaderBuilder {
 field = self.arrow_schema.field_with_name(part).ok();

Contributor: Not part of this PR, but I wonder why this doesn't just initialize field to be self.arrow_schema.field_with_name(parts[1]).ok() and then do skip(2), given it has already done the length check above...

 } else if let Some(f) = field {
 if let ArrowType::Struct(fields) = f.data_type() {
-field = fields.iter().find(|f| f.name() == part)
+field = fields.iter().find(|f| f.name() == part);

Contributor: Perhaps a match block might be clearer?

+} else if let ArrowType::List(list_field) = f.data_type() {
+field = Some(list_field.as_ref());

Contributor: If I'm not mistaken, this change is necessary to support a StructArray containing a ListArray in general, without being specific to nulls?

Member Author: Yea, this is not specific to nulls.

 } else {
-field = None
+field = None;
 }
 } else {
-field = None
+field = None;
 }
}
field
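The "match block" the reviewer suggests could look roughly like the sketch below. `DataType`, `Field`, and `resolve` here are simplified stand-ins invented for illustration, not the arrow-rs types; the point is only the shape of replacing the if/else-if chain with a single match.

```rust
// Simplified stand-ins for arrow's DataType/Field, for illustration only.
enum DataType {
    Struct(Vec<Field>),
    List(Box<Field>),
    Int32,
}

struct Field {
    name: String,
    data_type: DataType,
}

// One step of walking a dotted column path: given the current field and the
// next path segment, resolve the child field (or None if nothing matches).
fn resolve<'a>(field: Option<&'a Field>, part: &str) -> Option<&'a Field> {
    match field.map(|f| &f.data_type) {
        Some(DataType::Struct(fields)) => fields.iter().find(|f| f.name == part),
        // a list descends into its item field
        Some(DataType::List(item)) => Some(item.as_ref()),
        _ => None,
    }
}

fn main() {
    let list = Field {
        name: "l".to_string(),
        data_type: DataType::List(Box::new(Field {
            name: "item".to_string(),
            data_type: DataType::Int32,
        })),
    };
    assert_eq!(resolve(Some(&list), "item").unwrap().name, "item");
    assert!(resolve(None, "anything").is_none());
    println!("resolved: {}", resolve(Some(&list), "item").unwrap().name);
}
```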
15 changes: 15 additions & 0 deletions parquet/src/arrow/arrow_reader.rs
@@ -1210,4 +1210,19 @@ mod tests {
assert_eq!(get_dict(&batches[3]), get_dict(&batches[4]));
assert_eq!(get_dict(&batches[4]), get_dict(&batches[5]));
}

+#[test]
+fn test_read_null_list() {
+    let testdata = arrow::util::test_util::parquet_test_data();
+    let path = format!("{}/null_list.parquet", testdata);
Member Author (@viirya, Mar 15, 2022): I downloaded the test parquet file from @novemberkilo's parquet-testing fork to test it locally. Not sure how we can put the file there?

Contributor: You could point to my parquet-testing fork like I have done here novemberkilo@e5952ae#diff-fe7afb5c9c916e521401d3fcfb4277d5071798c3baf83baf11d6071742823584

Member Author: Okay, I think it works for CI purposes in the PR. Not sure if we can merge into master like that. Anyway, let me point to your fork first to test it. Thanks.

Member: @novemberkilo could you open a PR against parquet-testing and add your test file there?

Member Author: Yea, I think it is a good idea.

Contributor: @viirya fwiw I did something like:

  • delete the parquet-testing submodule
  • git submodule add -b topic/null-list-reader [email protected]:novemberkilo/parquet-testing

Member Author: Oh, I see. Thanks. I added the file manually to test it before. And it seems updating .gitmodules doesn't work for CI here. We may need to wait for the test file to be merged into parquet-testing.

Contributor: I will have a play with creating a test that generates a parquet file, so that we can get this PR in without waiting for parquet-testing. I will also file a ticket to start a discussion on a faster way to get parquet test files checked in, without relying on an upstream repo.

Member Author: Thanks @tustvold. The parquet file was merged. It'd be great to get parquet test files checked in faster.
+    let parquet_file_reader =
+        SerializedFileReader::try_from(File::open(&path).unwrap()).unwrap();
+    let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(parquet_file_reader));
+    let mut record_batch_reader = arrow_reader
+        .get_record_reader(60)
+        .expect("Failed to read into array!");
+
+    let batch = record_batch_reader.next();
+    assert!(batch.is_none());
Contributor (@tustvold, Mar 21, 2022): I don't think this is correct; locally I ran

> duckdb.query(f"select count(*) from 'null_list.parquet'").fetchall()
[(1,)]
> duckdb.query(f"select * from 'null_list.parquet'").fetchall()
[(None,)]

which would imply that the data contains a single row, with a single null value. Perhaps it should be:

let batch = record_batch_reader.next().unwrap().unwrap();
assert_eq!(batch.num_rows(), 1);
assert_eq!(batch.num_columns(), 1);
assert_eq!(batch.column(0).len(), 1);

let list = batch.column(0).as_any().downcast_ref::<ListArray>().unwrap();
assert_eq!(list.len(), 1);
assert_eq!(list.is_valid(0), true);

let val = list.value(0);
assert_eq!(val.len(), 0);
}
}
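Underlying the disagreement over the expected batch is the distinction between a null list and an empty list, which a reader must preserve. A minimal illustration with `Option<Vec<i32>>` standing in for one row of a nullable list column; `describe` is a hypothetical helper invented for this sketch.

```rust
// None is the null list; Some(vec![]) is the empty (but valid) list.
fn describe(list: &Option<Vec<i32>>) -> &'static str {
    match list {
        None => "null list",
        Some(v) if v.is_empty() => "empty list",
        Some(_) => "non-empty list",
    }
}

fn main() {
    assert_eq!(describe(&None), "null list");
    assert_eq!(describe(&Some(vec![])), "empty list");
    assert_eq!(describe(&Some(vec![1])), "non-empty list");
    println!("{}", describe(&None));
}
```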