Support for multi-scheduler deployments (#59)
* Initial design and implementation

* ExecutorManager tests

* Only consider alive executors

* Use correct session ID provided in request

* Fix bug in etcd key scan

* Debugging

* Drop for EtcdLock

* Better locking

* Debug for ExecutionGraph

* Fix partition accounting in ExecutionGraph

* Fix input partition accounting

* Handle stages with multiple inputs better

* Simplify output buffer

* Cleanup

* Cleanup

* Linting

* Linting and docs

* Job queueing and general cleanup

* Handle job queueing and failure

* Tests

* Fix doc comments

* Tests

* Add license header

* Fix graph complete logic

* Fix bug in partition mapping

* Eagerly offer pending tasks

* Tests for event loop

* Merge upstream

* Fix compiler error after rebase

* Clippy fix

* Merge pull request #4 from coralogix/scheduler-fix

Scheduler fixes

* Use correct bind address for executor registration

* Use correct keyspace when initing heartbeats

* Fix after cherry-pick bugfixes

* Fix conflicts after merge

Co-authored-by: Martins Purins <[email protected]>
thinkharderdev and mpurins-coralogix authored Jul 16, 2022
1 parent 56ec4df commit a2c794e
Showing 27 changed files with 4,439 additions and 3,087 deletions.
35 changes: 35 additions & 0 deletions ballista/rust/core/proto/ballista.proto
@@ -408,6 +408,37 @@ enum JoinSide{
///////////////////////////////////////////////////////////////////////////////////////////////////
// Ballista Scheduling
///////////////////////////////////////////////////////////////////////////////////////////////////
message TaskInputPartitions {
uint32 partition = 1;
repeated PartitionLocation partition_location = 2;
}

message GraphStageInput {
uint32 stage_id = 1;
repeated TaskInputPartitions partition_locations = 2;
bool complete = 3;
}


message ExecutionGraphStage {
uint64 stage_id = 1;
uint32 partitions = 2;
PhysicalHashRepartition output_partitioning = 3;
repeated GraphStageInput inputs = 4;
bytes plan = 5;
repeated TaskStatus task_statuses = 6;
uint32 output_link = 7;
bool resolved = 8;
}

message ExecutionGraph {
string job_id = 1;
string session_id = 2;
JobStatus status = 3;
repeated ExecutionGraphStage stages = 4;
uint64 output_partitions = 5;
repeated PartitionLocation output_locations = 6;
}

message KeyValuePair {
string key = 1;
@@ -581,6 +612,10 @@ message TaskDefinition {
repeated KeyValuePair props = 5;
}

message SessionSettings {
repeated KeyValuePair configs = 1;
}

message JobSessionConfig {
string session_id = 1;
repeated KeyValuePair configs = 2;
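
The messages above define how a job's execution state is persisted so that any scheduler instance can pick it up. As a hedged illustration, the sketch below builds and serializes the prost-generated ExecutionGraph type; the module path mirrors the ballista_core::serde::protobuf imports elsewhere in this commit, but the helper function is hypothetical rather than code from this change.

// Hypothetical sketch: construct and serialize the `ExecutionGraph` message
// defined above. Bytes like these would be written under a job's key in the
// shared state backend (e.g. etcd) so another scheduler instance can reload it.
use ballista_core::serde::protobuf::{ExecutionGraph, ExecutionGraphStage};
use prost::Message;

fn serialize_graph(
    job_id: &str,
    session_id: &str,
    stages: Vec<ExecutionGraphStage>,
) -> Vec<u8> {
    let graph = ExecutionGraph {
        job_id: job_id.to_owned(),
        session_id: session_id.to_owned(),
        // `JobStatus` is a message-typed field, so prost maps it to an Option.
        status: None,
        stages,
        output_partitions: 0,
        output_locations: vec![],
    };
    graph.encode_to_vec()
}
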
2 changes: 1 addition & 1 deletion ballista/rust/core/src/config.rs
@@ -92,7 +92,7 @@ impl BallistaConfigBuilder {
}

/// Ballista configuration
#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq)]
pub struct BallistaConfig {
/// Settings stored in map for easy serde
settings: HashMap<String, String>,
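
Deriving PartialEq makes BallistaConfig values comparable, which is handy when a scheduler needs to check whether an incoming session's settings match a cached session. The helper below is a hypothetical illustration of that use, not code from this commit.

use ballista_core::config::BallistaConfig;

// Hypothetical helper enabled by the new `PartialEq` derive: detect whether
// an incoming session configuration differs from the cached one.
fn config_changed(cached: &BallistaConfig, incoming: &BallistaConfig) -> bool {
    cached != incoming
}
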
16 changes: 9 additions & 7 deletions ballista/rust/executor/src/execution_loop.rs
@@ -15,21 +15,23 @@
// specific language governing permissions and limitations
// under the License.

use crate::as_task_status;
use crate::executor::Executor;
use ballista_core::error::BallistaError;
use ballista_core::serde::physical_plan::from_proto::parse_protobuf_hash_partitioning;
use datafusion::physical_plan::ExecutionPlan;

use ballista_core::serde::protobuf::{
scheduler_grpc_client::SchedulerGrpcClient, PollWorkParams, PollWorkResult,
TaskDefinition, TaskStatus,
};

use crate::as_task_status;
use crate::executor::Executor;
use ballista_core::error::BallistaError;
use ballista_core::serde::physical_plan::from_proto::parse_protobuf_hash_partitioning;
use ballista_core::serde::scheduler::ExecutorSpecification;
use ballista_core::serde::{AsExecutionPlan, BallistaCodec};
use datafusion::execution::context::TaskContext;
use datafusion::physical_plan::ExecutionPlan;
use datafusion_proto::logical_plan::AsLogicalPlan;
use futures::FutureExt;
use log::{debug, error, info, warn};
use log::{debug, error, info, trace, warn};
use std::any::Any;
use std::collections::HashMap;
use std::error::Error;
@@ -57,7 +59,7 @@ pub async fn poll_loop<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
std::sync::mpsc::channel::<TaskStatus>();

loop {
debug!("Starting registration loop with scheduler");
trace!("Starting registration loop with scheduler");

let task_status: Vec<TaskStatus> =
sample_tasks_status(&mut task_status_receiver).await;
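
For context on the surrounding poll loop: running tasks report their status through a std::sync::mpsc channel, and each poll forwards whatever has accumulated. The diff does not show sample_tasks_status itself, so the sketch below is an assumption about how such sampling could be done without blocking the loop.

use std::sync::mpsc::{Receiver, TryRecvError};

// Assumed sketch of non-blocking sampling: keep calling `try_recv` until the
// channel is empty, collecting whatever statuses arrived since the last poll.
// The real `sample_tasks_status` may differ, e.g. by capping the batch size.
fn drain_statuses<T>(receiver: &Receiver<T>) -> Vec<T> {
    let mut statuses = Vec::new();
    loop {
        match receiver.try_recv() {
            Ok(status) => statuses.push(status),
            Err(TryRecvError::Empty) | Err(TryRecvError::Disconnected) => break,
        }
    }
    statuses
}
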
2 changes: 1 addition & 1 deletion ballista/rust/executor/src/executor_server.rs
@@ -71,7 +71,7 @@ pub async fn startup<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>(
.map(|h| match h {
OptionalHost::Host(host) => host,
})
.unwrap_or_else(|| String::from("127.0.0.1")),
.unwrap_or_else(|| String::from("0.0.0.0")),
executor_meta.grpc_port
);
let addr = addr.parse().unwrap();
3 changes: 2 additions & 1 deletion ballista/rust/scheduler/src/api/handlers.rs
@@ -37,7 +37,8 @@ pub(crate) async fn scheduler_state<T: AsLogicalPlan, U: AsExecutionPlan>(
// TODO: Display last seen information in UI
let executors: Vec<ExecutorMetaResponse> = data_server
.state
.get_executors_metadata()
.executor_manager
.get_executor_state()
.await
.unwrap_or_default()
.into_iter()
2 changes: 1 addition & 1 deletion ballista/rust/scheduler/src/main.rs
@@ -168,7 +168,7 @@ async fn main() -> Result<()> {
let etcd = etcd_client::Client::connect(&[opt.etcd_urls], None)
.await
.context("Could not connect to etcd")?;
Arc::new(EtcdClient::new(etcd))
Arc::new(EtcdClient::new(namespace.clone(), etcd))
}
#[cfg(not(feature = "etcd"))]
StateBackend::Etcd => {
188 changes: 87 additions & 101 deletions ballista/rust/scheduler/src/planner.rs
@@ -33,8 +33,7 @@ use datafusion::physical_plan::windows::WindowAggExec;
use datafusion::physical_plan::{
with_new_children_if_necessary, ExecutionPlan, Partitioning,
};
use futures::future::BoxFuture;
use futures::FutureExt;

use log::info;

type PartialQueryStageResult = (Arc<dyn ExecutionPlan>, Vec<Arc<ShuffleWriterExec>>);
@@ -59,15 +58,14 @@ impl DistributedPlanner {
/// Returns a vector of ExecutionPlans, where the root node is a [ShuffleWriterExec].
/// Plans that depend on the input of other plans will have leaf nodes of type [UnresolvedShuffleExec].
/// A [ShuffleWriterExec] is created whenever the partitioning changes.
pub async fn plan_query_stages<'a>(
pub fn plan_query_stages<'a>(
&'a mut self,
job_id: &'a str,
execution_plan: Arc<dyn ExecutionPlan>,
) -> Result<Vec<Arc<ShuffleWriterExec>>> {
info!("planning query stages");
let (new_plan, mut stages) = self
.plan_query_stages_internal(job_id, execution_plan)
.await?;
let (new_plan, mut stages) =
self.plan_query_stages_internal(job_id, execution_plan)?;
stages.push(create_shuffle_writer(
job_id,
self.next_stage_id(),
@@ -84,97 +82,91 @@ impl DistributedPlanner {
&'a mut self,
job_id: &'a str,
execution_plan: Arc<dyn ExecutionPlan>,
) -> BoxFuture<'a, Result<PartialQueryStageResult>> {
async move {
// recurse down and replace children
if execution_plan.children().is_empty() {
return Ok((execution_plan, vec![]));
}
) -> Result<PartialQueryStageResult> {
// async move {
// recurse down and replace children
if execution_plan.children().is_empty() {
return Ok((execution_plan, vec![]));
}

let mut stages = vec![];
let mut children = vec![];
for child in execution_plan.children() {
let (new_child, mut child_stages) = self
.plan_query_stages_internal(job_id, child.clone())
.await?;
children.push(new_child);
stages.append(&mut child_stages);
}
let mut stages = vec![];
let mut children = vec![];
for child in execution_plan.children() {
let (new_child, mut child_stages) =
self.plan_query_stages_internal(job_id, child.clone())?;
children.push(new_child);
stages.append(&mut child_stages);
}

if let Some(_coalesce) = execution_plan
.as_any()
.downcast_ref::<CoalescePartitionsExec>()
{
let shuffle_writer = create_shuffle_writer(
job_id,
self.next_stage_id(),
children[0].clone(),
None,
)?;
let unresolved_shuffle = Arc::new(UnresolvedShuffleExec::new(
shuffle_writer.stage_id(),
shuffle_writer.schema(),
shuffle_writer.output_partitioning().partition_count(),
shuffle_writer
.shuffle_output_partitioning()
.map(|p| p.partition_count())
.unwrap_or_else(|| {
shuffle_writer.output_partitioning().partition_count()
}),
));
stages.push(shuffle_writer);
Ok((
with_new_children_if_necessary(
execution_plan,
vec![unresolved_shuffle],
)?,
stages,
))
} else if let Some(repart) =
execution_plan.as_any().downcast_ref::<RepartitionExec>()
{
match repart.output_partitioning() {
Partitioning::Hash(_, _) => {
let shuffle_writer = create_shuffle_writer(
job_id,
self.next_stage_id(),
children[0].clone(),
Some(repart.partitioning().to_owned()),
)?;
let unresolved_shuffle = Arc::new(UnresolvedShuffleExec::new(
shuffle_writer.stage_id(),
shuffle_writer.schema(),
shuffle_writer.output_partitioning().partition_count(),
shuffle_writer
.shuffle_output_partitioning()
.map(|p| p.partition_count())
.unwrap_or_else(|| {
shuffle_writer.output_partitioning().partition_count()
}),
));
stages.push(shuffle_writer);
Ok((unresolved_shuffle, stages))
}
_ => {
// remove any non-hash repartition from the distributed plan
Ok((children[0].clone(), stages))
}
if let Some(_coalesce) = execution_plan
.as_any()
.downcast_ref::<CoalescePartitionsExec>()
{
let shuffle_writer = create_shuffle_writer(
job_id,
self.next_stage_id(),
children[0].clone(),
None,
)?;
let unresolved_shuffle = Arc::new(UnresolvedShuffleExec::new(
shuffle_writer.stage_id(),
shuffle_writer.schema(),
shuffle_writer.output_partitioning().partition_count(),
shuffle_writer
.shuffle_output_partitioning()
.map(|p| p.partition_count())
.unwrap_or_else(|| {
shuffle_writer.output_partitioning().partition_count()
}),
));
stages.push(shuffle_writer);
Ok((
with_new_children_if_necessary(execution_plan, vec![unresolved_shuffle])?,
stages,
))
} else if let Some(repart) =
execution_plan.as_any().downcast_ref::<RepartitionExec>()
{
match repart.output_partitioning() {
Partitioning::Hash(_, _) => {
let shuffle_writer = create_shuffle_writer(
job_id,
self.next_stage_id(),
children[0].clone(),
Some(repart.partitioning().to_owned()),
)?;
let unresolved_shuffle = Arc::new(UnresolvedShuffleExec::new(
shuffle_writer.stage_id(),
shuffle_writer.schema(),
shuffle_writer.output_partitioning().partition_count(),
shuffle_writer
.shuffle_output_partitioning()
.map(|p| p.partition_count())
.unwrap_or_else(|| {
shuffle_writer.output_partitioning().partition_count()
}),
));
stages.push(shuffle_writer);
Ok((unresolved_shuffle, stages))
}
_ => {
// remove any non-hash repartition from the distributed plan
Ok((children[0].clone(), stages))
}
} else if let Some(window) =
execution_plan.as_any().downcast_ref::<WindowAggExec>()
{
Err(BallistaError::NotImplemented(format!(
"WindowAggExec with window {:?}",
window
)))
} else {
Ok((
with_new_children_if_necessary(execution_plan, children)?,
stages,
))
}
} else if let Some(window) =
execution_plan.as_any().downcast_ref::<WindowAggExec>()
{
Err(BallistaError::NotImplemented(format!(
"WindowAggExec with window {:?}",
window
)))
} else {
Ok((
with_new_children_if_necessary(execution_plan, children)?,
stages,
))
}
.boxed()
}

/// Generate a new stage ID
@@ -318,9 +310,7 @@ mod test {

let mut planner = DistributedPlanner::new();
let job_uuid = Uuid::new_v4();
let stages = planner
.plan_query_stages(&job_uuid.to_string(), plan)
.await?;
let stages = planner.plan_query_stages(&job_uuid.to_string(), plan)?;
for stage in &stages {
println!("{}", displayable(stage.as_ref()).indent());
}
@@ -432,9 +422,7 @@ order by

let mut planner = DistributedPlanner::new();
let job_uuid = Uuid::new_v4();
let stages = planner
.plan_query_stages(&job_uuid.to_string(), plan)
.await?;
let stages = planner.plan_query_stages(&job_uuid.to_string(), plan)?;
for stage in &stages {
println!("{}", displayable(stage.as_ref()).indent());
}
@@ -580,9 +568,7 @@ order by

let mut planner = DistributedPlanner::new();
let job_uuid = Uuid::new_v4();
let stages = planner
.plan_query_stages(&job_uuid.to_string(), plan)
.await?;
let stages = planner.plan_query_stages(&job_uuid.to_string(), plan)?;

let partial_hash = stages[0].children()[0].clone();
let partial_hash_serde = roundtrip_operator(partial_hash.clone())?;
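
The planner refactor above drops the BoxFuture-based recursion: the only await inside plan_query_stages_internal was its own recursive call, so the function can recurse directly and callers lose the .await, as the updated tests show. The generic example below (not Ballista code) illustrates why the async version needed boxing in the first place.

use futures::future::{BoxFuture, FutureExt};

// A recursive async fn would have an infinitely-sized future, so the
// recursive call must go through a boxed, type-erased future, as the old
// planner code did with `.boxed()`.
fn depth_async(n: u32) -> BoxFuture<'static, u32> {
    async move {
        if n == 0 {
            0
        } else {
            1 + depth_async(n - 1).await
        }
    }
    .boxed()
}

// The synchronous equivalent recurses directly: no boxing, and no `.await`
// at the call site.
fn depth_sync(n: u32) -> u32 {
    if n == 0 {
        0
    } else {
        1 + depth_sync(n - 1)
    }
}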