From 54f06cd0af9d9c051aa4c189ea1b6784b6bfac0f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Teo=20Klestrup=20R=C3=B6ijezon?= Date: Wed, 26 Feb 2020 12:57:56 +0100 Subject: [PATCH 1/2] Always reset the Informer version to 0 Takes "approach 2" from https://github.com/clux/kube-rs/issues/134#issuecomment-589026965, see the noted caveats. Deletes Informer::init, since it is no longer useful. --- CHANGELOG.md | 2 ++ src/runtime/informer.rs | 34 +++++----------------------------- 2 files changed, 7 insertions(+), 29 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 91859a578..724b805de 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,8 @@ 0.27.0 / 2020-02-XX =================== * `Reflector` + `Informer` moved from `kube::api` to `kube::runtime` + * `Informer` now resets the version to 0 rather than dropping events - #134 + * Removed `Informer::init`, since it is now a no-op when building the `Informer` 0.26.0 / 2020-02-25 =================== diff --git a/src/runtime/informer.rs b/src/runtime/informer.rs index 56ed3b2ed..615f6b775 100644 --- a/src/runtime/informer.rs +++ b/src/runtime/informer.rs @@ -105,15 +105,6 @@ where // finalizers: - /// Initialize without a prior version - /// - /// Will seed resourceVersion with a 1 limit list call to the resource - pub async fn init(self) -> Result { - info!("Starting Informer for {:?}", self.resource); - self.reset().await?; - Ok(self) - } - /// Initialize from a prior version pub fn init_from(self, v: String) -> Self { info!("Recreating Informer for {:?} at {}", self.resource, v); @@ -212,11 +203,11 @@ where } } - /// Reset the resourceVersion to latest - pub async fn reset(&self) -> Result<()> { - let latest = self.get_resource_version().await?; - *self.version.lock().await = latest; - Ok(()) + /// Reset the resourceVersion to 0 + /// + /// Note: This will cause duplicate Added events for all existing resources + pub async fn reset(&self) { + *self.version.lock().await = 0.to_string(); } /// Return the current version @@ -225,19 +216,4 @@ where // to get a lock on our version futures::executor::block_on(async { self.version.lock().await.clone() }) } - - /// Init helper - async fn get_resource_version(&self) -> Result { - let req = self.resource.list_zero_resource_entries(&self.params)?; - - // parse to void a ResourceList into void except for Metadata - let res = self.client.request::>(req).await?; - - let version = res.metadata.resourceVersion.unwrap_or_else(|| "0".into()); - debug!( - "Got fresh resourceVersion={} for {}", - version, self.resource.resource - ); - Ok(version) - } } From 127da76b9a2191bb1aeb1ec89a088f5d484dd8fc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Teo=20Klestrup=20R=C3=B6ijezon?= Date: Wed, 26 Feb 2020 14:47:35 +0100 Subject: [PATCH 2/2] Fixed build errors --- examples/event_informer.rs | 2 +- examples/node_informer.rs | 4 +--- examples/pod_informer.rs | 2 +- src/runtime/informer.rs | 6 +++--- 4 files changed, 6 insertions(+), 8 deletions(-) diff --git a/examples/event_informer.rs b/examples/event_informer.rs index 05514d1c1..74ed64afa 100644 --- a/examples/event_informer.rs +++ b/examples/event_informer.rs @@ -16,7 +16,7 @@ async fn main() -> anyhow::Result<()> { let client = APIClient::new(config); let events = Api::v1Event(client); - let ei = Informer::new(events).init().await?; + let ei = Informer::new(events); loop { let mut events = ei.poll().await?.boxed(); diff --git a/examples/node_informer.rs b/examples/node_informer.rs index 64be6e21d..eeaad26f4 100644 --- a/examples/node_informer.rs +++ b/examples/node_informer.rs @@ -20,10 +20,8 @@ async fn main() -> anyhow::Result<()> { let nodes = RawApi::v1Node(); let events = Api::v1Event(client.clone()); - let ni = Informer::raw(client.clone(), nodes) + let ni = Informer::raw(client.clone(), nodes); //.labels("beta.kubernetes.io/os=linux") - .init() - .await?; loop { let mut nodes = ni.poll().await?.boxed(); diff --git a/examples/pod_informer.rs b/examples/pod_informer.rs index b2fb4b2c4..d7b655617 100644 --- a/examples/pod_informer.rs +++ b/examples/pod_informer.rs @@ -19,7 +19,7 @@ async fn main() -> anyhow::Result<()> { let namespace = env::var("NAMESPACE").unwrap_or("default".into()); let resource = Api::v1Pod(client.clone()).within(&namespace); - let inf = Informer::new(resource.clone()).init().await?; + let inf = Informer::new(resource.clone()); loop { let mut pods = inf.poll().await?.boxed(); diff --git a/src/runtime/informer.rs b/src/runtime/informer.rs index 615f6b775..b2a103b2f 100644 --- a/src/runtime/informer.rs +++ b/src/runtime/informer.rs @@ -1,7 +1,7 @@ use crate::{ api::{ - resource::{KubeObject, ObjectList, WatchEvent}, - Api, ListParams, NotUsed, RawApi, + resource::{KubeObject, WatchEvent}, + Api, ListParams, RawApi, }, client::APIClient, Result, @@ -142,7 +142,7 @@ where Delay::new(dur).await; // If we are outside history, start over from latest if *needs_resync { - self.reset().await?; + self.reset().await; } *needs_resync = false; *needs_retry = false;