From 28b9f41b600cd3888e2522adb592676b17b15e1a Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Sat, 8 Oct 2022 06:02:32 -0600
Subject: [PATCH] Upgrade to DataFusion 13.0.0-rc1 (#325)

---
 ballista-cli/Cargo.toml            |  4 ++--
 ballista/rust/client/Cargo.toml    |  6 +++---
 ballista/rust/core/Cargo.toml      |  8 ++++----
 ballista/rust/executor/Cargo.toml  |  8 ++++----
 ballista/rust/scheduler/Cargo.toml |  6 +++---
 benchmarks/Cargo.toml              |  4 ++--
 examples/Cargo.toml                |  2 +-
 python/Cargo.toml                  |  2 +-
 python/src/ballista_context.rs     |  5 +++--
 python/src/context.rs              | 15 ++++++++-------
 python/src/dataframe.rs            | 12 +++++++-----
 python/src/dataset.rs              | 10 +++++++++-
 python/src/dataset_exec.rs         | 12 +++++++++---
 python/src/expression.rs           |  6 +++---
 python/src/udaf.rs                 | 15 ++++++++-------
 python/src/udf.rs                  | 14 +++++++-------
 16 files changed, 74 insertions(+), 55 deletions(-)

diff --git a/ballista-cli/Cargo.toml b/ballista-cli/Cargo.toml
index 9f28ee9a6..ae6dea443 100644
--- a/ballista-cli/Cargo.toml
+++ b/ballista-cli/Cargo.toml
@@ -33,8 +33,8 @@ ballista = { path = "../ballista/rust/client", version = "0.8.0", features = [
   "standalone",
 ] }
 clap = { version = "3", features = ["derive", "cargo"] }
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "06a4f79f02fcb6ea85303925b7c5a9b0231e3fee" }
-datafusion-cli = { git = "https://github.com/apache/arrow-datafusion", rev = "06a4f79f02fcb6ea85303925b7c5a9b0231e3fee" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "13.0.0-rc1" }
+datafusion-cli = { git = "https://github.com/apache/arrow-datafusion", rev = "13.0.0-rc1" }
 dirs = "4.0.0"
 env_logger = "0.9"
 mimalloc = { version = "0.1", default-features = false }
diff --git a/ballista/rust/client/Cargo.toml b/ballista/rust/client/Cargo.toml
index a7ab2164a..f08a4b7d0 100644
--- a/ballista/rust/client/Cargo.toml
+++ b/ballista/rust/client/Cargo.toml
@@ -31,12 +31,12 @@ rust-version = "1.59"
 ballista-core = { path = "../core", version = "0.8.0" }
 ballista-executor = { path = "../executor", version = "0.8.0", optional = true }
 ballista-scheduler = { path = "../scheduler", version = "0.8.0", optional = true }
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "06a4f79f02fcb6ea85303925b7c5a9b0231e3fee" }
-datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "06a4f79f02fcb6ea85303925b7c5a9b0231e3fee" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "13.0.0-rc1" }
+datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "13.0.0-rc1" }
 futures = "0.3"
 log = "0.4"
 parking_lot = "0.12"
-sqlparser = "0.23"
+sqlparser = "0.25"
 tempfile = "3"
 tokio = "1.0"
diff --git a/ballista/rust/core/Cargo.toml b/ballista/rust/core/Cargo.toml
index 6699af013..d834ce108 100644
--- a/ballista/rust/core/Cargo.toml
+++ b/ballista/rust/core/Cargo.toml
@@ -41,13 +41,13 @@ simd = ["datafusion/simd"]
 
 [dependencies]
 ahash = { version = "0.8", default-features = false }
-arrow-flight = { version = "23.0.0", features = ["flight-sql-experimental"] }
+arrow-flight = { version = "24.0.0", features = ["flight-sql-experimental"] }
 async-trait = "0.1.41"
 chrono = { version = "0.4", default-features = false }
 clap = { version = "3", features = ["derive", "cargo"] }
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "06a4f79f02fcb6ea85303925b7c5a9b0231e3fee" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "13.0.0-rc1" }
 datafusion-objectstore-hdfs = { version = "0.1.0", optional = true }
-datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "06a4f79f02fcb6ea85303925b7c5a9b0231e3fee" }
+datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "13.0.0-rc1" }
 futures = "0.3"
 hashbrown = "0.12"
@@ -63,7 +63,7 @@ prost = "0.11"
 prost-types = "0.11"
 rand = "0.8"
 serde = { version = "1", features = ["derive"] }
-sqlparser = "0.23"
+sqlparser = "0.25"
 tokio = "1.0"
 tokio-stream = { version = "0.1", features = ["net"] }
 tonic = "0.8"
rev = "13.0.0-rc1" } datafusion-objectstore-hdfs = { version = "0.1.0", optional = true } -datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "06a4f79f02fcb6ea85303925b7c5a9b0231e3fee" } +datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "13.0.0-rc1" } futures = "0.3" hashbrown = "0.12" @@ -63,7 +63,7 @@ prost = "0.11" prost-types = "0.11" rand = "0.8" serde = { version = "1", features = ["derive"] } -sqlparser = "0.23" +sqlparser = "0.25" tokio = "1.0" tokio-stream = { version = "0.1", features = ["net"] } tonic = "0.8" diff --git a/ballista/rust/executor/Cargo.toml b/ballista/rust/executor/Cargo.toml index ca3532ac7..9eda5278b 100644 --- a/ballista/rust/executor/Cargo.toml +++ b/ballista/rust/executor/Cargo.toml @@ -34,15 +34,15 @@ snmalloc = ["snmalloc-rs"] [dependencies] anyhow = "1" -arrow = { version = "23.0.0" } -arrow-flight = { version = "23.0.0" } +arrow = { version = "24.0.0" } +arrow-flight = { version = "24.0.0" } async-trait = "0.1.41" ballista-core = { path = "../core", version = "0.8.0" } chrono = { version = "0.4", default-features = false } configure_me = "0.4.0" dashmap = "5.4.0" -datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "06a4f79f02fcb6ea85303925b7c5a9b0231e3fee" } -datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "06a4f79f02fcb6ea85303925b7c5a9b0231e3fee" } +datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "13.0.0-rc1" } +datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "13.0.0-rc1" } futures = "0.3" hyper = "0.14.4" log = "0.4" diff --git a/ballista/rust/scheduler/Cargo.toml b/ballista/rust/scheduler/Cargo.toml index 90e3f020d..ccf5bd6f0 100644 --- a/ballista/rust/scheduler/Cargo.toml +++ b/ballista/rust/scheduler/Cargo.toml @@ -38,7 +38,7 @@ sled = ["sled_package", "tokio-stream"] [dependencies] anyhow = "1" -arrow-flight = { version = "23.0.0", features = ["flight-sql-experimental"] } +arrow-flight = { version = "24.0.0", features = ["flight-sql-experimental"] } async-recursion = "1.0.0" async-trait = "0.1.41" ballista-core = { path = "../core", version = "0.8.0" } @@ -46,8 +46,8 @@ base64 = { version = "0.13", default-features = false } clap = { version = "3", features = ["derive", "cargo"] } configure_me = "0.4.0" dashmap = "5.4.0" -datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "06a4f79f02fcb6ea85303925b7c5a9b0231e3fee" } -datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "06a4f79f02fcb6ea85303925b7c5a9b0231e3fee" } +datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "13.0.0-rc1" } +datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "13.0.0-rc1" } etcd-client = { version = "0.9", optional = true } flatbuffers = { version = "2.1.2" } futures = "0.3" diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml index 167dbee53..553e94eed 100644 --- a/benchmarks/Cargo.toml +++ b/benchmarks/Cargo.toml @@ -33,8 +33,8 @@ snmalloc = ["snmalloc-rs"] [dependencies] ballista = { path = "../ballista/rust/client", version = "0.8.0" } -datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "06a4f79f02fcb6ea85303925b7c5a9b0231e3fee" } -datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "06a4f79f02fcb6ea85303925b7c5a9b0231e3fee" } +datafusion = { git = 
"https://github.com/apache/arrow-datafusion", rev = "13.0.0-rc1" } +datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "13.0.0-rc1" } env_logger = "0.9" futures = "0.3" mimalloc = { version = "0.1", optional = true, default-features = false } diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 7170365ec..6a1967a33 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -35,7 +35,7 @@ required-features = ["ballista/standalone"] [dependencies] ballista = { path = "../ballista/rust/client", version = "0.8.0" } -datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "06a4f79f02fcb6ea85303925b7c5a9b0231e3fee" } +datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "13.0.0-rc1" } futures = "0.3" num_cpus = "1.13.0" prost = "0.11" diff --git a/python/Cargo.toml b/python/Cargo.toml index c5d2e8e6f..829be3be1 100644 --- a/python/Cargo.toml +++ b/python/Cargo.toml @@ -36,7 +36,7 @@ default = ["mimalloc"] [dependencies] async-trait = "0.1" ballista = { path = "../ballista/rust/client", version = "0.8.0" } -datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "06a4f79f02fcb6ea85303925b7c5a9b0231e3fee", features = ["pyarrow"] } +datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "13.0.0-rc1", features = ["pyarrow"] } futures = "0.3" mimalloc = { version = "*", optional = true, default-features = false } pyo3 = { version = "~0.17.1", features = ["extension-module", "abi3", "abi3-py37"] } diff --git a/python/src/ballista_context.rs b/python/src/ballista_context.rs index 2d92e9708..d059ba727 100644 --- a/python/src/ballista_context.rs +++ b/python/src/ballista_context.rs @@ -25,6 +25,7 @@ use crate::dataframe::PyDataFrame; use crate::errors::BallistaError; use ballista::prelude::{BallistaConfig, BallistaContext}; use datafusion::arrow::datatypes::Schema; +use datafusion::arrow::pyarrow::PyArrowType; use datafusion::prelude::{AvroReadOptions, CsvReadOptions, ParquetReadOptions}; /// `PyBallistaContext` is able to plan and execute DataFusion plans. 
diff --git a/python/src/ballista_context.rs b/python/src/ballista_context.rs
index 2d92e9708..d059ba727 100644
--- a/python/src/ballista_context.rs
+++ b/python/src/ballista_context.rs
@@ -25,6 +25,7 @@ use crate::dataframe::PyDataFrame;
 use crate::errors::BallistaError;
 use ballista::prelude::{BallistaConfig, BallistaContext};
 use datafusion::arrow::datatypes::Schema;
+use datafusion::arrow::pyarrow::PyArrowType;
 use datafusion::prelude::{AvroReadOptions, CsvReadOptions, ParquetReadOptions};
 
 /// `PyBallistaContext` is able to plan and execute DataFusion plans.
@@ -83,7 +84,7 @@ impl PyBallistaContext {
         &mut self,
         name: &str,
         path: PathBuf,
-        schema: Option<Schema>,
+        schema: Option<PyArrowType<Schema>>,
         has_header: bool,
         delimiter: &str,
         schema_infer_max_records: usize,
@@ -108,7 +109,7 @@ impl PyBallistaContext {
             .delimiter(delimiter[0])
             .schema_infer_max_records(schema_infer_max_records)
             .file_extension(file_extension);
-        options.schema = schema.as_ref();
+        options.schema = schema.as_ref().map(|x| &x.0);
 
         let result = ctx.register_csv(name, path, options);
         wait_for_future(py, result).map_err(BallistaError::from)?;
diff --git a/python/src/context.rs b/python/src/context.rs
index 8dfa9befe..38a3cb9b9 100644
--- a/python/src/context.rs
+++ b/python/src/context.rs
@@ -24,6 +24,7 @@ use pyo3::exceptions::{PyKeyError, PyValueError};
 use pyo3::prelude::*;
 
 use datafusion::arrow::datatypes::Schema;
+use datafusion::arrow::pyarrow::PyArrowType;
 use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::datasource::datasource::TableProvider;
 use datafusion::datasource::MemTable;
@@ -100,9 +101,9 @@ impl PySessionContext {
     fn create_dataframe(
         &mut self,
-        partitions: Vec<Vec<RecordBatch>>,
+        partitions: PyArrowType<Vec<Vec<RecordBatch>>>,
     ) -> PyResult<PyDataFrame> {
-        let table = MemTable::try_new(partitions[0][0].schema(), partitions)
+        let table = MemTable::try_new(partitions.0[0][0].schema(), partitions.0)
             .map_err(DataFusionError::from)?;
 
         // generate a random (unique) name for this table
@@ -138,10 +139,10 @@ impl PySessionContext {
     fn register_record_batches(
         &mut self,
         name: &str,
-        partitions: Vec<Vec<RecordBatch>>,
+        partitions: PyArrowType<Vec<Vec<RecordBatch>>>,
     ) -> PyResult<()> {
-        let schema = partitions[0][0].schema();
-        let table = MemTable::try_new(schema, partitions)?;
+        let schema = partitions.0[0][0].schema();
+        let table = MemTable::try_new(schema, partitions.0)?;
         self.ctx
             .register_table(name, Arc::new(table))
             .map_err(DataFusionError::from)?;
@@ -184,7 +185,7 @@ impl PySessionContext {
         &mut self,
         name: &str,
         path: PathBuf,
-        schema: Option<Schema>,
+        schema: Option<PyArrowType<Schema>>,
         has_header: bool,
         delimiter: &str,
         schema_infer_max_records: usize,
@@ -206,7 +207,7 @@ impl PySessionContext {
             .delimiter(delimiter[0])
             .schema_infer_max_records(schema_infer_max_records)
             .file_extension(file_extension);
-        options.schema = schema.as_ref();
+        options.schema = schema.as_ref().map(|x| &x.0);
 
         let result = self.ctx.register_csv(name, path, options);
         wait_for_future(py, result).map_err(DataFusionError::from)?;
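
The signature changes above are the heart of this upgrade: arrow 24 removed the blanket pyo3 conversions for Arrow types, so values now cross the Python boundary wrapped in the PyArrowType newtype and are unwrapped with `.0` on the Rust side. A minimal sketch of the pattern (the function itself is hypothetical, not part of this patch):

    use datafusion::arrow::datatypes::Schema;
    use datafusion::arrow::pyarrow::PyArrowType;
    use pyo3::prelude::*;

    // PyArrowType<T> implements FromPyObject, so pyo3 converts the incoming
    // pyarrow.Schema automatically; `.0` then yields the plain Rust Schema.
    #[pyfunction]
    fn field_count(schema: PyArrowType<Schema>) -> usize {
        schema.0.fields().len()
    }
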
diff --git a/python/src/dataframe.rs b/python/src/dataframe.rs
index af1bcf0ce..bc1a9a680 100644
--- a/python/src/dataframe.rs
+++ b/python/src/dataframe.rs
@@ -18,7 +18,7 @@ use crate::utils::wait_for_future;
 use crate::{errors::DataFusionError, expression::PyExpr};
 
 use datafusion::arrow::datatypes::Schema;
-use datafusion::arrow::pyarrow::PyArrowConvert;
+use datafusion::arrow::pyarrow::{PyArrowConvert, PyArrowException, PyArrowType};
 use datafusion::arrow::util::pretty;
 use datafusion::dataframe::DataFrame;
 use datafusion::logical_plan::JoinType;
@@ -65,8 +65,8 @@ impl PyDataFrame {
     }
 
     /// Returns the schema from the logical plan
-    fn schema(&self) -> Schema {
-        self.df.schema().into()
+    fn schema(&self) -> PyArrowType<Schema> {
+        PyArrowType(self.df.schema().into())
     }
 
     #[args(args = "*")]
@@ -126,7 +126,8 @@ impl PyDataFrame {
     fn show(&self, py: Python, num: usize) -> PyResult<()> {
         let df = self.df.limit(0, Some(num))?;
         let batches = wait_for_future(py, df.collect())?;
-        Ok(pretty::print_batches(&batches)?)
+        pretty::print_batches(&batches)
+            .map_err(|err| PyArrowException::new_err(err.to_string()))
     }
 
     fn join(
@@ -162,6 +163,7 @@ impl PyDataFrame {
     fn explain(&self, py: Python, verbose: bool, analyze: bool) -> PyResult<()> {
         let df = self.df.explain(verbose, analyze)?;
         let batches = wait_for_future(py, df.collect())?;
-        Ok(pretty::print_batches(&batches)?)
+        pretty::print_batches(&batches)
+            .map_err(|err| PyArrowException::new_err(err.to_string()))
     }
 }
diff --git a/python/src/dataset.rs b/python/src/dataset.rs
index a678b9040..d34d974fc 100644
--- a/python/src/dataset.rs
+++ b/python/src/dataset.rs
@@ -27,6 +27,7 @@ use std::sync::Arc;
 use async_trait::async_trait;
 
 use datafusion::arrow::datatypes::SchemaRef;
+use datafusion::arrow::pyarrow::PyArrowType;
 use datafusion::datasource::datasource::TableProviderFilterPushDown;
 use datafusion::datasource::{TableProvider, TableType};
 use datafusion::error::{DataFusionError, Result as DFResult};
@@ -74,7 +75,14 @@ impl TableProvider for Dataset {
         Python::with_gil(|py| {
             let dataset = self.dataset.as_ref(py);
             // This can panic but since we checked that self.dataset is a pyarrow.dataset.Dataset it should never
-            Arc::new(dataset.getattr("schema").unwrap().extract().unwrap())
+            Arc::new(
+                dataset
+                    .getattr("schema")
+                    .unwrap()
+                    .extract::<PyArrowType<Schema>>()
+                    .unwrap()
+                    .0,
+            )
         })
     }
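
The show()/explain() changes in dataframe.rs above replace the implicit `?` conversion with an explicit mapping onto the exception class pyarrow itself raises, so failures surface in Python as pyarrow.lib.ArrowException rather than a generic error. A sketch of the mapping (the helper name is illustrative):

    use datafusion::arrow::pyarrow::PyArrowException;
    use datafusion::arrow::record_batch::RecordBatch;
    use datafusion::arrow::util::pretty;
    use pyo3::prelude::*;

    // Convert the ArrowError from print_batches into a PyErr carrying
    // pyarrow's own exception class.
    fn print_batches_py(batches: &[RecordBatch]) -> PyResult<()> {
        pretty::print_batches(batches)
            .map_err(|err| PyArrowException::new_err(err.to_string()))
    }
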
diff --git a/python/src/dataset_exec.rs b/python/src/dataset_exec.rs
index f57a3a76e..987b7ad1e 100644
--- a/python/src/dataset_exec.rs
+++ b/python/src/dataset_exec.rs
@@ -28,6 +28,7 @@ use futures::stream;
 use datafusion::arrow::datatypes::SchemaRef;
 use datafusion::arrow::error::ArrowError;
 use datafusion::arrow::error::Result as ArrowResult;
+use datafusion::arrow::pyarrow::PyArrowType;
 use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::error::{DataFusionError as InnerDataFusionError, Result as DFResult};
 use datafusion::execution::context::TaskContext;
@@ -54,7 +55,7 @@ impl Iterator for PyArrowBatchesAdapter {
             Some(
                 batches
                     .next()?
-                    .and_then(|batch| batch.extract())
+                    .and_then(|batch| Ok(batch.extract::<PyArrowType<RecordBatch>>()?.0))
                     .map_err(|err| ArrowError::ExternalError(Box::new(err))),
             )
         })
@@ -109,7 +110,12 @@ impl DatasetExec {
 
         let scanner = dataset.call_method("scanner", (), Some(kwargs))?;
 
-        let schema = Arc::new(scanner.getattr("projected_schema")?.extract()?);
+        let schema = Arc::new(
+            scanner
+                .getattr("projected_schema")?
+                .extract::<PyArrowType<Schema>>()?
+                .0,
+        );
 
         let builtins = Python::import(py, "builtins")?;
         let pylist = builtins.getattr("list")?;
@@ -211,7 +217,7 @@ impl ExecutionPlan for DatasetExec {
         let schema: SchemaRef = Arc::new(
             scanner
                 .getattr("projected_schema")
-                .and_then(|schema| schema.extract())
+                .and_then(|schema| Ok(schema.extract::<PyArrowType<Schema>>()?.0))
                 .map_err(|err| InnerDataFusionError::External(Box::new(err)))?,
         );
         let record_batches: &PyIterator = scanner
diff --git a/python/src/expression.rs b/python/src/expression.rs
index b3275ccf0..aa6e540cc 100644
--- a/python/src/expression.rs
+++ b/python/src/expression.rs
@@ -19,8 +19,8 @@ use pyo3::{basic::CompareOp, prelude::*};
 use std::convert::{From, Into};
 
 use datafusion::arrow::datatypes::DataType;
+use datafusion::arrow::pyarrow::PyArrowType;
 use datafusion::logical_plan::{col, lit, Expr};
-
 use datafusion::scalar::ScalarValue;
 
 /// An PyExpr that can be used on a DataFrame
@@ -125,12 +125,12 @@ impl PyExpr {
         self.expr.clone().is_null().into()
     }
 
-    pub fn cast(&self, to: DataType) -> PyExpr {
+    pub fn cast(&self, to: PyArrowType<DataType>) -> PyExpr {
         // self.expr.cast_to() requires DFSchema to validate that the cast
         // is supported, omit that for now
         let expr = Expr::Cast {
             expr: Box::new(self.expr.clone()),
-            data_type: to,
+            data_type: to.0,
         };
         expr.into()
     }
diff --git a/python/src/udaf.rs b/python/src/udaf.rs
index 3b93048b8..f29734764 100644
--- a/python/src/udaf.rs
+++ b/python/src/udaf.rs
@@ -21,7 +21,7 @@ use pyo3::{prelude::*, types::PyTuple};
 
 use datafusion::arrow::array::ArrayRef;
 use datafusion::arrow::datatypes::DataType;
-use datafusion::arrow::pyarrow::PyArrowConvert;
+use datafusion::arrow::pyarrow::{PyArrowConvert, PyArrowType};
 use datafusion::common::ScalarValue;
 use datafusion::error::{DataFusionError, Result};
 use datafusion::logical_expr::{
@@ -82,6 +82,7 @@ impl Accumulator for RustAccumulator {
             // 1. cast states to Pyarrow array
             let state = state
+                .data()
                 .to_pyarrow(py)
                 .map_err(|e| DataFusionError::Execution(format!("{}", e)))?;
 
@@ -120,18 +121,18 @@ impl PyAggregateUDF {
     fn new(
         name: &str,
         accumulator: PyObject,
-        input_type: DataType,
-        return_type: DataType,
-        state_type: Vec<DataType>,
+        input_type: PyArrowType<DataType>,
+        return_type: PyArrowType<DataType>,
+        state_type: PyArrowType<Vec<DataType>>,
         volatility: &str,
     ) -> PyResult<Self> {
         let function = logical_expr::create_udaf(
             name,
-            input_type,
-            Arc::new(return_type),
+            input_type.0,
+            Arc::new(return_type.0),
             parse_volatility(volatility)?,
             to_rust_accumulator(accumulator),
-            Arc::new(state_type),
+            Arc::new(state_type.0),
         );
         Ok(Self { function })
     }
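
The one-line `.data()` addition in udaf.rs above reflects arrow 24 implementing the pyarrow conversion on ArrayData rather than on ArrayRef: an array is lowered to its underlying ArrayData before crossing into Python, and rebuilt with make_array on the way back. A sketch of the full round trip under that assumption (the helper is illustrative):

    use datafusion::arrow::array::{make_array, ArrayData, ArrayRef};
    use datafusion::arrow::pyarrow::PyArrowConvert;
    use pyo3::prelude::*;

    // Rust array -> pyarrow object -> Rust array. `.data()` exposes the
    // ArrayData that PyArrowConvert is implemented on; `make_array` wraps
    // the returned ArrayData back up as a dynamically typed ArrayRef.
    fn round_trip(py: Python<'_>, array: ArrayRef) -> PyResult<ArrayRef> {
        let py_array = array.data().to_pyarrow(py)?;
        let data = ArrayData::from_pyarrow(py_array.as_ref(py))?;
        Ok(make_array(data))
    }
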
diff --git a/python/src/udf.rs b/python/src/udf.rs
index 8cf8c9cfd..4f188f7e0 100644
--- a/python/src/udf.rs
+++ b/python/src/udf.rs
@@ -19,9 +19,9 @@ use std::sync::Arc;
 
 use pyo3::{prelude::*, types::PyTuple};
 
-use datafusion::arrow::array::ArrayRef;
+use datafusion::arrow::array::{make_array, ArrayData, ArrayRef};
 use datafusion::arrow::datatypes::DataType;
-use datafusion::arrow::pyarrow::PyArrowConvert;
+use datafusion::arrow::pyarrow::{PyArrowConvert, PyArrowType};
 use datafusion::error::DataFusionError;
 use datafusion::logical_expr::{self, function::ScalarFunctionImplementation};
 use datafusion::physical_plan::functions::make_scalar_function;
@@ -52,7 +52,7 @@ fn to_rust_function(func: PyObject) -> ScalarFunctionImplementation {
             }?;
 
             // 3. cast to arrow::array::Array
-            let array = ArrayRef::from_pyarrow(value).unwrap();
+            let array = make_array(ArrayData::from_pyarrow(value).unwrap());
             Ok(array)
         })
     },
@@ -72,14 +72,14 @@ impl PyScalarUDF {
     fn new(
         name: &str,
         func: PyObject,
-        input_types: Vec<DataType>,
-        return_type: DataType,
+        input_types: PyArrowType<Vec<DataType>>,
+        return_type: PyArrowType<DataType>,
         volatility: &str,
     ) -> PyResult<Self> {
         let function = logical_expr::create_udf(
             name,
-            input_types,
-            Arc::new(return_type),
+            input_types.0,
+            Arc::new(return_type.0),
             parse_volatility(volatility)?,
             to_rust_function(func),
         );
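
The udf.rs constructor shows that the wrapping applies to containers as a whole: a Python list of pyarrow types arrives as PyArrowType<Vec<DataType>>, not as a Vec of wrappers, and is unwrapped once with `.0`. A hypothetical function in the same shape as PyScalarUDF::new:

    use datafusion::arrow::datatypes::DataType;
    use datafusion::arrow::pyarrow::PyArrowType;
    use pyo3::prelude::*;

    // The whole Vec is wrapped once; `input_types.0` is a plain Vec<DataType>.
    #[pyfunction]
    fn describe_signature(
        input_types: PyArrowType<Vec<DataType>>,
        return_type: PyArrowType<DataType>,
    ) -> String {
        format!("{:?} -> {:?}", input_types.0, return_type.0)
    }
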