Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add inbound policy index metrics #12356

Merged
merged 3 commits into from
May 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -1267,6 +1267,7 @@ dependencies = [
"linkerd-policy-controller-k8s-api",
"maplit",
"parking_lot",
"prometheus-client",
"thiserror",
"tokio",
"tokio-stream",
Expand Down
1 change: 1 addition & 0 deletions policy-controller/k8s/index/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ kubert = { version = "0.21.2", default-features = false, features = ["index"] }
linkerd-policy-controller-core = { path = "../../core" }
linkerd-policy-controller-k8s-api = { path = "../api" }
parking_lot = "0.12"
prometheus-client = { version = "0.22.0", default-features = false }
thiserror = "1"
tokio = { version = "1", features = ["macros", "rt", "sync"] }
tracing = "0.1"
Expand Down
2 changes: 1 addition & 1 deletion policy-controller/k8s/index/src/inbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ mod server;
pub mod server_authorization;
mod workload;

pub use index::{Index, SharedIndex};
pub use index::{metrics, Index, SharedIndex};

#[cfg(test)]
mod tests;
3 changes: 2 additions & 1 deletion policy-controller/k8s/index/src/inbound/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ use std::{
use tokio::sync::watch;
use tracing::info_span;

pub mod metrics;

pub type SharedIndex = Arc<RwLock<Index>>;

/// Holds all indexing state. Owned and updated by a single task that processes
Expand Down Expand Up @@ -1269,7 +1271,6 @@ impl ExternalWorkloadIndex {
}
}

//
impl ExternalWorkload {
/// Determines the policies for ports on this workload
fn reindex_servers(&mut self, policy: &PolicyIndex, authentications: &AuthenticationNsIndex) {
Expand Down
129 changes: 129 additions & 0 deletions policy-controller/k8s/index/src/inbound/index/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
use prometheus_client::{
collector::Collector,
encoding::{DescriptorEncoder, EncodeMetric},
metrics::{gauge::ConstGauge, MetricType},
registry::Registry,
};

use super::SharedIndex;

#[derive(Debug)]
struct Instrumented(SharedIndex);

pub fn register(reg: &mut Registry, index: SharedIndex) {
reg.register_collector(Box::new(Instrumented(index)));
}

impl Collector for Instrumented {
fn encode(
&self,
mut encoder: DescriptorEncoder<'_>,
) -> std::prelude::v1::Result<(), std::fmt::Error> {
let this = self.0.read();

let mut meshtls_authn_encoder = encoder.encode_descriptor(
"meshtls_authentication_index_size",
"The number of MeshTLS authentications in index",
None,
MetricType::Gauge,
)?;
for (ns, auth) in &this.authentications.by_ns {
let labels = [("namespace", ns.as_str())];
let meshtls_authn = ConstGauge::new(auth.meshtls.len() as u32);
let meshtls_authn_encoder = meshtls_authn_encoder.encode_family(&labels)?;
meshtls_authn.encode(meshtls_authn_encoder)?;
}

let mut network_authn_encoder = encoder.encode_descriptor(
"network_authentication_index_size",
"The number of Network authentications in index",
None,
MetricType::Gauge,
)?;
for (ns, auth) in &this.authentications.by_ns {
let labels = [("namespace", ns.as_str())];
let network_authn = ConstGauge::new(auth.network.len() as u32);
let network_authn_encoder = network_authn_encoder.encode_family(&labels)?;
network_authn.encode(network_authn_encoder)?;
}

let mut pods_encoder = encoder.encode_descriptor(
"pod_index_size",
"The number of pods in index",
None,
MetricType::Gauge,
)?;
for (ns, index) in &this.namespaces.by_ns {
let labels = [("namespace", ns.as_str())];
let pods = ConstGauge::new(index.pods.by_name.len() as u32);
let pods_encoder = pods_encoder.encode_family(&labels)?;
pods.encode(pods_encoder)?;
}

let mut external_workloads_encoder = encoder.encode_descriptor(
"external_workload_index_size",
"The number of external workloads in index",
None,
MetricType::Gauge,
)?;
for (ns, index) in &this.namespaces.by_ns {
let labels = [("namespace", ns.as_str())];
let external_workloads = ConstGauge::new(index.external_workloads.by_name.len() as u32);
let external_workloads_encoder = external_workloads_encoder.encode_family(&labels)?;
external_workloads.encode(external_workloads_encoder)?;
}

let mut servers_encoder = encoder.encode_descriptor(
"server_index_size",
"The number of servers in index",
None,
MetricType::Gauge,
)?;
for (ns, index) in &this.namespaces.by_ns {
let labels = [("namespace", ns.as_str())];
let servers = ConstGauge::new(index.policy.servers.len() as u32);
let servers_encoder = servers_encoder.encode_family(&labels)?;
servers.encode(servers_encoder)?;
}

let mut server_authz_encoder = encoder.encode_descriptor(
"server_authorization_index_size",
"The number of server authorizations in index",
None,
MetricType::Gauge,
)?;
for (ns, index) in &this.namespaces.by_ns {
let labels = [("namespace", ns.as_str())];
let server_authz = ConstGauge::new(index.policy.server_authorizations.len() as u32);
let server_authz_encoder = server_authz_encoder.encode_family(&labels)?;
server_authz.encode(server_authz_encoder)?;
}

let mut authz_policies_encoder = encoder.encode_descriptor(
"authorization_policy_index_size",
"The number of authorization policies in index",
None,
MetricType::Gauge,
)?;
for (ns, index) in &this.namespaces.by_ns {
let labels = [("namespace", ns.as_str())];
let authz_policies = ConstGauge::new(index.policy.authorization_policies.len() as u32);
let authz_policies_encoder = authz_policies_encoder.encode_family(&labels)?;
authz_policies.encode(authz_policies_encoder)?;
}

let mut http_routes_encoder = encoder.encode_descriptor(
"http_route_index_size",
"The number of HTTP routes in index",
None,
MetricType::Gauge,
)?;
for (ns, index) in &this.namespaces.by_ns {
let labels = [("namespace", ns.as_str())];
let http_routes = ConstGauge::new(index.policy.http_routes.len() as u32);
let http_routes_encoder = http_routes_encoder.encode_family(&labels)?;
http_routes.encode(http_routes_encoder)?;
}
Ok(())
}
}
41 changes: 23 additions & 18 deletions policy-controller/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,19 +125,6 @@ async fn main() -> Result<()> {
Some(server)
};

let mut prom = <Registry>::default();
let resource_status = prom.sub_registry_with_prefix("resource_status");
let status_metrics = status::ControllerMetrics::register(resource_status);
let status_index_metrcs = status::IndexMetrics::register(resource_status);

let mut runtime = kubert::Runtime::builder()
.with_log(log_level, log_format)
.with_admin(admin.into_builder().with_prometheus(prom))
.with_client(client)
.with_optional_server(server)
.build()
.await?;

let probe_networks = probe_networks.map(|IpNets(nets)| nets).unwrap_or_default();

let default_opaque_ports = parse_portset(&default_opaque_ports)?;
Expand All @@ -152,6 +139,29 @@ async fn main() -> Result<()> {
probe_networks,
});

// Build the API index data structures which will maintain information
// necessary for serving the inbound policy and outbound policy gRPC APIs.
let inbound_index = inbound::Index::shared(cluster_info.clone());
let outbound_index = outbound::Index::shared(cluster_info);

let mut prom = <Registry>::default();
let resource_status = prom.sub_registry_with_prefix("resource_status");
let status_metrics = status::ControllerMetrics::register(resource_status);
let status_index_metrcs = status::IndexMetrics::register(resource_status);

inbound::metrics::register(
prom.sub_registry_with_prefix("inbound_index"),
inbound_index.clone(),
);

let mut runtime = kubert::Runtime::builder()
.with_log(log_level, log_format)
.with_admin(admin.into_builder().with_prometheus(prom))
.with_client(client)
.with_optional_server(server)
.build()
.await?;

let hostname =
std::env::var("HOSTNAME").expect("Failed to fetch `HOSTNAME` environment variable");
let params = kubert::lease::ClaimParams {
Expand All @@ -167,11 +177,6 @@ async fn main() -> Result<()> {
.await?;
let (claims, _task) = lease.spawn(hostname.clone(), params).await?;

// Build the API index data structures which will maintain information
// necessary for serving the inbound policy and outbound policy gRPC APIs.
let inbound_index = inbound::Index::shared(cluster_info.clone());
let outbound_index = outbound::Index::shared(cluster_info);

// Build the status index which will maintain information necessary for
// updating the status field of policy resources.
let (updates_tx, updates_rx) = mpsc::channel(STATUS_UPDATE_QUEUE_SIZE);
Expand Down