Skip to content

Commit

Permalink
Add config grpc_server_max_decoding_message_size to make the maximum …
Browse files Browse the repository at this point in the history
…size of a decoded message at the grpc server side configurable (#782)

Co-authored-by: yangzhong <[email protected]>
  • Loading branch information
yahoNanJing and kyotoYaho authored May 29, 2023
1 parent d5a5500 commit 10e021a
Show file tree
Hide file tree
Showing 8 changed files with 48 additions and 14 deletions.
8 changes: 7 additions & 1 deletion ballista/executor/executor_config_spec.toml
Original file line number Diff line number Diff line change
Expand Up @@ -124,4 +124,10 @@ default = "std::string::String::from(\"INFO,datafusion=INFO\")"
name = "log_rotation_policy"
type = "ballista_core::config::LogRotationPolicy"
doc = "Tracing log rotation policy, possible values: minutely, hourly, daily, never. Default: daily"
default = "ballista_core::config::LogRotationPolicy::Daily"
default = "ballista_core::config::LogRotationPolicy::Daily"

[[param]]
name = "grpc_server_max_decoding_message_size"
type = "u32"
default = "16777216"
doc = "The maximum size of a decoded message at the grpc server side. Default: 16MB"
4 changes: 3 additions & 1 deletion ballista/executor/src/bin/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
//! Ballista Rust executor binary.

use anyhow::Result;
use std::sync::Arc;

use ballista_core::print_version;
use ballista_executor::executor_process::{
Expand Down Expand Up @@ -77,8 +78,9 @@ async fn main() -> Result<()> {
print_thread_info: opt.print_thread_info,
job_data_ttl_seconds: opt.job_data_ttl_seconds,
job_data_clean_up_interval_seconds: opt.job_data_clean_up_interval_seconds,
grpc_server_max_decoding_message_size: opt.grpc_server_max_decoding_message_size,
execution_engine: None,
};

start_executor_process(config).await
start_executor_process(Arc::new(config)).await
}
17 changes: 10 additions & 7 deletions ballista/executor/src/executor_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,16 +83,19 @@ pub struct ExecutorProcessConfig {
pub log_rotation_policy: LogRotationPolicy,
pub job_data_ttl_seconds: u64,
pub job_data_clean_up_interval_seconds: u64,
/// The maximum size of a decoded message at the grpc server side.
pub grpc_server_max_decoding_message_size: u32,
/// Optional execution engine to use to execute physical plans, will default to
/// DataFusion if none is provided.
pub execution_engine: Option<Arc<dyn ExecutionEngine>>,
}

pub async fn start_executor_process(opt: ExecutorProcessConfig) -> Result<()> {
pub async fn start_executor_process(opt: Arc<ExecutorProcessConfig>) -> Result<()> {
let rust_log = env::var(EnvFilter::DEFAULT_ENV);
let log_filter = EnvFilter::new(rust_log.unwrap_or(opt.special_mod_log_level));
let log_filter =
EnvFilter::new(rust_log.unwrap_or(opt.special_mod_log_level.clone()));
// File layer
if let Some(log_dir) = opt.log_dir {
if let Some(log_dir) = opt.log_dir.clone() {
let log_file = match opt.log_rotation_policy {
LogRotationPolicy::Minutely => {
tracing_appender::rolling::minutely(log_dir, &opt.log_file_name_prefix)
Expand Down Expand Up @@ -130,11 +133,11 @@ pub async fn start_executor_process(opt: ExecutorProcessConfig) -> Result<()> {
.parse()
.with_context(|| format!("Could not parse address: {addr}"))?;

let scheduler_host = opt.scheduler_host;
let scheduler_host = opt.scheduler_host.clone();
let scheduler_port = opt.scheduler_port;
let scheduler_url = format!("http://{scheduler_host}:{scheduler_port}");

let work_dir = opt.work_dir.unwrap_or(
let work_dir = opt.work_dir.clone().unwrap_or(
TempDir::new()?
.into_path()
.into_os_string()
Expand Down Expand Up @@ -185,7 +188,7 @@ pub async fn start_executor_process(opt: ExecutorProcessConfig) -> Result<()> {
runtime,
metrics_collector,
concurrent_tasks,
opt.execution_engine,
opt.execution_engine.clone(),
));

let connect_timeout = opt.scheduler_connect_timeout_seconds as u64;
Expand Down Expand Up @@ -281,7 +284,7 @@ pub async fn start_executor_process(opt: ExecutorProcessConfig) -> Result<()> {
//If there is executor registration error during startup, return the error and stop early.
executor_server::startup(
scheduler.clone(),
opt.bind_host,
opt.clone(),
executor.clone(),
default_codec,
stop_send,
Expand Down
10 changes: 7 additions & 3 deletions ballista/executor/src/executor_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ use tokio::task::JoinHandle;
use crate::cpu_bound_executor::DedicatedExecutor;
use crate::execution_engine::QueryStageExecutor;
use crate::executor::Executor;
use crate::executor_process::ExecutorProcessConfig;
use crate::shutdown::ShutdownNotifier;
use crate::{as_task_status, TaskExecutionTimes};

Expand All @@ -77,7 +78,7 @@ struct CuratorTaskStatus {

pub async fn startup<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>(
mut scheduler: SchedulerGrpcClient<Channel>,
bind_host: String,
config: Arc<ExecutorProcessConfig>,
executor: Arc<Executor>,
codec: BallistaCodec<T, U>,
stop_send: mpsc::Sender<bool>,
Expand All @@ -102,14 +103,17 @@ pub async fn startup<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>(
// 1. Start executor grpc service
let server = {
let executor_meta = executor.metadata.clone();
let addr = format!("{}:{}", bind_host, executor_meta.grpc_port);
let addr = format!("{}:{}", config.bind_host, executor_meta.grpc_port);
let addr = addr.parse().unwrap();

info!(
"Ballista v{} Rust Executor Grpc Server listening on {:?}",
BALLISTA_VERSION, addr
);
let server = ExecutorGrpcServer::new(executor_server.clone());
let server = ExecutorGrpcServer::new(executor_server.clone())
.max_decoding_message_size(
config.grpc_server_max_decoding_message_size as usize,
);
let mut grpc_shutdown = shutdown_noti.subscribe_for_shutdown();
tokio::spawn(async move {
let shutdown_signal = grpc_shutdown.recv();
Expand Down
8 changes: 7 additions & 1 deletion ballista/scheduler/scheduler_config_spec.toml
Original file line number Diff line number Diff line change
Expand Up @@ -151,4 +151,10 @@ doc = "Time in seconds an executor should be considered lost after it enters ter
name = "scheduler_event_expected_processing_duration"
type = "u64"
default = "0"
doc = "The maximum expected processing time of a scheduler event (microseconds). Zero means disable."
doc = "The maximum expected processing time of a scheduler event (microseconds). Zero means disable."

[[param]]
name = "grpc_server_max_decoding_message_size"
type = "u32"
default = "16777216"
doc = "The maximum size of a decoded message at the grpc server side. Default: 16MB"
1 change: 1 addition & 0 deletions ballista/scheduler/src/bin/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ async fn main() -> Result<()> {
executor_termination_grace_period: opt.executor_termination_grace_period,
scheduler_event_expected_processing_duration: opt
.scheduler_event_expected_processing_duration,
grpc_server_max_decoding_message_size: opt.grpc_server_max_decoding_message_size,
};

let cluster = BallistaCluster::new_from_config(&config).await?;
Expand Down
8 changes: 8 additions & 0 deletions ballista/scheduler/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ pub struct SchedulerConfig {
pub executor_termination_grace_period: u64,
/// The maximum expected processing time of a scheduler event (microseconds). Zero means disable.
pub scheduler_event_expected_processing_duration: u64,
/// The maximum size of a decoded message at the grpc server side.
pub grpc_server_max_decoding_message_size: u32,
}

impl Default for SchedulerConfig {
Expand All @@ -72,6 +74,7 @@ impl Default for SchedulerConfig {
job_resubmit_interval_ms: None,
executor_termination_grace_period: 0,
scheduler_event_expected_processing_duration: 0,
grpc_server_max_decoding_message_size: 16777216,
}
}
}
Expand Down Expand Up @@ -153,6 +156,11 @@ impl SchedulerConfig {
self.executor_termination_grace_period = value;
self
}

pub fn with_grpc_server_max_decoding_message_size(mut self, value: u32) -> Self {
self.grpc_server_max_decoding_message_size = value;
self
}
}

#[derive(Clone, Debug)]
Expand Down
6 changes: 5 additions & 1 deletion ballista/scheduler/src/scheduler_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,12 @@ pub async fn start_server(

Server::bind(&addr)
.serve(make_service_fn(move |request: &AddrStream| {
let config = &scheduler_server.state.config;
let scheduler_grpc_server =
SchedulerGrpcServer::new(scheduler_server.clone());
SchedulerGrpcServer::new(scheduler_server.clone())
.max_decoding_message_size(
config.grpc_server_max_decoding_message_size as usize,
);

let keda_scaler = ExternalScalerServer::new(scheduler_server.clone());

Expand Down

0 comments on commit 10e021a

Please sign in to comment.