Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add executor self-registration mechanism in the heartbeat service #649

Merged
merged 1 commit into from
Feb 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ballista/core/proto/ballista.proto
Original file line number Diff line number Diff line change
Expand Up @@ -480,6 +480,7 @@ message HeartBeatParams {
string executor_id = 1;
repeated ExecutorMetric metrics = 2;
ExecutorStatus status = 3;
ExecutorRegistration metadata = 4;
}

message HeartBeatResult {
Expand Down
2 changes: 2 additions & 0 deletions ballista/core/src/serde/generated/ballista.rs
Original file line number Diff line number Diff line change
Expand Up @@ -827,6 +827,8 @@ pub struct HeartBeatParams {
pub metrics: ::prost::alloc::vec::Vec<ExecutorMetric>,
#[prost(message, optional, tag = "3")]
pub status: ::core::option::Option<ExecutorStatus>,
#[prost(message, optional, tag = "4")]
pub metadata: ::core::option::Option<ExecutorRegistration>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
Expand Down
1 change: 1 addition & 0 deletions ballista/executor/src/executor_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> ExecutorServer<T,
status: Some(ExecutorStatus {
status: Some(executor_status::Status::Active("".to_string())),
}),
metadata: Some(self.executor.metadata.clone()),
};
let mut scheduler = self.scheduler_to_register.clone();
match scheduler
Expand Down
116 changes: 92 additions & 24 deletions ballista/scheduler/src/scheduler_server/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use ballista_core::serde::protobuf::{
HeartBeatResult, PollWorkParams, PollWorkResult, RegisterExecutorParams,
RegisterExecutorResult, UpdateTaskStatusParams, UpdateTaskStatusResult,
};
use ballista_core::serde::scheduler::{ExecutorData, ExecutorMetadata};
use ballista_core::serde::scheduler::ExecutorMetadata;

use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::file_format::FileFormat;
Expand Down Expand Up @@ -188,30 +188,8 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
grpc_port: metadata.grpc_port as u16,
specification: metadata.specification.unwrap().into(),
};
let executor_data = ExecutorData {
executor_id: metadata.id.clone(),
total_task_slots: metadata.specification.task_slots,
available_task_slots: metadata.specification.task_slots,
};

async {
// Save the executor to state
let reservations = self
.state
.executor_manager
.register_executor(metadata, executor_data, false)
.await?;

// If we are using push-based scheduling then reserve this executors slots and send
// them for scheduling tasks.
if self.state.config.is_push_staged_scheduling() {
self.offer_reservation(reservations).await?;
}

Ok::<(), ballista_core::error::BallistaError>(())
}
.await
.map_err(|e| {
self.do_register_executor(metadata).await.map_err(|e| {
let msg = format!("Fail to do executor registration due to: {e}");
error!("{}", msg);
Status::internal(msg)
Expand All @@ -228,12 +206,49 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
&self,
request: Request<HeartBeatParams>,
) -> Result<Response<HeartBeatResult>, Status> {
let remote_addr = request.remote_addr();
let HeartBeatParams {
executor_id,
metrics,
status,
metadata,
} = request.into_inner();
debug!("Received heart beat request for {:?}", executor_id);

// If not registered, do registration first before saving heart beat
if let Err(e) = self
.state
.executor_manager
.get_executor_metadata(&executor_id)
.await
{
warn!("Fail to get executor metadata: {}", e);
if let Some(metadata) = metadata {
let metadata = ExecutorMetadata {
id: metadata.id,
host: metadata
.optional_host
.map(|h| match h {
OptionalHost::Host(host) => host,
})
.unwrap_or_else(|| remote_addr.unwrap().ip().to_string()),
port: metadata.port as u16,
grpc_port: metadata.grpc_port as u16,
specification: metadata.specification.unwrap().into(),
};

self.do_register_executor(metadata).await.map_err(|e| {
let msg = format!("Fail to do executor registration due to: {e}");
error!("{}", msg);
Status::internal(msg)
})?;
} else {
return Err(Status::invalid_argument(format!(
"The registration spec for executor {executor_id} is not included"
)));
}
}

let executor_heartbeat = ExecutorHeartbeat {
executor_id,
timestamp: SystemTime::now()
Expand Down Expand Up @@ -766,6 +781,58 @@ mod test {
Ok(())
}

#[tokio::test]
async fn test_register_executor_in_heartbeat_service() -> Result<(), BallistaError> {
let state_storage = Arc::new(SledClient::try_new_temporary()?);
let cluster_state = Arc::new(DefaultClusterState::new(state_storage.clone()));
let mut scheduler: SchedulerServer<LogicalPlanNode, PhysicalPlanNode> =
SchedulerServer::new(
"localhost:50050".to_owned(),
state_storage,
cluster_state,
BallistaCodec::default(),
SchedulerConfig::default(),
default_metrics_collector().unwrap(),
);
scheduler.init().await?;

let exec_meta = ExecutorRegistration {
id: "abc".to_owned(),
optional_host: Some(OptionalHost::Host("http://localhost:8080".to_owned())),
port: 0,
grpc_port: 0,
specification: Some(ExecutorSpecification { task_slots: 2 }.into()),
};

let request: Request<HeartBeatParams> = Request::new(HeartBeatParams {
executor_id: exec_meta.id.clone(),
metrics: vec![],
status: Some(ExecutorStatus {
status: Some(executor_status::Status::Active("".to_string())),
}),
metadata: Some(exec_meta.clone()),
});
scheduler
.heart_beat_from_executor(request)
.await
.expect("Received error response");

let state = scheduler.state.clone();
// executor should be registered
let stored_executor = state
.executor_manager
.get_executor_metadata("abc")
.await
.expect("getting executor");

assert_eq!(stored_executor.grpc_port, 0);
assert_eq!(stored_executor.port, 0);
assert_eq!(stored_executor.specification.task_slots, 2);
assert_eq!(stored_executor.host, "http://localhost:8080".to_owned());

Ok(())
}

#[tokio::test]
#[ignore]
async fn test_expired_executor() -> Result<(), BallistaError> {
Expand Down Expand Up @@ -823,6 +890,7 @@ mod test {
status: Some(ExecutorStatus {
status: Some(executor_status::Status::Active("".to_string())),
}),
metadata: Some(exec_meta.clone()),
});

let _response = scheduler
Expand Down
24 changes: 24 additions & 0 deletions ballista/scheduler/src/scheduler_server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use datafusion_proto::physical_plan::AsExecutionPlan;

use crate::config::SchedulerConfig;
use crate::metrics::SchedulerMetricsCollector;
use ballista_core::serde::scheduler::{ExecutorData, ExecutorMetadata};
use log::{error, warn};

use crate::scheduler_server::event::QueryStageSchedulerEvent;
Expand Down Expand Up @@ -323,6 +324,29 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
.await?;
Ok(())
}

async fn do_register_executor(&self, metadata: ExecutorMetadata) -> Result<()> {
let executor_data = ExecutorData {
executor_id: metadata.id.clone(),
total_task_slots: metadata.specification.task_slots,
available_task_slots: metadata.specification.task_slots,
};

// Save the executor to state
let reservations = self
.state
.executor_manager
.register_executor(metadata, executor_data, false)
.await?;

// If we are using push-based scheduling then reserve this executors slots and send
// them for scheduling tasks.
if self.state.config.is_push_staged_scheduling() {
self.offer_reservation(reservations).await?;
}

Ok(())
}
}

pub fn timestamp_secs() -> u64 {
Expand Down
13 changes: 10 additions & 3 deletions ballista/scheduler/src/state/backend/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -355,9 +355,16 @@ impl ClusterState for DefaultClusterState {
) -> error::Result<ExecutorMetadata> {
let value = self.kv_store.get(Keyspace::Executors, executor_id).await?;

let decoded =
decode_into::<protobuf::ExecutorMetadata, ExecutorMetadata>(&value)?;
Ok(decoded)
// Throw error rather than panic if the executor metadata does not exist
if value.is_empty() {
Err(BallistaError::General(format!(
"The metadata of executor {executor_id} does not exist"
)))
} else {
let decoded =
decode_into::<protobuf::ExecutorMetadata, ExecutorMetadata>(&value)?;
Ok(decoded)
}
}

async fn save_executor_heartbeat(
Expand Down