From a534b76e129fcc50420f957bffa6e3b32c4882ae Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 15 Feb 2023 18:41:01 -0700 Subject: [PATCH 1/4] Add PyExectionPlan.execute method --- datafusion/tests/test_dataframe.py | 4 +++ src/lib.rs | 1 + src/physical_plan.rs | 50 +++++++++++++++++++++++++++++- src/record_batch.rs | 38 +++++++++++++++++++++++ 4 files changed, 92 insertions(+), 1 deletion(-) create mode 100644 src/record_batch.rs diff --git a/datafusion/tests/test_dataframe.py b/datafusion/tests/test_dataframe.py index dcab86a49..b37b425d4 100644 --- a/datafusion/tests/test_dataframe.py +++ b/datafusion/tests/test_dataframe.py @@ -388,6 +388,10 @@ def test_execution_plan(aggregate_df): assert "RepartitionExec:" in indent assert "CsvExec:" in indent + stream = plan.execute(0) + batch = stream.next() + print(batch) + def test_repartition(df): df.repartition(2) diff --git a/src/lib.rs b/src/lib.rs index f6d404efd..d9898db61 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -37,6 +37,7 @@ mod expr; mod functions; pub mod physical_plan; mod pyarrow_filter_expression; +mod record_batch; pub mod sql; pub mod store; pub mod substrait; diff --git a/src/physical_plan.rs b/src/physical_plan.rs index 340d527fa..93b5af9ff 100644 --- a/src/physical_plan.rs +++ b/src/physical_plan.rs @@ -15,10 +15,20 @@ // specific language governing permissions and limitations // under the License. -use datafusion::physical_plan::{displayable, ExecutionPlan}; +use datafusion::execution::context::TaskContext; +use datafusion::execution::runtime_env::RuntimeEnv; +use datafusion::physical_plan::{displayable, ExecutionPlan, SendableRecordBatchStream}; +use datafusion_common::Result; +use futures::StreamExt; +use std::collections::HashMap; use std::sync::Arc; +use crate::errors::py_datafusion_err; +use crate::record_batch::PyRecordBatch; +use crate::utils::wait_for_future; use pyo3::prelude::*; +use tokio::runtime::Runtime; +use tokio::task::JoinHandle; #[pyclass(name = "ExecutionPlan", module = "datafusion", subclass)] #[derive(Debug, Clone)] @@ -53,6 +63,24 @@ impl PyExecutionPlan { let d = displayable(self.plan.as_ref()); format!("{}", d.indent()) } + + pub fn execute(&self, part: usize, py: Python) -> PyResult { + let ctx = Arc::new(TaskContext::new( + "task_id".to_string(), + "session_id".to_string(), + HashMap::new(), + HashMap::new(), + HashMap::new(), + Arc::new(RuntimeEnv::default()), + )); + // create a Tokio runtime to run the async code + let rt = Runtime::new().unwrap(); + let plan = self.plan.clone(); + let fut: JoinHandle> = + rt.spawn(async move { plan.execute(part, ctx) }); + let stream = wait_for_future(py, fut).map_err(|e| py_datafusion_err(e))?; + Ok(PyRecordBatchStream::new(stream?)) + } } impl From for Arc { @@ -66,3 +94,23 @@ impl From> for PyExecutionPlan { PyExecutionPlan { plan: plan.clone() } } } + +#[pyclass(name = "RecordBatchStream", module = "datafusion", subclass)] +pub struct PyRecordBatchStream { + stream: SendableRecordBatchStream, +} + +impl PyRecordBatchStream { + fn new(stream: SendableRecordBatchStream) -> Self { + Self { stream } + } +} + +#[pymethods] +impl PyRecordBatchStream { + fn next(&mut self, py: Python) -> PyResult { + let result = self.stream.next(); + let batch = wait_for_future(py, result).unwrap()?; //.map_err(DataFusionError::from)?; + Ok(batch.into()) + } +} diff --git a/src/record_batch.rs b/src/record_batch.rs new file mode 100644 index 000000000..997d050dc --- /dev/null +++ b/src/record_batch.rs @@ -0,0 +1,38 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use datafusion::arrow::pyarrow::PyArrowConvert; +use datafusion::arrow::record_batch::RecordBatch; +use pyo3::{pyclass, pymethods, PyObject, PyResult, Python}; + +#[pyclass(name = "RecordBatch", module = "datafusion", subclass)] +pub struct PyRecordBatch { + batch: RecordBatch, +} + +#[pymethods] +impl PyRecordBatch { + fn to_pyarrow(&self, py: Python) -> PyResult { + self.batch.to_pyarrow(py) + } +} + +impl From for PyRecordBatch { + fn from(batch: RecordBatch) -> Self { + Self { batch } + } +} From e98b548bec610958db83ca61e72bff16c6b555ec Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 15 Feb 2023 19:27:05 -0700 Subject: [PATCH 2/4] Refactor --- datafusion/tests/test_dataframe.py | 3 +- src/context.rs | 35 +++++++++++++++++++-- src/physical_plan.rs | 50 +----------------------------- src/record_batch.rs | 26 ++++++++++++++++ 4 files changed, 62 insertions(+), 52 deletions(-) diff --git a/datafusion/tests/test_dataframe.py b/datafusion/tests/test_dataframe.py index b37b425d4..165fef278 100644 --- a/datafusion/tests/test_dataframe.py +++ b/datafusion/tests/test_dataframe.py @@ -388,7 +388,8 @@ def test_execution_plan(aggregate_df): assert "RepartitionExec:" in indent assert "CsvExec:" in indent - stream = plan.execute(0) + ctx = SessionContext() + stream = ctx.execute(plan, 0) batch = stream.next() print(batch) diff --git a/src/context.rs b/src/context.rs index 8dcd1d6ff..cb7c622fd 100644 --- a/src/context.rs +++ b/src/context.rs @@ -28,7 +28,9 @@ use pyo3::prelude::*; use crate::catalog::{PyCatalog, PyTable}; use crate::dataframe::PyDataFrame; use crate::dataset::Dataset; -use crate::errors::DataFusionError; +use crate::errors::{py_datafusion_err, DataFusionError}; +use crate::physical_plan::PyExecutionPlan; +use crate::record_batch::PyRecordBatchStream; use crate::sql::logical::PyLogicalPlan; use crate::store::StorageContexts; use crate::udaf::PyAggregateUDF; @@ -39,14 +41,17 @@ use datafusion::arrow::pyarrow::PyArrowType; use datafusion::arrow::record_batch::RecordBatch; use datafusion::datasource::datasource::TableProvider; use datafusion::datasource::MemTable; -use datafusion::execution::context::{SessionConfig, SessionContext}; +use datafusion::execution::context::{SessionConfig, SessionContext, TaskContext}; use datafusion::execution::disk_manager::DiskManagerConfig; use datafusion::execution::memory_pool::{FairSpillPool, GreedyMemoryPool, UnboundedMemoryPool}; use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv}; +use datafusion::physical_plan::SendableRecordBatchStream; use datafusion::prelude::{ AvroReadOptions, CsvReadOptions, DataFrame, NdJsonReadOptions, ParquetReadOptions, }; use datafusion_common::ScalarValue; +use tokio::runtime::Runtime; +use tokio::task::JoinHandle; #[pyclass(name = "SessionConfig", module = "datafusion", subclass, unsendable)] #[derive(Clone, Default)] @@ -579,6 +584,32 @@ impl PySessionContext { Err(err) => Ok(format!("Error: {:?}", err.to_string())), } } + + /// Execute a partition of an execution plan and return a stream of record batches + pub fn execute( + &self, + plan: PyExecutionPlan, + part: usize, + py: Python, + ) -> PyResult { + let ctx = Arc::new(TaskContext::new( + "task_id".to_string(), + "session_id".to_string(), + HashMap::new(), + HashMap::new(), + HashMap::new(), + Arc::new(RuntimeEnv::default()), + )); + // create a Tokio runtime to run the async code + //TODO can we use an existing runtime from the SessionContext ? If not, we at least + // need to configure the runtime here + let rt = Runtime::new().unwrap(); + let plan = plan.plan.clone(); + let fut: JoinHandle> = + rt.spawn(async move { plan.execute(part, ctx) }); + let stream = wait_for_future(py, fut).map_err(|e| py_datafusion_err(e))?; + Ok(PyRecordBatchStream::new(stream?)) + } } impl PySessionContext { diff --git a/src/physical_plan.rs b/src/physical_plan.rs index 93b5af9ff..340d527fa 100644 --- a/src/physical_plan.rs +++ b/src/physical_plan.rs @@ -15,20 +15,10 @@ // specific language governing permissions and limitations // under the License. -use datafusion::execution::context::TaskContext; -use datafusion::execution::runtime_env::RuntimeEnv; -use datafusion::physical_plan::{displayable, ExecutionPlan, SendableRecordBatchStream}; -use datafusion_common::Result; -use futures::StreamExt; -use std::collections::HashMap; +use datafusion::physical_plan::{displayable, ExecutionPlan}; use std::sync::Arc; -use crate::errors::py_datafusion_err; -use crate::record_batch::PyRecordBatch; -use crate::utils::wait_for_future; use pyo3::prelude::*; -use tokio::runtime::Runtime; -use tokio::task::JoinHandle; #[pyclass(name = "ExecutionPlan", module = "datafusion", subclass)] #[derive(Debug, Clone)] @@ -63,24 +53,6 @@ impl PyExecutionPlan { let d = displayable(self.plan.as_ref()); format!("{}", d.indent()) } - - pub fn execute(&self, part: usize, py: Python) -> PyResult { - let ctx = Arc::new(TaskContext::new( - "task_id".to_string(), - "session_id".to_string(), - HashMap::new(), - HashMap::new(), - HashMap::new(), - Arc::new(RuntimeEnv::default()), - )); - // create a Tokio runtime to run the async code - let rt = Runtime::new().unwrap(); - let plan = self.plan.clone(); - let fut: JoinHandle> = - rt.spawn(async move { plan.execute(part, ctx) }); - let stream = wait_for_future(py, fut).map_err(|e| py_datafusion_err(e))?; - Ok(PyRecordBatchStream::new(stream?)) - } } impl From for Arc { @@ -94,23 +66,3 @@ impl From> for PyExecutionPlan { PyExecutionPlan { plan: plan.clone() } } } - -#[pyclass(name = "RecordBatchStream", module = "datafusion", subclass)] -pub struct PyRecordBatchStream { - stream: SendableRecordBatchStream, -} - -impl PyRecordBatchStream { - fn new(stream: SendableRecordBatchStream) -> Self { - Self { stream } - } -} - -#[pymethods] -impl PyRecordBatchStream { - fn next(&mut self, py: Python) -> PyResult { - let result = self.stream.next(); - let batch = wait_for_future(py, result).unwrap()?; //.map_err(DataFusionError::from)?; - Ok(batch.into()) - } -} diff --git a/src/record_batch.rs b/src/record_batch.rs index 997d050dc..15b70e8ce 100644 --- a/src/record_batch.rs +++ b/src/record_batch.rs @@ -15,8 +15,11 @@ // specific language governing permissions and limitations // under the License. +use crate::utils::wait_for_future; use datafusion::arrow::pyarrow::PyArrowConvert; use datafusion::arrow::record_batch::RecordBatch; +use datafusion::physical_plan::SendableRecordBatchStream; +use futures::StreamExt; use pyo3::{pyclass, pymethods, PyObject, PyResult, Python}; #[pyclass(name = "RecordBatch", module = "datafusion", subclass)] @@ -36,3 +39,26 @@ impl From for PyRecordBatch { Self { batch } } } + +#[pyclass(name = "RecordBatchStream", module = "datafusion", subclass)] +pub struct PyRecordBatchStream { + stream: SendableRecordBatchStream, +} + +impl PyRecordBatchStream { + pub fn new(stream: SendableRecordBatchStream) -> Self { + Self { stream } + } +} + +#[pymethods] +impl PyRecordBatchStream { + fn next(&mut self, py: Python) -> PyResult> { + let result = self.stream.next(); + match wait_for_future(py, result) { + None => Ok(None), + Some(Ok(b)) => Ok(Some(b.into())), + Some(Err(e)) => Err(e.into()), + } + } +} From 375955a43211ebaeeafda9ce723ba5a2a838c7c2 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 15 Feb 2023 19:28:50 -0700 Subject: [PATCH 3/4] improve test --- datafusion/tests/test_dataframe.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/datafusion/tests/test_dataframe.py b/datafusion/tests/test_dataframe.py index 165fef278..18946888f 100644 --- a/datafusion/tests/test_dataframe.py +++ b/datafusion/tests/test_dataframe.py @@ -390,8 +390,12 @@ def test_execution_plan(aggregate_df): ctx = SessionContext() stream = ctx.execute(plan, 0) + # get the one and only batch batch = stream.next() - print(batch) + assert batch is not None + # there should be no more batches + batch = stream.next() + assert batch is None def test_repartition(df): From 713cfccea7e7b136facea9f02d80332d5b3b3e6f Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 15 Feb 2023 19:36:11 -0700 Subject: [PATCH 4/4] remove comment --- src/context.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/context.rs b/src/context.rs index cb7c622fd..1acf5f289 100644 --- a/src/context.rs +++ b/src/context.rs @@ -601,8 +601,6 @@ impl PySessionContext { Arc::new(RuntimeEnv::default()), )); // create a Tokio runtime to run the async code - //TODO can we use an existing runtime from the SessionContext ? If not, we at least - // need to configure the runtime here let rt = Runtime::new().unwrap(); let plan = plan.plan.clone(); let fut: JoinHandle> =