Skip to content

Commit

Permalink
Merge pull request #96 from sarub0b0/restore-kube-state
Browse files Browse the repository at this point in the history
Restore kube state
  • Loading branch information
sarub0b0 authored Oct 6, 2021
2 parents 5425e56 + e9833ed commit 4462453
Show file tree
Hide file tree
Showing 7 changed files with 571 additions and 391 deletions.
94 changes: 81 additions & 13 deletions event/src/kubernetes/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ use futures::future::select_all;
use k8s_openapi::api::core::v1::Namespace;
use worker::Worker;

use std::{convert::TryFrom, panic, sync::atomic::AtomicBool, sync::Arc, time::Duration};
use std::{
collections::HashMap, convert::TryFrom, panic, sync::atomic::AtomicBool, sync::Arc,
time::Duration,
};

use crossbeam::channel::{Receiver, Sender};
use tokio::{
Expand Down Expand Up @@ -100,6 +103,8 @@ pub enum Kube {
SetContext(String),
GetCurrentContextRequest,
GetCurrentContextResponse(String, String), // current_context, namespace
// Context Restore
RestoreNamespaces(String, Vec<String>), // default_namespace, selected_namespaces
// Event
Event(Result<Vec<String>>),
// Namespace
Expand Down Expand Up @@ -144,7 +149,7 @@ async fn current_namespace(client: KubeClient, named_context: &NamedContext) ->
} else {
let namespaces = namespace_list(client).await;

if namespaces.iter().find(|&ns| ns == "default").is_some() {
if namespaces.iter().any(|ns| ns == "default") {
Ok("default".to_string())
} else if !namespaces.is_empty() {
Ok(namespaces[0].to_string())
Expand All @@ -156,15 +161,36 @@ async fn current_namespace(client: KubeClient, named_context: &NamedContext) ->
}
}

pub type Namespaces = Arc<RwLock<Vec<String>>>;
pub type ApiResources = Arc<RwLock<Vec<String>>>;
pub(super) type Namespaces = Arc<RwLock<Vec<String>>>;
pub(super) type ApiResources = Arc<RwLock<Vec<String>>>;

#[derive(Clone)]
pub enum WorkerResult {
ChangedContext(String),
Terminated,
}

#[derive(Debug, Default)]
struct KubeState {
default_namespace: String,
selected_namespaces: Vec<String>, // selected
api_resources: Vec<String>,
}

impl KubeState {
fn new(
default_namespace: impl Into<String>,
namespaces: impl Into<Vec<String>>,
api_resources: impl Into<Vec<String>>,
) -> Self {
Self {
default_namespace: default_namespace.into(),
selected_namespaces: namespaces.into(),
api_resources: api_resources.into(),
}
}
}

async fn inner_kube_process(
tx: Sender<Event>,
rx: Receiver<Event>,
Expand All @@ -174,6 +200,8 @@ async fn inner_kube_process(

let mut context: Option<String> = None;

let mut kube_state: HashMap<String, KubeState> = HashMap::new();

while !is_terminated.load(std::sync::atomic::Ordering::Relaxed) {
let (kube_client, current_namespace, current_context) = if let Some(context) = &context {
let named_context = kubeconfig
Expand Down Expand Up @@ -221,24 +249,51 @@ async fn inner_kube_process(
(kube_client, current_namespace, current_context)
};

let namespaces = Arc::new(RwLock::new(vec![current_namespace.to_string()]));
let api_resources: ApiResources = Default::default();
// Restore
let (current_namespace, namespaces, api_resources) =
if let Some(state) = kube_state.get(&current_context) {
let KubeState {
default_namespace,
selected_namespaces: namespaces,
api_resources,
} = state;

tx.send(Event::Kube(Kube::RestoreNamespaces(
default_namespace.to_string(),
namespaces.to_owned(),
)))?;

(
default_namespace.to_string(),
namespaces.to_owned(),
api_resources.to_owned(),
)
} else {
tx.send(Event::Kube(Kube::GetCurrentContextResponse(
current_context.to_string(),
current_namespace.to_string(),
)))?;

(
current_namespace.to_string(),
vec![current_namespace],
Default::default(),
)
};

tx.send(Event::Kube(Kube::GetCurrentContextResponse(
current_context.to_string(),
current_namespace.to_string(),
)))?;
let shared_namespaces = Arc::new(RwLock::new(namespaces.clone()));
let shared_api_resources = Arc::new(RwLock::new(api_resources.clone()));

let poll_worker = PollWorker {
namespaces: namespaces.clone(),
namespaces: shared_namespaces.clone(),
tx: tx.clone(),
is_terminated: is_terminated.clone(),
kube_client,
};

let main_handler = MainWorker {
inner: poll_worker.clone(),
api_resources: api_resources.clone(),
api_resources: shared_api_resources.clone(),
rx: rx.clone(),
contexts: kubeconfig.contexts.clone(),
}
Expand All @@ -247,7 +302,8 @@ async fn inner_kube_process(
let pod_handler = PodPollWorker::new(poll_worker.clone()).spawn();
let config_handler = ConfigsPollWorker::new(poll_worker.clone()).spawn();
let event_handler = EventPollWorker::new(poll_worker.clone()).spawn();
let apis_handler = ApiPollWorker::new(poll_worker.clone(), api_resources).spawn();
let apis_handler =
ApiPollWorker::new(poll_worker.clone(), shared_api_resources.clone()).spawn();

let mut handlers = vec![
main_handler,
Expand Down Expand Up @@ -275,6 +331,18 @@ async fn inner_kube_process(
abort(&handlers);

context = Some(ctx);

let namespaces = shared_namespaces.read().await;
let api_resources = shared_api_resources.read().await;

kube_state.insert(
current_context.to_string(),
KubeState::new(
current_namespace.to_string(),
namespaces.to_vec(),
api_resources.to_vec(),
),
);
}
WorkerResult::Terminated => {}
},
Expand Down
188 changes: 188 additions & 0 deletions src/action.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
use crossbeam::channel::Receiver;

use event::{
error::Result,
kubernetes::{Kube, KubeTable},
Event,
};

use tui_wrapper::{
event::{exec_to_window_event, EventResult},
widget::{WidgetItem, WidgetTrait},
Window, WindowEvent,
};

use crate::{Context, Namespace};

pub mod view_id {

#![allow(non_upper_case_globals)]
macro_rules! generate_id {
($id:ident) => {
pub const $id: &str = stringify!($id);
};
}

generate_id!(tab_pods);
generate_id!(tab_pods_widget_pods);
generate_id!(tab_pods_widget_logs);
generate_id!(tab_configs);
generate_id!(tab_configs_widget_configs);
generate_id!(tab_configs_widget_raw_data);
generate_id!(tab_event);
generate_id!(tab_event_widget_event);
generate_id!(tab_apis);
generate_id!(tab_apis_widget_apis);

generate_id!(subwin_ctx);
generate_id!(subwin_ns);
generate_id!(subwin_apis);
generate_id!(subwin_single_ns);
}

macro_rules! error_format {
($fmt:literal, $($arg:tt)*) => {
format!(concat!("\x1b[31m", $fmt,"\x1b[39m"), $($arg)*)

};
}

pub fn window_action(window: &mut Window, rx: &Receiver<Event>) -> WindowEvent {
match rx.recv().unwrap() {
Event::User(ev) => match window.on_event(ev) {
EventResult::Nop => {}

EventResult::Ignore => {
if let Some(cb) = window.match_callback(ev) {
if let EventResult::Window(ev) = (cb)(window) {
return ev;
}
}
}
ev @ EventResult::Callback(_) => {
return exec_to_window_event(ev, window);
}
EventResult::Window(ev) => {
return ev;
}
},

Event::Tick => {}
Event::Kube(k) => return WindowEvent::UpdateContents(k),
Event::Error(_) => {}
}
WindowEvent::Continue
}

fn update_widget_item_for_table(window: &mut Window, id: &str, table: Result<KubeTable>) {
let widget = window.find_widget_mut(id);
let w = widget.as_mut_table();

match table {
Ok(table) => {
if w.equal_header(table.header()) {
w.update_widget_item(WidgetItem::DoubleArray(table.rows().to_owned()));
} else {
w.update_header_and_rows(table.header(), table.rows());
}
}
Err(e) => {
w.update_header_and_rows(&["ERROR".to_string()], &[vec![error_format!("{}", e)]]);
}
}
}

fn update_widget_item_for_vec(window: &mut Window, id: &str, vec: Result<Vec<String>>) {
let widget = window.find_widget_mut(id);
match vec {
Ok(i) => {
widget.update_widget_item(WidgetItem::Array(i));
}
Err(i) => {
widget.update_widget_item(WidgetItem::Array(vec![error_format!("{}", i)]));
}
}
}

pub fn update_contents(
window: &mut Window,
ev: Kube,
context: &mut Context,
namespace: &mut Namespace,
) {
match ev {
Kube::Pod(pods_table) => {
update_widget_item_for_table(window, view_id::tab_pods_widget_pods, pods_table);
}

Kube::Configs(configs_table) => {
update_widget_item_for_table(
window,
view_id::tab_configs_widget_configs,
configs_table,
);
}

Kube::LogStreamResponse(logs) => {
let widget = window.find_widget_mut(view_id::tab_pods_widget_logs);

match logs {
Ok(i) => {
widget.append_widget_item(WidgetItem::Array(i));
}
Err(i) => {
widget.append_widget_item(WidgetItem::Array(vec![error_format!("{}", i)]));
}
}
}

Kube::ConfigResponse(raw) => {
update_widget_item_for_vec(window, view_id::tab_configs_widget_raw_data, raw);
}

Kube::GetCurrentContextResponse(ctx, ns) => {
context.update(ctx);
namespace.default = ns.to_string();
namespace.selected = vec![ns];
}

Kube::Event(ev) => {
update_widget_item_for_vec(window, view_id::tab_event_widget_event, ev);
}

Kube::APIsResults(apis) => {
update_widget_item_for_vec(window, view_id::tab_apis_widget_apis, apis);
}

Kube::GetNamespacesResponse(ns) => {
window
.find_widget_mut(view_id::subwin_ns)
.update_widget_item(WidgetItem::Array(ns.to_vec()));
window
.find_widget_mut(view_id::subwin_single_ns)
.update_widget_item(WidgetItem::Array(ns));

let widget = window
.find_widget_mut(view_id::subwin_ns)
.as_mut_multiple_select();

if widget.selected_items().is_empty() {
widget.select_item(&namespace.default)
}
}

Kube::GetAPIsResponse(apis) => {
update_widget_item_for_vec(window, view_id::subwin_apis, apis);
}

Kube::GetContextsResponse(ctxs) => {
update_widget_item_for_vec(window, view_id::subwin_ctx, ctxs);
}

Kube::RestoreNamespaces(default, selected) => {
namespace.default = default;
namespace.selected = selected;
}
_ => unreachable!(),
}
}
Loading

0 comments on commit 4462453

Please sign in to comment.