From 84ef8ad3c9fb26429486d30b76a4da2ab7d64b1b Mon Sep 17 00:00:00 2001 From: Jakub Jarosz Date: Wed, 13 Mar 2024 17:44:58 +0000 Subject: [PATCH 1/2] Add number of NIC replicas to telemetry data --- .../nginx-ingress/templates/clusterrole.yaml | 6 ++ deployments/rbac/rbac.yaml | 6 ++ internal/k8s/controller.go | 13 ++- internal/telemetry/cluster.go | 21 +++++ internal/telemetry/cluster_test.go | 94 ++++++++++++++++++- internal/telemetry/collector.go | 12 +++ internal/telemetry/collector_test.go | 14 ++- internal/telemetry/exporter.go | 3 + 8 files changed, 162 insertions(+), 7 deletions(-) diff --git a/charts/nginx-ingress/templates/clusterrole.yaml b/charts/nginx-ingress/templates/clusterrole.yaml index 902d8bb4f7..2c2537bdbe 100644 --- a/charts/nginx-ingress/templates/clusterrole.yaml +++ b/charts/nginx-ingress/templates/clusterrole.yaml @@ -55,6 +55,12 @@ rules: - nodes verbs: - list +- apiGroups: + - "apps" + resources: + - replicasets + verbs: + - get - apiGroups: - networking.k8s.io resources: diff --git a/deployments/rbac/rbac.yaml b/deployments/rbac/rbac.yaml index b7c83e9667..c0eadb1225 100644 --- a/deployments/rbac/rbac.yaml +++ b/deployments/rbac/rbac.yaml @@ -11,6 +11,12 @@ rules: - get - list - watch +- apiGroups: + - "apps" + resources: + - replicasets + verbs: + - get - apiGroups: - "" resources: diff --git a/internal/k8s/controller.go b/internal/k8s/controller.go index b54cc855c5..10623103bd 100644 --- a/internal/k8s/controller.go +++ b/internal/k8s/controller.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "net" + "os" "strconv" "strings" "sync" @@ -43,6 +44,7 @@ import ( "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" @@ -347,17 +349,20 @@ func NewLoadBalancerController(input NewLoadBalancerControllerInput) *LoadBalanc // NIC Telemetry Reporting if input.EnableTelemetryReporting { + 
lbc.telemetryChan = make(chan struct{}) + collectorConfig := telemetry.CollectorConfig{ K8sClientReader: input.KubeClient, CustomK8sClientReader: input.ConfClient, Period: 5 * time.Second, Configurator: lbc.configurator, Version: input.NICVersion, + PodNSName: types.NamespacedName{ + Namespace: os.Getenv("POD_NAMESPACE"), + Name: os.Getenv("POD_NAME"), + }, } - lbc.telemetryChan = make(chan struct{}) - collector, err := telemetry.NewCollector( - collectorConfig, - ) + collector, err := telemetry.NewCollector(collectorConfig) if err != nil { glog.Fatalf("failed to initialize telemetry collector: %v", err) } diff --git a/internal/telemetry/cluster.go b/internal/telemetry/cluster.go index 05e88edae3..12829813bd 100644 --- a/internal/telemetry/cluster.go +++ b/internal/telemetry/cluster.go @@ -3,6 +3,7 @@ package telemetry import ( "context" "errors" + "fmt" "strings" metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -18,6 +19,26 @@ func (c *Collector) NodeCount(ctx context.Context) (int, error) { return len(nodes.Items), nil } +// ReplicaCount returns a number of running NIC replicas. +func (c *Collector) ReplicaCount(ctx context.Context) (int, error) { + pod, err := c.Config.K8sClientReader.CoreV1().Pods(c.Config.PodNSName.Namespace).Get(ctx, c.Config.PodNSName.Name, metaV1.GetOptions{}) + if err != nil { + return 0, err + } + podRef := pod.GetOwnerReferences() + if len(podRef) != 1 { + return 0, fmt.Errorf("expected pod owner reference to be 1, got %d", len(podRef)) + } + if podRef[0].Kind != "ReplicaSet" { + return 0, fmt.Errorf("expected pod owner reference to be ReplicaSet, got %s", pod.OwnerReferences[0].Kind) + } + rs, err := c.Config.K8sClientReader.AppsV1().ReplicaSets(c.Config.PodNSName.Namespace).Get(ctx, podRef[0].Name, metaV1.GetOptions{}) + if err != nil { + return 0, err + } + return int(*rs.Spec.Replicas), nil +} + // ClusterID returns the UID of the kube-system namespace representing cluster id. 
// It returns an error if the underlying k8s API client errors. func (c *Collector) ClusterID(ctx context.Context) (string, error) { diff --git a/internal/telemetry/cluster_test.go b/internal/telemetry/cluster_test.go index c37d78022c..94bcf88f01 100644 --- a/internal/telemetry/cluster_test.go +++ b/internal/telemetry/cluster_test.go @@ -5,9 +5,11 @@ import ( "testing" "github.com/nginxinc/kubernetes-ingress/internal/telemetry" + appsV1 "k8s.io/api/apps/v1" apiCoreV1 "k8s.io/api/core/v1" metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" ) func TestNodeCountInAClusterWithThreeNodes(t *testing.T) { @@ -350,6 +352,23 @@ func TestPlatformLookupOnMalformedPartialPlatformIDField(t *testing.T) { } } +func TestReplicaCountReturnsNumberOfNICPods(t *testing.T) { + t.Parallel() + + c := newTestCollectorForClusterWithNodes(t, node1, pod, replica) + + got, err := c.ReplicaCount(context.Background()) + if err != nil { + t.Fatal(err) + } + + want := 1 + + if want != got { + t.Errorf("want %d, got %d", want, got) + } +} + // newTestCollectorForClusterWithNodes returns a telemetry collector configured // to simulate collecting data on a cluser with provided nodes. func newTestCollectorForClusterWithNodes(t *testing.T, nodes ...runtime.Object) *telemetry.Collector { @@ -362,9 +381,82 @@ func newTestCollectorForClusterWithNodes(t *testing.T, nodes ...runtime.Object) t.Fatal(err) } c.Config.K8sClientReader = newTestClientset(nodes...) + c.Config.PodNSName = types.NamespacedName{ + Namespace: "nginx-ingress", + Name: "nginx-ingress", + } return c } +// Pod and ReplicaSet for testing NIC replica sets. 
+var ( + pod = &apiCoreV1.Pod{ + TypeMeta: metaV1.TypeMeta{ + Kind: "Pod", + APIVersion: "v1", + }, + ObjectMeta: metaV1.ObjectMeta{ + Name: "nginx-ingress", + Namespace: "nginx-ingress", + OwnerReferences: []metaV1.OwnerReference{ + { + Kind: "ReplicaSet", + Name: "nginx-ingress", + }, + }, + Labels: map[string]string{ + "app": "nginx-ingress", + "app.kubernetes.io/name": "nginx-ingress", + }, + }, + Spec: apiCoreV1.PodSpec{ + Containers: []apiCoreV1.Container{ + { + Name: "nginx-ingress", + Image: "nginx-ingress", + ImagePullPolicy: "Always", + Env: []apiCoreV1.EnvVar{ + { + Name: "POD_NAMESPACE", + Value: "nginx-ingress", + }, + { + Name: "POD_NAME", + Value: "nginx-ingress", + }, + }, + }, + }, + }, + } + + replicaNum int32 = 1 + replica = &appsV1.ReplicaSet{ + TypeMeta: metaV1.TypeMeta{ + Kind: "ReplicaSet", + APIVersion: "apps/v1", + }, + ObjectMeta: metaV1.ObjectMeta{ + Name: "nginx-ingress", + Namespace: "nginx-ingress", + Labels: map[string]string{ + "app": "nginx-ingress", + "app.kubernetes.io/name": "nginx-ingress", + }, + }, + + Spec: appsV1.ReplicaSetSpec{ + Replicas: &replicaNum, + }, + Status: appsV1.ReplicaSetStatus{ + Replicas: replicaNum, + ReadyReplicas: replicaNum, + AvailableReplicas: replicaNum, + }, + } +) + +// Nodes for testing NIC namespaces. 
var ( node1 = &apiCoreV1.Node{ TypeMeta: metaV1.TypeMeta{ @@ -373,7 +465,7 @@ var ( }, ObjectMeta: metaV1.ObjectMeta{ Name: "test-node-1", - Namespace: "default", + Namespace: "nginx-ingress", }, Spec: apiCoreV1.NodeSpec{}, } diff --git a/internal/telemetry/collector.go b/internal/telemetry/collector.go index 583e9f424b..956c5f3ec5 100644 --- a/internal/telemetry/collector.go +++ b/internal/telemetry/collector.go @@ -12,6 +12,7 @@ import ( "github.com/nginxinc/kubernetes-ingress/internal/configs" k8s_nginx "github.com/nginxinc/kubernetes-ingress/pkg/client/clientset/versioned" + "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes" @@ -58,6 +59,9 @@ type CollectorConfig struct { // Version represents NIC version. Version string + + // PodNSName represents NIC Pod's NamespacedName. + PodNSName types.NamespacedName } // NewCollector takes 0 or more options and creates a new TraceReporter. @@ -104,6 +108,7 @@ func (c *Collector) Collect(ctx context.Context) { VirtualServers: int64(report.VirtualServers), VirtualServerRoutes: int64(report.VirtualServerRoutes), TransportServers: int64(report.TransportServers), + Replicas: int64(report.NICReplicaCount), }, } @@ -125,6 +130,7 @@ type Report struct { ClusterVersion string ClusterPlatform string ClusterNodeCount int + NICReplicaCount int VirtualServers int VirtualServerRoutes int TransportServers int @@ -161,6 +167,11 @@ func (c *Collector) BuildReport(ctx context.Context) (Report, error) { glog.Errorf("Error collecting telemetry data: Platform: %v", err) } + replicas, err := c.ReplicaCount(ctx) + if err != nil { + glog.Errorf("Error collecting telemetry data: Replicas: %v", err) + } + return Report{ Name: "NIC", Version: c.Config.Version, @@ -169,6 +180,7 @@ func (c *Collector) BuildReport(ctx context.Context) (Report, error) { ClusterVersion: version, ClusterPlatform: platform, ClusterNodeCount: nodes, + NICReplicaCount: replicas, VirtualServers: vsCount, VirtualServerRoutes: 
vsrCount, TransportServers: tsCount, diff --git a/internal/telemetry/collector_test.go b/internal/telemetry/collector_test.go index dfba4ce8fb..82036c2e91 100644 --- a/internal/telemetry/collector_test.go +++ b/internal/telemetry/collector_test.go @@ -19,8 +19,10 @@ import ( tel "github.com/nginxinc/telemetry-exporter/pkg/telemetry" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" k8sruntime "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/version" fakediscovery "k8s.io/client-go/discovery/fake" + testClient "k8s.io/client-go/kubernetes/fake" ) @@ -342,13 +344,17 @@ func TestCountVirtualServers(t *testing.T) { configurator := newConfigurator(t) c, err := telemetry.NewCollector(telemetry.CollectorConfig{ - K8sClientReader: newTestClientset(kubeNS, node1), + K8sClientReader: newTestClientset(kubeNS, node1, pod, replica), Configurator: configurator, Version: telemetryNICData.ProjectVersion, }) if err != nil { t.Fatal(err) } + c.Config.PodNSName = types.NamespacedName{ + Namespace: "nginx-ingress", + Name: "nginx-ingress", + } for _, vs := range test.virtualServers { _, err := configurator.AddOrUpdateVirtualServer(vs) @@ -503,13 +509,17 @@ func TestCountTransportServers(t *testing.T) { configurator := newConfigurator(t) c, err := telemetry.NewCollector(telemetry.CollectorConfig{ - K8sClientReader: newTestClientset(kubeNS, node1), + K8sClientReader: newTestClientset(kubeNS, node1, pod, replica), Configurator: configurator, Version: telemetryNICData.ProjectVersion, }) if err != nil { t.Fatal(err) } + c.Config.PodNSName = types.NamespacedName{ + Namespace: "nginx-ingress", + Name: "nginx-ingress", + } for _, ts := range test.transportServers { _, err := configurator.AddOrUpdateTransportServer(ts) diff --git a/internal/telemetry/exporter.go b/internal/telemetry/exporter.go index 3969fa3852..412cb9e6af 100644 --- a/internal/telemetry/exporter.go +++ b/internal/telemetry/exporter.go @@ -42,4 +42,7 @@ type NICResourceCounts struct { 
VirtualServerRoutes int64 // TransportServers is the number of TransportServers managed by the Ingress Controller. TransportServers int64 + + // Replicas is the number of NIC replicas. + Replicas int64 } From 12caa498e4b1591f42958c2f501658eeb173fda3 Mon Sep 17 00:00:00 2001 From: Jakub Jarosz Date: Wed, 13 Mar 2024 20:48:17 +0000 Subject: [PATCH 2/2] Add daemonset to telemetry data --- .../nginx-ingress/templates/clusterrole.yaml | 1 + deployments/rbac/rbac.yaml | 1 + internal/telemetry/cluster.go | 23 +++-- internal/telemetry/cluster_test.go | 91 ++++++++++++++++--- internal/telemetry/collector_test.go | 4 +- 5 files changed, 96 insertions(+), 24 deletions(-) diff --git a/charts/nginx-ingress/templates/clusterrole.yaml b/charts/nginx-ingress/templates/clusterrole.yaml index 2c2537bdbe..a231ca820e 100644 --- a/charts/nginx-ingress/templates/clusterrole.yaml +++ b/charts/nginx-ingress/templates/clusterrole.yaml @@ -59,6 +59,7 @@ rules: - "apps" resources: - replicasets + - daemonsets verbs: - get - apiGroups: diff --git a/deployments/rbac/rbac.yaml b/deployments/rbac/rbac.yaml index c0eadb1225..6427b4e8e7 100644 --- a/deployments/rbac/rbac.yaml +++ b/deployments/rbac/rbac.yaml @@ -15,6 +15,7 @@ rules: - "apps" resources: - replicasets + - daemonsets verbs: - get - apiGroups: diff --git a/internal/telemetry/cluster.go b/internal/telemetry/cluster.go index 12829813bd..abf63ec672 100644 --- a/internal/telemetry/cluster.go +++ b/internal/telemetry/cluster.go @@ -29,14 +29,23 @@ func (c *Collector) ReplicaCount(ctx context.Context) (int, error) { if len(podRef) != 1 { return 0, fmt.Errorf("expected pod owner reference to be 1, got %d", len(podRef)) } - if podRef[0].Kind != "ReplicaSet" { - return 0, fmt.Errorf("expected pod owner reference to be ReplicaSet, got %s", pod.OwnerReferences[0].Kind) - } - rs, err := c.Config.K8sClientReader.AppsV1().ReplicaSets(c.Config.PodNSName.Namespace).Get(ctx, podRef[0].Name, metaV1.GetOptions{}) - if err != nil { - return 0, err + + 
switch podRef[0].Kind { + case "ReplicaSet": + rs, err := c.Config.K8sClientReader.AppsV1().ReplicaSets(c.Config.PodNSName.Namespace).Get(ctx, podRef[0].Name, metaV1.GetOptions{}) + if err != nil { + return 0, err + } + return int(*rs.Spec.Replicas), nil + case "DaemonSet": + ds, err := c.Config.K8sClientReader.AppsV1().DaemonSets(c.Config.PodNSName.Namespace).Get(ctx, podRef[0].Name, metaV1.GetOptions{}) + if err != nil { + return 0, err + } + return int(ds.Status.CurrentNumberScheduled), nil + default: + return 0, fmt.Errorf("expected pod owner reference to be ReplicaSet or DaemonSet, got %s", podRef[0].Kind) } - return int(*rs.Spec.Replicas), nil } // ClusterID returns the UID of the kube-system namespace representing cluster id. diff --git a/internal/telemetry/cluster_test.go b/internal/telemetry/cluster_test.go index 94bcf88f01..6911fd57ef 100644 --- a/internal/telemetry/cluster_test.go +++ b/internal/telemetry/cluster_test.go @@ -352,10 +352,10 @@ func TestPlatformLookupOnMalformedPartialPlatformIDField(t *testing.T) { } } -func TestReplicaCountReturnsNumberOfNICPods(t *testing.T) { +func TestReplicaCountReturnsNumberOfNICReplicas(t *testing.T) { t.Parallel() - c := newTestCollectorForClusterWithNodes(t, node1, pod, replica) + c := newTestCollectorForClusterWithNodes(t, node1, pod1, replica) got, err := c.ReplicaCount(context.Background()) if err != nil { @@ -363,7 +363,21 @@ func TestReplicaCountReturnsNumberOfNICPods(t *testing.T) { } want := 1 + if want != got { + t.Errorf("want %d, got %d", want, got) + } +} + +func TestReplicaCountReturnsNumberOfNICDaemonSets(t *testing.T) { + t.Parallel() + + c := newTestCollectorForClusterWithNodes(t, node1, pod2, daemon) + got, err := c.ReplicaCount(context.Background()) + if err != nil { + t.Fatal(err) + } + want := 1 if want != got { t.Errorf("want %d, got %d", want, got) } @@ -390,7 +404,7 @@ func newTestCollectorForClusterWithNodes(t *testing.T, nodes ...runtime.Object) // Pod and ReplicaSet for testing NIC 
replica sets. var ( - pod = &apiCoreV1.Pod{ + pod1 = &apiCoreV1.Pod{ TypeMeta: metaV1.TypeMeta{ Kind: "Pod", APIVersion: "v1", @@ -454,6 +468,65 @@ var ( AvailableReplicas: replicaNum, }, } + + pod2 = &apiCoreV1.Pod{ + TypeMeta: metaV1.TypeMeta{ + Kind: "Pod", + APIVersion: "v1", + }, + ObjectMeta: metaV1.ObjectMeta{ + Name: "nginx-ingress", + Namespace: "nginx-ingress", + OwnerReferences: []metaV1.OwnerReference{ + { + Kind: "DaemonSet", + Name: "nginx-ingress", + }, + }, + Labels: map[string]string{ + "app": "nginx-ingress", + "app.kubernetes.io/name": "nginx-ingress", + }, + }, + Spec: apiCoreV1.PodSpec{ + Containers: []apiCoreV1.Container{ + { + Name: "nginx-ingress", + Image: "nginx-ingress", + ImagePullPolicy: "Always", + Env: []apiCoreV1.EnvVar{ + { + Name: "POD_NAMESPACE", + Value: "nginx-ingress", + }, + { + Name: "POD_NAME", + Value: "nginx-ingress", + }, + }, + }, + }, + }, + } + + daemonNum int32 = 1 + daemon = &appsV1.DaemonSet{ + TypeMeta: metaV1.TypeMeta{ + Kind: "DaemonSet", + APIVersion: "apps/v1", + }, + ObjectMeta: metaV1.ObjectMeta{ + Name: "nginx-ingress", + Namespace: "nginx-ingress", + Labels: map[string]string{"app": "nginx-ingress"}, + }, + Spec: appsV1.DaemonSetSpec{}, + Status: appsV1.DaemonSetStatus{ + CurrentNumberScheduled: daemonNum, + NumberReady: daemonNum, + NumberAvailable: daemonNum, + }, + } ) // Nodes for testing NIC namespaces. @@ -505,18 +578,6 @@ var ( }, Spec: apiCoreV1.NamespaceSpec{}, } - - dummyKubeNS = &apiCoreV1.Namespace{ - TypeMeta: metaV1.TypeMeta{ - Kind: "Namespace", - APIVersion: "v1", - }, - ObjectMeta: metaV1.ObjectMeta{ - Name: "kube-system", - UID: "", - }, - Spec: apiCoreV1.NamespaceSpec{}, - } ) // Cloud providers' nodes for testing ProviderID lookups. 
diff --git a/internal/telemetry/collector_test.go b/internal/telemetry/collector_test.go index 82036c2e91..5e177cbe4b 100644 --- a/internal/telemetry/collector_test.go +++ b/internal/telemetry/collector_test.go @@ -344,7 +344,7 @@ func TestCountVirtualServers(t *testing.T) { configurator := newConfigurator(t) c, err := telemetry.NewCollector(telemetry.CollectorConfig{ - K8sClientReader: newTestClientset(kubeNS, node1, pod, replica), + K8sClientReader: newTestClientset(kubeNS, node1, pod1, replica), Configurator: configurator, Version: telemetryNICData.ProjectVersion, }) @@ -509,7 +509,7 @@ func TestCountTransportServers(t *testing.T) { configurator := newConfigurator(t) c, err := telemetry.NewCollector(telemetry.CollectorConfig{ - K8sClientReader: newTestClientset(kubeNS, node1, pod, replica), + K8sClientReader: newTestClientset(kubeNS, node1, pod1, replica), Configurator: configurator, Version: telemetryNICData.ProjectVersion, })