diff --git a/kube/src/runtime/informer.rs b/kube/src/runtime/informer.rs index 4db298211..5c1694c09 100644 --- a/kube/src/runtime/informer.rs +++ b/kube/src/runtime/informer.rs @@ -20,6 +20,9 @@ use std::{sync::Arc, time::Duration}; /// but we have configured timeouts such that this should not happen frequently. /// /// On boot, the initial watch causes added events for every currently live object. +/// +/// Because of https://github.com/clux/kube-rs/issues/219 we recommend you use this +/// with kubernetes >= 1.16 and watch bookmarks enabled. #[derive(Clone)] pub struct Informer where diff --git a/kube/src/runtime/reflector.rs b/kube/src/runtime/reflector.rs index b394982a7..de0a204dc 100644 --- a/kube/src/runtime/reflector.rs +++ b/kube/src/runtime/reflector.rs @@ -1,12 +1,12 @@ use crate::{ - api::{Api, ListParams, Meta, Resource, WatchEvent}, - runtime::Informer, + api::{Api, ListParams, Meta, WatchEvent}, Error, Result, }; -use futures::{lock::Mutex, StreamExt, TryStreamExt}; +use futures::{future::FutureExt, lock::Mutex, pin_mut, select, TryStreamExt}; use serde::de::DeserializeOwned; +use tokio::{signal, time::delay_for}; -use std::{collections::BTreeMap, sync::Arc}; +use std::{collections::BTreeMap, sync::Arc, time::Duration}; /// A reflection of state for a Kubernetes ['Api'] resource /// @@ -26,33 +26,34 @@ where K: Clone + DeserializeOwned + Send + Meta, { state: Arc>>, - informer: Informer, - resource: Resource, + params: ListParams, + api: Api, + needs_resync: Arc>, } impl Reflector where - K: Clone + DeserializeOwned + Meta + Send + Sync, + K: Clone + DeserializeOwned + Meta + Send, { /// Create a reflector on an api resource pub fn new(api: Api) -> Self { Reflector { - resource: api.resource.clone(), - informer: Informer::new(api), + api, + params: ListParams::default(), + needs_resync: Arc::new(Mutex::new(false)), state: Default::default(), } } /// Modify the default watch parameters for the underlying watch pub fn params(mut self, lp: ListParams) -> Self { - self.informer = self.informer.params(lp); + self.params = lp; self } /// Start the reflectors self-driving polling pub async fn run(self) -> Result<()> { - use futures::{future::FutureExt, pin_mut, select}; - use tokio::signal; + self.reset().await?; loop { let signal_fut = signal::ctrl_c().fuse(); // TODO: SIGTERM let stream_fut = self.poll().fuse(); @@ -64,8 +65,11 @@ where }, stream = stream_fut => { if let Err(e) = stream { - error!("Kube state failed to recover: {}", e); - return Err(e); + warn!("Poll error on {}: {}: {:?}", self.api.resource.kind, e, e); + // If desynched due to mismatching resourceVersion, retry in a bit + let dur = Duration::from_secs(10); + delay_for(dur).await; + self.reset().await?; // propagate error if this failed.. } }, complete => continue, // another poll @@ -76,25 +80,42 @@ where /// A single poll call to modify the internal state async fn poll(&self) -> Result<()> { - let kind = &self.resource.kind; - trace!("Polling {}", kind); - let mut stream = self.informer.poll().await?.boxed(); + let kind = &self.api.resource.kind; + let resource_version = self.state.lock().await.version.clone(); + trace!("Polling {} from resourceVersion={}", kind, resource_version); + let stream = self.api.watch(&self.params, &resource_version).await?; + pin_mut!(stream); // For every event, modify our state while let Some(ev) = stream.try_next().await? { let mut state = self.state.lock().await; + match &ev { + WatchEvent::Added(o) + | WatchEvent::Modified(o) + | WatchEvent::Deleted(o) + | WatchEvent::Bookmark(o) => { + // always store the last seen resourceVersion + if let Some(nv) = Meta::resource_ver(o) { + trace!("Updating reflector version for {} to {}", kind, nv); + state.version = nv.clone(); + } + } + _ => {} + } + + let data = &mut state.data; match ev { WatchEvent::Added(o) => { debug!("Adding {} to {}", Meta::name(&o), kind); - state.entry(ObjectId::key_for(&o)).or_insert_with(|| o.clone()); + data.entry(ObjectId::key_for(&o)).or_insert_with(|| o.clone()); } WatchEvent::Modified(o) => { debug!("Modifying {} in {}", Meta::name(&o), kind); - state.entry(ObjectId::key_for(&o)).and_modify(|e| *e = o.clone()); + data.entry(ObjectId::key_for(&o)).and_modify(|e| *e = o.clone()); } WatchEvent::Deleted(o) => { debug!("Removing {} from {}", Meta::name(&o), kind); - state.remove(&ObjectId::key_for(&o)); + data.remove(&ObjectId::key_for(&o)); } WatchEvent::Bookmark(o) => { debug!("Bookmarking {} from {}", Meta::name(&o), kind); @@ -108,12 +129,52 @@ where Ok(()) } + /// Reset the state of the underlying informer and clear the cache + pub async fn reset(&self) -> Result<()> { + trace!("Resetting {}", self.api.resource.kind); + // Simplified for k8s >= 1.16 + //*self.state.lock().await = Default::default(); + //self.informer.reset().await + + // For now: + let (data, version) = self.get_full_resource_entries().await?; + *self.state.lock().await = State { data, version }; + Ok(()) + } + + /// Legacy helper for kubernetes < 1.16 + /// + /// Needed to do an initial list operation because of https://github.com/clux/kube-rs/issues/219 + /// Soon, this goes away as we drop support for k8s < 1.16 + async fn get_full_resource_entries(&self) -> Result<(Cache, String)> { + let res = self.api.list(&self.params).await?; + let version = res.metadata.resource_version.unwrap_or_default(); + trace!( + "Got {} {} at resourceVersion={:?}", + res.items.len(), + self.api.resource.kind, + version + ); + let mut data = BTreeMap::new(); + for i in res.items { + // The non-generic parts we care about are spec + status + data.insert(ObjectId::key_for(&i), i); + } + let keys = data + .keys() + .map(ObjectId::to_string) + .collect::>() + .join(", "); + debug!("Initialized with: [{}]", keys); + Ok((data, version)) + } + /// Read data for users of the reflector /// /// This is instant if you are reading and writing from the same context. pub async fn state(&self) -> Result> { let state = self.state.lock().await; - Ok(state.values().cloned().collect::>()) + Ok(state.data.values().cloned().collect::>()) } /// Read a single entry by name @@ -124,10 +185,10 @@ where pub async fn get(&self, name: &str) -> Result> { let id = ObjectId { name: name.into(), - namespace: self.resource.namespace.clone(), + namespace: self.api.resource.namespace.clone(), }; - Ok(self.state.lock().await.get(&id).map(Clone::clone)) + Ok(self.state.lock().await.data.get(&id).map(Clone::clone)) } /// Read a single entry by name within a specific namespace @@ -139,14 +200,7 @@ where name: name.into(), namespace: Some(ns.into()), }; - Ok(self.state.lock().await.get(&id).map(Clone::clone)) - } - - /// Reset the state of the underlying informer and clear the cache - pub async fn reset(&self) { - trace!("Resetting {}", self.resource.kind); - *self.state.lock().await = Default::default(); - self.informer.reset().await + Ok(self.state.lock().await.data.get(&id).map(Clone::clone)) } } @@ -177,5 +231,21 @@ impl ObjectId { } } +/// Internal shared state of Reflector +/// +/// Can remove this in k8s >= 1.16 once this uses Informer +struct State { + data: Cache, + version: String, +} + +impl Default for State { + fn default() -> Self { + State { + data: Default::default(), + version: 0.to_string(), + } + } +} /// Internal representation for Reflector -type State = BTreeMap; +type Cache = BTreeMap;