diff --git a/CHANGELOG.md b/CHANGELOG.md index 544ee0b64..40fd176ff 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,8 @@ +UNRELEASED +=================== + * chore: upgrade `tokio` to `1.0` + * BREAKING: This requires the whole application to upgrade to `tokio` 1.0 + 0.46.0 / 2021-01-02 =================== * feat: `kube` now has __optional__ websocket support with `async_tungstenite` under `ws` and `ws-*-tls` features #360 diff --git a/Cargo.toml b/Cargo.toml index 3155984f9..3efe7be25 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,3 +9,7 @@ members = [ "tests", "examples", ] + +[patch.crates-io] +# https://github.com/seanmonstar/reqwest/pull/1076 hasn't been released yet +reqwest = { git = "https://github.com/seanmonstar/reqwest.git", rev = "1f425a0244bcd7b4565dceb9076450d951f2ec03" } \ No newline at end of file diff --git a/examples/Cargo.toml b/examples/Cargo.toml index f087c17c5..3483e1716 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -18,24 +18,24 @@ rustls-tls = ["reqwest/rustls-tls", "kube/rustls-tls", "kube-runtime/rustls-tls" ws = ["kube/ws", "kube/ws-native-tls"] [dev-dependencies] -anyhow = "1.0.32" -env_logger = "0.7.1" -futures = "0.3.5" +anyhow = "1.0.37" +env_logger = "0.8.2" +futures = "0.3.8" kube = { path = "../kube", version = "^0.46.0", default-features = false } kube-derive = { path = "../kube-derive", version = "^0.46.0", default-features = false } # only needed to opt out of schema kube-runtime = { path = "../kube-runtime", version = "^0.46.0", default-features = false } k8s-openapi = { version = "0.10.0", features = ["v1_19"], default-features = false } log = "0.4.11" -serde = { version = "1.0.111", features = ["derive"] } -serde_json = "1.0.57" +serde = { version = "1.0.118", features = ["derive"] } +serde_json = "1.0.61" serde_yaml = "0.8.14" -tokio = { version = "0.2.24", features = ["full"] } -color-eyre = "0.5.1" -snafu = { version = "0.6.8", features = ["futures"] } +tokio = { version = "1.0.1", features = ["full"] } +color-eyre = "0.5.10" +snafu = { version = "0.6.10", features = ["futures"] } # Some Api::delete methods use Either -either = "1.6.0" +either = "1.6.1" # Some configuration tweaking require reqwest atm -reqwest = { version = "0.10.8", default-features = false, features = ["json", "gzip", "stream"] } +reqwest = { version = "0.10.10", default-features = false, features = ["json", "gzip", "stream"] } schemars = "0.8.0" static_assertions = "1.1.0" @@ -138,3 +138,6 @@ path = "node_watcher.rs" [[example]] name = "secret_reflector" path = "secret_reflector.rs" + +[dependencies] +tokio-util = "0.6.0" diff --git a/examples/configmap_reflector.rs b/examples/configmap_reflector.rs index 862dbd316..7f059c469 100644 --- a/examples/configmap_reflector.rs +++ b/examples/configmap_reflector.rs @@ -11,7 +11,7 @@ fn spawn_periodic_reader(reader: Store) { tokio::spawn(async move { loop { // Periodically read our state - tokio::time::delay_for(std::time::Duration::from_secs(5)).await; + tokio::time::sleep(std::time::Duration::from_secs(5)).await; let cms: Vec<_> = reader.state().iter().map(|obj| Meta::name(obj).clone()).collect(); info!("Current configmaps: {:?}", cms); } diff --git a/examples/crd_api.rs b/examples/crd_api.rs index 3e9cff8f2..7c49169d9 100644 --- a/examples/crd_api.rs +++ b/examples/crd_api.rs @@ -4,7 +4,7 @@ use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use serde_json::json; use std::time::Duration; -use tokio::time::delay_for; +use tokio::time::sleep; use apiexts::CustomResourceDefinition; use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions::v1beta1 as apiexts; @@ -60,7 +60,7 @@ async fn main() -> anyhow::Result<()> { }) }); // Wait for the delete to take place (map-left case or delete from previous run) - delay_for(Duration::from_secs(2)).await; + sleep(Duration::from_secs(2)).await; // Create the CRD so we can create Foos in kube let foocrd = Foo::crd(); @@ -76,7 +76,7 @@ async fn main() -> anyhow::Result<()> { Err(e) => return Err(e.into()), // any other case is probably bad } // Wait for the api to catch up - delay_for(Duration::from_secs(1)).await; + sleep(Duration::from_secs(1)).await; // Manage the Foo CR let foos: Api = Api::namespaced(client.clone(), &namespace); diff --git a/examples/crd_derive_no_schema.rs b/examples/crd_derive_no_schema.rs index af2156187..ec949d5e1 100644 --- a/examples/crd_derive_no_schema.rs +++ b/examples/crd_derive_no_schema.rs @@ -1,8 +1,9 @@ +#[cfg(not(feature = "schema"))] use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions::v1::{ CustomResourceDefinition, CustomResourceValidation, JSONSchemaProps, }; -use kube_derive::CustomResource; -use serde::{Deserialize, Serialize}; +#[cfg(not(feature = "schema"))] use kube_derive::CustomResource; +#[cfg(not(feature = "schema"))] use serde::{Deserialize, Serialize}; /// CustomResource with manually implemented schema /// @@ -17,6 +18,7 @@ pub struct MyBar { bars: u32, } +#[cfg(not(feature = "schema"))] const MANUAL_SCHEMA: &'static str = r#" type: object properties: diff --git a/examples/crd_reflector.rs b/examples/crd_reflector.rs index 65e50dcb4..40c38c33a 100644 --- a/examples/crd_reflector.rs +++ b/examples/crd_reflector.rs @@ -32,7 +32,7 @@ async fn main() -> anyhow::Result<()> { tokio::spawn(async move { loop { // Periodically read our state - tokio::time::delay_for(std::time::Duration::from_secs(5)).await; + tokio::time::sleep(std::time::Duration::from_secs(5)).await; let crds = reader.state().iter().map(Meta::name).collect::>(); info!("Current crds: {:?}", crds); } diff --git a/examples/deployment_reflector.rs b/examples/deployment_reflector.rs index 7c93c5ce8..98c289feb 100644 --- a/examples/deployment_reflector.rs +++ b/examples/deployment_reflector.rs @@ -31,7 +31,7 @@ async fn main() -> anyhow::Result<()> { // Periodically read our state let deploys: Vec<_> = reader.state().iter().map(Meta::name).collect(); info!("Current deploys: {:?}", deploys); - tokio::time::delay_for(std::time::Duration::from_secs(30)).await; + tokio::time::sleep(std::time::Duration::from_secs(30)).await; } }); diff --git a/examples/node_reflector.rs b/examples/node_reflector.rs index 91ece8f30..e7e13686a 100644 --- a/examples/node_reflector.rs +++ b/examples/node_reflector.rs @@ -27,7 +27,7 @@ async fn main() -> anyhow::Result<()> { loop { let nodes = reader.state().iter().map(Meta::name).collect::>(); info!("Current {} nodes: {:?}", nodes.len(), nodes); - tokio::time::delay_for(std::time::Duration::from_secs(10)).await; + tokio::time::sleep(std::time::Duration::from_secs(10)).await; } }); diff --git a/examples/pod_attach.rs b/examples/pod_attach.rs index c3cf64cc1..81da62fd1 100644 --- a/examples/pod_attach.rs +++ b/examples/pod_attach.rs @@ -74,14 +74,14 @@ async fn main() -> anyhow::Result<()> { #[allow(dead_code)] async fn separate_outputs(mut attached: AttachedProcess) { - let stdout = tokio::io::reader_stream(attached.stdout().unwrap()); + let stdout = tokio_util::io::ReaderStream::new(attached.stdout().unwrap()); let stdouts = stdout.for_each(|res| async { if let Ok(bytes) = res { let out = std::io::stdout(); out.lock().write_all(&bytes).unwrap(); } }); - let stderr = tokio::io::reader_stream(attached.stderr().unwrap()); + let stderr = tokio_util::io::ReaderStream::new(attached.stderr().unwrap()); let stderrs = stderr.for_each(|res| async { if let Ok(bytes) = res { let out = std::io::stderr(); @@ -98,8 +98,8 @@ async fn separate_outputs(mut attached: AttachedProcess) { #[allow(dead_code)] async fn combined_output(mut attached: AttachedProcess) { - let stdout = tokio::io::reader_stream(attached.stdout().unwrap()); - let stderr = tokio::io::reader_stream(attached.stderr().unwrap()); + let stdout = tokio_util::io::ReaderStream::new(attached.stdout().unwrap()); + let stderr = tokio_util::io::ReaderStream::new(attached.stderr().unwrap()); let outputs = stream::select(stdout, stderr).for_each(|res| async { if let Ok(bytes) = res { let out = std::io::stdout(); diff --git a/examples/pod_exec.rs b/examples/pod_exec.rs index 787ec3006..d67d74f6b 100644 --- a/examples/pod_exec.rs +++ b/examples/pod_exec.rs @@ -86,7 +86,7 @@ async fn main() -> anyhow::Result<()> { ) .await?; let mut stdin_writer = attached.stdin().unwrap(); - let mut stdout_stream = tokio::io::reader_stream(attached.stdout().unwrap()); + let mut stdout_stream = tokio_util::io::ReaderStream::new(attached.stdout().unwrap()); let next_stdout = stdout_stream.next(); stdin_writer.write(b"echo test string 1\n").await?; let stdout = String::from_utf8(next_stdout.await.unwrap().unwrap().to_vec()).unwrap(); @@ -114,7 +114,7 @@ async fn main() -> anyhow::Result<()> { } async fn get_output(mut attached: AttachedProcess) -> String { - let stdout = tokio::io::reader_stream(attached.stdout().unwrap()); + let stdout = tokio_util::io::ReaderStream::new(attached.stdout().unwrap()); let out = stdout .filter_map(|r| async { r.ok().and_then(|v| String::from_utf8(v.to_vec()).ok()) }) .collect::>() diff --git a/examples/secret_reflector.rs b/examples/secret_reflector.rs index c80fcdac6..d3baca487 100644 --- a/examples/secret_reflector.rs +++ b/examples/secret_reflector.rs @@ -42,7 +42,7 @@ fn spawn_periodic_reader(reader: Store) { .map(|s| format!("{}: {:?}", Meta::name(s), decode(s).keys())) .collect(); info!("Current secrets: {:?}", cms); - tokio::time::delay_for(std::time::Duration::from_secs(15)).await; + tokio::time::sleep(std::time::Duration::from_secs(15)).await; } }); } diff --git a/kube-derive/Cargo.toml b/kube-derive/Cargo.toml index e256e149c..fcadd111c 100644 --- a/kube-derive/Cargo.toml +++ b/kube-derive/Cargo.toml @@ -12,11 +12,11 @@ repository = "https://github.com/clux/kube-rs" readme = "../README.md" [dependencies] -proc-macro2 = "1.0.19" -quote = "1.0.7" -syn = { version = "1.0.30", features = ["extra-traits"] } +proc-macro2 = "1.0.24" +quote = "1.0.8" +syn = { version = "1.0.57", features = ["extra-traits"] } Inflector = "0.11.4" -serde_json = "1.0.53" +serde_json = "1.0.61" [lib] proc-macro = true @@ -26,8 +26,8 @@ default = ["schema"] schema = [] [dev-dependencies] -serde = { version = "1.0.111", features = ["derive"] } +serde = { version = "1.0.118", features = ["derive"] } serde_yaml = "0.8.14" k8s-openapi = { version = "0.10.0", default-features = false, features = ["v1_19"] } schemars = { version = "0.8.0", features = ["chrono"] } -chrono = "0.4" +chrono = "0.4.19" diff --git a/kube-runtime/Cargo.toml b/kube-runtime/Cargo.toml index b9291d440..63d4241c7 100644 --- a/kube-runtime/Cargo.toml +++ b/kube-runtime/Cargo.toml @@ -13,15 +13,16 @@ categories = ["web-programming::http-client"] edition = "2018" [dependencies] -futures = "0.3.5" +futures = "0.3.8" kube = { path = "../kube", version = "^0.46.0", default-features = false } derivative = "2.1.1" -serde = "1.0.115" -smallvec = "1.4.2" -pin-project = "0.4.23" -tokio = { version = "0.2.21", features = ["time"] } -snafu = { version = "0.6.8", features = ["futures"] } -dashmap = "3.11.10" +serde = "1.0.118" +smallvec = "1.6.0" +pin-project = "1.0.2" +tokio = { version = "1.0.1", features = ["time"] } +snafu = { version = "0.6.10", features = ["futures"] } +dashmap = "4.0.1" +tokio-util = { version = "0.6.0", features = ["time"] } [dependencies.k8s-openapi] version = "0.10.0" @@ -32,14 +33,14 @@ default = ["native-tls"] native-tls = ["kube/native-tls"] rustls-tls = ["kube/rustls-tls"] +[dev-dependencies] +kube-derive = { path = "../kube-derive", version = "^0.46.0"} +serde_json = "1.0.61" +tokio = { version = "1.0.1", features = ["full", "test-util"] } +rand = "0.8.0" +schemars = "0.8.0" + [dev-dependencies.k8s-openapi] version = "0.10.0" default-features = false features = ["v1_19"] - -[dev-dependencies] -kube-derive = { path = "../kube-derive", version = "^0.46.0"} -serde_json = "1.0.57" -tokio = { version = "0.2.22", features = ["full", "test-util"] } -rand = "0.7.3" -schemars = "0.8.0" diff --git a/kube-runtime/src/controller/future_hash_map.rs b/kube-runtime/src/controller/future_hash_map.rs index 7f2b35ef6..92dc7fced 100644 --- a/kube-runtime/src/controller/future_hash_map.rs +++ b/kube-runtime/src/controller/future_hash_map.rs @@ -85,7 +85,7 @@ mod tests { fhm.insert(i, future::ready(i)); } let mut values = fhm.collect::>().await; - values.sort(); + values.sort_unstable(); assert_eq!(values, (0..count).collect::>()); } diff --git a/kube-runtime/src/controller/runner.rs b/kube-runtime/src/controller/runner.rs index e20fef0bc..a67de8955 100644 --- a/kube-runtime/src/controller/runner.rs +++ b/kube-runtime/src/controller/runner.rs @@ -88,7 +88,7 @@ mod tests { use crate::scheduler::{scheduler, ScheduleRequest}; use futures::{channel::mpsc, poll, SinkExt, TryStreamExt}; use std::{cell::RefCell, time::Duration}; - use tokio::time::{delay_for, pause, Instant}; + use tokio::time::{pause, sleep, Instant}; #[tokio::test] async fn runner_should_never_run_two_instances_at_once() { @@ -102,7 +102,7 @@ mod tests { // Panic if this ref is already held, to simulate some unsafe action.. let mutex_ref = rc.borrow_mut(); Box::pin(async move { - delay_for(Duration::from_secs(1)).await; + sleep(Duration::from_secs(1)).await; drop(mutex_ref); }) }) diff --git a/kube-runtime/src/reflector/mod.rs b/kube-runtime/src/reflector/mod.rs index 8fd07679c..a2ec05ff2 100644 --- a/kube-runtime/src/reflector/mod.rs +++ b/kube-runtime/src/reflector/mod.rs @@ -149,13 +149,13 @@ mod tests { #[tokio::test] async fn reflector_store_should_not_contain_duplicates() { let mut rng = rand::thread_rng(); - let item_dist = Uniform::new(0u8, 100); + let item_dist = Uniform::new(0_u8, 100); let deleted_dist = Bernoulli::new(0.40).unwrap(); let store_w = store::Writer::default(); let store = store_w.as_reader(); reflector( store_w, - stream::iter((0u32..100000).map(|gen| { + stream::iter((0_u32..100_000).map(|gen| { let item = rng.sample(item_dist); let deleted = rng.sample(deleted_dist); let obj = ConfigMap { diff --git a/kube-runtime/src/reflector/store.rs b/kube-runtime/src/reflector/store.rs index 72a34c535..77e9a11f2 100644 --- a/kube-runtime/src/reflector/store.rs +++ b/kube-runtime/src/reflector/store.rs @@ -133,7 +133,7 @@ mod tests { let mut cluster_cm = cm.clone(); cluster_cm.metadata.namespace = None; let mut store_w = Writer::default(); - store_w.apply_watcher_event(&watcher::Event::Applied(cm.clone())); + store_w.apply_watcher_event(&watcher::Event::Applied(cm)); let store = store_w.as_reader(); assert_eq!(store.get(&ObjectRef::from_obj(&cluster_cm)), None); } diff --git a/kube-runtime/src/scheduler.rs b/kube-runtime/src/scheduler.rs index 656f9a540..64a999e43 100644 --- a/kube-runtime/src/scheduler.rs +++ b/kube-runtime/src/scheduler.rs @@ -10,17 +10,14 @@ use std::{ pin::Pin, task::{Context, Poll}, }; -use tokio::time::{ - self, - delay_queue::{self, DelayQueue}, - Instant, -}; +use tokio::time::{self, Instant}; +use tokio_util::time::delay_queue::{self, DelayQueue}; #[derive(Debug, Snafu)] pub enum Error { #[snafu(display("timer failure: {}", source))] TimerError { - source: time::Error, + source: time::error::Error, backtrace: Backtrace, }, } @@ -102,7 +99,7 @@ impl<'a, T: Hash + Eq + Clone, R> SchedulerProj<'a, T, R> { &mut self, cx: &mut Context<'_>, can_take_message: impl Fn(&T) -> bool, - ) -> Poll>> { + ) -> Poll>> { if let Some(msg) = self.pending.iter().find(|msg| can_take_message(*msg)).cloned() { return Poll::Ready(Some(Ok(self.pending.take(&msg).unwrap()))); } @@ -344,7 +341,7 @@ mod tests { ])); assert!(poll!(scheduler.next()).is_pending()); advance(Duration::from_secs(2)).await; - assert_eq!(scheduler.next().now_or_never().unwrap().unwrap().unwrap(), ()); + scheduler.next().now_or_never().unwrap().unwrap().unwrap(); // Stream has terminated assert!(scheduler.next().await.is_none()); } @@ -364,7 +361,7 @@ mod tests { ])); assert!(poll!(scheduler.next()).is_pending()); advance(Duration::from_secs(2)).await; - assert_eq!(scheduler.next().now_or_never().unwrap().unwrap().unwrap(), ()); + scheduler.next().now_or_never().unwrap().unwrap().unwrap(); // Stream has terminated assert!(scheduler.next().await.is_none()); } @@ -383,7 +380,7 @@ mod tests { .unwrap(); assert!(poll!(scheduler.next()).is_pending()); advance(Duration::from_secs(2)).await; - assert_eq!(scheduler.next().now_or_never().unwrap().unwrap().unwrap(), ()); + scheduler.next().now_or_never().unwrap().unwrap().unwrap(); assert!(poll!(scheduler.next()).is_pending()); schedule_tx .send(ScheduleRequest { @@ -394,7 +391,7 @@ mod tests { .unwrap(); assert!(poll!(scheduler.next()).is_pending()); advance(Duration::from_secs(2)).await; - assert_eq!(scheduler.next().now_or_never().unwrap().unwrap().unwrap(), ()); + scheduler.next().now_or_never().unwrap().unwrap().unwrap(); assert!(poll!(scheduler.next()).is_pending()); } } diff --git a/kube/Cargo.toml b/kube/Cargo.toml index 97805d11e..c03d14c3f 100644 --- a/kube/Cargo.toml +++ b/kube/Cargo.toml @@ -20,7 +20,7 @@ default = ["native-tls"] native-tls = ["openssl", "reqwest/native-tls"] rustls-tls = ["rustls", "reqwest/rustls-tls"] derive = ["kube-derive"] -ws = ["async-tungstenite"] +ws = ["async-tungstenite", "tokio-util"] # We can merge these with `native-tls`/`rustls-tls` when `dep?/feature` syntax becomes stable. # https://doc.rust-lang.org/nightly/cargo/reference/unstable.html#weak-dependency-features ws-native-tls = ["real-native-tls", "tokio-native-tls", "async-tungstenite/tokio-native-tls"] @@ -30,41 +30,42 @@ ws-rustls-tls = ["tokio-rustls", "async-tungstenite/tokio-rustls"] features = ["derive", "ws", "ws-native-tls"] [dependencies] -base64 = "0.12.1" -chrono = "0.4.15" +base64 = "0.13.0" +chrono = "0.4.19" dirs = { package = "dirs-next", version = "2.0.0" } -serde = { version = "1.0.111", features = ["derive"] } -serde_json = "1.0.53" +serde = { version = "1.0.118", features = ["derive"] } +serde_json = "1.0.61" serde_yaml = "0.8.14" -http = "0.2.1" -url = "2.1.1" +http = "0.2.2" +url = "2.2.0" log = "0.4.11" -time = "0.2.16" -either = "1.6.0" -thiserror = "1.0.20" -futures-util = "0.3.5" -futures = "0.3.5" -pem = "0.8.1" -openssl = { version = "0.10.30", optional = true } -rustls = { version = "0.18.1", optional = true } -real-native-tls = { version = "0.2", optional = true, package = "native-tls" } -tokio-native-tls = { version = "0.1", optional = true } -tokio-rustls = { version = "0.14.1", optional = true } -bytes = "0.5.6" +time = "0.2.23" +either = "1.6.1" +thiserror = "1.0.23" +futures-util = "0.3.8" +futures = "0.3.8" +pem = "0.8.2" +openssl = { version = "0.10.32", optional = true } +rustls = { version = "0.19.0", optional = true } +real-native-tls = { version = "0.2.7", optional = true, package = "native-tls" } +tokio-native-tls = { version = "0.3.0", optional = true } +tokio-rustls = { version = "0.22.0", optional = true } +bytes = "1.0.0" Inflector = "0.11.4" -tokio = { version = "0.2.24", features = ["time", "signal", "sync"] } +tokio = { version = "1.0.1", features = ["time", "signal", "sync"] } static_assertions = "1.1.0" kube-derive = { path = "../kube-derive", version = "^0.46.0", optional = true } -jsonpath_lib = "0.2.5" +jsonpath_lib = "0.2.6" +tokio-util = { version = "0.6.0", optional = true } [dependencies.async-tungstenite] -version = "0.9.3" +version = "0.11.0" default-features = false features = ["tokio-runtime"] optional = true [dependencies.reqwest] -version = "0.10.8" +version = "0.10.10" default-features = false features = ["json", "gzip", "stream"] @@ -75,7 +76,7 @@ features = [] [dev-dependencies] tempfile = "3.1.0" -tokio = { version = "0.2.21", features = ["full"] } +tokio = { version = "1.0.1", features = ["full"] } schemars = "0.8.0" [dev-dependencies.k8s-openapi] diff --git a/kube/src/api/remote_command.rs b/kube/src/api/remote_command.rs index bdfc74a4a..13cab055f 100644 --- a/kube/src/api/remote_command.rs +++ b/kube/src/api/remote_command.rs @@ -169,7 +169,7 @@ async fn start_message_loop( mut stdout: Option, mut stderr: Option, ) -> Option { - let mut stdin_stream = tokio::io::reader_stream(stdin); + let mut stdin_stream = tokio_util::io::ReaderStream::new(stdin); let (mut server_send, raw_server_recv) = stream.split(); // Work with filtered messages to reduce noise. let mut server_recv = raw_server_recv.filter_map(filter_message).boxed(); diff --git a/kube/src/client/mod.rs b/kube/src/client/mod.rs index c894ea400..b19e0c0d5 100644 --- a/kube/src/client/mod.rs +++ b/kube/src/client/mod.rs @@ -139,6 +139,7 @@ impl Client { })), Err(ws2::Error::HttpFormat(err)) => Err(Error::HttpError(err)), + #[cfg(feature = "ws-native-tls")] Err(ws2::Error::Tls(err)) => Err(Error::SslError(format!("{}", err))), // URL errors: diff --git a/kube/src/runtime/informer.rs b/kube/src/runtime/informer.rs index 4b856fa5a..7380640cb 100644 --- a/kube/src/runtime/informer.rs +++ b/kube/src/runtime/informer.rs @@ -105,7 +105,7 @@ where if *needs_resync { // Try again in a bit let dur = Duration::from_secs(10); - tokio::time::delay_for(dur).await; + tokio::time::sleep(dur).await; // If we are outside history, start over from latest if *needs_resync { self.reset().await; diff --git a/kube/src/runtime/reflector.rs b/kube/src/runtime/reflector.rs index c2bc4ef53..7c98f4e76 100644 --- a/kube/src/runtime/reflector.rs +++ b/kube/src/runtime/reflector.rs @@ -4,7 +4,7 @@ use crate::{ }; use futures::{future::FutureExt, lock::Mutex, pin_mut, select, TryStreamExt}; use serde::de::DeserializeOwned; -use tokio::{signal::ctrl_c, time::delay_for}; +use tokio::{signal::ctrl_c, time::sleep}; #[cfg(not(target_family = "windows"))] use tokio::signal; @@ -92,7 +92,7 @@ where warn!("Poll error on {}: {}: {:?}", self.api.resource.kind, e, e); // If desynched due to mismatching resourceVersion, retry in a bit let dur = Duration::from_secs(10); - delay_for(dur).await; + sleep(dur).await; self.reset().await?; // propagate error if this failed.. } } diff --git a/tests/Cargo.toml b/tests/Cargo.toml index b50301dc5..9ac6d7201 100644 --- a/tests/Cargo.toml +++ b/tests/Cargo.toml @@ -10,11 +10,11 @@ name = "dapp" path = "dapp.rs" [dependencies] -anyhow = "1.0.32" -env_logger = "0.7.1" -futures = "0.3.5" +anyhow = "1.0.37" +env_logger = "0.8.2" +futures = "0.3.8" kube = { path = "../kube", version = "^0.46.0"} k8s-openapi = { version = "0.10.0", features = ["v1_19"], default-features = false } log = "0.4.11" -serde_json = "1.0.55" -tokio = { version = "0.2.21", features = ["full"] } +serde_json = "1.0.61" +tokio = { version = "1.0.1", features = ["full"] }