
Refactor EIP Operator #348

Closed
wants to merge 4 commits into from
1,121 changes: 627 additions & 494 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -8,7 +8,7 @@ members = [

[workspace.package]
edition = "2021"
rust-version = "1.74.0"
rust-version = "1.76.0"


# Use this section only to change the source of dependencies that might
10 changes: 6 additions & 4 deletions deny.toml
@@ -1,3 +1,4 @@
[graph]
targets = [
{ triple = "aarch64-apple-darwin" },
{ triple = "aarch64-unknown-linux-gnu" },
@@ -6,7 +7,7 @@ targets = [
]

[advisories]
jubrad marked this conversation as resolved.
vulnerability = "deny"
version = 2

[bans]
multiple-versions = "deny"
@@ -20,9 +21,10 @@ skip = [
{ name = "hashbrown", version = "0.12.3" },
{ name = "hashbrown", version = "0.14.0" },
{ name = "nix", version = "0.26.4" },
{ name = "nix", version = "0.27.1" },
Contributor:
Can we continue to add all versions of duplicate deps here? It makes it easier to track when things change.

Contributor Author:
We can, but cargo deny was yelling at me; maybe I need to adjust my configuration?

{ name = "ordered-float", version = "2.10.0" },
{ name = "ordered-float", version = "3.4.0" },
{ name = "fastrand", version = "2.0.1" },
{ name = "regex-automata", version = "0.4.6" },
{ name = "regex-syntax", version = "0.6.29" },
Contributor:
I'm fairly sure we can get rid of these duplicates with some pretty easy selective downgrades, which I think would be preferable if possible.

Contributor Author:
It's definitely possible, I spent like 2 hours fighting with versions and updating code after updates. I don't really care to hop down that rabbit hole again.

]

# Use `tracing` instead.
@@ -35,14 +37,14 @@ name = "env_logger"
name = "rustls"

[licenses]
version = 2
allow = [
"Apache-2.0",
"BSD-2-Clause",
"BSD-3-Clause",
"MIT",
"Unicode-DFS-2016",
]
copyleft = "deny"
jubrad marked this conversation as resolved.

[[licenses.clarify]]
name = "ring"
18 changes: 4 additions & 14 deletions eip_operator/Cargo.toml
@@ -7,20 +7,10 @@ license = "Apache-2.0"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
aws-config = { version = "0.55.1", default-features = false, features = [
"native-tls",
] }
aws-sdk-ec2 = { version = "0.28", default-features = false, features = [
"native-tls",
"rt-tokio",
] }
aws-sdk-servicequotas = { version = "0.28", default-features = false, features = [
"native-tls",
"rt-tokio",
] }
aws-smithy-http = { version = "0.55", default-features = false, features = [
"rt-tokio",
] }
aws-config = { version = "0.101", default-features = false}
aws-sdk-ec2 = { version = "0.38", default-features = false, features = [ "rt-tokio" ] }
aws-sdk-servicequotas = { version = "0.38", default-features = false, features = [ "rt-tokio" ] }
aws-smithy-http = { version = "0.59", default-features = false, features = [ "rt-tokio" ] }
futures = { workspace = true }


17 changes: 11 additions & 6 deletions eip_operator/src/aws.rs
@@ -9,7 +9,7 @@ use aws_sdk_ec2::operation::disassociate_address::DisassociateAddressError;
use aws_sdk_ec2::operation::release_address::{ReleaseAddressError, ReleaseAddressOutput};
use aws_sdk_ec2::types::{Address, DomainType, Filter, ResourceType, Tag, TagSpecification};
use aws_sdk_ec2::Client as Ec2Client;
use tracing::{debug, info, instrument};
use tracing::{debug, error, info, instrument};

pub(crate) const LEGACY_CLUSTER_NAME_TAG: &str = "eip.aws.materialize.com/cluster_name";

@@ -150,19 +150,24 @@ pub(crate) async fn describe_addresses_with_tag_value(
pub(crate) async fn disassociate_eip(
ec2_client: &Ec2Client,
association_id: &str,
) -> Result<(), SdkError<DisassociateAddressError>> {
) -> Result<(), DisassociateAddressError> {
match ec2_client
.disassociate_address()
.association_id(association_id)
.send()
.await
{
Ok(_) => Ok(()),
Err(e) if e.to_string().contains("InvalidAssociationID.NotFound") => {
info!(already_disassociated = true);
Ok(())
Err(e) => {
let e = e.into_service_error();
if e.meta().code() == Some("InvalidAssociationID.NotFound") {
info!("Association id {} already disassociated", association_id);
Ok(())
} else {
error!("Error disassociating {} - {:?}", association_id, e);
Err(e)
}
}
Err(e) => Err(e),
}
}
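
The rework above replaces string-matching on the rendered error with into_service_error() plus a check of the AWS error code. For illustration, here is the same pattern sketched against release_address, whose error type is already imported at the top of this file; the helper name and the tolerated error code are assumptions, not part of this PR:

```rust
use aws_sdk_ec2::operation::release_address::ReleaseAddressError;
use aws_sdk_ec2::Client as Ec2Client;
use tracing::info;

// Sketch only: treat an allocation that is already gone as success, mirroring
// the disassociate_eip change above. "InvalidAllocationID.NotFound" is assumed
// to be the relevant EC2 error code.
pub(crate) async fn release_eip_if_present(
    ec2_client: &Ec2Client,
    allocation_id: &str,
) -> Result<(), ReleaseAddressError> {
    match ec2_client
        .release_address()
        .allocation_id(allocation_id)
        .send()
        .await
    {
        Ok(_) => Ok(()),
        Err(e) => {
            let e = e.into_service_error();
            if e.meta().code() == Some("InvalidAllocationID.NotFound") {
                info!("Allocation {} was already released", allocation_id);
                Ok(())
            } else {
                Err(e)
            }
        }
    }
}
```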

191 changes: 187 additions & 4 deletions eip_operator/src/controller/eip.rs
@@ -1,13 +1,18 @@
use std::collections::HashMap;
use std::time::Duration;

use kube::api::Api;
use k8s_openapi::api::core::v1::{Node, Pod};
use k8s_openapi::Metadata;
use kube::api::{Api, ListParams};
use kube::{Client, ResourceExt};
use kube_runtime::controller::Action;
use tracing::instrument;
use tracing::{error, info, warn};

use eip_operator_shared::Error;

use crate::eip::v2::Eip;
use crate::eip::v2::{Eip, EipSelector};
use crate::kube_ext::{NodeExt, PodExt};

pub(crate) struct Context {
ec2_client: aws_sdk_ec2::Client,
@@ -42,11 +47,16 @@ impl k8s_controller::Context for Context {
client: Client,
eip: &Self::Resource,
) -> Result<Option<Action>, Self::Error> {
let eip_api = Api::namespaced(client, &eip.namespace().unwrap());
let eip_api = Api::namespaced(client.clone(), eip.namespace().unwrap());
let pod_api = Api::<Pod>::namespaced(client.clone(), eip.namespace().unwrap());
let node_api = Api::<Node>::all(client.clone());

// Ensure EIP is Created
let uid = eip.metadata.uid.as_ref().ok_or(Error::MissingEipUid)?;
let name = eip.metadata.name.as_ref().ok_or(Error::MissingEipName)?;
let selector = &eip.spec.selector;
let mut status = eip.status.clone().unwrap_or_default();
Contributor:
This pattern we're moving to here, where we just update the mutable status variable, is a bit dangerous: if we exit early at any point (with the various uses of ?, etc.) we will lose the updates we have made. That seems wrong, because we are doing things like updating the status after making real changes (disassociating EIPs, etc.). I think we want to make sure to save the status field changes back to the Kubernetes API whenever we update this.

Contributor Author:
Yeah, this is a fair point. We should patch the resource every time we make a durable change - code has been updated.
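
Following up on this thread, a minimal sketch of persisting the status whenever it changes, assuming the Eip custom resource exposes a status subresource, that its status type (called EipStatus here) implements Serialize, and that a merge patch is acceptable. The helper name and types are illustrative; the PR itself routes these writes through crate::eip::update_status:

```rust
use kube::api::{Api, Patch, PatchParams};
use serde_json::json;

use crate::eip::v2::Eip;

// Sketch: write the in-memory status back to the Kubernetes API as soon as it
// changes, so an early return (`?`) later in reconcile cannot drop a change
// that was already applied on the AWS side. `EipStatus` is assumed to be the
// CRD's status type and to implement Serialize.
async fn persist_status(
    eip_api: &Api<Eip>,
    name: &str,
    status: &crate::eip::v2::EipStatus,
) -> Result<(), kube::Error> {
    let patch = json!({ "status": status });
    eip_api
        .patch_status(name, &PatchParams::default(), &Patch::Merge(&patch))
        .await?;
    Ok(())
}
```

Calling something like this right after each durable change (allocation recorded, ENI cleared, and so on) matches the approach agreed on above.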


let addresses = crate::aws::describe_addresses_with_tag_value(
&self.ec2_client,
crate::aws::EIP_UID_TAG,
@@ -55,6 +65,8 @@ impl k8s_controller::Context for Context {
.await?
.addresses
.ok_or(Error::MissingAddresses)?;

// Ensure the EIP Exists
let (allocation_id, public_ip) = match addresses.len() {
0 => {
let response = crate::aws::allocate_address(
@@ -63,7 +75,7 @@ impl k8s_controller::Context for Context {
name,
selector,
&self.cluster_name,
&eip.namespace().unwrap(),
eip.namespace().unwrap(),
&self.default_tags,
)
.await?;
@@ -87,6 +99,177 @@ }
}
};
crate::eip::set_status_created(&eip_api, name, &allocation_id, &public_ip).await?;
jubrad marked this conversation as resolved.
status.allocation_id = Some(allocation_id);
status.public_ip_address = Some(public_ip);

// get potential attachments
let mut matched_pods: Vec<Pod> = vec![];
let mut matched_nodes: Vec<Node> = vec![];

match eip.spec.selector {
EipSelector::Pod { ref pod_name } => {
Contributor:
is ref actually necessary here?

Contributor Author:
At one point my LSP told me to do it so I did it. I'm not sure if it's still needed, but nothing is complaining.

matched_pods = pod_api
.list(&ListParams {
field_selector: Some(format!("metadata.name={}", pod_name)),
..Default::default()
})
.await?
.into_iter()
.filter(|pod| pod.metadata.deletion_timestamp.is_none())
.collect::<Vec<Pod>>();
matched_pods.sort_unstable_by_key(|s| s.name_unchecked());
}
EipSelector::Node { ref selector } => {
let label_selector = selector
.iter()
.map(|(k, v)| format!("{}={}", k, v))
.collect::<Vec<String>>()
.join(",");
matched_nodes = node_api
.list(&ListParams {
label_selector: Some(label_selector),
..Default::default()
})
.await?
.into_iter()
.filter(|node| node.metadata().deletion_timestamp.is_none())
.collect::<Vec<Node>>();
matched_nodes.sort_unstable_by_key(|s| s.name_unchecked());
}
}
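
On the "is ref actually necessary here?" question above: yes, as long as the match is on the place expression eip.spec.selector through the borrowed eip, because binding pod_name by value would try to move it out of the borrow; either ref or matching on a reference is needed. A self-contained sketch, where the Selector enum is a stand-in rather than the crate's EipSelector:

```rust
// Stand-in type for illustration only.
enum Selector {
    Pod { pod_name: String },
}

fn demo(sel: &Selector) {
    // Matching the place expression behind the borrow: `ref` keeps the binding
    // a `&String`; without it, binding `pod_name` by value would be a move out
    // of borrowed content and fail to compile.
    match *sel {
        Selector::Pod { ref pod_name } => println!("pod: {pod_name}"),
    }

    // Matching the reference itself relies on match ergonomics: `pod_name` is
    // bound as `&String` with no `ref` keyword needed (the style used later in
    // this file with `match &eip.spec.selector`).
    match sel {
        Selector::Pod { pod_name } => println!("pod: {pod_name}"),
    }
}

fn main() {
    demo(&Selector::Pod { pod_name: "my-pod".to_owned() });
}
```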

// Check to make sure our resource still matches,
// in case our selectors have been updated or the nodes/pods have changed
let mut disassociate = false;
jubrad marked this conversation as resolved.
match &eip.spec.selector {
EipSelector::Pod { pod_name: _ } => {
if matched_pods.is_empty() {
disassociate = true;
}
}
EipSelector::Node { selector: _ } => {
Contributor:
why is the logic here different for pods and nodes?

disassociate = true;
for node in &matched_nodes {
if eip.associated_with_node(node) {
disassociate = false;
break;
}
}
}
};

// Disassociate if conditions are met
if disassociate {
info!("Disassociating EIP! {}", name);
let association_id = match addresses.len() {
0 => Ok(None),
1 => Ok(addresses[0].association_id().map(|id| id.to_owned())),
_ => Err(Error::MultipleAddressesAssociatedToEip),
}?;

if let Some(id) = association_id {
crate::aws::disassociate_eip(&self.ec2_client, &id).await?;
} else {
info!("EIP {} was already disassociated", name);
alex-hunt-materialize marked this conversation as resolved.
}
status.eni = None;
status.private_ip_address = None;
crate::eip::update_status(&eip_api, name, &status).await?;
}

// Find a new resource to associate with if there is no current claim
let mut associate_with_ip: Option<&str> = None;
let mut associate_with_node: Option<Node> = None;
if !eip.attached() {
match eip.spec.selector {
EipSelector::Node { selector: _ } => match matched_nodes.len() {
Contributor:
if matched_nodes.is_empty() { maybe?

0 => {
warn!("Eip {} matches no nodes", name);
}
_ => {
let node_name = matched_nodes[0].name_unchecked();
info!("Eip {} matches node {}, updating", name, node_name,);
associatiate_with_node = Some(matched_nodes[0].clone());
associatiate_with_ip =
Some(matched_nodes[0].ip().ok_or(Error::MissingNodeIp)?);
if matched_nodes.len() > 1 {
warn!(
"Eip {} matches multiple nodes - {}, choosing the first",
name,
matched_nodes
.iter()
.map(|node| node.name_unchecked())
.collect::<Vec<String>>()
.join(",")
);
}
}
},
EipSelector::Pod { pod_name: _ } => match matched_pods.len() {
0 => {
warn!("Eip {} matches no pods", name);
}
1 => {
info!(
"Eip {} matches pod {}, updating",
name,
matched_pods[0].name_unchecked()
);
let node = node_api
.get_opt(
matched_pods[0]
.node_name()
.ok_or(Error::MissingPodNodeName)?,
)
.await?
.ok_or(Error::MissingNode)?;
associate_with_node = Some(node);
if let Some(ip) = matched_pods[0].ip() {
associate_with_ip = Some(ip);
} else {
// This is a case where we've found a pod but it has yet to be
// scheduled, we need to retry
return Ok(Some(Action::requeue(Duration::from_secs(1))));
}
}
_ => {
error!(
"Eip {} matches multiple pods - {}",
name,
matched_pods
.iter()
.map(|pod| pod.name_unchecked())
.collect::<Vec<String>>()
.join(",")
);
Collaborator:
This should be unreachable, as Kubernetes does not allow multiple pods with the same name.

}
},
}
}

match (associate_with_node, associate_with_ip) {
(Some(node), Some(ip)) => {
status.eni = Some(
eip.associate_with_node_and_ip(&self.ec2_client, &node, ip)
.await?,
);
status.private_ip_address = Some(ip.to_owned());
info!(
"Eip {} has been successfully associated after reconciliation",
eip.name().unwrap()
);
}
(None, None) => {
info!("Eip {} is correctly associated!", eip.name_unchecked());
Collaborator:
Suggested change
info!("Eip {} is correctly associated!", eip.name_unchecked());
info!("Eip {} is correctly not associated!", eip.name_unchecked());

Contributor Author:
If the node and ip are none then we don't want to modify our association or create a new association.

Collaborator:
Yes, it is correct that it is not associated. The phrasing of the log message makes it seem like it is associated and that that is correct, which isn't what we are trying to indicate. We are trying to indicate both that it is not associated, and that that is the correct state.

}
(_, _) => {
// this should not be possible
error!("Bad state, need both node and eip to associate");
}
}
if eip.status != Some(status.clone()) {
crate::eip::update_status(&eip_api, name, &status).await?;
}
Ok(None)
}
