From 10c6d0f3b2f05982fb5910e16455c2281a3ae3c2 Mon Sep 17 00:00:00 2001 From: yangzhong Date: Tue, 25 Apr 2023 17:06:37 +0800 Subject: [PATCH] Upgrade DataFusion to 23.0.0 --- Cargo.toml | 19 +- ballista-cli/Cargo.toml | 2 +- ballista/client/Cargo.toml | 2 +- ballista/client/src/context.rs | 2 +- ballista/core/Cargo.toml | 13 +- ballista/core/src/serde/generated/ballista.rs | 454 +++++++++++++++--- ballista/core/src/serde/scheduler/mod.rs | 2 +- ballista/executor/Cargo.toml | 8 +- ballista/scheduler/Cargo.toml | 15 +- ballista/scheduler/scheduler_config_spec.toml | 1 - 10 files changed, 421 insertions(+), 97 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index bbab677c1..392339427 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,13 +17,22 @@ [workspace] members = ["ballista-cli", "ballista/client", "ballista/core", "ballista/executor", "ballista/scheduler", "benchmarks", "examples"] -exclude = ["python"] [workspace.dependencies] -arrow = { version = "36.0.0" } -arrow-flight = { version = "36.0.0", features = ["flight-sql-experimental"] } -datafusion = "22.0.0" -datafusion-proto = "22.0.0" +arrow = { version = "37.0.0" } +arrow-flight = { version = "37.0.0", features = ["flight-sql-experimental"] } +configure_me = { version = "0.4.0" } +configure_me_codegen = { version = "0.4.4" } +datafusion = "23.0.0" +datafusion-cli = "23.0.0" +datafusion-proto = "23.0.0" +object_store = "0.5.4" +sqlparser = "0.33.0" +tonic = { version = "0.9" } +tonic-build = { version = "0.9", default-features = false, features = [ + "transport", + "prost", +] } # cargo build --profile release-lto [profile.release-lto] diff --git a/ballista-cli/Cargo.toml b/ballista-cli/Cargo.toml index 8f94db42d..c805f3d13 100644 --- a/ballista-cli/Cargo.toml +++ b/ballista-cli/Cargo.toml @@ -32,7 +32,7 @@ readme = "README.md" ballista = { path = "../ballista/client", version = "0.11.0", features = ["standalone"] } clap = { version = "3", features = ["derive", "cargo"] } datafusion = { workspace = true } -datafusion-cli = "22.0.0" +datafusion-cli = { workspace = true } dirs = "4.0.0" env_logger = "0.10" mimalloc = { version = "0.1", default-features = false } diff --git a/ballista/client/Cargo.toml b/ballista/client/Cargo.toml index 88b93378f..e371a8597 100644 --- a/ballista/client/Cargo.toml +++ b/ballista/client/Cargo.toml @@ -36,7 +36,7 @@ datafusion-proto = { workspace = true } futures = "0.3" log = "0.4" parking_lot = "0.12" -sqlparser = "0.32.0" +sqlparser = { workspace = true } tempfile = "3" tokio = "1.0" diff --git a/ballista/client/src/context.rs b/ballista/client/src/context.rs index 84bbaf02e..dea233d7d 100644 --- a/ballista/client/src/context.rs +++ b/ballista/client/src/context.rs @@ -623,7 +623,7 @@ mod tests { .map(|t| ListingTableUrl::parse(t).unwrap()) .collect(); let config = ListingTableConfig::new_with_multi_paths(table_paths) - .with_schema(Arc::new(Schema::new(vec![]))) + .with_schema(Arc::new(Schema::empty())) .with_listing_options(error_options); let error_table = ListingTable::try_new(config).unwrap(); diff --git a/ballista/core/Cargo.toml b/ballista/core/Cargo.toml index 95282a367..feeb83110 100644 --- a/ballista/core/Cargo.toml +++ b/ballista/core/Cargo.toml @@ -24,7 +24,7 @@ homepage = "https://github.com/apache/arrow-ballista" repository = "https://github.com/apache/arrow-ballista" readme = "README.md" authors = ["Apache Arrow "] -edition = "2018" +edition = "2021" build = "build.rs" # Exclude proto files so crates.io consumers don't need protoc @@ -58,7 +58,7 @@ hashbrown = "0.13" 
itertools = "0.10" libloading = "0.7.3" log = "0.4" -object_store = "0.5.2" +object_store = { workspace = true } once_cell = "1.9.0" parking_lot = "0.12" @@ -67,11 +67,11 @@ prost = "0.11" prost-types = "0.11" rand = "0.8" serde = { version = "1", features = ["derive"] } -sqlparser = "0.32.0" +sqlparser = { workspace = true } sys-info = "0.9.0" tokio = "1.0" tokio-stream = { version = "0.1", features = ["net"] } -tonic = "0.8" +tonic = { workspace = true } url = "2.2" uuid = { version = "1.0", features = ["v4"] } walkdir = "2.3.2" @@ -81,7 +81,4 @@ tempfile = "3" [build-dependencies] rustc_version = "0.4.0" -tonic-build = { version = "0.8", default-features = false, features = [ - "transport", - "prost", -] } +tonic-build = { workspace = true } diff --git a/ballista/core/src/serde/generated/ballista.rs b/ballista/core/src/serde/generated/ballista.rs index f511f2a2d..439bbf840 100644 --- a/ballista/core/src/serde/generated/ballista.rs +++ b/ballista/core/src/serde/generated/ballista.rs @@ -1135,7 +1135,7 @@ pub mod scheduler_grpc_client { /// Attempt to create a new client by connecting to a given endpoint. pub async fn connect(dst: D) -> Result where - D: std::convert::TryInto, + D: TryInto, D::Error: Into, { let conn = tonic::transport::Endpoint::new(dst)?.connect().await?; @@ -1191,11 +1191,27 @@ pub mod scheduler_grpc_client { self.inner = self.inner.accept_compressed(encoding); self } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_decoding_message_size(limit); + self + } + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_encoding_message_size(limit); + self + } /// Executors must poll the scheduler for heartbeat and to receive tasks pub async fn poll_work( &mut self, request: impl tonic::IntoRequest, - ) -> Result, tonic::Status> { + ) -> std::result::Result, tonic::Status> { self.inner .ready() .await @@ -1209,12 +1225,18 @@ pub mod scheduler_grpc_client { let path = http::uri::PathAndQuery::from_static( "/ballista.protobuf.SchedulerGrpc/PollWork", ); - self.inner.unary(request.into_request(), path, codec).await + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("ballista.protobuf.SchedulerGrpc", "PollWork")); + self.inner.unary(req, path, codec).await } pub async fn register_executor( &mut self, request: impl tonic::IntoRequest, - ) -> Result, tonic::Status> { + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { self.inner .ready() .await @@ -1228,14 +1250,25 @@ pub mod scheduler_grpc_client { let path = http::uri::PathAndQuery::from_static( "/ballista.protobuf.SchedulerGrpc/RegisterExecutor", ); - self.inner.unary(request.into_request(), path, codec).await + let mut req = request.into_request(); + req.extensions_mut() + .insert( + GrpcMethod::new( + "ballista.protobuf.SchedulerGrpc", + "RegisterExecutor", + ), + ); + self.inner.unary(req, path, codec).await } /// Push-based task scheduler will only leverage this interface /// rather than the PollWork interface to report executor states pub async fn heart_beat_from_executor( &mut self, request: impl tonic::IntoRequest, - ) -> Result, tonic::Status> { + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { self.inner .ready() .await @@ -1249,12 +1282,23 @@ pub mod 
scheduler_grpc_client { let path = http::uri::PathAndQuery::from_static( "/ballista.protobuf.SchedulerGrpc/HeartBeatFromExecutor", ); - self.inner.unary(request.into_request(), path, codec).await + let mut req = request.into_request(); + req.extensions_mut() + .insert( + GrpcMethod::new( + "ballista.protobuf.SchedulerGrpc", + "HeartBeatFromExecutor", + ), + ); + self.inner.unary(req, path, codec).await } pub async fn update_task_status( &mut self, request: impl tonic::IntoRequest, - ) -> Result, tonic::Status> { + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { self.inner .ready() .await @@ -1268,12 +1312,23 @@ pub mod scheduler_grpc_client { let path = http::uri::PathAndQuery::from_static( "/ballista.protobuf.SchedulerGrpc/UpdateTaskStatus", ); - self.inner.unary(request.into_request(), path, codec).await + let mut req = request.into_request(); + req.extensions_mut() + .insert( + GrpcMethod::new( + "ballista.protobuf.SchedulerGrpc", + "UpdateTaskStatus", + ), + ); + self.inner.unary(req, path, codec).await } pub async fn get_file_metadata( &mut self, request: impl tonic::IntoRequest, - ) -> Result, tonic::Status> { + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { self.inner .ready() .await @@ -1287,12 +1342,20 @@ pub mod scheduler_grpc_client { let path = http::uri::PathAndQuery::from_static( "/ballista.protobuf.SchedulerGrpc/GetFileMetadata", ); - self.inner.unary(request.into_request(), path, codec).await + let mut req = request.into_request(); + req.extensions_mut() + .insert( + GrpcMethod::new("ballista.protobuf.SchedulerGrpc", "GetFileMetadata"), + ); + self.inner.unary(req, path, codec).await } pub async fn execute_query( &mut self, request: impl tonic::IntoRequest, - ) -> Result, tonic::Status> { + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { self.inner .ready() .await @@ -1306,12 +1369,20 @@ pub mod scheduler_grpc_client { let path = http::uri::PathAndQuery::from_static( "/ballista.protobuf.SchedulerGrpc/ExecuteQuery", ); - self.inner.unary(request.into_request(), path, codec).await + let mut req = request.into_request(); + req.extensions_mut() + .insert( + GrpcMethod::new("ballista.protobuf.SchedulerGrpc", "ExecuteQuery"), + ); + self.inner.unary(req, path, codec).await } pub async fn get_job_status( &mut self, request: impl tonic::IntoRequest, - ) -> Result, tonic::Status> { + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { self.inner .ready() .await @@ -1325,13 +1396,21 @@ pub mod scheduler_grpc_client { let path = http::uri::PathAndQuery::from_static( "/ballista.protobuf.SchedulerGrpc/GetJobStatus", ); - self.inner.unary(request.into_request(), path, codec).await + let mut req = request.into_request(); + req.extensions_mut() + .insert( + GrpcMethod::new("ballista.protobuf.SchedulerGrpc", "GetJobStatus"), + ); + self.inner.unary(req, path, codec).await } /// Used by Executor to tell Scheduler it is stopped. 
pub async fn executor_stopped( &mut self, request: impl tonic::IntoRequest, - ) -> Result, tonic::Status> { + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { self.inner .ready() .await @@ -1345,12 +1424,20 @@ pub mod scheduler_grpc_client { let path = http::uri::PathAndQuery::from_static( "/ballista.protobuf.SchedulerGrpc/ExecutorStopped", ); - self.inner.unary(request.into_request(), path, codec).await + let mut req = request.into_request(); + req.extensions_mut() + .insert( + GrpcMethod::new("ballista.protobuf.SchedulerGrpc", "ExecutorStopped"), + ); + self.inner.unary(req, path, codec).await } pub async fn cancel_job( &mut self, request: impl tonic::IntoRequest, - ) -> Result, tonic::Status> { + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { self.inner .ready() .await @@ -1364,12 +1451,18 @@ pub mod scheduler_grpc_client { let path = http::uri::PathAndQuery::from_static( "/ballista.protobuf.SchedulerGrpc/CancelJob", ); - self.inner.unary(request.into_request(), path, codec).await + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("ballista.protobuf.SchedulerGrpc", "CancelJob")); + self.inner.unary(req, path, codec).await } pub async fn clean_job_data( &mut self, request: impl tonic::IntoRequest, - ) -> Result, tonic::Status> { + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { self.inner .ready() .await @@ -1383,7 +1476,12 @@ pub mod scheduler_grpc_client { let path = http::uri::PathAndQuery::from_static( "/ballista.protobuf.SchedulerGrpc/CleanJobData", ); - self.inner.unary(request.into_request(), path, codec).await + let mut req = request.into_request(); + req.extensions_mut() + .insert( + GrpcMethod::new("ballista.protobuf.SchedulerGrpc", "CleanJobData"), + ); + self.inner.unary(req, path, codec).await } } } @@ -1400,7 +1498,7 @@ pub mod executor_grpc_client { /// Attempt to create a new client by connecting to a given endpoint. pub async fn connect(dst: D) -> Result where - D: std::convert::TryInto, + D: TryInto, D::Error: Into, { let conn = tonic::transport::Endpoint::new(dst)?.connect().await?; @@ -1456,10 +1554,29 @@ pub mod executor_grpc_client { self.inner = self.inner.accept_compressed(encoding); self } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_decoding_message_size(limit); + self + } + /// Limits the maximum size of an encoded message. 
+ /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_encoding_message_size(limit); + self + } pub async fn launch_task( &mut self, request: impl tonic::IntoRequest, - ) -> Result, tonic::Status> { + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { self.inner .ready() .await @@ -1473,12 +1590,18 @@ pub mod executor_grpc_client { let path = http::uri::PathAndQuery::from_static( "/ballista.protobuf.ExecutorGrpc/LaunchTask", ); - self.inner.unary(request.into_request(), path, codec).await + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("ballista.protobuf.ExecutorGrpc", "LaunchTask")); + self.inner.unary(req, path, codec).await } pub async fn launch_multi_task( &mut self, request: impl tonic::IntoRequest, - ) -> Result, tonic::Status> { + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { self.inner .ready() .await @@ -1492,12 +1615,20 @@ pub mod executor_grpc_client { let path = http::uri::PathAndQuery::from_static( "/ballista.protobuf.ExecutorGrpc/LaunchMultiTask", ); - self.inner.unary(request.into_request(), path, codec).await + let mut req = request.into_request(); + req.extensions_mut() + .insert( + GrpcMethod::new("ballista.protobuf.ExecutorGrpc", "LaunchMultiTask"), + ); + self.inner.unary(req, path, codec).await } pub async fn stop_executor( &mut self, request: impl tonic::IntoRequest, - ) -> Result, tonic::Status> { + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { self.inner .ready() .await @@ -1511,12 +1642,20 @@ pub mod executor_grpc_client { let path = http::uri::PathAndQuery::from_static( "/ballista.protobuf.ExecutorGrpc/StopExecutor", ); - self.inner.unary(request.into_request(), path, codec).await + let mut req = request.into_request(); + req.extensions_mut() + .insert( + GrpcMethod::new("ballista.protobuf.ExecutorGrpc", "StopExecutor"), + ); + self.inner.unary(req, path, codec).await } pub async fn cancel_tasks( &mut self, request: impl tonic::IntoRequest, - ) -> Result, tonic::Status> { + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { self.inner .ready() .await @@ -1530,12 +1669,20 @@ pub mod executor_grpc_client { let path = http::uri::PathAndQuery::from_static( "/ballista.protobuf.ExecutorGrpc/CancelTasks", ); - self.inner.unary(request.into_request(), path, codec).await + let mut req = request.into_request(); + req.extensions_mut() + .insert( + GrpcMethod::new("ballista.protobuf.ExecutorGrpc", "CancelTasks"), + ); + self.inner.unary(req, path, codec).await } pub async fn remove_job_data( &mut self, request: impl tonic::IntoRequest, - ) -> Result, tonic::Status> { + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { self.inner .ready() .await @@ -1549,7 +1696,12 @@ pub mod executor_grpc_client { let path = http::uri::PathAndQuery::from_static( "/ballista.protobuf.ExecutorGrpc/RemoveJobData", ); - self.inner.unary(request.into_request(), path, codec).await + let mut req = request.into_request(); + req.extensions_mut() + .insert( + GrpcMethod::new("ballista.protobuf.ExecutorGrpc", "RemoveJobData"), + ); + self.inner.unary(req, path, codec).await } } } @@ -1564,52 +1716,75 @@ pub mod scheduler_grpc_server { async fn poll_work( &self, request: tonic::Request, - ) -> Result, tonic::Status>; + ) -> std::result::Result, tonic::Status>; async fn register_executor( &self, request: tonic::Request, - ) -> Result, tonic::Status>; + ) -> std::result::Result< + 
tonic::Response, + tonic::Status, + >; /// Push-based task scheduler will only leverage this interface /// rather than the PollWork interface to report executor states async fn heart_beat_from_executor( &self, request: tonic::Request, - ) -> Result, tonic::Status>; + ) -> std::result::Result, tonic::Status>; async fn update_task_status( &self, request: tonic::Request, - ) -> Result, tonic::Status>; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; async fn get_file_metadata( &self, request: tonic::Request, - ) -> Result, tonic::Status>; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; async fn execute_query( &self, request: tonic::Request, - ) -> Result, tonic::Status>; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; async fn get_job_status( &self, request: tonic::Request, - ) -> Result, tonic::Status>; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; /// Used by Executor to tell Scheduler it is stopped. async fn executor_stopped( &self, request: tonic::Request, - ) -> Result, tonic::Status>; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; async fn cancel_job( &self, request: tonic::Request, - ) -> Result, tonic::Status>; + ) -> std::result::Result, tonic::Status>; async fn clean_job_data( &self, request: tonic::Request, - ) -> Result, tonic::Status>; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; } #[derive(Debug)] pub struct SchedulerGrpcServer { inner: _Inner, accept_compression_encodings: EnabledCompressionEncodings, send_compression_encodings: EnabledCompressionEncodings, + max_decoding_message_size: Option, + max_encoding_message_size: Option, } struct _Inner(Arc); impl SchedulerGrpcServer { @@ -1622,6 +1797,8 @@ pub mod scheduler_grpc_server { inner, accept_compression_encodings: Default::default(), send_compression_encodings: Default::default(), + max_decoding_message_size: None, + max_encoding_message_size: None, } } pub fn with_interceptor( @@ -1645,6 +1822,22 @@ pub mod scheduler_grpc_server { self.send_compression_encodings.enable(encoding); self } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.max_decoding_message_size = Some(limit); + self + } + /// Limits the maximum size of an encoded message. 
+ /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.max_encoding_message_size = Some(limit); + self + } } impl tonic::codegen::Service> for SchedulerGrpcServer where @@ -1658,7 +1851,7 @@ pub mod scheduler_grpc_server { fn poll_ready( &mut self, _cx: &mut Context<'_>, - ) -> Poll> { + ) -> Poll> { Poll::Ready(Ok(())) } fn call(&mut self, req: http::Request) -> Self::Future { @@ -1680,13 +1873,15 @@ pub mod scheduler_grpc_server { &mut self, request: tonic::Request, ) -> Self::Future { - let inner = self.0.clone(); + let inner = Arc::clone(&self.0); let fut = async move { (*inner).poll_work(request).await }; Box::pin(fut) } } let accept_compression_encodings = self.accept_compression_encodings; let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { let inner = inner.0; @@ -1696,6 +1891,10 @@ pub mod scheduler_grpc_server { .apply_compression_config( accept_compression_encodings, send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, ); let res = grpc.unary(method, req).await; Ok(res) @@ -1718,7 +1917,7 @@ pub mod scheduler_grpc_server { &mut self, request: tonic::Request, ) -> Self::Future { - let inner = self.0.clone(); + let inner = Arc::clone(&self.0); let fut = async move { (*inner).register_executor(request).await }; @@ -1727,6 +1926,8 @@ pub mod scheduler_grpc_server { } let accept_compression_encodings = self.accept_compression_encodings; let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { let inner = inner.0; @@ -1736,6 +1937,10 @@ pub mod scheduler_grpc_server { .apply_compression_config( accept_compression_encodings, send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, ); let res = grpc.unary(method, req).await; Ok(res) @@ -1758,7 +1963,7 @@ pub mod scheduler_grpc_server { &mut self, request: tonic::Request, ) -> Self::Future { - let inner = self.0.clone(); + let inner = Arc::clone(&self.0); let fut = async move { (*inner).heart_beat_from_executor(request).await }; @@ -1767,6 +1972,8 @@ pub mod scheduler_grpc_server { } let accept_compression_encodings = self.accept_compression_encodings; let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { let inner = inner.0; @@ -1776,6 +1983,10 @@ pub mod scheduler_grpc_server { .apply_compression_config( accept_compression_encodings, send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, ); let res = grpc.unary(method, req).await; Ok(res) @@ -1798,7 +2009,7 @@ pub mod scheduler_grpc_server { &mut self, request: tonic::Request, ) -> Self::Future { - let inner = self.0.clone(); + let inner = Arc::clone(&self.0); let fut = async move { (*inner).update_task_status(request).await }; @@ -1807,6 +2018,8 @@ pub mod scheduler_grpc_server { } let accept_compression_encodings = 
self.accept_compression_encodings; let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { let inner = inner.0; @@ -1816,6 +2029,10 @@ pub mod scheduler_grpc_server { .apply_compression_config( accept_compression_encodings, send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, ); let res = grpc.unary(method, req).await; Ok(res) @@ -1838,7 +2055,7 @@ pub mod scheduler_grpc_server { &mut self, request: tonic::Request, ) -> Self::Future { - let inner = self.0.clone(); + let inner = Arc::clone(&self.0); let fut = async move { (*inner).get_file_metadata(request).await }; @@ -1847,6 +2064,8 @@ pub mod scheduler_grpc_server { } let accept_compression_encodings = self.accept_compression_encodings; let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { let inner = inner.0; @@ -1856,6 +2075,10 @@ pub mod scheduler_grpc_server { .apply_compression_config( accept_compression_encodings, send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, ); let res = grpc.unary(method, req).await; Ok(res) @@ -1878,7 +2101,7 @@ pub mod scheduler_grpc_server { &mut self, request: tonic::Request, ) -> Self::Future { - let inner = self.0.clone(); + let inner = Arc::clone(&self.0); let fut = async move { (*inner).execute_query(request).await }; @@ -1887,6 +2110,8 @@ pub mod scheduler_grpc_server { } let accept_compression_encodings = self.accept_compression_encodings; let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { let inner = inner.0; @@ -1896,6 +2121,10 @@ pub mod scheduler_grpc_server { .apply_compression_config( accept_compression_encodings, send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, ); let res = grpc.unary(method, req).await; Ok(res) @@ -1918,7 +2147,7 @@ pub mod scheduler_grpc_server { &mut self, request: tonic::Request, ) -> Self::Future { - let inner = self.0.clone(); + let inner = Arc::clone(&self.0); let fut = async move { (*inner).get_job_status(request).await }; @@ -1927,6 +2156,8 @@ pub mod scheduler_grpc_server { } let accept_compression_encodings = self.accept_compression_encodings; let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { let inner = inner.0; @@ -1936,6 +2167,10 @@ pub mod scheduler_grpc_server { .apply_compression_config( accept_compression_encodings, send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, ); let res = grpc.unary(method, req).await; Ok(res) @@ -1958,7 +2193,7 @@ pub mod scheduler_grpc_server { &mut self, request: tonic::Request, ) -> Self::Future { - let inner = self.0.clone(); + let inner = 
Arc::clone(&self.0); let fut = async move { (*inner).executor_stopped(request).await }; @@ -1967,6 +2202,8 @@ pub mod scheduler_grpc_server { } let accept_compression_encodings = self.accept_compression_encodings; let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { let inner = inner.0; @@ -1976,6 +2213,10 @@ pub mod scheduler_grpc_server { .apply_compression_config( accept_compression_encodings, send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, ); let res = grpc.unary(method, req).await; Ok(res) @@ -1998,13 +2239,15 @@ pub mod scheduler_grpc_server { &mut self, request: tonic::Request, ) -> Self::Future { - let inner = self.0.clone(); + let inner = Arc::clone(&self.0); let fut = async move { (*inner).cancel_job(request).await }; Box::pin(fut) } } let accept_compression_encodings = self.accept_compression_encodings; let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { let inner = inner.0; @@ -2014,6 +2257,10 @@ pub mod scheduler_grpc_server { .apply_compression_config( accept_compression_encodings, send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, ); let res = grpc.unary(method, req).await; Ok(res) @@ -2036,7 +2283,7 @@ pub mod scheduler_grpc_server { &mut self, request: tonic::Request, ) -> Self::Future { - let inner = self.0.clone(); + let inner = Arc::clone(&self.0); let fut = async move { (*inner).clean_job_data(request).await }; @@ -2045,6 +2292,8 @@ pub mod scheduler_grpc_server { } let accept_compression_encodings = self.accept_compression_encodings; let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { let inner = inner.0; @@ -2054,6 +2303,10 @@ pub mod scheduler_grpc_server { .apply_compression_config( accept_compression_encodings, send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, ); let res = grpc.unary(method, req).await; Ok(res) @@ -2082,12 +2335,14 @@ pub mod scheduler_grpc_server { inner, accept_compression_encodings: self.accept_compression_encodings, send_compression_encodings: self.send_compression_encodings, + max_decoding_message_size: self.max_decoding_message_size, + max_encoding_message_size: self.max_encoding_message_size, } } } impl Clone for _Inner { fn clone(&self) -> Self { - Self(self.0.clone()) + Self(Arc::clone(&self.0)) } } impl std::fmt::Debug for _Inner { @@ -2109,29 +2364,46 @@ pub mod executor_grpc_server { async fn launch_task( &self, request: tonic::Request, - ) -> Result, tonic::Status>; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; async fn launch_multi_task( &self, request: tonic::Request, - ) -> Result, tonic::Status>; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; async fn stop_executor( &self, request: tonic::Request, - ) -> Result, tonic::Status>; + ) -> std::result::Result< + tonic::Response, + 
tonic::Status, + >; async fn cancel_tasks( &self, request: tonic::Request, - ) -> Result, tonic::Status>; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; async fn remove_job_data( &self, request: tonic::Request, - ) -> Result, tonic::Status>; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; } #[derive(Debug)] pub struct ExecutorGrpcServer { inner: _Inner, accept_compression_encodings: EnabledCompressionEncodings, send_compression_encodings: EnabledCompressionEncodings, + max_decoding_message_size: Option, + max_encoding_message_size: Option, } struct _Inner(Arc); impl ExecutorGrpcServer { @@ -2144,6 +2416,8 @@ pub mod executor_grpc_server { inner, accept_compression_encodings: Default::default(), send_compression_encodings: Default::default(), + max_decoding_message_size: None, + max_encoding_message_size: None, } } pub fn with_interceptor( @@ -2167,6 +2441,22 @@ pub mod executor_grpc_server { self.send_compression_encodings.enable(encoding); self } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.max_decoding_message_size = Some(limit); + self + } + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.max_encoding_message_size = Some(limit); + self + } } impl tonic::codegen::Service> for ExecutorGrpcServer where @@ -2180,7 +2470,7 @@ pub mod executor_grpc_server { fn poll_ready( &mut self, _cx: &mut Context<'_>, - ) -> Poll> { + ) -> Poll> { Poll::Ready(Ok(())) } fn call(&mut self, req: http::Request) -> Self::Future { @@ -2202,13 +2492,15 @@ pub mod executor_grpc_server { &mut self, request: tonic::Request, ) -> Self::Future { - let inner = self.0.clone(); + let inner = Arc::clone(&self.0); let fut = async move { (*inner).launch_task(request).await }; Box::pin(fut) } } let accept_compression_encodings = self.accept_compression_encodings; let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { let inner = inner.0; @@ -2218,6 +2510,10 @@ pub mod executor_grpc_server { .apply_compression_config( accept_compression_encodings, send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, ); let res = grpc.unary(method, req).await; Ok(res) @@ -2240,7 +2536,7 @@ pub mod executor_grpc_server { &mut self, request: tonic::Request, ) -> Self::Future { - let inner = self.0.clone(); + let inner = Arc::clone(&self.0); let fut = async move { (*inner).launch_multi_task(request).await }; @@ -2249,6 +2545,8 @@ pub mod executor_grpc_server { } let accept_compression_encodings = self.accept_compression_encodings; let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { let inner = inner.0; @@ -2258,6 +2556,10 @@ pub mod executor_grpc_server { .apply_compression_config( accept_compression_encodings, send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, ); let res = grpc.unary(method, req).await; Ok(res) @@ -2280,7 
+2582,7 @@ pub mod executor_grpc_server { &mut self, request: tonic::Request, ) -> Self::Future { - let inner = self.0.clone(); + let inner = Arc::clone(&self.0); let fut = async move { (*inner).stop_executor(request).await }; @@ -2289,6 +2591,8 @@ pub mod executor_grpc_server { } let accept_compression_encodings = self.accept_compression_encodings; let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { let inner = inner.0; @@ -2298,6 +2602,10 @@ pub mod executor_grpc_server { .apply_compression_config( accept_compression_encodings, send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, ); let res = grpc.unary(method, req).await; Ok(res) @@ -2320,7 +2628,7 @@ pub mod executor_grpc_server { &mut self, request: tonic::Request, ) -> Self::Future { - let inner = self.0.clone(); + let inner = Arc::clone(&self.0); let fut = async move { (*inner).cancel_tasks(request).await }; @@ -2329,6 +2637,8 @@ pub mod executor_grpc_server { } let accept_compression_encodings = self.accept_compression_encodings; let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { let inner = inner.0; @@ -2338,6 +2648,10 @@ pub mod executor_grpc_server { .apply_compression_config( accept_compression_encodings, send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, ); let res = grpc.unary(method, req).await; Ok(res) @@ -2360,7 +2674,7 @@ pub mod executor_grpc_server { &mut self, request: tonic::Request, ) -> Self::Future { - let inner = self.0.clone(); + let inner = Arc::clone(&self.0); let fut = async move { (*inner).remove_job_data(request).await }; @@ -2369,6 +2683,8 @@ pub mod executor_grpc_server { } let accept_compression_encodings = self.accept_compression_encodings; let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { let inner = inner.0; @@ -2378,6 +2694,10 @@ pub mod executor_grpc_server { .apply_compression_config( accept_compression_encodings, send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, ); let res = grpc.unary(method, req).await; Ok(res) @@ -2406,12 +2726,14 @@ pub mod executor_grpc_server { inner, accept_compression_encodings: self.accept_compression_encodings, send_compression_encodings: self.send_compression_encodings, + max_decoding_message_size: self.max_decoding_message_size, + max_encoding_message_size: self.max_encoding_message_size, } } } impl Clone for _Inner { fn clone(&self) -> Self { - Self(self.0.clone()) + Self(Arc::clone(&self.0)) } } impl std::fmt::Debug for _Inner { diff --git a/ballista/core/src/serde/scheduler/mod.rs b/ballista/core/src/serde/scheduler/mod.rs index d8ff0906a..83a6d1735 100644 --- a/ballista/core/src/serde/scheduler/mod.rs +++ b/ballista/core/src/serde/scheduler/mod.rs @@ -135,7 +135,7 @@ impl PartitionStats { pub fn arrow_struct_repr(self) -> Field { Field::new( 
"partition_stats", - DataType::Struct(self.arrow_struct_fields()), + DataType::Struct(self.arrow_struct_fields().into()), false, ) } diff --git a/ballista/executor/Cargo.toml b/ballista/executor/Cargo.toml index 1167e40a0..65568d82e 100644 --- a/ballista/executor/Cargo.toml +++ b/ballista/executor/Cargo.toml @@ -24,7 +24,7 @@ homepage = "https://github.com/apache/arrow-ballista" repository = "https://github.com/apache/arrow-ballista" readme = "README.md" authors = ["Apache Arrow "] -edition = "2018" +edition = "2021" [package.metadata.configure_me.bin] executor = "executor_config_spec.toml" @@ -43,7 +43,7 @@ arrow-flight = { workspace = true } async-trait = "0.1.41" ballista-core = { path = "../core", version = "0.11.0", features = ["s3"] } chrono = { version = "0.4", default-features = false } -configure_me = "0.4.0" +configure_me = { workspace = true } dashmap = "5.4.0" datafusion = { workspace = true } datafusion-proto = { workspace = true } @@ -62,7 +62,7 @@ tokio = { version = "1.0", features = [ "signal", ] } tokio-stream = { version = "0.1", features = ["net"] } -tonic = "0.8" +tonic = { workspace = true } tracing = "0.1.36" tracing-appender = "0.2.2" tracing-subscriber = { version = "0.3.15", features = [ @@ -75,7 +75,7 @@ uuid = { version = "1.0", features = ["v4"] } [dev-dependencies] [build-dependencies] -configure_me_codegen = "=0.4.0" +configure_me_codegen = { workspace = true } # use libc on unix like platforms to set worker priority in DedicatedExecutor [target."cfg(unix)".dependencies.libc] diff --git a/ballista/scheduler/Cargo.toml b/ballista/scheduler/Cargo.toml index ee36dcbce..307099895 100644 --- a/ballista/scheduler/Cargo.toml +++ b/ballista/scheduler/Cargo.toml @@ -24,7 +24,7 @@ homepage = "https://github.com/apache/arrow-ballista" repository = "https://github.com/apache/arrow-ballista" readme = "README.md" authors = ["Apache Arrow "] -edition = "2018" +edition = "2021" [package.metadata.configure_me.bin] scheduler = "scheduler_config_spec.toml" @@ -49,7 +49,7 @@ async-trait = "0.1.41" ballista-core = { path = "../core", version = "0.11.0", features = ["s3"] } base64 = { version = "0.13", default-features = false } clap = { version = "3", features = ["derive", "cargo"] } -configure_me = "0.4.0" +configure_me = { workspace = true } dashmap = "5.4.0" datafusion = { workspace = true } datafusion-proto = { workspace = true } @@ -62,7 +62,7 @@ http-body = "0.4" hyper = "0.14.4" itertools = "0.10.3" log = "0.4" -object_store = "0.5.0" +object_store = { workspace = true } once_cell = { version = "1.16.0", optional = true } parking_lot = "0.12" parse_arg = "0.1.3" @@ -74,7 +74,7 @@ serde = { version = "1", features = ["derive"] } sled_package = { package = "sled", version = "0.34", optional = true } tokio = { version = "1.0", features = ["full"] } tokio-stream = { version = "0.1", features = ["net"], optional = true } -tonic = "0.8" +tonic = { workspace = true } tower = { version = "0.4" } tracing = "0.1.36" tracing-appender = "0.2.2" @@ -90,8 +90,5 @@ warp = "0.3" ballista-core = { path = "../core", version = "0.11.0" } [build-dependencies] -configure_me_codegen = "=0.4.0" -tonic-build = { version = "0.8", default-features = false, features = [ - "transport", - "prost", -] } +configure_me_codegen = { workspace = true } +tonic-build = { workspace = true } diff --git a/ballista/scheduler/scheduler_config_spec.toml b/ballista/scheduler/scheduler_config_spec.toml index e1eb561d9..f91579508 100644 --- 
a/ballista/scheduler/scheduler_config_spec.toml +++ b/ballista/scheduler/scheduler_config_spec.toml @@ -51,7 +51,6 @@ doc = "etcd urls for use when discovery mode is `etcd`. Default: localhost:2379" default = "std::string::String::from(\"localhost:2379\")" [[param]] -abbr = "h" name = "bind_host" type = "String" default = "std::string::String::from(\"0.0.0.0\")"
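
The two small `arrow` API adjustments in this patch (`Schema::empty()` in `ballista/client/src/context.rs` and the added `.into()` in `PartitionStats::arrow_struct_repr`) both follow from arrow 37 moving `DataType::Struct` and `Schema::new` onto the `Fields` collection type. Below is a minimal, self-contained sketch of that API, assuming only `arrow = "37.0.0"`; the field names are illustrative and are not Ballista's actual partition-stats schema.

```rust
// Sketch only: illustrates the arrow 37 `Fields` API that this patch migrates to.
// Field names below are hypothetical, not taken from Ballista.
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};

fn main() {
    // `DataType::Struct` now takes `Fields` instead of `Vec<Field>`; a
    // `Vec<Field>` still converts via `.into()`, which is what the patched
    // `arrow_struct_repr` relies on.
    let struct_fields = vec![
        Field::new("num_rows", DataType::UInt64, false),
        Field::new("num_bytes", DataType::UInt64, false),
    ];
    let partition_stats = Field::new(
        "partition_stats",
        DataType::Struct(struct_fields.into()),
        false,
    );

    // With `Schema::new` now accepting `impl Into<Fields>`, a bare
    // `Schema::new(vec![])` no longer infers its element type, which is
    // presumably why the client test switches to `Schema::empty()`.
    let empty_schema = Arc::new(Schema::empty());

    println!("{partition_stats:?}");
    println!("{empty_schema:?}");
}
```

The regenerated tonic 0.9 clients and servers in `ballista.rs` also gain `max_decoding_message_size` and `max_encoding_message_size` builders (shown in the hunks above), which callers can use to raise the default 4 MB decode limit when shuffling large plans or metadata.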