Skip to content

Commit

Permalink
Add SchedulerConfig for the scheduler configurations, like event_loo…
Browse files Browse the repository at this point in the history
…p_buffer_size, finished_job_data_clean_up_interval_seconds, finished_job_state_clean_up_interval_seconds (#472)

* Move data cleanup caller explicitly to the event loop

* Refactor BallistaConfig by extracting common validation logic

* Create a separate mod for BallistaConfig

* Add SchedulerConfig for the scheduler configurations, like event_loop_buffer_size, finished_job_data_clean_up_interval_seconds, finished_job_state_clean_up_interval_seconds

* Rename the scheduler config advertise_endpoint to advertise_flight_result_route_endpoint and add it to the SchedulerConfig

* Allow redundant configurations in ValidConfiguration

* Don't need to be delayed for cleaning up shuffle data of failed job

* Update user-guide for the scheduler configurations

* Fix doc config name format

* Change the SchedulerConfig to be an explicit struct

* Fix doc format

* Rename advertise-flight-result-route-endpoint to advertise-flight-sql-endpoint

Co-authored-by: yangzhong <[email protected]>
  • Loading branch information
yahoNanJing and kyotoYaho authored Nov 2, 2022
1 parent 64d4e9a commit 926605e
Show file tree
Hide file tree
Showing 12 changed files with 295 additions and 280 deletions.
14 changes: 13 additions & 1 deletion ballista/scheduler/scheduler_config_spec.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ name = "version"
doc = "Print version of this executable"

[[param]]
name = "advertise_endpoint"
name = "advertise_flight_sql_endpoint"
type = "String"
doc = "Route for proxying flight results via scheduler. Should be of the form 'IP:PORT'"

Expand Down Expand Up @@ -83,6 +83,18 @@ type = "u32"
default = "10000"
doc = "Event loop buffer size. Default: 10000"

[[param]]
name = "finished_job_data_clean_up_interval_seconds"
type = "u64"
default = "300"
doc = "Delayed interval for cleaning up finished job data. Default: 300"

[[param]]
name = "finished_job_state_clean_up_interval_seconds"
type = "u64"
default = "3600"
doc = "Delayed interval for cleaning up finished job state. Default: 3600"

[[param]]
name = "executor_slots_policy"
type = "ballista_scheduler::config::SlotsPolicy"
Expand Down
71 changes: 71 additions & 0 deletions ballista/scheduler/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,80 @@

//! Ballista scheduler specific configuration
use ballista_core::config::TaskSchedulingPolicy;
use clap::ArgEnum;
use std::fmt;

/// Configurations for the ballista scheduler of scheduling jobs and tasks
#[derive(Debug, Clone)]
pub struct SchedulerConfig {
/// The task scheduling policy for the scheduler
pub scheduling_policy: TaskSchedulingPolicy,
/// The event loop buffer size. for a system of high throughput, a larger value like 1000000 is recommended
pub event_loop_buffer_size: u32,
/// The executor slots policy for the scheduler. For a cluster with single scheduler, round-robin-local is recommended
pub executor_slots_policy: SlotsPolicy,
/// The delayed interval for cleaning up finished job data, mainly the shuffle data, 0 means the cleaning up is disabled
pub finished_job_data_clean_up_interval_seconds: u64,
/// The delayed interval for cleaning up finished job state stored in the backend, 0 means the cleaning up is disabled.
pub finished_job_state_clean_up_interval_seconds: u64,
/// The route endpoint for proxying flight sql results via scheduler
pub advertise_flight_sql_endpoint: Option<String>,
}

impl Default for SchedulerConfig {
fn default() -> Self {
Self {
scheduling_policy: TaskSchedulingPolicy::PullStaged,
event_loop_buffer_size: 10000,
executor_slots_policy: SlotsPolicy::Bias,
finished_job_data_clean_up_interval_seconds: 300,
finished_job_state_clean_up_interval_seconds: 3600,
advertise_flight_sql_endpoint: None,
}
}
}

impl SchedulerConfig {
pub fn is_push_staged_scheduling(&self) -> bool {
matches!(self.scheduling_policy, TaskSchedulingPolicy::PushStaged)
}

pub fn with_scheduler_policy(mut self, policy: TaskSchedulingPolicy) -> Self {
self.scheduling_policy = policy;
self
}

pub fn with_event_loop_buffer_size(mut self, buffer_size: u32) -> Self {
self.event_loop_buffer_size = buffer_size;
self
}

pub fn with_finished_job_data_clean_up_interval_seconds(
mut self,
interval_seconds: u64,
) -> Self {
self.finished_job_data_clean_up_interval_seconds = interval_seconds;
self
}

pub fn with_finished_job_state_clean_up_interval_seconds(
mut self,
interval_seconds: u64,
) -> Self {
self.finished_job_state_clean_up_interval_seconds = interval_seconds;
self
}

pub fn with_advertise_flight_sql_endpoint(
mut self,
endpoint: Option<String>,
) -> Self {
self.advertise_flight_sql_endpoint = endpoint;
self
}
}

// an enum used to configure the executor slots policy
// needs to be visible to code generated by configure_me
#[derive(Clone, ArgEnum, Copy, Debug, serde::Deserialize)]
Expand Down
8 changes: 6 additions & 2 deletions ballista/scheduler/src/flight_sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ use datafusion::logical_expr::LogicalPlan;
use datafusion::physical_plan::common::batch_byte_size;
use datafusion::prelude::SessionContext;
use datafusion_proto::protobuf::LogicalPlanNode;
use itertools::Itertools;
use prost::Message;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::time::sleep;
Expand Down Expand Up @@ -243,7 +242,12 @@ impl FlightSqlServiceImpl {
))?
};

let (host, port) = match &self.server.advertise_endpoint {
let (host, port) = match &self
.server
.state
.config
.advertise_flight_sql_endpoint
{
Some(endpoint) => {
let advertise_endpoint_vec: Vec<&str> = endpoint.split(":").collect();
match advertise_endpoint_vec.as_slice() {
Expand Down
61 changes: 22 additions & 39 deletions ballista/scheduler/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ use datafusion_proto::protobuf::LogicalPlanNode;
use ballista_scheduler::scheduler_server::SchedulerServer;
use ballista_scheduler::state::backend::{StateBackend, StateBackendClient};

use ballista_core::config::TaskSchedulingPolicy;
use ballista_core::serde::BallistaCodec;

use log::info;
Expand All @@ -62,7 +61,8 @@ mod config {
}

use ballista_core::utils::create_grpc_server;
use ballista_scheduler::config::SlotsPolicy;

use ballista_scheduler::config::SchedulerConfig;
#[cfg(feature = "flight-sql")]
use ballista_scheduler::flight_sql::FlightSqlServiceImpl;
use config::prelude::*;
Expand All @@ -72,10 +72,7 @@ async fn start_server(
scheduler_name: String,
config_backend: Arc<dyn StateBackendClient>,
addr: SocketAddr,
scheduling_policy: TaskSchedulingPolicy,
slots_policy: SlotsPolicy,
event_loop_buffer_size: usize,
advertise_endpoint: Option<String>,
config: SchedulerConfig,
) -> Result<()> {
info!(
"Ballista v{} Scheduler listening on {:?}",
Expand All @@ -84,28 +81,16 @@ async fn start_server(
// Should only call SchedulerServer::new() once in the process
info!(
"Starting Scheduler grpc server with task scheduling policy of {:?}",
scheduling_policy
config.scheduling_policy
);

let mut scheduler_server: SchedulerServer<LogicalPlanNode, PhysicalPlanNode> =
match scheduling_policy {
TaskSchedulingPolicy::PushStaged => SchedulerServer::new_with_policy(
scheduler_name,
config_backend.clone(),
scheduling_policy,
slots_policy,
BallistaCodec::default(),
event_loop_buffer_size,
advertise_endpoint,
),
_ => SchedulerServer::new(
scheduler_name,
config_backend.clone(),
BallistaCodec::default(),
event_loop_buffer_size,
advertise_endpoint,
),
};
SchedulerServer::new(
scheduler_name,
config_backend.clone(),
BallistaCodec::default(),
config,
);

scheduler_server.init().await?;

Expand Down Expand Up @@ -207,7 +192,7 @@ async fn main() -> Result<()> {
let addr = format!("{}:{}", bind_host, port);
let addr = addr.parse()?;

let client: Arc<dyn StateBackendClient> = match opt.config_backend {
let config_backend: Arc<dyn StateBackendClient> = match opt.config_backend {
#[cfg(not(any(feature = "sled", feature = "etcd")))]
_ => std::compile_error!(
"To build the scheduler enable at least one config backend feature (`etcd` or `sled`)"
Expand Down Expand Up @@ -248,18 +233,16 @@ async fn main() -> Result<()> {
}
};

let scheduling_policy: TaskSchedulingPolicy = opt.scheduler_policy;
let slots_policy: SlotsPolicy = opt.executor_slots_policy;
let event_loop_buffer_size = opt.event_loop_buffer_size as usize;
start_server(
scheduler_name,
client,
addr,
scheduling_policy,
slots_policy,
event_loop_buffer_size,
opt.advertise_endpoint,
)
.await?;
let config = SchedulerConfig {
scheduling_policy: opt.scheduler_policy,
event_loop_buffer_size: opt.event_loop_buffer_size,
executor_slots_policy: opt.executor_slots_policy,
finished_job_data_clean_up_interval_seconds: opt
.finished_job_data_clean_up_interval_seconds,
finished_job_state_clean_up_interval_seconds: opt
.finished_job_state_clean_up_interval_seconds,
advertise_flight_sql_endpoint: opt.advertise_flight_sql_endpoint,
};
start_server(scheduler_name, config_backend, addr, config).await?;
Ok(())
}
16 changes: 7 additions & 9 deletions ballista/scheduler/src/scheduler_server/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use ballista_core::config::{BallistaConfig, TaskSchedulingPolicy, BALLISTA_JOB_NAME};
use ballista_core::config::{BallistaConfig, BALLISTA_JOB_NAME};
use ballista_core::serde::protobuf::execute_query_params::{OptionalSessionId, Query};
use std::convert::TryInto;

Expand Down Expand Up @@ -58,7 +58,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
&self,
request: Request<PollWorkParams>,
) -> Result<Response<PollWorkResult>, Status> {
if let TaskSchedulingPolicy::PushStaged = self.policy {
if self.state.config.is_push_staged_scheduling() {
error!("Poll work interface is not supported for push-based task scheduling");
return Err(tonic::Status::failed_precondition(
"Bad request because poll work is not supported for push-based task scheduling",
Expand Down Expand Up @@ -207,7 +207,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc

// If we are using push-based scheduling then reserve this executors slots and send
// them for scheduling tasks.
if matches!(self.policy, TaskSchedulingPolicy::PushStaged) {
if self.state.config.is_push_staged_scheduling() {
self.offer_reservation(reservations).await?;
}

Expand Down Expand Up @@ -553,6 +553,7 @@ mod test {
use datafusion_proto::protobuf::LogicalPlanNode;
use tonic::Request;

use crate::config::SchedulerConfig;
use ballista_core::error::BallistaError;
use ballista_core::serde::protobuf::{
executor_registration::OptionalHost, executor_status, ExecutorRegistration,
Expand All @@ -576,8 +577,7 @@ mod test {
"localhost:50050".to_owned(),
state_storage.clone(),
BallistaCodec::default(),
10000,
None,
SchedulerConfig::default(),
);
scheduler.init().await?;
let exec_meta = ExecutorRegistration {
Expand Down Expand Up @@ -663,8 +663,7 @@ mod test {
"localhost:50050".to_owned(),
state_storage.clone(),
BallistaCodec::default(),
10000,
None,
SchedulerConfig::default(),
);
scheduler.init().await?;

Expand Down Expand Up @@ -744,8 +743,7 @@ mod test {
"localhost:50050".to_owned(),
state_storage.clone(),
BallistaCodec::default(),
10000,
None,
SchedulerConfig::default(),
);
scheduler.init().await?;

Expand Down
Loading

0 comments on commit 926605e

Please sign in to comment.