feat: Make house keeper emit correct protocol version (#2062)
## What ❔

Emit the protocol version stored in the database when reporting metrics.

## Why ❔

Previously, we emitted a hardcoded protocol version in metrics, which is not accurate.
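
As a rough, self-contained sketch of the intended behavior (the real changes live in the house-keeper reporters shown in the diff below; the `u16` keys and `println!` calls are stand-ins for `ProtocolVersionId` and the metric gauges):

```rust
use std::collections::HashMap;

// Simplified stand-in for the per-version queue statistics used in the diff below.
#[derive(Debug, Clone, Copy, Default)]
struct JobCountStatistics {
    queued: usize,
    in_progress: usize,
}

fn main() {
    // Hypothetical stats as returned from the database, already keyed by the
    // protocol version stored with each job.
    let stats: HashMap<u16, JobCountStatistics> = HashMap::from([
        (24, JobCountStatistics { queued: 3, in_progress: 1 }),
        (25, JobCountStatistics { queued: 0, in_progress: 2 }),
    ]);

    // Before: every gauge carried the hardcoded "current" prover version label.
    // After: one gauge per protocol version that actually has jobs in the database.
    for (protocol_version, s) in &stats {
        println!(
            "queued={} in_progress={} protocol_version={}",
            s.queued, s.in_progress, protocol_version
        );
    }
}
```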

## Checklist

<!-- Check your PR fulfills the following items. -->
<!-- For draft PRs check the boxes as you complete them. -->

- [x] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [x] Tests for the changes have been added / updated.
- [x] Documentation comments have been added / updated.
- [x] Code has been formatted via `zk fmt` and `zk lint`.
- [x] Spellcheck has been run via `zk spellcheck`.
Artemka374 authored May 30, 2024
1 parent dc9bea1 commit a58a7e8
Showing 37 changed files with 442 additions and 330 deletions.
7 changes: 7 additions & 0 deletions core/lib/basic_types/src/basic_fri_types.rs
@@ -184,6 +184,13 @@ impl TryFrom<i32> for AggregationRound {
}
}

#[derive(Debug, Deserialize, Serialize, Clone, PartialEq, Eq, Hash)]
pub struct JobIdentifiers {
pub circuit_id: u8,
pub aggregation_round: u8,
pub protocol_version: u16,
}

#[cfg(test)]
mod tests {
use super::*;
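
Because `JobIdentifiers` derives `Hash` and `Eq`, it can serve as a map key for per-(circuit, round, protocol version) statistics, which appears to be how the prover queue reporter below consumes `get_prover_jobs_stats()`. A hypothetical, standalone illustration (the struct is re-declared locally and the serde derives are dropped so the snippet compiles on its own):

```rust
use std::collections::HashMap;

// Local re-declaration of the struct added above; serde derives omitted.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct JobIdentifiers {
    circuit_id: u8,
    aggregation_round: u8,
    protocol_version: u16,
}

fn main() {
    // Hypothetical per-group counters keyed by (circuit, round, protocol version).
    let mut queued_jobs: HashMap<JobIdentifiers, u64> = HashMap::new();

    let key = JobIdentifiers {
        circuit_id: 1,
        aggregation_round: 0,
        protocol_version: 24,
    };
    *queued_jobs.entry(key).or_insert(0) += 1;

    for (id, count) in &queued_jobs {
        println!("{id:?} => {count} queued job(s)");
    }
}
```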
12 changes: 9 additions & 3 deletions core/lib/basic_types/src/prover_dal.rs
@@ -26,15 +26,21 @@ pub struct FriProverJobMetadata {
}

#[derive(Debug, Clone, Copy, Default)]
pub struct JobCountStatistics {
pub struct ExtendedJobCountStatistics {
pub queued: usize,
pub in_progress: usize,
pub failed: usize,
pub successful: usize,
}

impl Add for JobCountStatistics {
type Output = JobCountStatistics;
#[derive(Debug, Clone, Copy, Default)]
pub struct JobCountStatistics {
pub queued: usize,
pub in_progress: usize,
}

impl Add for ExtendedJobCountStatistics {
type Output = ExtendedJobCountStatistics;

fn add(self, rhs: Self) -> Self::Output {
Self {
@@ -1,3 +1,5 @@
use std::collections::HashMap;

use async_trait::async_trait;
use prover_dal::{Prover, ProverDal};
use zksync_dal::ConnectionPool;
@@ -24,7 +26,9 @@ impl FriProofCompressorQueueReporter {
}
}

async fn get_job_statistics(pool: &ConnectionPool<Prover>) -> JobCountStatistics {
async fn get_job_statistics(
pool: &ConnectionPool<Prover>,
) -> HashMap<ProtocolVersionId, JobCountStatistics> {
pool.connection()
.await
.unwrap()
@@ -41,25 +45,24 @@ impl PeriodicJob for FriProofCompressorQueueReporter {
async fn run_routine_task(&mut self) -> anyhow::Result<()> {
let stats = Self::get_job_statistics(&self.pool).await;

if stats.queued > 0 {
tracing::info!(
"Found {} free {} in progress proof compressor jobs",
stats.queued,
stats.in_progress
);
}
for (protocol_version, stats) in &stats {
if stats.queued > 0 {
tracing::info!(
"Found {} free {} in progress proof compressor jobs for protocol version {}",
stats.queued,
stats.in_progress,
protocol_version
);
}

PROVER_FRI_METRICS.proof_compressor_jobs[&(
JobStatus::Queued,
ProtocolVersionId::current_prover_version().to_string(),
)]
.set(stats.queued as u64);
PROVER_FRI_METRICS.proof_compressor_jobs
[&(JobStatus::Queued, protocol_version.to_string())]
.set(stats.queued as u64);

PROVER_FRI_METRICS.proof_compressor_jobs[&(
JobStatus::InProgress,
ProtocolVersionId::current_prover_version().to_string(),
)]
.set(stats.in_progress as u64);
PROVER_FRI_METRICS.proof_compressor_jobs
[&(JobStatus::InProgress, protocol_version.to_string())]
.set(stats.in_progress as u64);
}

let oldest_not_compressed_batch = self
.pool
@@ -40,40 +40,43 @@ impl PeriodicJob for FriProverQueueReporter {
let mut conn = self.prover_connection_pool.connection().await.unwrap();
let stats = conn.fri_prover_jobs_dal().get_prover_jobs_stats().await;

for ((circuit_id, aggregation_round), stats) in stats.into_iter() {
for (job_identifiers, stats) in &stats {
// BEWARE, HERE BE DRAGONS.
// In database, the `circuit_id` stored is the circuit for which the aggregation is done,
// not the circuit which is running.
// There is a single node level aggregation circuit, which is circuit 2.
// This can aggregate multiple leaf nodes (which may belong to different circuits).
// This reporting is a hacky forced way to use `circuit_id` 2 which will solve auto scalers.
// A proper fix will be later provided to solve this at database level.
let circuit_id = if aggregation_round == 2 {
let circuit_id = if job_identifiers.aggregation_round == 2 {
2
} else {
circuit_id
job_identifiers.circuit_id
};

let group_id = self
.config
.get_group_id_for_circuit_id_and_aggregation_round(circuit_id, aggregation_round)
.get_group_id_for_circuit_id_and_aggregation_round(
circuit_id,
job_identifiers.aggregation_round,
)
.unwrap_or(u8::MAX);

FRI_PROVER_METRICS.report_prover_jobs(
"queued",
circuit_id,
aggregation_round,
job_identifiers.aggregation_round,
group_id,
ProtocolVersionId::current_prover_version(),
ProtocolVersionId::try_from(job_identifiers.protocol_version).unwrap(),
stats.queued as u64,
);

FRI_PROVER_METRICS.report_prover_jobs(
"in_progress",
circuit_id,
aggregation_round,
job_identifiers.aggregation_round,
group_id,
ProtocolVersionId::current_prover_version(),
ProtocolVersionId::try_from(job_identifiers.protocol_version).unwrap(),
stats.in_progress as u64,
);
}
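
The reporting-only `circuit_id` override described in the `HERE BE DRAGONS` comment can be read as a small pure function; a sketch, assuming round 2 is the node-aggregation round as the comment states:

```rust
// Sketch of the reporting-only override from the hunk above: for the node
// aggregation round (round 2) every job is attributed to circuit 2, because a
// single node-level circuit aggregates leaves coming from different circuits.
fn reported_circuit_id(circuit_id: u8, aggregation_round: u8) -> u8 {
    if aggregation_round == 2 {
        2
    } else {
        circuit_id
    }
}

fn main() {
    assert_eq!(reported_circuit_id(7, 2), 2); // node aggregation collapses to circuit 2
    assert_eq!(reported_circuit_id(7, 1), 7); // other rounds keep their own circuit id
}
```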
@@ -25,63 +25,65 @@ impl FriWitnessGeneratorQueueReporter {
}
}

async fn get_job_statistics(&self) -> HashMap<AggregationRound, JobCountStatistics> {
async fn get_job_statistics(
&self,
) -> HashMap<(AggregationRound, ProtocolVersionId), JobCountStatistics> {
let mut conn = self.pool.connection().await.unwrap();
HashMap::from([
(
AggregationRound::BasicCircuits,
conn.fri_witness_generator_dal()
.get_witness_jobs_stats(AggregationRound::BasicCircuits)
.await,
),
(
AggregationRound::LeafAggregation,
conn.fri_witness_generator_dal()
.get_witness_jobs_stats(AggregationRound::LeafAggregation)
.await,
),
(
AggregationRound::NodeAggregation,
conn.fri_witness_generator_dal()
.get_witness_jobs_stats(AggregationRound::NodeAggregation)
.await,
),
(
AggregationRound::RecursionTip,
conn.fri_witness_generator_dal()
.get_witness_jobs_stats(AggregationRound::RecursionTip)
.await,
),
(
AggregationRound::Scheduler,
conn.fri_witness_generator_dal()
.get_witness_jobs_stats(AggregationRound::Scheduler)
.await,
),
])
let mut result = HashMap::new();
result.extend(
conn.fri_witness_generator_dal()
.get_witness_jobs_stats(AggregationRound::BasicCircuits)
.await,
);
result.extend(
conn.fri_witness_generator_dal()
.get_witness_jobs_stats(AggregationRound::LeafAggregation)
.await,
);
result.extend(
conn.fri_witness_generator_dal()
.get_witness_jobs_stats(AggregationRound::NodeAggregation)
.await,
);
result.extend(
conn.fri_witness_generator_dal()
.get_witness_jobs_stats(AggregationRound::RecursionTip)
.await,
);
result.extend(
conn.fri_witness_generator_dal()
.get_witness_jobs_stats(AggregationRound::Scheduler)
.await,
);
result
}
}

fn emit_metrics_for_round(round: AggregationRound, stats: JobCountStatistics) {
fn emit_metrics_for_round(
round: AggregationRound,
protocol_version: ProtocolVersionId,
stats: &JobCountStatistics,
) {
if stats.queued > 0 || stats.in_progress > 0 {
tracing::trace!(
"Found {} free and {} in progress {:?} FRI witness generators jobs",
"Found {} free and {} in progress {:?} FRI witness generators jobs for protocol version {}",
stats.queued,
stats.in_progress,
round
round,
protocol_version
);
}

SERVER_METRICS.witness_generator_jobs_by_round[&(
"queued",
format!("{:?}", round),
ProtocolVersionId::current_prover_version().to_string(),
protocol_version.to_string(),
)]
.set(stats.queued as u64);
SERVER_METRICS.witness_generator_jobs_by_round[&(
"in_progress",
format!("{:?}", round),
ProtocolVersionId::current_prover_version().to_string(),
protocol_version.to_string(),
)]
.set(stats.in_progress as u64);
}
@@ -92,31 +94,31 @@ impl PeriodicJob for FriWitnessGeneratorQueueReporter {

async fn run_routine_task(&mut self) -> anyhow::Result<()> {
let stats_for_all_rounds = self.get_job_statistics().await;
let mut aggregated = JobCountStatistics::default();
for (round, stats) in stats_for_all_rounds {
emit_metrics_for_round(round, stats);
aggregated = aggregated + stats;
}
let mut aggregated = HashMap::<ProtocolVersionId, JobCountStatistics>::new();
for ((round, protocol_version), stats) in stats_for_all_rounds {
emit_metrics_for_round(round, protocol_version, &stats);

if aggregated.queued > 0 {
tracing::trace!(
"Found {} free {} in progress witness generators jobs",
aggregated.queued,
aggregated.in_progress
);
let entry = aggregated.entry(protocol_version).or_default();
entry.queued += stats.queued;
entry.in_progress += stats.in_progress;
}

SERVER_METRICS.witness_generator_jobs[&(
"queued",
ProtocolVersionId::current_prover_version().to_string(),
)]
.set(aggregated.queued as u64);
for (protocol_version, stats) in &aggregated {
if stats.queued > 0 || stats.in_progress > 0 {
tracing::trace!(
"Found {} free {} in progress witness generators jobs for protocol version {}",
stats.queued,
stats.in_progress,
protocol_version
);
}

SERVER_METRICS.witness_generator_jobs[&(
"in_progress",
ProtocolVersionId::current_prover_version().to_string(),
)]
.set(aggregated.in_progress as u64);
SERVER_METRICS.witness_generator_jobs[&("queued", protocol_version.to_string())]
.set(stats.queued as u64);

SERVER_METRICS.witness_generator_jobs[&("in_progress", protocol_version.to_string())]
.set(stats.in_progress as u64);
}

Ok(())
}
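
The reporter above folds the per-round statistics into per-protocol-version totals with the `entry()`/`or_default()` pattern; a standalone sketch of that fold with the types reduced to primitives (round names as strings, protocol versions as `u16`):

```rust
use std::collections::HashMap;

#[derive(Debug, Default, Clone, Copy)]
struct JobCountStatistics {
    queued: usize,
    in_progress: usize,
}

fn main() {
    // Hypothetical per-(round, protocol version) stats.
    let stats_for_all_rounds = vec![
        (("BasicCircuits", 24u16), JobCountStatistics { queued: 2, in_progress: 1 }),
        (("Scheduler", 24u16), JobCountStatistics { queued: 1, in_progress: 0 }),
        (("BasicCircuits", 25u16), JobCountStatistics { queued: 0, in_progress: 3 }),
    ];

    // Same entry()/or_default() fold as in the hunk above, keyed by protocol version only.
    let mut aggregated: HashMap<u16, JobCountStatistics> = HashMap::new();
    for ((_round, protocol_version), stats) in stats_for_all_rounds {
        let entry = aggregated.entry(protocol_version).or_default();
        entry.queued += stats.queued;
        entry.in_progress += stats.in_progress;
    }

    for (protocol_version, totals) in &aggregated {
        println!("protocol version {protocol_version}: {totals:?}");
    }
}
```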
5 changes: 4 additions & 1 deletion infrastructure/zk/src/format_sql.ts
@@ -87,6 +87,7 @@ function formatOneLineQuery(line: string): string {

return prefix + '\n' + formattedQuery + '\n' + suffix;
}

async function formatFile(filePath: string, check: boolean) {
const content = await fs.promises.readFile(filePath, { encoding: 'utf-8' });
let linesToQuery = null;
@@ -157,7 +158,9 @@ async function formatFile(filePath: string, check: boolean) {

export async function formatSqlxQueries(check: boolean) {
process.chdir(`${process.env.ZKSYNC_HOME}`);
const { stdout: filesRaw } = await utils.exec('find core/lib/dal -type f -name "*.rs"');
const { stdout: filesRaw } = await utils.exec(
'find core/lib/dal -type f -name "*.rs" && find prover/prover_dal -type f -name "*.rs"'
);
const files = filesRaw.trim().split('\n');
const formatResults = await Promise.all(files.map((file) => formatFile(file, check)));
if (check) {
4 changes: 2 additions & 2 deletions prover/prover_cli/src/commands/status/batch.rs
@@ -8,7 +8,7 @@ use prover_dal::{Connection, ConnectionPool, Prover, ProverDal};
use zksync_types::{
basic_fri_types::AggregationRound,
prover_dal::{
BasicWitnessGeneratorJobInfo, JobCountStatistics, LeafWitnessGeneratorJobInfo,
BasicWitnessGeneratorJobInfo, ExtendedJobCountStatistics, LeafWitnessGeneratorJobInfo,
NodeWitnessGeneratorJobInfo, ProofCompressionJobInfo, ProverJobFriInfo, ProverJobStatus,
RecursionTipWitnessGeneratorJobInfo, SchedulerWitnessGeneratorJobInfo,
},
@@ -383,7 +383,7 @@ fn display_prover_jobs_info(prover_jobs_info: Vec<ProverJobFriInfo>) {
}

fn display_job_status_count(jobs: Vec<ProverJobFriInfo>) {
let mut jobs_counts = JobCountStatistics::default();
let mut jobs_counts = ExtendedJobCountStatistics::default();
let total_jobs = jobs.len();
jobs.iter().for_each(|job| match job.status {
ProverJobStatus::Queued => jobs_counts.queued += 1,
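
For the CLI, the renamed `ExtendedJobCountStatistics` keeps all four status buckets (queued, in progress, failed, successful); a standalone sketch of the tallying pattern used in `display_job_status_count`, with both types mirrored locally and the status enum trimmed to four variants:

```rust
// Trimmed-down mirrors of the CLI types: the real ProverJobStatus has more
// variants, and ExtendedJobCountStatistics lives in zksync_types::prover_dal.
#[derive(Debug, Default)]
struct ExtendedJobCountStatistics {
    queued: usize,
    in_progress: usize,
    failed: usize,
    successful: usize,
}

#[derive(Debug)]
enum ProverJobStatus {
    Queued,
    InProgress,
    Failed,
    Successful,
}

fn main() {
    let jobs = vec![
        ProverJobStatus::Queued,
        ProverJobStatus::InProgress,
        ProverJobStatus::Successful,
        ProverJobStatus::Queued,
    ];

    // Same tallying shape as display_job_status_count: bump one counter per job status.
    let mut jobs_counts = ExtendedJobCountStatistics::default();
    for status in &jobs {
        match status {
            ProverJobStatus::Queued => jobs_counts.queued += 1,
            ProverJobStatus::InProgress => jobs_counts.in_progress += 1,
            ProverJobStatus::Failed => jobs_counts.failed += 1,
            ProverJobStatus::Successful => jobs_counts.successful += 1,
        }
    }
    println!("{jobs_counts:?} out of {} jobs", jobs.len());
}
```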