Cluster state refactor Part 2 #658

Merged · 28 commits · Feb 11, 2023
Changes shown from 27 of 28 commits.

Commits
b606c77  Customize session builder (thinkharderdev, Nov 14, 2022)
a157581  Add setter for executor slots policy (thinkharderdev, Nov 14, 2022)
24d4830  Construct Executor with functions (thinkharderdev, Nov 14, 2022)
c615fce  Add queued and completed timestamps to successful job status (thinkharderdev, Nov 14, 2022)
5ad27c0  Add public methods to SchedulerServer (thinkharderdev, Nov 15, 2022)
490bda5  Public method for getting execution graph (thinkharderdev, Nov 15, 2022)
a802315  Public method for stage metrics (thinkharderdev, Nov 15, 2022)
ff96bcd  Use node-level local limit (#20) (thinkharderdev, Oct 24, 2022)
694f6e2  configure_me_codegen retroactively reserved on our `bind_host` parame… (Nov 17, 2022)
91119e4  Add ClusterState trait (thinkharderdev, Dec 9, 2022)
18790f4  Merge remote-tracking branch 'cgx/cluster-state' into cluster-state-r… (thinkharderdev, Dec 9, 2022)
41f228c  Refactor slightly for clarity (thinkharderdev, Dec 9, 2022)
70e1bcf  Revert "Use node-level local limit (#20)" (thinkharderdev, Dec 9, 2022)
96a8c9d  Revert "Public method for stage metrics" (thinkharderdev, Dec 9, 2022)
081b224  Revert "Public method for getting execution graph" (thinkharderdev, Dec 9, 2022)
7ae9aaa  Revert "Add public methods to SchedulerServer" (thinkharderdev, Dec 9, 2022)
d34ecb5  Revert "Add queued and completed timestamps to successful job status" (thinkharderdev, Dec 9, 2022)
ee2c9d0  Revert "Construct Executor with functions" (thinkharderdev, Dec 9, 2022)
3948c81  Always forget the apache header (thinkharderdev, Dec 9, 2022)
7062743  Merge remote-tracking branch 'origin' into cluster-state-refactor (thinkharderdev, Jan 20, 2023)
950882b  WIP (thinkharderdev, Feb 3, 2023)
f03f208  Implement JobState (thinkharderdev, Feb 6, 2023)
3767b17  Tests and fixes (thinkharderdev, Feb 7, 2023)
6733143  do not hold ref across await point (thinkharderdev, Feb 7, 2023)
c66433c  Merge remote-tracking branch 'origin/main' into cluster-state-refactor-2 (thinkharderdev, Feb 8, 2023)
12ab248  Fix clippy warnings (thinkharderdev, Feb 8, 2023)
1f3b491  Fix tomlfmt github action (thinkharderdev, Feb 9, 2023)
43bfaf6  uncomment test (thinkharderdev, Feb 10, 2023)
5 changes: 4 additions & 1 deletion .github/workflows/rust.yml
@@ -402,7 +402,10 @@ jobs:
           #
           # ignore ./Cargo.toml because putting workspaces in multi-line lists make it easy to read
           ci/scripts/rust_toml_fmt.sh
-          git diff --exit-code
+          if test -f "./Cargo.toml.bak"; then
+            echo "cargo tomlfmt found format violations"
+            exit 1
+          fi
         env:
           CARGO_HOME: "/github/home/.cargo"
           CARGO_TARGET_DIR: "/github/home/target"
29 changes: 27 additions & 2 deletions ballista/core/proto/ballista.proto
@@ -289,6 +289,7 @@ message ExecutorMetadata {
   ExecutorSpecification specification = 5;
 }
 
+
 // Used by grpc
 message ExecutorRegistration {
   string id = 1;
@@ -336,6 +337,15 @@ message ExecutorResource {
   }
 }
 
+message AvailableTaskSlots {
+  string executor_id = 1;
+  uint32 slots = 2;
+}
+
+message ExecutorTaskSlots {
+  repeated AvailableTaskSlots task_slots = 1;
+}
+
 message ExecutorData {
   string executor_id = 1;
   repeated ExecutorResourcePair resources = 2;
@@ -544,18 +554,33 @@ message GetJobStatusParams {
 
 message SuccessfulJob {
   repeated PartitionLocation partition_location = 1;
+  uint64 queued_at = 2;
+  uint64 started_at = 3;
+  uint64 ended_at = 4;
 }
 
-message QueuedJob {}
+message QueuedJob {
+  uint64 queued_at = 1;
+}
 
 // TODO: add progress report
-message RunningJob {}
+message RunningJob {
+  uint64 queued_at = 1;
+  uint64 started_at = 2;
+  string scheduler = 3;
+}
 
 message FailedJob {
   string error = 1;
+  uint64 queued_at = 2;
+  uint64 started_at = 3;
+  uint64 ended_at = 4;
 }
 
 message JobStatus {
+  string job_id = 5;
+  string job_name = 6;
+
   oneof status {
     QueuedJob queued = 1;
     RunningJob running = 2;
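
The two new messages above make executor capacity a plain, serializable list that the refactored cluster state can store and exchange. A minimal sketch of slot accounting against such a pool; the reserve_slots helper and its drain-in-order policy are illustrative assumptions, not the scheduler's actual logic:

    /// Local mirror of the `AvailableTaskSlots` protobuf message.
    struct AvailableTaskSlots {
        executor_id: String,
        slots: u32,
    }

    /// Reserve up to `desired` slots, draining executors in order.
    /// Returns (executor_id, reserved) pairs; the request may be only
    /// partially satisfied if the pool is short on capacity.
    fn reserve_slots(pool: &mut [AvailableTaskSlots], desired: u32) -> Vec<(String, u32)> {
        let mut remaining = desired;
        let mut reservations = Vec::new();
        for executor in pool.iter_mut() {
            if remaining == 0 {
                break;
            }
            let take = executor.slots.min(remaining);
            if take > 0 {
                executor.slots -= take;
                remaining -= take;
                reservations.push((executor.executor_id.clone(), take));
            }
        }
        reservations
    }

    fn main() {
        let mut pool = vec![
            AvailableTaskSlots { executor_id: "exec-1".into(), slots: 2 },
            AvailableTaskSlots { executor_id: "exec-2".into(), slots: 4 },
        ];
        // Ask for 5 slots: takes 2 from exec-1 and 3 from exec-2.
        let reserved = reserve_slots(&mut pool, 5);
        assert_eq!(reserved, vec![("exec-1".to_string(), 2), ("exec-2".to_string(), 3)]);
        assert_eq!(pool[1].slots, 1);
    }

A real scheduler would apply its configured executor slots policy (round-robin, etc.) when choosing which executors to drain.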
44 changes: 42 additions & 2 deletions ballista/core/src/serde/generated/ballista.rs
@@ -581,6 +581,20 @@ pub mod executor_resource {
 }
 #[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
+pub struct AvailableTaskSlots {
+    #[prost(string, tag = "1")]
+    pub executor_id: ::prost::alloc::string::String,
+    #[prost(uint32, tag = "2")]
+    pub slots: u32,
+}
+#[allow(clippy::derive_partial_eq_without_eq)]
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct ExecutorTaskSlots {
+    #[prost(message, repeated, tag = "1")]
+    pub task_slots: ::prost::alloc::vec::Vec<AvailableTaskSlots>,
+}
+#[allow(clippy::derive_partial_eq_without_eq)]
+#[derive(Clone, PartialEq, ::prost::Message)]
 pub struct ExecutorData {
     #[prost(string, tag = "1")]
     pub executor_id: ::prost::alloc::string::String,
@@ -933,23 +947,49 @@ pub struct GetJobStatusParams {
 pub struct SuccessfulJob {
     #[prost(message, repeated, tag = "1")]
    pub partition_location: ::prost::alloc::vec::Vec<PartitionLocation>,
+    #[prost(uint64, tag = "2")]
+    pub queued_at: u64,
+    #[prost(uint64, tag = "3")]
+    pub started_at: u64,
+    #[prost(uint64, tag = "4")]
+    pub ended_at: u64,
 }
 #[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
-pub struct QueuedJob {}
+pub struct QueuedJob {
+    #[prost(uint64, tag = "1")]
+    pub queued_at: u64,
+}
 /// TODO: add progress report
 #[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
-pub struct RunningJob {}
+pub struct RunningJob {
+    #[prost(uint64, tag = "1")]
+    pub queued_at: u64,
+    #[prost(uint64, tag = "2")]
+    pub started_at: u64,
+    #[prost(string, tag = "3")]
+    pub scheduler: ::prost::alloc::string::String,
+}
 #[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct FailedJob {
     #[prost(string, tag = "1")]
     pub error: ::prost::alloc::string::String,
+    #[prost(uint64, tag = "2")]
+    pub queued_at: u64,
+    #[prost(uint64, tag = "3")]
+    pub started_at: u64,
+    #[prost(uint64, tag = "4")]
+    pub ended_at: u64,
 }
 #[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct JobStatus {
+    #[prost(string, tag = "5")]
+    pub job_id: ::prost::alloc::string::String,
+    #[prost(string, tag = "6")]
+    pub job_name: ::prost::alloc::string::String,
     #[prost(oneof = "job_status::Status", tags = "1, 2, 3, 4")]
     pub status: ::core::option::Option<job_status::Status>,
 }
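
With every job status now carrying queued_at / started_at / ended_at, clients can compute queue wait and execution time directly from the status message. A small sketch, assuming (as the field names suggest) the timestamps share one epoch-based unit such as milliseconds:

    /// Derived timings for a finished job, in the same unit as the raw timestamps.
    struct JobTimings {
        queue_wait: u64,
        execution: u64,
    }

    /// Works for the timestamps carried by `SuccessfulJob` and `FailedJob`.
    fn job_timings(queued_at: u64, started_at: u64, ended_at: u64) -> JobTimings {
        JobTimings {
            // saturating_sub guards against clock skew producing an underflow panic
            queue_wait: started_at.saturating_sub(queued_at),
            execution: ended_at.saturating_sub(started_at),
        }
    }

    fn main() {
        let t = job_timings(1_000, 1_250, 4_250);
        assert_eq!(t.queue_wait, 250);
        assert_eq!(t.execution, 3_000);
    }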
11 changes: 7 additions & 4 deletions ballista/core/src/serde/mod.rs
@@ -26,11 +26,13 @@ use datafusion::execution::FunctionRegistry;
 use datafusion::physical_plan::{ExecutionPlan, Partitioning};
 use datafusion_proto::common::proto_error;
 use datafusion_proto::physical_plan::from_proto::parse_protobuf_hash_partitioning;
+use datafusion_proto::protobuf::{LogicalPlanNode, PhysicalPlanNode};
 use datafusion_proto::{
     convert_required,
     logical_plan::{AsLogicalPlan, DefaultLogicalExtensionCodec, LogicalExtensionCodec},
     physical_plan::{AsExecutionPlan, PhysicalExtensionCodec},
 };
+
 use prost::Message;
 use std::fmt::Debug;
 use std::marker::PhantomData;
@@ -69,16 +71,17 @@ pub fn decode_protobuf(bytes: &[u8]) -> Result<BallistaAction, BallistaError> {
 }
 
 #[derive(Clone, Debug)]
-pub struct BallistaCodec<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> {
+pub struct BallistaCodec<
+    T: 'static + AsLogicalPlan = LogicalPlanNode,
+    U: 'static + AsExecutionPlan = PhysicalPlanNode,
+> {
     logical_extension_codec: Arc<dyn LogicalExtensionCodec>,
     physical_extension_codec: Arc<dyn PhysicalExtensionCodec>,
     logical_plan_repr: PhantomData<T>,
     physical_plan_repr: PhantomData<U>,
 }
 
-impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> Default
-    for BallistaCodec<T, U>
-{
+impl Default for BallistaCodec {
     fn default() -> Self {
         Self {
             logical_extension_codec: Arc::new(DefaultLogicalExtensionCodec {}),
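
The default type parameters above let most call sites write BallistaCodec::default() with no turbofish, while codebases with custom plan nodes can still spell out BallistaCodec<MyLogical, MyPhysical>. The same language feature in miniature (all names below are illustrative, not from the PR):

    use std::marker::PhantomData;

    struct DefaultLogical;
    struct DefaultPhysical;

    // `Codec` written bare now means `Codec<DefaultLogical, DefaultPhysical>`.
    struct Codec<T = DefaultLogical, U = DefaultPhysical> {
        logical: PhantomData<T>,
        physical: PhantomData<U>,
    }

    // Implementing `Default` only for the defaulted instantiation mirrors the
    // `impl Default for BallistaCodec` change in this diff.
    impl Default for Codec {
        fn default() -> Self {
            Codec {
                logical: PhantomData,
                physical: PhantomData,
            }
        }
    }

    fn main() {
        let _codec = Codec::default(); // no `Codec::<A, B>::default()` needed
    }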
11 changes: 2 additions & 9 deletions ballista/scheduler/scheduler_config_spec.toml
@@ -31,17 +31,10 @@ doc = "Route for proxying flight results via scheduler. Should be of the form 'I
 
 [[param]]
 abbr = "b"
-name = "config_backend"
-type = "ballista_scheduler::state::backend::StateBackend"
-doc = "The configuration backend for the scheduler, possible values: etcd, memory, sled. Default: sled"
-default = "ballista_scheduler::state::backend::StateBackend::Sled"
-
-[[param]]
-abbr = "c"
 name = "cluster_backend"
-type = "ballista_scheduler::state::backend::StateBackend"
+type = "ballista_scheduler::cluster::ClusterStorage"
 doc = "The configuration backend for the scheduler cluster state, possible values: etcd, memory, sled. Default: sled"
-default = "ballista_scheduler::state::backend::StateBackend::Sled"
+default = "ballista_scheduler::cluster::ClusterStorage::Sled"
 
 [[param]]
 abbr = "n"
5 changes: 3 additions & 2 deletions ballista/scheduler/src/api/handlers.rs
@@ -209,6 +209,7 @@ pub(crate) async fn get_query_stages<T: AsLogicalPlan, U: AsExecutionPlan>(
 {
     Ok(warp::reply::json(&QueryStagesResponse {
         stages: graph
+            .as_ref()
             .stages()
             .iter()
             .map(|(id, stage)| {
@@ -303,7 +304,7 @@ pub(crate) async fn get_job_dot_graph<T: AsLogicalPlan, U: AsExecutionPlan>(
         .await
         .map_err(|_| warp::reject())?
     {
-        ExecutionGraphDot::generate(graph).map_err(|_| warp::reject())
+        ExecutionGraphDot::generate(graph.as_ref()).map_err(|_| warp::reject())
     } else {
         Ok("Not Found".to_string())
     }
@@ -322,7 +323,7 @@ pub(crate) async fn get_query_stage_dot_graph<T: AsLogicalPlan, U: AsExecutionPlan>(
         .await
         .map_err(|_| warp::reject())?
     {
-        ExecutionGraphDot::generate_for_query_stage(graph, stage_id)
+        ExecutionGraphDot::generate_for_query_stage(graph.as_ref(), stage_id)
             .map_err(|_| warp::reject())
     } else {
         Ok("Not Found".to_string())
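
The added .as_ref() calls suggest these handlers now receive the execution graph behind a shared pointer (e.g. Arc<ExecutionGraph>) after the refactor, while the dot-graph generators still take a plain reference. A tiny sketch of the pattern, with stand-in types:

    use std::sync::Arc;

    struct ExecutionGraph {
        job_id: String,
    }

    // Like `ExecutionGraphDot::generate`, this wants a plain `&ExecutionGraph`.
    fn render_dot(graph: &ExecutionGraph) -> String {
        format!("digraph {{ /* job {} */ }}", graph.job_id)
    }

    fn main() {
        let graph = Arc::new(ExecutionGraph { job_id: "job-1".into() });
        // `as_ref` borrows the inner value without cloning or consuming the Arc.
        let dot = render_dot(graph.as_ref());
        assert!(dot.contains("job-1"));
    }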
97 changes: 21 additions & 76 deletions ballista/scheduler/src/bin/main.rs
@@ -17,18 +17,18 @@
 
 //! Ballista Rust scheduler binary.
 
-use std::{env, io, sync::Arc};
+use std::{env, io};
 
-use anyhow::{Context, Result};
+use anyhow::Result;
 
 use ballista_core::print_version;
 use ballista_scheduler::scheduler_process::start_server;
-#[cfg(feature = "etcd")]
-use ballista_scheduler::state::backend::etcd::EtcdClient;
-use ballista_scheduler::state::backend::memory::MemoryBackendClient;
-#[cfg(feature = "sled")]
-use ballista_scheduler::state::backend::sled::SledClient;
-use ballista_scheduler::state::backend::{StateBackend, StateBackendClient};
+
+use crate::config::{Config, ResultExt};
+use ballista_core::config::LogRotationPolicy;
+use ballista_scheduler::cluster::BallistaCluster;
+use ballista_scheduler::config::{ClusterStorageConfig, SchedulerConfig};
+use tracing_subscriber::EnvFilter;
 
 #[macro_use]
 extern crate configure_me;
@@ -43,12 +43,6 @@ mod config {
 ));
 }
 
-use ballista_core::config::LogRotationPolicy;
-use ballista_scheduler::config::SchedulerConfig;
-use ballista_scheduler::state::backend::cluster::DefaultClusterState;
-use config::prelude::*;
-use tracing_subscriber::EnvFilter;
-
 #[tokio::main]
 async fn main() -> Result<()> {
     // parse options
@@ -61,25 +55,14 @@ async fn main() -> Result<()> {
         std::process::exit(0);
     }
 
-    let config_backend = init_kv_backend(&opt.config_backend, &opt).await?;
-
-    let cluster_state = if opt.cluster_backend == opt.config_backend {
-        Arc::new(DefaultClusterState::new(config_backend.clone()))
-    } else {
-        let cluster_kv_store = init_kv_backend(&opt.cluster_backend, &opt).await?;
-
-        Arc::new(DefaultClusterState::new(cluster_kv_store))
-    };
-
     let special_mod_log_level = opt.log_level_setting;
-    let namespace = opt.namespace;
-    let external_host = opt.external_host;
-    let bind_host = opt.bind_host;
-    let port = opt.bind_port;
     let log_dir = opt.log_dir;
     let print_thread_info = opt.print_thread_info;
-    let log_file_name_prefix = format!("scheduler_{namespace}_{external_host}_{port}");
-    let scheduler_name = format!("{external_host}:{port}");
+
+    let log_file_name_prefix = format!(
+        "scheduler_{}_{}_{}",
+        opt.namespace, opt.external_host, opt.bind_port
+    );
 
     let rust_log = env::var(EnvFilter::DEFAULT_ENV);
     let log_filter = EnvFilter::new(rust_log.unwrap_or(special_mod_log_level));
@@ -117,10 +100,13 @@ async fn main() -> Result<()> {
         .init();
     }

-    let addr = format!("{bind_host}:{port}");
+    let addr = format!("{}:{}", opt.bind_host, opt.bind_port);
     let addr = addr.parse()?;
 
     let config = SchedulerConfig {
+        namespace: opt.namespace,
+        external_host: opt.external_host,

[Review thread on this line]

Member: It looks like this PR drops bind_host? That can be different from external_host, so I'm not sure we can do that. Or maybe that is handled somewhere else in this PR and I haven't seen it yet?

andygrove (Member, Feb 10, 2023): It looks like bind_host is not currently used in the scheduler, but it is in the executor, so this seems like an oversight. cc @avantgardnerio who may have opinions here

thinkharderdev (Contributor, Author): Sorry, this is slightly confusing :). It is still used to build the bind address for the server

    let addr = format!("{}:{}", opt.bind_host, opt.bind_port);
    let addr = addr.parse()?;

The reason external_host was added to SchedulerConfig is so that we can pass it in to start_server and build the BallistaCluster there only from the SchedulerConfig. In order to do that I had to pass a few more things from the Config struct generated by configure_me, which isn't available in the lib code.

So really the only point is to further minimize what we do in main.rs to just setting up logging and validating CLI arguments.

Contributor: FWIW, I believe external_host is required for FlightSQL.

Contributor: I stand corrected - it appears to have morphed into its own advertise_flight_sql_endpoint.

+        bind_port: opt.bind_port,
         scheduling_policy: opt.scheduler_policy,
         event_loop_buffer_size: opt.event_loop_buffer_size,
         executor_slots_policy: opt.executor_slots_policy,
@@ -129,54 +115,13 @@ async fn main() -> Result<()> {
         finished_job_state_clean_up_interval_seconds: opt
             .finished_job_state_clean_up_interval_seconds,
         advertise_flight_sql_endpoint: opt.advertise_flight_sql_endpoint,
+        cluster_storage: ClusterStorageConfig::Memory,
         job_resubmit_interval_ms: (opt.job_resubmit_interval_ms > 0)
             .then_some(opt.job_resubmit_interval_ms),
     };
-    start_server(scheduler_name, config_backend, cluster_state, addr, config).await?;
-    Ok(())
-}
-
-async fn init_kv_backend(
-    backend: &StateBackend,
-    opt: &Config,
-) -> Result<Arc<dyn StateBackendClient>> {
-    let cluster_backend: Arc<dyn StateBackendClient> = match backend {
-        #[cfg(feature = "etcd")]
-        StateBackend::Etcd => {
-            let etcd = etcd_client::Client::connect(&[opt.etcd_urls.clone()], None)
-                .await
-                .context("Could not connect to etcd")?;
-            Arc::new(EtcdClient::new(opt.namespace.clone(), etcd))
-        }
-        #[cfg(not(feature = "etcd"))]
-        StateBackend::Etcd => {
-            unimplemented!(
-                "build the scheduler with the `etcd` feature to use the etcd config backend"
-            )
-        }
-        #[cfg(feature = "sled")]
-        StateBackend::Sled => {
-            if opt.sled_dir.is_empty() {
-                Arc::new(
-                    SledClient::try_new_temporary()
-                        .context("Could not create sled config backend")?,
-                )
-            } else {
-                println!("{}", opt.sled_dir);
-                Arc::new(
-                    SledClient::try_new(opt.sled_dir.clone())
-                        .context("Could not create sled config backend")?,
-                )
-            }
-        }
-        #[cfg(not(feature = "sled"))]
-        StateBackend::Sled => {
-            unimplemented!(
-                "build the scheduler with the `sled` feature to use the sled config backend"
-            )
-        }
-        StateBackend::Memory => Arc::new(MemoryBackendClient::new()),
-    };
+
+    let cluster = BallistaCluster::new_from_config(&config).await?;
 
-    Ok(cluster_backend)
+    start_server(cluster, addr, config).await?;
+    Ok(())
 }
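
Taken together, the new main shrinks scheduler startup to: build a SchedulerConfig from CLI options, construct the cluster from that config, and hand both to start_server - matching the author's comment above about minimizing main.rs. A simplified sketch of that shape (synchronous, infallible stand-ins for the real async, fallible APIs):

    use std::net::SocketAddr;

    // Stand-ins for the real types; only the call shape matters here.
    struct SchedulerConfig {
        namespace: String,
        external_host: String,
        bind_port: u16,
    }

    struct BallistaCluster;

    impl BallistaCluster {
        // The real `new_from_config` is async and returns a Result.
        fn new_from_config(_config: &SchedulerConfig) -> Self {
            BallistaCluster
        }
    }

    fn start_server(_cluster: BallistaCluster, addr: SocketAddr, config: SchedulerConfig) {
        println!(
            "scheduler {}@{} listening on {addr}",
            config.namespace, config.external_host
        );
    }

    fn main() {
        let config = SchedulerConfig {
            namespace: "ballista".into(),
            external_host: "localhost".into(),
            bind_port: 50050,
        };
        // The bind address still comes from CLI options, not from external_host.
        let addr: SocketAddr = format!("0.0.0.0:{}", config.bind_port)
            .parse()
            .expect("valid socket address");
        let cluster = BallistaCluster::new_from_config(&config);
        start_server(cluster, addr, config);
    }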