Skip to content

Commit

Permalink
add egress control loop for setting egress status label on unresponsive nodes
Browse files Browse the repository at this point in the history
  • Loading branch information
evanharmon committed Aug 30, 2024
1 parent 1b1d204 commit c0dd103
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 8 deletions.
11 changes: 4 additions & 7 deletions eip_operator/src/controller/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,6 @@ impl k8s_controller::Context for Context {
self.namespace.as_deref().unwrap_or("default"),
);

// Check for unresponsive egress nodes and clean up their egress status.
// This check must be done before any checks for EIPs. The EIPs may
// already be unassociated from the node.
let node_api = Api::<Node>::all(client.clone());
cleanup_old_egress_nodes(&eip_api, &node_api).await?;

let node_ip = node.ip().ok_or(Error::MissingNodeIp)?;
let node_labels = node.labels();
let node_condition_ready_status =
Expand Down Expand Up @@ -261,7 +255,10 @@ fn get_nodes_ready_status(node_list: Vec<Node>) -> Result<BTreeMap<String, Strin
/// Update their status label if another node is available.
/// Note: Egress traffic will be immediately dropped once the egress status label is changed away from "true"
#[instrument(skip(), err)]
async fn cleanup_old_egress_nodes(eip_api: &Api<Eip>, node_api: &Api<Node>) -> Result<(), Error> {
pub(crate) async fn cleanup_old_egress_nodes(
eip_api: &Api<Eip>,
node_api: &Api<Node>,
) -> Result<(), Error> {
// Gather a list of egress nodes and EIPs to check for potential cleanup.
let node_list = get_egress_nodes(&Api::all(node_api.clone().into())).await?;
if node_list.len() < 2 {
Expand Down
33 changes: 32 additions & 1 deletion eip_operator/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
use std::collections::{HashMap, HashSet};
use std::pin::pin;
use std::time::Duration;

use aws_sdk_ec2::types::Filter;
use aws_sdk_ec2::Client as Ec2Client;
use aws_sdk_servicequotas::types::ServiceQuota;
use aws_sdk_servicequotas::Client as ServiceQuotaClient;
use futures::future::join_all;
use futures::TryStreamExt;
use json_patch::{PatchOperation, RemoveOperation, TestOperation};
use k8s_controller::Controller;
use k8s_openapi::api::core::v1::Pod;
use k8s_openapi::api::core::v1::{Node, Pod};
use kube::api::{Api, ListParams, Patch, PatchParams};
use kube::{Client, ResourceExt};
use tokio::task;
Expand Down Expand Up @@ -92,6 +94,9 @@ async fn run() -> Result<(), Error> {
None => Api::<Eip>::all(k8s_client.clone()),
};

debug!("Getting node api");
let node_api = Api::<Node>::all(k8s_client.clone());

debug!("Cleaning up any orphaned EIPs");
cleanup_orphan_eips(
&ec2_client,
Expand Down Expand Up @@ -152,6 +157,32 @@ async fn run() -> Result<(), Error> {
task::spawn(eip_controller.run())
});

tasks.push({
    let eip_api = eip_api.clone();
    let node_api = node_api.clone();
    // Only watch nodes carrying the egress-gateway label; other nodes are
    // irrelevant to egress status cleanup.
    let watch_config =
        kube_runtime::watcher::Config::default().labels(EGRESS_GATEWAY_NODE_SELECTOR_LABEL_KEY);

    // Background control loop: on any egress-node change, re-run the cleanup
    // that moves the egress status label off unresponsive nodes.
    task::spawn(async move {
        let mut watcher = pin!(kube_runtime::watcher(node_api.clone(), watch_config));

        // Consume watch events until the stream genuinely ends. Watch errors
        // (e.g. transient API/desync failures — kube_runtime's watcher is
        // designed to recover and keep yielding) are logged and the loop
        // continues, instead of terminating the control loop on the first
        // error as the previous `unwrap_or_else(.. None)` did.
        loop {
            match watcher.try_next().await {
                Ok(Some(event)) => {
                    // A node appeared, changed, or vanished: re-evaluate
                    // egress status labels across the remaining nodes.
                    if matches!(
                        event,
                        kube_runtime::watcher::Event::Applied(_)
                            | kube_runtime::watcher::Event::Deleted(_)
                    ) {
                        if let Err(err) =
                            controller::node::cleanup_old_egress_nodes(&eip_api, &node_api).await
                        {
                            event!(Level::ERROR, err = %err, "Node egress cleanup reporting error");
                        }
                    }
                }
                // Stream exhausted: nothing left to watch, let the task end.
                Ok(None) => break,
                // Transient watch error: log it and poll again.
                Err(e) => event!(Level::ERROR, err = %e, "Error watching nodes"),
            }
        }
    })
});

join_all(tasks).await;

debug!("exiting");
Expand Down

0 comments on commit c0dd103

Please sign in to comment.