Skip to content

Commit

Permalink
Handle transient errors gracefully
Browse files Browse the repository at this point in the history
  • Loading branch information
pbeza committed Jul 3, 2024
1 parent 5141a71 commit fb96103
Show file tree
Hide file tree
Showing 6 changed files with 129 additions and 35 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions core/bin/zksync_tee_prover/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ async-trait.workspace = true
reqwest.workspace = true
secp256k1.workspace = true
serde = { workspace = true, features = ["derive"] }
thiserror.workspace = true
tokio = { workspace = true, features = ["full"] }
tracing.workspace = true
url.workspace = true
Expand Down
8 changes: 5 additions & 3 deletions core/bin/zksync_tee_prover/src/api_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ use zksync_prover_interface::{
};
use zksync_types::{tee_types::TeeType, L1BatchNumber};

use crate::error::TeeProverError;

/// Implementation of the API client for the proof data handler, run by
/// [`zksync_proof_data_handler::run_server`].
#[derive(Debug)]
Expand Down Expand Up @@ -56,7 +58,7 @@ impl TeeApiClient {
&self,
attestation_quote_bytes: Vec<u8>,
public_key: &PublicKey,
) -> anyhow::Result<()> {
) -> Result<(), TeeProverError> {
let request = RegisterTeeAttestationRequest {
attestation: attestation_quote_bytes,
pubkey: public_key.serialize().to_vec(),
Expand All @@ -72,7 +74,7 @@ impl TeeApiClient {

/// Fetches the next job for the TEE prover to process, verifying and signing it if the
/// verification is successful.
pub async fn get_job(&self) -> anyhow::Result<Option<Box<TeeVerifierInput>>> {
pub async fn get_job(&self) -> Result<Option<Box<TeeVerifierInput>>, TeeProverError> {
let request = TeeProofGenerationDataRequest {};
let response = self
.post::<_, TeeProofGenerationDataResponse, _>("/tee/proof_inputs", request)
Expand All @@ -88,7 +90,7 @@ impl TeeApiClient {
pubkey: &PublicKey,
root_hash: H256,
tee_type: TeeType,
) -> anyhow::Result<()> {
) -> Result<(), TeeProverError> {
let request = SubmitTeeProofRequest(Box::new(L1BatchTeeProofForL1 {
signature: signature.serialize_compact().into(),
pubkey: pubkey.serialize().into(),
Expand Down
47 changes: 47 additions & 0 deletions core/bin/zksync_tee_prover/src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
use std::{error::Error as StdError, io};

use reqwest::StatusCode;

/// Errors produced by the TEE prover while calling the API client or verifying a batch.
///
/// Both variants are `#[error(transparent)]`, i.e. their `Display` output and error source
/// are forwarded from the wrapped error.
#[derive(Debug, thiserror::Error)]
pub(crate) enum TeeProverError {
/// A `reqwest` HTTP request failed. The `#[from]` attribute generates
/// `From<reqwest::Error>`, so such errors can be propagated with `?`.
#[error(transparent)]
Request(#[from] reqwest::Error),
/// Verification-related failure carried as an `anyhow::Error`. There is no `#[from]` here,
/// so callers have to wrap the error explicitly (e.g. via `map_err`).
#[error(transparent)]
Verification(anyhow::Error),
}

impl TeeProverError {
    /// Reports whether this error is considered transient, i.e. worth retrying.
    ///
    /// Request errors are classified by `is_transient_http_error`; verification errors are
    /// never considered transient.
    pub fn is_transient(&self) -> bool {
        match self {
            Self::Request(http_err) => is_transient_http_error(http_err),
            Self::Verification(_) => false,
        }
    }
}

/// Classifies a `reqwest` error as transient (`true`) or permanent (`false`).
///
/// The checks are evaluated in order and the first one that matches wins.
fn is_transient_http_error(err: &reqwest::Error) -> bool {
    if err.is_timeout() || err.is_connect() {
        return true;
    }
    // Request errors are not all logically transient, but a large share of them are (e.g.,
    // `hyper` protocol-level errors), so it is safer to classify them as transient.
    if err.is_request() {
        return true;
    }
    if has_transient_io_source(err) {
        return true;
    }
    // Otherwise only the 502 and 503 response statuses count as transient.
    matches!(
        err.status(),
        Some(code) if code == StatusCode::BAD_GATEWAY || code == StatusCode::SERVICE_UNAVAILABLE
    )
}

fn has_transient_io_source(err: &(dyn StdError + 'static)) -> bool {
// We treat any I/O errors as transient. This isn't always true, but frequently occurring I/O errors
// (e.g., "connection reset by peer") *are* transient, and treating an error as transient is a safer option,
// even if it can lead to unnecessary retries.
get_source::<io::Error>(err).is_some()
}

/// Finds the first error of concrete type `T` in the chain that starts at `err` and continues
/// through successive `source()` calls. Returns `None` if the chain ends without a match.
fn get_source<'a, T: StdError + 'static>(err: &'a (dyn StdError + 'static)) -> Option<&'a T> {
    // Dereference explicitly so that `source()` is invoked on the trait object itself and the
    // returned reference keeps the full `'a` lifetime.
    std::iter::successors(Some(err), |cause| (*cause).source())
        .find_map(|cause| cause.downcast_ref::<T>())
}
1 change: 1 addition & 0 deletions core/bin/zksync_tee_prover/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use zksync_node_framework::{

mod api_client;
mod config;
mod error;
mod tee_prover;

/// This application serves as a TEE verifier, a.k.a. a TEE prover.
Expand Down
106 changes: 74 additions & 32 deletions core/bin/zksync_tee_prover/src/tee_prover.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use zksync_prover_interface::inputs::TeeVerifierInput;
use zksync_tee_verifier::Verify;
use zksync_types::{tee_types::TeeType, L1BatchNumber};

use crate::api_client::TeeApiClient;
use crate::{api_client::TeeApiClient, error::TeeProverError};

/// Wiring layer for `TeeProver`
///
Expand Down Expand Up @@ -55,6 +55,7 @@ impl WiringLayer for TeeProverLayer {

async fn wire(self: Box<Self>, mut context: ServiceContext<'_>) -> Result<(), WiringError> {
let tee_prover_task = TeeProver {
config: Default::default(),
signing_key: self.signing_key,
public_key: self.signing_key.public_key(&Secp256k1::new()),
attestation_quote_bytes: self.attestation_quote_bytes,
Expand All @@ -67,6 +68,7 @@ impl WiringLayer for TeeProverLayer {
}

struct TeeProver {
config: TeeProverConfig,
signing_key: SecretKey,
public_key: PublicKey,
attestation_quote_bytes: Vec<u8>,
Expand All @@ -75,19 +77,66 @@ struct TeeProver {
}

impl TeeProver {
fn verify(&self, tvi: TeeVerifierInput) -> anyhow::Result<(Signature, L1BatchNumber, H256)> {
fn verify(
&self,
tvi: TeeVerifierInput,
) -> Result<(Signature, L1BatchNumber, H256), TeeProverError> {
match tvi {
TeeVerifierInput::V1(tvi) => {
let verification_result = tvi.verify()?;
let verification_result = tvi.verify().map_err(TeeProverError::Verification)?;
let root_hash_bytes = verification_result.value_hash.as_bytes();
let batch_number = verification_result.batch_number;
let msg_to_sign = Message::from_slice(root_hash_bytes)?;
let msg_to_sign = Message::from_slice(root_hash_bytes)
.map_err(|e| TeeProverError::Verification(e.into()))?;
let signature = self.signing_key.sign_ecdsa(msg_to_sign);
Ok((signature, batch_number, verification_result.value_hash))
}
_ => Err(anyhow::anyhow!(
_ => Err(TeeProverError::Verification(anyhow::anyhow!(
"Only TeeVerifierInput::V1 verification supported."
)),
))),
}
}

/// Runs one prover iteration: requests a job from the API client and, if one is returned,
/// verifies it and submits the resulting proof.
///
/// Returns `Ok(())` both when a proof was submitted and when no job was available; any error
/// from fetching, verifying or submitting is propagated to the caller.
async fn step(&self) -> Result<(), TeeProverError> {
    let Some(job) = self.api_client.get_job().await? else {
        tracing::trace!("There are currently no pending batches to be proven");
        return Ok(());
    };

    let (signature, batch_number, root_hash) = self.verify(*job)?;
    self.api_client
        .submit_proof(
            batch_number,
            signature,
            &self.public_key,
            root_hash,
            self.tee_type,
        )
        .await?;
    Ok(())
}
}

/// TEE prover configuration options.
///
/// These options control how the prover task retries a failed step when the error is
/// classified as transient.
#[derive(Debug, Clone)]
pub struct TeeProverConfig {
/// Number of retries for transient errors before giving up on recovery (i.e., returning the
/// error from the `TeeProver` task loop).
pub max_retries: usize,
/// Initial back-off interval when retrying recovery on a transient error. Each subsequent retry interval
/// will be multiplied by [`Self::retry_backoff_multiplier`].
pub initial_retry_backoff: Duration,
/// Factor applied to the current back-off interval after each retry (via `Duration::mul_f32`).
/// NOTE(review): `Duration::mul_f32` panics on a negative, overflowing or non-finite result,
/// and no validation of this value is visible here — confirm callers only pass sane values.
pub retry_backoff_multiplier: f32,
/// Upper bound for the back-off interval; the multiplied interval is clamped to this value.
pub max_backoff: Duration,
}

impl Default for TeeProverConfig {
    /// Default retry policy: 5 retries, a 1-second initial back-off, a multiplier of 2.0 and a
    /// back-off cap of 128 seconds.
    fn default() -> Self {
        let initial_retry_backoff = Duration::from_secs(1);
        let max_backoff = Duration::from_secs(128);

        Self {
            max_retries: 5,
            initial_retry_backoff,
            retry_backoff_multiplier: 2.0,
            max_backoff,
        }
    }
}
Expand All @@ -105,42 +154,35 @@ impl Task for TeeProver {
.register_attestation(self.attestation_quote_bytes.clone(), &self.public_key)
.await?;

const POLLING_INTERVAL_MS: u64 = 1000;
const MAX_BACKOFF_MS: u64 = 60_000;
const BACKOFF_MULTIPLIER: u64 = 2;

let mut backoff: u64 = POLLING_INTERVAL_MS;
let mut retries = 1;
let mut backoff = self.config.initial_retry_backoff;

loop {
if *stop_receiver.0.borrow() {
tracing::info!("Stop signal received, shutting down TEE Prover component");
return Ok(());
}
let job = match self.api_client.get_job().await {
Ok(Some(job)) => {
backoff = POLLING_INTERVAL_MS;
job
let result = self.step().await;
match result {
Ok(()) => {
retries = 1;
backoff = self.config.initial_retry_backoff;
}
Ok(None) => {
tracing::info!("There are currently no pending batches to be proven; backing off for {} ms", backoff);
tokio::time::timeout(Duration::from_millis(backoff), stop_receiver.0.changed())
Err(err) => {
if !err.is_transient() || retries > self.config.max_retries {
return Err(err.into());
}
retries += 1;
tracing::warn!(%err, "Failed TEE prover step function {retries}/{}, retrying in {} milliseconds.", self.config.max_retries, backoff.as_millis());
tokio::time::timeout(backoff, stop_receiver.0.changed())
.await
.ok();
backoff = (backoff * BACKOFF_MULTIPLIER).min(MAX_BACKOFF_MS);
continue;
backoff = std::cmp::min(
backoff.mul_f32(self.config.retry_backoff_multiplier),
self.config.max_backoff,
);
}
Err(e) => return Err(e),
};
let (signature, batch_number, root_hash) = self.verify(*job)?;
self.api_client
.submit_proof(
batch_number,
signature,
&self.public_key,
root_hash,
self.tee_type,
)
.await?;
}
}
}
}

0 comments on commit fb96103

Please sign in to comment.