Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(prover): Add endpoint to PJM to get queue reports #2918

Merged
merged 8 commits into from
Sep 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion core/lib/basic_types/src/prover_dal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
use std::{net::IpAddr, ops::Add, str::FromStr};

use chrono::{DateTime, Duration, NaiveDateTime, NaiveTime, Utc};
use serde::{Deserialize, Serialize};
use strum::{Display, EnumString};

use crate::{
Expand All @@ -27,7 +28,7 @@ pub struct ExtendedJobCountStatistics {
pub successful: usize,
}

#[derive(Debug, Clone, Copy, Default)]
#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize)]
pub struct JobCountStatistics {
pub queued: usize,
pub in_progress: usize,
Expand Down
2 changes: 2 additions & 0 deletions core/lib/config/src/configs/prover_job_monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ pub struct ProverJobMonitorConfig {
/// The interval between runs for Witness Job Queuer.
#[serde(default = "ProverJobMonitorConfig::default_witness_job_queuer_run_interval_ms")]
pub witness_job_queuer_run_interval_ms: u64,
/// HTTP port of the ProverJobMonitor to send requests to.
pub http_port: u16,
}

impl ProverJobMonitorConfig {
Expand Down
1 change: 1 addition & 0 deletions core/lib/config/src/testonly.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1113,6 +1113,7 @@ impl Distribution<configs::prover_job_monitor::ProverJobMonitorConfig> for Encod
prover_queue_reporter_run_interval_ms: self.sample(rng),
witness_generator_queue_reporter_run_interval_ms: self.sample(rng),
witness_job_queuer_run_interval_ms: self.sample(rng),
http_port: self.sample(rng),
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions core/lib/env_config/src/prover_job_monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ mod tests {
prover_queue_reporter_run_interval_ms: 10000,
witness_generator_queue_reporter_run_interval_ms: 10000,
witness_job_queuer_run_interval_ms: 10000,
http_port: 3074,
}
}

Expand All @@ -55,6 +56,7 @@ mod tests {
fn from_env_with_default() {
let config = r#"
PROVER_JOB_MONITOR_PROMETHEUS_PORT=3317
PROVER_JOB_MONITOR_HTTP_PORT=3074
PROVER_JOB_MONITOR_MAX_DB_CONNECTIONS=9
"#;
let mut lock = MUTEX.lock();
Expand All @@ -80,6 +82,7 @@ mod tests {
PROVER_JOB_MONITOR_PROVER_QUEUE_REPORTER_RUN_INTERVAL_MS=10001
PROVER_JOB_MONITOR_WITNESS_GENERATOR_QUEUE_REPORTER_RUN_INTERVAL_MS=10001
PROVER_JOB_MONITOR_WITNESS_JOB_QUEUER_RUN_INTERVAL_MS=10001
PROVER_JOB_MONITOR_HTTP_PORT=3074
"#;
let mut lock = MUTEX.lock();
lock.set_env(config);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,5 @@ message ProverJobMonitor {
optional uint64 prover_queue_reporter_run_interval_ms = 12; // optional; ms
optional uint64 witness_generator_queue_reporter_run_interval_ms = 13; // optional; ms
optional uint64 witness_job_queuer_run_interval_ms = 14; // optional; ms
optional uint32 http_port = 15; // required; u16
}
4 changes: 4 additions & 0 deletions core/lib/protobuf_config/src/prover_job_monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,9 @@ impl ProtoRepr for proto::ProverJobMonitor {
.or_else(|| Some(Self::Type::default_witness_job_queuer_run_interval_ms())),
)
.context("witness_job_queuer_run_interval_ms")?,
http_port: required(&self.http_port)
.and_then(|x| Ok((*x).try_into()?))
.context("http_port")?,
})
}

Expand Down Expand Up @@ -126,6 +129,7 @@ impl ProtoRepr for proto::ProverJobMonitor {
this.witness_generator_queue_reporter_run_interval_ms,
),
witness_job_queuer_run_interval_ms: Some(this.witness_job_queuer_run_interval_ms),
http_port: Some(this.http_port.into()),
}
}
}
1 change: 1 addition & 0 deletions etc/env/base/prover_job_monitor.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@ proof_compressor_queue_reporter_run_interval_ms = 10000
prover_queue_reporter_run_interval_ms = 10000
witness_generator_queue_reporter_run_interval_ms = 10000
witness_job_queuer_run_interval_ms = 10000
http_port = 3074
1 change: 1 addition & 0 deletions etc/env/file_based/general.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,7 @@ prover_job_monitor:
prover_queue_reporter_run_interval_ms: 10000
witness_generator_queue_reporter_run_interval_ms: 10000
witness_job_queuer_run_interval_ms: 10000
http_port: 3074


base_token_adjuster:
Expand Down
29 changes: 25 additions & 4 deletions prover/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions prover/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ categories = ["cryptography"]
[workspace.dependencies]
# Common dependencies
anyhow = "1.0"
axum = "0.7.5"
async-trait = "0.1"
bincode = "1"
chrono = "0.4.38"
Expand Down
3 changes: 3 additions & 0 deletions prover/crates/bin/prover_job_monitor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ zksync_prover_dal.workspace = true
zksync_utils.workspace = true
zksync_types.workspace = true
zksync_config = { workspace = true, features = ["observability_ext"] }
zksync_db_connection.workspace = true

vise.workspace = true

Expand All @@ -25,3 +26,5 @@ clap = { workspace = true, features = ["derive"] }
ctrlc = { workspace = true, features = ["termination"] }
tracing.workspace = true
async-trait.workspace = true
serde.workspace = true
axum.workspace = true
176 changes: 176 additions & 0 deletions prover/crates/bin/prover_job_monitor/src/autoscaler_queue_reporter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
use std::collections::HashMap;

use axum::{
http::StatusCode,
response::{IntoResponse, Response},
routing::get,
Json, Router,
};
use serde::{Deserialize, Serialize};
use zksync_db_connection::error::DalError;
use zksync_prover_dal::{ConnectionPool, Prover, ProverDal};
use zksync_types::{
basic_fri_types::AggregationRound, protocol_version::ProtocolSemanticVersion,
prover_dal::JobCountStatistics,
};

/// Produces queue reports by reading job statistics from the prover database.
#[derive(Clone, Debug)]
pub struct AutoscalerQueueReporter {
    /// Pool used to acquire a connection for every statistics query.
    connection_pool: ConnectionPool<Prover>,
}

/// Job count statistics for every job kind tracked by the reporter.
///
/// Kinds for which no statistics were collected keep their `Default` value.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct QueueReport {
    /// Witness generator jobs of the `BasicCircuits` aggregation round.
    pub basic_witness_jobs: JobCountStatistics,
    /// Witness generator jobs of the `LeafAggregation` aggregation round.
    pub leaf_witness_jobs: JobCountStatistics,
    /// Witness generator jobs of the `NodeAggregation` aggregation round.
    pub node_witness_jobs: JobCountStatistics,
    /// Witness generator jobs of the `RecursionTip` aggregation round.
    pub recursion_tip_witness_jobs: JobCountStatistics,
    /// Witness generator jobs of the `Scheduler` aggregation round.
    pub scheduler_witness_jobs: JobCountStatistics,
    /// Prover jobs.
    pub prover_jobs: JobCountStatistics,
    /// Proof compressor jobs.
    pub proof_compressor_jobs: JobCountStatistics,
}

/// A [`QueueReport`] paired with the protocol version it was collected for.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct VersionedQueueReport {
    /// Protocol version the statistics in `report` belong to.
    pub version: ProtocolSemanticVersion,
    /// Job statistics gathered for `version`.
    pub report: QueueReport,
}

impl AutoscalerQueueReporter {
    /// Creates a reporter that reads job statistics through `connection_pool`.
    pub fn new(connection_pool: ConnectionPool<Prover>) -> Self {
        Self { connection_pool }
    }

    /// Builds one queue report per protocol version that has job statistics.
    ///
    /// Witness generator statistics are collected for every aggregation round, followed by
    /// prover and proof compressor statistics. The reports are accumulated in a `HashMap`,
    /// so the order of the returned entries is unspecified.
    ///
    /// # Errors
    ///
    /// Returns a [`ProcessorError`] if acquiring a database connection fails.
    pub async fn get_report(&self) -> Result<Json<Vec<VersionedQueueReport>>, ProcessorError> {
        tracing::debug!("Received request to get queue report");

        let mut reports = HashMap::<ProtocolSemanticVersion, QueueReport>::new();

        for round in AggregationRound::ALL_ROUNDS {
            self.get_witness_jobs_report(round, &mut reports).await?;
        }

        self.get_prover_jobs_report(&mut reports).await?;
        self.get_proof_compressor_jobs_report(&mut reports).await?;

        Ok(Json(
            reports
                .into_iter()
                .map(|(version, report)| VersionedQueueReport { version, report })
                .collect(),
        ))
    }

    /// Stores the witness generator statistics of `aggregation_round` into `state`,
    /// creating a default report for protocol versions that are not present yet.
    ///
    /// Returns `ProcessorError` (rather than `anyhow::Error`) so that a failed connection
    /// attempt keeps its `DalError` identity and is reported as a database failure.
    async fn get_witness_jobs_report(
        &self,
        aggregation_round: AggregationRound,
        state: &mut HashMap<ProtocolSemanticVersion, QueueReport>,
    ) -> Result<(), ProcessorError> {
        let stats = self
            .connection_pool
            .connection()
            .await?
            .fri_witness_generator_dal()
            .get_witness_jobs_stats(aggregation_round)
            .await;

        for (protocol_version, job_stats) in stats {
            let report = state.entry(protocol_version).or_default();

            // Each aggregation round owns exactly one field of the report.
            match aggregation_round {
                AggregationRound::BasicCircuits => report.basic_witness_jobs = job_stats,
                AggregationRound::LeafAggregation => report.leaf_witness_jobs = job_stats,
                AggregationRound::NodeAggregation => report.node_witness_jobs = job_stats,
                AggregationRound::RecursionTip => report.recursion_tip_witness_jobs = job_stats,
                AggregationRound::Scheduler => report.scheduler_witness_jobs = job_stats,
            }
        }
        Ok(())
    }

    /// Stores the prover job statistics into `state`, creating a default report for
    /// protocol versions that are not present yet.
    async fn get_prover_jobs_report(
        &self,
        state: &mut HashMap<ProtocolSemanticVersion, QueueReport>,
    ) -> Result<(), ProcessorError> {
        let stats = self
            .connection_pool
            .connection()
            .await?
            .fri_prover_jobs_dal()
            .get_generic_prover_jobs_stats()
            .await;

        for (protocol_version, job_stats) in stats {
            state.entry(protocol_version).or_default().prover_jobs = job_stats;
        }
        Ok(())
    }

    /// Stores the proof compressor job statistics into `state`, creating a default report
    /// for protocol versions that are not present yet.
    async fn get_proof_compressor_jobs_report(
        &self,
        state: &mut HashMap<ProtocolSemanticVersion, QueueReport>,
    ) -> Result<(), ProcessorError> {
        let stats = self
            .connection_pool
            .connection()
            .await?
            .fri_proof_compressor_dal()
            .get_jobs_stats()
            .await;

        for (protocol_version, job_stats) in stats {
            state
                .entry(protocol_version)
                .or_default()
                .proof_compressor_jobs = job_stats;
        }
        Ok(())
    }
}

/// Builds a [`Router`] that serves the queue report on `GET /queue_report`.
///
/// The handler owns an [`AutoscalerQueueReporter`] backed by `connection_pool`.
pub fn get_queue_reporter_router(connection_pool: ConnectionPool<Prover>) -> Router {
    let reporter = AutoscalerQueueReporter::new(connection_pool);
    let queue_report_handler = move || async move { reporter.get_report().await };

    Router::new().route("/queue_report", get(queue_report_handler))
}

/// Errors that can occur while serving a queue report request.
#[derive(Debug)]
pub enum ProcessorError {
    /// A database access layer error.
    Dal(DalError),
    /// Any other failure, carried as its textual description.
    Custom(String),
}

impl From<DalError> for ProcessorError {
fn from(err: DalError) -> Self {
ProcessorError::Dal(err)
}
}

impl From<anyhow::Error> for ProcessorError {
fn from(err: anyhow::Error) -> Self {
ProcessorError::Custom(err.to_string())
}
}

impl IntoResponse for ProcessorError {
fn into_response(self) -> Response {
let (status_code, message) = match self {
ProcessorError::Dal(err) => {
tracing::error!("Sqlx error: {:?}", err);
(
StatusCode::INTERNAL_SERVER_ERROR,
"Failed getting data from database",
)
}
ProcessorError::Custom(err) => {
tracing::error!("Custom error invoked: {:?}", &err);
(StatusCode::INTERNAL_SERVER_ERROR, "Internal error")
}
};
(status_code, message).into_response()
}
}
1 change: 1 addition & 0 deletions prover/crates/bin/prover_job_monitor/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
pub mod archiver;
pub mod autoscaler_queue_reporter;
pub mod job_requeuer;
pub(crate) mod metrics;
pub mod queue_reporter;
Expand Down
Loading
Loading