Skip to content

Commit

Permalink
Add inbound policy index metrics (#12356)
Browse files Browse the repository at this point in the history
We add index size gauges for each of the resource indexes in the inbound policy index.

```
# HELP inbound_index_meshtls_authentication_index_size The number of MeshTLS authentications in index
# TYPE inbound_index_meshtls_authentication_index_size gauge
inbound_index_meshtls_authentication_index_size{namespace="linkerd-viz"} 1
# HELP inbound_index_network_autnetication_index_size The number of Network authentications in index
# TYPE inbound_index_network_autnetication_index_size gauge
inbound_index_network_autnetication_index_size{namespace="linkerd-viz"} 2
# HELP inbound_index_pod_index_size The number of pods in index
# TYPE inbound_index_pod_index_size gauge
inbound_index_pod_index_size{namespace="linkerd"} 3
inbound_index_pod_index_size{namespace="emojivoto"} 4
inbound_index_pod_index_size{namespace="linkerd-viz"} 5
# HELP inbound_index_external_workload_index_size The number of external workloads in index
# TYPE inbound_index_external_workload_index_size gauge
inbound_index_external_workload_index_size{namespace="linkerd"} 0
inbound_index_external_workload_index_size{namespace="emojivoto"} 0
inbound_index_external_workload_index_size{namespace="linkerd-viz"} 0
# HELP inbound_index_server_index_size The number of servers in index
# TYPE inbound_index_server_index_size gauge
inbound_index_server_index_size{namespace="linkerd"} 0
inbound_index_server_index_size{namespace="emojivoto"} 1
inbound_index_server_index_size{namespace="linkerd-viz"} 4
# HELP inbound_index_server_authorization_index_size The number of server authorizations in index
# TYPE inbound_index_server_authorization_index_size gauge
inbound_index_server_authorization_index_size{namespace="linkerd"} 0
inbound_index_server_authorization_index_size{namespace="emojivoto"} 0
inbound_index_server_authorization_index_size{namespace="linkerd-viz"} 0
# HELP inbound_index_authorization_policy_index_size The number of authorization policies in index
# TYPE inbound_index_authorization_policy_index_size gauge
inbound_index_authorization_policy_index_size{namespace="linkerd"} 0
inbound_index_authorization_policy_index_size{namespace="emojivoto"} 0
inbound_index_authorization_policy_index_size{namespace="linkerd-viz"} 4
# HELP inbound_index_http_route_index_size The number of HTTP routes in index
# TYPE inbound_index_http_route_index_size gauge
inbound_index_http_route_index_size{namespace="linkerd"} 0
inbound_index_http_route_index_size{namespace="emojivoto"} 600
inbound_index_http_route_index_size{namespace="linkerd-viz"} 0
```

Signed-off-by: Alex Leong <[email protected]>
  • Loading branch information
adleong authored May 8, 2024
1 parent abd9b7b commit 1fed61e
Show file tree
Hide file tree
Showing 6 changed files with 157 additions and 20 deletions.
1 change: 1 addition & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -1266,6 +1266,7 @@ dependencies = [
"linkerd-policy-controller-k8s-api",
"maplit",
"parking_lot",
"prometheus-client",
"thiserror",
"tokio",
"tokio-stream",
Expand Down
1 change: 1 addition & 0 deletions policy-controller/k8s/index/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ kubert = { version = "0.21.2", default-features = false, features = ["index"] }
linkerd-policy-controller-core = { path = "../../core" }
linkerd-policy-controller-k8s-api = { path = "../api" }
parking_lot = "0.12"
prometheus-client = { version = "0.22.0", default-features = false }
thiserror = "1"
tokio = { version = "1", features = ["macros", "rt", "sync"] }
tracing = "0.1"
Expand Down
2 changes: 1 addition & 1 deletion policy-controller/k8s/index/src/inbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ mod server;
pub mod server_authorization;
mod workload;

pub use index::{Index, SharedIndex};
pub use index::{metrics, Index, SharedIndex};

#[cfg(test)]
mod tests;
3 changes: 2 additions & 1 deletion policy-controller/k8s/index/src/inbound/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ use std::{
use tokio::sync::watch;
use tracing::info_span;

pub mod metrics;

pub type SharedIndex = Arc<RwLock<Index>>;

/// Holds all indexing state. Owned and updated by a single task that processes
Expand Down Expand Up @@ -1269,7 +1271,6 @@ impl ExternalWorkloadIndex {
}
}

//
impl ExternalWorkload {
/// Determines the policies for ports on this workload
fn reindex_servers(&mut self, policy: &PolicyIndex, authentications: &AuthenticationNsIndex) {
Expand Down
129 changes: 129 additions & 0 deletions policy-controller/k8s/index/src/inbound/index/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
use prometheus_client::{
collector::Collector,
encoding::{DescriptorEncoder, EncodeMetric},
metrics::{gauge::ConstGauge, MetricType},
registry::Registry,
};

use super::SharedIndex;

#[derive(Debug)]
struct Instrumented(SharedIndex);

pub fn register(reg: &mut Registry, index: SharedIndex) {
reg.register_collector(Box::new(Instrumented(index)));
}

impl Collector for Instrumented {
fn encode(
&self,
mut encoder: DescriptorEncoder<'_>,
) -> std::prelude::v1::Result<(), std::fmt::Error> {
let this = self.0.read();

let mut meshtls_authn_encoder = encoder.encode_descriptor(
"meshtls_authentication_index_size",
"The number of MeshTLS authentications in index",
None,
MetricType::Gauge,
)?;
for (ns, auth) in &this.authentications.by_ns {
let labels = [("namespace", ns.as_str())];
let meshtls_authn = ConstGauge::new(auth.meshtls.len() as u32);
let meshtls_authn_encoder = meshtls_authn_encoder.encode_family(&labels)?;
meshtls_authn.encode(meshtls_authn_encoder)?;
}

let mut network_authn_encoder = encoder.encode_descriptor(
"network_authentication_index_size",
"The number of Network authentications in index",
None,
MetricType::Gauge,
)?;
for (ns, auth) in &this.authentications.by_ns {
let labels = [("namespace", ns.as_str())];
let network_authn = ConstGauge::new(auth.network.len() as u32);
let network_authn_encoder = network_authn_encoder.encode_family(&labels)?;
network_authn.encode(network_authn_encoder)?;
}

let mut pods_encoder = encoder.encode_descriptor(
"pod_index_size",
"The number of pods in index",
None,
MetricType::Gauge,
)?;
for (ns, index) in &this.namespaces.by_ns {
let labels = [("namespace", ns.as_str())];
let pods = ConstGauge::new(index.pods.by_name.len() as u32);
let pods_encoder = pods_encoder.encode_family(&labels)?;
pods.encode(pods_encoder)?;
}

let mut external_workloads_encoder = encoder.encode_descriptor(
"external_workload_index_size",
"The number of external workloads in index",
None,
MetricType::Gauge,
)?;
for (ns, index) in &this.namespaces.by_ns {
let labels = [("namespace", ns.as_str())];
let external_workloads = ConstGauge::new(index.external_workloads.by_name.len() as u32);
let external_workloads_encoder = external_workloads_encoder.encode_family(&labels)?;
external_workloads.encode(external_workloads_encoder)?;
}

let mut servers_encoder = encoder.encode_descriptor(
"server_index_size",
"The number of servers in index",
None,
MetricType::Gauge,
)?;
for (ns, index) in &this.namespaces.by_ns {
let labels = [("namespace", ns.as_str())];
let servers = ConstGauge::new(index.policy.servers.len() as u32);
let servers_encoder = servers_encoder.encode_family(&labels)?;
servers.encode(servers_encoder)?;
}

let mut server_authz_encoder = encoder.encode_descriptor(
"server_authorization_index_size",
"The number of server authorizations in index",
None,
MetricType::Gauge,
)?;
for (ns, index) in &this.namespaces.by_ns {
let labels = [("namespace", ns.as_str())];
let server_authz = ConstGauge::new(index.policy.server_authorizations.len() as u32);
let server_authz_encoder = server_authz_encoder.encode_family(&labels)?;
server_authz.encode(server_authz_encoder)?;
}

let mut authz_policies_encoder = encoder.encode_descriptor(
"authorization_policy_index_size",
"The number of authorization policies in index",
None,
MetricType::Gauge,
)?;
for (ns, index) in &this.namespaces.by_ns {
let labels = [("namespace", ns.as_str())];
let authz_policies = ConstGauge::new(index.policy.authorization_policies.len() as u32);
let authz_policies_encoder = authz_policies_encoder.encode_family(&labels)?;
authz_policies.encode(authz_policies_encoder)?;
}

let mut http_routes_encoder = encoder.encode_descriptor(
"http_route_index_size",
"The number of HTTP routes in index",
None,
MetricType::Gauge,
)?;
for (ns, index) in &this.namespaces.by_ns {
let labels = [("namespace", ns.as_str())];
let http_routes = ConstGauge::new(index.policy.http_routes.len() as u32);
let http_routes_encoder = http_routes_encoder.encode_family(&labels)?;
http_routes.encode(http_routes_encoder)?;
}
Ok(())
}
}
41 changes: 23 additions & 18 deletions policy-controller/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,19 +125,6 @@ async fn main() -> Result<()> {
Some(server)
};

let mut prom = <Registry>::default();
let resource_status = prom.sub_registry_with_prefix("resource_status");
let status_metrics = status::ControllerMetrics::register(resource_status);
let status_index_metrcs = status::IndexMetrics::register(resource_status);

let mut runtime = kubert::Runtime::builder()
.with_log(log_level, log_format)
.with_admin(admin.into_builder().with_prometheus(prom))
.with_client(client)
.with_optional_server(server)
.build()
.await?;

let probe_networks = probe_networks.map(|IpNets(nets)| nets).unwrap_or_default();

let default_opaque_ports = parse_portset(&default_opaque_ports)?;
Expand All @@ -152,6 +139,29 @@ async fn main() -> Result<()> {
probe_networks,
});

// Build the API index data structures which will maintain information
// necessary for serving the inbound policy and outbound policy gRPC APIs.
let inbound_index = inbound::Index::shared(cluster_info.clone());
let outbound_index = outbound::Index::shared(cluster_info);

let mut prom = <Registry>::default();
let resource_status = prom.sub_registry_with_prefix("resource_status");
let status_metrics = status::ControllerMetrics::register(resource_status);
let status_index_metrcs = status::IndexMetrics::register(resource_status);

inbound::metrics::register(
prom.sub_registry_with_prefix("inbound_index"),
inbound_index.clone(),
);

let mut runtime = kubert::Runtime::builder()
.with_log(log_level, log_format)
.with_admin(admin.into_builder().with_prometheus(prom))
.with_client(client)
.with_optional_server(server)
.build()
.await?;

let hostname =
std::env::var("HOSTNAME").expect("Failed to fetch `HOSTNAME` environment variable");
let params = kubert::lease::ClaimParams {
Expand All @@ -167,11 +177,6 @@ async fn main() -> Result<()> {
.await?;
let (claims, _task) = lease.spawn(hostname.clone(), params).await?;

// Build the API index data structures which will maintain information
// necessary for serving the inbound policy and outbound policy gRPC APIs.
let inbound_index = inbound::Index::shared(cluster_info.clone());
let outbound_index = outbound::Index::shared(cluster_info);

// Build the status index which will maintain information necessary for
// updating the status field of policy resources.
let (updates_tx, updates_rx) = mpsc::channel(STATUS_UPDATE_QUEUE_SIZE);
Expand Down

0 comments on commit 1fed61e

Please sign in to comment.