diff --git a/Cargo.toml b/Cargo.toml index 921877e..5abfdcb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -39,13 +39,18 @@ rustls = { version = "0.23.2", optional = true } rustls-pemfile = { version = "2", optional = true } webpki-roots = { version = "0.26.1", optional = true } derive-where = "1.2.7" -tokio-rustls = "0.26.0" fastrand = "2.0.2" tracing = "0.1.40" rsasl = { version = "2.0.1", default-features = false, features = ["provider", "config_builder", "registry_static", "std"], optional = true } md5 = { version = "0.7.0", optional = true } hex = { version = "0.4.3", optional = true } linkme = { version = "0.2", optional = true } +async-io = "2.3.2" +futures = "0.3.30" +async-net = "2.0.0" +futures-rustls = "0.26.0" +futures-lite = "2.3.0" +asyncs = "0.2.0" [dev-dependencies] test-log = { version = "0.2.15", features = ["log", "trace"] } @@ -59,9 +64,15 @@ assert_matches = "1.5.0" tempfile = "3.6.0" rcgen = { version = "0.12.1", features = ["default", "x509-parser"] } serial_test = "3.0.0" +asyncs = { version = "0.2.0", features = ["test"] } +blocking = "1.6.0" [package.metadata.cargo-all-features] skip_optional_dependencies = true [package.metadata.docs.rs] all-features = true + +[profile.dev] +# Need this for linkme crate to work for spawns in macOS +lto = "thin" diff --git a/src/client/mod.rs b/src/client/mod.rs index 6e3331c..071181a 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -8,9 +8,9 @@ use std::time::Duration; use const_format::formatcp; use either::{Either, Left, Right}; +use futures::channel::mpsc; use ignore_result::Ignore; use thiserror::Error; -use tokio::sync::mpsc; use tracing::instrument; pub use self::watcher::{OneshotWatcher, PersistentWatcher, StateWatcher}; @@ -322,9 +322,9 @@ impl Client { fn send_marshalled_request(&self, request: MarshalledRequest) -> StateReceiver { let (operation, receiver) = SessionOperation::new_marshalled(request).with_responser(); - if let Err(mpsc::error::SendError(operation)) = self.requester.send(operation) { + if let Err(err) = self.requester.unbounded_send(operation) { let state = self.state(); - operation.responser.send(Err(state.to_error())); + err.into_inner().responser.send(Err(state.to_error())); } receiver } @@ -514,7 +514,7 @@ impl Client { // TODO: move these to session side so to eliminate owned Client and String. fn delete_background(self, path: String) { - tokio::spawn(async move { + asyncs::spawn(async move { self.delete_foreground(&path).await; }); } @@ -524,7 +524,7 @@ impl Client { } fn delete_ephemeral_background(self, prefix: String, unique: bool) { - tokio::spawn(async move { + asyncs::spawn(async move { let (parent, tree, name) = util::split_path(&prefix); let mut children = Self::retry_on_connection_loss(|| self.list_children(parent)).await?; if unique { @@ -1673,13 +1673,13 @@ impl Connector { let mut buf = Vec::with_capacity(4096); let mut depot = Depot::new(); let conn = session.start(&mut endpoints, &mut buf, &mut depot).await?; - let (sender, receiver) = mpsc::unbounded_channel(); + let (sender, receiver) = mpsc::unbounded(); let session_info = session.session.clone(); let session_timeout = session.session_timeout; let mut state_watcher = StateWatcher::new(state_receiver); // Consume all state changes so far. 
         state_watcher.state();
-        tokio::spawn(async move {
+        asyncs::spawn(async move {
             session.serve(endpoints, conn, buf, depot, receiver).await;
         });
         let client =
@@ -2270,7 +2270,7 @@ mod tests {
             .is_equal_to(Error::BadArguments(&"directory node must not be sequential"));
     }
 
-    #[test_log::test(tokio::test)]
+    #[test_log::test(asyncs::test)]
     async fn session_last_zxid_seen() {
         use testcontainers::clients::Cli as DockerCli;
         use testcontainers::core::{Healthcheck, WaitFor};
diff --git a/src/deadline.rs b/src/deadline.rs
index 5dd26c5..9fb216f 100644
--- a/src/deadline.rs
+++ b/src/deadline.rs
@@ -1,32 +1,34 @@
 use std::future::Future;
 use std::pin::Pin;
 use std::task::{Context, Poll};
-use std::time::Duration;
+use std::time::{Duration, Instant};
 
-use tokio::time::{self, Instant, Sleep};
+use async_io::Timer;
+use futures::future::{Fuse, FusedFuture, FutureExt};
 
 pub struct Deadline {
-    sleep: Option<Sleep>,
+    timer: Fuse<Timer>,
+    deadline: Option<Instant>,
 }
 
 impl Deadline {
     pub fn never() -> Self {
-        Self { sleep: None }
+        Self { timer: Timer::never().fuse(), deadline: None }
     }
 
     pub fn until(deadline: Instant) -> Self {
-        Self { sleep: Some(time::sleep_until(deadline)) }
+        Self { timer: Timer::at(deadline).fuse(), deadline: Some(deadline) }
     }
 
     pub fn elapsed(&self) -> bool {
-        self.sleep.as_ref().map(|f| f.is_elapsed()).unwrap_or(false)
+        self.timer.is_terminated()
     }
 
     /// Remaining timeout.
     pub fn timeout(&self) -> Duration {
-        match self.sleep.as_ref() {
+        match self.deadline.as_ref() {
             None => Duration::MAX,
-            Some(sleep) => sleep.deadline().saturating_duration_since(Instant::now()),
+            Some(deadline) => deadline.saturating_duration_since(Instant::now()),
         }
     }
 }
@@ -35,10 +37,7 @@ impl Future for Deadline {
     type Output = ();
 
     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
-        if self.sleep.is_none() {
-            return Poll::Pending;
-        }
-        let sleep = unsafe { self.map_unchecked_mut(|deadline| deadline.sleep.as_mut().unwrap_unchecked()) };
-        sleep.poll(cx)
+        let timer = unsafe { self.map_unchecked_mut(|deadline| &mut deadline.timer) };
+        timer.poll(cx).map(|_| ())
     }
 }
diff --git a/src/endpoint.rs b/src/endpoint.rs
index 14d8211..707084a 100644
--- a/src/endpoint.rs
+++ b/src/endpoint.rs
@@ -1,6 +1,8 @@
 use std::fmt::{self, Display, Formatter};
 use std::time::Duration;
 
+use async_io::Timer;
+
 use crate::chroot::Chroot;
 use crate::error::Error;
 use crate::util::{Ref, ToRef};
@@ -219,7 +221,7 @@ impl IterableEndpoints {
     async fn delay(&self, index: Index, max_delay: Duration) {
         let timeout = max_delay.min(Self::timeout(index, self.endpoints.len()));
         if timeout != Duration::ZERO {
-            tokio::time::sleep(timeout).await;
+            Timer::after(timeout).await;
         }
     }
 
@@ -336,7 +338,7 @@ mod tests {
         );
     }
 
-    #[tokio::test]
+    #[asyncs::test]
     async fn test_iterable_endpoints_next() {
         use std::time::Duration;
 
diff --git a/src/session/connection.rs b/src/session/connection.rs
index ed3374a..e04f43a 100644
--- a/src/session/connection.rs
+++ b/src/session/connection.rs
@@ -3,21 +3,24 @@ use std::pin::Pin;
 use std::task::{Context, Poll};
 use std::time::Duration;
 
+use async_io::Timer;
+use async_net::TcpStream;
+use asyncs::select;
 use bytes::buf::BufMut;
+use futures::io::BufReader;
+use futures::prelude::*;
+use futures_lite::AsyncReadExt;
 use ignore_result::Ignore;
-use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, BufStream, ReadBuf};
-use tokio::net::TcpStream;
-use tokio::{select, time};
 use tracing::{debug, trace};
 
 #[cfg(feature = "tls")]
 mod tls {
     pub use std::sync::Arc;
+    pub use futures_rustls::client::TlsStream;
+    pub use futures_rustls::TlsConnector;
     pub use rustls::pki_types::ServerName;
     pub use rustls::ClientConfig;
-    pub use tokio_rustls::client::TlsStream;
-    pub use tokio_rustls::TlsConnector;
 }
 
 #[cfg(feature = "tls")]
 use tls::*;
@@ -51,7 +54,7 @@ pub trait AsyncReadToBuf: AsyncReadExt {
 impl<T> AsyncReadToBuf for T where T: AsyncReadExt {}
 
 impl AsyncRead for Connection {
-    fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll<Result<()>> {
+    fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<Result<usize>> {
         match self.get_mut() {
             Self::Raw(stream) => Pin::new(stream).poll_read(cx, buf),
             #[cfg(feature = "tls")]
@@ -85,11 +88,11 @@ impl AsyncWrite for Connection {
         }
     }
 
-    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
+    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
         match self.get_mut() {
-            Self::Raw(stream) => Pin::new(stream).poll_shutdown(cx),
+            Self::Raw(stream) => Pin::new(stream).poll_close(cx),
             #[cfg(feature = "tls")]
-            Self::Tls(stream) => Pin::new(stream).poll_shutdown(cx),
+            Self::Tls(stream) => Pin::new(stream).poll_close(cx),
         }
     }
 }
@@ -99,7 +102,7 @@ pub struct ConnReader<'a> {
 }
 
 impl AsyncRead for ConnReader<'_> {
-    fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll<Result<()>> {
+    fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<Result<usize>> {
         Pin::new(&mut self.get_mut().conn).poll_read(cx, buf)
     }
 }
@@ -121,8 +124,8 @@ impl AsyncWrite for ConnWriter<'_> {
         Pin::new(&mut self.get_mut().conn).poll_flush(cx)
     }
 
-    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
-        Pin::new(&mut self.get_mut().conn).poll_shutdown(cx)
+    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
+        Pin::new(&mut self.get_mut().conn).poll_close(cx)
     }
 }
 
@@ -142,13 +145,14 @@ impl Connection {
         Self::Tls(stream)
     }
 
-    pub async fn command(self, cmd: &str) -> Result<String> {
-        let mut stream = BufStream::new(self);
-        stream.write_all(cmd.as_bytes()).await?;
-        stream.flush().await?;
+    pub async fn command(mut self, cmd: &str) -> Result<String> {
+        // let mut stream = BufStream::new(self);
+        self.write_all(cmd.as_bytes()).await?;
+        self.flush().await?;
         let mut line = String::new();
-        stream.read_line(&mut line).await?;
-        stream.shutdown().await.ignore();
+        let mut reader = BufReader::new(self);
+        reader.read_line(&mut line).await?;
+        reader.close().await.ignore();
         Ok(line)
     }
 
@@ -212,7 +216,7 @@ impl Connector {
         }
         select! {
             _ = unsafe { Pin::new_unchecked(deadline) } => Err(Error::new(ErrorKind::TimedOut, "deadline exceed")),
-            _ = time::sleep(self.timeout) => Err(Error::new(ErrorKind::TimedOut, format!("connection timeout{:?} exceed", self.timeout))),
+            _ = Timer::after(self.timeout) => Err(Error::new(ErrorKind::TimedOut, format!("connection timeout{:?} exceed", self.timeout))),
             r = TcpStream::connect((endpoint.host, endpoint.port)) => {
                 match r {
                     Err(err) => Err(err),
@@ -255,10 +259,10 @@ impl Connector {
                     "fails to contact writable server from endpoints {:?}",
                     endpoints.endpoints()
                 );
-                time::sleep(timeout).await;
+                Timer::after(timeout).await;
                 timeout = max_timeout.min(timeout * 2);
             } else {
-                time::sleep(Duration::from_millis(5)).await;
+                Timer::after(Duration::from_millis(5)).await;
             }
         }
         None
@@ -273,7 +277,7 @@ mod tests {
     use crate::deadline::Deadline;
     use crate::endpoint::EndpointRef;
 
-    #[tokio::test]
+    #[asyncs::test]
     async fn raw() {
         let connector = Connector::new();
         let endpoint = EndpointRef::new("host1", 2181, true);
diff --git a/src/session/depot.rs b/src/session/depot.rs
index e9f6ba5..51703a1 100644
--- a/src/session/depot.rs
+++ b/src/session/depot.rs
@@ -1,9 +1,9 @@
 use std::collections::VecDeque;
 use std::io::IoSlice;
 
+use futures_lite::io::AsyncWriteExt;
 use hashbrown::HashMap;
 use strum::IntoEnumIterator;
-use tokio::io::AsyncWriteExt;
 use tracing::debug;
 
 use super::request::{MarshalledRequest, OpStat, Operation, SessionOperation, StateResponser};
diff --git a/src/session/mod.rs b/src/session/mod.rs
index 39c8925..6a0795e 100644
--- a/src/session/mod.rs
+++ b/src/session/mod.rs
@@ -6,13 +6,14 @@ mod types;
 mod watch;
 mod xid;
 
-use std::time::Duration;
+use std::pin::pin;
+use std::time::{Duration, Instant};
 
+use async_io::Timer;
+use asyncs::select;
+use futures::channel::mpsc;
+use futures::{AsyncWriteExt, StreamExt};
 use ignore_result::Ignore;
-use tokio::io::AsyncWriteExt;
-use tokio::select;
-use tokio::sync::mpsc;
-use tokio::time::{self, Instant};
 use tracing::field::display;
 use tracing::{debug, info, instrument, warn, Span};
 
@@ -213,7 +214,7 @@ impl Session {
 
     async fn close_requester(mut requester: mpsc::UnboundedReceiver<SessionOperation>, err: &Error) {
         requester.close();
-        while let Some(operation) = requester.recv().await {
+        while let Some(operation) = requester.next().await {
             let responser = operation.into_responser();
             responser.send(Err(err.clone()));
         }
@@ -505,9 +506,8 @@ impl Session {
         depot: &mut Depot,
     ) -> Result<(), Error> {
         let mut pinged = false;
-        let mut tick = time::interval(self.tick_timeout);
-        tick.set_missed_tick_behavior(time::MissedTickBehavior::Skip);
         let (mut reader, mut writer) = conn.split();
+        let mut tick = pin!(Timer::interval(self.tick_timeout));
         while !(self.session_state.is_connected() && depot.is_empty()) {
             select! {
                 r = reader.read_to_buf(buf) => match r.map_err(Error::other)?
{ @@ -518,7 +518,7 @@ impl Session { r?; self.last_send = Instant::now(); }, - now = tick.tick() => { + now = tick.as_mut() => { if now >= self.last_recv + self.connector.timeout() { return Err(Error::with_message(format!("no response from connection in {}ms", self.connector.timeout().as_millis()))); } @@ -554,8 +554,7 @@ impl Session { self.sasl_session.take(); let mut seek_for_writable = if self.session.readonly { Some(self.connector.clone().seek_for_writable(endpoints)) } else { None }; - let mut tick = time::interval(self.tick_timeout); - tick.set_missed_tick_behavior(time::MissedTickBehavior::Skip); + let mut tick = pin!(Timer::interval(self.tick_timeout)); let mut err = None; let mut channel_halted = false; depot.start(); @@ -575,7 +574,7 @@ impl Session { r?; self.last_send = Instant::now(); }, - r = requester.recv(), if !channel_halted => { + r = requester.next(), if !channel_halted => { let operation = if let Some(operation) = r { operation } else { @@ -587,10 +586,10 @@ impl Session { }; depot.push_session(operation); }, - r = unwatch_requester.recv() => if let Some((watcher_id, responser)) = r { + r = unwatch_requester.next() => if let Some((watcher_id, responser)) = r { self.watch_manager.remove_watcher(watcher_id, responser, depot); }, - now = tick.tick() => { + now = tick.as_mut() => { if now >= self.last_recv + self.connector.timeout() { return Err(Error::with_message(format!("no response from connection in {}ms", self.connector.timeout().as_millis()))); } diff --git a/src/session/request.rs b/src/session/request.rs index f0bf8bf..fb66867 100644 --- a/src/session/request.rs +++ b/src/session/request.rs @@ -3,8 +3,8 @@ use std::pin::Pin; use std::task::{Context, Poll}; use bytes::{Buf, BufMut}; +use futures::channel::oneshot; use ignore_result::Ignore; -use tokio::sync::oneshot; use super::types::WatchMode; use super::watch::WatchReceiver; diff --git a/src/session/watch.rs b/src/session/watch.rs index 99b2faf..e861bf3 100644 --- a/src/session/watch.rs +++ b/src/session/watch.rs @@ -1,7 +1,8 @@ +use futures::channel::{mpsc, oneshot}; +use futures::StreamExt; use hashbrown::HashMap; use hashlink::LinkedHashSet; use ignore_result::Ignore; -use tokio::sync::{mpsc, oneshot}; use super::depot::Depot; use super::event::WatcherEvent; @@ -69,7 +70,7 @@ impl OneshotReceiver { let id = self.id; let unwatch = unsafe { self.into_unwatch() }; let (sender, receiver) = oneshot::channel(); - unwatch.send((id, StateResponser::new(sender))).ignore(); + unwatch.unbounded_send((id, StateResponser::new(sender))).ignore(); let receiver = StateReceiver::new(OpCode::RemoveWatches, receiver); receiver.await?; Ok(()) @@ -78,7 +79,7 @@ impl OneshotReceiver { impl Drop for OneshotReceiver { fn drop(&mut self) { - self.unwatch.send((self.id, Default::default())).ignore(); + self.unwatch.unbounded_send((self.id, Default::default())).ignore(); } } @@ -106,14 +107,14 @@ impl PersistentReceiver { } pub async fn recv(&mut self) -> WatchedEvent { - self.receiver.recv().await.unwrap() + self.receiver.next().await.unwrap() } pub async fn remove(self) -> Result<(), Error> { let id = self.id; let unwatch = unsafe { self.into_unwatch() }; let (sender, receiver) = oneshot::channel(); - unwatch.send((id, StateResponser::new(sender))).ignore(); + unwatch.unbounded_send((id, StateResponser::new(sender))).ignore(); let receiver = StateReceiver::new(OpCode::RemoveWatches, receiver); receiver.await?; Ok(()) @@ -122,7 +123,7 @@ impl PersistentReceiver { impl Drop for PersistentReceiver { fn drop(&mut self) { - 
self.unwatch.send((self.id, Default::default())).ignore(); + self.unwatch.unbounded_send((self.id, Default::default())).ignore(); } } @@ -195,7 +196,7 @@ impl Watch { let sender = watcher.sender.into_oneshot(); sender.send(event.to_value()).ignore(); }, - WatchSender::Persistent(sender) => sender.send(event.to_value()).ignore(), + WatchSender::Persistent(sender) => sender.unbounded_send(event.to_value()).ignore(), } } } @@ -219,7 +220,7 @@ pub struct WatchManager { impl WatchManager { pub fn new() -> (Self, mpsc::UnboundedReceiver<(WatcherId, StateResponser)>) { - let (unwatch_sender, unwatch_receiver) = mpsc::unbounded_channel(); + let (unwatch_sender, unwatch_receiver) = mpsc::unbounded(); let manager = WatchManager { cached_paths: LinkedHashSet::with_capacity(1000), cached_watches: Vec::with_capacity(100), @@ -265,7 +266,7 @@ impl WatchManager { fn add_persistent_watch(&mut self, path: &str, kind: WatcherKind) -> PersistentReceiver { let id = self.new_watcher_id(); - let (sender, receiver) = mpsc::unbounded_channel(); + let (sender, receiver) = mpsc::unbounded(); let watcher = Watcher { id, kind, sender: WatchSender::Persistent(sender) }; self.add_watch(path, watcher); PersistentReceiver::new(id, receiver, self.unwatch_sender.clone()) @@ -354,7 +355,7 @@ impl WatchManager { path = unsafe { path.get_unchecked(..i) }; if let Some(watch) = self.watches.get_mut(path) { for watcher in watch.iter().filter(|watcher| watcher.kind == WatcherKind::PersistentRecursive) { - watcher.sender.get_persistent().send(event.to_value()).ignore(); + watcher.sender.get_persistent().unbounded_send(event.to_value()).ignore(); has_watch = true; } } diff --git a/tests/zookeeper.rs b/tests/zookeeper.rs index 552aab3..9f1a5ca 100644 --- a/tests/zookeeper.rs +++ b/tests/zookeeper.rs @@ -3,7 +3,9 @@ use std::fmt::Write as _; use std::path::{Path, PathBuf}; use std::sync::Arc; use std::time::Duration; +use async_io::Timer; use std::{fs, future}; +use std::pin::Pin; use assert_matches::assert_matches; use assertor::*; @@ -17,7 +19,7 @@ use test_case::test_case; use testcontainers::clients::Cli as DockerCli; use testcontainers::core::{Container, Healthcheck, LogStream, RunnableImage, WaitFor}; use testcontainers::images::generic::GenericImage; -use tokio::select; +use asyncs::select; use zookeeper_client as zk; static ZK_IMAGE_TAG: &'static str = "3.9.0"; @@ -129,9 +131,9 @@ async fn example() { assert_eq!(session_event.session_state, zk::SessionState::Closed); } -#[test_log::test(tokio::test)] +#[test_log::test(asyncs::test)] async fn test_example() { - tokio::spawn(async move { example().await }).await.unwrap() + asyncs::spawn(async move { example().await }).await.unwrap() } async fn connect(cluster: &Cluster, chroot: &str) -> zk::Client { @@ -155,7 +157,7 @@ async fn connect(cluster: &Cluster, chroot: &str) -> zk::Client { client.chroot(chroot).unwrap() } -#[test_log::test(tokio::test)] +#[test_log::test(asyncs::test)] async fn test_connect_nohosts() { assert_that!(zk::Client::connector() .session_timeout(Duration::from_secs(24 * 3600)) @@ -166,7 +168,7 @@ async fn test_connect_nohosts() { .is_equal_to(zk::Error::NoHosts); } -#[test_log::test(tokio::test)] +#[test_log::test(asyncs::test)] async fn test_connect_timeout() { assert_that!(zk::Client::connector() .session_timeout(Duration::from_secs(1)) @@ -176,14 +178,14 @@ async fn test_connect_timeout() { .is_equal_to(zk::Error::Timeout); } -#[test_log::test(tokio::test)] +#[test_log::test(asyncs::test)] async fn test_connect_session_expired() { let cluster = 
Cluster::new().await; let client = cluster.custom_client(None, |connector| connector.detached()).await.unwrap(); let timeout = client.session_timeout(); let session = client.into_session(); - tokio::time::sleep(timeout * 2).await; + Timer::after(timeout * 2).await; assert_that!(cluster.custom_client(None, |connector| connector.session(session)).await.unwrap_err()) .is_equal_to(zk::Error::SessionExpired); @@ -502,7 +504,7 @@ serverCnxnFactory=org.apache.zookeeper.server.NettyServerCnxnFactory tasks.push(( id, - tokio::task::spawn_blocking({ + blocking::unblock({ let docker = self.docker.clone(); move || unsafe { std::mem::transmute::<_, Container<'static, GenericImage>>(docker.run(image)) } }), @@ -510,7 +512,7 @@ serverCnxnFactory=org.apache.zookeeper.server.NettyServerCnxnFactory } for task in tasks { let id = task.0; - let container = task.1.await.unwrap(); + let container = task.1.await; self.containers.push((id, container)); } } @@ -604,7 +606,7 @@ serverCnxnFactory=org.apache.zookeeper.server.NettyServerCnxnFactory #[test_case("/"; "no_chroot")] #[test_case("/x"; "chroot_x")] #[test_case("/x/y"; "chroot_x_y")] -#[test_log::test(tokio::test)] +#[test_log::test(asyncs::test)] async fn test_multi(chroot: &str) { let cluster = Cluster::new().await; let client = connect(&cluster, chroot).await; @@ -705,7 +707,7 @@ async fn test_multi(chroot: &str) { assert_that!(results).is_empty(); } -#[test_log::test(tokio::test)] +#[test_log::test(asyncs::test)] async fn test_multi_async_order() { let cluster = Cluster::new().await; let client = cluster.client(None).await; @@ -730,7 +732,7 @@ async fn test_multi_async_order() { assert_that!(stat).is_equal_to(set_stat); } -#[test_log::test(tokio::test)] +#[test_log::test(asyncs::test)] async fn test_check_writer() { let cluster = Cluster::new().await; let client = cluster.client(None).await; @@ -762,7 +764,7 @@ async fn test_check_writer() { #[test_case("/x"; "chroot_x")] #[test_case("/x/y"; "chroot_x_y")] -#[test_log::test(tokio::test)] +#[test_log::test(asyncs::test)] async fn test_lock_shared(chroot: &str) { let cluster = Cluster::new().await; @@ -777,7 +779,7 @@ async fn test_lock_shared(chroot: &str) { #[test_case("/x"; "chroot_x")] #[test_case("/x/y"; "chroot_x_y")] -#[test_log::test(tokio::test)] +#[test_log::test(asyncs::test)] async fn test_lock_custom(chroot: &str) { let cluster = Cluster::new().await; @@ -788,7 +790,7 @@ async fn test_lock_custom(chroot: &str) { #[test_case("/x"; "chroot_x")] #[test_case("/x/y"; "chroot_x_y")] -#[test_log::test(tokio::test)] +#[test_log::test(asyncs::test)] async fn test_lock_curator(chroot: &str) { let cluster = Cluster::new().await; @@ -797,7 +799,7 @@ async fn test_lock_curator(chroot: &str) { test_lock_with_path(&cluster, chroot, lock1_prefix, lock2_prefix).await; } -#[test_log::test(tokio::test)] +#[test_log::test(asyncs::test)] async fn test_lock_no_node() { let cluster = Cluster::new().await; let client = cluster.client(None).await; @@ -805,7 +807,7 @@ async fn test_lock_no_node() { assert_eq!(client.lock(prefix, b"", zk::Acls::anyone_all()).await.unwrap_err(), zk::Error::NoNode); } -#[test_log::test(tokio::test)] +#[test_log::test(asyncs::test)] async fn test_lock_curator_filter() { let cluster = Cluster::new().await; let client = cluster.client(None).await; @@ -831,7 +833,7 @@ async fn test_lock_with_path( let options = zk::LockOptions::new(zk::Acls::anyone_all()).with_ancestor_options(CONTAINER_OPEN.clone()).unwrap(); let lock1 = client1.lock(lock1_prefix, b"", options.clone()).await.unwrap(); - let 
contender2 = tokio::spawn(async move { + let contender2 = asyncs::spawn(async move { let lock2 = client2.lock(lock2_prefix, b"", options).await.unwrap(); let (data, stat) = lock2.client().get_data("/lock-path").await.unwrap(); lock2.set_data("/lock-path", lock2.lock_path().as_bytes(), None).await.unwrap(); @@ -840,7 +842,7 @@ async fn test_lock_with_path( }); // Let lock2 get chance to chime in. - tokio::time::sleep(Duration::from_millis(100)).await; + Timer::after(Duration::from_millis(100)).await; let lock1_path = lock1.lock_path().to_string(); @@ -860,12 +862,12 @@ async fn test_lock_with_path( let lock2_path = String::from_utf8(data).unwrap(); // Let background delete get chance to chime in. - tokio::time::sleep(Duration::from_millis(100)).await; + Timer::after(Duration::from_millis(100)).await; assert_that!(client1.check_stat(&lock1_path).await.unwrap()).is_equal_to(None); assert_that!(client1.check_stat(&lock2_path).await.unwrap()).is_equal_to(None); } -#[test_log::test(tokio::test)] +#[test_log::test(asyncs::test)] async fn test_no_node() { let cluster = Cluster::new().await; let client = cluster.client(None).await; @@ -883,7 +885,7 @@ async fn test_no_node() { ); } -#[test_log::test(tokio::test)] +#[test_log::test(asyncs::test)] async fn test_request_order() { let cluster = Cluster::new().await; let client = cluster.client(None).await; @@ -911,7 +913,7 @@ async fn test_request_order() { assert_that!(get_child_data.await).is_equal_to(Err(zk::Error::NoNode)); } -#[test_log::test(tokio::test)] +#[test_log::test(asyncs::test)] async fn test_data_node() { let cluster = Cluster::new().await; let client = cluster.client(None).await; @@ -930,7 +932,7 @@ async fn test_data_node() { assert_eq!(client.check_stat(path).await.unwrap(), None); } -#[test_log::test(tokio::test)] +#[test_log::test(asyncs::test)] async fn test_create_root() { let cluster = Cluster::new().await; let client = cluster.client(None).await.chroot("/a").unwrap(); @@ -940,7 +942,7 @@ async fn test_create_root() { .is_equal_to(zk::Error::BadArguments(&"can not create root node")); } -#[test_log::test(tokio::test)] +#[test_log::test(asyncs::test)] async fn test_create_sequential() { let cluster = Cluster::new().await; let client = cluster.client(None).await; @@ -964,7 +966,7 @@ async fn test_create_sequential() { assert_eq!((data, stat2), client.get_data(&path2).await.unwrap()); } -#[test_log::test(tokio::test)] +#[test_log::test(asyncs::test)] async fn test_create_ttl() { let cluster = Cluster::with_properties(vec![ "-Dzookeeper.extendedTypesEnabled=true", @@ -976,13 +978,13 @@ async fn test_create_ttl() { let ttl_options = PERSISTENT_OPEN.clone().with_ttl(Duration::from_millis(500)); client.create("/ttl", &vec![], &ttl_options).await.unwrap(); client.create("/ttl/child", &vec![], PERSISTENT_OPEN).await.unwrap(); - tokio::time::sleep(Duration::from_secs(4)).await; + Timer::after(Duration::from_secs(4)).await; client.delete("/ttl/child", None).await.unwrap(); - tokio::time::sleep(Duration::from_secs(4)).await; + Timer::after(Duration::from_secs(4)).await; assert_that!(client.delete("/ttl", None).await.unwrap_err()).is_equal_to(zk::Error::NoNode); } -#[test_log::test(tokio::test)] +#[test_log::test(asyncs::test)] async fn test_create_container() { let cluster = Cluster::with_properties(vec![ "-Dzookeeper.extendedTypesEnabled=true", @@ -993,15 +995,15 @@ async fn test_create_container() { client.create("/container", &vec![], &zk::CreateMode::Container.with_acls(zk::Acls::anyone_all())).await.unwrap(); 
client.create("/container/child", &vec![], PERSISTENT_OPEN).await.unwrap(); - tokio::time::sleep(Duration::from_secs(4)).await; + Timer::after(Duration::from_secs(4)).await; client.delete("/container/child", None).await.unwrap(); - tokio::time::sleep(Duration::from_secs(4)).await; + Timer::after(Duration::from_secs(4)).await; assert_that!(client.delete("/container", None).await.unwrap_err()).is_equal_to(zk::Error::NoNode); } #[test_case("3.3"; "3.3")] #[test_case("3.4"; "3.4")] -#[test_log::test(tokio::test)] +#[test_log::test(asyncs::test)] async fn test_zookeeper_old_server(tag: &'static str) { let cluster = Cluster::with_options(ClusterOptions { tag, ..Default::default() }, Some(Encryption::Raw)).await; @@ -1037,7 +1039,7 @@ async fn test_zookeeper_old_server(tag: &'static str) { assert_eq!(event.event_type, zk::EventType::NodeDeleted); } -#[test_log::test(tokio::test)] +#[test_log::test(asyncs::test)] async fn test_mkdir() { let cluster = Cluster::new().await; let client = cluster.client(None).await; @@ -1053,7 +1055,7 @@ async fn test_mkdir() { assert_that!(_client.mkdir("/a/b/c", PERSISTENT_OPEN).await.unwrap_err()).is_equal_to(zk::Error::NoNode); } -#[test_log::test(tokio::test)] +#[test_log::test(asyncs::test)] async fn test_descendants_number() { let cluster = Cluster::new().await; let client = cluster.client(None).await; @@ -1101,7 +1103,7 @@ where } } -#[test_log::test(tokio::test)] +#[test_log::test(asyncs::test)] async fn test_ephemerals() { let cluster = Cluster::new().await; let client = cluster.client(None).await; @@ -1163,7 +1165,7 @@ async fn test_ephemerals() { assert_eq!(vec!["/"], child_root_client.list_ephemerals("/").await.unwrap().into_sorted()); } -#[test_log::test(tokio::test)] +#[test_log::test(asyncs::test)] async fn test_chroot() { let cluster = Cluster::new().await; let client = cluster.client(None).await; @@ -1208,7 +1210,7 @@ async fn test_chroot() { assert_eq!(relative_grandchild_event.path, relative_grandchild_path); } -#[test_log::test(tokio::test)] +#[test_log::test(asyncs::test)] async fn test_auth() { let cluster = Cluster::new().await; let client = cluster.client(None).await; @@ -1230,7 +1232,7 @@ async fn test_auth() { assert!(authed_users.contains(&authed_user)); } -#[test_log::test(tokio::test)] +#[test_log::test(asyncs::test)] async fn test_no_auth() { let cluster = Cluster::with_options(Default::default(), Some(Encryption::Raw)).await; let client = cluster.client(None).await; @@ -1260,7 +1262,7 @@ async fn test_no_auth() { assert_eq!(no_auth_client.set_data("/acl_test_2", b"set_my_data", None).await.unwrap_err(), zk::Error::NoAuth); } -#[test_log::test(tokio::test)] +#[test_log::test(asyncs::test)] #[should_panic(expected = "AuthFailed")] async fn test_auth_failed() { let cluster = Cluster::with_options( @@ -1279,7 +1281,7 @@ async fn test_auth_failed() { cluster.client(None).await; } -#[test_log::test(tokio::test)] +#[test_log::test(asyncs::test)] async fn test_delete() { let cluster = Cluster::new().await; let client = cluster.client(None).await; @@ -1297,7 +1299,7 @@ async fn test_delete() { client.delete(path, Some(stat.version)).await.unwrap(); } -#[test_log::test(tokio::test)] +#[test_log::test(asyncs::test)] async fn test_oneshot_watcher() { let cluster = Cluster::new().await; let client = cluster.client(None).await; @@ -1469,7 +1471,7 @@ async fn test_oneshot_watcher() { eprintln!("node deletion done"); } -#[test_log::test(tokio::test)] +#[test_log::test(asyncs::test)] async fn test_config_watch() { let cluster = Cluster::new().await; @@ 
-1485,7 +1487,7 @@ async fn test_config_watch() { assert_eq!(event.path, "/zookeeper/config"); } -#[test_log::test(tokio::test)] +#[test_log::test(asyncs::test)] async fn test_persistent_watcher_passive_remove() { let cluster = Cluster::new().await; let client = cluster.client(None).await; @@ -1519,7 +1521,7 @@ async fn test_persistent_watcher_passive_remove() { assert_eq!(child_event.path, "/"); } -#[test_log::test(tokio::test)] +#[test_log::test(asyncs::test)] async fn test_fail_watch_with_multiple_unwatching() { let cluster = Cluster::new().await; let client = cluster.client(None).await; @@ -1540,11 +1542,11 @@ async fn test_fail_watch_with_multiple_unwatching() { select! { state = state_watcher.changed() => panic!("expect no state update, but got {state}"), - _ = tokio::time::sleep(Duration::from_millis(10)) => {}, + _ = Timer::after(Duration::from_millis(10)) => {}, } } -#[test_log::test(tokio::test)] +#[test_log::test(asyncs::test)] async fn test_fail_watch_with_concurrent_passive_remove() { let cluster = Cluster::new().await; let client = cluster.client(None).await; @@ -1564,7 +1566,7 @@ async fn test_fail_watch_with_concurrent_passive_remove() { assert_that!(event.path).is_same_string_to("/a"); } -#[test_log::test(tokio::test)] +#[test_log::test(asyncs::test)] async fn test_persistent_watcher() { let cluster = Cluster::new().await; let client = cluster.client(None).await; @@ -1579,7 +1581,7 @@ async fn test_persistent_watcher() { let path_persistent_watcher = client.watch(path, zk::AddWatchMode::PersistentRecursive).await.unwrap(); drop(root_recursive_watcher); path_persistent_watcher.remove().await.unwrap(); - tokio::time::sleep(Duration::from_millis(10)).await; + Timer::after(Duration::from_millis(10)).await; let mut root_recursive_watcher = client.watch("/", zk::AddWatchMode::PersistentRecursive).await.unwrap(); let mut path_recursive_watcher = client.watch(path, zk::AddWatchMode::PersistentRecursive).await.unwrap(); @@ -1684,7 +1686,7 @@ async fn test_persistent_watcher() { assert_eq!(event, path_persistent_watcher.changed().await); } -#[test_log::test(tokio::test)] +#[test_log::test(asyncs::test)] async fn test_watcher_coexist_on_same_path() { let cluster = Cluster::new().await; let client = cluster.client(None).await; @@ -1743,7 +1745,7 @@ async fn test_watcher_coexist_on_same_path() { } // Use "current_thread" explicitly. -#[test_log::test(tokio::test(flavor = "current_thread"))] +#[test_log::test(asyncs::test(parallelism = 1))] async fn test_remove_no_watcher() { let cluster = Cluster::new().await; let client = cluster.client(None).await; @@ -1752,7 +1754,7 @@ async fn test_remove_no_watcher() { let create = client.create("/a", &vec![], PERSISTENT_OPEN); // Let session task issue `create` request first, oneshot watch will be removed by server. - tokio::task::yield_now().await; + asyncs::task::yield_now().await; // Issue `RemoveWatches` which likely happen before watch event notification as it involves // several IO paths. 
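The timeout-style assertions in these tests now pair `asyncs::select!` with `async_io::Timer` where they previously used `tokio::select!` and `tokio::time::sleep`. A minimal runtime-agnostic sketch of the same idea built only on `futures` and `async-io`; the helper name `within` and its signature are illustrative, not part of this patch:

```rust
use std::future::Future;
use std::time::Duration;

use async_io::Timer;
use futures::future::{self, Either};
use futures::pin_mut;

/// Awaits `fut`, but gives up once `budget` elapses; `None` signals a timeout.
async fn within<T>(budget: Duration, fut: impl Future<Output = T>) -> Option<T> {
    let timer = Timer::after(budget);
    pin_mut!(fut, timer);
    match future::select(fut, timer).await {
        Either::Left((value, _timer)) => Some(value),
        Either::Right((_elapsed_at, _fut)) => None,
    }
}
```
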
@@ -1761,12 +1763,12 @@
     let (_, _, data_watcher) = client.get_and_watch_data("/a").await.unwrap();
 
     let delete = client.delete("/a", None);
-    tokio::task::yield_now().await;
+    asyncs::task::yield_now().await;
     assert_that!(data_watcher.remove().await.unwrap_err()).is_equal_to(zk::Error::NoWatcher);
     delete.await.unwrap();
 }
 
-#[test_log::test(tokio::test)]
+#[test_log::test(asyncs::test)]
 async fn test_session_event() {
     let cluster = Cluster::new().await;
     let client = cluster.client(None).await;
@@ -1797,7 +1799,7 @@ async fn test_session_event() {
     assert_eq!(client.get_data("/a/no-exist-path").await.unwrap_err(), zk::Error::SessionExpired);
 }
 
-#[test_log::test(tokio::test)]
+#[test_log::test(asyncs::test)]
 async fn test_state_watcher() {
     let cluster = Cluster::new().await;
     let client = cluster.client(None).await;
@@ -1813,11 +1815,11 @@ async fn test_state_watcher() {
     select! {
         biased;
         _ = state_watcher.changed() => panic!("expect no state update after terminal state"),
-        _ = tokio::time::sleep(Duration::from_millis(10)) => {},
+        _ = Timer::after(Duration::from_millis(10)) => {},
     }
 }
 
-#[test_log::test(tokio::test)]
+#[test_log::test(asyncs::test)]
 async fn test_client_drop() {
     let cluster = Cluster::new().await;
     let client = cluster.client(None).await;
@@ -1829,7 +1831,7 @@ async fn test_client_drop() {
     cluster.custom_client(None, |connector| connector.session(session)).await.unwrap_err();
 }
 
-#[test_log::test(tokio::test)]
+#[test_log::test(asyncs::test)]
 async fn test_client_detach() {
     let cluster = Cluster::new().await;
     let client = cluster.custom_client(None, |connector| connector.detached()).await.unwrap();
@@ -1842,7 +1844,7 @@ async fn test_client_detach() {
 }
 
 #[cfg(feature = "sasl-digest-md5")]
-#[test_log::test(tokio::test)]
+#[test_log::test(asyncs::test)]
 async fn test_sasl_digest_md5() {
     let cluster = Cluster::with_options(
         ClusterOptions {
@@ -1922,7 +1924,7 @@ fn generate_client_cert(cn: &str) -> Certificate {
 }
 
 #[cfg(feature = "tls")]
-#[test_log::test(tokio::test)]
+#[test_log::test(asyncs::test)]
 async fn test_tls() {
     let cluster = Cluster::with_options(Default::default(), Some(Encryption::Tls)).await;
     let client = cluster.client(None).await;
@@ -1944,7 +1946,7 @@ trait StateWaiter {
 impl StateWaiter for zk::StateWatcher {
     async fn wait(&mut self, expected: zk::SessionState, timeout: Option<Duration>) {
         let timeout = timeout.unwrap_or_else(|| Duration::from_secs(60));
-        let mut sleep = tokio::time::sleep(timeout);
+        let mut sleep = Timer::after(timeout);
         let mut got = self.state();
         loop {
             if got == expected {
@@ -1961,7 +1963,7 @@ impl StateWaiter for zk::StateWatcher {
 }
 
 #[cfg(target_os = "linux")]
-#[test_log::test(tokio::test)]
+#[test_log::test(asyncs::test)]
 #[serial_test::serial(network_host)]
 async fn test_readonly_plaintext() {
     test_readonly(Encryption::Raw).await
 }
@@ -1969,7 +1971,7 @@ async fn test_readonly_plaintext() {
 
 #[cfg(feature = "tls")]
 #[cfg(target_os = "linux")]
-#[test_log::test(tokio::test)]
+#[test_log::test(asyncs::test)]
 #[serial_test::serial(network_host)]
 async fn test_readonly_tls() {
     test_readonly(Encryption::Tls).await
 }
@@ -2043,7 +2045,7 @@ async fn test_readonly(encryption: Encryption) {
 /// * https://docs.docker.com/network/links/
 #[cfg(target_os = "linux")]
 #[serial_test::serial(network_host)]
-#[test_log::test(tokio::test)]
+#[test_log::test(asyncs::test)]
 async fn test_update_ensemble() {
     let _cluster = Cluster::with_options(
         ClusterOptions {
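The swap from `tokio::sync::mpsc` to `futures::channel::mpsc` also changes the send-error shape: `unbounded_send` hands the rejected value back inside a `TrySendError`, recovered via `into_inner()`, which is what the `send_marshalled_request` and watch-removal changes above rely on. A minimal standalone sketch of that pattern, assuming nothing beyond the `futures` crate; the function name and message type are illustrative:

```rust
use futures::channel::mpsc;

/// Try to hand a message to a background task; reclaim it if the receiver is gone.
fn send_or_reclaim(sender: &mpsc::UnboundedSender<String>, msg: String) {
    if let Err(err) = sender.unbounded_send(msg) {
        // Unlike tokio's `SendError(value)` tuple struct, the rejected value
        // comes back through `TrySendError::into_inner`.
        let msg = err.into_inner();
        eprintln!("receiver closed, dropping: {msg}");
    }
}

fn main() {
    let (tx, rx) = mpsc::unbounded::<String>();
    drop(rx); // simulate the session task having shut down
    send_or_reclaim(&tx, "create /node".to_string());
}
```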