Skip to content

Commit

Permalink
Refactor EIP Operator
Browse files Browse the repository at this point in the history
        - transition eip/pod watchers to mostly adjust tags
        - update logic of eip controller to:
          1. validate current claim or remove
          2. detach if attached and unclaimed
          3. find a pod/node to claim
          4. attach to node or pod node
          5. update status to reflect current state
  • Loading branch information
jubrad committed Jul 21, 2023
1 parent 2d8d404 commit a69caab
Show file tree
Hide file tree
Showing 8 changed files with 292 additions and 161 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

164 changes: 155 additions & 9 deletions eip_operator/src/controller/eip.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
use std::collections::HashMap;

use kube::api::Api;
use kube::{Client, ResourceExt};
use k8s_openapi::api::core::v1::{Node, Pod};
use kube::api::{Api, ListParams};
use kube::Client;
use kube_runtime::controller::Action;
use tracing::instrument;
use tracing::{info, warn};

use eip_operator_shared::Error;

use crate::eip::v2::Eip;
use crate::eip::v2::{Eip, EipSelector};
use crate::kube_ext::{NodeExt, PodExt};

pub(crate) struct Context {
ec2_client: aws_sdk_ec2::Client,
Expand Down Expand Up @@ -42,11 +45,16 @@ impl k8s_controller::Context for Context {
client: Client,
eip: &Self::Resource,
) -> Result<Option<Action>, Self::Error> {
let eip_api = Api::namespaced(client, &eip.namespace().unwrap());
let eip_api = Api::namespaced(client.clone(), eip.namespace().unwrap());
let pod_api = Api::<Pod>::namespaced(client.clone(), eip.namespace().unwrap());
let node_api = Api::<Node>::all(client.clone());

// Ensure EIP is Created
let uid = eip.metadata.uid.as_ref().ok_or(Error::MissingEipUid)?;
let name = eip.metadata.name.as_ref().ok_or(Error::MissingEipName)?;
let selector = &eip.spec.selector;
let mut status = eip.status.clone().unwrap_or_default();

let addresses = crate::aws::describe_addresses_with_tag_value(
&self.ec2_client,
crate::aws::EIP_UID_TAG,
Expand All @@ -55,21 +63,23 @@ impl k8s_controller::Context for Context {
.await?
.addresses
.ok_or(Error::MissingAddresses)?;
let (allocation_id, public_ip) = match addresses.len() {
let allocation_id = match addresses.len() {
0 => {
let response = crate::aws::allocate_address(
&self.ec2_client,
uid,
name,
selector,
&self.cluster_name,
&eip.namespace().unwrap(),
eip.namespace().unwrap(),
&self.default_tags,
)
.await?;
let allocation_id = response.allocation_id.ok_or(Error::MissingAllocationId)?;
let public_ip = response.public_ip.ok_or(Error::MissingPublicIp)?;
(allocation_id, public_ip)
status.allocation_id = Some(allocation_id.clone());
status.public_ip_address = Some(public_ip);
allocation_id
}
1 => {
let allocation_id = addresses[0]
Expand All @@ -80,13 +90,149 @@ impl k8s_controller::Context for Context {
.public_ip
.as_ref()
.ok_or(Error::MissingPublicIp)?;
(allocation_id.to_owned(), public_ip.to_owned())
status.allocation_id = Some(allocation_id.clone());
status.public_ip_address = Some(public_ip.clone());
allocation_id.to_owned()
}
_ => {
return Err(Error::MultipleEipsTaggedForPod);
}
};
crate::eip::set_status_created(&eip_api, name, &allocation_id, &public_ip).await?;

// disassociate if unclaimed or if claimed and the node / pod terminating
let mut disassociate = status.claim.is_none();
if let Some(claim) = status.claim.clone() {
match &eip.spec.selector {
EipSelector::Node { selector: _ } => {
if let Some(node) = node_api.get_opt(&claim.clone()).await? {
disassociate = node.metadata.deletion_timestamp.is_some();
}
}
EipSelector::Pod { pod_name: _ } => {
if let Some(pod) = pod_api.get_opt(&claim.clone()).await? {
disassociate = pod.metadata.deletion_timestamp.is_some()
}
}
}
}
if disassociate {
crate::aws::disassociate_eip(&self.ec2_client, &allocation_id).await?;
status.claim = None;
status.eni = None;
status.private_ip_address = None;
}

// search for new resource to be claimed by
match eip.spec.selector {
EipSelector::Node { selector: _ } => {
let nodes: Vec<Node> = node_api
.list(&ListParams::default())
.await?
.into_iter()
.filter(|node| eip.matches_node(node))
.collect();
match nodes.len() {
0 => {
warn!("Eip {} matches no nodes", name);
}
1 => {
let node_name = nodes[0]
.metadata
.name
.clone()
.ok_or(Error::MissingNodeName)?;
info!("Eip {} matches node {}, updating claim", name, node_name,);
status.claim = Some(node_name);
}
_ => {
warn!(
"Eip {} matches multiple nodes - {}",
name,
nodes
.iter()
.map(|node| { node.metadata.name.clone().unwrap_or_default() })
.collect::<Vec<String>>()
.join(",")
);
}
}
}
EipSelector::Pod { pod_name: _ } => {
let pods: Vec<Pod> = pod_api
.list(&ListParams::default())
.await?
.into_iter()
.filter(|pod| eip.matches_pod(pod))
.collect();
match pods.len() {
0 => {
warn!("Eip {} matches no pods", name);
}
1 => {
info!(
"Eip {} matches pod {}, updating claim",
name,
pods[0].metadata.name.clone().ok_or(Error::MissingPodName)?,
);
status.claim =
Some(pods[0].metadata.name.clone().ok_or(Error::MissingPodName)?);
}
_ => {
info!(
"Eip {} matches multiple pods - {}",
name,
pods.iter()
.map(|pod| { pod.metadata.name.clone().unwrap_or_default() })
.collect::<Vec<String>>()
.join(",")
);
}
}
}
}

// Associate if claimed
if status.claim.is_some() && !eip.attached() {
let claim = status.claim.clone().unwrap();
let (node, ip) = match &eip.spec.selector {
EipSelector::Node { selector: _ } => {
let node = node_api.get_opt(&claim).await?.ok_or(Error::MissingNode)?;
let node_ip = node.ip().ok_or(Error::MissingNodeIp)?;
(node.to_owned(), node_ip.to_owned())
}
EipSelector::Pod { pod_name: _ } => {
let pod = pod_api.get_opt(&claim).await?.ok_or(Error::MissingPod)?;
let node_name = pod.node_name().ok_or(Error::MissingNodeName)?;
let node = node_api
.get_opt(node_name)
.await?
.ok_or(Error::MissingNode)?;
let pod_ip = pod.ip().ok_or(Error::MissingPodIp)?;
(node, pod_ip.to_owned())
}
};
// attach to node
let provider_id = node.provider_id().ok_or(Error::MissingProviderId)?;
let instance_id = provider_id
.rsplit_once('/')
.ok_or(Error::MalformedProviderId)?
.1;
let instance_description =
crate::aws::describe_instance(&self.ec2_client, instance_id).await?;
let allocation_id = status
.allocation_id
.clone()
.ok_or(Error::MissingAllocationId)?;
let eni_id = crate::aws::get_eni_from_private_ip(&instance_description, &ip)
.ok_or(Error::NoInterfaceWithThatIp)?;
crate::aws::associate_eip(&self.ec2_client, &allocation_id, &eni_id, &ip).await?;
status.eni = Some(eni_id);
status.private_ip_address = Some(ip.to_owned());
}

if eip.status != Some(status.clone()) {
crate::eip::update_status(&eip_api, name, &status).await?;
}
Ok(None)
}

Expand Down
72 changes: 21 additions & 51 deletions eip_operator/src/controller/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,20 @@ use k8s_openapi::api::core::v1::Node;
use kube::api::{Api, ListParams};
use kube::Client;
use kube_runtime::controller::Action;
use tracing::{event, instrument, Level};
use tracing::{event, info, instrument, Level};

use eip_operator_shared::Error;

use crate::eip::v2::Eip;
use crate::kube_ext::NodeExt;

pub(crate) struct Context {
ec2_client: aws_sdk_ec2::Client,
namespace: Option<String>,
}

impl Context {
pub(crate) fn new(ec2_client: aws_sdk_ec2::Client, namespace: Option<String>) -> Self {
Self {
ec2_client,
namespace,
}
pub(crate) fn new(namespace: Option<String>) -> Self {
Self { namespace }
}
}

Expand All @@ -36,45 +32,32 @@ impl k8s_controller::Context for Context {
client: Client,
node: &Self::Resource,
) -> Result<Option<Action>, Self::Error> {
// Find an EIP and claim if not claimed
let name = node.metadata.name.as_ref().ok_or(Error::MissingNodeName)?;
event!(Level::INFO, name = %name, "Applying node.");

let eip_api = Api::<Eip>::namespaced(
client.clone(),
self.namespace.as_deref().unwrap_or("default"),
);

let node_ip = node.ip().ok_or(Error::MissingNodeIp)?;
let node_labels = node.labels().ok_or(Error::MissingNodeLabels)?;
let provider_id = node.provider_id().ok_or(Error::MissingProviderId)?;
let instance_id = provider_id
.rsplit_once('/')
.ok_or(Error::MalformedProviderId)?
.1;
let all_eips = eip_api.list(&ListParams::default()).await?.items;
let eip = all_eips
.into_iter()
.find(|eip| eip.matches_node(node_labels))
.find(|eip| eip.matches_node_labels(node_labels) && eip.status.is_some())
.ok_or(Error::NoEipResourceWithThatNodeSelector)?;
// do nothing if eip already claimed
let eip_name = eip.name().ok_or(Error::MissingEipName)?;
let allocation_id = eip.allocation_id().ok_or(Error::MissingAllocationId)?;
let eip_description = crate::aws::describe_address(&self.ec2_client, allocation_id)
.await?
.addresses
.ok_or(Error::MissingAddresses)?
.swap_remove(0);
let instance_description =
crate::aws::describe_instance(&self.ec2_client, instance_id).await?;

let eni_id = crate::aws::get_eni_from_private_ip(&instance_description, node_ip)
.ok_or(Error::NoInterfaceWithThatIp)?;
if eip_description.network_interface_id != Some(eni_id.to_owned())
|| eip_description.private_ip_address != Some(node_ip.to_owned())
{
crate::aws::associate_eip(&self.ec2_client, allocation_id, &eni_id, node_ip).await?;
}
crate::eip::set_status_attached(&eip_api, eip_name, &eni_id, node_ip).await?;

if eip.claimed() {
info!(
"Found claimed ip {} matching node {}, skipping",
eip_name, name,
);
return Ok(None);
};
let eip_name = eip.name().ok_or(Error::MissingEipName)?;
// ensure there's an allocation id before claiming
let _allocation_id = eip.allocation_id().ok_or(Error::MissingAllocationId)?;
crate::eip::set_status_claimed(&eip_api, eip_name, name).await?;
Ok(None)
}

Expand All @@ -84,33 +67,20 @@ impl k8s_controller::Context for Context {
client: Client,
node: &Self::Resource,
) -> Result<Option<Action>, Self::Error> {
// remove claim
let eip_api = Api::<Eip>::namespaced(
client.clone(),
self.namespace.as_deref().unwrap_or("default"),
);

let node_labels = node.labels().ok_or(Error::MissingNodeLabels)?;
let all_eips = eip_api.list(&ListParams::default()).await?.items;
let eip = all_eips
.into_iter()
.filter(|eip| eip.attached())
.find(|eip| eip.matches_node(node_labels));
.find(|eip| eip.matches_node_labels(node_labels));
if let Some(eip) = eip {
let allocation_id = eip.allocation_id().ok_or(Error::MissingAllocationId)?;
let addresses = crate::aws::describe_address(&self.ec2_client, allocation_id)
.await?
.addresses
.ok_or(Error::MissingAddresses)?;
for address in addresses {
if let Some(association_id) = address.association_id {
crate::aws::disassociate_eip(&self.ec2_client, &association_id).await?;
}
}
crate::eip::set_status_detached(
&eip_api,
eip.metadata.name.as_ref().ok_or(Error::MissingEipName)?,
)
.await?;
crate::eip::set_status_detached(&eip_api, eip.name().ok_or(Error::MissingEipName)?)
.await?;
}
Ok(None)
}
Expand Down
Loading

0 comments on commit a69caab

Please sign in to comment.