Skip to content

Commit

Permalink
fix(supportability): skip logs for pending pods
Browse files Browse the repository at this point in the history
Signed-off-by: sinhaashish <[email protected]>
  • Loading branch information
sinhaashish committed Dec 3, 2024
1 parent 660b714 commit f262efb
Showing 1 changed file with 98 additions and 75 deletions.
173 changes: 98 additions & 75 deletions k8s/supportability/src/collect/logs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,12 @@ use crate::collect::{
utils::log,
};
use async_trait::async_trait;
use k8s_openapi::api::core::v1::Pod;
use std::{collections::HashSet, iter::Iterator, path::PathBuf};
use k8s_openapi::api::core::v1::{Node, Pod};
use std::{
collections::{HashMap, HashSet},
iter::Iterator,
path::PathBuf,
};

/// Error that can occur while interacting with logs module
#[derive(Debug)]
Expand Down Expand Up @@ -109,95 +113,114 @@ impl LogCollection {
}))
}

async fn get_logging_resources(
async fn pod_logging_resources(
&self,
pods: Vec<Pod>,
pod: Pod,
nodes_map: HashMap<String, Node>,
) -> Result<HashSet<LogResource>, LogError> {
let nodes_map = self
.k8s_logger_client
.get_k8s_clientset()
.get_nodes_map()
.await?;
let mut logging_resources = HashSet::new();

for pod in pods {
let service_name = pod
.metadata
.labels
.as_ref()
let service_name = pod
.metadata
.labels
.as_ref()
.ok_or_else(|| {
K8sResourceError::invalid_k8s_resource_value(format!(
"No labels found in pod {:?}",
pod.metadata.name
))
})?
.get("app")
.unwrap_or(&"".to_string())
.clone();

let mut hostname = None;
if is_host_name_required(service_name.clone()) {
let node_name = pod
.spec
.clone()
.ok_or_else(|| {
K8sResourceError::invalid_k8s_resource_value(format!(
"No labels found in pod {:?}",
"Pod spec not found in pod {:?} resource",
pod.metadata.name
))
})?
.get("app")
.unwrap_or(&"".to_string())
.node_name
.as_ref()
.ok_or_else(|| {
K8sResourceError::invalid_k8s_resource_value(
"Node name not found in running pod resource".to_string(),
)
})?
.clone();

let mut hostname = None;
if is_host_name_required(service_name.clone()) {
let node_name = pod
.spec
.clone()
hostname = Some(
nodes_map
.get(node_name.as_str())
.ok_or_else(|| {
K8sResourceError::invalid_k8s_resource_value(format!(
"Pod spec not found in pod {:?} resource",
pod.metadata.name
"Unable to find node: {} object",
node_name.clone()
))
})?
.node_name
.metadata
.labels
.as_ref()
.ok_or_else(|| {
K8sResourceError::invalid_k8s_resource_value(
"Node name not found in running pod resource".to_string(),
)
K8sResourceError::invalid_k8s_resource_value(format!(
"No labels found in node {}",
node_name.clone()
))
})?
.clone();
hostname = Some(
nodes_map
.get(node_name.as_str())
.ok_or_else(|| {
K8sResourceError::invalid_k8s_resource_value(format!(
"Unable to find node: {} object",
node_name.clone()
))
})?
.metadata
.labels
.as_ref()
.ok_or_else(|| {
K8sResourceError::invalid_k8s_resource_value(format!(
"No labels found in node {}",
node_name.clone()
))
})?
.get(KUBERNETES_HOST_LABEL_KEY)
.ok_or_else(|| {
K8sResourceError::invalid_k8s_resource_value(format!(
"Hostname not found for node {}",
node_name.clone()
))
})?
.clone(),
);
}
// Since pod object fetched from Kube-apiserver there will be always
// spec associated to pod
let containers = pod
.spec
.ok_or_else(|| {
K8sResourceError::invalid_k8s_resource_value("Pod sepc not found".to_string())
})?
.containers;

for container in containers {
logging_resources.insert(LogResource {
container_name: container.name,
host_name: hostname.clone(),
label_selector: format!("app={}", service_name.clone()),
service_type: service_name.clone(),
});
.get(KUBERNETES_HOST_LABEL_KEY)
.ok_or_else(|| {
K8sResourceError::invalid_k8s_resource_value(format!(
"Hostname not found for node {}",
node_name.clone()
))
})?
.clone(),
);
}
// Since pod object fetched from Kube-apiserver there will be always
// spec associated to pod
let containers = pod
.spec
.ok_or_else(|| {
K8sResourceError::invalid_k8s_resource_value("Pod sepc not found".to_string())
})?
.containers;

for container in containers {
logging_resources.insert(LogResource {
container_name: container.name,
host_name: hostname.clone(),
label_selector: format!("app={}", service_name.clone()),
service_type: service_name.clone(),
});
}
Ok(logging_resources)
}

async fn get_logging_resources(
&self,
pods: Vec<Pod>,
) -> Result<HashSet<LogResource>, LogError> {
let nodes_map = self
.k8s_logger_client
.get_k8s_clientset()
.get_nodes_map()
.await?;
let mut logging_resources = HashSet::new();

for pod in pods {
match self
.pod_logging_resources(pod.clone(), nodes_map.clone())
.await
{
Ok(resources) => logging_resources.extend(resources),
Err(error) => log(format!(
"Skipping the pod in pending state: {:?} error : {:?}",
pod.metadata.name, error
)),
}
}
Ok(logging_resources)
Expand Down

0 comments on commit f262efb

Please sign in to comment.