Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change context #91

Merged
merged 12 commits into from
Sep 28, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@ Kubernetes resource watching tool

![](screenshots/example.png)


## Features

- Pods list and container logs watching
- ConfigMap and secret watching, and data decoding
- Events watching
- Specific resources watching
- Namespace multiple selections
- Context selection
- Support unix like key bindings
- Support mouse event

Expand All @@ -38,6 +38,7 @@ General
| Key | Description |
| ------------------------------------ | ----------------------------------------------------------------------- |
| <kbd>n</kbd> | Open the popup for selecting the namespace |
| <kbd>c</kbd> | Open the popup for selecting the context |
| <kbd>Shift+n</kbd> | Open the popup for selecting multiple namespaces |
| <kbd>f</kbd>, <kbd>/</kbd> | Open the popup for selecting multiple api-resources (**only APIs tab**) |
| <kbd>Tab</kbd>, <kbd>Shift+Tab</kbd> | Change the focus of view within the active tab |
Expand Down Expand Up @@ -104,7 +105,6 @@ APIs

![](screenshots/apis.png)


### Select Items

Select namespaces (single)
Expand Down
238 changes: 116 additions & 122 deletions event/src/kubernetes/api_resources.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,92 @@
use crossbeam::channel::Sender;
use super::{
worker::{PollWorker, Worker},
KubeClient,
{v1_table::*, ApiResources},
{Event, Kube},
};

use super::{metric_type::*, WorkerResult};
use crate::error::Result;

use k8s_openapi::apimachinery::pkg::apis::meta::v1::{
APIGroupList, APIResource, APIResourceList, APIVersions, GroupVersionForDiscovery,
};
use k8s_openapi::Resource;
use kube::Client;
use std::sync::Arc;

use std::time;
use tokio::time::Instant;

use super::{
request::{get_request, get_table_request},
{v1_table::*, ApiResources, KubeArgs, Namespaces},
{Event, Kube},
};

use futures::future::try_join_all;
use serde_json::Value as JsonValue;

use async_trait::async_trait;
use std::collections::{HashMap, HashSet};

use super::metric_type::*;
use crate::error::Result;
#[derive(Clone)]
pub struct ApiPollWorker {
inner: PollWorker,
api_resources: ApiResources,
}

impl ApiPollWorker {
pub fn new(inner: PollWorker, api_resources: ApiResources) -> Self {
Self {
inner,
api_resources,
}
}
}

#[async_trait]
impl Worker for ApiPollWorker {
type Output = Result<WorkerResult>;

async fn run(&self) -> Self::Output {
let Self {
inner:
PollWorker {
is_terminated,
tx,
namespaces,
kube_client,
},
api_resources,
} = self;

let mut interval = tokio::time::interval(time::Duration::from_millis(1000));

let api_info_list = get_all_api_info(kube_client).await?;

let mut db = convert_api_database(&api_info_list);

let mut last_tick = Instant::now();
let tick_rate = time::Duration::from_secs(10);

while !is_terminated.load(std::sync::atomic::Ordering::Relaxed) {
interval.tick().await;
let namespaces = namespaces.read().await;
let apis = api_resources.read().await;

if apis.is_empty() {
continue;
}

if tick_rate < last_tick.elapsed() {
last_tick = Instant::now();

let api_info_list = get_all_api_info(kube_client).await?;

db = convert_api_database(&api_info_list);
}

let result = get_api_resources(kube_client, &namespaces, &apis, &db).await;

tx.send(Event::Kube(Kube::APIsResults(result))).unwrap();
}

Ok(WorkerResult::Terminated)
}
}

#[derive(Debug, Clone)]
struct APIInfo {
Expand Down Expand Up @@ -65,8 +131,8 @@ fn is_preferred_version(
preferred_version.as_ref().map(|gv| gv.version == version)
}

pub async fn apis_list(client: &Client, server_url: &str) -> Result<Vec<String>> {
let api_info_list = get_all_api_info(client, server_url).await?;
pub async fn apis_list(client: KubeClient) -> Result<Vec<String>> {
let api_info_list = get_all_api_info(&client).await?;

let set: HashSet<String> = api_info_list
.iter()
Expand All @@ -85,36 +151,30 @@ pub async fn apis_list(client: &Client, server_url: &str) -> Result<Vec<String>>
Ok(ret)
}

async fn get_all_api_info(client: &Client, server_url: &str) -> Result<Vec<APIInfo>> {
async fn get_all_api_info(client: &KubeClient) -> Result<Vec<APIInfo>> {
let mut group_versions = Vec::new();

let result: Result<APIVersions, kube::Error> =
client.request(get_request(server_url, "api")?).await;
let api_versions: APIVersions = client.request("api").await?;

if let Ok(api_versions) = result.as_ref() {
api_versions.versions.iter().for_each(|v| {
group_versions.push(GroupVersion {
group: String::default(),
version: v.to_string(),
preferred_version: None,
})
});
}
api_versions.versions.iter().for_each(|v| {
group_versions.push(GroupVersion {
group: String::default(),
version: v.to_string(),
preferred_version: None,
})
});

let result: Result<APIGroupList, kube::Error> =
client.request(get_request(server_url, "apis")?).await;

if let Ok(api_group_list) = result.as_ref() {
api_group_list.groups.iter().for_each(|group| {
group.versions.iter().for_each(|gv| {
group_versions.push(GroupVersion {
group: group.name.to_string(),
version: gv.version.to_string(),
preferred_version: is_preferred_version(&gv.version, &group.preferred_version),
})
let api_group_list: APIGroupList = client.request("apis").await?;

api_group_list.groups.iter().for_each(|group| {
group.versions.iter().for_each(|gv| {
group_versions.push(GroupVersion {
group: group.name.to_string(),
version: gv.version.to_string(),
preferred_version: is_preferred_version(&gv.version, &group.preferred_version),
})
});
}
})
});

// APIResourceListを取得
// /api/v1
Expand All @@ -125,7 +185,7 @@ async fn get_all_api_info(client: &Client, server_url: &str) -> Result<Vec<APIIn
let job = try_join_all(
group_versions
.iter()
.map(|gv| api_resource_list_to_api_info_list(client, server_url, gv)),
.map(|gv| api_resource_list_to_api_info_list(client, gv)),
)
.await?;

Expand All @@ -137,13 +197,10 @@ fn can_get_request(api: &APIResource) -> bool {
}

async fn api_resource_list_to_api_info_list(
client: &Client,
server_url: &str,
client: &KubeClient,
gv: &GroupVersion,
) -> Result<Vec<APIInfo>> {
let result = client
.request::<APIResourceList>(get_request(server_url, &gv.api_url())?)
.await?;
let result = client.request::<APIResourceList>(&gv.api_url()).await?;

Ok(result
.resources
Expand Down Expand Up @@ -188,7 +245,7 @@ fn convert_api_database(api_info_list: &[APIInfo]) -> HashMap<String, APIInfo> {
db
}

fn merge_tabels(fetch_data: Vec<FetchData>, insert_ns: bool) -> Table {
fn merge_tables(fetch_data: Vec<FetchData>, insert_ns: bool) -> Table {
if fetch_data.is_empty() {
return Table::default();
}
Expand Down Expand Up @@ -239,26 +296,20 @@ fn header_by_api_info(info: &APIInfo) -> String {
}
}

async fn try_fetch_table(client: &Client, server_url: &str, path: String) -> Result<Table> {
let table = client
.request::<Table>(get_table_request(server_url, &path)?)
.await;
async fn try_fetch_table(client: &KubeClient, path: &str) -> Result<Table> {
let table = client.table_request::<Table>(path).await;

if let Ok(t) = table {
return Ok(t);
}

let table = client
.request::<NodeMetricsList>(get_table_request(server_url, &path)?)
.await;
let table = client.table_request::<NodeMetricsList>(path).await;

if let Ok(t) = table {
return Ok(t.into());
}

let table = client
.request::<PodMetricsList>(get_table_request(server_url, &path)?)
.await?;
let table = client.table_request::<PodMetricsList>(path).await?;

Ok(table.into())
}
Expand All @@ -269,12 +320,11 @@ struct FetchData {
}

async fn fetch_table_per_namespace(
client: &Client,
server_url: &str,
client: &KubeClient,
path: String,
ns: &str,
) -> Result<FetchData> {
let table = try_fetch_table(client, server_url, path).await?;
let table = try_fetch_table(client, &path).await?;

Ok(FetchData {
namespace: ns.to_string(),
Expand All @@ -284,39 +334,29 @@ async fn fetch_table_per_namespace(

#[inline]
async fn get_table_namespaced_resource(
client: &Client,
server_url: &str,
client: &KubeClient,
path: String,
kind: &str,
namespaces: &[String],
) -> Result<Table> {
let jobs = try_join_all(namespaces.iter().map(|ns| {
fetch_table_per_namespace(
client,
server_url,
format!("{}/namespaces/{}/{}", path, ns, kind),
ns,
)
let path = format!("{}/namespaces/{}/{}", path, ns, kind);
fetch_table_per_namespace(client, path, ns)
}))
.await?;

let result: Vec<FetchData> = jobs.into_iter().collect();

Ok(merge_tabels(result, insert_ns(namespaces)))
Ok(merge_tables(result, insert_ns(namespaces)))
}

#[inline]
async fn get_table_cluster_resource(
client: &Client,
server_url: &str,
path: String,
) -> Result<Table> {
Ok(try_fetch_table(client, server_url, path).await?)
async fn get_table_cluster_resource(client: &KubeClient, path: &str) -> Result<Table> {
Ok(try_fetch_table(client, path).await?)
}

async fn get_api_resources(
client: &Client,
server_url: &str,
client: &KubeClient,
namespaces: &[String],
apis: &[String],
db: &HashMap<String, APIInfo>,
Expand All @@ -328,7 +368,6 @@ async fn get_api_resources(
let table = if info.api_resource.namespaced {
get_table_namespaced_resource(
client,
server_url,
info.api_url(),
&info.api_resource.name,
namespaces,
Expand All @@ -337,8 +376,7 @@ async fn get_api_resources(
} else {
get_table_cluster_resource(
client,
server_url,
format!("{}/{}", info.api_url(), info.api_resource.name),
&format!("{}/{}", info.api_url(), info.api_resource.name),
)
.await
}?;
Expand All @@ -354,47 +392,3 @@ async fn get_api_resources(

Ok(ret)
}

pub async fn apis_loop(
tx: Sender<Event>,
namespace: Namespaces,
api_resources: ApiResources,
args: Arc<KubeArgs>,
) -> Result<()> {
let mut interval = tokio::time::interval(time::Duration::from_millis(1000));

let api_info_list = get_all_api_info(&args.client, &args.server_url).await?;

let mut db = convert_api_database(&api_info_list);

let mut last_tick = Instant::now();
let tick_rate = time::Duration::from_secs(10);

while !args
.is_terminated
.load(std::sync::atomic::Ordering::Relaxed)
{
interval.tick().await;
let namespaces = namespace.read().await;
let apis = api_resources.read().await;

if apis.is_empty() {
continue;
}

if tick_rate < last_tick.elapsed() {
last_tick = Instant::now();

let api_info_list = get_all_api_info(&args.client, &args.server_url).await?;

db = convert_api_database(&api_info_list);
}

let result =
get_api_resources(&args.client, &args.server_url, &namespaces, &apis, &db).await;

tx.send(Event::Kube(Kube::APIsResults(result))).unwrap();
}

Ok(())
}
Loading