From a03b4cc9a5ef2b077eb7307f820c2a11422eac1f Mon Sep 17 00:00:00 2001 From: "r.4ntix" Date: Wed, 10 May 2023 14:05:55 +0800 Subject: [PATCH] Upgrade DataFusion to 24.0.0 --- Cargo.toml | 17 +++++++---------- ballista/client/src/context.rs | 28 ++++++++++++++++------------ ballista/core/src/utils.rs | 4 ++-- ballista/scheduler/src/flight_sql.rs | 27 +++++++++++++++++++++++++-- 4 files changed, 50 insertions(+), 26 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 392339427..862922462 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,20 +19,17 @@ members = ["ballista-cli", "ballista/client", "ballista/core", "ballista/executor", "ballista/scheduler", "benchmarks", "examples"] [workspace.dependencies] -arrow = { version = "37.0.0" } -arrow-flight = { version = "37.0.0", features = ["flight-sql-experimental"] } +arrow = { version = "38.0.0" } +arrow-flight = { version = "38.0.0", features = ["flight-sql-experimental"] } configure_me = { version = "0.4.0" } configure_me_codegen = { version = "0.4.4" } -datafusion = "23.0.0" -datafusion-cli = "23.0.0" -datafusion-proto = "23.0.0" -object_store = "0.5.4" +datafusion = "24.0.0" +datafusion-cli = "24.0.0" +datafusion-proto = "24.0.0" +object_store = "0.5.6" sqlparser = "0.33.0" tonic = { version = "0.9" } -tonic-build = { version = "0.9", default-features = false, features = [ - "transport", - "prost", -] } +tonic-build = { version = "0.9", default-features = false, features = ["transport", "prost"] } # cargo build --profile release-lto [profile.release-lto] diff --git a/ballista/client/src/context.rs b/ballista/client/src/context.rs index dea233d7d..99b7d9b1c 100644 --- a/ballista/client/src/context.rs +++ b/ballista/client/src/context.rs @@ -38,7 +38,9 @@ use datafusion::catalog::TableReference; use datafusion::dataframe::DataFrame; use datafusion::datasource::{source_as_provider, TableProvider}; use datafusion::error::{DataFusionError, Result}; -use datafusion::logical_expr::{CreateExternalTable, LogicalPlan, TableScan}; +use datafusion::logical_expr::{ + CreateExternalTable, DdlStatement, LogicalPlan, TableScan, +}; use datafusion::prelude::{ AvroReadOptions, CsvReadOptions, NdJsonReadOptions, ParquetReadOptions, SessionConfig, SessionContext, @@ -389,17 +391,19 @@ impl BallistaContext { let plan = ctx.state().create_logical_plan(sql).await?; match plan { - LogicalPlan::CreateExternalTable(CreateExternalTable { - ref schema, - ref name, - ref location, - ref file_type, - ref has_header, - ref delimiter, - ref table_partition_cols, - ref if_not_exists, - .. - }) => { + LogicalPlan::Ddl(DdlStatement::CreateExternalTable( + CreateExternalTable { + ref schema, + ref name, + ref location, + ref file_type, + ref has_header, + ref delimiter, + ref table_partition_cols, + ref if_not_exists, + .. + }, + )) => { let table_exists = ctx.table_exist(name)?; let schema: SchemaRef = Arc::new(schema.as_ref().to_owned().into()); let table_partition_cols = table_partition_cols diff --git a/ballista/core/src/utils.rs b/ballista/core/src/utils.rs index 6745b529f..7c02ea02a 100644 --- a/ballista/core/src/utils.rs +++ b/ballista/core/src/utils.rs @@ -32,7 +32,7 @@ use datafusion::execution::context::{ QueryPlanner, SessionConfig, SessionContext, SessionState, }; use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv}; -use datafusion::logical_expr::LogicalPlan; +use datafusion::logical_expr::{DdlStatement, LogicalPlan}; use datafusion::physical_plan::aggregates::AggregateExec; use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec; use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec; @@ -415,7 +415,7 @@ impl QueryPlanner for BallistaQueryPlanner { session_state: &SessionState, ) -> std::result::Result, DataFusionError> { match logical_plan { - LogicalPlan::CreateExternalTable(_) => { + LogicalPlan::Ddl(DdlStatement::CreateExternalTable(_)) => { // table state is managed locally in the BallistaContext, not in the scheduler Ok(Arc::new(EmptyExec::new(false, Arc::new(Schema::empty())))) } diff --git a/ballista/scheduler/src/flight_sql.rs b/ballista/scheduler/src/flight_sql.rs index ca755fd8e..db13b5181 100644 --- a/ballista/scheduler/src/flight_sql.rs +++ b/ballista/scheduler/src/flight_sql.rs @@ -23,8 +23,9 @@ use arrow_flight::sql::{ ActionCreatePreparedStatementResult, CommandGetCatalogs, CommandGetCrossReference, CommandGetDbSchemas, CommandGetExportedKeys, CommandGetImportedKeys, CommandGetPrimaryKeys, CommandGetSqlInfo, CommandGetTableTypes, CommandGetTables, - CommandPreparedStatementQuery, CommandPreparedStatementUpdate, CommandStatementQuery, - CommandStatementUpdate, SqlInfo, TicketStatementQuery, + CommandGetXdbcTypeInfo, CommandPreparedStatementQuery, + CommandPreparedStatementUpdate, CommandStatementQuery, CommandStatementUpdate, + SqlInfo, TicketStatementQuery, }; use arrow_flight::{ Action, FlightData, FlightDescriptor, FlightEndpoint, FlightInfo, HandshakeRequest, @@ -597,6 +598,16 @@ impl FlightSqlService for FlightSqlServiceImpl { Ok(Response::new(Box::pin(stream))) } + /// Get a FlightDataStream containing the data related to the supported XDBC types. + async fn do_get_xdbc_type_info( + &self, + _query: CommandGetXdbcTypeInfo, + _request: Request, + ) -> Result::DoGetStream>, Status> { + debug!("do_get_xdbc_type_info"); + Err(Status::unimplemented("Implement do_get_xdbc_type_info")) + } + async fn get_flight_info_statement( &self, query: CommandStatementQuery, @@ -721,6 +732,18 @@ impl FlightSqlService for FlightSqlServiceImpl { )) } + /// Get a FlightInfo to extract information about the supported XDBC types. + async fn get_flight_info_xdbc_type_info( + &self, + _query: CommandGetXdbcTypeInfo, + _request: Request, + ) -> Result, Status> { + debug!("get_flight_info_xdbc_type_info"); + Err(Status::unimplemented( + "Implement get_flight_info_xdbc_type_info", + )) + } + async fn do_get_statement( &self, _ticket: TicketStatementQuery,