From 27188c26d57e1870df9f05c1ff724dd8b982689a Mon Sep 17 00:00:00 2001 From: yangzhong Date: Mon, 27 Mar 2023 18:51:56 +0800 Subject: [PATCH 1/6] Remove redundant fields in ExecutorManager --- ballista/scheduler/src/cluster/kv.rs | 25 +- ballista/scheduler/src/cluster/memory.rs | 27 +- .../scheduler/src/scheduler_server/grpc.rs | 15 +- .../scheduler/src/scheduler_server/mod.rs | 11 +- .../scheduler/src/state/executor_manager.rs | 281 +++--------------- 5 files changed, 84 insertions(+), 275 deletions(-) diff --git a/ballista/scheduler/src/cluster/kv.rs b/ballista/scheduler/src/cluster/kv.rs index e28699778..61ab47acb 100644 --- a/ballista/scheduler/src/cluster/kv.rs +++ b/ballista/scheduler/src/cluster/kv.rs @@ -20,7 +20,7 @@ use crate::cluster::{ reserve_slots_bias, reserve_slots_round_robin, ClusterState, ExecutorHeartbeatStream, JobState, JobStateEvent, JobStateEventStream, JobStatus, TaskDistribution, }; -use crate::scheduler_server::SessionBuilder; +use crate::scheduler_server::{timestamp_secs, SessionBuilder}; use crate::state::execution_graph::ExecutionGraph; use crate::state::executor_manager::ExecutorReservation; use crate::state::session_manager::create_datafusion_context; @@ -47,7 +47,6 @@ use prost::Message; use std::collections::{HashMap, HashSet}; use std::future::Future; use std::sync::Arc; -use std::time::{SystemTime, UNIX_EPOCH}; /// State implementation based on underlying `KeyValueStore` pub struct KeyValueState< @@ -240,22 +239,17 @@ impl ) -> Result> { let executor_id = metadata.id.clone(); - let current_ts = SystemTime::now() - .duration_since(UNIX_EPOCH) - .map_err(|e| { - BallistaError::Internal(format!("Error getting current timestamp: {e:?}")) - })? - .as_secs(); - //TODO this should be in a transaction // Now that we know we can connect, save the metadata and slots self.save_executor_metadata(metadata).await?; self.save_executor_heartbeat(ExecutorHeartbeat { executor_id: executor_id.clone(), - timestamp: current_ts, + timestamp: timestamp_secs(), metrics: vec![], status: Some(protobuf::ExecutorStatus { - status: Some(protobuf::executor_status::Status::Active("".to_string())), + status: Some( + protobuf::executor_status::Status::Active(String::default()), + ), }), }) .await?; @@ -364,16 +358,9 @@ impl } async fn remove_executor(&self, executor_id: &str) -> Result<()> { - let current_ts = SystemTime::now() - .duration_since(UNIX_EPOCH) - .map_err(|e| { - BallistaError::Internal(format!("Error getting current timestamp: {e:?}")) - })? 
- .as_secs(); - let value = ExecutorHeartbeat { executor_id: executor_id.to_owned(), - timestamp: current_ts, + timestamp: timestamp_secs(), metrics: vec![], status: Some(protobuf::ExecutorStatus { status: Some(protobuf::executor_status::Status::Dead("".to_string())), diff --git a/ballista/scheduler/src/cluster/memory.rs b/ballista/scheduler/src/cluster/memory.rs index 95c6b7a2d..fc9a92a08 100644 --- a/ballista/scheduler/src/cluster/memory.rs +++ b/ballista/scheduler/src/cluster/memory.rs @@ -163,14 +163,18 @@ impl ClusterState for InMemoryClusterState { mut spec: ExecutorData, reserve: bool, ) -> Result> { - let heartbeat = ExecutorHeartbeat { - executor_id: metadata.id.clone(), + let executor_id = metadata.id.clone(); + + self.save_executor_metadata(metadata).await?; + self.save_executor_heartbeat(ExecutorHeartbeat { + executor_id: executor_id.clone(), timestamp: timestamp_secs(), metrics: vec![], status: Some(ExecutorStatus { status: Some(executor_status::Status::Active(String::default())), }), - }; + }) + .await?; let mut guard = self.task_slots.lock(); @@ -178,38 +182,29 @@ impl ClusterState for InMemoryClusterState { if let Some((idx, _)) = guard .task_slots .iter() - .find_position(|slots| slots.executor_id == metadata.id) + .find_position(|slots| slots.executor_id == executor_id) { guard.task_slots.swap_remove(idx); } if reserve { let slots = std::mem::take(&mut spec.available_task_slots) as usize; - let reservations = (0..slots) - .map(|_| ExecutorReservation::new_free(metadata.id.clone())) + .map(|_| ExecutorReservation::new_free(executor_id.clone())) .collect(); - self.executors.insert(metadata.id.clone(), metadata.clone()); - guard.task_slots.push(AvailableTaskSlots { - executor_id: metadata.id, + executor_id, slots: 0, }); - self.heartbeat_sender.send(&heartbeat); - Ok(reservations) } else { - self.executors.insert(metadata.id.clone(), metadata.clone()); - guard.task_slots.push(AvailableTaskSlots { - executor_id: metadata.id, + executor_id, slots: spec.available_task_slots, }); - self.heartbeat_sender.send(&heartbeat); - Ok(vec![]) } } diff --git a/ballista/scheduler/src/scheduler_server/grpc.rs b/ballista/scheduler/src/scheduler_server/grpc.rs index de07a06d5..1054279b9 100644 --- a/ballista/scheduler/src/scheduler_server/grpc.rs +++ b/ballista/scheduler/src/scheduler_server/grpc.rs @@ -773,12 +773,14 @@ mod test { let active_executors = state .executor_manager - .get_alive_executors_within_one_minute(); + .get_alive_executors_within_one_minute() + .await?; assert!(active_executors.is_empty()); let expired_executors = state .executor_manager - .get_expired_executors(scheduler.executor_termination_grace_period); + .get_expired_executors(scheduler.executor_termination_grace_period) + .await?; assert!(expired_executors.is_empty()); Ok(()) @@ -902,12 +904,14 @@ mod test { let active_executors = state .executor_manager - .get_alive_executors_within_one_minute(); + .get_alive_executors_within_one_minute() + .await?; assert_eq!(active_executors.len(), 1); let expired_executors = state .executor_manager - .get_expired_executors(scheduler.executor_termination_grace_period); + .get_expired_executors(scheduler.executor_termination_grace_period) + .await?; assert!(expired_executors.is_empty()); // simulate the heartbeat timeout @@ -919,7 +923,8 @@ mod test { let active_executors = state .executor_manager - .get_alive_executors_within_one_minute(); + .get_alive_executors_within_one_minute() + .await?; assert!(active_executors.is_empty()); Ok(()) } diff --git 
a/ballista/scheduler/src/scheduler_server/mod.rs b/ballista/scheduler/src/scheduler_server/mod.rs index 69a8c1b48..91086184c 100644 --- a/ballista/scheduler/src/scheduler_server/mod.rs +++ b/ballista/scheduler/src/scheduler_server/mod.rs @@ -223,9 +223,16 @@ impl SchedulerServer, - // executor_id -> ExecutorMetadata map - executor_metadata: Arc>, - // executor_id -> ExecutorHeartbeat map - executors_heartbeat: Arc>, - // executor_id -> ExecutorData map, only used when the slots policy is of local - executor_data: Arc>>, // dead executor sets: dead_executors: Arc>, clients: ExecutorClients, @@ -119,44 +111,30 @@ impl ExecutorManager { }; Self { - slots_policy, task_distribution, cluster_state, - executor_metadata: Arc::new(DashMap::new()), - executors_heartbeat: Arc::new(DashMap::new()), - executor_data: Arc::new(Mutex::new(HashMap::new())), dead_executors: Arc::new(DashSet::new()), clients: Default::default(), } } - /// Initialize a background process that will listen for executor heartbeats and update the in-memory cache - /// of executor heartbeats + /// Initialize a background process that will listen for executor heartbeats pub async fn init(&self) -> Result<()> { - self.init_active_executor_heartbeats().await?; - let mut heartbeat_stream = self.cluster_state.executor_heartbeat_stream().await?; info!("Initializing heartbeat listener"); - let heartbeats = self.executors_heartbeat.clone(); let dead_executors = self.dead_executors.clone(); tokio::task::spawn(async move { while let Some(heartbeat) = heartbeat_stream.next().await { let executor_id = heartbeat.executor_id.clone(); - match heartbeat + if let Some(executor_status::Status::Dead(_)) = heartbeat .status .as_ref() .and_then(|status| status.status.as_ref()) { - Some(executor_status::Status::Dead(_)) => { - heartbeats.remove(&executor_id); - dead_executors.insert(executor_id); - } - _ => { - heartbeats.insert(executor_id, heartbeat); - } + dead_executors.insert(executor_id); } } }); @@ -168,88 +146,13 @@ impl ExecutorManager { /// for scheduling. /// This operation is atomic, so if this method return an Err, no slots have been reserved. 
pub async fn reserve_slots(&self, n: u32) -> Result> { - if self.slots_policy.is_local() { - self.reserve_slots_local(n).await - } else { - let alive_executors = self.get_alive_executors_within_one_minute(); - - debug!("Alive executors: {alive_executors:?}"); - - self.cluster_state - .reserve_slots(n, self.task_distribution, Some(alive_executors)) - .await - } - } - - async fn reserve_slots_local(&self, n: u32) -> Result> { - debug!("Attempting to reserve {} executor slots", n); - - let alive_executors = self.get_alive_executors_within_one_minute(); + let alive_executors = self.get_alive_executors_within_one_minute().await?; - match self.slots_policy { - SlotsPolicy::RoundRobinLocal => { - self.reserve_slots_local_round_robin(n, alive_executors) - .await - } - _ => Err(BallistaError::General(format!( - "Reservation policy {:?} is not supported", - self.slots_policy - ))), - } - } - - /// Create ExecutorReservation in a round robin way to evenly assign tasks to executors - async fn reserve_slots_local_round_robin( - &self, - mut n: u32, - alive_executors: HashSet, - ) -> Result> { - let mut executor_data = self.executor_data.lock(); - - let mut available_executor_data: Vec<&mut ExecutorData> = executor_data - .values_mut() - .filter_map(|data| { - (data.available_task_slots > 0 - && alive_executors.contains(&data.executor_id)) - .then_some(data) - }) - .collect(); - available_executor_data - .sort_by(|a, b| Ord::cmp(&b.available_task_slots, &a.available_task_slots)); - - let mut reservations: Vec = vec![]; - - // Exclusive - let mut last_updated_idx = 0usize; - loop { - let n_before = n; - for (idx, data) in available_executor_data.iter_mut().enumerate() { - if n == 0 { - break; - } - - // Since the vector is sorted in descending order, - // if finding one executor has not enough slots, the following will have not enough, either - if data.available_task_slots == 0 { - break; - } + debug!("Alive executors: {alive_executors:?}"); - reservations - .push(ExecutorReservation::new_free(data.executor_id.clone())); - data.available_task_slots -= 1; - n -= 1; - - if idx >= last_updated_idx { - last_updated_idx = idx + 1; - } - } - - if n_before == n { - break; - } - } - - Ok(reservations) + self.cluster_state + .reserve_slots(n, self.task_distribution, Some(alive_executors)) + .await } /// Returned reserved task slots to the pool of available slots. 
This operation is atomic @@ -258,36 +161,7 @@ impl ExecutorManager { &self, reservations: Vec, ) -> Result<()> { - if self.slots_policy.is_local() { - self.cancel_reservations_local(reservations).await - } else { - self.cluster_state.cancel_reservations(reservations).await - } - } - - async fn cancel_reservations_local( - &self, - reservations: Vec, - ) -> Result<()> { - let mut executor_slots: HashMap = HashMap::new(); - for reservation in reservations { - if let Some(slots) = executor_slots.get_mut(&reservation.executor_id) { - *slots += 1; - } else { - executor_slots.insert(reservation.executor_id, 1); - } - } - - let mut executor_data = self.executor_data.lock(); - for (id, released_slots) in executor_slots.into_iter() { - if let Some(slots) = executor_data.get_mut(&id) { - slots.available_task_slots += released_slots; - } else { - warn!("ExecutorData for {} is not cached in memory", id); - } - } - - Ok(()) + self.cluster_state.cancel_reservations(reservations).await } /// Send rpc to Executors to cancel the running tasks @@ -362,7 +236,14 @@ impl ExecutorManager { /// Send rpc to Executors to clean up the job data async fn clean_up_job_data_inner(&self, job_id: String) { - let alive_executors = self.get_alive_executors_within_one_minute(); + let alive_executors = if let Ok(alive_executors) = + self.get_alive_executors_within_one_minute().await + { + alive_executors + } else { + warn!("Fail to get alive executors within one minute"); + HashSet::new() + }; for executor in alive_executors { let job_id_clone = job_id.to_owned(); if let Ok(mut client) = self.get_client(&executor).await { @@ -411,15 +292,13 @@ impl ExecutorManager { /// Get a list of all executors along with the timestamp of their last recorded heartbeat pub async fn get_executor_state(&self) -> Result> { - let heartbeat_timestamps: Vec<(String, u64)> = { - self.executors_heartbeat - .iter() - .map(|item| { - let (executor_id, heartbeat) = item.pair(); - (executor_id.clone(), heartbeat.timestamp) - }) - .collect() - }; + let heartbeat_timestamps: Vec<(String, u64)> = self + .cluster_state + .executor_heartbeats() + .await? + .into_iter() + .map(|(executor_id, heartbeat)| (executor_id, heartbeat.timestamp)) + .collect(); let mut state: Vec<(ExecutorMetadata, Duration)> = vec![]; for (executor_id, ts) in heartbeat_timestamps { @@ -437,12 +316,6 @@ impl ExecutorManager { &self, executor_id: &str, ) -> Result { - { - if let Some(cached) = self.executor_metadata.get(executor_id) { - return Ok(cached.clone()); - } - } - self.cluster_state.get_executor_metadata(executor_id).await } @@ -471,36 +344,11 @@ impl ExecutorManager { self.test_scheduler_connectivity(&metadata).await?; - let current_ts = SystemTime::now() - .duration_since(UNIX_EPOCH) - .map_err(|e| { - BallistaError::Internal(format!("Error getting current timestamp: {e:?}")) - })? 
- .as_secs(); - - let initial_heartbeat = ExecutorHeartbeat { - executor_id: metadata.id.clone(), - timestamp: current_ts, - metrics: vec![], - status: Some(ExecutorStatus { - status: Some(executor_status::Status::Active(String::default())), - }), - }; - if !reserve { - if self.slots_policy.is_local() { - let mut executor_data = self.executor_data.lock(); - executor_data - .insert(specification.executor_id.clone(), specification.clone()); - } - self.cluster_state .register_executor(metadata, specification.clone(), reserve) .await?; - self.executors_heartbeat - .insert(initial_heartbeat.executor_id.clone(), initial_heartbeat); - Ok(vec![]) } else { let mut specification = specification; @@ -512,19 +360,10 @@ impl ExecutorManager { specification.available_task_slots = 0; - if self.slots_policy.is_local() { - let mut executor_data = self.executor_data.lock(); - executor_data - .insert(specification.executor_id.clone(), specification.clone()); - } - self.cluster_state .register_executor(metadata, specification, reserve) .await?; - self.executors_heartbeat - .insert(initial_heartbeat.executor_id.clone(), initial_heartbeat); - Ok(reservations) } } @@ -540,14 +379,6 @@ impl ExecutorManager { let executor_id = executor_id.to_owned(); - self.executors_heartbeat.remove(&executor_id); - - // Remove executor data cache for dead executors - { - let mut executor_data = self.executor_data.lock(); - executor_data.remove(&executor_id); - } - self.dead_executors.insert(executor_id); Ok(()) @@ -587,9 +418,6 @@ impl ExecutorManager { .save_executor_heartbeat(heartbeat.clone()) .await?; - self.executors_heartbeat - .insert(heartbeat.executor_id.clone(), heartbeat); - Ok(()) } @@ -597,33 +425,18 @@ impl ExecutorManager { self.dead_executors.contains(executor_id) } - /// Initialize the set of active executor heartbeats from storage - async fn init_active_executor_heartbeats(&self) -> Result<()> { - let heartbeats = self.cluster_state.executor_heartbeats().await?; - - for (executor_id, heartbeat) in heartbeats { - // let data: protobuf::ExecutorHeartbeat = decode_protobuf(&value)?; - if let Some(ExecutorStatus { - status: Some(executor_status::Status::Active(_)), - }) = heartbeat.status - { - self.executors_heartbeat.insert(executor_id, heartbeat); - } - } - Ok(()) - } - /// Retrieve the set of all executor IDs where the executor has been observed in the last /// `last_seen_ts_threshold` seconds. - pub(crate) fn get_alive_executors( + pub(crate) async fn get_alive_executors( &self, last_seen_ts_threshold: u64, - ) -> HashSet { - self.executors_heartbeat + ) -> Result> { + Ok(self + .cluster_state + .executor_heartbeats() + .await? .iter() - .filter_map(|pair| { - let (exec, heartbeat) = pair.pair(); - + .filter_map(|(exec, heartbeat)| { let active = matches!( heartbeat .status @@ -635,14 +448,14 @@ impl ExecutorManager { (active && live).then(|| exec.clone()) }) - .collect() + .collect()) } /// Return a list of expired executors - pub(crate) fn get_expired_executors( + pub(crate) async fn get_expired_executors( &self, termination_grace_period: u64, - ) -> Vec { + ) -> Result> { let now_epoch_ts = SystemTime::now() .duration_since(UNIX_EPOCH) .expect("Time went backwards"); @@ -658,12 +471,12 @@ impl ExecutorManager { .unwrap_or_else(|| Duration::from_secs(0)) .as_secs(); - let expired_executors = self - .executors_heartbeat + Ok(self + .cluster_state + .executor_heartbeats() + .await? 
.iter() - .filter_map(|pair| { - let (_exec, heartbeat) = pair.pair(); - + .filter_map(|(_exec, heartbeat)| { let terminating = matches!( heartbeat .status @@ -680,11 +493,12 @@ impl ExecutorManager { ((terminating && grace_period_expired) || expired) .then(|| heartbeat.clone()) }) - .collect::>(); - expired_executors + .collect::>()) } - pub(crate) fn get_alive_executors_within_one_minute(&self) -> HashSet { + pub(crate) async fn get_alive_executors_within_one_minute( + &self, + ) -> Result> { let now_epoch_ts = SystemTime::now() .duration_since(UNIX_EPOCH) .expect("Time went backwards"); @@ -692,6 +506,7 @@ impl ExecutorManager { .checked_sub(Duration::from_secs(60)) .unwrap_or_else(|| Duration::from_secs(0)); self.get_alive_executors(last_seen_threshold.as_secs()) + .await } } From 37eb7028e9ed9d7d70d0beac09bbb79537afa222 Mon Sep 17 00:00:00 2001 From: yangzhong Date: Tue, 28 Mar 2023 15:37:16 +0800 Subject: [PATCH 2/6] Remove RoundRobinLocal slot policy --- ballista/scheduler/scheduler_config_spec.toml | 2 +- ballista/scheduler/src/config.rs | 7 ------- ballista/scheduler/src/state/executor_manager.rs | 9 +-------- 3 files changed, 2 insertions(+), 16 deletions(-) diff --git a/ballista/scheduler/scheduler_config_spec.toml b/ballista/scheduler/scheduler_config_spec.toml index 93ffe4c13..d0faaeea2 100644 --- a/ballista/scheduler/scheduler_config_spec.toml +++ b/ballista/scheduler/scheduler_config_spec.toml @@ -98,7 +98,7 @@ doc = "Delayed interval for cleaning up finished job state. Default: 3600" [[param]] name = "executor_slots_policy" type = "ballista_scheduler::config::SlotsPolicy" -doc = "The executor slots policy for the scheduler, possible values: bias, round-robin, round-robin-local. Default: bias" +doc = "The executor slots policy for the scheduler, possible values: bias, round-robin. 
Default: bias" default = "ballista_scheduler::config::SlotsPolicy::Bias" [[param]] diff --git a/ballista/scheduler/src/config.rs b/ballista/scheduler/src/config.rs index 087e38674..ecc50bd2a 100644 --- a/ballista/scheduler/src/config.rs +++ b/ballista/scheduler/src/config.rs @@ -167,13 +167,6 @@ pub enum ClusterStorageConfig { pub enum SlotsPolicy { Bias, RoundRobin, - RoundRobinLocal, -} - -impl SlotsPolicy { - pub fn is_local(&self) -> bool { - matches!(self, SlotsPolicy::RoundRobinLocal) - } } impl std::str::FromStr for SlotsPolicy { diff --git a/ballista/scheduler/src/state/executor_manager.rs b/ballista/scheduler/src/state/executor_manager.rs index 0f33646a9..ae0db2583 100644 --- a/ballista/scheduler/src/state/executor_manager.rs +++ b/ballista/scheduler/src/state/executor_manager.rs @@ -105,9 +105,7 @@ impl ExecutorManager { ) -> Self { let task_distribution = match slots_policy { SlotsPolicy::Bias => TaskDistribution::Bias, - SlotsPolicy::RoundRobin | SlotsPolicy::RoundRobinLocal => { - TaskDistribution::RoundRobin - } + SlotsPolicy::RoundRobin => TaskDistribution::RoundRobin, }; Self { @@ -529,7 +527,6 @@ mod test { async fn test_reserve_and_cancel() -> Result<()> { test_reserve_and_cancel_inner(SlotsPolicy::Bias).await?; test_reserve_and_cancel_inner(SlotsPolicy::RoundRobin).await?; - test_reserve_and_cancel_inner(SlotsPolicy::RoundRobinLocal).await?; Ok(()) } @@ -576,7 +573,6 @@ mod test { async fn test_reserve_partial() -> Result<()> { test_reserve_partial_inner(SlotsPolicy::Bias).await?; test_reserve_partial_inner(SlotsPolicy::RoundRobin).await?; - test_reserve_partial_inner(SlotsPolicy::RoundRobinLocal).await?; Ok(()) } @@ -627,7 +623,6 @@ mod test { async fn test_reserve_concurrent() -> Result<()> { test_reserve_concurrent_inner(SlotsPolicy::Bias).await?; test_reserve_concurrent_inner(SlotsPolicy::RoundRobin).await?; - test_reserve_concurrent_inner(SlotsPolicy::RoundRobinLocal).await?; Ok(()) } @@ -677,7 +672,6 @@ mod test { async fn test_register_reserve() -> Result<()> { test_register_reserve_inner(SlotsPolicy::Bias).await?; test_register_reserve_inner(SlotsPolicy::RoundRobin).await?; - test_register_reserve_inner(SlotsPolicy::RoundRobinLocal).await?; Ok(()) } @@ -710,7 +704,6 @@ mod test { async fn test_ignore_fenced_executors() -> Result<()> { test_ignore_fenced_executors_inner(SlotsPolicy::Bias).await?; test_ignore_fenced_executors_inner(SlotsPolicy::RoundRobin).await?; - test_ignore_fenced_executors_inner(SlotsPolicy::RoundRobinLocal).await?; Ok(()) } From ee3a914cc6965cf04049ec808e00531977668c85 Mon Sep 17 00:00:00 2001 From: yangzhong Date: Tue, 28 Mar 2023 15:56:52 +0800 Subject: [PATCH 3/6] Rename SlotsPolicy to TaskDistribution --- ballista/scheduler/scheduler_config_spec.toml | 8 +-- ballista/scheduler/src/bin/main.rs | 2 +- ballista/scheduler/src/cluster/mod.rs | 13 +--- ballista/scheduler/src/config.rs | 27 +++++--- .../scheduler/src/state/executor_manager.rs | 67 ++++++++++--------- ballista/scheduler/src/state/mod.rs | 4 +- 6 files changed, 59 insertions(+), 62 deletions(-) diff --git a/ballista/scheduler/scheduler_config_spec.toml b/ballista/scheduler/scheduler_config_spec.toml index d0faaeea2..2e7d0d65f 100644 --- a/ballista/scheduler/scheduler_config_spec.toml +++ b/ballista/scheduler/scheduler_config_spec.toml @@ -96,10 +96,10 @@ default = "3600" doc = "Delayed interval for cleaning up finished job state. 
Default: 3600" [[param]] -name = "executor_slots_policy" -type = "ballista_scheduler::config::SlotsPolicy" -doc = "The executor slots policy for the scheduler, possible values: bias, round-robin. Default: bias" -default = "ballista_scheduler::config::SlotsPolicy::Bias" +name = "task_distribution" +type = "ballista_scheduler::config::TaskDistribution" +doc = "The policy of distributing tasks to available executor slots, possible values: bias, round-robin. Default: bias" +default = "ballista_scheduler::config::TaskDistribution::Bias" [[param]] name = "plugin_dir" diff --git a/ballista/scheduler/src/bin/main.rs b/ballista/scheduler/src/bin/main.rs index 93904901e..7511f4d35 100644 --- a/ballista/scheduler/src/bin/main.rs +++ b/ballista/scheduler/src/bin/main.rs @@ -109,7 +109,7 @@ async fn main() -> Result<()> { bind_port: opt.bind_port, scheduling_policy: opt.scheduler_policy, event_loop_buffer_size: opt.event_loop_buffer_size, - executor_slots_policy: opt.executor_slots_policy, + task_distribution: opt.task_distribution, finished_job_data_clean_up_interval_seconds: opt .finished_job_data_clean_up_interval_seconds, finished_job_state_clean_up_interval_seconds: opt diff --git a/ballista/scheduler/src/cluster/mod.rs b/ballista/scheduler/src/cluster/mod.rs index 35a5052a8..c3f2ec22d 100644 --- a/ballista/scheduler/src/cluster/mod.rs +++ b/ballista/scheduler/src/cluster/mod.rs @@ -29,7 +29,7 @@ use crate::cluster::memory::{InMemoryClusterState, InMemoryJobState}; use crate::cluster::storage::etcd::EtcdClient; use crate::cluster::storage::sled::SledClient; use crate::cluster::storage::KeyValueStore; -use crate::config::{ClusterStorageConfig, SchedulerConfig}; +use crate::config::{ClusterStorageConfig, SchedulerConfig, TaskDistribution}; use crate::scheduler_server::SessionBuilder; use crate::state::execution_graph::ExecutionGraph; use crate::state::executor_manager::ExecutorReservation; @@ -195,17 +195,6 @@ impl BallistaCluster { /// by any schedulers with a shared `ClusterState` pub type ExecutorHeartbeatStream = Pin + Send>>; -/// Method of distributing tasks to available executor slots -#[derive(Debug, Clone, Copy)] -pub enum TaskDistribution { - /// Eagerly assign tasks to executor slots. This will assign as many task slots per executor - /// as are currently available - Bias, - /// Distributed tasks evenely across executors. This will try and iterate through available executors - /// and assign one task to each executor until all tasks are assigned. - RoundRobin, -} - /// A trait that contains the necessary method to maintain a globally consistent view of cluster resources #[tonic::async_trait] pub trait ClusterState: Send + Sync + 'static { diff --git a/ballista/scheduler/src/config.rs b/ballista/scheduler/src/config.rs index ecc50bd2a..7a2518c31 100644 --- a/ballista/scheduler/src/config.rs +++ b/ballista/scheduler/src/config.rs @@ -26,7 +26,7 @@ use std::fmt; #[derive(Debug, Clone)] pub struct SchedulerConfig { /// Namespace of this scheduler. Schedulers using the same cluster storage and namespace - /// will share gloabl cluster state. + /// will share global cluster state. pub namespace: String, /// The external hostname of the scheduler pub external_host: String, @@ -36,8 +36,8 @@ pub struct SchedulerConfig { pub scheduling_policy: TaskSchedulingPolicy, /// The event loop buffer size. for a system of high throughput, a larger value like 1000000 is recommended pub event_loop_buffer_size: u32, - /// The executor slots policy for the scheduler. 
For a cluster with single scheduler, round-robin-local is recommended - pub executor_slots_policy: SlotsPolicy, + /// Policy of distributing tasks to available executor slots. For a cluster with single scheduler, round-robin is recommended + pub task_distribution: TaskDistribution, /// The delayed interval for cleaning up finished job data, mainly the shuffle data, 0 means the cleaning up is disabled pub finished_job_data_clean_up_interval_seconds: u64, /// The delayed interval for cleaning up finished job state stored in the backend, 0 means the cleaning up is disabled. @@ -62,7 +62,7 @@ impl Default for SchedulerConfig { bind_port: 50050, scheduling_policy: TaskSchedulingPolicy::PullStaged, event_loop_buffer_size: 10000, - executor_slots_policy: SlotsPolicy::Bias, + task_distribution: TaskDistribution::Bias, finished_job_data_clean_up_interval_seconds: 300, finished_job_state_clean_up_interval_seconds: 3600, advertise_flight_sql_endpoint: None, @@ -131,8 +131,8 @@ impl SchedulerConfig { self } - pub fn with_executor_slots_policy(mut self, policy: SlotsPolicy) -> Self { - self.executor_slots_policy = policy; + pub fn with_task_distribution(mut self, policy: TaskDistribution) -> Self { + self.task_distribution = policy; self } @@ -161,15 +161,20 @@ pub enum ClusterStorageConfig { Sled(Option), } -// an enum used to configure the executor slots policy -// needs to be visible to code generated by configure_me +/// Policy of distributing tasks to available executor slots +/// +/// It needs to be visible to code generated by configure_me #[derive(Clone, ArgEnum, Copy, Debug, serde::Deserialize)] -pub enum SlotsPolicy { +pub enum TaskDistribution { + /// Eagerly assign tasks to executor slots. This will assign as many task slots per executor + /// as are currently available Bias, + /// Distributed tasks evenly across executors. This will try and iterate through available executors + /// and assign one task to each executor until all tasks are assigned. 
RoundRobin, } -impl std::str::FromStr for SlotsPolicy { +impl std::str::FromStr for TaskDistribution { type Err = String; fn from_str(s: &str) -> std::result::Result { @@ -177,7 +182,7 @@ impl std::str::FromStr for SlotsPolicy { } } -impl parse_arg::ParseArgFromStr for SlotsPolicy { +impl parse_arg::ParseArgFromStr for TaskDistribution { fn describe_type(mut writer: W) -> fmt::Result { write!(writer, "The executor slots policy for the scheduler") } diff --git a/ballista/scheduler/src/state/executor_manager.rs b/ballista/scheduler/src/state/executor_manager.rs index ae0db2583..9bedd64bd 100644 --- a/ballista/scheduler/src/state/executor_manager.rs +++ b/ballista/scheduler/src/state/executor_manager.rs @@ -17,15 +17,13 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH}; -use crate::cluster::TaskDistribution; - #[cfg(not(test))] use ballista_core::error::BallistaError; use ballista_core::error::Result; use ballista_core::serde::protobuf; use crate::cluster::ClusterState; -use crate::config::SlotsPolicy; +use crate::config::TaskDistribution; use crate::state::execution_graph::RunningTaskInfo; use ballista_core::serde::protobuf::executor_grpc_client::ExecutorGrpcClient; @@ -101,13 +99,8 @@ pub struct ExecutorManager { impl ExecutorManager { pub(crate) fn new( cluster_state: Arc, - slots_policy: SlotsPolicy, + task_distribution: TaskDistribution, ) -> Self { - let task_distribution = match slots_policy { - SlotsPolicy::Bias => TaskDistribution::Bias, - SlotsPolicy::RoundRobin => TaskDistribution::RoundRobin, - }; - Self { task_distribution, cluster_state, @@ -511,7 +504,7 @@ impl ExecutorManager { #[cfg(test)] mod test { - use crate::config::SlotsPolicy; + use crate::config::TaskDistribution; use crate::scheduler_server::timestamp_secs; use crate::state::executor_manager::{ExecutorManager, ExecutorReservation}; @@ -525,17 +518,19 @@ mod test { #[tokio::test] async fn test_reserve_and_cancel() -> Result<()> { - test_reserve_and_cancel_inner(SlotsPolicy::Bias).await?; - test_reserve_and_cancel_inner(SlotsPolicy::RoundRobin).await?; + test_reserve_and_cancel_inner(TaskDistribution::Bias).await?; + test_reserve_and_cancel_inner(TaskDistribution::RoundRobin).await?; Ok(()) } - async fn test_reserve_and_cancel_inner(slots_policy: SlotsPolicy) -> Result<()> { + async fn test_reserve_and_cancel_inner( + task_distribution: TaskDistribution, + ) -> Result<()> { let cluster = test_cluster_context(); let executor_manager = - ExecutorManager::new(cluster.cluster_state(), slots_policy); + ExecutorManager::new(cluster.cluster_state(), task_distribution); let executors = test_executors(10, 4); @@ -551,7 +546,7 @@ mod test { assert_eq!( reservations.len(), 40, - "Expected 40 reservations for policy {slots_policy:?}" + "Expected 40 reservations for policy {task_distribution:?}" ); // Now cancel them @@ -563,7 +558,7 @@ mod test { assert_eq!( reservations.len(), 40, - "Expected 40 reservations for policy {slots_policy:?}" + "Expected 40 reservations for policy {task_distribution:?}" ); Ok(()) @@ -571,17 +566,19 @@ mod test { #[tokio::test] async fn test_reserve_partial() -> Result<()> { - test_reserve_partial_inner(SlotsPolicy::Bias).await?; - test_reserve_partial_inner(SlotsPolicy::RoundRobin).await?; + test_reserve_partial_inner(TaskDistribution::Bias).await?; + test_reserve_partial_inner(TaskDistribution::RoundRobin).await?; Ok(()) } - async fn test_reserve_partial_inner(slots_policy: SlotsPolicy) -> Result<()> { + async fn test_reserve_partial_inner( + task_distribution: TaskDistribution, + ) -> Result<()> { 
let cluster = test_cluster_context(); let executor_manager = - ExecutorManager::new(cluster.cluster_state(), slots_policy); + ExecutorManager::new(cluster.cluster_state(), task_distribution); let executors = test_executors(10, 4); @@ -621,13 +618,15 @@ mod test { #[tokio::test] async fn test_reserve_concurrent() -> Result<()> { - test_reserve_concurrent_inner(SlotsPolicy::Bias).await?; - test_reserve_concurrent_inner(SlotsPolicy::RoundRobin).await?; + test_reserve_concurrent_inner(TaskDistribution::Bias).await?; + test_reserve_concurrent_inner(TaskDistribution::RoundRobin).await?; Ok(()) } - async fn test_reserve_concurrent_inner(slots_policy: SlotsPolicy) -> Result<()> { + async fn test_reserve_concurrent_inner( + task_distribution: TaskDistribution, + ) -> Result<()> { let (sender, mut receiver) = tokio::sync::mpsc::channel::>>(1000); @@ -635,7 +634,7 @@ mod test { let cluster = test_cluster_context(); let executor_manager = - ExecutorManager::new(cluster.cluster_state(), slots_policy); + ExecutorManager::new(cluster.cluster_state(), task_distribution); for (executor_metadata, executor_data) in executors { executor_manager @@ -670,17 +669,19 @@ mod test { #[tokio::test] async fn test_register_reserve() -> Result<()> { - test_register_reserve_inner(SlotsPolicy::Bias).await?; - test_register_reserve_inner(SlotsPolicy::RoundRobin).await?; + test_register_reserve_inner(TaskDistribution::Bias).await?; + test_register_reserve_inner(TaskDistribution::RoundRobin).await?; Ok(()) } - async fn test_register_reserve_inner(slots_policy: SlotsPolicy) -> Result<()> { + async fn test_register_reserve_inner( + task_distribution: TaskDistribution, + ) -> Result<()> { let cluster = test_cluster_context(); let executor_manager = - ExecutorManager::new(cluster.cluster_state(), slots_policy); + ExecutorManager::new(cluster.cluster_state(), task_distribution); let executors = test_executors(10, 4); @@ -702,17 +703,19 @@ mod test { #[tokio::test] async fn test_ignore_fenced_executors() -> Result<()> { - test_ignore_fenced_executors_inner(SlotsPolicy::Bias).await?; - test_ignore_fenced_executors_inner(SlotsPolicy::RoundRobin).await?; + test_ignore_fenced_executors_inner(TaskDistribution::Bias).await?; + test_ignore_fenced_executors_inner(TaskDistribution::RoundRobin).await?; Ok(()) } - async fn test_ignore_fenced_executors_inner(slots_policy: SlotsPolicy) -> Result<()> { + async fn test_ignore_fenced_executors_inner( + task_distribution: TaskDistribution, + ) -> Result<()> { let cluster = test_cluster_context(); let executor_manager = - ExecutorManager::new(cluster.cluster_state(), slots_policy); + ExecutorManager::new(cluster.cluster_state(), task_distribution); // Setup two executors initially let executors = test_executors(2, 4); diff --git a/ballista/scheduler/src/state/mod.rs b/ballista/scheduler/src/state/mod.rs index 57ea0be6c..0c3b8bf0e 100644 --- a/ballista/scheduler/src/state/mod.rs +++ b/ballista/scheduler/src/state/mod.rs @@ -116,7 +116,7 @@ impl SchedulerState SchedulerState Date: Wed, 29 Mar 2023 16:46:12 +0800 Subject: [PATCH 4/6] Introduce executor heartbeat cache to KeyValueState --- ballista/scheduler/src/cluster/kv.rs | 139 ++++++++++++------ ballista/scheduler/src/cluster/memory.rs | 47 ++---- ballista/scheduler/src/cluster/mod.rs | 14 +- ballista/scheduler/src/cluster/test/mod.rs | 36 ++--- .../scheduler/src/scheduler_server/grpc.rs | 15 +- .../scheduler/src/scheduler_server/mod.rs | 11 +- .../scheduler/src/state/executor_manager.rs | 82 +++-------- 7 files changed, 165 insertions(+), 179 
deletions(-) diff --git a/ballista/scheduler/src/cluster/kv.rs b/ballista/scheduler/src/cluster/kv.rs index 61ab47acb..5740970ef 100644 --- a/ballista/scheduler/src/cluster/kv.rs +++ b/ballista/scheduler/src/cluster/kv.rs @@ -42,7 +42,7 @@ use datafusion_proto::physical_plan::AsExecutionPlan; use datafusion_proto::protobuf::{LogicalPlanNode, PhysicalPlanNode}; use futures::StreamExt; use itertools::Itertools; -use log::warn; +use log::{info, warn}; use prost::Message; use std::collections::{HashMap, HashSet}; use std::future::Future; @@ -56,6 +56,8 @@ pub struct KeyValueState< > { /// Underlying `KeyValueStore` store: S, + /// ExecutorHeartbeat cache, executor_id -> ExecutorHeartbeat + executor_heartbeats: Arc>, /// Codec used to serialize/deserialize execution plan codec: BallistaCodec, /// Name of current scheduler. Should be `{host}:{port}` @@ -78,18 +80,95 @@ impl ) -> Self { Self { store, + executor_heartbeats: Arc::new(DashMap::new()), scheduler: scheduler.into(), codec, queued_jobs: DashMap::new(), session_builder, } } + + /// Initialize the set of active executor heartbeats from storage + async fn init_active_executor_heartbeats(&self) -> Result<()> { + let heartbeats = self.store.scan(Keyspace::Heartbeats, None).await?; + + for (_, value) in heartbeats { + let data: ExecutorHeartbeat = decode_protobuf(&value)?; + if let Some(protobuf::ExecutorStatus { + status: Some(protobuf::executor_status::Status::Active(_)), + }) = &data.status + { + self.executor_heartbeats + .insert(data.executor_id.clone(), data); + } + } + + Ok(()) + } + + /// Return the stream of executor heartbeats observed by all schedulers in the cluster. + /// This can be aggregated to provide an eventually consistent view of all executors within the cluster + async fn executor_heartbeat_stream(&self) -> Result { + let events = self + .store + .watch(Keyspace::Heartbeats, String::default()) + .await?; + + Ok(events + .filter_map(|event| { + futures::future::ready(match event { + WatchEvent::Put(_, value) => { + if let Ok(heartbeat) = + decode_protobuf::(&value) + { + Some(heartbeat) + } else { + None + } + } + WatchEvent::Delete(_) => None, + }) + }) + .boxed()) + } } #[async_trait] impl ClusterState for KeyValueState { + /// Initialize a background process that will listen for executor heartbeats and update the in-memory cache + /// of executor heartbeats + async fn init(&self) -> Result<()> { + self.init_active_executor_heartbeats().await?; + + let mut heartbeat_stream = self.executor_heartbeat_stream().await?; + + info!("Initializing heartbeat listener"); + + let heartbeats = self.executor_heartbeats.clone(); + tokio::task::spawn(async move { + while let Some(heartbeat) = heartbeat_stream.next().await { + let executor_id = heartbeat.executor_id.clone(); + + match heartbeat + .status + .as_ref() + .and_then(|status| status.status.as_ref()) + { + Some(protobuf::executor_status::Status::Dead(_)) => { + heartbeats.remove(&executor_id); + } + _ => { + heartbeats.insert(executor_id, heartbeat); + } + } + } + }); + + Ok(()) + } + async fn reserve_slots( &self, num_slots: u32, @@ -353,8 +432,14 @@ impl async fn save_executor_heartbeat(&self, heartbeat: ExecutorHeartbeat) -> Result<()> { let executor_id = heartbeat.executor_id.clone(); self.store - .put(Keyspace::Heartbeats, executor_id, heartbeat.encode_to_vec()) - .await + .put( + Keyspace::Heartbeats, + executor_id.clone(), + heartbeat.clone().encode_to_vec(), + ) + .await?; + self.executor_heartbeats.insert(executor_id, heartbeat); + Ok(()) } async fn 
remove_executor(&self, executor_id: &str) -> Result<()> { @@ -371,52 +456,24 @@ impl self.store .put(Keyspace::Heartbeats, executor_id.to_owned(), value) .await?; + self.executor_heartbeats.remove(executor_id); // TODO Check the Executor reservation logic for push-based scheduling Ok(()) } - async fn executor_heartbeat_stream(&self) -> Result { - let events = self - .store - .watch(Keyspace::Heartbeats, String::default()) - .await?; - - Ok(events - .filter_map(|event| { - futures::future::ready(match event { - WatchEvent::Put(_, value) => { - if let Ok(heartbeat) = - decode_protobuf::(&value) - { - Some(heartbeat) - } else { - None - } - } - WatchEvent::Delete(_) => None, - }) - }) - .boxed()) + fn executor_heartbeats(&self) -> HashMap { + self.executor_heartbeats + .iter() + .map(|r| (r.key().clone(), r.value().clone())) + .collect() } - async fn executor_heartbeats(&self) -> Result> { - let heartbeats = self.store.scan(Keyspace::Heartbeats, None).await?; - - let mut heartbeat_map = HashMap::with_capacity(heartbeats.len()); - - for (_, value) in heartbeats { - let data: ExecutorHeartbeat = decode_protobuf(&value)?; - if let Some(protobuf::ExecutorStatus { - status: Some(protobuf::executor_status::Status::Active(_)), - }) = &data.status - { - heartbeat_map.insert(data.executor_id.clone(), data); - } - } - - Ok(heartbeat_map) + fn get_executor_heartbeat(&self, executor_id: &str) -> Option { + self.executor_heartbeats + .get(executor_id) + .map(|r| r.value().clone()) } } diff --git a/ballista/scheduler/src/cluster/memory.rs b/ballista/scheduler/src/cluster/memory.rs index fc9a92a08..6d7d7bc60 100644 --- a/ballista/scheduler/src/cluster/memory.rs +++ b/ballista/scheduler/src/cluster/memory.rs @@ -16,8 +16,8 @@ // under the License. use crate::cluster::{ - reserve_slots_bias, reserve_slots_round_robin, ClusterState, ExecutorHeartbeatStream, - JobState, JobStateEvent, JobStateEventStream, JobStatus, TaskDistribution, + reserve_slots_bias, reserve_slots_round_robin, ClusterState, JobState, JobStateEvent, + JobStateEventStream, JobStatus, TaskDistribution, }; use crate::state::execution_graph::ExecutionGraph; use crate::state::executor_manager::ExecutorReservation; @@ -53,9 +53,6 @@ pub struct InMemoryClusterState { executors: DashMap, /// Last heartbeat received for each executor heartbeats: DashMap, - /// Broadcast channel sender for heartbeats, If `None` there are not - /// subscribers - heartbeat_sender: ClusterEventSender, } #[async_trait] @@ -226,15 +223,13 @@ impl ClusterState for InMemoryClusterState { } async fn save_executor_heartbeat(&self, heartbeat: ExecutorHeartbeat) -> Result<()> { - if let Some(mut last) = self.heartbeats.get_mut(&heartbeat.executor_id) { - let _ = std::mem::replace(last.deref_mut(), heartbeat.clone()); + let executor_id = heartbeat.executor_id.clone(); + if let Some(mut last) = self.heartbeats.get_mut(&executor_id) { + let _ = std::mem::replace(last.deref_mut(), heartbeat); } else { - self.heartbeats - .insert(heartbeat.executor_id.clone(), heartbeat.clone()); + self.heartbeats.insert(executor_id, heartbeat); } - self.heartbeat_sender.send(&heartbeat); - Ok(()) } @@ -251,34 +246,20 @@ impl ClusterState for InMemoryClusterState { } } - if let Some(heartbeat) = self.heartbeats.get_mut(executor_id).as_deref_mut() { - let new_heartbeat = ExecutorHeartbeat { - executor_id: executor_id.to_string(), - timestamp: timestamp_secs(), - metrics: vec![], - status: Some(ExecutorStatus { - status: Some(executor_status::Status::Dead(String::default())), - }), - }; - - *heartbeat 
= new_heartbeat; - - self.heartbeat_sender.send(heartbeat); - } + self.heartbeats.remove(executor_id); Ok(()) } - async fn executor_heartbeat_stream(&self) -> Result { - Ok(Box::pin(self.heartbeat_sender.subscribe())) - } - - async fn executor_heartbeats(&self) -> Result> { - Ok(self - .heartbeats + fn executor_heartbeats(&self) -> HashMap { + self.heartbeats .iter() .map(|r| (r.key().clone(), r.value().clone())) - .collect()) + .collect() + } + + fn get_executor_heartbeat(&self, executor_id: &str) -> Option { + self.heartbeats.get(executor_id).map(|r| r.value().clone()) } } diff --git a/ballista/scheduler/src/cluster/mod.rs b/ballista/scheduler/src/cluster/mod.rs index c3f2ec22d..8bea6d77e 100644 --- a/ballista/scheduler/src/cluster/mod.rs +++ b/ballista/scheduler/src/cluster/mod.rs @@ -198,6 +198,11 @@ pub type ExecutorHeartbeatStream = Pin /// A trait that contains the necessary method to maintain a globally consistent view of cluster resources #[tonic::async_trait] pub trait ClusterState: Send + Sync + 'static { + /// Initialize when it's necessary, especially for state with backend storage + async fn init(&self) -> Result<()> { + Ok(()) + } + /// Reserve up to `num_slots` executor task slots. If not enough task slots are available, reserve /// as many as possible. /// @@ -250,12 +255,11 @@ pub trait ClusterState: Send + Sync + 'static { /// Remove the executor from the cluster async fn remove_executor(&self, executor_id: &str) -> Result<()>; - /// Return the stream of executor heartbeats observed by all schedulers in the cluster. - /// This can be aggregated to provide an eventually consistent view of all executors within the cluster - async fn executor_heartbeat_stream(&self) -> Result; - /// Return a map of the last seen heartbeat for all active executors - async fn executor_heartbeats(&self) -> Result>; + fn executor_heartbeats(&self) -> HashMap; + + /// Get executor heartbeat for the provided executor ID. Return None if the executor does not exist + fn get_executor_heartbeat(&self, executor_id: &str) -> Option; } /// Events related to the state of jobs. Implementations may or may not support all event types. 
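The net effect of the `ClusterState` changes above is that heartbeat lookups become synchronous reads from an in-memory map that a background listener keeps in sync with the heartbeat stream. A minimal standalone sketch of that pattern follows; the `Heartbeat` type, field names, and `HeartbeatCache` API here are simplified illustrations, not the actual Ballista definitions.

    // Sketch: an in-memory heartbeat cache kept up to date by a background task.
    // Assumes the dashmap, futures, and tokio crates (the same ones the patch uses).
    use std::collections::HashMap;
    use std::sync::Arc;

    use dashmap::DashMap;
    use futures::{Stream, StreamExt};

    #[derive(Clone, Debug)]
    struct Heartbeat {
        executor_id: String,
        timestamp: u64,
        alive: bool,
    }

    #[derive(Default)]
    struct HeartbeatCache {
        // executor_id -> last observed heartbeat
        inner: Arc<DashMap<String, Heartbeat>>,
    }

    impl HeartbeatCache {
        /// Spawn a background task that keeps the cache in sync with a heartbeat stream.
        fn listen(&self, mut stream: impl Stream<Item = Heartbeat> + Send + Unpin + 'static) {
            let cache = self.inner.clone();
            tokio::spawn(async move {
                while let Some(hb) = stream.next().await {
                    if hb.alive {
                        cache.insert(hb.executor_id.clone(), hb);
                    } else {
                        // Dead executors are evicted so later lookups treat them as gone.
                        cache.remove(&hb.executor_id);
                    }
                }
            });
        }

        /// Synchronous lookup; replaces the former async round-trip to storage.
        fn get(&self, executor_id: &str) -> Option<Heartbeat> {
            self.inner.get(executor_id).map(|r| r.value().clone())
        }

        /// Snapshot of all live executors, as used by the alive/expired checks.
        fn snapshot(&self) -> HashMap<String, Heartbeat> {
            self.inner
                .iter()
                .map(|r| (r.key().clone(), r.value().clone()))
                .collect()
        }
    }

Because the cache is owned by the cluster state rather than by `ExecutorManager`, callers such as `get_alive_executors` no longer need to be async, which is what the later hunks in this patch revert.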
diff --git a/ballista/scheduler/src/cluster/test/mod.rs b/ballista/scheduler/src/cluster/test/mod.rs index 4d4001c63..b9056bfd4 100644 --- a/ballista/scheduler/src/cluster/test/mod.rs +++ b/ballista/scheduler/src/cluster/test/mod.rs @@ -22,11 +22,10 @@ use crate::state::executor_manager::ExecutorReservation; use crate::test_utils::{await_condition, mock_completed_task, mock_executor}; use ballista_core::error::{BallistaError, Result}; use ballista_core::serde::protobuf::job_status::Status; -use ballista_core::serde::protobuf::{executor_status, ExecutorHeartbeat, JobStatus}; +use ballista_core::serde::protobuf::{executor_status, JobStatus}; use ballista_core::serde::scheduler::{ ExecutorData, ExecutorMetadata, ExecutorSpecification, }; -use dashmap::DashMap; use futures::StreamExt; use itertools::Itertools; use std::collections::HashSet; @@ -36,27 +35,14 @@ use tokio::sync::RwLock; pub struct ClusterStateTest { state: Arc, - received_heartbeats: Arc>, reservations: Vec, total_task_slots: u32, } impl ClusterStateTest { pub async fn new(state: S) -> Result { - let received_heartbeats = Arc::new(DashMap::new()); - - let mut heartbeat_stream = state.executor_heartbeat_stream().await?; - let received_heartbeat_clone = received_heartbeats.clone(); - - tokio::spawn(async move { - while let Some(heartbeat) = heartbeat_stream.next().await { - received_heartbeat_clone.insert(heartbeat.executor_id.clone(), heartbeat); - } - }); - Ok(Self { state: Arc::new(state), - received_heartbeats, reservations: vec![], total_task_slots: 0, }) @@ -115,17 +101,19 @@ impl ClusterStateTest { // Heratbeat stream is async so wait up to 500ms for it to show up await_condition(Duration::from_millis(50), 10, || { - let found_heartbeat = - self.received_heartbeats.get(executor_id).map(|heartbeat| { + let found_heartbeat = self.state.get_executor_heartbeat(executor_id).map_or( + false, + |heartbeat| { matches!( heartbeat.status, Some(ballista_core::serde::generated::ballista::ExecutorStatus { status: Some(executor_status::Status::Active(_)) }) ) - }); + }, + ); - futures::future::ready(Ok(found_heartbeat.unwrap_or_default())) + futures::future::ready(Ok(found_heartbeat)) }) .await?; @@ -135,17 +123,19 @@ impl ClusterStateTest { pub async fn assert_dead_executor(self, executor_id: &str) -> Result { // Heratbeat stream is async so wait up to 500ms for it to show up await_condition(Duration::from_millis(50), 10, || { - let found_heartbeat = - self.received_heartbeats.get(executor_id).map(|heartbeat| { + let found_heartbeat = self.state.get_executor_heartbeat(executor_id).map_or( + true, + |heartbeat| { matches!( heartbeat.status, Some(ballista_core::serde::generated::ballista::ExecutorStatus { status: Some(executor_status::Status::Dead(_)) }) ) - }); + }, + ); - futures::future::ready(Ok(found_heartbeat.unwrap_or_default())) + futures::future::ready(Ok(found_heartbeat)) }) .await?; diff --git a/ballista/scheduler/src/scheduler_server/grpc.rs b/ballista/scheduler/src/scheduler_server/grpc.rs index 1054279b9..de07a06d5 100644 --- a/ballista/scheduler/src/scheduler_server/grpc.rs +++ b/ballista/scheduler/src/scheduler_server/grpc.rs @@ -773,14 +773,12 @@ mod test { let active_executors = state .executor_manager - .get_alive_executors_within_one_minute() - .await?; + .get_alive_executors_within_one_minute(); assert!(active_executors.is_empty()); let expired_executors = state .executor_manager - .get_expired_executors(scheduler.executor_termination_grace_period) - .await?; + 
.get_expired_executors(scheduler.executor_termination_grace_period); assert!(expired_executors.is_empty()); Ok(()) @@ -904,14 +902,12 @@ mod test { let active_executors = state .executor_manager - .get_alive_executors_within_one_minute() - .await?; + .get_alive_executors_within_one_minute(); assert_eq!(active_executors.len(), 1); let expired_executors = state .executor_manager - .get_expired_executors(scheduler.executor_termination_grace_period) - .await?; + .get_expired_executors(scheduler.executor_termination_grace_period); assert!(expired_executors.is_empty()); // simulate the heartbeat timeout @@ -923,8 +919,7 @@ mod test { let active_executors = state .executor_manager - .get_alive_executors_within_one_minute() - .await?; + .get_alive_executors_within_one_minute(); assert!(active_executors.is_empty()); Ok(()) } diff --git a/ballista/scheduler/src/scheduler_server/mod.rs b/ballista/scheduler/src/scheduler_server/mod.rs index 91086184c..69a8c1b48 100644 --- a/ballista/scheduler/src/scheduler_server/mod.rs +++ b/ballista/scheduler/src/scheduler_server/mod.rs @@ -223,16 +223,9 @@ impl SchedulerServer, - // dead executor sets: - dead_executors: Arc>, clients: ExecutorClients, } @@ -104,31 +101,12 @@ impl ExecutorManager { Self { task_distribution, cluster_state, - dead_executors: Arc::new(DashSet::new()), clients: Default::default(), } } - /// Initialize a background process that will listen for executor heartbeats pub async fn init(&self) -> Result<()> { - let mut heartbeat_stream = self.cluster_state.executor_heartbeat_stream().await?; - - info!("Initializing heartbeat listener"); - - let dead_executors = self.dead_executors.clone(); - tokio::task::spawn(async move { - while let Some(heartbeat) = heartbeat_stream.next().await { - let executor_id = heartbeat.executor_id.clone(); - - if let Some(executor_status::Status::Dead(_)) = heartbeat - .status - .as_ref() - .and_then(|status| status.status.as_ref()) - { - dead_executors.insert(executor_id); - } - } - }); + self.cluster_state.init().await?; Ok(()) } @@ -137,7 +115,7 @@ impl ExecutorManager { /// for scheduling. /// This operation is atomic, so if this method return an Err, no slots have been reserved. pub async fn reserve_slots(&self, n: u32) -> Result> { - let alive_executors = self.get_alive_executors_within_one_minute().await?; + let alive_executors = self.get_alive_executors_within_one_minute(); debug!("Alive executors: {alive_executors:?}"); @@ -227,14 +205,7 @@ impl ExecutorManager { /// Send rpc to Executors to clean up the job data async fn clean_up_job_data_inner(&self, job_id: String) { - let alive_executors = if let Ok(alive_executors) = - self.get_alive_executors_within_one_minute().await - { - alive_executors - } else { - warn!("Fail to get alive executors within one minute"); - HashSet::new() - }; + let alive_executors = self.get_alive_executors_within_one_minute(); for executor in alive_executors { let job_id_clone = job_id.to_owned(); if let Ok(mut client) = self.get_client(&executor).await { @@ -286,7 +257,6 @@ impl ExecutorManager { let heartbeat_timestamps: Vec<(String, u64)> = self .cluster_state .executor_heartbeats() - .await? 
.into_iter() .map(|(executor_id, heartbeat)| (executor_id, heartbeat.timestamp)) .collect(); @@ -366,13 +336,7 @@ impl ExecutorManager { reason: Option, ) -> Result<()> { info!("Removing executor {}: {:?}", executor_id, reason); - self.cluster_state.remove_executor(executor_id).await?; - - let executor_id = executor_id.to_owned(); - - self.dead_executors.insert(executor_id); - - Ok(()) + self.cluster_state.remove_executor(executor_id).await } #[cfg(not(test))] @@ -413,19 +377,26 @@ impl ExecutorManager { } pub(crate) fn is_dead_executor(&self, executor_id: &str) -> bool { - self.dead_executors.contains(executor_id) + self.cluster_state + .get_executor_heartbeat(executor_id) + .map_or(true, |heartbeat| { + matches!( + heartbeat.status, + Some(ballista_core::serde::generated::ballista::ExecutorStatus { + status: Some(executor_status::Status::Dead(_)) + }) + ) + }) } /// Retrieve the set of all executor IDs where the executor has been observed in the last /// `last_seen_ts_threshold` seconds. - pub(crate) async fn get_alive_executors( + pub(crate) fn get_alive_executors( &self, last_seen_ts_threshold: u64, - ) -> Result> { - Ok(self - .cluster_state + ) -> HashSet { + self.cluster_state .executor_heartbeats() - .await? .iter() .filter_map(|(exec, heartbeat)| { let active = matches!( @@ -439,14 +410,14 @@ impl ExecutorManager { (active && live).then(|| exec.clone()) }) - .collect()) + .collect() } /// Return a list of expired executors - pub(crate) async fn get_expired_executors( + pub(crate) fn get_expired_executors( &self, termination_grace_period: u64, - ) -> Result> { + ) -> Vec { let now_epoch_ts = SystemTime::now() .duration_since(UNIX_EPOCH) .expect("Time went backwards"); @@ -462,10 +433,8 @@ impl ExecutorManager { .unwrap_or_else(|| Duration::from_secs(0)) .as_secs(); - Ok(self - .cluster_state + self.cluster_state .executor_heartbeats() - .await? 
.iter() .filter_map(|(_exec, heartbeat)| { let terminating = matches!( @@ -484,12 +453,10 @@ impl ExecutorManager { ((terminating && grace_period_expired) || expired) .then(|| heartbeat.clone()) }) - .collect::>()) + .collect::>() } - pub(crate) async fn get_alive_executors_within_one_minute( - &self, - ) -> Result> { + pub(crate) fn get_alive_executors_within_one_minute(&self) -> HashSet { let now_epoch_ts = SystemTime::now() .duration_since(UNIX_EPOCH) .expect("Time went backwards"); @@ -497,7 +464,6 @@ impl ExecutorManager { .checked_sub(Duration::from_secs(60)) .unwrap_or_else(|| Duration::from_secs(0)); self.get_alive_executors(last_seen_threshold.as_secs()) - .await } } From db4adc26fdf314eaa9a73ac89cbe48b1cbe64329 Mon Sep 17 00:00:00 2001 From: yangzhong Date: Wed, 29 Mar 2023 18:41:16 +0800 Subject: [PATCH 5/6] Remove buggy executor check for pull-staged task scheduling --- .../scheduler/src/scheduler_server/grpc.rs | 45 +++---------------- .../scheduler/src/scheduler_server/mod.rs | 4 +- 2 files changed, 10 insertions(+), 39 deletions(-) diff --git a/ballista/scheduler/src/scheduler_server/grpc.rs b/ballista/scheduler/src/scheduler_server/grpc.rs index de07a06d5..dfef98094 100644 --- a/ballista/scheduler/src/scheduler_server/grpc.rs +++ b/ballista/scheduler/src/scheduler_server/grpc.rs @@ -22,12 +22,12 @@ use std::convert::TryInto; use ballista_core::serde::protobuf::executor_registration::OptionalHost; use ballista_core::serde::protobuf::scheduler_grpc_server::SchedulerGrpc; use ballista_core::serde::protobuf::{ - executor_status, CancelJobParams, CancelJobResult, CleanJobDataParams, - CleanJobDataResult, ExecuteQueryParams, ExecuteQueryResult, ExecutorHeartbeat, - ExecutorStatus, ExecutorStoppedParams, ExecutorStoppedResult, GetFileMetadataParams, - GetFileMetadataResult, GetJobStatusParams, GetJobStatusResult, HeartBeatParams, - HeartBeatResult, PollWorkParams, PollWorkResult, RegisterExecutorParams, - RegisterExecutorResult, UpdateTaskStatusParams, UpdateTaskStatusResult, + CancelJobParams, CancelJobResult, CleanJobDataParams, CleanJobDataResult, + ExecuteQueryParams, ExecuteQueryResult, ExecutorHeartbeat, ExecutorStoppedParams, + ExecutorStoppedResult, GetFileMetadataParams, GetFileMetadataResult, + GetJobStatusParams, GetJobStatusResult, HeartBeatParams, HeartBeatResult, + PollWorkParams, PollWorkResult, RegisterExecutorParams, RegisterExecutorResult, + UpdateTaskStatusParams, UpdateTaskStatusResult, }; use ballista_core::serde::scheduler::ExecutorMetadata; @@ -47,7 +47,7 @@ use datafusion::prelude::SessionContext; use std::time::{SystemTime, UNIX_EPOCH}; use tonic::{Request, Response, Status}; -use crate::scheduler_server::{timestamp_secs, SchedulerServer}; +use crate::scheduler_server::SchedulerServer; use crate::state::executor_manager::ExecutorReservation; #[tonic::async_trait] @@ -72,19 +72,6 @@ impl SchedulerGrpc } = request.into_inner() { trace!("Received poll_work request for {:?}", metadata); - // We might receive buggy poll work requests from dead executors. 
- if self - .state - .executor_manager - .is_dead_executor(&metadata.id.clone()) - { - let error_msg = format!( - "Receive buggy poll work request from dead Executor {}", - metadata.id.clone() - ); - warn!("{}", error_msg); - return Err(Status::internal(error_msg)); - } let metadata = ExecutorMetadata { id: metadata.id, host: metadata @@ -97,14 +84,6 @@ impl SchedulerGrpc grpc_port: metadata.grpc_port as u16, specification: metadata.specification.unwrap().into(), }; - let executor_heartbeat = ExecutorHeartbeat { - executor_id: metadata.id.clone(), - timestamp: timestamp_secs(), - metrics: vec![], - status: Some(ExecutorStatus { - status: Some(executor_status::Status::Active("".to_string())), - }), - }; self.state .executor_manager @@ -116,16 +95,6 @@ impl SchedulerGrpc Status::internal(msg) })?; - self.state - .executor_manager - .save_executor_heartbeat(executor_heartbeat) - .await - .map_err(|e| { - let msg = format!("Could not save executor heartbeat: {e}"); - error!("{}", msg); - Status::internal(msg) - })?; - self.update_task_status(&metadata.id, task_status) .await .map_err(|e| { diff --git a/ballista/scheduler/src/scheduler_server/mod.rs b/ballista/scheduler/src/scheduler_server/mod.rs index 69a8c1b48..217711f7c 100644 --- a/ballista/scheduler/src/scheduler_server/mod.rs +++ b/ballista/scheduler/src/scheduler_server/mod.rs @@ -189,7 +189,9 @@ impl SchedulerServer, ) -> Result<()> { // We might receive buggy task updates from dead executors. - if self.state.executor_manager.is_dead_executor(executor_id) { + if self.state.config.is_push_staged_scheduling() + && self.state.executor_manager.is_dead_executor(executor_id) + { let error_msg = format!( "Receive buggy tasks status from dead Executor {executor_id}, task status update ignored." ); From fa7d44bc5b35deaec0c2ab58d9cd6a646b0a8949 Mon Sep 17 00:00:00 2001 From: yangzhong Date: Thu, 30 Mar 2023 21:19:41 +0800 Subject: [PATCH 6/6] Add ExecutorMetadata cache for KeyValueState --- ballista/scheduler/src/cluster/kv.rs | 35 ++++++++++++++++++++++------ 1 file changed, 28 insertions(+), 7 deletions(-) diff --git a/ballista/scheduler/src/cluster/kv.rs b/ballista/scheduler/src/cluster/kv.rs index 5740970ef..eb164753e 100644 --- a/ballista/scheduler/src/cluster/kv.rs +++ b/ballista/scheduler/src/cluster/kv.rs @@ -56,6 +56,8 @@ pub struct KeyValueState< > { /// Underlying `KeyValueStore` store: S, + /// ExecutorMetadata cache, executor_id -> ExecutorMetadata + executors: Arc>, /// ExecutorHeartbeat cache, executor_id -> ExecutorHeartbeat executor_heartbeats: Arc>, /// Codec used to serialize/deserialize execution plan @@ -80,6 +82,7 @@ impl ) -> Self { Self { store, + executors: Arc::new(DashMap::new()), executor_heartbeats: Arc::new(DashMap::new()), scheduler: scheduler.into(), codec, @@ -147,6 +150,7 @@ impl info!("Initializing heartbeat listener"); let heartbeats = self.executor_heartbeats.clone(); + let executors = self.executors.clone(); tokio::task::spawn(async move { while let Some(heartbeat) = heartbeat_stream.next().await { let executor_id = heartbeat.executor_id.clone(); @@ -158,6 +162,7 @@ impl { Some(protobuf::executor_status::Status::Dead(_)) => { heartbeats.remove(&executor_id); + executors.remove(&executor_id); } _ => { heartbeats.insert(executor_id, heartbeat); @@ -414,19 +419,35 @@ impl async fn save_executor_metadata(&self, metadata: ExecutorMetadata) -> Result<()> { let executor_id = metadata.id.clone(); - let proto: protobuf::ExecutorMetadata = metadata.into(); + let proto: protobuf::ExecutorMetadata = 
metadata.clone().into(); self.store - .put(Keyspace::Executors, executor_id, proto.encode_to_vec()) - .await + .put( + Keyspace::Executors, + executor_id.clone(), + proto.encode_to_vec(), + ) + .await?; + + self.executors.insert(executor_id, metadata); + + Ok(()) } async fn get_executor_metadata(&self, executor_id: &str) -> Result { - let value = self.store.get(Keyspace::Executors, executor_id).await?; + let metadata = if let Some(metadata) = self.executors.get(executor_id) { + metadata.value().clone() + } else { + let value = self.store.get(Keyspace::Executors, executor_id).await?; + let decoded = + decode_into::(&value)?; + self.executors + .insert(executor_id.to_string(), decoded.clone()); + + decoded + }; - let decoded = - decode_into::(&value)?; - Ok(decoded) + Ok(metadata) } async fn save_executor_heartbeat(&self, heartbeat: ExecutorHeartbeat) -> Result<()> {
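The final patch applies a read-through cache to executor metadata: writes persist to the key-value store first and then update the cache, while reads serve from the cache and fall back to the store, populating the cache with whatever was loaded. A minimal sketch of that pattern is below; the generic `ReadThroughCache` type and the `store_put`/`store_get` closures are hypothetical stand-ins, not the Ballista `KeyValueStore` API.

    // Sketch: generic read-through cache over an async backing store.
    use std::sync::Arc;

    use dashmap::DashMap;

    #[derive(Clone)]
    struct ReadThroughCache<V: Clone> {
        cache: Arc<DashMap<String, V>>,
    }

    impl<V: Clone> ReadThroughCache<V> {
        fn new() -> Self {
            Self { cache: Arc::new(DashMap::new()) }
        }

        /// Persist first, then update the cache, so the cache never holds a
        /// value the backing store does not have.
        async fn put<F, Fut>(&self, key: String, value: V, store_put: F) -> Result<(), String>
        where
            F: FnOnce(String, V) -> Fut,
            Fut: std::future::Future<Output = Result<(), String>>,
        {
            store_put(key.clone(), value.clone()).await?;
            self.cache.insert(key, value);
            Ok(())
        }

        /// Serve from the cache when possible; otherwise load from the store
        /// and populate the cache with the result.
        async fn get<F, Fut>(&self, key: &str, store_get: F) -> Result<V, String>
        where
            F: FnOnce(String) -> Fut,
            Fut: std::future::Future<Output = Result<V, String>>,
        {
            if let Some(cached) = self.cache.get(key) {
                return Ok(cached.value().clone());
            }
            let loaded = store_get(key.to_string()).await?;
            self.cache.insert(key.to_string(), loaded.clone());
            Ok(loaded)
        }
    }

One design point worth noting: removal must clear both the store entry and the cache entry together (as `remove_executor` does for heartbeats above), otherwise a stale cached entry could resurrect an executor that the backing store has already marked dead.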