From b391bfd8164d1e13db7b275e69506bc8783d14ac Mon Sep 17 00:00:00 2001 From: Michael-J-Ward Date: Tue, 18 Jun 2024 14:42:34 -0500 Subject: [PATCH 1/4] let pyo3 convert the StorageContexts argument in PySessionContext::register_object_store --- src/context.rs | 23 ++++++++--------------- 1 file changed, 8 insertions(+), 15 deletions(-) diff --git a/src/context.rs b/src/context.rs index ec63adb26..a4f73e650 100644 --- a/src/context.rs +++ b/src/context.rs @@ -291,24 +291,17 @@ impl PySessionContext { pub fn register_object_store( &mut self, scheme: &str, - store: &Bound<'_, PyAny>, + store: StorageContexts, host: Option<&str>, ) -> PyResult<()> { - let res: Result<(Arc, String), PyErr> = - match StorageContexts::extract_bound(store) { - Ok(store) => match store { - StorageContexts::AmazonS3(s3) => Ok((s3.inner, s3.bucket_name)), - StorageContexts::GoogleCloudStorage(gcs) => Ok((gcs.inner, gcs.bucket_name)), - StorageContexts::MicrosoftAzure(azure) => { - Ok((azure.inner, azure.container_name)) - } - StorageContexts::LocalFileSystem(local) => Ok((local.inner, "".to_string())), - }, - Err(_e) => Err(PyValueError::new_err("Invalid object store")), - }; - // for most stores the "host" is the bucket name and can be inferred from the store - let (store, upstream_host) = res?; + let (store, upstream_host): (Arc, String) = match store { + StorageContexts::AmazonS3(s3) => (s3.inner, s3.bucket_name), + StorageContexts::GoogleCloudStorage(gcs) => (gcs.inner, gcs.bucket_name), + StorageContexts::MicrosoftAzure(azure) => (azure.inner, azure.container_name), + StorageContexts::LocalFileSystem(local) => (local.inner, "".to_string()), + }; + // let users override the host to match the api signature from upstream let derived_host = if let Some(host) = host { host From 76064f041687a5ac9655bea2c46a709bfdb8c0ca Mon Sep 17 00:00:00 2001 From: Michael-J-Ward Date: Tue, 18 Jun 2024 14:49:18 -0500 Subject: [PATCH 2/4] clean PySessionContext methods from_pylist and from_pydict --- 
src/context.rs | 52 +++++++++++++++++++++++++------------------------- 1 file changed, 26 insertions(+), 26 deletions(-) diff --git a/src/context.rs b/src/context.rs index a4f73e650..718d741d5 100644 --- a/src/context.rs +++ b/src/context.rs @@ -60,7 +60,7 @@ use datafusion::prelude::{ AvroReadOptions, CsvReadOptions, DataFrame, NdJsonReadOptions, ParquetReadOptions, }; use datafusion_common::ScalarValue; -use pyo3::types::PyTuple; +use pyo3::types::{PyDict, PyList, PyTuple}; use tokio::task::JoinHandle; /// Configuration options for a SessionContext @@ -427,43 +427,43 @@ impl PySessionContext { } /// Construct datafusion dataframe from Python list - #[allow(clippy::wrong_self_convention)] pub fn from_pylist( &mut self, - data: PyObject, + data: Bound<'_, PyList>, name: Option<&str>, - _py: Python, ) -> PyResult { - Python::with_gil(|py| { - // Instantiate pyarrow Table object & convert to Arrow Table - let table_class = py.import_bound("pyarrow")?.getattr("Table")?; - let args = PyTuple::new_bound(py, &[data]); - let table = table_class.call_method1("from_pylist", args)?.into(); - - // Convert Arrow Table to datafusion DataFrame - let df = self.from_arrow_table(table, name, py)?; - Ok(df) - }) + + // Acquire GIL Token + let py = data.py(); + + // Instantiate pyarrow Table object & convert to Arrow Table + let table_class = py.import_bound("pyarrow")?.getattr("Table")?; + let args = PyTuple::new_bound(py, &[data]); + let table = table_class.call_method1("from_pylist", args)?.into(); + + // Convert Arrow Table to datafusion DataFrame + let df = self.from_arrow_table(table, name, py)?; + Ok(df) } /// Construct datafusion dataframe from Python dictionary - #[allow(clippy::wrong_self_convention)] pub fn from_pydict( &mut self, - data: PyObject, + data: Bound<'_, PyDict>, name: Option<&str>, - _py: Python, ) -> PyResult { - Python::with_gil(|py| { - // Instantiate pyarrow Table object & convert to Arrow Table - let table_class = 
py.import_bound("pyarrow")?.getattr("Table")?; - let args = PyTuple::new_bound(py, &[data]); - let table = table_class.call_method1("from_pydict", args)?.into(); + + // Acquire GIL Token + let py = data.py(); - // Convert Arrow Table to datafusion DataFrame - let df = self.from_arrow_table(table, name, py)?; - Ok(df) - }) + + // Instantiate pyarrow Table object & convert to Arrow Table + let table_class = py.import_bound("pyarrow")?.getattr("Table")?; + let args = PyTuple::new_bound(py, &[data]); + let table = table_class.call_method1("from_pydict", args)?.into(); + + // Convert Arrow Table to datafusion DataFrame + let df = self.from_arrow_table(table, name, py)?; + Ok(df) } /// Construct datafusion dataframe from Arrow Table From a326fab1bc273565d1e18792d18f64ab4702ed64 Mon Sep 17 00:00:00 2001 From: Michael-J-Ward Date: Tue, 18 Jun 2024 15:22:58 -0500 Subject: [PATCH 3/4] clean PySessionContext methods from_polars, from_pandas, from_arrow_table --- src/context.rs | 79 ++++++++++++++++++++++---------------------- 1 file changed, 35 insertions(+), 44 deletions(-) diff --git a/src/context.rs b/src/context.rs index 718d741d5..50c4a1994 100644 --- a/src/context.rs +++ b/src/context.rs @@ -432,14 +432,13 @@ impl PySessionContext { data: Bound<'_, PyList>, name: Option<&str>, ) -> PyResult { - // Acquire GIL Token let py = data.py(); - + // Instantiate pyarrow Table object & convert to Arrow Table let table_class = py.import_bound("pyarrow")?.getattr("Table")?; let args = PyTuple::new_bound(py, &[data]); - let table = table_class.call_method1("from_pylist", args)?.into(); + let table = table_class.call_method1("from_pylist", args)?; // Convert Arrow Table to datafusion DataFrame let df = self.from_arrow_table(table, name, py)?; @@ -452,14 +451,13 @@ impl PySessionContext { data: Bound<'_, PyDict>, name: Option<&str>, ) -> PyResult { - // Acquire GIL Token let py = data.py(); // Instantiate pyarrow Table object & convert to Arrow Table let table_class = 
py.import_bound("pyarrow")?.getattr("Table")?; let args = PyTuple::new_bound(py, &[data]); - let table = table_class.call_method1("from_pydict", args)?.into(); + let table = table_class.call_method1("from_pydict", args)?; // Convert Arrow Table to datafusion DataFrame let df = self.from_arrow_table(table, name, py)?; @@ -467,65 +465,58 @@ impl PySessionContext { } /// Construct datafusion dataframe from Arrow Table - #[allow(clippy::wrong_self_convention)] pub fn from_arrow_table( &mut self, - data: PyObject, + data: Bound<'_, PyAny>, name: Option<&str>, - _py: Python, + py: Python, ) -> PyResult { - Python::with_gil(|py| { - // Instantiate pyarrow Table object & convert to batches - let table = data.call_method0(py, "to_batches")?; - - let schema = data.getattr(py, "schema")?; - let schema = schema.extract::>(py)?; - - // Cast PyObject to RecordBatch type - // Because create_dataframe() expects a vector of vectors of record batches - // here we need to wrap the vector of record batches in an additional vector - let batches = table.extract::>>(py)?; - let list_of_batches = PyArrowType::from(vec![batches.0]); - self.create_dataframe(list_of_batches, name, Some(schema), py) - }) + // Instantiate pyarrow Table object & convert to batches + let table = data.call_method0("to_batches")?; + + let schema = data.getattr("schema")?; + let schema = schema.extract::>()?; + + // Cast PyAny to RecordBatch type + // Because create_dataframe() expects a vector of vectors of record batches + // here we need to wrap the vector of record batches in an additional vector + let batches = table.extract::>>()?; + let list_of_batches = PyArrowType::from(vec![batches.0]); + self.create_dataframe(list_of_batches, name, Some(schema), py) } /// Construct datafusion dataframe from pandas #[allow(clippy::wrong_self_convention)] pub fn from_pandas( &mut self, - data: PyObject, + data: Bound<'_, PyAny>, name: Option<&str>, - _py: Python, ) -> PyResult { - Python::with_gil(|py| { - // Instantiate 
pyarrow Table object & convert to Arrow Table - let table_class = py.import_bound("pyarrow")?.getattr("Table")?; - let args = PyTuple::new_bound(py, &[data]); - let table = table_class.call_method1("from_pandas", args)?.into(); - - // Convert Arrow Table to datafusion DataFrame - let df = self.from_arrow_table(table, name, py)?; - Ok(df) - }) + // Obtain GIL token + let py = data.py(); + + // Instantiate pyarrow Table object & convert to Arrow Table + let table_class = py.import_bound("pyarrow")?.getattr("Table")?; + let args = PyTuple::new_bound(py, &[data]); + let table = table_class.call_method1("from_pandas", args)?; + + // Convert Arrow Table to datafusion DataFrame + let df = self.from_arrow_table(table, name, py)?; + Ok(df) } /// Construct datafusion dataframe from polars - #[allow(clippy::wrong_self_convention)] pub fn from_polars( &mut self, - data: PyObject, + data: Bound<'_, PyAny>, name: Option<&str>, - _py: Python, ) -> PyResult { - Python::with_gil(|py| { - // Convert Polars dataframe to Arrow Table - let table = data.call_method0(py, "to_arrow")?; + // Convert Polars dataframe to Arrow Table + let table = data.call_method0("to_arrow")?; - // Convert Arrow Table to datafusion DataFrame - let df = self.from_arrow_table(table, name, py)?; - Ok(df) - }) + // Convert Arrow Table to datafusion DataFrame + let df = self.from_arrow_table(table, name, data.py())?; + Ok(df) } pub fn register_table(&mut self, name: &str, table: &PyTable) -> PyResult<()> { From f3bee822e2e2a58df7cb92601fa19fe7095b63ea Mon Sep 17 00:00:00 2001 From: Michael-J-Ward Date: Tue, 25 Jun 2024 12:31:38 -0500 Subject: [PATCH 4/4] prefer bound Python token over Python::with_gil When available, using an already bound python token is zero-cost. Python::with_gil carries a runtime check. 
Ref: https://github.com/PyO3/pyo3/pull/4274 --- src/dataframe.rs | 55 +++++++++++++++++++--------------------------- src/sql/logical.rs | 4 ++-- 2 files changed, 24 insertions(+), 35 deletions(-) diff --git a/src/dataframe.rs b/src/dataframe.rs index 1b91067d5..9e36be2c4 100644 --- a/src/dataframe.rs +++ b/src/dataframe.rs @@ -423,17 +423,15 @@ impl PyDataFrame { /// Convert to Arrow Table /// Collect the batches and pass to Arrow Table - fn to_arrow_table(&self, py: Python) -> PyResult { + fn to_arrow_table(&self, py: Python<'_>) -> PyResult { let batches = self.collect(py)?.to_object(py); let schema: PyObject = self.schema().into_py(py); - Python::with_gil(|py| { - // Instantiate pyarrow Table object and use its from_batches method - let table_class = py.import_bound("pyarrow")?.getattr("Table")?; - let args = PyTuple::new_bound(py, &[batches, schema]); - let table: PyObject = table_class.call_method1("from_batches", args)?.into(); - Ok(table) - }) + // Instantiate pyarrow Table object and use its from_batches method + let table_class = py.import_bound("pyarrow")?.getattr("Table")?; + let args = PyTuple::new_bound(py, &[batches, schema]); + let table: PyObject = table_class.call_method1("from_batches", args)?.into(); + Ok(table) } fn execute_stream(&self, py: Python) -> PyResult { @@ -464,26 +462,22 @@ impl PyDataFrame { /// Convert to pandas dataframe with pyarrow /// Collect the batches, pass to Arrow Table & then convert to Pandas DataFrame - fn to_pandas(&self, py: Python) -> PyResult { + fn to_pandas(&self, py: Python<'_>) -> PyResult { let table = self.to_arrow_table(py)?; - Python::with_gil(|py| { - // See also: https://arrow.apache.org/docs/python/generated/pyarrow.Table.html#pyarrow.Table.to_pandas - let result = table.call_method0(py, "to_pandas")?; - Ok(result) - }) + // See also: https://arrow.apache.org/docs/python/generated/pyarrow.Table.html#pyarrow.Table.to_pandas + let result = table.call_method0(py, "to_pandas")?; + Ok(result) } /// 
Convert to Python list using pyarrow /// Each list item represents one row encoded as dictionary - fn to_pylist(&self, py: Python) -> PyResult { + fn to_pylist(&self, py: Python<'_>) -> PyResult { let table = self.to_arrow_table(py)?; - Python::with_gil(|py| { - // See also: https://arrow.apache.org/docs/python/generated/pyarrow.Table.html#pyarrow.Table.to_pylist - let result = table.call_method0(py, "to_pylist")?; - Ok(result) - }) + // See also: https://arrow.apache.org/docs/python/generated/pyarrow.Table.html#pyarrow.Table.to_pylist + let result = table.call_method0(py, "to_pylist")?; + Ok(result) } /// Convert to Python dictionary using pyarrow @@ -491,24 +485,19 @@ impl PyDataFrame { fn to_pydict(&self, py: Python) -> PyResult { let table = self.to_arrow_table(py)?; - Python::with_gil(|py| { - // See also: https://arrow.apache.org/docs/python/generated/pyarrow.Table.html#pyarrow.Table.to_pydict - let result = table.call_method0(py, "to_pydict")?; - Ok(result) - }) + // See also: https://arrow.apache.org/docs/python/generated/pyarrow.Table.html#pyarrow.Table.to_pydict + let result = table.call_method0(py, "to_pydict")?; + Ok(result) } /// Convert to polars dataframe with pyarrow /// Collect the batches, pass to Arrow Table & then convert to polars DataFrame - fn to_polars(&self, py: Python) -> PyResult { + fn to_polars(&self, py: Python<'_>) -> PyResult { let table = self.to_arrow_table(py)?; - - Python::with_gil(|py| { - let dataframe = py.import_bound("polars")?.getattr("DataFrame")?; - let args = PyTuple::new_bound(py, &[table]); - let result: PyObject = dataframe.call1(args)?.into(); - Ok(result) - }) + let dataframe = py.import_bound("polars")?.getattr("DataFrame")?; + let args = PyTuple::new_bound(py, &[table]); + let result: PyObject = dataframe.call1(args)?.into(); + Ok(result) } // Executes this DataFrame to get the total number of rows. 
diff --git a/src/sql/logical.rs b/src/sql/logical.rs index 62515c3dd..b1446b92a 100644 --- a/src/sql/logical.rs +++ b/src/sql/logical.rs @@ -63,7 +63,7 @@ impl PyLogicalPlan { impl PyLogicalPlan { /// Return the specific logical operator pub fn to_variant(&self, py: Python) -> PyResult { - Python::with_gil(|_| match self.plan.as_ref() { + match self.plan.as_ref() { LogicalPlan::Aggregate(plan) => PyAggregate::from(plan.clone()).to_variant(py), LogicalPlan::Analyze(plan) => PyAnalyze::from(plan.clone()).to_variant(py), LogicalPlan::CrossJoin(plan) => PyCrossJoin::from(plan.clone()).to_variant(py), @@ -85,7 +85,7 @@ impl PyLogicalPlan { "Cannot convert this plan to a LogicalNode: {:?}", other ))), - }) + } } /// Get the inputs to this plan