From ea2d3cb324e0e911fc3e09e9772aded61fa2a3ce Mon Sep 17 00:00:00 2001 From: Evan Harmon Date: Mon, 9 Sep 2024 18:09:20 -0400 Subject: [PATCH] refactor egress labeler --- eip_operator/src/controller/node.rs | 2 +- eip_operator/src/egress.rs | 121 +++++++++------------------- eip_operator/src/main.rs | 8 +- 3 files changed, 42 insertions(+), 89 deletions(-) diff --git a/eip_operator/src/controller/node.rs b/eip_operator/src/controller/node.rs index b112c92..af08a44 100644 --- a/eip_operator/src/controller/node.rs +++ b/eip_operator/src/controller/node.rs @@ -93,7 +93,7 @@ impl k8s_controller::Context for Context { }); if let Some(eip) = node_eip { warn!( - "Node {} is in an unknown state, disassociating EIP {}", + "Node {} is in an unresponsive state, detaching EIP {}", &name.clone(), &eip.name_unchecked() ); diff --git a/eip_operator/src/egress.rs b/eip_operator/src/egress.rs index 970d9df..f4d627d 100644 --- a/eip_operator/src/egress.rs +++ b/eip_operator/src/egress.rs @@ -2,7 +2,7 @@ use crate::eip::v2::Eip; use crate::Error; use k8s_openapi::api::core::v1::Node; use kube::api::{Api, ListParams, Patch, PatchParams}; -use std::collections::{BTreeMap, BTreeSet}; +use kube::ResourceExt; use tracing::{info, instrument}; use crate::EGRESS_GATEWAY_NODE_SELECTOR_LABEL_KEY; @@ -56,97 +56,50 @@ async fn get_egress_nodes(api: &Api) -> Result, kube::Error> { /// This controls whether traffic is allowed to egress through the node. /// Note: Egress traffic will be immediately dropped when the ready status label value is changed away from "true". #[instrument(skip(), err)] -pub(crate) async fn label_egress_nodes( - eip_api: &Api, - node_api: &Api, -) -> Result<(), Error> { +pub(crate) async fn label_egress_nodes(eip: &Eip, node_api: &Api) -> Result<(), Error> { let egress_nodes = get_egress_nodes(&Api::all(node_api.clone().into())).await?; if egress_nodes.is_empty() { info!("No egress nodes found. Skipping egress node ready status labeling."); return Ok(()); } - // Build up a list of EIP resourceId's to check EIP attachment against node names. - let eip_resource_ids: BTreeSet = eip_api - .list(&ListParams::default()) - .await? - .items - .into_iter() - .filter_map(|eip| eip.status.and_then(|s| s.resource_id)) - .collect(); - - // Build up a map of egress node names along with their - // ready status and whether or not they have an EIP attached. - let mut node_map: BTreeMap = BTreeMap::new(); - for node in egress_nodes { - if let Some(ref node_name) = node.metadata.name { - let ready_status = node - .status - .as_ref() - .and_then(|status| status.conditions.as_ref()) - .and_then(|conditions| conditions.iter().find(|c| c.type_ == "Ready")) - .map(|condition| condition.status.clone()) - .ok_or(Error::MissingNodeReadyCondition)?; - let eip_attached_status = eip_resource_ids.contains(node_name.as_str()); - node_map.insert(node_name.clone(), (ready_status, eip_attached_status)); + // Note(Evan): find nodes that match eips we're reconciling + // if eip has a resource id, see if the node with the resoruce is ready + // if no, do nothing + // if yes, mark that node as egress_ready=true, and mark all other nodes as egress_ready=false + if let Some(resource_id) = eip.status.as_ref().and_then(|s| s.resource_id.as_ref()) { + let node = egress_nodes + .iter() + .find(|node| { + node.metadata + .name + .as_ref() + .map(|n| n == resource_id) + .unwrap_or(false) + }) + .ok_or(Error::MissingNodeName)?; + let node_ready_status = node + .status + .as_ref() + .and_then(|status| status.conditions.as_ref()) + .and_then(|conditions| conditions.iter().find(|c| c.type_ == "Ready")) + .map(|condition| condition.status.clone()) + .ok_or(Error::MissingNodeReadyCondition)?; + if node_ready_status != "True" { + return Ok(()); + } else { + // mark node with EIP as ready for egress traffic + add_gateway_status_label(node_api, node.name_unchecked().as_str(), "true").await?; + // mark all other nodes as not ready for egress traffic + for other_node in egress_nodes + .iter() + .filter(|n| n.name_unchecked() != node.name_unchecked()) + { + add_gateway_status_label(node_api, other_node.name_unchecked().as_str(), "false") + .await?; + } } } - let egress_nodes_ready_with_eip: BTreeMap = node_map - .iter() - .filter(|(_, &(ref ready_status, eip_attached_status))| { - ready_status == "True" && eip_attached_status - }) - .map(|(node_name, &(ref ready_status, eip_attached_status))| { - ( - node_name.clone(), - (ready_status.clone(), eip_attached_status), - ) - }) - .collect(); - - // Wait to label egress nodes until an EIP is attached. - // Setting the ready status label to "ready" routes egress traffic through the node - // Wait to label egress nodes until they are ready and have an EIP attached. - if egress_nodes_ready_with_eip.is_empty() { - info!( - "No egress nodes found with a ready status and attached EIP. Skipping egress labeling." - ); - return Ok(()); - } - - // At least one egress node should be ready with an EIP attached in this current implementation. - // This allows the ready status label to be set and traffic to be routed through the node. - assert!(egress_nodes_ready_with_eip.len() == 1); - if let Some((first_key, _)) = egress_nodes_ready_with_eip.first_key_value() { - let node_name = first_key.clone(); - add_gateway_status_label(node_api, node_name.as_str(), "true").await?; - } - - // We immediately disassociate EIPs from egress nodes that are in an - // unresponsive state in the eip node controller. - // This allows the EIP to be re-associated with a new healthy node. - // However, unresponsive nodes may still exist with a ready-status egress label - // set to "true". This allows the old node to still serve traffic if possible until a - // new node is ready to take over. - // Clean up unresponsive egress nodes in a ready state of `Unknown` if another node is ready with an EIP. - // Egress nodes in a ready state of "False" are not re-labelled since they should be able to be cleaned - // up by normal methods. - let egress_nodes_not_ready_without_eip: BTreeMap = node_map - .iter() - .filter(|(_, &(ref ready_status, eip_attached_status))| { - (ready_status == "Unknown") && !eip_attached_status - }) - .map(|(node_name, &(ref ready_status, eip_attached_status))| { - ( - node_name.clone(), - (ready_status.clone(), eip_attached_status), - ) - }) - .collect(); - for node_name in egress_nodes_not_ready_without_eip.keys() { - add_gateway_status_label(node_api, node_name.as_str(), "false").await?; - } - Ok(()) } diff --git a/eip_operator/src/main.rs b/eip_operator/src/main.rs index f93eace..d7a66b5 100644 --- a/eip_operator/src/main.rs +++ b/eip_operator/src/main.rs @@ -167,14 +167,14 @@ async fn run() -> Result<(), Error> { task::spawn(async move { let mut watcher = pin!(kube_runtime::watcher(eip_api.clone(), watch_config)); - while let Some(eip) = watcher.try_next().await.unwrap_or_else(|e| { + while let Some(eip_event) = watcher.try_next().await.unwrap_or_else(|e| { event!(Level::ERROR, err = %e, "Error watching eips"); None }) { - if let kube_runtime::watcher::Event::Apply(_) - | kube_runtime::watcher::Event::Delete(_) = eip + if let kube_runtime::watcher::Event::Apply(eip) + | kube_runtime::watcher::Event::Delete(eip) = eip_event { - if let Err(err) = crate::egress::label_egress_nodes(&eip_api, &node_api).await { + if let Err(err) = crate::egress::label_egress_nodes(&eip, &node_api).await { event!(Level::ERROR, err = %err, "Node egress labeling reporting error"); } }