From 26370566462fcb84f5e41744649298302b5fe155 Mon Sep 17 00:00:00 2001 From: clux Date: Wed, 8 Apr 2020 21:27:16 +0100 Subject: [PATCH] experiment with a Store parameter for Reflectors - #102 in an effort to solve #102 we try to factor out the implicit data store inside Reflector as a trait object that can be customized. The abstraction, feels very rigid, and not sure how useable it is. Ultimately, I wanted a nicer interface for users of library that wasnt "give me a CLONE of ALL state" or "give me one thing with a matching ID". It's potentially possible to take type that implemented iterator with this approach, as well as MAYBE handing out an arc'd clone to users (provided they are told not to keep it locked for ages). Another problem I wanted to solve was having the stored state be a simplified version of the underlying data (i.e. add/modify would perform some kind of transformation before storing it). This was super awkward with the old Reflector setup because you'd effectively force two whole clones of the two types, and you'd have to recreate your shadow tree every time you asked for the state (which was ALL of the input). --- kube/src/runtime/reflector.rs | 153 ++++++++++++++++++---------------- 1 file changed, 82 insertions(+), 71 deletions(-) diff --git a/kube/src/runtime/reflector.rs b/kube/src/runtime/reflector.rs index caf076e57..b70e40b68 100644 --- a/kube/src/runtime/reflector.rs +++ b/kube/src/runtime/reflector.rs @@ -11,6 +11,9 @@ use tokio::{ use std::{collections::BTreeMap, sync::Arc, time::Duration}; +/// A Reflector with a default MapCache +pub type Reflector = GenericReflector>; + /// A reflection of state for a Kubernetes ['Api'] resource /// /// This builds on top of the ['Informer'] by tracking the events received, @@ -20,29 +23,33 @@ use std::{collections::BTreeMap, sync::Arc, time::Duration}; /// It is prone to the same desync problems as an informer, but it will self-heal, /// as best as possible - though this means that you might occasionally see a full /// reset (boot equivalent) when network issues are encountered. -/// During a reset, the state is cleared before it is rebuilt. +/// During a reset, the state is cleared and rebuilt in an atomic operation. /// /// The internal state is exposed readably through a getter. #[derive(Clone)] -pub struct Reflector +pub struct GenericReflector where K: Clone + DeserializeOwned + Meta, + S: Store + Default, { - state: Arc>>, + store: Arc>, + version: Arc>, params: ListParams, api: Api, } -impl Reflector +impl GenericReflector where K: Clone + DeserializeOwned + Meta, + S: Store + Default, { /// Create a reflector on an api resource pub fn new(api: Api) -> Self { Reflector { api, params: ListParams::default(), - state: Default::default(), + version: Arc::new(Mutex::new(0.to_string())), + store: Default::default(), } } @@ -94,14 +101,13 @@ where /// A single poll call to modify the internal state async fn poll(&self) -> Result<()> { let kind = &self.api.resource.kind; - let resource_version = self.state.lock().await.version.clone(); + let resource_version = self.version.lock().await.clone(); trace!("Polling {} from resourceVersion={}", kind, resource_version); let stream = self.api.watch(&self.params, &resource_version).await?; pin_mut!(stream); // For every event, modify our state while let Some(ev) = stream.try_next().await? { - let mut state = self.state.lock().await; // Informer-like version tracking: match &ev { WatchEvent::Added(o) @@ -111,49 +117,38 @@ where // always store the last seen resourceVersion if let Some(nv) = Meta::resource_ver(o) { trace!("Updating reflector version for {} to {}", kind, nv); - state.version = nv.clone(); + *self.version.lock().await = nv.clone(); } } - _ => {} + WatchEvent::Error(e) => { + warn!("Failed to watch {}: {:?}", kind, e); + return Err(Error::Api(e.to_owned())); + } } - let data = &mut state.data; // Core Reflector logic + let mut store = self.store.lock().await; match ev { - WatchEvent::Added(o) => { - debug!("Adding {} to {}", Meta::name(&o), kind); - data.entry(ObjectId::key_for(&o)).or_insert_with(|| o.clone()); - } - WatchEvent::Modified(o) => { - debug!("Modifying {} in {}", Meta::name(&o), kind); - data.entry(ObjectId::key_for(&o)).and_modify(|e| *e = o.clone()); - } - WatchEvent::Deleted(o) => { - debug!("Removing {} from {}", Meta::name(&o), kind); - data.remove(&ObjectId::key_for(&o)); - } - WatchEvent::Bookmark(o) => { - debug!("Bookmarking {} from {}", Meta::name(&o), kind); - } - WatchEvent::Error(e) => { - warn!("Failed to watch {}: {:?}", kind, e); - return Err(Error::Api(e)); - } + WatchEvent::Added(o) => store.add(o), + WatchEvent::Modified(o) => store.modify(o), + WatchEvent::Deleted(o) => store.delete(o), + _ => {} } } Ok(()) } - /// Reset the state of the underlying informer and clear the cache - pub async fn reset(&self) -> Result<()> { + /// Reset the resource version and clear cache + async fn reset(&self) -> Result<()> { trace!("Resetting {}", self.api.resource.kind); - // Simplified for k8s >= 1.16 - //*self.state.lock().await = Default::default(); - //self.informer.reset().await - // For now: let (data, version) = self.get_full_resource_entries().await?; - *self.state.lock().await = State { data, version }; + *self.version.lock().await = version; + let mut store = self.store.lock().await; + store.clear(); + for d in data { + store.add(d); + } Ok(()) } @@ -161,35 +156,25 @@ where /// /// Needed to do an initial list operation because of https://github.com/clux/kube-rs/issues/219 /// Soon, this goes away as we drop support for k8s < 1.16 - async fn get_full_resource_entries(&self) -> Result<(Cache, String)> { + async fn get_full_resource_entries(&self) -> Result<(Vec, String)> { let res = self.api.list(&self.params).await?; + debug!("Initializing {}", K::KIND); let version = res.metadata.resource_version.unwrap_or_default(); - trace!( - "Got {} {} at resourceVersion={:?}", + debug!( + "Initialized {} with {} objects at {}", + K::KIND, res.items.len(), - self.api.resource.kind, version ); - let mut data = BTreeMap::new(); - for i in res.items { - // The non-generic parts we care about are spec + status - data.insert(ObjectId::key_for(&i), i); - } - let keys = data - .keys() - .map(ObjectId::to_string) - .collect::>() - .join(", "); - debug!("Initialized with: [{}]", keys); - Ok((data, version)) + Ok((res.items, version)) } /// Read data for users of the reflector /// /// This is instant if you are reading and writing from the same context. pub async fn state(&self) -> Result> { - let state = self.state.lock().await; - Ok(state.data.values().cloned().collect::>()) + let store = self.store.lock().await; + Ok(store.values()) } /// Read a single entry by name @@ -203,7 +188,7 @@ where namespace: self.api.resource.namespace.clone(), }; - Ok(self.state.lock().await.data.get(&id).map(Clone::clone)) + Ok(self.store.lock().await.get(&id).map(Clone::clone)) } /// Read a single entry by name within a specific namespace @@ -215,7 +200,7 @@ where name: name.into(), namespace: Some(ns.into()), }; - Ok(self.state.lock().await.data.get(&id).map(Clone::clone)) + Ok(self.store.lock().await.get(&id).map(Clone::clone)) } } @@ -223,7 +208,7 @@ where /// /// This is an internal subset of ['k8s_openapi::api::core::v1::ObjectReference'] #[derive(Ord, PartialOrd, Hash, Eq, PartialEq, Clone)] -struct ObjectId { +pub struct ObjectId { name: String, namespace: Option, } @@ -246,21 +231,47 @@ impl ObjectId { } } -/// Internal shared state of Reflector -/// -/// Can remove this in k8s >= 1.16 once this uses Informer -struct State { - data: Cache, - version: String, +/// A store that can be plugged into a Reflector +pub trait Store { + fn clear(&mut self); + fn get(&self, id: &ObjectId) -> Option<&K>; + fn add(&mut self, k: K); + fn values(&self) -> Vec; + fn modify(&mut self, k: K); + fn delete(&mut self, k: K); } -impl Default for State { - fn default() -> Self { - State { - data: Default::default(), - version: 0.to_string(), - } +/// Default Store for a Reflector +pub type MapCache = BTreeMap; + +impl Store for MapCache +where + K: Clone + DeserializeOwned + Meta, +{ + fn clear(&mut self) { + self.clear() + } + + fn get(&self, id: &ObjectId) -> Option<&K> { + self.get(id) + } + + fn values(&self) -> Vec { + self.values().cloned().collect() + } + + fn add(&mut self, k: K) { + debug!("Adding {} to {}", Meta::name(&k), K::KIND); + self.entry(ObjectId::key_for(&k)).or_insert_with(|| k); + } + + fn modify(&mut self, k: K) { + debug!("Modifying {} in {}", Meta::name(&k), K::KIND); + self.entry(ObjectId::key_for(&k)).and_modify(|e| *e = k); + } + + fn delete(&mut self, k: K) { + debug!("Removing {} from {}", Meta::name(&k), K::KIND); + self.remove(&ObjectId::key_for(&k)); } } -/// Internal representation for Reflector -type Cache = BTreeMap;