From 09afb2e5434e409ac05ab2f16d3a5b6d13071a0d Mon Sep 17 00:00:00 2001 From: yangzhong Date: Fri, 26 May 2023 17:38:13 +0800 Subject: [PATCH] Add config grpc_server_max_decoding_message_size to make the maximum size of a decoded message at the grpc server side configurable --- ballista/executor/executor_config_spec.toml | 8 +++++++- ballista/executor/src/bin/main.rs | 4 +++- ballista/executor/src/executor_process.rs | 17 ++++++++++------- ballista/executor/src/executor_server.rs | 10 +++++++--- ballista/scheduler/scheduler_config_spec.toml | 8 +++++++- ballista/scheduler/src/bin/main.rs | 1 + ballista/scheduler/src/config.rs | 8 ++++++++ ballista/scheduler/src/scheduler_process.rs | 6 +++++- 8 files changed, 48 insertions(+), 14 deletions(-) diff --git a/ballista/executor/executor_config_spec.toml b/ballista/executor/executor_config_spec.toml index fe91f0c5b..ba515d187 100644 --- a/ballista/executor/executor_config_spec.toml +++ b/ballista/executor/executor_config_spec.toml @@ -124,4 +124,10 @@ default = "std::string::String::from(\"INFO,datafusion=INFO\")" name = "log_rotation_policy" type = "ballista_core::config::LogRotationPolicy" doc = "Tracing log rotation policy, possible values: minutely, hourly, daily, never. Default: daily" -default = "ballista_core::config::LogRotationPolicy::Daily" \ No newline at end of file +default = "ballista_core::config::LogRotationPolicy::Daily" + +[[param]] +name = "grpc_server_max_decoding_message_size" +type = "u32" +default = "16777216" +doc = "The maximum size of a decoded message at the grpc server side. Default: 16MB" \ No newline at end of file diff --git a/ballista/executor/src/bin/main.rs b/ballista/executor/src/bin/main.rs index b5765165b..f5cca4c2d 100644 --- a/ballista/executor/src/bin/main.rs +++ b/ballista/executor/src/bin/main.rs @@ -18,6 +18,7 @@ //! Ballista Rust executor binary. use anyhow::Result; +use std::sync::Arc; use ballista_core::print_version; use ballista_executor::executor_process::{ @@ -77,8 +78,9 @@ async fn main() -> Result<()> { print_thread_info: opt.print_thread_info, job_data_ttl_seconds: opt.job_data_ttl_seconds, job_data_clean_up_interval_seconds: opt.job_data_clean_up_interval_seconds, + grpc_server_max_decoding_message_size: opt.grpc_server_max_decoding_message_size, execution_engine: None, }; - start_executor_process(config).await + start_executor_process(Arc::new(config)).await } diff --git a/ballista/executor/src/executor_process.rs b/ballista/executor/src/executor_process.rs index 6aea9b6e2..a5ca9e4b3 100644 --- a/ballista/executor/src/executor_process.rs +++ b/ballista/executor/src/executor_process.rs @@ -83,16 +83,19 @@ pub struct ExecutorProcessConfig { pub log_rotation_policy: LogRotationPolicy, pub job_data_ttl_seconds: u64, pub job_data_clean_up_interval_seconds: u64, + /// The maximum size of a decoded message at the grpc server side. + pub grpc_server_max_decoding_message_size: u32, /// Optional execution engine to use to execute physical plans, will default to /// DataFusion if none is provided. pub execution_engine: Option>, } -pub async fn start_executor_process(opt: ExecutorProcessConfig) -> Result<()> { +pub async fn start_executor_process(opt: Arc) -> Result<()> { let rust_log = env::var(EnvFilter::DEFAULT_ENV); - let log_filter = EnvFilter::new(rust_log.unwrap_or(opt.special_mod_log_level)); + let log_filter = + EnvFilter::new(rust_log.unwrap_or(opt.special_mod_log_level.clone())); // File layer - if let Some(log_dir) = opt.log_dir { + if let Some(log_dir) = opt.log_dir.clone() { let log_file = match opt.log_rotation_policy { LogRotationPolicy::Minutely => { tracing_appender::rolling::minutely(log_dir, &opt.log_file_name_prefix) @@ -130,11 +133,11 @@ pub async fn start_executor_process(opt: ExecutorProcessConfig) -> Result<()> { .parse() .with_context(|| format!("Could not parse address: {addr}"))?; - let scheduler_host = opt.scheduler_host; + let scheduler_host = opt.scheduler_host.clone(); let scheduler_port = opt.scheduler_port; let scheduler_url = format!("http://{scheduler_host}:{scheduler_port}"); - let work_dir = opt.work_dir.unwrap_or( + let work_dir = opt.work_dir.clone().unwrap_or( TempDir::new()? .into_path() .into_os_string() @@ -185,7 +188,7 @@ pub async fn start_executor_process(opt: ExecutorProcessConfig) -> Result<()> { runtime, metrics_collector, concurrent_tasks, - opt.execution_engine, + opt.execution_engine.clone(), )); let connect_timeout = opt.scheduler_connect_timeout_seconds as u64; @@ -281,7 +284,7 @@ pub async fn start_executor_process(opt: ExecutorProcessConfig) -> Result<()> { //If there is executor registration error during startup, return the error and stop early. executor_server::startup( scheduler.clone(), - opt.bind_host, + opt.clone(), executor.clone(), default_codec, stop_send, diff --git a/ballista/executor/src/executor_server.rs b/ballista/executor/src/executor_server.rs index 25ffbe26f..e04fe75ec 100644 --- a/ballista/executor/src/executor_server.rs +++ b/ballista/executor/src/executor_server.rs @@ -54,6 +54,7 @@ use tokio::task::JoinHandle; use crate::cpu_bound_executor::DedicatedExecutor; use crate::execution_engine::QueryStageExecutor; use crate::executor::Executor; +use crate::executor_process::ExecutorProcessConfig; use crate::shutdown::ShutdownNotifier; use crate::{as_task_status, TaskExecutionTimes}; @@ -77,7 +78,7 @@ struct CuratorTaskStatus { pub async fn startup( mut scheduler: SchedulerGrpcClient, - bind_host: String, + config: Arc, executor: Arc, codec: BallistaCodec, stop_send: mpsc::Sender, @@ -102,14 +103,17 @@ pub async fn startup( // 1. Start executor grpc service let server = { let executor_meta = executor.metadata.clone(); - let addr = format!("{}:{}", bind_host, executor_meta.grpc_port); + let addr = format!("{}:{}", config.bind_host, executor_meta.grpc_port); let addr = addr.parse().unwrap(); info!( "Ballista v{} Rust Executor Grpc Server listening on {:?}", BALLISTA_VERSION, addr ); - let server = ExecutorGrpcServer::new(executor_server.clone()); + let server = ExecutorGrpcServer::new(executor_server.clone()) + .max_decoding_message_size( + config.grpc_server_max_decoding_message_size as usize, + ); let mut grpc_shutdown = shutdown_noti.subscribe_for_shutdown(); tokio::spawn(async move { let shutdown_signal = grpc_shutdown.recv(); diff --git a/ballista/scheduler/scheduler_config_spec.toml b/ballista/scheduler/scheduler_config_spec.toml index f91579508..8811f4db1 100644 --- a/ballista/scheduler/scheduler_config_spec.toml +++ b/ballista/scheduler/scheduler_config_spec.toml @@ -151,4 +151,10 @@ doc = "Time in seconds an executor should be considered lost after it enters ter name = "scheduler_event_expected_processing_duration" type = "u64" default = "0" -doc = "The maximum expected processing time of a scheduler event (microseconds). Zero means disable." \ No newline at end of file +doc = "The maximum expected processing time of a scheduler event (microseconds). Zero means disable." + +[[param]] +name = "grpc_server_max_decoding_message_size" +type = "u32" +default = "16777216" +doc = "The maximum size of a decoded message at the grpc server side. Default: 16MB" \ No newline at end of file diff --git a/ballista/scheduler/src/bin/main.rs b/ballista/scheduler/src/bin/main.rs index 44ff79d8c..d95ea880e 100644 --- a/ballista/scheduler/src/bin/main.rs +++ b/ballista/scheduler/src/bin/main.rs @@ -138,6 +138,7 @@ async fn main() -> Result<()> { executor_termination_grace_period: opt.executor_termination_grace_period, scheduler_event_expected_processing_duration: opt .scheduler_event_expected_processing_duration, + grpc_server_max_decoding_message_size: opt.grpc_server_max_decoding_message_size, }; let cluster = BallistaCluster::new_from_config(&config).await?; diff --git a/ballista/scheduler/src/config.rs b/ballista/scheduler/src/config.rs index cde78824c..f5e33ff78 100644 --- a/ballista/scheduler/src/config.rs +++ b/ballista/scheduler/src/config.rs @@ -54,6 +54,8 @@ pub struct SchedulerConfig { pub executor_termination_grace_period: u64, /// The maximum expected processing time of a scheduler event (microseconds). Zero means disable. pub scheduler_event_expected_processing_duration: u64, + /// The maximum size of a decoded message at the grpc server side. + pub grpc_server_max_decoding_message_size: u32, } impl Default for SchedulerConfig { @@ -72,6 +74,7 @@ impl Default for SchedulerConfig { job_resubmit_interval_ms: None, executor_termination_grace_period: 0, scheduler_event_expected_processing_duration: 0, + grpc_server_max_decoding_message_size: 16777216, } } } @@ -153,6 +156,11 @@ impl SchedulerConfig { self.executor_termination_grace_period = value; self } + + pub fn with_grpc_server_max_decoding_message_size(mut self, value: u32) -> Self { + self.grpc_server_max_decoding_message_size = value; + self + } } #[derive(Clone, Debug)] diff --git a/ballista/scheduler/src/scheduler_process.rs b/ballista/scheduler/src/scheduler_process.rs index e20eed8a9..9b7e220f9 100644 --- a/ballista/scheduler/src/scheduler_process.rs +++ b/ballista/scheduler/src/scheduler_process.rs @@ -71,8 +71,12 @@ pub async fn start_server( Server::bind(&addr) .serve(make_service_fn(move |request: &AddrStream| { + let config = &scheduler_server.state.config; let scheduler_grpc_server = - SchedulerGrpcServer::new(scheduler_server.clone()); + SchedulerGrpcServer::new(scheduler_server.clone()) + .max_decoding_message_size( + config.grpc_server_max_decoding_message_size as usize, + ); let keda_scaler = ExternalScalerServer::new(scheduler_server.clone());