Upgrade to DataFusion 13.0.0-rc1 (#325)
andygrove authored Oct 8, 2022
1 parent 80f7030 commit 28b9f41
Showing 16 changed files with 74 additions and 55 deletions.
4 changes: 2 additions & 2 deletions ballista-cli/Cargo.toml
@@ -33,8 +33,8 @@ ballista = { path = "../ballista/rust/client", version = "0.8.0", features = [
    "standalone",
] }
clap = { version = "3", features = ["derive", "cargo"] }
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "06a4f79f02fcb6ea85303925b7c5a9b0231e3fee" }
-datafusion-cli = { git = "https://github.com/apache/arrow-datafusion", rev = "06a4f79f02fcb6ea85303925b7c5a9b0231e3fee" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "13.0.0-rc1" }
+datafusion-cli = { git = "https://github.com/apache/arrow-datafusion", rev = "13.0.0-rc1" }
dirs = "4.0.0"
env_logger = "0.9"
mimalloc = { version = "0.1", default-features = false }
6 changes: 3 additions & 3 deletions ballista/rust/client/Cargo.toml
@@ -31,12 +31,12 @@ rust-version = "1.59"
ballista-core = { path = "../core", version = "0.8.0" }
ballista-executor = { path = "../executor", version = "0.8.0", optional = true }
ballista-scheduler = { path = "../scheduler", version = "0.8.0", optional = true }
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "06a4f79f02fcb6ea85303925b7c5a9b0231e3fee" }
-datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "06a4f79f02fcb6ea85303925b7c5a9b0231e3fee" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "13.0.0-rc1" }
+datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "13.0.0-rc1" }
futures = "0.3"
log = "0.4"
parking_lot = "0.12"
-sqlparser = "0.23"
+sqlparser = "0.25"
tempfile = "3"
tokio = "1.0"

8 changes: 4 additions & 4 deletions ballista/rust/core/Cargo.toml
@@ -41,13 +41,13 @@ simd = ["datafusion/simd"]
[dependencies]
ahash = { version = "0.8", default-features = false }

-arrow-flight = { version = "23.0.0", features = ["flight-sql-experimental"] }
+arrow-flight = { version = "24.0.0", features = ["flight-sql-experimental"] }
async-trait = "0.1.41"
chrono = { version = "0.4", default-features = false }
clap = { version = "3", features = ["derive", "cargo"] }
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "06a4f79f02fcb6ea85303925b7c5a9b0231e3fee" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "13.0.0-rc1" }
datafusion-objectstore-hdfs = { version = "0.1.0", optional = true }
-datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "06a4f79f02fcb6ea85303925b7c5a9b0231e3fee" }
+datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "13.0.0-rc1" }
futures = "0.3"
hashbrown = "0.12"

@@ -63,7 +63,7 @@ prost = "0.11"
prost-types = "0.11"
rand = "0.8"
serde = { version = "1", features = ["derive"] }
-sqlparser = "0.23"
+sqlparser = "0.25"
tokio = "1.0"
tokio-stream = { version = "0.1", features = ["net"] }
tonic = "0.8"
8 changes: 4 additions & 4 deletions ballista/rust/executor/Cargo.toml
@@ -34,15 +34,15 @@ snmalloc = ["snmalloc-rs"]

[dependencies]
anyhow = "1"
-arrow = { version = "23.0.0" }
-arrow-flight = { version = "23.0.0" }
+arrow = { version = "24.0.0" }
+arrow-flight = { version = "24.0.0" }
async-trait = "0.1.41"
ballista-core = { path = "../core", version = "0.8.0" }
chrono = { version = "0.4", default-features = false }
configure_me = "0.4.0"
dashmap = "5.4.0"
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "06a4f79f02fcb6ea85303925b7c5a9b0231e3fee" }
-datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "06a4f79f02fcb6ea85303925b7c5a9b0231e3fee" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "13.0.0-rc1" }
+datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "13.0.0-rc1" }
futures = "0.3"
hyper = "0.14.4"
log = "0.4"
6 changes: 3 additions & 3 deletions ballista/rust/scheduler/Cargo.toml
@@ -38,16 +38,16 @@ sled = ["sled_package", "tokio-stream"]

[dependencies]
anyhow = "1"
-arrow-flight = { version = "23.0.0", features = ["flight-sql-experimental"] }
+arrow-flight = { version = "24.0.0", features = ["flight-sql-experimental"] }
async-recursion = "1.0.0"
async-trait = "0.1.41"
ballista-core = { path = "../core", version = "0.8.0" }
base64 = { version = "0.13", default-features = false }
clap = { version = "3", features = ["derive", "cargo"] }
configure_me = "0.4.0"
dashmap = "5.4.0"
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "06a4f79f02fcb6ea85303925b7c5a9b0231e3fee" }
-datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "06a4f79f02fcb6ea85303925b7c5a9b0231e3fee" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "13.0.0-rc1" }
+datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "13.0.0-rc1" }
etcd-client = { version = "0.9", optional = true }
flatbuffers = { version = "2.1.2" }
futures = "0.3"
4 changes: 2 additions & 2 deletions benchmarks/Cargo.toml
@@ -33,8 +33,8 @@ snmalloc = ["snmalloc-rs"]

[dependencies]
ballista = { path = "../ballista/rust/client", version = "0.8.0" }
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "06a4f79f02fcb6ea85303925b7c5a9b0231e3fee" }
-datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "06a4f79f02fcb6ea85303925b7c5a9b0231e3fee" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "13.0.0-rc1" }
+datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "13.0.0-rc1" }
env_logger = "0.9"
futures = "0.3"
mimalloc = { version = "0.1", optional = true, default-features = false }
2 changes: 1 addition & 1 deletion examples/Cargo.toml
@@ -35,7 +35,7 @@ required-features = ["ballista/standalone"]

[dependencies]
ballista = { path = "../ballista/rust/client", version = "0.8.0" }
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "06a4f79f02fcb6ea85303925b7c5a9b0231e3fee" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "13.0.0-rc1" }
futures = "0.3"
num_cpus = "1.13.0"
prost = "0.11"
2 changes: 1 addition & 1 deletion python/Cargo.toml
@@ -36,7 +36,7 @@ default = ["mimalloc"]
[dependencies]
async-trait = "0.1"
ballista = { path = "../ballista/rust/client", version = "0.8.0" }
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "06a4f79f02fcb6ea85303925b7c5a9b0231e3fee", features = ["pyarrow"] }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "13.0.0-rc1", features = ["pyarrow"] }
futures = "0.3"
mimalloc = { version = "*", optional = true, default-features = false }
pyo3 = { version = "~0.17.1", features = ["extension-module", "abi3", "abi3-py37"] }
5 changes: 3 additions & 2 deletions python/src/ballista_context.rs
@@ -25,6 +25,7 @@ use crate::dataframe::PyDataFrame;
use crate::errors::BallistaError;
use ballista::prelude::{BallistaConfig, BallistaContext};
use datafusion::arrow::datatypes::Schema;
+use datafusion::arrow::pyarrow::PyArrowType;
use datafusion::prelude::{AvroReadOptions, CsvReadOptions, ParquetReadOptions};

/// `PyBallistaContext` is able to plan and execute DataFusion plans.
@@ -83,7 +84,7 @@ impl PyBallistaContext {
        &mut self,
        name: &str,
        path: PathBuf,
-        schema: Option<Schema>,
+        schema: Option<PyArrowType<Schema>>,
        has_header: bool,
        delimiter: &str,
        schema_infer_max_records: usize,
@@ -108,7 +109,7 @@ impl PyBallistaContext {
            .delimiter(delimiter[0])
            .schema_infer_max_records(schema_infer_max_records)
            .file_extension(file_extension);
-        options.schema = schema.as_ref();
+        options.schema = schema.as_ref().map(|x| &x.0);

        let result = ctx.register_csv(name, path, options);
        wait_for_future(py, result).map_err(BallistaError::from)?;
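
Note: this signature change is the core pattern of the upgrade. Arrow 24 moved its pyo3 integration behind the PyArrowType<T> newtype wrapper instead of implementing the conversion traits directly on its own types, so Python-facing signatures take the wrapper and unwrap the inner value with .0. A minimal sketch of the idea, with a hypothetical function name that is not part of this commit:

use datafusion::arrow::datatypes::Schema;
use datafusion::arrow::pyarrow::PyArrowType;
use pyo3::prelude::*;

/// Hypothetical example: accept a pyarrow.Schema argument from Python.
/// PyArrowType<Schema> implements FromPyObject, so pyo3 converts at the
/// boundary; .0 yields the plain Rust Schema.
#[pyfunction]
fn field_names(schema: PyArrowType<Schema>) -> Vec<String> {
    schema.0.fields().iter().map(|f| f.name().clone()).collect()
}
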
15 changes: 8 additions & 7 deletions python/src/context.rs
@@ -24,6 +24,7 @@ use pyo3::exceptions::{PyKeyError, PyValueError};
use pyo3::prelude::*;

use datafusion::arrow::datatypes::Schema;
+use datafusion::arrow::pyarrow::PyArrowType;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::datasource::datasource::TableProvider;
use datafusion::datasource::MemTable;
@@ -100,9 +101,9 @@ impl PySessionContext {

    fn create_dataframe(
        &mut self,
-        partitions: Vec<Vec<RecordBatch>>,
+        partitions: PyArrowType<Vec<Vec<RecordBatch>>>,
    ) -> PyResult<PyDataFrame> {
-        let table = MemTable::try_new(partitions[0][0].schema(), partitions)
+        let table = MemTable::try_new(partitions.0[0][0].schema(), partitions.0)
            .map_err(DataFusionError::from)?;

        // generate a random (unique) name for this table
@@ -138,10 +139,10 @@ impl PySessionContext {
    fn register_record_batches(
        &mut self,
        name: &str,
-        partitions: Vec<Vec<RecordBatch>>,
+        partitions: PyArrowType<Vec<Vec<RecordBatch>>>,
    ) -> PyResult<()> {
-        let schema = partitions[0][0].schema();
-        let table = MemTable::try_new(schema, partitions)?;
+        let schema = partitions.0[0][0].schema();
+        let table = MemTable::try_new(schema, partitions.0)?;
        self.ctx
            .register_table(name, Arc::new(table))
            .map_err(DataFusionError::from)?;
@@ -184,7 +185,7 @@ impl PySessionContext {
        &mut self,
        name: &str,
        path: PathBuf,
-        schema: Option<Schema>,
+        schema: Option<PyArrowType<Schema>>,
        has_header: bool,
        delimiter: &str,
        schema_infer_max_records: usize,
@@ -206,7 +207,7 @@ impl PySessionContext {
            .delimiter(delimiter[0])
            .schema_infer_max_records(schema_infer_max_records)
            .file_extension(file_extension);
-        options.schema = schema.as_ref();
+        options.schema = schema.as_ref().map(|x| &x.0);

        let result = self.ctx.register_csv(name, path, options);
        wait_for_future(py, result).map_err(DataFusionError::from)?;
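
Note: the wrapper composes through ordinary collections, which is what makes the PyArrowType<Vec<Vec<RecordBatch>>> signatures above work: a Python list of lists of pyarrow.RecordBatch converts in one step. A short sketch under the same assumptions (hypothetical helper, not part of this commit):

use datafusion::arrow::pyarrow::PyArrowType;
use datafusion::arrow::record_batch::RecordBatch;
use pyo3::prelude::*;

/// Hypothetical helper: total row count across partitioned batches
/// received from Python; .0 exposes the plain nested Vec.
#[pyfunction]
fn total_rows(partitions: PyArrowType<Vec<Vec<RecordBatch>>>) -> usize {
    partitions.0.iter().flatten().map(|b| b.num_rows()).sum()
}
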
12 changes: 7 additions & 5 deletions python/src/dataframe.rs
@@ -18,7 +18,7 @@
use crate::utils::wait_for_future;
use crate::{errors::DataFusionError, expression::PyExpr};
use datafusion::arrow::datatypes::Schema;
-use datafusion::arrow::pyarrow::PyArrowConvert;
+use datafusion::arrow::pyarrow::{PyArrowConvert, PyArrowException, PyArrowType};
use datafusion::arrow::util::pretty;
use datafusion::dataframe::DataFrame;
use datafusion::logical_plan::JoinType;
@@ -65,8 +65,8 @@ impl PyDataFrame {
    }

    /// Returns the schema from the logical plan
-    fn schema(&self) -> Schema {
-        self.df.schema().into()
+    fn schema(&self) -> PyArrowType<Schema> {
+        PyArrowType(self.df.schema().into())
    }

    #[args(args = "*")]
@@ -126,7 +126,8 @@ impl PyDataFrame {
    fn show(&self, py: Python, num: usize) -> PyResult<()> {
        let df = self.df.limit(0, Some(num))?;
        let batches = wait_for_future(py, df.collect())?;
-        Ok(pretty::print_batches(&batches)?)
+        pretty::print_batches(&batches)
+            .map_err(|err| PyArrowException::new_err(err.to_string()))
    }

    fn join(
@@ -162,6 +163,7 @@ impl PyDataFrame {
    fn explain(&self, py: Python, verbose: bool, analyze: bool) -> PyResult<()> {
        let df = self.df.explain(verbose, analyze)?;
        let batches = wait_for_future(py, df.collect())?;
-        Ok(pretty::print_batches(&batches)?)
+        pretty::print_batches(&batches)
+            .map_err(|err| PyArrowException::new_err(err.to_string()))
    }
}
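
Note: the show and explain bodies change because the `?` operator previously leaned on an implicit conversion from ArrowError into PyErr which, as this upgrade suggests, arrow 24 no longer provides; the commit maps the error explicitly onto pyarrow's exception type. A sketch of that pattern (helper name hypothetical):

use datafusion::arrow::error::ArrowError;
use datafusion::arrow::pyarrow::PyArrowException;
use pyo3::prelude::*;

// Hypothetical helper: surface an ArrowError to Python as a pyarrow
// ArrowException instead of relying on an implicit From conversion.
fn arrow_to_py(err: ArrowError) -> PyErr {
    PyArrowException::new_err(err.to_string())
}

// Usage, mirroring show()/explain() above:
//     pretty::print_batches(&batches).map_err(arrow_to_py)
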
10 changes: 9 additions & 1 deletion python/src/dataset.rs
@@ -27,6 +27,7 @@ use std::sync::Arc;
use async_trait::async_trait;

use datafusion::arrow::datatypes::SchemaRef;
+use datafusion::arrow::pyarrow::PyArrowType;
use datafusion::datasource::datasource::TableProviderFilterPushDown;
use datafusion::datasource::{TableProvider, TableType};
use datafusion::error::{DataFusionError, Result as DFResult};
@@ -74,7 +75,14 @@ impl TableProvider for Dataset {
        Python::with_gil(|py| {
            let dataset = self.dataset.as_ref(py);
            // This can panic but since we checked that self.dataset is a pyarrow.dataset.Dataset it should never
-            Arc::new(dataset.getattr("schema").unwrap().extract().unwrap())
+            Arc::new(
+                dataset
+                    .getattr("schema")
+                    .unwrap()
+                    .extract::<PyArrowType<_>>()
+                    .unwrap()
+                    .0,
+            )
        })
    }

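
Note: where the conversion target used to be inferred, extract() now has to name the wrapper explicitly, hence the turbofish above. Condensed to its essentials (hypothetical function, assuming the object exposes a pyarrow schema attribute):

use datafusion::arrow::datatypes::Schema;
use datafusion::arrow::pyarrow::PyArrowType;
use pyo3::prelude::*;

/// Hypothetical: read a `schema` attribute off a Python object and
/// convert it to a Rust Schema through the PyArrowType wrapper.
fn schema_from_attr(obj: &PyAny) -> PyResult<Schema> {
    Ok(obj.getattr("schema")?.extract::<PyArrowType<Schema>>()?.0)
}
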
12 changes: 9 additions & 3 deletions python/src/dataset_exec.rs
@@ -28,6 +28,7 @@ use futures::stream;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::arrow::error::ArrowError;
use datafusion::arrow::error::Result as ArrowResult;
+use datafusion::arrow::pyarrow::PyArrowType;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::{DataFusionError as InnerDataFusionError, Result as DFResult};
use datafusion::execution::context::TaskContext;
@@ -54,7 +55,7 @@ impl Iterator for PyArrowBatchesAdapter {
            Some(
                batches
                    .next()?
-                    .and_then(|batch| batch.extract())
+                    .and_then(|batch| Ok(batch.extract::<PyArrowType<_>>()?.0))
                    .map_err(|err| ArrowError::ExternalError(Box::new(err))),
            )
        })
@@ -109,7 +110,12 @@ impl DatasetExec {

        let scanner = dataset.call_method("scanner", (), Some(kwargs))?;

-        let schema = Arc::new(scanner.getattr("projected_schema")?.extract()?);
+        let schema = Arc::new(
+            scanner
+                .getattr("projected_schema")?
+                .extract::<PyArrowType<_>>()?
+                .0,
+        );

        let builtins = Python::import(py, "builtins")?;
        let pylist = builtins.getattr("list")?;
@@ -211,7 +217,7 @@ impl ExecutionPlan for DatasetExec {
        let schema: SchemaRef = Arc::new(
            scanner
                .getattr("projected_schema")
-                .and_then(|schema| schema.extract())
+                .and_then(|schema| Ok(schema.extract::<PyArrowType<_>>()?.0))
                .map_err(|err| InnerDataFusionError::External(Box::new(err)))?,
        );
        let record_batches: &PyIterator = scanner
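
Note: the adapter also shows the reverse edge of the boundary: a failure while extracting a batch is a PyErr, which is boxed into ArrowError::ExternalError so Arrow-side consumers see an ordinary Arrow error. The per-item conversion, sketched under the same assumptions (hypothetical free function):

use datafusion::arrow::error::ArrowError;
use datafusion::arrow::pyarrow::PyArrowType;
use datafusion::arrow::record_batch::RecordBatch;
use pyo3::prelude::*;

/// Hypothetical: convert one item from a Python iterator of
/// pyarrow.RecordBatch values, folding any Python error into ArrowError.
fn convert_item(item: PyResult<&PyAny>) -> Result<RecordBatch, ArrowError> {
    item.and_then(|obj| Ok(obj.extract::<PyArrowType<RecordBatch>>()?.0))
        .map_err(|err| ArrowError::ExternalError(Box::new(err)))
}
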
6 changes: 3 additions & 3 deletions python/src/expression.rs
@@ -19,8 +19,8 @@ use pyo3::{basic::CompareOp, prelude::*};
use std::convert::{From, Into};

use datafusion::arrow::datatypes::DataType;
+use datafusion::arrow::pyarrow::PyArrowType;
use datafusion::logical_plan::{col, lit, Expr};
-
use datafusion::scalar::ScalarValue;

/// An PyExpr that can be used on a DataFrame
@@ -125,12 +125,12 @@ impl PyExpr {
        self.expr.clone().is_null().into()
    }

-    pub fn cast(&self, to: DataType) -> PyExpr {
+    pub fn cast(&self, to: PyArrowType<DataType>) -> PyExpr {
        // self.expr.cast_to() requires DFSchema to validate that the cast
        // is supported, omit that for now
        let expr = Expr::Cast {
            expr: Box::new(self.expr.clone()),
-            data_type: to,
+            data_type: to.0,
        };
        expr.into()
    }
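
Note: DataType gets the same treatment as Schema, wrapped on the way in and unwrapped with .0 before use; the wrapper also works in return position. A trivial sketch (hypothetical function, not part of this commit):

use datafusion::arrow::datatypes::DataType;
use datafusion::arrow::pyarrow::PyArrowType;
use pyo3::prelude::*;

/// Hypothetical: accept a pyarrow.DataType from Python and return its
/// Rust-side Debug rendering, unwrapping via .0.
#[pyfunction]
fn type_name(dt: PyArrowType<DataType>) -> String {
    format!("{:?}", dt.0)
}
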
15 changes: 8 additions & 7 deletions python/src/udaf.rs
@@ -21,7 +21,7 @@ use pyo3::{prelude::*, types::PyTuple};

use datafusion::arrow::array::ArrayRef;
use datafusion::arrow::datatypes::DataType;
-use datafusion::arrow::pyarrow::PyArrowConvert;
+use datafusion::arrow::pyarrow::{PyArrowConvert, PyArrowType};
use datafusion::common::ScalarValue;
use datafusion::error::{DataFusionError, Result};
use datafusion::logical_expr::{
@@ -82,6 +82,7 @@ impl Accumulator for RustAccumulator {

        // 1. cast states to Pyarrow array
        let state = state
+            .data()
            .to_pyarrow(py)
            .map_err(|e| DataFusionError::Execution(format!("{}", e)))?;

@@ -120,18 +121,18 @@ impl PyAggregateUDF {
    fn new(
        name: &str,
        accumulator: PyObject,
-        input_type: DataType,
-        return_type: DataType,
-        state_type: Vec<DataType>,
+        input_type: PyArrowType<DataType>,
+        return_type: PyArrowType<DataType>,
+        state_type: PyArrowType<Vec<DataType>>,
        volatility: &str,
    ) -> PyResult<Self> {
        let function = logical_expr::create_udaf(
            name,
-            input_type,
-            Arc::new(return_type),
+            input_type.0,
+            Arc::new(return_type.0),
            parse_volatility(volatility)?,
            to_rust_accumulator(accumulator),
-            Arc::new(state_type),
+            Arc::new(state_type.0),
        );
        Ok(Self { function })
    }
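
Note: the added .data() call looks cosmetic but reflects the same arrow 24 reshuffle: to_pyarrow now appears to live on ArrayData rather than on the array itself, so an ArrayRef is converted through its underlying data. A sketch under that assumption (hypothetical helpers):

use std::sync::Arc;

use datafusion::arrow::array::{ArrayRef, Int32Array};
use datafusion::arrow::pyarrow::PyArrowConvert;
use pyo3::prelude::*;

/// Hypothetical: hand an ArrayRef to Python by converting its
/// underlying ArrayData, as the accumulator's state path does above.
fn array_to_py(py: Python<'_>, array: &ArrayRef) -> PyResult<PyObject> {
    array.data().to_pyarrow(py)
}

fn demo(py: Python<'_>) -> PyResult<PyObject> {
    let array: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
    array_to_py(py, &array)
}
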
[diff for 1 remaining changed file not loaded in this view]
