fix(prover): Run for zero queue to allow scaling down to 0 (#3115)
## What ❔

* Run evaluation when the queue is 0 but there are still pods running in the
namespace, so the namespace can be scaled down to zero (see the sketch after
this list).
* Add a check that cluster data is ready before the scaler runs.
* Add a `max_provers` config option.
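
Below is a minimal, hypothetical sketch (not part of the diff) of the data shape behind the new `max_provers` option and the lookup the scaler performs with it; the cluster names and the reduced `Gpu` enum are made up for illustration:

```rust
use std::collections::HashMap;

// Stand-in for the real `Gpu` enum from zksync_config (which has more variants).
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
enum Gpu {
    L4,
    T4,
}

fn main() {
    // max_provers: cluster name -> (GPU kind -> maximum number of provers).
    let max_provers: HashMap<String, HashMap<Gpu, u32>> = HashMap::from([(
        "cluster-a".to_string(),
        HashMap::from([(Gpu::L4, 100)]),
    )]);

    // Same lookup the scaler does when building a GPU pool: an unconfigured
    // cluster or GPU yields 0, i.e. no provers may be scheduled there.
    let max_pool_size = |cluster: &str, gpu: Gpu| -> u32 {
        max_provers
            .get(cluster)
            .and_then(|per_gpu| per_gpu.get(&gpu))
            .copied()
            .unwrap_or(0)
    };

    assert_eq!(max_pool_size("cluster-a", Gpu::L4), 100);
    assert_eq!(max_pool_size("cluster-a", Gpu::T4), 0); // GPU not configured
    assert_eq!(max_pool_size("cluster-b", Gpu::L4), 0); // cluster not configured
}
```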


## Why ❔


## Checklist


- [x] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [ ] Tests for the changes have been added / updated.
- [ ] Documentation comments have been added / updated.
- [x] Code has been formatted via `zkstack dev fmt` and `zkstack dev
lint`.


ref ZKD-1855
yorik authored Oct 17, 2024
1 parent 4cb28b1 commit bbe1919
Showing 7 changed files with 134 additions and 44 deletions.
2 changes: 2 additions & 0 deletions core/lib/config/src/configs/prover_autoscaler.rs
@@ -51,6 +51,8 @@ pub struct ProverAutoscalerScalerConfig {
pub cluster_priorities: HashMap<String, u32>,
/// Prover speed per GPU. Used to calculate desired number of provers for queue size.
pub prover_speed: HashMap<Gpu, u32>,
/// Maximum number of provers which can be run per cluster/GPU.
pub max_provers: HashMap<String, HashMap<Gpu, u32>>,
/// Duration after which pending pod considered long pending.
#[serde(default = "ProverAutoscalerScalerConfig::default_long_pending_duration")]
pub long_pending_duration: Duration,
@@ -34,6 +34,11 @@ message ProverSpeed {
optional uint32 speed = 2; // required
}

message MaxProver {
optional string cluster_and_gpu = 1; // required, format: <cluster>/<gpu>
optional uint32 max = 2; // required
}

message ProverAutoscalerScalerConfig {
optional uint32 prometheus_port = 1; // required
optional std.Duration scaler_run_interval = 2; // optional
@@ -43,4 +48,5 @@ message ProverAutoscalerScalerConfig {
repeated ClusterPriority cluster_priorities = 6; // optional
repeated ProverSpeed prover_speed = 7; // optional
optional uint32 long_pending_duration_s = 8; // optional
repeated MaxProver max_provers = 9; // optional
}
38 changes: 38 additions & 0 deletions core/lib/protobuf_config/src/prover_autoscaler.rs
@@ -1,3 +1,5 @@
use std::collections::HashMap;

use anyhow::Context as _;
use time::Duration;
use zksync_config::configs::{self, prover_autoscaler::Gpu};
@@ -92,6 +94,15 @@ impl ProtoRepr for proto::ProverAutoscalerScalerConfig {
Some(s) => Duration::seconds(s.into()),
None => Self::Type::default_long_pending_duration(),
},
max_provers: self.max_provers.iter().fold(HashMap::new(), |mut acc, e| {
let (cluster_and_gpu, max) = e.read().expect("max_provers");
if let Some((cluster, gpu)) = cluster_and_gpu.split_once('/') {
acc.entry(cluster.to_string())
.or_default()
.insert(gpu.parse().expect("max_provers/gpu"), max);
}
acc
}),
})
}

@@ -117,6 +128,15 @@ impl ProtoRepr for proto::ProverAutoscalerScalerConfig {
.map(|(k, v)| proto::ProverSpeed::build(&(*k, *v)))
.collect(),
long_pending_duration_s: Some(this.long_pending_duration.whole_seconds() as u32),
max_provers: this
.max_provers
.iter()
.flat_map(|(cluster, inner_map)| {
inner_map.iter().map(move |(gpu, max)| {
proto::MaxProver::build(&(format!("{}/{}", cluster, gpu), *max))
})
})
.collect(),
}
}
}
@@ -170,3 +190,21 @@ impl ProtoRepr for proto::ProverSpeed {
}
}
}

impl ProtoRepr for proto::MaxProver {
type Type = (String, u32);
fn read(&self) -> anyhow::Result<Self::Type> {
Ok((
required(&self.cluster_and_gpu)
.context("cluster_and_gpu")?
.parse()?,
*required(&self.max).context("max")?,
))
}
fn build(this: &Self::Type) -> Self {
Self {
cluster_and_gpu: Some(this.0.to_string()),
max: Some(this.1),
}
}
}
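
As an illustration (not from the PR), here is a self-contained sketch of the same key-splitting fold used in `read()` above, with the GPU kept as a plain `String` and hypothetical cluster names; entries whose key has no `/` are skipped, mirroring the `split_once` guard:

```rust
use std::collections::HashMap;

// Miniature of the conversion in `read()`: flat "<cluster>/<gpu>" entries
// become a nested cluster -> gpu -> max map.
fn fold_max_provers(entries: &[(&str, u32)]) -> HashMap<String, HashMap<String, u32>> {
    entries.iter().fold(HashMap::new(), |mut acc, (key, max)| {
        if let Some((cluster, gpu)) = key.split_once('/') {
            acc.entry(cluster.to_string())
                .or_default()
                .insert(gpu.to_string(), *max);
        }
        acc
    })
}

fn main() {
    let nested = fold_max_provers(&[
        ("cluster-a/L4", 100),
        ("cluster-a/T4", 16),
        ("bad-key", 5), // no '/', silently ignored
    ]);
    assert_eq!(nested["cluster-a"]["L4"], 100);
    assert_eq!(nested["cluster-a"]["T4"], 16);
    assert!(!nested.contains_key("bad-key"));
}
```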
10 changes: 1 addition & 9 deletions prover/crates/bin/prover_autoscaler/src/cluster_types.rs
@@ -36,19 +36,11 @@ pub struct Namespace {
pub pods: HashMap<String, Pod>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct Cluster {
pub name: String,
pub namespaces: HashMap<String, Namespace>,
}
impl Default for Cluster {
fn default() -> Self {
Self {
name: "".to_string(),
namespaces: HashMap::new(),
}
}
}
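
The hand-written `Default` impl removed here is exactly what `#[derive(Default)]` generates for this struct, since both `String` and `HashMap` default to empty values; a quick standalone check (with a simplified value type) illustrating the equivalence:

```rust
use std::collections::HashMap;

#[derive(Debug, Default, PartialEq)]
struct Cluster {
    name: String,
    // Simplified value type for illustration; the real map holds `Namespace`.
    namespaces: HashMap<String, u32>,
}

fn main() {
    // derive(Default) yields the same value as the removed manual impl:
    // empty name, empty namespaces map.
    assert_eq!(
        Cluster::default(),
        Cluster {
            name: "".to_string(),
            namespaces: HashMap::new(),
        }
    );
}
```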

#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct Clusters {
59 changes: 47 additions & 12 deletions prover/crates/bin/prover_autoscaler/src/global/scaler.rs
@@ -56,6 +56,7 @@ pub struct Scaler {

/// Which cluster to use first.
cluster_priorities: HashMap<String, u32>,
max_provers: HashMap<String, HashMap<Gpu, u32>>,
prover_speed: HashMap<Gpu, u32>,
long_pending_duration: chrono::Duration,
}
@@ -87,6 +88,7 @@ impl Scaler {
watcher,
queuer,
cluster_priorities: config.cluster_priorities,
max_provers: config.max_provers,
prover_speed: config.prover_speed,
long_pending_duration: chrono::Duration::seconds(
config.long_pending_duration.whole_seconds(),
@@ -112,7 +114,12 @@ impl Scaler {
let e = gp_map.entry(gpu).or_insert(GPUPool {
name: cluster.name.clone(),
gpu,
max_pool_size: 100, // TODO: get from the agent.
max_pool_size: self
.max_provers
.get(&cluster.name)
.and_then(|inner_map| inner_map.get(&gpu))
.copied()
.unwrap_or(0),
..Default::default()
});

@@ -265,23 +272,46 @@ impl Scaler {
}
}

tracing::debug!("run result: provers {:?}, total: {}", &provers, total);
tracing::debug!(
"run result for namespace {}: provers {:?}, total: {}",
namespace,
&provers,
total
);

provers
}
}

/// is_namespace_running returns true if there are some pods running in it.
fn is_namespace_running(namespace: &str, clusters: &Clusters) -> bool {
clusters
.clusters
.values()
.flat_map(|v| v.namespaces.iter())
.filter_map(|(k, v)| if k == namespace { Some(v) } else { None })
.flat_map(|v| v.deployments.values())
.map(
|d| d.running + d.desired, // If there is something running or expected to run, we
// should consider the namespace.
)
.sum::<i32>()
> 0
}

#[async_trait::async_trait]
impl Task for Scaler {
async fn invoke(&self) -> anyhow::Result<()> {
let queue = self.queuer.get_queue().await.unwrap();

// TODO: Check that clusters data is ready.
let clusters = self.watcher.clusters.lock().await;
let guard = self.watcher.data.lock().await;
watcher::check_is_ready(&guard.is_ready)?;

for (ns, ppv) in &self.namespaces {
let q = queue.queue.get(ppv).cloned().unwrap_or(0);
if q > 0 {
let provers = self.run(ns, q, &clusters);
tracing::debug!("Running eval for namespace {ns} and PPV {ppv} found queue {q}");
if q > 0 || is_namespace_running(ns, &guard.clusters) {
let provers = self.run(ns, q, &guard.clusters);
for (k, num) in &provers {
AUTOSCALER_METRICS.provers[&(k.cluster.clone(), ns.clone(), k.gpu)]
.set(*num as u64);
@@ -302,22 +332,27 @@ mod tests {

use super::*;
use crate::{
cluster_types::{self, Deployment, Namespace, Pod},
cluster_types::{Deployment, Namespace, Pod},
global::{queuer, watcher},
};

#[test]
fn test_run() {
let watcher = watcher::Watcher {
cluster_agents: vec![],
clusters: Arc::new(Mutex::new(cluster_types::Clusters {
..Default::default()
})),
data: Arc::new(Mutex::new(watcher::WatchedData::default())),
};
let queuer = queuer::Queuer {
prover_job_monitor_url: "".to_string(),
};
let scaler = Scaler::new(watcher, queuer, ProverAutoscalerScalerConfig::default());
let scaler = Scaler::new(
watcher,
queuer,
ProverAutoscalerScalerConfig {
max_provers: HashMap::from([("foo".to_string(), HashMap::from([(Gpu::L4, 100)]))]),
..Default::default()
},
);
let got = scaler.run(
&"prover".to_string(),
1499,
@@ -355,6 +390,6 @@ mod tests {
},
3,
)]);
assert!(got == want);
assert_eq!(got, want);
}
}
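
For illustration only (not part of the diff), the scale-down condition added in `invoke()` boils down to the following predicate: evaluation runs either when work is queued or when the namespace still has running or desired pods, which is what lets the autoscaler emit 0 and shrink a drained namespace:

```rust
// Condensed, hypothetical version of the check `q > 0 || is_namespace_running(..)`.
fn should_evaluate(queue: u64, running_plus_desired: i32) -> bool {
    queue > 0 || running_plus_desired > 0
}

fn main() {
    assert!(should_evaluate(1499, 0)); // work queued: evaluate and scale up
    assert!(should_evaluate(0, 3));    // queue drained but pods alive: evaluate so we can scale down to 0
    assert!(!should_evaluate(0, 0));   // nothing queued, nothing running: namespace is skipped
}
```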
49 changes: 34 additions & 15 deletions prover/crates/bin/prover_autoscaler/src/global/watcher.rs
@@ -1,6 +1,6 @@
use std::{collections::HashMap, sync::Arc};

use anyhow::{Context, Ok};
use anyhow::{anyhow, Context, Ok, Result};
use futures::future;
use reqwest::Method;
use tokio::sync::Mutex;
@@ -12,15 +12,31 @@ use crate::{
task_wiring::Task,
};

#[derive(Default)]
pub struct WatchedData {
pub clusters: Clusters,
pub is_ready: Vec<bool>,
}

pub fn check_is_ready(v: &Vec<bool>) -> Result<()> {
for b in v {
if !b {
return Err(anyhow!("Clusters data is not ready"));
}
}
Ok(())
}

#[derive(Clone)]
pub struct Watcher {
/// List of base URLs of all agents.
pub cluster_agents: Vec<Arc<Url>>,
pub clusters: Arc<Mutex<Clusters>>,
pub data: Arc<Mutex<WatchedData>>,
}

impl Watcher {
pub fn new(agent_urls: Vec<String>) -> Self {
let size = agent_urls.len();
Self {
cluster_agents: agent_urls
.into_iter()
@@ -31,8 +47,11 @@ impl Watcher {
)
})
.collect(),
clusters: Arc::new(Mutex::new(Clusters {
clusters: HashMap::new(),
data: Arc::new(Mutex::new(WatchedData {
clusters: Clusters {
clusters: HashMap::new(),
},
is_ready: vec![false; size],
})),
}
}
@@ -45,7 +64,8 @@ impl Task for Watcher {
.cluster_agents
.clone()
.into_iter()
.map(|a| {
.enumerate()
.map(|(i, a)| {
tracing::debug!("Getting cluster data from agent {}.", a);
tokio::spawn(async move {
let url: String = a
@@ -55,13 +75,14 @@
.to_string();
let response =
send_request_with_retries(&url, 5, Method::GET, None, None).await;
response
let res = response
.map_err(|err| {
anyhow::anyhow!("Failed fetching cluster from url: {url}: {err:?}")
})?
.json::<Cluster>()
.await
.context("Failed to read response as json")
.context("Failed to read response as json");
Ok((i, res))
})
})
.collect();
@@ -71,18 +92,16 @@
.await
.into_iter()
.map(|h| async move {
let c = h.unwrap().unwrap();
self.clusters
.lock()
.await
.clusters
.insert(c.name.clone(), c);
let (i, res) = h??;
let c = res?;
let mut guard = self.data.lock().await;
guard.clusters.clusters.insert(c.name.clone(), c);
guard.is_ready[i] = true;
Ok(())
})
.collect::<Vec<_>>(),
)
.await
.unwrap();
.await?;

Ok(())
}
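
A standalone sketch of the readiness gate introduced above (assumptions: two agents, and the `anyhow` crate, which the autoscaler already depends on): each agent gets one flag in `is_ready`, set to `true` once its cluster data has been stored, and `check_is_ready` keeps the scaler from acting until every agent has reported at least once:

```rust
use anyhow::{anyhow, Result};

// Same semantics as `watcher::check_is_ready`: all per-agent flags must be set.
fn check_is_ready(flags: &[bool]) -> Result<()> {
    if flags.iter().all(|&b| b) {
        Ok(())
    } else {
        Err(anyhow!("Clusters data is not ready"))
    }
}

fn main() {
    let mut is_ready = vec![false; 2]; // as in `vec![false; size]` for two agents
    assert!(check_is_ready(&is_ready).is_err()); // no agent has reported yet

    is_ready[0] = true; // first agent delivered its cluster data
    assert!(check_is_ready(&is_ready).is_err()); // still waiting for the second

    is_ready[1] = true;
    assert!(check_is_ready(&is_ready).is_ok()); // scaler may now run
}
```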
14 changes: 6 additions & 8 deletions prover/crates/bin/prover_autoscaler/src/main.rs
@@ -80,24 +80,21 @@ async fn main() -> anyhow::Result<()> {
let _ = rustls::crypto::ring::default_provider().install_default();
let client = kube::Client::try_default().await?;

tracing::info!("Starting ProverAutoscaler");

let mut tasks = vec![];

match opt.job {
AutoscalerType::Agent => {
let cluster = opt
.cluster_name
.context("cluster_name is required for Agent")?;
tracing::info!("Starting ProverAutoscaler Agent for cluster {}", cluster);
let agent_config = general_config.agent_config.context("agent_config")?;
let exporter_config = PrometheusExporterConfig::pull(agent_config.prometheus_port);
tasks.push(tokio::spawn(exporter_config.run(stop_receiver.clone())));

// TODO: maybe get cluster name from curl -H "Metadata-Flavor: Google"
// http://metadata.google.internal/computeMetadata/v1/instance/attributes/cluster-name
let watcher = Watcher::new(
client.clone(),
opt.cluster_name
.context("cluster_name is required for Agent")?,
agent_config.namespaces,
);
let watcher = Watcher::new(client.clone(), cluster, agent_config.namespaces);
let scaler = Scaler { client };
tasks.push(tokio::spawn(watcher.clone().run()));
tasks.push(tokio::spawn(agent::run_server(
@@ -108,6 +105,7 @@
)))
}
AutoscalerType::Scaler => {
tracing::info!("Starting ProverAutoscaler Scaler");
let scaler_config = general_config.scaler_config.context("scaler_config")?;
let interval = scaler_config.scaler_run_interval.unsigned_abs();
let exporter_config = PrometheusExporterConfig::pull(scaler_config.prometheus_port);
