Skip to content

Commit

Permalink
Merge pull request #149 from Appva/feature/134-fix-informer-reset
Browse files Browse the repository at this point in the history
Always reset the Informer version to 0
  • Loading branch information
clux authored Feb 26, 2020
2 parents d2f7bdf + 127da76 commit a12ee46
Show file tree
Hide file tree
Showing 5 changed files with 13 additions and 37 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
0.27.0 / 2020-02-XX
===================
* `Reflector` + `Informer` moved from `kube::api` to `kube::runtime`
* `Informer` now resets the version to 0 rather than dropping events - #134
* Removed `Informer::init`, since it is now a no-op when building the `Informer`

0.26.0 / 2020-02-25
===================
Expand Down
2 changes: 1 addition & 1 deletion examples/event_informer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ async fn main() -> anyhow::Result<()> {
let client = APIClient::new(config);

let events = Api::v1Event(client);
let ei = Informer::new(events).init().await?;
let ei = Informer::new(events);

loop {
let mut events = ei.poll().await?.boxed();
Expand Down
4 changes: 1 addition & 3 deletions examples/node_informer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,8 @@ async fn main() -> anyhow::Result<()> {

let nodes = RawApi::v1Node();
let events = Api::v1Event(client.clone());
let ni = Informer::raw(client.clone(), nodes)
let ni = Informer::raw(client.clone(), nodes);
//.labels("beta.kubernetes.io/os=linux")
.init()
.await?;

loop {
let mut nodes = ni.poll().await?.boxed();
Expand Down
2 changes: 1 addition & 1 deletion examples/pod_informer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ async fn main() -> anyhow::Result<()> {
let namespace = env::var("NAMESPACE").unwrap_or("default".into());

let resource = Api::v1Pod(client.clone()).within(&namespace);
let inf = Informer::new(resource.clone()).init().await?;
let inf = Informer::new(resource.clone());

loop {
let mut pods = inf.poll().await?.boxed();
Expand Down
40 changes: 8 additions & 32 deletions src/runtime/informer.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::{
api::{
resource::{KubeObject, ObjectList, WatchEvent},
Api, ListParams, NotUsed, RawApi,
resource::{KubeObject, WatchEvent},
Api, ListParams, RawApi,
},
client::APIClient,
Result,
Expand Down Expand Up @@ -105,15 +105,6 @@ where

// finalizers:

/// Initialize without a prior version
///
/// Will seed resourceVersion with a 1 limit list call to the resource
pub async fn init(self) -> Result<Self> {
info!("Starting Informer for {:?}", self.resource);
self.reset().await?;
Ok(self)
}

/// Initialize from a prior version
pub fn init_from(self, v: String) -> Self {
info!("Recreating Informer for {:?} at {}", self.resource, v);
Expand Down Expand Up @@ -151,7 +142,7 @@ where
Delay::new(dur).await;
// If we are outside history, start over from latest
if *needs_resync {
self.reset().await?;
self.reset().await;
}
*needs_resync = false;
*needs_retry = false;
Expand Down Expand Up @@ -212,11 +203,11 @@ where
}
}

/// Reset the resourceVersion to latest
pub async fn reset(&self) -> Result<()> {
let latest = self.get_resource_version().await?;
*self.version.lock().await = latest;
Ok(())
/// Reset the resourceVersion to 0
///
/// Note: This will cause duplicate Added events for all existing resources
pub async fn reset(&self) {
*self.version.lock().await = 0.to_string();
}

/// Return the current version
Expand All @@ -225,19 +216,4 @@ where
// to get a lock on our version
futures::executor::block_on(async { self.version.lock().await.clone() })
}

/// Init helper
async fn get_resource_version(&self) -> Result<String> {
let req = self.resource.list_zero_resource_entries(&self.params)?;

// parse to void a ResourceList into void except for Metadata
let res = self.client.request::<ObjectList<NotUsed>>(req).await?;

let version = res.metadata.resourceVersion.unwrap_or_else(|| "0".into());
debug!(
"Got fresh resourceVersion={} for {}",
version, self.resource.resource
);
Ok(version)
}
}

0 comments on commit a12ee46

Please sign in to comment.