diff --git a/Cargo.lock b/Cargo.lock index 7f249cad4c78..feb53dc5bfe2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8023,6 +8023,7 @@ dependencies = [ "num_enum 0.7.2", "serde", "serde_json", + "sqlx", "strum", "thiserror", "tiny-keccak 2.0.2", diff --git a/core/lib/basic_types/Cargo.toml b/core/lib/basic_types/Cargo.toml index 918aa41cad02..14c06f0c7c15 100644 --- a/core/lib/basic_types/Cargo.toml +++ b/core/lib/basic_types/Cargo.toml @@ -19,6 +19,7 @@ serde_json.workspace = true chrono.workspace = true strum = { workspace = true, features = ["derive"] } num_enum.workspace = true +sqlx = { workspace = true, feature= ["derive"]} anyhow.workspace = true url = { workspace = true, features = ["serde"] } diff --git a/core/lib/basic_types/src/prover_dal.rs b/core/lib/basic_types/src/prover_dal.rs index c14d1cb91fd3..690a5b7d7166 100644 --- a/core/lib/basic_types/src/prover_dal.rs +++ b/core/lib/basic_types/src/prover_dal.rs @@ -99,13 +99,13 @@ pub struct JobPosition { pub sequence_number: usize, } -#[derive(Debug, Default, PartialEq)] +#[derive(Debug, Default, PartialEq, Clone)] pub struct ProverJobStatusFailed { pub started_at: DateTime, pub error: String, } -#[derive(Debug, PartialEq)] +#[derive(Debug, PartialEq, Clone)] pub struct ProverJobStatusSuccessful { pub started_at: DateTime, pub time_taken: Duration, @@ -120,7 +120,7 @@ impl Default for ProverJobStatusSuccessful { } } -#[derive(Debug, Default, PartialEq)] +#[derive(Debug, Default, PartialEq, Clone)] pub struct ProverJobStatusInProgress { pub started_at: DateTime, } @@ -146,7 +146,7 @@ pub struct WitnessJobStatusFailed { pub error: String, } -#[derive(Debug, strum::Display, strum::EnumString, strum::AsRefStr, PartialEq)] +#[derive(Debug, strum::Display, strum::EnumString, strum::AsRefStr, PartialEq, Clone)] pub enum ProverJobStatus { #[strum(serialize = "queued")] Queued, @@ -238,6 +238,7 @@ impl FromStr for GpuProverInstanceStatus { } } +#[derive(Debug, Clone)] pub struct ProverJobFriInfo { pub id: u32, pub l1_batch_number: L1BatchNumber, @@ -260,6 +261,7 @@ pub struct ProverJobFriInfo { pub picked_by: Option, } +#[derive(Debug, Clone)] pub struct BasicWitnessGeneratorJobInfo { pub l1_batch_number: L1BatchNumber, pub merkle_tree_paths_blob_url: Option, @@ -276,6 +278,7 @@ pub struct BasicWitnessGeneratorJobInfo { pub eip_4844_blobs: Option, } +#[derive(Debug, Clone)] pub struct LeafWitnessGeneratorJobInfo { pub id: u32, pub l1_batch_number: L1BatchNumber, @@ -294,6 +297,7 @@ pub struct LeafWitnessGeneratorJobInfo { pub picked_by: Option, } +#[derive(Debug, Clone)] pub struct NodeWitnessGeneratorJobInfo { pub id: u32, pub l1_batch_number: L1BatchNumber, @@ -312,6 +316,22 @@ pub struct NodeWitnessGeneratorJobInfo { pub picked_by: Option, } +#[derive(Debug, Clone)] +pub struct RecursionTipWitnessGeneratorJobInfo { + pub l1_batch_number: L1BatchNumber, + pub status: WitnessJobStatus, + pub attempts: u32, + pub processing_started_at: Option, + pub time_taken: Option, + pub error: Option, + pub created_at: NaiveDateTime, + pub updated_at: NaiveDateTime, + pub number_of_final_node_jobs: Option, + pub protocol_version: Option, + pub picked_by: Option, +} + +#[derive(Debug, Clone)] pub struct SchedulerWitnessGeneratorJobInfo { pub l1_batch_number: L1BatchNumber, pub scheduler_partial_input_blob_url: String, @@ -326,7 +346,7 @@ pub struct SchedulerWitnessGeneratorJobInfo { pub picked_by: Option, } -#[derive(Debug, EnumString, Display)] +#[derive(Debug, EnumString, Display, Clone)] pub enum ProofCompressionJobStatus { #[strum(serialize = "queued")] Queued, @@ -342,6 +362,7 @@ pub enum ProofCompressionJobStatus { Skipped, } +#[derive(Debug, Clone)] pub struct ProofCompressionJobInfo { pub l1_batch_number: L1BatchNumber, pub attempts: u32, @@ -355,3 +376,15 @@ pub struct ProofCompressionJobInfo { pub time_taken: Option, pub picked_by: Option, } + +// This function corrects circuit IDs for the node witness generator. +// +// - Circuit IDs in the node witness generator are 2 higher than in other rounds. +// - The `EIP4844Repack` circuit (ID 255) is an exception and is set to 18. +pub fn correct_circuit_id(circuit_id: i16, aggregation_round: AggregationRound) -> u32 { + match (circuit_id, aggregation_round) { + (18, AggregationRound::NodeAggregation) => 255, + (circuit_id, AggregationRound::NodeAggregation) => (circuit_id as u32) - 2, + _ => circuit_id as u32, + } +} diff --git a/prover/Cargo.lock b/prover/Cargo.lock index cba0a1856d83..113c5b842976 100644 --- a/prover/Cargo.lock +++ b/prover/Cargo.lock @@ -4556,6 +4556,7 @@ version = "0.1.0" dependencies = [ "anyhow", "bincode", + "circuit_definitions", "clap 4.4.6", "colored", "dialoguer", @@ -7769,6 +7770,7 @@ dependencies = [ "num_enum 0.7.2", "serde", "serde_json", + "sqlx", "strum", "thiserror", "tiny-keccak 2.0.2", diff --git a/prover/prover_cli/Cargo.toml b/prover/prover_cli/Cargo.toml index cf7b19b47e72..8b4e131caa2d 100644 --- a/prover/prover_cli/Cargo.toml +++ b/prover/prover_cli/Cargo.toml @@ -32,3 +32,4 @@ zksync_dal.workspace = true strum.workspace = true colored.workspace = true sqlx.workspace = true +circuit_definitions.workspace = true diff --git a/prover/prover_cli/src/commands/status/batch.rs b/prover/prover_cli/src/commands/status/batch.rs index b20078747f71..389437f17ac7 100644 --- a/prover/prover_cli/src/commands/status/batch.rs +++ b/prover/prover_cli/src/commands/status/batch.rs @@ -1,32 +1,45 @@ -use anyhow::{ensure, Context as _}; +use std::collections::BTreeMap; + +use anyhow::Context as _; +use circuit_definitions::zkevm_circuits::scheduler::aux::BaseLayerCircuitType; use clap::Args as ClapArgs; +use colored::*; use prover_dal::{Connection, ConnectionPool, Prover, ProverDal}; use zksync_types::{ - basic_fri_types::AggregationRound, prover_dal::WitnessJobStatus, url::SensitiveUrl, + basic_fri_types::AggregationRound, + prover_dal::{ + BasicWitnessGeneratorJobInfo, JobCountStatistics, LeafWitnessGeneratorJobInfo, + NodeWitnessGeneratorJobInfo, ProofCompressionJobInfo, ProverJobFriInfo, ProverJobStatus, + RecursionTipWitnessGeneratorJobInfo, SchedulerWitnessGeneratorJobInfo, + }, + url::SensitiveUrl, L1BatchNumber, }; -use super::utils::{AggregationRoundInfo, BatchData, Task, TaskStatus}; +use super::utils::{BatchData, StageInfo, Status}; use crate::cli::ProverCLIConfig; #[derive(ClapArgs)] pub struct Args { - #[clap(short = 'n', num_args = 1..)] + #[clap(short = 'n', num_args = 1.., required = true)] batches: Vec, #[clap(short, long, default_value("false"))] verbose: bool, } pub(crate) async fn run(args: Args, config: ProverCLIConfig) -> anyhow::Result<()> { - ensure!( - !args.batches.is_empty(), - "At least one batch number should be provided" - ); - let batches_data = get_batches_data(args.batches, config.db_url).await?; for batch_data in batches_data { - println!("{batch_data:?}"); + println!( + "== {} ==", + format!("Batch {} Status", batch_data.batch_number).bold() + ); + if !args.verbose { + display_batch_status(batch_data); + } else { + display_batch_info(batch_data); + } } Ok(()) @@ -50,47 +63,51 @@ async fn get_batches_data( for batch in batches { let current_batch_data = BatchData { batch_number: batch, - basic_witness_generator: Task::BasicWitnessGenerator { - status: get_proof_basic_witness_generator_status_for_batch(batch, &mut conn).await, - aggregation_round_info: get_aggregation_round_info_for_batch( + basic_witness_generator: StageInfo::BasicWitnessGenerator { + witness_generator_job_info: get_proof_basic_witness_generator_into_for_batch( + batch, &mut conn, + ) + .await, + prover_jobs_info: get_prover_jobs_info_for_batch( batch, AggregationRound::BasicCircuits, &mut conn, ) .await, }, - leaf_witness_generator: Task::LeafWitnessGenerator { - status: get_proof_leaf_witness_generator_status_for_batch(batch, &mut conn).await, - aggregation_round_info: get_aggregation_round_info_for_batch( + leaf_witness_generator: StageInfo::LeafWitnessGenerator { + witness_generator_jobs_info: get_proof_leaf_witness_generator_info_for_batch( + batch, &mut conn, + ) + .await, + prover_jobs_info: get_prover_jobs_info_for_batch( batch, AggregationRound::LeafAggregation, &mut conn, ) .await, }, - node_witness_generator: Task::NodeWitnessGenerator { - status: get_proof_node_witness_generator_status_for_batch(batch, &mut conn).await, - aggregation_round_info: get_aggregation_round_info_for_batch( - batch, - AggregationRound::NodeAggregation, - &mut conn, + node_witness_generator: StageInfo::NodeWitnessGenerator { + witness_generator_jobs_info: get_proof_node_witness_generator_info_for_batch( + batch, &mut conn, ) .await, - }, - scheduler_witness_generator: Task::SchedulerWitnessGenerator { - status: get_proof_scheduler_witness_generator_status_for_batch(batch, &mut conn) - .await, - aggregation_round_info: get_aggregation_round_info_for_batch( + prover_jobs_info: get_prover_jobs_info_for_batch( batch, - AggregationRound::Scheduler, + AggregationRound::NodeAggregation, &mut conn, ) .await, }, - compressor: Task::Compressor( - get_proof_compression_job_status_for_batch(batch, &mut conn).await, + recursion_tip_witness_generator: StageInfo::RecursionTipWitnessGenerator( + get_proof_recursion_tip_witness_generator_info_for_batch(batch, &mut conn).await, + ), + scheduler_witness_generator: StageInfo::SchedulerWitnessGenerator( + get_proof_scheduler_witness_generator_info_for_batch(batch, &mut conn).await, + ), + compressor: StageInfo::Compressor( + get_proof_compression_job_info_for_batch(batch, &mut conn).await, ), - ..Default::default() }; batches_data.push(current_batch_data); } @@ -98,80 +115,282 @@ async fn get_batches_data( Ok(batches_data) } -async fn get_aggregation_round_info_for_batch<'a>( +async fn get_prover_jobs_info_for_batch<'a>( batch_number: L1BatchNumber, aggregation_round: AggregationRound, conn: &mut Connection<'a, Prover>, -) -> AggregationRoundInfo { - let status: TaskStatus = conn - .fri_prover_jobs_dal() +) -> Vec { + conn.fri_prover_jobs_dal() .get_prover_jobs_stats_for_batch(batch_number, aggregation_round) .await - .into(); - - AggregationRoundInfo { - round: aggregation_round, - prover_jobs_status: status, - } } -async fn get_proof_basic_witness_generator_status_for_batch<'a>( +async fn get_proof_basic_witness_generator_into_for_batch<'a>( batch_number: L1BatchNumber, conn: &mut Connection<'a, Prover>, -) -> TaskStatus { +) -> Option { conn.fri_witness_generator_dal() .get_basic_witness_generator_job_for_batch(batch_number) .await - .map(|job| TaskStatus::from(job.status)) - .unwrap_or_default() } -async fn get_proof_leaf_witness_generator_status_for_batch<'a>( +async fn get_proof_leaf_witness_generator_info_for_batch<'a>( batch_number: L1BatchNumber, conn: &mut Connection<'a, Prover>, -) -> TaskStatus { +) -> Vec { conn.fri_witness_generator_dal() .get_leaf_witness_generator_jobs_for_batch(batch_number) .await - .iter() - .map(|s| s.status.clone()) - .collect::>() - .into() } -async fn get_proof_node_witness_generator_status_for_batch<'a>( +async fn get_proof_node_witness_generator_info_for_batch<'a>( batch_number: L1BatchNumber, conn: &mut Connection<'a, Prover>, -) -> TaskStatus { +) -> Vec { conn.fri_witness_generator_dal() .get_node_witness_generator_jobs_for_batch(batch_number) .await - .iter() - .map(|s| s.status.clone()) - .collect::>() - .into() } -async fn get_proof_scheduler_witness_generator_status_for_batch<'a>( +async fn get_proof_recursion_tip_witness_generator_info_for_batch<'a>( batch_number: L1BatchNumber, conn: &mut Connection<'a, Prover>, -) -> TaskStatus { +) -> Option { + conn.fri_witness_generator_dal() + .get_recursion_tip_witness_generator_jobs_for_batch(batch_number) + .await +} + +async fn get_proof_scheduler_witness_generator_info_for_batch<'a>( + batch_number: L1BatchNumber, + conn: &mut Connection<'a, Prover>, +) -> Option { conn.fri_witness_generator_dal() .get_scheduler_witness_generator_jobs_for_batch(batch_number) .await - .iter() - .map(|s| s.status.clone()) - .collect::>() - .into() } -async fn get_proof_compression_job_status_for_batch<'a>( +async fn get_proof_compression_job_info_for_batch<'a>( batch_number: L1BatchNumber, conn: &mut Connection<'a, Prover>, -) -> TaskStatus { +) -> Option { conn.fri_proof_compressor_dal() .get_proof_compression_job_for_batch(batch_number) .await - .map(|job| TaskStatus::from(job.status)) - .unwrap_or_default() +} + +fn display_batch_status(batch_data: BatchData) { + display_status_for_stage(batch_data.basic_witness_generator); + display_status_for_stage(batch_data.leaf_witness_generator); + display_status_for_stage(batch_data.node_witness_generator); + display_status_for_stage(batch_data.recursion_tip_witness_generator); + display_status_for_stage(batch_data.scheduler_witness_generator); + display_status_for_stage(batch_data.compressor); +} + +fn display_status_for_stage(stage_info: StageInfo) { + display_aggregation_round(&stage_info); + match stage_info.witness_generator_jobs_status() { + Status::Custom(msg) => { + println!("{}: {} \n", stage_info.to_string().bold(), msg); + } + Status::Queued | Status::WaitingForProofs | Status::Stuck | Status::JobsNotFound => { + println!( + "{}: {}", + stage_info.to_string().bold(), + stage_info.witness_generator_jobs_status() + ) + } + Status::InProgress | Status::Successful => { + println!( + "{}: {}", + stage_info.to_string().bold(), + stage_info.witness_generator_jobs_status() + ); + if let Some(job_status) = stage_info.prover_jobs_status() { + println!("> {}: {}", "Prover Jobs".to_owned().bold(), job_status); + } + } + } +} + +fn display_batch_info(batch_data: BatchData) { + display_info_for_stage(batch_data.basic_witness_generator); + display_info_for_stage(batch_data.leaf_witness_generator); + display_info_for_stage(batch_data.node_witness_generator); + display_info_for_stage(batch_data.recursion_tip_witness_generator); + display_info_for_stage(batch_data.scheduler_witness_generator); + display_info_for_stage(batch_data.compressor); +} + +fn display_info_for_stage(stage_info: StageInfo) { + display_aggregation_round(&stage_info); + match stage_info.witness_generator_jobs_status() { + Status::Custom(msg) => { + println!("{}: {}", stage_info.to_string().bold(), msg); + } + Status::Queued | Status::WaitingForProofs | Status::Stuck | Status::JobsNotFound => { + println!( + " > {}: {}", + stage_info.to_string().bold(), + stage_info.witness_generator_jobs_status() + ) + } + Status::InProgress => { + println!( + "v {}: {}", + stage_info.to_string().bold(), + stage_info.witness_generator_jobs_status() + ); + match stage_info { + StageInfo::BasicWitnessGenerator { + prover_jobs_info, .. + } => { + display_prover_jobs_info(prover_jobs_info); + } + StageInfo::LeafWitnessGenerator { + witness_generator_jobs_info, + prover_jobs_info, + } => { + display_leaf_witness_generator_jobs_info(witness_generator_jobs_info); + display_prover_jobs_info(prover_jobs_info); + } + StageInfo::NodeWitnessGenerator { + witness_generator_jobs_info, + prover_jobs_info, + } => { + display_node_witness_generator_jobs_info(witness_generator_jobs_info); + display_prover_jobs_info(prover_jobs_info); + } + _ => (), + } + } + Status::Successful => { + println!( + "> {}: {}", + stage_info.to_string().bold(), + stage_info.witness_generator_jobs_status() + ); + match stage_info { + StageInfo::BasicWitnessGenerator { + prover_jobs_info, .. + } + | StageInfo::LeafWitnessGenerator { + prover_jobs_info, .. + } + | StageInfo::NodeWitnessGenerator { + prover_jobs_info, .. + } => display_prover_jobs_info(prover_jobs_info), + _ => (), + } + } + } +} + +fn display_leaf_witness_generator_jobs_info( + mut leaf_witness_generators_jobs_info: Vec, +) { + leaf_witness_generators_jobs_info.sort_by_key(|job| job.circuit_id); + + leaf_witness_generators_jobs_info.iter().for_each(|job| { + println!( + " > {}: {}", + format!( + "{:?}", + BaseLayerCircuitType::from_numeric_value(job.circuit_id as u8) + ) + .bold(), + Status::from(job.status.clone()) + ) + }); +} + +fn display_node_witness_generator_jobs_info( + mut node_witness_generators_jobs_info: Vec, +) { + node_witness_generators_jobs_info.sort_by_key(|job| job.circuit_id); + + node_witness_generators_jobs_info.iter().for_each(|job| { + println!( + " > {}: {}", + format!( + "{:?}", + BaseLayerCircuitType::from_numeric_value(job.circuit_id as u8) + ) + .bold(), + Status::from(job.status.clone()) + ) + }); +} + +fn display_prover_jobs_info(prover_jobs_info: Vec) { + let prover_jobs_status = Status::from(prover_jobs_info.clone()); + + if matches!(prover_jobs_status, Status::Successful) + || matches!(prover_jobs_status, Status::JobsNotFound) + { + println!( + "> {}: {prover_jobs_status}", + "Prover Jobs".to_owned().bold() + ); + return; + } + + println!( + "v {}: {prover_jobs_status}", + "Prover Jobs".to_owned().bold() + ); + + let mut jobs_by_circuit_id: BTreeMap> = BTreeMap::new(); + prover_jobs_info.iter().for_each(|job| { + jobs_by_circuit_id + .entry(job.circuit_id) + .or_default() + .push(job.clone()) + }); + + for (circuit_id, prover_jobs_info) in jobs_by_circuit_id { + let status = Status::from(prover_jobs_info.clone()); + println!( + " > {}: {}", + format!( + "{:?}", + BaseLayerCircuitType::from_numeric_value(circuit_id as u8) + ) + .bold(), + status + ); + if matches!(status, Status::InProgress) { + display_job_status_count(prover_jobs_info); + } + } +} + +fn display_job_status_count(jobs: Vec) { + let mut jobs_counts = JobCountStatistics::default(); + let total_jobs = jobs.len(); + jobs.iter().for_each(|job| match job.status { + ProverJobStatus::Queued => jobs_counts.queued += 1, + ProverJobStatus::InProgress(_) => jobs_counts.in_progress += 1, + ProverJobStatus::Successful(_) => jobs_counts.successful += 1, + ProverJobStatus::Failed(_) => jobs_counts.failed += 1, + ProverJobStatus::Skipped | ProverJobStatus::Ignored | ProverJobStatus::InGPUProof => (), + }); + + println!(" - Total jobs: {}", total_jobs); + println!(" - Successful: {}", jobs_counts.successful); + println!(" - In Progress: {}", jobs_counts.in_progress); + println!(" - Queued: {}", jobs_counts.queued); + println!(" - Failed: {}", jobs_counts.failed); +} + +fn display_aggregation_round(stage_info: &StageInfo) { + if let Some(aggregation_round) = stage_info.aggregation_round() { + println!( + "\n-- {} --", + format!("Aggregation Round {}", aggregation_round as u8).bold() + ); + } else { + println!("\n-- {} --", "Proof Compression".to_owned().bold()); + }; } diff --git a/prover/prover_cli/src/commands/status/utils.rs b/prover/prover_cli/src/commands/status/utils.rs index d6b5e793b9b5..59c5553b530b 100644 --- a/prover/prover_cli/src/commands/status/utils.rs +++ b/prover/prover_cli/src/commands/status/utils.rs @@ -1,10 +1,13 @@ use std::fmt::Debug; -use colored::*; use strum::{Display, EnumString}; use zksync_types::{ basic_fri_types::AggregationRound, - prover_dal::{ProofCompressionJobStatus, ProverJobFriInfo, ProverJobStatus, WitnessJobStatus}, + prover_dal::{ + BasicWitnessGeneratorJobInfo, LeafWitnessGeneratorJobInfo, NodeWitnessGeneratorJobInfo, + ProofCompressionJobInfo, ProofCompressionJobStatus, ProverJobFriInfo, ProverJobStatus, + RecursionTipWitnessGeneratorJobInfo, SchedulerWitnessGeneratorJobInfo, WitnessJobStatus, + }, L1BatchNumber, }; @@ -13,83 +16,21 @@ pub struct BatchData { /// The number of the batch. pub batch_number: L1BatchNumber, /// The basic witness generator data. - pub basic_witness_generator: Task, + pub basic_witness_generator: StageInfo, /// The leaf witness generator data. - pub leaf_witness_generator: Task, + pub leaf_witness_generator: StageInfo, /// The node witness generator data. - pub node_witness_generator: Task, + pub node_witness_generator: StageInfo, /// The recursion tip data. - pub recursion_tip: Task, + pub recursion_tip_witness_generator: StageInfo, /// The scheduler data. - pub scheduler_witness_generator: Task, + pub scheduler_witness_generator: StageInfo, /// The compressor data. - pub compressor: Task, -} - -impl Debug for BatchData { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - writeln!( - f, - "== {} ==", - format!("Batch {} Status", self.batch_number).bold() - )?; - writeln!(f)?; - writeln!(f, "= {} =", "Proving Stages".to_owned().bold())?; - writeln!(f, "{:?}", self.basic_witness_generator)?; - writeln!(f, "{:?}", self.leaf_witness_generator)?; - writeln!(f, "{:?}", self.node_witness_generator)?; - writeln!(f, "{:?}", self.recursion_tip)?; - writeln!(f, "{:?}", self.scheduler_witness_generator)?; - writeln!(f, "{:?}", self.compressor) - } -} - -impl Default for BatchData { - fn default() -> Self { - BatchData { - batch_number: L1BatchNumber::default(), - basic_witness_generator: Task::BasicWitnessGenerator { - status: TaskStatus::default(), - aggregation_round_info: AggregationRoundInfo { - round: AggregationRound::BasicCircuits, - prover_jobs_status: TaskStatus::default(), - }, - }, - leaf_witness_generator: Task::LeafWitnessGenerator { - status: TaskStatus::default(), - aggregation_round_info: AggregationRoundInfo { - round: AggregationRound::LeafAggregation, - prover_jobs_status: TaskStatus::default(), - }, - }, - node_witness_generator: Task::NodeWitnessGenerator { - status: TaskStatus::default(), - aggregation_round_info: AggregationRoundInfo { - round: AggregationRound::NodeAggregation, - prover_jobs_status: TaskStatus::default(), - }, - }, - recursion_tip: Task::RecursionTip { - status: TaskStatus::default(), - aggregation_round_info: AggregationRoundInfo { - round: AggregationRound::RecursionTip, - prover_jobs_status: TaskStatus::default(), - }, - }, - scheduler_witness_generator: Task::SchedulerWitnessGenerator { - status: TaskStatus::default(), - aggregation_round_info: AggregationRoundInfo { - round: AggregationRound::Scheduler, - prover_jobs_status: TaskStatus::default(), - }, - }, - compressor: Task::Compressor(TaskStatus::JobsNotFound), - } - } + pub compressor: StageInfo, } #[derive(Default, Debug, EnumString, Clone, Display)] -pub enum TaskStatus { +pub enum Status { /// A custom status that can be set manually. /// Mostly used when a task has singular status. Custom(String), @@ -114,235 +55,207 @@ pub enum TaskStatus { JobsNotFound, } -// This implementation will change to `From>` for `AggregationRoundInfo` -// once the --verbose flag is implemented. -impl From> for TaskStatus { - fn from(jobs_vector: Vec) -> Self { - if jobs_vector.is_empty() { - TaskStatus::JobsNotFound - } else if jobs_vector +impl From for Status { + fn from(status: WitnessJobStatus) -> Self { + match status { + WitnessJobStatus::Queued => Status::Queued, + WitnessJobStatus::InProgress => Status::InProgress, + WitnessJobStatus::Successful(_) => Status::Successful, + WitnessJobStatus::Failed(_) => Status::InProgress, + WitnessJobStatus::WaitingForArtifacts => { + Status::Custom("Waiting for Artifacts ⏱️".to_owned()) + } + WitnessJobStatus::Skipped => Status::Custom("Skipped ⏩".to_owned()), + WitnessJobStatus::WaitingForProofs => Status::WaitingForProofs, + } + } +} + +impl From> for Status { + fn from(status_vector: Vec) -> Self { + if status_vector.is_empty() { + Status::JobsNotFound + } else if status_vector .iter() - .all(|job| matches!(job.status, ProverJobStatus::InGPUProof)) + .all(|job| matches!(job, WitnessJobStatus::Queued)) { - TaskStatus::Custom("In GPU ⚡️".to_owned()) - } else if jobs_vector + Status::Queued + } else if status_vector .iter() - .all(|job| matches!(job.status, ProverJobStatus::Queued)) + .all(|job| matches!(job, WitnessJobStatus::WaitingForProofs)) { - TaskStatus::Queued - } else if jobs_vector + Status::WaitingForProofs + } else if status_vector .iter() - .all(|job| matches!(job.status, ProverJobStatus::Successful(_))) + .all(|job| matches!(job, WitnessJobStatus::Successful(_))) { - TaskStatus::Successful + Status::Successful } else { - TaskStatus::InProgress + Status::InProgress } } } -impl From for TaskStatus { +impl From> for Status { + fn from(leaf_info_vector: Vec) -> Self { + leaf_info_vector + .iter() + .map(|s| s.status.clone()) + .collect::>() + .into() + } +} + +impl From> for Status { + fn from(node_info_vector: Vec) -> Self { + node_info_vector + .iter() + .map(|s| s.status.clone()) + .collect::>() + .into() + } +} + +impl From> for Status { + fn from(scheduler_info_vector: Vec) -> Self { + scheduler_info_vector + .iter() + .map(|s| s.status.clone()) + .collect::>() + .into() + } +} + +impl From> for Status { + fn from(scheduler_info_vector: Vec) -> Self { + scheduler_info_vector + .iter() + .map(|s| s.status.clone()) + .collect::>() + .into() + } +} + +impl From for Status { fn from(status: ProofCompressionJobStatus) -> Self { match status { - ProofCompressionJobStatus::Queued => TaskStatus::Queued, - ProofCompressionJobStatus::InProgress => TaskStatus::InProgress, - ProofCompressionJobStatus::Successful => TaskStatus::Successful, - ProofCompressionJobStatus::Failed => TaskStatus::InProgress, + ProofCompressionJobStatus::Queued => Status::Queued, + ProofCompressionJobStatus::InProgress => Status::InProgress, + ProofCompressionJobStatus::Successful => Status::Successful, + ProofCompressionJobStatus::Failed => Status::InProgress, ProofCompressionJobStatus::SentToServer => { - TaskStatus::Custom("Sent to server 📤".to_owned()) + Status::Custom("Sent to server 📤".to_owned()) } - ProofCompressionJobStatus::Skipped => TaskStatus::Custom("Skipped ⏩".to_owned()), + ProofCompressionJobStatus::Skipped => Status::Custom("Skipped ⏩".to_owned()), } } } -impl From> for TaskStatus { - fn from(status_vector: Vec) -> Self { - if status_vector.is_empty() { - TaskStatus::JobsNotFound - } else if status_vector +impl From> for Status { + fn from(jobs_vector: Vec) -> Self { + if jobs_vector.is_empty() { + Status::JobsNotFound + } else if jobs_vector .iter() - .all(|job| matches!(job, WitnessJobStatus::Queued)) + .all(|job| matches!(job.status, ProverJobStatus::InGPUProof)) { - TaskStatus::Queued - } else if status_vector + Status::Custom("In GPU Proof ⚡️".to_owned()) + } else if jobs_vector .iter() - .all(|job| matches!(job, WitnessJobStatus::WaitingForProofs)) + .all(|job| matches!(job.status, ProverJobStatus::Queued)) { - TaskStatus::WaitingForProofs - } else if status_vector + Status::Queued + } else if jobs_vector .iter() - .all(|job| matches!(job, WitnessJobStatus::Successful(_))) + .all(|job| matches!(job.status, ProverJobStatus::Successful(_))) { - TaskStatus::Successful + Status::Successful } else { - TaskStatus::InProgress + Status::InProgress } } } +#[allow(clippy::large_enum_variant)] #[derive(EnumString, Clone, Display)] -pub enum Task { - /// Represents the basic witness generator task and its status. +pub enum StageInfo { #[strum(to_string = "Basic Witness Generator")] BasicWitnessGenerator { - status: TaskStatus, - aggregation_round_info: AggregationRoundInfo, + witness_generator_job_info: Option, + prover_jobs_info: Vec, }, - /// Represents the leaf witness generator task, its status and the aggregation round 0 prover jobs data. #[strum(to_string = "Leaf Witness Generator")] LeafWitnessGenerator { - status: TaskStatus, - aggregation_round_info: AggregationRoundInfo, + witness_generator_jobs_info: Vec, + prover_jobs_info: Vec, }, - /// Represents the node witness generator task, its status and the aggregation round 1 prover jobs data. #[strum(to_string = "Node Witness Generator")] NodeWitnessGenerator { - status: TaskStatus, - aggregation_round_info: AggregationRoundInfo, + witness_generator_jobs_info: Vec, + prover_jobs_info: Vec, }, - /// Represents the recursion tip task, its status and the aggregation round 2 prover jobs data. #[strum(to_string = "Recursion Tip")] - RecursionTip { - status: TaskStatus, - aggregation_round_info: AggregationRoundInfo, - }, - /// Represents the scheduler task and its status. + RecursionTipWitnessGenerator(Option), #[strum(to_string = "Scheduler")] - SchedulerWitnessGenerator { - status: TaskStatus, - aggregation_round_info: AggregationRoundInfo, - }, - /// Represents the compressor task and its status. + SchedulerWitnessGenerator(Option), #[strum(to_string = "Compressor")] - Compressor(TaskStatus), + Compressor(Option), } -impl Task { - fn status(&self) -> TaskStatus { +impl StageInfo { + pub fn aggregation_round(&self) -> Option { match self { - Task::BasicWitnessGenerator { status, .. } - | Task::LeafWitnessGenerator { status, .. } - | Task::NodeWitnessGenerator { status, .. } - | Task::RecursionTip { status, .. } - | Task::SchedulerWitnessGenerator { status, .. } - | Task::Compressor(status) => status.clone(), + StageInfo::BasicWitnessGenerator { .. } => Some(AggregationRound::BasicCircuits), + StageInfo::LeafWitnessGenerator { .. } => Some(AggregationRound::LeafAggregation), + StageInfo::NodeWitnessGenerator { .. } => Some(AggregationRound::NodeAggregation), + StageInfo::RecursionTipWitnessGenerator { .. } => Some(AggregationRound::RecursionTip), + StageInfo::SchedulerWitnessGenerator { .. } => Some(AggregationRound::Scheduler), + StageInfo::Compressor(_) => None, } } - fn aggregation_round(&self) -> Option { - match self { - Task::BasicWitnessGenerator { - aggregation_round_info, - .. + pub fn prover_jobs_status(&self) -> Option { + match self.clone() { + StageInfo::BasicWitnessGenerator { + prover_jobs_info, .. } - | Task::LeafWitnessGenerator { - aggregation_round_info, - .. + | StageInfo::LeafWitnessGenerator { + prover_jobs_info, .. } - | Task::NodeWitnessGenerator { - aggregation_round_info, - .. - } - | Task::RecursionTip { - aggregation_round_info, - .. - } - | Task::SchedulerWitnessGenerator { - aggregation_round_info, - .. - } => Some(aggregation_round_info.round), - Task::Compressor(_) => None, + | StageInfo::NodeWitnessGenerator { + prover_jobs_info, .. + } => Some(Status::from(prover_jobs_info)), + StageInfo::RecursionTipWitnessGenerator(_) + | StageInfo::SchedulerWitnessGenerator(_) + | StageInfo::Compressor(_) => None, } } - /// Returns the status of the prover jobs. - /// If the task is not in progress or successful, returns None. - /// Otherwise, returns the status of the prover jobs if the task - /// has prover jobs. - fn prover_jobs_status(&self) -> Option { - match self { - Task::BasicWitnessGenerator { - status, - aggregation_round_info, - } - | Task::LeafWitnessGenerator { - status, - aggregation_round_info, - } - | Task::NodeWitnessGenerator { - status, - aggregation_round_info, - } - | Task::RecursionTip { - status, - aggregation_round_info, - } - | Task::SchedulerWitnessGenerator { - status, - aggregation_round_info, - } => match status { - TaskStatus::InProgress | TaskStatus::Successful => { - Some(aggregation_round_info.prover_jobs_status.clone()) - } - _ => None, - }, - Task::Compressor(_) => None, - } - } -} - -impl Debug for Task { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - if let Some(aggregation_round_number) = self.aggregation_round() { - writeln!( - f, - "-- {} --", - format!("Aggregation Round {}", aggregation_round_number as u8).bold() - )?; - if let TaskStatus::Custom(msg) = self.status() { - writeln!(f, "{}: {}", self.to_string().bold(), msg)?; - } else { - writeln!(f, "{}: {}", self.to_string().bold(), self.status())?; - } - if let Some(prover_jobs_status) = self.prover_jobs_status() { - writeln!(f, "> Prover Jobs: {prover_jobs_status}")?; - } - } else { - writeln!(f, "-- {} --", self.to_string().bold())?; - writeln!(f, "{}", self.status())?; - } - Ok(()) - } -} - -impl From for TaskStatus { - fn from(status: WitnessJobStatus) -> Self { - match status { - WitnessJobStatus::Queued => TaskStatus::Queued, - WitnessJobStatus::InProgress => TaskStatus::InProgress, - WitnessJobStatus::Successful(_) => TaskStatus::Successful, - WitnessJobStatus::Failed(_) => TaskStatus::InProgress, - WitnessJobStatus::WaitingForArtifacts => { - TaskStatus::Custom("Waiting for Artifacts ⏱️".to_owned()) - } - WitnessJobStatus::Skipped => TaskStatus::Custom("Skipped ⏩".to_owned()), - WitnessJobStatus::WaitingForProofs => TaskStatus::WaitingForProofs, - } - } -} - -#[derive(Clone)] -pub struct AggregationRoundInfo { - pub round: AggregationRound, - pub prover_jobs_status: TaskStatus, -} - -impl Default for AggregationRoundInfo { - fn default() -> Self { - AggregationRoundInfo { - round: AggregationRound::BasicCircuits, - prover_jobs_status: TaskStatus::default(), + pub fn witness_generator_jobs_status(&self) -> Status { + match self.clone() { + StageInfo::BasicWitnessGenerator { + witness_generator_job_info, + .. + } => witness_generator_job_info + .map(|witness_generator_job_info| Status::from(witness_generator_job_info.status)) + .unwrap_or_default(), + StageInfo::LeafWitnessGenerator { + witness_generator_jobs_info, + .. + } => Status::from(witness_generator_jobs_info), + StageInfo::NodeWitnessGenerator { + witness_generator_jobs_info, + .. + } => Status::from(witness_generator_jobs_info), + StageInfo::RecursionTipWitnessGenerator(status) => status + .map(|job| Status::from(job.status)) + .unwrap_or_default(), + StageInfo::SchedulerWitnessGenerator(status) => status + .map(|job| Status::from(job.status)) + .unwrap_or_default(), + StageInfo::Compressor(status) => status + .map(|job| Status::from(job.status)) + .unwrap_or_default(), } } } diff --git a/prover/prover_dal/.sqlx/query-85a69b433c08847876bf6e7af9bc39ae8a6e053a0e03afd3fb5e02ee17157067.json b/prover/prover_dal/.sqlx/query-85a69b433c08847876bf6e7af9bc39ae8a6e053a0e03afd3fb5e02ee17157067.json new file mode 100644 index 000000000000..75a600d5b46b --- /dev/null +++ b/prover/prover_dal/.sqlx/query-85a69b433c08847876bf6e7af9bc39ae8a6e053a0e03afd3fb5e02ee17157067.json @@ -0,0 +1,82 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT\n *\n FROM\n recursion_tip_witness_jobs_fri\n WHERE\n l1_batch_number = $1\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "l1_batch_number", + "type_info": "Int8" + }, + { + "ordinal": 1, + "name": "status", + "type_info": "Text" + }, + { + "ordinal": 2, + "name": "attempts", + "type_info": "Int2" + }, + { + "ordinal": 3, + "name": "processing_started_at", + "type_info": "Timestamp" + }, + { + "ordinal": 4, + "name": "time_taken", + "type_info": "Time" + }, + { + "ordinal": 5, + "name": "error", + "type_info": "Text" + }, + { + "ordinal": 6, + "name": "created_at", + "type_info": "Timestamp" + }, + { + "ordinal": 7, + "name": "updated_at", + "type_info": "Timestamp" + }, + { + "ordinal": 8, + "name": "number_of_final_node_jobs", + "type_info": "Int4" + }, + { + "ordinal": 9, + "name": "protocol_version", + "type_info": "Int4" + }, + { + "ordinal": 10, + "name": "picked_by", + "type_info": "Text" + } + ], + "parameters": { + "Left": [ + "Int8" + ] + }, + "nullable": [ + false, + false, + false, + true, + true, + true, + false, + false, + true, + true, + true + ] + }, + "hash": "85a69b433c08847876bf6e7af9bc39ae8a6e053a0e03afd3fb5e02ee17157067" +} diff --git a/prover/prover_dal/src/fri_prover_dal.rs b/prover/prover_dal/src/fri_prover_dal.rs index 7b7a2e4b3f1c..e1f427d5635c 100644 --- a/prover/prover_dal/src/fri_prover_dal.rs +++ b/prover/prover_dal/src/fri_prover_dal.rs @@ -5,7 +5,8 @@ use zksync_basic_types::{ basic_fri_types::{AggregationRound, CircuitIdRoundTuple}, protocol_version::ProtocolVersionId, prover_dal::{ - FriProverJobMetadata, JobCountStatistics, ProverJobFriInfo, ProverJobStatus, StuckJobs, + correct_circuit_id, FriProverJobMetadata, JobCountStatistics, ProverJobFriInfo, + ProverJobStatus, StuckJobs, }, L1BatchNumber, }; @@ -670,7 +671,8 @@ impl FriProverDal<'_, '_> { .map(|row| ProverJobFriInfo { id: row.id as u32, l1_batch_number, - circuit_id: row.circuit_id as u32, + // It is necessary to correct the circuit IDs due to the discrepancy between different aggregation rounds. + circuit_id: correct_circuit_id(row.circuit_id, aggregation_round), circuit_blob_url: row.circuit_blob_url.clone(), aggregation_round, sequence_number: row.sequence_number as u32, diff --git a/prover/prover_dal/src/fri_witness_generator_dal.rs b/prover/prover_dal/src/fri_witness_generator_dal.rs index c46b3c6fa569..73b546de4b96 100644 --- a/prover/prover_dal/src/fri_witness_generator_dal.rs +++ b/prover/prover_dal/src/fri_witness_generator_dal.rs @@ -6,8 +6,9 @@ use zksync_basic_types::{ basic_fri_types::{AggregationRound, Eip4844Blobs}, protocol_version::ProtocolVersionId, prover_dal::{ - BasicWitnessGeneratorJobInfo, JobCountStatistics, LeafAggregationJobMetadata, - LeafWitnessGeneratorJobInfo, NodeAggregationJobMetadata, NodeWitnessGeneratorJobInfo, + correct_circuit_id, BasicWitnessGeneratorJobInfo, JobCountStatistics, + LeafAggregationJobMetadata, LeafWitnessGeneratorJobInfo, NodeAggregationJobMetadata, + NodeWitnessGeneratorJobInfo, RecursionTipWitnessGeneratorJobInfo, SchedulerWitnessGeneratorJobInfo, StuckJobs, WitnessJobStatus, }, L1BatchNumber, @@ -1504,7 +1505,8 @@ impl FriWitnessGeneratorDal<'_, '_> { .map(|row| NodeWitnessGeneratorJobInfo { id: row.id as u32, l1_batch_number, - circuit_id: row.circuit_id as u32, + // It is necessary to correct the circuit IDs due to the discrepancy between different aggregation rounds. + circuit_id: correct_circuit_id(row.circuit_id, AggregationRound::NodeAggregation), depth: row.depth as u32, status: WitnessJobStatus::from_str(&row.status).unwrap(), attempts: row.attempts as u32, @@ -1524,7 +1526,7 @@ impl FriWitnessGeneratorDal<'_, '_> { pub async fn get_scheduler_witness_generator_jobs_for_batch( &mut self, l1_batch_number: L1BatchNumber, - ) -> Vec { + ) -> Option { sqlx::query!( r#" SELECT @@ -1536,10 +1538,9 @@ impl FriWitnessGeneratorDal<'_, '_> { "#, i64::from(l1_batch_number.0) ) - .fetch_all(self.storage.conn()) + .fetch_optional(self.storage.conn()) .await .unwrap() - .iter() .map(|row| SchedulerWitnessGeneratorJobInfo { l1_batch_number, scheduler_partial_input_blob_url: row.scheduler_partial_input_blob_url.clone(), @@ -1553,7 +1554,39 @@ impl FriWitnessGeneratorDal<'_, '_> { protocol_version: row.protocol_version, picked_by: row.picked_by.clone(), }) - .collect() + } + + pub async fn get_recursion_tip_witness_generator_jobs_for_batch( + &mut self, + l1_batch_number: L1BatchNumber, + ) -> Option { + sqlx::query!( + r#" + SELECT + * + FROM + recursion_tip_witness_jobs_fri + WHERE + l1_batch_number = $1 + "#, + i64::from(l1_batch_number.0) + ) + .fetch_optional(self.storage.conn()) + .await + .unwrap() + .map(|row| RecursionTipWitnessGeneratorJobInfo { + l1_batch_number, + status: WitnessJobStatus::from_str(&row.status).unwrap(), + attempts: row.attempts as u32, + processing_started_at: row.processing_started_at, + time_taken: row.time_taken, + error: row.error.clone(), + created_at: row.created_at, + updated_at: row.updated_at, + number_of_final_node_jobs: row.number_of_final_node_jobs, + protocol_version: row.protocol_version, + picked_by: row.picked_by.clone(), + }) } pub async fn delete_witness_generator_data_for_batch(