Skip to content

Commit

Permalink
feat: support parsing for parquet writer option (#4938)
Browse files Browse the repository at this point in the history
* feat: support parsing for parquet writer option

Signed-off-by: fan <[email protected]>

* fix clippy warning

Signed-off-by: fan <[email protected]>

* add tests

Signed-off-by: fan <[email protected]>

* follow reviews

Signed-off-by: fan <[email protected]>

* fix only support lower and uppercase

Signed-off-by: fan <[email protected]>

---------

Signed-off-by: fan <[email protected]>
  • Loading branch information
fansehep authored Oct 18, 2023
1 parent 952cd2e commit a94ccff
Show file tree
Hide file tree
Showing 2 changed files with 253 additions and 0 deletions.
185 changes: 185 additions & 0 deletions parquet/src/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
//! Contains Rust mappings for Thrift definition.
//! Refer to [`parquet.thrift`](https://github.com/apache/parquet-format/blob/master/src/main/thrift/parquet.thrift) file to see raw definitions.
use std::str::FromStr;
use std::{fmt, str};

pub use crate::compression::{BrotliLevel, GzipLevel, ZstdLevel};
Expand Down Expand Up @@ -278,6 +279,29 @@ pub enum Encoding {
BYTE_STREAM_SPLIT,
}

impl FromStr for Encoding {
type Err = ParquetError;

fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"PLAIN" | "plain" => Ok(Encoding::PLAIN),
"PLAIN_DICTIONARY" | "plain_dictionary" => Ok(Encoding::PLAIN_DICTIONARY),
"RLE" | "rle" => Ok(Encoding::RLE),
"BIT_PACKED" | "bit_packed" => Ok(Encoding::BIT_PACKED),
"DELTA_BINARY_PACKED" | "delta_binary_packed" => {
Ok(Encoding::DELTA_BINARY_PACKED)
}
"DELTA_LENGTH_BYTE_ARRAY" | "delta_length_byte_array" => {
Ok(Encoding::DELTA_LENGTH_BYTE_ARRAY)
}
"DELTA_BYTE_ARRAY" | "delta_byte_array" => Ok(Encoding::DELTA_BYTE_ARRAY),
"RLE_DICTIONARY" | "rle_dictionary" => Ok(Encoding::RLE_DICTIONARY),
"BYTE_STREAM_SPLIT" | "byte_stream_split" => Ok(Encoding::BYTE_STREAM_SPLIT),
_ => Err(general_err!("unknown encoding: {}", s)),
}
}
}

// ----------------------------------------------------------------------
// Mirrors `parquet::CompressionCodec`

Expand All @@ -295,6 +319,90 @@ pub enum Compression {
LZ4_RAW,
}

fn split_compression_string(
str_setting: &str,
) -> Result<(&str, Option<u32>), ParquetError> {
let split_setting = str_setting.split_once('(');

match split_setting {
Some((codec, level_str)) => {
let level =
&level_str[..level_str.len() - 1]
.parse::<u32>()
.map_err(|_| {
ParquetError::General(format!(
"invalid compression level: {}",
level_str
))
})?;
Ok((codec, Some(*level)))
}
None => Ok((str_setting, None)),
}
}

fn check_level_is_none(level: &Option<u32>) -> Result<(), ParquetError> {
if level.is_some() {
return Err(ParquetError::General("level is not support".to_string()));
}

Ok(())
}

fn require_level(codec: &str, level: Option<u32>) -> Result<u32, ParquetError> {
level.ok_or(ParquetError::General(format!("{} require level", codec)))
}

impl FromStr for Compression {
type Err = ParquetError;

fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
let (codec, level) = split_compression_string(s)?;

let c = match codec {
"UNCOMPRESSED" | "uncompressed" => {
check_level_is_none(&level)?;
Compression::UNCOMPRESSED
}
"SNAPPY" | "snappy" => {
check_level_is_none(&level)?;
Compression::SNAPPY
}
"GZIP" | "gzip" => {
let level = require_level(codec, level)?;
Compression::GZIP(GzipLevel::try_new(level)?)
}
"LZO" | "lzo" => {
check_level_is_none(&level)?;
Compression::LZO
}
"BROTLI" | "brotli" => {
let level = require_level(codec, level)?;
Compression::BROTLI(BrotliLevel::try_new(level)?)
}
"LZ4" | "lz4" => {
check_level_is_none(&level)?;
Compression::LZ4
}
"ZSTD" | "zstd" => {
let level = require_level(codec, level)?;
Compression::ZSTD(ZstdLevel::try_new(level as i32)?)
}
"LZ4_RAW" | "lz4_raw" => {
check_level_is_none(&level)?;
Compression::LZ4_RAW
}
_ => {
return Err(ParquetError::General(format!(
"unsupport compression {codec}"
)));
}
};

Ok(c)
}
}

// ----------------------------------------------------------------------
// Mirrors `parquet::PageType`

Expand Down Expand Up @@ -2130,4 +2238,81 @@ mod tests {
);
assert_eq!(ColumnOrder::UNDEFINED.sort_order(), SortOrder::SIGNED);
}

#[test]
fn test_parse_encoding() {
let mut encoding: Encoding = "PLAIN".parse().unwrap();
assert_eq!(encoding, Encoding::PLAIN);
encoding = "PLAIN_DICTIONARY".parse().unwrap();
assert_eq!(encoding, Encoding::PLAIN_DICTIONARY);
encoding = "RLE".parse().unwrap();
assert_eq!(encoding, Encoding::RLE);
encoding = "BIT_PACKED".parse().unwrap();
assert_eq!(encoding, Encoding::BIT_PACKED);
encoding = "DELTA_BINARY_PACKED".parse().unwrap();
assert_eq!(encoding, Encoding::DELTA_BINARY_PACKED);
encoding = "DELTA_LENGTH_BYTE_ARRAY".parse().unwrap();
assert_eq!(encoding, Encoding::DELTA_LENGTH_BYTE_ARRAY);
encoding = "DELTA_BYTE_ARRAY".parse().unwrap();
assert_eq!(encoding, Encoding::DELTA_BYTE_ARRAY);
encoding = "RLE_DICTIONARY".parse().unwrap();
assert_eq!(encoding, Encoding::RLE_DICTIONARY);
encoding = "BYTE_STREAM_SPLIT".parse().unwrap();
assert_eq!(encoding, Encoding::BYTE_STREAM_SPLIT);

// test lowercase
encoding = "byte_stream_split".parse().unwrap();
assert_eq!(encoding, Encoding::BYTE_STREAM_SPLIT);

// test unknown string
match "plain_xxx".parse::<Encoding>() {
Ok(e) => {
panic!("Should not be able to parse {:?}", e);
}
Err(e) => {
assert_eq!(e.to_string(), "Parquet error: unknown encoding: plain_xxx");
}
}
}

#[test]
fn test_parse_compression() {
let mut compress: Compression = "snappy".parse().unwrap();
assert_eq!(compress, Compression::SNAPPY);
compress = "lzo".parse().unwrap();
assert_eq!(compress, Compression::LZO);
compress = "zstd(3)".parse().unwrap();
assert_eq!(compress, Compression::ZSTD(ZstdLevel::try_new(3).unwrap()));
compress = "LZ4_RAW".parse().unwrap();
assert_eq!(compress, Compression::LZ4_RAW);
compress = "uncompressed".parse().unwrap();
assert_eq!(compress, Compression::UNCOMPRESSED);
compress = "snappy".parse().unwrap();
assert_eq!(compress, Compression::SNAPPY);
compress = "gzip(9)".parse().unwrap();
assert_eq!(compress, Compression::GZIP(GzipLevel::try_new(9).unwrap()));
compress = "lzo".parse().unwrap();
assert_eq!(compress, Compression::LZO);
compress = "brotli(3)".parse().unwrap();
assert_eq!(
compress,
Compression::BROTLI(BrotliLevel::try_new(3).unwrap())
);
compress = "lz4".parse().unwrap();
assert_eq!(compress, Compression::LZ4);

// test unknown compression
let mut err = "plain_xxx".parse::<Encoding>().unwrap_err();
assert_eq!(
err.to_string(),
"Parquet error: unknown encoding: plain_xxx"
);

// test invalid compress level
err = "gzip(-10)".parse::<Encoding>().unwrap_err();
assert_eq!(
err.to_string(),
"Parquet error: unknown encoding: gzip(-10)"
);
}
}
68 changes: 68 additions & 0 deletions parquet/src/file/properties.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// under the License.

//! Configuration via [`WriterProperties`] and [`ReaderProperties`]
use std::str::FromStr;
use std::{collections::HashMap, sync::Arc};

use crate::basic::{Compression, Encoding};
Expand Down Expand Up @@ -72,6 +73,18 @@ impl WriterVersion {
}
}

impl FromStr for WriterVersion {
type Err = String;

fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"PARQUET_1_0" | "parquet_1_0" => Ok(WriterVersion::PARQUET_1_0),
"PARQUET_2_0" | "parquet_2_0" => Ok(WriterVersion::PARQUET_2_0),
_ => Err(format!("Invalid writer version: {}", s)),
}
}
}

/// Reference counted writer properties.
pub type WriterPropertiesPtr = Arc<WriterProperties>;

Expand Down Expand Up @@ -655,6 +668,19 @@ pub enum EnabledStatistics {
Page,
}

impl FromStr for EnabledStatistics {
type Err = String;

fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"NONE" | "none" => Ok(EnabledStatistics::None),
"CHUNK" | "chunk" => Ok(EnabledStatistics::Chunk),
"PAGE" | "page" => Ok(EnabledStatistics::Page),
_ => Err(format!("Invalid statistics arg: {}", s)),
}
}
}

impl Default for EnabledStatistics {
fn default() -> Self {
DEFAULT_STATISTICS_ENABLED
Expand Down Expand Up @@ -1182,4 +1208,46 @@ mod tests {

assert_eq!(props.codec_options(), &codec_options);
}

#[test]
fn test_parse_writerversion() {
let mut writer_version = "PARQUET_1_0".parse::<WriterVersion>().unwrap();
assert_eq!(writer_version, WriterVersion::PARQUET_1_0);
writer_version = "PARQUET_2_0".parse::<WriterVersion>().unwrap();
assert_eq!(writer_version, WriterVersion::PARQUET_2_0);

// test lowercase
writer_version = "parquet_1_0".parse::<WriterVersion>().unwrap();
assert_eq!(writer_version, WriterVersion::PARQUET_1_0);

// test invalid version
match "PARQUET_-1_0".parse::<WriterVersion>() {
Ok(_) => panic!("Should not be able to parse PARQUET_-1_0"),
Err(e) => {
assert_eq!(e, "Invalid writer version: PARQUET_-1_0");
}
}
}

#[test]
fn test_parse_enabledstatistics() {
let mut enabled_statistics = "NONE".parse::<EnabledStatistics>().unwrap();
assert_eq!(enabled_statistics, EnabledStatistics::None);
enabled_statistics = "CHUNK".parse::<EnabledStatistics>().unwrap();
assert_eq!(enabled_statistics, EnabledStatistics::Chunk);
enabled_statistics = "PAGE".parse::<EnabledStatistics>().unwrap();
assert_eq!(enabled_statistics, EnabledStatistics::Page);

// test lowercase
enabled_statistics = "none".parse::<EnabledStatistics>().unwrap();
assert_eq!(enabled_statistics, EnabledStatistics::None);

//test invalid statistics
match "ChunkAndPage".parse::<EnabledStatistics>() {
Ok(_) => panic!("Should not be able to parse ChunkAndPage"),
Err(e) => {
assert_eq!(e, "Invalid statistics arg: ChunkAndPage");
}
}
}
}

0 comments on commit a94ccff

Please sign in to comment.