Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

self-driving reflector + signal handling #218

Merged
merged 13 commits into from
Apr 8, 2020
9 changes: 7 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,15 @@
* implement `TryFrom<Config> for Client`
* `Client::try_default` or `Client::new` now recommended constructors
* People parsing `~/.kube/config` must use the `KubeConfig` struct instead
* `Reflector<K>` now takes an `Api<K>` + `ListParams` to construct
* `Informer<K>` now takes an `Api<K>` + `ListParams` to construct
* `Reflector<K>` now only takes an `Api<K>` to construct (.params method)
* `Informer<K>` now only takes an `Api<K>` to construct (.params method)
* `Informer::init_from` -> `Informer::set_version`
* `Reflector` now self-polls #151 + handles signals #152
* `Reflector::poll` made private in favour of `Reflector::run`
* `Api::watch` no longer filters out error events (`next` -> `try_next`)
* `Api::watch` returns `Result<WatchEvent>` rather than `WatchEvent`
* `WatchEvent::Bookmark` added to enum
* `ListParams::allow_bookmarks` added

0.31.0 / 2020-03-27
===================
Expand Down
7 changes: 4 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ An Informer updates the last received `resourceVersion` internally on every even

```rust
let pods: Api<Pod> = Api::namespaced(client, "default");
let inform = Informer::new(pods, pods);
let inform = Informer::new(pods);
```

The main feature of `Informer<K>` is being able to subscribe to events while having a streaming `.poll()` open:
Expand Down Expand Up @@ -120,7 +120,8 @@ async fn handle(event: WatchEvent<Pod>) -> anyhow::Result<()> {
},
WatchEvent::Error(e) => {
println!("Error event: {:?}", e);
}
},
_ => {},
}
Ok(())
}
Expand All @@ -136,7 +137,7 @@ A cache for `K` that keeps itself up to date. It does not expose events, but you
let nodes: Api<Node> = Api::namespaced(client, &namespace);
let lp = ListParams::default()
.labels("beta.kubernetes.io/instance-type=m4.2xlarge");
let rf = Reflector::new(nodes, lp);
let rf = Reflector::new(nodes).params(lp);
```

then you should `poll()` the reflector, and `state()` to get the current cached state:
Expand Down
28 changes: 28 additions & 0 deletions kube/examples/configmap_informer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
#[macro_use] extern crate log;
use futures::{StreamExt, TryStreamExt};
use k8s_openapi::api::core::v1::ConfigMap;
use kube::{
api::{Api, ListParams},
runtime::Informer,
Client,
};

/// Example way to read secrets
#[tokio::main]
async fn main() -> anyhow::Result<()> {
std::env::set_var("RUST_LOG", "info,kube=trace");
env_logger::init();
let client = Client::try_default().await?;
let namespace = std::env::var("NAMESPACE").unwrap_or("default".into());

let cms: Api<ConfigMap> = Api::namespaced(client, &namespace);
let lp = ListParams::default().allow_bookmarks().timeout(10); // short watch timeout in this example
let inf = Informer::new(cms).params(lp);

loop {
let mut stream = inf.poll().await?.boxed();
while let Some(event) = stream.try_next().await? {
info!("Got: {:?}", event);
}
}
}
28 changes: 15 additions & 13 deletions kube/examples/configmap_reflector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,17 @@ use kube::{
Client,
};

fn spawn_periodic_reader(rf: Reflector<ConfigMap>) {
tokio::spawn(async move {
loop {
// Periodically read our state
tokio::time::delay_for(std::time::Duration::from_secs(5)).await;
let cms: Vec<_> = rf.state().await.unwrap().iter().map(Meta::name).collect();
info!("Current configmaps: {:?}", cms);
}
});
}

/// Example way to read secrets
#[tokio::main]
async fn main() -> anyhow::Result<()> {
Expand All @@ -16,19 +27,10 @@ async fn main() -> anyhow::Result<()> {

let cms: Api<ConfigMap> = Api::namespaced(client, &namespace);
let lp = ListParams::default().timeout(10); // short watch timeout in this example
let rf = Reflector::new(cms, lp).init().await?;

// Can read initial state now:
rf.state().await?.into_iter().for_each(|cm| {
info!("Found configmap {} with data: {:?}", Meta::name(&cm), cm.data);
});
let rf = Reflector::new(cms).params(lp);

loop {
// Update internal state by calling watch (waits the full timeout)
rf.poll().await?; // ideally call this from a thread/task
spawn_periodic_reader(rf.clone()); // read from a clone in a task

// up to date state:
let pods: Vec<_> = rf.state().await?.iter().map(Meta::name).collect();
info!("Current configmaps: {:?}", pods);
}
rf.run().await?; // run reflector and listen for signals
Ok(())
}
29 changes: 14 additions & 15 deletions kube/examples/crd_reflector.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
#[macro_use] extern crate log;
use std::time::Duration;

use kube_derive::CustomResource;
use serde::{Deserialize, Serialize};
use tokio::time::delay_for;

use kube::{
api::{Api, ListParams, Meta},
Expand All @@ -28,21 +25,23 @@ async fn main() -> anyhow::Result<()> {
// This example requires `kubectl apply -f examples/foo.yaml` run first
let foos: Api<Foo> = Api::namespaced(client, &namespace);
let lp = ListParams::default().timeout(20); // low timeout in this example
let rf = Reflector::new(foos, lp).init().await?;
let rf = Reflector::new(foos).params(lp);

let cloned = rf.clone();
let rf2 = rf.clone(); // read from a clone in a task
tokio::spawn(async move {
loop {
if let Err(e) = cloned.poll().await {
warn!("Poll error: {:?}", e);
}
// Periodically read our state
tokio::time::delay_for(std::time::Duration::from_secs(5)).await;
let crds = rf2
.state()
.await
.unwrap()
.iter()
.map(Meta::name)
.collect::<Vec<_>>();
info!("Current crds: {:?}", crds);
}
});

loop {
delay_for(Duration::from_secs(5)).await;
// Read updated internal state (instant):
let crds = rf.state().await?.iter().map(Meta::name).collect::<Vec<_>>();
info!("Current crds: {:?}", crds);
}
rf.run().await?; // run reflector and listen for signals
Ok(())
}
38 changes: 11 additions & 27 deletions kube/examples/deployment_reflector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,33 +16,17 @@ async fn main() -> anyhow::Result<()> {

let deploys: Api<Deployment> = Api::namespaced(client, &namespace);
let lp = ListParams::default().timeout(10); // short watch timeout in this example
let rf = Reflector::new(deploys, lp).init().await?;
let rf = Reflector::new(deploys).params(lp);

// rf is initialized with full state, which can be extracted on demand.
// Output is an owned Vec<Deployment>
rf.state().await?.into_iter().for_each(|d| {
info!(
"Found deployment for {} - {} replicas running {:?}",
Meta::name(&d),
d.status.unwrap().replicas.unwrap(),
d.spec
.unwrap()
.template
.spec
.unwrap()
.containers
.into_iter()
.map(|c| c.image.unwrap())
.collect::<Vec<_>>()
);
let rf2 = rf.clone(); // read from a clone in a task
tokio::spawn(async move {
loop {
// Periodically read our state
tokio::time::delay_for(std::time::Duration::from_secs(5)).await;
let deploys: Vec<_> = rf2.state().await.unwrap().iter().map(Meta::name).collect();
info!("Current deploys: {:?}", deploys);
}
});

loop {
// Update internal state by calling watch (waits the full timeout)
rf.poll().await?;

// Read the updated internal state (instant):
let deploys: Vec<_> = rf.state().await?.iter().map(Meta::name).collect();
info!("Current deploys: {:?}", deploys);
}
rf.run().await?; // run reflector and listen for signals
Ok(())
}
3 changes: 2 additions & 1 deletion kube/examples/event_informer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ async fn main() -> anyhow::Result<()> {

let events: Api<Event> = Api::all(client);
let lp = ListParams::default();
let ei = Informer::new(events, lp);
let ei = Informer::new(events).params(lp);

loop {
let mut events = ei.poll().await?.boxed();
Expand Down Expand Up @@ -47,6 +47,7 @@ fn handle_event(ev: WatchEvent<Event>) -> anyhow::Result<()> {
WatchEvent::Error(e) => {
warn!("Error event: {:?}", e);
}
_ => {}
}
Ok(())
}
1 change: 1 addition & 0 deletions kube/examples/job_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ async fn main() -> anyhow::Result<()> {
}
WatchEvent::Deleted(s) => info!("Deleted {}", Meta::name(&s)),
WatchEvent::Error(s) => error!("{}", s),
_ => {}
}
}

Expand Down
3 changes: 2 additions & 1 deletion kube/examples/node_informer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ async fn main() -> anyhow::Result<()> {
let nodes: Api<Node> = Api::all(client.clone());

let lp = ListParams::default().labels("beta.kubernetes.io/os=linux");
let ni = Informer::new(nodes, lp);
let ni = Informer::new(nodes).params(lp);

loop {
let mut nodes = ni.poll().await?.boxed();
Expand Down Expand Up @@ -77,6 +77,7 @@ async fn handle_nodes(events: &Api<Event>, ne: WatchEvent<Node>) -> anyhow::Resu
WatchEvent::Error(e) => {
warn!("Error event: {:?}", e);
}
_ => {}
}
Ok(())
}
35 changes: 8 additions & 27 deletions kube/examples/node_reflector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,36 +16,17 @@ async fn main() -> anyhow::Result<()> {
let lp = ListParams::default()
.labels("beta.kubernetes.io/instance-type=m4.2xlarge") // filter instances by label
.timeout(10); // short watch timeout in this example
let rf = Reflector::new(nodes, lp).init().await?;
let rf = Reflector::new(nodes).params(lp);

// rf is initialized with full state, which can be extracted on demand.
// Output is an owned Vec<Node>
rf.state().await?.into_iter().for_each(|o| {
let labels = Meta::meta(&o).labels.clone().unwrap();
info!(
"Found node {} ({:?}) running {:?} with labels: {:?}",
Meta::name(&o),
o.spec.unwrap().provider_id.unwrap(),
o.status.unwrap().conditions.unwrap(),
labels
);
});

let cloned = rf.clone();
let rf2 = rf.clone(); // read from a clone in a task
tokio::spawn(async move {
loop {
if let Err(e) = cloned.poll().await {
warn!("Poll error: {:?}", e);
}
// Periodically read our state
tokio::time::delay_for(std::time::Duration::from_secs(5)).await;
let deploys: Vec<_> = rf2.state().await.unwrap().iter().map(Meta::name).collect();
info!("Current {} nodes: {:?}", deploys.len(), deploys);
}
});

loop {
// Update internal state by calling watch (waits the full timeout)
rf.poll().await?;

// Read the updated internal state (instant):
let deploys: Vec<_> = rf.state().await?.iter().map(Meta::name).collect();
info!("Current {} nodes: {:?}", deploys.len(), deploys);
}
rf.run().await?; // run reflector and listen for signals
Ok(())
}
3 changes: 2 additions & 1 deletion kube/examples/pod_informer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ async fn main() -> anyhow::Result<()> {
let namespace = env::var("NAMESPACE").unwrap_or("default".into());

let pods: Api<Pod> = Api::namespaced(client, &namespace);
let inf = Informer::new(pods, ListParams::default());
let inf = Informer::new(pods).params(ListParams::default().timeout(10));

loop {
let mut pods = inf.poll().await?.boxed();
Expand Down Expand Up @@ -50,6 +50,7 @@ fn handle_pod(ev: WatchEvent<Pod>) -> anyhow::Result<()> {
WatchEvent::Deleted(o) => {
info!("Deleted Pod: {}", Meta::name(&o));
}
WatchEvent::Bookmark(_) => {}
WatchEvent::Error(e) => {
warn!("Error event: {:?}", e);
}
Expand Down
34 changes: 8 additions & 26 deletions kube/examples/pod_reflector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ use kube::{
runtime::Reflector,
Client,
};
use std::time::Duration;
use tokio::time::delay_for;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
Expand All @@ -17,34 +15,18 @@ async fn main() -> anyhow::Result<()> {

let pods: Api<Pod> = Api::namespaced(client, &namespace);
let lp = ListParams::default().timeout(10); // short watch timeout in this example
let rf = Reflector::new(pods, lp).init().await?;
let rf = Reflector::new(pods).params(lp);

// Can read initial state now:
rf.state().await?.into_iter().for_each(|pod| {
let name = Meta::name(&pod);
let phase = pod.status.unwrap().phase.unwrap();
let containers = pod
.spec
.unwrap()
.containers
.into_iter()
.map(|c| c.name)
.collect::<Vec<_>>();
info!("Found initial pod {} ({}) with {:?}", name, phase, containers);
});

let cloned = rf.clone();
let rf2 = rf.clone(); // read from a clone in a task
tokio::spawn(async move {
loop {
if let Err(e) = cloned.poll().await {
warn!("Poll error: {:?}", e);
}
// Periodically read our state
tokio::time::delay_for(std::time::Duration::from_secs(5)).await;
let pods: Vec<_> = rf2.state().await.unwrap().iter().map(Meta::name).collect();
info!("Current pods: {:?}", pods);
}
});

loop {
delay_for(Duration::from_secs(5)).await;
let pods: Vec<_> = rf.state().await?.iter().map(Meta::name).collect();
info!("Current pods: {:?}", pods);
}
rf.run().await?; // run reflector and listen for signals
Ok(())
}
Loading