diff --git a/kube/examples/crd_reflector.rs b/kube/examples/crd_reflector.rs index c1ea771c4..36c3cddaf 100644 --- a/kube/examples/crd_reflector.rs +++ b/kube/examples/crd_reflector.rs @@ -1,9 +1,6 @@ #[macro_use] extern crate log; -use std::time::Duration; - use kube_derive::CustomResource; use serde::{Deserialize, Serialize}; -use tokio::time::delay_for; use kube::{ api::{Api, ListParams, Meta}, @@ -28,21 +25,23 @@ async fn main() -> anyhow::Result<()> { // This example requires `kubectl apply -f examples/foo.yaml` run first let foos: Api = Api::namespaced(client, &namespace); let lp = ListParams::default().timeout(20); // low timeout in this example - let rf = Reflector::new(foos, lp).init().await?; + let rf = Reflector::new(foos).params(lp); + let runner = rf.clone().run(); - let cloned = rf.clone(); tokio::spawn(async move { loop { - if let Err(e) = cloned.poll().await { - warn!("Poll error: {:?}", e); - } + // Periodically read our state + tokio::time::delay_for(std::time::Duration::from_secs(5)).await; + let crds = rf + .state() + .await + .unwrap() + .iter() + .map(Meta::name) + .collect::>(); + info!("Current crds: {:?}", crds); } }); - - loop { - delay_for(Duration::from_secs(5)).await; - // Read updated internal state (instant): - let crds = rf.state().await?.iter().map(Meta::name).collect::>(); - info!("Current crds: {:?}", crds); - } + runner.await?; + Ok(()) } diff --git a/kube/examples/event_informer.rs b/kube/examples/event_informer.rs index 60e8ee38f..a79a42eac 100644 --- a/kube/examples/event_informer.rs +++ b/kube/examples/event_informer.rs @@ -16,7 +16,7 @@ async fn main() -> anyhow::Result<()> { let events: Api = Api::all(client); let lp = ListParams::default(); - let ei = Informer::new(events, lp); + let ei = Informer::new(events).params(lp); loop { let mut events = ei.poll().await?.boxed(); @@ -47,6 +47,7 @@ fn handle_event(ev: WatchEvent) -> anyhow::Result<()> { WatchEvent::Error(e) => { warn!("Error event: {:?}", e); } + _ => {} } Ok(()) } diff --git a/kube/examples/job_api.rs b/kube/examples/job_api.rs index 297721892..810b4e73c 100644 --- a/kube/examples/job_api.rs +++ b/kube/examples/job_api.rs @@ -65,6 +65,7 @@ async fn main() -> anyhow::Result<()> { } WatchEvent::Deleted(s) => info!("Deleted {}", Meta::name(&s)), WatchEvent::Error(s) => error!("{}", s), + _ => {} } } diff --git a/kube/examples/node_informer.rs b/kube/examples/node_informer.rs index 330461a53..71b9ed8c8 100644 --- a/kube/examples/node_informer.rs +++ b/kube/examples/node_informer.rs @@ -16,7 +16,7 @@ async fn main() -> anyhow::Result<()> { let nodes: Api = Api::all(client.clone()); let lp = ListParams::default().labels("beta.kubernetes.io/os=linux"); - let ni = Informer::new(nodes, lp); + let ni = Informer::new(nodes).params(lp); loop { let mut nodes = ni.poll().await?.boxed(); @@ -77,6 +77,7 @@ async fn handle_nodes(events: &Api, ne: WatchEvent) -> anyhow::Resu WatchEvent::Error(e) => { warn!("Error event: {:?}", e); } + _ => {} } Ok(()) } diff --git a/kube/examples/node_reflector.rs b/kube/examples/node_reflector.rs index 19f6968d7..f1f6bcd29 100644 --- a/kube/examples/node_reflector.rs +++ b/kube/examples/node_reflector.rs @@ -16,36 +16,17 @@ async fn main() -> anyhow::Result<()> { let lp = ListParams::default() .labels("beta.kubernetes.io/instance-type=m4.2xlarge") // filter instances by label .timeout(10); // short watch timeout in this example - let rf = Reflector::new(nodes, lp).init().await?; + let rf = Reflector::new(nodes).params(lp); + let runner = rf.clone().run(); - // rf is initialized with full state, which can be extracted on demand. - // Output is an owned Vec - rf.state().await?.into_iter().for_each(|o| { - let labels = Meta::meta(&o).labels.clone().unwrap(); - info!( - "Found node {} ({:?}) running {:?} with labels: {:?}", - Meta::name(&o), - o.spec.unwrap().provider_id.unwrap(), - o.status.unwrap().conditions.unwrap(), - labels - ); - }); - - let cloned = rf.clone(); tokio::spawn(async move { loop { - if let Err(e) = cloned.poll().await { - warn!("Poll error: {:?}", e); - } + // Periodically read our state + tokio::time::delay_for(std::time::Duration::from_secs(5)).await; + let deploys: Vec<_> = rf.state().await.unwrap().iter().map(Meta::name).collect(); + info!("Current {} nodes: {:?}", deploys.len(), deploys); } }); - - loop { - // Update internal state by calling watch (waits the full timeout) - rf.poll().await?; - - // Read the updated internal state (instant): - let deploys: Vec<_> = rf.state().await?.iter().map(Meta::name).collect(); - info!("Current {} nodes: {:?}", deploys.len(), deploys); - } + runner.await?; + Ok(()) } diff --git a/kube/examples/pod_informer.rs b/kube/examples/pod_informer.rs index 1aab10bb9..573fa6d1d 100644 --- a/kube/examples/pod_informer.rs +++ b/kube/examples/pod_informer.rs @@ -50,6 +50,7 @@ fn handle_pod(ev: WatchEvent) -> anyhow::Result<()> { WatchEvent::Deleted(o) => { info!("Deleted Pod: {}", Meta::name(&o)); } + WatchEvent::Bookmark(_) => {} WatchEvent::Error(e) => { warn!("Error event: {:?}", e); } diff --git a/kube/examples/pod_reflector.rs b/kube/examples/pod_reflector.rs index 5407cc278..ae1aab8ad 100644 --- a/kube/examples/pod_reflector.rs +++ b/kube/examples/pod_reflector.rs @@ -5,8 +5,6 @@ use kube::{ runtime::Reflector, Client, }; -use std::time::Duration; -use tokio::time::delay_for; #[tokio::main] async fn main() -> anyhow::Result<()> { @@ -17,34 +15,17 @@ async fn main() -> anyhow::Result<()> { let pods: Api = Api::namespaced(client, &namespace); let lp = ListParams::default().timeout(10); // short watch timeout in this example - let rf = Reflector::new(pods, lp).init().await?; + let rf = Reflector::new(pods).params(lp); + let runner = rf.clone().run(); - // Can read initial state now: - rf.state().await?.into_iter().for_each(|pod| { - let name = Meta::name(&pod); - let phase = pod.status.unwrap().phase.unwrap(); - let containers = pod - .spec - .unwrap() - .containers - .into_iter() - .map(|c| c.name) - .collect::>(); - info!("Found initial pod {} ({}) with {:?}", name, phase, containers); - }); - - let cloned = rf.clone(); tokio::spawn(async move { loop { - if let Err(e) = cloned.poll().await { - warn!("Poll error: {:?}", e); - } + // Periodically read our state + tokio::time::delay_for(std::time::Duration::from_secs(5)).await; + let pods: Vec<_> = rf.state().await.unwrap().iter().map(Meta::name).collect(); + info!("Current pods: {:?}", pods); } }); - - loop { - delay_for(Duration::from_secs(5)).await; - let pods: Vec<_> = rf.state().await?.iter().map(Meta::name).collect(); - info!("Current pods: {:?}", pods); - } + runner.await?; + Ok(()) } diff --git a/kube/examples/secret_reflector.rs b/kube/examples/secret_reflector.rs index d20e3b68f..bf38d6f84 100644 --- a/kube/examples/secret_reflector.rs +++ b/kube/examples/secret_reflector.rs @@ -19,11 +19,14 @@ enum Decoded { fn decode(secret: &Secret) -> BTreeMap { let mut res = BTreeMap::new(); - for (k, v) in secret.data.clone().unwrap() { - if let Ok(b) = std::str::from_utf8(&v.0) { - res.insert(k, Decoded::Utf8(b.to_string())); - } else { - res.insert(k, Decoded::Bytes(v.0)); + // Ignoring binary data for now + if let Some(data) = secret.data.clone() { + for (k, v) in data { + if let Ok(b) = std::str::from_utf8(&v.0) { + res.insert(k, Decoded::Utf8(b.to_string())); + } else { + res.insert(k, Decoded::Bytes(v.0)); + } } } res @@ -38,20 +41,23 @@ async fn main() -> anyhow::Result<()> { let secrets: Api = Api::namespaced(client, &namespace); let lp = ListParams::default().timeout(10); // short watch timeout in this example - let rf = Reflector::new(secrets, lp).init().await?; - - // Can read initial state now: - rf.state().await?.into_iter().for_each(|secret| { - let res = decode(&secret); - info!("Found secret {} with data: {:?}", Meta::name(&secret), res); + let rf = Reflector::new(secrets).params(lp); + let runner = rf.clone().run(); + + tokio::spawn(async move { + loop { + // Periodically read our state + tokio::time::delay_for(std::time::Duration::from_secs(5)).await; + let secrets: Vec<_> = rf + .state() + .await + .unwrap() + .iter() + .map(|s| format!("{}: {:?}", Meta::name(s), decode(s).keys())) + .collect(); + info!("Current secrets: {:?}", secrets); + } }); - - loop { - // Update internal state by calling watch (waits the full timeout) - rf.poll().await?; // ideally call this from a thread/task - - // Read updated internal state (instant): - let secrets: Vec<_> = rf.state().await?.iter().map(Meta::name).collect(); - info!("Current secrets: {:?}", secrets); - } + runner.await?; + Ok(()) }