diff --git a/Cargo.toml b/Cargo.toml index 55113ae6..da62104a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -90,6 +90,8 @@ helios-opstack = { path = "./opstack" } tokio = { version = "1", features = ["full"] } dotenv = "0.15.0" serde = { version = "1.0.154", features = ["derive"] } +mockito = "0.31" +tracing = "0.1" [target.'cfg(not(target_arch = "wasm32"))'.dev-dependencies] alloy = { version = "0.2.1", features = ["full"] } diff --git a/core/execution/rpc/http_rpc.rs b/core/execution/rpc/http_rpc.rs new file mode 100644 index 00000000..22eec318 --- /dev/null +++ b/core/execution/rpc/http_rpc.rs @@ -0,0 +1,76 @@ +use std::time::Duration; +use wasmtimer::tokio::sleep; + +/// Retry mechanism configuration +#[derive(Clone, Debug)] +pub struct RetryConfig { + /// Maximum number of attempts to execute the request + pub max_attempts: u32, + /// Initial delay between retry attempts + pub initial_backoff: Duration, + /// Maximum delay between retry attempts + pub max_backoff: Duration, +} + +impl HttpProvider { + // Add the ability to configure retry settings + pub fn with_retry_config(mut self, config: RetryConfig) -> Self { + self.retry_config = config; + self + } + + async fn execute_with_retry(&self, request: Request) -> Result, Error> + where + T: serde::Serialize + Send + Sync, + { + let mut attempts = 0; + let mut backoff = self.retry_config.initial_backoff; + + loop { + attempts += 1; + match self.execute_internal(request.clone()).await { + Ok(response) => return Ok(response), + Err(err) => { + if !should_retry(&err) || attempts >= self.retry_config.max_attempts { + return Err(err.into()); + } + + tracing::debug!( + "Request failed with error: {:?}. Retrying ({}/{})", + err, + attempts, + self.retry_config.max_attempts + ); + + sleep(backoff).await; + + backoff = std::cmp::min( + backoff * 2, + self.retry_config.max_backoff + ); + } + } + } + } +} + +// Extend the list of errors that can trigger a retry +fn should_retry(error: &Error) -> bool { + match error { + Error::RateLimitExceeded(_) => true, + Error::ConnectionError(_) => true, + Error::TimeoutError => true, + Error::ServerError(status) => status.as_u16() >= 500, + _ => false, + } +} + +// Update the existing execute method to use the retry mechanism +impl Provider for HttpProvider { + async fn execute(&self, request: Request) -> Result, Error> + where + T: serde::Serialize + Send + Sync, + { + self.execute_with_retry(request).await + } +} diff --git a/core/execution/rpc/tests/http_rpc_tests.rs b/core/execution/rpc/tests/http_rpc_tests.rs new file mode 100644 index 00000000..0020f20c --- /dev/null +++ b/core/execution/rpc/tests/http_rpc_tests.rs @@ -0,0 +1,39 @@ +use mockito::{mock, Server}; +use std::time::Duration; + +#[tokio::test] +async fn test_retry_mechanism() { + let mut server = Server::new(); + + // Test successful retry after a temporary error + let mock = server.mock("POST", "/") + .with_status(429) // Rate limit error + .create(); + + let provider = HttpProvider::new(server.url().as_str()); + let request = Request::new("eth_blockNumber", ()); + + let result = provider.execute(request).await; + assert!(result.is_err()); + mock.assert_hits(1); + + // Test the maximum number of retry attempts + let mock = server.mock("POST", "/") + .with_status(429) + .expect(3) // Expect exactly 3 attempts + .create(); + + let result = provider.execute(request).await; + assert!(result.is_err()); + mock.assert(); + + // Test for errors that should not trigger retries + let mock = server.mock("POST", "/") + .with_status(400) // Bad Request + .expect(1) // Expect only 1 attempt + .create(); + + let result = provider.execute(request).await; + assert!(result.is_err()); + mock.assert(); +} diff --git a/core/src/execution/rpc/http_rpc.rs b/core/src/execution/rpc/http_rpc.rs index 9b175695..428718db 100644 --- a/core/src/execution/rpc/http_rpc.rs +++ b/core/src/execution/rpc/http_rpc.rs @@ -20,6 +20,8 @@ use super::ExecutionRpc; pub struct HttpRpc { url: String, + #[cfg(target_arch = "wasm32")] + retry_config: RetryConfig, #[cfg(not(target_arch = "wasm32"))] provider: RootProvider>, N>, #[cfg(target_arch = "wasm32")] @@ -48,6 +50,8 @@ impl ExecutionRpc for HttpRpc { Ok(HttpRpc { url: rpc.to_string(), + #[cfg(target_arch = "wasm32")] + retry_config: RetryConfig::default(), provider, }) } @@ -200,12 +204,15 @@ impl ExecutionRpc for HttpRpc { .map_err(|e| RpcError::new("new_pending_transaction_filter", e))?) } + #[cfg(target_arch = "wasm32")] async fn chain_id(&self) -> Result { - Ok(self - .provider - .get_chain_id() - .await - .map_err(|e| RpcError::new("chain_id", e))?) + self.execute_with_retry(|| async { + self.provider + .get_chain_id() + .await + .map_err(|e| RpcError::new("chain_id", e)) + }) + .await } async fn get_fee_history( @@ -231,3 +238,113 @@ impl ExecutionRpc for HttpRpc { .ok_or(eyre!("block not found")) } } + +#[cfg(target_arch = "wasm32")] +use std::time::Duration; +#[cfg(target_arch = "wasm32")] +use wasmtimer::tokio::sleep; + +#[cfg(target_arch = "wasm32")] +#[derive(Clone, Debug)] +struct RetryConfig { + max_attempts: u32, + initial_backoff: Duration, + max_backoff: Duration, +} + +#[cfg(target_arch = "wasm32")] +impl Default for RetryConfig { + fn default() -> Self { + Self { + max_attempts: 3, + initial_backoff: Duration::from_millis(100), + max_backoff: Duration::from_secs(5), + } + } +} + +#[cfg(target_arch = "wasm32")] +impl HttpRpc { + async fn execute_with_retry(&self, operation: F) -> Result + where + F: Fn() -> Fut + Clone, + Fut: std::future::Future>, + { + let config = RetryConfig::default(); + let mut attempts = 0; + let mut backoff = config.initial_backoff; + + loop { + attempts += 1; + match operation().await { + Ok(response) => return Ok(response), + Err(err) => { + if !Self::should_retry(&err) || attempts >= config.max_attempts { + return Err(err); + } + + sleep(backoff).await; + backoff = std::cmp::min(backoff * 2, config.max_backoff); + } + } + } + } + + fn should_retry(err: &RpcError) -> bool { + if let Some(source) = &err.source { + let error_str = source.to_string().to_lowercase(); + error_str.contains("rate limit") || + error_str.contains("timeout") || + error_str.contains("connection") || + (error_str.contains("server") && error_str.contains("50")) + } else { + false + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use mockito::{mock, Server}; + use std::time::Duration; + + #[tokio::test] + #[cfg(target_arch = "wasm32")] + async fn test_retry_mechanism() { + let mut server = Server::new(); + + // Test rate limit retry + let mock = server.mock("POST", "/") + .with_status(429) + .with_header("content-type", "application/json") + .with_body(r#"{"error": "rate limit exceeded"}"#) + .expect(3) + .create(); + + let provider = HttpRpc::::new(&server.url()).unwrap(); + let result = provider.chain_id().await; + + assert!(result.is_err()); + mock.assert(); + + // Test successful retry + let mock = server.mock("POST", "/") + .with_status(429) + .with_header("content-type", "application/json") + .with_body(r#"{"error": "rate limit exceeded"}"#) + .times(2) + .create(); + + let mock_success = server.mock("POST", "/") + .with_status(200) + .with_header("content-type", "application/json") + .with_body(r#"{"result": "0x1"}"#) + .create(); + + let result = provider.chain_id().await; + assert!(result.is_ok()); + mock.assert(); + mock_success.assert(); + } +} diff --git a/core/src/execution/rpc/tests/http_rpc_tests.rs b/core/src/execution/rpc/tests/http_rpc_tests.rs new file mode 100644 index 00000000..730dd23f --- /dev/null +++ b/core/src/execution/rpc/tests/http_rpc_tests.rs @@ -0,0 +1,40 @@ +#[cfg(test)] +mod tests { + use super::*; + use mockito::{mock, Server}; + use std::time::Duration; + use crate::execution::rpc::HttpRpc; + use crate::network_spec::NetworkSpec; + + #[tokio::test] + #[cfg(target_arch = "wasm32")] + async fn test_retry_mechanism() { + let mut server = Server::new(); + + // Test rate limit retry + let mock = server.mock("POST", "/") + .with_status(429) + .with_header("content-type", "application/json") + .with_body(r#"{"error": "rate limit exceeded"}"#) + .expect(3) + .create(); + + let provider = HttpRpc::::new(&server.url()).unwrap(); + let result = provider.chain_id().await; + + assert!(result.is_err()); + mock.assert(); + + // Test non-retryable error + let mock = server.mock("POST", "/") + .with_status(400) + .with_header("content-type", "application/json") + .with_body(r#"{"error": "bad request"}"#) + .expect(1) + .create(); + + let result = provider.chain_id().await; + assert!(result.is_err()); + mock.assert(); + } +}