Add Keda autoscaling for ballista in k8s #586

Merged
merged 7 commits into from
Jun 28, 2021
1 change: 1 addition & 0 deletions ballista/rust/scheduler/Cargo.toml
@@ -61,6 +61,7 @@ uuid = { version = "0.8", features = ["v4"] }

[build-dependencies]
configure_me_codegen = "0.4.0"
tonic-build = { version = "0.4" }

[package.metadata.configure_me.bin]
scheduler = "scheduler_config_spec.toml"
7 changes: 6 additions & 1 deletion ballista/rust/scheduler/build.rs
@@ -20,5 +20,10 @@ extern crate configure_me_codegen;
fn main() -> Result<(), String> {
println!("cargo:rerun-if-changed=scheduler_config_spec.toml");
configure_me_codegen::build_script_auto()
.map_err(|e| format!("configure_me code generation failed: {}", e))
.map_err(|e| format!("configure_me code generation failed: {}", e))?;

println!("cargo:rerun-if-changed=proto/keda.proto");
tonic_build::configure()
.compile(&["proto/keda.proto"], &["proto"])
.map_err(|e| format!("protobuf compilation failed: {}", e))
}
63 changes: 63 additions & 0 deletions ballista/rust/scheduler/proto/keda.proto
@@ -0,0 +1,63 @@
/*
Copyright 2020 The KEDA Authors.

and others that have contributed code to the public domain.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at.

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// This file comes from https://github.com/kedacore/keda/blob/main/pkg/scalers/externalscaler/externalscaler.proto
syntax = "proto3";

package externalscaler;
option go_package = ".;externalscaler";

service ExternalScaler {
rpc IsActive(ScaledObjectRef) returns (IsActiveResponse) {}
// Commented out since we aren't supporting the streaming scaler interface at the moment
// rpc StreamIsActive(ScaledObjectRef) returns (stream IsActiveResponse) {}
rpc GetMetricSpec(ScaledObjectRef) returns (GetMetricSpecResponse) {}
rpc GetMetrics(GetMetricsRequest) returns (GetMetricsResponse) {}
}

message ScaledObjectRef {
string name = 1;
string namespace = 2;
map<string, string> scalerMetadata = 3;
}

message IsActiveResponse {
bool result = 1;
}

message GetMetricSpecResponse {
repeated MetricSpec metricSpecs = 1;
}

message MetricSpec {
string metricName = 1;
int64 targetSize = 2;
}

message GetMetricsRequest {
ScaledObjectRef scaledObjectRef = 1;
string metricName = 2;
}

message GetMetricsResponse {
repeated MetricValue metricValues = 1;
}

message MetricValue {
string metricName = 1;
int64 metricValue = 2;
}
69 changes: 64 additions & 5 deletions ballista/rust/scheduler/src/lib.rs
@@ -28,16 +28,22 @@ pub use standalone::new_standalone_scheduler;
#[cfg(test)]
pub mod test_utils;

// include the generated protobuf source as a submodule
#[allow(clippy::all)]
pub mod externalscaler {
include!(concat!(env!("OUT_DIR"), "/externalscaler.rs"));
}

use std::{convert::TryInto, sync::Arc};
use std::{fmt, net::IpAddr};

use ballista_core::serde::protobuf::{
execute_query_params::Query, executor_registration::OptionalHost, job_status,
scheduler_grpc_server::SchedulerGrpc, ExecuteQueryParams, ExecuteQueryResult,
FailedJob, FilePartitionMetadata, FileType, GetFileMetadataParams,
GetFileMetadataResult, GetJobStatusParams, GetJobStatusResult, JobStatus,
PartitionId, PollWorkParams, PollWorkResult, QueuedJob, RunningJob, TaskDefinition,
TaskStatus,
scheduler_grpc_server::SchedulerGrpc, task_status, ExecuteQueryParams,
ExecuteQueryResult, FailedJob, FilePartitionMetadata, FileType,
GetFileMetadataParams, GetFileMetadataResult, GetJobStatusParams, GetJobStatusResult,
JobStatus, PartitionId, PollWorkParams, PollWorkResult, QueuedJob, RunningJob,
TaskDefinition, TaskStatus,
};
use ballista_core::serde::scheduler::ExecutorMeta;

@@ -62,6 +68,10 @@ impl parse_arg::ParseArgFromStr for ConfigBackend {
}
}

use crate::externalscaler::{
external_scaler_server::ExternalScaler, GetMetricSpecResponse, GetMetricsRequest,
GetMetricsResponse, IsActiveResponse, MetricSpec, MetricValue, ScaledObjectRef,
};
use crate::planner::DistributedPlanner;

use log::{debug, error, info, warn};
@@ -103,6 +113,55 @@ impl SchedulerServer {
}
}

const INFLIGHT_TASKS_METRIC_NAME: &str = "inflight_tasks";

#[tonic::async_trait]
impl ExternalScaler for SchedulerServer {
async fn is_active(
&self,
_request: Request<ScaledObjectRef>,
) -> Result<Response<IsActiveResponse>, tonic::Status> {
let tasks = self.state.get_all_tasks().await.map_err(|e| {
let msg = format!("Error reading tasks: {}", e);
error!("{}", msg);
tonic::Status::internal(msg)
})?;
let result = tasks.iter().any(|(_key, task)| {
!matches!(
task.status,
Some(task_status::Status::Completed(_))
| Some(task_status::Status::Failed(_))
)
});
debug!("Are there active tasks? {}", result);
Ok(Response::new(IsActiveResponse { result }))
}

async fn get_metric_spec(
&self,
_request: Request<ScaledObjectRef>,
) -> Result<Response<GetMetricSpecResponse>, tonic::Status> {
Ok(Response::new(GetMetricSpecResponse {
metric_specs: vec![MetricSpec {
metric_name: INFLIGHT_TASKS_METRIC_NAME.to_string(),
target_size: 1,
}],
}))
}

async fn get_metrics(
&self,
_request: Request<GetMetricsRequest>,
) -> Result<Response<GetMetricsResponse>, tonic::Status> {
Ok(Response::new(GetMetricsResponse {
metric_values: vec![MetricValue {
metric_name: INFLIGHT_TASKS_METRIC_NAME.to_string(),
metric_value: 10000000, // A very high number to saturate the HPA
}],
}))
}
}

#[tonic::async_trait]
impl SchedulerGrpc for SchedulerServer {
async fn poll_work(
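
The `is_active` handler added above reports the cluster as active while any task is neither completed nor failed; tasks with no status yet (not scheduled) also count as active. A minimal sketch of that predicate follows. The `Status` and `Task` types are simplified, hypothetical stand-ins for the generated `task_status::Status` and `TaskStatus` types, not the real definitions.

```rust
/// Simplified, hypothetical stand-in for the generated `task_status::Status` oneof.
#[derive(Debug, Clone, PartialEq)]
enum Status {
    Running,
    Completed,
    Failed,
}

/// Simplified, hypothetical stand-in for `TaskStatus`.
/// `None` models a task that has not been picked up by an executor yet.
#[derive(Debug, Clone)]
struct Task {
    status: Option<Status>,
}

/// Returns true while at least one task is neither completed nor failed.
fn has_active_tasks(tasks: &[Task]) -> bool {
    tasks.iter().any(|task| {
        !matches!(
            task.status,
            Some(Status::Completed) | Some(Status::Failed)
        )
    })
}
```

With this shape, an empty task list and a list containing only finished tasks both yield `false`, which is the condition under which Keda is allowed to scale the executors to zero.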
4 changes: 4 additions & 0 deletions ballista/rust/scheduler/src/main.rs
@@ -18,6 +18,7 @@
//! Ballista Rust scheduler binary.

use anyhow::{Context, Result};
use ballista_scheduler::externalscaler::external_scaler_server::ExternalScalerServer;
use futures::future::{self, Either, TryFutureExt};
use hyper::{server::conn::AddrStream, service::make_service_fn, Server};
use std::convert::Infallible;
@@ -72,8 +73,11 @@ async fn start_server(
let scheduler_grpc_server =
SchedulerGrpcServer::new(scheduler_server.clone());

let keda_scaler = ExternalScalerServer::new(scheduler_server.clone());

let mut tonic = TonicServer::builder()
.add_service(scheduler_grpc_server)
.add_service(keda_scaler)
.into_service();
let mut warp = warp::service(get_routes(scheduler_server));

28 changes: 15 additions & 13 deletions ballista/rust/scheduler/src/state/mod.rs
@@ -236,6 +236,15 @@ impl SchedulerState {
Ok((&value).try_into()?)
}

pub async fn get_all_tasks(&self) -> Result<HashMap<String, TaskStatus>> {
self.config_client
.get_from_prefix(&get_task_prefix(&self.namespace))
.await?
.into_iter()
.map(|(key, bytes)| Ok((key, decode_protobuf(&bytes)?)))
.collect()
}

/// This function ensures that the task wasn't assigned to an executor that died.
/// If that is the case, then the task is re-scheduled.
/// Returns true if the task was dead, false otherwise.
@@ -274,18 +283,12 @@ impl SchedulerState {
&self,
executor_id: &str,
) -> Result<Option<(TaskStatus, Arc<dyn ExecutionPlan>)>> {
let kvs: HashMap<String, Vec<u8>> = self
.config_client
.get_from_prefix(&get_task_prefix(&self.namespace))
.await?
.into_iter()
.collect();
let tasks = self.get_all_tasks().await?;
// TODO: Make the duration a configurable parameter
let executors = self
.get_alive_executors_metadata(Duration::from_secs(60))
.await?;
'tasks: for (_key, value) in kvs.iter() {
let mut status: TaskStatus = decode_protobuf(value)?;
'tasks: for (_key, status) in tasks.iter() {
if status.status.is_none() {
let partition = status.partition_id.as_ref().unwrap();
let plan = self
@@ -301,31 +304,29 @@ impl SchedulerState {
for unresolved_shuffle in unresolved_shuffles {
for stage_id in unresolved_shuffle.query_stage_ids {
for partition_id in 0..unresolved_shuffle.partition_count {
let referenced_task = kvs
let referenced_task = tasks
.get(&get_task_status_key(
&self.namespace,
&partition.job_id,
stage_id,
partition_id,
))
.unwrap();
let referenced_task: TaskStatus =
decode_protobuf(referenced_task)?;
let task_is_dead = self
.reschedule_dead_task(&referenced_task, &executors)
.await?;
if task_is_dead {
continue 'tasks;
} else if let Some(task_status::Status::Completed(
CompletedTask { executor_id },
)) = referenced_task.status
)) = &referenced_task.status
{
let empty = vec![];
let locations =
partition_locations.entry(stage_id).or_insert(empty);
let executor_meta = executors
.iter()
.find(|exec| exec.id == executor_id)
.find(|exec| exec.id == *executor_id)
.unwrap()
.clone();
locations.push(vec![
@@ -350,6 +351,7 @@ impl SchedulerState {
remove_unresolved_shuffles(plan.as_ref(), &partition_locations)?;

// If we get here, there are no more unresolved shuffled and the task can be run
let mut status = status.clone();
status.status = Some(task_status::Status::Running(RunningTask {
executor_id: executor_id.to_owned(),
}));
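
The new `get_all_tasks` helper in this file decodes every stored value up front and collects an iterator of `Result` items into a single `Result<HashMap<..>>`, so the first decoding failure aborts the whole read. A rough, std-only sketch of that pattern is below. The `decode` function is a hypothetical stand-in for `decode_protobuf` (it parses a UTF-8 number instead of a protobuf message), and `String` stands in for the real error type.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for `decode_protobuf`: parses the bytes as a UTF-8 number.
fn decode(bytes: &[u8]) -> Result<u32, String> {
    std::str::from_utf8(bytes)
        .map_err(|e| e.to_string())?
        .parse::<u32>()
        .map_err(|e| e.to_string())
}

/// Decodes every value eagerly. Collecting into `Result<HashMap<..>, _>`
/// stops at the first error and returns it instead of a partial map.
fn decode_all(kvs: Vec<(String, Vec<u8>)>) -> Result<HashMap<String, u32>, String> {
    kvs.into_iter()
        .map(|(key, bytes)| Ok((key, decode(&bytes)?)))
        .collect()
}
```

One consequence of this design, visible in the hunks above, is that callers receive already-decoded values, so the per-lookup `decode_protobuf` calls in the assignment loop are no longer needed.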
50 changes: 48 additions & 2 deletions docs/user-guide/src/distributed/kubernetes.md
@@ -28,6 +28,7 @@ The k8s deployment consists of:
- k8s deployment for one or more executor processes
- k8s service to route traffic to the schedulers
- k8s persistent volume and persistent volume claims to make local data accessible to Ballista
- _(optional)_ a [Keda](http://keda.sh) instance for autoscaling the number of executors

## Limitations

@@ -163,8 +164,8 @@ spec:
image: <your-image>
command: ["/executor"]
args:
- "--bind-port=50051",
- "--scheduler-host=ballista-scheduler",
- "--bind-port=50051"
- "--scheduler-host=ballista-scheduler"
- "--scheduler-port=50050"
ports:
- containerPort: 50051
@@ -216,3 +217,48 @@ Run the following kubectl command to delete the cluster.
```bash
kubectl delete -f cluster.yaml
```

## Adding autoscaling for executors

Ballista supports autoscaling for executors through [Keda](http://keda.sh). Keda scales a deployment based on
custom metrics, which the Ballista scheduler exposes, and it can scale the number of executors down to 0 when
there is no activity in the cluster.

Keda can be installed in your Kubernetes cluster with a single command:

```bash
kubectl apply -f https://github.com/kedacore/keda/releases/download/v2.3.0/keda-2.3.0.yaml
```

Once Keda is deployed on your cluster, you can deploy a Kubernetes object called `ScaledObject`, which tells
Keda how to scale your executors. To do that, copy the following YAML into a `scale.yaml` file:

```yaml
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
name: ballista-executor
spec:
scaleTargetRef:
name: ballista-executor
minReplicaCount: 0
maxReplicaCount: 5
triggers:
- type: external
metadata:
# Change this DNS if the scheduler isn't deployed in the "default" namespace
scalerAddress: ballista-scheduler.default.svc.cluster.local:50050
```

And then deploy it into the cluster:

```bash
kubectl apply -f scale.yaml
```

If the cluster is inactive, Keda will now scale the number of executors down to 0, and will scale them back up
when you launch a query. Note that Keda polls the scheduler once every 30 seconds, so there can be a short delay
before the executors are scaled.

Please visit Keda's [documentation page](https://keda.sh/docs/2.3/concepts/scaling-deployments/) for more information.
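
To see why the scheduler reports a very large `inflight_tasks` value against a `target_size` of 1, the following is a rough model of how the replica count could be derived from those numbers. It is only a sketch under simplifying assumptions: the function name and parameters are hypothetical, and the real Keda and HPA logic also involves tolerances, stabilization windows, and cooldown periods that are ignored here.

```rust
/// Rough, hypothetical model of the replica count the autoscaler aims for.
/// Assumes `target_size > 0` and `metric_value >= 0`.
fn desired_replicas(
    active: bool,
    metric_value: i64,
    target_size: i64,
    min_replicas: i64,
    max_replicas: i64,
) -> i64 {
    // When the scaler reports no activity, fall back to the minimum (possibly 0).
    if !active {
        return min_replicas;
    }
    // Integer form of ceil(metric_value / target_size).
    let wanted = (metric_value + target_size - 1) / target_size;
    // While active, keep at least one replica and never exceed the maximum.
    wanted.max(min_replicas.max(1)).min(max_replicas)
}
```

Under this model, the values used in this change (`metric_value: 10000000`, `target_size: 1`, `minReplicaCount: 0`, `maxReplicaCount: 5`) give 5 replicas while tasks are in flight and 0 when the cluster is idle, so the metric effectively acts as an on/off switch between the two limits.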