From 0452b478ddd25abdcff0812271ebe6594ed3eb2e Mon Sep 17 00:00:00 2001
From: yangzhong
Date: Thu, 2 Feb 2023 15:00:29 +0800
Subject: [PATCH] Add executor self-registration mechanism in the heartbeat
 service

With this change an executor attaches its ExecutorRegistration to every
heartbeat. When the scheduler receives a heartbeat from an executor it does
not know about (for example after a scheduler restart), it registers the
executor on the fly instead of dropping the heartbeat; heartbeats from
unknown executors that carry no registration metadata are rejected with an
invalid-argument status.
---
 ballista/core/proto/ballista.proto            |   1 +
 ballista/core/src/serde/generated/ballista.rs |   2 +
 ballista/executor/src/executor_server.rs      |   1 +
 .../scheduler/src/scheduler_server/grpc.rs    | 116 ++++++++++++++----
 .../scheduler/src/scheduler_server/mod.rs     |  24 ++++
 .../scheduler/src/state/backend/cluster.rs    |  13 +-
 6 files changed, 130 insertions(+), 27 deletions(-)

diff --git a/ballista/core/proto/ballista.proto b/ballista/core/proto/ballista.proto
index ddf2c7af5..0f122b850 100644
--- a/ballista/core/proto/ballista.proto
+++ b/ballista/core/proto/ballista.proto
@@ -480,6 +480,7 @@ message HeartBeatParams {
   string executor_id = 1;
   repeated ExecutorMetric metrics = 2;
   ExecutorStatus status = 3;
+  ExecutorRegistration metadata = 4;
 }
 
 message HeartBeatResult {
diff --git a/ballista/core/src/serde/generated/ballista.rs b/ballista/core/src/serde/generated/ballista.rs
index 0c4de2e84..c23b43e5a 100644
--- a/ballista/core/src/serde/generated/ballista.rs
+++ b/ballista/core/src/serde/generated/ballista.rs
@@ -827,6 +827,8 @@ pub struct HeartBeatParams {
     pub metrics: ::prost::alloc::vec::Vec<ExecutorMetric>,
     #[prost(message, optional, tag = "3")]
     pub status: ::core::option::Option<ExecutorStatus>,
+    #[prost(message, optional, tag = "4")]
+    pub metadata: ::core::option::Option<ExecutorRegistration>,
 }
 #[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
diff --git a/ballista/executor/src/executor_server.rs b/ballista/executor/src/executor_server.rs
index 1bf5ac7b4..2372b6ca0 100644
--- a/ballista/executor/src/executor_server.rs
+++ b/ballista/executor/src/executor_server.rs
@@ -246,6 +246,7 @@ impl ExecutorServer
+                metadata: Some(self.executor.metadata.clone()),
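
Note: on the executor side this one-line hunk is the whole change: every
heartbeat now carries the executor's own ExecutorRegistration. A minimal
sketch of how such a payload is built (the helper name and the empty
metrics list are illustrative, not the literal executor_server.rs code):

    use ballista_core::serde::protobuf::{
        executor_status, ExecutorRegistration, ExecutorStatus, HeartBeatParams,
    };

    // Sketch: build a heartbeat carrying the registration payload so a
    // scheduler that has lost its state can re-register this executor.
    fn heartbeat_with_registration(reg: &ExecutorRegistration) -> HeartBeatParams {
        HeartBeatParams {
            executor_id: reg.id.clone(),
            // The real executor attaches its current metrics here.
            metrics: vec![],
            status: Some(ExecutorStatus {
                status: Some(executor_status::Status::Active("".to_string())),
            }),
            // New optional field (tag 4): executors that omit it stay
            // wire-compatible, they just cannot be auto-registered.
            metadata: Some(reg.clone()),
        }
    }
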
diff --git a/ballista/scheduler/src/scheduler_server/grpc.rs b/ballista/scheduler/src/scheduler_server/grpc.rs
--- a/ballista/scheduler/src/scheduler_server/grpc.rs
+++ b/ballista/scheduler/src/scheduler_server/grpc.rs
@@ -?,? +?,? @@ impl SchedulerGrpc for SchedulerServer
                 grpc_port: metadata.grpc_port as u16,
                 specification: metadata.specification.unwrap().into(),
             };
-            let executor_data = ExecutorData {
-                executor_id: metadata.id.clone(),
-                total_task_slots: metadata.specification.task_slots,
-                available_task_slots: metadata.specification.task_slots,
-            };
-            async {
-                // Save the executor to state
-                let reservations = self
-                    .state
-                    .executor_manager
-                    .register_executor(metadata, executor_data, false)
-                    .await?;
-
-                // If we are using push-based scheduling then reserve this executors slots and send
-                // them for scheduling tasks.
-                if self.state.config.is_push_staged_scheduling() {
-                    self.offer_reservation(reservations).await?;
-                }
-
-                Ok::<(), ballista_core::error::BallistaError>(())
-            }
-            .await
-            .map_err(|e| {
+            self.do_register_executor(metadata).await.map_err(|e| {
                 let msg = format!("Fail to do executor registration due to: {e}");
                 error!("{}", msg);
                 Status::internal(msg)
@@ -228,12 +206,49 @@ impl SchedulerGrpc for SchedulerServer
         &self,
         request: Request<HeartBeatParams>,
     ) -> Result<Response<HeartBeatResult>, Status> {
+        let remote_addr = request.remote_addr();
         let HeartBeatParams {
             executor_id,
             metrics,
             status,
+            metadata,
         } = request.into_inner();
         debug!("Received heart beat request for {:?}", executor_id);
+
+        // If the executor is not registered yet, register it before saving the heartbeat
+        if let Err(e) = self
+            .state
+            .executor_manager
+            .get_executor_metadata(&executor_id)
+            .await
+        {
+            warn!("Failed to get executor metadata: {}", e);
+            if let Some(metadata) = metadata {
+                let metadata = ExecutorMetadata {
+                    id: metadata.id,
+                    host: metadata
+                        .optional_host
+                        .map(|h| match h {
+                            OptionalHost::Host(host) => host,
+                        })
+                        .unwrap_or_else(|| remote_addr.unwrap().ip().to_string()),
+                    port: metadata.port as u16,
+                    grpc_port: metadata.grpc_port as u16,
+                    specification: metadata.specification.unwrap().into(),
+                };
+
+                self.do_register_executor(metadata).await.map_err(|e| {
+                    let msg = format!("Fail to do executor registration due to: {e}");
+                    error!("{}", msg);
+                    Status::internal(msg)
+                })?;
+            } else {
+                return Err(Status::invalid_argument(format!(
+                    "The registration spec for executor {executor_id} is not included"
+                )));
+            }
+        }
+
         let executor_heartbeat = ExecutorHeartbeat {
             executor_id,
             timestamp: SystemTime::now()
@@ -766,6 +781,58 @@ mod test {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn test_register_executor_in_heartbeat_service() -> Result<(), BallistaError> {
+        let state_storage = Arc::new(SledClient::try_new_temporary()?);
+        let cluster_state = Arc::new(DefaultClusterState::new(state_storage.clone()));
+        let mut scheduler: SchedulerServer<LogicalPlanNode, PhysicalPlanNode> =
+            SchedulerServer::new(
+                "localhost:50050".to_owned(),
+                state_storage,
+                cluster_state,
+                BallistaCodec::default(),
+                SchedulerConfig::default(),
+                default_metrics_collector().unwrap(),
+            );
+        scheduler.init().await?;
+
+        let exec_meta = ExecutorRegistration {
+            id: "abc".to_owned(),
+            optional_host: Some(OptionalHost::Host("http://localhost:8080".to_owned())),
+            port: 0,
+            grpc_port: 0,
+            specification: Some(ExecutorSpecification { task_slots: 2 }.into()),
+        };
+
+        let request: Request<HeartBeatParams> = Request::new(HeartBeatParams {
+            executor_id: exec_meta.id.clone(),
+            metrics: vec![],
+            status: Some(ExecutorStatus {
+                status: Some(executor_status::Status::Active("".to_string())),
+            }),
+            metadata: Some(exec_meta.clone()),
+        });
+        scheduler
+            .heart_beat_from_executor(request)
+            .await
+            .expect("Received error response");
+
+        let state = scheduler.state.clone();
+        // The executor should now be registered.
+        let stored_executor = state
+            .executor_manager
+            .get_executor_metadata("abc")
+            .await
+            .expect("getting executor");
+
+        assert_eq!(stored_executor.grpc_port, 0);
+        assert_eq!(stored_executor.port, 0);
+        assert_eq!(stored_executor.specification.task_slots, 2);
+        assert_eq!(stored_executor.host, "http://localhost:8080".to_owned());
+
+        Ok(())
+    }
+
     #[tokio::test]
     #[ignore]
     async fn test_expired_executor() -> Result<(), BallistaError> {
@@ -823,6 +890,7 @@ mod test {
             status: Some(ExecutorStatus {
                 status: Some(executor_status::Status::Active("".to_string())),
             }),
+            metadata: Some(exec_meta.clone()),
         });
 
         let _response = scheduler
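
Note: the net effect is that a scheduler restart no longer strands running
executors; their next heartbeat re-registers them. The handler's new
branching reduces to three cases, distilled below with the state lookups
replaced by plain values (the function name is illustrative, not part of
the patch):

    use ballista_core::serde::protobuf::ExecutorRegistration;
    use tonic::Status;

    // Returns the registration to apply before recording the heartbeat, if any.
    fn admit_heartbeat(
        already_registered: bool,
        metadata: Option<ExecutorRegistration>,
    ) -> Result<Option<ExecutorRegistration>, Status> {
        if already_registered {
            // Known executor: just record the heartbeat as before.
            return Ok(None);
        }
        match metadata {
            // Unknown executor that shipped its registration: register it
            // first, then record the heartbeat.
            Some(reg) => Ok(Some(reg)),
            // Unknown executor without a registration payload: reject, since
            // the scheduler cannot invent an executor specification.
            None => Err(Status::invalid_argument("registration spec not included")),
        }
    }
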
diff --git a/ballista/scheduler/src/scheduler_server/mod.rs b/ballista/scheduler/src/scheduler_server/mod.rs
index 96f463a5c..55b533d63 100644
--- a/ballista/scheduler/src/scheduler_server/mod.rs
+++ b/ballista/scheduler/src/scheduler_server/mod.rs
@@ -32,6 +32,7 @@ use datafusion_proto::physical_plan::AsExecutionPlan;
 
 use crate::config::SchedulerConfig;
 use crate::metrics::SchedulerMetricsCollector;
+use ballista_core::serde::scheduler::{ExecutorData, ExecutorMetadata};
 use log::{error, warn};
 
 use crate::scheduler_server::event::QueryStageSchedulerEvent;
@@ -323,6 +324,29 @@ impl SchedulerServer
+
+    pub(crate) async fn do_register_executor(&self, metadata: ExecutorMetadata) -> Result<()> {
+        let executor_data = ExecutorData {
+            executor_id: metadata.id.clone(),
+            total_task_slots: metadata.specification.task_slots,
+            available_task_slots: metadata.specification.task_slots,
+        };
+
+        // Save the executor to state
+        let reservations = self
+            .state
+            .executor_manager
+            .register_executor(metadata, executor_data, false)
+            .await?;
+
+        // If we are using push-based scheduling then reserve this executor's
+        // slots and send them for scheduling tasks.
+        if self.state.config.is_push_staged_scheduling() {
+            self.offer_reservation(reservations).await?;
+        }
+
+        Ok(())
+    }
 }
 
 pub fn timestamp_secs() -> u64 {
diff --git a/ballista/scheduler/src/state/backend/cluster.rs b/ballista/scheduler/src/state/backend/cluster.rs
index a727d69ec..7e02697d7 100644
--- a/ballista/scheduler/src/state/backend/cluster.rs
+++ b/ballista/scheduler/src/state/backend/cluster.rs
@@ -355,9 +355,16 @@ impl ClusterState for DefaultClusterState {
     ) -> error::Result<ExecutorMetadata> {
         let value = self.kv_store.get(Keyspace::Executors, executor_id).await?;
 
-        let decoded =
-            decode_into::<protobuf::ExecutorMetadata, ExecutorMetadata>(&value)?;
-        Ok(decoded)
+        // Return an error rather than panicking if the executor metadata does not exist
+        if value.is_empty() {
+            Err(BallistaError::General(format!(
+                "The metadata of executor {executor_id} does not exist"
+            )))
+        } else {
+            let decoded =
+                decode_into::<protobuf::ExecutorMetadata, ExecutorMetadata>(&value)?;
+            Ok(decoded)
+        }
     }
 
     async fn save_executor_heartbeat(
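
Note: the cluster.rs guard is needed because the underlying key/value get
returns an empty buffer, not an error, for a missing executor key, and
decoding empty bytes is not a protobuf error either: it yields a message
with every field defaulted, so the subsequent conversion presumably
panicked on the missing specification. A small demonstration of that
decoding behavior, using the ExecutorRegistration message from this patch
(the assertion logic is illustrative, not taken from the repository):

    use ballista_core::serde::protobuf::ExecutorRegistration;
    use prost::Message;

    fn main() {
        // Zero bytes decode "successfully" into an all-default message.
        let decoded = ExecutorRegistration::decode(&b""[..]).unwrap();
        assert!(decoded.specification.is_none());
        // Anything that later unwraps `specification` would panic, which is
        // why get_executor_metadata now returns BallistaError::General for
        // an empty value instead of decoding it.
    }
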