diff --git a/core/lib/dal/migrations/20240325133100_add_tee_verifier_input_producer_jobs_table.down.sql b/core/lib/dal/migrations/20240325133100_add_tee_verifier_input_producer_jobs_table.down.sql index bccd4f97dcbb..2d4d6a6ad7a7 100644 --- a/core/lib/dal/migrations/20240325133100_add_tee_verifier_input_producer_jobs_table.down.sql +++ b/core/lib/dal/migrations/20240325133100_add_tee_verifier_input_producer_jobs_table.down.sql @@ -1,5 +1,4 @@ DROP INDEX IF EXISTS idx_tee_verifier_input_producer_jobs_status_processing_attempts; -DROP INDEX IF EXISTS idx_tee_verifier_input_producer_jobs_l1_batch_number_status; DROP TABLE IF EXISTS tee_verifier_input_producer_jobs; diff --git a/core/lib/dal/migrations/20240325143100_add_tee_verifier_input_producer_jobs_table.up.sql b/core/lib/dal/migrations/20240325143100_add_tee_verifier_input_producer_jobs_table.up.sql index 26e7e5a11940..ead044266df4 100644 --- a/core/lib/dal/migrations/20240325143100_add_tee_verifier_input_producer_jobs_table.up.sql +++ b/core/lib/dal/migrations/20240325143100_add_tee_verifier_input_producer_jobs_table.up.sql @@ -15,6 +15,3 @@ CREATE TABLE IF NOT EXISTS tee_verifier_input_producer_jobs CREATE INDEX IF NOT EXISTS idx_tee_verifier_input_producer_jobs_status_processing_attempts ON tee_verifier_input_producer_jobs (status, processing_started_at, attempts); - -CREATE INDEX IF NOT EXISTS idx_tee_verifier_input_producer_jobs_l1_batch_number_status -ON tee_verifier_input_producer_jobs (l1_batch_number, status); diff --git a/core/lib/dal/migrations/20240523085604_add_tee_proof_generation_details_table.down.sql b/core/lib/dal/migrations/20240523085604_add_tee_proof_generation_details_table.down.sql index d46bc4aa154d..15de3d74c65c 100644 --- a/core/lib/dal/migrations/20240523085604_add_tee_proof_generation_details_table.down.sql +++ b/core/lib/dal/migrations/20240523085604_add_tee_proof_generation_details_table.down.sql @@ -1,4 +1,6 @@ -DROP INDEX IF EXISTS idx_proofs_number_per_batch_number_and_tee_type; +DROP INDEX IF EXISTS idx_tee_proofs_status_prover_taken_at; DROP TABLE IF EXISTS tee_attestations; DROP TABLE IF EXISTS tee_proofs; + +DROP TYPE IF EXISTS tee_proofs_job_status; diff --git a/core/lib/dal/migrations/20240523085604_add_tee_proof_generation_details_table.up.sql b/core/lib/dal/migrations/20240523085604_add_tee_proof_generation_details_table.up.sql index 013bdcd02b17..35074baecc94 100644 --- a/core/lib/dal/migrations/20240523085604_add_tee_proof_generation_details_table.up.sql +++ b/core/lib/dal/migrations/20240523085604_add_tee_proof_generation_details_table.up.sql @@ -1,3 +1,5 @@ +CREATE TYPE tee_proofs_job_status AS ENUM ('ReadyToBeProven', 'PickedByProver', 'Generated', 'Skipped'); + CREATE TABLE IF NOT EXISTS tee_attestations ( pubkey BYTEA PRIMARY KEY, @@ -6,16 +8,18 @@ CREATE TABLE IF NOT EXISTS tee_attestations CREATE TABLE IF NOT EXISTS tee_proofs ( - id BIGSERIAL PRIMARY KEY, l1_batch_number BIGINT NOT NULL REFERENCES tee_verifier_input_producer_jobs (l1_batch_number) ON DELETE CASCADE, tee_type TEXT NOT NULL, + status tee_proofs_job_status NOT NULL, pubkey BYTEA REFERENCES tee_attestations (pubkey) ON DELETE CASCADE, signature BYTEA, proof BYTEA, created_at TIMESTAMP NOT NULL, - proved_at TIMESTAMP, - prover_taken_at TIMESTAMP + updated_at TIMESTAMP NOT NULL, + prover_taken_at TIMESTAMP, + PRIMARY KEY (l1_batch_number, tee_type) ); -CREATE INDEX IF NOT EXISTS idx_proofs_number_per_batch_number_and_tee_type - ON tee_proofs (l1_batch_number, tee_type); +CREATE INDEX IF NOT EXISTS idx_tee_proofs_status_prover_taken_at + ON tee_proofs (prover_taken_at) + WHERE status = 'PickedByProver'; diff --git a/core/lib/dal/src/models/storage_tee_proof.rs b/core/lib/dal/src/models/storage_tee_proof.rs index 9c55c86eb26d..f2a2647e86d0 100644 --- a/core/lib/dal/src/models/storage_tee_proof.rs +++ b/core/lib/dal/src/models/storage_tee_proof.rs @@ -25,8 +25,6 @@ pub struct StorageTeeProof { /// TODO rename it TeeProof once StorageTeeProof is moved to api/mod.rs? #[derive(Debug, Clone, sqlx::FromRow)] pub struct TmpStorageTeeProof { - #[allow(dead_code)] - pub id: i64, pub pubkey: Option>, pub signature: Option>, pub proof: Option>, diff --git a/core/lib/dal/src/tee_proof_generation_dal.rs b/core/lib/dal/src/tee_proof_generation_dal.rs index f0ebc7720523..e503c2ce9d64 100644 --- a/core/lib/dal/src/tee_proof_generation_dal.rs +++ b/core/lib/dal/src/tee_proof_generation_dal.rs @@ -8,6 +8,7 @@ use zksync_types::{tee_types::TeeType, L1BatchNumber}; use crate::{ models::storage_tee_proof::{StorageTeeProof, TmpStorageTeeProof}, + tee_verifier_input_producer_dal::TeeVerifierInputProducerJobStatus, Core, }; @@ -16,45 +17,61 @@ pub struct TeeProofGenerationDal<'a, 'c> { pub(crate) storage: &'a mut Connection<'c, Core>, } +#[derive(Debug, sqlx::Type)] +#[sqlx(type_name = "tee_proofs_job_status")] +enum TeeProofGenerationJobStatus { + ReadyToBeProven, + PickedByProver, + Generated, + Skipped, +} + impl TeeProofGenerationDal<'_, '_> { pub async fn get_next_batch_to_be_proven( &mut self, tee_type: TeeType, processing_timeout: Duration, - ) -> DalResult> { + ) -> DalResult> { let processing_timeout = pg_interval_from_duration(processing_timeout); let query = sqlx::query!( r#" UPDATE tee_proofs SET + status = $1, + updated_at = NOW(), prover_taken_at = NOW() WHERE - id = ( + tee_type = $2 + AND l1_batch_number = ( SELECT - proofs.id + proofs.l1_batch_number FROM tee_proofs AS proofs JOIN tee_verifier_input_producer_jobs AS inputs ON proofs.l1_batch_number = inputs.l1_batch_number WHERE - inputs.status = 'Successful' - AND proofs.tee_type = $1 + inputs.status = $3 AND ( - proofs.prover_taken_at IS NULL - OR proofs.prover_taken_at < NOW() - $2::INTERVAL + proofs.status = $4 + OR ( + proofs.status = $5 + AND proofs.prover_taken_at < NOW() - $6::INTERVAL + ) ) ORDER BY - tee_proofs.l1_batch_number ASC, - proofs.prover_taken_at ASC NULLS FIRST + l1_batch_number ASC LIMIT 1 FOR UPDATE SKIP LOCKED ) RETURNING - tee_proofs.id, tee_proofs.l1_batch_number "#, + TeeProofGenerationJobStatus::PickedByProver as TeeProofGenerationJobStatus, &tee_type.to_string(), + TeeVerifierInputProducerJobStatus::Successful as TeeVerifierInputProducerJobStatus, + TeeProofGenerationJobStatus::ReadyToBeProven as TeeProofGenerationJobStatus, + TeeProofGenerationJobStatus::PickedByProver as TeeProofGenerationJobStatus, &processing_timeout, ); let batch_number = Instrumented::new("get_next_batch_to_be_proven") @@ -70,7 +87,8 @@ impl TeeProofGenerationDal<'_, '_> { pub async fn save_proof_artifacts_metadata( &mut self, - proof_id: i64, + batch_number: L1BatchNumber, + tee_type: TeeType, pubkey: &[u8], signature: &[u8], proof: &[u8], @@ -79,23 +97,27 @@ impl TeeProofGenerationDal<'_, '_> { r#" UPDATE tee_proofs SET - pubkey = $1, - signature = $2, - proof = $3, - proved_at = NOW() + tee_type = $1, + status = $2, + pubkey = $3, + signature = $4, + proof = $5, + updated_at = NOW() WHERE - id = $4 + l1_batch_number = $6 "#, + tee_type.to_string(), + TeeProofGenerationJobStatus::Generated as TeeProofGenerationJobStatus, pubkey, signature, proof, - proof_id + i64::from(batch_number.0) ); let instrumentation = Instrumented::new("save_proof_artifacts_metadata") + .with_arg("tee_type", &tee_type) .with_arg("pubkey", &pubkey) .with_arg("signature", &signature) - .with_arg("proof", &proof) - .with_arg("proof_id", &proof_id); + .with_arg("proof", &proof); let result = instrumentation .clone() .with(query) @@ -103,8 +125,8 @@ impl TeeProofGenerationDal<'_, '_> { .await?; if result.rows_affected() == 0 { let err = instrumentation.constraint_error(anyhow::anyhow!( - "Updating TEE proof for a non-existent ID {} is not allowed", - proof_id + "Updating TEE proof for a non-existent batch number {} is not allowed", + batch_number )); return Err(err); } @@ -121,12 +143,13 @@ impl TeeProofGenerationDal<'_, '_> { let query = sqlx::query!( r#" INSERT INTO - tee_proofs (l1_batch_number, tee_type, created_at) + tee_proofs (l1_batch_number, tee_type, status, created_at, updated_at) VALUES - ($1, $2, NOW()) + ($1, $2, $3, NOW(), NOW()) "#, batch_number, tee_type.to_string(), + TeeProofGenerationJobStatus::ReadyToBeProven as TeeProofGenerationJobStatus, ); let instrumentation = Instrumented::new("insert_tee_proof_generation_job") .with_arg("l1_batch_number", &batch_number) @@ -138,7 +161,7 @@ impl TeeProofGenerationDal<'_, '_> { .await?; if result.rows_affected() == 0 { let err = instrumentation.constraint_error(anyhow::anyhow!( - "Unable to insert TEE proof for {}, {}", + "Unable to insert TEE proof for batch number {}, TEE type {}", batch_number, tee_type )); @@ -148,33 +171,6 @@ impl TeeProofGenerationDal<'_, '_> { Ok(()) } - #[cfg(test)] - pub async fn get_oldest_unpicked_batch(&mut self) -> DalResult> { - let query = sqlx::query!( - r#" - SELECT - proofs.l1_batch_number - FROM - tee_proofs AS proofs - JOIN tee_verifier_input_producer_jobs AS inputs ON proofs.l1_batch_number = inputs.l1_batch_number - WHERE - inputs.status = 'Successful' - AND proofs.prover_taken_at IS NULL - ORDER BY - proofs.l1_batch_number ASC - LIMIT - 1 - "#, - ); - let batch_number = Instrumented::new("get_oldest_unpicked_batch") - .with(query) - .fetch_optional(self.storage) - .await? - .map(|row| L1BatchNumber(row.l1_batch_number as u32)); - - Ok(batch_number) - } - pub async fn save_attestation(&mut self, pubkey: &[u8], attestation: &[u8]) -> DalResult<()> { let query = sqlx::query!( r#" @@ -197,7 +193,8 @@ impl TeeProofGenerationDal<'_, '_> { .await?; if result.rows_affected() == 0 { let err = instrumentation.constraint_error(anyhow::anyhow!( - "Unable to insert TEE attestation: given pubkey already has an attestation assigned" + "Unable to insert TEE attestation: pubkey {:?} already has an attestation assigned", + pubkey )); return Err(err); } @@ -213,26 +210,33 @@ impl TeeProofGenerationDal<'_, '_> { let query = format!( r#" SELECT - tp.id, tp.pubkey, tp.signature, tp.proof, - tp.proved_at, + tp.updated_at, ta.attestation FROM tee_proofs tp LEFT JOIN tee_attestations ta ON tp.pubkey = ta.pubkey WHERE - tp.l1_batch_number = {} + tp.l1_batch_number = $1 + AND tp.status = $2 {} ORDER BY tp.id "#, - i64::from(batch_number.0), - tee_type.map_or_else(String::new, |tt| format!("AND tp.tee_type = '{}'", tt)) + tee_type.map_or_else(String::new, |_| "AND tp.tee_type = $3".to_string()) ); - let proofs: Vec = sqlx::query_as(&query) + let mut query = sqlx::query_as(&query) + .bind(i64::from(batch_number.0)) + .bind(TeeProofGenerationJobStatus::Generated as TeeProofGenerationJobStatus); + + if let Some(tee_type) = tee_type { + query = query.bind(tee_type.to_string()); + } + + let proofs: Vec = query .fetch_all(self.storage.conn()) .await .unwrap() @@ -250,4 +254,33 @@ impl TeeProofGenerationDal<'_, '_> { Ok(proofs) } + + #[cfg(test)] + pub async fn get_oldest_unpicked_batch(&mut self) -> DalResult> { + let query = sqlx::query!( + r#" + SELECT + proofs.l1_batch_number + FROM + tee_proofs AS proofs + JOIN tee_verifier_input_producer_jobs AS inputs ON proofs.l1_batch_number = inputs.l1_batch_number + WHERE + inputs.status = $1 + AND proofs.status = $2 + ORDER BY + proofs.l1_batch_number ASC + LIMIT + 1 + "#, + TeeVerifierInputProducerJobStatus::Successful as TeeVerifierInputProducerJobStatus, + TeeProofGenerationJobStatus::ReadyToBeProven as TeeProofGenerationJobStatus, + ); + let batch_number = Instrumented::new("get_oldest_unpicked_batch") + .with(query) + .fetch_optional(self.storage) + .await? + .map(|row| L1BatchNumber(row.l1_batch_number as u32)); + + Ok(batch_number) + } } diff --git a/core/lib/prover_interface/src/api.rs b/core/lib/prover_interface/src/api.rs index 6f937d6ab91b..a8a578a8de08 100644 --- a/core/lib/prover_interface/src/api.rs +++ b/core/lib/prover_interface/src/api.rs @@ -30,10 +30,7 @@ pub enum ProofGenerationDataResponse { } #[derive(Debug, Serialize, Deserialize)] -pub struct TeeProofGenerationDataResponse { - pub proof_id: Option, - pub input: Option>, -} +pub struct TeeProofGenerationDataResponse(pub Option>); #[derive(Debug, Serialize, Deserialize)] pub enum SubmitProofResponse { diff --git a/core/node/proof_data_handler/src/tee_request_processor.rs b/core/node/proof_data_handler/src/tee_request_processor.rs index 20613ba909e2..989b065c0494 100644 --- a/core/node/proof_data_handler/src/tee_request_processor.rs +++ b/core/node/proof_data_handler/src/tee_request_processor.rs @@ -53,15 +53,9 @@ impl TeeRequestProcessor { .await .map_err(RequestProcessorError::Dal)?; - let (proof_id, l1_batch_number) = match l1_batch_number_result { - Some((proof_id, number)) => (proof_id, number), - None => { - // TODO introduce a proper type - return Ok(Json(TeeProofGenerationDataResponse { - proof_id: None, - input: None, - })); - } + let (l1_batch_number) = match l1_batch_number_result { + Some(number) => number, + None => return Ok(Json(TeeProofGenerationDataResponse(None))), }; let tee_verifier_input: TeeVerifierInput = self @@ -70,10 +64,7 @@ impl TeeRequestProcessor { .await .map_err(RequestProcessorError::ObjectStore)?; - let response = TeeProofGenerationDataResponse { - proof_id: Some(proof_id), - input: Some(Box::new(tee_verifier_input)), - }; + let response = TeeProofGenerationDataResponse(Some(Box::new(tee_verifier_input))); Ok(Json(response)) } @@ -97,7 +88,8 @@ impl TeeRequestProcessor { l1_batch_number ); dal.save_proof_artifacts_metadata( - proof.0.proof_id, + l1_batch_number, + proof.0.tee_type, &proof.0.pubkey, &proof.0.signature, &proof.0.proof, diff --git a/core/node/proof_data_handler/src/tests.rs b/core/node/proof_data_handler/src/tests.rs index 0dac2f960a87..07d675ec3cfa 100644 --- a/core/node/proof_data_handler/src/tests.rs +++ b/core/node/proof_data_handler/src/tests.rs @@ -18,7 +18,7 @@ use zksync_prover_interface::{ api::SubmitTeeProofRequest, inputs::{TeeVerifierInput, V1TeeVerifierInput, WitnessInputMerklePaths}, }; -use zksync_types::{commitment::L1BatchCommitmentMode, L1BatchNumber, H256}; +use zksync_types::{commitment::L1BatchCommitmentMode, tee_types::TeeType, L1BatchNumber, H256}; use crate::create_proof_processing_router; @@ -94,7 +94,7 @@ async fn request_tee_proof_inputs() { }, L1BatchCommitmentMode::Rollup, ); - let req_body = Body::from(serde_json::to_vec(&json!({})).unwrap()); + let req_body = Body::from(serde_json::to_vec(&json!({ "tee_type": "Sgx" })).unwrap()); let response = app .oneshot( Request::builder() @@ -113,9 +113,9 @@ async fn request_tee_proof_inputs() { .await .unwrap(); let json: serde_json::Value = serde_json::from_slice(&body).unwrap(); - let deserialized: TeeVerifierInput = serde_json::from_value(json).unwrap(); + let deserialized: TeeProofGenerationDataResponse = serde_json::from_value(json).unwrap(); - assert_eq!(tvi, deserialized); + assert_eq!(tvi, deserialized.input.unwrap()); } // Test /tee/submit_proofs endpoint using a mocked TEE proof and verify response and db state @@ -183,17 +183,19 @@ async fn submit_tee_proof() { // there should be one TEE proof in the db now - let proof = proof_db_conn + let proofs = proof_db_conn .tee_proof_generation_dal() - .get_tee_proof(batch_number) + .get_tee_proofs(batch_number, Some(TeeType::Sgx)) .await - .unwrap() .unwrap(); + assert_eq!(proofs.len(), 1); + let proof = &proofs[0]; + assert_eq!(proof.tee_type.unwrap(), tee_proof_request.0.tee_type); + assert_eq!(proof.proof.unwrap(), tee_proof_request.0.proof); + assert_eq!(proof.attestation.unwrap(), attestation); assert_eq!(proof.signature.unwrap(), tee_proof_request.0.signature); assert_eq!(proof.pubkey.unwrap(), tee_proof_request.0.pubkey); - assert_eq!(proof.proof.unwrap(), tee_proof_request.0.proof); - assert_eq!(proof.tee_type.unwrap(), tee_proof_request.0.tee_type); } // Mock SQL db with information about the status of the TEE proof generation @@ -229,7 +231,7 @@ async fn mock_tee_batch_status( // mock SQL table with relevant information about the status of TEE proof generation ('ready_to_be_proven') proof_dal - .insert_tee_proof_generation_job(batch_number) + .insert_tee_proof_generation_job(batch_number, TeeType::Sgx) .await .expect("Failed to insert tee_proof_generation_job");