From 4b98333698af00ca5318a397127d33067c879740 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Fri, 6 May 2022 08:16:27 +0100 Subject: [PATCH] Separate parquet -> arrow conversion logic (#1655) Don't treat embedded arrow schema as authoritative (#1663) Fix projection of nested parquet files (#1652) (#1654) Fix schema inference for repeated fields (#1681) Support reading alternative list representations from parquet (#1680) --- arrow/src/datatypes/datatype.rs | 10 + parquet/src/arrow/array_reader/builder.rs | 847 +++++-------------- parquet/src/arrow/array_reader/list_array.rs | 11 +- parquet/src/arrow/arrow_writer.rs | 2 +- parquet/src/arrow/schema.rs | 707 +++------------- parquet/src/arrow/schema/complex.rs | 563 ++++++++++++ parquet/src/arrow/schema/primitive.rs | 251 ++++++ parquet/src/errors.rs | 8 + 8 files changed, 1174 insertions(+), 1225 deletions(-) create mode 100644 parquet/src/arrow/schema/complex.rs create mode 100644 parquet/src/arrow/schema/primitive.rs diff --git a/arrow/src/datatypes/datatype.rs b/arrow/src/datatypes/datatype.rs index e6f5461fd8cd..5cdefe5197af 100644 --- a/arrow/src/datatypes/datatype.rs +++ b/arrow/src/datatypes/datatype.rs @@ -181,6 +181,16 @@ pub enum IntervalUnit { MonthDayNano, } +impl IntervalUnit { + pub fn get_byte_width(&self) -> usize { + match self { + IntervalUnit::YearMonth => 4, + IntervalUnit::DayTime => 8, + IntervalUnit::MonthDayNano => 16, + } + } +} + // Sparse or Dense union layouts #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)] pub enum UnionMode { diff --git a/parquet/src/arrow/array_reader/builder.rs b/parquet/src/arrow/array_reader/builder.rs index af2896350cad..186ddd196b2d 100644 --- a/parquet/src/arrow/array_reader/builder.rs +++ b/parquet/src/arrow/array_reader/builder.rs @@ -15,10 +15,9 @@ // specific language governing permissions and limitations // under the License. -use std::collections::{HashMap, HashSet}; use std::sync::Arc; -use arrow::datatypes::{DataType as ArrowType, Field, IntervalUnit, Schema, SchemaRef}; +use arrow::datatypes::{DataType, Field, IntervalUnit, SchemaRef}; use crate::arrow::array_reader::empty_array::make_empty_array_reader; use crate::arrow::array_reader::{ @@ -32,15 +31,14 @@ use crate::arrow::converter::{ IntervalDayTimeArrayConverter, IntervalDayTimeConverter, IntervalYearMonthArrayConverter, IntervalYearMonthConverter, }; -use crate::basic::{ConvertedType, Repetition, Type as PhysicalType}; +use crate::arrow::schema::{convert_schema, ParquetField, ParquetFieldType}; +use crate::basic::Type as PhysicalType; use crate::data_type::{ BoolType, DoubleType, FixedLenByteArrayType, FloatType, Int32Type, Int64Type, Int96Type, }; -use crate::errors::ParquetError::ArrowError; -use crate::errors::{ParquetError, Result}; -use crate::schema::types::{ColumnDescriptor, ColumnPath, SchemaDescPtr, Type, TypePtr}; -use crate::schema::visitor::TypeVisitor; +use crate::errors::Result; +use crate::schema::types::{ColumnDescriptor, ColumnPath, SchemaDescPtr, Type}; /// Create array reader from parquet schema, column indices, and parquet file reader. 
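///
/// A minimal usage sketch (hedged: `parquet_schema`, `arrow_schema` and
/// `row_groups` are assumed to already be in scope; the generic bounds are
/// the ones on the signature below):
///
/// ```ignore
/// // Project only the first two leaf columns across all row groups.
/// let mut reader = build_array_reader(
///     parquet_schema, // SchemaDescPtr for the file
///     arrow_schema,   // SchemaRef, used as a conversion hint only (#1663)
///     vec![0, 1],     // leaf column indices to read
///     row_groups,     // Box<dyn RowGroupCollection>
/// )?;
/// let batch = reader.next_batch(1024)?;
/// ```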
pub fn build_array_reader( @@ -52,435 +50,211 @@ pub fn build_array_reader( where T: IntoIterator, { - let mut leaves = HashMap::<*const Type, usize>::new(); - - let mut filtered_root_names = HashSet::::new(); - - for c in column_indices { - let column = parquet_schema.column(c).self_type() as *const Type; - - leaves.insert(column, c); - - let root = parquet_schema.get_column_root_ptr(c); - filtered_root_names.insert(root.name().to_string()); + let field = convert_schema( + parquet_schema.as_ref(), + column_indices, + Some(arrow_schema.as_ref()), + )?; + + match &field { + Some(field) => build_reader(field, row_groups.as_ref()), + None => Ok(make_empty_array_reader(row_groups.num_rows())), } - - // Only pass root fields that take part in the projection - // to avoid traversal of columns that are not read. - // TODO: also prune unread parts of the tree in child structures - let filtered_root_fields = parquet_schema - .root_schema() - .get_fields() - .iter() - .filter(|field| filtered_root_names.contains(field.name())) - .cloned() - .collect::>(); - - let proj = Type::GroupType { - basic_info: parquet_schema.root_schema().get_basic_info().clone(), - fields: filtered_root_fields, - }; - - ArrayReaderBuilder::new(Arc::new(proj), arrow_schema, Arc::new(leaves), row_groups) - .build_array_reader() } -/// Used to build array reader. -struct ArrayReaderBuilder { - root_schema: TypePtr, - arrow_schema: Arc, - // Key: columns that need to be included in final array builder - // Value: column index in schema - columns_included: Arc>, - row_groups: Box, +fn build_reader( + field: &ParquetField, + row_groups: &dyn RowGroupCollection, +) -> Result> { + match field.field_type { + ParquetFieldType::Primitive { .. } => build_primitive_reader(field, row_groups), + ParquetFieldType::Group { .. } => match &field.arrow_type { + DataType::Map(_, _) => build_map_reader(field, row_groups), + DataType::Struct(_) => build_struct_reader(field, row_groups), + DataType::List(_) => build_list_reader(field, false, row_groups), + DataType::LargeList(_) => build_list_reader(field, true, row_groups), + d => unimplemented!("reading group type {} not implemented", d), + }, + } } -/// Used in type visitor. -#[derive(Clone)] -struct ArrayReaderBuilderContext { - def_level: i16, - rep_level: i16, - path: ColumnPath, +/// Build array reader for map type. +fn build_map_reader( + field: &ParquetField, + row_groups: &dyn RowGroupCollection, +) -> Result> { + let children = field.children().unwrap(); + assert_eq!(children.len(), 2); + + let key_reader = build_reader(&children[0], row_groups)?; + let value_reader = build_reader(&children[1], row_groups)?; + + Ok(Box::new(MapArrayReader::new( + key_reader, + value_reader, + field.arrow_type.clone(), + field.def_level, + field.rep_level, + ))) } -impl Default for ArrayReaderBuilderContext { - fn default() -> Self { - Self { - def_level: 0i16, - rep_level: 0i16, - path: ColumnPath::new(Vec::new()), - } +/// Build array reader for list type. 
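///
/// For orientation: a standard three-level list such as
///
/// ```text
/// optional group my_list (LIST) {
///   repeated group list {
///     optional int32 element;
///   }
/// }
/// ```
///
/// arrives here with `field.def_level` / `field.rep_level` already computed
/// by `schema::complex`, and with the single element field as its only child
/// (levels depend on any surrounding nesting).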
+fn build_list_reader( + field: &ParquetField, + is_large: bool, + row_groups: &dyn RowGroupCollection, +) -> Result> { + let children = field.children().unwrap(); + assert_eq!(children.len(), 1); + + let data_type = field.arrow_type.clone(); + let item_reader = build_reader(&children[0], row_groups)?; + let item_type = item_reader.get_data_type().clone(); + + match is_large { + false => Ok(Box::new(ListArrayReader::::new( + item_reader, + data_type, + item_type, + field.def_level, + field.rep_level, + field.nullable, + )) as _), + true => Ok(Box::new(ListArrayReader::::new( + item_reader, + data_type, + item_type, + field.def_level, + field.rep_level, + field.nullable, + )) as _), } } -/// Create array reader by visiting schema. -impl<'a> TypeVisitor>, &'a ArrayReaderBuilderContext> - for ArrayReaderBuilder -{ - /// Build array reader for primitive type. - fn visit_primitive( - &mut self, - cur_type: TypePtr, - context: &'a ArrayReaderBuilderContext, - ) -> Result>> { - if self.is_included(cur_type.as_ref()) { - let mut new_context = context.clone(); - new_context.path.append(vec![cur_type.name().to_string()]); - - let null_mask_only = match cur_type.get_basic_info().repetition() { - Repetition::REPEATED => { - return Err(ArrowError(format!( - "Reading repeated primitive ({:?}) is not supported yet!", - cur_type.name() - ))); - } - Repetition::OPTIONAL => { - new_context.def_level += 1; - - // Can just compute null mask if no parent - context.def_level == 0 && context.rep_level == 0 - } - _ => false, - }; - - let reader = self.build_for_primitive_type_inner( - cur_type, - &new_context, - null_mask_only, - )?; - - Ok(Some(reader)) - } else { - Ok(None) - } - } - - /// Build array reader for struct type. - fn visit_struct( - &mut self, - cur_type: Arc, - context: &'a ArrayReaderBuilderContext, - ) -> Result>> { - let mut new_context = context.clone(); - new_context.path.append(vec![cur_type.name().to_string()]); - - if cur_type.get_basic_info().has_repetition() { - match cur_type.get_basic_info().repetition() { - Repetition::REPEATED => { - return Err(ArrowError(format!( - "Reading repeated struct ({:?}) is not supported yet!", - cur_type.name(), - ))) - } - Repetition::OPTIONAL => { - new_context.def_level += 1; - } - Repetition::REQUIRED => {} - } - } - - self.build_for_struct_type_inner(&cur_type, &new_context) - } - - /// Build array reader for map type. 
- fn visit_map( - &mut self, - map_type: Arc, - context: &'a ArrayReaderBuilderContext, - ) -> Result>> { - // Add map type to context - let mut new_context = context.clone(); - new_context.path.append(vec![map_type.name().to_string()]); - - match map_type.get_basic_info().repetition() { - Repetition::REQUIRED => {} - Repetition::OPTIONAL => { - new_context.def_level += 1; - } - Repetition::REPEATED => { - return Err(ArrowError("Map cannot be repeated".to_string())) - } - } - - if map_type.get_fields().len() != 1 { - return Err(ArrowError(format!( - "Map field must have exactly one key_value child, found {}", - map_type.get_fields().len() - ))); - } - - // Add map entry (key_value) to context - let map_key_value = &map_type.get_fields()[0]; - if map_key_value.get_basic_info().repetition() != Repetition::REPEATED { - return Err(ArrowError( - "Child of map field must be repeated".to_string(), - )); - } - - new_context - .path - .append(vec![map_key_value.name().to_string()]); - - new_context.rep_level += 1; - new_context.def_level += 1; - - if map_key_value.get_fields().len() != 2 { - // According to the specification the values are optional (#1642) - return Err(ArrowError(format!( - "Child of map field must have two children, found {}", - map_key_value.get_fields().len() - ))); - } - - // Get key and value, and create context for each - let map_key = &map_key_value.get_fields()[0]; - let map_value = &map_key_value.get_fields()[1]; - - if map_key.get_basic_info().repetition() != Repetition::REQUIRED { - return Err(ArrowError("Map keys must be required".to_string())); - } - - if map_value.get_basic_info().repetition() == Repetition::REPEATED { - return Err(ArrowError("Map values cannot be repeated".to_string())); - } - - let key_reader = self.dispatch(map_key.clone(), &new_context)?.unwrap(); - let value_reader = self.dispatch(map_value.clone(), &new_context)?.unwrap(); - - let arrow_type = self - .arrow_schema - .field_with_name(map_type.name()) - .ok() - .map(|f| f.data_type().to_owned()) - .unwrap_or_else(|| { - ArrowType::Map( - Box::new(Field::new( - map_key_value.name(), - ArrowType::Struct(vec![ - Field::new( - map_key.name(), - key_reader.get_data_type().clone(), - false, - ), - Field::new( - map_value.name(), - value_reader.get_data_type().clone(), - map_value.is_optional(), - ), - ]), - map_type.is_optional(), - )), - false, - ) - }); - - let key_array_reader: Box = Box::new(MapArrayReader::new( - key_reader, - value_reader, - arrow_type, - new_context.def_level, - new_context.rep_level, - )); - - Ok(Some(key_array_reader)) - } - - /// Build array reader for list type. - fn visit_list_with_item( - &mut self, - list_type: Arc, - item_type: Arc, - context: &'a ArrayReaderBuilderContext, - ) -> Result>> { - let mut new_context = context.clone(); - new_context.path.append(vec![list_type.name().to_string()]); - - // If the list is nullable - let nullable = match list_type.get_basic_info().repetition() { - Repetition::REQUIRED => false, - Repetition::OPTIONAL => { - new_context.def_level += 1; - true - } - Repetition::REPEATED => { - return Err(general_err!("List type cannot be repeated")) +/// Creates primitive array reader for each primitive type. +fn build_primitive_reader( + field: &ParquetField, + row_groups: &dyn RowGroupCollection, +) -> Result> { + let (col_idx, primitive_type, type_len) = match &field.field_type { + ParquetFieldType::Primitive { + col_idx, + primitive_type, + } => match primitive_type.as_ref() { + Type::PrimitiveType { type_length, .. 
} => { + (*col_idx, primitive_type.clone(), *type_length) } - }; - - if list_type.get_fields().len() != 1 { - return Err(ArrowError(format!( - "List field must have exactly one child, found {}", - list_type.get_fields().len() - ))); - } - let mut list_child = &list_type.get_fields()[0]; - - if list_child.get_basic_info().repetition() != Repetition::REPEATED { - return Err(ArrowError("List child must be repeated".to_string())); - } - - // The repeated field - new_context.rep_level += 1; - new_context.def_level += 1; - - match self.dispatch(item_type, &new_context) { - Ok(Some(item_reader)) => { - let item_type = item_reader.get_data_type().clone(); - - // a list is a group type with a single child. The list child's - // name comes from the child's field name. - // if the child's name is "list" and it has a child, then use this child - if list_child.name() == "list" && !list_child.get_fields().is_empty() { - list_child = list_child.get_fields().first().unwrap(); - } - - let arrow_type = self - .arrow_schema - .field_with_name(list_type.name()) - .ok() - .map(|f| f.data_type().to_owned()) - .unwrap_or_else(|| { - ArrowType::List(Box::new(Field::new( - list_child.name(), - item_type.clone(), - list_child.is_optional(), - ))) - }); - - let list_array_reader: Box = match arrow_type { - ArrowType::List(_) => Box::new(ListArrayReader::::new( - item_reader, - arrow_type, - item_type, - new_context.def_level, - new_context.rep_level, - nullable, - )), - ArrowType::LargeList(_) => Box::new(ListArrayReader::::new( - item_reader, - arrow_type, - item_type, - new_context.def_level, - new_context.rep_level, - nullable, - )), - _ => { - return Err(ArrowError(format!( - "creating ListArrayReader with type {:?} should be unreachable", - arrow_type - ))) - } - }; - - Ok(Some(list_array_reader)) - } - result => result, - } - } -} - -impl<'a> ArrayReaderBuilder { - /// Construct array reader builder. - fn new( - root_schema: TypePtr, - arrow_schema: Arc, - columns_included: Arc>, - file_reader: Box, - ) -> Self { - Self { - root_schema, - arrow_schema, - columns_included, - row_groups: file_reader, - } - } - - /// Main entry point. - fn build_array_reader(&mut self) -> Result> { - let context = ArrayReaderBuilderContext::default(); - - match self.visit_struct(self.root_schema.clone(), &context)? { - Some(reader) => Ok(reader), - None => Ok(make_empty_array_reader(self.row_groups.num_rows())), - } - } - - // Utility functions - - /// Check whether one column in included in this array reader builder. - fn is_included(&self, t: &Type) -> bool { - self.columns_included.contains_key(&(t as *const Type)) - } + Type::GroupType { .. } => unreachable!(), + }, + _ => unreachable!(), + }; - /// Creates primitive array reader for each primitive type. 
- fn build_for_primitive_type_inner( - &self, - cur_type: TypePtr, - context: &'a ArrayReaderBuilderContext, - null_mask_only: bool, - ) -> Result> { - let column_desc = Arc::new(ColumnDescriptor::new( - cur_type.clone(), - context.def_level, - context.rep_level, - context.path.clone(), - )); + let physical_type = primitive_type.get_physical_type(); - let page_iterator = self - .row_groups - .column_chunks(self.columns_included[&(cur_type.as_ref() as *const Type)])?; + let column_desc = Arc::new(ColumnDescriptor::new( + primitive_type, + field.def_level, + field.rep_level, + ColumnPath::new(vec![]), + )); - let arrow_type: Option = self - .get_arrow_field(&cur_type, context) - .map(|f| f.data_type().clone()); + let page_iterator = row_groups.column_chunks(col_idx)?; + let null_mask_only = field.def_level == 1 && field.nullable; + let arrow_type = Some(field.arrow_type.clone()); - match cur_type.get_physical_type() { - PhysicalType::BOOLEAN => Ok(Box::new( - PrimitiveArrayReader::::new_with_options( + match physical_type { + PhysicalType::BOOLEAN => Ok(Box::new( + PrimitiveArrayReader::::new_with_options( + page_iterator, + column_desc, + arrow_type, + null_mask_only, + )?, + )), + PhysicalType::INT32 => { + if let Some(DataType::Null) = arrow_type { + Ok(Box::new(NullArrayReader::::new( page_iterator, column_desc, - arrow_type, - null_mask_only, - )?, - )), - PhysicalType::INT32 => { - if let Some(ArrowType::Null) = arrow_type { - Ok(Box::new(NullArrayReader::::new( + )?)) + } else { + Ok(Box::new( + PrimitiveArrayReader::::new_with_options( page_iterator, column_desc, - )?)) + arrow_type, + null_mask_only, + )?, + )) + } + } + PhysicalType::INT64 => Ok(Box::new( + PrimitiveArrayReader::::new_with_options( + page_iterator, + column_desc, + arrow_type, + null_mask_only, + )?, + )), + PhysicalType::INT96 => { + // get the optional timezone information from arrow type + let timezone = arrow_type.as_ref().and_then(|data_type| { + if let DataType::Timestamp(_, tz) = data_type { + tz.clone() } else { - Ok(Box::new( - PrimitiveArrayReader::::new_with_options( - page_iterator, - column_desc, - arrow_type, - null_mask_only, - )?, - )) + None } - } - PhysicalType::INT64 => Ok(Box::new( - PrimitiveArrayReader::::new_with_options( - page_iterator, - column_desc, - arrow_type, - null_mask_only, - )?, - )), - PhysicalType::INT96 => { - // get the optional timezone information from arrow type - let timezone = arrow_type.as_ref().and_then(|data_type| { - if let ArrowType::Timestamp(_, tz) = data_type { - tz.clone() - } else { - None - } - }); - let converter = Int96Converter::new(Int96ArrayConverter { timezone }); + }); + let converter = Int96Converter::new(Int96ArrayConverter { timezone }); + Ok(Box::new(ComplexObjectArrayReader::< + Int96Type, + Int96Converter, + >::new( + page_iterator, + column_desc, + converter, + arrow_type, + )?)) + } + PhysicalType::FLOAT => Ok(Box::new( + PrimitiveArrayReader::::new_with_options( + page_iterator, + column_desc, + arrow_type, + null_mask_only, + )?, + )), + PhysicalType::DOUBLE => Ok(Box::new( + PrimitiveArrayReader::::new_with_options( + page_iterator, + column_desc, + arrow_type, + null_mask_only, + )?, + )), + PhysicalType::BYTE_ARRAY => match arrow_type { + Some(DataType::Dictionary(_, _)) => make_byte_array_dictionary_reader( + page_iterator, + column_desc, + arrow_type, + null_mask_only, + ), + _ => make_byte_array_reader( + page_iterator, + column_desc, + arrow_type, + null_mask_only, + ), + }, + PhysicalType::FIXED_LEN_BYTE_ARRAY => match field.arrow_type 
{ + DataType::Decimal(precision, scale) => { + let converter = DecimalConverter::new(DecimalArrayConverter::new( + precision as i32, + scale as i32, + )); Ok(Box::new(ComplexObjectArrayReader::< - Int96Type, - Int96Converter, + FixedLenByteArrayType, + DecimalConverter, >::new( page_iterator, column_desc, @@ -488,47 +262,38 @@ impl<'a> ArrayReaderBuilder { arrow_type, )?)) } - PhysicalType::FLOAT => Ok(Box::new( - PrimitiveArrayReader::::new_with_options( - page_iterator, - column_desc, - arrow_type, - null_mask_only, - )?, - )), - PhysicalType::DOUBLE => Ok(Box::new( - PrimitiveArrayReader::::new_with_options( - page_iterator, - column_desc, - arrow_type, - null_mask_only, - )?, - )), - PhysicalType::BYTE_ARRAY => match arrow_type { - Some(ArrowType::Dictionary(_, _)) => make_byte_array_dictionary_reader( + DataType::Interval(IntervalUnit::DayTime) => { + let converter = + IntervalDayTimeConverter::new(IntervalDayTimeArrayConverter {}); + Ok(Box::new(ComplexObjectArrayReader::< + FixedLenByteArrayType, + _, + >::new( page_iterator, column_desc, + converter, arrow_type, - null_mask_only, - ), - _ => make_byte_array_reader( + )?)) + } + DataType::Interval(IntervalUnit::YearMonth) => { + let converter = + IntervalYearMonthConverter::new(IntervalYearMonthArrayConverter {}); + Ok(Box::new(ComplexObjectArrayReader::< + FixedLenByteArrayType, + _, + >::new( page_iterator, column_desc, + converter, arrow_type, - null_mask_only, - ), - }, - PhysicalType::FIXED_LEN_BYTE_ARRAY - if cur_type.get_basic_info().converted_type() - == ConvertedType::DECIMAL => - { - let converter = DecimalConverter::new(DecimalArrayConverter::new( - cur_type.get_precision(), - cur_type.get_scale(), - )); + )?)) + } + _ => { + let converter = + FixedLenBinaryConverter::new(FixedSizeArrayConverter::new(type_len)); Ok(Box::new(ComplexObjectArrayReader::< FixedLenByteArrayType, - DecimalConverter, + FixedLenBinaryConverter, >::new( page_iterator, column_desc, @@ -536,173 +301,27 @@ impl<'a> ArrayReaderBuilder { arrow_type, )?)) } - PhysicalType::FIXED_LEN_BYTE_ARRAY => { - let byte_width = match *cur_type { - Type::PrimitiveType { - ref type_length, .. - } => *type_length, - _ => { - return Err(ArrowError( - "Expected a physical type, not a group type".to_string(), - )) - } - }; - if cur_type.get_basic_info().converted_type() == ConvertedType::INTERVAL { - if byte_width != 12 { - return Err(ArrowError(format!( - "Parquet interval type should have length of 12, found {}", - byte_width - ))); - } - match arrow_type { - Some(ArrowType::Interval(IntervalUnit::DayTime)) => { - let converter = IntervalDayTimeConverter::new( - IntervalDayTimeArrayConverter {}, - ); - Ok(Box::new(ComplexObjectArrayReader::< - FixedLenByteArrayType, - _, - >::new( - page_iterator, - column_desc, - converter, - arrow_type, - )?)) - } - Some(ArrowType::Interval(IntervalUnit::YearMonth)) => { - let converter = IntervalYearMonthConverter::new( - IntervalYearMonthArrayConverter {}, - ); - Ok(Box::new(ComplexObjectArrayReader::< - FixedLenByteArrayType, - _, - >::new( - page_iterator, - column_desc, - converter, - arrow_type, - )?)) - } - Some(t) => Err(ArrowError(format!( - "Cannot write a Parquet interval to {:?}", - t - ))), - None => { - // we do not support an interval not matched to an Arrow type, - // because we risk data loss as we won't know which of the 12 bytes - // are or should be populated - Err(ArrowError( - "Cannot write a Parquet interval with no Arrow type specified. 
- There is a risk of data loss as Arrow either supports YearMonth or - DayTime precision. Without the Arrow type, we cannot infer the type. - ".to_string() - )) - } - } - } else { - let converter = FixedLenBinaryConverter::new( - FixedSizeArrayConverter::new(byte_width), - ); - Ok(Box::new(ComplexObjectArrayReader::< - FixedLenByteArrayType, - FixedLenBinaryConverter, - >::new( - page_iterator, - column_desc, - converter, - arrow_type, - )?)) - } - } - } + }, } +} - /// Constructs struct array reader without considering repetition. - fn build_for_struct_type_inner( - &mut self, - cur_type: &Type, - context: &'a ArrayReaderBuilderContext, - ) -> Result>> { - let mut fields = Vec::with_capacity(cur_type.get_fields().len()); - let mut children_reader = Vec::with_capacity(cur_type.get_fields().len()); - - for child in cur_type.get_fields() { - if let Some(child_reader) = self.dispatch(child.clone(), context)? { - // TODO: this results in calling get_arrow_field twice, it could be reused - // from child_reader above, by making child_reader carry its `Field` - let mut struct_context = context.clone(); - struct_context.path.append(vec![child.name().to_string()]); - let field = match self.get_arrow_field(child, &struct_context) { - Some(f) => f.clone(), - _ => Field::new( - child.name(), - child_reader.get_data_type().clone(), - child.is_optional(), - ), - }; - fields.push(field); - children_reader.push(child_reader); - } - } - - if !fields.is_empty() { - let arrow_type = ArrowType::Struct(fields); - Ok(Some(Box::new(StructArrayReader::new( - arrow_type, - children_reader, - context.def_level, - context.rep_level, - )))) - } else { - Ok(None) - } - } - - fn get_arrow_field( - &self, - cur_type: &Type, - context: &'a ArrayReaderBuilderContext, - ) -> Option<&Field> { - let parts: Vec<&str> = context - .path - .parts() - .iter() - .map(|x| -> &str { x }) - .collect::>(); - - // If the parts length is one it'll have the top level "schema" type. If - // it's two then it'll be a top-level type that we can get from the arrow - // schema directly. - if parts.len() <= 2 { - self.arrow_schema.field_with_name(cur_type.name()).ok() - } else { - // If it's greater than two then we need to traverse the type path - // until we find the actual field we're looking for. - let mut field: Option<&Field> = None; - - for (i, part) in parts.iter().enumerate().skip(1) { - if i == 1 { - field = self.arrow_schema.field_with_name(part).ok(); - } else if let Some(f) = field { - match f.data_type() { - ArrowType::Struct(fields) => { - field = fields.iter().find(|f| f.name() == part) - } - ArrowType::List(list_field) => match list_field.data_type() { - ArrowType::Struct(fields) => { - field = fields.iter().find(|f| f.name() == part) - } - _ => field = Some(list_field.as_ref()), - }, - _ => field = None, - } - } else { - field = None; - } - } - field - } - } +/// Constructs struct array reader without considering repetition. 
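///
/// Sketch of the shape this handles (example schema assumed, mirroring the
/// test at the bottom of this file):
///
/// ```text
/// required group b_struct {
///   optional int32 b_c_int;
/// }
/// ```
///
/// Each projected child contributes one reader to `children_reader`, in
/// schema order.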
+fn build_struct_reader( + field: &ParquetField, + row_groups: &dyn RowGroupCollection, +) -> Result> { + let children = field.children().unwrap(); + let children_reader = children + .into_iter() + .map(|child| build_reader(child, row_groups)) + .collect::>>()?; + + Ok(Box::new(StructArrayReader::new( + field.arrow_type.clone(), + children_reader, + field.def_level, + field.rep_level, + )) as _) } #[cfg(test)] @@ -735,9 +354,9 @@ mod tests { .unwrap(); // Create arrow types - let arrow_type = ArrowType::Struct(vec![Field::new( + let arrow_type = DataType::Struct(vec![Field::new( "b_struct", - ArrowType::Struct(vec![Field::new("b_c_int", ArrowType::Int32, true)]), + DataType::Struct(vec![Field::new("b_c_int", DataType::Int32, true)]), true, )]); diff --git a/parquet/src/arrow/array_reader/list_array.rs b/parquet/src/arrow/array_reader/list_array.rs index 5b7d4865fca2..31bee242a920 100644 --- a/parquet/src/arrow/array_reader/list_array.rs +++ b/parquet/src/arrow/array_reader/list_array.rs @@ -591,9 +591,18 @@ mod tests { .unwrap(); let batch = array_reader.next_batch(100).unwrap(); + assert_eq!(batch.data_type(), array_reader.get_data_type()); assert_eq!( batch.data_type(), - &ArrowType::Struct(arrow_schema.fields().clone()) + &ArrowType::Struct(vec![Field::new( + "table_info", + ArrowType::List(Box::new(Field::new( + "table_info", + ArrowType::Struct(vec![Field::new("name", ArrowType::Binary, false)]), + false + ))), + false + )]) ); assert_eq!(batch.len(), 0); } diff --git a/parquet/src/arrow/arrow_writer.rs b/parquet/src/arrow/arrow_writer.rs index 7ddd6443230e..2275cf08137d 100644 --- a/parquet/src/arrow/arrow_writer.rs +++ b/parquet/src/arrow/arrow_writer.rs @@ -1058,7 +1058,7 @@ mod tests { let stocks_field = Field::new( "stocks", DataType::Map( - Box::new(Field::new("entries", entries_struct_type, false)), + Box::new(Field::new("entries", entries_struct_type, true)), false, ), true, diff --git a/parquet/src/arrow/schema.rs b/parquet/src/arrow/schema.rs index bfccfe7f908d..1e7e44e48059 100644 --- a/parquet/src/arrow/schema.rs +++ b/parquet/src/arrow/schema.rs @@ -23,22 +23,23 @@ //! //! The interfaces for converting arrow schema to parquet schema is coming. -use std::collections::{HashMap, HashSet}; +use std::collections::HashMap; use std::sync::Arc; -use arrow::datatypes::{DataType, Field, IntervalUnit, Schema, TimeUnit}; +use arrow::datatypes::{DataType, Field, Schema, TimeUnit}; use arrow::ipc::writer; +use crate::basic::{ + LogicalType, Repetition, TimeUnit as ParquetTimeUnit, Type as PhysicalType, +}; use crate::errors::{ParquetError::ArrowError, Result}; use crate::file::{metadata::KeyValue, properties::WriterProperties}; use crate::schema::types::{ColumnDescriptor, SchemaDescriptor, Type, TypePtr}; -use crate::{ - basic::{ - ConvertedType, LogicalType, Repetition, - TimeUnit as ParquetTimeUnit, Type as PhysicalType, - }, - errors::ParquetError, -}; + +mod complex; +mod primitive; + +pub(crate) use complex::{convert_schema, ParquetField, ParquetFieldType}; /// Convert Parquet schema to Arrow schema including optional metadata. 
/// Attempts to decode any existing Arrow schema metadata, falling back @@ -47,15 +48,11 @@ pub fn parquet_to_arrow_schema( parquet_schema: &SchemaDescriptor, key_value_metadata: Option<&Vec>, ) -> Result { - let mut metadata = parse_key_value_metadata(key_value_metadata).unwrap_or_default(); - metadata - .remove(super::ARROW_SCHEMA_META_KEY) - .map(|encoded| get_arrow_schema_from_metadata(&encoded)) - .unwrap_or(parquet_to_arrow_schema_by_columns( - parquet_schema, - 0..parquet_schema.columns().len(), - key_value_metadata, - )) + parquet_to_arrow_schema_by_columns( + parquet_schema, + 0..parquet_schema.columns().len(), + key_value_metadata, + ) } /// Convert parquet schema to arrow schema including optional metadata, @@ -122,58 +119,25 @@ where T: IntoIterator, { let mut metadata = parse_key_value_metadata(key_value_metadata).unwrap_or_default(); - let arrow_schema_metadata = metadata + let maybe_schema = metadata .remove(super::ARROW_SCHEMA_META_KEY) - .map(|encoded| get_arrow_schema_from_metadata(&encoded)) - .map_or(Ok(None), |v| v.map(Some))?; + .map(|value| get_arrow_schema_from_metadata(&value)) + .transpose()?; - // add the Arrow metadata to the Parquet metadata - if let Some(arrow_schema) = &arrow_schema_metadata { + // Add the Arrow metadata to the Parquet metadata skipping keys that collide + if let Some(arrow_schema) = &maybe_schema { arrow_schema.metadata().iter().for_each(|(k, v)| { - metadata.insert(k.clone(), v.clone()); + metadata.entry(k.clone()).or_insert(v.clone()); }); } - let mut base_nodes = Vec::new(); - let mut base_nodes_set = HashSet::new(); - let mut leaves = HashSet::new(); - - enum FieldType<'a> { - Parquet(&'a Type), - Arrow(Field), + match convert_schema(parquet_schema, column_indices, maybe_schema.as_ref())? { + Some(field) => match field.arrow_type { + DataType::Struct(fields) => Ok(Schema::new_with_metadata(fields, metadata)), + _ => unreachable!(), + }, + None => Ok(Schema::new_with_metadata(vec![], metadata)), } - - for c in column_indices { - let column = parquet_schema.column(c); - let name = column.name(); - - if let Some(field) = arrow_schema_metadata - .as_ref() - .and_then(|schema| schema.field_with_name(name).ok().cloned()) - { - base_nodes.push(FieldType::Arrow(field)); - } else { - let column = column.self_type() as *const Type; - let root = parquet_schema.get_column_root(c); - let root_raw_ptr = root as *const Type; - - leaves.insert(column); - if !base_nodes_set.contains(&root_raw_ptr) { - base_nodes.push(FieldType::Parquet(root)); - base_nodes_set.insert(root_raw_ptr); - } - } - } - - base_nodes - .into_iter() - .map(|t| match t { - FieldType::Parquet(t) => ParquetTypeConverter::new(t, &leaves).to_field(), - FieldType::Arrow(f) => Ok(Some(f)), - }) - .collect::>>>() - .map(|result| result.into_iter().flatten().collect::>()) - .map(|fields| Schema::new_with_metadata(fields, metadata)) } /// Try to convert Arrow schema metadata into a schema @@ -299,14 +263,13 @@ fn parse_key_value_metadata( /// Convert parquet column schema to arrow field. 
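///
/// For example (sketch: a `ColumnDescriptor` for an `optional binary s (UTF8)`
/// leaf is assumed):
///
/// ```ignore
/// let field = parquet_to_arrow_field(&column_descr)?;
/// assert_eq!(field.data_type(), &DataType::Utf8);
/// assert!(field.is_nullable());
/// ```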
pub fn parquet_to_arrow_field(parquet_column: &ColumnDescriptor) -> Result { - let schema = parquet_column.self_type(); + let field = complex::convert_type(parquet_column.self_type_ptr())?; - let mut leaves = HashSet::new(); - leaves.insert(parquet_column.self_type() as *const Type); - - ParquetTypeConverter::new(schema, &leaves) - .to_field() - .map(|opt| opt.unwrap()) + Ok(Field::new( + parquet_column.name(), + field.arrow_type, + field.nullable, + )) } pub fn decimal_length_from_precision(precision: usize) -> usize { @@ -385,34 +348,53 @@ fn arrow_to_parquet_type(field: &Field) -> Result { DataType::Float64 => Type::primitive_type_builder(name, PhysicalType::DOUBLE) .with_repetition(repetition) .build(), - DataType::Timestamp(time_unit, zone) => Type::primitive_type_builder( - name, - PhysicalType::INT64, - ) - .with_logical_type(Some(LogicalType::Timestamp { - is_adjusted_to_u_t_c: matches!(zone, Some(z) if !z.as_str().is_empty()), - unit: match time_unit { - TimeUnit::Second => ParquetTimeUnit::MILLIS(Default::default()), - TimeUnit::Millisecond => ParquetTimeUnit::MILLIS(Default::default()), - TimeUnit::Microsecond => ParquetTimeUnit::MICROS(Default::default()), - TimeUnit::Nanosecond => ParquetTimeUnit::NANOS(Default::default()), - }, - })) - .with_repetition(repetition) - .build(), + DataType::Timestamp(TimeUnit::Second, _) => { + // Cannot represent seconds in LogicalType + Type::primitive_type_builder(name, PhysicalType::INT64) + .with_repetition(repetition) + .build() + } + DataType::Timestamp(time_unit, _) => { + Type::primitive_type_builder(name, PhysicalType::INT64) + .with_logical_type(Some(LogicalType::Timestamp { + is_adjusted_to_u_t_c: false, + unit: match time_unit { + TimeUnit::Second => unreachable!(), + TimeUnit::Millisecond => { + ParquetTimeUnit::MILLIS(Default::default()) + } + TimeUnit::Microsecond => { + ParquetTimeUnit::MICROS(Default::default()) + } + TimeUnit::Nanosecond => { + ParquetTimeUnit::NANOS(Default::default()) + } + }, + })) + .with_repetition(repetition) + .build() + } DataType::Date32 => Type::primitive_type_builder(name, PhysicalType::INT32) .with_logical_type(Some(LogicalType::Date)) .with_repetition(repetition) .build(), - // date64 is cast to date32 - DataType::Date64 => Type::primitive_type_builder(name, PhysicalType::INT32) - .with_logical_type(Some(LogicalType::Date)) + // Cannot represent Date64 in LogicalType + DataType::Date64 => Type::primitive_type_builder(name, PhysicalType::INT64) .with_repetition(repetition) .build(), - DataType::Time32(_) => Type::primitive_type_builder(name, PhysicalType::INT32) + DataType::Time32(TimeUnit::Second) => { + // Cannot represent seconds in LogicalType + Type::primitive_type_builder(name, PhysicalType::INT32) + .with_repetition(repetition) + .build() + } + DataType::Time32(unit) => Type::primitive_type_builder(name, PhysicalType::INT32) .with_logical_type(Some(LogicalType::Time { is_adjusted_to_u_t_c: false, - unit: ParquetTimeUnit::MILLIS(Default::default()), + unit: match unit { + TimeUnit::Millisecond => ParquetTimeUnit::MILLIS(Default::default()), + u => unreachable!("Invalid unit for Time32: {:?}", u), + }, })) .with_repetition(repetition) .build(), @@ -430,11 +412,11 @@ fn arrow_to_parquet_type(field: &Field) -> Result { DataType::Duration(_) => Err(ArrowError( "Converting Duration to parquet not supported".to_string(), )), - DataType::Interval(_) => { + DataType::Interval(unit) => { + // No LogicalType support https://github.com/apache/parquet-format/pull/165 
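+            // Byte widths follow IntervalUnit::get_byte_width (added in
+            // arrow/src/datatypes/datatype.rs above):
+            // YearMonth = 4, DayTime = 8, MonthDayNano = 16.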
Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY) - .with_converted_type(ConvertedType::INTERVAL) .with_repetition(repetition) - .with_length(12) + .with_length(unit.get_byte_width() as _) .build() } DataType::Binary | DataType::LargeBinary => { @@ -544,502 +526,6 @@ fn arrow_to_parquet_type(field: &Field) -> Result { } } } -/// This struct is used to group methods and data structures used to convert parquet -/// schema together. -struct ParquetTypeConverter<'a> { - schema: &'a Type, - /// This is the columns that need to be converted to arrow schema. - columns_to_convert: &'a HashSet<*const Type>, -} - -impl<'a> ParquetTypeConverter<'a> { - fn new(schema: &'a Type, columns_to_convert: &'a HashSet<*const Type>) -> Self { - Self { - schema, - columns_to_convert, - } - } - - fn clone_with_schema(&self, other: &'a Type) -> Self { - Self { - schema: other, - columns_to_convert: self.columns_to_convert, - } - } -} - -impl ParquetTypeConverter<'_> { - // Public interfaces. - - /// Converts parquet schema to arrow data type. - /// - /// This function discards schema name. - /// - /// If this schema is a primitive type and not included in the leaves, the result is - /// Ok(None). - /// - /// If this schema is a group type and none of its children is reserved in the - /// conversion, the result is Ok(None). - fn to_data_type(&self) -> Result> { - match self.schema { - Type::PrimitiveType { .. } => self.to_primitive_type(), - Type::GroupType { .. } => self.to_group_type(), - } - } - - /// Converts parquet schema to arrow field. - /// - /// This method is roughly the same as - /// [`to_data_type`](`ParquetTypeConverter::to_data_type`), except it reserves schema - /// name. - fn to_field(&self) -> Result> { - self.to_data_type().map(|opt| { - opt.map(|dt| Field::new(self.schema.name(), dt, self.is_nullable())) - }) - } - - // Utility functions. - - /// Checks whether this schema is nullable. - fn is_nullable(&self) -> bool { - let basic_info = self.schema.get_basic_info(); - if basic_info.has_repetition() { - match basic_info.repetition() { - Repetition::OPTIONAL => true, - Repetition::REPEATED => true, - Repetition::REQUIRED => false, - } - } else { - false - } - } - - fn is_repeated(&self) -> bool { - let basic_info = self.schema.get_basic_info(); - - basic_info.has_repetition() && basic_info.repetition() == Repetition::REPEATED - } - - fn is_self_included(&self) -> bool { - self.columns_to_convert - .contains(&(self.schema as *const Type)) - } - - // Functions for primitive types. - - /// Entry point for converting parquet primitive type to arrow type. - /// - /// This function takes care of repetition. - fn to_primitive_type(&self) -> Result> { - if self.is_self_included() { - self.to_primitive_type_inner().map(|dt| { - if self.is_repeated() { - Some(DataType::List(Box::new(Field::new( - self.schema.name(), - dt, - self.is_nullable(), - )))) - } else { - Some(dt) - } - }) - } else { - Ok(None) - } - } - - /// Converting parquet primitive type to arrow data type. 
- fn to_primitive_type_inner(&self) -> Result { - match self.schema.get_physical_type() { - PhysicalType::BOOLEAN => Ok(DataType::Boolean), - PhysicalType::INT32 => self.from_int32(), - PhysicalType::INT64 => self.from_int64(), - PhysicalType::INT96 => Ok(DataType::Timestamp(TimeUnit::Nanosecond, None)), - PhysicalType::FLOAT => Ok(DataType::Float32), - PhysicalType::DOUBLE => Ok(DataType::Float64), - PhysicalType::BYTE_ARRAY => self.from_byte_array(), - PhysicalType::FIXED_LEN_BYTE_ARRAY => self.from_fixed_len_byte_array(), - } - } - - #[allow(clippy::wrong_self_convention)] - fn from_int32(&self) -> Result { - match ( - self.schema.get_basic_info().logical_type(), - self.schema.get_basic_info().converted_type(), - ) { - (None, ConvertedType::NONE) => Ok(DataType::Int32), - (Some(ref t @ LogicalType::Integer { - bit_width, - is_signed, - }), _) => match (bit_width, is_signed) { - (8, true) => Ok(DataType::Int8), - (16, true) => Ok(DataType::Int16), - (32, true) => Ok(DataType::Int32), - (8, false) => Ok(DataType::UInt8), - (16, false) => Ok(DataType::UInt16), - (32, false) => Ok(DataType::UInt32), - _ => Err(ArrowError(format!( - "Cannot create INT32 physical type from {:?}", - t, - ))), - }, - (Some(LogicalType::Decimal {..}), _) => Ok(self.to_decimal()), - (Some(LogicalType::Date), _) => Ok(DataType::Date32), - (Some(LogicalType::Time { unit, .. }), _) => match unit { - ParquetTimeUnit::MILLIS(_) => Ok(DataType::Time32(TimeUnit::Millisecond)), - _ => Err(ArrowError(format!( - "Cannot create INT32 physical type from {:?}", - unit - ))), - }, - // https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#unknown-always-null - (Some(LogicalType::Unknown), _) => Ok(DataType::Null), - (None, ConvertedType::UINT_8) => Ok(DataType::UInt8), - (None, ConvertedType::UINT_16) => Ok(DataType::UInt16), - (None, ConvertedType::UINT_32) => Ok(DataType::UInt32), - (None, ConvertedType::INT_8) => Ok(DataType::Int8), - (None, ConvertedType::INT_16) => Ok(DataType::Int16), - (None, ConvertedType::INT_32) => Ok(DataType::Int32), - (None, ConvertedType::DATE) => Ok(DataType::Date32), - (None, ConvertedType::TIME_MILLIS) => { - Ok(DataType::Time32(TimeUnit::Millisecond)) - } - (None, ConvertedType::DECIMAL) => Ok(self.to_decimal()), - (logical, converted) => Err(ArrowError(format!( - "Unable to convert parquet INT32 logical type {:?} or converted type {}", - logical, converted - ))), - } - } - - #[allow(clippy::wrong_self_convention)] - fn from_int64(&self) -> Result { - match ( - self.schema.get_basic_info().logical_type(), - self.schema.get_basic_info().converted_type(), - ) { - (None, ConvertedType::NONE) => Ok(DataType::Int64), - (Some(LogicalType::Integer { bit_width, is_signed }), _) if bit_width == 64 => { - match is_signed { - true => Ok(DataType::Int64), - false => Ok(DataType::UInt64), - } - } - (Some(LogicalType::Time { unit, .. 
}), _) => match unit { - ParquetTimeUnit::MILLIS(_) => Err(ArrowError( - "Cannot create INT64 from MILLIS time unit".to_string(), - )), - ParquetTimeUnit::MICROS(_) => Ok(DataType::Time64(TimeUnit::Microsecond)), - ParquetTimeUnit::NANOS(_) => Ok(DataType::Time64(TimeUnit::Nanosecond)), - }, - (Some(LogicalType::Timestamp { is_adjusted_to_u_t_c, unit }), _) => Ok(DataType::Timestamp( - match unit { - ParquetTimeUnit::MILLIS(_) => TimeUnit::Millisecond, - ParquetTimeUnit::MICROS(_) => TimeUnit::Microsecond, - ParquetTimeUnit::NANOS(_) => TimeUnit::Nanosecond, - }, - if is_adjusted_to_u_t_c { - Some("UTC".to_string()) - } else { - None - }, - )), - (None, ConvertedType::INT_64) => Ok(DataType::Int64), - (None, ConvertedType::UINT_64) => Ok(DataType::UInt64), - (None, ConvertedType::TIME_MICROS) => { - Ok(DataType::Time64(TimeUnit::Microsecond)) - } - (None, ConvertedType::TIMESTAMP_MILLIS) => { - Ok(DataType::Timestamp(TimeUnit::Millisecond, None)) - } - (None, ConvertedType::TIMESTAMP_MICROS) => { - Ok(DataType::Timestamp(TimeUnit::Microsecond, None)) - } - (Some(LogicalType::Decimal {..}), _) => Ok(self.to_decimal()), - (None, ConvertedType::DECIMAL) => Ok(self.to_decimal()), - (logical, converted) => Err(ArrowError(format!( - "Unable to convert parquet INT64 logical type {:?} or converted type {}", - logical, converted - ))), - } - } - - #[allow(clippy::wrong_self_convention)] - fn from_fixed_len_byte_array(&self) -> Result { - match ( - self.schema.get_basic_info().logical_type(), - self.schema.get_basic_info().converted_type(), - ) { - (Some(LogicalType::Decimal {..}), _) => Ok(self.to_decimal()), - (None, ConvertedType::DECIMAL) => Ok(self.to_decimal()), - (None, ConvertedType::INTERVAL) => { - // There is currently no reliable way of determining which IntervalUnit - // to return. Thus without the original Arrow schema, the results - // would be incorrect if all 12 bytes of the interval are populated - Ok(DataType::Interval(IntervalUnit::DayTime)) - } - _ => { - let byte_width = match self.schema { - Type::PrimitiveType { - ref type_length, .. - } => *type_length, - _ => { - return Err(ArrowError( - "Expected a physical type, not a group type".to_string(), - )) - } - }; - - Ok(DataType::FixedSizeBinary(byte_width)) - } - } - } - - fn to_decimal(&self) -> DataType { - assert!(self.schema.is_primitive()); - DataType::Decimal( - self.schema.get_precision() as usize, - self.schema.get_scale() as usize, - ) - } - - #[allow(clippy::wrong_self_convention)] - fn from_byte_array(&self) -> Result { - match (self.schema.get_basic_info().logical_type(), self.schema.get_basic_info().converted_type()) { - (Some(LogicalType::String), _) => Ok(DataType::Utf8), - (Some(LogicalType::Json), _) => Ok(DataType::Binary), - (Some(LogicalType::Bson), _) => Ok(DataType::Binary), - (Some(LogicalType::Enum), _) => Ok(DataType::Binary), - (None, ConvertedType::NONE) => Ok(DataType::Binary), - (None, ConvertedType::JSON) => Ok(DataType::Binary), - (None, ConvertedType::BSON) => Ok(DataType::Binary), - (None, ConvertedType::ENUM) => Ok(DataType::Binary), - (None, ConvertedType::UTF8) => Ok(DataType::Utf8), - (logical, converted) => Err(ArrowError(format!( - "Unable to convert parquet BYTE_ARRAY logical type {:?} or converted type {}", - logical, converted - ))), - } - } - - // Functions for group types. - - /// Entry point for converting parquet group type. - /// - /// This function takes care of logical type and repetition. 
- fn to_group_type(&self) -> Result> { - match ( - self.schema.get_basic_info().logical_type(), - self.schema.get_basic_info().converted_type(), - ) { - (Some(LogicalType::List), _) | (_, ConvertedType::LIST) => self.to_list(), - (Some(LogicalType::Map), _) - | (_, ConvertedType::MAP) - | (_, ConvertedType::MAP_KEY_VALUE) => self.to_map(), - (_, _) => { - if self.is_repeated() { - self.to_struct().map(|opt| { - opt.map(|dt| { - DataType::List(Box::new(Field::new( - self.schema.name(), - dt, - self.is_nullable(), - ))) - }) - }) - } else { - self.to_struct() - } - } - } - } - - /// Converts a parquet group type to arrow struct. - fn to_struct(&self) -> Result> { - match self.schema { - Type::PrimitiveType { .. } => Err(ParquetError::General(format!( - "{:?} is a struct type, and can't be processed as primitive.", - self.schema - ))), - Type::GroupType { - basic_info: _, - fields, - } => fields - .iter() - .map(|field_ptr| self.clone_with_schema(field_ptr).to_field()) - .collect::>>>() - .map(|result| result.into_iter().flatten().collect::>()) - .map(|fields| { - if fields.is_empty() { - None - } else { - Some(DataType::Struct(fields)) - } - }), - } - } - - /// Converts a parquet list to arrow list. - /// - /// To fully understand this algorithm, please refer to - /// [parquet doc](https://github.com/apache/parquet-format/blob/master/LogicalTypes.md). - fn to_list(&self) -> Result> { - match self.schema { - Type::PrimitiveType { .. } => Err(ParquetError::General(format!( - "{:?} is a list type and can't be processed as primitive.", - self.schema - ))), - Type::GroupType { - basic_info: _, - fields, - } if fields.len() == 1 => { - let list_item = fields.first().unwrap(); - let item_converter = self.clone_with_schema(list_item); - - let item_type = match list_item.as_ref() { - Type::PrimitiveType { .. } => { - if item_converter.is_repeated() { - item_converter.to_primitive_type_inner().map(Some) - } else { - Err(ArrowError( - "Primitive element type of list must be repeated." - .to_string(), - )) - } - } - Type::GroupType { - basic_info: _, - fields, - } => { - if fields.len() > 1 { - item_converter.to_struct() - } else if fields.len() == 1 - && list_item.name() != "array" - && list_item.name() != format!("{}_tuple", self.schema.name()) - { - let nested_item = fields.first().unwrap(); - let nested_item_converter = - self.clone_with_schema(nested_item); - - nested_item_converter.to_data_type() - } else { - item_converter.to_struct() - } - } - }; - - // Check that the name of the list child is "list", in which case we - // get the child nullability and name (normally "element") from the nested - // group type. - // Without this step, the child incorrectly inherits the parent's optionality - let (list_item_name, item_is_optional) = match &item_converter.schema { - Type::GroupType { basic_info, fields } - if basic_info.name() == "list" && fields.len() == 1 => - { - let field = fields.first().unwrap(); - (field.name(), field.is_optional()) - } - _ => (list_item.name(), list_item.is_optional()), - }; - - item_type.map(|opt| { - opt.map(|dt| { - DataType::List(Box::new(Field::new( - list_item_name, - dt, - item_is_optional, - ))) - }) - }) - } - _ => Err(ArrowError( - "Group element type of list can only contain one field.".to_string(), - )), - } - } - - /// Converts a parquet map to arrow map. - /// - /// To fully understand this algorithm, please refer to - /// [parquet doc](https://github.com/apache/parquet-format/blob/master/LogicalTypes.md). 
- fn to_map(&self) -> Result> { - match self.schema { - Type::PrimitiveType { .. } => Err(ParquetError::General(format!( - "{:?} is a map type and can't be processed as primitive.", - self.schema - ))), - Type::GroupType { - basic_info: _, - fields, - } if fields.len() == 1 => { - let key_item = fields.first().unwrap(); - - let (key_type, value_type) = match key_item.as_ref() { - Type::PrimitiveType { .. } => { - return Err(ArrowError( - "A map can only have a group child type (key_values)." - .to_string(), - )) - } - Type::GroupType { - basic_info: _, - fields, - } => { - if fields.len() != 2 { - return Err(ArrowError(format!("Map type should have 2 fields, a key and value. Found {} fields", fields.len()))); - } else { - let nested_key = fields.first().unwrap(); - let nested_key_converter = self.clone_with_schema(nested_key); - - let nested_value = fields.last().unwrap(); - let nested_value_converter = - self.clone_with_schema(nested_value); - - ( - nested_key_converter.to_data_type()?.map(|d| { - Field::new( - nested_key.name(), - d, - nested_key.is_optional(), - ) - }), - nested_value_converter.to_data_type()?.map(|d| { - Field::new( - nested_value.name(), - d, - nested_value.is_optional(), - ) - }), - ) - } - } - }; - - match (key_type, value_type) { - (Some(key), Some(value)) => Ok(Some(DataType::Map( - Box::new(Field::new( - key_item.name(), - DataType::Struct(vec![key, value]), - self.schema.is_optional(), - )), - false, // There is no information to tell if keys are sorted - ))), - (None, None) => Ok(None), - (None, Some(_)) => Err(ArrowError( - "Could not convert the map key to a valid datatype".to_string(), - )), - (Some(_), None) => Err(ArrowError( - "Could not convert the map value to a valid datatype".to_string(), - )), - } - } - _ => Err(ArrowError( - "Group element type of map can only contain one field.".to_string(), - )), - } - } -} #[cfg(test)] mod tests { @@ -1261,7 +747,7 @@ mod tests { { arrow_fields.push(Field::new( "my_list", - DataType::List(Box::new(Field::new("element", DataType::Utf8, true))), + DataType::List(Box::new(Field::new("str", DataType::Utf8, false))), true, )); } @@ -1273,7 +759,7 @@ mod tests { { arrow_fields.push(Field::new( "my_list", - DataType::List(Box::new(Field::new("element", DataType::Int32, true))), + DataType::List(Box::new(Field::new("element", DataType::Int32, false))), true, )); } @@ -1292,7 +778,7 @@ mod tests { ]); arrow_fields.push(Field::new( "my_list", - DataType::List(Box::new(Field::new("element", arrow_struct, true))), + DataType::List(Box::new(Field::new("element", arrow_struct, false))), true, )); } @@ -1309,7 +795,7 @@ mod tests { DataType::Struct(vec![Field::new("str", DataType::Utf8, false)]); arrow_fields.push(Field::new( "my_list", - DataType::List(Box::new(Field::new("array", arrow_struct, true))), + DataType::List(Box::new(Field::new("array", arrow_struct, false))), true, )); } @@ -1326,7 +812,11 @@ mod tests { DataType::Struct(vec![Field::new("str", DataType::Utf8, false)]); arrow_fields.push(Field::new( "my_list", - DataType::List(Box::new(Field::new("my_list_tuple", arrow_struct, true))), + DataType::List(Box::new(Field::new( + "my_list_tuple", + arrow_struct, + false, + ))), true, )); } @@ -1336,8 +826,8 @@ mod tests { { arrow_fields.push(Field::new( "name", - DataType::List(Box::new(Field::new("name", DataType::Int32, true))), - true, + DataType::List(Box::new(Field::new("name", DataType::Int32, false))), + false, )); } @@ -1350,7 +840,7 @@ mod tests { assert_eq!(arrow_fields.len(), converted_fields.len()); for i in 
0..arrow_fields.len() { - assert_eq!(arrow_fields[i], converted_fields[i]); + assert_eq!(arrow_fields[i], converted_fields[i], "{}", i); } } @@ -1642,15 +1132,15 @@ mod tests { fn test_nested_schema_partial_ordering() { let mut arrow_fields = Vec::new(); { + let group1_fields = vec![Field::new("leaf1", DataType::Int64, false)]; + let group1 = Field::new("group1", DataType::Struct(group1_fields), false); + arrow_fields.push(group1); + let group2_fields = vec![Field::new("leaf4", DataType::Int64, false)]; let group2 = Field::new("group2", DataType::Struct(group2_fields), false); arrow_fields.push(group2); arrow_fields.push(Field::new("leaf5", DataType::Int64, false)); - - let group1_fields = vec![Field::new("leaf1", DataType::Int64, false)]; - let group1 = Field::new("group1", DataType::Struct(group1_fields), false); - arrow_fields.push(group1); } let message_type = " @@ -1679,7 +1169,7 @@ mod tests { let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type)); let converted_arrow_schema = - parquet_to_arrow_schema_by_columns(&parquet_schema, vec![3, 4, 0], None) + parquet_to_arrow_schema_by_columns(&parquet_schema, vec![0, 3, 4], None) .unwrap(); let converted_fields = converted_arrow_schema.fields(); @@ -1700,9 +1190,9 @@ mod tests { DataType::List(Box::new(Field::new( "innerGroup", DataType::Struct(vec![Field::new("leaf3", DataType::Int32, true)]), - true, + false, ))), - true, + false, ); let outer_group_list = Field::new( @@ -1713,9 +1203,9 @@ mod tests { Field::new("leaf2", DataType::Int32, true), inner_group_list, ]), - true, + false, ))), - true, + false, ); arrow_fields.push(outer_group_list); } @@ -1790,8 +1280,8 @@ mod tests { Field::new("string", DataType::Utf8, true), Field::new( "bools", - DataType::List(Box::new(Field::new("bools", DataType::Boolean, true))), - true, + DataType::List(Box::new(Field::new("bools", DataType::Boolean, false))), + false, ), Field::new("date", DataType::Date32, true), Field::new("time_milli", DataType::Time32(TimeUnit::Millisecond), true), @@ -2179,7 +1669,6 @@ mod tests { } #[test] - #[ignore = "Roundtrip of lists currently fails because we don't check their types correctly in the Arrow schema"] fn test_arrow_schema_roundtrip_lists() -> Result<()> { let metadata: HashMap = [("Key".to_string(), "Value".to_string())] diff --git a/parquet/src/arrow/schema/complex.rs b/parquet/src/arrow/schema/complex.rs new file mode 100644 index 000000000000..7cc3d1267721 --- /dev/null +++ b/parquet/src/arrow/schema/complex.rs @@ -0,0 +1,563 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
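+
+//! Conversion of parquet group types (structs, lists, maps) into arrow
+//! schema elements, tracking the definition and repetition levels that the
+//! array readers rely on. See [Logical Types] for the underlying algorithm.
+//!
+//! [Logical Types]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md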
+ +use crate::arrow::schema::primitive::convert_primitive; +use crate::basic::{ConvertedType, Repetition}; +use crate::errors::ParquetError; +use crate::errors::Result; +use crate::schema::types::{SchemaDescriptor, Type, TypePtr}; +use arrow::datatypes::{DataType, Field, Schema}; + +fn get_repetition(t: &Type) -> Repetition { + let info = t.get_basic_info(); + match info.has_repetition() { + true => info.repetition(), + false => Repetition::REQUIRED, + } +} + +/// Representation of a parquet file, in terms of arrow schema elements +pub struct ParquetField { + pub rep_level: i16, + pub def_level: i16, + pub nullable: bool, + pub arrow_type: DataType, + pub field_type: ParquetFieldType, +} + +impl ParquetField { + fn to_list(self, name: &str) -> Self { + ParquetField { + rep_level: self.rep_level, + def_level: self.def_level, + nullable: false, + arrow_type: DataType::List(Box::new(Field::new( + name, + self.arrow_type.clone(), + false, + ))), + field_type: ParquetFieldType::Group { + children: vec![self], + }, + } + } + + pub fn children(&self) -> Option<&[ParquetField]> { + match &self.field_type { + ParquetFieldType::Primitive { .. } => None, + ParquetFieldType::Group { children } => Some(&children), + } + } +} + +pub enum ParquetFieldType { + Primitive { + col_idx: usize, + primitive_type: TypePtr, + }, + Group { + children: Vec, + }, +} + +struct VisitorContext { + rep_level: i16, + def_level: i16, + /// An optional [`DataType`] sourced from the embedded arrow schema + data_type: Option, +} + +impl VisitorContext { + fn levels(&self, repetition: Repetition) -> (i16, i16, bool) { + match repetition { + Repetition::OPTIONAL => (self.def_level + 1, self.rep_level, true), + Repetition::REQUIRED => (self.def_level, self.rep_level, false), + Repetition::REPEATED => (self.def_level + 1, self.rep_level + 1, false), + } + } +} + +/// Walks the parquet schema in a depth-first fashion in order to extract the +/// necessary information to map it to arrow data structures +/// +/// See [Logical Types] for more information on the conversion algorithm +/// +/// [Logical Types]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md +struct Visitor { + /// The column index of the next leaf column + next_col_idx: usize, + + /// Mask of columns to include + column_mask: Vec, +} + +impl Visitor { + fn visit_primitive( + &mut self, + primitive_type: TypePtr, + context: VisitorContext, + ) -> Result> { + let col_idx = self.next_col_idx; + self.next_col_idx += 1; + + if !self.column_mask[col_idx] { + return Ok(None); + } + + let repetition = get_repetition(&primitive_type); + let (def_level, rep_level, nullable) = context.levels(repetition); + + let arrow_type = convert_primitive(&primitive_type, context.data_type)?; + + let primitive_field = ParquetField { + rep_level, + def_level, + nullable, + arrow_type, + field_type: ParquetFieldType::Primitive { + primitive_type: primitive_type.clone(), + col_idx, + }, + }; + + Ok(Some(match repetition { + Repetition::REPEATED => primitive_field.to_list(primitive_type.name()), + _ => primitive_field, + })) + } + + fn visit_struct( + &mut self, + struct_type: TypePtr, + context: VisitorContext, + ) -> Result> { + // The root type will not have a repetition level + let repetition = get_repetition(&struct_type); + let (def_level, rep_level, nullable) = context.levels(repetition); + + let parquet_fields = struct_type.get_fields(); + + // Extract the arrow fields + let arrow_fields = match &context.data_type { + Some(DataType::Struct(fields)) => { + if 
+
+    fn visit_struct(
+        &mut self,
+        struct_type: TypePtr,
+        context: VisitorContext,
+    ) -> Result<Option<ParquetField>> {
+        // The root type will not have a repetition level
+        let repetition = get_repetition(&struct_type);
+        let (def_level, rep_level, nullable) = context.levels(repetition);
+
+        let parquet_fields = struct_type.get_fields();
+
+        // Extract the arrow fields
+        let arrow_fields = match &context.data_type {
+            Some(DataType::Struct(fields)) => {
+                if fields.len() != parquet_fields.len() {
+                    return Err(arrow_err!(
+                        "incompatible arrow schema, expected {} struct fields got {}",
+                        parquet_fields.len(),
+                        fields.len()
+                    ));
+                }
+                Some(fields)
+            }
+            Some(d) => {
+                return Err(arrow_err!(
+                    "incompatible arrow schema, expected struct got {}",
+                    d
+                ))
+            }
+            None => None,
+        };
+
+        let mut child_fields = Vec::with_capacity(parquet_fields.len());
+        let mut children = Vec::with_capacity(parquet_fields.len());
+
+        // Perform a DFS of children
+        for (idx, parquet_field) in parquet_fields.iter().enumerate() {
+            let data_type = match arrow_fields {
+                Some(fields) => {
+                    let field = &fields[idx];
+                    if field.name() != parquet_field.name() {
+                        return Err(arrow_err!(
+                            "incompatible arrow schema, expected field named {} got {}",
+                            parquet_field.name(),
+                            field.name()
+                        ));
+                    }
+                    Some(field.data_type().clone())
+                }
+                None => None,
+            };
+
+            let arrow_field = arrow_fields.map(|x| &x[idx]);
+            let child_ctx = VisitorContext {
+                rep_level,
+                def_level,
+                data_type,
+            };
+
+            if let Some(child) = self.dispatch(parquet_field.clone(), child_ctx)? {
+                // The child type returned may be different from what is encoded in the arrow
+                // schema in the event of a mismatch or a projection
+                child_fields.push(convert_field(parquet_field, &child, arrow_field));
+                children.push(child);
+            }
+        }
+
+        if children.is_empty() {
+            return Ok(None);
+        }
+
+        let struct_field = ParquetField {
+            rep_level,
+            def_level,
+            nullable,
+            arrow_type: DataType::Struct(child_fields),
+            field_type: ParquetFieldType::Group { children },
+        };
+
+        Ok(Some(match repetition {
+            Repetition::REPEATED => struct_field.to_list(struct_type.name()),
+            _ => struct_field,
+        }))
+    }
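+
+    // `visit_map` handles the MAP structure defined by [Logical Types]:
+    //
+    //   <map-repetition> group <name> (MAP) {
+    //     repeated group key_value {
+    //       required <key-type> key;
+    //       <value-repetition> <value-type> value;
+    //     }
+    //   }
+    //
+    // The repeated key_value group contributes one repetition level, and an
+    // OPTIONAL map contributes an additional definition level.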
+
+    fn visit_map(
+        &mut self,
+        map_type: TypePtr,
+        context: VisitorContext,
+    ) -> Result<Option<ParquetField>> {
+        let rep_level = context.rep_level + 1;
+        let (def_level, nullable) = match map_type.get_basic_info().repetition() {
+            Repetition::REQUIRED => (context.def_level + 1, false),
+            Repetition::OPTIONAL => (context.def_level + 2, true),
+            Repetition::REPEATED => return Err(arrow_err!("Map cannot be repeated")),
+        };
+
+        if map_type.get_fields().len() != 1 {
+            return Err(arrow_err!(
+                "Map field must have exactly one key_value child, found {}",
+                map_type.get_fields().len()
+            ));
+        }
+
+        // Add map entry (key_value) to context
+        let map_key_value = &map_type.get_fields()[0];
+        if map_key_value.get_basic_info().repetition() != Repetition::REPEATED {
+            return Err(arrow_err!("Child of map field must be repeated"));
+        }
+
+        if map_key_value.get_fields().len() != 2 {
+            // According to the specification the values are optional (#1642)
+            return Err(arrow_err!(
+                "Child of map field must have two children, found {}",
+                map_key_value.get_fields().len()
+            ));
+        }
+
+        // Get key and value, and create context for each
+        let map_key = &map_key_value.get_fields()[0];
+        let map_value = &map_key_value.get_fields()[1];
+
+        if map_key.get_basic_info().repetition() != Repetition::REQUIRED {
+            return Err(arrow_err!("Map keys must be required"));
+        }
+
+        if map_value.get_basic_info().repetition() == Repetition::REPEATED {
+            return Err(arrow_err!("Map values cannot be repeated"));
+        }
+
+        // Extract the arrow fields
+        let (arrow_map, arrow_key, arrow_value, sorted) = match &context.data_type {
+            Some(DataType::Map(field, sorted)) => match field.data_type() {
+                DataType::Struct(fields) => {
+                    if fields.len() != 2 {
+                        return Err(arrow_err!(
+                            "Map data type should contain struct with two children, got {}",
+                            fields.len()
+                        ));
+                    }
+
+                    (Some(field), Some(&fields[0]), Some(&fields[1]), *sorted)
+                }
+                d => {
+                    return Err(arrow_err!(
+                        "Map data type should contain struct got {}",
+                        d
+                    ));
+                }
+            },
+            Some(d) => {
+                return Err(arrow_err!(
+                    "incompatible arrow schema, expected map got {}",
+                    d
+                ))
+            }
+            None => (None, None, None, false),
+        };
+
+        let maybe_key = {
+            let context = VisitorContext {
+                rep_level,
+                def_level,
+                data_type: arrow_key.map(|x| x.data_type().clone()),
+            };
+
+            self.dispatch(map_key.clone(), context)?
+        };
+
+        let maybe_value = {
+            let context = VisitorContext {
+                rep_level,
+                def_level,
+                data_type: arrow_value.map(|x| x.data_type().clone()),
+            };
+
+            self.dispatch(map_value.clone(), context)?
+        };
+
+        // Need both columns to be projected
+        match (maybe_key, maybe_value) {
+            (Some(key), Some(value)) => {
+                let key_field = convert_field(map_key, &key, arrow_key);
+                let value_field = convert_field(map_value, &value, arrow_value);
+
+                let map_field = Field::new(
+                    map_key_value.name(),
+                    DataType::Struct(vec![key_field, value_field]),
+                    nullable,
+                )
+                .with_metadata(arrow_map.and_then(|f| f.metadata().cloned()));
+
+                Ok(Some(ParquetField {
+                    rep_level,
+                    def_level,
+                    nullable,
+                    arrow_type: DataType::Map(Box::new(map_field), sorted),
+                    field_type: ParquetFieldType::Group {
+                        children: vec![key, value],
+                    },
+                }))
+            }
+            _ => Ok(None),
+        }
+    }
+
+    fn visit_list(
+        &mut self,
+        list_type: TypePtr,
+        context: VisitorContext,
+    ) -> Result<Option<ParquetField>> {
+        if list_type.is_primitive() {
+            return Err(arrow_err!(
+                "{:?} is a list type and can't be processed as primitive.",
+                list_type
+            ));
+        }
+
+        let fields = list_type.get_fields();
+        if fields.len() != 1 {
+            return Err(arrow_err!(
+                "list type must have a single child, found {}",
+                fields.len()
+            ));
+        }
+
+        let repeated_field = &fields[0];
+        if get_repetition(repeated_field) != Repetition::REPEATED {
+            return Err(arrow_err!("List child must be repeated"));
+        }
+
+        // Determine whether the list itself is nullable
+        let (def_level, nullable) = match list_type.get_basic_info().repetition() {
+            Repetition::REQUIRED => (context.def_level, false),
+            Repetition::OPTIONAL => (context.def_level + 1, true),
+            Repetition::REPEATED => {
+                return Err(arrow_err!("List type cannot be repeated"))
+            }
+        };
+
+        let arrow_field = match &context.data_type {
+            Some(DataType::List(f)) => Some(f.as_ref()),
+            Some(DataType::LargeList(f)) => Some(f.as_ref()),
+            Some(DataType::FixedSizeList(f, _)) => Some(f.as_ref()),
+            Some(d) => {
+                return Err(arrow_err!(
+                    "incompatible arrow schema, expected list got {}",
+                    d
+                ))
+            }
+            None => None,
+        };
+
+        if repeated_field.is_primitive() {
+            // If the repeated field is not a group, then its type is the element
+            // type and the elements are required.
+            let context = VisitorContext {
+                rep_level: context.rep_level,
+                def_level,
+                data_type: arrow_field.map(|f| f.data_type().clone()),
+            };
+
+            return match self.visit_primitive(repeated_field.clone(), context) {
+                Ok(Some(mut field)) => {
+                    field.nullable = nullable;
+                    Ok(Some(field))
+                }
+                r => r,
+            };
+        }
+
+        let items = repeated_field.get_fields();
+        if items.len() != 1
+            || repeated_field.name() == "array"
+            || repeated_field.name() == format!("{}_tuple", list_type.name())
+        {
+            // If the repeated field is a group with multiple fields, then its
+            // type is the element type and the elements are required.
+            //
+            // If the repeated field is a group with one field and is named either
+            // "array" or uses the LIST-annotated group's name with "_tuple"
+            // appended, then the repeated type is the element type and the
+            // elements are required.
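+            //
+            // For example, this legacy two-level encoding from [Logical Types]
+            // (one of the alternative list representations, see #1680):
+            //
+            //   optional group my_list (LIST) {
+            //     repeated group array {
+            //       required binary str (UTF8);
+            //     };
+            //   }
+            //
+            // treats the repeated "array" group itself as the element type, so
+            // each element is a required struct with a single "str" field.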
+            let context = VisitorContext {
+                rep_level: context.rep_level,
+                def_level,
+                data_type: arrow_field.map(|f| f.data_type().clone()),
+            };
+
+            return match self.visit_struct(repeated_field.clone(), context) {
+                Ok(Some(mut field)) => {
+                    field.nullable = nullable;
+                    Ok(Some(field))
+                }
+                r => r,
+            };
+        }
+
+        // Regular list handling logic
+        let item_type = &items[0];
+        let rep_level = context.rep_level + 1;
+        let def_level = def_level + 1;
+
+        let new_context = VisitorContext {
+            def_level,
+            rep_level,
+            data_type: arrow_field.map(|f| f.data_type().clone()),
+        };
+
+        match self.dispatch(item_type.clone(), new_context) {
+            Ok(Some(item)) => {
+                let item_field = Box::new(convert_field(item_type, &item, arrow_field));
+
+                // Use arrow type as hint for index size
+                let arrow_type = match context.data_type {
+                    Some(DataType::LargeList(_)) => DataType::LargeList(item_field),
+                    Some(DataType::FixedSizeList(_, len)) => {
+                        DataType::FixedSizeList(item_field, len)
+                    }
+                    _ => DataType::List(item_field),
+                };
+
+                Ok(Some(ParquetField {
+                    rep_level,
+                    def_level,
+                    nullable,
+                    arrow_type,
+                    field_type: ParquetFieldType::Group {
+                        children: vec![item],
+                    },
+                }))
+            }
+            r => r,
+        }
+    }
+
+    fn dispatch(
+        &mut self,
+        cur_type: TypePtr,
+        context: VisitorContext,
+    ) -> Result<Option<ParquetField>> {
+        if cur_type.is_primitive() {
+            self.visit_primitive(cur_type, context)
+        } else {
+            match cur_type.get_basic_info().converted_type() {
+                ConvertedType::LIST => self.visit_list(cur_type, context),
+                ConvertedType::MAP | ConvertedType::MAP_KEY_VALUE => {
+                    self.visit_map(cur_type, context)
+                }
+                _ => self.visit_struct(cur_type, context),
+            }
+        }
+    }
+}
+
+fn convert_field(
+    parquet_type: &Type,
+    field: &ParquetField,
+    arrow_hint: Option<&Field>,
+) -> Field {
+    let name = parquet_type.name();
+    let data_type = field.arrow_type.clone();
+    let nullable = field.nullable;
+
+    match arrow_hint {
+        Some(hint) => {
+            let field = match (&data_type, hint.dict_id(), hint.dict_is_ordered()) {
+                (DataType::Dictionary(_, _), Some(id), Some(ordered)) => {
+                    Field::new_dict(name, data_type, nullable, id, ordered)
+                }
+                _ => Field::new(name, data_type, nullable),
+            };
+
+            field.with_metadata(hint.metadata().cloned())
+        }
+        None => Field::new(name, data_type, nullable),
+    }
+}
+
+/// Computes the [`ParquetField`] for the provided [`SchemaDescriptor`] with
+/// `leaf_columns` listing the indexes of leaf columns to project
+pub fn convert_schema<T: IntoIterator<Item = usize>>(
+    schema: &SchemaDescriptor,
+    leaf_columns: T,
+    embedded_arrow_schema: Option<&Schema>,
+) -> Result<Option<ParquetField>> {
+    let mut leaf_mask = vec![false; schema.num_columns()];
+    let mut last_idx = 0;
+    for i in leaf_columns {
+        if i < last_idx {
+            return Err(general_err!("out of order projection is not supported"));
+        }
+        last_idx = i;
+        leaf_mask[i] = true;
+    }
+
+    let mut visitor = Visitor {
+        next_col_idx: 0,
+        column_mask: leaf_mask,
+    };
+
+    let context = VisitorContext {
+        rep_level: 0,
+        def_level: 0,
+        data_type: embedded_arrow_schema.map(|s| DataType::Struct(s.fields().clone())),
+    };
+
+    visitor.dispatch(schema.root_schema_ptr(), context)
+}
+
+/// Computes the [`ParquetField`] for the provided `parquet_type`
+pub fn convert_type(parquet_type: TypePtr) -> Result<ParquetField> {
+    let mut visitor = Visitor {
+        next_col_idx: 0,
+        column_mask: vec![true],
+    };
+
+    let context = VisitorContext {
+        rep_level: 0,
+        def_level: 0,
+        data_type: None,
+    };
+
+    Ok(visitor.dispatch(parquet_type, context)?.unwrap())
+}
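+
+// A minimal usage sketch (the projection indices are hypothetical): given the
+// SchemaDescriptor for a file with three leaf columns, projecting the first
+// and last leaves:
+//
+//   let field = convert_schema(&descriptor, [0, 2], None)?;
+//
+// yields the root ParquetField for a struct containing only those leaves, or
+// None if the projection is empty.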
diff --git a/parquet/src/arrow/schema/primitive.rs b/parquet/src/arrow/schema/primitive.rs
new file mode 100644
index 000000000000..234fcc936c1e
--- /dev/null
+++ b/parquet/src/arrow/schema/primitive.rs
@@ -0,0 +1,251 @@
+use crate::basic::{
+    ConvertedType, LogicalType, TimeUnit as ParquetTimeUnit, Type as PhysicalType,
+};
+use crate::errors::{ParquetError, Result};
+use crate::schema::types::{BasicTypeInfo, Type};
+use arrow::datatypes::{DataType, IntervalUnit, TimeUnit};
+
+/// Converts [`Type`] to [`DataType`] with an optional `arrow_type_hint`
+/// provided by the arrow schema
+///
+/// Note: the values embedded in the arrow schema are advisory; the parquet
+/// schema remains authoritative
+pub fn convert_primitive(
+    parquet_type: &Type,
+    arrow_type_hint: Option<DataType>,
+) -> Result<DataType> {
+    let physical_type = from_parquet(parquet_type)?;
+    Ok(match arrow_type_hint {
+        Some(hint) => apply_hint(physical_type, hint),
+        None => physical_type,
+    })
+}
+
+fn apply_hint(parquet: DataType, hint: DataType) -> DataType {
+    match (&parquet, &hint) {
+        // Not all time units can be represented as LogicalType / ConvertedType
+        (DataType::Int32 | DataType::Int64, DataType::Timestamp(_, _)) => hint,
+        (DataType::Int32, DataType::Time32(_)) => hint,
+        (DataType::Int64, DataType::Time64(_)) => hint,
+
+        // Date64 doesn't have a corresponding LogicalType / ConvertedType
+        (DataType::Int64, DataType::Date64) => hint,
+
+        // Determine timezone
+        (DataType::Timestamp(p, None), DataType::Timestamp(h, Some(_))) if p == h => hint,
+
+        // Determine offset size
+        (DataType::Utf8, DataType::LargeUtf8) => hint,
+        (DataType::Binary, DataType::LargeBinary) => hint,
+
+        // Deduce interval type
+        (DataType::FixedSizeBinary(4), DataType::Interval(IntervalUnit::YearMonth)) => {
+            hint
+        }
+        (DataType::FixedSizeBinary(8), DataType::Interval(IntervalUnit::DayTime)) => hint,
+        (
+            DataType::FixedSizeBinary(16),
+            DataType::Interval(IntervalUnit::MonthDayNano),
+        ) => hint,
+
+        // Potentially preserve dictionary encoding
+        (_, DataType::Dictionary(_, value)) => {
+            // Apply hint to inner type
+            let hinted = apply_hint(parquet, value.as_ref().clone());
+
+            // If matches dictionary value - preserve dictionary
+            // otherwise use hinted inner type
+            match &hinted == value.as_ref() {
+                true => hint,
+                false => hinted,
+            }
+        }
+        _ => parquet,
+    }
+}
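+
+// For example: a BYTE_ARRAY (UTF8) column decodes to Utf8, so a hint of
+// Dictionary(Int32, LargeUtf8) is preserved (the inner hint Utf8 -> LargeUtf8
+// matches the dictionary value type), whereas a hint of Dictionary(Int32, Int64)
+// falls back to the decoded Utf8.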
+
+fn from_parquet(parquet_type: &Type) -> Result<DataType> {
+    match parquet_type {
+        Type::PrimitiveType {
+            physical_type,
+            basic_info,
+            type_length,
+            scale,
+            precision,
+            ..
+        } => match physical_type {
+            PhysicalType::BOOLEAN => Ok(DataType::Boolean),
+            PhysicalType::INT32 => from_int32(basic_info, *scale, *precision),
+            PhysicalType::INT64 => from_int64(basic_info, *scale, *precision),
+            PhysicalType::INT96 => Ok(DataType::Timestamp(TimeUnit::Nanosecond, None)),
+            PhysicalType::FLOAT => Ok(DataType::Float32),
+            PhysicalType::DOUBLE => Ok(DataType::Float64),
+            PhysicalType::BYTE_ARRAY => from_byte_array(basic_info),
+            PhysicalType::FIXED_LEN_BYTE_ARRAY => {
+                from_fixed_len_byte_array(basic_info, *scale, *precision, *type_length)
+            }
+        },
+        Type::GroupType { .. } => unreachable!(),
+    }
+}
+
+fn decimal_type(scale: i32, precision: i32) -> Result<DataType> {
+    let scale = scale
+        .try_into()
+        .map_err(|_| arrow_err!("scale cannot be negative: {}", scale))?;
+
+    let precision = precision
+        .try_into()
+        .map_err(|_| arrow_err!("precision cannot be negative: {}", precision))?;
+
+    Ok(DataType::Decimal(precision, scale))
+}
+
+fn from_int32(info: &BasicTypeInfo, scale: i32, precision: i32) -> Result<DataType> {
+    match (info.logical_type(), info.converted_type()) {
+        (None, ConvertedType::NONE) => Ok(DataType::Int32),
+        (
+            Some(
+                ref t @ LogicalType::Integer {
+                    bit_width,
+                    is_signed,
+                },
+            ),
+            _,
+        ) => match (bit_width, is_signed) {
+            (8, true) => Ok(DataType::Int8),
+            (16, true) => Ok(DataType::Int16),
+            (32, true) => Ok(DataType::Int32),
+            (8, false) => Ok(DataType::UInt8),
+            (16, false) => Ok(DataType::UInt16),
+            (32, false) => Ok(DataType::UInt32),
+            _ => Err(arrow_err!("Cannot create INT32 physical type from {:?}", t)),
+        },
+        (Some(LogicalType::Decimal { scale, precision }), _) => {
+            decimal_type(scale, precision)
+        }
+        (Some(LogicalType::Date), _) => Ok(DataType::Date32),
+        (Some(LogicalType::Time { unit, .. }), _) => match unit {
+            ParquetTimeUnit::MILLIS(_) => Ok(DataType::Time32(TimeUnit::Millisecond)),
+            _ => Err(arrow_err!(
+                "Cannot create INT32 physical type from {:?}",
+                unit
+            )),
+        },
+        // https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#unknown-always-null
+        (Some(LogicalType::Unknown), _) => Ok(DataType::Null),
+        (None, ConvertedType::UINT_8) => Ok(DataType::UInt8),
+        (None, ConvertedType::UINT_16) => Ok(DataType::UInt16),
+        (None, ConvertedType::UINT_32) => Ok(DataType::UInt32),
+        (None, ConvertedType::INT_8) => Ok(DataType::Int8),
+        (None, ConvertedType::INT_16) => Ok(DataType::Int16),
+        (None, ConvertedType::INT_32) => Ok(DataType::Int32),
+        (None, ConvertedType::DATE) => Ok(DataType::Date32),
+        (None, ConvertedType::TIME_MILLIS) => Ok(DataType::Time32(TimeUnit::Millisecond)),
+        (None, ConvertedType::DECIMAL) => decimal_type(scale, precision),
+        (logical, converted) => Err(arrow_err!(
+            "Unable to convert parquet INT32 logical type {:?} or converted type {}",
+            logical,
+            converted
+        )),
+    }
+}
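+
+// For example, a column declared as
+//
+//   required int32 price (DECIMAL(9, 2));
+//
+// maps to DataType::Decimal(9, 2) via decimal_type, through either the
+// LogicalType::Decimal or the ConvertedType::DECIMAL arm above.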
+
+fn from_int64(info: &BasicTypeInfo, scale: i32, precision: i32) -> Result<DataType> {
+    match (info.logical_type(), info.converted_type()) {
+        (None, ConvertedType::NONE) => Ok(DataType::Int64),
+        (
+            Some(LogicalType::Integer {
+                bit_width,
+                is_signed,
+            }),
+            _,
+        ) if bit_width == 64 => match is_signed {
+            true => Ok(DataType::Int64),
+            false => Ok(DataType::UInt64),
+        },
+        (Some(LogicalType::Time { unit, .. }), _) => match unit {
+            ParquetTimeUnit::MILLIS(_) => {
+                Err(arrow_err!("Cannot create INT64 from MILLIS time unit"))
+            }
+            ParquetTimeUnit::MICROS(_) => Ok(DataType::Time64(TimeUnit::Microsecond)),
+            ParquetTimeUnit::NANOS(_) => Ok(DataType::Time64(TimeUnit::Nanosecond)),
+        },
+        (
+            Some(LogicalType::Timestamp {
+                is_adjusted_to_u_t_c,
+                unit,
+            }),
+            _,
+        ) => Ok(DataType::Timestamp(
+            match unit {
+                ParquetTimeUnit::MILLIS(_) => TimeUnit::Millisecond,
+                ParquetTimeUnit::MICROS(_) => TimeUnit::Microsecond,
+                ParquetTimeUnit::NANOS(_) => TimeUnit::Nanosecond,
+            },
+            if is_adjusted_to_u_t_c {
+                Some("UTC".to_string())
+            } else {
+                None
+            },
+        )),
+        (None, ConvertedType::INT_64) => Ok(DataType::Int64),
+        (None, ConvertedType::UINT_64) => Ok(DataType::UInt64),
+        (None, ConvertedType::TIME_MICROS) => Ok(DataType::Time64(TimeUnit::Microsecond)),
+        (None, ConvertedType::TIMESTAMP_MILLIS) => {
+            Ok(DataType::Timestamp(TimeUnit::Millisecond, None))
+        }
+        (None, ConvertedType::TIMESTAMP_MICROS) => {
+            Ok(DataType::Timestamp(TimeUnit::Microsecond, None))
+        }
+        (Some(LogicalType::Decimal { scale, precision }), _) => {
+            decimal_type(scale, precision)
+        }
+        (None, ConvertedType::DECIMAL) => decimal_type(scale, precision),
+        (logical, converted) => Err(arrow_err!(
+            "Unable to convert parquet INT64 logical type {:?} or converted type {}",
+            logical,
+            converted
+        )),
+    }
+}
+
+fn from_byte_array(info: &BasicTypeInfo) -> Result<DataType> {
+    match (info.logical_type(), info.converted_type()) {
+        (Some(LogicalType::String), _) => Ok(DataType::Utf8),
+        (Some(LogicalType::Json), _) => Ok(DataType::Binary),
+        (Some(LogicalType::Bson), _) => Ok(DataType::Binary),
+        (Some(LogicalType::Enum), _) => Ok(DataType::Binary),
+        (None, ConvertedType::NONE) => Ok(DataType::Binary),
+        (None, ConvertedType::JSON) => Ok(DataType::Binary),
+        (None, ConvertedType::BSON) => Ok(DataType::Binary),
+        (None, ConvertedType::ENUM) => Ok(DataType::Binary),
+        (None, ConvertedType::UTF8) => Ok(DataType::Utf8),
+        (logical, converted) => Err(arrow_err!(
+            "Unable to convert parquet BYTE_ARRAY logical type {:?} or converted type {}",
+            logical,
+            converted
+        )),
+    }
+}
+
+fn from_fixed_len_byte_array(
+    info: &BasicTypeInfo,
+    scale: i32,
+    precision: i32,
+    type_length: i32,
+) -> Result<DataType> {
+    // TODO: This should check the type length for the decimal and interval types
+    match (info.logical_type(), info.converted_type()) {
+        (Some(LogicalType::Decimal { scale, precision }), _) => {
+            decimal_type(scale, precision)
+        }
+        (None, ConvertedType::DECIMAL) => decimal_type(scale, precision),
+        (None, ConvertedType::INTERVAL) => {
+            // There is currently no reliable way of determining which IntervalUnit
+            // to return. Thus without the original Arrow schema, the results
+            // would be incorrect if all 12 bytes of the interval are populated
+            Ok(DataType::Interval(IntervalUnit::DayTime))
+        }
+        _ => Ok(DataType::FixedSizeBinary(type_length)),
+    }
+}
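+
+// A worked example of the timestamp hinting above: INT64 with
+// ConvertedType::TIMESTAMP_MICROS decodes to Timestamp(Microsecond, None);
+// an embedded schema hint of Timestamp(Microsecond, Some("UTC")) is adopted
+// because the units match, while Timestamp(Millisecond, Some("UTC")) would
+// leave the decoded type unchanged.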
diff --git a/parquet/src/errors.rs b/parquet/src/errors.rs
index be1a22192954..fcbb846f110f 100644
--- a/parquet/src/errors.rs
+++ b/parquet/src/errors.rs
@@ -135,6 +135,14 @@ macro_rules! eof_err {
     ($fmt:expr, $($args:expr),*) => (ParquetError::EOF(format!($fmt, $($args),*)));
 }
 
+macro_rules! arrow_err {
+    ($fmt:expr) => (ParquetError::ArrowError($fmt.to_owned()));
+    ($fmt:expr, $($args:expr),*) => (ParquetError::ArrowError(format!($fmt, $($args),*)));
+    ($e:expr, $fmt:expr) => (ParquetError::ArrowError($fmt.to_owned(), $e));
+    ($e:ident, $fmt:expr, $($args:tt),*) => (
+        ParquetError::ArrowError(&format!($fmt, $($args),*), $e));
+}
+
 // ----------------------------------------------------------------------
 // Convert parquet error into other errors