Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade Tokio to version 0.3.0 #681

Merged
merged 9 commits into from
Nov 19, 2020
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,18 @@
## Unreleased

### BREAKING CHANGES

- `[tendermint-rpc, tendermint-light-client]` Upgrade Tokio to version 0.3.0 ([#683])
- Upgrade `hyper` to `v0.14-dev`
- Upgrade `async-tungstenite` to `v0.10`

### IMPROVEMENTS:

- `[light-client]` Only require Tokio when `rpc-client` feature is enabled ([#425])

[#425]: https://github.com/informalsystems/tendermint-rs/issues/425
[#683]: https://github.com/informalsystems/tendermint-rs/issues/683


## v0.17.0-rc3

Expand Down
2 changes: 1 addition & 1 deletion light-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ serde_derive = "1.0.106"
sled = "0.34.3"
static_assertions = "1.1.0"
thiserror = "1.0.15"
tokio = { version = "0.2.20", optional = true }
tokio = { version = "0.3", features = ["rt"], optional = true }

[dev-dependencies]
tendermint-testgen = { path = "../testgen"}
Expand Down
4 changes: 4 additions & 0 deletions light-client/src/components/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ pub enum IoError {
/// Task timed out.
#[error("task timed out after {} ms", .0.as_millis())]
Timeout(Duration),

/// Failed to initialize runtime
#[error("failed to initialize runtime")]
Runtime,
}

impl IoError {
Expand Down
5 changes: 2 additions & 3 deletions light-client/src/utils/block_on.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,10 @@ where
F::Output: Send,
{
std::thread::spawn(move || {
let mut rt = tokio::runtime::Builder::new()
.basic_scheduler()
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
.map_err(|_| IoError::Runtime)?;

if let Some(timeout) = timeout {
let task = async { tokio::time::timeout(timeout, f).await };
Expand Down
3 changes: 1 addition & 2 deletions light-node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ name = "tendermint-light-node"
path = "src/bin/tendermint-light-node/main.rs"

[dependencies]
abscissa_tokio = "0.5"
anomaly = { version = "0.2", features = [ "serializer" ] }
async-trait = "0.1"
gumdrop = "0.7"
Expand All @@ -42,7 +41,7 @@ tendermint = { version = "0.17.0-rc3", path = "../tendermint" }
tendermint-light-client = { version = "0.17.0-rc3", path = "../light-client" }
tendermint-rpc = { version = "0.17.0-rc3", path = "../rpc", features = [ "http-client" ] }
thiserror = "1.0"
tokio = { version = "0.2", features = ["full"] }
tokio = { version = "0.3", features = ["full"] }

[dependencies.abscissa_core]
version = "0.5.0"
Expand Down
4 changes: 1 addition & 3 deletions light-node/src/application.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use abscissa_core::{
application::{self, AppCell},
config, trace, Application, EntryPoint, FrameworkError, StandardPaths,
};
use abscissa_tokio::TokioComponent;

/// Application state
pub static APPLICATION: AppCell<LightNodeApp> = AppCell::new();
Expand Down Expand Up @@ -83,8 +82,7 @@ impl Application for LightNodeApp {
/// beyond the default ones provided by the framework, this is the place
/// to do so.
fn register_components(&mut self, command: &Self::Cmd) -> Result<(), FrameworkError> {
let mut components = self.framework_components(command)?;
components.push(Box::new(TokioComponent::new()?));
let components = self.framework_components(command)?;
self.state.components.register(components)
}

Expand Down
57 changes: 25 additions & 32 deletions light-node/src/commands/start.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
//! `start` subcommand - start the light node.

use std::process;

use crate::application::{app_config, APPLICATION};
use crate::application::app_config;
use crate::config::{LightClientConfig, LightNodeConfig};
use crate::rpc;
use crate::rpc::Server;
Expand Down Expand Up @@ -44,42 +42,37 @@ pub struct StartCmd {
impl Runnable for StartCmd {
/// Start the application.
fn run(&self) {
if let Err(err) = abscissa_tokio::run(&APPLICATION, async {
if let Err(e) = StartCmd::assert_init_was_run() {
if let Err(e) = StartCmd::assert_init_was_run() {
status_err!(&e);
panic!(e);
}

let supervisor = match self.construct_supervisor() {
Ok(supervisor) => supervisor,
Err(e) => {
status_err!(&e);
panic!(e);
}
};

let rpc_handler = supervisor.handle();
StartCmd::start_rpc_server(rpc_handler);

let supervisor = match self.construct_supervisor() {
Ok(supervisor) => supervisor,
Err(e) => {
status_err!(&e);
panic!(e);
let handle = supervisor.handle();
std::thread::spawn(|| supervisor.run());

loop {
match handle.verify_to_highest() {
Ok(light_block) => {
status_info!("synced to block:", light_block.height().to_string());
}
};

let rpc_handler = supervisor.handle();
StartCmd::start_rpc_server(rpc_handler);

let handle = supervisor.handle();
std::thread::spawn(|| supervisor.run());

loop {
match handle.verify_to_highest() {
Ok(light_block) => {
status_info!("synced to block:", light_block.height().to_string());
}
Err(err) => {
status_err!("sync failed: {}", err);
}
Err(err) => {
status_err!("sync failed: {}", err);
}

// TODO(liamsi): use ticks and make this configurable:
std::thread::sleep(Duration::from_millis(800));
}
}) {
status_err!("Unexpected error while running application: {}", err);
process::exit(1);

// TODO(liamsi): use ticks and make this configurable:
std::thread::sleep(Duration::from_millis(800));
}
}
}
Expand Down
14 changes: 11 additions & 3 deletions rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ websocket-client = [
"async-trait",
"async-tungstenite",
"futures",
"tokio/rt",
"tokio/rt-multi-thread",
"tokio/fs",
"tokio/macros",
"tokio/stream",
Expand All @@ -53,9 +55,15 @@ uuid = { version = "0.8", default-features = false }
subtle-encoding = { version = "0.5", features = ["bech32-preview"] }

async-trait = { version = "0.1", optional = true }
async-tungstenite = { version="0.8", features = ["tokio-runtime"], optional = true }
async-tungstenite = { version = "0.10", features = ["tokio-runtime"], optional = true }
futures = { version = "0.3", optional = true }
http = { version = "0.2", optional = true }
hyper = { version = "0.13", optional = true }
tokio = { version = "0.2", optional = true }
tokio = { version = "0.3", optional = true }
tracing = { version = "0.1", optional = true }
pin-project = "1.0.1"

[dependencies.hyper]
version = "0.14.0-dev"
git = "https://github.com/hyperium/hyper/"
rev = "2a19ab74ed69bc776da25544e98979c9fb6e1834"
optional = true
14 changes: 10 additions & 4 deletions rpc/src/client/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use async_trait::async_trait;
use futures::task::{Context, Poll};
use futures::Stream;
use getrandom::getrandom;
use pin_project::pin_project;
use std::collections::HashMap;
use std::convert::TryInto;
use std::pin::Pin;
Expand Down Expand Up @@ -53,14 +54,19 @@ pub trait SubscriptionClient {
/// ```
///
/// [`Event`]: ./event/struct.Event.html
#[pin_project]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for this! 👍 I was wondering when this was going to become necessary 😁

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FWIW, I initially did it the unsafe and manual way, but that unsafe block kept me most of the following night, so I figured I'd rather trust a real Rust dev and take the (admittedly very small) compile-time + code size penalty with pin-project.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Before/after: 7c8f444

#[derive(Debug)]
pub struct Subscription {
/// The query for which events will be produced.
pub query: Query,

/// The ID of this subscription (automatically assigned).
pub id: SubscriptionId,

// Our internal result event receiver for this subscription.
#[pin]
event_rx: ChannelRx<Result<Event>>,

// Allows us to interact with the subscription driver (exclusively to
// terminate this subscription).
cmd_tx: ChannelTx<SubscriptionDriverCmd>,
Expand All @@ -69,8 +75,8 @@ pub struct Subscription {
impl Stream for Subscription {
type Item = Result<Event>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.event_rx.poll_recv(cx)
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.project().event_rx.poll_next(cx)
}
}

Expand Down Expand Up @@ -509,7 +515,7 @@ mod test {
}

async fn must_recv<T>(ch: &mut ChannelRx<T>, timeout_ms: u64) -> T {
let mut delay = time::delay_for(Duration::from_millis(timeout_ms));
let mut delay = time::sleep(Duration::from_millis(timeout_ms));
tokio::select! {
_ = &mut delay, if !delay.is_elapsed() => panic!("timed out waiting for recv"),
Some(v) = ch.recv() => v,
Expand All @@ -520,7 +526,7 @@ mod test {
where
T: std::fmt::Debug,
{
let mut delay = time::delay_for(Duration::from_millis(timeout_ms));
let mut delay = time::sleep(Duration::from_millis(timeout_ms));
tokio::select! {
_ = &mut delay, if !delay.is_elapsed() => (),
Some(v) = ch.recv() => panic!("got unexpected result from channel: {:?}", v),
Expand Down
18 changes: 14 additions & 4 deletions rpc/src/client/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,15 @@
//! convenience methods. We also only implement unbounded channels at present.
//! In future, if RPC consumers need it, we will implement bounded channels.

use crate::{Error, Result};
use std::pin::Pin;

use futures::task::{Context, Poll};
use futures::Stream;
use pin_project::pin_project;
use tokio::sync::mpsc;

use crate::{Error, Result};

/// Constructor for an unbounded channel.
pub fn unbounded<T>() -> (ChannelTx<T>, ChannelRx<T>) {
let (tx, rx) = mpsc::unbounded_channel();
Expand All @@ -33,17 +38,22 @@ impl<T> ChannelTx<T> {
}

/// Receiver interface for a channel.
#[pin_project]
#[derive(Debug)]
pub struct ChannelRx<T>(mpsc::UnboundedReceiver<T>);
pub struct ChannelRx<T>(#[pin] mpsc::UnboundedReceiver<T>);

impl<T> ChannelRx<T> {
/// Wait indefinitely until we receive a value from the channel (or the
/// channel is closed).
pub async fn recv(&mut self) -> Option<T> {
self.0.recv().await
}
}

impl<T> Stream for ChannelRx<T> {
type Item = T;

pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
self.0.poll_recv(cx)
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.project().0.poll_next(cx)
}
}
2 changes: 1 addition & 1 deletion rpc/src/client/transport/websocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ impl WebSocketClientDriver {
pub async fn run(mut self) -> Result<()> {
let mut ping_interval =
tokio::time::interval_at(Instant::now().add(PING_INTERVAL), PING_INTERVAL);
let mut recv_timeout = tokio::time::delay_for(PING_INTERVAL);
let mut recv_timeout = tokio::time::sleep(PING_INTERVAL);
loop {
tokio::select! {
Some(res) = self.stream.next() => match res {
Expand Down
4 changes: 2 additions & 2 deletions tendermint/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ k256 = { version = "0.5", optional = true, features = ["ecdsa"] }
ripemd160 = { version = "0.9", optional = true }

[dev-dependencies]
tendermint-rpc = { path = "../rpc", features = [ "http-client", "websocket-client" ] }
tokio = { version = "0.2", features = [ "macros" ] }
tendermint-rpc = { path = "../rpc", features = ["http-client", "websocket-client"] }
tokio = { version = "0.3", features = ["macros"] }

[features]
secp256k1 = ["k256", "ripemd160"]
6 changes: 3 additions & 3 deletions tendermint/tests/integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ mod rpc {
let mut cur_tx_id = 0_u32;

while !expected_tx_values.is_empty() {
let mut delay = tokio::time::delay_for(Duration::from_secs(3));
let mut delay = tokio::time::sleep(Duration::from_secs(3));
tokio::select! {
Some(res) = subs.next() => {
let ev = res.unwrap();
Expand Down Expand Up @@ -314,7 +314,7 @@ mod rpc {
.broadcast_tx_async(Transaction::from(tx.into_bytes()))
.await
.unwrap();
tokio::time::delay_for(Duration::from_millis(100)).await;
tokio::time::sleep(Duration::from_millis(100)).await;
}
});

Expand All @@ -327,7 +327,7 @@ mod rpc {
);

while expected_new_blocks > 0 && !expected_tx_values.is_empty() {
let mut timeout = tokio::time::delay_for(Duration::from_secs(3));
let mut timeout = tokio::time::sleep(Duration::from_secs(3));
tokio::select! {
Some(res) = combined_subs.next() => {
let ev: Event = res.unwrap();
Expand Down