diff --git a/datafusion-cli/src/functions.rs b/datafusion-cli/src/functions.rs index 4838df474ccff..24f3399ee2be5 100644 --- a/datafusion-cli/src/functions.rs +++ b/datafusion-cli/src/functions.rs @@ -30,6 +30,7 @@ use datafusion::execution::context::SessionState; use datafusion::logical_expr::Expr; use datafusion::physical_plan::memory::MemoryExec; use datafusion::physical_plan::ExecutionPlan; +use datafusion::scalar::ScalarValue; use parquet::file::reader::FileReader; use parquet::file::serialized_reader::SerializedFileReader; use parquet::file::statistics::Statistics; @@ -249,11 +250,17 @@ pub struct ParquetMetadataFunc {} impl TableFunctionImpl for ParquetMetadataFunc { fn call(&self, exprs: &[Expr]) -> Result> { - let Some(Expr::Column(Column { name, .. })) = exprs.get(0) else { - return plan_err!("parquet_metadata requires string argument as its input"); + let filename = match exprs.get(0) { + Some(Expr::Literal(ScalarValue::Utf8(Some(s)))) => s, // single quote: parquet_metadata('x.parquet') + Some(Expr::Column(Column { name, .. })) => name, // double quote: parquet_metadata("x.parquet") + _ => { + return plan_err!( + "parquet_metadata requires string argument as its input" + ); + } }; - let file = File::open(name.clone())?; + let file = File::open(filename.clone())?; let reader = SerializedFileReader::new(file)?; let metadata = reader.metadata(); @@ -309,7 +316,7 @@ impl TableFunctionImpl for ParquetMetadataFunc { let mut total_uncompressed_size_arr = vec![]; for (rg_idx, row_group) in metadata.row_groups().iter().enumerate() { for (col_idx, column) in row_group.columns().iter().enumerate() { - filename_arr.push(name.clone()); + filename_arr.push(filename.clone()); row_group_id_arr.push(rg_idx as i64); row_group_num_rows_arr.push(row_group.num_rows()); row_group_num_columns_arr.push(row_group.num_columns() as i64); @@ -320,38 +327,43 @@ impl TableFunctionImpl for ParquetMetadataFunc { path_in_schema_arr.push(column.column_path().to_string()); type_arr.push(column.column_type().to_string()); if let Some(s) = column.statistics() { - let (min_val, max_val) = match s { - Statistics::Boolean(val) => { - (val.min().to_string(), val.max().to_string()) - } - Statistics::Int32(val) => { - (val.min().to_string(), val.max().to_string()) - } - Statistics::Int64(val) => { - (val.min().to_string(), val.max().to_string()) - } - Statistics::Int96(val) => { - (val.min().to_string(), val.max().to_string()) - } - Statistics::Float(val) => { - (val.min().to_string(), val.max().to_string()) - } - Statistics::Double(val) => { - (val.min().to_string(), val.max().to_string()) - } - Statistics::ByteArray(val) => { - (val.min().to_string(), val.max().to_string()) - } - Statistics::FixedLenByteArray(val) => { - (val.min().to_string(), val.max().to_string()) - } + let (min_val, max_val) = if s.has_min_max_set() { + let (min_val, max_val) = match s { + Statistics::Boolean(val) => { + (val.min().to_string(), val.max().to_string()) + } + Statistics::Int32(val) => { + (val.min().to_string(), val.max().to_string()) + } + Statistics::Int64(val) => { + (val.min().to_string(), val.max().to_string()) + } + Statistics::Int96(val) => { + (val.min().to_string(), val.max().to_string()) + } + Statistics::Float(val) => { + (val.min().to_string(), val.max().to_string()) + } + Statistics::Double(val) => { + (val.min().to_string(), val.max().to_string()) + } + Statistics::ByteArray(val) => { + (val.min().to_string(), val.max().to_string()) + } + Statistics::FixedLenByteArray(val) => { + (val.min().to_string(), val.max().to_string()) + } + }; + (Some(min_val), Some(max_val)) + } else { + (None, None) }; - stats_min_arr.push(Some(min_val.clone())); - stats_max_arr.push(Some(max_val.clone())); + stats_min_arr.push(min_val.clone()); + stats_max_arr.push(max_val.clone()); stats_null_count_arr.push(Some(s.null_count() as i64)); stats_distinct_count_arr.push(s.distinct_count().map(|c| c as i64)); - stats_min_value_arr.push(Some(min_val)); - stats_max_value_arr.push(Some(max_val)); + stats_min_value_arr.push(min_val); + stats_max_value_arr.push(max_val); } else { stats_min_arr.push(None); stats_max_arr.push(None); diff --git a/datafusion-cli/src/main.rs b/datafusion-cli/src/main.rs index d92e64390eb27..d07503858f5c0 100644 --- a/datafusion-cli/src/main.rs +++ b/datafusion-cli/src/main.rs @@ -331,6 +331,8 @@ fn extract_memory_pool_size(size: &str) -> Result { #[cfg(test)] mod tests { + use datafusion::assert_batches_eq; + use super::*; fn assert_conversion(input: &str, expected: Result) { @@ -388,4 +390,34 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn test_parquet_metadata_works() -> Result<(), DataFusionError> { + let ctx = SessionContext::new(); + ctx.register_udtf("parquet_metadata", Arc::new(ParquetMetadataFunc {})); + + // input with single quote + let sql = + "SELECT * FROM parquet_metadata('../datafusion/core/tests/data/fixed_size_list_array.parquet')"; + let df = ctx.sql(&sql).await?; + let rbs = df.collect().await?; + + let excepted = [ + "+-------------------------------------------------------------+--------------+--------------------+-----------------------+-----------------+-----------+-------------+------------+----------------+-------+-----------+-----------+------------------+----------------------+-----------------+-----------------+-------------+------------------------------+-------------------+------------------------+------------------+-----------------------+-------------------------+", + "| filename | row_group_id | row_group_num_rows | row_group_num_columns | row_group_bytes | column_id | file_offset | num_values | path_in_schema | type | stats_min | stats_max | stats_null_count | stats_distinct_count | stats_min_value | stats_max_value | compression | encodings | index_page_offset | dictionary_page_offset | data_page_offset | total_compressed_size | total_uncompressed_size |", + "+-------------------------------------------------------------+--------------+--------------------+-----------------------+-----------------+-----------+-------------+------------+----------------+-------+-----------+-----------+------------------+----------------------+-----------------+-----------------+-------------+------------------------------+-------------------+------------------------+------------------+-----------------------+-------------------------+", + "| ../datafusion/core/tests/data/fixed_size_list_array.parquet | 0 | 2 | 1 | 123 | 0 | 125 | 4 | \"f0.list.item\" | INT64 | 1 | 4 | 0 | | 1 | 4 | SNAPPY | [RLE_DICTIONARY, PLAIN, RLE] | | 4 | 46 | 121 | 123 |", + "+-------------------------------------------------------------+--------------+--------------------+-----------------------+-----------------+-----------+-------------+------------+----------------+-------+-----------+-----------+------------------+----------------------+-----------------+-----------------+-------------+------------------------------+-------------------+------------------------+------------------+-----------------------+-------------------------+", + ]; + assert_batches_eq!(excepted, &rbs); + + // input with double quote + let sql = + "SELECT * FROM parquet_metadata(\"../datafusion/core/tests/data/fixed_size_list_array.parquet\")"; + let df = ctx.sql(&sql).await?; + let rbs = df.collect().await?; + assert_batches_eq!(excepted, &rbs); + + Ok(()) + } }