diff --git a/rust/Cargo.lock b/rust/Cargo.lock index 8f3bf3e33a..ae5e616edf 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -4145,7 +4145,7 @@ dependencies = [ "hyperlane-fuel", "hyperlane-sealevel", "hyperlane-test", - "itertools 0.11.0", + "itertools 0.12.0", "maplit", "paste", "prometheus", @@ -4192,7 +4192,7 @@ dependencies = [ "fixed-hash 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)", "getrandom 0.2.11", "hex 0.4.3", - "itertools 0.11.0", + "itertools 0.12.0", "num 0.4.1", "num-derive 0.4.1", "num-traits", @@ -4226,6 +4226,7 @@ dependencies = [ "hyperlane-core", "injective-protobuf", "injective-std", + "itertools 0.12.0", "once_cell", "protobuf", "ripemd", @@ -4438,7 +4439,7 @@ dependencies = [ "hyperlane-core", "hyperlane-sealevel-interchain-security-module-interface", "hyperlane-sealevel-message-recipient-interface", - "itertools 0.11.0", + "itertools 0.12.0", "log", "num-derive 0.4.1", "num-traits", @@ -4464,7 +4465,7 @@ dependencies = [ "hyperlane-sealevel-test-ism", "hyperlane-sealevel-test-send-receiver", "hyperlane-test-utils", - "itertools 0.11.0", + "itertools 0.12.0", "log", "num-derive 0.4.1", "num-traits", @@ -4981,15 +4982,6 @@ dependencies = [ "either", ] -[[package]] -name = "itertools" -version = "0.11.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b1c173a5686ce8bfa551b3563d0c2170bf24ca44da99c7ca4bfdab5418c3fe57" -dependencies = [ - "either", -] - [[package]] name = "itertools" version = "0.12.0" @@ -6918,7 +6910,7 @@ dependencies = [ "hyperlane-core", "hyperlane-ethereum", "hyperlane-test", - "itertools 0.11.0", + "itertools 0.12.0", "num-derive 0.4.1", "num-traits", "once_cell", @@ -7565,7 +7557,7 @@ dependencies = [ "hyperlane-base", "hyperlane-core", "hyperlane-test", - "itertools 0.11.0", + "itertools 0.12.0", "migration", "num-bigint 0.4.4", "prometheus", diff --git a/rust/Cargo.toml b/rust/Cargo.toml index 1310918ed4..1c9a2e2ff4 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -97,7 +97,7 @@ hyper = "0.14" hyper-tls = "0.5.0" injective-protobuf = "0.2.2" injective-std = "0.1.5" -itertools = "0.11.0" +itertools = "*" jobserver = "=0.1.26" jsonrpc-core = "18.0" k256 = { version = "0.13.1", features = ["std", "ecdsa"] } diff --git a/rust/agents/relayer/Cargo.toml b/rust/agents/relayer/Cargo.toml index 0bd5697971..d6eb28a153 100644 --- a/rust/agents/relayer/Cargo.toml +++ b/rust/agents/relayer/Cargo.toml @@ -34,7 +34,7 @@ tokio = { workspace = true, features = ["rt", "macros", "parking_lot"] } tracing-futures.workspace = true tracing.workspace = true -hyperlane-core = { path = "../../hyperlane-core", features = ["agent"] } +hyperlane-core = { path = "../../hyperlane-core", features = ["agent", "fallback-provider"] } hyperlane-base = { path = "../../hyperlane-base" } hyperlane-ethereum = { path = "../../chains/hyperlane-ethereum" } diff --git a/rust/agents/validator/Cargo.toml b/rust/agents/validator/Cargo.toml index f562938db2..bfea4c16a3 100644 --- a/rust/agents/validator/Cargo.toml +++ b/rust/agents/validator/Cargo.toml @@ -24,7 +24,7 @@ tokio = { workspace = true, features = ["rt", "macros", "parking_lot"] } tracing-futures.workspace = true tracing.workspace = true -hyperlane-core = { path = "../../hyperlane-core", features = ["agent"] } +hyperlane-core = { path = "../../hyperlane-core", features = ["agent", "fallback-provider"] } hyperlane-base = { path = "../../hyperlane-base" } hyperlane-ethereum = { path = "../../chains/hyperlane-ethereum" } hyperlane-cosmos = { path = "../../chains/hyperlane-cosmos" } diff --git a/rust/chains/hyperlane-cosmos/Cargo.toml b/rust/chains/hyperlane-cosmos/Cargo.toml index b48aa3590d..6115a61e6d 100644 --- a/rust/chains/hyperlane-cosmos/Cargo.toml +++ b/rust/chains/hyperlane-cosmos/Cargo.toml @@ -22,6 +22,7 @@ hyper = { workspace = true } hyper-tls = { workspace = true } injective-protobuf = { workspace = true } injective-std = { workspace = true } +itertools = { workspace = true } once_cell = { workspace = true } protobuf = { workspace = true } ripemd = { workspace = true } @@ -38,4 +39,4 @@ tracing = { workspace = true } tracing-futures = { workspace = true } url = { workspace = true } -hyperlane-core = { path = "../../hyperlane-core" } +hyperlane-core = { path = "../../hyperlane-core", features = ["fallback-provider"]} diff --git a/rust/chains/hyperlane-cosmos/src/error.rs b/rust/chains/hyperlane-cosmos/src/error.rs index 2c3e1e7475..06fffaff7e 100644 --- a/rust/chains/hyperlane-cosmos/src/error.rs +++ b/rust/chains/hyperlane-cosmos/src/error.rs @@ -1,5 +1,6 @@ use cosmrs::proto::prost; use hyperlane_core::ChainCommunicationError; +use std::fmt::Debug; /// Errors from the crates specific to the hyperlane-cosmos /// implementation. @@ -28,6 +29,9 @@ pub enum HyperlaneCosmosError { /// Tonic error #[error("{0}")] Tonic(#[from] tonic::transport::Error), + /// Tonic codegen error + #[error("{0}")] + TonicGenError(#[from] tonic::codegen::StdError), /// Tendermint RPC Error #[error(transparent)] TendermintError(#[from] tendermint_rpc::error::Error), @@ -37,6 +41,9 @@ pub enum HyperlaneCosmosError { /// Protobuf error #[error("{0}")] Protobuf(#[from] protobuf::ProtobufError), + /// Fallback providers failed + #[error("Fallback providers failed. (Errors: {0:?})")] + FallbackProvidersFailed(Vec), } impl From for ChainCommunicationError { diff --git a/rust/chains/hyperlane-cosmos/src/lib.rs b/rust/chains/hyperlane-cosmos/src/lib.rs index 82a4a0ece1..c0ce3ad549 100644 --- a/rust/chains/hyperlane-cosmos/src/lib.rs +++ b/rust/chains/hyperlane-cosmos/src/lib.rs @@ -16,6 +16,7 @@ mod multisig_ism; mod payloads; mod providers; mod routing_ism; +mod rpc_clients; mod signers; mod trait_builder; mod types; diff --git a/rust/chains/hyperlane-cosmos/src/payloads/aggregate_ism.rs b/rust/chains/hyperlane-cosmos/src/payloads/aggregate_ism.rs index 8276675ff7..23bb35a8f8 100644 --- a/rust/chains/hyperlane-cosmos/src/payloads/aggregate_ism.rs +++ b/rust/chains/hyperlane-cosmos/src/payloads/aggregate_ism.rs @@ -1,11 +1,11 @@ use serde::{Deserialize, Serialize}; -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct VerifyRequest { pub verify: VerifyRequestInner, } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct VerifyRequestInner { pub metadata: String, pub message: String, diff --git a/rust/chains/hyperlane-cosmos/src/payloads/general.rs b/rust/chains/hyperlane-cosmos/src/payloads/general.rs index 488cae2d37..af2a4b0b6e 100644 --- a/rust/chains/hyperlane-cosmos/src/payloads/general.rs +++ b/rust/chains/hyperlane-cosmos/src/payloads/general.rs @@ -1,6 +1,6 @@ use serde::{Deserialize, Serialize}; -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct EmptyStruct {} #[derive(Serialize, Deserialize, Debug, Clone)] diff --git a/rust/chains/hyperlane-cosmos/src/payloads/ism_routes.rs b/rust/chains/hyperlane-cosmos/src/payloads/ism_routes.rs index 052a1cc48b..1f659840e4 100644 --- a/rust/chains/hyperlane-cosmos/src/payloads/ism_routes.rs +++ b/rust/chains/hyperlane-cosmos/src/payloads/ism_routes.rs @@ -1,12 +1,12 @@ use super::general::EmptyStruct; use serde::{Deserialize, Serialize}; -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct IsmRouteRequest { pub route: IsmRouteRequestInner, } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct IsmRouteRequestInner { pub message: String, // hexbinary } @@ -16,22 +16,22 @@ pub struct IsmRouteRespnose { pub ism: String, } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct QueryRoutingIsmGeneralRequest { pub routing_ism: T, } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct QueryRoutingIsmRouteResponse { pub ism: String, } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct QueryIsmGeneralRequest { pub ism: T, } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct QueryIsmModuleTypeRequest { pub module_type: EmptyStruct, } diff --git a/rust/chains/hyperlane-cosmos/src/payloads/mailbox.rs b/rust/chains/hyperlane-cosmos/src/payloads/mailbox.rs index 145ba5b16c..75eef04595 100644 --- a/rust/chains/hyperlane-cosmos/src/payloads/mailbox.rs +++ b/rust/chains/hyperlane-cosmos/src/payloads/mailbox.rs @@ -3,52 +3,52 @@ use serde::{Deserialize, Serialize}; use super::general::EmptyStruct; // Requests -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct GeneralMailboxQuery { pub mailbox: T, } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct CountRequest { pub count: EmptyStruct, } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct NonceRequest { pub nonce: EmptyStruct, } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct RecipientIsmRequest { pub recipient_ism: RecipientIsmRequestInner, } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct RecipientIsmRequestInner { pub recipient_addr: String, // hexbinary } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct DefaultIsmRequest { pub default_ism: EmptyStruct, } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct DeliveredRequest { pub message_delivered: DeliveredRequestInner, } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct DeliveredRequestInner { pub id: String, // hexbinary } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct ProcessMessageRequest { pub process: ProcessMessageRequestInner, } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct ProcessMessageRequestInner { pub metadata: String, pub message: String, diff --git a/rust/chains/hyperlane-cosmos/src/payloads/merkle_tree_hook.rs b/rust/chains/hyperlane-cosmos/src/payloads/merkle_tree_hook.rs index 7635f0ef72..e960628771 100644 --- a/rust/chains/hyperlane-cosmos/src/payloads/merkle_tree_hook.rs +++ b/rust/chains/hyperlane-cosmos/src/payloads/merkle_tree_hook.rs @@ -4,24 +4,24 @@ use super::general::EmptyStruct; const TREE_DEPTH: usize = 32; -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct MerkleTreeGenericRequest { pub merkle_hook: T, } // --------- Requests --------- -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct MerkleTreeRequest { pub tree: EmptyStruct, } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct MerkleTreeCountRequest { pub count: EmptyStruct, } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct CheckPointRequest { pub check_point: EmptyStruct, } diff --git a/rust/chains/hyperlane-cosmos/src/payloads/multisig_ism.rs b/rust/chains/hyperlane-cosmos/src/payloads/multisig_ism.rs index 204e726dc7..c56588d1d6 100644 --- a/rust/chains/hyperlane-cosmos/src/payloads/multisig_ism.rs +++ b/rust/chains/hyperlane-cosmos/src/payloads/multisig_ism.rs @@ -1,11 +1,11 @@ use serde::{Deserialize, Serialize}; -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct VerifyInfoRequest { pub verify_info: VerifyInfoRequestInner, } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct VerifyInfoRequestInner { pub message: String, // hexbinary } diff --git a/rust/chains/hyperlane-cosmos/src/payloads/validator_announce.rs b/rust/chains/hyperlane-cosmos/src/payloads/validator_announce.rs index fdf449c7c4..cf4e5eb1f8 100644 --- a/rust/chains/hyperlane-cosmos/src/payloads/validator_announce.rs +++ b/rust/chains/hyperlane-cosmos/src/payloads/validator_announce.rs @@ -2,17 +2,17 @@ use serde::{Deserialize, Serialize}; use super::general::EmptyStruct; -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct GetAnnouncedValidatorsRequest { pub get_announced_validators: EmptyStruct, } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct GetAnnounceStorageLocationsRequest { pub get_announce_storage_locations: GetAnnounceStorageLocationsRequestInner, } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct GetAnnounceStorageLocationsRequestInner { pub validators: Vec, } diff --git a/rust/chains/hyperlane-cosmos/src/providers/grpc.rs b/rust/chains/hyperlane-cosmos/src/providers/grpc.rs index 3594c7398e..a6bc070aba 100644 --- a/rust/chains/hyperlane-cosmos/src/providers/grpc.rs +++ b/rust/chains/hyperlane-cosmos/src/providers/grpc.rs @@ -24,7 +24,9 @@ use cosmrs::{ tx::{self, Fee, MessageExt, SignDoc, SignerInfo}, Any, Coin, }; +use derive_new::new; use hyperlane_core::{ + rpc_clients::{BlockNumberGetter, FallbackProvider}, ChainCommunicationError, ChainResult, ContractLocator, FixedPointNumber, HyperlaneDomain, U256, }; use protobuf::Message as _; @@ -33,9 +35,10 @@ use tonic::{ transport::{Channel, Endpoint}, GrpcMethod, IntoRequest, }; +use url::Url; -use crate::HyperlaneCosmosError; use crate::{address::CosmosAddress, CosmosAmount}; +use crate::{rpc_clients::CosmosFallbackProvider, HyperlaneCosmosError}; use crate::{signers::Signer, ConnectionConf}; /// A multiplier applied to a simulated transaction's gas usage to @@ -45,6 +48,36 @@ const GAS_ESTIMATE_MULTIPLIER: f64 = 1.25; /// be valid for. const TIMEOUT_BLOCKS: u64 = 1000; +#[derive(Debug, Clone, new)] +struct CosmosChannel { + channel: Channel, + /// The url that this channel is connected to. + /// Not explicitly used, but useful for debugging. + _url: Url, +} + +#[async_trait] +impl BlockNumberGetter for CosmosChannel { + async fn get_block_number(&self) -> Result { + let mut client = ServiceClient::new(self.channel.clone()); + let request = tonic::Request::new(GetLatestBlockRequest {}); + + let response = client + .get_latest_block(request) + .await + .map_err(ChainCommunicationError::from_other)? + .into_inner(); + let height = response + .block + .ok_or_else(|| ChainCommunicationError::from_other_str("block not present"))? + .header + .ok_or_else(|| ChainCommunicationError::from_other_str("header not present"))? + .height; + + Ok(height as u64) + } +} + #[async_trait] /// Cosmwasm GRPC Provider pub trait WasmProvider: Send + Sync { @@ -56,14 +89,14 @@ pub trait WasmProvider: Send + Sync { async fn latest_block_height(&self) -> ChainResult; /// Perform a wasm query against the stored contract address. - async fn wasm_query( + async fn wasm_query( &self, payload: T, block_height: Option, ) -> ChainResult>; /// Perform a wasm query against a specified contract address. - async fn wasm_query_to( + async fn wasm_query_to( &self, to: String, payload: T, @@ -71,14 +104,17 @@ pub trait WasmProvider: Send + Sync { ) -> ChainResult>; /// Send a wasm tx. - async fn wasm_send( + async fn wasm_send( &self, payload: T, gas_limit: Option, ) -> ChainResult; /// Estimate gas for a wasm tx. - async fn wasm_estimate_gas(&self, payload: T) -> ChainResult; + async fn wasm_estimate_gas( + &self, + payload: T, + ) -> ChainResult; } #[derive(Debug, Clone)] @@ -95,7 +131,7 @@ pub struct WasmGrpcProvider { signer: Option, /// GRPC Channel that can be cheaply cloned. /// See `` - channel: Channel, + provider: CosmosFallbackProvider, gas_price: CosmosAmount, } @@ -108,9 +144,21 @@ impl WasmGrpcProvider { locator: Option, signer: Option, ) -> ChainResult { - let endpoint = - Endpoint::new(conf.get_grpc_url()).map_err(Into::::into)?; - let channel = endpoint.connect_lazy(); + // get all the configured grpc urls and convert them to a Vec + let channels: Result, _> = conf + .get_grpc_urls() + .into_iter() + .map(|url| { + Endpoint::new(url.to_string()) + .map(|e| CosmosChannel::new(e.connect_lazy(), url)) + .map_err(Into::::into) + }) + .collect(); + let mut builder = FallbackProvider::builder(); + builder = builder.add_providers(channels?); + let fallback_provider = builder.build(); + let provider = CosmosFallbackProvider::new(fallback_provider); + let contract_address = locator .map(|l| { CosmosAddress::from_h256( @@ -126,7 +174,7 @@ impl WasmGrpcProvider { conf, contract_address, signer, - channel, + provider, gas_price, }) } @@ -225,21 +273,36 @@ impl WasmGrpcProvider { // https://github.com/cosmos/cosmjs/blob/44893af824f0712d1f406a8daa9fcae335422235/packages/stargate/src/modules/tx/queries.ts#L67 signatures: vec![vec![]], }; - - let mut client = TxServiceClient::new(self.channel.clone()); let tx_bytes = raw_tx .to_bytes() .map_err(ChainCommunicationError::from_other)?; - #[allow(deprecated)] - let sim_req = tonic::Request::new(SimulateRequest { tx: None, tx_bytes }); - let gas_used = client - .simulate(sim_req) - .await - .map_err(ChainCommunicationError::from_other)? - .into_inner() - .gas_info - .ok_or_else(|| ChainCommunicationError::from_other_str("gas info not present"))? - .gas_used; + let gas_used = self + .provider + .call(move |provider| { + let tx_bytes_clone = tx_bytes.clone(); + let future = async move { + let mut client = TxServiceClient::new(provider.channel.clone()); + #[allow(deprecated)] + let sim_req = tonic::Request::new(SimulateRequest { + tx: None, + tx_bytes: tx_bytes_clone, + }); + let gas_used = client + .simulate(sim_req) + .await + .map_err(ChainCommunicationError::from_other)? + .into_inner() + .gas_info + .ok_or_else(|| { + ChainCommunicationError::from_other_str("gas info not present") + })? + .gas_used; + + Ok(gas_used) + }; + Box::pin(future) + }) + .await?; let gas_estimate = (gas_used as f64 * GAS_ESTIMATE_MULTIPLIER) as u64; @@ -248,14 +311,25 @@ impl WasmGrpcProvider { /// Fetches balance for a given `address` and `denom` pub async fn get_balance(&self, address: String, denom: String) -> ChainResult { - let mut client = QueryBalanceClient::new(self.channel.clone()); - - let balance_request = tonic::Request::new(QueryBalanceRequest { address, denom }); - let response = client - .balance(balance_request) - .await - .map_err(ChainCommunicationError::from_other)? - .into_inner(); + let response = self + .provider + .call(move |provider| { + let address = address.clone(); + let denom = denom.clone(); + let future = async move { + let mut client = QueryBalanceClient::new(provider.channel.clone()); + let balance_request = + tonic::Request::new(QueryBalanceRequest { address, denom }); + let response = client + .balance(balance_request) + .await + .map_err(ChainCommunicationError::from_other)? + .into_inner(); + Ok(response) + }; + Box::pin(future) + }) + .await?; let balance = response .balance @@ -272,14 +346,23 @@ impl WasmGrpcProvider { return self.account_query_injective(account).await; } - let mut client = QueryAccountClient::new(self.channel.clone()); - - let request = tonic::Request::new(QueryAccountRequest { address: account }); - let response = client - .account(request) - .await - .map_err(ChainCommunicationError::from_other)? - .into_inner(); + let response = self + .provider + .call(move |provider| { + let address = account.clone(); + let future = async move { + let mut client = QueryAccountClient::new(provider.channel.clone()); + let request = tonic::Request::new(QueryAccountRequest { address }); + let response = client + .account(request) + .await + .map_err(ChainCommunicationError::from_other)? + .into_inner(); + Ok(response) + }; + Box::pin(future) + }) + .await?; let account = BaseAccount::decode( response @@ -294,32 +377,46 @@ impl WasmGrpcProvider { /// Injective-specific logic for querying an account. async fn account_query_injective(&self, account: String) -> ChainResult { - let request = tonic::Request::new( - injective_std::types::cosmos::auth::v1beta1::QueryAccountRequest { address: account }, - ); - - // Borrowed from the logic of `QueryAccountClient` in `cosmrs`, but using injective types. - - let mut grpc_client = tonic::client::Grpc::new(self.channel.clone()); - grpc_client - .ready() - .await - .map_err(Into::::into)?; - - let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static("/cosmos.auth.v1beta1.Query/Account"); - let mut req: tonic::Request< - injective_std::types::cosmos::auth::v1beta1::QueryAccountRequest, - > = request.into_request(); - req.extensions_mut() - .insert(GrpcMethod::new("cosmos.auth.v1beta1.Query", "Account")); - - let response: tonic::Response< - injective_std::types::cosmos::auth::v1beta1::QueryAccountResponse, - > = grpc_client - .unary(req, path, codec) - .await - .map_err(Into::::into)?; + let response = self + .provider + .call(move |provider| { + let address = account.clone(); + let future = async move { + let request = tonic::Request::new( + injective_std::types::cosmos::auth::v1beta1::QueryAccountRequest { + address, + }, + ); + + // Borrowed from the logic of `QueryAccountClient` in `cosmrs`, but using injective types. + + let mut grpc_client = tonic::client::Grpc::new(provider.channel.clone()); + grpc_client + .ready() + .await + .map_err(Into::::into)?; + + let codec = tonic::codec::ProstCodec::default(); + let path = + http::uri::PathAndQuery::from_static("/cosmos.auth.v1beta1.Query/Account"); + let mut req: tonic::Request< + injective_std::types::cosmos::auth::v1beta1::QueryAccountRequest, + > = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("cosmos.auth.v1beta1.Query", "Account")); + + let response: tonic::Response< + injective_std::types::cosmos::auth::v1beta1::QueryAccountResponse, + > = grpc_client + .unary(req, path, codec) + .await + .map_err(Into::::into)?; + + Ok(response) + }; + Box::pin(future) + }) + .await?; let mut eth_account = injective_protobuf::proto::account::EthAccount::parse_from_bytes( response @@ -349,14 +446,23 @@ impl WasmGrpcProvider { #[async_trait] impl WasmProvider for WasmGrpcProvider { async fn latest_block_height(&self) -> ChainResult { - let mut client = ServiceClient::new(self.channel.clone()); - let request = tonic::Request::new(GetLatestBlockRequest {}); + let response = self + .provider + .call(move |provider| { + let future = async move { + let mut client = ServiceClient::new(provider.channel.clone()); + let request = tonic::Request::new(GetLatestBlockRequest {}); + let response = client + .get_latest_block(request) + .await + .map_err(ChainCommunicationError::from_other)? + .into_inner(); + Ok(response) + }; + Box::pin(future) + }) + .await?; - let response = client - .get_latest_block(request) - .await - .map_err(ChainCommunicationError::from_other)? - .into_inner(); let height = response .block .ok_or_else(|| ChainCommunicationError::from_other_str("block not present"))? @@ -369,7 +475,7 @@ impl WasmProvider for WasmGrpcProvider { async fn wasm_query(&self, payload: T, block_height: Option) -> ChainResult> where - T: Serialize + Send + Sync, + T: Serialize + Send + Sync + Clone, { let contract_address = self.contract_address.as_ref().ok_or_else(|| { ChainCommunicationError::from_other_str("No contract address available") @@ -385,39 +491,48 @@ impl WasmProvider for WasmGrpcProvider { block_height: Option, ) -> ChainResult> where - T: Serialize + Send + Sync, + T: Serialize + Send + Sync + Clone, { - let mut client = WasmQueryClient::new(self.channel.clone()); - let mut request = tonic::Request::new(QuerySmartContractStateRequest { - address: to, - query_data: serde_json::to_string(&payload)?.as_bytes().to_vec(), - }); - - if let Some(block_height) = block_height { - request - .metadata_mut() - .insert("x-cosmos-block-height", block_height.into()); - } - - let response = client - .smart_contract_state(request) - .await - .map_err(ChainCommunicationError::from_other)? - .into_inner(); + let query_data = serde_json::to_string(&payload)?.as_bytes().to_vec(); + let response = self + .provider + .call(move |provider| { + let to = to.clone(); + let query_data = query_data.clone(); + let future = async move { + let mut client = WasmQueryClient::new(provider.channel.clone()); + + let mut request = tonic::Request::new(QuerySmartContractStateRequest { + address: to, + query_data, + }); + if let Some(block_height) = block_height { + request + .metadata_mut() + .insert("x-cosmos-block-height", block_height.into()); + } + let response = client + .smart_contract_state(request) + .await + .map_err(ChainCommunicationError::from_other)? + .into_inner(); + Ok(response) + }; + Box::pin(future) + }) + .await?; Ok(response.data) } async fn wasm_send(&self, payload: T, gas_limit: Option) -> ChainResult where - T: Serialize + Send + Sync, + T: Serialize + Send + Sync + Clone, { let signer = self.get_signer()?; - let mut client = TxServiceClient::new(self.channel.clone()); let contract_address = self.contract_address.as_ref().ok_or_else(|| { ChainCommunicationError::from_other_str("No contract address available") })?; - let msgs = vec![MsgExecuteContract { sender: signer.address.clone(), contract: contract_address.address(), @@ -426,9 +541,6 @@ impl WasmProvider for WasmGrpcProvider { } .to_any() .map_err(ChainCommunicationError::from_other)?]; - - // We often use U256s to represent gas limits, but Cosmos expects u64s. Try to convert, - // and if it fails, just fallback to None which will result in gas estimation. let gas_limit: Option = gas_limit.and_then(|limit| match limit.try_into() { Ok(limit) => Some(limit), Err(err) => { @@ -439,20 +551,30 @@ impl WasmProvider for WasmGrpcProvider { None } }); - - let tx_req = BroadcastTxRequest { - tx_bytes: self.generate_raw_signed_tx(msgs, gas_limit).await?, - mode: BroadcastMode::Sync as i32, - }; - - let tx_res = client - .broadcast_tx(tx_req) - .await - .map_err(Into::::into)? - .into_inner() - .tx_response - .ok_or_else(|| ChainCommunicationError::from_other_str("Empty tx_response"))?; - + let tx_bytes = self.generate_raw_signed_tx(msgs, gas_limit).await?; + let tx_res = self + .provider + .call(move |provider| { + let tx_bytes = tx_bytes.clone(); + let future = async move { + let mut client = TxServiceClient::new(provider.channel.clone()); + // We often use U256s to represent gas limits, but Cosmos expects u64s. Try to convert, + // and if it fails, just fallback to None which will result in gas estimation. + let tx_req = BroadcastTxRequest { + tx_bytes, + mode: BroadcastMode::Sync as i32, + }; + client + .broadcast_tx(tx_req) + .await + .map_err(Into::::into)? + .into_inner() + .tx_response + .ok_or_else(|| ChainCommunicationError::from_other_str("Empty tx_response")) + }; + Box::pin(future) + }) + .await?; Ok(tx_res) } @@ -482,3 +604,10 @@ impl WasmProvider for WasmGrpcProvider { Ok(response) } } + +#[async_trait] +impl BlockNumberGetter for WasmGrpcProvider { + async fn get_block_number(&self) -> Result { + self.latest_block_height().await + } +} diff --git a/rust/chains/hyperlane-cosmos/src/rpc_clients/fallback.rs b/rust/chains/hyperlane-cosmos/src/rpc_clients/fallback.rs new file mode 100644 index 0000000000..cf933ee73d --- /dev/null +++ b/rust/chains/hyperlane-cosmos/src/rpc_clients/fallback.rs @@ -0,0 +1,140 @@ +use std::{ + fmt::{Debug, Formatter}, + ops::Deref, +}; + +use derive_new::new; +use hyperlane_core::rpc_clients::FallbackProvider; + +/// Wrapper of `FallbackProvider` for use in `hyperlane-cosmos` +#[derive(new, Clone)] +pub struct CosmosFallbackProvider { + fallback_provider: FallbackProvider, +} + +impl Deref for CosmosFallbackProvider { + type Target = FallbackProvider; + + fn deref(&self) -> &Self::Target { + &self.fallback_provider + } +} + +impl Debug for CosmosFallbackProvider +where + C: Debug, +{ + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + self.fallback_provider.fmt(f) + } +} + +#[cfg(test)] +mod tests { + use std::time::Duration; + + use async_trait::async_trait; + use hyperlane_core::rpc_clients::test::ProviderMock; + use hyperlane_core::rpc_clients::{BlockNumberGetter, FallbackProviderBuilder}; + use hyperlane_core::ChainCommunicationError; + use tokio::time::sleep; + + use super::*; + + #[derive(Debug, Clone)] + struct CosmosProviderMock(ProviderMock); + + impl Deref for CosmosProviderMock { + type Target = ProviderMock; + + fn deref(&self) -> &Self::Target { + &self.0 + } + } + + impl Default for CosmosProviderMock { + fn default() -> Self { + Self(ProviderMock::default()) + } + } + + impl CosmosProviderMock { + fn new(request_sleep: Option) -> Self { + Self(ProviderMock::new(request_sleep)) + } + } + + #[async_trait] + impl BlockNumberGetter for CosmosProviderMock { + async fn get_block_number(&self) -> Result { + Ok(0) + } + } + + impl Into> for CosmosProviderMock { + fn into(self) -> Box { + Box::new(self) + } + } + + impl CosmosFallbackProvider { + async fn low_level_test_call(&mut self) -> Result<(), ChainCommunicationError> { + self.call(|provider| { + provider.push("GET", "http://localhost:1234"); + let future = async move { + let body = tonic::body::BoxBody::default(); + let response = http::Response::builder().status(200).body(body).unwrap(); + if let Some(sleep_duration) = provider.request_sleep() { + sleep(sleep_duration).await; + } + Ok(response) + }; + Box::pin(future) + }) + .await?; + Ok(()) + } + } + + #[tokio::test] + async fn test_first_provider_is_attempted() { + let fallback_provider_builder = FallbackProviderBuilder::default(); + let providers = vec![ + CosmosProviderMock::default(), + CosmosProviderMock::default(), + CosmosProviderMock::default(), + ]; + let fallback_provider = fallback_provider_builder.add_providers(providers).build(); + let mut cosmos_fallback_provider = CosmosFallbackProvider::new(fallback_provider); + cosmos_fallback_provider + .low_level_test_call() + .await + .unwrap(); + let provider_call_count: Vec<_> = + ProviderMock::get_call_counts(&cosmos_fallback_provider).await; + assert_eq!(provider_call_count, vec![1, 0, 0]); + } + + #[tokio::test] + async fn test_one_stalled_provider() { + let fallback_provider_builder = FallbackProviderBuilder::default(); + let providers = vec![ + CosmosProviderMock::new(Some(Duration::from_millis(10))), + CosmosProviderMock::default(), + CosmosProviderMock::default(), + ]; + let fallback_provider = fallback_provider_builder + .add_providers(providers) + .with_max_block_time(Duration::from_secs(0)) + .build(); + let mut cosmos_fallback_provider = CosmosFallbackProvider::new(fallback_provider); + cosmos_fallback_provider + .low_level_test_call() + .await + .unwrap(); + + let provider_call_count: Vec<_> = + ProviderMock::get_call_counts(&cosmos_fallback_provider).await; + assert_eq!(provider_call_count, vec![0, 0, 1]); + } +} diff --git a/rust/chains/hyperlane-cosmos/src/rpc_clients/mod.rs b/rust/chains/hyperlane-cosmos/src/rpc_clients/mod.rs new file mode 100644 index 0000000000..536845688d --- /dev/null +++ b/rust/chains/hyperlane-cosmos/src/rpc_clients/mod.rs @@ -0,0 +1,3 @@ +pub use self::fallback::*; + +mod fallback; diff --git a/rust/chains/hyperlane-cosmos/src/trait_builder.rs b/rust/chains/hyperlane-cosmos/src/trait_builder.rs index 2bacb4d2f5..1bb3627b9d 100644 --- a/rust/chains/hyperlane-cosmos/src/trait_builder.rs +++ b/rust/chains/hyperlane-cosmos/src/trait_builder.rs @@ -2,12 +2,13 @@ use std::str::FromStr; use derive_new::new; use hyperlane_core::{ChainCommunicationError, FixedPointNumber}; +use url::Url; /// Cosmos connection configuration #[derive(Debug, Clone)] pub struct ConnectionConf { /// The GRPC url to connect to - grpc_url: String, + grpc_urls: Vec, /// The RPC url to connect to rpc_url: String, /// The chain ID @@ -76,8 +77,8 @@ pub enum ConnectionConfError { impl ConnectionConf { /// Get the GRPC url - pub fn get_grpc_url(&self) -> String { - self.grpc_url.clone() + pub fn get_grpc_urls(&self) -> Vec { + self.grpc_urls.clone() } /// Get the RPC url @@ -112,7 +113,7 @@ impl ConnectionConf { /// Create a new connection configuration pub fn new( - grpc_url: String, + grpc_urls: Vec, rpc_url: String, chain_id: String, bech32_prefix: String, @@ -121,7 +122,7 @@ impl ConnectionConf { contract_address_bytes: usize, ) -> Self { Self { - grpc_url, + grpc_urls, rpc_url, chain_id, bech32_prefix, diff --git a/rust/chains/hyperlane-ethereum/Cargo.toml b/rust/chains/hyperlane-ethereum/Cargo.toml index 8d6db17f45..a72855a00b 100644 --- a/rust/chains/hyperlane-ethereum/Cargo.toml +++ b/rust/chains/hyperlane-ethereum/Cargo.toml @@ -30,7 +30,7 @@ tracing-futures.workspace = true tracing.workspace = true url.workspace = true -hyperlane-core = { path = "../../hyperlane-core" } +hyperlane-core = { path = "../../hyperlane-core", features = ["fallback-provider"]} ethers-prometheus = { path = "../../ethers-prometheus", features = ["serde"] } [build-dependencies] diff --git a/rust/chains/hyperlane-ethereum/src/rpc_clients/fallback.rs b/rust/chains/hyperlane-ethereum/src/rpc_clients/fallback.rs index 2ac6ae009f..1243ecc106 100644 --- a/rust/chains/hyperlane-ethereum/src/rpc_clients/fallback.rs +++ b/rust/chains/hyperlane-ethereum/src/rpc_clients/fallback.rs @@ -12,23 +12,23 @@ use serde_json::Value; use tokio::time::sleep; use tracing::{instrument, warn_span}; -use ethers_prometheus::json_rpc_client::PrometheusJsonRpcClientConfigExt; +use ethers_prometheus::json_rpc_client::{JsonRpcBlockGetter, PrometheusJsonRpcClientConfigExt}; use crate::rpc_clients::{categorize_client_response, CategorizedResponse}; /// Wrapper of `FallbackProvider` for use in `hyperlane-ethereum` #[derive(new)] -pub struct EthereumFallbackProvider(FallbackProvider); +pub struct EthereumFallbackProvider(FallbackProvider); -impl Deref for EthereumFallbackProvider { - type Target = FallbackProvider; +impl Deref for EthereumFallbackProvider { + type Target = FallbackProvider; fn deref(&self) -> &Self::Target { &self.0 } } -impl Debug for EthereumFallbackProvider +impl Debug for EthereumFallbackProvider where C: JsonRpcClient + PrometheusJsonRpcClientConfigExt, { @@ -73,16 +73,17 @@ impl From for ProviderError { #[cfg_attr(target_arch = "wasm32", async_trait(?Send))] #[cfg_attr(not(target_arch = "wasm32"), async_trait)] -impl JsonRpcClient for EthereumFallbackProvider +impl JsonRpcClient for EthereumFallbackProvider> where C: JsonRpcClient + + Into> + PrometheusJsonRpcClientConfigExt - + Into> + Clone, + JsonRpcBlockGetter: BlockNumberGetter, { type Error = ProviderError; - // TODO: Refactor the reusable parts of this function when implementing the cosmos-specific logic + // TODO: Refactor to use `FallbackProvider::call` #[instrument] async fn request(&self, method: &str, params: T) -> Result where @@ -108,7 +109,7 @@ where let resp = fut.await; self.handle_stalled_provider(priority, provider).await; let _span = - warn_span!("request_with_fallback", fallback_count=%idx, provider_index=%priority.index, ?provider).entered(); + warn_span!("request", fallback_count=%idx, provider_index=%priority.index, ?provider).entered(); match categorize_client_response(method, resp) { IsOk(v) => return Ok(serde_json::from_value(v)?), @@ -125,41 +126,37 @@ where #[cfg(test)] mod tests { use ethers_prometheus::json_rpc_client::{JsonRpcBlockGetter, BLOCK_NUMBER_RPC}; + use hyperlane_core::rpc_clients::test::ProviderMock; use hyperlane_core::rpc_clients::FallbackProviderBuilder; use super::*; - use std::sync::{Arc, Mutex}; #[derive(Debug, Clone)] - struct ProviderMock { - // Store requests as tuples of (method, params) - // Even if the tests were single-threaded, need the arc-mutex - // for interior mutability in `JsonRpcClient::request` - requests: Arc>>, - } + struct EthereumProviderMock(ProviderMock); - impl ProviderMock { - fn new() -> Self { - Self { - requests: Arc::new(Mutex::new(vec![])), - } + impl Deref for EthereumProviderMock { + type Target = ProviderMock; + + fn deref(&self) -> &Self::Target { + &self.0 } + } - fn push(&self, method: &str, params: T) { - self.requests - .lock() - .unwrap() - .push((method.to_owned(), format!("{:?}", params))); + impl Default for EthereumProviderMock { + fn default() -> Self { + Self(ProviderMock::default()) } + } - fn requests(&self) -> Vec<(String, String)> { - self.requests.lock().unwrap().clone() + impl EthereumProviderMock { + fn new(request_sleep: Option) -> Self { + Self(ProviderMock::new(request_sleep)) } } - impl Into> for ProviderMock { - fn into(self) -> Box { - Box::new(JsonRpcBlockGetter::new(self.clone())) + impl Into> for EthereumProviderMock { + fn into(self) -> JsonRpcBlockGetter { + JsonRpcBlockGetter::new(self) } } @@ -171,7 +168,7 @@ mod tests { } #[async_trait] - impl JsonRpcClient for ProviderMock { + impl JsonRpcClient for EthereumProviderMock { type Error = HttpClientError; /// Pushes the `(method, params)` to the back of the `requests` queue, @@ -182,12 +179,14 @@ mod tests { params: T, ) -> Result { self.push(method, params); - sleep(Duration::from_millis(10)).await; + if let Some(sleep_duration) = self.request_sleep() { + sleep(sleep_duration).await; + } dummy_return_value() } } - impl PrometheusJsonRpcClientConfigExt for ProviderMock { + impl PrometheusJsonRpcClientConfigExt for EthereumProviderMock { fn node_host(&self) -> &str { todo!() } @@ -197,35 +196,32 @@ mod tests { } } - async fn get_call_counts(fallback_provider: &FallbackProvider) -> Vec { - fallback_provider - .inner - .priorities - .read() - .await - .iter() - .map(|p| { - let provider = &fallback_provider.inner.providers[p.index]; - provider.requests().len() - }) - .collect() + impl EthereumFallbackProvider> + where + C: JsonRpcClient + + PrometheusJsonRpcClientConfigExt + + Into> + + Clone, + JsonRpcBlockGetter: BlockNumberGetter, + { + async fn low_level_test_call(&self) { + self.request::<_, u64>(BLOCK_NUMBER_RPC, ()).await.unwrap(); + } } #[tokio::test] async fn test_first_provider_is_attempted() { let fallback_provider_builder = FallbackProviderBuilder::default(); let providers = vec![ - ProviderMock::new(), - ProviderMock::new(), - ProviderMock::new(), + EthereumProviderMock::default(), + EthereumProviderMock::default(), + EthereumProviderMock::default(), ]; let fallback_provider = fallback_provider_builder.add_providers(providers).build(); let ethereum_fallback_provider = EthereumFallbackProvider::new(fallback_provider); - ethereum_fallback_provider - .request::<_, u64>(BLOCK_NUMBER_RPC, ()) - .await - .unwrap(); - let provider_call_count: Vec<_> = get_call_counts(ðereum_fallback_provider).await; + ethereum_fallback_provider.low_level_test_call().await; + let provider_call_count: Vec<_> = + ProviderMock::get_call_counts(ðereum_fallback_provider).await; assert_eq!(provider_call_count, vec![1, 0, 0]); } @@ -233,21 +229,18 @@ mod tests { async fn test_one_stalled_provider() { let fallback_provider_builder = FallbackProviderBuilder::default(); let providers = vec![ - ProviderMock::new(), - ProviderMock::new(), - ProviderMock::new(), + EthereumProviderMock::new(Some(Duration::from_millis(10))), + EthereumProviderMock::default(), + EthereumProviderMock::default(), ]; let fallback_provider = fallback_provider_builder .add_providers(providers) .with_max_block_time(Duration::from_secs(0)) .build(); let ethereum_fallback_provider = EthereumFallbackProvider::new(fallback_provider); - ethereum_fallback_provider - .request::<_, u64>(BLOCK_NUMBER_RPC, ()) - .await - .unwrap(); - - let provider_call_count: Vec<_> = get_call_counts(ðereum_fallback_provider).await; + ethereum_fallback_provider.low_level_test_call().await; + let provider_call_count: Vec<_> = + ProviderMock::get_call_counts(ðereum_fallback_provider).await; assert_eq!(provider_call_count, vec![0, 0, 2]); } diff --git a/rust/chains/hyperlane-ethereum/src/trait_builder.rs b/rust/chains/hyperlane-ethereum/src/trait_builder.rs index 31fa128d86..a0ab93af48 100644 --- a/rust/chains/hyperlane-ethereum/src/trait_builder.rs +++ b/rust/chains/hyperlane-ethereum/src/trait_builder.rs @@ -16,8 +16,8 @@ use reqwest::{Client, Url}; use thiserror::Error; use ethers_prometheus::json_rpc_client::{ - JsonRpcClientMetrics, JsonRpcClientMetricsBuilder, NodeInfo, PrometheusJsonRpcClient, - PrometheusJsonRpcClientConfig, + JsonRpcBlockGetter, JsonRpcClientMetrics, JsonRpcClientMetricsBuilder, NodeInfo, + PrometheusJsonRpcClient, PrometheusJsonRpcClientConfig, }; use ethers_prometheus::middleware::{ MiddlewareMetrics, PrometheusMiddleware, PrometheusMiddlewareConf, @@ -116,7 +116,10 @@ pub trait BuildableWithProvider { builder = builder.add_provider(metrics_provider); } let fallback_provider = builder.build(); - let ethereum_fallback_provider = EthereumFallbackProvider::new(fallback_provider); + let ethereum_fallback_provider = EthereumFallbackProvider::< + _, + JsonRpcBlockGetter>, + >::new(fallback_provider); self.build( ethereum_fallback_provider, locator, diff --git a/rust/chains/hyperlane-fuel/Cargo.toml b/rust/chains/hyperlane-fuel/Cargo.toml index 7dabcdd514..82bdbc782e 100644 --- a/rust/chains/hyperlane-fuel/Cargo.toml +++ b/rust/chains/hyperlane-fuel/Cargo.toml @@ -19,7 +19,7 @@ tracing-futures.workspace = true tracing.workspace = true url.workspace = true -hyperlane-core = { path = "../../hyperlane-core" } +hyperlane-core = { path = "../../hyperlane-core", features = ["fallback-provider"]} [build-dependencies] abigen = { path = "../../utils/abigen", features = ["fuels"] } diff --git a/rust/chains/hyperlane-sealevel/Cargo.toml b/rust/chains/hyperlane-sealevel/Cargo.toml index 248e3dfac1..ab4e9b17f6 100644 --- a/rust/chains/hyperlane-sealevel/Cargo.toml +++ b/rust/chains/hyperlane-sealevel/Cargo.toml @@ -24,7 +24,7 @@ tracing.workspace = true url.workspace = true account-utils = { path = "../../sealevel/libraries/account-utils" } -hyperlane-core = { path = "../../hyperlane-core", features = ["solana"] } +hyperlane-core = { path = "../../hyperlane-core", features = ["solana", "fallback-provider"] } hyperlane-sealevel-interchain-security-module-interface = { path = "../../sealevel/libraries/interchain-security-module-interface" } hyperlane-sealevel-mailbox = { path = "../../sealevel/programs/mailbox", features = ["no-entrypoint"] } hyperlane-sealevel-igp = { path = "../../sealevel/programs/hyperlane-sealevel-igp", features = ["no-entrypoint"] } diff --git a/rust/config/mainnet3_config.json b/rust/config/mainnet3_config.json index 65e0e8ebd7..522196bf79 100644 --- a/rust/config/mainnet3_config.json +++ b/rust/config/mainnet3_config.json @@ -516,7 +516,11 @@ "http": "https://rpc-injective.goldenratiostaking.net:443" } ], - "grpcUrl": "https://injective-grpc.publicnode.com/", + "grpcUrls": [ + { + "http": "https://injective-grpc.goldenratiostaking.net:443" + } + ], "canonicalAsset": "inj", "bech32Prefix": "inj", "gasPrice": { @@ -896,4 +900,4 @@ } }, "defaultRpcConsensusType": "fallback" -} +} \ No newline at end of file diff --git a/rust/ethers-prometheus/src/json_rpc_client.rs b/rust/ethers-prometheus/src/json_rpc_client.rs index 7e0c4d1fee..2cc8defe9b 100644 --- a/rust/ethers-prometheus/src/json_rpc_client.rs +++ b/rust/ethers-prometheus/src/json_rpc_client.rs @@ -186,9 +186,11 @@ where } } -impl From> for Box { +impl From> + for JsonRpcBlockGetter> +{ fn from(val: PrometheusJsonRpcClient) -> Self { - Box::new(JsonRpcBlockGetter::new(val)) + JsonRpcBlockGetter::new(val) } } diff --git a/rust/helm/hyperlane-agent/templates/external-secret.yaml b/rust/helm/hyperlane-agent/templates/external-secret.yaml index 5d0eae5ced..36f287eca3 100644 --- a/rust/helm/hyperlane-agent/templates/external-secret.yaml +++ b/rust/helm/hyperlane-agent/templates/external-secret.yaml @@ -29,7 +29,7 @@ spec: {{- if not .disabled }} HYP_CHAINS_{{ .name | upper }}_CUSTOMRPCURLS: {{ printf "'{{ .%s_rpcs | mustFromJson | join \",\" }}'" .name }} {{- if eq .protocol "cosmos" }} - HYP_CHAINS_{{ .name | upper }}_GRPCURL: {{ printf "'{{ .%s_grpc }}'" .name }} + HYP_CHAINS_{{ .name | upper }}_GRPCURLS: {{ printf "'{{ .%s_grpcs | mustFromJson | join \",\" }}'" .name }} {{- end }} {{- end }} {{- end }} @@ -44,9 +44,9 @@ spec: remoteRef: key: {{ printf "%s-rpc-endpoints-%s" $.Values.hyperlane.runEnv .name }} {{- if eq .protocol "cosmos" }} - - secretKey: {{ printf "%s_grpc" .name }} + - secretKey: {{ printf "%s_grpcs" .name }} remoteRef: - key: {{ printf "%s-grpc-endpoint-%s" $.Values.hyperlane.runEnv .name }} + key: {{ printf "%s-grpc-endpoints-%s" $.Values.hyperlane.runEnv .name }} {{- end }} {{- end }} {{- end }} diff --git a/rust/hyperlane-base/src/settings/parser/connection_parser.rs b/rust/hyperlane-base/src/settings/parser/connection_parser.rs index 5d42ce4411..0d47d6eca9 100644 --- a/rust/hyperlane-base/src/settings/parser/connection_parser.rs +++ b/rust/hyperlane-base/src/settings/parser/connection_parser.rs @@ -6,7 +6,7 @@ use url::Url; use crate::settings::envs::*; use crate::settings::ChainConnectionConf; -use super::{parse_cosmos_gas_price, ValueParser}; +use super::{parse_base_and_override_urls, parse_cosmos_gas_price, ValueParser}; pub fn build_ethereum_connection_conf( rpcs: &[Url], @@ -43,19 +43,8 @@ pub fn build_cosmos_connection_conf( err: &mut ConfigParsingError, ) -> Option { let mut local_err = ConfigParsingError::default(); - - let grpc_url = chain - .chain(&mut local_err) - .get_key("grpcUrl") - .parse_string() - .end() - .or_else(|| { - local_err.push( - &chain.cwp + "grpc_url", - eyre!("Missing grpc definitions for chain"), - ); - None - }); + let grpcs = + parse_base_and_override_urls(chain, "grpcUrls", "customGrpcUrls", "http", &mut local_err); let chain_id = chain .chain(&mut local_err) @@ -114,7 +103,7 @@ pub fn build_cosmos_connection_conf( None } else { Some(ChainConnectionConf::Cosmos(h_cosmos::ConnectionConf::new( - grpc_url.unwrap().to_string(), + grpcs, rpcs.first().unwrap().to_string(), chain_id.unwrap().to_string(), prefix.unwrap().to_string(), diff --git a/rust/hyperlane-base/src/settings/parser/mod.rs b/rust/hyperlane-base/src/settings/parser/mod.rs index 76c88fe1b3..3bfb52f91f 100644 --- a/rust/hyperlane-base/src/settings/parser/mod.rs +++ b/rust/hyperlane-base/src/settings/parser/mod.rs @@ -18,6 +18,7 @@ use hyperlane_core::{ use itertools::Itertools; use serde::Deserialize; use serde_json::Value; +use url::Url; pub use self::json_value_parser::ValueParser; pub use super::envs::*; @@ -134,47 +135,7 @@ fn parse_chain( .parse_u32() .unwrap_or(1); - let rpcs_base = chain - .chain(&mut err) - .get_key("rpcUrls") - .into_array_iter() - .map(|urls| { - urls.filter_map(|v| { - v.chain(&mut err) - .get_key("http") - .parse_from_str("Invalid http url") - .end() - }) - .collect_vec() - }) - .unwrap_or_default(); - - let rpc_overrides = chain - .chain(&mut err) - .get_opt_key("customRpcUrls") - .parse_string() - .end() - .map(|urls| { - urls.split(',') - .filter_map(|url| { - url.parse() - .take_err(&mut err, || &chain.cwp + "customRpcUrls") - }) - .collect_vec() - }); - - let rpcs = rpc_overrides.unwrap_or(rpcs_base); - - if rpcs.is_empty() { - err.push( - &chain.cwp + "rpc_urls", - eyre!("Missing base rpc definitions for chain"), - ); - err.push( - &chain.cwp + "custom_rpc_urls", - eyre!("Also missing rpc overrides for chain"), - ); - } + let rpcs = parse_base_and_override_urls(&chain, "rpcUrls", "customRpcUrls", "http", &mut err); let from = chain .chain(&mut err) @@ -418,3 +379,66 @@ fn parse_cosmos_gas_price(gas_price: ValueParser) -> ConfigResult Vec { + chain + .chain(err) + .get_key(key) + .into_array_iter() + .map(|urls| { + urls.filter_map(|v| { + v.chain(err) + .get_key(protocol) + .parse_from_str("Invalid url") + .end() + }) + .collect_vec() + }) + .unwrap_or_default() +} + +fn parse_custom_urls( + chain: &ValueParser, + key: &str, + err: &mut ConfigParsingError, +) -> Option> { + chain + .chain(err) + .get_opt_key(key) + .parse_string() + .end() + .map(|urls| { + urls.split(',') + .filter_map(|url| url.parse().take_err(err, || &chain.cwp + "customGrpcUrls")) + .collect_vec() + }) +} + +fn parse_base_and_override_urls( + chain: &ValueParser, + base_key: &str, + override_key: &str, + protocol: &str, + err: &mut ConfigParsingError, +) -> Vec { + let base = parse_urls(chain, base_key, protocol, err); + let overrides = parse_custom_urls(chain, override_key, err); + let combined = overrides.unwrap_or(base); + + if combined.is_empty() { + err.push( + &chain.cwp + "rpc_urls", + eyre!("Missing base rpc definitions for chain"), + ); + err.push( + &chain.cwp + "custom_rpc_urls", + eyre!("Also missing rpc overrides for chain"), + ); + } + combined +} diff --git a/rust/hyperlane-core/Cargo.toml b/rust/hyperlane-core/Cargo.toml index 8329bce5f2..40468bef6c 100644 --- a/rust/hyperlane-core/Cargo.toml +++ b/rust/hyperlane-core/Cargo.toml @@ -37,6 +37,7 @@ serde_json = { workspace = true } sha3 = { workspace = true } strum = { workspace = true, optional = true, features = ["derive"] } thiserror = { workspace = true } +tokio = { workspace = true, optional = true, features = ["rt", "time"] } tracing.workspace = true primitive-types = { workspace = true, optional = true } solana-sdk = { workspace = true, optional = true } @@ -54,3 +55,4 @@ agent = ["ethers", "strum"] strum = ["dep:strum"] ethers = ["dep:ethers-core", "dep:ethers-contract", "dep:ethers-providers", "dep:primitive-types"] solana = ["dep:solana-sdk"] +fallback-provider = ["tokio"] diff --git a/rust/hyperlane-core/src/rpc_clients/fallback.rs b/rust/hyperlane-core/src/rpc_clients/fallback.rs index 6adcb76f55..6f75cbc4c8 100644 --- a/rust/hyperlane-core/src/rpc_clients/fallback.rs +++ b/rust/hyperlane-core/src/rpc_clients/fallback.rs @@ -1,15 +1,22 @@ use async_rwlock::RwLock; use async_trait::async_trait; use derive_new::new; +use itertools::Itertools; use std::{ - fmt::Debug, + fmt::{Debug, Formatter}, + future::Future, + marker::PhantomData, + pin::Pin, sync::Arc, time::{Duration, Instant}, }; -use tracing::info; +use tokio; +use tracing::{info, trace, warn_span}; use crate::ChainCommunicationError; +use super::RpcClientError; + /// Read the current block number from a chain. #[async_trait] pub trait BlockNumberGetter: Send + Sync + Debug { @@ -38,7 +45,6 @@ impl PrioritizedProviderInner { } } } - /// Sub-providers and priority information pub struct PrioritizedProviders { /// Unsorted list of providers this provider calls @@ -49,28 +55,56 @@ pub struct PrioritizedProviders { /// A provider that bundles multiple providers and attempts to call the first, /// then the second, and so on until a response is received. -pub struct FallbackProvider { +/// +/// Although no trait bounds are used in the struct definition, the intended purpose of `B` +/// is to be bound by `BlockNumberGetter` and have `T` be convertible to `B`. That is, +/// inner providers should be able to get the current block number, or be convertible into +/// something that is. +pub struct FallbackProvider { /// The sub-providers called by this provider pub inner: Arc>, max_block_time: Duration, + _phantom: PhantomData, } -impl Clone for FallbackProvider { +impl Clone for FallbackProvider { fn clone(&self) -> Self { Self { inner: self.inner.clone(), max_block_time: self.max_block_time, + _phantom: PhantomData, } } } -impl FallbackProvider +impl Debug for FallbackProvider +where + T: Debug, +{ + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + // iterate the inner providers and write them to the formatter + f.debug_struct("FallbackProvider") + .field( + "providers", + &self + .inner + .providers + .iter() + .map(|v| format!("{:?}", v)) + .join(", "), + ) + .finish() + } +} + +impl FallbackProvider where - T: Into> + Debug + Clone, + T: Into + Debug + Clone, + B: BlockNumberGetter, { /// Convenience method for creating a `FallbackProviderBuilder` with same /// `JsonRpcClient` types - pub fn builder() -> FallbackProviderBuilder { + pub fn builder() -> FallbackProviderBuilder { FallbackProviderBuilder::default() } @@ -112,7 +146,7 @@ where return; } - let block_getter: Box = provider.clone().into(); + let block_getter: B = provider.clone().into(); let current_block_height = block_getter .get_block_number() .await @@ -130,25 +164,62 @@ where .await; } } + + /// Call the first provider, then the second, and so on (in order of priority) until a response is received. + /// If all providers fail, return an error. + pub async fn call( + &self, + mut f: impl FnMut(T) -> Pin> + Send>>, + ) -> Result { + let mut errors = vec![]; + // make sure we do at least 4 total retries. + while errors.len() <= 3 { + if !errors.is_empty() { + tokio::time::sleep(Duration::from_millis(100)).await; + } + let priorities_snapshot = self.take_priorities_snapshot().await; + for (idx, priority) in priorities_snapshot.iter().enumerate() { + let provider = &self.inner.providers[priority.index]; + let resp = f(provider.clone()).await; + self.handle_stalled_provider(priority, provider).await; + let _span = + warn_span!("FallbackProvider::call", fallback_count=%idx, provider_index=%priority.index, ?provider).entered(); + match resp { + Ok(v) => return Ok(v), + Err(e) => { + trace!( + error=?e, + "Got error from inner fallback provider", + ); + errors.push(e) + } + } + } + } + + Err(RpcClientError::FallbackProvidersFailed(errors).into()) + } } /// Builder to create a new fallback provider. #[derive(Debug, Clone)] -pub struct FallbackProviderBuilder { +pub struct FallbackProviderBuilder { providers: Vec, max_block_time: Duration, + _phantom: PhantomData, } -impl Default for FallbackProviderBuilder { +impl Default for FallbackProviderBuilder { fn default() -> Self { Self { providers: Vec::new(), max_block_time: MAX_BLOCK_TIME, + _phantom: PhantomData, } } } -impl FallbackProviderBuilder { +impl FallbackProviderBuilder { /// Add a new provider to the set. Each new provider will be a lower /// priority than the previous. pub fn add_provider(mut self, provider: T) -> Self { @@ -170,7 +241,7 @@ impl FallbackProviderBuilder { } /// Create a fallback provider. - pub fn build(self) -> FallbackProvider { + pub fn build(self) -> FallbackProvider { let provider_count = self.providers.len(); let prioritized_providers = PrioritizedProviders { providers: self.providers, @@ -184,6 +255,80 @@ impl FallbackProviderBuilder { FallbackProvider { inner: Arc::new(prioritized_providers), max_block_time: self.max_block_time, + _phantom: PhantomData, + } + } +} + +/// Utilities to import when testing chain-specific fallback providers +pub mod test { + use super::*; + use std::{ + ops::Deref, + sync::{Arc, Mutex}, + }; + + /// Provider that stores requests and optionally sleeps before returning a dummy value + #[derive(Debug, Clone)] + pub struct ProviderMock { + // Store requests as tuples of (method, params) + // Even if the tests were single-threaded, need the arc-mutex + // for interior mutability in `JsonRpcClient::request` + requests: Arc>>, + request_sleep: Option, + } + + impl Default for ProviderMock { + fn default() -> Self { + Self { + requests: Arc::new(Mutex::new(vec![])), + request_sleep: None, + } + } + } + + impl ProviderMock { + /// Create a new provider + pub fn new(request_sleep: Option) -> Self { + Self { + request_sleep, + ..Default::default() + } + } + + /// Push a request to the internal store for later inspection + pub fn push(&self, method: &str, params: T) { + self.requests + .lock() + .unwrap() + .push((method.to_owned(), format!("{:?}", params))); + } + + /// Get the stored requests + pub fn requests(&self) -> Vec<(String, String)> { + self.requests.lock().unwrap().clone() + } + + /// Set the sleep duration + pub fn request_sleep(&self) -> Option { + self.request_sleep + } + + /// Get how many times each provider was called + pub async fn get_call_counts, B>( + fallback_provider: &FallbackProvider, + ) -> Vec { + fallback_provider + .inner + .priorities + .read() + .await + .iter() + .map(|p| { + let provider = &fallback_provider.inner.providers[p.index]; + provider.requests().len() + }) + .collect() } } } diff --git a/rust/hyperlane-core/src/rpc_clients/mod.rs b/rust/hyperlane-core/src/rpc_clients/mod.rs index 78851f9f26..02aaae99f5 100644 --- a/rust/hyperlane-core/src/rpc_clients/mod.rs +++ b/rust/hyperlane-core/src/rpc_clients/mod.rs @@ -1,4 +1,8 @@ -pub use self::{error::*, fallback::*}; +pub use self::error::*; + +#[cfg(feature = "fallback-provider")] +pub use self::fallback::*; mod error; +#[cfg(feature = "fallback-provider")] mod fallback; diff --git a/rust/utils/abigen/src/lib.rs b/rust/utils/abigen/src/lib.rs index 2e8d5ca081..b4b7970fdc 100644 --- a/rust/utils/abigen/src/lib.rs +++ b/rust/utils/abigen/src/lib.rs @@ -1,3 +1,4 @@ +#[cfg(feature = "fuels")] use fuels_code_gen::ProgramType; use std::collections::BTreeSet; use std::ffi::OsStr; diff --git a/rust/utils/run-locally/src/cosmos/types.rs b/rust/utils/run-locally/src/cosmos/types.rs index 795d12ff39..ed95331cea 100644 --- a/rust/utils/run-locally/src/cosmos/types.rs +++ b/rust/utils/run-locally/src/cosmos/types.rs @@ -118,7 +118,7 @@ pub struct AgentConfig { pub protocol: String, pub chain_id: String, pub rpc_urls: Vec, - pub grpc_url: String, + pub grpc_urls: Vec, pub bech32_prefix: String, pub signer: AgentConfigSigner, pub index: AgentConfigIndex, @@ -156,7 +156,15 @@ impl AgentConfig { network.launch_resp.endpoint.rpc_addr.replace("tcp://", "") ), }], - grpc_url: format!("http://{}", network.launch_resp.endpoint.grpc_addr), + grpc_urls: vec![ + // The first url points to a nonexistent node, but is used for checking fallback provider logic + AgentUrl { + http: "localhost:1337".to_string(), + }, + AgentUrl { + http: format!("http://{}", network.launch_resp.endpoint.grpc_addr), + }, + ], bech32_prefix: "osmo".to_string(), signer: AgentConfigSigner { typ: "cosmosKey".to_string(), diff --git a/typescript/sdk/src/metadata/agentConfig.ts b/typescript/sdk/src/metadata/agentConfig.ts index e7133b4c4a..5aa89687ff 100644 --- a/typescript/sdk/src/metadata/agentConfig.ts +++ b/typescript/sdk/src/metadata/agentConfig.ts @@ -124,7 +124,7 @@ export const AgentChainMetadataSchema = ChainMetadataSchemaObject.merge( .string() .optional() .describe( - 'Specify a comma seperated list of custom RPC URLs to use for this chain. If not specified, the default RPC urls will be used.', + 'Specify a comma separated list of custom RPC URLs to use for this chain. If not specified, the default RPC urls will be used.', ), rpcConsensusType: z .nativeEnum(RpcConsensusType) diff --git a/typescript/sdk/src/metadata/chainMetadataTypes.ts b/typescript/sdk/src/metadata/chainMetadataTypes.ts index b13cea0349..565a151af0 100644 --- a/typescript/sdk/src/metadata/chainMetadataTypes.ts +++ b/typescript/sdk/src/metadata/chainMetadataTypes.ts @@ -115,6 +115,12 @@ export const ChainMetadataSchemaObject = z.object({ .array(RpcUrlSchema) .describe('For cosmos chains only, a list of gRPC API URLs') .optional(), + customGrpcUrls: z + .string() + .optional() + .describe( + 'Specify a comma separated list of custom GRPC URLs to use for this chain. If not specified, the default GRPC urls will be used.', + ), blockExplorers: z .array( z.object({