diff --git a/Cargo.lock b/Cargo.lock
index a6850fa..ecf6d4e 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -547,15 +547,13 @@ dependencies = [
 
 [[package]]
 name = "cilium-eip-no-masquerade-agent"
-version = "0.4.0"
+version = "0.5.0"
 dependencies = [
  "async-trait",
  "eip-operator-shared",
  "futures",
  "iptables",
  "json-patch",
- "k8s-openapi",
- "kube",
  "netlink-packet-route",
  "rand",
  "rtnetlink",
diff --git a/README.md b/README.md
index f7ae20e..f33ee5f 100644
--- a/README.md
+++ b/README.md
@@ -171,8 +171,6 @@ Cilium (as of 1.12.0) does not seem to support configuring masquerade on a per-p
 
 ##### B. Run a privileged daemonset in the host network to inject ip rules for pods managed by the eip-operator.
 
-You must set the `VPC_CIDR` environment variable to match the Cilium `ipv4NativeRoutingCIDR`. This allows the agent to detect the appropriate table to forward to from the existing Cilium-created rules.
-
 ```yaml
 apiVersion: apps/v1
 kind: DaemonSet
@@ -193,13 +191,6 @@ spec:
         env:
         - name: RUST_LOG
           value: INFO
-        - name: VPC_CIDR
-          value: 10.2.0.0/16
-        - name: NODE_NAME
-          valueFrom:
-            fieldRef:
-              apiVersion: v1
-              fieldPath: spec.nodeName
         image: materialize/k8s-eip-operator
         name: eip-operator
         securityContext:
diff --git a/cilium_eip_no_masquerade_agent/Cargo.toml b/cilium_eip_no_masquerade_agent/Cargo.toml
index 337abef..c492393 100644
--- a/cilium_eip_no_masquerade_agent/Cargo.toml
+++ b/cilium_eip_no_masquerade_agent/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "cilium-eip-no-masquerade-agent"
-version = "0.4.0"
+version = "0.5.0"
 edition = "2021"
 license = "Apache-2.0"
 
@@ -12,8 +12,6 @@ eip-operator-shared = { workspace = true }
 futures = { workspace = true }
 iptables = { workspace = true }
 json-patch = { workspace = true }
-k8s-openapi = { workspace = true }
-kube = { workspace = true }
 netlink-packet-route = { workspace = true }
 rand = { workspace = true }
 rtnetlink = { workspace = true }
diff --git a/cilium_eip_no_masquerade_agent/src/main.rs b/cilium_eip_no_masquerade_agent/src/main.rs
index aee258e..df703ea 100644
--- a/cilium_eip_no_masquerade_agent/src/main.rs
+++ b/cilium_eip_no_masquerade_agent/src/main.rs
@@ -1,21 +1,14 @@
 use std::collections::HashSet;
-use std::net::Ipv4Addr;
 
-use futures::{future, TryStream, TryStreamExt};
+use futures::{future, TryStreamExt};
 use iptables::IPTables;
-use json_patch::{PatchOperation, RemoveOperation, TestOperation};
-use k8s_openapi::api::core::v1::Pod;
-use kube::api::{ListParams, Patch, PatchParams};
-use kube::{Api, Client as KubeClient, ResourceExt};
 use netlink_packet_route::rule::Nla;
-use netlink_packet_route::RuleMessage;
 use rtnetlink::{new_connection, IpVersion, RuleHandle};
 use tokio::time::{sleep, Duration};
 use tracing::{event, info, trace, Level};
 
-use eip_operator_shared::{run_with_tracing, Error, MANAGE_EIP_LABEL};
+use eip_operator_shared::{run_with_tracing, Error};
 
-const FINALIZER_NAME: &str = "eip.materialize.cloud/cilium-no-masquerade-rule";
 const TABLE: &str = "mangle";
 const CHAIN: &str = "CILIUM_PRE_mangle";
 const FW_MASK: u32 = 0xf;
@@ -24,27 +17,9 @@ const FIRST_SECONDARY_ENI_INDEX: u32 = 1;
 // eth15, No AWS instance type supports more than 15 ENIs
 const LAST_SECONDARY_ENI_INDEX: u32 = 15;
 
-async fn filter_pod_rules(
-    rules: impl TryStream<Ok = RuleMessage, Error = rtnetlink::Error>,
-    pod_ip: Ipv4Addr,
-) -> Result<Vec<RuleMessage>, rtnetlink::Error> {
-    rules
-        .try_filter(|rule| {
-            future::ready(
-                rule.header.src_len == 32
-                    && rule.nlas.contains(&Nla::Source(pod_ip.octets().to_vec())),
-            )
-        })
-        .try_collect()
-        .await
-}
-
 struct RuleManager {
     iptables: IPTables,
     ip_rule_handle: RuleHandle,
-    kube_client: KubeClient,
-    global_pod_api: Api<Pod>,
-    node_name: String,
 }
 
 impl RuleManager {
@@ -55,82 +30,12 @@ impl RuleManager {
         let ip_rule_handle = rtnetlink_handle.rule();
         tokio::spawn(connection);
 
-        let kube_client = KubeClient::try_default().await.unwrap();
-        let global_pod_api: Api<Pod> = Api::all(kube_client.clone());
-
-        let node_name = std::env::var("NODE_NAME")
-            .expect("NODE_NAME env var must be set to the name of the kubernetes node this agent is running on.");
-
         RuleManager {
             iptables,
             ip_rule_handle,
-            kube_client,
-            global_pod_api,
-            node_name,
         }
     }
 
-    async fn cleanup_legacy_per_pod_rules(&self, pod: &Pod) -> Result<(), Error> {
-        let pod_name = pod.name_unchecked();
-
-        // Assuming that if it doesn't have an IP during cleanup, that it never had one.
-        if let Some(pod_ip_str) = &pod
-            .status
-            .as_ref()
-            .and_then(|status| status.pod_ip.as_ref())
-        {
-            let pod_ip: Ipv4Addr = pod_ip_str.parse()?;
-            let rules = self
-                .ip_rule_handle
-                .get(IpVersion::V4)
-                .execute()
-                .into_stream();
-            let pod_rules = filter_pod_rules(rules, pod_ip).await?;
-            if let Some(rule) = pod_rules
-                .into_iter()
-                .find(|rule| rule.header.dst_len == 0 && rule.header.action == 1)
-            {
-                event!(Level::INFO, pod_name = %pod_name, pod_ip = %pod_ip, rule = ?rule, "Deleting rule.");
-                self.ip_rule_handle.del(rule).execute().await?;
-            }
-        }
-        self.remove_finalizer(pod, &pod_name).await?;
-        Ok(())
-    }
-
-    async fn remove_finalizer(&self, pod: &Pod, pod_name: &str) -> Result<(), Error> {
-        // https://docs.rs/kube-runtime/latest/src/kube_runtime/finalizer.rs.html
-        let finalizer_index = pod
-            .finalizers()
-            .iter()
-            .enumerate()
-            .find(|(_, finalizer)| *finalizer == FINALIZER_NAME)
-            .map(|(i, _)| i);
-        if let Some(finalizer_index) = finalizer_index {
-            let pod_api: Api<Pod> =
-                Api::namespaced(self.kube_client.clone(), &pod.namespace().unwrap());
-            let finalizer_path = format!("/metadata/finalizers/{finalizer_index}");
-            pod_api
-                .patch::<Pod>(
-                    pod_name,
-                    &PatchParams::default(),
-                    &Patch::Json(json_patch::Patch(vec![
-                        // All finalizers run concurrently and we use an integer index
-                        // `Test` ensures that we fail instead of deleting someone else's finalizer
-                        PatchOperation::Test(TestOperation {
-                            path: finalizer_path.clone(),
-                            value: FINALIZER_NAME.into(),
-                        }),
-                        PatchOperation::Remove(RemoveOperation {
-                            path: finalizer_path,
-                        }),
-                    ])),
-                )
-                .await?;
-        }
-        Ok(())
-    }
-
     async fn wait_for_chain_to_exist(&self) -> Result<(), Box<dyn std::error::Error>> {
         info!("Waiting for {CHAIN} chain to exist");
         while !self.iptables.chain_exists(TABLE, CHAIN)? {
@@ -228,18 +133,6 @@ async fn run() -> Result<(), Error> {
             }
         }
 
-        let pods = manager
-            .global_pod_api
-            .list(
-                &ListParams::default()
-                    .labels(MANAGE_EIP_LABEL)
-                    .fields(&format!("spec.nodeName={}", manager.node_name)),
-            )
-            .await?;
-        for pod in pods {
-            manager.cleanup_legacy_per_pod_rules(&pod).await?;
-        }
-
         let delay_secs = 1;
         info!("Done! Will recheck in {delay_secs} seconds");
         sleep(Duration::from_secs(delay_secs)).await;