Skip to content

Commit

Permalink
Revamp event and log examples to allow more parameters (#1287)
Browse files Browse the repository at this point in the history
Revamp event and log `examples` to allow more kubectl like parameters

We already have `clap` in the dependency tree so thought it would be useful
to show how to do some more things in the kubectl style.

- `event_watcher` now has an optional flag to limit to one object (ala `kubectl events --for`)
- `log_stream` now supports the basic flags that `kubectl logs` do
- `kubectl` helpers for formatting duration are aligned across examples

Signed-off-by: clux <[email protected]>
  • Loading branch information
clux authored Sep 7, 2023
1 parent 6842ea4 commit 259ed96
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 47 deletions.
10 changes: 9 additions & 1 deletion examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ For a basic overview of how to use the `Api` try:

```sh
cargo run --example job_api
cargo run --example log_stream
cargo run --example pod_api
cargo run --example dynamic_api
cargo run --example dynamic_jsonpath
Expand All @@ -42,6 +41,15 @@ cargo run --example kubectl -- apply -f configmapgen_controller_crd.yaml

Supported flags are `-lLABELSELECTOR`, `-nNAMESPACE`, `--all`, and `-oyaml`.

There are also two other examples that serve as simplistic analogues of `kubectl logs` and `kubectl events`:

```sh
# tail logs
cargo run --example log_stream -- prometheus-promstack-kube-prometheus-prometheus-0 -c prometheus -f --since=3600
# get events for an object
cargo run --example event_watcher -- --for=Pod/prometheus-promstack-kube-prometheus-prometheus-0
```

## kube admission controller example
Admission controllers are a bit of a special beast. They don't actually need `kube_client` (unless you need to verify something with the api-server) or `kube_runtime` (unless you also build a complementing reconciler) because, by themselves, they simply get changes sent to them over `https`. You will need a webserver, certificates, and either your controller deployed behind a `Service`, or as we do here: running locally with a private ip that your `k3d` cluster can reach.

Expand Down
70 changes: 46 additions & 24 deletions examples/event_watcher.rs
Original file line number Diff line number Diff line change
@@ -1,42 +1,64 @@
use futures::{pin_mut, TryStreamExt};
use k8s_openapi::api::{core::v1::ObjectReference, events::v1::Event};
use k8s_openapi::{
api::{core::v1::ObjectReference, events::v1::Event},
apimachinery::pkg::apis::meta::v1::Time,
chrono::Utc,
};
use kube::{
api::Api,
runtime::{watcher, WatchStreamExt},
Client,
Api, Client, ResourceExt,
};
use tracing::info;

/// limited variant of `kubectl events` that works on current context's namespace
///
/// requires a new enough cluster that apis/events.k8s.io/v1 work (kubectl uses corev1::Event)
/// for old style usage of core::v1::Event see node_watcher
#[derive(clap::Parser)]
struct App {
/// Filter by object and kind
///
/// Using --for=Pod/blog-xxxxx
/// Note that kind name is case sensitive
#[arg(long)]
r#for: Option<String>,
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::init();
let client = Client::try_default().await?;
let app: App = clap::Parser::parse();

let events: Api<Event> = Api::all(client);
let ew = watcher(events, watcher::Config::default()).applied_objects();
let events: Api<Event> = Api::default_namespaced(client);
let mut conf = watcher::Config::default();
if let Some(forval) = app.r#for {
if let Some((kind, name)) = forval.split_once('/') {
conf = conf.fields(&format!("regarding.kind={kind},regarding.name={name}"));
}
}
let event_stream = watcher(events, conf).default_backoff().applied_objects();
pin_mut!(event_stream);

pin_mut!(ew);
while let Some(event) = ew.try_next().await? {
handle_event(event)?;
println!("{0:<6} {1:<15} {2:<55} {3}", "AGE", "REASON", "OBJECT", "MESSAGE");
while let Some(ev) = event_stream.try_next().await? {
let age = ev.creation_timestamp().map(format_creation).unwrap_or_default();
let reason = ev.reason.unwrap_or_default();
let obj = ev.regarding.map(format_objref).flatten().unwrap_or_default();
let note = ev.note.unwrap_or_default();
println!("{0:<6} {1:<15} {2:<55} {3}", age, reason, obj, note);
}
Ok(())
}

// This function lets the app handle an added/modified event from k8s
fn handle_event(ev: Event) -> anyhow::Result<()> {
info!(
"{}: {} ({})",
ev.regarding.map(fmt_obj_ref).unwrap_or_default(),
ev.reason.unwrap_or_default(),
ev.note.unwrap_or_default(),
);
Ok(())
fn format_objref(oref: ObjectReference) -> Option<String> {
Some(format!("{}/{}", oref.kind?, oref.name?))
}

fn fmt_obj_ref(oref: ObjectReference) -> String {
format!(
"{}/{}",
oref.kind.unwrap_or_default(),
oref.name.unwrap_or_default()
)
fn format_creation(time: Time) -> String {
let dur = Utc::now().signed_duration_since(time.0);
match (dur.num_days(), dur.num_hours(), dur.num_minutes()) {
(days, _, _) if days > 0 => format!("{days}d"),
(_, hours, _) if hours > 0 => format!("{hours}h"),
(_, _, mins) => format!("{mins}m"),
}
}
15 changes: 5 additions & 10 deletions examples/kubectl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,7 @@
//! with labels and namespace selectors supported.
use anyhow::{bail, Context, Result};
use futures::{StreamExt, TryStreamExt};
use k8s_openapi::{
apimachinery::pkg::apis::meta::v1::Time,
chrono::{Duration, Utc},
};
use k8s_openapi::{apimachinery::pkg::apis::meta::v1::Time, chrono::Utc};
use kube::{
api::{Api, DynamicObject, ListParams, Patch, PatchParams, ResourceExt},
core::GroupVersionKind,
Expand Down Expand Up @@ -103,7 +100,7 @@ impl App {
let max_name = result.iter().map(|x| x.name_any().len() + 2).max().unwrap_or(63);
println!("{0:<width$} {1:<20}", "NAME", "AGE", width = max_name);
for inst in result {
let age = format_creation_since(inst.creation_timestamp());
let age = inst.creation_timestamp().map(format_creation).unwrap_or_default();
println!("{0:<width$} {1:<20}", inst.name_any(), age, width = max_name);
}
}
Expand Down Expand Up @@ -131,7 +128,7 @@ impl App {
let mut stream = watcher(api, wc).applied_objects().boxed();
println!("{0:<width$} {1:<20}", "NAME", "AGE", width = 63);
while let Some(inst) = stream.try_next().await? {
let age = format_creation_since(inst.creation_timestamp());
let age = inst.creation_timestamp().map(format_creation).unwrap_or_default();
println!("{0:<width$} {1:<20}", inst.name_any(), age, width = 63);
}
Ok(())
Expand Down Expand Up @@ -240,10 +237,8 @@ fn dynamic_api(
}
}

fn format_creation_since(time: Option<Time>) -> String {
format_duration(Utc::now().signed_duration_since(time.unwrap().0))
}
fn format_duration(dur: Duration) -> String {
fn format_creation(time: Time) -> String {
let dur = Utc::now().signed_duration_since(time.0);
match (dur.num_days(), dur.num_hours(), dur.num_minutes()) {
(days, _, _) if days > 0 => format!("{days}d"),
(_, hours, _) if hours > 0 => format!("{hours}h"),
Expand Down
45 changes: 33 additions & 12 deletions examples/log_stream.rs
Original file line number Diff line number Diff line change
@@ -1,35 +1,56 @@
use anyhow::{anyhow, Result};
use futures::{AsyncBufReadExt, TryStreamExt};
use k8s_openapi::api::core::v1::Pod;
use kube::{
api::{Api, LogParams},
Client,
};
use std::env;
use tracing::*;

/// limited variant of kubectl logs
#[derive(clap::Parser)]
struct App {
#[arg(long, short = 'c')]
container: Option<String>,

#[arg(long, short = 't')]
tail: Option<i64>,

#[arg(long, short = 'f')]
follow: bool,

/// Since seconds
#[arg(long, short = 's')]
since: Option<i64>,

/// Include timestamps in the log output
#[arg(long, default_value = "false")]
timestamps: bool,

pod: String,
}

#[tokio::main]
async fn main() -> Result<()> {
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::init();
let app: App = clap::Parser::parse();
let client = Client::try_default().await?;

let mypod = env::args()
.nth(1)
.ok_or_else(|| anyhow!("Usage: log_follow <pod>"))?;
info!("Fetching logs for {:?}", mypod);

info!("Fetching logs for {:?}", app.pod);
let pods: Api<Pod> = Api::default_namespaced(client);
let mut logs = pods
.log_stream(&mypod, &LogParams {
follow: true,
tail_lines: Some(1),
.log_stream(&app.pod, &LogParams {
follow: app.follow,
container: app.container,
tail_lines: app.tail,
since_seconds: app.since,
timestamps: app.timestamps,
..LogParams::default()
})
.await?
.lines();

while let Some(line) = logs.try_next().await? {
info!("{}", line);
println!("{}", line);
}
Ok(())
}

0 comments on commit 259ed96

Please sign in to comment.