diff --git a/apis/v1alpha1/opentelemetrycollector_types.go b/apis/v1alpha1/opentelemetrycollector_types.go index 1b47654e8a..6f62b62b73 100644 --- a/apis/v1alpha1/opentelemetrycollector_types.go +++ b/apis/v1alpha1/opentelemetrycollector_types.go @@ -37,6 +37,10 @@ type OpenTelemetryCollectorSpec struct { // +optional Replicas *int32 `json:"replicas,omitempty"` + // MaxReplicas sets an upper bound to the autoscaling feature. If MaxReplicas is set autoscaling is enabled. + // +optional + MaxReplicas *int32 `json:"maxReplicas,omitempty"` + // ImagePullPolicy indicates the pull policy to be used for retrieving the container image (Always, Never, IfNotPresent) // +optional ImagePullPolicy v1.PullPolicy `json:"imagePullPolicy,omitempty"` diff --git a/apis/v1alpha1/opentelemetrycollector_webhook.go b/apis/v1alpha1/opentelemetrycollector_webhook.go index a531b059dc..56cbec4ed6 100644 --- a/apis/v1alpha1/opentelemetrycollector_webhook.go +++ b/apis/v1alpha1/opentelemetrycollector_webhook.go @@ -109,5 +109,16 @@ func (r *OpenTelemetryCollector) validateCRDSpec() error { } } + // validate autoscale with horizontal pod autoscaler + if r.Spec.MaxReplicas != nil { + if *r.Spec.MaxReplicas < int32(1) { + return fmt.Errorf("the OpenTelemetry Spec autoscale configuration is incorrect, maxReplicas should be defined and more than one") + } + + if r.Spec.Replicas != nil && *r.Spec.Replicas > *r.Spec.MaxReplicas { + return fmt.Errorf("the OpenTelemetry Spec autoscale configuration is incorrect, replicas must not be greater than maxReplicas") + } + } + return nil } diff --git a/apis/v1alpha1/zz_generated.deepcopy.go b/apis/v1alpha1/zz_generated.deepcopy.go index 5ea87e5479..e2d1070709 100644 --- a/apis/v1alpha1/zz_generated.deepcopy.go +++ b/apis/v1alpha1/zz_generated.deepcopy.go @@ -264,6 +264,11 @@ func (in *OpenTelemetryCollectorSpec) DeepCopyInto(out *OpenTelemetryCollectorSp *out = new(int32) **out = **in } + if in.MaxReplicas != nil { + in, out := &in.MaxReplicas, &out.MaxReplicas + *out = new(int32) + **out = **in + } out.TargetAllocator = in.TargetAllocator if in.SecurityContext != nil { in, out := &in.SecurityContext, &out.SecurityContext diff --git a/bundle/manifests/opentelemetry-operator.clusterserviceversion.yaml b/bundle/manifests/opentelemetry-operator.clusterserviceversion.yaml index 989d82cd86..cd2852ab3c 100644 --- a/bundle/manifests/opentelemetry-operator.clusterserviceversion.yaml +++ b/bundle/manifests/opentelemetry-operator.clusterserviceversion.yaml @@ -188,6 +188,18 @@ spec: - patch - update - watch + - apiGroups: + - autoscaling + resources: + - horizontalpodautoscalers + verbs: + - create + - delete + - get + - list + - patch + - update + - watch - apiGroups: - coordination.k8s.io resources: diff --git a/bundle/manifests/opentelemetry.io_opentelemetrycollectors.yaml b/bundle/manifests/opentelemetry.io_opentelemetrycollectors.yaml index 755c8f754b..90c6203964 100644 --- a/bundle/manifests/opentelemetry.io_opentelemetrycollectors.yaml +++ b/bundle/manifests/opentelemetry.io_opentelemetrycollectors.yaml @@ -218,6 +218,11 @@ spec: description: ImagePullPolicy indicates the pull policy to be used for retrieving the container image (Always, Never, IfNotPresent) type: string + maxReplicas: + description: MaxReplicas sets an upper bound to the autoscaling feature. + If MaxReplicas is set autoscaling is enabled. + format: int32 + type: integer mode: description: Mode represents how the collector should be deployed (deployment, daemonset, statefulset or sidecar) diff --git a/config/crd/bases/opentelemetry.io_opentelemetrycollectors.yaml b/config/crd/bases/opentelemetry.io_opentelemetrycollectors.yaml index 7cb8fdc8ad..733ac90a83 100644 --- a/config/crd/bases/opentelemetry.io_opentelemetrycollectors.yaml +++ b/config/crd/bases/opentelemetry.io_opentelemetrycollectors.yaml @@ -216,6 +216,11 @@ spec: description: ImagePullPolicy indicates the pull policy to be used for retrieving the container image (Always, Never, IfNotPresent) type: string + maxReplicas: + description: MaxReplicas sets an upper bound to the autoscaling feature. + If MaxReplicas is set autoscaling is enabled. + format: int32 + type: integer mode: description: Mode represents how the collector should be deployed (deployment, daemonset, statefulset or sidecar) diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml index a3f7f86e73..a321ff7891 100644 --- a/config/rbac/role.yaml +++ b/config/rbac/role.yaml @@ -99,6 +99,18 @@ rules: - patch - update - watch +- apiGroups: + - autoscaling + resources: + - horizontalpodautoscalers + verbs: + - create + - delete + - get + - list + - patch + - update + - watch - apiGroups: - coordination.k8s.io resources: diff --git a/controllers/opentelemetrycollector_controller.go b/controllers/opentelemetrycollector_controller.go index 5beb60f8f9..5c5f799c55 100644 --- a/controllers/opentelemetrycollector_controller.go +++ b/controllers/opentelemetrycollector_controller.go @@ -21,6 +21,7 @@ import ( "github.com/go-logr/logr" appsv1 "k8s.io/api/apps/v1" + autoscalingv1 "k8s.io/api/autoscaling/v1" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" @@ -84,6 +85,11 @@ func NewReconciler(p Params) *OpenTelemetryCollectorReconciler { reconcile.Deployments, true, }, + { + "horizontal pod autoscalers", + reconcile.HorizontalPodAutoscalers, + true, + }, { "daemon sets", reconcile.DaemonSets, @@ -173,6 +179,7 @@ func (r *OpenTelemetryCollectorReconciler) SetupWithManager(mgr ctrl.Manager) er Owns(&corev1.ServiceAccount{}). Owns(&corev1.Service{}). Owns(&appsv1.Deployment{}). + Owns(&autoscalingv1.HorizontalPodAutoscaler{}). Owns(&appsv1.DaemonSet{}). Owns(&appsv1.StatefulSet{}). Complete(r) diff --git a/docs/api.md b/docs/api.md index 493daab7fd..7bcf038609 100644 --- a/docs/api.md +++ b/docs/api.md @@ -1446,6 +1446,15 @@ OpenTelemetryCollectorSpec defines the desired state of OpenTelemetryCollector. ImagePullPolicy indicates the pull policy to be used for retrieving the container image (Always, Never, IfNotPresent)
false + + maxReplicas + integer + + MaxReplicas sets an upper bound to the autoscaling feature. If MaxReplicas is set autoscaling is enabled.
+
+ Format: int32
+ + false mode enum diff --git a/pkg/collector/horizontalpodautoscaler.go b/pkg/collector/horizontalpodautoscaler.go new file mode 100644 index 0000000000..e559d7da24 --- /dev/null +++ b/pkg/collector/horizontalpodautoscaler.go @@ -0,0 +1,54 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package collector + +import ( + "github.com/go-logr/logr" + autoscalingv1 "k8s.io/api/autoscaling/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/open-telemetry/opentelemetry-operator/apis/v1alpha1" + "github.com/open-telemetry/opentelemetry-operator/internal/config" + "github.com/open-telemetry/opentelemetry-operator/pkg/naming" +) + +const defaultCPUTarget int32 = 90 + +func HorizontalPodAutoscaler(cfg config.Config, logger logr.Logger, otelcol v1alpha1.OpenTelemetryCollector) autoscalingv1.HorizontalPodAutoscaler { + labels := Labels(otelcol) + labels["app.kubernetes.io/name"] = naming.Collector(otelcol) + + annotations := Annotations(otelcol) + cpuTarget := defaultCPUTarget + + return autoscalingv1.HorizontalPodAutoscaler{ + ObjectMeta: metav1.ObjectMeta{ + Name: naming.Collector(otelcol), + Namespace: otelcol.Namespace, + Labels: labels, + Annotations: annotations, + }, + Spec: autoscalingv1.HorizontalPodAutoscalerSpec{ + ScaleTargetRef: autoscalingv1.CrossVersionObjectReference{ + APIVersion: "apps/v1", + Kind: "Deployment", + Name: naming.Collector(otelcol), + }, + MinReplicas: otelcol.Spec.Replicas, + MaxReplicas: *otelcol.Spec.MaxReplicas, + TargetCPUUtilizationPercentage: &cpuTarget, + }, + } +} diff --git a/pkg/collector/horizontalpodautoscaler_test.go b/pkg/collector/horizontalpodautoscaler_test.go new file mode 100644 index 0000000000..b168816549 --- /dev/null +++ b/pkg/collector/horizontalpodautoscaler_test.go @@ -0,0 +1,52 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package collector_test + +import ( + "testing" + + "github.com/stretchr/testify/assert" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/open-telemetry/opentelemetry-operator/apis/v1alpha1" + "github.com/open-telemetry/opentelemetry-operator/internal/config" + . "github.com/open-telemetry/opentelemetry-operator/pkg/collector" +) + +func TestHPA(t *testing.T) { + // prepare + var minReplicas int32 = 3 + var maxReplicas int32 = 5 + + otelcol := v1alpha1.OpenTelemetryCollector{ + ObjectMeta: metav1.ObjectMeta{ + Name: "my-instance", + }, + Spec: v1alpha1.OpenTelemetryCollectorSpec{ + Replicas: &minReplicas, + MaxReplicas: &maxReplicas, + }, + } + + cfg := config.New() + hpa := HorizontalPodAutoscaler(cfg, logger, otelcol) + + // verify + assert.Equal(t, "my-instance-collector", hpa.Name) + assert.Equal(t, "my-instance-collector", hpa.Labels["app.kubernetes.io/name"]) + assert.Equal(t, int32(3), *hpa.Spec.MinReplicas) + assert.Equal(t, int32(5), hpa.Spec.MaxReplicas) + assert.Equal(t, int32(90), *hpa.Spec.TargetCPUUtilizationPercentage) +} diff --git a/pkg/collector/reconcile/deployment.go b/pkg/collector/reconcile/deployment.go index 8176390948..86bc1ecc2c 100644 --- a/pkg/collector/reconcile/deployment.go +++ b/pkg/collector/reconcile/deployment.go @@ -98,6 +98,17 @@ func expectedDeployments(ctx context.Context, params Params, expected []appsv1.D updated.ObjectMeta.Labels[k] = v } + // if autoscale is enabled, use replicas from current Status + if params.Instance.Spec.MaxReplicas != nil { + currentReplicas := existing.Status.Replicas + // if replicas (minReplicas from HPA perspective) is bigger than + // current status use it. + if *params.Instance.Spec.Replicas > currentReplicas { + currentReplicas = *params.Instance.Spec.Replicas + } + updated.Spec.Replicas = ¤tReplicas + } + patch := client.MergeFrom(existing) if err := params.Client.Patch(ctx, updated, patch); err != nil { diff --git a/pkg/collector/reconcile/horizontalpodautoscaler.go b/pkg/collector/reconcile/horizontalpodautoscaler.go new file mode 100644 index 0000000000..b7e2c7df4f --- /dev/null +++ b/pkg/collector/reconcile/horizontalpodautoscaler.go @@ -0,0 +1,140 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package reconcile + +import ( + "context" + "fmt" + + autoscalingv1 "k8s.io/api/autoscaling/v1" + k8serrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + + "github.com/open-telemetry/opentelemetry-operator/pkg/collector" +) + +// +kubebuilder:rbac:groups=autoscaling,resources=horizontalpodautoscalers,verbs=get;list;watch;create;update;patch;delete + +// HorizontalPodAutoscaler reconciles HorizontalPodAutoscalers if autoscale is true and replicas is nil. +func HorizontalPodAutoscalers(ctx context.Context, params Params) error { + desired := []autoscalingv1.HorizontalPodAutoscaler{} + + // check if autoscale mode is on, e.g MaxReplicas is not nil + if params.Instance.Spec.MaxReplicas != nil { + desired = append(desired, collector.HorizontalPodAutoscaler(params.Config, params.Log, params.Instance)) + } + + // first, handle the create/update parts + if err := expectedHorizontalPodAutoscalers(ctx, params, desired); err != nil { + return fmt.Errorf("failed to reconcile the expected horizontal pod autoscalers: %w", err) + } + + // then, delete the extra objects + if err := deleteHorizontalPodAutoscalers(ctx, params, desired); err != nil { + return fmt.Errorf("failed to reconcile the horizontal pod autoscalers: %w", err) + } + + return nil +} + +func expectedHorizontalPodAutoscalers(ctx context.Context, params Params, expected []autoscalingv1.HorizontalPodAutoscaler) error { + for _, obj := range expected { + desired := obj + + if err := controllerutil.SetControllerReference(¶ms.Instance, &desired, params.Scheme); err != nil { + return fmt.Errorf("failed to set controller reference: %w", err) + } + + existing := &autoscalingv1.HorizontalPodAutoscaler{} + nns := types.NamespacedName{Namespace: desired.Namespace, Name: desired.Name} + err := params.Client.Get(ctx, nns, existing) + if k8serrors.IsNotFound(err) { + if err := params.Client.Create(ctx, &desired); err != nil { + return fmt.Errorf("failed to create: %w", err) + } + params.Log.V(2).Info("created", "hpa.name", desired.Name, "hpa.namespace", desired.Namespace) + continue + } else if err != nil { + return fmt.Errorf("failed to get %w", err) + } + + updated := existing.DeepCopy() + if updated.Annotations == nil { + updated.Annotations = map[string]string{} + } + if updated.Labels == nil { + updated.Labels = map[string]string{} + } + + updated.OwnerReferences = desired.OwnerReferences + updated.Spec.MinReplicas = params.Instance.Spec.Replicas + if params.Instance.Spec.MaxReplicas != nil { + updated.Spec.MaxReplicas = *params.Instance.Spec.MaxReplicas + } + + for k, v := range desired.Annotations { + updated.Annotations[k] = v + } + for k, v := range desired.Labels { + updated.Labels[k] = v + } + + patch := client.MergeFrom(existing) + + if err := params.Client.Patch(ctx, updated, patch); err != nil { + return fmt.Errorf("failed to apply changes: %w", err) + } + + params.Log.V(2).Info("applied", "hpa.name", desired.Name, "hpa.namespace", desired.Namespace) + } + + return nil +} + +func deleteHorizontalPodAutoscalers(ctx context.Context, params Params, expected []autoscalingv1.HorizontalPodAutoscaler) error { + opts := []client.ListOption{ + client.InNamespace(params.Instance.Namespace), + client.MatchingLabels(map[string]string{ + "app.kubernetes.io/instance": fmt.Sprintf("%s.%s", params.Instance.Namespace, params.Instance.Name), + "app.kubernetes.io/managed-by": "opentelemetry-operator", + }), + } + + list := &autoscalingv1.HorizontalPodAutoscalerList{} + if err := params.Client.List(ctx, list, opts...); err != nil { + return fmt.Errorf("failed to list: %w", err) + } + + for i := range list.Items { + existing := list.Items[i] + del := true + for _, keep := range expected { + if keep.Name == existing.Name && keep.Namespace == existing.Namespace { + del = false + } + } + + if del { + if err := params.Client.Delete(ctx, &existing); err != nil { + return fmt.Errorf("failed to delete: %w", err) + } + params.Log.V(2).Info("deleted", "hpa.name", existing.Name, "hpa.namespace", existing.Namespace) + } + } + + return nil +} diff --git a/pkg/collector/reconcile/horizontalpodautoscaler_test.go b/pkg/collector/reconcile/horizontalpodautoscaler_test.go new file mode 100644 index 0000000000..62b901da37 --- /dev/null +++ b/pkg/collector/reconcile/horizontalpodautoscaler_test.go @@ -0,0 +1,122 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package reconcile + +import ( + "context" + "fmt" + "io/ioutil" + "testing" + + "github.com/stretchr/testify/assert" + v1 "k8s.io/api/apps/v1" + autoscalingv1 "k8s.io/api/autoscaling/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/client-go/tools/record" + + "github.com/open-telemetry/opentelemetry-operator/apis/v1alpha1" + "github.com/open-telemetry/opentelemetry-operator/internal/config" + "github.com/open-telemetry/opentelemetry-operator/pkg/collector" +) + +func TestExpectedHPA(t *testing.T) { + params := paramsWithHPA() + expectedHPA := collector.HorizontalPodAutoscaler(params.Config, logger, params.Instance) + + t.Run("should create HPA", func(t *testing.T) { + err := expectedHorizontalPodAutoscalers(context.Background(), params, []autoscalingv1.HorizontalPodAutoscaler{expectedHPA}) + assert.NoError(t, err) + + exists, err := populateObjectIfExists(t, &autoscalingv1.HorizontalPodAutoscaler{}, types.NamespacedName{Namespace: "default", Name: "test-collector"}) + assert.NoError(t, err) + assert.True(t, exists) + }) + + t.Run("should update HPA", func(t *testing.T) { + minReplicas := int32(1) + maxReplicas := int32(3) + updateParms := paramsWithHPA() + updateParms.Instance.Spec.Replicas = &minReplicas + updateParms.Instance.Spec.MaxReplicas = &maxReplicas + updatedHPA := collector.HorizontalPodAutoscaler(updateParms.Config, logger, updateParms.Instance) + + createObjectIfNotExists(t, "test-collector", &updatedHPA) + err := expectedHorizontalPodAutoscalers(context.Background(), updateParms, []autoscalingv1.HorizontalPodAutoscaler{updatedHPA}) + assert.NoError(t, err) + + actual := autoscalingv1.HorizontalPodAutoscaler{} + exists, err := populateObjectIfExists(t, &actual, types.NamespacedName{Namespace: "default", Name: "test-collector"}) + + assert.NoError(t, err) + assert.True(t, exists) + assert.Equal(t, int32(1), *actual.Spec.MinReplicas) + assert.Equal(t, int32(3), actual.Spec.MaxReplicas) + }) + + t.Run("should delete HPA", func(t *testing.T) { + err := deleteHorizontalPodAutoscalers(context.Background(), params, []autoscalingv1.HorizontalPodAutoscaler{expectedHPA}) + assert.NoError(t, err) + + actual := v1.Deployment{} + exists, _ := populateObjectIfExists(t, &actual, types.NamespacedName{Namespace: "default", Name: "test-collecto"}) + assert.False(t, exists) + }) +} + +func paramsWithHPA() Params { + configYAML, err := ioutil.ReadFile("../testdata/test.yaml") + if err != nil { + fmt.Printf("Error getting yaml file: %v", err) + } + + minReplicas := int32(3) + maxReplicas := int32(5) + + return Params{ + Config: config.New(config.WithCollectorImage(defaultCollectorImage), config.WithTargetAllocatorImage(defaultTaAllocationImage)), + Client: k8sClient, + Instance: v1alpha1.OpenTelemetryCollector{ + TypeMeta: metav1.TypeMeta{ + Kind: "opentelemetry.io", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Namespace: "default", + UID: instanceUID, + }, + Spec: v1alpha1.OpenTelemetryCollectorSpec{ + Ports: []corev1.ServicePort{{ + Name: "web", + Port: 80, + TargetPort: intstr.IntOrString{ + Type: intstr.Int, + IntVal: 80, + }, + NodePort: 0, + }}, + Config: string(configYAML), + Replicas: &minReplicas, + MaxReplicas: &maxReplicas, + }, + }, + Scheme: testScheme, + Log: logger, + Recorder: record.NewFakeRecorder(10), + } +}