Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
Signed-off-by: Fabrizio Sestito <[email protected]>
  • Loading branch information
fabriziosestito committed May 13, 2024
1 parent 3170f68 commit 7d8ec65
Show file tree
Hide file tree
Showing 3 changed files with 184 additions and 123 deletions.
48 changes: 39 additions & 9 deletions kube-runtime/src/reflector/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ where
K::DynamicType: Eq + Hash + Clone,
{
store: Cache<K>,
buffer: Cache<K>,
dyntype: K::DynamicType,
ready_tx: Option<delayed_init::Initializer<()>>,
ready_rx: Arc<DelayedInit<()>>,
Expand All @@ -39,6 +40,7 @@ where
let (ready_tx, ready_rx) = DelayedInit::new();
Writer {
store: Default::default(),
buffer: Default::default(),
dyntype,
ready_tx: Some(ready_tx),
ready_rx: Arc::new(ready_rx),
Expand All @@ -60,6 +62,7 @@ where
let (ready_tx, ready_rx) = DelayedInit::new();
Writer {
store: Default::default(),
buffer: Default::default(),
dyntype,
ready_tx: Some(ready_tx),
ready_rx: Arc::new(ready_rx),
Expand Down Expand Up @@ -105,39 +108,66 @@ where
let key = obj.to_object_ref(self.dyntype.clone());
self.store.write().remove(&key);
}
watcher::Event::Restarted(new_objs) => {
watcher::Event::RestartedStart => {
let mut buffer = self.buffer.write();
*buffer = AHashMap::new();
}
watcher::Event::RestartedPage(new_objs) => {
let new_objs = new_objs
.iter()
.map(|obj| (obj.to_object_ref(self.dyntype.clone()), Arc::new(obj.clone())))
.collect::<AHashMap<_, _>>();
*self.store.write() = new_objs;
self.buffer.write().extend(new_objs);
}
}
watcher::Event::RestartedDone => {
let mut store = self.store.write();

// Swap the buffer into the store
let mut buffer = self.buffer.write();
std::mem::swap(&mut *store, &mut *buffer);

// Mark as ready after the first event, "releasing" any calls to Store::wait_until_ready()
if let Some(ready_tx) = self.ready_tx.take() {
ready_tx.init(())
// Clear the buffer
*buffer = AHashMap::new();

// Mark as ready after the RestartedDone, "releasing" any calls to Store::wait_until_ready()
if let Some(ready_tx) = self.ready_tx.take() {
ready_tx.init(())
}
}
watcher::Event::RestartedApplied(obj) => {
let key = obj.to_object_ref(self.dyntype.clone());
let obj = Arc::new(obj.clone());
self.buffer.write().insert(key, obj);
}
watcher::Event::RestartedDeleted(obj) => {
let key = obj.to_object_ref(self.dyntype.clone());
self.buffer.write().remove(&key);
}
}
}

/// Broadcast an event to any downstream listeners subscribed on the store
pub(crate) async fn dispatch_event(&mut self, event: &watcher::Event<K>) {
if let Some(ref mut dispatcher) = self.dispatcher {
match event {
watcher::Event::Applied(obj) => {
watcher::Event::Applied(obj) | watcher::Event::RestartedApplied(obj) => {
let obj_ref = obj.to_object_ref(self.dyntype.clone());
// TODO (matei): should this take a timeout to log when backpressure has
// been applied for too long, e.g. 10s
dispatcher.broadcast(obj_ref).await;
}

watcher::Event::Restarted(new_objs) => {
watcher::Event::RestartedPage(new_objs) => {
let obj_refs = new_objs.iter().map(|obj| obj.to_object_ref(self.dyntype.clone()));
for obj_ref in obj_refs {
dispatcher.broadcast(obj_ref).await;
}
}
watcher::Event::Deleted(_) => {}

watcher::Event::RestartedStart
| watcher::Event::RestartedDone
| watcher::Event::Deleted(_)
| watcher::Event::RestartedDeleted(_) => {}
}
}
}
Expand Down
12 changes: 8 additions & 4 deletions kube-runtime/src/utils/event_flatten.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,22 +36,26 @@ where
if let Some(item) = me.queue.next() {
break Some(Ok(item));
}
break match ready!(me.stream.as_mut().poll_next(cx)) {
Some(Ok(Event::Applied(obj))) => Some(Ok(obj)),
let var_name = match ready!(me.stream.as_mut().poll_next(cx)) {
Some(Ok(
Event::Applied(obj) | Event::RestartedApplied(obj) | Event::RestartedDeleted(obj),
)) => Some(Ok(obj)),
Some(Ok(Event::Deleted(obj))) => {
if *me.emit_deleted {
Some(Ok(obj))
} else {
continue;
}
}
Some(Ok(Event::Restarted(objs))) => {
Some(Ok(Event::RestartedPage(objs))) => {
*me.queue = objs.into_iter();
continue;
}
Some(Ok(Event::RestartedStart | Event::RestartedDone)) => continue,
Some(Err(err)) => Some(Err(err)),
None => return Poll::Ready(None),
};
break var_name;
})
}
}
Expand All @@ -70,7 +74,7 @@ pub(crate) mod tests {
Ok(Event::Applied(1)),
Ok(Event::Deleted(0)),
Ok(Event::Applied(2)),
Ok(Event::Restarted(vec![1, 2])),
Ok(Event::RestartedPage(vec![1, 2])),
Err(Error::TooManyObjects),
Ok(Event::Applied(2)),
]);
Expand Down
Loading

0 comments on commit 7d8ec65

Please sign in to comment.