Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add config grpc_server_max_decoding_message_size to make the maximum size of a decoded message at the grpc server side configurable #782

Merged
merged 1 commit into from
May 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion ballista/executor/executor_config_spec.toml
Original file line number Diff line number Diff line change
Expand Up @@ -124,4 +124,10 @@ default = "std::string::String::from(\"INFO,datafusion=INFO\")"
name = "log_rotation_policy"
type = "ballista_core::config::LogRotationPolicy"
doc = "Tracing log rotation policy, possible values: minutely, hourly, daily, never. Default: daily"
default = "ballista_core::config::LogRotationPolicy::Daily"
default = "ballista_core::config::LogRotationPolicy::Daily"

[[param]]
name = "grpc_server_max_decoding_message_size"
type = "u32"
default = "16777216"
doc = "The maximum size of a decoded message at the grpc server side. Default: 16MB"
4 changes: 3 additions & 1 deletion ballista/executor/src/bin/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
//! Ballista Rust executor binary.

use anyhow::Result;
use std::sync::Arc;

use ballista_core::print_version;
use ballista_executor::executor_process::{
Expand Down Expand Up @@ -77,8 +78,9 @@ async fn main() -> Result<()> {
print_thread_info: opt.print_thread_info,
job_data_ttl_seconds: opt.job_data_ttl_seconds,
job_data_clean_up_interval_seconds: opt.job_data_clean_up_interval_seconds,
grpc_server_max_decoding_message_size: opt.grpc_server_max_decoding_message_size,
execution_engine: None,
};

start_executor_process(config).await
start_executor_process(Arc::new(config)).await
}
17 changes: 10 additions & 7 deletions ballista/executor/src/executor_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,16 +83,19 @@ pub struct ExecutorProcessConfig {
pub log_rotation_policy: LogRotationPolicy,
pub job_data_ttl_seconds: u64,
pub job_data_clean_up_interval_seconds: u64,
/// The maximum size of a decoded message at the grpc server side.
pub grpc_server_max_decoding_message_size: u32,
/// Optional execution engine to use to execute physical plans, will default to
/// DataFusion if none is provided.
pub execution_engine: Option<Arc<dyn ExecutionEngine>>,
}

pub async fn start_executor_process(opt: ExecutorProcessConfig) -> Result<()> {
pub async fn start_executor_process(opt: Arc<ExecutorProcessConfig>) -> Result<()> {
let rust_log = env::var(EnvFilter::DEFAULT_ENV);
let log_filter = EnvFilter::new(rust_log.unwrap_or(opt.special_mod_log_level));
let log_filter =
EnvFilter::new(rust_log.unwrap_or(opt.special_mod_log_level.clone()));
// File layer
if let Some(log_dir) = opt.log_dir {
if let Some(log_dir) = opt.log_dir.clone() {
let log_file = match opt.log_rotation_policy {
LogRotationPolicy::Minutely => {
tracing_appender::rolling::minutely(log_dir, &opt.log_file_name_prefix)
Expand Down Expand Up @@ -130,11 +133,11 @@ pub async fn start_executor_process(opt: ExecutorProcessConfig) -> Result<()> {
.parse()
.with_context(|| format!("Could not parse address: {addr}"))?;

let scheduler_host = opt.scheduler_host;
let scheduler_host = opt.scheduler_host.clone();
let scheduler_port = opt.scheduler_port;
let scheduler_url = format!("http://{scheduler_host}:{scheduler_port}");

let work_dir = opt.work_dir.unwrap_or(
let work_dir = opt.work_dir.clone().unwrap_or(
TempDir::new()?
.into_path()
.into_os_string()
Expand Down Expand Up @@ -185,7 +188,7 @@ pub async fn start_executor_process(opt: ExecutorProcessConfig) -> Result<()> {
runtime,
metrics_collector,
concurrent_tasks,
opt.execution_engine,
opt.execution_engine.clone(),
));

let connect_timeout = opt.scheduler_connect_timeout_seconds as u64;
Expand Down Expand Up @@ -281,7 +284,7 @@ pub async fn start_executor_process(opt: ExecutorProcessConfig) -> Result<()> {
//If there is executor registration error during startup, return the error and stop early.
executor_server::startup(
scheduler.clone(),
opt.bind_host,
opt.clone(),
executor.clone(),
default_codec,
stop_send,
Expand Down
10 changes: 7 additions & 3 deletions ballista/executor/src/executor_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ use tokio::task::JoinHandle;
use crate::cpu_bound_executor::DedicatedExecutor;
use crate::execution_engine::QueryStageExecutor;
use crate::executor::Executor;
use crate::executor_process::ExecutorProcessConfig;
use crate::shutdown::ShutdownNotifier;
use crate::{as_task_status, TaskExecutionTimes};

Expand All @@ -77,7 +78,7 @@ struct CuratorTaskStatus {

pub async fn startup<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>(
mut scheduler: SchedulerGrpcClient<Channel>,
bind_host: String,
config: Arc<ExecutorProcessConfig>,
executor: Arc<Executor>,
codec: BallistaCodec<T, U>,
stop_send: mpsc::Sender<bool>,
Expand All @@ -102,14 +103,17 @@ pub async fn startup<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>(
// 1. Start executor grpc service
let server = {
let executor_meta = executor.metadata.clone();
let addr = format!("{}:{}", bind_host, executor_meta.grpc_port);
let addr = format!("{}:{}", config.bind_host, executor_meta.grpc_port);
let addr = addr.parse().unwrap();

info!(
"Ballista v{} Rust Executor Grpc Server listening on {:?}",
BALLISTA_VERSION, addr
);
let server = ExecutorGrpcServer::new(executor_server.clone());
let server = ExecutorGrpcServer::new(executor_server.clone())
.max_decoding_message_size(
config.grpc_server_max_decoding_message_size as usize,
);
let mut grpc_shutdown = shutdown_noti.subscribe_for_shutdown();
tokio::spawn(async move {
let shutdown_signal = grpc_shutdown.recv();
Expand Down
8 changes: 7 additions & 1 deletion ballista/scheduler/scheduler_config_spec.toml
Original file line number Diff line number Diff line change
Expand Up @@ -151,4 +151,10 @@ doc = "Time in seconds an executor should be considered lost after it enters ter
name = "scheduler_event_expected_processing_duration"
type = "u64"
default = "0"
doc = "The maximum expected processing time of a scheduler event (microseconds). Zero means disable."
doc = "The maximum expected processing time of a scheduler event (microseconds). Zero means disable."

[[param]]
name = "grpc_server_max_decoding_message_size"
type = "u32"
default = "16777216"
doc = "The maximum size of a decoded message at the grpc server side. Default: 16MB"
1 change: 1 addition & 0 deletions ballista/scheduler/src/bin/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ async fn main() -> Result<()> {
executor_termination_grace_period: opt.executor_termination_grace_period,
scheduler_event_expected_processing_duration: opt
.scheduler_event_expected_processing_duration,
grpc_server_max_decoding_message_size: opt.grpc_server_max_decoding_message_size,
};

let cluster = BallistaCluster::new_from_config(&config).await?;
Expand Down
8 changes: 8 additions & 0 deletions ballista/scheduler/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ pub struct SchedulerConfig {
pub executor_termination_grace_period: u64,
/// The maximum expected processing time of a scheduler event (microseconds). Zero means disable.
pub scheduler_event_expected_processing_duration: u64,
/// The maximum size of a decoded message at the grpc server side.
pub grpc_server_max_decoding_message_size: u32,
}

impl Default for SchedulerConfig {
Expand All @@ -72,6 +74,7 @@ impl Default for SchedulerConfig {
job_resubmit_interval_ms: None,
executor_termination_grace_period: 0,
scheduler_event_expected_processing_duration: 0,
grpc_server_max_decoding_message_size: 16777216,
}
}
}
Expand Down Expand Up @@ -153,6 +156,11 @@ impl SchedulerConfig {
self.executor_termination_grace_period = value;
self
}

pub fn with_grpc_server_max_decoding_message_size(mut self, value: u32) -> Self {
self.grpc_server_max_decoding_message_size = value;
self
}
}

#[derive(Clone, Debug)]
Expand Down
6 changes: 5 additions & 1 deletion ballista/scheduler/src/scheduler_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,12 @@ pub async fn start_server(

Server::bind(&addr)
.serve(make_service_fn(move |request: &AddrStream| {
let config = &scheduler_server.state.config;
let scheduler_grpc_server =
SchedulerGrpcServer::new(scheduler_server.clone());
SchedulerGrpcServer::new(scheduler_server.clone())
.max_decoding_message_size(
config.grpc_server_max_decoding_message_size as usize,
);

let keda_scaler = ExternalScalerServer::new(scheduler_server.clone());

Expand Down