From d48c610d0caf96d8b9836afd9b4cb4c0fabdb030 Mon Sep 17 00:00:00 2001 From: Judah Rand <17158624+judahrand@users.noreply.github.com> Date: Sun, 10 Sep 2023 09:54:24 +0100 Subject: [PATCH] Add `schema` and `file_sort_order` to `read/register_parquet` --- src/context.rs | 31 ++++++++++++++++++++++++++++--- 1 file changed, 28 insertions(+), 3 deletions(-) diff --git a/src/context.rs b/src/context.rs index b5ba4d87e..632b179a1 100644 --- a/src/context.rs +++ b/src/context.rs @@ -31,6 +31,7 @@ use crate::catalog::{PyCatalog, PyTable}; use crate::dataframe::PyDataFrame; use crate::dataset::Dataset; use crate::errors::{py_datafusion_err, DataFusionError}; +use crate::expr::PyExpr; use crate::physical_plan::PyExecutionPlan; use crate::record_batch::PyRecordBatchStream; use crate::sql::logical::PyLogicalPlan; @@ -445,7 +446,10 @@ impl PySessionContext { #[allow(clippy::too_many_arguments)] #[pyo3(signature = (name, path, table_partition_cols=vec![], parquet_pruning=true, - file_extension=".parquet"))] + file_extension=".parquet", + skip_metadata=true, + schema=None, + file_sort_order=None))] fn register_parquet( &mut self, name: &str, @@ -453,12 +457,23 @@ impl PySessionContext { table_partition_cols: Vec<(String, String)>, parquet_pruning: bool, file_extension: &str, + skip_metadata: bool, + schema: Option>, + file_sort_order: Option>>, py: Python, ) -> PyResult<()> { let mut options = ParquetReadOptions::default() .table_partition_cols(convert_table_partition_cols(table_partition_cols)?) - .parquet_pruning(parquet_pruning); + .parquet_pruning(parquet_pruning) + .skip_metadata(skip_metadata); options.file_extension = file_extension; + options.schema = schema.as_ref().map(|x| &x.0); + options.file_sort_order = file_sort_order + .unwrap_or(vec![]) + .into_iter() + .map(|e| e.into_iter().map(|f| f.into()).collect()) + .collect(); + let result = self.ctx.register_parquet(name, path, options); wait_for_future(py, result).map_err(DataFusionError::from)?; Ok(()) @@ -722,7 +737,9 @@ impl PySessionContext { table_partition_cols=vec![], parquet_pruning=true, file_extension=".parquet", - skip_metadata=true))] + skip_metadata=true, + schema=None, + file_sort_order=None))] fn read_parquet( &self, path: &str, @@ -730,6 +747,8 @@ impl PySessionContext { parquet_pruning: bool, file_extension: &str, skip_metadata: bool, + schema: Option>, + file_sort_order: Option>>, py: Python, ) -> PyResult { let mut options = ParquetReadOptions::default() @@ -737,6 +756,12 @@ impl PySessionContext { .parquet_pruning(parquet_pruning) .skip_metadata(skip_metadata); options.file_extension = file_extension; + options.schema = schema.as_ref().map(|x| &x.0); + options.file_sort_order = file_sort_order + .unwrap_or(vec![]) + .into_iter() + .map(|e| e.into_iter().map(|f| f.into()).collect()) + .collect(); let result = self.ctx.read_parquet(path, options); let df = PyDataFrame::new(wait_for_future(py, result).map_err(DataFusionError::from)?);