Skip to content

Commit

Permalink
Add executor self-registration mechanism in the heartbeat service
Browse files Browse the repository at this point in the history
  • Loading branch information
kyotoYaho committed Feb 2, 2023
1 parent 1b0be75 commit 0452b47
Show file tree
Hide file tree
Showing 6 changed files with 130 additions and 27 deletions.
1 change: 1 addition & 0 deletions ballista/core/proto/ballista.proto
Original file line number Diff line number Diff line change
Expand Up @@ -480,6 +480,7 @@ message HeartBeatParams {
string executor_id = 1;
repeated ExecutorMetric metrics = 2;
ExecutorStatus status = 3;
ExecutorRegistration metadata = 4;
}

message HeartBeatResult {
Expand Down
2 changes: 2 additions & 0 deletions ballista/core/src/serde/generated/ballista.rs
Original file line number Diff line number Diff line change
Expand Up @@ -827,6 +827,8 @@ pub struct HeartBeatParams {
pub metrics: ::prost::alloc::vec::Vec<ExecutorMetric>,
#[prost(message, optional, tag = "3")]
pub status: ::core::option::Option<ExecutorStatus>,
#[prost(message, optional, tag = "4")]
pub metadata: ::core::option::Option<ExecutorRegistration>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
Expand Down
1 change: 1 addition & 0 deletions ballista/executor/src/executor_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> ExecutorServer<T,
status: Some(ExecutorStatus {
status: Some(executor_status::Status::Active("".to_string())),
}),
metadata: Some(self.executor.metadata.clone()),
};
let mut scheduler = self.scheduler_to_register.clone();
match scheduler
Expand Down
116 changes: 92 additions & 24 deletions ballista/scheduler/src/scheduler_server/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use ballista_core::serde::protobuf::{
HeartBeatResult, PollWorkParams, PollWorkResult, RegisterExecutorParams,
RegisterExecutorResult, UpdateTaskStatusParams, UpdateTaskStatusResult,
};
use ballista_core::serde::scheduler::{ExecutorData, ExecutorMetadata};
use ballista_core::serde::scheduler::ExecutorMetadata;

use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::file_format::FileFormat;
Expand Down Expand Up @@ -188,30 +188,8 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
grpc_port: metadata.grpc_port as u16,
specification: metadata.specification.unwrap().into(),
};
let executor_data = ExecutorData {
executor_id: metadata.id.clone(),
total_task_slots: metadata.specification.task_slots,
available_task_slots: metadata.specification.task_slots,
};

async {
// Save the executor to state
let reservations = self
.state
.executor_manager
.register_executor(metadata, executor_data, false)
.await?;

// If we are using push-based scheduling then reserve this executor's slots and send
// them for scheduling tasks.
if self.state.config.is_push_staged_scheduling() {
self.offer_reservation(reservations).await?;
}

Ok::<(), ballista_core::error::BallistaError>(())
}
.await
.map_err(|e| {
self.do_register_executor(metadata).await.map_err(|e| {
let msg = format!("Fail to do executor registration due to: {e}");
error!("{}", msg);
Status::internal(msg)
Expand All @@ -228,12 +206,49 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
&self,
request: Request<HeartBeatParams>,
) -> Result<Response<HeartBeatResult>, Status> {
let remote_addr = request.remote_addr();
let HeartBeatParams {
executor_id,
metrics,
status,
metadata,
} = request.into_inner();
debug!("Received heart beat request for {:?}", executor_id);

// If not registered, do registration first before saving heart beat
if let Err(e) = self
.state
.executor_manager
.get_executor_metadata(&executor_id)
.await
{
warn!("Fail to get executor metadata: {}", e);
if let Some(metadata) = metadata {
let metadata = ExecutorMetadata {
id: metadata.id,
host: metadata
.optional_host
.map(|h| match h {
OptionalHost::Host(host) => host,
})
.unwrap_or_else(|| remote_addr.unwrap().ip().to_string()),
port: metadata.port as u16,
grpc_port: metadata.grpc_port as u16,
specification: metadata.specification.unwrap().into(),
};

self.do_register_executor(metadata).await.map_err(|e| {
let msg = format!("Fail to do executor registration due to: {e}");
error!("{}", msg);
Status::internal(msg)
})?;
} else {
return Err(Status::invalid_argument(format!(
"The registration spec for executor {executor_id} is not included"
)));
}
}

let executor_heartbeat = ExecutorHeartbeat {
executor_id,
timestamp: SystemTime::now()
Expand Down Expand Up @@ -766,6 +781,58 @@ mod test {
Ok(())
}

// A heartbeat sent by an executor that has not been registered yet, but which
// carries its registration metadata, should leave that executor registered.
#[tokio::test]
async fn test_register_executor_in_heartbeat_service() -> Result<(), BallistaError> {
    // Scheduler backed by a temporary sled store.
    let state_storage = Arc::new(SledClient::try_new_temporary()?);
    let cluster_state = Arc::new(DefaultClusterState::new(state_storage.clone()));
    let mut scheduler = SchedulerServer::<LogicalPlanNode, PhysicalPlanNode>::new(
        "localhost:50050".to_owned(),
        state_storage,
        cluster_state,
        BallistaCodec::default(),
        SchedulerConfig::default(),
        default_metrics_collector().unwrap(),
    );
    scheduler.init().await?;

    // Registration info that travels inside the heartbeat itself.
    let registration = ExecutorRegistration {
        id: "abc".to_owned(),
        optional_host: Some(OptionalHost::Host("http://localhost:8080".to_owned())),
        port: 0,
        grpc_port: 0,
        specification: Some(ExecutorSpecification { task_slots: 2 }.into()),
    };

    let active_status = ExecutorStatus {
        status: Some(executor_status::Status::Active("".to_string())),
    };
    let heartbeat = HeartBeatParams {
        executor_id: registration.id.clone(),
        metrics: vec![],
        status: Some(active_status),
        metadata: Some(registration.clone()),
    };

    // No explicit register call is made before the heartbeat.
    let request: Request<HeartBeatParams> = Request::new(heartbeat);
    scheduler
        .heart_beat_from_executor(request)
        .await
        .expect("Received error response");

    // The executor should now be retrievable from the scheduler state.
    let state = scheduler.state.clone();
    let registered = state
        .executor_manager
        .get_executor_metadata("abc")
        .await
        .expect("getting executor");

    assert_eq!(registered.grpc_port, 0);
    assert_eq!(registered.port, 0);
    assert_eq!(registered.specification.task_slots, 2);
    assert_eq!(registered.host, "http://localhost:8080".to_owned());

    Ok(())
}

#[tokio::test]
#[ignore]
async fn test_expired_executor() -> Result<(), BallistaError> {
Expand Down Expand Up @@ -823,6 +890,7 @@ mod test {
status: Some(ExecutorStatus {
status: Some(executor_status::Status::Active("".to_string())),
}),
metadata: Some(exec_meta.clone()),
});

let _response = scheduler
Expand Down
24 changes: 24 additions & 0 deletions ballista/scheduler/src/scheduler_server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use datafusion_proto::physical_plan::AsExecutionPlan;

use crate::config::SchedulerConfig;
use crate::metrics::SchedulerMetricsCollector;
use ballista_core::serde::scheduler::{ExecutorData, ExecutorMetadata};
use log::{error, warn};

use crate::scheduler_server::event::QueryStageSchedulerEvent;
Expand Down Expand Up @@ -323,6 +324,29 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
.await?;
Ok(())
}

/// Registers an executor with the scheduler's executor manager.
///
/// Builds the initial `ExecutorData` from the executor's specification, with
/// every task slot marked as available, and saves it together with `metadata`.
/// When push-based staged scheduling is enabled, the reservations returned by
/// the registration are then offered for scheduling.
///
/// # Errors
///
/// Propagates any error returned by `register_executor` or `offer_reservation`.
async fn do_register_executor(&self, metadata: ExecutorMetadata) -> Result<()> {
    // A newly registered executor starts with all of its task slots free.
    let task_slots = metadata.specification.task_slots;
    let executor_data = ExecutorData {
        executor_id: metadata.id.clone(),
        total_task_slots: task_slots,
        available_task_slots: task_slots,
    };

    // Save the executor to the scheduler state.
    let reservations = self
        .state
        .executor_manager
        .register_executor(metadata, executor_data, false)
        .await?;

    // Outside push-based scheduling there is nothing more to do.
    if !self.state.config.is_push_staged_scheduling() {
        return Ok(());
    }

    // Hand the reserved slots over so tasks can be scheduled on them.
    self.offer_reservation(reservations).await?;

    Ok(())
}
}

pub fn timestamp_secs() -> u64 {
Expand Down
13 changes: 10 additions & 3 deletions ballista/scheduler/src/state/backend/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -355,9 +355,16 @@ impl ClusterState for DefaultClusterState {
) -> error::Result<ExecutorMetadata> {
let value = self.kv_store.get(Keyspace::Executors, executor_id).await?;

let decoded =
decode_into::<protobuf::ExecutorMetadata, ExecutorMetadata>(&value)?;
Ok(decoded)
// Throw error rather than panic if the executor metadata does not exist
if value.is_empty() {
Err(BallistaError::General(format!(
"The metadata of executor {executor_id} does not exist"
)))
} else {
let decoded =
decode_into::<protobuf::ExecutorMetadata, ExecutorMetadata>(&value)?;
Ok(decoded)
}
}

async fn save_executor_heartbeat(
Expand Down

0 comments on commit 0452b47

Please sign in to comment.