Skip to content

Commit

Permalink
One proof instance per batch number per TEE type
Browse files Browse the repository at this point in the history
  • Loading branch information
pbeza committed Jul 31, 2024
1 parent cea2cf9 commit 26eb5f1
Show file tree
Hide file tree
Showing 9 changed files with 122 additions and 98 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
DROP INDEX IF EXISTS idx_tee_verifier_input_producer_jobs_status_processing_attempts;
DROP INDEX IF EXISTS idx_tee_verifier_input_producer_jobs_l1_batch_number_status;

DROP TABLE IF EXISTS tee_verifier_input_producer_jobs;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,3 @@ CREATE TABLE IF NOT EXISTS tee_verifier_input_producer_jobs

CREATE INDEX IF NOT EXISTS idx_tee_verifier_input_producer_jobs_status_processing_attempts
ON tee_verifier_input_producer_jobs (status, processing_started_at, attempts);

CREATE INDEX IF NOT EXISTS idx_tee_verifier_input_producer_jobs_l1_batch_number_status
ON tee_verifier_input_producer_jobs (l1_batch_number, status);
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
DROP INDEX IF EXISTS idx_proofs_number_per_batch_number_and_tee_type;
DROP INDEX IF EXISTS idx_tee_proofs_status_prover_taken_at;

DROP TABLE IF EXISTS tee_attestations;
DROP TABLE IF EXISTS tee_proofs;

DROP TYPE IF EXISTS tee_proofs_job_status;
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
CREATE TYPE tee_proofs_job_status AS ENUM ('ReadyToBeProven', 'PickedByProver', 'Generated', 'Skipped');

CREATE TABLE IF NOT EXISTS tee_attestations
(
pubkey BYTEA PRIMARY KEY,
Expand All @@ -6,16 +8,18 @@ CREATE TABLE IF NOT EXISTS tee_attestations

CREATE TABLE IF NOT EXISTS tee_proofs
(
id BIGSERIAL PRIMARY KEY,
l1_batch_number BIGINT NOT NULL REFERENCES tee_verifier_input_producer_jobs (l1_batch_number) ON DELETE CASCADE,
tee_type TEXT NOT NULL,
status tee_proofs_job_status NOT NULL,
pubkey BYTEA REFERENCES tee_attestations (pubkey) ON DELETE CASCADE,
signature BYTEA,
proof BYTEA,
created_at TIMESTAMP NOT NULL,
proved_at TIMESTAMP,
prover_taken_at TIMESTAMP
updated_at TIMESTAMP NOT NULL,
prover_taken_at TIMESTAMP,
PRIMARY KEY (l1_batch_number, tee_type)
);

CREATE INDEX IF NOT EXISTS idx_proofs_number_per_batch_number_and_tee_type
ON tee_proofs (l1_batch_number, tee_type);
CREATE INDEX IF NOT EXISTS idx_tee_proofs_status_prover_taken_at
ON tee_proofs (prover_taken_at)
WHERE status = 'PickedByProver';
2 changes: 0 additions & 2 deletions core/lib/dal/src/models/storage_tee_proof.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ pub struct StorageTeeProof {
/// TODO rename it TeeProof once StorageTeeProof is moved to api/mod.rs?
#[derive(Debug, Clone, sqlx::FromRow)]
pub struct TmpStorageTeeProof {
#[allow(dead_code)]
pub id: i64,
pub pubkey: Option<Vec<u8>>,
pub signature: Option<Vec<u8>>,
pub proof: Option<Vec<u8>>,
Expand Down
149 changes: 91 additions & 58 deletions core/lib/dal/src/tee_proof_generation_dal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use zksync_types::{tee_types::TeeType, L1BatchNumber};

use crate::{
models::storage_tee_proof::{StorageTeeProof, TmpStorageTeeProof},
tee_verifier_input_producer_dal::TeeVerifierInputProducerJobStatus,
Core,
};

Expand All @@ -16,45 +17,61 @@ pub struct TeeProofGenerationDal<'a, 'c> {
pub(crate) storage: &'a mut Connection<'c, Core>,
}

#[derive(Debug, sqlx::Type)]
#[sqlx(type_name = "tee_proofs_job_status")]
enum TeeProofGenerationJobStatus {
ReadyToBeProven,
PickedByProver,
Generated,
Skipped,
}

impl TeeProofGenerationDal<'_, '_> {
pub async fn get_next_batch_to_be_proven(
&mut self,
tee_type: TeeType,
processing_timeout: Duration,
) -> DalResult<Option<(i64, L1BatchNumber)>> {
) -> DalResult<Option<L1BatchNumber>> {
let processing_timeout = pg_interval_from_duration(processing_timeout);
let query = sqlx::query!(
r#"
UPDATE tee_proofs
SET
status = $1,
updated_at = NOW(),
prover_taken_at = NOW()
WHERE
id = (
tee_type = $2
AND l1_batch_number = (
SELECT
proofs.id
proofs.l1_batch_number
FROM
tee_proofs AS proofs
JOIN tee_verifier_input_producer_jobs AS inputs ON proofs.l1_batch_number = inputs.l1_batch_number
WHERE
inputs.status = 'Successful'
AND proofs.tee_type = $1
inputs.status = $3
AND (
proofs.prover_taken_at IS NULL
OR proofs.prover_taken_at < NOW() - $2::INTERVAL
proofs.status = $4
OR (
proofs.status = $5
AND proofs.prover_taken_at < NOW() - $6::INTERVAL
)
)
ORDER BY
tee_proofs.l1_batch_number ASC,
proofs.prover_taken_at ASC NULLS FIRST
l1_batch_number ASC
LIMIT
1
FOR UPDATE
SKIP LOCKED
)
RETURNING
tee_proofs.id,
tee_proofs.l1_batch_number
"#,
TeeProofGenerationJobStatus::PickedByProver as TeeProofGenerationJobStatus,
&tee_type.to_string(),
TeeVerifierInputProducerJobStatus::Successful as TeeVerifierInputProducerJobStatus,
TeeProofGenerationJobStatus::ReadyToBeProven as TeeProofGenerationJobStatus,
TeeProofGenerationJobStatus::PickedByProver as TeeProofGenerationJobStatus,
&processing_timeout,
);
let batch_number = Instrumented::new("get_next_batch_to_be_proven")
Expand All @@ -70,7 +87,8 @@ impl TeeProofGenerationDal<'_, '_> {

pub async fn save_proof_artifacts_metadata(
&mut self,
proof_id: i64,
batch_number: L1BatchNumber,
tee_type: TeeType,
pubkey: &[u8],
signature: &[u8],
proof: &[u8],
Expand All @@ -79,32 +97,36 @@ impl TeeProofGenerationDal<'_, '_> {
r#"
UPDATE tee_proofs
SET
pubkey = $1,
signature = $2,
proof = $3,
proved_at = NOW()
tee_type = $1,
status = $2,
pubkey = $3,
signature = $4,
proof = $5,
updated_at = NOW()
WHERE
id = $4
l1_batch_number = $6
"#,
tee_type.to_string(),
TeeProofGenerationJobStatus::Generated as TeeProofGenerationJobStatus,
pubkey,
signature,
proof,
proof_id
i64::from(batch_number.0)
);
let instrumentation = Instrumented::new("save_proof_artifacts_metadata")
.with_arg("tee_type", &tee_type)
.with_arg("pubkey", &pubkey)
.with_arg("signature", &signature)
.with_arg("proof", &proof)
.with_arg("proof_id", &proof_id);
.with_arg("proof", &proof);
let result = instrumentation
.clone()
.with(query)
.execute(self.storage)
.await?;
if result.rows_affected() == 0 {
let err = instrumentation.constraint_error(anyhow::anyhow!(
"Updating TEE proof for a non-existent ID {} is not allowed",
proof_id
"Updating TEE proof for a non-existent batch number {} is not allowed",
batch_number
));
return Err(err);
}
Expand All @@ -121,12 +143,13 @@ impl TeeProofGenerationDal<'_, '_> {
let query = sqlx::query!(
r#"
INSERT INTO
tee_proofs (l1_batch_number, tee_type, created_at)
tee_proofs (l1_batch_number, tee_type, status, created_at, updated_at)
VALUES
($1, $2, NOW())
($1, $2, $3, NOW(), NOW())
"#,
batch_number,
tee_type.to_string(),
TeeProofGenerationJobStatus::ReadyToBeProven as TeeProofGenerationJobStatus,
);
let instrumentation = Instrumented::new("insert_tee_proof_generation_job")
.with_arg("l1_batch_number", &batch_number)
Expand All @@ -138,7 +161,7 @@ impl TeeProofGenerationDal<'_, '_> {
.await?;
if result.rows_affected() == 0 {
let err = instrumentation.constraint_error(anyhow::anyhow!(
"Unable to insert TEE proof for {}, {}",
"Unable to insert TEE proof for batch number {}, TEE type {}",
batch_number,
tee_type
));
Expand All @@ -148,33 +171,6 @@ impl TeeProofGenerationDal<'_, '_> {
Ok(())
}

#[cfg(test)]
pub async fn get_oldest_unpicked_batch(&mut self) -> DalResult<Option<L1BatchNumber>> {
let query = sqlx::query!(
r#"
SELECT
proofs.l1_batch_number
FROM
tee_proofs AS proofs
JOIN tee_verifier_input_producer_jobs AS inputs ON proofs.l1_batch_number = inputs.l1_batch_number
WHERE
inputs.status = 'Successful'
AND proofs.prover_taken_at IS NULL
ORDER BY
proofs.l1_batch_number ASC
LIMIT
1
"#,
);
let batch_number = Instrumented::new("get_oldest_unpicked_batch")
.with(query)
.fetch_optional(self.storage)
.await?
.map(|row| L1BatchNumber(row.l1_batch_number as u32));

Ok(batch_number)
}

pub async fn save_attestation(&mut self, pubkey: &[u8], attestation: &[u8]) -> DalResult<()> {
let query = sqlx::query!(
r#"
Expand All @@ -197,7 +193,8 @@ impl TeeProofGenerationDal<'_, '_> {
.await?;
if result.rows_affected() == 0 {
let err = instrumentation.constraint_error(anyhow::anyhow!(
"Unable to insert TEE attestation: given pubkey already has an attestation assigned"
"Unable to insert TEE attestation: pubkey {:?} already has an attestation assigned",
pubkey
));
return Err(err);
}
Expand All @@ -213,26 +210,33 @@ impl TeeProofGenerationDal<'_, '_> {
let query = format!(
r#"
SELECT
tp.id,
tp.pubkey,
tp.signature,
tp.proof,
tp.proved_at,
tp.updated_at,
ta.attestation
FROM
tee_proofs tp
LEFT JOIN
tee_attestations ta ON tp.pubkey = ta.pubkey
WHERE
tp.l1_batch_number = {}
tp.l1_batch_number = $1
AND tp.status = $2
{}
ORDER BY tp.id
"#,
i64::from(batch_number.0),
tee_type.map_or_else(String::new, |tt| format!("AND tp.tee_type = '{}'", tt))
tee_type.map_or_else(String::new, |_| "AND tp.tee_type = $3".to_string())
);

let proofs: Vec<StorageTeeProof> = sqlx::query_as(&query)
let mut query = sqlx::query_as(&query)
.bind(i64::from(batch_number.0))
.bind(TeeProofGenerationJobStatus::Generated as TeeProofGenerationJobStatus);

if let Some(tee_type) = tee_type {
query = query.bind(tee_type.to_string());
}

let proofs: Vec<StorageTeeProof> = query
.fetch_all(self.storage.conn())
.await
.unwrap()
Expand All @@ -250,4 +254,33 @@ impl TeeProofGenerationDal<'_, '_> {

Ok(proofs)
}

#[cfg(test)]
pub async fn get_oldest_unpicked_batch(&mut self) -> DalResult<Option<L1BatchNumber>> {
let query = sqlx::query!(
r#"
SELECT
proofs.l1_batch_number
FROM
tee_proofs AS proofs
JOIN tee_verifier_input_producer_jobs AS inputs ON proofs.l1_batch_number = inputs.l1_batch_number
WHERE
inputs.status = $1
AND proofs.status = $2
ORDER BY
proofs.l1_batch_number ASC
LIMIT
1
"#,
TeeVerifierInputProducerJobStatus::Successful as TeeVerifierInputProducerJobStatus,
TeeProofGenerationJobStatus::ReadyToBeProven as TeeProofGenerationJobStatus,
);
let batch_number = Instrumented::new("get_oldest_unpicked_batch")
.with(query)
.fetch_optional(self.storage)
.await?
.map(|row| L1BatchNumber(row.l1_batch_number as u32));

Ok(batch_number)
}
}
5 changes: 1 addition & 4 deletions core/lib/prover_interface/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,7 @@ pub enum ProofGenerationDataResponse {
}

#[derive(Debug, Serialize, Deserialize)]
pub struct TeeProofGenerationDataResponse {
pub proof_id: Option<i64>,
pub input: Option<Box<TeeVerifierInput>>,
}
pub struct TeeProofGenerationDataResponse(pub Option<Box<TeeVerifierInput>>);

#[derive(Debug, Serialize, Deserialize)]
pub enum SubmitProofResponse {
Expand Down
20 changes: 6 additions & 14 deletions core/node/proof_data_handler/src/tee_request_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,9 @@ impl TeeRequestProcessor {
.await
.map_err(RequestProcessorError::Dal)?;

let (proof_id, l1_batch_number) = match l1_batch_number_result {
Some((proof_id, number)) => (proof_id, number),
None => {
// TODO introduce a proper type
return Ok(Json(TeeProofGenerationDataResponse {
proof_id: None,
input: None,
}));
}
let (l1_batch_number) = match l1_batch_number_result {
Some(number) => number,
None => return Ok(Json(TeeProofGenerationDataResponse(None))),
};

let tee_verifier_input: TeeVerifierInput = self
Expand All @@ -70,10 +64,7 @@ impl TeeRequestProcessor {
.await
.map_err(RequestProcessorError::ObjectStore)?;

let response = TeeProofGenerationDataResponse {
proof_id: Some(proof_id),
input: Some(Box::new(tee_verifier_input)),
};
let response = TeeProofGenerationDataResponse(Some(Box::new(tee_verifier_input)));

Ok(Json(response))
}
Expand All @@ -97,7 +88,8 @@ impl TeeRequestProcessor {
l1_batch_number
);
dal.save_proof_artifacts_metadata(
proof.0.proof_id,
l1_batch_number,
proof.0.tee_type,
&proof.0.pubkey,
&proof.0.signature,
&proof.0.proof,
Expand Down
Loading

0 comments on commit 26eb5f1

Please sign in to comment.