From 49d854a59ddb35a0c039325e2290972f07e8eaf4 Mon Sep 17 00:00:00 2001
From: Shaeq Ahmed
Date: Sun, 11 Sep 2022 17:43:25 +0300
Subject: [PATCH 1/5] Fix Avro 2 Arrow struct validity issue

---
 Cargo.toml                          | 2 +-
 src/io/avro/read/deserialize.rs     | 1 +
 src/io/avro/read/nested.rs          | 8 ++++++++
 src/io/parquet/write/dictionary.rs  | 2 +-
 src/io/parquet/write/mod.rs         | 2 +-
 src/io/parquet/write/pages.rs       | 2 +-
 tests/it/io/parquet/read_indexes.rs | 6 +++---
 7 files changed, 16 insertions(+), 7 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index c22c3351acb..25998a2226b 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -77,7 +77,7 @@ futures = { version = "0.3", optional = true }
 async-stream = { version = "0.3.2", optional = true }
 
 # parquet support
-parquet2 = { version = "0.16", optional = true, default_features = false, features = ["async"] }
+parquet2 = { git = "https://github.com/jorgecarleitao/parquet2.git", rev = "21a7f98f", optional = true, features = ["full"] }
 
 # avro support
 avro-schema = { version = "0.3", optional = true }
diff --git a/src/io/avro/read/deserialize.rs b/src/io/avro/read/deserialize.rs
index 9fa9f152544..5f535650140 100644
--- a/src/io/avro/read/deserialize.rs
+++ b/src/io/avro/read/deserialize.rs
@@ -160,6 +160,7 @@ fn deserialize_value<'a>(
                 let values = array.mut_values(index);
                 block = deserialize_item(values, *is_nullable, &field.schema, block)?;
             }
+            array.try_push_valid()?;
         }
         _ => match data_type.to_physical_type() {
             PhysicalType::Boolean => {
diff --git a/src/io/avro/read/nested.rs b/src/io/avro/read/nested.rs
index 981b6894f9b..649a702ff9c 100644
--- a/src/io/avro/read/nested.rs
+++ b/src/io/avro/read/nested.rs
@@ -230,6 +230,14 @@ impl DynMutableStructArray {
         self.values[field].as_mut()
     }
 
+    #[inline]
+    pub fn try_push_valid(&mut self) -> Result<()> {
+        if let Some(validity) = &mut self.validity {
+            validity.push(true)
+        }
+        Ok(())
+    }
+
     #[inline]
     fn push_null(&mut self) {
         self.values.iter_mut().for_each(|x| x.push_null());
diff --git a/src/io/parquet/write/dictionary.rs b/src/io/parquet/write/dictionary.rs
index 2abffd98a49..eb68820dba3 100644
--- a/src/io/parquet/write/dictionary.rs
+++ b/src/io/parquet/write/dictionary.rs
@@ -1,6 +1,6 @@
 use parquet2::{
     encoding::{hybrid_rle::encode_u32, Encoding},
-    page::{DictPage, EncodedPage},
+    page::{DictPage, Page as EncodedPage},
     schema::types::PrimitiveType,
     statistics::{serialize_statistics, ParquetStatistics},
     write::DynIter,
diff --git a/src/io/parquet/write/mod.rs b/src/io/parquet/write/mod.rs
index 0b692e9835f..549db70d7d8 100644
--- a/src/io/parquet/write/mod.rs
+++ b/src/io/parquet/write/mod.rs
@@ -38,7 +38,7 @@ pub use parquet2::{
     encoding::Encoding,
     fallible_streaming_iterator,
     metadata::{Descriptor, FileMetaData, KeyValue, SchemaDescriptor, ThriftFileMetaData},
-    page::{CompressedDataPage, CompressedPage, EncodedPage},
+    page::{CompressedDataPage, CompressedPage, Page as EncodedPage},
     schema::types::{FieldInfo, ParquetType, PhysicalType as ParquetPhysicalType},
     write::{
         compress, write_metadata_sidecar, Compressor, DynIter, DynStreamingIterator, RowGroupIter,
diff --git a/src/io/parquet/write/pages.rs b/src/io/parquet/write/pages.rs
index 0e8dcf3d69d..2b953269c45 100644
--- a/src/io/parquet/write/pages.rs
+++ b/src/io/parquet/write/pages.rs
@@ -1,5 +1,5 @@
 use parquet2::schema::types::{ParquetType, PrimitiveType as ParquetPrimitiveType};
-use parquet2::{page::EncodedPage, write::DynIter};
+use parquet2::{page::{Page as EncodedPage}, write::DynIter};
 
 use crate::array::{ListArray, Offset, StructArray};
 use crate::bitmap::Bitmap;
diff --git a/tests/it/io/parquet/read_indexes.rs b/tests/it/io/parquet/read_indexes.rs
index bfa5f2a3c3b..5c71210e59d 100644
--- a/tests/it/io/parquet/read_indexes.rs
+++ b/tests/it/io/parquet/read_indexes.rs
@@ -9,7 +9,7 @@
 fn pages(
     arrays: &[&dyn Array],
     encoding: Encoding,
-) -> Result<(Vec<EncodedPage>, Vec<EncodedPage>, Schema)> {
+) -> Result<(Vec<Page>, Vec<Page>, Schema)> {
     // create pages with different number of rows
     let array11 = PrimitiveArray::<i64>::from_slice([1, 2, 3, 4]);
     let array12 = PrimitiveArray::<i64>::from_slice([5]);
@@ -72,7 +72,7 @@ fn pages(
 
 /// Tests reading pages while skipping indexes
 fn read_with_indexes(
-    (pages1, pages2, schema): (Vec<EncodedPage>, Vec<EncodedPage>, Schema),
+    (pages1, pages2, schema): (Vec<Page>, Vec<Page>, Schema),
     expected: Box<dyn Array>,
 ) -> Result<()> {
     let options = WriteOptions {
@@ -81,7 +81,7 @@ fn read_with_indexes(
         version: Version::V1,
     };
 
-    let to_compressed = |pages: Vec<EncodedPage>| {
+    let to_compressed = |pages: Vec<Page>| {
         let encoded_pages = DynIter::new(pages.into_iter().map(Ok));
         let compressed_pages =
             Compressor::new(encoded_pages, options.compression, vec![]).map_err(Error::from);

From 063d7a3600fe697b3a534e6e0ccd641e7913e586 Mon Sep 17 00:00:00 2001
From: Shaeq Ahmed
Date: Tue, 13 Sep 2022 00:01:38 +0300
Subject: [PATCH 2/5] arrow use parquet Page instead of EncodedPage

---
 Cargo.toml                          |  2 +-
 src/io/parquet/write/dictionary.rs  | 10 +++++-----
 src/io/parquet/write/pages.rs       |  6 +++---
 tests/it/io/parquet/read_indexes.rs |  5 +----
 4 files changed, 10 insertions(+), 13 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 25998a2226b..21f92812d7b 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -77,7 +77,7 @@ futures = { version = "0.3", optional = true }
 async-stream = { version = "0.3.2", optional = true }
 
 # parquet support
-parquet2 = { git = "https://github.com/jorgecarleitao/parquet2.git", rev = "21a7f98f", optional = true, features = ["full"] }
+parquet2 = { git = "https://github.com/jorgecarleitao/parquet2.git", rev = "21a7f98f", optional = true, default_features = false, features = ["async"] }
 
 # avro support
 avro-schema = { version = "0.3", optional = true }
diff --git a/src/io/parquet/write/dictionary.rs b/src/io/parquet/write/dictionary.rs
index eb68820dba3..9f20d4692de 100644
--- a/src/io/parquet/write/dictionary.rs
+++ b/src/io/parquet/write/dictionary.rs
@@ -1,6 +1,6 @@
 use parquet2::{
     encoding::{hybrid_rle::encode_u32, Encoding},
-    page::{DictPage, Page as EncodedPage},
+    page::{DictPage, Page},
     schema::types::PrimitiveType,
     statistics::{serialize_statistics, ParquetStatistics},
     write::DynIter,
@@ -106,7 +106,7 @@ fn serialize_keys<K: DictionaryKey>(
     nested: &[Nested],
     statistics: ParquetStatistics,
     options: WriteOptions,
-) -> Result<EncodedPage> {
+) -> Result<Page> {
     let mut buffer = vec![];
 
     // parquet only accepts a single validity - we "&" the validities into a single one
@@ -142,7 +142,7 @@ fn serialize_keys<K: DictionaryKey>(
         options,
         Encoding::RleDictionary,
     )
-    .map(EncodedPage::Data)
+    .map(Page::Data)
 }
 
 macro_rules! dyn_prim {
@@ -162,7 +162,7 @@ pub fn array_to_pages<K: DictionaryKey>(
     nested: &[Nested],
     options: WriteOptions,
     encoding: Encoding,
-) -> Result<DynIter<'static, Result<EncodedPage>>> {
+) -> Result<DynIter<'static, Result<Page>>> {
     match encoding {
         Encoding::PlainDictionary | Encoding::RleDictionary => {
            // write DictPage
@@ -230,7 +230,7 @@ pub fn array_to_pages<K: DictionaryKey>(
             )))
         }
     };
-    let dict_page = EncodedPage::Dict(dict_page);
+    let dict_page = Page::Dict(dict_page);
 
     // write DataPage pointing to DictPage
     let data_page = serialize_keys(array, type_, nested, statistics, options)?;
diff --git a/src/io/parquet/write/pages.rs b/src/io/parquet/write/pages.rs
index 2b953269c45..d841ed0fae3 100644
--- a/src/io/parquet/write/pages.rs
+++ b/src/io/parquet/write/pages.rs
@@ -1,5 +1,5 @@
 use parquet2::schema::types::{ParquetType, PrimitiveType as ParquetPrimitiveType};
-use parquet2::{page::{Page as EncodedPage}, write::DynIter};
+use parquet2::{page::Page, write::DynIter};
 
 use crate::array::{ListArray, Offset, StructArray};
 use crate::bitmap::Bitmap;
@@ -193,13 +193,13 @@ fn to_parquet_leafs_recursive(type_: ParquetType, leafs: &mut Vec<ParquetPrimit
     }
 }
 
-/// Returns a vector of iterators of [`EncodedPage`], one per leaf column in the array
+/// Returns a vector of iterators of [`Page`], one per leaf column in the array
 pub fn array_to_columns<A: AsRef<dyn Array> + Send + Sync>(
     array: A,
     type_: ParquetType,
     options: WriteOptions,
     encoding: &[Encoding],
-) -> Result<Vec<DynIter<'static, Result<EncodedPage>>>> {
+) -> Result<Vec<DynIter<'static, Result<Page>>>> {
     let array = array.as_ref();
     let nested = to_nested(array, &type_)?;
diff --git a/tests/it/io/parquet/read_indexes.rs b/tests/it/io/parquet/read_indexes.rs
index 5c71210e59d..7806de1b1b0 100644
--- a/tests/it/io/parquet/read_indexes.rs
+++ b/tests/it/io/parquet/read_indexes.rs
@@ -6,10 +6,7 @@ use arrow2::io::parquet::read::indexes;
 use arrow2::{array::*, datatypes::*, error::Result, io::parquet::read::*, io::parquet::write::*};
 
 /// Returns 2 sets of pages with different the same number of rows distributed un-evenly
-fn pages(
-    arrays: &[&dyn Array],
-    encoding: Encoding,
-) -> Result<(Vec<Page>, Vec<Page>, Schema)> {
+fn pages(arrays: &[&dyn Array], encoding: Encoding) -> Result<(Vec<Page>, Vec<Page>, Schema)> {
     // create pages with different number of rows
     let array11 = PrimitiveArray::<i64>::from_slice([1, 2, 3, 4]);
     let array12 = PrimitiveArray::<i64>::from_slice([5]);

From 6c3cb5a45eea7918f87471310b9ac6ccf6a84ca8 Mon Sep 17 00:00:00 2001
From: Shaeq Ahmed
Date: Tue, 13 Sep 2022 00:18:46 +0300
Subject: [PATCH 3/5] remove extraneous changes

---
 Cargo.toml                          |  2 +-
 src/io/parquet/write/dictionary.rs  | 10 +++++-----
 src/io/parquet/write/mod.rs         |  2 +-
 src/io/parquet/write/pages.rs       |  6 +++---
 testing/arrow-testing               |  2 +-
 testing/parquet-testing             |  2 +-
 tests/it/io/parquet/read_indexes.rs |  9 ++++++---
 7 files changed, 18 insertions(+), 15 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 21f92812d7b..c22c3351acb 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -77,7 +77,7 @@ futures = { version = "0.3", optional = true }
 async-stream = { version = "0.3.2", optional = true }
 
 # parquet support
-parquet2 = { git = "https://github.com/jorgecarleitao/parquet2.git", rev = "21a7f98f", optional = true, default_features = false, features = ["async"] }
+parquet2 = { version = "0.16", optional = true, default_features = false, features = ["async"] }
 
 # avro support
 avro-schema = { version = "0.3", optional = true }
diff --git a/src/io/parquet/write/dictionary.rs b/src/io/parquet/write/dictionary.rs
index 9f20d4692de..2abffd98a49 100644
--- a/src/io/parquet/write/dictionary.rs
+++ b/src/io/parquet/write/dictionary.rs
@@ -1,6 +1,6 @@
 use parquet2::{
     encoding::{hybrid_rle::encode_u32, Encoding},
-    page::{DictPage, Page},
+    page::{DictPage, EncodedPage},
     schema::types::PrimitiveType,
     statistics::{serialize_statistics, ParquetStatistics},
     write::DynIter,
@@ -106,7 +106,7 @@ fn serialize_keys<K: DictionaryKey>(
     nested: &[Nested],
     statistics: ParquetStatistics,
     options: WriteOptions,
-) -> Result<Page> {
+) -> Result<EncodedPage> {
     let mut buffer = vec![];
 
     // parquet only accepts a single validity - we "&" the validities into a single one
@@ -142,7 +142,7 @@ fn serialize_keys<K: DictionaryKey>(
         options,
         Encoding::RleDictionary,
     )
-    .map(Page::Data)
+    .map(EncodedPage::Data)
 }
 
 macro_rules! dyn_prim {
@@ -162,7 +162,7 @@ pub fn array_to_pages<K: DictionaryKey>(
     nested: &[Nested],
     options: WriteOptions,
     encoding: Encoding,
-) -> Result<DynIter<'static, Result<Page>>> {
+) -> Result<DynIter<'static, Result<EncodedPage>>> {
     match encoding {
         Encoding::PlainDictionary | Encoding::RleDictionary => {
             // write DictPage
@@ -230,7 +230,7 @@ pub fn array_to_pages<K: DictionaryKey>(
             )))
         }
     };
-    let dict_page = Page::Dict(dict_page);
+    let dict_page = EncodedPage::Dict(dict_page);
 
     // write DataPage pointing to DictPage
     let data_page = serialize_keys(array, type_, nested, statistics, options)?;
diff --git a/src/io/parquet/write/mod.rs b/src/io/parquet/write/mod.rs
index 549db70d7d8..0b692e9835f 100644
--- a/src/io/parquet/write/mod.rs
+++ b/src/io/parquet/write/mod.rs
@@ -38,7 +38,7 @@ pub use parquet2::{
     encoding::Encoding,
     fallible_streaming_iterator,
     metadata::{Descriptor, FileMetaData, KeyValue, SchemaDescriptor, ThriftFileMetaData},
-    page::{CompressedDataPage, CompressedPage, Page as EncodedPage},
+    page::{CompressedDataPage, CompressedPage, EncodedPage},
     schema::types::{FieldInfo, ParquetType, PhysicalType as ParquetPhysicalType},
     write::{
         compress, write_metadata_sidecar, Compressor, DynIter, DynStreamingIterator, RowGroupIter,
diff --git a/src/io/parquet/write/pages.rs b/src/io/parquet/write/pages.rs
index d841ed0fae3..0e8dcf3d69d 100644
--- a/src/io/parquet/write/pages.rs
+++ b/src/io/parquet/write/pages.rs
@@ -1,5 +1,5 @@
 use parquet2::schema::types::{ParquetType, PrimitiveType as ParquetPrimitiveType};
-use parquet2::{page::Page, write::DynIter};
+use parquet2::{page::EncodedPage, write::DynIter};
 
 use crate::array::{ListArray, Offset, StructArray};
 use crate::bitmap::Bitmap;
@@ -193,13 +193,13 @@ fn to_parquet_leafs_recursive(type_: ParquetType, leafs: &mut Vec<ParquetPrimit
     }
 }
 
-/// Returns a vector of iterators of [`Page`], one per leaf column in the array
+/// Returns a vector of iterators of [`EncodedPage`], one per leaf column in the array
 pub fn array_to_columns<A: AsRef<dyn Array> + Send + Sync>(
     array: A,
     type_: ParquetType,
     options: WriteOptions,
     encoding: &[Encoding],
-) -> Result<Vec<DynIter<'static, Result<Page>>>> {
+) -> Result<Vec<DynIter<'static, Result<EncodedPage>>>> {
     let array = array.as_ref();
     let nested = to_nested(array, &type_)?;
diff --git a/testing/arrow-testing b/testing/arrow-testing
index e8ce32338f2..5bab2f264a2 160000
--- a/testing/arrow-testing
+++ b/testing/arrow-testing
@@ -1 +1 @@
-Subproject commit e8ce32338f2dfeca3a5126f7677bdee159604000
+Subproject commit 5bab2f264a23f5af68f69ea93d24ef1e8e77fc88
diff --git a/testing/parquet-testing b/testing/parquet-testing
index 8e7badc6a38..aafd3fc9df4 160000
--- a/testing/parquet-testing
+++ b/testing/parquet-testing
@@ -1 +1 @@
-Subproject commit 8e7badc6a3817a02e06d17b5d8ab6b6dc356e890
+Subproject commit aafd3fc9df431c2625a514fb46626e5614f1d199
diff --git a/tests/it/io/parquet/read_indexes.rs b/tests/it/io/parquet/read_indexes.rs
index 7806de1b1b0..bfa5f2a3c3b 100644
--- a/tests/it/io/parquet/read_indexes.rs
+++ b/tests/it/io/parquet/read_indexes.rs
@@ -6,7 +6,10 @@ use arrow2::io::parquet::read::indexes;
 use arrow2::{array::*, datatypes::*, error::Result, io::parquet::read::*, io::parquet::write::*};
 
 /// Returns 2 sets of pages with different the same number of rows distributed un-evenly
-fn pages(arrays: &[&dyn Array], encoding: Encoding) -> Result<(Vec<Page>, Vec<Page>, Schema)> {
+fn pages(
+    arrays: &[&dyn Array],
+    encoding: Encoding,
+) -> Result<(Vec<EncodedPage>, Vec<EncodedPage>, Schema)> {
     // create pages with different number of rows
     let array11 = PrimitiveArray::<i64>::from_slice([1, 2, 3, 4]);
     let array12 = PrimitiveArray::<i64>::from_slice([5]);
@@ -69,7 +72,7 @@ fn pages(arrays: &[&dyn Array], encoding: Encoding) -> Result<(Vec<Page>, Vec<P
 
 /// Tests reading pages while skipping indexes
 fn read_with_indexes(
-    (pages1, pages2, schema): (Vec<Page>, Vec<Page>, Schema),
+    (pages1, pages2, schema): (Vec<EncodedPage>, Vec<EncodedPage>, Schema),
     expected: Box<dyn Array>,
 ) -> Result<()> {
     let options = WriteOptions {
@@ -78,7 +81,7 @@ fn read_with_indexes(
         version: Version::V1,
     };
 
-    let to_compressed = |pages: Vec<Page>| {
+    let to_compressed = |pages: Vec<EncodedPage>| {
         let encoded_pages = DynIter::new(pages.into_iter().map(Ok));
         let compressed_pages =
             Compressor::new(encoded_pages, options.compression, vec![]).map_err(Error::from);

From 1a122b0ab7de2ba72d1f21efdb570fa9d0bfb985 Mon Sep 17 00:00:00 2001
From: Shaeq Ahmed
Date: Tue, 13 Sep 2022 00:24:14 +0300
Subject: [PATCH 4/5] remove submodules

---
 testing/arrow-testing   | 1 -
 testing/parquet-testing | 1 -
 2 files changed, 2 deletions(-)
 delete mode 160000 testing/arrow-testing
 delete mode 160000 testing/parquet-testing

diff --git a/testing/arrow-testing b/testing/arrow-testing
deleted file mode 160000
index 5bab2f264a2..00000000000
--- a/testing/arrow-testing
+++ /dev/null
@@ -1 +0,0 @@
-Subproject commit 5bab2f264a23f5af68f69ea93d24ef1e8e77fc88
diff --git a/testing/parquet-testing b/testing/parquet-testing
deleted file mode 160000
index aafd3fc9df4..00000000000
--- a/testing/parquet-testing
+++ /dev/null
@@ -1 +0,0 @@
-Subproject commit aafd3fc9df431c2625a514fb46626e5614f1d199

From 5ddb6ca9bfb7b0285dc1305aba41c504af55967c Mon Sep 17 00:00:00 2001
From: Shaeq Ahmed
Date: Tue, 13 Sep 2022 00:53:22 +0300
Subject: [PATCH 5/5] oil

---
 testing/arrow-testing   | 1 +
 testing/parquet-testing | 1 +
 2 files changed, 2 insertions(+)
 create mode 160000 testing/arrow-testing
 create mode 160000 testing/parquet-testing

diff --git a/testing/arrow-testing b/testing/arrow-testing
new file mode 160000
index 00000000000..e8ce32338f2
--- /dev/null
+++ b/testing/arrow-testing
@@ -0,0 +1 @@
+Subproject commit e8ce32338f2dfeca3a5126f7677bdee159604000
diff --git a/testing/parquet-testing b/testing/parquet-testing
new file mode 160000
index 00000000000..8e7badc6a38
--- /dev/null
+++ b/testing/parquet-testing
@@ -0,0 +1 @@
+Subproject commit 8e7badc6a3817a02e06d17b5d8ab6b6dc356e890