From 934b32fd7b485bdd48d74a7375ccf7ba03c188b5 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 12 Dec 2023 08:34:00 -0700 Subject: [PATCH] Upgrade to DataFusion 34.0.0-rc1 (#927) * Use latest DF * update deps * Set max encoding size * tests compile now * fix some todo comments * address another todo item * specify message sizes in gRPC clients as well * fix typo and use 34 rc1 --- Cargo.toml | 16 ++++----- ballista/client/src/context.rs | 2 -- .../core/src/cache_layer/object_store/file.rs | 35 +++++++++++++++---- .../core/src/cache_layer/object_store/mod.rs | 24 ++++++++++--- ballista/core/src/cache_layer/policy/file.rs | 1 + ballista/core/src/utils.rs | 2 +- ballista/scheduler/src/cluster/mod.rs | 1 + .../scheduler/src/scheduler_server/grpc.rs | 6 ---- benchmarks/src/bin/tpch.rs | 3 +- 9 files changed, 60 insertions(+), 30 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index fcdebab65..57752844d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,16 +29,16 @@ members = [ resolver = "2" [workspace.dependencies] -arrow = { version = "48.0.0", features=["ipc_compression"] } -arrow-flight = { version = "48.0.0", features = ["flight-sql-experimental"] } -arrow-schema = { version = "48.0.0", default-features = false } +arrow = { version = "49.0.0", features=["ipc_compression"] } +arrow-flight = { version = "49.0.0", features = ["flight-sql-experimental"] } +arrow-schema = { version = "49.0.0", default-features = false } configure_me = { version = "0.4.0" } configure_me_codegen = { version = "0.4.4" } -datafusion = "33.0.0" -datafusion-cli = "33.0.0" -datafusion-proto = "33.0.0" -object_store = "0.7.0" -sqlparser = "0.39.0" +datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "34.0.0-rc1" } +datafusion-cli = { git = "https://github.com/apache/arrow-datafusion", rev = "34.0.0-rc1" } +datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "34.0.0-rc1" } +object_store = "0.8.0" +sqlparser = "0.40.0" tonic = { version = "0.10" } tonic-build = { version = "0.10", default-features = false, features = [ "transport", diff --git a/ballista/client/src/context.rs b/ballista/client/src/context.rs index ea3815594..a1c0d7e3c 100644 --- a/ballista/client/src/context.rs +++ b/ballista/client/src/context.rs @@ -616,8 +616,6 @@ mod tests { target_partitions: x.target_partitions, file_sort_order: vec![], infinite_source: false, - insert_mode: - datafusion::datasource::listing::ListingTableInsertMode::Error, file_type_write_options: None, single_file: false, }; diff --git a/ballista/core/src/cache_layer/object_store/file.rs b/ballista/core/src/cache_layer/object_store/file.rs index d52ca954b..169d2b5ce 100644 --- a/ballista/core/src/cache_layer/object_store/file.rs +++ b/ballista/core/src/cache_layer/object_store/file.rs @@ -22,11 +22,12 @@ use crate::error::BallistaError; use async_trait::async_trait; use ballista_cache::loading_cache::LoadingCache; use bytes::Bytes; -use futures::stream::BoxStream; +use futures::stream::{self, BoxStream, StreamExt}; use log::info; use object_store::path::Path; use object_store::{ Error, GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, + PutOptions, PutResult, }; use std::fmt::{Debug, Display, Formatter}; use std::ops::Range; @@ -73,7 +74,24 @@ impl ObjectStore for FileCacheObjectStore where M: CacheMedium, { - async fn put(&self, _location: &Path, _bytes: Bytes) -> object_store::Result<()> { + async fn put( + &self, + _location: &Path, + _bytes: Bytes, + ) -> object_store::Result { + Err(Error::NotSupported { + source: Box::new(BallistaError::General( + "Write path is not supported".to_string(), + )), + }) + } + + async fn put_opts( + &self, + _location: &Path, + _bytes: Bytes, + _opts: PutOptions, + ) -> object_store::Result { Err(Error::NotSupported { source: Box::new(BallistaError::General( "Write path is not supported".to_string(), @@ -209,13 +227,18 @@ where }) } - async fn list( + fn list( &self, _prefix: Option<&Path>, - ) -> object_store::Result>> { - Err(Error::NotSupported { - source: Box::new(BallistaError::General("List is not supported".to_string())), + ) -> BoxStream<'_, object_store::Result> { + stream::once(async { + Err(Error::NotSupported { + source: Box::new(BallistaError::General( + "List is not supported".to_string(), + )), + }) }) + .boxed() } async fn list_with_delimiter( diff --git a/ballista/core/src/cache_layer/object_store/mod.rs b/ballista/core/src/cache_layer/object_store/mod.rs index b90a741fe..6d754ecaf 100644 --- a/ballista/core/src/cache_layer/object_store/mod.rs +++ b/ballista/core/src/cache_layer/object_store/mod.rs @@ -22,7 +22,8 @@ use bytes::Bytes; use futures::stream::BoxStream; use object_store::path::Path; use object_store::{ - GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, + GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, PutOptions, + PutResult, }; use std::fmt::{Debug, Display, Formatter}; use std::ops::Range; @@ -60,10 +61,23 @@ impl Display for ObjectStoreWithKey { #[async_trait] impl ObjectStore for ObjectStoreWithKey { - async fn put(&self, location: &Path, bytes: Bytes) -> object_store::Result<()> { + async fn put( + &self, + location: &Path, + bytes: Bytes, + ) -> object_store::Result { self.inner.put(location, bytes).await } + async fn put_opts( + &self, + location: &Path, + bytes: Bytes, + opts: PutOptions, + ) -> object_store::Result { + self.inner.put_opts(location, bytes, opts).await + } + async fn put_multipart( &self, location: &Path, @@ -115,11 +129,11 @@ impl ObjectStore for ObjectStoreWithKey { self.inner.delete(location).await } - async fn list( + fn list( &self, prefix: Option<&Path>, - ) -> object_store::Result>> { - self.inner.list(prefix).await + ) -> BoxStream<'_, object_store::Result> { + self.inner.list(prefix) } async fn list_with_delimiter( diff --git a/ballista/core/src/cache_layer/policy/file.rs b/ballista/core/src/cache_layer/policy/file.rs index 98d76a114..dfc6c83fc 100644 --- a/ballista/core/src/cache_layer/policy/file.rs +++ b/ballista/core/src/cache_layer/policy/file.rs @@ -202,6 +202,7 @@ where last_modified: source_meta.last_modified, size: cache_meta.size, e_tag: source_meta.e_tag, + version: None, }) } diff --git a/ballista/core/src/utils.rs b/ballista/core/src/utils.rs index e16c1b4c2..a6541c081 100644 --- a/ballista/core/src/utils.rs +++ b/ballista/core/src/utils.rs @@ -319,7 +319,7 @@ impl QueryPlanner for BallistaQueryPlanner { match logical_plan { LogicalPlan::Ddl(DdlStatement::CreateExternalTable(_)) => { // table state is managed locally in the BallistaContext, not in the scheduler - Ok(Arc::new(EmptyExec::new(false, Arc::new(Schema::empty())))) + Ok(Arc::new(EmptyExec::new(Arc::new(Schema::empty())))) } _ => Ok(Arc::new(DistributedQueryExec::with_repr( self.scheduler_url.clone(), diff --git a/ballista/scheduler/src/cluster/mod.rs b/ballista/scheduler/src/cluster/mod.rs index 793d3fc1f..f7e1ca465 100644 --- a/ballista/scheduler/src/cluster/mod.rs +++ b/ballista/scheduler/src/cluster/mod.rs @@ -1092,6 +1092,7 @@ mod test { last_modified: Default::default(), size: 1, e_tag: None, + version: None, }, partition_values: vec![], range: None, diff --git a/ballista/scheduler/src/scheduler_server/grpc.rs b/ballista/scheduler/src/scheduler_server/grpc.rs index 8257891cf..c1ff35deb 100644 --- a/ballista/scheduler/src/scheduler_server/grpc.rs +++ b/ballista/scheduler/src/scheduler_server/grpc.rs @@ -308,12 +308,6 @@ impl SchedulerGrpc let path = Path::from(path.as_str()); let file_metas: Vec<_> = obj_store .list(Some(&path)) - .await - .map_err(|e| { - let msg = format!("Error listing files: {e}"); - error!("{}", msg); - tonic::Status::internal(msg) - })? .try_collect() .await .map_err(|e| { diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs index 189719c3e..b2863a411 100644 --- a/benchmarks/src/bin/tpch.rs +++ b/benchmarks/src/bin/tpch.rs @@ -26,7 +26,7 @@ use ballista::prelude::{ use datafusion::arrow::array::*; use datafusion::arrow::util::display::array_value_to_string; use datafusion::common::{DEFAULT_CSV_EXTENSION, DEFAULT_PARQUET_EXTENSION}; -use datafusion::datasource::listing::{ListingTableInsertMode, ListingTableUrl}; +use datafusion::datasource::listing::ListingTableUrl; use datafusion::datasource::{MemTable, TableProvider}; use datafusion::error::{DataFusionError, Result}; use datafusion::execution::context::SessionState; @@ -845,7 +845,6 @@ async fn get_table( table_partition_cols: vec![], file_sort_order: vec![], infinite_source: false, - insert_mode: ListingTableInsertMode::Error, file_type_write_options: None, single_file: false, };