Skip to content

Commit

Permalink
feat: Cosmos RPC client supporting fallback rpcs (#4752)
Browse files Browse the repository at this point in the history
### Description

Added fallback functionality to the Cosmos RPC client

### Drive-by changes

N/A

### Related issues

- Fixes #3425

### Backward compatibility

Yes

### Testing

Unit tests already in place, no additional added, as the generic
CosmosFallbackProvider which is used is already tested
  • Loading branch information
Mantas-M authored Oct 29, 2024
1 parent d9505ab commit 8ff194d
Show file tree
Hide file tree
Showing 7 changed files with 96 additions and 45 deletions.
28 changes: 23 additions & 5 deletions rust/main/chains/hyperlane-cosmos/src/providers/cosmos/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use cosmrs::crypto::PublicKey;
use cosmrs::proto::traits::Message;
use cosmrs::tx::{MessageExt, SequenceNumber, SignerInfo, SignerPublicKey};
use cosmrs::{proto, AccountId, Any, Coin, Tx};
use hyperlane_core::rpc_clients::FallbackProvider;
use itertools::{any, cloned, Itertools};
use once_cell::sync::Lazy;
use serde::{Deserialize, Serialize};
Expand All @@ -25,6 +26,7 @@ use hyperlane_core::{
use crate::grpc::{WasmGrpcProvider, WasmProvider};
use crate::providers::cosmos::provider::parse::PacketData;
use crate::providers::rpc::CosmosRpcClient;
use crate::rpc_clients::CosmosFallbackProvider;
use crate::{
ConnectionConf, CosmosAccountId, CosmosAddress, CosmosAmount, HyperlaneCosmosError, Signer,
};
Expand All @@ -43,7 +45,7 @@ pub struct CosmosProvider {
domain: HyperlaneDomain,
connection_conf: ConnectionConf,
grpc_provider: WasmGrpcProvider,
rpc_client: CosmosRpcClient,
rpc_client: CosmosFallbackProvider<CosmosRpcClient>,
}

impl CosmosProvider {
Expand All @@ -62,13 +64,21 @@ impl CosmosProvider {
locator,
signer,
)?;
let rpc_client = CosmosRpcClient::new(&conf)?;

let providers = conf
.get_rpc_urls()
.iter()
.map(CosmosRpcClient::new)
.collect::<Result<Vec<_>, _>>()?;
let provider = CosmosFallbackProvider::new(
FallbackProvider::builder().add_providers(providers).build(),
);

Ok(Self {
domain,
connection_conf: conf,
grpc_provider,
rpc_client,
rpc_client: provider,
})
}

Expand Down Expand Up @@ -362,7 +372,10 @@ impl HyperlaneChain for CosmosProvider {
#[async_trait]
impl HyperlaneProvider for CosmosProvider {
async fn get_block_by_height(&self, height: u64) -> ChainResult<BlockInfo> {
let response = self.rpc_client.get_block(height as u32).await?;
let response = self
.rpc_client
.call(|provider| Box::pin(async move { provider.get_block(height as u32).await }))
.await?;

let block = response.block;
let block_height = block.header.height.value();
Expand Down Expand Up @@ -392,7 +405,12 @@ impl HyperlaneProvider for CosmosProvider {
let tendermint_hash = Hash::from_bytes(Algorithm::Sha256, hash.as_bytes())
.expect("transaction hash should be of correct size");

let response = self.rpc_client.get_tx_by_hash(tendermint_hash).await?;
let response = self
.rpc_client
.call(|provider| {
Box::pin(async move { provider.get_tx_by_hash(tendermint_hash).await })
})
.await?;

let received_hash = H256::from_slice(response.hash.as_bytes());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ fn provider(address: &str) -> WasmGrpcProvider {
domain.clone(),
ConnectionConf::new(
vec![Url::parse("http://grpc-kralum.neutron-1.neutron.org:80").unwrap()],
"https://rpc-kralum.neutron-1.neutron.org".to_owned(),
vec![Url::parse("https://rpc-kralum.neutron-1.neutron.org").unwrap()],
"neutron-1".to_owned(),
"neutron".to_owned(),
"untrn".to_owned(),
Expand Down
37 changes: 25 additions & 12 deletions rust/main/chains/hyperlane-cosmos/src/providers/rpc/client.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
use cosmrs::proto::tendermint::blocksync::BlockResponse;
use hyperlane_core::rpc_clients::BlockNumberGetter;
use tendermint::Hash;
use tendermint_rpc::client::CompatMode;
use tendermint_rpc::endpoint::{block, block_by_hash, block_results, tx};
use tendermint_rpc::{Client, HttpClient};
use tendermint_rpc::{Client, HttpClient, HttpClientUrl, Url as TendermintUrl};

use hyperlane_core::ChainResult;
use hyperlane_core::{ChainCommunicationError, ChainResult};
use tonic::async_trait;
use url::Url;

use crate::{ConnectionConf, HyperlaneCosmosError};

Expand All @@ -16,16 +19,17 @@ pub struct CosmosRpcClient {

impl CosmosRpcClient {
/// Create new `CosmosRpcClient`
pub fn new(conf: &ConnectionConf) -> ChainResult<Self> {
let client = HttpClient::builder(
conf.get_rpc_url()
.parse()
.map_err(Into::<HyperlaneCosmosError>::into)?,
)
// Consider supporting different compatibility modes.
.compat_mode(CompatMode::latest())
.build()
.map_err(Into::<HyperlaneCosmosError>::into)?;
pub fn new(url: &Url) -> ChainResult<Self> {
let tendermint_url = tendermint_rpc::Url::try_from(url.to_owned())
.map_err(Into::<HyperlaneCosmosError>::into)?;
let url = tendermint_rpc::HttpClientUrl::try_from(tendermint_url)
.map_err(Into::<HyperlaneCosmosError>::into)?;

let client = HttpClient::builder(url)
// Consider supporting different compatibility modes.
.compat_mode(CompatMode::latest())
.build()
.map_err(Into::<HyperlaneCosmosError>::into)?;

Ok(Self { client })
}
Expand Down Expand Up @@ -75,3 +79,12 @@ impl CosmosRpcClient {
.map_err(Into::<HyperlaneCosmosError>::into)?)
}
}

#[async_trait]
impl BlockNumberGetter for CosmosRpcClient {
async fn get_block_number(&self) -> Result<u64, ChainCommunicationError> {
self.get_latest_block()
.await
.map(|block| block.block.header.height.value())
}
}
55 changes: 37 additions & 18 deletions rust/main/chains/hyperlane-cosmos/src/providers/rpc/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@ use async_trait::async_trait;
use cosmrs::cosmwasm::MsgExecuteContract;
use cosmrs::rpc::client::Client;
use futures::StreamExt;
use hyperlane_core::rpc_clients::{BlockNumberGetter, FallbackProvider};
use sha256::digest;
use tendermint::abci::{Event, EventAttribute};
use tendermint::hash::Algorithm;
use tendermint::Hash;
use tendermint_rpc::client::CompatMode;
use tendermint_rpc::endpoint::block::Response as BlockResponse;
use tendermint_rpc::endpoint::block_results::Response as BlockResultsResponse;
use tendermint_rpc::endpoint::block_results::{self, Response as BlockResultsResponse};
use tendermint_rpc::endpoint::tx;
use tendermint_rpc::HttpClient;
use time::OffsetDateTime;
Expand All @@ -21,6 +22,7 @@ use hyperlane_core::{
};

use crate::rpc::CosmosRpcClient;
use crate::rpc_clients::CosmosFallbackProvider;
use crate::{ConnectionConf, CosmosAddress, CosmosProvider, HyperlaneCosmosError};

#[async_trait]
Expand Down Expand Up @@ -79,7 +81,7 @@ pub struct CosmosWasmRpcProvider {
contract_address: CosmosAddress,
target_event_kind: String,
reorg_period: u32,
rpc_client: CosmosRpcClient,
rpc_client: CosmosFallbackProvider<CosmosRpcClient>,
}

impl CosmosWasmRpcProvider {
Expand All @@ -92,7 +94,15 @@ impl CosmosWasmRpcProvider {
event_type: String,
reorg_period: u32,
) -> ChainResult<Self> {
let rpc_client = CosmosRpcClient::new(&conf)?;
let providers = conf
.get_rpc_urls()
.iter()
.map(CosmosRpcClient::new)
.collect::<Result<Vec<_>, _>>()?;
let mut builder = FallbackProvider::builder();
builder = builder.add_providers(providers);
let fallback_provider = builder.build();
let provider = CosmosFallbackProvider::new(fallback_provider);

Ok(Self {
domain: locator.domain.clone(),
Expand All @@ -103,9 +113,15 @@ impl CosmosWasmRpcProvider {
)?,
target_event_kind: format!("{}-{}", Self::WASM_TYPE, event_type),
reorg_period,
rpc_client,
rpc_client: provider,
})
}

async fn get_block(&self, height: u32) -> ChainResult<BlockResponse> {
self.rpc_client
.call(|provider| Box::pin(async move { provider.get_block(height).await }))
.await
}
}

impl CosmosWasmRpcProvider {
Expand Down Expand Up @@ -222,7 +238,10 @@ impl WasmRpcProvider for CosmosWasmRpcProvider {
#[instrument(err, skip(self))]
#[allow(clippy::blocks_in_conditions)] // TODO: `rustc` 1.80.1 clippy issue
async fn get_finalized_block_number(&self) -> ChainResult<u32> {
let latest_block = self.rpc_client.get_latest_block().await?;
let latest_block = self
.rpc_client
.call(|provider| Box::pin(async move { provider.get_latest_block().await }))
.await?;
let latest_height: u32 = latest_block
.block
.header
Expand All @@ -244,15 +263,16 @@ impl WasmRpcProvider for CosmosWasmRpcProvider {
where
T: Send + Sync + PartialEq + Debug + 'static,
{
debug!(?block_number, cursor_label, domain=?self.domain, "Getting logs in block");

// The two calls below could be made in parallel, but on cosmos rate limiting is a bigger problem
// than indexing latency, so we do them sequentially.
let block = self.rpc_client.get_block(block_number).await?;

let block = self.get_block(block_number).await?;
debug!(?block_number, block_hash = ?block.block_id.hash, cursor_label, domain=?self.domain, "Getting logs in block with hash");

let block_results = self.rpc_client.get_block_results(block_number).await?;
let block_results = self
.rpc_client
.call(|provider| {
Box::pin(async move { provider.get_block_results(block_number).await })
})
.await?;

Ok(self.handle_txs(block, block_results, parser, cursor_label))
}
Expand All @@ -268,17 +288,16 @@ impl WasmRpcProvider for CosmosWasmRpcProvider {
where
T: Send + Sync + PartialEq + Debug + 'static,
{
debug!(?hash, cursor_label, domain=?self.domain, "Getting logs in transaction");

let tx = self.rpc_client.get_tx_by_hash(hash).await?;

let tx = self
.rpc_client
.call(|provider| Box::pin(async move { provider.get_tx_by_hash(hash).await }))
.await?;
let block_number = tx.height.value() as u32;
let block = self.rpc_client.get_block(block_number).await?;
let block = self.get_block(block_number).await?;
let block_hash = H256::from_slice(block.block_id.hash.as_bytes());

debug!(?block_number, block_hash = ?block.block_id.hash, cursor_label, domain=?self.domain, "Getting logs in transaction: block info");

let block_hash = H256::from_slice(block.block_id.hash.as_bytes());

Ok(self.handle_tx(tx, block_hash, parser).collect())
}
}
14 changes: 7 additions & 7 deletions rust/main/chains/hyperlane-cosmos/src/trait_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ use hyperlane_core::{config::OperationBatchConfig, ChainCommunicationError, Fixe
/// Cosmos connection configuration
#[derive(Debug, Clone)]
pub struct ConnectionConf {
/// The GRPC url to connect to
/// The GRPC urls to connect to
grpc_urls: Vec<Url>,
/// The RPC url to connect to
rpc_url: String,
rpc_urls: Vec<Url>,
/// The chain ID
chain_id: String,
/// The human readable address prefix for the chains using bech32.
Expand Down Expand Up @@ -95,9 +95,9 @@ impl ConnectionConf {
self.grpc_urls.clone()
}

/// Get the RPC url
pub fn get_rpc_url(&self) -> String {
self.rpc_url.clone()
/// Get the RPC urls
pub fn get_rpc_urls(&self) -> Vec<Url> {
self.rpc_urls.clone()
}

/// Get the chain ID
Expand Down Expand Up @@ -134,7 +134,7 @@ impl ConnectionConf {
#[allow(clippy::too_many_arguments)]
pub fn new(
grpc_urls: Vec<Url>,
rpc_url: String,
rpc_urls: Vec<Url>,
chain_id: String,
bech32_prefix: String,
canonical_asset: String,
Expand All @@ -145,7 +145,7 @@ impl ConnectionConf {
) -> Self {
Self {
grpc_urls,
rpc_url,
rpc_urls,
chain_id,
bech32_prefix,
canonical_asset,
Expand Down
3 changes: 2 additions & 1 deletion rust/main/chains/hyperlane-cosmos/src/types.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use cosmrs::proto::cosmos::base::abci::v1beta1::TxResponse;
use cosmrs::proto::{cosmos::base::abci::v1beta1::TxResponse, tendermint::Error};
use hyperlane_core::{ChainResult, ModuleType, TxOutcome, H256, U256};
use url::Url;

pub struct IsmType(pub hyperlane_cosmwasm_interface::ism::IsmType);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ pub fn build_cosmos_connection_conf(
} else {
Some(ChainConnectionConf::Cosmos(h_cosmos::ConnectionConf::new(
grpcs,
rpcs.first().unwrap().to_string(),
rpcs.to_owned(),
chain_id.unwrap().to_string(),
prefix.unwrap().to_string(),
canonical_asset.unwrap(),
Expand Down

0 comments on commit 8ff194d

Please sign in to comment.