Skip to content

Commit

Permalink
wip tests
Browse files Browse the repository at this point in the history
  • Loading branch information
fabriziosestito committed May 15, 2024
1 parent cf2eb27 commit 2797cbf
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 14 deletions.
58 changes: 50 additions & 8 deletions kube-runtime/src/reflector/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,9 @@ pub(crate) mod test {
let st = stream::iter([
Ok(Event::Applied(foo.clone())),
Err(Error::TooManyObjects),
Ok(Event::Restarted(vec![foo, bar])),
Ok(Event::RestartedStart),
Ok(Event::RestartedPage(vec![foo, bar])),
Ok(Event::RestartedDone),
]);

let (reader, writer) = reflector::store_shared(10);
Expand All @@ -190,7 +192,18 @@ pub(crate) mod test {
assert_eq!(reader.len(), 1);

let restarted = poll!(reflect.next());
assert!(matches!(restarted, Poll::Ready(Some(Ok(Event::Restarted(_))))));
assert!(matches!(restarted, Poll::Ready(Some(Ok(Event::RestartedStart)))));
assert_eq!(reader.len(), 1);

let restarted = poll!(reflect.next());
assert!(matches!(
restarted,
Poll::Ready(Some(Ok(Event::RestartedPage(_))))
));
assert_eq!(reader.len(), 1);

let restarted = poll!(reflect.next());
assert!(matches!(restarted, Poll::Ready(Some(Ok(Event::RestartedDone)))));
assert_eq!(reader.len(), 2);

assert!(matches!(poll!(reflect.next()), Poll::Ready(None)));
Expand All @@ -209,7 +222,9 @@ pub(crate) mod test {
Ok(Event::Deleted(foo.clone())),
Ok(Event::Applied(foo.clone())),
Err(Error::TooManyObjects),
Ok(Event::Restarted(vec![foo.clone(), bar.clone()])),
Ok(Event::RestartedStart),
Ok(Event::RestartedPage(vec![foo.clone(), bar.clone()])),
Ok(Event::RestartedDone),
]);

let foo = Arc::new(foo);
Expand Down Expand Up @@ -242,10 +257,22 @@ pub(crate) mod test {
assert!(matches!(poll!(subscriber.next()), Poll::Pending));

// Restart event will yield all objects in the list

assert!(matches!(
poll!(reflect.next()),
Poll::Ready(Some(Ok(Event::Restarted(_))))
Poll::Ready(Some(Ok(Event::RestartedStart)))
));

assert!(matches!(
poll!(reflect.next()),
Poll::Ready(Some(Ok(Event::RestartedPage(_))))
));

assert!(matches!(
poll!(reflect.next()),
Poll::Ready(Some(Ok(Event::RestartedDone)))
));

assert_eq!(poll!(subscriber.next()), Poll::Ready(Some(foo.clone())));
assert_eq!(poll!(subscriber.next()), Poll::Ready(Some(bar.clone())));

Expand All @@ -262,7 +289,9 @@ pub(crate) mod test {
let bar = testpod("bar");
let st = stream::iter([
Ok(Event::Applied(foo.clone())),
Ok(Event::Restarted(vec![foo.clone(), bar.clone()])),
Ok(Event::RestartedStart),
Ok(Event::RestartedPage(vec![foo.clone(), bar.clone()])),
Ok(Event::RestartedDone),
]);

let foo = Arc::new(foo);
Expand All @@ -284,9 +313,20 @@ pub(crate) mod test {
//
// First, subscribers should be pending.
assert_eq!(poll!(subscriber.next()), Poll::Pending);

assert!(matches!(
poll!(reflect.next()),
Poll::Ready(Some(Ok(Event::RestartedStart)))
));

assert!(matches!(
poll!(reflect.next()),
Poll::Ready(Some(Ok(Event::RestartedPage(_))))
));

assert!(matches!(
poll!(reflect.next()),
Poll::Ready(Some(Ok(Event::Restarted(_))))
Poll::Ready(Some(Ok(Event::RestartedDone)))
));
drop(reflect);

Expand All @@ -306,7 +346,9 @@ pub(crate) mod test {
let bar = testpod("bar");
let st = stream::iter([
Ok(Event::Applied(foo.clone())),
Ok(Event::Restarted(vec![foo.clone(), bar.clone()])),
Ok(Event::RestartedStart),
Ok(Event::RestartedPage(vec![foo.clone(), bar.clone()])),
Ok(Event::RestartedDone),
]);

let foo = Arc::new(foo);
Expand Down Expand Up @@ -354,7 +396,7 @@ pub(crate) mod test {
assert_eq!(poll!(subscriber_slow.next()), Poll::Ready(Some(foo.clone())));
assert!(matches!(
poll!(reflect.next()),
Poll::Ready(Some(Ok(Event::Restarted(_))))
Poll::Ready(Some(Ok(Event::RestartedPage(_))))
));
// Poll again to drain the queue.
assert!(matches!(poll!(reflect.next()), Poll::Ready(None)));
Expand Down
7 changes: 5 additions & 2 deletions kube-runtime/src/reflector/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ use crate::watcher;
use async_stream::stream;
use futures::{Stream, StreamExt};
use std::hash::Hash;
#[cfg(feature = "unstable-runtime-subscribe")] pub use store::store_shared;
#[cfg(feature = "unstable-runtime-subscribe")]
pub use store::store_shared;
pub use store::{store, Store};

/// Cache objects from a [`watcher()`] stream into a local [`Store`]
Expand Down Expand Up @@ -245,7 +246,9 @@ mod tests {
store_w,
stream::iter(vec![
Ok(watcher::Event::Applied(cm_a.clone())),
Ok(watcher::Event::Restarted(vec![cm_b.clone()])),
Ok(watcher::Event::RestartedStart),
Ok(watcher::Event::RestartedPage(vec![cm_b.clone()])),
Ok(watcher::Event::RestartedDone),
]),
)
.map(|_| ())
Expand Down
4 changes: 2 additions & 2 deletions kube-runtime/src/utils/event_modify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ pub(crate) mod test {
let st = stream::iter([
Ok(Event::Applied(0)),
Err(Error::TooManyObjects),
Ok(Event::Restarted(vec![10])),
Ok(Event::RestartedPage(vec![10])),
]);
let mut ev_modify = pin!(EventModify::new(st, |x| {
*x += 1;
Expand All @@ -75,7 +75,7 @@ pub(crate) mod test {
let restarted = poll!(ev_modify.next());
assert!(matches!(
restarted,
Poll::Ready(Some(Ok(Event::Restarted(vec)))) if vec == [11]
Poll::Ready(Some(Ok(Event::RestartedPage(vec)))) if vec == [11]
));

assert!(matches!(poll!(ev_modify.next()), Poll::Ready(None)));
Expand Down
21 changes: 19 additions & 2 deletions kube-runtime/src/utils/reflect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,9 @@ pub(crate) mod test {
let st = stream::iter([
Ok(Event::Applied(foo.clone())),
Err(Error::TooManyObjects),
Ok(Event::Restarted(vec![foo, bar])),
Ok(Event::RestartedStart),
Ok(Event::RestartedPage(vec![foo, bar])),
Ok(Event::RestartedDone),
]);
let (reader, writer) = reflector::store();

Expand All @@ -93,8 +95,23 @@ pub(crate) mod test {
));
assert_eq!(reader.len(), 1);

assert!(matches!(
poll!(reflect.next()),
Poll::Ready(Some(Ok(Event::RestartedStart)))
));
assert_eq!(reader.len(), 1);

let restarted = poll!(reflect.next());
assert!(matches!(restarted, Poll::Ready(Some(Ok(Event::Restarted(_))))));
assert!(matches!(
restarted,
Poll::Ready(Some(Ok(Event::RestartedPage(_))))
));
assert_eq!(reader.len(), 1);

assert!(matches!(
poll!(reflect.next()),
Poll::Ready(Some(Ok(Event::RestartedDone)))
));
assert_eq!(reader.len(), 2);

assert!(matches!(poll!(reflect.next()), Poll::Ready(None)));
Expand Down

0 comments on commit 2797cbf

Please sign in to comment.