Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Bumped to latest parquet. (#352)
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao authored Aug 30, 2021
1 parent 1be7586 commit d8aacdb
Show file tree
Hide file tree
Showing 23 changed files with 195 additions and 391 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ futures = { version = "0.3", optional = true }
# for faster hashing
ahash = { version = "0.7", optional = true }

parquet2 = { version = "0.3", optional = true, default_features = false, features = ["stream"] }
parquet2 = { version = "0.4", optional = true, default_features = false, features = ["stream"] }

# for division/remainder optimization at runtime
strength_reduce = { version = "0.2", optional = true }
Expand Down
5 changes: 3 additions & 2 deletions arrow-parquet-integration-testing/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,12 +183,13 @@ fn main() -> Result<()> {

let mut writer = File::create(write_path)?;

write_file(
let _ = write_file(
&mut writer,
row_groups,
&schema,
parquet_schema,
options,
None,
)
)?;
Ok(())
}
9 changes: 4 additions & 5 deletions examples/parquet_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,13 @@ fn read_column_chunk(path: &str, row_group: usize, column: usize) -> Result<Box<
// the parquet's physical, converted and logical types.
let arrow_schema = read::get_schema(&file_metadata)?;

// get the columns' metadata
let metadata = file_metadata.row_groups[row_group].column(column);

// Construct an iterator over pages. This binds `file` to this iterator, and each iteration
// is IO intensive as it will read a compressed page into memory. There is almost no CPU work
// on this operation
let pages =
read::get_page_iterator(&file_metadata, row_group, column, &mut file, None, vec![])?;

// get the columns' metadata
let metadata = file_metadata.row_groups[row_group].column(column);
let pages = read::get_page_iterator(metadata, &mut file, None, vec![])?;

// get the columns' logical type
let data_type = arrow_schema.fields()[column].data_type().clone();
Expand Down
14 changes: 4 additions & 10 deletions examples/parquet_read_parallel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,11 @@ fn parallel_read(path: &str) -> Result<Vec<Box<dyn Array>>> {
for column in 0..producer_metadata.schema().num_columns() {
for row_group in 0..producer_metadata.row_groups.len() {
let start = SystemTime::now();
let column_metadata = producer_metadata.row_groups[row_group].column(column);
println!("produce start: {} {}", column, row_group);
let pages = read::get_page_iterator(
&producer_metadata,
row_group,
column,
&mut file,
None,
vec![],
)
.unwrap()
.collect::<Vec<_>>();
let pages = read::get_page_iterator(column_metadata, &mut file, None, vec![])
.unwrap()
.collect::<Vec<_>>();
println!(
"produce end - {:?}: {} {}",
start.elapsed().unwrap(),
Expand Down
5 changes: 3 additions & 2 deletions examples/parquet_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,15 @@ fn write_single_array(path: &str, array: &dyn Array, field: Field) -> Result<()>
let mut file = File::create(path)?;

// Write the file. Note that, at present, any error results in a corrupted file.
write_file(
let _ = write_file(
&mut file,
row_groups,
&schema,
parquet_schema,
options,
None,
)
)?;
Ok(())
}

fn main() -> Result<()> {
Expand Down
5 changes: 3 additions & 2 deletions examples/parquet_write_record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,15 @@ fn write_batch(path: &str, batch: RecordBatch) -> Result<()> {

// Write the file. Note that, at present, any error results in a corrupted file.
let parquet_schema = row_groups.parquet_schema().clone();
write_file(
let _ = write_file(
&mut file,
row_groups,
&schema,
parquet_schema,
options,
None,
)
)?;
Ok(())
}

fn main() -> Result<()> {
Expand Down
11 changes: 3 additions & 8 deletions src/io/parquet/read/binary/basic.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use futures::{pin_mut, Stream, StreamExt};
use parquet2::{
encoding::{bitpacking, delta_length_byte_array, hybrid_rle, uleb128, Encoding},
encoding::{delta_length_byte_array, hybrid_rle, Encoding},
metadata::{ColumnChunkMetaData, ColumnDescriptor},
page::{BinaryPageDict, DataPage},
read::StreamingIterator,
Expand Down Expand Up @@ -37,12 +37,7 @@ fn read_dict_buffer<O: Offset>(
let bit_width = indices_buffer[0];
let indices_buffer = &indices_buffer[1..];

let (_, consumed) = uleb128::decode(indices_buffer);
let indices_buffer = &indices_buffer[consumed..];

let non_null_indices_len = indices_buffer.len() * 8 / bit_width as usize;

let mut indices = bitpacking::Decoder::new(indices_buffer, bit_width, non_null_indices_len);
let mut indices = hybrid_rle::HybridRleDecoder::new(indices_buffer, bit_width as u32, length);

let validity_iterator = hybrid_rle::Decoder::new(validity_buffer, 1);

Expand Down Expand Up @@ -214,7 +209,7 @@ fn extend_from_page<O: Offset>(
assert!(descriptor.max_def_level() <= 1);
let is_optional = descriptor.max_def_level() == 1;

let (validity_buffer, values_buffer, version) = utils::split_buffer(page, is_optional);
let (_, validity_buffer, values_buffer, version) = utils::split_buffer(page, descriptor);

match (&page.encoding(), page.dictionary_page(), is_optional) {
(Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), true) => {
Expand Down
12 changes: 4 additions & 8 deletions src/io/parquet/read/binary/dictionary.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::sync::Arc;

use parquet2::{
encoding::{bitpacking, hybrid_rle, uleb128, Encoding},
encoding::{hybrid_rle, Encoding},
metadata::{ColumnChunkMetaData, ColumnDescriptor},
page::{BinaryPageDict, DataPage},
read::StreamingIterator,
Expand Down Expand Up @@ -40,12 +40,8 @@ fn read_dict_optional<K, O>(
let bit_width = indices_buffer[0];
let indices_buffer = &indices_buffer[1..];

let (_, consumed) = uleb128::decode(indices_buffer);
let indices_buffer = &indices_buffer[consumed..];

let non_null_indices_len = indices_buffer.len() * 8 / bit_width as usize;

let mut new_indices = bitpacking::Decoder::new(indices_buffer, bit_width, non_null_indices_len);
let mut new_indices =
hybrid_rle::HybridRleDecoder::new(indices_buffer, bit_width as u32, additional);

let validity_iterator = hybrid_rle::Decoder::new(validity_buffer, 1);

Expand Down Expand Up @@ -97,7 +93,7 @@ where
assert_eq!(descriptor.max_rep_level(), 0);
let is_optional = descriptor.max_def_level() == 1;

let (validity_buffer, values_buffer, version) = other_utils::split_buffer(page, is_optional);
let (_, validity_buffer, values_buffer, version) = other_utils::split_buffer(page, descriptor);

match (&page.encoding(), page.dictionary_page(), is_optional) {
(Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), true) => {
Expand Down
134 changes: 40 additions & 94 deletions src/io/parquet/read/binary/nested.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
use std::sync::Arc;

use parquet2::{
encoding::Encoding,
encoding::{hybrid_rle::HybridRleDecoder, Encoding},
metadata::{ColumnChunkMetaData, ColumnDescriptor},
page::{DataPage, DataPageHeader, DataPageHeaderExt},
read::{
levels::{get_bit_width, split_buffer_v1, split_buffer_v2, RLEDecoder},
StreamingIterator,
},
page::DataPage,
read::{levels::get_bit_width, StreamingIterator},
};

use super::super::nested_utils::*;
Expand Down Expand Up @@ -65,16 +62,13 @@ fn read<O: Offset>(

match (rep_level_encoding.0, def_level_encoding.0) {
(Encoding::Rle, Encoding::Rle) => {
let rep_levels = RLEDecoder::new(
rep_levels,
get_bit_width(rep_level_encoding.1),
additional as u32,
);
let rep_levels =
HybridRleDecoder::new(rep_levels, get_bit_width(rep_level_encoding.1), additional);
if is_nullable {
let def_levels = RLEDecoder::new(
let def_levels = HybridRleDecoder::new(
def_levels,
get_bit_width(def_level_encoding.1),
additional as u32,
additional,
);
let new_values = utils::BinaryIter::new(values_buffer);
read_values(
Expand All @@ -89,11 +83,8 @@ fn read<O: Offset>(
read_plain_required(values_buffer, additional, offsets, values)
}

let def_levels = RLEDecoder::new(
def_levels,
get_bit_width(def_level_encoding.1),
additional as u32,
);
let def_levels =
HybridRleDecoder::new(def_levels, get_bit_width(def_level_encoding.1), additional);

extend_offsets(
rep_levels,
Expand All @@ -119,83 +110,38 @@ fn extend_from_page<O: Offset>(
) -> Result<()> {
let additional = page.num_values();

match page.header() {
DataPageHeader::V1(header) => {
assert_eq!(header.definition_level_encoding(), Encoding::Rle);
assert_eq!(header.repetition_level_encoding(), Encoding::Rle);

match (&page.encoding(), page.dictionary_page()) {
(Encoding::Plain, None) => {
let (rep_levels, def_levels, values_buffer) = split_buffer_v1(
page.buffer(),
descriptor.max_rep_level() > 0,
descriptor.max_def_level() > 0,
);
read(
rep_levels,
def_levels,
values_buffer,
additional,
(
&header.repetition_level_encoding(),
descriptor.max_rep_level(),
),
(
&header.definition_level_encoding(),
descriptor.max_def_level(),
),
is_nullable,
nested,
offsets,
values,
validity,
)
}
_ => {
return Err(utils::not_implemented(
&page.encoding(),
is_nullable,
page.dictionary_page().is_some(),
"V1",
"primitive",
))
}
}
let (rep_levels, def_levels, values_buffer, version) = utils::split_buffer(page, descriptor);

match (&page.encoding(), page.dictionary_page()) {
(Encoding::Plain, None) => read(
rep_levels,
def_levels,
values_buffer,
additional,
(
&page.repetition_level_encoding(),
descriptor.max_rep_level(),
),
(
&page.definition_level_encoding(),
descriptor.max_def_level(),
),
is_nullable,
nested,
offsets,
values,
validity,
),
_ => {
return Err(utils::not_implemented(
&page.encoding(),
is_nullable,
page.dictionary_page().is_some(),
version,
"primitive",
))
}
DataPageHeader::V2(header) => match (&page.encoding(), page.dictionary_page()) {
(Encoding::Plain, None) => {
let def_level_buffer_length = header.definition_levels_byte_length as usize;
let rep_level_buffer_length = header.repetition_levels_byte_length as usize;
let (rep_levels, def_levels, values_buffer) = split_buffer_v2(
page.buffer(),
rep_level_buffer_length,
def_level_buffer_length,
);
read(
rep_levels,
def_levels,
values_buffer,
additional,
(&Encoding::Rle, descriptor.max_rep_level()),
(&Encoding::Rle, descriptor.max_def_level()),
is_nullable,
nested,
offsets,
values,
validity,
)
}
_ => {
return Err(utils::not_implemented(
&page.encoding(),
is_nullable,
page.dictionary_page().is_some(),
"V2",
"primitive",
))
}
},
};
}
Ok(())
}

Expand Down
2 changes: 1 addition & 1 deletion src/io/parquet/read/boolean/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ fn extend_from_page(
assert!(descriptor.max_def_level() <= 1);
let is_optional = descriptor.max_def_level() == 1;

let (validity_buffer, values_buffer, version) = utils::split_buffer(page, is_optional);
let (_, validity_buffer, values_buffer, version) = utils::split_buffer(page, descriptor);

match (page.encoding(), page.dictionary_page(), is_optional) {
(Encoding::Plain, None, true) => read_optional(
Expand Down
Loading

0 comments on commit d8aacdb

Please sign in to comment.