Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce buffering between watcher and Store #1494

Merged
Merged
2 changes: 1 addition & 1 deletion kube-runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ rust.unsafe_code = "forbid"
#rust.missing_docs = "warn"

[dependencies]
futures.workspace = true
futures = { workspace = true, features = ["async-await"] }
kube-client = { path = "../kube-client", version = "=0.91.0", default-features = false, features = ["jsonpatch", "client"] }
derivative.workspace = true
serde.workspace = true
Expand Down
5 changes: 3 additions & 2 deletions kube-runtime/src/controller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1753,7 +1753,7 @@ mod tests {
|obj, _| {
Box::pin(async move {
// Try to flood the rescheduling buffer buffer by just putting it back in the queue immediately
println!("reconciling {:?}", obj.metadata.name);
//println!("reconciling {:?}", obj.metadata.name);
Ok(Action::requeue(Duration::ZERO))
})
},
Expand All @@ -1763,6 +1763,7 @@ mod tests {
queue_rx.map(Result::<_, Infallible>::Ok),
Config::default(),
));
store_tx.apply_watcher_event(&watcher::Event::Restart);
for i in 0..items {
let obj = ConfigMap {
metadata: ObjectMeta {
Expand All @@ -1772,7 +1773,7 @@ mod tests {
},
..Default::default()
};
store_tx.apply_watcher_event(&watcher::Event::Applied(obj.clone()));
store_tx.apply_watcher_event(&watcher::Event::Apply(obj.clone()));
queue_tx.unbounded_send(ObjectRef::from_obj(&obj)).unwrap();
}

Expand Down
104 changes: 75 additions & 29 deletions kube-runtime/src/reflector/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,9 +165,11 @@ pub(crate) mod test {
let foo = testpod("foo");
let bar = testpod("bar");
let st = stream::iter([
Ok(Event::Applied(foo.clone())),
Ok(Event::Apply(foo.clone())),
Err(Error::TooManyObjects),
Ok(Event::Restarted(vec![foo, bar])),
Ok(Event::RestartInit),
Ok(Event::RestartPage(vec![foo, bar])),
Ok(Event::Restart),
]);

let (reader, writer) = reflector::store_shared(10);
Expand All @@ -178,7 +180,7 @@ pub(crate) mod test {
assert_eq!(reader.len(), 0);
assert!(matches!(
poll!(reflect.next()),
Poll::Ready(Some(Ok(Event::Applied(_))))
Poll::Ready(Some(Ok(Event::Apply(_))))
));

// Make progress and assert all events are seen
Expand All @@ -190,7 +192,15 @@ pub(crate) mod test {
assert_eq!(reader.len(), 1);

let restarted = poll!(reflect.next());
assert!(matches!(restarted, Poll::Ready(Some(Ok(Event::Restarted(_))))));
assert!(matches!(restarted, Poll::Ready(Some(Ok(Event::RestartInit)))));
assert_eq!(reader.len(), 1);

let restarted = poll!(reflect.next());
assert!(matches!(restarted, Poll::Ready(Some(Ok(Event::RestartPage(_))))));
assert_eq!(reader.len(), 1);

let restarted = poll!(reflect.next());
assert!(matches!(restarted, Poll::Ready(Some(Ok(Event::Restart)))));
assert_eq!(reader.len(), 2);

assert!(matches!(poll!(reflect.next()), Poll::Ready(None)));
Expand All @@ -206,10 +216,12 @@ pub(crate) mod test {
let foo = testpod("foo");
let bar = testpod("bar");
let st = stream::iter([
Ok(Event::Deleted(foo.clone())),
Ok(Event::Applied(foo.clone())),
Ok(Event::Delete(foo.clone())),
Ok(Event::Apply(foo.clone())),
Err(Error::TooManyObjects),
Ok(Event::Restarted(vec![foo.clone(), bar.clone()])),
Ok(Event::RestartInit),
Ok(Event::RestartPage(vec![foo.clone(), bar.clone()])),
Ok(Event::Restart),
]);

let foo = Arc::new(foo);
Expand All @@ -224,13 +236,13 @@ pub(crate) mod test {
// Deleted events should be skipped by subscriber.
assert!(matches!(
poll!(reflect.next()),
Poll::Ready(Some(Ok(Event::Deleted(_))))
Poll::Ready(Some(Ok(Event::Delete(_))))
));
assert_eq!(poll!(subscriber.next()), Poll::Pending);

assert!(matches!(
poll!(reflect.next()),
Poll::Ready(Some(Ok(Event::Applied(_))))
Poll::Ready(Some(Ok(Event::Apply(_))))
));
assert_eq!(poll!(subscriber.next()), Poll::Ready(Some(foo.clone())));

Expand All @@ -242,12 +254,25 @@ pub(crate) mod test {
assert!(matches!(poll!(subscriber.next()), Poll::Pending));

// Restart event will yield all objects in the list

assert!(matches!(
poll!(reflect.next()),
Poll::Ready(Some(Ok(Event::Restarted(_))))
Poll::Ready(Some(Ok(Event::RestartInit)))
));
assert_eq!(poll!(subscriber.next()), Poll::Ready(Some(foo.clone())));
assert_eq!(poll!(subscriber.next()), Poll::Ready(Some(bar.clone())));

assert!(matches!(
poll!(reflect.next()),
Poll::Ready(Some(Ok(Event::RestartPage(_))))
));

assert!(matches!(
poll!(reflect.next()),
Poll::Ready(Some(Ok(Event::Restart)))
));

// these don't come back in order atm:
assert!(matches!(poll!(subscriber.next()), Poll::Ready(Some(_))));
assert!(matches!(poll!(subscriber.next()), Poll::Ready(Some(_))));

// When main channel is closed, it is propagated to subscribers
assert!(matches!(poll!(reflect.next()), Poll::Ready(None)));
Expand All @@ -261,12 +286,14 @@ pub(crate) mod test {
let foo = testpod("foo");
let bar = testpod("bar");
let st = stream::iter([
Ok(Event::Applied(foo.clone())),
Ok(Event::Restarted(vec![foo.clone(), bar.clone()])),
Ok(Event::Apply(foo.clone())),
Ok(Event::RestartInit),
Ok(Event::RestartPage(vec![foo.clone(), bar.clone()])),
Ok(Event::Restart),
]);

let foo = Arc::new(foo);
let bar = Arc::new(bar);
let _bar = Arc::new(bar);

let (_, writer) = reflector::store_shared(10);
let subscriber = writer.subscribe().unwrap();
Expand All @@ -275,7 +302,7 @@ pub(crate) mod test {

assert!(matches!(
poll!(reflect.next()),
Poll::Ready(Some(Ok(Event::Applied(_))))
Poll::Ready(Some(Ok(Event::Apply(_))))
));
assert_eq!(poll!(subscriber.next()), Poll::Ready(Some(foo.clone())));

Expand All @@ -284,14 +311,28 @@ pub(crate) mod test {
//
// First, subscribers should be pending.
assert_eq!(poll!(subscriber.next()), Poll::Pending);

assert!(matches!(
poll!(reflect.next()),
Poll::Ready(Some(Ok(Event::Restarted(_))))
Poll::Ready(Some(Ok(Event::RestartInit)))
));
assert_eq!(poll!(subscriber.next()), Poll::Pending);

assert!(matches!(
poll!(reflect.next()),
Poll::Ready(Some(Ok(Event::RestartPage(_))))
));
assert_eq!(poll!(subscriber.next()), Poll::Pending);

assert!(matches!(
poll!(reflect.next()),
Poll::Ready(Some(Ok(Event::Restart)))
));
drop(reflect);

assert_eq!(poll!(subscriber.next()), Poll::Ready(Some(foo.clone())));
assert_eq!(poll!(subscriber.next()), Poll::Ready(Some(bar.clone())));
// we will get foo and bar here, but we dont have a guaranteed ordering on page events
assert!(matches!(poll!(subscriber.next()), Poll::Ready(Some(_))));
assert!(matches!(poll!(subscriber.next()), Poll::Ready(Some(_))));
assert_eq!(poll!(subscriber.next()), Poll::Ready(None));
}

Expand All @@ -305,8 +346,9 @@ pub(crate) mod test {
let foo = testpod("foo");
let bar = testpod("bar");
let st = stream::iter([
Ok(Event::Applied(foo.clone())),
Ok(Event::Restarted(vec![foo.clone(), bar.clone()])),
Ok(Event::Apply(foo.clone())),
Ok(Event::Apply(bar.clone())),
Ok(Event::Apply(foo.clone())),
]);

let foo = Arc::new(foo);
Expand All @@ -325,13 +367,14 @@ pub(crate) mod test {

// Poll first subscriber, but not the second.
//
// The buffer can hold one value, so even if we have a slow subscriber,
// The buffer can hold one object value, so even if we have a slow subscriber,
// we will still get an event from the root.
assert!(matches!(
poll!(reflect.next()),
Poll::Ready(Some(Ok(Event::Applied(_))))
Poll::Ready(Some(Ok(Event::Apply(_))))
));
assert_eq!(poll!(subscriber.next()), Poll::Ready(Some(foo.clone())));

// One subscriber is not reading, so we need to apply backpressure until
// channel has capacity.
//
Expand All @@ -348,18 +391,21 @@ pub(crate) mod test {

// We now have room for only one more item. In total, the previous event
// had two. We repeat the same pattern.
assert!(matches!(
poll!(reflect.next()),
Poll::Ready(Some(Ok(Event::Apply(_))))
));
assert_eq!(poll!(subscriber.next()), Poll::Ready(Some(bar.clone())));
assert!(matches!(poll!(reflect.next()), Poll::Pending));
assert_eq!(poll!(subscriber.next()), Poll::Ready(Some(foo.clone())));
assert!(matches!(poll!(reflect.next()), Poll::Pending));
assert_eq!(poll!(subscriber_slow.next()), Poll::Ready(Some(foo.clone())));
assert_eq!(poll!(subscriber_slow.next()), Poll::Ready(Some(bar.clone())));
assert!(matches!(
poll!(reflect.next()),
Poll::Ready(Some(Ok(Event::Restarted(_))))
Poll::Ready(Some(Ok(Event::Apply(_))))
));
// Poll again to drain the queue.
assert!(matches!(poll!(reflect.next()), Poll::Ready(None)));
assert_eq!(poll!(subscriber.next()), Poll::Ready(Some(bar.clone())));
assert_eq!(poll!(subscriber_slow.next()), Poll::Ready(Some(bar.clone())));
assert_eq!(poll!(subscriber.next()), Poll::Ready(Some(foo.clone())));
assert_eq!(poll!(subscriber_slow.next()), Poll::Ready(Some(foo.clone())));

assert_eq!(poll!(subscriber.next()), Poll::Ready(None));
assert_eq!(poll!(subscriber_slow.next()), Poll::Ready(None));
Expand Down
29 changes: 14 additions & 15 deletions kube-runtime/src/reflector/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,13 +157,10 @@ mod tests {
},
..ConfigMap::default()
};
reflector(
store_w,
stream::iter(vec![Ok(watcher::Event::Applied(cm.clone()))]),
)
.map(|_| ())
.collect::<()>()
.await;
reflector(store_w, stream::iter(vec![Ok(watcher::Event::Apply(cm.clone()))]))
.map(|_| ())
.collect::<()>()
.await;
assert_eq!(store.get(&ObjectRef::from_obj(&cm)).as_deref(), Some(&cm));
}

Expand All @@ -189,8 +186,8 @@ mod tests {
reflector(
store_w,
stream::iter(vec![
Ok(watcher::Event::Applied(cm.clone())),
Ok(watcher::Event::Applied(updated_cm.clone())),
Ok(watcher::Event::Apply(cm.clone())),
Ok(watcher::Event::Apply(updated_cm.clone())),
]),
)
.map(|_| ())
Expand All @@ -213,8 +210,8 @@ mod tests {
reflector(
store_w,
stream::iter(vec![
Ok(watcher::Event::Applied(cm.clone())),
Ok(watcher::Event::Deleted(cm.clone())),
Ok(watcher::Event::Apply(cm.clone())),
Ok(watcher::Event::Delete(cm.clone())),
]),
)
.map(|_| ())
Expand Down Expand Up @@ -244,8 +241,10 @@ mod tests {
reflector(
store_w,
stream::iter(vec![
Ok(watcher::Event::Applied(cm_a.clone())),
Ok(watcher::Event::Restarted(vec![cm_b.clone()])),
Ok(watcher::Event::Apply(cm_a.clone())),
Ok(watcher::Event::RestartInit),
Ok(watcher::Event::RestartPage(vec![cm_b.clone()])),
Ok(watcher::Event::Restart),
]),
)
.map(|_| ())
Expand Down Expand Up @@ -276,9 +275,9 @@ mod tests {
..ConfigMap::default()
};
Ok(if deleted {
watcher::Event::Deleted(obj)
watcher::Event::Delete(obj)
} else {
watcher::Event::Applied(obj)
watcher::Event::Apply(obj)
})
})),
)
Expand Down
Loading
Loading