Skip to content

Commit

Permalink
add test & fix single quote
Browse files Browse the repository at this point in the history
  • Loading branch information
Veeupup committed Dec 7, 2023
1 parent 5516189 commit d237d66
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 33 deletions.
78 changes: 45 additions & 33 deletions datafusion-cli/src/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use datafusion::execution::context::SessionState;
use datafusion::logical_expr::Expr;
use datafusion::physical_plan::memory::MemoryExec;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::scalar::ScalarValue;
use parquet::file::reader::FileReader;
use parquet::file::serialized_reader::SerializedFileReader;
use parquet::file::statistics::Statistics;
Expand Down Expand Up @@ -249,11 +250,17 @@ pub struct ParquetMetadataFunc {}

impl TableFunctionImpl for ParquetMetadataFunc {
fn call(&self, exprs: &[Expr]) -> Result<Arc<dyn TableProvider>> {
let Some(Expr::Column(Column { name, .. })) = exprs.get(0) else {
return plan_err!("parquet_metadata requires string argument as its input");
let filename = match exprs.get(0) {
Some(Expr::Literal(ScalarValue::Utf8(Some(s)))) => s, // single quote: parquet_metadata('x.parquet')
Some(Expr::Column(Column { name, .. })) => name, // double quote: parquet_metadata("x.parquet")
_ => {
return plan_err!(
"parquet_metadata requires string argument as its input"
);
}
};

let file = File::open(name.clone())?;
let file = File::open(filename.clone())?;
let reader = SerializedFileReader::new(file)?;
let metadata = reader.metadata();

Expand Down Expand Up @@ -309,7 +316,7 @@ impl TableFunctionImpl for ParquetMetadataFunc {
let mut total_uncompressed_size_arr = vec![];
for (rg_idx, row_group) in metadata.row_groups().iter().enumerate() {
for (col_idx, column) in row_group.columns().iter().enumerate() {
filename_arr.push(name.clone());
filename_arr.push(filename.clone());
row_group_id_arr.push(rg_idx as i64);
row_group_num_rows_arr.push(row_group.num_rows());
row_group_num_columns_arr.push(row_group.num_columns() as i64);
Expand All @@ -320,38 +327,43 @@ impl TableFunctionImpl for ParquetMetadataFunc {
path_in_schema_arr.push(column.column_path().to_string());
type_arr.push(column.column_type().to_string());
if let Some(s) = column.statistics() {
let (min_val, max_val) = match s {
Statistics::Boolean(val) => {
(val.min().to_string(), val.max().to_string())
}
Statistics::Int32(val) => {
(val.min().to_string(), val.max().to_string())
}
Statistics::Int64(val) => {
(val.min().to_string(), val.max().to_string())
}
Statistics::Int96(val) => {
(val.min().to_string(), val.max().to_string())
}
Statistics::Float(val) => {
(val.min().to_string(), val.max().to_string())
}
Statistics::Double(val) => {
(val.min().to_string(), val.max().to_string())
}
Statistics::ByteArray(val) => {
(val.min().to_string(), val.max().to_string())
}
Statistics::FixedLenByteArray(val) => {
(val.min().to_string(), val.max().to_string())
}
let (min_val, max_val) = if s.has_min_max_set() {
let (min_val, max_val) = match s {
Statistics::Boolean(val) => {
(val.min().to_string(), val.max().to_string())
}
Statistics::Int32(val) => {
(val.min().to_string(), val.max().to_string())
}
Statistics::Int64(val) => {
(val.min().to_string(), val.max().to_string())
}
Statistics::Int96(val) => {
(val.min().to_string(), val.max().to_string())
}
Statistics::Float(val) => {
(val.min().to_string(), val.max().to_string())
}
Statistics::Double(val) => {
(val.min().to_string(), val.max().to_string())
}
Statistics::ByteArray(val) => {
(val.min().to_string(), val.max().to_string())
}
Statistics::FixedLenByteArray(val) => {
(val.min().to_string(), val.max().to_string())
}
};
(Some(min_val), Some(max_val))
} else {
(None, None)
};
stats_min_arr.push(Some(min_val.clone()));
stats_max_arr.push(Some(max_val.clone()));
stats_min_arr.push(min_val.clone());
stats_max_arr.push(max_val.clone());
stats_null_count_arr.push(Some(s.null_count() as i64));
stats_distinct_count_arr.push(s.distinct_count().map(|c| c as i64));
stats_min_value_arr.push(Some(min_val));
stats_max_value_arr.push(Some(max_val));
stats_min_value_arr.push(min_val);
stats_max_value_arr.push(max_val);
} else {
stats_min_arr.push(None);
stats_max_arr.push(None);
Expand Down
32 changes: 32 additions & 0 deletions datafusion-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,8 @@ fn extract_memory_pool_size(size: &str) -> Result<usize, String> {

#[cfg(test)]
mod tests {
use datafusion::assert_batches_eq;

use super::*;

fn assert_conversion(input: &str, expected: Result<usize, String>) {
Expand Down Expand Up @@ -388,4 +390,34 @@ mod tests {

Ok(())
}

#[tokio::test]
async fn test_parquet_metadata_works() -> Result<(), DataFusionError> {
    // Verify the `parquet_metadata` UDTF accepts the file path both as a
    // single-quoted SQL string literal and as a double-quoted identifier.
    let ctx = SessionContext::new();
    ctx.register_udtf("parquet_metadata", Arc::new(ParquetMetadataFunc {}));

    // Single quotes parse as a Utf8 literal argument.
    let sql =
        "SELECT * FROM parquet_metadata('../datafusion/core/tests/data/fixed_size_list_array.parquet')";
    let df = ctx.sql(sql).await?;
    let rbs = df.collect().await?;

    let expected = [
        "+-------------------------------------------------------------+--------------+--------------------+-----------------------+-----------------+-----------+-------------+------------+----------------+-------+-----------+-----------+------------------+----------------------+-----------------+-----------------+-------------+------------------------------+-------------------+------------------------+------------------+-----------------------+-------------------------+",
        "| filename                                                    | row_group_id | row_group_num_rows | row_group_num_columns | row_group_bytes | column_id | file_offset | num_values | path_in_schema | type  | stats_min | stats_max | stats_null_count | stats_distinct_count | stats_min_value | stats_max_value | compression | encodings                    | index_page_offset | dictionary_page_offset | data_page_offset | total_compressed_size | total_uncompressed_size |",
        "+-------------------------------------------------------------+--------------+--------------------+-----------------------+-----------------+-----------+-------------+------------+----------------+-------+-----------+-----------+------------------+----------------------+-----------------+-----------------+-------------+------------------------------+-------------------+------------------------+------------------+-----------------------+-------------------------+",
        "| ../datafusion/core/tests/data/fixed_size_list_array.parquet | 0            | 2                  | 1                     | 123             | 0         | 125         | 4          | \"f0.list.item\" | INT64 | 1         | 4         | 0                |                      | 1               | 4               | SNAPPY      | [RLE_DICTIONARY, PLAIN, RLE] |                   | 4                      | 46               | 121                   | 123                     |",
        "+-------------------------------------------------------------+--------------+--------------------+-----------------------+-----------------+-----------+-------------+------------+----------------+-------+-----------+-----------+------------------+----------------------+-----------------+-----------------+-------------+------------------------------+-------------------+------------------------+------------------+-----------------------+-------------------------+",
    ];
    assert_batches_eq!(expected, &rbs);

    // Double quotes parse as a quoted identifier (Expr::Column); the
    // function must resolve this to the same file and produce the same
    // metadata rows.
    let sql =
        "SELECT * FROM parquet_metadata(\"../datafusion/core/tests/data/fixed_size_list_array.parquet\")";
    let df = ctx.sql(sql).await?;
    let rbs = df.collect().await?;
    assert_batches_eq!(expected, &rbs);

    Ok(())
}
}

0 comments on commit d237d66

Please sign in to comment.