diff --git a/ballista/rust/core/proto/ballista.proto b/ballista/rust/core/proto/ballista.proto
index 5aafd00cf1b0..c86bbef42174 100644
--- a/ballista/rust/core/proto/ballista.proto
+++ b/ballista/rust/core/proto/ballista.proto
@@ -610,10 +610,10 @@ message ExecutorRegistration {
   uint32 port = 3;
 }
 
-message GetExecutorMetadataParams {}
-
-message GetExecutorMetadataResult {
-  repeated ExecutorMetadata metadata = 1;
+message ExecutorHeartbeat {
+  ExecutorMetadata meta = 1;
+  // Unix epoch-based timestamp in seconds
+  uint64 timestamp = 2;
 }
 
 message RunningTask {
@@ -712,8 +712,6 @@ message FilePartitionMetadata {
 }
 
 service SchedulerGrpc {
-  rpc GetExecutorsMetadata (GetExecutorMetadataParams) returns (GetExecutorMetadataResult) {}
-
   // Executors must poll the scheduler for heartbeat and to receive tasks
   rpc PollWork (PollWorkParams) returns (PollWorkResult) {}
 
diff --git a/ballista/rust/executor/src/execution_loop.rs b/ballista/rust/executor/src/execution_loop.rs
index afc6f0089b92..3d9be687bed7 100644
--- a/ballista/rust/executor/src/execution_loop.rs
+++ b/ballista/rust/executor/src/execution_loop.rs
@@ -90,10 +90,14 @@ async fn run_received_tasks(
     task_status_sender: Sender<TaskStatus>,
     task: TaskDefinition,
 ) {
-    info!("Received task {:?}", task.task_id.as_ref().unwrap());
+    let task_id = task.task_id.unwrap();
+    let task_id_log = format!(
+        "{}/{}/{}",
+        task_id.job_id, task_id.stage_id, task_id.partition_id
+    );
+    info!("Received task {}", task_id_log);
     available_tasks_slots.fetch_sub(1, Ordering::SeqCst);
     let plan: Arc<dyn ExecutionPlan> = (&task.plan.unwrap()).try_into().unwrap();
-    let task_id = task.task_id.unwrap();
 
     tokio::spawn(async move {
         let execution_result = executor
@@ -104,7 +108,8 @@ async fn run_received_tasks(
                 plan,
             )
             .await;
-        info!("DONE WITH TASK: {:?}", execution_result);
+        info!("Done with task {}", task_id_log);
+        debug!("Statistics: {:?}", execution_result);
         available_tasks_slots.fetch_add(1, Ordering::SeqCst);
         let _ = task_status_sender.send(as_task_status(
             execution_result.map(|_| ()),
diff --git a/ballista/rust/scheduler/src/api/handlers.rs b/ballista/rust/scheduler/src/api/handlers.rs
index 7293558d0cc4..ee0ee73f4eca 100644
--- a/ballista/rust/scheduler/src/api/handlers.rs
+++ b/ballista/rust/scheduler/src/api/handlers.rs
@@ -11,45 +11,32 @@
 // limitations under the License.
 
 use crate::SchedulerServer;
-use ballista_core::serde::protobuf::{
-    scheduler_grpc_server::SchedulerGrpc, ExecutorMetadata, GetExecutorMetadataParams,
-    GetExecutorMetadataResult,
-};
-use ballista_core::serde::scheduler::ExecutorMeta;
-use tonic::{Request, Response};
+use ballista_core::{serde::scheduler::ExecutorMeta, BALLISTA_VERSION};
 use warp::Rejection;
 
 #[derive(Debug, serde::Serialize)]
 struct StateResponse {
     executors: Vec<ExecutorMeta>,
     started: u128,
-    version: String,
+    version: &'static str,
 }
 
 pub(crate) async fn scheduler_state(
     data_server: SchedulerServer,
 ) -> Result<impl warp::Reply, Rejection> {
-    let data: Result<Response<GetExecutorMetadataResult>, tonic::Status> = data_server
-        .get_executors_metadata(Request::new(GetExecutorMetadataParams {}))
-        .await;
-    let metadata: Vec<ExecutorMeta> = match data {
-        Ok(result) => {
-            let res: &GetExecutorMetadataResult = result.get_ref();
-            let vec: &Vec<ExecutorMetadata> = &res.metadata;
-            vec.iter()
-                .map(|v: &ExecutorMetadata| ExecutorMeta {
-                    host: v.host.clone(),
-                    port: v.port as u16,
-                    id: v.id.clone(),
-                })
-                .collect()
-        }
-        Err(_) => vec![],
-    };
-
+    // TODO: Display last seen information in UI
+    let executors: Vec<ExecutorMeta> = data_server
+        .state
+        .get_executors_metadata()
+        .await
+        .unwrap_or_default()
+        .into_iter()
+        .map(|(metadata, _duration)| metadata)
+        .collect();
     let response = StateResponse {
-        executors: metadata,
+        executors,
         started: data_server.start_time,
-        version: data_server.version.clone(),
+        version: BALLISTA_VERSION,
     };
     Ok(warp::reply::json(&response))
 }
diff --git a/ballista/rust/scheduler/src/lib.rs b/ballista/rust/scheduler/src/lib.rs
index 99c7be66a646..a3b2e964a071 100644
--- a/ballista/rust/scheduler/src/lib.rs
+++ b/ballista/rust/scheduler/src/lib.rs
@@ -30,10 +30,10 @@ use std::{fmt, net::IpAddr};
 use ballista_core::serde::protobuf::{
     execute_query_params::Query, executor_registration::OptionalHost, job_status,
     scheduler_grpc_server::SchedulerGrpc, ExecuteQueryParams, ExecuteQueryResult,
-    FailedJob, FilePartitionMetadata, FileType, GetExecutorMetadataParams,
-    GetExecutorMetadataResult, GetFileMetadataParams, GetFileMetadataResult,
-    GetJobStatusParams, GetJobStatusResult, JobStatus, PartitionId, PollWorkParams,
-    PollWorkResult, QueuedJob, RunningJob, TaskDefinition, TaskStatus,
+    FailedJob, FilePartitionMetadata, FileType, GetFileMetadataParams,
+    GetFileMetadataResult, GetJobStatusParams, GetJobStatusResult, JobStatus,
+    PartitionId, PollWorkParams, PollWorkResult, QueuedJob, RunningJob, TaskDefinition,
+    TaskStatus,
 };
 use ballista_core::serde::scheduler::ExecutorMeta;
 
@@ -72,9 +72,8 @@ use std::time::{Instant, SystemTime, UNIX_EPOCH};
 #[derive(Clone)]
 pub struct SchedulerServer {
     caller_ip: IpAddr,
-    state: Arc<SchedulerState>,
+    pub(crate) state: Arc<SchedulerState>,
     start_time: u128,
-    version: String,
 }
 
 impl SchedulerServer {
@@ -83,7 +82,6 @@ impl SchedulerServer {
         namespace: String,
         caller_ip: IpAddr,
     ) -> Self {
-        const VERSION: Option<&'static str> = option_env!("CARGO_PKG_VERSION");
         let state = Arc::new(SchedulerState::new(config, namespace));
         let state_clone = state.clone();
 
@@ -97,35 +95,12 @@ impl SchedulerServer {
                 .duration_since(UNIX_EPOCH)
                 .unwrap()
                 .as_millis(),
-            version: VERSION.unwrap_or("Unknown").to_string(),
         }
     }
 }
 
 #[tonic::async_trait]
 impl SchedulerGrpc for SchedulerServer {
-    async fn get_executors_metadata(
-        &self,
-        _request: Request<GetExecutorMetadataParams>,
-    ) -> std::result::Result<Response<GetExecutorMetadataResult>, tonic::Status> {
-        info!("Received get_executors_metadata request");
-        let result = self
-            .state
-            .get_executors_metadata()
-            .await
-            .map_err(|e| {
-                let msg = format!("Error reading executors metadata: {}", e);
-                error!("{}", msg);
-                tonic::Status::internal(msg)
-            })?
-            .into_iter()
-            .map(|meta| meta.into())
-            .collect();
-        Ok(Response::new(GetExecutorMetadataResult {
-            metadata: result,
-        }))
-    }
-
     async fn poll_work(
         &self,
         request: Request<PollWorkParams>,
@@ -275,13 +250,6 @@ impl SchedulerGrpc for SchedulerServer {
             }
         };
         debug!("Received plan for execution: {:?}", plan);
-        let executors = self.state.get_executors_metadata().await.map_err(|e| {
-            let msg = format!("Error reading executors metadata: {}", e);
-            error!("{}", msg);
-            tonic::Status::internal(msg)
-        })?;
-        debug!("Found executors: {:?}", executors);
-
         let job_id: String = {
             let mut rng = thread_rng();
             std::iter::repeat(())
diff --git a/ballista/rust/scheduler/src/state/etcd.rs b/ballista/rust/scheduler/src/state/etcd.rs
index 807477d86995..d6741a7d83dc 100644
--- a/ballista/rust/scheduler/src/state/etcd.rs
+++ b/ballista/rust/scheduler/src/state/etcd.rs
@@ -17,14 +17,12 @@
 
 //! Etcd config backend.
 
-use std::{task::Poll, time::Duration};
+use std::task::Poll;
 
 use crate::state::ConfigBackendClient;
 use ballista_core::error::{ballista_error, Result};
-use etcd_client::{
-    GetOptions, LockResponse, PutOptions, WatchOptions, WatchStream, Watcher,
-};
+use etcd_client::{GetOptions, LockResponse, WatchOptions, WatchStream, Watcher};
 use futures::{Stream, StreamExt};
 use log::warn;
 
@@ -70,25 +68,9 @@ impl ConfigBackendClient for EtcdClient {
             .collect())
     }
 
-    async fn put(
-        &self,
-        key: String,
-        value: Vec<u8>,
-        lease_time: Option<Duration>,
-    ) -> Result<()> {
+    async fn put(&self, key: String, value: Vec<u8>) -> Result<()> {
         let mut etcd = self.etcd.clone();
-        let put_options = if let Some(lease_time) = lease_time {
-            etcd.lease_grant(lease_time.as_secs() as i64, None)
-                .await
-                .map(|lease| Some(PutOptions::new().with_lease(lease.id())))
-                .map_err(|e| {
-                    warn!("etcd lease grant failed: {:?}", e.to_string());
-                    ballista_error("etcd lease grant failed")
-                })?
-        } else {
-            None
-        };
-        etcd.put(key.clone(), value.clone(), put_options)
+        etcd.put(key.clone(), value.clone(), None)
             .await
             .map_err(|e| {
                 warn!("etcd put failed: {}", e);
@@ -99,6 +81,7 @@ impl ConfigBackendClient for EtcdClient {
 
     async fn lock(&self) -> Result<Box<dyn Lock>> {
         let mut etcd = self.etcd.clone();
+        // TODO: make this a namespaced-lock
         let lock = etcd
             .lock("/ballista_global_lock", None)
             .await
diff --git a/ballista/rust/scheduler/src/state/mod.rs b/ballista/rust/scheduler/src/state/mod.rs
index 75f1574ef125..a17c82d4b737 100644
--- a/ballista/rust/scheduler/src/state/mod.rs
+++ b/ballista/rust/scheduler/src/state/mod.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::time::{SystemTime, UNIX_EPOCH};
 use std::{
     any::type_name, collections::HashMap, convert::TryInto, sync::Arc, time::Duration,
 };
@@ -26,8 +27,9 @@ use prost::Message;
 use tokio::sync::OwnedMutexGuard;
 
 use ballista_core::serde::protobuf::{
-    job_status, task_status, CompletedJob, CompletedTask, ExecutorMetadata, FailedJob,
-    FailedTask, JobStatus, PhysicalPlanNode, RunningJob, RunningTask, TaskStatus,
+    job_status, task_status, CompletedJob, CompletedTask, ExecutorHeartbeat,
+    ExecutorMetadata, FailedJob, FailedTask, JobStatus, PhysicalPlanNode, RunningJob,
+    RunningTask, TaskStatus,
 };
 use ballista_core::serde::scheduler::PartitionStats;
 use ballista_core::{error::BallistaError, serde::scheduler::ExecutorMeta};
@@ -48,8 +50,6 @@ pub use etcd::EtcdClient;
 #[cfg(feature = "sled")]
 pub use standalone::StandaloneClient;
 
-const LEASE_TIME: Duration = Duration::from_secs(60);
-
 /// A trait that contains the necessary methods to save and retrieve the state and configuration of a cluster.
 #[tonic::async_trait]
 pub trait ConfigBackendClient: Send + Sync {
@@ -62,12 +62,7 @@ pub trait ConfigBackendClient: Send + Sync {
     async fn get_from_prefix(&self, prefix: &str) -> Result<Vec<(String, Vec<u8>)>>;
 
     /// Saves the value into the provided key, overriding any previous data that might have been associated to that key.
-    async fn put(
-        &self,
-        key: String,
-        value: Vec<u8>,
-        lease_time: Option<Duration>,
-    ) -> Result<()>;
+    async fn put(&self, key: String, value: Vec<u8>) -> Result<()>;
 
     async fn lock(&self) -> Result<Box<dyn Lock>>;
 
@@ -104,25 +99,55 @@ impl SchedulerState {
         }
     }
 
-    pub async fn get_executors_metadata(&self) -> Result<Vec<ExecutorMeta>> {
+    pub async fn get_executors_metadata(&self) -> Result<Vec<(ExecutorMeta, Duration)>> {
         let mut result = vec![];
 
         let entries = self
             .config_client
             .get_from_prefix(&get_executors_prefix(&self.namespace))
             .await?;
+        let now_epoch_ts = SystemTime::now()
+            .duration_since(UNIX_EPOCH)
+            .expect("Time went backwards");
         for (_key, entry) in entries {
-            let meta: ExecutorMetadata = decode_protobuf(&entry)?;
-            result.push(meta.into());
+            let heartbeat: ExecutorHeartbeat = decode_protobuf(&entry)?;
+            let meta = heartbeat.meta.unwrap();
+            let ts = Duration::from_secs(heartbeat.timestamp);
+            let time_since_last_seen = now_epoch_ts
+                .checked_sub(ts)
+                .unwrap_or_else(|| Duration::from_secs(0));
+            result.push((meta.into(), time_since_last_seen));
         }
         Ok(result)
     }
 
+    pub async fn get_alive_executors_metadata(
+        &self,
+        last_seen_threshold: Duration,
+    ) -> Result<Vec<ExecutorMeta>> {
+        Ok(self
+            .get_executors_metadata()
+            .await?
+            .into_iter()
+            .filter_map(|(exec, last_seen)| {
+                (last_seen < last_seen_threshold).then(|| exec)
+            })
+            .collect())
+    }
+
     pub async fn save_executor_metadata(&self, meta: ExecutorMeta) -> Result<()> {
         let key = get_executor_key(&self.namespace, &meta.id);
         let meta: ExecutorMetadata = meta.into();
-        let value: Vec<u8> = encode_protobuf(&meta)?;
-        self.config_client.put(key, value, Some(LEASE_TIME)).await
+        let timestamp = SystemTime::now()
+            .duration_since(UNIX_EPOCH)
+            .expect("Time went backwards")
+            .as_secs();
+        let heartbeat = ExecutorHeartbeat {
+            meta: Some(meta),
+            timestamp,
+        };
+        let value: Vec<u8> = encode_protobuf(&heartbeat)?;
+        self.config_client.put(key, value).await
     }
 
     pub async fn save_job_metadata(
@@ -133,7 +158,7 @@ impl SchedulerState {
         debug!("Saving job metadata: {:?}", status);
         let key = get_job_key(&self.namespace, job_id);
         let value = encode_protobuf(status)?;
-        self.config_client.put(key, value, None).await
+        self.config_client.put(key, value).await
     }
 
     pub async fn get_job_metadata(&self, job_id: &str) -> Result<JobStatus> {
@@ -158,7 +183,7 @@ impl SchedulerState {
             partition_id.partition_id as usize,
         );
         let value = encode_protobuf(status)?;
-        self.config_client.put(key, value, None).await
+        self.config_client.put(key, value).await
     }
 
     pub async fn _get_task_status(
@@ -191,7 +216,7 @@ impl SchedulerState {
             let proto: PhysicalPlanNode = plan.try_into()?;
             encode_protobuf(&proto)?
         };
-        self.config_client.clone().put(key, value, None).await
+        self.config_client.clone().put(key, value).await
     }
 
     pub async fn get_stage_plan(
@@ -211,6 +236,40 @@ impl SchedulerState {
         Ok((&value).try_into()?)
     }
 
+    /// This function ensures that the task wasn't assigned to an executor that died.
+    /// If that is the case, then the task is re-scheduled.
+    /// Returns true if the task was dead, false otherwise.
+    async fn reschedule_dead_task(
+        &self,
+        task_status: &TaskStatus,
+        executors: &[ExecutorMeta],
+    ) -> Result<bool> {
+        let executor_id: &str = match &task_status.status {
+            Some(task_status::Status::Completed(CompletedTask { executor_id })) => {
+                executor_id
+            }
+            Some(task_status::Status::Running(RunningTask { executor_id })) => {
+                executor_id
+            }
+            _ => return Ok(false),
+        };
+        let executor_meta = executors.iter().find(|exec| exec.id == executor_id);
+        let task_is_dead = executor_meta.is_none();
+        if task_is_dead {
+            info!(
+                "Executor {} isn't alive. Rescheduling task {:?}",
+                executor_id,
+                task_status.partition_id.as_ref().unwrap()
+            );
+            // Task was handled in an executor that isn't alive anymore, so we can't resolve it
+            // We mark the task as pending again and continue
+            let mut task_status = task_status.clone();
+            task_status.status = None;
+            self.save_task_status(&task_status).await?;
+        }
+        Ok(task_is_dead)
+    }
+
     pub async fn assign_next_schedulable_task(
         &self,
         executor_id: &str,
@@ -221,7 +280,10 @@ impl SchedulerState {
             .await?
             .into_iter()
             .collect();
-        let executors = self.get_executors_metadata().await?;
+        // TODO: Make the duration a configurable parameter
+        let executors = self
+            .get_alive_executors_metadata(Duration::from_secs(60))
+            .await?;
         'tasks: for (_key, value) in kvs.iter() {
             let mut status: TaskStatus = decode_protobuf(value)?;
             if status.status.is_none() {
@@ -249,13 +311,23 @@ impl SchedulerState {
                             .unwrap();
                         let referenced_task: TaskStatus =
                             decode_protobuf(referenced_task)?;
-                        if let Some(task_status::Status::Completed(CompletedTask {
-                            executor_id,
-                        })) = referenced_task.status
+                        let task_is_dead = self
+                            .reschedule_dead_task(&referenced_task, &executors)
+                            .await?;
+                        if task_is_dead {
+                            continue 'tasks;
+                        } else if let Some(task_status::Status::Completed(
+                            CompletedTask { executor_id },
+                        )) = referenced_task.status
                         {
                             let empty = vec![];
                            let locations =
                                partition_locations.entry(stage_id).or_insert(empty);
+                            let executor_meta = executors
+                                .iter()
+                                .find(|exec| exec.id == executor_id)
+                                .unwrap()
+                                .clone();
                            locations.push(vec![
                                ballista_core::serde::scheduler::PartitionLocation {
                                    partition_id:
@@ -264,11 +336,7 @@ impl SchedulerState {
                                        stage_id,
                                        partition_id,
                                    },
-                                   executor_meta: executors
-                                       .iter()
-                                       .find(|exec| exec.id == executor_id)
-                                       .unwrap()
-                                       .clone(),
+                                   executor_meta,
                                    partition_stats: PartitionStats::default(),
                                },
                            ]);
@@ -336,7 +404,7 @@ impl SchedulerState {
             .get_executors_metadata()
             .await?
             .into_iter()
-            .map(|meta| (meta.id.to_string(), meta))
+            .map(|(meta, _)| (meta.id.to_string(), meta))
             .collect();
         let status: JobStatus = decode_protobuf(&value)?;
         let new_status = self.get_job_status_from_tasks(job_id, &executors).await?;
@@ -553,7 +621,12 @@ mod test {
             port: 123,
         };
         state.save_executor_metadata(meta.clone()).await?;
-        let result = state.get_executors_metadata().await?;
+        let result: Vec<_> = state
+            .get_executors_metadata()
+            .await?
+            .into_iter()
+            .map(|(meta, _)| meta)
+            .collect();
         assert_eq!(vec![meta], result);
         Ok(())
     }
diff --git a/ballista/rust/scheduler/src/state/standalone.rs b/ballista/rust/scheduler/src/state/standalone.rs
index 69805c016a10..8514d4cf3e64 100644
--- a/ballista/rust/scheduler/src/state/standalone.rs
+++ b/ballista/rust/scheduler/src/state/standalone.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::{sync::Arc, task::Poll, time::Duration};
+use std::{sync::Arc, task::Poll};
 
 use crate::state::ConfigBackendClient;
 use ballista_core::error::{ballista_error, BallistaError, Result};
@@ -89,13 +89,7 @@ impl ConfigBackendClient for StandaloneClient {
             .map_err(|e| ballista_error(&format!("sled error {:?}", e)))?)
     }
 
-    // TODO: support lease_time. See https://github.com/spacejam/sled/issues/1119 for how to approach this
-    async fn put(
-        &self,
-        key: String,
-        value: Vec<u8>,
-        _lease_time: Option<Duration>,
-    ) -> Result<()> {
+    async fn put(&self, key: String, value: Vec<u8>) -> Result<()> {
         self.db
             .insert(key, value)
             .map_err(|e| {
@@ -170,7 +164,7 @@ mod tests {
         let client = create_instance()?;
         let key = "key";
         let value = "value".as_bytes();
-        client.put(key.to_owned(), value.to_vec(), None).await?;
+        client.put(key.to_owned(), value.to_vec()).await?;
         assert_eq!(client.get(key).await?, value);
         Ok(())
     }
@@ -189,12 +183,8 @@ mod tests {
         let client = create_instance()?;
         let key = "key";
         let value = "value".as_bytes();
-        client
-            .put(format!("{}/1", key), value.to_vec(), None)
-            .await?;
-        client
-            .put(format!("{}/2", key), value.to_vec(), None)
-            .await?;
+        client.put(format!("{}/1", key), value.to_vec()).await?;
+        client.put(format!("{}/2", key), value.to_vec()).await?;
         assert_eq!(
             client.get_from_prefix(key).await?,
             vec![
@@ -211,13 +201,13 @@ mod tests {
         let key = "key";
         let value = "value".as_bytes();
         let mut watch: Box<dyn Watch> = client.watch(key.to_owned()).await?;
-        client.put(key.to_owned(), value.to_vec(), None).await?;
+        client.put(key.to_owned(), value.to_vec()).await?;
         assert_eq!(
             watch.next().await,
             Some(WatchEvent::Put(key.to_owned(), value.to_owned()))
         );
         let value2 = "value2".as_bytes();
-        client.put(key.to_owned(), value2.to_vec(), None).await?;
+        client.put(key.to_owned(), value2.to_vec()).await?;
         assert_eq!(
             watch.next().await,
             Some(WatchEvent::Put(key.to_owned(), value2.to_owned()))
diff --git a/dev/docker/ballista-base.dockerfile b/dev/docker/ballista-base.dockerfile
index 31620b38cf39..e977f5eeff75 100644
--- a/dev/docker/ballista-base.dockerfile
+++ b/dev/docker/ballista-base.dockerfile
@@ -23,7 +23,7 @@
 
 # Base image extends debian:buster-slim
 
-FROM rust:1.51.0-buster AS builder
+FROM rust:1.52.1-buster AS builder
 
 RUN apt update && apt -y install musl musl-dev musl-tools libssl-dev openssl
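
Note (not part of the patch): the core idea above is that executors no longer rely on etcd leases for liveness; instead each poll stores an ExecutorHeartbeat with a Unix-epoch timestamp in seconds, and the scheduler only assigns work referencing executors seen within a threshold (hard-coded to 60 seconds in assign_next_schedulable_task). The following is a minimal, self-contained Rust sketch of that filtering step; the Heartbeat struct and alive_executors function are illustrative names invented for this example, not Ballista APIs.

use std::time::{Duration, SystemTime, UNIX_EPOCH};

// Illustrative stand-in for the ExecutorHeartbeat protobuf message:
// an executor id plus the Unix-epoch timestamp (seconds) of its last poll.
struct Heartbeat {
    executor_id: String,
    timestamp_secs: u64,
}

// Return the ids of executors whose last heartbeat is newer than `threshold`,
// mirroring the filter performed by `get_alive_executors_metadata` above.
fn alive_executors(heartbeats: &[Heartbeat], threshold: Duration) -> Vec<String> {
    let now = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("Time went backwards");
    heartbeats
        .iter()
        .filter_map(|hb| {
            let seen = Duration::from_secs(hb.timestamp_secs);
            // Clamp to zero if the stored timestamp is somehow in the future.
            let since_last_seen = now.checked_sub(seen).unwrap_or_else(|| Duration::from_secs(0));
            (since_last_seen < threshold).then(|| hb.executor_id.clone())
        })
        .collect()
}

fn main() {
    let now_secs = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap()
        .as_secs();
    let heartbeats = vec![
        // Polled just now: considered alive.
        Heartbeat { executor_id: "exec-1".into(), timestamp_secs: now_secs },
        // Last seen five minutes ago: treated as dead and skipped for task assignment.
        Heartbeat { executor_id: "exec-2".into(), timestamp_secs: now_secs.saturating_sub(300) },
    ];
    // Same 60-second threshold the scheduler uses when assigning tasks.
    println!("{:?}", alive_executors(&heartbeats, Duration::from_secs(60)));
}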