Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Always reset the Informer version to 0 #149

Merged
merged 2 commits into from
Feb 26, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
0.27.0 / 2020-02-XX
===================
* `Reflector` + `Informer` moved from `kube::api` to `kube::runtime`
* `Informer` now resets the version to 0 rather than dropping events - #134
* Removed `Informer::init`, since it is now a no-op when building the `Informer`

0.26.0 / 2020-02-25
===================
Expand Down
2 changes: 1 addition & 1 deletion examples/event_informer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ async fn main() -> anyhow::Result<()> {
let client = APIClient::new(config);

let events = Api::v1Event(client);
let ei = Informer::new(events).init().await?;
let ei = Informer::new(events);

loop {
let mut events = ei.poll().await?.boxed();
Expand Down
4 changes: 1 addition & 3 deletions examples/node_informer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,8 @@ async fn main() -> anyhow::Result<()> {

let nodes = RawApi::v1Node();
let events = Api::v1Event(client.clone());
let ni = Informer::raw(client.clone(), nodes)
let ni = Informer::raw(client.clone(), nodes);
//.labels("beta.kubernetes.io/os=linux")
.init()
.await?;

loop {
let mut nodes = ni.poll().await?.boxed();
Expand Down
2 changes: 1 addition & 1 deletion examples/pod_informer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ async fn main() -> anyhow::Result<()> {
let namespace = env::var("NAMESPACE").unwrap_or("default".into());

let resource = Api::v1Pod(client.clone()).within(&namespace);
let inf = Informer::new(resource.clone()).init().await?;
let inf = Informer::new(resource.clone());

loop {
let mut pods = inf.poll().await?.boxed();
Expand Down
40 changes: 8 additions & 32 deletions src/runtime/informer.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::{
api::{
resource::{KubeObject, ObjectList, WatchEvent},
Api, ListParams, NotUsed, RawApi,
resource::{KubeObject, WatchEvent},
Api, ListParams, RawApi,
},
client::APIClient,
Result,
Expand Down Expand Up @@ -105,15 +105,6 @@ where

// finalizers:

/// Initialize without a prior version
///
/// Will seed resourceVersion with a 1 limit list call to the resource
pub async fn init(self) -> Result<Self> {
info!("Starting Informer for {:?}", self.resource);
self.reset().await?;
Ok(self)
}

/// Initialize from a prior version
pub fn init_from(self, v: String) -> Self {
info!("Recreating Informer for {:?} at {}", self.resource, v);
Expand Down Expand Up @@ -151,7 +142,7 @@ where
Delay::new(dur).await;
// If we are outside history, start over from latest
if *needs_resync {
self.reset().await?;
self.reset().await;
}
*needs_resync = false;
*needs_retry = false;
Expand Down Expand Up @@ -212,11 +203,11 @@ where
}
}

/// Reset the resourceVersion to latest
pub async fn reset(&self) -> Result<()> {
let latest = self.get_resource_version().await?;
*self.version.lock().await = latest;
Ok(())
/// Reset the resourceVersion to 0
///
/// Note: This will cause duplicate Added events for all existing resources
pub async fn reset(&self) {
*self.version.lock().await = 0.to_string();
}

/// Return the current version
Expand All @@ -225,19 +216,4 @@ where
// to get a lock on our version
futures::executor::block_on(async { self.version.lock().await.clone() })
}

/// Init helper
async fn get_resource_version(&self) -> Result<String> {
let req = self.resource.list_zero_resource_entries(&self.params)?;

// parse to void a ResourceList into void except for Metadata
let res = self.client.request::<ObjectList<NotUsed>>(req).await?;

let version = res.metadata.resourceVersion.unwrap_or_else(|| "0".into());
debug!(
"Got fresh resourceVersion={} for {}",
version, self.resource.resource
);
Ok(version)
}
}