Skip to content

Commit

Permalink
work
Browse files Browse the repository at this point in the history
  • Loading branch information
rok committed Jan 20, 2025
1 parent 3e7646d commit eccf6a0
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 37 deletions.
11 changes: 8 additions & 3 deletions parquet/src/arrow/arrow_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -716,7 +716,12 @@ impl<T: ChunkReader + 'static> Iterator for ReaderPageIterator<T> {
.schema_descr()
.column(self.column_idx);

let file_decryptor = self.metadata.file_decryptor().clone().unwrap().get_column_decryptor(column_name.name().as_bytes());
let file_decryptor = self
.metadata
.file_decryptor()
.clone()
.unwrap()
.get_column_decryptor(column_name.name().as_bytes());
let data_decryptor = Arc::new(file_decryptor.clone());
let metadata_decryptor = Arc::new(file_decryptor.clone());

Expand Down Expand Up @@ -1880,8 +1885,8 @@ mod tests {

let decryption_properties = Some(
ciphers::FileDecryptionProperties::builder()
.with_column_key("float_field".as_bytes().to_vec(), column_1_key.to_vec())
.with_column_key("double_field".as_bytes().to_vec(), column_2_key.to_vec())
.with_column_key("double_field".as_bytes().to_vec(), column_1_key.to_vec())
.with_column_key("float_field".as_bytes().to_vec(), column_2_key.to_vec())
.build(),
);

Expand Down
8 changes: 6 additions & 2 deletions parquet/src/encryption/ciphers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,8 +241,12 @@ impl FileDecryptionProperties {
}
pub fn has_footer_key(&self) -> bool { self.footer_key.is_some() }

pub fn aad_prefix(&self) -> Option<&Vec<u8>> {
self.aad_prefix.as_ref()
pub fn has_column_keys(&self) -> bool {
self.column_keys.is_some()
}

pub fn aad_prefix(&self) -> Option<Vec<u8>> {
self.aad_prefix.clone()
}
}

Expand Down
28 changes: 13 additions & 15 deletions parquet/src/file/metadata/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,8 @@ use crate::file::page_index::index::Index;
use crate::file::page_index::offset_index::OffsetIndexMetaData;
use crate::file::statistics::{self, Statistics};
use crate::format::{
BoundaryOrder, ColumnChunk, ColumnCryptoMetaData, ColumnIndex, ColumnMetaData, OffsetIndex,
PageLocation, RowGroup, SizeStatistics, SortingColumn,
BoundaryOrder, ColumnChunk, ColumnCryptoMetaData, ColumnIndex, ColumnMetaData,
EncryptionAlgorithm, OffsetIndex, PageLocation, RowGroup, SizeStatistics, SortingColumn,
};
use crate::schema::types::{
ColumnDescPtr, ColumnDescriptor, ColumnPath, SchemaDescPtr, SchemaDescriptor,
Expand Down Expand Up @@ -646,7 +646,7 @@ impl RowGroupMetaData {
let total_byte_size = rg.total_byte_size;
let num_rows = rg.num_rows;
let mut columns = vec![];
for (i, (c, d)) in rg
for (i, (mut c, d)) in rg
.columns
.drain(0..)
.zip(schema_descr.columns())
Expand All @@ -657,27 +657,25 @@ impl RowGroupMetaData {
if let Some(ColumnCryptoMetaData::ENCRYPTIONWITHCOLUMNKEY(crypto_metadata)) =
c.crypto_metadata.clone()
{
if decryptor.is_none() {
if c.encrypted_column_metadata.is_none() {
cc = ColumnChunkMetaData::from_thrift(d.clone(), c)?;
} else {
let decryptor = decryptor.unwrap();
let column_name = crypto_metadata.path_in_schema.join(".");
if !decryptor.unwrap().has_column_key(&column_name.as_bytes()) {
if !decryptor.has_column_key(&column_name.as_bytes()) {
cc = ColumnChunkMetaData::from_thrift(d.clone(), c)?;
break;
}
let aad_file_unique = decryptor.unwrap().aad_file_unique();
let aad_prefix = decryptor
.unwrap()
.decryption_properties()
.aad_prefix()
.unwrap();

let column_decryptor = decryptor
.unwrap()
.get_column_decryptor(column_name.as_bytes())
.footer_decryptor()
.unwrap();

let aad_file_unique = decryptor.aad_file_unique();
let aad_prefix: Vec<u8> = decryptor
.decryption_properties()
.aad_prefix()
.unwrap_or_default();
let column_aad = create_module_aad(
[aad_prefix.as_slice(), aad_file_unique.as_slice()]
.concat()
Expand All @@ -688,12 +686,12 @@ impl RowGroupMetaData {
None,
)?;

let buf = c.encrypted_column_metadata.unwrap();
let buf = c.encrypted_column_metadata.clone().unwrap();
let decrypted_cc_buf =
column_decryptor.decrypt(buf.as_slice().as_ref(), column_aad.as_ref())?;

let mut prot = TCompactSliceInputProtocol::new(decrypted_cc_buf.as_slice());
let c = ColumnChunk::read_from_in_protocol(&mut prot)?;
c.meta_data = Some(ColumnMetaData::read_from_in_protocol(&mut prot)?);
cc = ColumnChunkMetaData::from_thrift(d.clone(), c)?;
}
} else {
Expand Down
31 changes: 16 additions & 15 deletions parquet/src/file/metadata/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -724,7 +724,6 @@ impl ParquetMetaDataReader {
if file_decryption_properties.is_none() {
return Err(general_err!("Parquet file has an encrypted footer but no decryption properties were provided"));
};
let file_decryption_properties = file_decryption_properties;

let t_file_crypto_metadata: TFileCryptoMetaData =
TFileCryptoMetaData::read_from_in_protocol(&mut prot)
Expand Down Expand Up @@ -762,18 +761,6 @@ impl ParquetMetaDataReader {
.map_err(|e| general_err!("Could not parse metadata: {}", e))?;
let schema = types::from_thrift(&t_file_metadata.schema)?;
let schema_descr = Arc::new(SchemaDescriptor::new(schema));
let mut row_groups = Vec::new();
// TODO: row group filtering
for rg in t_file_metadata.row_groups {
row_groups.push(RowGroupMetaData::from_thrift(
schema_descr.clone(),
rg,
#[cfg(feature = "encryption")]
decryptor.as_ref(),
)?);
}
let column_orders =
Self::parse_column_orders(t_file_metadata.column_orders, &schema_descr)?;

// todo add file decryptor
#[cfg(feature = "encryption")]
Expand All @@ -786,16 +773,30 @@ impl ParquetMetaDataReader {
}; // todo decr: add support for GCMCTRV1
let aad_file_unique = aes_gcm_algo.aad_file_unique.unwrap();
let aad_prefix: Vec<u8> = aes_gcm_algo.aad_prefix.unwrap_or_default();
let fdp = file_decryption_properties.unwrap();

decryptor = Some(FileDecryptor::new(
fdp,
file_decryption_properties.unwrap(),
aad_file_unique.clone(),
aad_prefix.clone(),
));
// todo get key_metadata etc. Set file decryptor in return value
// todo check signature
}

let mut row_groups = Vec::new();
// TODO: row group filtering
for rg in t_file_metadata.row_groups {
let r = RowGroupMetaData::from_thrift(
schema_descr.clone(),
rg,
#[cfg(feature = "encryption")]
decryptor.as_ref(),
)?;
row_groups.push(r);
}
let column_orders =
Self::parse_column_orders(t_file_metadata.column_orders, &schema_descr)?;

let file_metadata = FileMetaData::new(
t_file_metadata.version,
t_file_metadata.num_rows,
Expand Down
7 changes: 5 additions & 2 deletions parquet/src/file/serialized_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ pub(crate) fn read_page_header<T: Read>(
if !decryptor.has_footer_key() || !decryptor.footer_decryptor().is_some() {
let mut prot = TCompactInputProtocol::new(input);
let page_header = PageHeader::read_from_in_protocol(&mut prot)?;
return Ok(page_header)
return Ok(page_header);
};

// let file_decryptor = decryptor.column_decryptor();
Expand All @@ -380,7 +380,10 @@ pub(crate) fn read_page_header<T: Read>(
let mut ciphertext: Vec<u8> = vec![];
input.read_to_end(&mut ciphertext)?;

let buf = data_decryptor.footer_decryptor().unwrap().decrypt(&ciphertext, aad.as_ref())?;
let buf = data_decryptor
.footer_decryptor()
.unwrap()
.decrypt(&ciphertext, aad.as_ref())?;

let mut prot = TCompactSliceInputProtocol::new(buf.as_slice());
let page_header = PageHeader::read_from_in_protocol(&mut prot)?;
Expand Down

0 comments on commit eccf6a0

Please sign in to comment.