Skip to content

Commit

Permalink
Use tokio to replace async_std for plugins and backends. (#1264)
Browse files Browse the repository at this point in the history
* Use tokio to replace async_std for plugins and backends.

Signed-off-by: ChenYing Kuo <[email protected]>

* Incorporate the review comments.

Signed-off-by: ChenYing Kuo <[email protected]>

* Support thread number config for tokio runtime.

Signed-off-by: ChenYing Kuo <[email protected]>

---------

Signed-off-by: ChenYing Kuo <[email protected]>
  • Loading branch information
evshary authored Aug 6, 2024
1 parent b1e4dba commit b7d42ef
Show file tree
Hide file tree
Showing 26 changed files with 210 additions and 130 deletions.
22 changes: 5 additions & 17 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ anyhow = { version = "1.0.69", default-features = false } # Default features are
async-executor = "1.5.0"
async-global-executor = "2.3.1"
async-io = "2.3.3"
async-std = { version = "=1.12.0", default-features = false } # Default features are disabled due to some crates' requirements
async-trait = "0.1.60"
base64 = "0.22.1"
bincode = "1.3.3"
Expand Down
6 changes: 6 additions & 0 deletions DEFAULT_CONFIG.json5
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,12 @@
// __config__: "./plugins/zenoh-plugin-rest/config.json5",
// /// http port to answer to rest requests
// http_port: 8000,
// /// The number of worker thread in TOKIO runtime (default: 2)
// /// The configuration only takes effect if running as a dynamic plugin, which can not reuse the current runtime.
// work_thread_num: 0,
// /// The number of blocking thread in TOKIO runtime (default: 50)
// /// The configuration only takes effect if running as a dynamic plugin, which can not reuse the current runtime.
// max_block_thread_num: 50,
// },
//
// /// Configure the storage manager plugin
Expand Down
1 change: 0 additions & 1 deletion commons/zenoh-util/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ maintenance = { status = "actively-developed" }
test = []

[dependencies]
async-std = { workspace = true, features = ["default", "unstable"] }
tokio = { workspace = true, features = ["time", "net"] }
async-trait = { workspace = true }
flume = { workspace = true }
Expand Down
52 changes: 29 additions & 23 deletions commons/zenoh-util/src/timer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ use std::{
time::{Duration, Instant},
};

use async_std::{prelude::*, sync::Mutex, task};
use async_trait::async_trait;
use flume::{bounded, Receiver, RecvError, Sender};
use tokio::{runtime::Handle, select, sync::Mutex, task, time};
use zenoh_core::zconfigurable;

zconfigurable! {
Expand Down Expand Up @@ -120,7 +120,7 @@ async fn timer_task(
let mut events = events.lock().await;

loop {
// Fuuture for adding new events
// Future for adding new events
let new = new_event.recv_async();

match events.peek() {
Expand All @@ -130,12 +130,17 @@ async fn timer_task(
let next = next.clone();
let now = Instant::now();
if next.when > now {
task::sleep(next.when - now).await;
time::sleep(next.when - now).await;
}
Ok((false, next))
};

match new.race(wait).await {
let result = select! {
result = wait => { result },
result = new => { result },
};

match result {
Ok((is_new, mut ev)) => {
if is_new {
// A new event has just been added: push it onto the heap
Expand Down Expand Up @@ -204,14 +209,14 @@ impl Timer {
// Start the timer task
let c_e = timer.events.clone();
let fut = async move {
let _ = sl_receiver
.recv_async()
.race(timer_task(c_e, ev_receiver))
.await;
select! {
_ = sl_receiver.recv_async() => {},
_ = timer_task(c_e, ev_receiver) => {},
};
tracing::trace!("A - Timer task no longer running...");
};
if spawn_blocking {
task::spawn_blocking(|| task::block_on(fut));
task::spawn_blocking(|| Handle::current().block_on(fut));
} else {
task::spawn(fut);
}
Expand All @@ -234,14 +239,14 @@ impl Timer {
// Start the timer task
let c_e = self.events.clone();
let fut = async move {
let _ = sl_receiver
.recv_async()
.race(timer_task(c_e, ev_receiver))
.await;
select! {
_ = sl_receiver.recv_async() => {},
_ = timer_task(c_e, ev_receiver) => {},
};
tracing::trace!("A - Timer task no longer running...");
};
if spawn_blocking {
task::spawn_blocking(|| task::block_on(fut));
task::spawn_blocking(|| Handle::current().block_on(fut));
} else {
task::spawn(fut);
}
Expand Down Expand Up @@ -307,8 +312,8 @@ mod tests {
time::{Duration, Instant},
};

use async_std::task;
use async_trait::async_trait;
use tokio::{runtime::Runtime, time};

use super::{Timed, TimedEvent, Timer};

Expand Down Expand Up @@ -349,7 +354,7 @@ mod tests {
timer.add_async(event).await;

// Wait for the event to occur
task::sleep(3 * interval).await;
time::sleep(3 * interval).await;

// Load and reset the counter value
let value = counter.swap(0, Ordering::SeqCst);
Expand All @@ -368,7 +373,7 @@ mod tests {
handle.defuse();

// Wait for the event to occur
task::sleep(3 * interval).await;
time::sleep(3 * interval).await;

// Load and reset the counter value
let value = counter.swap(0, Ordering::SeqCst);
Expand All @@ -390,7 +395,7 @@ mod tests {
timer.add_async(event).await;

// Wait for the events to occur
task::sleep(to_elapse + interval).await;
time::sleep(to_elapse + interval).await;

// Load and reset the counter value
let value = counter.swap(0, Ordering::SeqCst);
Expand All @@ -401,7 +406,7 @@ mod tests {
handle.defuse();

// Wait a bit more to verify that not more events have been fired
task::sleep(to_elapse).await;
time::sleep(to_elapse).await;

// Load and reset the counter value
let value = counter.swap(0, Ordering::SeqCst);
Expand All @@ -416,7 +421,7 @@ mod tests {
timer.add_async(event).await;

// Wait for the events to occur
task::sleep(to_elapse + interval).await;
time::sleep(to_elapse + interval).await;

// Load and reset the counter value
let value = counter.swap(0, Ordering::SeqCst);
Expand All @@ -426,7 +431,7 @@ mod tests {
timer.stop_async().await;

// Wait some time
task::sleep(to_elapse).await;
time::sleep(to_elapse).await;

// Load and reset the counter value
let value = counter.swap(0, Ordering::SeqCst);
Expand All @@ -436,13 +441,14 @@ mod tests {
timer.start_async(false).await;

// Wait for the events to occur
task::sleep(to_elapse).await;
time::sleep(to_elapse).await;

// Load and reset the counter value
let value = counter.swap(0, Ordering::SeqCst);
assert_eq!(value, amount);
}

task::block_on(run());
let rt = Runtime::new().unwrap();
rt.block_on(run());
}
}
2 changes: 1 addition & 1 deletion io/zenoh-links/zenoh-link-vsock/src/unicast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ pub fn get_vsock_addr(address: Address<'_>) -> ZResult<VsockAddr> {
}

pub struct LinkUnicastVsock {
// The underlying socket as returned from the async-std library
// The underlying socket as returned from the tokio library
socket: UnsafeCell<VsockStream>,
// The source socket address of this link (address used on the local host)
src_addr: VsockAddr,
Expand Down
2 changes: 1 addition & 1 deletion plugins/zenoh-backend-example/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@ name = "zenoh_backend_example"
crate-type = ["cdylib"]

[dependencies]
async-std = { workspace = true, features = ["default"] }
const_format = { workspace = true }
futures = { workspace = true }
git-version = { workspace = true }
tracing = { workspace = true }
tokio = { workspace = true }
serde_json = { workspace = true }
zenoh = { workspace = true, features = ["default"] }
zenoh-plugin-trait = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion plugins/zenoh-backend-example/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
//
use std::collections::{hash_map::Entry, HashMap};

use async_std::sync::RwLock;
use async_trait::async_trait;
use tokio::sync::RwLock;
use zenoh::{internal::Value, key_expr::OwnedKeyExpr, prelude::*, time::Timestamp};
use zenoh_backend_traits::{
config::{StorageConfig, VolumeConfig},
Expand Down
1 change: 0 additions & 1 deletion plugins/zenoh-backend-traits/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ description = "Zenoh: traits to be implemented by backends libraries"
maintenance = { status = "actively-developed" }

[dependencies]
async-std = { workspace = true, features = ["default"] }
async-trait = { workspace = true }
derive_more = { workspace = true }
serde_json = { workspace = true }
Expand Down
3 changes: 2 additions & 1 deletion plugins/zenoh-plugin-example/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,13 @@ name = "zenoh_plugin_example"
crate-type = ["cdylib"]

[dependencies]
async-std = { workspace = true, features = ["default"] }
const_format = { workspace = true }
zenoh-util = { workspace = true }
futures = { workspace = true }
lazy_static = { workspace = true }
git-version = { workspace = true }
tracing = { workspace = true }
tokio = { workspace = true }
serde_json = { workspace = true }
zenoh = { workspace = true, features = [
"default",
Expand Down
Loading

0 comments on commit b7d42ef

Please sign in to comment.