Skip to content

Commit

Permalink
Downgrade back to Tokio 0.2 (#417)
Browse files Browse the repository at this point in the history
* Downgrade back to Tokio 0.2

* Update changelog

* Revert to tendermint-rs 0.17.0-rc3
  • Loading branch information
romac authored Nov 25, 2020
1 parent db04e24 commit db44b16
Show file tree
Hide file tree
Showing 18 changed files with 116 additions and 100 deletions.
2 changes: 0 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ Special thanks to external contributors for this release: @CharlyCst ([#347]).

### FEATURES

- Update Tokio to version 0.3 ([#402])
- Update to tendermint-rs version `0.17-RC3` ([#403])
- [changelog] Added "unreleased" section in `CHANGELOG.MD` to help streamline releases ([#274])
- [relayer] Integrate relayer spike into relayer crate ([#335])
Expand Down Expand Up @@ -48,7 +47,6 @@ Special thanks to external contributors for this release: @CharlyCst ([#347]).
[#373]: https://github.com/informalsystems/ibc-rs/issues/373
[#374]: https://github.com/informalsystems/ibc-rs/issues/374
[#389]: https://github.com/informalsystems/ibc-rs/issues/389
[#402]: https://github.com/informalsystems/ibc-rs/issues/402
[#403]: https://github.com/informalsystems/ibc-rs/issues/403
[proto-compiler]: https://github.com/informalsystems/ibc-rs/tree/master/proto-compiler

Expand Down
15 changes: 0 additions & 15 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,3 @@ members = [
exclude = [
"proto-compiler"
]

[patch.crates-io]
tendermint = { git = "https://github.com/informalsystems/tendermint-rs", branch = "romac/prost-dev" }
tendermint-rpc = { git = "https://github.com/informalsystems/tendermint-rs", branch = "romac/prost-dev" }
tendermint-proto = { git = "https://github.com/informalsystems/tendermint-rs", branch = "romac/prost-dev" }
tendermint-light-client = { git = "https://github.com/informalsystems/tendermint-rs", branch = "romac/prost-dev" }

bytes = { git = "https://github.com/tokio-rs/bytes", tag = "v0.6.0" }

tonic = { git = "https://github.com/romac/tonic", branch = "romac/hyper-0.14-dev" }

prost = { git = "https://github.com/danburkert/prost", branch = "master" }
prost-types = { git = "https://github.com/danburkert/prost", branch = "master" }
prost-build = { git = "https://github.com/danburkert/prost", branch = "master" }
prost-derive = { git = "https://github.com/danburkert/prost", branch = "master" }
2 changes: 1 addition & 1 deletion modules/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,6 @@ version = "0.17.0-rc2"
optional = true

[dev-dependencies]
tokio = { version = "0.3", features = ["macros"] }
tokio = { version = "0.2", features = ["macros"] }
subtle-encoding = { version = "0.5" }
tendermint-testgen = { version = "0.17.0-rc2" } # Needed for generating (synthetic) light blocks.
2 changes: 1 addition & 1 deletion relayer-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ anomaly = "0.2.0"
gumdrop = "0.7"
serde = { version = "1", features = ["serde_derive"] }
thiserror = "1"
tokio = { version = "0.3", features = ["rt", "rt-multi-thread", "time", "stream", "sync"] }
tokio = { version = "0.2", features = ["full"] }
tracing = "0.1.13"
tracing-subscriber = "0.2.3"
futures = "0.3.5"
Expand Down
12 changes: 8 additions & 4 deletions relayer-cli/src/commands/listen.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
use std::{ops::Deref, sync::Arc, thread};
use std::{
ops::Deref,
sync::{Arc, Mutex},
thread,
};

use abscissa_core::{application::fatal_error, error::BoxError, Command, Options, Runnable};
use crossbeam_channel as channel;
Expand All @@ -17,7 +21,7 @@ pub struct ListenCmd {

impl ListenCmd {
fn cmd(&self) -> Result<(), BoxError> {
let rt = Arc::new(TokioRuntime::new().unwrap());
let rt = Arc::new(Mutex::new(TokioRuntime::new()?));
let config = app_config().clone();

let chain_id = self.chain_id.clone().unwrap();
Expand All @@ -39,7 +43,7 @@ impl Runnable for ListenCmd {
}

/// Listen to events
pub fn listen(rt: Arc<TokioRuntime>, config: ChainConfig) -> Result<(), BoxError> {
pub fn listen(rt: Arc<Mutex<TokioRuntime>>, config: ChainConfig) -> Result<(), BoxError> {
info!(chain.id = %config.id, "spawning event monitor for");

let (event_monitor, rx) = subscribe(config, rt)?;
Expand All @@ -54,7 +58,7 @@ pub fn listen(rt: Arc<TokioRuntime>, config: ChainConfig) -> Result<(), BoxError

fn subscribe(
chain_config: ChainConfig,
rt: Arc<tokio::runtime::Runtime>,
rt: Arc<Mutex<TokioRuntime>>,
) -> Result<(EventMonitor, channel::Receiver<EventBatch>), BoxError> {
let (mut event_monitor, rx) = EventMonitor::new(chain_config.id, chain_config.rpc_addr, rt)
.map_err(|e| format!("couldn't initialize event monitor: {}", e))?;
Expand Down
4 changes: 2 additions & 2 deletions relayer-cli/src/commands/query/channel.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::sync::Arc;
use std::sync::{Arc, Mutex};

use abscissa_core::{Command, Options, Runnable};
use tokio::runtime::Runtime as TokioRuntime;
Expand Down Expand Up @@ -98,7 +98,7 @@ impl Runnable for QueryChannelEndCmd {
// run without proof:
// cargo run --bin relayer -- -c relayer/tests/config/fixtures/simple_config.toml query channel end ibc-test firstport firstchannel --height 3 -p false

let rt = Arc::new(TokioRuntime::new().unwrap());
let rt = Arc::new(Mutex::new(TokioRuntime::new().unwrap()));

let chain = CosmosSDKChain::from_config(chain_config, rt).unwrap();
let height = ibc::Height::new(chain.id().version(), opts.height);
Expand Down
8 changes: 4 additions & 4 deletions relayer-cli/src/commands/query/client.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::sync::Arc;
use std::sync::{Arc, Mutex};

use abscissa_core::{Command, Options, Runnable};
use tokio::runtime::Runtime as TokioRuntime;
Expand Down Expand Up @@ -78,7 +78,7 @@ impl Runnable for QueryClientStateCmd {
};
status_info!("Options", "{:?}", opts);

let rt = Arc::new(TokioRuntime::new().unwrap());
let rt = Arc::new(Mutex::new(TokioRuntime::new().unwrap()));
let chain = CosmosSDKChain::from_config(chain_config, rt).unwrap();
let height = ibc::Height::new(chain.id().version(), opts.height);

Expand Down Expand Up @@ -171,7 +171,7 @@ impl Runnable for QueryClientConsensusCmd {
};
status_info!("Options", "{:?}", opts);

let rt = Arc::new(TokioRuntime::new().unwrap());
let rt = Arc::new(Mutex::new(TokioRuntime::new().unwrap()));
let chain = CosmosSDKChain::from_config(chain_config, rt).unwrap();
let height = ibc::Height::new(chain.id().version(), opts.height);

Expand Down Expand Up @@ -285,7 +285,7 @@ impl Runnable for QueryClientConnectionsCmd {
};
status_info!("Options", "{:?}", opts);

let rt = Arc::new(TokioRuntime::new().unwrap());
let rt = Arc::new(Mutex::new(TokioRuntime::new().unwrap()));
let chain = CosmosSDKChain::from_config(chain_config, rt).unwrap();
let height = ibc::Height::new(chain.id().version(), opts.height);

Expand Down
4 changes: 2 additions & 2 deletions relayer-cli/src/commands/query/connection.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::sync::Arc;
use std::sync::{Arc, Mutex};

use abscissa_core::{Command, Options, Runnable};
use tokio::runtime::Runtime as TokioRuntime;
Expand Down Expand Up @@ -82,7 +82,7 @@ impl Runnable for QueryConnectionEndCmd {
};
status_info!("Options", "{:?}", opts);

let rt = Arc::new(TokioRuntime::new().unwrap());
let rt = Arc::new(Mutex::new(TokioRuntime::new().unwrap()));
let chain = CosmosSDKChain::from_config(chain_config, rt).unwrap();
let height = ibc::Height::new(chain.id().version(), opts.height);

Expand Down
2 changes: 1 addition & 1 deletion relayer-cli/src/commands/start.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ impl StartCmd {

impl Runnable for StartCmd {
fn run(&self) {
let rt = tokio::runtime::Runtime::new().unwrap();
let mut rt = tokio::runtime::Runtime::new().unwrap();

rt.block_on(async move {
self.cmd()
Expand Down
6 changes: 3 additions & 3 deletions relayer-cli/tests/integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@
unused_qualifications
)]

use std::str::FromStr;
use std::sync::Arc;
use std::{str::FromStr, sync::Mutex};

use ibc::ics02_client::raw::ConnectionIds as DomainTypeClientConnections;
use ibc::ics04_channel::channel::{ChannelEnd, Order, State as ChannelState};
Expand Down Expand Up @@ -45,8 +45,8 @@ fn simd_config() -> Config {

/// Chain created for the informaldev/simd DockerHub image running on localhost.
fn simd_chain() -> CosmosSDKChain {
let rt = tokio::runtime::Runtime::new().unwrap();
CosmosSDKChain::from_config(simd_config().chains[0].clone(), Arc::new(rt)).unwrap()
let rt = Arc::new(Mutex::new(tokio::runtime::Runtime::new().unwrap()));
CosmosSDKChain::from_config(simd_config().chains[0].clone(), rt).unwrap()
}

/// Query connection ID. Requires the informaldev/simd Docker image running on localhost.
Expand Down
2 changes: 1 addition & 1 deletion relayer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ sled = { version = "0.34.4", features = ["no_metrics", "no_logs"] }
thiserror = "1.0.11"
toml = "0.5"
tracing = "0.1.13"
tokio = { version = "0.3", features = ["rt", "rt-multi-thread", "time", "stream", "sync"] }
tokio = { version = "0.2", features = ["rt-core", "time", "stream", "sync"] }
serde_json = { version = "1" }
bytes = "0.6.0"
prost = "0.6.1"
Expand Down
34 changes: 17 additions & 17 deletions relayer/src/chain/cosmos.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use std::convert::{TryFrom, TryInto};
use std::future::Future;
use std::str::FromStr;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use std::{convert::TryFrom, convert::TryInto, sync::Arc};

use anomaly::fail;
use bitcoin::hashes::hex::ToHex;
Expand Down Expand Up @@ -55,12 +57,12 @@ use tonic::codegen::http::Uri;
pub struct CosmosSDKChain {
config: ChainConfig,
rpc_client: HttpClient,
rt: Arc<TokioRuntime>,
rt: Arc<Mutex<TokioRuntime>>,
keybase: KeyRing,
}

impl CosmosSDKChain {
pub fn from_config(config: ChainConfig, rt: Arc<TokioRuntime>) -> Result<Self, Error> {
pub fn from_config(config: ChainConfig, rt: Arc<Mutex<TokioRuntime>>) -> Result<Self, Error> {
let primary = config
.primary()
.ok_or_else(|| Kind::LightClient.context("no primary peer specified"))?;
Expand All @@ -87,18 +89,16 @@ impl CosmosSDKChain {
Uri::from_str(&self.config().grpc_addr).map_err(|e| Kind::Grpc.context(e))?;

let mut client = self
.rt
.block_on(
ibc_proto::cosmos::staking::v1beta1::query_client::QueryClient::connect(grpc_addr),
)
)?
.map_err(|e| Kind::Grpc.context(e))?;

let request =
tonic::Request::new(ibc_proto::cosmos::staking::v1beta1::QueryParamsRequest {});

let response = self
.rt
.block_on(client.params(request))
.block_on(client.params(request))?
.map_err(|e| Kind::Grpc.context(e))?;

let res = response
Expand All @@ -115,11 +115,15 @@ impl CosmosSDKChain {
/// Specific to the SDK and used only for Tendermint client create
pub fn query_consensus_params(&self) -> Result<Params, Error> {
Ok(self
.rt
.block_on(self.rpc_client().genesis())
.block_on(self.rpc_client().genesis())?
.map_err(|e| Kind::Rpc.context(e))?
.consensus_params)
}

/// Run a future to completion on the Tokio runtime.
fn block_on<F: Future>(&self, f: F) -> Result<F::Output, Error> {
Ok(self.rt.lock().map_err(|_| Kind::PoisonedMutex)?.block_on(f))
}
}

impl Chain for CosmosSDKChain {
Expand Down Expand Up @@ -150,8 +154,7 @@ impl Chain for CosmosSDKChain {
}

let response =
self.rt
.block_on(abci_query(&self, path, data.to_string(), height, prove))?;
self.block_on(abci_query(&self, path, data.to_string(), height, prove))??;

// Verify response proof, if requested.
if prove {
Expand Down Expand Up @@ -191,8 +194,7 @@ impl Chain for CosmosSDKChain {
};

let acct_response = self
.rt
.block_on(query_account(self, key.account))
.block_on(query_account(self, key.account))?
.map_err(|e| Kind::Grpc.context(e))?;

let single = Single { mode: 1 };
Expand Down Expand Up @@ -250,8 +252,7 @@ impl Chain for CosmosSDKChain {
prost::Message::encode(&tx_raw, &mut txraw_buf).unwrap();

let response = self
.rt
.block_on(broadcast_tx_commit(self, txraw_buf))
.block_on(broadcast_tx_commit(self, txraw_buf))?
.map_err(|e| Kind::Rpc.context(e))?;

Ok(response)
Expand All @@ -260,8 +261,7 @@ impl Chain for CosmosSDKChain {
/// Query the latest height the chain is at via a RPC query
fn query_latest_height(&self) -> Result<ICSHeight, Error> {
let status = self
.rt
.block_on(self.rpc_client().status())
.block_on(self.rpc_client().status())?
.map_err(|e| Kind::Rpc.context(e))?;

if status.sync_info.catching_up {
Expand Down
15 changes: 10 additions & 5 deletions relayer/src/chain/runtime.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use std::{sync::Arc, thread};
use std::{
sync::{Arc, Mutex},
thread,
};

use crossbeam_channel as channel;

Expand Down Expand Up @@ -58,23 +61,25 @@ pub struct ChainRuntime<C: Chain> {
light_client: Box<dyn LightClient<C>>,

#[allow(dead_code)]
rt: Arc<TokioRuntime>,
rt: Arc<Mutex<TokioRuntime>>,
}

impl ChainRuntime<CosmosSDKChain> {
pub fn cosmos_sdk(
config: ChainConfig,
light_client: TMLightClient,
event_receiver: channel::Receiver<EventBatch>,
rt: Arc<TokioRuntime>,
rt: Arc<Mutex<TokioRuntime>>,
) -> Result<Self, Error> {
let chain = CosmosSDKChain::from_config(config, rt.clone())?;
Ok(Self::new(chain, light_client, event_receiver, rt))
}

// TODO: Make this work for a generic Chain
pub fn spawn(config: ChainConfig) -> Result<(impl ChainHandle, Threads), Error> {
let rt = Arc::new(TokioRuntime::new().map_err(|e| Kind::Io.context(e))?);
let rt = Arc::new(Mutex::new(
TokioRuntime::new().map_err(|e| Kind::Io.context(e))?,
));

// Initialize the light clients
let (light_client, supervisor) = TMLightClient::from_config(&config, true)?;
Expand Down Expand Up @@ -115,7 +120,7 @@ impl<C: Chain> ChainRuntime<C> {
chain: C,
light_client: impl LightClient<C> + 'static,
event_receiver: channel::Receiver<EventBatch>,
rt: Arc<TokioRuntime>,
rt: Arc<Mutex<TokioRuntime>>,
) -> Self {
let (sender, receiver) = channel::unbounded::<HandleInput>();

Expand Down
4 changes: 4 additions & 0 deletions relayer/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ pub enum Kind {
#[error("I/O error")]
Io,

/// Poisoned mutex
#[error("poisoned mutex")]
PoisonedMutex,

/// Invalid configuration
#[error("Invalid configuration")]
Config,
Expand Down
Loading

0 comments on commit db44b16

Please sign in to comment.