Skip to content

Commit

Permalink
fixes bug where multiple nodes fight over an eip
Browse files Browse the repository at this point in the history
        - introduces resource_ids to eip status
        - moves setting attachment status to be
        before an attempt at attaching
        - moves status update from patch to replace;
        this should raise errors if the resource has
        changed, allowing the resource_id to function
        somewhat as a lock
        - a resource won't attempt to claim an eip
        that already has a resource id
        - fixes cargo deny errors
  • Loading branch information
jubrad committed Mar 15, 2024
1 parent f9deed6 commit 484a96e
Show file tree
Hide file tree
Showing 12 changed files with 388 additions and 248 deletions.
346 changes: 179 additions & 167 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ members = [

[workspace.package]
edition = "2021"
rust-version = "1.74.0"
rust-version = "1.76.0"


# Use this section only to change the source of dependencies that might
Expand Down
9 changes: 5 additions & 4 deletions deny.toml
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
[graph]
targets = [
{ triple = "aarch64-apple-darwin" },
{ triple = "aarch64-unknown-linux-gnu" },
Expand All @@ -6,7 +7,7 @@ targets = [
]

[advisories]
vulnerability = "deny"
version = 2

[bans]
multiple-versions = "deny"
Expand All @@ -20,9 +21,9 @@ skip = [
{ name = "hashbrown", version = "0.12.3" },
{ name = "hashbrown", version = "0.14.0" },
{ name = "nix", version = "0.26.4" },
{ name = "nix", version = "0.27.1" },
{ name = "ordered-float", version = "2.10.0" },
{ name = "ordered-float", version = "3.4.0" },
{ name = "fastrand", version = "2.0.1" },
{ name = "regex-syntax", version = "0.6.29" },
]

# Use `tracing` instead.
Expand All @@ -35,14 +36,14 @@ name = "env_logger"
name = "rustls"

[licenses]
version = 2
allow = [
"Apache-2.0",
"BSD-2-Clause",
"BSD-3-Clause",
"MIT",
"Unicode-DFS-2016",
]
copyleft = "deny"

[[licenses.clarify]]
name = "ring"
Expand Down
18 changes: 4 additions & 14 deletions eip_operator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,10 @@ license = "Apache-2.0"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
aws-config = { version = "0.55.1", default-features = false, features = [
"native-tls",
] }
aws-sdk-ec2 = { version = "0.28", default-features = false, features = [
"native-tls",
"rt-tokio",
] }
aws-sdk-servicequotas = { version = "0.28", default-features = false, features = [
"native-tls",
"rt-tokio",
] }
aws-smithy-http = { version = "0.55", default-features = false, features = [
"rt-tokio",
] }
aws-config = { version = "0.101", default-features = false}
aws-sdk-ec2 = { version = "0.38", default-features = false, features = [ "rt-tokio" ] }
aws-sdk-servicequotas = { version = "0.38", default-features = false, features = [ "rt-tokio" ] }
aws-smithy-http = { version = "0.59", default-features = false, features = [ "rt-tokio" ] }
futures = { workspace = true }


Expand Down
17 changes: 11 additions & 6 deletions eip_operator/src/aws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use aws_sdk_ec2::operation::disassociate_address::DisassociateAddressError;
use aws_sdk_ec2::operation::release_address::{ReleaseAddressError, ReleaseAddressOutput};
use aws_sdk_ec2::types::{Address, DomainType, Filter, ResourceType, Tag, TagSpecification};
use aws_sdk_ec2::Client as Ec2Client;
use tracing::{debug, info, instrument};
use tracing::{debug, error, info, instrument};

pub(crate) const LEGACY_CLUSTER_NAME_TAG: &str = "eip.aws.materialize.com/cluster_name";

Expand Down Expand Up @@ -150,19 +150,24 @@ pub(crate) async fn describe_addresses_with_tag_value(
/// Disassociates an EIP by sending a `disassociate_address` request for
/// `association_id` through `ec2_client`.
///
/// An `InvalidAssociationID.NotFound` failure is treated as success: the
/// association is already gone, so it is logged at info level and `Ok(())`
/// is returned. Any other failure is returned to the caller.
///
// NOTE(review): this span is a rendered diff, so the pre-change and
// post-change signature and match arms both appear below. Only one of each
// pair exists in the actual file at a given revision.
pub(crate) async fn disassociate_eip(
ec2_client: &Ec2Client,
association_id: &str,
) -> Result<(), SdkError<DisassociateAddressError>> {
) -> Result<(), DisassociateAddressError> {
match ec2_client
.disassociate_address()
.association_id(association_id)
.send()
.await
{
Ok(_) => Ok(()),
// Variant matching on the error's string rendering.
Err(e) if e.to_string().contains("InvalidAssociationID.NotFound") => {
info!(already_disassociated = true);
Ok(())
// Variant converting to the service error and comparing its error code.
Err(e) => {
let e = e.into_service_error();
if e.meta().code() == Some("InvalidAssociationID.NotFound") {
info!("Association id {} already disassociated", association_id);
Ok(())
} else {
// Every other failure is logged at error level before being returned.
error!("Error disassociating {} - {:?}", association_id, e);
Err(e)
}
}
Err(e) => Err(e),
}
}

Expand Down
43 changes: 40 additions & 3 deletions eip_operator/src/controller/eip.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use std::collections::HashMap;
use std::time::{SystemTime, UNIX_EPOCH};

use kube::api::Api;
use kube::api::{Api, PatchParams};
use kube::{Client, ResourceExt};
use kube_runtime::controller::Action;
use tracing::instrument;
use tracing::{info, instrument};

use eip_operator_shared::Error;

Expand Down Expand Up @@ -42,7 +43,7 @@ impl k8s_controller::Context for Context {
client: Client,
eip: &Self::Resource,
) -> Result<Option<Action>, Self::Error> {
let eip_api = Api::namespaced(client, &eip.namespace().unwrap());
let eip_api = Api::namespaced(client.clone(), &eip.namespace().unwrap());

let uid = eip.metadata.uid.as_ref().ok_or(Error::MissingEipUid)?;
let name = eip.metadata.name.as_ref().ok_or(Error::MissingEipName)?;
Expand Down Expand Up @@ -87,6 +88,42 @@ impl k8s_controller::Context for Context {
}
};
crate::eip::set_status_created(&eip_api, name, &allocation_id, &public_ip).await?;

if eip.status.as_ref().is_some_and(|s| s.resource_id.is_some()) {
// do nothing
} else {
let resource_api = eip.get_resource_api(&client);
let matched_resources = resource_api
.list(&eip.get_resource_list_params())
.await?
.items;
info!(
"Eip apply for {} Found matched {} resources",
name,
matched_resources.len()
);
for resource in matched_resources {
info!(
"Updating eip refresh label for {}",
resource.name_unchecked()
);
let data = resource.clone().data(serde_json::json!({
"metadata": {
"labels":{
"eip.materialize.cloud/refresh": SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs().to_string()
}
}
}));
resource_api
.patch_metadata(
&resource.name_unchecked(),
&PatchParams::default(),
&kube::core::params::Patch::Merge(serde_json::json!(data)),
)
.await?;
}
}

Ok(None)
}

Expand Down
53 changes: 45 additions & 8 deletions eip_operator/src/controller/node.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use std::time::Duration;

use k8s_openapi::api::core::v1::Node;
use kube::api::{Api, ListParams};
use kube::Client;
use kube_runtime::controller::Action;
use tracing::{event, instrument, Level};
use tracing::{event, info, instrument, Level};

use eip_operator_shared::Error;

Expand Down Expand Up @@ -51,11 +53,24 @@ impl k8s_controller::Context for Context {
.rsplit_once('/')
.ok_or(Error::MalformedProviderId)?
.1;
let all_eips = eip_api.list(&ListParams::default()).await?.items;
let all_eips: Vec<Eip> = eip_api
.list(&ListParams::default())
.await?
.items
.into_iter()
.filter(|eip| eip.matches_node(node_labels))
.collect();
if all_eips.is_empty() {
return Err(Error::NoEipResourceWithThatNodeSelector);
}
let eip = all_eips
.into_iter()
.find(|eip| eip.matches_node(node_labels))
.ok_or(Error::NoEipResourceWithThatNodeSelector)?;
.find(|eip| eip.status.as_ref().is_some_and(|s| s.resource_id.is_none()));
if eip.is_none() {
info!("No un-associated eips found for node {}", name);
return Ok(None);
}
let eip = eip.unwrap();
let eip_name = eip.name().ok_or(Error::MissingEipName)?;
let allocation_id = eip.allocation_id().ok_or(Error::MissingAllocationId)?;
let eip_description = crate::aws::describe_address(&self.ec2_client, allocation_id)
Expand All @@ -71,10 +86,26 @@ impl k8s_controller::Context for Context {
if eip_description.network_interface_id != Some(eni_id.to_owned())
|| eip_description.private_ip_address != Some(node_ip.to_owned())
{
crate::aws::associate_eip(&self.ec2_client, allocation_id, &eni_id, node_ip).await?;
match crate::eip::set_status_attached(&eip_api, &eip, &eni_id, node_ip, name).await {
Ok(_) => {
info!("Found matching Eip, claiming it");
crate::aws::associate_eip(&self.ec2_client, allocation_id, &eni_id, node_ip)
.await?;
}
Err(err)
if err
.to_string()
.contains("Operation cannot be fulfilled on eips.materialize.cloud") =>
{
info!(
"Pod {} failed to claim eip {}, rescheduling to try another",
name, eip_name
);
return Ok(Some(Action::requeue(Duration::from_secs(1))));
}
Err(e) => return Err(e),
};
}
crate::eip::set_status_attached(&eip_api, eip_name, &eni_id, node_ip).await?;

Ok(None)
}

Expand All @@ -94,7 +125,13 @@ impl k8s_controller::Context for Context {
let eip = all_eips
.into_iter()
.filter(|eip| eip.attached())
.find(|eip| eip.matches_node(node_labels));
.find(|eip| {
eip.matches_node(node_labels)
&& eip
.status
.as_ref()
.is_some_and(|s| s.resource_id == Some(node.metadata.name.clone().unwrap()))
});
if let Some(eip) = eip {
let allocation_id = eip.allocation_id().ok_or(Error::MissingAllocationId)?;
let addresses = crate::aws::describe_address(&self.ec2_client, allocation_id)
Expand Down
5 changes: 2 additions & 3 deletions eip_operator/src/controller/pod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ impl k8s_controller::Context for Context {
.into_iter()
.find(|eip| eip.matches_pod(name))
.ok_or_else(|| Error::NoEipResourceWithThatPodName(name.to_owned()))?;
let eip_name = eip.name().ok_or(Error::MissingEipName)?;
let allocation_id = eip.allocation_id().ok_or(Error::MissingAllocationId)?;
let eip_description = crate::aws::describe_address(&self.ec2_client, allocation_id)
.await?
Expand All @@ -82,10 +81,10 @@ impl k8s_controller::Context for Context {
if eip_description.network_interface_id != Some(eni_id.to_owned())
|| eip_description.private_ip_address != Some(pod_ip.to_owned())
{
crate::eip::set_status_attached(&eip_api, &eip, &eni_id, pod_ip, name).await?;
crate::aws::associate_eip(&self.ec2_client, allocation_id, &eni_id, pod_ip).await?;
add_dns_target_annotation(&pod_api, name, &public_ip, allocation_id).await?;
}
crate::eip::set_status_attached(&eip_api, eip_name, &eni_id, pod_ip).await?;
add_dns_target_annotation(&pod_api, name, &public_ip, allocation_id).await?;
Ok(None)
}

Expand Down
Loading

0 comments on commit 484a96e

Please sign in to comment.