diff --git a/deny.toml b/deny.toml
index 1e80d7a6e..47137ef97 100644
--- a/deny.toml
+++ b/deny.toml
@@ -68,4 +68,11 @@ allow-git = []
 
 [bans]
 multiple-versions = "deny"
-skip = []
+
+[[bans.skip]]
+# The following dependencies are still working on upgrading to 0.7:
+# https://github.com/hyperium/h2/pull/603
+# https://github.com/tower-rs/tower/pull/638
+# https://github.com/tower-rs/tower-http/pull/221
+name = "tokio-util"
+version = "0.6"
diff --git a/examples/Cargo.toml b/examples/Cargo.toml
index 0a25b6943..dfd974823 100644
--- a/examples/Cargo.toml
+++ b/examples/Cargo.toml
@@ -24,7 +24,7 @@ latest = ["k8s-openapi/v1_22"]
 deprecated = ["kube/deprecated-crd-v1beta1", "k8s-openapi/v1_21"]
 
 [dev-dependencies]
-tokio-util = "0.6.8"
+tokio-util = "0.7.0"
 assert-json-diff = "2.0.1"
 validator = { version = "0.14.0", features = ["derive"] }
 anyhow = "1.0.44"
diff --git a/kube-client/Cargo.toml b/kube-client/Cargo.toml
index d8c92df68..ad7e86290 100644
--- a/kube-client/Cargo.toml
+++ b/kube-client/Cargo.toml
@@ -58,7 +58,7 @@ bytes = { version = "1.1.0", optional = true }
 tokio = { version = "1.14.0", features = ["time", "signal", "sync"], optional = true }
 kube-core = { path = "../kube-core", version = "^0.69.0"}
 jsonpath_lib = { version = "0.3.0", optional = true }
-tokio-util = { version = "0.6.8", optional = true, features = ["io", "codec"] }
+tokio-util = { version = "0.7.0", optional = true, features = ["io", "codec"] }
 hyper = { version = "0.14.13", optional = true, features = ["client", "http1", "stream", "tcp"] }
 hyper-tls = { version = "0.5.0", optional = true }
 hyper-rustls = { version = "0.23.0", optional = true }
diff --git a/kube-runtime/Cargo.toml b/kube-runtime/Cargo.toml
index b3fbf59c3..e2934d8d2 100644
--- a/kube-runtime/Cargo.toml
+++ b/kube-runtime/Cargo.toml
@@ -29,7 +29,7 @@ ahash = "0.7"
 parking_lot = "0.11"
 pin-project = "1.0.2"
 tokio = { version = "1.14.0", features = ["time"] }
-tokio-util = { version = "0.6.8", features = ["time"] }
+tokio-util = { version = "0.7.0", features = ["time"] }
 tracing = "0.1.29"
 json-patch = "0.2.6"
 serde_json = "1.0.68"
diff --git a/kube-runtime/src/controller/mod.rs b/kube-runtime/src/controller/mod.rs
index 9e4a70982..0f9cbc5e9 100644
--- a/kube-runtime/src/controller/mod.rs
+++ b/kube-runtime/src/controller/mod.rs
@@ -7,7 +7,7 @@ use crate::{
         store::{Store, Writer},
         ObjectRef,
     },
-    scheduler::{self, scheduler, ScheduleRequest},
+    scheduler::{scheduler, ScheduleRequest},
     utils::{
         try_flatten_applied, try_flatten_touched, trystream_try_via, CancelableJoinHandle,
         KubeRuntimeStreamExt, StreamBackoff,
@@ -43,8 +43,6 @@ pub enum Error
     ObjectNotFound(ObjectRef<DynamicObject>),
     #[error("reconciler for object {1} failed")]
     ReconcilerFailed(#[source] ReconcilerErr, ObjectRef<DynamicObject>),
-    #[error("scheduler dequeue failed")]
-    SchedulerDequeueFailed(#[source] scheduler::Error),
     #[error("event queue error")]
     QueueError(#[source] QueueErr),
 }
@@ -279,8 +277,6 @@ where
                         .right_future(),
                 }
             })
-            .map_err(Error::SchedulerDequeueFailed)
-            .map(|res| res.and_then(|x| x))
             .on_complete(async { tracing::debug!("applier runner terminated") })
     },
 )
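Note on the two `controller/mod.rs` hunks above: dropping `SchedulerDequeueFailed` and the `.map_err`/`.map` plumbing is possible because tokio-util 0.7 removed the error case from `DelayQueue`: `poll_expired` now yields `Poll<Option<Expired<T>>>` instead of `Poll<Option<Result<Expired<T>, time::Error>>>`, so streams built on it become infallible. A minimal standalone sketch of the new shape (not code from this PR; the `Expirations` wrapper is hypothetical, and it assumes the `futures` crate plus tokio's `rt`/`macros`/`time` and tokio-util's `time` features):

```rust
use futures::Stream;
use std::{
    pin::Pin,
    task::{Context, Poll},
    time::Duration,
};
use tokio_util::time::DelayQueue;

// Hypothetical wrapper turning a DelayQueue into a Stream of expired values.
struct Expirations<T>(DelayQueue<T>);

impl<T: Unpin> Stream for Expirations<T> {
    // Under tokio-util 0.6 this had to be `Result<T, tokio::time::error::Error>`.
    type Item = T;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // `poll_expired` is now infallible: there are no `Ok`/`Err` arms to match.
        self.get_mut()
            .0
            .poll_expired(cx)
            .map(|opt| opt.map(|expired| expired.into_inner()))
    }
}

#[tokio::main]
async fn main() {
    use futures::StreamExt;
    let mut queue = DelayQueue::new();
    queue.insert("ping", Duration::from_millis(10));
    let mut expirations = Expirations(queue);
    // Yields the value directly; 0.6 would have produced `Some(Ok("ping"))`.
    assert_eq!(expirations.next().await, Some("ping"));
}
```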
diff --git a/kube-runtime/src/controller/runner.rs b/kube-runtime/src/controller/runner.rs
index 26aa838c2..4d6021a42 100644
--- a/kube-runtime/src/controller/runner.rs
+++ b/kube-runtime/src/controller/runner.rs
@@ -1,5 +1,5 @@
 use super::future_hash_map::FutureHashMap;
-use crate::scheduler::{self, ScheduleRequest, Scheduler};
+use crate::scheduler::{ScheduleRequest, Scheduler};
 use futures::{Future, Stream, StreamExt};
 use pin_project::pin_project;
 use std::{
@@ -43,14 +43,14 @@ where
     F: Future + Unpin,
     MkF: FnMut(&T) -> F,
 {
-    type Item = scheduler::Result<F::Output>;
+    type Item = F::Output;
 
     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
         let mut this = self.project();
         let slots = this.slots;
         let scheduler = &mut this.scheduler;
         let has_active_slots = match slots.poll_next_unpin(cx) {
-            Poll::Ready(Some(result)) => return Poll::Ready(Some(Ok(result))),
+            Poll::Ready(Some(result)) => return Poll::Ready(Some(result)),
             Poll::Ready(None) => false,
             Poll::Pending => true,
         };
@@ -63,7 +63,7 @@ where
                 .hold_unless(|msg| !slots.contains_key(msg))
                 .poll_next_unpin(cx);
             match next_msg_poll {
-                Poll::Ready(Some(Ok(msg))) => {
+                Poll::Ready(Some(msg)) => {
                     let msg_fut = (this.run_msg)(&msg);
                     assert!(
                         slots.insert(msg, msg_fut).is_none(),
@@ -71,7 +71,6 @@ where
                     );
                     cx.waker().wake_by_ref();
                 }
-                Poll::Ready(Some(Err(err))) => break Poll::Ready(Some(Err(err))),
                 Poll::Ready(None) => {
                     break if has_active_slots {
                         // We're done listening for new messages, but still have some that
@@ -93,7 +92,7 @@ mod tests {
     use crate::scheduler::{scheduler, ScheduleRequest};
     use futures::{
         channel::{mpsc, oneshot},
-        future, poll, SinkExt, TryStreamExt,
+        future, poll, SinkExt, StreamExt,
     };
     use std::{cell::RefCell, time::Duration};
     use tokio::{
@@ -118,7 +117,7 @@ mod tests {
                     drop(mutex_ref);
                 })
             })
-            .try_for_each(|_| async { Ok(()) }),
+            .for_each(|_| async {}),
        );
        sched_tx
            .send(ScheduleRequest {
@@ -135,7 +134,7 @@ mod tests {
            })
            .await
            .unwrap();
-        let ((), run) = future::join(
+        future::join(
            async {
                tokio::time::sleep(Duration::from_secs(5)).await;
                drop(sched_tx);
@@ -143,7 +142,6 @@ mod tests {
            runner,
        )
        .await;
-        run.unwrap();
        // Validate that we actually ran both requests
        assert_eq!(count, 2);
    }
@@ -158,7 +156,7 @@ mod tests {
        let mut runner = Runner::new(scheduler(sched_rx), |msg: &u8| futures::future::ready(*msg));
        // Start a background task that starts listening /before/ we enqueue the message
        // We can't just use Stream::poll_next(), since that bypasses the waker system
-        Handle::current().spawn(async move { result_tx.send(runner.try_next().await.unwrap()).unwrap() });
+        Handle::current().spawn(async move { result_tx.send(runner.next().await).unwrap() });
        // Ensure that the background task actually gets to initiate properly, and starts polling the runner
        yield_now().await;
        sched_tx
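With `type Item = F::Output`, `Runner` is an infallible stream, which is why the tests above swap `TryStreamExt` combinators (`try_for_each`, `try_next`) for their plain `StreamExt` counterparts. A tiny consumer-side sketch of that difference (names assumed; `stream::iter` stands in for a `Runner` yielding reconcile outputs):

```rust
use futures::{stream, StreamExt};

#[tokio::main]
async fn main() {
    // Stand-in for a `Runner` whose reconcile futures yield `u8`.
    let runner = stream::iter([1_u8, 2, 3]);
    // 0.6-era equivalent: `runner.try_for_each(|_| async { Ok(()) }).await.unwrap();`
    // After this PR there is no `Result` wrapper left to thread through.
    runner
        .for_each(|n| async move { println!("request {n} completed") })
        .await;
}
```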
diff --git a/kube-runtime/src/scheduler.rs b/kube-runtime/src/scheduler.rs
index f63193c54..ba86c65b9 100644
--- a/kube-runtime/src/scheduler.rs
+++ b/kube-runtime/src/scheduler.rs
@@ -8,17 +8,9 @@ use std::{
     pin::Pin,
     task::{Context, Poll},
 };
-use thiserror::Error;
-use tokio::time::{self, Instant};
+use tokio::time::Instant;
 use tokio_util::time::delay_queue::{self, DelayQueue};
 
-#[derive(Debug, Error)]
-pub enum Error {
-    #[error("timer failure: {0}")]
-    TimerError(#[source] time::error::Error),
-}
-pub type Result<T, E = Error> = std::result::Result<T, E>;
-
 /// A request to re-emit `message` at a given `Instant` (`run_at`).
 #[derive(Debug)]
 pub struct ScheduleRequest<T> {
@@ -95,24 +87,23 @@ impl<'a, T: Hash + Eq + Clone, R> SchedulerProj<'a, T, R> {
         &mut self,
         cx: &mut Context<'_>,
         can_take_message: impl Fn(&T) -> bool,
-    ) -> Poll<Result<T>> {
+    ) -> Poll<T> {
         if let Some(msg) = self.pending.iter().find(|msg| can_take_message(*msg)).cloned() {
-            return Poll::Ready(Ok(self.pending.take(&msg).unwrap()));
+            return Poll::Ready(self.pending.take(&msg).unwrap());
         }
 
         loop {
             match self.queue.poll_expired(cx) {
-                Poll::Ready(Some(Ok(msg))) => {
+                Poll::Ready(Some(msg)) => {
                     let msg = msg.into_inner();
                     self.scheduled.remove(&msg).expect(
-                            "Expired message was popped from the Scheduler queue, but was not in the metadata map",
-                        );
+                        "Expired message was popped from the Scheduler queue, but was not in the metadata map",
+                    );
                     if can_take_message(&msg) {
-                        break Poll::Ready(Ok(msg));
+                        break Poll::Ready(msg);
                     }
                     self.pending.insert(msg);
                 }
-                Poll::Ready(Some(Err(err))) => break Poll::Ready(Err(err)),
                 Poll::Ready(None) | Poll::Pending => break Poll::Pending,
             }
         }
@@ -131,7 +122,7 @@ where
     R: Stream<Item = ScheduleRequest<T>>,
     C: Fn(&T) -> bool + Unpin,
 {
-    type Item = Result<T>;
+    type Item = T;
 
     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
         let this = self.get_mut();
@@ -147,7 +138,7 @@ where
         }
 
         match scheduler.poll_pop_queue_message(cx, &can_take_message) {
-            Poll::Ready(expired) => Poll::Ready(Some(expired.map_err(Error::TimerError))),
+            Poll::Ready(expired) => Poll::Ready(Some(expired)),
             Poll::Pending => Poll::Pending,
         }
     }
@@ -187,7 +178,7 @@ where
     T: Eq + Hash + Clone,
     R: Stream<Item = ScheduleRequest<T>>,
 {
-    type Item = Result<T>;
+    type Item = T;
 
     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
         Pin::new(&mut self.hold_unless(|_| true)).poll_next(cx)
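The hunks above make `Scheduler` (and `HoldUnless`) yield `T` directly, so callers drop one level of `.unwrap()`, as the test hunks below show. A hedged usage sketch of the post-upgrade calling convention, assuming kube-runtime's public `scheduler` module and the `futures` crate (the `"reconcile foo"` payload is made up for illustration):

```rust
use futures::{channel::mpsc, StreamExt};
use kube_runtime::scheduler::{scheduler, ScheduleRequest};
use tokio::time::Instant;

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::unbounded();
    let sched = scheduler(rx);
    futures::pin_mut!(sched);

    tx.unbounded_send(ScheduleRequest {
        message: "reconcile foo",
        run_at: Instant::now(),
    })
    .unwrap();
    drop(tx); // close the request channel so the stream can terminate

    // One `.unwrap()` for the `Option` from `next()`; the inner
    // `Result` unwrap required by 0.6 is gone.
    assert_eq!(sched.next().await.unwrap(), "reconcile foo");
    assert!(sched.next().await.is_none());
}
```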
@@ -239,9 +230,7 @@ mod tests {
         assert!(poll!(scheduler.as_mut().hold_unless(|_| false).next()).is_pending());
         assert!(scheduler.contains_pending(&1));
         assert_eq!(
-            unwrap_poll(poll!(scheduler.as_mut().hold_unless(|_| true).next()))
-                .unwrap()
-                .unwrap(),
+            unwrap_poll(poll!(scheduler.as_mut().hold_unless(|_| true).next())).unwrap(),
             1_u8
         );
         assert!(!scheduler.contains_pending(&1));
@@ -272,7 +261,7 @@ mod tests {
                 drop(tx);
             },
             async {
-                assert_eq!(scheduler.next().await.unwrap().unwrap(), 1);
+                assert_eq!(scheduler.next().await.unwrap(), 1);
                 assert!(scheduler.next().await.is_none())
             },
         )
@@ -295,13 +284,7 @@ mod tests {
                 .on_complete(sleep(Duration::from_secs(2))),
         ));
         assert_eq!(
-            scheduler
-                .as_mut()
-                .hold_unless(|x| *x != 1)
-                .next()
-                .await
-                .unwrap()
-                .unwrap(),
+            scheduler.as_mut().hold_unless(|x| *x != 1).next().await.unwrap(),
             2
         );
     }
@@ -325,10 +308,10 @@ mod tests {
         pin_mut!(scheduler);
         assert!(poll!(scheduler.next()).is_pending());
         advance(Duration::from_secs(2)).await;
-        assert_eq!(scheduler.next().now_or_never().unwrap().unwrap().unwrap(), 1);
+        assert_eq!(scheduler.next().now_or_never().unwrap().unwrap(), 1);
         assert!(poll!(scheduler.next()).is_pending());
         advance(Duration::from_secs(2)).await;
-        assert_eq!(scheduler.next().now_or_never().unwrap().unwrap().unwrap(), 2);
+        assert_eq!(scheduler.next().now_or_never().unwrap().unwrap(), 2);
         // Stream has terminated
         assert!(scheduler.next().await.is_none());
     }
@@ -352,7 +335,7 @@ mod tests {
         pin_mut!(scheduler);
         assert!(poll!(scheduler.next()).is_pending());
         advance(Duration::from_secs(2)).await;
-        scheduler.next().now_or_never().unwrap().unwrap().unwrap();
+        scheduler.next().now_or_never().unwrap().unwrap();
         // Stream has terminated
         assert!(scheduler.next().await.is_none());
     }
@@ -376,7 +359,7 @@ mod tests {
         pin_mut!(scheduler);
         assert!(poll!(scheduler.next()).is_pending());
         advance(Duration::from_secs(2)).await;
-        scheduler.next().now_or_never().unwrap().unwrap().unwrap();
+        scheduler.next().now_or_never().unwrap().unwrap();
         // Stream has terminated
         assert!(scheduler.next().await.is_none());
     }
@@ -395,7 +378,7 @@ mod tests {
             .unwrap();
         assert!(poll!(scheduler.next()).is_pending());
         advance(Duration::from_secs(2)).await;
-        scheduler.next().now_or_never().unwrap().unwrap().unwrap();
+        scheduler.next().now_or_never().unwrap().unwrap();
         assert!(poll!(scheduler.next()).is_pending());
         schedule_tx
             .send(ScheduleRequest {
@@ -406,7 +389,7 @@ mod tests {
             .unwrap();
         assert!(poll!(scheduler.next()).is_pending());
         advance(Duration::from_secs(2)).await;
-        scheduler.next().now_or_never().unwrap().unwrap().unwrap();
+        scheduler.next().now_or_never().unwrap().unwrap();
         assert!(poll!(scheduler.next()).is_pending());
     }
 }
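The test hunks above all lean on tokio's paused test clock: `advance` moves mock time forward so `now_or_never` can observe a timer firing without real waiting. A standalone sketch of that pattern, not taken from this PR (assumes tokio's `test-util` and `macros` features plus the `futures` crate; the test name is made up):

```rust
use futures::FutureExt;
use tokio::time::{advance, sleep, Duration};

// `start_paused = true` freezes the runtime clock at test start.
#[tokio::test(start_paused = true)]
async fn timers_fire_only_when_the_mock_clock_advances() {
    let timer = sleep(Duration::from_secs(2));
    futures::pin_mut!(timer);
    // Nothing has fired yet: a single poll via `now_or_never` returns `None`.
    assert!((&mut timer).now_or_never().is_none());
    // Advancing the paused clock past the deadline completes the timer
    // deterministically, with no wall-clock sleep involved.
    advance(Duration::from_secs(2)).await;
    assert!(timer.now_or_never().is_some());
}
```

This is why the tests can assert `is_pending()` and then `now_or_never().unwrap()` back to back: expiry is driven entirely by `advance`, never by elapsed real time.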