Skip to content

Commit

Permalink
Remove scheduler::Error completely
Browse files Browse the repository at this point in the history
This depends on #826
  • Loading branch information
nightkr committed Feb 14, 2022
1 parent 903a87c commit f2175a3
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 40 deletions.
6 changes: 1 addition & 5 deletions kube-runtime/src/controller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::{
store::{Store, Writer},
ObjectRef,
},
scheduler::{self, scheduler, ScheduleRequest},
scheduler::{scheduler, ScheduleRequest},
utils::{
try_flatten_applied, try_flatten_touched, trystream_try_via, CancelableJoinHandle,
KubeRuntimeStreamExt, StreamBackoff,
Expand Down Expand Up @@ -43,8 +43,6 @@ pub enum Error<ReconcilerErr: std::error::Error + 'static, QueueErr: std::error:
ObjectNotFound(ObjectRef<DynamicObject>),
#[error("reconciler for object {1} failed")]
ReconcilerFailed(#[source] ReconcilerErr, ObjectRef<DynamicObject>),
#[error("scheduler dequeue failed")]
SchedulerDequeueFailed(#[source] scheduler::Error),
#[error("event queue error")]
QueueError(#[source] QueueErr),
}
Expand Down Expand Up @@ -279,8 +277,6 @@ where
.right_future(),
}
})
.map_err(Error::SchedulerDequeueFailed)
.map(|res| res.and_then(|x| x))
.on_complete(async { tracing::debug!("applier runner terminated") })
},
)
Expand Down
18 changes: 8 additions & 10 deletions kube-runtime/src/controller/runner.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use super::future_hash_map::FutureHashMap;
use crate::scheduler::{self, ScheduleRequest, Scheduler};
use crate::scheduler::{ScheduleRequest, Scheduler};
use futures::{Future, Stream, StreamExt};
use pin_project::pin_project;
use std::{
Expand Down Expand Up @@ -43,14 +43,14 @@ where
F: Future + Unpin,
MkF: FnMut(&T) -> F,
{
type Item = scheduler::Result<F::Output>;
type Item = F::Output;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();
let slots = this.slots;
let scheduler = &mut this.scheduler;
let has_active_slots = match slots.poll_next_unpin(cx) {
Poll::Ready(Some(result)) => return Poll::Ready(Some(Ok(result))),
Poll::Ready(Some(result)) => return Poll::Ready(Some(result)),
Poll::Ready(None) => false,
Poll::Pending => true,
};
Expand All @@ -63,15 +63,14 @@ where
.hold_unless(|msg| !slots.contains_key(msg))
.poll_next_unpin(cx);
match next_msg_poll {
Poll::Ready(Some(Ok(msg))) => {
Poll::Ready(Some(msg)) => {
let msg_fut = (this.run_msg)(&msg);
assert!(
slots.insert(msg, msg_fut).is_none(),
"Runner tried to replace a running future.. please report this as a kube-rs bug!"
);
cx.waker().wake_by_ref();
}
Poll::Ready(Some(Err(err))) => break Poll::Ready(Some(Err(err))),
Poll::Ready(None) => {
break if has_active_slots {
// We're done listening for new messages, but still have some that
Expand All @@ -93,7 +92,7 @@ mod tests {
use crate::scheduler::{scheduler, ScheduleRequest};
use futures::{
channel::{mpsc, oneshot},
future, poll, SinkExt, TryStreamExt,
future, poll, SinkExt, StreamExt,
};
use std::{cell::RefCell, time::Duration};
use tokio::{
Expand All @@ -118,7 +117,7 @@ mod tests {
drop(mutex_ref);
})
})
.try_for_each(|_| async { Ok(()) }),
.for_each(|_| async {}),
);
sched_tx
.send(ScheduleRequest {
Expand All @@ -135,15 +134,14 @@ mod tests {
})
.await
.unwrap();
let ((), run) = future::join(
future::join(
async {
tokio::time::sleep(Duration::from_secs(5)).await;
drop(sched_tx);
},
runner,
)
.await;
run.unwrap();
// Validate that we actually ran both requests
assert_eq!(count, 2);
}
Expand All @@ -158,7 +156,7 @@ mod tests {
let mut runner = Runner::new(scheduler(sched_rx), |msg: &u8| futures::future::ready(*msg));
// Start a background task that starts listening /before/ we enqueue the message
// We can't just use Stream::poll_next(), since that bypasses the waker system
Handle::current().spawn(async move { result_tx.send(runner.try_next().await.unwrap()).unwrap() });
Handle::current().spawn(async move { result_tx.send(runner.next().await).unwrap() });
// Ensure that the background task actually gets to initiate properly, and starts polling the runner
yield_now().await;
sched_tx
Expand Down
37 changes: 12 additions & 25 deletions kube-runtime/src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,9 @@ use std::{
pin::Pin,
task::{Context, Poll},
};
use thiserror::Error;
use tokio::time::Instant;
use tokio_util::time::delay_queue::{self, DelayQueue};

#[derive(Debug, Error)]
pub enum Error {}
pub type Result<T, E = Error> = std::result::Result<T, E>;

/// A request to re-emit `message` at a given `Instant` (`run_at`).
#[derive(Debug)]
pub struct ScheduleRequest<T> {
Expand Down Expand Up @@ -127,7 +122,7 @@ where
R: Stream<Item = ScheduleRequest<T>>,
C: Fn(&T) -> bool + Unpin,
{
type Item = Result<T>;
type Item = T;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
Expand All @@ -143,7 +138,7 @@ where
}

match scheduler.poll_pop_queue_message(cx, &can_take_message) {
Poll::Ready(expired) => Poll::Ready(Some(Ok(expired))),
Poll::Ready(expired) => Poll::Ready(Some(expired)),
Poll::Pending => Poll::Pending,
}
}
Expand Down Expand Up @@ -183,7 +178,7 @@ where
T: Eq + Hash + Clone,
R: Stream<Item = ScheduleRequest<T>>,
{
type Item = Result<T>;
type Item = T;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Pin::new(&mut self.hold_unless(|_| true)).poll_next(cx)
Expand Down Expand Up @@ -235,9 +230,7 @@ mod tests {
assert!(poll!(scheduler.as_mut().hold_unless(|_| false).next()).is_pending());
assert!(scheduler.contains_pending(&1));
assert_eq!(
unwrap_poll(poll!(scheduler.as_mut().hold_unless(|_| true).next()))
.unwrap()
.unwrap(),
unwrap_poll(poll!(scheduler.as_mut().hold_unless(|_| true).next())).unwrap(),
1_u8
);
assert!(!scheduler.contains_pending(&1));
Expand Down Expand Up @@ -268,7 +261,7 @@ mod tests {
drop(tx);
},
async {
assert_eq!(scheduler.next().await.unwrap().unwrap(), 1);
assert_eq!(scheduler.next().await.unwrap(), 1);
assert!(scheduler.next().await.is_none())
},
)
Expand All @@ -291,13 +284,7 @@ mod tests {
.on_complete(sleep(Duration::from_secs(2))),
));
assert_eq!(
scheduler
.as_mut()
.hold_unless(|x| *x != 1)
.next()
.await
.unwrap()
.unwrap(),
scheduler.as_mut().hold_unless(|x| *x != 1).next().await.unwrap(),
2
);
}
Expand All @@ -321,10 +308,10 @@ mod tests {
pin_mut!(scheduler);
assert!(poll!(scheduler.next()).is_pending());
advance(Duration::from_secs(2)).await;
assert_eq!(scheduler.next().now_or_never().unwrap().unwrap().unwrap(), 1);
assert_eq!(scheduler.next().now_or_never().unwrap().unwrap(), 1);
assert!(poll!(scheduler.next()).is_pending());
advance(Duration::from_secs(2)).await;
assert_eq!(scheduler.next().now_or_never().unwrap().unwrap().unwrap(), 2);
assert_eq!(scheduler.next().now_or_never().unwrap().unwrap(), 2);
// Stream has terminated
assert!(scheduler.next().await.is_none());
}
Expand All @@ -348,7 +335,7 @@ mod tests {
pin_mut!(scheduler);
assert!(poll!(scheduler.next()).is_pending());
advance(Duration::from_secs(2)).await;
scheduler.next().now_or_never().unwrap().unwrap().unwrap();
scheduler.next().now_or_never().unwrap().unwrap();
// Stream has terminated
assert!(scheduler.next().await.is_none());
}
Expand All @@ -372,7 +359,7 @@ mod tests {
pin_mut!(scheduler);
assert!(poll!(scheduler.next()).is_pending());
advance(Duration::from_secs(2)).await;
scheduler.next().now_or_never().unwrap().unwrap().unwrap();
scheduler.next().now_or_never().unwrap().unwrap();
// Stream has terminated
assert!(scheduler.next().await.is_none());
}
Expand All @@ -391,7 +378,7 @@ mod tests {
.unwrap();
assert!(poll!(scheduler.next()).is_pending());
advance(Duration::from_secs(2)).await;
scheduler.next().now_or_never().unwrap().unwrap().unwrap();
scheduler.next().now_or_never().unwrap().unwrap();
assert!(poll!(scheduler.next()).is_pending());
schedule_tx
.send(ScheduleRequest {
Expand All @@ -402,7 +389,7 @@ mod tests {
.unwrap();
assert!(poll!(scheduler.next()).is_pending());
advance(Duration::from_secs(2)).await;
scheduler.next().now_or_never().unwrap().unwrap().unwrap();
scheduler.next().now_or_never().unwrap().unwrap();
assert!(poll!(scheduler.next()).is_pending());
}
}

0 comments on commit f2175a3

Please sign in to comment.