From 9030f3a0b1acb34283e52532e6f3fd8d94f16d11 Mon Sep 17 00:00:00 2001 From: Adam Reeve Date: Fri, 20 Dec 2024 13:31:34 +1300 Subject: [PATCH] Make encryption an optional feature --- parquet/Cargo.toml | 5 ++- parquet/src/arrow/arrow_reader/mod.rs | 44 +++++++++++++++------ parquet/src/column/writer/mod.rs | 4 ++ parquet/src/errors.rs | 2 +- parquet/src/file/footer.rs | 9 ++++- parquet/src/file/metadata/mod.rs | 13 +++++- parquet/src/file/metadata/reader.rs | 20 +++++++--- parquet/src/file/serialized_reader.rs | 57 ++++++++++++++++++--------- parquet/src/file/writer.rs | 1 + parquet/src/lib.rs | 2 +- 10 files changed, 114 insertions(+), 43 deletions(-) diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml index 20caadfb168e..d3893bd9d447 100644 --- a/parquet/Cargo.toml +++ b/parquet/Cargo.toml @@ -69,7 +69,7 @@ paste = { version = "1.0" } half = { version = "2.1", default-features = false, features = ["num-traits"] } sysinfo = { version = "0.32.0", optional = true, default-features = false, features = ["system"] } crc32fast = { version = "1.4.2", optional = true, default-features = false } -ring = { version = "0.17", default-features = false, features = ["std"]} +ring = { version = "0.17", default-features = false, features = ["std"], optional = true } [dev-dependencies] base64 = { version = "0.22", default-features = false, features = ["std"] } @@ -121,7 +121,8 @@ zstd = ["dep:zstd", "zstd-sys"] sysinfo = ["dep:sysinfo"] # Verify 32-bit CRC checksum when decoding parquet pages crc = ["dep:crc32fast"] -#encryption = ["aes-gcm", "base64"] +# Enable Parquet modular encryption support +encryption = ["dep:ring"] [[example]] diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index a72620388011..4c3b54637cf4 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -41,6 +41,7 @@ mod filter; mod selection; pub mod statistics; +#[cfg(feature = "encryption")] use crate::encryption::ciphers::{CryptoContext, FileDecryptionProperties}; /// Builder for constructing parquet readers into arrow. @@ -383,12 +384,14 @@ impl ArrowReaderMetadata { pub fn load( reader: &T, options: ArrowReaderOptions, - file_decryption_properties: Option<&FileDecryptionProperties>, + #[cfg(feature = "encryption")] file_decryption_properties: Option< + &FileDecryptionProperties, + >, ) -> Result { - let metadata = ParquetMetaDataReader::new() - .with_page_indexes(options.page_index) - .with_encryption_properties(file_decryption_properties) - .parse_and_finish(reader)?; + let metadata = ParquetMetaDataReader::new().with_page_indexes(options.page_index); + #[cfg(feature = "encryption")] + let metadata = metadata.with_encryption_properties(file_decryption_properties); + let metadata = metadata.parse_and_finish(reader)?; Self::try_new(Arc::new(metadata), options) } @@ -534,11 +537,17 @@ impl ParquetRecordBatchReaderBuilder { /// Create a new [`ParquetRecordBatchReaderBuilder`] with [`ArrowReaderOptions`] pub fn try_new_with_options(reader: T, options: ArrowReaderOptions) -> Result { - let metadata = ArrowReaderMetadata::load(&reader, options, None)?; + let metadata = ArrowReaderMetadata::load( + &reader, + options, + #[cfg(feature = "encryption")] + None, + )?; Ok(Self::new_with_metadata(reader, metadata)) } /// Create a new [`ParquetRecordBatchReaderBuilder`] with [`ArrowReaderOptions`] and [`FileDecryptionProperties`] + #[cfg(feature = "encryption")] pub fn try_new_with_decryption( reader: T, options: ArrowReaderOptions, @@ -694,6 +703,7 @@ impl Iterator for ReaderPageIterator { let total_rows = rg.num_rows() as usize; let reader = self.reader.clone(); + #[cfg(feature = "encryption")] let crypto_context = if self.metadata.file_decryptor().is_some() { let file_decryptor = Arc::new(self.metadata.file_decryptor().clone().unwrap()); @@ -708,8 +718,14 @@ impl Iterator for ReaderPageIterator { None }; - let ret = - SerializedPageReader::new(reader, meta, total_rows, page_locations, crypto_context); + let ret = SerializedPageReader::new( + reader, + meta, + total_rows, + page_locations, + #[cfg(feature = "encryption")] + crypto_context, + ); Some(ret.map(|x| Box::new(x) as _)) } } @@ -824,6 +840,7 @@ impl ParquetRecordBatchReader { /// /// Note: this is needed when the parquet file is encrypted // todo: add options or put file_decryption_properties into options + #[cfg(feature = "encryption")] pub fn try_new_with_decryption( reader: T, batch_size: usize, @@ -993,10 +1010,11 @@ mod tests { }; use arrow_select::concat::concat_batches; + #[cfg(feature = "encryption")] + use crate::arrow::arrow_reader::ArrowReaderMetadata; use crate::arrow::arrow_reader::{ - ArrowPredicateFn, ArrowReaderBuilder, ArrowReaderMetadata, ArrowReaderOptions, - ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder, RowFilter, RowSelection, - RowSelector, + ArrowPredicateFn, ArrowReaderBuilder, ArrowReaderOptions, ParquetRecordBatchReader, + ParquetRecordBatchReaderBuilder, RowFilter, RowSelection, RowSelector, }; use crate::arrow::schema::add_encoded_arrow_schema_to_metadata; use crate::arrow::{ArrowWriter, ProjectionMask}; @@ -1006,6 +1024,7 @@ mod tests { BoolType, ByteArray, ByteArrayType, DataType, FixedLenByteArray, FixedLenByteArrayType, FloatType, Int32Type, Int64Type, Int96Type, }; + #[cfg(feature = "encryption")] use crate::encryption::ciphers; use crate::errors::Result; use crate::file::properties::{EnabledStatistics, WriterProperties, WriterVersion}; @@ -1716,6 +1735,7 @@ mod tests { } #[test] + #[cfg(feature = "encryption")] fn test_non_uniform_encryption_plaintext_footer() { let testdata = arrow::util::test_util::parquet_test_data(); let path = format!("{testdata}/encrypt_columns_plaintext_footer.parquet.encrypted"); @@ -1766,6 +1786,7 @@ mod tests { } #[test] + #[cfg(feature = "encryption")] fn test_non_uniform_encryption() { let testdata = arrow::util::test_util::parquet_test_data(); let path = format!("{testdata}/encrypt_columns_plaintext_footer.parquet.encrypted"); @@ -1797,6 +1818,7 @@ mod tests { } #[test] + #[cfg(feature = "encryption")] fn test_uniform_encryption() { let testdata = arrow::util::test_util::parquet_test_data(); let path = format!("{testdata}/uniform_encryption.parquet.encrypted"); diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs index 2c7976f3fb3f..4a9df593d6b2 100644 --- a/parquet/src/column/writer/mod.rs +++ b/parquet/src/column/writer/mod.rs @@ -2027,6 +2027,7 @@ mod tests { r.rows_written as usize, None, Arc::new(props), + #[cfg(feature = "encryption")] None, ) .unwrap(); @@ -2080,6 +2081,7 @@ mod tests { r.rows_written as usize, None, Arc::new(props), + #[cfg(feature = "encryption")] None, ) .unwrap(); @@ -2216,6 +2218,7 @@ mod tests { r.rows_written as usize, None, Arc::new(props), + #[cfg(feature = "encryption")] None, ) .unwrap(), @@ -3487,6 +3490,7 @@ mod tests { result.rows_written as usize, None, Arc::new(props), + #[cfg(feature = "encryption")] None, ) .unwrap(), diff --git a/parquet/src/errors.rs b/parquet/src/errors.rs index c2ab0937246c..01a30aee36ea 100644 --- a/parquet/src/errors.rs +++ b/parquet/src/errors.rs @@ -120,7 +120,7 @@ impl From for ParquetError { } } -//#[cfg(feature = "encryption")] +#[cfg(feature = "encryption")] impl From for ParquetError { fn from(e: ring::error::Unspecified) -> ParquetError { ParquetError::External(Box::new(e)) diff --git a/parquet/src/file/footer.rs b/parquet/src/file/footer.rs index af34fafb2e81..21f909d505b2 100644 --- a/parquet/src/file/footer.rs +++ b/parquet/src/file/footer.rs @@ -17,6 +17,7 @@ //! Module for working with Parquet file footers. +#[cfg(feature = "encryption")] use crate::encryption::ciphers::FileDecryptionProperties; use crate::errors::Result; use crate::file::{metadata::*, reader::ChunkReader, FOOTER_SIZE}; @@ -60,9 +61,13 @@ pub fn parse_metadata(chunk_reader: &R) -> Result, + #[cfg(feature = "encryption")] file_decryption_properties: Option<&FileDecryptionProperties>, ) -> Result { - ParquetMetaDataReader::decode_metadata(buf, file_decryption_properties) + ParquetMetaDataReader::decode_metadata( + buf, + #[cfg(feature = "encryption")] + file_decryption_properties, + ) } /// Decodes the Parquet footer returning the metadata length in bytes diff --git a/parquet/src/file/metadata/mod.rs b/parquet/src/file/metadata/mod.rs index 0b0e8f5134d6..dce15a8c1b26 100644 --- a/parquet/src/file/metadata/mod.rs +++ b/parquet/src/file/metadata/mod.rs @@ -104,6 +104,7 @@ use crate::format::{ }; use crate::basic::{ColumnOrder, Compression, Encoding, Type}; +#[cfg(feature = "encryption")] use crate::encryption::ciphers::FileDecryptor; use crate::errors::{ParquetError, Result}; pub(crate) use crate::file::metadata::memory::HeapSize; @@ -176,6 +177,7 @@ pub struct ParquetMetaData { /// Offset index for each page in each column chunk offset_index: Option, /// Optional file decryptor + #[cfg(feature = "encryption")] file_decryptor: Option, } @@ -185,11 +187,12 @@ impl ParquetMetaData { pub fn new( file_metadata: FileMetaData, row_groups: Vec, - file_decryptor: Option, + #[cfg(feature = "encryption")] file_decryptor: Option, ) -> Self { ParquetMetaData { file_metadata, row_groups, + #[cfg(feature = "encryption")] file_decryptor, column_index: None, offset_index: None, @@ -223,6 +226,7 @@ impl ParquetMetaData { } /// Returns file decryptor as reference. + #[cfg(feature = "encryption")] pub fn file_decryptor(&self) -> &Option { &self.file_decryptor } @@ -350,7 +354,12 @@ pub struct ParquetMetaDataBuilder(ParquetMetaData); impl ParquetMetaDataBuilder { /// Create a new builder from a file metadata, with no row groups pub fn new(file_meta_data: FileMetaData) -> Self { - Self(ParquetMetaData::new(file_meta_data, vec![], None)) + Self(ParquetMetaData::new( + file_meta_data, + vec![], + #[cfg(feature = "encryption")] + None, + )) } /// Create a new builder from an existing ParquetMetaData diff --git a/parquet/src/file/metadata/reader.rs b/parquet/src/file/metadata/reader.rs index ae8661f73278..e080e50f181b 100644 --- a/parquet/src/file/metadata/reader.rs +++ b/parquet/src/file/metadata/reader.rs @@ -20,6 +20,7 @@ use std::{io::Read, ops::Range, sync::Arc}; use bytes::Bytes; use crate::basic::ColumnOrder; +#[cfg(feature = "encryption")] use crate::encryption::ciphers::{ create_footer_aad, BlockDecryptor, FileDecryptionProperties, FileDecryptor, }; @@ -29,10 +30,9 @@ use crate::file::page_index::index::Index; use crate::file::page_index::index_reader::{acc_range, decode_column_index, decode_offset_index}; use crate::file::reader::ChunkReader; use crate::file::{FOOTER_SIZE, PARQUET_MAGIC, PARQUET_MAGIC_ENCR_FOOTER}; -use crate::format::{ - ColumnOrder as TColumnOrder, EncryptionAlgorithm, FileCryptoMetaData as TFileCryptoMetaData, - FileMetaData as TFileMetaData, -}; +use crate::format::{ColumnOrder as TColumnOrder, FileMetaData as TFileMetaData}; +#[cfg(feature = "encryption")] +use crate::format::{EncryptionAlgorithm, FileCryptoMetaData as TFileCryptoMetaData}; use crate::schema::types; use crate::schema::types::SchemaDescriptor; use crate::thrift::{TCompactSliceInputProtocol, TSerializable}; @@ -74,6 +74,7 @@ pub struct ParquetMetaDataReader { // Size of the serialized thrift metadata plus the 8 byte footer. Only set if // `self.parse_metadata` is called. metadata_size: Option, + #[cfg(feature = "encryption")] file_decryption_properties: Option, } @@ -136,6 +137,7 @@ impl ParquetMetaDataReader { /// Provide the [`FileDecryptionProperties`] to use when decrypting the file. /// /// This is only necessary when the file is encrypted. + #[cfg(feature = "encryption")] pub fn with_encryption_properties( mut self, properties: Option<&FileDecryptionProperties>, @@ -532,6 +534,7 @@ impl ParquetMetaDataReader { let start = file_size - footer_metadata_len as u64; Self::decode_metadata( chunk_reader.get_bytes(start, metadata_len)?.as_ref(), + #[cfg(feature = "encryption")] self.file_decryption_properties.as_ref(), ) } @@ -639,12 +642,18 @@ impl ParquetMetaDataReader { /// [Parquet Spec]: https://github.com/apache/parquet-format#metadata pub fn decode_metadata( buf: &[u8], - file_decryption_properties: Option<&FileDecryptionProperties>, + #[cfg(feature = "encryption")] file_decryption_properties: Option< + &FileDecryptionProperties, + >, ) -> Result { let mut prot = TCompactSliceInputProtocol::new(buf); + + #[cfg(feature = "encryption")] let mut file_decryptor = None; + #[cfg(feature = "encryption")] let decrypted_fmd_buf; + #[cfg(feature = "encryption")] if file_decryption_properties.is_some() && file_decryption_properties.unwrap().has_footer_key() { @@ -708,6 +717,7 @@ impl ParquetMetaDataReader { Ok(ParquetMetaData::new( file_metadata, row_groups, + #[cfg(feature = "encryption")] file_decryptor, )) } diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs index 8c8c3615c83e..cc27b24fb15f 100644 --- a/parquet/src/file/serialized_reader.rs +++ b/parquet/src/file/serialized_reader.rs @@ -22,6 +22,7 @@ use crate::basic::{Encoding, Type}; use crate::bloom_filter::Sbbf; use crate::column::page::{Page, PageMetadata, PageReader}; use crate::compression::{create_codec, Codec}; +#[cfg(feature = "encryption")] use crate::encryption::ciphers::{create_page_aad, BlockDecryptor, CryptoContext, ModuleType}; use crate::errors::{ParquetError, Result}; use crate::file::page_index::offset_index::OffsetIndexMetaData; @@ -324,6 +325,7 @@ impl RowGroupReader for SerializedRowGroupReader<'_, R self.metadata.num_rows() as usize, page_locations, props, + #[cfg(feature = "encryption")] None, )?)) } @@ -341,8 +343,9 @@ impl RowGroupReader for SerializedRowGroupReader<'_, R /// Reads a [`PageHeader`] from the provided [`Read`] pub(crate) fn read_page_header( input: &mut T, - crypto_context: Option>, + #[cfg(feature = "encryption")] crypto_context: Option>, ) -> Result { + #[cfg(feature = "encryption")] if let Some(crypto_context) = crypto_context { let decryptor = &crypto_context.data_decryptor(); // todo: get column decryptor @@ -383,7 +386,7 @@ pub(crate) fn read_page_header( /// Reads a [`PageHeader`] from the provided [`Read`] returning the number of bytes read fn read_page_header_len( input: &mut T, - crypto_context: Option>, + #[cfg(feature = "encryption")] crypto_context: Option>, ) -> Result<(usize, PageHeader)> { /// A wrapper around a [`std::io::Read`] that keeps track of the bytes read struct TrackedRead { @@ -403,7 +406,11 @@ fn read_page_header_len( inner: input, bytes_read: 0, }; - let header = read_page_header(&mut tracked, crypto_context)?; + let header = read_page_header( + &mut tracked, + #[cfg(feature = "encryption")] + crypto_context, + )?; Ok((tracked.bytes_read, header)) } @@ -413,7 +420,7 @@ pub(crate) fn decode_page( buffer: Bytes, physical_type: Type, decompressor: Option<&mut Box>, - crypto_context: Option>, + #[cfg(feature = "encryption")] crypto_context: Option>, ) -> Result { // Verify the 32-bit CRC checksum of the page #[cfg(feature = "crc")] @@ -440,6 +447,7 @@ pub(crate) fn decode_page( can_decompress = header_v2.is_compressed.unwrap_or(true); } + #[cfg(feature = "encryption")] let buffer: Bytes = if crypto_context.is_some() { let crypto_context = crypto_context.as_ref().unwrap(); let decryptor = crypto_context.data_decryptor(); @@ -584,6 +592,7 @@ pub struct SerializedPageReader { state: SerializedPageReaderState, /// Crypto context + #[cfg(feature = "encryption")] crypto_context: Option>, } @@ -594,7 +603,7 @@ impl SerializedPageReader { meta: &ColumnChunkMetaData, total_rows: usize, page_locations: Option>, - crypto_context: Option>, + #[cfg(feature = "encryption")] crypto_context: Option>, ) -> Result { let props = Arc::new(ReaderProperties::builder().build()); SerializedPageReader::new_with_properties( @@ -603,6 +612,7 @@ impl SerializedPageReader { total_rows, page_locations, props, + #[cfg(feature = "encryption")] crypto_context, ) } @@ -614,7 +624,7 @@ impl SerializedPageReader { total_rows: usize, page_locations: Option>, props: ReaderPropertiesPtr, - crypto_context: Option>, + #[cfg(feature = "encryption")] crypto_context: Option>, ) -> Result { let decompressor = create_codec(meta.compression(), props.codec_options())?; let (start, len) = meta.byte_range(); @@ -644,21 +654,13 @@ impl SerializedPageReader { require_dictionary: meta.dictionary_page_offset().is_some(), }, }; - if crypto_context.is_some() { - return Ok(Self { - reader, - decompressor, - state, - physical_type: meta.column_type(), - crypto_context, - }); - } Ok(Self { reader, decompressor, state, physical_type: meta.column_type(), - crypto_context: None, + #[cfg(feature = "encryption")] + crypto_context, }) } } @@ -690,12 +692,17 @@ impl PageReader for SerializedPageReader { let header = if let Some(header) = next_page_header.take() { *header } else { + #[cfg(feature = "encryption")] let crypto_context = page_crypto_context( &self.crypto_context, *page_ordinal, *require_dictionary, )?; - let (header_len, header) = read_page_header_len(&mut read, crypto_context)?; + let (header_len, header) = read_page_header_len( + &mut read, + #[cfg(feature = "encryption")] + crypto_context, + )?; *offset += header_len; *remaining -= header_len; header @@ -719,6 +726,7 @@ impl PageReader for SerializedPageReader { )); } + #[cfg(feature = "encryption")] let crypto_context = page_crypto_context( &self.crypto_context, *page_ordinal, @@ -729,6 +737,7 @@ impl PageReader for SerializedPageReader { Bytes::from(buffer), self.physical_type, self.decompressor.as_mut(), + #[cfg(feature = "encryption")] crypto_context, )?; if page.is_data_page() { @@ -765,6 +774,7 @@ impl PageReader for SerializedPageReader { bytes, self.physical_type, self.decompressor.as_mut(), + #[cfg(feature = "encryption")] None, )? } @@ -796,7 +806,11 @@ impl PageReader for SerializedPageReader { } } else { let mut read = self.reader.get_read(*offset as u64)?; - let (header_len, header) = read_page_header_len(&mut read, None)?; + let (header_len, header) = read_page_header_len( + &mut read, + #[cfg(feature = "encryption")] + None, + )?; *offset += header_len; *remaining_bytes -= header_len; let page_meta = if let Ok(page_meta) = (&header).try_into() { @@ -853,7 +867,11 @@ impl PageReader for SerializedPageReader { *remaining_bytes -= buffered_header.compressed_page_size as usize; } else { let mut read = self.reader.get_read(*offset as u64)?; - let (header_len, header) = read_page_header_len(&mut read, None)?; + let (header_len, header) = read_page_header_len( + &mut read, + #[cfg(feature = "encryption")] + None, + )?; let data_page_size = header.compressed_page_size as usize; *offset += header_len + data_page_size; *remaining_bytes -= header_len + data_page_size; @@ -876,6 +894,7 @@ impl PageReader for SerializedPageReader { } } +#[cfg(feature = "encryption")] fn page_crypto_context( crypto_context: &Option>, page_ordinal: usize, diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs index 6011795a93be..fd62e5c14f24 100644 --- a/parquet/src/file/writer.rs +++ b/parquet/src/file/writer.rs @@ -1314,6 +1314,7 @@ mod tests { total_num_values as usize, None, Arc::new(props), + #[cfg(feature = "encryption")] None, ) .unwrap(); diff --git a/parquet/src/lib.rs b/parquet/src/lib.rs index e32ae1aea147..483fc1c4b49c 100644 --- a/parquet/src/lib.rs +++ b/parquet/src/lib.rs @@ -138,7 +138,7 @@ experimental!(mod compression); experimental!(mod encodings); pub mod bloom_filter; -//#[cfg(feature = "encryption")] +#[cfg(feature = "encryption")] experimental!(mod encryption); pub mod file;