Add some resiliency to lost executors
edrevo committed Jun 16, 2021
1 parent e3e7e29 commit a4855ac
Showing 8 changed files with 145 additions and 141 deletions.
10 changes: 4 additions & 6 deletions ballista/rust/core/proto/ballista.proto
@@ -610,10 +610,10 @@ message ExecutorRegistration {
   uint32 port = 3;
 }
 
-message GetExecutorMetadataParams {}
-
-message GetExecutorMetadataResult {
-  repeated ExecutorMetadata metadata = 1;
+message ExecutorHeartbeat {
+  ExecutorMetadata meta = 1;
+  // Unix epoch-based timestamp in seconds
+  uint64 timestamp = 2;
 }
 
 message RunningTask {
@@ -712,8 +712,6 @@ message FilePartitionMetadata {
 }
 
 service SchedulerGrpc {
-  rpc GetExecutorsMetadata (GetExecutorMetadataParams) returns (GetExecutorMetadataResult) {}
-
   // Executors must poll the scheduler for heartbeat and to receive tasks
   rpc PollWork (PollWorkParams) returns (PollWorkResult) {}
 
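The new ExecutorHeartbeat message replaces the pull-style GetExecutorsMetadata RPC: executors now report liveness through the scheduler's PollWork endpoint, and the scheduler can judge executor health from the age of the last heartbeat. Below is a minimal sketch of such a staleness check; the EXECUTOR_TIMEOUT_SECONDS threshold, the alive_executors helper, and the simplified heartbeat struct are illustrative assumptions, not part of this commit (the real message carries full ExecutorMetadata).

use std::time::{SystemTime, UNIX_EPOCH};

// Assumed liveness threshold; not part of this commit.
const EXECUTOR_TIMEOUT_SECONDS: u64 = 60;

// Simplified stand-in for the ExecutorHeartbeat proto message above.
struct ExecutorHeartbeat {
    executor_id: String,
    // Unix epoch-based timestamp in seconds, as in the proto field.
    timestamp: u64,
}

// Keep only executors whose last heartbeat is recent enough.
fn alive_executors(heartbeats: &[ExecutorHeartbeat]) -> Vec<&ExecutorHeartbeat> {
    let now = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system clock set before the Unix epoch")
        .as_secs();
    heartbeats
        .iter()
        .filter(|hb| now.saturating_sub(hb.timestamp) <= EXECUTOR_TIMEOUT_SECONDS)
        .collect()
}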
11 changes: 8 additions & 3 deletions ballista/rust/executor/src/execution_loop.rs
@@ -90,10 +90,14 @@ async fn run_received_tasks(
     task_status_sender: Sender<TaskStatus>,
     task: TaskDefinition,
 ) {
-    info!("Received task {:?}", task.task_id.as_ref().unwrap());
+    let task_id = task.task_id.unwrap();
+    let task_id_log = format!(
+        "{}/{}/{}",
+        task_id.job_id, task_id.stage_id, task_id.partition_id
+    );
+    info!("Received task {}", task_id_log);
     available_tasks_slots.fetch_sub(1, Ordering::SeqCst);
     let plan: Arc<dyn ExecutionPlan> = (&task.plan.unwrap()).try_into().unwrap();
-    let task_id = task.task_id.unwrap();
 
     tokio::spawn(async move {
         let execution_result = executor
@@ -104,7 +108,8 @@
                 plan,
             )
             .await;
-        info!("DONE WITH TASK: {:?}", execution_result);
+        info!("Done with task {}", task_id_log);
+        debug!("Statistics: {:?}", execution_result);
         available_tasks_slots.fetch_add(1, Ordering::SeqCst);
         let _ = task_status_sender.send(as_task_status(
             execution_result.map(|_| ()),
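The executor's logging now identifies tasks by a compact job/stage/partition string instead of dumping the protobuf struct. A tiny self-contained illustration of the resulting format, with made-up ids:

// Hypothetical ids, showing the "<job_id>/<stage_id>/<partition_id>" shape
// the executor now logs for each received task.
fn main() {
    let (job_id, stage_id, partition_id) = ("2Yoz1YfC", 1, 7);
    let task_id_log = format!("{}/{}/{}", job_id, stage_id, partition_id);
    assert_eq!(task_id_log, "2Yoz1YfC/1/7");
    println!("Received task {}", task_id_log);
}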
39 changes: 13 additions & 26 deletions ballista/rust/scheduler/src/api/handlers.rs
@@ -11,45 +11,32 @@
 // limitations under the License.
 
 use crate::SchedulerServer;
-use ballista_core::serde::protobuf::{
-    scheduler_grpc_server::SchedulerGrpc, ExecutorMetadata, GetExecutorMetadataParams,
-    GetExecutorMetadataResult,
-};
-use ballista_core::serde::scheduler::ExecutorMeta;
-use tonic::{Request, Response};
+use ballista_core::{serde::scheduler::ExecutorMeta, BALLISTA_VERSION};
 use warp::Rejection;
 
 #[derive(Debug, serde::Serialize)]
 struct StateResponse {
     executors: Vec<ExecutorMeta>,
     started: u128,
-    version: String,
+    version: &'static str,
 }
 
 pub(crate) async fn scheduler_state(
     data_server: SchedulerServer,
 ) -> Result<impl warp::Reply, Rejection> {
-    let data: Result<Response<GetExecutorMetadataResult>, tonic::Status> = data_server
-        .get_executors_metadata(Request::new(GetExecutorMetadataParams {}))
-        .await;
-    let metadata: Vec<ExecutorMeta> = match data {
-        Ok(result) => {
-            let res: &GetExecutorMetadataResult = result.get_ref();
-            let vec: &Vec<ExecutorMetadata> = &res.metadata;
-            vec.iter()
-                .map(|v: &ExecutorMetadata| ExecutorMeta {
-                    host: v.host.clone(),
-                    port: v.port as u16,
-                    id: v.id.clone(),
-                })
-                .collect()
-        }
-        Err(_) => vec![],
-    };
+    // TODO: Display last seen information in UI
+    let executors: Vec<ExecutorMeta> = data_server
+        .state
+        .get_executors_metadata()
+        .await
+        .unwrap_or_default()
+        .into_iter()
+        .map(|(metadata, _duration)| metadata)
+        .collect();
     let response = StateResponse {
-        executors: metadata,
+        executors,
         started: data_server.start_time,
-        version: data_server.version.clone(),
+        version: BALLISTA_VERSION,
     };
     Ok(warp::reply::json(&response))
 }
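The (metadata, duration) pairs returned by get_executors_metadata already carry how long ago each executor was last seen; the handler currently discards the duration. A hedged sketch of how the TODO above might surface it, where the ExecutorState wrapper and its field names are assumptions (ExecutorMeta's id/host/port fields match the code removed above):

use std::time::Duration;

use ballista_core::serde::scheduler::ExecutorMeta;

// Hypothetical response item that keeps the last-seen age for the UI.
#[derive(Debug, serde::Serialize)]
struct ExecutorState {
    id: String,
    host: String,
    port: u16,
    last_seen_secs: u64,
}

fn with_last_seen(executors: Vec<(ExecutorMeta, Duration)>) -> Vec<ExecutorState> {
    executors
        .into_iter()
        .map(|(meta, since_last_seen)| ExecutorState {
            id: meta.id,
            host: meta.host,
            port: meta.port,
            last_seen_secs: since_last_seen.as_secs(),
        })
        .collect()
}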
42 changes: 5 additions & 37 deletions ballista/rust/scheduler/src/lib.rs
@@ -30,10 +30,10 @@ use std::{fmt, net::IpAddr};
 use ballista_core::serde::protobuf::{
     execute_query_params::Query, executor_registration::OptionalHost, job_status,
     scheduler_grpc_server::SchedulerGrpc, ExecuteQueryParams, ExecuteQueryResult,
-    FailedJob, FilePartitionMetadata, FileType, GetExecutorMetadataParams,
-    GetExecutorMetadataResult, GetFileMetadataParams, GetFileMetadataResult,
-    GetJobStatusParams, GetJobStatusResult, JobStatus, PartitionId, PollWorkParams,
-    PollWorkResult, QueuedJob, RunningJob, TaskDefinition, TaskStatus,
+    FailedJob, FilePartitionMetadata, FileType, GetFileMetadataParams,
+    GetFileMetadataResult, GetJobStatusParams, GetJobStatusResult, JobStatus,
+    PartitionId, PollWorkParams, PollWorkResult, QueuedJob, RunningJob, TaskDefinition,
+    TaskStatus,
 };
 use ballista_core::serde::scheduler::ExecutorMeta;
 
@@ -72,9 +72,8 @@ use std::time::{Instant, SystemTime, UNIX_EPOCH};
 #[derive(Clone)]
 pub struct SchedulerServer {
     caller_ip: IpAddr,
-    state: Arc<SchedulerState>,
+    pub(crate) state: Arc<SchedulerState>,
     start_time: u128,
-    version: String,
 }
 
 impl SchedulerServer {
@@ -83,7 +82,6 @@ impl SchedulerServer {
         namespace: String,
         caller_ip: IpAddr,
     ) -> Self {
-        const VERSION: Option<&'static str> = option_env!("CARGO_PKG_VERSION");
         let state = Arc::new(SchedulerState::new(config, namespace));
         let state_clone = state.clone();
 
@@ -97,35 +95,12 @@
                 .duration_since(UNIX_EPOCH)
                 .unwrap()
                 .as_millis(),
-            version: VERSION.unwrap_or("Unknown").to_string(),
         }
     }
 }
 
 #[tonic::async_trait]
 impl SchedulerGrpc for SchedulerServer {
-    async fn get_executors_metadata(
-        &self,
-        _request: Request<GetExecutorMetadataParams>,
-    ) -> std::result::Result<Response<GetExecutorMetadataResult>, tonic::Status> {
-        info!("Received get_executors_metadata request");
-        let result = self
-            .state
-            .get_executors_metadata()
-            .await
-            .map_err(|e| {
-                let msg = format!("Error reading executors metadata: {}", e);
-                error!("{}", msg);
-                tonic::Status::internal(msg)
-            })?
-            .into_iter()
-            .map(|meta| meta.into())
-            .collect();
-        Ok(Response::new(GetExecutorMetadataResult {
-            metadata: result,
-        }))
-    }
-
     async fn poll_work(
         &self,
         request: Request<PollWorkParams>,
@@ -275,13 +250,6 @@ impl SchedulerGrpc for SchedulerServer {
             }
         };
         debug!("Received plan for execution: {:?}", plan);
-        let executors = self.state.get_executors_metadata().await.map_err(|e| {
-            let msg = format!("Error reading executors metadata: {}", e);
-            error!("{}", msg);
-            tonic::Status::internal(msg)
-        })?;
-        debug!("Found executors: {:?}", executors);
-
         let job_id: String = {
             let mut rng = thread_rng();
             std::iter::repeat(())
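SchedulerServer no longer stores its own version string; both the gRPC server and the REST handler now read the shared BALLISTA_VERSION constant from ballista_core. A plausible definition mirroring the option_env! pattern removed above — the exact form and location inside ballista_core are assumptions:

// Assumed definition; a const match keeps the "Unknown" fallback available
// at compile time without any per-instance String allocation.
pub const BALLISTA_VERSION: &str = match option_env!("CARGO_PKG_VERSION") {
    Some(version) => version,
    None => "Unknown",
};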
27 changes: 5 additions & 22 deletions ballista/rust/scheduler/src/state/etcd.rs
@@ -17,14 +17,12 @@
 
 //! Etcd config backend.
 
-use std::{task::Poll, time::Duration};
+use std::task::Poll;
 
 use crate::state::ConfigBackendClient;
 use ballista_core::error::{ballista_error, Result};
 
-use etcd_client::{
-    GetOptions, LockResponse, PutOptions, WatchOptions, WatchStream, Watcher,
-};
+use etcd_client::{GetOptions, LockResponse, WatchOptions, WatchStream, Watcher};
 use futures::{Stream, StreamExt};
 use log::warn;
 
@@ -70,25 +68,9 @@ impl ConfigBackendClient for EtcdClient {
             .collect())
     }
 
-    async fn put(
-        &self,
-        key: String,
-        value: Vec<u8>,
-        lease_time: Option<Duration>,
-    ) -> Result<()> {
+    async fn put(&self, key: String, value: Vec<u8>) -> Result<()> {
         let mut etcd = self.etcd.clone();
-        let put_options = if let Some(lease_time) = lease_time {
-            etcd.lease_grant(lease_time.as_secs() as i64, None)
-                .await
-                .map(|lease| Some(PutOptions::new().with_lease(lease.id())))
-                .map_err(|e| {
-                    warn!("etcd lease grant failed: {:?}", e.to_string());
-                    ballista_error("etcd lease grant failed")
-                })?
-        } else {
-            None
-        };
-        etcd.put(key.clone(), value.clone(), put_options)
+        etcd.put(key.clone(), value.clone(), None)
             .await
             .map_err(|e| {
                 warn!("etcd put failed: {}", e);
@@ -99,6 +81,7 @@
 
     async fn lock(&self) -> Result<Box<dyn Lock>> {
         let mut etcd = self.etcd.clone();
+        // TODO: make this a namespaced-lock
        let lock = etcd
            .lock("/ballista_global_lock", None)
            .await
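With the lease_time parameter gone, keys written through put no longer expire on their own; lost executors are instead detected from the heartbeat timestamps stored in scheduler state. A minimal usage sketch under that design, with an assumed endpoint and key layout (Client::connect and put are real etcd_client calls):

use etcd_client::Client;

#[tokio::main]
async fn main() -> Result<(), etcd_client::Error> {
    // Assumed endpoint; adjust for a real deployment.
    let mut client = Client::connect(["localhost:2379"], None).await?;
    // Every write is now unconditional: no PutOptions with a lease id,
    // so the entry persists until it is overwritten or deleted.
    client
        .put("/ballista/executors/exec-1", b"heartbeat-bytes".to_vec(), None)
        .await?;
    Ok(())
}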