Remove redundant fields in ExecutorManager #728

Merged · 6 commits · Mar 30, 2023
8 changes: 4 additions & 4 deletions ballista/scheduler/scheduler_config_spec.toml
@@ -96,10 +96,10 @@ default = "3600"
doc = "Delayed interval for cleaning up finished job state. Default: 3600"

[[param]]
name = "executor_slots_policy"
type = "ballista_scheduler::config::SlotsPolicy"
doc = "The executor slots policy for the scheduler, possible values: bias, round-robin, round-robin-local. Default: bias"
default = "ballista_scheduler::config::SlotsPolicy::Bias"
name = "task_distribution"
type = "ballista_scheduler::config::TaskDistribution"
doc = "The policy of distributing tasks to available executor slots, possible values: bias, round-robin. Default: bias"
default = "ballista_scheduler::config::TaskDistribution::Bias"

[[param]]
name = "plugin_dir"
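For orientation, the new task_distribution option maps the documented string values (bias, round-robin) onto an enum in ballista_scheduler::config. The snippet below is only a minimal sketch of what such an enum and its string parsing could look like, assuming a FromStr-style conversion; the actual definition in the crate may differ.

```rust
use std::str::FromStr;

/// Illustrative stand-in for `ballista_scheduler::config::TaskDistribution`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TaskDistribution {
    /// Fill one executor's slots before moving on to the next.
    Bias,
    /// Spread tasks evenly across executors.
    RoundRobin,
}

impl FromStr for TaskDistribution {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "bias" => Ok(TaskDistribution::Bias),
            "round-robin" => Ok(TaskDistribution::RoundRobin),
            other => Err(format!("unknown task distribution policy: {other}")),
        }
    }
}
```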
2 changes: 1 addition & 1 deletion ballista/scheduler/src/bin/main.rs
@@ -109,7 +109,7 @@ async fn main() -> Result<()> {
bind_port: opt.bind_port,
scheduling_policy: opt.scheduler_policy,
event_loop_buffer_size: opt.event_loop_buffer_size,
executor_slots_policy: opt.executor_slots_policy,
task_distribution: opt.task_distribution,
finished_job_data_clean_up_interval_seconds: opt
.finished_job_data_clean_up_interval_seconds,
finished_job_state_clean_up_interval_seconds: opt
164 changes: 104 additions & 60 deletions ballista/scheduler/src/cluster/kv.rs
@@ -20,7 +20,7 @@ use crate::cluster::{
reserve_slots_bias, reserve_slots_round_robin, ClusterState, ExecutorHeartbeatStream,
JobState, JobStateEvent, JobStateEventStream, JobStatus, TaskDistribution,
};
use crate::scheduler_server::SessionBuilder;
use crate::scheduler_server::{timestamp_secs, SessionBuilder};
use crate::state::execution_graph::ExecutionGraph;
use crate::state::executor_manager::ExecutorReservation;
use crate::state::session_manager::create_datafusion_context;
@@ -42,12 +42,11 @@ use datafusion_proto::physical_plan::AsExecutionPlan;
use datafusion_proto::protobuf::{LogicalPlanNode, PhysicalPlanNode};
use futures::StreamExt;
use itertools::Itertools;
use log::warn;
use log::{info, warn};
use prost::Message;
use std::collections::{HashMap, HashSet};
use std::future::Future;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};

/// State implementation based on underlying `KeyValueStore`
pub struct KeyValueState<
@@ -57,6 +56,8 @@ pub struct KeyValueState<
> {
/// Underlying `KeyValueStore`
store: S,
/// ExecutorHeartbeat cache, executor_id -> ExecutorHeartbeat
executor_heartbeats: Arc<DashMap<String, ExecutorHeartbeat>>,
Contributor:
I think we also need to cache ExecutorMetadata and ExecutorData here to preserve the functionality previously provided by ExecutorManager.

Contributor Author:
ExecutorMetadata can be cached. However, I don't think ExecutorData should be cached, since it changes frequently and it's hard to keep it consistent with the copy in backend storage when there are multiple active schedulers.

Actually, I don't think it's necessary to introduce multiple active schedulers in a Ballista cluster; maybe only HA is needed. If so, we can cache ExecutorData with the executors' available slots to avoid frequent visits to the backend storage.

Contributor:
Ah, yeah, I don't think ExecutorData is used at all anymore. We use ExecutorTaskSlots to store the available slots for reservation purposes, so I think we should be fine just caching ExecutorMetadata.

> Actually, I don't think it's necessary to introduce multiple active schedulers in the Ballista cluster.

We run multiple active schedulers. The scheduler does a non-trivial amount of work, and it's important to us to be able to scale that layer horizontally.

Contributor Author (@yahoNanJing, Mar 30, 2023):
Regarding horizontal scaling: I actually haven't hit any scheduler bottlenecks since the optimizations introduced previously; maybe a single scheduler is enough.

Contributor:
We have :). The problem is mostly the cost of plan serialization. We frequently see execution plans with 50k+ files; the plan is very large, and serializing it causes a lot of CPU and memory overhead.

Aside from that, zero-downtime deployments are much easier with multiple active schedulers. It's a solvable problem using leader election, but for our use case it was preferable to just run multiple active schedulers and solve both the deployment and the scalability issue.

Contributor Author (@yahoNanJing, Mar 30, 2023):
I think the plan is also cached, so we only need to do the serialization once per query. Do you still see the problem?

The reason I don't prefer multiple active schedulers is the contention for execution slots, which leads to inefficient slot updates and makes future consistent-hashing-based task assignment infeasible.

Contributor Author (@yahoNanJing, Mar 30, 2023):
If multiple active schedulers are required, I think your current ExecutorReservation design is reasonable. Since consistent-hashing-based task assignment may only be possible with a single active scheduler (no slot contention), we'll have to use different policies for different scheduler deployments.
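A rough sketch of the metadata-caching idea discussed in this thread, assuming a DashMap keyed by executor_id like the heartbeat cache added in this PR; the ExecutorMetadata fields, the cache type, and the write-through behaviour shown here are illustrative assumptions, not code from this change.

```rust
use dashmap::DashMap;
use std::sync::Arc;

/// Illustrative metadata type; the real one lives in ballista_core.
#[derive(Clone)]
pub struct ExecutorMetadata {
    pub id: String,
    pub host: String,
    pub port: u16,
}

/// Hypothetical in-memory cache mirroring the heartbeat cache in this PR.
pub struct ExecutorMetadataCache {
    metadata: Arc<DashMap<String, ExecutorMetadata>>,
}

impl ExecutorMetadataCache {
    pub fn new() -> Self {
        Self {
            metadata: Arc::new(DashMap::new()),
        }
    }

    /// Write-through: callers would persist to the KeyValueStore first,
    /// then refresh the cache so reads avoid a round trip to storage.
    pub fn insert(&self, meta: ExecutorMetadata) {
        self.metadata.insert(meta.id.clone(), meta);
    }

    pub fn get(&self, executor_id: &str) -> Option<ExecutorMetadata> {
        self.metadata.get(executor_id).map(|r| r.value().clone())
    }

    pub fn remove(&self, executor_id: &str) {
        self.metadata.remove(executor_id);
    }
}
```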

/// Codec used to serialize/deserialize execution plan
codec: BallistaCodec<T, U>,
/// Name of current scheduler. Should be `{host}:{port}`
@@ -79,18 +80,95 @@ impl<S: KeyValueStore, T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
) -> Self {
Self {
store,
executor_heartbeats: Arc::new(DashMap::new()),
scheduler: scheduler.into(),
codec,
queued_jobs: DashMap::new(),
session_builder,
}
}

/// Initialize the set of active executor heartbeats from storage
async fn init_active_executor_heartbeats(&self) -> Result<()> {
let heartbeats = self.store.scan(Keyspace::Heartbeats, None).await?;

for (_, value) in heartbeats {
let data: ExecutorHeartbeat = decode_protobuf(&value)?;
if let Some(protobuf::ExecutorStatus {
status: Some(protobuf::executor_status::Status::Active(_)),
}) = &data.status
{
self.executor_heartbeats
.insert(data.executor_id.clone(), data);
}
}

Ok(())
}

/// Return the stream of executor heartbeats observed by all schedulers in the cluster.
/// This can be aggregated to provide an eventually consistent view of all executors within the cluster
async fn executor_heartbeat_stream(&self) -> Result<ExecutorHeartbeatStream> {
let events = self
.store
.watch(Keyspace::Heartbeats, String::default())
.await?;

Ok(events
.filter_map(|event| {
futures::future::ready(match event {
WatchEvent::Put(_, value) => {
if let Ok(heartbeat) =
decode_protobuf::<ExecutorHeartbeat>(&value)
{
Some(heartbeat)
} else {
None
}
}
WatchEvent::Delete(_) => None,
})
})
.boxed())
}
}
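The init method in the ClusterState impl below consumes this stream in a spawned task and keeps the executor_heartbeats map in sync. Here is a self-contained sketch of that stream-to-cache pattern; the Heartbeat and Status types are simplified stand-ins for the Ballista protobuf messages, and an in-memory stream replaces the key-value store watch.

```rust
use dashmap::DashMap;
use futures::{stream, StreamExt};
use std::sync::Arc;

/// Simplified stand-ins for the protobuf ExecutorHeartbeat / ExecutorStatus types.
#[derive(Clone, Debug)]
enum Status {
    Active,
    Dead,
}

#[derive(Clone, Debug)]
struct Heartbeat {
    executor_id: String,
    status: Status,
}

#[tokio::main]
async fn main() {
    // Shared cache: executor_id -> latest heartbeat.
    let heartbeats: Arc<DashMap<String, Heartbeat>> = Arc::new(DashMap::new());

    // In the scheduler this stream would come from watching the Heartbeats keyspace.
    let mut events = stream::iter(vec![
        Heartbeat { executor_id: "exec-1".into(), status: Status::Active },
        Heartbeat { executor_id: "exec-1".into(), status: Status::Dead },
    ])
    .boxed();

    let cache = heartbeats.clone();
    let listener = tokio::spawn(async move {
        while let Some(hb) = events.next().await {
            match hb.status {
                // A Dead heartbeat acts as a tombstone: drop the executor from the cache.
                Status::Dead => {
                    cache.remove(&hb.executor_id);
                }
                // Any other status refreshes the cached entry.
                _ => {
                    cache.insert(hb.executor_id.clone(), hb);
                }
            }
        }
    });

    listener.await.unwrap();
    assert!(heartbeats.get("exec-1").is_none());
}
```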

#[async_trait]
impl<S: KeyValueStore, T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
ClusterState for KeyValueState<S, T, U>
{
/// Initialize a background process that will listen for executor heartbeats and update the in-memory cache
/// of executor heartbeats
async fn init(&self) -> Result<()> {
self.init_active_executor_heartbeats().await?;

let mut heartbeat_stream = self.executor_heartbeat_stream().await?;

info!("Initializing heartbeat listener");

let heartbeats = self.executor_heartbeats.clone();
tokio::task::spawn(async move {
while let Some(heartbeat) = heartbeat_stream.next().await {
let executor_id = heartbeat.executor_id.clone();

match heartbeat
.status
.as_ref()
.and_then(|status| status.status.as_ref())
{
Some(protobuf::executor_status::Status::Dead(_)) => {
heartbeats.remove(&executor_id);
}
_ => {
heartbeats.insert(executor_id, heartbeat);
}
}
}
});

Ok(())
}

async fn reserve_slots(
&self,
num_slots: u32,
@@ -240,22 +318,17 @@ impl<S: KeyValueStore, T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
) -> Result<Vec<ExecutorReservation>> {
let executor_id = metadata.id.clone();

let current_ts = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map_err(|e| {
BallistaError::Internal(format!("Error getting current timestamp: {e:?}"))
})?
.as_secs();

//TODO this should be in a transaction
// Now that we know we can connect, save the metadata and slots
self.save_executor_metadata(metadata).await?;
self.save_executor_heartbeat(ExecutorHeartbeat {
executor_id: executor_id.clone(),
timestamp: current_ts,
timestamp: timestamp_secs(),
metrics: vec![],
status: Some(protobuf::ExecutorStatus {
status: Some(protobuf::executor_status::Status::Active("".to_string())),
status: Some(
protobuf::executor_status::Status::Active(String::default()),
),
}),
})
.await?;
@@ -359,21 +432,20 @@ impl<S: KeyValueStore, T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
async fn save_executor_heartbeat(&self, heartbeat: ExecutorHeartbeat) -> Result<()> {
let executor_id = heartbeat.executor_id.clone();
self.store
.put(Keyspace::Heartbeats, executor_id, heartbeat.encode_to_vec())
.await
.put(
Keyspace::Heartbeats,
executor_id.clone(),
heartbeat.clone().encode_to_vec(),
)
.await?;
self.executor_heartbeats.insert(executor_id, heartbeat);
Ok(())
}

async fn remove_executor(&self, executor_id: &str) -> Result<()> {
let current_ts = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map_err(|e| {
BallistaError::Internal(format!("Error getting current timestamp: {e:?}"))
})?
.as_secs();

let value = ExecutorHeartbeat {
executor_id: executor_id.to_owned(),
timestamp: current_ts,
timestamp: timestamp_secs(),
metrics: vec![],
status: Some(protobuf::ExecutorStatus {
status: Some(protobuf::executor_status::Status::Dead("".to_string())),
@@ -384,52 +456,24 @@ impl<S: KeyValueStore, T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
self.store
.put(Keyspace::Heartbeats, executor_id.to_owned(), value)
.await?;
self.executor_heartbeats.remove(executor_id);

// TODO Check the Executor reservation logic for push-based scheduling

Ok(())
}

async fn executor_heartbeat_stream(&self) -> Result<ExecutorHeartbeatStream> {
let events = self
.store
.watch(Keyspace::Heartbeats, String::default())
.await?;

Ok(events
.filter_map(|event| {
futures::future::ready(match event {
WatchEvent::Put(_, value) => {
if let Ok(heartbeat) =
decode_protobuf::<ExecutorHeartbeat>(&value)
{
Some(heartbeat)
} else {
None
}
}
WatchEvent::Delete(_) => None,
})
})
.boxed())
fn executor_heartbeats(&self) -> HashMap<String, ExecutorHeartbeat> {
self.executor_heartbeats
.iter()
.map(|r| (r.key().clone(), r.value().clone()))
.collect()
}

async fn executor_heartbeats(&self) -> Result<HashMap<String, ExecutorHeartbeat>> {
let heartbeats = self.store.scan(Keyspace::Heartbeats, None).await?;

let mut heartbeat_map = HashMap::with_capacity(heartbeats.len());

for (_, value) in heartbeats {
let data: ExecutorHeartbeat = decode_protobuf(&value)?;
if let Some(protobuf::ExecutorStatus {
status: Some(protobuf::executor_status::Status::Active(_)),
}) = &data.status
{
heartbeat_map.insert(data.executor_id.clone(), data);
}
}

Ok(heartbeat_map)
fn get_executor_heartbeat(&self, executor_id: &str) -> Option<ExecutorHeartbeat> {
self.executor_heartbeats
.get(executor_id)
.map(|r| r.value().clone())
}
}
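With heartbeats now served from the in-memory cache, a caller such as ExecutorManager can make cheap liveness decisions without touching the backend store. The sketch below assumes a hypothetical is_alive helper and a 60-second freshness window; neither is part of this PR, and the timestamp helper simply mirrors what scheduler_server::timestamp_secs presumably does.

```rust
use std::time::{SystemTime, UNIX_EPOCH};

/// Seconds since the Unix epoch, standing in for scheduler_server::timestamp_secs.
fn timestamp_secs() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|d| d.as_secs())
        .unwrap_or(0)
}

/// Hypothetical liveness check built on the cached heartbeat timestamp.
fn is_alive(last_heartbeat_ts: u64, now: u64) -> bool {
    // Treat an executor as alive if it has reported within the last 60 seconds.
    now.saturating_sub(last_heartbeat_ts) < 60
}

fn main() {
    let now = timestamp_secs();
    // In the scheduler this timestamp would come from something like
    // cluster_state.get_executor_heartbeat("executor-1").map(|hb| hb.timestamp).
    let last_heartbeat_ts = now - 5;
    assert!(is_alive(last_heartbeat_ts, now));
}
```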
