diff --git a/parquet-testing b/parquet-testing index 3edb72ae3630..7175a4713397 160000 --- a/parquet-testing +++ b/parquet-testing @@ -1 +1 @@ -Subproject commit 3edb72ae36305b24bf45dde4af41c92e54be85cf +Subproject commit 7175a471339704c7645af0fe66c68305e2e6759c diff --git a/parquet/src/arrow/array_reader.rs b/parquet/src/arrow/array_reader.rs index 97dbd4bb9e88..a26e60c02cf4 100644 --- a/parquet/src/arrow/array_reader.rs +++ b/parquet/src/arrow/array_reader.rs @@ -26,10 +26,10 @@ use std::vec::Vec; use arrow::array::{ new_empty_array, Array, ArrayData, ArrayDataBuilder, ArrayRef, BinaryArray, - BinaryBuilder, BooleanArray, BooleanBufferBuilder, BooleanBuilder, + BinaryBuilder, BooleanArray, BooleanBufferBuilder, BooleanBuilder, DecimalArray, FixedSizeBinaryArray, FixedSizeBinaryBuilder, GenericListArray, Int16BufferBuilder, - Int32Array, Int64Array, MapArray, OffsetSizeTrait, PrimitiveArray, PrimitiveBuilder, - StringArray, StringBuilder, StructArray, DecimalArray, + Int32Array, Int64Array, MapArray, NullArray, OffsetSizeTrait, PrimitiveArray, + PrimitiveBuilder, StringArray, StringBuilder, StructArray, }; use arrow::buffer::{Buffer, MutableBuffer}; use arrow::datatypes::{ @@ -430,14 +430,16 @@ where } ArrowType::Decimal(p, s) => { let array = match array.data_type() { - ArrowType::Int32 => array.as_any() + ArrowType::Int32 => array + .as_any() .downcast_ref::() .unwrap() .iter() .map(|v| v.map(|v| v.into())) .collect::(), - ArrowType::Int64 => array.as_any() + ArrowType::Int64 => array + .as_any() .downcast_ref::() .unwrap() .iter() @@ -885,6 +887,7 @@ fn remove_indices( Ok(Arc::new(StructArray::from((new_columns, valid.finish())))) } } + ArrowType::Null => Ok(Arc::new(NullArray::new(arr.len()))), _ => Err(ParquetError::General(format!( "ListArray of type List({:?}) is not supported by array_reader", item_type @@ -924,7 +927,7 @@ impl ArrayReader for ListArrayReader { && (rep_levels.len() == next_batch_array.len())) { return Err(ArrowError( - "Expected item_reader def_levels and rep_levels to be same length as batch".to_string(), + format!("Expected item_reader def_levels {} and rep_levels {} to be same length as batch {}", def_levels.len(), rep_levels.len(), next_batch_array.len()), )); } @@ -964,6 +967,7 @@ impl ArrayReader for ListArrayReader { cur_offset += OffsetSize::one(); } }); + offsets.push(cur_offset); let num_bytes = bit_util::ceil(offsets.len(), 8); @@ -1767,15 +1771,13 @@ impl<'a> ArrayReaderBuilder { )), PhysicalType::INT96 => { // get the optional timezone information from arrow type - let timezone = arrow_type - .as_ref() - .and_then(|data_type| { - if let ArrowType::Timestamp(_, tz) = data_type { - tz.clone() - } else { - None - } - }); + let timezone = arrow_type.as_ref().and_then(|data_type| { + if let ArrowType::Timestamp(_, tz) = data_type { + tz.clone() + } else { + None + } + }); let converter = Int96Converter::new(Int96ArrayConverter { timezone }); Ok(Box::new(ComplexObjectArrayReader::< Int96Type, @@ -1983,13 +1985,15 @@ impl<'a> ArrayReaderBuilder { if i == 1 { field = self.arrow_schema.field_with_name(part).ok(); } else if let Some(f) = field { - if let ArrowType::Struct(fields) = f.data_type() { - field = fields.iter().find(|f| f.name() == part) - } else { - field = None + match f.data_type() { + ArrowType::Struct(fields) => { + field = fields.iter().find(|f| f.name() == part) + } + ArrowType::List(list_field) => field = Some(list_field.as_ref()), + _ => field = None, } } else { - field = None + field = None; } } field diff --git a/parquet/src/arrow/arrow_reader.rs b/parquet/src/arrow/arrow_reader.rs index fd2055b32782..141e49c778bd 100644 --- a/parquet/src/arrow/arrow_reader.rs +++ b/parquet/src/arrow/arrow_reader.rs @@ -1210,4 +1210,32 @@ mod tests { assert_eq!(get_dict(&batches[3]), get_dict(&batches[4])); assert_eq!(get_dict(&batches[4]), get_dict(&batches[5])); } + + #[test] + fn test_read_null_list() { + let testdata = arrow::util::test_util::parquet_test_data(); + let path = format!("{}/null_list.parquet", testdata); + let parquet_file_reader = + SerializedFileReader::try_from(File::open(&path).unwrap()).unwrap(); + let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(parquet_file_reader)); + let mut record_batch_reader = arrow_reader + .get_record_reader(60) + .expect("Failed to read into array!"); + + let batch = record_batch_reader.next().unwrap().unwrap(); + assert_eq!(batch.num_rows(), 1); + assert_eq!(batch.num_columns(), 1); + assert_eq!(batch.column(0).len(), 1); + + let list = batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(list.len(), 1); + assert!(list.is_valid(0)); + + let val = list.value(0); + assert_eq!(val.len(), 0); + } } diff --git a/testing b/testing index b658b087767b..d315f7985207 160000 --- a/testing +++ b/testing @@ -1 +1 @@ -Subproject commit b658b087767b041b2081766814655b4dd5a9a439 +Subproject commit d315f7985207d2d67fc2c8e41053e9d97d573f4b