Skip to content

Commit

Permalink
address pr comments 1
Browse files Browse the repository at this point in the history
  • Loading branch information
jubrad committed Mar 23, 2024
1 parent 484a96e commit bd85c36
Show file tree
Hide file tree
Showing 6 changed files with 103 additions and 66 deletions.
16 changes: 8 additions & 8 deletions eip_operator/src/controller/eip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,13 @@ impl k8s_controller::Context for Context {
) -> Result<Option<Action>, Self::Error> {
let eip_api = Api::namespaced(client.clone(), &eip.namespace().unwrap());

let uid = eip.metadata.uid.as_ref().ok_or(Error::MissingEipUid)?;
let name = eip.metadata.name.as_ref().ok_or(Error::MissingEipName)?;
let uid = eip.uid().ok_or(Error::MissingEipUid)?;
let name = eip.name_unchecked();
let selector = &eip.spec.selector;
let addresses = crate::aws::describe_addresses_with_tag_value(
&self.ec2_client,
crate::aws::EIP_UID_TAG,
uid,
&uid,
)
.await?
.addresses
Expand All @@ -60,8 +60,8 @@ impl k8s_controller::Context for Context {
0 => {
let response = crate::aws::allocate_address(
&self.ec2_client,
uid,
name,
&uid,
&name,
selector,
&self.cluster_name,
&eip.namespace().unwrap(),
Expand All @@ -87,7 +87,7 @@ impl k8s_controller::Context for Context {
return Err(Error::MultipleEipsTaggedForPod);
}
};
crate::eip::set_status_created(&eip_api, name, &allocation_id, &public_ip).await?;
crate::eip::set_status_created(&eip_api, &name, &allocation_id, &public_ip).await?;

if eip.status.as_ref().is_some_and(|s| s.resource_id.is_some()) {
// do nothing
Expand Down Expand Up @@ -133,11 +133,11 @@ impl k8s_controller::Context for Context {
_client: Client,
eip: &Self::Resource,
) -> Result<Option<Action>, Self::Error> {
let uid = eip.metadata.uid.as_ref().ok_or(Error::MissingEipUid)?;
let uid = eip.uid().ok_or(Error::MissingEipUid)?;
let addresses = crate::aws::describe_addresses_with_tag_value(
&self.ec2_client,
crate::aws::EIP_UID_TAG,
uid,
&uid,
)
.await?
.addresses;
Expand Down
57 changes: 31 additions & 26 deletions eip_operator/src/controller/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::time::Duration;

use k8s_openapi::api::core::v1::Node;
use kube::api::{Api, ListParams};
use kube::Client;
use kube::{Client, ResourceExt};
use kube_runtime::controller::Action;
use tracing::{event, info, instrument, Level};

Expand Down Expand Up @@ -38,7 +38,7 @@ impl k8s_controller::Context for Context {
client: Client,
node: &Self::Resource,
) -> Result<Option<Action>, Self::Error> {
let name = node.metadata.name.as_ref().ok_or(Error::MissingNodeName)?;
let name = node.name_unchecked();
event!(Level::INFO, name = %name, "Applying node.");

let eip_api = Api::<Eip>::namespaced(
Expand All @@ -47,30 +47,32 @@ impl k8s_controller::Context for Context {
);

let node_ip = node.ip().ok_or(Error::MissingNodeIp)?;
let node_labels = node.labels().ok_or(Error::MissingNodeLabels)?;
let node_labels = node.labels();
let provider_id = node.provider_id().ok_or(Error::MissingProviderId)?;
let instance_id = provider_id
.rsplit_once('/')
.ok_or(Error::MalformedProviderId)?
.1;
let all_eips: Vec<Eip> = eip_api
let matched_eips: Vec<Eip> = eip_api
.list(&ListParams::default())
.await?
.items
.into_iter()
.filter(|eip| eip.matches_node(node_labels))
.collect();
if all_eips.is_empty() {
if matched_eips.is_empty() {
return Err(Error::NoEipResourceWithThatNodeSelector);
}
let eip = all_eips
.into_iter()
.find(|eip| eip.status.as_ref().is_some_and(|s| s.resource_id.is_none()));
if eip.is_none() {
info!("No un-associated eips found for node {}", name);
let eip = matched_eips.into_iter().find(|eip| {
eip.status.as_ref().is_some_and(|s| {
s.resource_id.is_none()
|| s.resource_id.as_ref().map(|r| r == &name).unwrap_or(false)
})
});
let Some(eip) = eip else {
info!("No un-associated eips found for node {}", &name);
return Ok(None);
}
let eip = eip.unwrap();
};
let eip_name = eip.name().ok_or(Error::MissingEipName)?;
let allocation_id = eip.allocation_id().ok_or(Error::MissingAllocationId)?;
let eip_description = crate::aws::describe_address(&self.ec2_client, allocation_id)
Expand All @@ -86,19 +88,27 @@ impl k8s_controller::Context for Context {
if eip_description.network_interface_id != Some(eni_id.to_owned())
|| eip_description.private_ip_address != Some(node_ip.to_owned())
{
match crate::eip::set_status_attached(&eip_api, &eip, &eni_id, node_ip, name).await {
match crate::eip::set_status_attached(&eip_api, &eip, &eni_id, node_ip, &name).await {
Ok(_) => {
info!("Found matching Eip, claiming it");
crate::aws::associate_eip(&self.ec2_client, allocation_id, &eni_id, node_ip)
.await?;
let association_id = crate::aws::associate_eip(
&self.ec2_client,
allocation_id,
&eni_id,
node_ip,
)
.await?
.association_id
.ok_or(Error::MissingAssociationId)?;
crate::eip::set_status_associated(&eip_api, eip_name, &association_id).await?;
}
Err(err)
if err
.to_string()
.contains("Operation cannot be fulfilled on eips.materialize.cloud") =>
{
info!(
"Pod {} failed to claim eip {}, rescheduling to try another",
"Node {} failed to claim eip {}, rescheduling to try another",
name, eip_name
);
return Ok(Some(Action::requeue(Duration::from_secs(1))));
Expand All @@ -119,18 +129,17 @@ impl k8s_controller::Context for Context {
client.clone(),
self.namespace.as_deref().unwrap_or("default"),
);

let node_labels = node.labels().ok_or(Error::MissingNodeLabels)?;
let all_eips = eip_api.list(&ListParams::default()).await?.items;
let eip = all_eips
let node_labels = node.labels();
let matched_eips = eip_api.list(&ListParams::default()).await?.items;
let eip = matched_eips
.into_iter()
.filter(|eip| eip.attached())
.find(|eip| {
eip.matches_node(node_labels)
&& eip
.status
.as_ref()
.is_some_and(|s| s.resource_id == Some(node.metadata.name.clone().unwrap()))
.is_some_and(|s| s.resource_id == Some(node.name_unchecked().clone()))
});
if let Some(eip) = eip {
let allocation_id = eip.allocation_id().ok_or(Error::MissingAllocationId)?;
Expand All @@ -143,11 +152,7 @@ impl k8s_controller::Context for Context {
crate::aws::disassociate_eip(&self.ec2_client, &association_id).await?;
}
}
crate::eip::set_status_detached(
&eip_api,
eip.metadata.name.as_ref().ok_or(Error::MissingEipName)?,
)
.await?;
crate::eip::set_status_detached(&eip_api, &eip.name_unchecked()).await?;
}
Ok(None)
}
Expand Down
47 changes: 28 additions & 19 deletions eip_operator/src/controller/pod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,15 @@ impl k8s_controller::Context for Context {
client: Client,
pod: &Self::Resource,
) -> Result<Option<Action>, Self::Error> {
let name = pod.metadata.name.as_ref().ok_or(Error::MissingPodName)?;
let name = pod.name_unchecked();

let eip_api = Api::<Eip>::namespaced(client.clone(), &pod.namespace().unwrap());
let pod_api = Api::<Pod>::namespaced(client.clone(), &pod.namespace().unwrap());
let node_api = Api::<Node>::all(client.clone());

if should_autocreate_eip(pod) {
event!(Level::INFO, should_autocreate_eip = true);
crate::eip::create_for_pod(&eip_api, name).await?;
crate::eip::create_for_pod(&eip_api, &name).await?;
}

let pod_ip = pod.ip().ok_or(Error::MissingPodIp)?;
Expand All @@ -65,26 +65,39 @@ impl k8s_controller::Context for Context {
.ok_or(Error::NoInterfaceWithThatIp)?
}
};

let all_eips = eip_api.list(&ListParams::default()).await?.items;
let eip = all_eips
let eips = eip_api
.list(&ListParams::default())
.await?
.items
.into_iter()
.find(|eip| eip.matches_pod(name))
.ok_or_else(|| Error::NoEipResourceWithThatPodName(name.to_owned()))?;
.filter(|eip| eip.matches_pod(&name))
.collect::<Vec<_>>();
let eip = match eips.len() {
0 => return Err(Error::NoEipResourceWithThatPodName(name.clone())),
1 => eips[0].clone(),
_ => return Err(Error::MultipleEipsTaggedForPod),
};
let allocation_id = eip.allocation_id().ok_or(Error::MissingAllocationId)?;
let eip_description = crate::aws::describe_address(&self.ec2_client, allocation_id)
.await?
.addresses
.ok_or(Error::MissingAddresses)?
.swap_remove(0);
let public_ip = eip_description.public_ip.ok_or(Error::MissingPublicIp)?;
// having multiple EIPs
crate::eip::set_status_attached(&eip_api, &eip, &eni_id, pod_ip, &name).await?;
if eip_description.network_interface_id != Some(eni_id.to_owned())
|| eip_description.private_ip_address != Some(pod_ip.to_owned())
{
crate::eip::set_status_attached(&eip_api, &eip, &eni_id, pod_ip, name).await?;
crate::aws::associate_eip(&self.ec2_client, allocation_id, &eni_id, pod_ip).await?;
add_dns_target_annotation(&pod_api, name, &public_ip, allocation_id).await?;
let association_id =
crate::aws::associate_eip(&self.ec2_client, allocation_id, &eni_id, pod_ip)
.await?
.association_id
.ok_or(Error::MissingAssociationId)?;
crate::eip::set_status_associated(&eip_api, &eip.name_unchecked(), &association_id)
.await?;
}
add_dns_target_annotation(&pod_api, &name, &public_ip, allocation_id).await?;
Ok(None)
}

Expand All @@ -94,12 +107,12 @@ impl k8s_controller::Context for Context {
client: Client,
pod: &Self::Resource,
) -> Result<Option<Action>, Self::Error> {
let name = pod.metadata.name.as_ref().ok_or(Error::MissingPodUid)?;
let name = pod.name_unchecked();

let eip_api = Api::<Eip>::namespaced(client.clone(), &pod.namespace().unwrap());

let all_eips = eip_api.list(&ListParams::default()).await?.items;
let eip = all_eips.into_iter().find(|eip| eip.matches_pod(name));
let matched_eips = eip_api.list(&ListParams::default()).await?.items;
let eip = matched_eips.into_iter().find(|eip| eip.matches_pod(&name));
if let Some(eip) = eip {
let allocation_id = eip.allocation_id().ok_or(Error::MissingAllocationId)?;
let addresses = crate::aws::describe_address(&self.ec2_client, allocation_id)
Expand All @@ -111,15 +124,11 @@ impl k8s_controller::Context for Context {
crate::aws::disassociate_eip(&self.ec2_client, &association_id).await?;
}
}
crate::eip::set_status_detached(
&eip_api,
eip.metadata.name.as_ref().ok_or(Error::MissingEipName)?,
)
.await?;
crate::eip::set_status_detached(&eip_api, &eip.name_unchecked()).await?;
};
if should_autocreate_eip(pod) {
event!(Level::INFO, should_autocreate_eip = true);
crate::eip::delete(&eip_api, name).await?;
crate::eip::delete(&eip_api, &name).await?;
}
Ok(None)
}
Expand Down
40 changes: 34 additions & 6 deletions eip_operator/src/eip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ pub mod v2 {
use kube::api::{Api, ListParams};
use kube::core::{DynamicObject, GroupVersionKind};
use kube::discovery::ApiResource;
use kube::{Client, CustomResource, Resource};
use kube::{Client, CustomResource, Resource, ResourceExt};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
Expand Down Expand Up @@ -125,7 +125,8 @@ pub mod v2 {
printcolumn = r#"{"name": "Selector", "type": "string", "description": "Selector for the pod or node to associate the EIP with.", "jsonPath": ".spec.selector", "priority": 1}"#,
printcolumn = r#"{"name": "ENI", "type": "string", "description": "ID of the Elastic Network Interface of the pod.", "jsonPath": ".status.eni", "priority": 1}"#,
printcolumn = r#"{"name": "PrivateIP", "type": "string", "description": "Private IP address of the pod.", "jsonPath": ".status.privateIpAddress", "priority": 1}"#,
printcolumn = r#"{"name": "ResourceId", "type": "string", "description": "ID of resource the EIP is attached to..", "jsonPath": ".status.resourceId", "priority": 1}"#
printcolumn = r#"{"name": "ResourceId", "type": "string", "description": "ID of resource the EIP is attached to.", "jsonPath": ".status.resourceId", "priority": 1}"#,
printcolumn = r#"{"name": "AssociationId", "type": "string", "description": "ID of the association for the attachment.", "jsonPath": ".status.resourceId", "priority": 1}"#
)]
pub struct EipSpec {
pub selector: EipSelector,
Expand Down Expand Up @@ -186,8 +187,12 @@ pub mod v2 {
Pod::kind(&()).as_ref(),
);
let api_resource = ApiResource::from_gvk(&gvk);
// Api::namespaced(client, &self.namespace().unwrap())
Api::default_namespaced_with(client.clone(), &api_resource)
Api::namespaced_with(
client.clone(),
// eips are namespaced
&self.namespace().unwrap_or("default".to_owned()),
&api_resource,
)
}
EipSelector::Node { selector: _ } => {
let gvk = GroupVersionKind::gvk(
Expand All @@ -196,9 +201,7 @@ pub mod v2 {
Node::kind(&()).as_ref(),
);
let api_resource = ApiResource::from_gvk(&gvk);
// Api::namespaced(client, &self.namespace().unwrap())
Api::all_with(client.clone(), &api_resource)
// Api::all(client),
}
}
}
Expand Down Expand Up @@ -262,6 +265,7 @@ pub struct EipStatus {
pub eni: Option<String>,
pub private_ip_address: Option<String>,
pub resource_id: Option<String>,
pub association_id: Option<String>,
}

/// Registers the Eip custom resource with Kubernetes,
Expand Down Expand Up @@ -381,6 +385,30 @@ pub(crate) async fn set_status_created(
result
}

/// Sets the associationId field in the Eip status.
#[instrument(skip(api), err)]
pub(crate) async fn set_status_associated(
api: &Api<v2::Eip>,
name: &str,
association_id: &str,
) -> Result<Eip, kube::Error> {
event!(Level::INFO, "Updating status for created EIP.");
let patch = serde_json::json!({
"apiVersion": Eip::version(),
"kind": "Eip",
"status": {
"associationId": association_id,
}
});
let patch = Patch::Merge(&patch);
let params = PatchParams::default();
let result = api.patch_status(name, &params, &patch).await;
if result.is_ok() {
event!(Level::INFO, "Done updating status for created EIP.");
}
result
}

/// Sets the eni and privateIpAddress fields in the Eip status.
#[instrument(skip(api), err)]
pub(crate) async fn set_status_attached(
Expand Down
7 changes: 0 additions & 7 deletions eip_operator/src/kube_ext.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
use std::collections::BTreeMap;

use k8s_openapi::api::core::v1::{Node, Pod};

pub(crate) trait NodeExt {
fn ip(&self) -> Option<&str>;
fn labels(&self) -> Option<&BTreeMap<String, String>>;
fn provider_id(&self) -> Option<&str>;
}

Expand All @@ -21,10 +18,6 @@ impl NodeExt for Node {
})
}

fn labels(&self) -> Option<&BTreeMap<String, String>> {
self.metadata.labels.as_ref()
}

fn provider_id(&self) -> Option<&str> {
self.spec
.as_ref()
Expand Down
2 changes: 2 additions & 0 deletions eip_operator_shared/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ pub enum Error {
MultipleEipsTaggedForPod,
#[error("allocation_id was None.")]
MissingAllocationId,
#[error("aassociation_id was None.")]
MissingAssociationId,
#[error("public_ip was None.")]
MissingPublicIp,
#[error("DescribeInstancesResult.reservations was None.")]
Expand Down

0 comments on commit bd85c36

Please sign in to comment.