From 6f839432b10a354c294acd876428f2bc7fcebb26 Mon Sep 17 00:00:00 2001 From: baishen Date: Fri, 24 Feb 2023 22:12:52 +0800 Subject: [PATCH 01/18] feat: implement map data type read and write --- src/io/parquet/read/deserialize/mod.rs | 29 ++++- src/io/parquet/read/deserialize/nested.rs | 16 ++- src/io/parquet/write/mod.rs | 14 ++- src/io/parquet/write/pages.rs | 124 +++++++++++++++++++++- src/io/parquet/write/schema.rs | 15 +++ 5 files changed, 185 insertions(+), 13 deletions(-) diff --git a/src/io/parquet/read/deserialize/mod.rs b/src/io/parquet/read/deserialize/mod.rs index c6d7bd3521a..18e1200b0a6 100644 --- a/src/io/parquet/read/deserialize/mod.rs +++ b/src/io/parquet/read/deserialize/mod.rs @@ -15,7 +15,7 @@ use parquet2::read::get_page_iterator as _get_page_iterator; use parquet2::schema::types::PrimitiveType; use crate::{ - array::{Array, DictionaryKey, FixedSizeListArray, ListArray}, + array::{Array, DictionaryKey, FixedSizeListArray, ListArray, MapArray}, datatypes::{DataType, Field, IntervalUnit}, error::Result, offset::Offsets, @@ -87,6 +87,33 @@ pub fn create_list( } } +/// Creates a new [`MapArray`]. +pub fn create_map( + data_type: DataType, + nested: &mut NestedState, + values: Box, +) -> Box { + let (mut offsets, validity) = nested.nested.pop().unwrap().inner(); + match data_type.to_logical_type() { + DataType::Map(_, _) => { + offsets.push(values.len() as i64); + let offsets = offsets.iter().map(|x| *x as i32).collect::>(); + + let offsets: Offsets = offsets + .try_into() + .expect("i64 offsets do not fit in i32 offsets"); + + Box::new(MapArray::new( + data_type, + offsets.into(), + values, + validity.and_then(|x| x.into()), + )) + } + _ => unreachable!(), + } +} + fn is_primitive(data_type: &DataType) -> bool { matches!( data_type.to_physical_type(), diff --git a/src/io/parquet/read/deserialize/nested.rs b/src/io/parquet/read/deserialize/nested.rs index de06a60fd65..c4175eecf56 100644 --- a/src/io/parquet/read/deserialize/nested.rs +++ b/src/io/parquet/read/deserialize/nested.rs @@ -283,16 +283,12 @@ where num_rows, chunk_size, )?; - Box::new(iter.map(move |x| { - let (nested, inner) = x?; - let array = MapArray::new( - field.data_type().clone(), - vec![0, inner.len() as i32].try_into().unwrap(), - inner, - None, - ); - Ok((nested, array.boxed())) - })) + let iter = iter.map(move |x| { + let (mut nested, array) = x?; + let array = create_map(field.data_type().clone(), &mut nested, array); + Ok((nested, array)) + }); + Box::new(iter) as _ } other => { return Err(Error::nyi(format!( diff --git a/src/io/parquet/write/mod.rs b/src/io/parquet/write/mod.rs index a87dfd54975..0d928748793 100644 --- a/src/io/parquet/write/mod.rs +++ b/src/io/parquet/write/mod.rs @@ -633,8 +633,20 @@ fn transverse_recursive T + Clone>( unreachable!() } } + Map => { + if let DataType::Map(field, _) = data_type.to_logical_type() { + if let DataType::Struct(fields) = field.data_type.to_logical_type() { + for field in fields { + transverse_recursive(&field.data_type, map.clone(), encodings) + } + } else { + unreachable!() + } + } else { + unreachable!() + } + } Union => todo!(), - Map => todo!(), } } diff --git a/src/io/parquet/write/pages.rs b/src/io/parquet/write/pages.rs index 5012c0323cf..edcdfb2a247 100644 --- a/src/io/parquet/write/pages.rs +++ b/src/io/parquet/write/pages.rs @@ -2,7 +2,7 @@ use parquet2::schema::types::{ParquetType, PrimitiveType as ParquetPrimitiveType use parquet2::{page::Page, write::DynIter}; use std::fmt::Debug; -use crate::array::{ListArray, StructArray}; +use crate::array::{ListArray, MapArray, StructArray}; use crate::bitmap::Bitmap; use crate::datatypes::PhysicalType; use crate::io::parquet::read::schema::is_nullable; @@ -141,6 +141,29 @@ fn to_nested_recursive( ))); to_nested_recursive(array.values().as_ref(), type_, nested, parents)?; } + Map => { + let array = array.as_any().downcast_ref::().unwrap(); + let type_ = if let ParquetType::GroupType { fields, .. } = type_ { + if let ParquetType::GroupType { fields, .. } = &fields[0] { + &fields[0] + } else { + return Err(Error::InvalidArgumentError( + "Parquet type must be a group for a list array".to_string(), + )); + } + } else { + return Err(Error::InvalidArgumentError( + "Parquet type must be a group for a list array".to_string(), + )); + }; + + parents.push(Nested::List(ListNested::new( + array.offsets().clone(), + array.validity().cloned(), + is_optional, + ))); + to_nested_recursive(array.field().as_ref(), type_, nested, parents)?; + } _ => { parents.push(Nested::Primitive( array.validity().cloned(), @@ -178,6 +201,10 @@ fn to_leaves_recursive<'a>(array: &'a dyn Array, leaves: &mut Vec<&'a dyn Array> let array = array.as_any().downcast_ref::>().unwrap(); to_leaves_recursive(array.values().as_ref(), leaves); } + Map => { + let array = array.as_any().downcast_ref::().unwrap(); + to_leaves_recursive(array.field().as_ref(), leaves); + } Null | Boolean | Primitive(_) | Binary | FixedSizeBinary | LargeBinary | Utf8 | LargeUtf8 | Dictionary(_) => leaves.push(array), other => todo!("Writing {:?} to parquet not yet implemented", other), @@ -509,4 +536,99 @@ mod tests { ] ); } + + #[test] + fn test_map() { + let kv_type = DataType::Struct(vec![ + Field::new("k", DataType::Utf8, false), + Field::new("v", DataType::Int32, false), + ]); + let kv_field = Field::new("kv", kv_type.clone(), false); + let map_type = DataType::Map(Box::new(kv_field.clone()), false); + + let key_array = Utf8Array::::from_slice(["k1", "k2", "k3", "k4", "k5", "k6"]).boxed(); + let val_array = Int32Array::from_slice([42, 28, 19, 31, 21, 17]).boxed(); + let kv_array = StructArray::try_new(kv_type, vec![key_array, val_array], None)?.boxed(); + let offsets = OffsetsBuffer::try_from(vec![0, 2, 3, 4, 6]).unwrap(); + + let array = MapArray::try_new(map_type, offsets, kv_array, None)?; + + let type_ = ParquetType::GroupType { + field_info: FieldInfo { + name: "kv".to_string(), + repetition: Repetition::Optional, + id: None, + }, + logical_type: None, + converted_type: None, + fields: vec![ + ParquetType::PrimitiveType(ParquetPrimitiveType { + field_info: FieldInfo { + name: "k".to_string(), + repetition: Repetition::Required, + id: None, + }, + logical_type: Some(PrimitiveLogicalType::String), + converted_type: Some(PrimitiveConvertedType::Utf8), + physical_type: ParquetPhysicalType::ByteArray, + }), + ParquetType::PrimitiveType(ParquetPrimitiveType { + field_info: FieldInfo { + name: "v".to_string(), + repetition: Repetition::Required, + id: None, + }, + logical_type: None, + converted_type: None, + physical_type: ParquetPhysicalType::Int32, + }), + ], + }; + + let type_ = ParquetType::GroupType { + field_info: FieldInfo { + name: "m".to_string(), + repetition: Repetition::Required, + id: None, + }, + logical_type: Some(GroupLogicalType::Map), + converted_type: None, + fields: vec![ParquetType::GroupType { + field_info: FieldInfo { + name: "map".to_string(), + repetition: Repetition::Repeated, + id: None, + }, + logical_type: None, + converted_type: None, + fields: vec![type_], + }], + }; + + let a = to_nested(&array, &type_).unwrap(); + + assert_eq!( + a, + vec![ + vec![ + Nested::List(ListNested:: { + is_optional: false, + offsets: vec![0, 2, 3, 4, 6].try_into().unwrap(), + validity: None, + }), + Nested::Struct(None, false, 6), + Nested::Primitive(None, false, 6), + ], + vec![ + Nested::List(ListNested:: { + is_optional: false, + offsets: vec![0, 2, 3, 4, 6].try_into().unwrap(), + validity: None, + }), + Nested::Struct(None, false, 6), + Nested::Primitive(None, false, 4), + ], + ] + ); + } } diff --git a/src/io/parquet/write/schema.rs b/src/io/parquet/write/schema.rs index ee60e98ac13..2001ee3e37f 100644 --- a/src/io/parquet/write/schema.rs +++ b/src/io/parquet/write/schema.rs @@ -320,6 +320,21 @@ pub fn to_parquet_type(field: &Field) -> Result { None, )) } + DataType::Map(f, _) => Ok(ParquetType::from_group( + name, + repetition, + None, + Some(GroupLogicalType::Map), + vec![ParquetType::from_group( + "map".to_string(), + Repetition::Repeated, + None, + None, + vec![to_parquet_type(f)?], + None, + )], + None, + )), other => Err(Error::NotYetImplemented(format!( "Writing the data type {other:?} is not yet implemented" ))), From 43ec99fea0f216a2f03ac892eabab059bdbc4779 Mon Sep 17 00:00:00 2001 From: baishen Date: Sat, 25 Feb 2023 00:52:17 +0800 Subject: [PATCH 02/18] fix --- src/io/parquet/write/pages.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/io/parquet/write/pages.rs b/src/io/parquet/write/pages.rs index edcdfb2a247..eeb09fa9035 100644 --- a/src/io/parquet/write/pages.rs +++ b/src/io/parquet/write/pages.rs @@ -148,12 +148,12 @@ fn to_nested_recursive( &fields[0] } else { return Err(Error::InvalidArgumentError( - "Parquet type must be a group for a list array".to_string(), + "Parquet type must be a group for a map array".to_string(), )); } } else { return Err(Error::InvalidArgumentError( - "Parquet type must be a group for a list array".to_string(), + "Parquet type must be a group for a map array".to_string(), )); }; From d1dab86176d220d9ba056fb9d336582a7bd57fe7 Mon Sep 17 00:00:00 2001 From: baishen Date: Sat, 25 Feb 2023 01:06:51 +0800 Subject: [PATCH 03/18] fix test --- src/io/parquet/read/deserialize/nested.rs | 1 - src/io/parquet/write/pages.rs | 7 +++++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/src/io/parquet/read/deserialize/nested.rs b/src/io/parquet/read/deserialize/nested.rs index c4175eecf56..9a2c0232f2f 100644 --- a/src/io/parquet/read/deserialize/nested.rs +++ b/src/io/parquet/read/deserialize/nested.rs @@ -1,7 +1,6 @@ use parquet2::schema::types::PrimitiveType; use crate::{ - array::MapArray, datatypes::{DataType, Field}, error::{Error, Result}, }; diff --git a/src/io/parquet/write/pages.rs b/src/io/parquet/write/pages.rs index eeb09fa9035..a961e6a9a13 100644 --- a/src/io/parquet/write/pages.rs +++ b/src/io/parquet/write/pages.rs @@ -258,6 +258,7 @@ pub fn array_to_columns + Send + Sync>( #[cfg(test)] mod tests { + use parquet2::schema::types::{GroupLogicalType, PrimitiveConvertedType, PrimitiveLogicalType}; use parquet2::schema::Repetition; use super::*; @@ -548,10 +549,12 @@ mod tests { let key_array = Utf8Array::::from_slice(["k1", "k2", "k3", "k4", "k5", "k6"]).boxed(); let val_array = Int32Array::from_slice([42, 28, 19, 31, 21, 17]).boxed(); - let kv_array = StructArray::try_new(kv_type, vec![key_array, val_array], None)?.boxed(); + let kv_array = StructArray::try_new(kv_type, vec![key_array, val_array], None) + .unwrap() + .boxed(); let offsets = OffsetsBuffer::try_from(vec![0, 2, 3, 4, 6]).unwrap(); - let array = MapArray::try_new(map_type, offsets, kv_array, None)?; + let array = MapArray::try_new(map_type, offsets, kv_array, None).unwrap(); let type_ = ParquetType::GroupType { field_info: FieldInfo { From 3ec9b64cf63d83c452b21350e788b26a5970cbe3 Mon Sep 17 00:00:00 2001 From: baishen Date: Sat, 25 Feb 2023 01:14:32 +0800 Subject: [PATCH 04/18] fix test --- src/io/parquet/write/pages.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/io/parquet/write/pages.rs b/src/io/parquet/write/pages.rs index a961e6a9a13..815cddd4833 100644 --- a/src/io/parquet/write/pages.rs +++ b/src/io/parquet/write/pages.rs @@ -619,7 +619,7 @@ mod tests { offsets: vec![0, 2, 3, 4, 6].try_into().unwrap(), validity: None, }), - Nested::Struct(None, false, 6), + Nested::Struct(None, true, 6), Nested::Primitive(None, false, 6), ], vec![ @@ -628,8 +628,8 @@ mod tests { offsets: vec![0, 2, 3, 4, 6].try_into().unwrap(), validity: None, }), - Nested::Struct(None, false, 6), - Nested::Primitive(None, false, 4), + Nested::Struct(None, true, 6), + Nested::Primitive(None, false, 6), ], ] ); From 337257a6194fdc88ae45b4fa23e73f588eb8c040 Mon Sep 17 00:00:00 2001 From: baishen Date: Sat, 25 Feb 2023 01:45:52 +0800 Subject: [PATCH 05/18] fix clippy --- src/io/parquet/write/pages.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/io/parquet/write/pages.rs b/src/io/parquet/write/pages.rs index 815cddd4833..155b62173ba 100644 --- a/src/io/parquet/write/pages.rs +++ b/src/io/parquet/write/pages.rs @@ -545,7 +545,7 @@ mod tests { Field::new("v", DataType::Int32, false), ]); let kv_field = Field::new("kv", kv_type.clone(), false); - let map_type = DataType::Map(Box::new(kv_field.clone()), false); + let map_type = DataType::Map(Box::new(kv_field, false); let key_array = Utf8Array::::from_slice(["k1", "k2", "k3", "k4", "k5", "k6"]).boxed(); let val_array = Int32Array::from_slice([42, 28, 19, 31, 21, 17]).boxed(); From 7e3b184396f28fe1a42eb6c6a3a750c6fae0652c Mon Sep 17 00:00:00 2001 From: baishen Date: Sat, 25 Feb 2023 01:47:41 +0800 Subject: [PATCH 06/18] fix --- src/io/parquet/write/pages.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/io/parquet/write/pages.rs b/src/io/parquet/write/pages.rs index 155b62173ba..50bfdb63be9 100644 --- a/src/io/parquet/write/pages.rs +++ b/src/io/parquet/write/pages.rs @@ -545,7 +545,7 @@ mod tests { Field::new("v", DataType::Int32, false), ]); let kv_field = Field::new("kv", kv_type.clone(), false); - let map_type = DataType::Map(Box::new(kv_field, false); + let map_type = DataType::Map(Box::new(kv_field), false); let key_array = Utf8Array::::from_slice(["k1", "k2", "k3", "k4", "k5", "k6"]).boxed(); let val_array = Int32Array::from_slice([42, 28, 19, 31, 21, 17]).boxed(); From 587eb8deacfb654fe9d45070e40bae5180bc9926 Mon Sep 17 00:00:00 2001 From: baishen Date: Sat, 25 Feb 2023 07:13:34 +0800 Subject: [PATCH 07/18] add parquet map tests --- tests/it/io/parquet/mod.rs | 43 +++++++++++++++++++++++++++--------- tests/it/io/parquet/write.rs | 23 +++++++++++++++++++ 2 files changed, 55 insertions(+), 11 deletions(-) diff --git a/tests/it/io/parquet/mod.rs b/tests/it/io/parquet/mod.rs index 7bb6cf01a9e..92626ed5695 100644 --- a/tests/it/io/parquet/mod.rs +++ b/tests/it/io/parquet/mod.rs @@ -1244,15 +1244,29 @@ pub fn pyarrow_struct_statistics(column: &str) -> Statistics { pub fn pyarrow_map(column: &str) -> Box { match column { "map" => { - let s1 = [Some("a1"), Some("a2")]; - let s2 = [Some("b1"), Some("b2")]; + let s1 = [ + Some("a1"), + Some("a2"), + Some("a3"), + Some("a4"), + Some("a5"), + Some("a6"), + ]; + let s2 = [ + Some("b1"), + Some("b2"), + Some("b3"), + Some("b4"), + Some("b5"), + Some("b6"), + ]; let dt = DataType::Struct(vec![ Field::new("key", DataType::Utf8, false), Field::new("value", DataType::Utf8, true), ]); MapArray::try_new( DataType::Map(Box::new(Field::new("entries", dt.clone(), false)), false), - vec![0, 2].try_into().unwrap(), + vec![0, 2, 3, 4, 6].try_into().unwrap(), StructArray::try_new( dt, vec![ @@ -1269,15 +1283,22 @@ pub fn pyarrow_map(column: &str) -> Box { .boxed() } "map_nullable" => { - let s1 = [Some("a1"), Some("a2")]; - let s2 = [Some("b1"), None]; + let s1 = [ + Some("a1"), + Some("a2"), + Some("a3"), + Some("a4"), + Some("a5"), + Some("a6"), + ]; + let s2 = [Some("b1"), None, Some("b2"), Some("b3"), None, Some("b4")]; let dt = DataType::Struct(vec![ Field::new("key", DataType::Utf8, false), Field::new("value", DataType::Utf8, true), ]); MapArray::try_new( DataType::Map(Box::new(Field::new("entries", dt.clone(), false)), false), - vec![0, 2].try_into().unwrap(), + vec![0, 2, 3, 4, 6].try_into().unwrap(), StructArray::try_new( dt, vec![ @@ -1344,8 +1365,8 @@ pub fn pyarrow_map_statistics(column: &str) -> Statistics { )), max_value: Box::new(new_map( vec![ - Utf8Array::::from_slice(["a2"]).boxed(), - Utf8Array::::from_slice(["b2"]).boxed(), + Utf8Array::::from_slice(["a6"]).boxed(), + Utf8Array::::from_slice(["b6"]).boxed(), ], names, )), @@ -1362,7 +1383,7 @@ pub fn pyarrow_map_statistics(column: &str) -> Statistics { null_count: new_map( vec![ UInt64Array::from([Some(0)]).boxed(), - UInt64Array::from([Some(1)]).boxed(), + UInt64Array::from([Some(2)]).boxed(), ], names.clone(), ) @@ -1376,8 +1397,8 @@ pub fn pyarrow_map_statistics(column: &str) -> Statistics { )), max_value: Box::new(new_map( vec![ - Utf8Array::::from_slice(["a2"]).boxed(), - Utf8Array::::from_slice(["b1"]).boxed(), + Utf8Array::::from_slice(["a6"]).boxed(), + Utf8Array::::from_slice(["b4"]).boxed(), ], names, )), diff --git a/tests/it/io/parquet/write.rs b/tests/it/io/parquet/write.rs index d110298bc68..a38e82fd65c 100644 --- a/tests/it/io/parquet/write.rs +++ b/tests/it/io/parquet/write.rs @@ -37,6 +37,7 @@ fn round_trip_opt_stats( pyarrow_required_statistics(column), ), "struct" => (pyarrow_struct(column), pyarrow_struct_statistics(column)), + "map" => (pyarrow_map(column), pyarrow_map_statistics(column)), "nested_edge" => ( pyarrow_nested_edge(column), pyarrow_nested_edge_statistics(column), @@ -626,6 +627,28 @@ fn struct_v2() -> Result<()> { ) } +#[test] +fn map_v1() -> Result<()> { + round_trip( + "map", + "map", + Version::V1, + CompressionOptions::Uncompressed, + vec![Encoding::Plain, Encoding::Plain], + ) +} + +#[test] +fn map_v2() -> Result<()> { + round_trip( + "map", + "map", + Version::V2, + CompressionOptions::Uncompressed, + vec![Encoding::Plain, Encoding::Plain], + ) +} + #[test] fn nested_edge_simple() -> Result<()> { round_trip( From 348f3b85a538de94b5e3aae026218b9732506f21 Mon Sep 17 00:00:00 2001 From: baishen Date: Sat, 25 Feb 2023 07:25:29 +0800 Subject: [PATCH 08/18] fix test --- src/io/parquet/read/indexes/mod.rs | 7 +++++ tests/it/io/parquet/mod.rs | 43 ++++++++---------------------- 2 files changed, 18 insertions(+), 32 deletions(-) diff --git a/src/io/parquet/read/indexes/mod.rs b/src/io/parquet/read/indexes/mod.rs index 045482a831a..3dbb070fc9f 100644 --- a/src/io/parquet/read/indexes/mod.rs +++ b/src/io/parquet/read/indexes/mod.rs @@ -206,6 +206,13 @@ fn deserialize( unreachable!() } } + PhysicalType::Map => { + if let DataType::Map(inner) = data_type.to_logical_type() { + deserialize(indexes, inner.data_type.clone()) + } else { + unreachable!() + } + } PhysicalType::Struct => { let children_fields = if let DataType::Struct(children) = data_type.to_logical_type() { children diff --git a/tests/it/io/parquet/mod.rs b/tests/it/io/parquet/mod.rs index 92626ed5695..9567afe5008 100644 --- a/tests/it/io/parquet/mod.rs +++ b/tests/it/io/parquet/mod.rs @@ -1244,29 +1244,15 @@ pub fn pyarrow_struct_statistics(column: &str) -> Statistics { pub fn pyarrow_map(column: &str) -> Box { match column { "map" => { - let s1 = [ - Some("a1"), - Some("a2"), - Some("a3"), - Some("a4"), - Some("a5"), - Some("a6"), - ]; - let s2 = [ - Some("b1"), - Some("b2"), - Some("b3"), - Some("b4"), - Some("b5"), - Some("b6"), - ]; + let s1 = [Some("a1"), Some("a2")]; + let s2 = [Some("b1"), Some("b2")]; let dt = DataType::Struct(vec![ Field::new("key", DataType::Utf8, false), Field::new("value", DataType::Utf8, true), ]); MapArray::try_new( DataType::Map(Box::new(Field::new("entries", dt.clone(), false)), false), - vec![0, 2, 3, 4, 6].try_into().unwrap(), + vec![0, 2].try_into().unwrap(), StructArray::try_new( dt, vec![ @@ -1283,22 +1269,15 @@ pub fn pyarrow_map(column: &str) -> Box { .boxed() } "map_nullable" => { - let s1 = [ - Some("a1"), - Some("a2"), - Some("a3"), - Some("a4"), - Some("a5"), - Some("a6"), - ]; - let s2 = [Some("b1"), None, Some("b2"), Some("b3"), None, Some("b4")]; + let s1 = [Some("a1"), Some("a2")]; + let s2 = [Some("b1"), None]; let dt = DataType::Struct(vec![ Field::new("key", DataType::Utf8, false), Field::new("value", DataType::Utf8, true), ]); MapArray::try_new( DataType::Map(Box::new(Field::new("entries", dt.clone(), false)), false), - vec![0, 2, 3, 4, 6].try_into().unwrap(), + vec![0, 2].try_into().unwrap(), StructArray::try_new( dt, vec![ @@ -1365,8 +1344,8 @@ pub fn pyarrow_map_statistics(column: &str) -> Statistics { )), max_value: Box::new(new_map( vec![ - Utf8Array::::from_slice(["a6"]).boxed(), - Utf8Array::::from_slice(["b6"]).boxed(), + Utf8Array::::from_slice(["a2"]).boxed(), + Utf8Array::::from_slice(["b2"]).boxed(), ], names, )), @@ -1383,7 +1362,7 @@ pub fn pyarrow_map_statistics(column: &str) -> Statistics { null_count: new_map( vec![ UInt64Array::from([Some(0)]).boxed(), - UInt64Array::from([Some(2)]).boxed(), + UInt64Array::from([Some(1)]).boxed(), ], names.clone(), ) @@ -1397,8 +1376,8 @@ pub fn pyarrow_map_statistics(column: &str) -> Statistics { )), max_value: Box::new(new_map( vec![ - Utf8Array::::from_slice(["a6"]).boxed(), - Utf8Array::::from_slice(["b4"]).boxed(), + Utf8Array::::from_slice(["a2"]).boxed(), + Utf8Array::::from_slice(["b2"]).boxed(), ], names, )), From 44e1b89e076ecca074c503d6f156833ff70b2eec Mon Sep 17 00:00:00 2001 From: baishen Date: Sat, 25 Feb 2023 07:31:16 +0800 Subject: [PATCH 09/18] fix --- src/io/parquet/read/indexes/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/io/parquet/read/indexes/mod.rs b/src/io/parquet/read/indexes/mod.rs index 3dbb070fc9f..e040478e12c 100644 --- a/src/io/parquet/read/indexes/mod.rs +++ b/src/io/parquet/read/indexes/mod.rs @@ -207,7 +207,7 @@ fn deserialize( } } PhysicalType::Map => { - if let DataType::Map(inner) = data_type.to_logical_type() { + if let DataType::Map(inner, _) = data_type.to_logical_type() { deserialize(indexes, inner.data_type.clone()) } else { unreachable!() From acab611c5199ce13c14996e2fa5d1401e8df52ec Mon Sep 17 00:00:00 2001 From: baishen Date: Sat, 25 Feb 2023 07:58:45 +0800 Subject: [PATCH 10/18] fix --- tests/it/io/parquet/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/it/io/parquet/mod.rs b/tests/it/io/parquet/mod.rs index 9567afe5008..7bb6cf01a9e 100644 --- a/tests/it/io/parquet/mod.rs +++ b/tests/it/io/parquet/mod.rs @@ -1377,7 +1377,7 @@ pub fn pyarrow_map_statistics(column: &str) -> Statistics { max_value: Box::new(new_map( vec![ Utf8Array::::from_slice(["a2"]).boxed(), - Utf8Array::::from_slice(["b2"]).boxed(), + Utf8Array::::from_slice(["b1"]).boxed(), ], names, )), From b4c0f46c67a3f23feff24c07401e5fccaff9fd2b Mon Sep 17 00:00:00 2001 From: baishen Date: Sat, 25 Feb 2023 09:29:11 +0800 Subject: [PATCH 11/18] fix map statistics test --- tests/it/io/parquet/mod.rs | 33 ++++++++++++++++++++------------- 1 file changed, 20 insertions(+), 13 deletions(-) diff --git a/tests/it/io/parquet/mod.rs b/tests/it/io/parquet/mod.rs index 7bb6cf01a9e..bd0ff01e9ec 100644 --- a/tests/it/io/parquet/mod.rs +++ b/tests/it/io/parquet/mod.rs @@ -1298,15 +1298,19 @@ pub fn pyarrow_map(column: &str) -> Box { } pub fn pyarrow_map_statistics(column: &str) -> Statistics { - let new_map = |arrays: Vec>, names: Vec| { - let fields = names + let new_map = |arrays: Vec>, fields: Vec| { + let fields = fields .into_iter() .zip(arrays.iter()) - .map(|(n, a)| Field::new(n, a.data_type().clone(), true)) + .map(|(f, a)| Field::new(f.name, a.data_type().clone(), f.is_nullable)) .collect::>(); MapArray::new( DataType::Map( - Box::new(Field::new("items", DataType::Struct(fields.clone()), false)), + Box::new(Field::new( + "entries", + DataType::Struct(fields.clone()), + false, + )), false, ), vec![0, arrays[0].len() as i32].try_into().unwrap(), @@ -1315,7 +1319,10 @@ pub fn pyarrow_map_statistics(column: &str) -> Statistics { ) }; - let names = vec!["key".to_string(), "value".to_string()]; + let fields = vec![ + Field::new("key", DataType::Utf8, false), + Field::new("value", DataType::Utf8, true), + ]; match column { "map" => Statistics { @@ -1324,7 +1331,7 @@ pub fn pyarrow_map_statistics(column: &str) -> Statistics { UInt64Array::from([None]).boxed(), UInt64Array::from([None]).boxed(), ], - names.clone(), + fields.clone(), ) .boxed(), null_count: new_map( @@ -1332,7 +1339,7 @@ pub fn pyarrow_map_statistics(column: &str) -> Statistics { UInt64Array::from([Some(0)]).boxed(), UInt64Array::from([Some(0)]).boxed(), ], - names.clone(), + fields.clone(), ) .boxed(), min_value: Box::new(new_map( @@ -1340,14 +1347,14 @@ pub fn pyarrow_map_statistics(column: &str) -> Statistics { Utf8Array::::from_slice(["a1"]).boxed(), Utf8Array::::from_slice(["b1"]).boxed(), ], - names.clone(), + fields.clone(), )), max_value: Box::new(new_map( vec![ Utf8Array::::from_slice(["a2"]).boxed(), Utf8Array::::from_slice(["b2"]).boxed(), ], - names, + fields, )), }, "map_nullable" => Statistics { @@ -1356,7 +1363,7 @@ pub fn pyarrow_map_statistics(column: &str) -> Statistics { UInt64Array::from([None]).boxed(), UInt64Array::from([None]).boxed(), ], - names.clone(), + fields.clone(), ) .boxed(), null_count: new_map( @@ -1364,7 +1371,7 @@ pub fn pyarrow_map_statistics(column: &str) -> Statistics { UInt64Array::from([Some(0)]).boxed(), UInt64Array::from([Some(1)]).boxed(), ], - names.clone(), + fields.clone(), ) .boxed(), min_value: Box::new(new_map( @@ -1372,14 +1379,14 @@ pub fn pyarrow_map_statistics(column: &str) -> Statistics { Utf8Array::::from_slice(["a1"]).boxed(), Utf8Array::::from_slice(["b1"]).boxed(), ], - names.clone(), + fields.clone(), )), max_value: Box::new(new_map( vec![ Utf8Array::::from_slice(["a2"]).boxed(), Utf8Array::::from_slice(["b1"]).boxed(), ], - names, + fields, )), }, _ => unreachable!(), From fafbd8866aaec0055c05c689ece3402e35b6806c Mon Sep 17 00:00:00 2001 From: baishen Date: Sat, 25 Feb 2023 23:27:49 +0800 Subject: [PATCH 12/18] make create_map public --- src/io/parquet/read/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/io/parquet/read/mod.rs b/src/io/parquet/read/mod.rs index c41d60e4c1e..1171b9cf018 100644 --- a/src/io/parquet/read/mod.rs +++ b/src/io/parquet/read/mod.rs @@ -35,8 +35,8 @@ pub use parquet2::{ use crate::{array::Array, error::Result}; pub use deserialize::{ - column_iter_to_arrays, create_list, get_page_iterator, init_nested, n_columns, InitNested, - NestedArrayIter, NestedState, StructIterator, + column_iter_to_arrays, create_list, create_map, get_page_iterator, init_nested, n_columns, + InitNested, NestedArrayIter, NestedState, StructIterator, }; pub use file::{FileReader, RowGroupReader}; pub use row_group::*; From 19988886ea68a4ce045d03f553c8260a964d3daf Mon Sep 17 00:00:00 2001 From: baishen Date: Sun, 26 Feb 2023 00:22:32 +0800 Subject: [PATCH 13/18] implement n_columns for Map, parquet_type add GroupConvertedType for List and Map --- src/io/parquet/read/deserialize/mod.rs | 8 ++++++++ src/io/parquet/write/schema.rs | 8 ++++---- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/src/io/parquet/read/deserialize/mod.rs b/src/io/parquet/read/deserialize/mod.rs index 18e1200b0a6..8dd55bb8772 100644 --- a/src/io/parquet/read/deserialize/mod.rs +++ b/src/io/parquet/read/deserialize/mod.rs @@ -174,6 +174,14 @@ pub fn n_columns(data_type: &DataType) -> usize { unreachable!() } } + Map => { + let a = data_type.to_logical_type(); + if let DataType::Map(inner, _) = a { + n_columns(&inner.data_type) + } else { + unreachable!() + } + } Struct => { if let DataType::Struct(fields) = data_type.to_logical_type() { fields.iter().map(|inner| n_columns(&inner.data_type)).sum() diff --git a/src/io/parquet/write/schema.rs b/src/io/parquet/write/schema.rs index 2001ee3e37f..d5008603bb5 100644 --- a/src/io/parquet/write/schema.rs +++ b/src/io/parquet/write/schema.rs @@ -3,8 +3,8 @@ use parquet2::{ metadata::KeyValue, schema::{ types::{ - GroupLogicalType, IntegerType, ParquetType, PhysicalType, PrimitiveConvertedType, - PrimitiveLogicalType, TimeUnit as ParquetTimeUnit, + GroupConvertedType, GroupLogicalType, IntegerType, ParquetType, PhysicalType, + PrimitiveConvertedType, PrimitiveLogicalType, TimeUnit as ParquetTimeUnit, }, Repetition, }, @@ -307,7 +307,7 @@ pub fn to_parquet_type(field: &Field) -> Result { Ok(ParquetType::from_group( name, repetition, - None, + Some(GroupConvertedType::List), Some(GroupLogicalType::List), vec![ParquetType::from_group( "list".to_string(), @@ -323,7 +323,7 @@ pub fn to_parquet_type(field: &Field) -> Result { DataType::Map(f, _) => Ok(ParquetType::from_group( name, repetition, - None, + Some(GroupConvertedType::Map), Some(GroupLogicalType::Map), vec![ParquetType::from_group( "map".to_string(), From 8b8f2fea4b92c15917492a8b274bc7c9bfa61ddd Mon Sep 17 00:00:00 2001 From: baishen Date: Sun, 26 Feb 2023 01:09:57 +0800 Subject: [PATCH 14/18] implement GrowableMap --- src/array/growable/map.rs | 113 ++++++++++++++++++++++++++++++++++++++ src/array/growable/mod.rs | 4 +- 2 files changed, 116 insertions(+), 1 deletion(-) create mode 100644 src/array/growable/map.rs diff --git a/src/array/growable/map.rs b/src/array/growable/map.rs new file mode 100644 index 00000000000..33cac8d83bd --- /dev/null +++ b/src/array/growable/map.rs @@ -0,0 +1,113 @@ +use std::sync::Arc; + +use crate::{ + array::{Array, MapArray}, + bitmap::MutableBitmap, + offset::{Offset, Offsets}, +}; + +use super::{ + make_growable, + utils::{build_extend_null_bits, ExtendNullBits}, + Growable, +}; + +fn extend_offset_values(growable: &mut GrowableMap<'_>, index: usize, start: usize, len: usize) { + let array = growable.arrays[index]; + let offsets = array.offsets(); + + growable + .offsets + .try_extend_from_slice(offsets, start, len) + .unwrap(); + + let end = offsets.buffer()[start + len] as usize; + let start = offsets.buffer()[start] as usize; + let len = end - start; + growable.values.extend(index, start, len); +} + +/// Concrete [`Growable`] for the [`MapArray`]. +pub struct GrowableMap<'a> { + arrays: Vec<&'a MapArray>, + validity: MutableBitmap, + values: Box + 'a>, + offsets: Offsets, + extend_null_bits: Vec>, +} + +impl<'a> GrowableMap<'a> { + /// Creates a new [`GrowableMap`] bound to `arrays` with a pre-allocated `capacity`. + /// # Panics + /// If `arrays` is empty. + pub fn new(arrays: Vec<&'a MapArray>, mut use_validity: bool, capacity: usize) -> Self { + // if any of the arrays has nulls, insertions from any array requires setting bits + // as there is at least one array with nulls. + if !use_validity & arrays.iter().any(|array| array.null_count() > 0) { + use_validity = true; + }; + + let extend_null_bits = arrays + .iter() + .map(|array| build_extend_null_bits(*array, use_validity)) + .collect(); + + let inner = arrays + .iter() + .map(|array| array.field().as_ref()) + .collect::>(); + let values = make_growable(&inner, use_validity, 0); + + Self { + arrays, + offsets: Offsets::with_capacity(capacity), + values, + validity: MutableBitmap::with_capacity(capacity), + extend_null_bits, + } + } + + fn to(&mut self) -> MapArray { + let validity = std::mem::take(&mut self.validity); + let offsets = std::mem::take(&mut self.offsets); + let values = self.values.as_box(); + + MapArray::new( + self.arrays[0].data_type().clone(), + offsets.into(), + values, + validity.into(), + ) + } +} + +impl<'a> Growable<'a> for GrowableMap<'a> { + fn extend(&mut self, index: usize, start: usize, len: usize) { + (self.extend_null_bits[index])(&mut self.validity, start, len); + extend_offset_values(self, index, start, len); + } + + fn extend_validity(&mut self, additional: usize) { + self.offsets.extend_constant(additional); + self.validity.extend_constant(additional, false); + } + + #[inline] + fn len(&self) -> usize { + self.offsets.len() - 1 + } + + fn as_arc(&mut self) -> Arc { + Arc::new(self.to()) + } + + fn as_box(&mut self) -> Box { + Box::new(self.to()) + } +} + +impl<'a> From> for MapArray { + fn from(mut val: GrowableMap<'a>) -> Self { + val.to() + } +} diff --git a/src/array/growable/mod.rs b/src/array/growable/mod.rs index 9ac97d42578..45f79405307 100644 --- a/src/array/growable/mod.rs +++ b/src/array/growable/mod.rs @@ -19,6 +19,8 @@ mod primitive; pub use primitive::GrowablePrimitive; mod list; pub use list::GrowableList; +mod map; +pub use map::GrowableMap; mod structure; pub use structure::GrowableStruct; mod fixed_size_list; @@ -123,7 +125,7 @@ pub fn make_growable<'a>( .collect::>(); Box::new(union::GrowableUnion::new(arrays, capacity)) } - Map => todo!(), + Map => dyn_growable!(map::GrowableMap, arrays, use_validity, capacity), Dictionary(key_type) => { match_integer_type!(key_type, |$T| { let arrays = arrays From 2a4cd77fb7058736160d55a2672d2d40f3b1fdc8 Mon Sep 17 00:00:00 2001 From: baishen Date: Sun, 26 Feb 2023 13:34:15 +0800 Subject: [PATCH 15/18] add tests for GrowableMap --- src/array/growable/map.rs | 2 +- tests/it/array/growable/map.rs | 116 +++++++++++++++++++++++++++++++++ tests/it/array/growable/mod.rs | 1 + 3 files changed, 118 insertions(+), 1 deletion(-) create mode 100644 tests/it/array/growable/map.rs diff --git a/src/array/growable/map.rs b/src/array/growable/map.rs index 33cac8d83bd..0919b4821ba 100644 --- a/src/array/growable/map.rs +++ b/src/array/growable/map.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use crate::{ array::{Array, MapArray}, bitmap::MutableBitmap, - offset::{Offset, Offsets}, + offset::Offsets, }; use super::{ diff --git a/tests/it/array/growable/map.rs b/tests/it/array/growable/map.rs new file mode 100644 index 00000000000..1af1fd7a59a --- /dev/null +++ b/tests/it/array/growable/map.rs @@ -0,0 +1,116 @@ +use arrow2::{ + array::{ + growable::{Growable, GrowableMap}, + Array, MapArray, PrimitiveArray, StructArray, Utf8Array, + }, + bitmap::Bitmap, + datatypes::{DataType, Field}, + offset::OffsetsBuffer, +}; + +fn some_values() -> (DataType, Vec>) { + let strings: Box = Box::new(Utf8Array::::from([ + Some("a"), + Some("aa"), + Some("bc"), + Some("mark"), + Some("doe"), + Some("xyz"), + ])); + let ints: Box = Box::new(PrimitiveArray::::from(&[ + Some(1), + Some(2), + Some(3), + Some(4), + Some(5), + Some(6), + ])); + let fields = vec![ + Field::new("key", DataType::Utf8, true), + Field::new("val", DataType::Int32, true), + ]; + (DataType::Struct(fields), vec![strings, ints]) +} + +#[test] +fn basic() { + let (fields, values) = some_values(); + + let kv_array = StructArray::new(fields.clone(), values, None).boxed(); + let kv_field = Field::new("kv", fields.clone(), false); + let data_type = DataType::Map(Box::new(kv_field), false); + let offsets = OffsetsBuffer::try_from(vec![0, 1, 2, 4, 6]).unwrap(); + + let array = MapArray::new(data_type.clone(), offsets.clone(), kv_array.clone(), None); + + let mut a = GrowableMap::new(vec![&array], false, 0); + + a.extend(0, 1, 2); + assert_eq!(a.len(), 2); + let result: MapArray = a.into(); + + let kv_array = kv_array.sliced(1, 4); + let offsets = OffsetsBuffer::try_from(vec![0, 1, 3]).unwrap(); + let expected = MapArray::new(data_type, offsets, kv_array, None); + + assert_eq!(result, expected) +} + +#[test] +fn offset() { + let (fields, values) = some_values(); + + let kv_array = StructArray::new(fields.clone(), values, None).boxed(); + let kv_field = Field::new("kv", fields.clone(), false); + let data_type = DataType::Map(Box::new(kv_field), false); + let offsets = OffsetsBuffer::try_from(vec![0, 1, 2, 4, 6]).unwrap(); + + let array = + MapArray::new(data_type.clone(), offsets.clone(), kv_array.clone(), None).sliced(1, 3); + + let mut a = GrowableMap::new(vec![&array], false, 0); + + a.extend(0, 1, 2); + assert_eq!(a.len(), 2); + let result: MapArray = a.into(); + + let kv_array = kv_array.sliced(2, 4); + let offsets = OffsetsBuffer::try_from(vec![0, 2, 4]).unwrap(); + let expected = MapArray::new(data_type, offsets, kv_array, None); + + assert_eq!(result, expected) +} + +#[test] +fn nulls() { + let (fields, values) = some_values(); + + let kv_array = StructArray::new(fields.clone(), values, None).boxed(); + let kv_field = Field::new("kv", fields.clone(), false); + let data_type = DataType::Map(Box::new(kv_field), false); + let offsets = OffsetsBuffer::try_from(vec![0, 1, 2, 4, 6]).unwrap(); + + let array = MapArray::new( + data_type.clone(), + offsets.clone(), + kv_array.clone(), + Some(Bitmap::from_u8_slice([0b00000010], 4)), + ); + + let mut a = GrowableMap::new(vec![&array], false, 0); + + a.extend(0, 1, 2); + assert_eq!(a.len(), 2); + let result: MapArray = a.into(); + + let kv_array = kv_array.sliced(1, 4); + let offsets = OffsetsBuffer::try_from(vec![0, 1, 3]).unwrap(); + let expected = MapArray::new( + data_type, + offsets, + kv_array, + Some(Bitmap::from_u8_slice([0b00000010], 4).sliced(1, 2)), + ); + + assert_eq!(result, expected) +} diff --git a/tests/it/array/growable/mod.rs b/tests/it/array/growable/mod.rs index 480bcf06c31..d4b034a13e6 100644 --- a/tests/it/array/growable/mod.rs +++ b/tests/it/array/growable/mod.rs @@ -4,6 +4,7 @@ mod dictionary; mod fixed_binary; mod fixed_size_list; mod list; +mod map; mod null; mod primitive; mod struct_; From 493485cb3de600c14f59595230f27eedf03bfd0d Mon Sep 17 00:00:00 2001 From: baishen Date: Sun, 26 Feb 2023 13:38:20 +0800 Subject: [PATCH 16/18] fix clippy --- tests/it/array/growable/map.rs | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/tests/it/array/growable/map.rs b/tests/it/array/growable/map.rs index 1af1fd7a59a..de9069f68bc 100644 --- a/tests/it/array/growable/map.rs +++ b/tests/it/array/growable/map.rs @@ -37,11 +37,11 @@ fn basic() { let (fields, values) = some_values(); let kv_array = StructArray::new(fields.clone(), values, None).boxed(); - let kv_field = Field::new("kv", fields.clone(), false); + let kv_field = Field::new("kv", fields, false); let data_type = DataType::Map(Box::new(kv_field), false); let offsets = OffsetsBuffer::try_from(vec![0, 1, 2, 4, 6]).unwrap(); - let array = MapArray::new(data_type.clone(), offsets.clone(), kv_array.clone(), None); + let array = MapArray::new(data_type.clone(), offsets, kv_array.clone(), None); let mut a = GrowableMap::new(vec![&array], false, 0); @@ -61,12 +61,11 @@ fn offset() { let (fields, values) = some_values(); let kv_array = StructArray::new(fields.clone(), values, None).boxed(); - let kv_field = Field::new("kv", fields.clone(), false); + let kv_field = Field::new("kv", fields, false); let data_type = DataType::Map(Box::new(kv_field), false); let offsets = OffsetsBuffer::try_from(vec![0, 1, 2, 4, 6]).unwrap(); - let array = - MapArray::new(data_type.clone(), offsets.clone(), kv_array.clone(), None).sliced(1, 3); + let array = MapArray::new(data_type.clone(), offsets, kv_array.clone(), None).sliced(1, 3); let mut a = GrowableMap::new(vec![&array], false, 0); @@ -86,13 +85,13 @@ fn nulls() { let (fields, values) = some_values(); let kv_array = StructArray::new(fields.clone(), values, None).boxed(); - let kv_field = Field::new("kv", fields.clone(), false); + let kv_field = Field::new("kv", fields, false); let data_type = DataType::Map(Box::new(kv_field), false); let offsets = OffsetsBuffer::try_from(vec![0, 1, 2, 4, 6]).unwrap(); let array = MapArray::new( data_type.clone(), - offsets.clone(), + offsets, kv_array.clone(), Some(Bitmap::from_u8_slice([0b00000010], 4)), ); From 61024411b4f18404d6710118f5d4f21d8c017424 Mon Sep 17 00:00:00 2001 From: baishen Date: Mon, 27 Feb 2023 11:57:06 +0800 Subject: [PATCH 17/18] add map array fmt --- src/array/fmt.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/array/fmt.rs b/src/array/fmt.rs index 36cb2f4e456..4f2c6896beb 100644 --- a/src/array/fmt.rs +++ b/src/array/fmt.rs @@ -88,7 +88,9 @@ pub fn get_value_display<'a, F: Write + 'a>( Union => Box::new(move |f, index| { super::union::fmt::write_value(array.as_any().downcast_ref().unwrap(), index, null, f) }), - Map => todo!(), + Map => Box::new(move |f, index| { + super::map::fmt::write_value(array.as_any().downcast_ref().unwrap(), index, null, f) + }), Dictionary(key_type) => match_integer_type!(key_type, |$T| { Box::new(move |f, index| { super::dictionary::fmt::write_value::<$T,_>(array.as_any().downcast_ref().unwrap(), index, null, f) From c6cc76f9438480581ab9c3331456a47cd468f9e8 Mon Sep 17 00:00:00 2001 From: baishen Date: Mon, 27 Feb 2023 12:07:07 +0800 Subject: [PATCH 18/18] fix --- src/array/map/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/array/map/mod.rs b/src/array/map/mod.rs index 434a14233c9..150ebbeeb7b 100644 --- a/src/array/map/mod.rs +++ b/src/array/map/mod.rs @@ -8,7 +8,7 @@ use crate::{ use super::{new_empty_array, specification::try_check_offsets_bounds, Array}; mod ffi; -mod fmt; +pub(super) mod fmt; mod iterator; pub use iterator::*;