Skip to content

Commit

Permalink
refactor egress labeler
Browse files Browse the repository at this point in the history
  • Loading branch information
evanharmon committed Sep 11, 2024
1 parent f3171cc commit ea2d3cb
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 89 deletions.
2 changes: 1 addition & 1 deletion eip_operator/src/controller/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ impl k8s_controller::Context for Context {
});
if let Some(eip) = node_eip {
warn!(
"Node {} is in an unknown state, disassociating EIP {}",
"Node {} is in an unresponsive state, detaching EIP {}",
&name.clone(),
&eip.name_unchecked()
);
Expand Down
121 changes: 37 additions & 84 deletions eip_operator/src/egress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::eip::v2::Eip;
use crate::Error;
use k8s_openapi::api::core::v1::Node;
use kube::api::{Api, ListParams, Patch, PatchParams};
use std::collections::{BTreeMap, BTreeSet};
use kube::ResourceExt;
use tracing::{info, instrument};

use crate::EGRESS_GATEWAY_NODE_SELECTOR_LABEL_KEY;
Expand Down Expand Up @@ -56,97 +56,50 @@ async fn get_egress_nodes(api: &Api<Node>) -> Result<Vec<Node>, kube::Error> {
/// This controls whether traffic is allowed to egress through the node.
/// Note: Egress traffic will be immediately dropped when the ready status label value is changed away from "true".
#[instrument(skip(), err)]
pub(crate) async fn label_egress_nodes(
eip_api: &Api<Eip>,
node_api: &Api<Node>,
) -> Result<(), Error> {
pub(crate) async fn label_egress_nodes(eip: &Eip, node_api: &Api<Node>) -> Result<(), Error> {
let egress_nodes = get_egress_nodes(&Api::all(node_api.clone().into())).await?;
if egress_nodes.is_empty() {
info!("No egress nodes found. Skipping egress node ready status labeling.");
return Ok(());
}

// Build up a list of EIP resourceId's to check EIP attachment against node names.
let eip_resource_ids: BTreeSet<String> = eip_api
.list(&ListParams::default())
.await?
.items
.into_iter()
.filter_map(|eip| eip.status.and_then(|s| s.resource_id))
.collect();

// Build up a map of egress node names along with their
// ready status and whether or not they have an EIP attached.
let mut node_map: BTreeMap<String, (String, bool)> = BTreeMap::new();
for node in egress_nodes {
if let Some(ref node_name) = node.metadata.name {
let ready_status = node
.status
.as_ref()
.and_then(|status| status.conditions.as_ref())
.and_then(|conditions| conditions.iter().find(|c| c.type_ == "Ready"))
.map(|condition| condition.status.clone())
.ok_or(Error::MissingNodeReadyCondition)?;
let eip_attached_status = eip_resource_ids.contains(node_name.as_str());
node_map.insert(node_name.clone(), (ready_status, eip_attached_status));
// Note(Evan): find nodes that match eips we're reconciling
// if eip has a resource id, see if the node with the resoruce is ready
// if no, do nothing
// if yes, mark that node as egress_ready=true, and mark all other nodes as egress_ready=false
if let Some(resource_id) = eip.status.as_ref().and_then(|s| s.resource_id.as_ref()) {
let node = egress_nodes
.iter()
.find(|node| {
node.metadata
.name
.as_ref()
.map(|n| n == resource_id)
.unwrap_or(false)
})
.ok_or(Error::MissingNodeName)?;
let node_ready_status = node
.status
.as_ref()
.and_then(|status| status.conditions.as_ref())
.and_then(|conditions| conditions.iter().find(|c| c.type_ == "Ready"))
.map(|condition| condition.status.clone())
.ok_or(Error::MissingNodeReadyCondition)?;
if node_ready_status != "True" {
return Ok(());
} else {
// mark node with EIP as ready for egress traffic
add_gateway_status_label(node_api, node.name_unchecked().as_str(), "true").await?;
// mark all other nodes as not ready for egress traffic
for other_node in egress_nodes
.iter()
.filter(|n| n.name_unchecked() != node.name_unchecked())
{
add_gateway_status_label(node_api, other_node.name_unchecked().as_str(), "false")
.await?;
}
}
}

let egress_nodes_ready_with_eip: BTreeMap<String, (String, bool)> = node_map
.iter()
.filter(|(_, &(ref ready_status, eip_attached_status))| {
ready_status == "True" && eip_attached_status
})
.map(|(node_name, &(ref ready_status, eip_attached_status))| {
(
node_name.clone(),
(ready_status.clone(), eip_attached_status),
)
})
.collect();

// Wait to label egress nodes until an EIP is attached.
// Setting the ready status label to "ready" routes egress traffic through the node
// Wait to label egress nodes until they are ready and have an EIP attached.
if egress_nodes_ready_with_eip.is_empty() {
info!(
"No egress nodes found with a ready status and attached EIP. Skipping egress labeling."
);
return Ok(());
}

// At least one egress node should be ready with an EIP attached in this current implementation.
// This allows the ready status label to be set and traffic to be routed through the node.
assert!(egress_nodes_ready_with_eip.len() == 1);
if let Some((first_key, _)) = egress_nodes_ready_with_eip.first_key_value() {
let node_name = first_key.clone();
add_gateway_status_label(node_api, node_name.as_str(), "true").await?;
}

// We immediately disassociate EIPs from egress nodes that are in an
// unresponsive state in the eip node controller.
// This allows the EIP to be re-associated with a new healthy node.
// However, unresponsive nodes may still exist with a ready-status egress label
// set to "true". This allows the old node to still serve traffic if possible until a
// new node is ready to take over.
// Clean up unresponsive egress nodes in a ready state of `Unknown` if another node is ready with an EIP.
// Egress nodes in a ready state of "False" are not re-labelled since they should be able to be cleaned
// up by normal methods.
let egress_nodes_not_ready_without_eip: BTreeMap<String, (String, bool)> = node_map
.iter()
.filter(|(_, &(ref ready_status, eip_attached_status))| {
(ready_status == "Unknown") && !eip_attached_status
})
.map(|(node_name, &(ref ready_status, eip_attached_status))| {
(
node_name.clone(),
(ready_status.clone(), eip_attached_status),
)
})
.collect();
for node_name in egress_nodes_not_ready_without_eip.keys() {
add_gateway_status_label(node_api, node_name.as_str(), "false").await?;
}

Ok(())
}
8 changes: 4 additions & 4 deletions eip_operator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,14 +167,14 @@ async fn run() -> Result<(), Error> {
task::spawn(async move {
let mut watcher = pin!(kube_runtime::watcher(eip_api.clone(), watch_config));

while let Some(eip) = watcher.try_next().await.unwrap_or_else(|e| {
while let Some(eip_event) = watcher.try_next().await.unwrap_or_else(|e| {
event!(Level::ERROR, err = %e, "Error watching eips");
None
}) {
if let kube_runtime::watcher::Event::Apply(_)
| kube_runtime::watcher::Event::Delete(_) = eip
if let kube_runtime::watcher::Event::Apply(eip)
| kube_runtime::watcher::Event::Delete(eip) = eip_event
{
if let Err(err) = crate::egress::label_egress_nodes(&eip_api, &node_api).await {
if let Err(err) = crate::egress::label_egress_nodes(&eip, &node_api).await {
event!(Level::ERROR, err = %err, "Node egress labeling reporting error");
}
}
Expand Down

0 comments on commit ea2d3cb

Please sign in to comment.