
Support for multi-scheduler deployments #59

Merged: 39 commits, Jul 16, 2022
Changes from 26 commits

Commits (39)
3e54405
Initial design and implementation
thinkharderdev Jun 1, 2022
607233a
ExecutorManager tests
thinkharderdev Jun 2, 2022
dc70bf2
Only consider alive executors
thinkharderdev Jun 3, 2022
c97b531
Use correct session ID provided in request
thinkharderdev Jun 3, 2022
11a6be7
Fix bug in etcd key scan
thinkharderdev Jun 3, 2022
8d8ef30
Debugging
thinkharderdev Jun 3, 2022
632e996
Drop for EtcdLock
thinkharderdev Jun 3, 2022
2f39bcf
Better locking
thinkharderdev Jun 3, 2022
5fe25c7
Debug for ExecutionGraph
thinkharderdev Jun 3, 2022
780d301
Fix partition accounting in ExecutionGraph
thinkharderdev Jun 4, 2022
830ed24
Fix input partition accounting
thinkharderdev Jun 4, 2022
d2812ea
Handle stages with multiple inputs better
thinkharderdev Jun 4, 2022
3f3e90b
Simplify output buffer
thinkharderdev Jun 5, 2022
7958086
Cleanup
thinkharderdev Jun 5, 2022
ecb995c
Cleanup
thinkharderdev Jun 5, 2022
2eef3b9
Merge remote-tracking branch 'upstream/master' into high-availability…
thinkharderdev Jun 5, 2022
fcdf2ad
Linting
thinkharderdev Jun 5, 2022
2f48ff5
Linting and docs
thinkharderdev Jun 6, 2022
30db7f5
Job queueing and general cleanup
thinkharderdev Jun 6, 2022
92e7cd2
Handle job queueing and failure
thinkharderdev Jun 6, 2022
80ef1be
Tests
thinkharderdev Jun 6, 2022
18b9fd2
Fix doc comments
thinkharderdev Jun 6, 2022
30ba6d9
Tests
thinkharderdev Jun 6, 2022
b35733b
Add license header
thinkharderdev Jun 6, 2022
dba6d1b
Fix graph complete logic
thinkharderdev Jun 6, 2022
2017ea3
Fix bug in partition mapping
thinkharderdev Jun 6, 2022
e3e6add
Eagerly offer pending tasks
thinkharderdev Jun 13, 2022
c496f97
Tests for event loop
thinkharderdev Jun 13, 2022
cd33317
Merge remote-tracking branch 'upstream/master' into high-availability…
thinkharderdev Jun 14, 2022
3f6ad7b
Merge upstream
thinkharderdev Jun 14, 2022
f8e3cff
Merge remote-tracking branch 'upstream/master' into high-availability…
thinkharderdev Jul 4, 2022
ab89d3b
Fix compiler error after rebase
thinkharderdev Jul 4, 2022
995c846
Clippy fix
thinkharderdev Jul 6, 2022
6130fd9
Merge pull request #4 from coralogix/scheduler-fix
thinkharderdev Jul 12, 2022
0e8905a
Use correct bind address for executor registration
thinkharderdev Jul 12, 2022
799f10b
Use correct keyspace when initing heartbeats
mpurins-coralogix Jul 13, 2022
3782fbf
Fix after cherry-pick bugfixes
thinkharderdev Jul 16, 2022
b4d8d83
Merge remote-tracking branch 'upstream/master' into high-availability…
thinkharderdev Jul 16, 2022
0323b85
Fix conflicts after merge
thinkharderdev Jul 16, 2022
35 changes: 35 additions & 0 deletions ballista/rust/core/proto/ballista.proto
@@ -622,6 +622,37 @@ enum JoinSide{
///////////////////////////////////////////////////////////////////////////////////////////////////
// Ballista Scheduling
///////////////////////////////////////////////////////////////////////////////////////////////////
message TaskInputPartitions {
uint32 partition = 1;
repeated PartitionLocation partition_location = 2;
}

message GraphStageInput {
uint32 stage_id = 1;
repeated TaskInputPartitions partition_locations = 2;
bool complete = 3;
}


message ExecutionGraphStage {
uint64 stage_id = 1;
uint32 partitions = 2;
PhysicalHashRepartition output_partitioning = 3;
repeated GraphStageInput inputs = 4;
bytes plan = 5;
repeated TaskStatus task_statuses = 6;
uint32 output_link = 7;
bool resolved = 8;
}

message ExecutionGraph {
string job_id = 1;
string session_id = 2;
JobStatus status = 3;
repeated ExecutionGraphStage stages = 4;
Member: It isn't clear to me how these stages form a DAG. Are the dependencies between stages stored elsewhere?

Contributor (author): No, sorry, I should have explained that. I'll add better docs to this struct, but for now each stage has an `output_link: Option<usize>` which specifies where it sends its output. If `output_link` is `None`, the stage is final and sends its output to the `ExecutionGraph`'s `output_locations`. Likewise, each stage has an `inputs: HashMap<usize, StageOutput>` which "collects" input locations from its input stages.

Member: Got it. Yes, some comments in the structs here would be great.

uint64 output_partitions = 5;
repeated PartitionLocation output_locations = 6;
}
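
A minimal Rust sketch of the stage linkage described in the review thread above: `output_link` supplies the DAG edges and `inputs` accumulates partition locations from upstream stages. The type and field shapes here are simplified and partly hypothetical; the PR's real structs carry more fields (serialized plan, task statuses, partition counts).

```rust
// Simplified, illustrative shapes only; not the PR's actual definitions.
#![allow(dead_code)]
use std::collections::HashMap;

/// Stand-in for the real `PartitionLocation` type.
#[derive(Clone, Debug)]
struct PartitionLocation;

/// Output collected from one upstream stage, keyed by input partition.
#[derive(Default, Debug)]
struct StageOutput {
    partition_locations: HashMap<usize, Vec<PartitionLocation>>,
    complete: bool,
}

#[derive(Debug)]
struct Stage {
    stage_id: usize,
    /// Stage that consumes this stage's output. `None` marks the final
    /// stage, whose output goes to the graph's `output_locations`.
    output_link: Option<usize>,
    /// Inputs gathered so far, keyed by the producing (upstream) stage id.
    inputs: HashMap<usize, StageOutput>,
}

#[derive(Debug)]
struct Graph {
    stages: Vec<Stage>,
    output_locations: Vec<PartitionLocation>,
}

fn main() {
    // Stage 1 shuffles into stage 2; stage 2 is the final stage.
    let graph = Graph {
        stages: vec![
            Stage { stage_id: 1, output_link: Some(2), inputs: HashMap::new() },
            Stage { stage_id: 2, output_link: None, inputs: HashMap::new() },
        ],
        output_locations: Vec::new(),
    };
    // The DAG edges are exactly the `output_link` values.
    for s in &graph.stages {
        println!("stage {} -> {:?}", s.stage_id, s.output_link);
    }
}
```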

message KeyValuePair {
string key = 1;
@@ -795,6 +826,10 @@ message TaskDefinition {
repeated KeyValuePair props = 5;
}

message SessionSettings {
repeated KeyValuePair configs = 1;
}

message JobSessionConfig {
string session_id = 1;
repeated KeyValuePair configs = 2;
2 changes: 1 addition & 1 deletion ballista/rust/core/src/config.rs
@@ -92,7 +92,7 @@ impl BallistaConfigBuilder {
}

/// Ballista configuration
#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq)]
pub struct BallistaConfig {
/// Settings stored in map for easy serde
settings: HashMap<String, String>,
29 changes: 16 additions & 13 deletions ballista/rust/executor/src/execution_loop.rs
@@ -15,26 +15,29 @@
// specific language governing permissions and limitations
// under the License.

use crate::as_task_status;
use crate::executor::Executor;
use ballista_core::error::BallistaError;
use ballista_core::serde::physical_plan::from_proto::parse_protobuf_hash_partitioning;
use std::collections::HashMap;
use std::ops::Deref;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::{Receiver, Sender, TryRecvError};
use std::{sync::Arc, time::Duration};

use datafusion::physical_plan::ExecutionPlan;
use log::{debug, error, info, trace, warn};
use tonic::transport::Channel;

use ballista_core::serde::protobuf::{
scheduler_grpc_client::SchedulerGrpcClient, PollWorkParams, PollWorkResult,
TaskDefinition, TaskStatus,
};

use crate::as_task_status;
use crate::executor::Executor;
use ballista_core::error::BallistaError;
use ballista_core::serde::physical_plan::from_proto::parse_protobuf_hash_partitioning;
use ballista_core::serde::scheduler::ExecutorSpecification;
use ballista_core::serde::{AsExecutionPlan, BallistaCodec};
use datafusion::execution::context::TaskContext;
use datafusion::physical_plan::ExecutionPlan;
use datafusion_proto::logical_plan::AsLogicalPlan;
use log::{debug, error, info, warn};
use std::collections::HashMap;
use std::ops::Deref;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::{Receiver, Sender, TryRecvError};
use std::{sync::Arc, time::Duration};
use tonic::transport::Channel;

pub async fn poll_loop<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>(
mut scheduler: SchedulerGrpcClient<Channel>,
@@ -54,7 +57,7 @@ pub async fn poll_loop<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
std::sync::mpsc::channel::<TaskStatus>();

loop {
debug!("Starting registration loop with scheduler");
trace!("Starting registration loop with scheduler");

let task_status: Vec<TaskStatus> =
sample_tasks_status(&mut task_status_receiver).await;
3 changes: 2 additions & 1 deletion ballista/rust/scheduler/src/api/handlers.rs
@@ -37,7 +37,8 @@ pub(crate) async fn scheduler_state<T: AsLogicalPlan, U: AsExecutionPlan>(
// TODO: Display last seen information in UI
let executors: Vec<ExecutorMetaResponse> = data_server
.state
.get_executors_metadata()
.executor_manager
.get_executor_state()
.await
.unwrap_or_default()
.into_iter()
2 changes: 1 addition & 1 deletion ballista/rust/scheduler/src/main.rs
@@ -168,7 +168,7 @@ async fn main() -> Result<()> {
let etcd = etcd_client::Client::connect(&[opt.etcd_urls], None)
.await
.context("Could not connect to etcd")?;
Arc::new(EtcdClient::new(etcd))
Arc::new(EtcdClient::new(namespace.clone(), etcd))
}
#[cfg(not(feature = "etcd"))]
StateBackend::Etcd => {
188 changes: 87 additions & 101 deletions ballista/rust/scheduler/src/planner.rs
@@ -33,8 +33,7 @@ use datafusion::physical_plan::windows::WindowAggExec;
use datafusion::physical_plan::{
with_new_children_if_necessary, ExecutionPlan, Partitioning,
};
use futures::future::BoxFuture;
use futures::FutureExt;

use log::info;

type PartialQueryStageResult = (Arc<dyn ExecutionPlan>, Vec<Arc<ShuffleWriterExec>>);
@@ -59,15 +58,14 @@ impl DistributedPlanner {
/// Returns a vector of ExecutionPlans, where the root node is a [ShuffleWriterExec].
/// Plans that depend on the input of other plans will have leaf nodes of type [UnresolvedShuffleExec].
/// A [ShuffleWriterExec] is created whenever the partitioning changes.
pub async fn plan_query_stages<'a>(
pub fn plan_query_stages<'a>(
Member: Is there some reason to remove the async? 🤔 I think there is some I/O work in plan_query_stages, like saving status in the DB.

Contributor (author): I think it was doing that at one time, but the implementation now is not doing I/O, so I changed it back to sync.

(A sketch of the `BoxFuture` recursion pattern being removed here follows this function's diff.)

&'a mut self,
job_id: &'a str,
execution_plan: Arc<dyn ExecutionPlan>,
) -> Result<Vec<Arc<ShuffleWriterExec>>> {
info!("planning query stages");
let (new_plan, mut stages) = self
.plan_query_stages_internal(job_id, execution_plan)
.await?;
let (new_plan, mut stages) =
self.plan_query_stages_internal(job_id, execution_plan)?;
stages.push(create_shuffle_writer(
job_id,
self.next_stage_id(),
Expand All @@ -84,97 +82,91 @@ impl DistributedPlanner {
&'a mut self,
job_id: &'a str,
execution_plan: Arc<dyn ExecutionPlan>,
) -> BoxFuture<'a, Result<PartialQueryStageResult>> {
async move {
// recurse down and replace children
if execution_plan.children().is_empty() {
return Ok((execution_plan, vec![]));
}
) -> Result<PartialQueryStageResult> {
// async move {
// recurse down and replace children
if execution_plan.children().is_empty() {
return Ok((execution_plan, vec![]));
}

let mut stages = vec![];
let mut children = vec![];
for child in execution_plan.children() {
let (new_child, mut child_stages) = self
.plan_query_stages_internal(job_id, child.clone())
.await?;
children.push(new_child);
stages.append(&mut child_stages);
}
let mut stages = vec![];
let mut children = vec![];
for child in execution_plan.children() {
let (new_child, mut child_stages) =
self.plan_query_stages_internal(job_id, child.clone())?;
children.push(new_child);
stages.append(&mut child_stages);
}

if let Some(_coalesce) = execution_plan
.as_any()
.downcast_ref::<CoalescePartitionsExec>()
{
let shuffle_writer = create_shuffle_writer(
job_id,
self.next_stage_id(),
children[0].clone(),
None,
)?;
let unresolved_shuffle = Arc::new(UnresolvedShuffleExec::new(
shuffle_writer.stage_id(),
shuffle_writer.schema(),
shuffle_writer.output_partitioning().partition_count(),
shuffle_writer
.shuffle_output_partitioning()
.map(|p| p.partition_count())
.unwrap_or_else(|| {
shuffle_writer.output_partitioning().partition_count()
}),
));
stages.push(shuffle_writer);
Ok((
with_new_children_if_necessary(
execution_plan,
vec![unresolved_shuffle],
)?,
stages,
))
} else if let Some(repart) =
execution_plan.as_any().downcast_ref::<RepartitionExec>()
{
match repart.output_partitioning() {
Partitioning::Hash(_, _) => {
let shuffle_writer = create_shuffle_writer(
job_id,
self.next_stage_id(),
children[0].clone(),
Some(repart.partitioning().to_owned()),
)?;
let unresolved_shuffle = Arc::new(UnresolvedShuffleExec::new(
shuffle_writer.stage_id(),
shuffle_writer.schema(),
shuffle_writer.output_partitioning().partition_count(),
shuffle_writer
.shuffle_output_partitioning()
.map(|p| p.partition_count())
.unwrap_or_else(|| {
shuffle_writer.output_partitioning().partition_count()
}),
));
stages.push(shuffle_writer);
Ok((unresolved_shuffle, stages))
}
_ => {
// remove any non-hash repartition from the distributed plan
Ok((children[0].clone(), stages))
}
if let Some(_coalesce) = execution_plan
.as_any()
.downcast_ref::<CoalescePartitionsExec>()
{
let shuffle_writer = create_shuffle_writer(
job_id,
self.next_stage_id(),
children[0].clone(),
None,
)?;
let unresolved_shuffle = Arc::new(UnresolvedShuffleExec::new(
shuffle_writer.stage_id(),
shuffle_writer.schema(),
shuffle_writer.output_partitioning().partition_count(),
shuffle_writer
.shuffle_output_partitioning()
.map(|p| p.partition_count())
.unwrap_or_else(|| {
shuffle_writer.output_partitioning().partition_count()
}),
));
stages.push(shuffle_writer);
Ok((
with_new_children_if_necessary(execution_plan, vec![unresolved_shuffle])?,
stages,
))
} else if let Some(repart) =
execution_plan.as_any().downcast_ref::<RepartitionExec>()
{
match repart.output_partitioning() {
Partitioning::Hash(_, _) => {
let shuffle_writer = create_shuffle_writer(
job_id,
self.next_stage_id(),
children[0].clone(),
Some(repart.partitioning().to_owned()),
)?;
let unresolved_shuffle = Arc::new(UnresolvedShuffleExec::new(
shuffle_writer.stage_id(),
shuffle_writer.schema(),
shuffle_writer.output_partitioning().partition_count(),
shuffle_writer
.shuffle_output_partitioning()
.map(|p| p.partition_count())
.unwrap_or_else(|| {
shuffle_writer.output_partitioning().partition_count()
}),
));
stages.push(shuffle_writer);
Ok((unresolved_shuffle, stages))
}
_ => {
// remove any non-hash repartition from the distributed plan
Ok((children[0].clone(), stages))
}
} else if let Some(window) =
execution_plan.as_any().downcast_ref::<WindowAggExec>()
{
Err(BallistaError::NotImplemented(format!(
"WindowAggExec with window {:?}",
window
)))
} else {
Ok((
with_new_children_if_necessary(execution_plan, children)?,
stages,
))
}
} else if let Some(window) =
execution_plan.as_any().downcast_ref::<WindowAggExec>()
{
Err(BallistaError::NotImplemented(format!(
"WindowAggExec with window {:?}",
window
)))
} else {
Ok((
with_new_children_if_necessary(execution_plan, children)?,
stages,
))
}
.boxed()
}
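
As an aside on the change discussed in the review thread above: the removed `BoxFuture`/`.boxed()` plumbing is the usual workaround for writing a recursive async function, whose future type the compiler cannot otherwise name. Once the body no longer awaits anything, plain recursion suffices, which is what this diff switches to. Below is a minimal, self-contained sketch (hypothetical names, assuming the `futures` crate), not the PR's code.

```rust
// Illustrative only. Shows why a recursive async fn needs a boxed future,
// and the synchronous shape it can take once no `.await` remains.
use futures::future::{BoxFuture, FutureExt};

struct Planner;

impl Planner {
    /// Async + recursive: the return type must be boxed, so the body is
    /// wrapped in `async move { ... }.boxed()`.
    fn plan_recursive<'a>(&'a mut self, depth: usize) -> BoxFuture<'a, usize> {
        async move {
            if depth == 0 {
                0
            } else {
                self.plan_recursive(depth - 1).await + 1
            }
        }
        .boxed()
    }

    /// Synchronous equivalent: ordinary recursion, no boxing required.
    fn plan_recursive_sync(&mut self, depth: usize) -> usize {
        if depth == 0 {
            0
        } else {
            self.plan_recursive_sync(depth - 1) + 1
        }
    }
}

fn main() {
    let mut planner = Planner;
    assert_eq!(planner.plan_recursive_sync(3), 3);
    // Driving the async variant requires an executor.
    assert_eq!(futures::executor::block_on(planner.plan_recursive(3)), 3);
}
```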

/// Generate a new stage ID
@@ -318,9 +310,7 @@ mod test {

let mut planner = DistributedPlanner::new();
let job_uuid = Uuid::new_v4();
let stages = planner
.plan_query_stages(&job_uuid.to_string(), plan)
.await?;
let stages = planner.plan_query_stages(&job_uuid.to_string(), plan)?;
for stage in &stages {
println!("{}", displayable(stage.as_ref()).indent());
}
@@ -432,9 +422,7 @@ order by

let mut planner = DistributedPlanner::new();
let job_uuid = Uuid::new_v4();
let stages = planner
.plan_query_stages(&job_uuid.to_string(), plan)
.await?;
let stages = planner.plan_query_stages(&job_uuid.to_string(), plan)?;
for stage in &stages {
println!("{}", displayable(stage.as_ref()).indent());
}
@@ -580,9 +568,7 @@ order by

let mut planner = DistributedPlanner::new();
let job_uuid = Uuid::new_v4();
let stages = planner
.plan_query_stages(&job_uuid.to_string(), plan)
.await?;
let stages = planner.plan_query_stages(&job_uuid.to_string(), plan)?;

let partial_hash = stages[0].children()[0].clone();
let partial_hash_serde = roundtrip_operator(partial_hash.clone())?;