From 8ed086afecfcad30bfda44fc4d29a00beea71cca Mon Sep 17 00:00:00 2001 From: Artem Fomiuk <88630083+Artemka374@users.noreply.github.com> Date: Wed, 28 Aug 2024 16:38:33 +0300 Subject: [PATCH] feat: Refactor metrics/make API use binaries (#2735) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## What ❔ Add metric-emitting middleware for ExternalProofIntegrationAPI Make API return binaries/allow uploading binaries for verification ## Why ❔ For better UX ## Checklist - [ ] PR title corresponds to the body of PR (we generate changelog entries from PRs). - [ ] Tests for the changes have been added / updated. - [ ] Documentation comments have been added / updated. - [ ] Code has been formatted via `zk fmt` and `zk lint`. --- Cargo.lock | 18 ++ core/lib/prover_interface/src/api.rs | 3 - .../external_proof_integration_api/Cargo.toml | 2 +- .../external_proof_integration_api/src/lib.rs | 57 +++++-- .../src/metrics.rs | 27 --- .../src/middleware.rs | 22 +++ .../src/processor.rs | 156 ++++++++++++------ prover/docs/05_proving_batch.md | 16 +- 8 files changed, 199 insertions(+), 102 deletions(-) create mode 100644 core/node/external_proof_integration_api/src/middleware.rs diff --git a/Cargo.lock b/Cargo.lock index 8dc6c7638e86..fdd9835cab6e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -350,6 +350,7 @@ dependencies = [ "matchit", "memchr", "mime", + "multer", "percent-encoding", "pin-project-lite", "rustversion", @@ -3778,6 +3779,23 @@ version = "1.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c9be0862c1b3f26a88803c4a49de6889c10e608b3ee9344e6ef5b45fb37ad3d1" +[[package]] +name = "multer" +version = "3.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "83e87776546dc87511aa5ee218730c92b666d7264ab6ed41f9d215af9cd5224b" +dependencies = [ + "bytes", + "encoding_rs", + "futures-util", + "http 1.1.0", + "httparse", + "memchr", + "mime", + "spin", + "version_check", +] + [[package]] name = "multimap" version = "0.8.3" diff --git a/core/lib/prover_interface/src/api.rs b/core/lib/prover_interface/src/api.rs index e4fe566618b8..bc95345bbbaa 100644 --- a/core/lib/prover_interface/src/api.rs +++ b/core/lib/prover_interface/src/api.rs @@ -65,9 +65,6 @@ pub enum SubmitProofRequest { SkippedProofGeneration, } -#[derive(Debug, Serialize, Deserialize)] -pub struct OptionalProofGenerationDataRequest(pub Option); - #[derive(Debug, Serialize, Deserialize)] pub struct VerifyProofRequest(pub Box); diff --git a/core/node/external_proof_integration_api/Cargo.toml b/core/node/external_proof_integration_api/Cargo.toml index 2e8176cd8832..362c315164cb 100644 --- a/core/node/external_proof_integration_api/Cargo.toml +++ b/core/node/external_proof_integration_api/Cargo.toml @@ -11,7 +11,7 @@ keywords.workspace = true categories.workspace = true [dependencies] -axum.workspace = true +axum = { workspace = true, features = ["multipart"] } tracing.workspace = true zksync_prover_interface.workspace = true zksync_basic_types.workspace = true diff --git a/core/node/external_proof_integration_api/src/lib.rs b/core/node/external_proof_integration_api/src/lib.rs index b1ef33b44c10..c81173b4ba8f 100644 --- a/core/node/external_proof_integration_api/src/lib.rs +++ b/core/node/external_proof_integration_api/src/lib.rs @@ -1,19 +1,28 @@ mod error; mod metrics; +mod middleware; mod processor; use std::{net::SocketAddr, sync::Arc}; use anyhow::Context; -use axum::{extract::Path, routing::post, Json, Router}; +use axum::{ + extract::{Multipart, Path, Request}, + middleware::Next, + routing::{get, post}, + Router, +}; use tokio::sync::watch; use zksync_basic_types::commitment::L1BatchCommitmentMode; use zksync_config::configs::external_proof_integration_api::ExternalProofIntegrationApiConfig; use zksync_dal::{ConnectionPool, Core}; use zksync_object_store::ObjectStore; -use zksync_prover_interface::api::{OptionalProofGenerationDataRequest, VerifyProofRequest}; -use crate::processor::Processor; +use crate::{ + metrics::{CallOutcome, Method}, + middleware::MetricsMiddleware, + processor::Processor, +}; pub async fn run_server( config: ExternalProofIntegrationApiConfig, @@ -23,7 +32,7 @@ pub async fn run_server( mut stop_receiver: watch::Receiver, ) -> anyhow::Result<()> { let bind_address = SocketAddr::from(([0, 0, 0, 0], config.http_port)); - tracing::debug!("Starting external prover API server on {bind_address}"); + tracing::info!("Starting external prover API server on {bind_address}"); let app = create_router(blob_store, connection_pool, commitment_mode).await; let listener = tokio::net::TcpListener::bind(bind_address) @@ -50,25 +59,45 @@ async fn create_router( let mut processor = Processor::new(blob_store.clone(), connection_pool.clone(), commitment_mode); let verify_proof_processor = processor.clone(); + let specific_proof_processor = processor.clone(); + + let middleware_factory = |method: Method| { + axum::middleware::from_fn(move |req: Request, next: Next| async move { + let middleware = MetricsMiddleware::new(method); + let response = next.run(req).await; + let outcome = match response.status().is_success() { + true => CallOutcome::Success, + false => CallOutcome::Failure, + }; + middleware.observe(outcome); + response + }) + }; + Router::new() .route( "/proof_generation_data", - post( - // we use post method because the returned data is not idempotent, - // i.e we return different result on each call. - move |payload: Json| async move { - processor.get_proof_generation_data(payload).await - }, - ), + get(move || async move { processor.get_proof_generation_data().await }) + .layer(middleware_factory(Method::GetLatestProofGenerationData)), + ) + .route( + "/proof_generation_data/:l1_batch_number", + get(move |l1_batch_number: Path| async move { + specific_proof_processor + .proof_generation_data_for_existing_batch(l1_batch_number) + .await + }) + .layer(middleware_factory(Method::GetSpecificProofGenerationData)), ) .route( "/verify_proof/:l1_batch_number", post( - move |l1_batch_number: Path, payload: Json| async move { + move |l1_batch_number: Path, multipart: Multipart| async move { verify_proof_processor - .verify_proof(l1_batch_number, payload) + .verify_proof(l1_batch_number, multipart) .await }, - ), + ) + .layer(middleware_factory(Method::VerifyProof)), ) } diff --git a/core/node/external_proof_integration_api/src/metrics.rs b/core/node/external_proof_integration_api/src/metrics.rs index 70815f542a05..f43b49b7b1c0 100644 --- a/core/node/external_proof_integration_api/src/metrics.rs +++ b/core/node/external_proof_integration_api/src/metrics.rs @@ -1,6 +1,5 @@ use std::time::Duration; -use tokio::time::Instant; use vise::{EncodeLabelSet, EncodeLabelValue, Histogram, LabeledFamily, Metrics}; #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EncodeLabelSet, EncodeLabelValue)] @@ -25,31 +24,5 @@ pub(crate) struct ProofIntegrationApiMetrics { pub call_latency: LabeledFamily<(Method, CallOutcome), Histogram, 2>, } -pub(crate) struct MethodCallGuard { - method_type: Method, - outcome: CallOutcome, - started_at: Instant, -} - -impl MethodCallGuard { - pub(crate) fn new(method_type: Method) -> Self { - MethodCallGuard { - method_type, - outcome: CallOutcome::Failure, - started_at: Instant::now(), - } - } - - pub(crate) fn mark_successful(&mut self) { - self.outcome = CallOutcome::Success; - } -} - -impl Drop for MethodCallGuard { - fn drop(&mut self) { - METRICS.call_latency[&(self.method_type, self.outcome)].observe(self.started_at.elapsed()); - } -} - #[vise::register] pub(crate) static METRICS: vise::Global = vise::Global::new(); diff --git a/core/node/external_proof_integration_api/src/middleware.rs b/core/node/external_proof_integration_api/src/middleware.rs new file mode 100644 index 000000000000..1dc6aefe9171 --- /dev/null +++ b/core/node/external_proof_integration_api/src/middleware.rs @@ -0,0 +1,22 @@ +use tokio::time::Instant; + +use crate::metrics::{CallOutcome, Method, METRICS}; + +#[derive(Debug)] +pub(crate) struct MetricsMiddleware { + method: Method, + started_at: Instant, +} + +impl MetricsMiddleware { + pub fn new(method: Method) -> MetricsMiddleware { + MetricsMiddleware { + method, + started_at: Instant::now(), + } + } + + pub fn observe(&self, outcome: CallOutcome) { + METRICS.call_latency[&(self.method, outcome)].observe(self.started_at.elapsed()); + } +} diff --git a/core/node/external_proof_integration_api/src/processor.rs b/core/node/external_proof_integration_api/src/processor.rs index e9e56df4a068..64748f5c2278 100644 --- a/core/node/external_proof_integration_api/src/processor.rs +++ b/core/node/external_proof_integration_api/src/processor.rs @@ -1,26 +1,50 @@ use std::sync::Arc; -use axum::{extract::Path, Json}; +use axum::{ + extract::{Multipart, Path}, + http::header, + response::{IntoResponse, Response}, +}; use zksync_basic_types::{ basic_fri_types::Eip4844Blobs, commitment::L1BatchCommitmentMode, L1BatchNumber, }; use zksync_dal::{ConnectionPool, Core, CoreDal}; use zksync_object_store::{bincode, ObjectStore}; use zksync_prover_interface::{ - api::{ - OptionalProofGenerationDataRequest, ProofGenerationData, ProofGenerationDataResponse, - VerifyProofRequest, - }, + api::{ProofGenerationData, VerifyProofRequest}, inputs::{ L1BatchMetadataHashes, VMRunWitnessInputData, WitnessInputData, WitnessInputMerklePaths, }, outputs::L1BatchProofForL1, }; -use crate::{ - error::ProcessorError, - metrics::{Method, MethodCallGuard}, -}; +use crate::error::ProcessorError; + +pub(crate) struct ProofGenerationDataResponse(ProofGenerationData); + +impl IntoResponse for ProofGenerationDataResponse { + fn into_response(self) -> Response { + let l1_batch_number = self.0.l1_batch_number; + let data = match bincode::serialize(&self.0) { + Ok(data) => data, + Err(err) => { + return ProcessorError::Serialization(err).into_response(); + } + }; + + let headers = [ + (header::CONTENT_TYPE, "application/octet-stream"), + ( + header::CONTENT_DISPOSITION, + &format!( + "attachment; filename=\"witness_inputs_{}.bin\"", + l1_batch_number.0 + ), + ), + ]; + (headers, data).into_response() + } +} #[derive(Clone)] pub(crate) struct Processor { @@ -45,44 +69,65 @@ impl Processor { pub(crate) async fn verify_proof( &self, Path(l1_batch_number): Path, - Json(payload): Json, + mut multipart: Multipart, ) -> Result<(), ProcessorError> { - let mut guard = MethodCallGuard::new(Method::VerifyProof); - let l1_batch_number = L1BatchNumber(l1_batch_number); - tracing::info!( + tracing::debug!( "Received request to verify proof for batch: {:?}", l1_batch_number ); - let serialized_proof = bincode::serialize(&payload.0)?; + let latest_available_batch = self + .pool + .connection() + .await + .unwrap() + .proof_generation_dal() + .get_latest_proven_batch() + .await?; + + if l1_batch_number > latest_available_batch { + return Err(ProcessorError::BatchNotReady(l1_batch_number)); + } + + let mut serialized_proof = vec![]; + + while let Some(field) = multipart + .next_field() + .await + .map_err(|_| ProcessorError::InvalidProof)? + { + if field.name() == Some("proof") + && field.content_type() == Some("application/octet-stream") + { + serialized_proof.extend_from_slice(&field.bytes().await.unwrap()); + break; + } + } + + tracing::info!("Received proof is size: {}", serialized_proof.len()); + + let payload: VerifyProofRequest = bincode::deserialize(&serialized_proof)?; + let expected_proof = bincode::serialize( &self .blob_store .get::((l1_batch_number, payload.0.protocol_version)) - .await?, + .await + .map_err(ProcessorError::ObjectStore)?, )?; if serialized_proof != expected_proof { return Err(ProcessorError::InvalidProof); } - guard.mark_successful(); - Ok(()) } - #[tracing::instrument(skip_all)] pub(crate) async fn get_proof_generation_data( &mut self, - request: Json, - ) -> Result, ProcessorError> { - tracing::info!("Received request for proof generation data: {:?}", request); - - let mut guard = match request.0 .0 { - Some(_) => MethodCallGuard::new(Method::GetSpecificProofGenerationData), - None => MethodCallGuard::new(Method::GetLatestProofGenerationData), - }; + ) -> Result { + tracing::debug!("Received request for proof generation data"); let latest_available_batch = self .pool @@ -93,38 +138,45 @@ impl Processor { .get_latest_proven_batch() .await?; - let l1_batch_number = if let Some(l1_batch_number) = request.0 .0 { - if l1_batch_number > latest_available_batch { - tracing::error!( - "Requested batch is not available: {:?}, latest available batch is {:?}", - l1_batch_number, - latest_available_batch - ); - return Err(ProcessorError::BatchNotReady(l1_batch_number)); - } - l1_batch_number - } else { - latest_available_batch - }; + self.proof_generation_data_for_existing_batch_internal(latest_available_batch) + .await + .map(ProofGenerationDataResponse) + } - let proof_generation_data = self - .proof_generation_data_for_existing_batch(l1_batch_number) - .await; + pub(crate) async fn proof_generation_data_for_existing_batch( + &self, + Path(l1_batch_number): Path, + ) -> Result { + let l1_batch_number = L1BatchNumber(l1_batch_number); + tracing::debug!( + "Received request for proof generation data for batch: {:?}", + l1_batch_number + ); - match proof_generation_data { - Ok(data) => { - guard.mark_successful(); + let latest_available_batch = self + .pool + .connection() + .await + .unwrap() + .proof_generation_dal() + .get_latest_proven_batch() + .await?; - Ok(Json(ProofGenerationDataResponse::Success(Some(Box::new( - data, - ))))) - } - Err(err) => Err(err), + if l1_batch_number > latest_available_batch { + tracing::error!( + "Requested batch is not available: {:?}, latest available batch is {:?}", + l1_batch_number, + latest_available_batch + ); + return Err(ProcessorError::BatchNotReady(l1_batch_number)); } + + self.proof_generation_data_for_existing_batch_internal(latest_available_batch) + .await + .map(ProofGenerationDataResponse) } - #[tracing::instrument(skip(self))] - async fn proof_generation_data_for_existing_batch( + async fn proof_generation_data_for_existing_batch_internal( &self, l1_batch_number: L1BatchNumber, ) -> Result { diff --git a/prover/docs/05_proving_batch.md b/prover/docs/05_proving_batch.md index 441a8225f866..e09a44cb0ff7 100644 --- a/prover/docs/05_proving_batch.md +++ b/prover/docs/05_proving_batch.md @@ -72,13 +72,13 @@ input file, called `witness_inputs_.bin` generated by different core comp batch, that was already proven. Example: ```shell - curl -H "Content-Type: application/json" -X POST {address}/proof_generation_data -d 'null' + wget --content-disposition {address}/proof_generation_data ``` or ```shell - curl -H "Content-Type: application/json" -X POST {address}/proof_generation_data -d '1000' + wget --content-disposition {address}/proof_generation_data/{l1_batch_number} ``` ### Preparing database @@ -140,6 +140,12 @@ And you are good to go! The prover subsystem will prove the batch and you can ch Now, assuming the proof is already generated, you can verify using `ExternalProofIntegrationAPI`. Usually proof is stored in GCS bucket(for which you can use the same steps as for getting the witness inputs data [here](#getting-data-needed-for-proving), but locally you can find it in `/artifacts/proofs_fri` directory). Now, simply -send the data to the endpoint `{address}/verify_batch/{batch_number}`. Note, that you need to pass the generated proof -as serialized JSON data when calling the endpoint. API will respond with status 200 if the proof is valid and with the -error message otherwise. +send the data to the endpoint `{address}/verify_batch/{batch_number}`. + +Example: + +```shell +curl -v -F proof=@{path_to_proof_binary} {address_of_API}/verify_proof/{l1_batch_number} +``` + +API will respond with status 200 if the proof is valid and with the error message otherwise.