
Bumped to parquet2 v0.4 #352

Merged: 1 commit, Aug 30, 2021

Bumps the parquet2 dependency from v0.3 to v0.4 and adapts arrow2 to its API changes: get_page_iterator now takes the column-chunk metadata directly instead of file metadata plus indices, dictionary indices and repetition/definition levels decode through hybrid_rle::HybridRleDecoder, the internal split_buffer takes the column descriptor and also returns the page version, and write_file now returns a value on success.
2 changes: 1 addition & 1 deletion Cargo.toml

@@ -56,7 +56,7 @@ futures = { version = "0.3", optional = true }
 # for faster hashing
 ahash = { version = "0.7", optional = true }
 
-parquet2 = { version = "0.3", optional = true, default_features = false, features = ["stream"] }
+parquet2 = { version = "0.4", optional = true, default_features = false, features = ["stream"] }
 
 # for division/remainder optimization at runtime
 strength_reduce = { version = "0.2", optional = true }
5 changes: 3 additions & 2 deletions arrow-parquet-integration-testing/src/main.rs

@@ -183,12 +183,13 @@ fn main() -> Result<()> {
 
     let mut writer = File::create(write_path)?;
 
-    write_file(
+    let _ = write_file(
         &mut writer,
         row_groups,
         &schema,
         parquet_schema,
         options,
         None,
-    )
+    )?;
+    Ok(())
 }
9 changes: 4 additions & 5 deletions examples/parquet_read.rs

@@ -16,14 +16,13 @@ fn read_column_chunk(path: &str, row_group: usize, column: usize) -> Result<Box<
     // the parquet's physical, converted and logical types.
     let arrow_schema = read::get_schema(&file_metadata)?;
 
-    // get the columns' metadata
-    let metadata = file_metadata.row_groups[row_group].column(column);
-
     // Construct an iterator over pages. This binds `file` to this iterator, and each iteration
     // is IO intensive as it will read a compressed page into memory. There is almost no CPU work
     // on this operation
-    let pages =
-        read::get_page_iterator(&file_metadata, row_group, column, &mut file, None, vec![])?;
+    // get the columns' metadata
+    let metadata = file_metadata.row_groups[row_group].column(column);
+    let pages = read::get_page_iterator(metadata, &mut file, None, vec![])?;
 
     // get the columns' logical type
     let data_type = arrow_schema.fields()[column].data_type().clone();
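Note: the user-facing change in this example is the get_page_iterator signature. It no longer takes the whole file metadata plus row_group/column indices; it takes the column-chunk metadata directly. A minimal sketch of the new call pattern, assuming read::read_metadata as the entry point for obtaining the file metadata (taken from the surrounding example, not shown in this hunk):

use std::fs::File;

use arrow2::error::Result;
use arrow2::io::parquet::read;

fn count_pages(path: &str, row_group: usize, column: usize) -> Result<usize> {
    let mut file = File::open(path)?;
    let file_metadata = read::read_metadata(&mut file)?;

    // v0.4: look up the column chunk's metadata yourself...
    let metadata = file_metadata.row_groups[row_group].column(column);

    // ...and pass it directly; row_group/column indices are no longer arguments.
    let pages = read::get_page_iterator(metadata, &mut file, None, vec![])?;
    Ok(pages.count())
}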
14 changes: 4 additions & 10 deletions examples/parquet_read_parallel.rs

@@ -24,17 +24,11 @@ fn parallel_read(path: &str) -> Result<Vec<Box<dyn Array>>> {
     for column in 0..producer_metadata.schema().num_columns() {
         for row_group in 0..producer_metadata.row_groups.len() {
             let start = SystemTime::now();
+            let column_metadata = producer_metadata.row_groups[row_group].column(column);
             println!("produce start: {} {}", column, row_group);
-            let pages = read::get_page_iterator(
-                &producer_metadata,
-                row_group,
-                column,
-                &mut file,
-                None,
-                vec![],
-            )
-            .unwrap()
-            .collect::<Vec<_>>();
+            let pages = read::get_page_iterator(column_metadata, &mut file, None, vec![])
+                .unwrap()
+                .collect::<Vec<_>>();
             println!(
                 "produce end - {:?}: {} {}",
                 start.elapsed().unwrap(),
5 changes: 3 additions & 2 deletions examples/parquet_write.rs

@@ -40,14 +40,15 @@ fn write_single_array(path: &str, array: &dyn Array, field: Field) -> Result<()>
     let mut file = File::create(path)?;
 
     // Write the file. Note that, at present, any error results in a corrupted file.
-    write_file(
+    let _ = write_file(
         &mut file,
         row_groups,
         &schema,
         parquet_schema,
         options,
         None,
-    )
+    )?;
+    Ok(())
 }
 
 fn main() -> Result<()> {
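Note: the new `let _ = write_file(...)?;` pattern implies that write_file now returns a value on success rather than (), so the writer examples discard it and end with an explicit Ok(()). The diff does not show the value's type (plausibly the number of bytes written; that is an assumption). A caller interested in it could bind it instead, sketched as a continuation of the function above:

    // Sketch only: `written` is whatever write_file now returns; this diff
    // shows it being discarded, so its type and meaning are assumptions.
    let written = write_file(
        &mut file,
        row_groups,
        &schema,
        parquet_schema,
        options,
        None,
    )?;
    println!("write_file returned: {:?}", written);
    Ok(())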
5 changes: 3 additions & 2 deletions examples/parquet_write_record.rs

@@ -30,14 +30,15 @@ fn write_batch(path: &str, batch: RecordBatch) -> Result<()> {
 
     // Write the file. Note that, at present, any error results in a corrupted file.
     let parquet_schema = row_groups.parquet_schema().clone();
-    write_file(
+    let _ = write_file(
         &mut file,
         row_groups,
         &schema,
         parquet_schema,
         options,
         None,
-    )
+    )?;
+    Ok(())
 }
 
 fn main() -> Result<()> {
11 changes: 3 additions & 8 deletions src/io/parquet/read/binary/basic.rs

@@ -1,6 +1,6 @@
 use futures::{pin_mut, Stream, StreamExt};
 use parquet2::{
-    encoding::{bitpacking, delta_length_byte_array, hybrid_rle, uleb128, Encoding},
+    encoding::{delta_length_byte_array, hybrid_rle, Encoding},
     metadata::{ColumnChunkMetaData, ColumnDescriptor},
     page::{BinaryPageDict, DataPage},
     read::StreamingIterator,
@@ -37,12 +37,7 @@ fn read_dict_buffer<O: Offset>(
     let bit_width = indices_buffer[0];
     let indices_buffer = &indices_buffer[1..];
 
-    let (_, consumed) = uleb128::decode(indices_buffer);
-    let indices_buffer = &indices_buffer[consumed..];
-
-    let non_null_indices_len = indices_buffer.len() * 8 / bit_width as usize;
-
-    let mut indices = bitpacking::Decoder::new(indices_buffer, bit_width, non_null_indices_len);
+    let mut indices = hybrid_rle::HybridRleDecoder::new(indices_buffer, bit_width as u32, length);
 
     let validity_iterator = hybrid_rle::Decoder::new(validity_buffer, 1);
 
@@ -214,7 +209,7 @@ fn extend_from_page<O: Offset>(
     assert!(descriptor.max_def_level() <= 1);
     let is_optional = descriptor.max_def_level() == 1;
 
-    let (validity_buffer, values_buffer, version) = utils::split_buffer(page, is_optional);
+    let (_, validity_buffer, values_buffer, version) = utils::split_buffer(page, descriptor);
 
     match (&page.encoding(), page.dictionary_page(), is_optional) {
         (Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), true) => {
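Note: this is the substance of the v0.4 bump on the read side. The hand-rolled decoding of dictionary indices (skip a uleb128 header, then bit-unpack the remainder) is replaced by hybrid_rle::HybridRleDecoder, which decodes the full Parquet hybrid RLE/bit-packing format, run headers included. A self-contained sketch of the decoder on a hand-built buffer; the byte layout follows the Parquet hybrid RLE spec, and the decoder yielding plain u32 values matches its use in this diff:

use parquet2::encoding::hybrid_rle::HybridRleDecoder;

fn main() {
    // Header 0b0000_0011 = (1 << 1) | 1: one bit-packed group of 8 values.
    // With bit_width = 1, the payload byte 0b0101_0101 packs the values
    // LSB-first, i.e. [1, 0, 1, 0, 1, 0, 1, 0].
    let buffer = [0b0000_0011u8, 0b0101_0101];
    let decoder = HybridRleDecoder::new(&buffer, 1, 8);
    assert_eq!(decoder.collect::<Vec<u32>>(), vec![1, 0, 1, 0, 1, 0, 1, 0]);
}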
12 changes: 4 additions & 8 deletions src/io/parquet/read/binary/dictionary.rs

@@ -1,7 +1,7 @@
 use std::sync::Arc;
 
 use parquet2::{
-    encoding::{bitpacking, hybrid_rle, uleb128, Encoding},
+    encoding::{hybrid_rle, Encoding},
     metadata::{ColumnChunkMetaData, ColumnDescriptor},
     page::{BinaryPageDict, DataPage},
     read::StreamingIterator,
@@ -40,12 +40,8 @@ fn read_dict_optional<K, O>(
     let bit_width = indices_buffer[0];
     let indices_buffer = &indices_buffer[1..];
 
-    let (_, consumed) = uleb128::decode(indices_buffer);
-    let indices_buffer = &indices_buffer[consumed..];
-
-    let non_null_indices_len = indices_buffer.len() * 8 / bit_width as usize;
-
-    let mut new_indices = bitpacking::Decoder::new(indices_buffer, bit_width, non_null_indices_len);
+    let mut new_indices =
+        hybrid_rle::HybridRleDecoder::new(indices_buffer, bit_width as u32, additional);
 
     let validity_iterator = hybrid_rle::Decoder::new(validity_buffer, 1);
 
@@ -97,7 +93,7 @@ where
     assert_eq!(descriptor.max_rep_level(), 0);
     let is_optional = descriptor.max_def_level() == 1;
 
-    let (validity_buffer, values_buffer, version) = other_utils::split_buffer(page, is_optional);
+    let (_, validity_buffer, values_buffer, version) = other_utils::split_buffer(page, descriptor);
 
     match (&page.encoding(), page.dictionary_page(), is_optional) {
         (Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), true) => {
134 changes: 40 additions & 94 deletions src/io/parquet/read/binary/nested.rs

@@ -1,13 +1,10 @@
 use std::sync::Arc;
 
 use parquet2::{
-    encoding::Encoding,
+    encoding::{hybrid_rle::HybridRleDecoder, Encoding},
     metadata::{ColumnChunkMetaData, ColumnDescriptor},
-    page::{DataPage, DataPageHeader, DataPageHeaderExt},
-    read::{
-        levels::{get_bit_width, split_buffer_v1, split_buffer_v2, RLEDecoder},
-        StreamingIterator,
-    },
+    page::DataPage,
+    read::{levels::get_bit_width, StreamingIterator},
 };
 
 use super::super::nested_utils::*;
@@ -65,16 +62,13 @@ fn read<O: Offset>(
 
     match (rep_level_encoding.0, def_level_encoding.0) {
         (Encoding::Rle, Encoding::Rle) => {
-            let rep_levels = RLEDecoder::new(
-                rep_levels,
-                get_bit_width(rep_level_encoding.1),
-                additional as u32,
-            );
+            let rep_levels =
+                HybridRleDecoder::new(rep_levels, get_bit_width(rep_level_encoding.1), additional);
             if is_nullable {
-                let def_levels = RLEDecoder::new(
+                let def_levels = HybridRleDecoder::new(
                     def_levels,
                     get_bit_width(def_level_encoding.1),
-                    additional as u32,
+                    additional,
                 );
                 let new_values = utils::BinaryIter::new(values_buffer);
                 read_values(
@@ -89,11 +83,8 @@
                 read_plain_required(values_buffer, additional, offsets, values)
             }
 
-            let def_levels = RLEDecoder::new(
-                def_levels,
-                get_bit_width(def_level_encoding.1),
-                additional as u32,
-            );
+            let def_levels =
+                HybridRleDecoder::new(def_levels, get_bit_width(def_level_encoding.1), additional);
 
             extend_offsets(
                 rep_levels,
@@ -119,83 +110,38 @@ fn extend_from_page<O: Offset>(
 ) -> Result<()> {
     let additional = page.num_values();
 
-    match page.header() {
-        DataPageHeader::V1(header) => {
-            assert_eq!(header.definition_level_encoding(), Encoding::Rle);
-            assert_eq!(header.repetition_level_encoding(), Encoding::Rle);
-
-            match (&page.encoding(), page.dictionary_page()) {
-                (Encoding::Plain, None) => {
-                    let (rep_levels, def_levels, values_buffer) = split_buffer_v1(
-                        page.buffer(),
-                        descriptor.max_rep_level() > 0,
-                        descriptor.max_def_level() > 0,
-                    );
-                    read(
-                        rep_levels,
-                        def_levels,
-                        values_buffer,
-                        additional,
-                        (
-                            &header.repetition_level_encoding(),
-                            descriptor.max_rep_level(),
-                        ),
-                        (
-                            &header.definition_level_encoding(),
-                            descriptor.max_def_level(),
-                        ),
-                        is_nullable,
-                        nested,
-                        offsets,
-                        values,
-                        validity,
-                    )
-                }
-                _ => {
-                    return Err(utils::not_implemented(
-                        &page.encoding(),
-                        is_nullable,
-                        page.dictionary_page().is_some(),
-                        "V1",
-                        "primitive",
-                    ))
-                }
-            }
-        }
-        DataPageHeader::V2(header) => match (&page.encoding(), page.dictionary_page()) {
-            (Encoding::Plain, None) => {
-                let def_level_buffer_length = header.definition_levels_byte_length as usize;
-                let rep_level_buffer_length = header.repetition_levels_byte_length as usize;
-                let (rep_levels, def_levels, values_buffer) = split_buffer_v2(
-                    page.buffer(),
-                    rep_level_buffer_length,
-                    def_level_buffer_length,
-                );
-                read(
-                    rep_levels,
-                    def_levels,
-                    values_buffer,
-                    additional,
-                    (&Encoding::Rle, descriptor.max_rep_level()),
-                    (&Encoding::Rle, descriptor.max_def_level()),
-                    is_nullable,
-                    nested,
-                    offsets,
-                    values,
-                    validity,
-                )
-            }
-            _ => {
-                return Err(utils::not_implemented(
-                    &page.encoding(),
-                    is_nullable,
-                    page.dictionary_page().is_some(),
-                    "V2",
-                    "primitive",
-                ))
-            }
-        },
+    let (rep_levels, def_levels, values_buffer, version) = utils::split_buffer(page, descriptor);
+
+    match (&page.encoding(), page.dictionary_page()) {
+        (Encoding::Plain, None) => read(
+            rep_levels,
+            def_levels,
+            values_buffer,
+            additional,
+            (
+                &page.repetition_level_encoding(),
+                descriptor.max_rep_level(),
+            ),
+            (
+                &page.definition_level_encoding(),
+                descriptor.max_def_level(),
+            ),
+            is_nullable,
+            nested,
+            offsets,
+            values,
+            validity,
+        ),
+        _ => {
+            return Err(utils::not_implemented(
+                &page.encoding(),
+                is_nullable,
+                page.dictionary_page().is_some(),
+                version,
+                "primitive",
+            ))
+        }
     };
     Ok(())
 }
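Note: the net effect in the nested reader is that the per-version plumbing disappears. Instead of matching on DataPageHeader::V1/V2 and calling split_buffer_v1/split_buffer_v2 plus RLEDecoder, the internal utils::split_buffer derives the split from the page itself and returns the page version, and both versions decode levels through the same decoder. In sketch form, with internal arrow2 names taken from this diff (not a public API):

    // One path for both data-page versions: split_buffer inspects the page
    // header and returns the level/value slices plus the page version.
    let (rep_levels, def_levels, values_buffer, version) = utils::split_buffer(page, descriptor);

    // Levels of V1 and V2 pages decode through the same hybrid RLE decoder.
    let rep_levels =
        HybridRleDecoder::new(rep_levels, get_bit_width(descriptor.max_rep_level()), additional);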
2 changes: 1 addition & 1 deletion src/io/parquet/read/boolean/basic.rs

@@ -122,7 +122,7 @@ fn extend_from_page(
     assert!(descriptor.max_def_level() <= 1);
     let is_optional = descriptor.max_def_level() == 1;
 
-    let (validity_buffer, values_buffer, version) = utils::split_buffer(page, is_optional);
+    let (_, validity_buffer, values_buffer, version) = utils::split_buffer(page, descriptor);
 
     match (page.encoding(), page.dictionary_page(), is_optional) {
         (Encoding::Plain, None, true) => read_optional(