diff --git a/ballista/rust/scheduler/Cargo.toml b/ballista/rust/scheduler/Cargo.toml
index 215c58a7fb3f..9bca8d969571 100644
--- a/ballista/rust/scheduler/Cargo.toml
+++ b/ballista/rust/scheduler/Cargo.toml
@@ -61,6 +61,7 @@ uuid = { version = "0.8", features = ["v4"] }
 
 [build-dependencies]
 configure_me_codegen = "0.4.0"
+tonic-build = { version = "0.4" }
 
 [package.metadata.configure_me.bin]
 scheduler = "scheduler_config_spec.toml"
diff --git a/ballista/rust/scheduler/build.rs b/ballista/rust/scheduler/build.rs
index bae6a3bfe2e6..e90bd495a9e4 100644
--- a/ballista/rust/scheduler/build.rs
+++ b/ballista/rust/scheduler/build.rs
@@ -20,5 +20,10 @@ extern crate configure_me_codegen;
 fn main() -> Result<(), String> {
     println!("cargo:rerun-if-changed=scheduler_config_spec.toml");
     configure_me_codegen::build_script_auto()
-        .map_err(|e| format!("configure_me code generation failed: {}", e))
+        .map_err(|e| format!("configure_me code generation failed: {}", e))?;
+
+    println!("cargo:rerun-if-changed=proto/keda.proto");
+    tonic_build::configure()
+        .compile(&["proto/keda.proto"], &["proto"])
+        .map_err(|e| format!("protobuf compilation failed: {}", e))
 }
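Note on the build step: `tonic_build::configure().compile(...)` writes one generated file per proto package into `OUT_DIR`, named after the package — here `externalscaler.rs`, which `lib.rs` includes below. When no custom codegen settings are needed, the same result can be had with tonic-build's one-liner form; a minimal sketch, assuming the default client-and-server generation:

```rust
// Sketch of the shorter form (not what this PR uses): compile_protos applies
// tonic-build's defaults and infers the include dir from the file's parent.
fn main() -> Result<(), String> {
    println!("cargo:rerun-if-changed=proto/keda.proto");
    tonic_build::compile_protos("proto/keda.proto")
        .map_err(|e| format!("protobuf compilation failed: {}", e))
}
```

The `configure()` form used by the PR leaves room to tweak codegen options later (e.g. disabling client generation).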
diff --git a/ballista/rust/scheduler/proto/keda.proto b/ballista/rust/scheduler/proto/keda.proto
new file mode 100644
index 000000000000..051dd438f41a
--- /dev/null
+++ b/ballista/rust/scheduler/proto/keda.proto
@@ -0,0 +1,63 @@
+/*
+Copyright 2020 The KEDA Authors.
+
+and others that have contributed code to the public domain.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+// This file comes from https://github.com/kedacore/keda/blob/main/pkg/scalers/externalscaler/externalscaler.proto
+syntax = "proto3";
+
+package externalscaler;
+option go_package = ".;externalscaler";
+
+service ExternalScaler {
+    rpc IsActive(ScaledObjectRef) returns (IsActiveResponse) {}
+    // Commented out since we aren't supporting the streaming scaler interface at the moment
+    // rpc StreamIsActive(ScaledObjectRef) returns (stream IsActiveResponse) {}
+    rpc GetMetricSpec(ScaledObjectRef) returns (GetMetricSpecResponse) {}
+    rpc GetMetrics(GetMetricsRequest) returns (GetMetricsResponse) {}
+}
+
+message ScaledObjectRef {
+    string name = 1;
+    string namespace = 2;
+    map<string, string> scalerMetadata = 3;
+}
+
+message IsActiveResponse {
+    bool result = 1;
+}
+
+message GetMetricSpecResponse {
+    repeated MetricSpec metricSpecs = 1;
+}
+
+message MetricSpec {
+    string metricName = 1;
+    int64 targetSize = 2;
+}
+
+message GetMetricsRequest {
+    ScaledObjectRef scaledObjectRef = 1;
+    string metricName = 2;
+}
+
+message GetMetricsResponse {
+    repeated MetricValue metricValues = 1;
+}
+
+message MetricValue {
+    string metricName = 1;
+    int64 metricValue = 2;
+}
\ No newline at end of file
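For orientation: prost renames the proto's camelCase fields to snake_case in the generated Rust structs, which is why the scheduler code below constructs `metric_specs`, `metric_name`, and `target_size` rather than the names written in the proto. A rough sketch of what the generated `MetricSpec` looks like (illustrative only, not code from this PR):

```rust
// Approximate shape of prost's output for MetricSpec; camelCase proto
// fields become snake_case Rust fields, and int64 maps to i64.
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct MetricSpec {
    #[prost(string, tag = "1")]
    pub metric_name: ::prost::alloc::string::String,
    #[prost(int64, tag = "2")]
    pub target_size: i64,
}
```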
diff --git a/ballista/rust/scheduler/src/lib.rs b/ballista/rust/scheduler/src/lib.rs
index 3620f79baaa5..3bd4c03aa9c3 100644
--- a/ballista/rust/scheduler/src/lib.rs
+++ b/ballista/rust/scheduler/src/lib.rs
@@ -28,16 +28,22 @@ pub use standalone::new_standalone_scheduler;
 #[cfg(test)]
 pub mod test_utils;
 
+// include the generated protobuf source as a submodule
+#[allow(clippy::all)]
+pub mod externalscaler {
+    include!(concat!(env!("OUT_DIR"), "/externalscaler.rs"));
+}
+
 use std::{convert::TryInto, sync::Arc};
 use std::{fmt, net::IpAddr};
 
 use ballista_core::serde::protobuf::{
     execute_query_params::Query, executor_registration::OptionalHost, job_status,
-    scheduler_grpc_server::SchedulerGrpc, ExecuteQueryParams, ExecuteQueryResult,
-    FailedJob, FilePartitionMetadata, FileType, GetFileMetadataParams,
-    GetFileMetadataResult, GetJobStatusParams, GetJobStatusResult, JobStatus,
-    PartitionId, PollWorkParams, PollWorkResult, QueuedJob, RunningJob, TaskDefinition,
-    TaskStatus,
+    scheduler_grpc_server::SchedulerGrpc, task_status, ExecuteQueryParams,
+    ExecuteQueryResult, FailedJob, FilePartitionMetadata, FileType,
+    GetFileMetadataParams, GetFileMetadataResult, GetJobStatusParams, GetJobStatusResult,
+    JobStatus, PartitionId, PollWorkParams, PollWorkResult, QueuedJob, RunningJob,
+    TaskDefinition, TaskStatus,
 };
 use ballista_core::serde::scheduler::ExecutorMeta;
@@ -62,6 +68,10 @@ impl parse_arg::ParseArgFromStr for ConfigBackend {
     }
 }
 
+use crate::externalscaler::{
+    external_scaler_server::ExternalScaler, GetMetricSpecResponse, GetMetricsRequest,
+    GetMetricsResponse, IsActiveResponse, MetricSpec, MetricValue, ScaledObjectRef,
+};
 use crate::planner::DistributedPlanner;
 
 use log::{debug, error, info, warn};
@@ -103,6 +113,55 @@ impl SchedulerServer {
     }
 }
 
+const INFLIGHT_TASKS_METRIC_NAME: &str = "inflight_tasks";
+
+#[tonic::async_trait]
+impl ExternalScaler for SchedulerServer {
+    async fn is_active(
+        &self,
+        _request: Request<ScaledObjectRef>,
+    ) -> Result<Response<IsActiveResponse>, tonic::Status> {
+        let tasks = self.state.get_all_tasks().await.map_err(|e| {
+            let msg = format!("Error reading tasks: {}", e);
+            error!("{}", msg);
+            tonic::Status::internal(msg)
+        })?;
+        let result = tasks.iter().any(|(_key, task)| {
+            !matches!(
+                task.status,
+                Some(task_status::Status::Completed(_))
+                    | Some(task_status::Status::Failed(_))
+            )
+        });
+        debug!("Are there active tasks? {}", result);
+        Ok(Response::new(IsActiveResponse { result }))
+    }
+
+    async fn get_metric_spec(
+        &self,
+        _request: Request<ScaledObjectRef>,
+    ) -> Result<Response<GetMetricSpecResponse>, tonic::Status> {
+        Ok(Response::new(GetMetricSpecResponse {
+            metric_specs: vec![MetricSpec {
+                metric_name: INFLIGHT_TASKS_METRIC_NAME.to_string(),
+                target_size: 1,
+            }],
+        }))
+    }
+
+    async fn get_metrics(
+        &self,
+        _request: Request<GetMetricsRequest>,
+    ) -> Result<Response<GetMetricsResponse>, tonic::Status> {
+        Ok(Response::new(GetMetricsResponse {
+            metric_values: vec![MetricValue {
+                metric_name: INFLIGHT_TASKS_METRIC_NAME.to_string(),
+                metric_value: 10000000, // A very high number to saturate the HPA
+            }],
+        }))
+    }
+}
+
 #[tonic::async_trait]
 impl SchedulerGrpc for SchedulerServer {
     async fn poll_work(
diff --git a/ballista/rust/scheduler/src/main.rs b/ballista/rust/scheduler/src/main.rs
index 34386ca6c561..7b79eb1b39ac 100644
--- a/ballista/rust/scheduler/src/main.rs
+++ b/ballista/rust/scheduler/src/main.rs
@@ -18,6 +18,7 @@
 //! Ballista Rust scheduler binary.
 
 use anyhow::{Context, Result};
+use ballista_scheduler::externalscaler::external_scaler_server::ExternalScalerServer;
 use futures::future::{self, Either, TryFutureExt};
 use hyper::{server::conn::AddrStream, service::make_service_fn, Server};
 use std::convert::Infallible;
@@ -72,8 +73,11 @@ async fn start_server(
 
     let scheduler_grpc_server = SchedulerGrpcServer::new(scheduler_server.clone());
 
+    let keda_scaler = ExternalScalerServer::new(scheduler_server.clone());
+
     let mut tonic = TonicServer::builder()
         .add_service(scheduler_grpc_server)
+        .add_service(keda_scaler)
         .into_service();
 
     let mut warp = warp::service(get_routes(scheduler_server));
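Both services are registered on the same tonic listener, so KEDA reaches the scaler on the scheduler's existing port (50050). KEDA's HPA integration roughly divides the reported metric value by the declared target size to choose a replica count, so returning 10,000,000 against a target of 1 pins the deployment at `maxReplicaCount` whenever any task is in flight. A hypothetical smoke test against a locally running scheduler, assuming tonic-build also generated the client module (its default):

```rust
// Hypothetical smoke test, not part of the PR: calls IsActive on a scheduler
// assumed to be listening on localhost:50050.
use ballista_scheduler::externalscaler::external_scaler_client::ExternalScalerClient;
use ballista_scheduler::externalscaler::ScaledObjectRef;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut client = ExternalScalerClient::connect("http://localhost:50050").await?;
    let request = tonic::Request::new(ScaledObjectRef {
        name: "ballista-executor".to_string(),
        namespace: "default".to_string(),
        scaler_metadata: Default::default(),
    });
    let response = client.is_active(request).await?.into_inner();
    println!("scheduler reports active work: {}", response.result);
    Ok(())
}
```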
diff --git a/ballista/rust/scheduler/src/state/mod.rs b/ballista/rust/scheduler/src/state/mod.rs
index a17c82d4b737..cbee3f1bef69 100644
--- a/ballista/rust/scheduler/src/state/mod.rs
+++ b/ballista/rust/scheduler/src/state/mod.rs
@@ -236,6 +236,15 @@ impl SchedulerState {
         Ok((&value).try_into()?)
     }
 
+    pub async fn get_all_tasks(&self) -> Result<HashMap<String, TaskStatus>> {
+        self.config_client
+            .get_from_prefix(&get_task_prefix(&self.namespace))
+            .await?
+            .into_iter()
+            .map(|(key, bytes)| Ok((key, decode_protobuf(&bytes)?)))
+            .collect()
+    }
+
     /// This function ensures that the task wasn't assigned to an executor that died.
     /// If that is the case, then the task is re-scheduled.
     /// Returns true if the task was dead, false otherwise.
@@ -274,18 +283,12 @@
         &self,
         executor_id: &str,
    ) -> Result<Option<(TaskStatus, Arc<dyn ExecutionPlan>)>> {
-        let kvs: HashMap<String, Vec<u8>> = self
-            .config_client
-            .get_from_prefix(&get_task_prefix(&self.namespace))
-            .await?
-            .into_iter()
-            .collect();
+        let tasks = self.get_all_tasks().await?;
         // TODO: Make the duration a configurable parameter
         let executors = self
             .get_alive_executors_metadata(Duration::from_secs(60))
             .await?;
-        'tasks: for (_key, value) in kvs.iter() {
-            let mut status: TaskStatus = decode_protobuf(value)?;
+        'tasks: for (_key, status) in tasks.iter() {
             if status.status.is_none() {
                 let partition = status.partition_id.as_ref().unwrap();
                 let plan = self
@@ -301,7 +304,7 @@ impl SchedulerState {
             for unresolved_shuffle in unresolved_shuffles {
                 for stage_id in unresolved_shuffle.query_stage_ids {
                     for partition_id in 0..unresolved_shuffle.partition_count {
-                        let referenced_task = kvs
+                        let referenced_task = tasks
                             .get(&get_task_status_key(
                                 &self.namespace,
                                 &partition.job_id,
@@ -309,8 +312,6 @@ impl SchedulerState {
                                 partition_id,
                             ))
                             .unwrap();
-                        let referenced_task: TaskStatus =
-                            decode_protobuf(referenced_task)?;
                         let task_is_dead = self
                             .reschedule_dead_task(&referenced_task, &executors)
                             .await?;
@@ -318,14 +319,14 @@ impl SchedulerState {
                             continue 'tasks;
                         } else if let Some(task_status::Status::Completed(
                             CompletedTask { executor_id },
-                        )) = referenced_task.status
+                        )) = &referenced_task.status
                         {
                             let empty = vec![];
                             let locations =
                                 partition_locations.entry(stage_id).or_insert(empty);
                             let executor_meta = executors
                                 .iter()
-                                .find(|exec| exec.id == executor_id)
+                                .find(|exec| exec.id == *executor_id)
                                 .unwrap()
                                 .clone();
                             locations.push(vec![
@@ -350,6 +351,7 @@ impl SchedulerState {
                 remove_unresolved_shuffles(plan.as_ref(), &partition_locations)?;
 
             // If we get here, there are no more unresolved shuffled and the task can be run
+            let mut status = status.clone();
             status.status = Some(task_status::Status::Running(RunningTask {
                 executor_id: executor_id.to_owned(),
             }));
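Two details worth noting in this refactor: `get_all_tasks` decodes every value up front by collecting an iterator of `Result` items, so one undecodable record fails the whole call; and because the loop now borrows already-decoded tasks via `tasks.iter()` instead of owning freshly decoded values, the task to be mutated must first be cloned (hence the added `status.clone()`). A standalone sketch of the collect-into-`Result` pattern, with a hypothetical `decode_u64` standing in for `decode_protobuf`:

```rust
// Minimal illustration of collecting Result items into a Result<HashMap>:
// an iterator of Ok((key, value)) collects into Ok(HashMap), and the first
// Err short-circuits the whole collection.
use std::collections::HashMap;
use std::convert::TryInto;

fn decode_u64(bytes: &[u8]) -> Result<u64, String> {
    let arr: [u8; 8] = bytes.try_into().map_err(|_| "bad length".to_string())?;
    Ok(u64::from_le_bytes(arr))
}

fn decode_all(raw: Vec<(String, Vec<u8>)>) -> Result<HashMap<String, u64>, String> {
    raw.into_iter()
        .map(|(key, bytes)| Ok((key, decode_u64(&bytes)?)))
        .collect()
}
```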
diff --git a/docs/user-guide/src/distributed/kubernetes.md b/docs/user-guide/src/distributed/kubernetes.md
index 07b51f7871b6..4b80d1731943 100644
--- a/docs/user-guide/src/distributed/kubernetes.md
+++ b/docs/user-guide/src/distributed/kubernetes.md
@@ -28,6 +28,7 @@ The k8s deployment consists of:
 - k8s deployment for one or more executor processes
 - k8s service to route traffic to the schedulers
 - k8s persistent volume and persistent volume claims to make local data accessible to Ballista
+- _(optional)_ a [Keda](http://keda.sh) instance for autoscaling the number of executors
 
 ## Limitations
 
@@ -163,8 +164,8 @@ spec:
           image: <your-image>
           command: ["/executor"]
           args:
-            - "--bind-port=50051",
-            - "--scheduler-host=ballista-scheduler",
+            - "--bind-port=50051"
+            - "--scheduler-host=ballista-scheduler"
             - "--scheduler-port=50050"
           ports:
             - containerPort: 50051
@@ -216,3 +217,48 @@ Run the following kubectl command to delete the cluster.
 
 ```bash
 kubectl delete -f cluster.yaml
 ```
+
+## Adding autoscaling for executors
+
+Ballista supports autoscaling for executors through [Keda](http://keda.sh). Keda allows scaling a deployment
+through custom metrics exposed by the Ballista scheduler, and it can even scale the number of executors
+down to 0 if there is no activity in the cluster.
+
+Keda can be installed in your kubernetes cluster with a single command:
+
+```bash
+kubectl apply -f https://github.com/kedacore/keda/releases/download/v2.3.0/keda-2.3.0.yaml
+```
+
+Once you have deployed Keda on your cluster, you can deploy a new kubernetes object called `ScaledObject`
+which lets Keda know how to scale your executors. In order to do that, copy the following YAML into a
+`scale.yaml` file:
+
+```yaml
+apiVersion: keda.sh/v1alpha1
+kind: ScaledObject
+metadata:
+  name: ballista-executor
+spec:
+  scaleTargetRef:
+    name: ballista-executor
+  minReplicaCount: 0
+  maxReplicaCount: 5
+  triggers:
+    - type: external
+      metadata:
+        # Change this DNS name if the scheduler isn't deployed in the "default" namespace
+        scalerAddress: ballista-scheduler.default.svc.cluster.local:50050
+```
+
+And then deploy it into the cluster:
+
+```bash
+kubectl apply -f scale.yaml
+```
+
+If the cluster is inactive, Keda will scale the number of executors down to 0, and will scale them up when
+you launch a query. Please note that Keda polls the scaler once every 30 seconds, so it may take up to
+that long for the executors to scale.
+
+Please visit Keda's [documentation page](https://keda.sh/docs/2.3/concepts/scaling-deployments/) for more information.
\ No newline at end of file
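A possible way to observe the scaling behavior after applying `scale.yaml` (the HPA name below assumes KEDA's `keda-hpa-<scaledobject>` naming convention, and the pod label assumes the executor deployment from this guide labels its pods `app=ballista-executor`):

```bash
# Check that KEDA accepted the ScaledObject and created the backing HPA
kubectl get scaledobject ballista-executor
kubectl get hpa keda-hpa-ballista-executor
# Watch executor pods appear after a query is submitted and drain when idle
kubectl get pods -l app=ballista-executor --watch
```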