use reflector for egress labeling #476

Closed · wants to merge 3 commits
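In outline, the change replaces the hand-rolled kube_runtime::watcher loop in eip_operator/src/main.rs with a kube_runtime Controller, whose internal reflector keeps a store of Eip objects, calls the new reconcile function in eip_operator/src/controller/egress.rs for each of them, and requeues every 5 minutes (or after 60 seconds on a reconcile error). A minimal sketch of that wiring, condensed from the new main.rs and controller/egress.rs code shown below, with the reconcile body reduced to a placeholder:

use std::sync::Arc;
use std::time::Duration;

use futures::StreamExt;
use k8s_openapi::api::core::v1::Node;
use kube::{Api, Client};
use kube_runtime::controller::{Action, Controller};
use kube_runtime::watcher;

use eip::v2::Eip; // the project's EIP custom resource (declared via mod eip in main.rs)

// Shared state handed to every reconcile call.
struct Context {
    node_api: Api<Node>,
}

// Reconcile one Eip, then requeue it in 5 minutes.
async fn reconcile(eip: Arc<Eip>, ctx: Arc<Context>) -> Result<Action, kube::Error> {
    // The PR's reconcile calls label_egress_nodes(&eip, &ctx.node_api) here,
    // logging (rather than returning) any labeling error.
    let _ = (&eip, &ctx.node_api);
    Ok(Action::requeue(Duration::from_secs(300)))
}

// On a reconcile error, retry after 60 seconds.
fn error_policy(_eip: Arc<Eip>, _err: &kube::Error, _ctx: Arc<Context>) -> Action {
    Action::requeue(Duration::from_secs(60))
}

#[tokio::main]
async fn main() -> Result<(), kube::Error> {
    let client = Client::try_default().await?;
    let eip_api: Api<Eip> = Api::all(client.clone());
    let context = Arc::new(Context { node_api: Api::all(client) });

    // The Controller's reflector watches Eip objects and drives reconcile/error_policy.
    Controller::new(eip_api, watcher::Config::default())
        .run(reconcile, error_policy, context)
        .for_each(|result| async move {
            match result {
                Ok(obj) => tracing::info!(?obj, "reconciled"),
                Err(err) => tracing::error!(%err, "reconcile failed"),
            }
        })
        .await;

    Ok(())
}

Compared with the old watcher loop, errors now flow through error_policy, and the requeue Action adds periodic relabeling on top of the change-driven reconciles.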
9 changes: 5 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -29,6 +29,7 @@ netlink-packet-route = "0.17"
rand = "0.8"
rtnetlink = "0.13"
tokio = { version = "1", features = ["macros", "rt-multi-thread", "time"] }
tokio-stream = "0.1"
kube = { version = "0.92.1", default-features = false, features = ["derive", "openssl-tls"] }
kube-runtime = "0.92.1"
tracing = "0.1"
1 change: 1 addition & 0 deletions eip_operator/Cargo.toml
@@ -23,6 +23,7 @@ schemars = "0.8"
serde = "1"
serde_json = "1"
tokio = { workspace = true }
tokio-stream = { workspace = true }
tracing = { workspace = true }

eip-operator-shared = { workspace = true }
1 change: 1 addition & 0 deletions eip_operator/src/controller.rs
@@ -1,3 +1,4 @@
pub(crate) mod egress;
pub(crate) mod eip;
pub(crate) mod node;
pub(crate) mod pod;
eip_operator/src/controller/egress.rs
@@ -3,7 +3,36 @@ use crate::Error;
use k8s_openapi::api::core::v1::Node;
use kube::api::{Api, Patch, PatchParams};
use kube::ResourceExt;
use tracing::{info, instrument};
use kube_runtime::controller::Action;
use std::{sync::Arc, time::Duration};
use tracing::{event, info, instrument, Level};

const EGRESS_NODE_LABEL_REQUEUE_DURATION: std::time::Duration = std::time::Duration::from_secs(300);

#[derive(Debug)]
pub struct Context {
pub node_api: Api<Node>,
}

/// Basic controller reconciliation for egress labeling.
/// The k8s-controller crate is not used here because this controller does not add a finalizer.
#[instrument(err)]
pub(crate) async fn reconcile(eip: Arc<Eip>, ctx: Arc<Context>) -> Result<Action, kube::Error> {
let node_api = &ctx.node_api;

if let Err(err) = label_egress_nodes(&eip, node_api).await {
event!(Level::ERROR, err = %err, "Node egress labeling reporting error");
}

Ok(
Action::requeue(EGRESS_NODE_LABEL_REQUEUE_DURATION), // requeue every 5 minutes
)
}

pub(crate) fn error_policy(_eip: Arc<Eip>, error: &kube::Error, _ctx: Arc<Context>) -> Action {
event!(Level::ERROR, err = %error, "Reconcile error");
Action::requeue(Duration::from_secs(60))
}

/// Applies label specifying the ready status of the egress gateway node.
#[instrument(skip(api), err)]
@@ -40,7 +69,10 @@ pub(crate) async fn add_gateway_status_label(
pub(crate) async fn label_egress_nodes(eip: &Eip, node_api: &Api<Node>) -> Result<(), Error> {
let egress_nodes = node_api.list(&eip.get_resource_list_params()).await?.items;
if egress_nodes.is_empty() {
info!("No egress nodes found. Skipping egress node ready status labeling.");
info!(
"No egress nodes found for EIP {}. Skipping egress node ready status labeling.",
eip.name_unchecked()
);
return Ok(());
}

@@ -66,6 +98,15 @@ pub(crate) async fn label_egress_nodes(eip: &Eip, node_api: &Api<Node>) -> Resul
.map(|condition| condition.status.clone())
.ok_or(Error::MissingNodeReadyCondition)?;
if node_ready_status != "True" {
info!(
"Node {} is not ready. Skipping egress node ready status labeling.",
node.name_unchecked()
);
info!(
"Available egress nodes for EIP {} are: {:?}",
eip.name_unchecked(),
egress_nodes
);
return Ok(());
} else {
// mark node with EIP as ready for egress traffic
8 changes: 6 additions & 2 deletions eip_operator/src/controller/node.rs
@@ -186,8 +186,12 @@ impl k8s_controller::Context for Context {
}

let node_api = Api::<Node>::all(client);
crate::egress::add_gateway_status_label(&node_api, node.name_unchecked().as_str(), "false")
.await?;
crate::controller::egress::add_gateway_status_label(
&node_api,
node.name_unchecked().as_str(),
"false",
)
.await?;

Ok(None)
}
39 changes: 22 additions & 17 deletions eip_operator/src/main.rs
@@ -1,5 +1,5 @@
use std::collections::{HashMap, HashSet};
use std::pin::pin;
use std::sync::Arc;
use std::time::Duration;

use aws_config::{BehaviorVersion, ConfigLoader};
@@ -8,7 +8,7 @@ use aws_sdk_ec2::Client as Ec2Client;
use aws_sdk_servicequotas::types::ServiceQuota;
use aws_sdk_servicequotas::Client as ServiceQuotaClient;
use futures::future::join_all;
use futures::TryStreamExt;
use futures::StreamExt;
use json_patch::{PatchOperation, RemoveOperation, TestOperation};
use k8s_controller::Controller;
use k8s_openapi::api::core::v1::{Node, Pod};
@@ -23,7 +23,6 @@ use eip::v2::Eip;

mod aws;
mod controller;
mod egress;
mod eip;
mod kube_ext;

@@ -160,23 +159,29 @@ async fn run() -> Result<(), Error> {
tasks.push({
let eip_api = eip_api.clone();
let node_api = node_api.clone();
let watch_config = kube_runtime::watcher::Config::default();
let context = Arc::new(controller::egress::Context {
node_api: node_api.clone(),
});
let watch_config: kube_runtime::watcher::Config = Default::default();

task::spawn(async move {
let mut watcher = pin!(kube_runtime::watcher(eip_api.clone(), watch_config));

while let Some(eip_event) = watcher.try_next().await.unwrap_or_else(|e| {
event!(Level::ERROR, err = %e, "Error watching eips");
None
}) {
if let kube_runtime::watcher::Event::Apply(eip)
| kube_runtime::watcher::Event::Delete(eip) = eip_event
{
if let Err(err) = crate::egress::label_egress_nodes(&eip, &node_api).await {
event!(Level::ERROR, err = %err, "Node egress labeling reporting error");
kube_runtime::controller::Controller::new(eip_api, watch_config)
.run(
controller::egress::reconcile,
controller::egress::error_policy,
context,
)
.for_each(|reconcile_result| async move {
match reconcile_result {
Ok(_) => {
event!(Level::INFO, "Egress Labeling Reconciliation successful");
}
Err(err) => {
event!(Level::ERROR, err = %err, "Reconciliation error");
}
}
}
}
})
.await;
})
});

2 changes: 1 addition & 1 deletion eip_operator_shared/Cargo.toml
@@ -25,7 +25,7 @@ serde_json = "1"
thiserror = "1"
tokio-native-tls = { version = "0.3.1" }
tokio = { workspace = true }
tonic = { version = "0.12.1", features = ["transport"] }
tonic = { version = "0.12.3", features = ["transport"] }
tracing = "0.1"
tracing-opentelemetry = "0.25"
tracing-subscriber = { version = "0.3", features = [