Skip to content

Commit

Permalink
make secret reflector a little nicer to look at
Browse files Browse the repository at this point in the history
  • Loading branch information
clux committed Apr 7, 2020
1 parent 5796563 commit 4b5ad70
Show file tree
Hide file tree
Showing 8 changed files with 62 additions and 91 deletions.
29 changes: 14 additions & 15 deletions kube/examples/crd_reflector.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
#[macro_use] extern crate log;
use std::time::Duration;

use kube_derive::CustomResource;
use serde::{Deserialize, Serialize};
use tokio::time::delay_for;

use kube::{
api::{Api, ListParams, Meta},
Expand All @@ -28,21 +25,23 @@ async fn main() -> anyhow::Result<()> {
// This example requires `kubectl apply -f examples/foo.yaml` run first
let foos: Api<Foo> = Api::namespaced(client, &namespace);
let lp = ListParams::default().timeout(20); // low timeout in this example
let rf = Reflector::new(foos, lp).init().await?;
let rf = Reflector::new(foos).params(lp);
let runner = rf.clone().run();

let cloned = rf.clone();
tokio::spawn(async move {
loop {
if let Err(e) = cloned.poll().await {
warn!("Poll error: {:?}", e);
}
// Periodically read our state
tokio::time::delay_for(std::time::Duration::from_secs(5)).await;
let crds = rf
.state()
.await
.unwrap()
.iter()
.map(Meta::name)
.collect::<Vec<_>>();
info!("Current crds: {:?}", crds);
}
});

loop {
delay_for(Duration::from_secs(5)).await;
// Read updated internal state (instant):
let crds = rf.state().await?.iter().map(Meta::name).collect::<Vec<_>>();
info!("Current crds: {:?}", crds);
}
runner.await?;
Ok(())
}
3 changes: 2 additions & 1 deletion kube/examples/event_informer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ async fn main() -> anyhow::Result<()> {

let events: Api<Event> = Api::all(client);
let lp = ListParams::default();
let ei = Informer::new(events, lp);
let ei = Informer::new(events).params(lp);

loop {
let mut events = ei.poll().await?.boxed();
Expand Down Expand Up @@ -47,6 +47,7 @@ fn handle_event(ev: WatchEvent<Event>) -> anyhow::Result<()> {
WatchEvent::Error(e) => {
warn!("Error event: {:?}", e);
}
_ => {}
}
Ok(())
}
1 change: 1 addition & 0 deletions kube/examples/job_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ async fn main() -> anyhow::Result<()> {
}
WatchEvent::Deleted(s) => info!("Deleted {}", Meta::name(&s)),
WatchEvent::Error(s) => error!("{}", s),
_ => {}
}
}

Expand Down
3 changes: 2 additions & 1 deletion kube/examples/node_informer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ async fn main() -> anyhow::Result<()> {
let nodes: Api<Node> = Api::all(client.clone());

let lp = ListParams::default().labels("beta.kubernetes.io/os=linux");
let ni = Informer::new(nodes, lp);
let ni = Informer::new(nodes).params(lp);

loop {
let mut nodes = ni.poll().await?.boxed();
Expand Down Expand Up @@ -77,6 +77,7 @@ async fn handle_nodes(events: &Api<Event>, ne: WatchEvent<Node>) -> anyhow::Resu
WatchEvent::Error(e) => {
warn!("Error event: {:?}", e);
}
_ => {}
}
Ok(())
}
35 changes: 8 additions & 27 deletions kube/examples/node_reflector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,36 +16,17 @@ async fn main() -> anyhow::Result<()> {
let lp = ListParams::default()
.labels("beta.kubernetes.io/instance-type=m4.2xlarge") // filter instances by label
.timeout(10); // short watch timeout in this example
let rf = Reflector::new(nodes, lp).init().await?;
let rf = Reflector::new(nodes).params(lp);
let runner = rf.clone().run();

// rf is initialized with full state, which can be extracted on demand.
// Output is an owned Vec<Node>
rf.state().await?.into_iter().for_each(|o| {
let labels = Meta::meta(&o).labels.clone().unwrap();
info!(
"Found node {} ({:?}) running {:?} with labels: {:?}",
Meta::name(&o),
o.spec.unwrap().provider_id.unwrap(),
o.status.unwrap().conditions.unwrap(),
labels
);
});

let cloned = rf.clone();
tokio::spawn(async move {
loop {
if let Err(e) = cloned.poll().await {
warn!("Poll error: {:?}", e);
}
// Periodically read our state
tokio::time::delay_for(std::time::Duration::from_secs(5)).await;
let deploys: Vec<_> = rf.state().await.unwrap().iter().map(Meta::name).collect();
info!("Current {} nodes: {:?}", deploys.len(), deploys);
}
});

loop {
// Update internal state by calling watch (waits the full timeout)
rf.poll().await?;

// Read the updated internal state (instant):
let deploys: Vec<_> = rf.state().await?.iter().map(Meta::name).collect();
info!("Current {} nodes: {:?}", deploys.len(), deploys);
}
runner.await?;
Ok(())
}
1 change: 1 addition & 0 deletions kube/examples/pod_informer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ fn handle_pod(ev: WatchEvent<Pod>) -> anyhow::Result<()> {
WatchEvent::Deleted(o) => {
info!("Deleted Pod: {}", Meta::name(&o));
}
WatchEvent::Bookmark(_) => {}
WatchEvent::Error(e) => {
warn!("Error event: {:?}", e);
}
Expand Down
35 changes: 8 additions & 27 deletions kube/examples/pod_reflector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ use kube::{
runtime::Reflector,
Client,
};
use std::time::Duration;
use tokio::time::delay_for;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
Expand All @@ -17,34 +15,17 @@ async fn main() -> anyhow::Result<()> {

let pods: Api<Pod> = Api::namespaced(client, &namespace);
let lp = ListParams::default().timeout(10); // short watch timeout in this example
let rf = Reflector::new(pods, lp).init().await?;
let rf = Reflector::new(pods).params(lp);
let runner = rf.clone().run();

// Can read initial state now:
rf.state().await?.into_iter().for_each(|pod| {
let name = Meta::name(&pod);
let phase = pod.status.unwrap().phase.unwrap();
let containers = pod
.spec
.unwrap()
.containers
.into_iter()
.map(|c| c.name)
.collect::<Vec<_>>();
info!("Found initial pod {} ({}) with {:?}", name, phase, containers);
});

let cloned = rf.clone();
tokio::spawn(async move {
loop {
if let Err(e) = cloned.poll().await {
warn!("Poll error: {:?}", e);
}
// Periodically read our state
tokio::time::delay_for(std::time::Duration::from_secs(5)).await;
let pods: Vec<_> = rf.state().await.unwrap().iter().map(Meta::name).collect();
info!("Current pods: {:?}", pods);
}
});

loop {
delay_for(Duration::from_secs(5)).await;
let pods: Vec<_> = rf.state().await?.iter().map(Meta::name).collect();
info!("Current pods: {:?}", pods);
}
runner.await?;
Ok(())
}
46 changes: 26 additions & 20 deletions kube/examples/secret_reflector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@ enum Decoded {

fn decode(secret: &Secret) -> BTreeMap<String, Decoded> {
let mut res = BTreeMap::new();
for (k, v) in secret.data.clone().unwrap() {
if let Ok(b) = std::str::from_utf8(&v.0) {
res.insert(k, Decoded::Utf8(b.to_string()));
} else {
res.insert(k, Decoded::Bytes(v.0));
// Ignoring binary data for now
if let Some(data) = secret.data.clone() {
for (k, v) in data {
if let Ok(b) = std::str::from_utf8(&v.0) {
res.insert(k, Decoded::Utf8(b.to_string()));
} else {
res.insert(k, Decoded::Bytes(v.0));
}
}
}
res
Expand All @@ -38,20 +41,23 @@ async fn main() -> anyhow::Result<()> {

let secrets: Api<Secret> = Api::namespaced(client, &namespace);
let lp = ListParams::default().timeout(10); // short watch timeout in this example
let rf = Reflector::new(secrets, lp).init().await?;

// Can read initial state now:
rf.state().await?.into_iter().for_each(|secret| {
let res = decode(&secret);
info!("Found secret {} with data: {:?}", Meta::name(&secret), res);
let rf = Reflector::new(secrets).params(lp);
let runner = rf.clone().run();

tokio::spawn(async move {
loop {
// Periodically read our state
tokio::time::delay_for(std::time::Duration::from_secs(5)).await;
let secrets: Vec<_> = rf
.state()
.await
.unwrap()
.iter()
.map(|s| format!("{}: {:?}", Meta::name(s), decode(s).keys()))
.collect();
info!("Current secrets: {:?}", secrets);
}
});

loop {
// Update internal state by calling watch (waits the full timeout)
rf.poll().await?; // ideally call this from a thread/task

// Read updated internal state (instant):
let secrets: Vec<_> = rf.state().await?.iter().map(Meta::name).collect();
info!("Current secrets: {:?}", secrets);
}
runner.await?;
Ok(())
}

0 comments on commit 4b5ad70

Please sign in to comment.