Fix optional resource deletion for collector CR (#3494)
* Fix optional resource deletion for collector CR

* Refactor ConfigMap version keeping

* Revert change to clusterrole management

* Code Review fixes
swiatekm authored Dec 18, 2024
1 parent 712cfe3 commit 5eefae8
Showing 11 changed files with 851 additions and 111 deletions.
16 changes: 16 additions & 0 deletions .chloggen/fix_remove-optional-resources.yaml
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern (e.g. collector, target allocator, auto-instrumentation, opamp, github action)
component: collector

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Fix deletion of optional resources for OpenTelemetryCollector CRs

# One or more tracking issues related to the change
issues: [3454]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
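
The substance of the fix, visible across the hunks below: the controller previously listed owned resources with a label selector, and now registers a field index over each owned object's controller owner reference and lists by that index instead. A condensed, self-contained sketch of the two halves (the helper names `registerOwnerIndex` and `listOwned` are illustrative; the real registration lives in `SetupCaches` and the real query in `findOtelOwnedObjects` below):

```go
package main

import (
	"context"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

const resourceOwnerKey = ".metadata.owner"

// registerOwnerIndex sketches the registration half: each owned type is
// indexed under the name of the OpenTelemetryCollector that controls it,
// so lookups no longer depend on labels.
func registerOwnerIndex(ctx context.Context, mgr ctrl.Manager) error {
	return mgr.GetCache().IndexField(ctx, &corev1.ConfigMap{}, resourceOwnerKey,
		func(rawObj client.Object) []string {
			owner := metav1.GetControllerOf(rawObj)
			if owner == nil || owner.Kind != "OpenTelemetryCollector" {
				return nil
			}
			return []string{owner.Name}
		})
}

// listOwned sketches the query half: owned objects are listed by collector
// name via the index instead of by label selector.
func listOwned(ctx context.Context, c client.Client, ns, name string) (*corev1.ConfigMapList, error) {
	var cms corev1.ConfigMapList
	err := c.List(ctx, &cms,
		client.InNamespace(ns),
		client.MatchingFields{resourceOwnerKey: name})
	return &cms, err
}
```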
26 changes: 20 additions & 6 deletions controllers/common.go
@@ -21,8 +21,8 @@ import (

"github.com/go-logr/logr"
rbacv1 "k8s.io/api/rbac/v1"
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/retry"
@@ -119,18 +119,32 @@ func BuildTargetAllocator(params targetallocator.Params) ([]client.Object, error
// getList queries the Kubernetes API for objects of the same type as l and returns them in a map keyed by UID.
// The argument l is only used to determine the GroupVersionKind; it is not modified.
func getList[T client.Object](ctx context.Context, cl client.Client, l T, options ...client.ListOption) (map[types.UID]client.Object, error) {
ownedObjects := map[types.UID]client.Object{}
list := &unstructured.UnstructuredList{}
gvk, err := apiutil.GVKForObject(l, cl.Scheme())
if err != nil {
return nil, err
}
list.SetGroupVersionKind(gvk)
err = cl.List(ctx, list, options...)
gvk.Kind = fmt.Sprintf("%sList", gvk.Kind)
list, err := cl.Scheme().New(gvk)
if err != nil {
return nil, fmt.Errorf("unable to list objects of type %s: %w", gvk.Kind, err)
}

objList := list.(client.ObjectList)

err = cl.List(ctx, objList, options...)
if err != nil {
return ownedObjects, fmt.Errorf("error listing %T: %w", l, err)
}
for i := range list.Items {
ownedObjects[list.Items[i].GetUID()] = &list.Items[i]
objs, err := apimeta.ExtractList(objList)
if err != nil {
return ownedObjects, fmt.Errorf("error listing %T: %w", l, err)
}
for i := range objs {
typedObj, ok := objs[i].(T)
if !ok {
return ownedObjects, fmt.Errorf("error listing %T: %w", l, err)
}
ownedObjects[typedObj.GetUID()] = typedObj
}
return ownedObjects, nil
}
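
A usage sketch, mirroring the call made in `findOtelOwnedObjects` below: `T` is inferred from the example object, and the result maps each live object's UID to the object.

```go
listOpts := []client.ListOption{
	client.InNamespace(params.OtelCol.Namespace),
	client.MatchingFields{resourceOwnerKey: params.OtelCol.Name},
}
// Lists all ConfigMaps owned by this collector; getList derives the
// ConfigMapList GVK from the example object and extracts typed items.
objs, err := getList(ctx, r, &corev1.ConfigMap{}, listOpts...)
if err != nil {
	return nil, err
}
_ = objs // map[types.UID]client.Object
```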
165 changes: 99 additions & 66 deletions controllers/opentelemetrycollector_controller.go
@@ -17,7 +17,6 @@ package controllers

import (
"context"
"fmt"
"sort"

"github.com/go-logr/logr"
Expand All @@ -30,12 +29,14 @@ import (
policyV1 "k8s.io/api/policy/v1"
rbacv1 "k8s.io/api/rbac/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/cluster"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

"github.com/open-telemetry/opentelemetry-operator/apis/v1alpha1"
Expand All @@ -53,6 +54,8 @@ import (
"github.com/open-telemetry/opentelemetry-operator/pkg/featuregate"
)

const resourceOwnerKey = ".metadata.owner"

var (
ownedClusterObjectTypes = []client.Object{
&rbacv1.ClusterRole{},
@@ -82,51 +85,42 @@ type Params struct {

func (r *OpenTelemetryCollectorReconciler) findOtelOwnedObjects(ctx context.Context, params manifests.Params) (map[types.UID]client.Object, error) {
ownedObjects := map[types.UID]client.Object{}
ownedObjectTypes := []client.Object{
&autoscalingv2.HorizontalPodAutoscaler{},
&networkingv1.Ingress{},
&policyV1.PodDisruptionBudget{},
}
listOps := &client.ListOptions{
Namespace: params.OtelCol.Namespace,
LabelSelector: labels.SelectorFromSet(manifestutils.SelectorLabels(params.OtelCol.ObjectMeta, collector.ComponentOpenTelemetryCollector)),
}
if featuregate.PrometheusOperatorIsAvailable.IsEnabled() && r.config.PrometheusCRAvailability() == prometheus.Available {
ownedObjectTypes = append(ownedObjectTypes,
&monitoringv1.ServiceMonitor{},
&monitoringv1.PodMonitor{},
)
}
if params.Config.OpenShiftRoutesAvailability() == openshift.RoutesAvailable {
ownedObjectTypes = append(ownedObjectTypes, &routev1.Route{})
collectorConfigMaps := []*corev1.ConfigMap{}
ownedObjectTypes := r.GetOwnedResourceTypes()
listOpts := []client.ListOption{
client.InNamespace(params.OtelCol.Namespace),
client.MatchingFields{resourceOwnerKey: params.OtelCol.Name},
}
for _, objectType := range ownedObjectTypes {
objs, err := getList(ctx, r, objectType, listOps)
objs, err := getList(ctx, r, objectType, listOpts...)
if err != nil {
return nil, err
}
for uid, object := range objs {
ownedObjects[uid] = object
}
}
if params.Config.CreateRBACPermissions() == rbac.Available {
objs, err := r.findClusterRoleObjects(ctx, params)
if err != nil {
return nil, err
}
for uid, object := range objs {
ownedObjects[uid] = object
// save Collector ConfigMaps into a separate slice; we need to do additional filtering on them
switch objectType.(type) {
case *corev1.ConfigMap:
for _, object := range objs {
if !featuregate.CollectorUsesTargetAllocatorCR.IsEnabled() && object.GetLabels()["app.kubernetes.io/component"] != "opentelemetry-collector" {
// we only apply this to collector ConfigMaps
continue
}
configMap := object.(*corev1.ConfigMap)
collectorConfigMaps = append(collectorConfigMaps, configMap)
}
default:
}
}

configMapList := &corev1.ConfigMapList{}
err := r.List(ctx, configMapList, listOps)
if err != nil {
return nil, fmt.Errorf("error listing ConfigMaps: %w", err)
}
ownedConfigMaps := r.getConfigMapsToRemove(params.OtelCol.Spec.ConfigVersions, configMapList)
for i := range ownedConfigMaps {
ownedObjects[ownedConfigMaps[i].GetUID()] = &ownedConfigMaps[i]
// at this point we don't know if the most recent ConfigMap will still be the most recent after reconciliation, or
// if a new one will be created. We keep one additional ConfigMap to account for this. The next reconciliation that
// doesn't spawn a new ConfigMap will delete the extra one we kept here.
configVersionsToKeep := max(params.OtelCol.Spec.ConfigVersions, 1) + 1
configMapsToKeep := getCollectorConfigMapsToKeep(configVersionsToKeep, collectorConfigMaps)
for _, configMap := range configMapsToKeep {
delete(ownedObjects, configMap.GetUID())
}

return ownedObjects, nil
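
A worked example of the arithmetic above: with `Spec.ConfigVersions = 3`, `configVersionsToKeep = max(3, 1) + 1 = 4`, so the four newest collector ConfigMaps are removed from the deletion set (kept), and anything older remains in `ownedObjects` to be garbage-collected.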
@@ -138,7 +132,8 @@ func (r *OpenTelemetryCollectorReconciler) findClusterRoleObjects(ctx context.Co
// Remove cluster roles and bindings.
// Users might switch off the RBAC creation feature on the operator which should remove existing RBAC.
listOpsCluster := &client.ListOptions{
LabelSelector: labels.SelectorFromSet(manifestutils.SelectorLabels(params.OtelCol.ObjectMeta, collector.ComponentOpenTelemetryCollector)),
LabelSelector: labels.SelectorFromSet(
manifestutils.SelectorLabels(params.OtelCol.ObjectMeta, collector.ComponentOpenTelemetryCollector)),
}
for _, objectType := range ownedClusterObjectTypes {
objs, err := getList(ctx, r, objectType, listOpsCluster)
@@ -152,25 +147,21 @@ func (r *OpenTelemetryCollectorReconciler) findClusterRoleObjects(ctx context.Co
return ownedObjects, nil
}

// getConfigMapsToRemove returns a list of ConfigMaps to remove based on the number of ConfigMaps to keep.
// It keeps the newest ConfigMap, the `configVersionsToKeep` next newest ConfigMaps, and returns the remainder.
func (r *OpenTelemetryCollectorReconciler) getConfigMapsToRemove(configVersionsToKeep int, configMapList *corev1.ConfigMapList) []corev1.ConfigMap {
// getCollectorConfigMapsToKeep gets ConfigMaps the controller would normally delete, but which we want to keep around
// anyway. This is part of a feature to keep around previous ConfigMap versions to make rollbacks easier.
// Fundamentally, this just sorts by creation time and picks the configVersionsToKeep latest ones.
func getCollectorConfigMapsToKeep(configVersionsToKeep int, configMaps []*corev1.ConfigMap) []*corev1.ConfigMap {
configVersionsToKeep = max(1, configVersionsToKeep)
ownedConfigMaps := []corev1.ConfigMap{}
sort.Slice(configMapList.Items, func(i, j int) bool {
iTime := configMapList.Items[i].GetCreationTimestamp().Time
jTime := configMapList.Items[j].GetCreationTimestamp().Time
sort.Slice(configMaps, func(i, j int) bool {
iTime := configMaps[i].GetCreationTimestamp().Time
jTime := configMaps[j].GetCreationTimestamp().Time
// sort the ConfigMaps newest to oldest
return iTime.After(jTime)
})

for i := range configMapList.Items {
if i > configVersionsToKeep {
ownedConfigMaps = append(ownedConfigMaps, configMapList.Items[i])
}
}

return ownedConfigMaps
configMapsToKeep := min(configVersionsToKeep, len(configMaps))
// return the first configVersionsToKeep items
return configMaps[:configMapsToKeep]
}
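
For example (matching the test cases later in this diff): given ConfigMaps created at t, t+1s, and t+1m with `configVersionsToKeep = 2`, the function returns the t+1m and t+1s entries, newest first, while the t entry stays eligible for deletion.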

func (r *OpenTelemetryCollectorReconciler) GetParams(ctx context.Context, instance v1beta1.OpenTelemetryCollector) (manifests.Params, error) {
@@ -310,32 +301,74 @@ func (r *OpenTelemetryCollectorReconciler) Reconcile(ctx context.Context, req ct

// SetupWithManager tells the manager what our controller is interested in.
func (r *OpenTelemetryCollectorReconciler) SetupWithManager(mgr ctrl.Manager) error {
err := r.SetupCaches(mgr)
if err != nil {
return err
}

ownedResources := r.GetOwnedResourceTypes()
builder := ctrl.NewControllerManagedBy(mgr).
For(&v1beta1.OpenTelemetryCollector{}).
Owns(&corev1.ConfigMap{}).
Owns(&corev1.ServiceAccount{}).
Owns(&corev1.Service{}).
Owns(&appsv1.Deployment{}).
Owns(&appsv1.DaemonSet{}).
Owns(&appsv1.StatefulSet{}).
Owns(&networkingv1.Ingress{}).
Owns(&autoscalingv2.HorizontalPodAutoscaler{}).
Owns(&policyV1.PodDisruptionBudget{})
For(&v1beta1.OpenTelemetryCollector{})

for _, resource := range ownedResources {
builder.Owns(resource)
}

return builder.Complete(r)
}

// SetupCaches sets up caching and indexing for our controller.
func (r *OpenTelemetryCollectorReconciler) SetupCaches(cluster cluster.Cluster) error {
ownedResources := r.GetOwnedResourceTypes()
for _, resource := range ownedResources {
if err := cluster.GetCache().IndexField(context.Background(), resource, resourceOwnerKey, func(rawObj client.Object) []string {
owner := metav1.GetControllerOf(rawObj)
if owner == nil {
return nil
}
// make sure it's an OpenTelemetryCollector
if owner.Kind != "OpenTelemetryCollector" {
return nil
}

return []string{owner.Name}
}); err != nil {
return err
}
}
return nil
}

// GetOwnedResourceTypes returns all the resource types the controller can own. Even though this method returns a slice
// of client.Object, these are (empty) example structs rather than actual resources.
func (r *OpenTelemetryCollectorReconciler) GetOwnedResourceTypes() []client.Object {
ownedResources := []client.Object{
&corev1.ConfigMap{},
&corev1.ServiceAccount{},
&corev1.Service{},
&appsv1.Deployment{},
&appsv1.DaemonSet{},
&appsv1.StatefulSet{},
&networkingv1.Ingress{},
&autoscalingv2.HorizontalPodAutoscaler{},
&policyV1.PodDisruptionBudget{},
}

if r.config.CreateRBACPermissions() == rbac.Available {
builder.Owns(&rbacv1.ClusterRoleBinding{})
builder.Owns(&rbacv1.ClusterRole{})
ownedResources = append(ownedResources, &rbacv1.ClusterRole{})
ownedResources = append(ownedResources, &rbacv1.ClusterRoleBinding{})
}

if featuregate.PrometheusOperatorIsAvailable.IsEnabled() && r.config.PrometheusCRAvailability() == prometheus.Available {
builder.Owns(&monitoringv1.ServiceMonitor{})
builder.Owns(&monitoringv1.PodMonitor{})
ownedResources = append(ownedResources, &monitoringv1.PodMonitor{})
ownedResources = append(ownedResources, &monitoringv1.ServiceMonitor{})
}

if r.config.OpenShiftRoutesAvailability() == openshift.RoutesAvailable {
builder.Owns(&routev1.Route{})
ownedResources = append(ownedResources, &routev1.Route{})
}

return builder.Complete(r)
return ownedResources
}
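
Centralizing the type list in `GetOwnedResourceTypes` keeps the three consumers in sync: the same slice drives the `Owns()` watches in `SetupWithManager`, the index registration in `SetupCaches`, and the deletion scan in `findOtelOwnedObjects`.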

const collectorFinalizer = "opentelemetrycollector.opentelemetry.io/finalizer"
78 changes: 78 additions & 0 deletions controllers/opentelemetrycollector_reconciler_test.go
@@ -0,0 +1,78 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package controllers

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func TestGetCollectorConfigMapsToKeep(t *testing.T) {
now := time.Now()
testCases := []struct {
name string
versionsToKeep int
input []*corev1.ConfigMap
output []*corev1.ConfigMap
}{
{
name: "no configmaps",
input: []*corev1.ConfigMap{},
output: []*corev1.ConfigMap{},
},
{
name: "one configmap",
input: []*corev1.ConfigMap{
{},
},
output: []*corev1.ConfigMap{
{},
},
},
{
name: "two configmaps, keep one",
input: []*corev1.ConfigMap{
{ObjectMeta: metav1.ObjectMeta{CreationTimestamp: metav1.Time{Time: now}}},
{ObjectMeta: metav1.ObjectMeta{CreationTimestamp: metav1.Time{Time: now.Add(time.Second)}}},
},
output: []*corev1.ConfigMap{
{ObjectMeta: metav1.ObjectMeta{CreationTimestamp: metav1.Time{Time: now.Add(time.Second)}}},
},
},
{
name: "three configmaps, keep two",
versionsToKeep: 2,
input: []*corev1.ConfigMap{
{ObjectMeta: metav1.ObjectMeta{CreationTimestamp: metav1.Time{Time: now}}},
{ObjectMeta: metav1.ObjectMeta{CreationTimestamp: metav1.Time{Time: now.Add(time.Second)}}},
{ObjectMeta: metav1.ObjectMeta{CreationTimestamp: metav1.Time{Time: now.Add(time.Minute)}}},
},
output: []*corev1.ConfigMap{
{ObjectMeta: metav1.ObjectMeta{CreationTimestamp: metav1.Time{Time: now.Add(time.Minute)}}},
{ObjectMeta: metav1.ObjectMeta{CreationTimestamp: metav1.Time{Time: now.Add(time.Second)}}},
},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
actualOutput := getCollectorConfigMapsToKeep(tc.versionsToKeep, tc.input)
assert.Equal(t, tc.output, actualOutput)
})
}
}
