diff --git a/go.mod b/go.mod index 705a1d6..ceb904a 100644 --- a/go.mod +++ b/go.mod @@ -19,6 +19,7 @@ require ( k8s.io/apimachinery v0.25.0 k8s.io/client-go v0.25.0 k8s.io/klog/v2 v2.70.1 + k8s.io/kubernetes v1.25.0 k8s.io/metrics v0.25.0 k8s.io/utils v0.0.0-20220728103510-ee6ede2d64ed sigs.k8s.io/controller-runtime v0.13.0 @@ -28,10 +29,34 @@ require ( replace ( k8s.io/api => k8s.io/api v0.25.0 + k8s.io/apiextensions-apiserver => k8s.io/apiextensions-apiserver v0.25.0 k8s.io/apimachinery => k8s.io/apimachinery v0.25.0 + k8s.io/apiserver => k8s.io/apiserver v0.25.0 + k8s.io/cli-runtime => k8s.io/cli-runtime v0.25.0 k8s.io/client-go => k8s.io/client-go v0.25.0 + k8s.io/cloud-provider => k8s.io/cloud-provider v0.25.0 + k8s.io/cluster-bootstrap => k8s.io/cluster-bootstrap v0.25.0 + k8s.io/code-generator => k8s.io/code-generator v0.25.0 + k8s.io/component-base => k8s.io/component-base v0.25.0 + k8s.io/component-helpers => k8s.io/component-helpers v0.25.0 + k8s.io/controller-manager => k8s.io/controller-manager v0.25.0 + k8s.io/cri-api => k8s.io/cri-api v0.25.0 + k8s.io/csi-translation-lib => k8s.io/csi-translation-lib v0.25.0 k8s.io/klog/v2 => k8s.io/klog/v2 v2.70.1 + k8s.io/kube-aggregator => k8s.io/kube-aggregator v0.25.0 + k8s.io/kube-controller-manager => k8s.io/kube-controller-manager v0.25.0 + k8s.io/kube-proxy => k8s.io/kube-proxy v0.25.0 + k8s.io/kube-scheduler => k8s.io/kube-scheduler v0.25.0 + k8s.io/kubectl => k8s.io/kubectl v0.25.0 + k8s.io/kubelet => k8s.io/kubelet v0.25.0 + k8s.io/kubernetes => k8s.io/kubernetes v1.25.0 + k8s.io/legacy-cloud-providers => k8s.io/legacy-cloud-providers v0.25.0 k8s.io/metrics => k8s.io/metrics v0.25.0 + k8s.io/mount-utils => k8s.io/mount-utils v0.25.0 + k8s.io/pod-security-admission => k8s.io/pod-security-admission v0.25.0 + k8s.io/sample-apiserver => k8s.io/sample-apiserver v0.25.0 + k8s.io/sample-cli-plugin => k8s.io/sample-cli-plugin v0.25.0 + k8s.io/sample-controller => k8s.io/sample-controller v0.25.0 ) require ( @@ -48,6 +73,7 @@ require ( github.com/blang/semver/v4 v4.0.0 // indirect github.com/cespare/xxhash/v2 v2.1.2 // indirect github.com/davecgh/go-spew v1.1.1 // indirect + github.com/docker/distribution v2.8.1+incompatible // indirect github.com/emicklei/go-restful/v3 v3.8.0 // indirect github.com/evanphx/json-patch v4.12.0+incompatible // indirect github.com/evanphx/json-patch/v5 v5.6.0 // indirect @@ -74,6 +100,7 @@ require ( github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect + github.com/opencontainers/go-digest v1.0.0 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/client_model v0.2.0 // indirect @@ -96,6 +123,7 @@ require ( k8s.io/apiextensions-apiserver v0.25.0 // indirect k8s.io/apiserver v0.25.0 // indirect k8s.io/component-base v0.25.0 // indirect + k8s.io/component-helpers v0.0.0 // indirect k8s.io/kube-openapi v0.0.0-20220803162953-67bda5d908f1 // indirect sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2 // indirect sigs.k8s.io/structured-merge-diff/v4 v4.2.3 // indirect diff --git a/go.sum b/go.sum index f3e843f..d6c4292 100644 --- a/go.sum +++ b/go.sum @@ -103,6 +103,8 @@ github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ3 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/docker/distribution v2.8.1+incompatible h1:Q50tZOPR6T/hjNsyc9g8/syEs6bk8XXApsHjKukMl68= +github.com/docker/distribution v2.8.1+incompatible/go.mod h1:J2gT2udsDAN96Uj4KfcMRqY0/ypR+oyYUYmja8H+y+w= github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE= github.com/emicklei/go-restful/v3 v3.8.0 h1:eCZ8ulSerjdAiaNpF7GxXIE7ZCMo1moN1qX+S609eVw= github.com/emicklei/go-restful/v3 v3.8.0/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc= @@ -302,6 +304,8 @@ github.com/onsi/ginkgo/v2 v2.1.4 h1:GNapqRSid3zijZ9H77KrgVG4/8KqiyRsxcSxe+7ApXY= github.com/onsi/ginkgo/v2 v2.1.4/go.mod h1:um6tUpWM/cxCK3/FK8BXqEiUMUwRgSM4JXG47RKZmLU= github.com/onsi/gomega v1.20.0 h1:8W0cWlwFkflGPLltQvLRB7ZVD5HuP6ng320w2IS245Q= github.com/onsi/gomega v1.20.0/go.mod h1:DtrZpjmvpn2mPm4YWQa0/ALMDj9v4YxLgojwPeREyVo= +github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U= +github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= @@ -814,10 +818,14 @@ k8s.io/client-go v0.25.0 h1:CVWIaCETLMBNiTUta3d5nzRbXvY5Hy9Dpl+VvREpu5E= k8s.io/client-go v0.25.0/go.mod h1:lxykvypVfKilxhTklov0wz1FoaUZ8X4EwbhS6rpRfN8= k8s.io/component-base v0.25.0 h1:haVKlLkPCFZhkcqB6WCvpVxftrg6+FK5x1ZuaIDaQ5Y= k8s.io/component-base v0.25.0/go.mod h1:F2Sumv9CnbBlqrpdf7rKZTmmd2meJq0HizeyY/yAFxk= +k8s.io/component-helpers v0.25.0 h1:vNzYfqnVXj7f+CPksduKVv2Z9kC+IDsOs9yaOyxZrj0= +k8s.io/component-helpers v0.25.0/go.mod h1:auaFj2bvb5Zmy0mLk4WJNmwP0w4e7Zk+/Tu9FFBGA20= k8s.io/klog/v2 v2.70.1 h1:7aaoSdahviPmR+XkS7FyxlkkXs6tHISSG03RxleQAVQ= k8s.io/klog/v2 v2.70.1/go.mod h1:y1WjHnz7Dj687irZUWR/WLkLc5N1YHtjLdmgWjndZn0= k8s.io/kube-openapi v0.0.0-20220803162953-67bda5d908f1 h1:MQ8BAZPZlWk3S9K4a9NCkIFQtZShWqoha7snGixVgEA= k8s.io/kube-openapi v0.0.0-20220803162953-67bda5d908f1/go.mod h1:C/N6wCaBHeBHkHUesQOQy2/MZqGgMAFPqGsGQLdbZBU= +k8s.io/kubernetes v1.25.0 h1:NwTRyLrdXTORd5V7DLlUltxDbl/KZjYDiRgwI+pBYGE= +k8s.io/kubernetes v1.25.0/go.mod h1:UdtILd5Zg1vGZvShiO1EYOqmjzM2kZOG1hzwQnM5JxY= k8s.io/metrics v0.25.0 h1:z/tyqXUCxvmFsKIO7GH6ulvogYvGp+pDmlz5ANSQVPE= k8s.io/metrics v0.25.0/go.mod h1:HZZrbhuRX+fsDcRc3u59o2FbrKhqD67IGnoFECNmovc= k8s.io/utils v0.0.0-20220728103510-ee6ede2d64ed h1:jAne/RjBTyawwAy0utX5eqigAwz/lQhTmy+Hr/Cpue4= diff --git a/main.go b/main.go index dbbac33..01834ee 100644 --- a/main.go +++ b/main.go @@ -378,7 +378,7 @@ func initHorizontalPortraitProviders(client client.Client, eventTrigger chan eve func initPortraitGenerators(client client.Client, metricProvider metricprovider.Interface, scaler *scale.Scaler) map[autoscalingv1alpha1.PortraitType]portraitgenerator.Interface { generators := make(map[autoscalingv1alpha1.PortraitType]portraitgenerator.Interface) - generators[autoscalingv1alpha1.ReactivePortraitType] = reactive.NewPortraitGenerator(client, metricProvider, scaler) + generators[autoscalingv1alpha1.ReactivePortraitType] = reactive.NewPortraitGenerator(metricProvider, util.NewCtrlPodLister(client), scaler) return generators } diff --git a/pkg/portrait/generator/reactive/generator.go b/pkg/portrait/generator/reactive/generator.go index 710346b..60d482b 100644 --- a/pkg/portrait/generator/reactive/generator.go +++ b/pkg/portrait/generator/reactive/generator.go @@ -1,5 +1,6 @@ /* Copyright 2023 The Kapacity Authors. + Copyright 2016 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -23,7 +24,12 @@ import ( "time" k8sautoscalingv2 "k8s.io/api/autoscaling/v2" - "sigs.k8s.io/controller-runtime/pkg/client" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + corev1listers "k8s.io/client-go/listers/core/v1" + "k8s.io/kubernetes/pkg/controller/podautoscaler" + podautoscalermetrics "k8s.io/kubernetes/pkg/controller/podautoscaler/metrics" "sigs.k8s.io/controller-runtime/pkg/log" autoscalingv1alpha1 "github.com/traas-stack/kapacity/apis/autoscaling/v1alpha1" @@ -42,17 +48,17 @@ const ( // PortraitGenerator generates portraits reactively based on latest metrics. type PortraitGenerator struct { - client client.Client - metricProvider metricprovider.Interface - scaler *pkgscale.Scaler + metricsClient podautoscalermetrics.MetricsClient + podLister corev1listers.PodLister + scaler *pkgscale.Scaler } // NewPortraitGenerator creates a new PortraitGenerator with the given Kubernetes client, metrics provider and scaler. -func NewPortraitGenerator(client client.Client, metricProvider metricprovider.Interface, scaler *pkgscale.Scaler) portraitgenerator.Interface { +func NewPortraitGenerator(metricProvider metricprovider.Interface, podLister corev1listers.PodLister, scaler *pkgscale.Scaler) portraitgenerator.Interface { return &PortraitGenerator{ - client: client, - metricProvider: metricProvider, - scaler: scaler, + metricsClient: NewMetricsClient(metricProvider), + podLister: podLister, + scaler: scaler, } } @@ -86,19 +92,12 @@ func (g *PortraitGenerator) GenerateHorizontal(ctx context.Context, namespace st return nil, 0, fmt.Errorf("failed to get the target's current scale: %v", err) } specReplicas := scale.Spec.Replicas + statusReplicas := scale.Status.Replicas selector, err := util.ParseScaleSelector(scale.Status.Selector) if err != nil { return nil, 0, fmt.Errorf("failed to parse label selector %q of target's current scale: %v", scale.Status.Selector, err) } - replicaCalc := &replicaCalculator{ - Client: g.client, - MetricsClient: metricsClient{ - MetricProvider: g.metricProvider, - }, - Tolerance: tolerance, - CPUInitializationPeriod: cpuInitializationPeriod, - DelayOfInitialReadinessStatus: delayOfInitialReadinessStatus, - } + replicaCalc := podautoscaler.NewReplicaCalculator(g.metricsClient, g.podLister, tolerance, cpuInitializationPeriod, delayOfInitialReadinessStatus) var ( replicas int32 = 1 @@ -106,7 +105,7 @@ func (g *PortraitGenerator) GenerateHorizontal(ctx context.Context, namespace st invalidMetricError error ) for _, metric := range metrics { - replicasProposal, err := replicaCalc.ComputeReplicasForMetric(ctx, specReplicas, metric, namespace, selector) + replicasProposal, err := computeReplicasForMetric(ctx, replicaCalc, specReplicas, statusReplicas, metric, namespace, selector) if err != nil { l.Error(err, "failed to compute replicas for metric") if invalidMetricsCount == 0 { @@ -130,3 +129,101 @@ func (g *PortraitGenerator) GenerateHorizontal(ctx context.Context, namespace st }, }, syncPeriod, nil } + +// computeReplicasForMetric computes the desired number of replicas for the specified metric. +func computeReplicasForMetric(ctx context.Context, replicaCalc *podautoscaler.ReplicaCalculator, specReplicas, statusReplicas int32, metric autoscalingv1alpha1.MetricSpec, namespace string, selector labels.Selector) (int32, error) { + switch metric.Type { + case k8sautoscalingv2.ResourceMetricSourceType: + return computeReplicasForResourceMetric(ctx, replicaCalc, specReplicas, metric.Resource.Target, metric.Resource.Name, namespace, "", selector) + case k8sautoscalingv2.ContainerResourceMetricSourceType: + return computeReplicasForResourceMetric(ctx, replicaCalc, specReplicas, metric.ContainerResource.Target, metric.ContainerResource.Name, namespace, metric.ContainerResource.Container, selector) + case k8sautoscalingv2.PodsMetricSourceType: + return computeReplicasForPodsMetric(replicaCalc, specReplicas, metric.Pods.Target, metric.Pods.Metric, namespace, selector) + case k8sautoscalingv2.ObjectMetricSourceType: + return computeReplicasForObjectMetric(replicaCalc, specReplicas, statusReplicas, metric.Object.Target, metric.Object.Metric, metric.Object.DescribedObject, namespace, selector) + case k8sautoscalingv2.ExternalMetricSourceType: + return computeReplicasForExternalMetric(replicaCalc, specReplicas, statusReplicas, metric.External.Target, metric.External.Metric, namespace, selector) + default: + return 0, fmt.Errorf("unsupported metric source type %q", metric.Type) + } +} + +// computeReplicasForResourceMetric computes the desired number of replicas for the specified metric of type (Container)ResourceMetricSourceType. +func computeReplicasForResourceMetric(ctx context.Context, replicaCalc *podautoscaler.ReplicaCalculator, currentReplicas int32, target k8sautoscalingv2.MetricTarget, + resourceName corev1.ResourceName, namespace string, container string, selector labels.Selector) (int32, error) { + if target.AverageValue != nil { + replicaCountProposal, _, _, err := replicaCalc.GetRawResourceReplicas(ctx, currentReplicas, target.AverageValue.MilliValue(), resourceName, namespace, selector, container) + if err != nil { + return 0, fmt.Errorf("failed to get %s usage: %v", resourceName, err) + } + return replicaCountProposal, nil + } + + if target.AverageUtilization == nil { + return 0, fmt.Errorf("invalid resource metric source: neither an average utilization target nor an average value (usage) target was set") + } + + targetUtilization := *target.AverageUtilization + replicaCountProposal, _, _, _, err := replicaCalc.GetResourceReplicas(ctx, currentReplicas, targetUtilization, resourceName, namespace, selector, container) + if err != nil { + return 0, fmt.Errorf("failed to get %s utilization: %v", resourceName, err) + } + return replicaCountProposal, nil +} + +// computeReplicasForPodsMetric computes the desired number of replicas for the specified metric of type PodsMetricSourceType. +func computeReplicasForPodsMetric(replicaCalc *podautoscaler.ReplicaCalculator, currentReplicas int32, target k8sautoscalingv2.MetricTarget, + metric k8sautoscalingv2.MetricIdentifier, namespace string, selector labels.Selector) (int32, error) { + metricSelector, err := metav1.LabelSelectorAsSelector(metric.Selector) + if err != nil { + return 0, fmt.Errorf("failed to parse metric selector as label selector: %v", err) + } + + replicaCountProposal, _, _, err := replicaCalc.GetMetricReplicas(currentReplicas, target.AverageValue.MilliValue(), metric.Name, namespace, selector, metricSelector) + if err != nil { + return 0, fmt.Errorf("failed to get pods metric: %v", err) + } + return replicaCountProposal, nil +} + +// computeReplicasForObjectMetric computes the desired number of replicas for the specified metric of type ObjectMetricSourceType. +func computeReplicasForObjectMetric(replicaCalc *podautoscaler.ReplicaCalculator, specReplicas, statusReplicas int32, target k8sautoscalingv2.MetricTarget, + metric k8sautoscalingv2.MetricIdentifier, describedObject k8sautoscalingv2.CrossVersionObjectReference, namespace string, selector labels.Selector) (int32, error) { + metricSelector, err := metav1.LabelSelectorAsSelector(metric.Selector) + if err != nil { + return 0, fmt.Errorf("failed to parse metric selector as label selector: %v", err) + } + + var replicaCountProposal int32 + if target.Type == k8sautoscalingv2.ValueMetricType { + replicaCountProposal, _, _, err = replicaCalc.GetObjectMetricReplicas(specReplicas, target.Value.MilliValue(), metric.Name, namespace, &describedObject, selector, metricSelector) + } else if target.Type == k8sautoscalingv2.AverageValueMetricType { + replicaCountProposal, _, _, err = replicaCalc.GetObjectPerPodMetricReplicas(statusReplicas, target.AverageValue.MilliValue(), metric.Name, namespace, &describedObject, metricSelector) + } else { + return 0, fmt.Errorf("invalid object metric source: neither a value target nor an average value target was set") + } + if err != nil { + return 0, fmt.Errorf("failed to get object metric: %v", err) + } + return replicaCountProposal, nil +} + +// computeReplicasForExternalMetric computes the desired number of replicas for the specified metric of type ExternalMetricSourceType. +func computeReplicasForExternalMetric(replicaCalc *podautoscaler.ReplicaCalculator, specReplicas, statusReplicas int32, target k8sautoscalingv2.MetricTarget, + metric k8sautoscalingv2.MetricIdentifier, namespace string, selector labels.Selector) (int32, error) { + var ( + replicaCountProposal int32 + err error + ) + if target.AverageValue != nil { + replicaCountProposal, _, _, err = replicaCalc.GetExternalPerPodMetricReplicas(statusReplicas, target.AverageValue.MilliValue(), metric.Name, namespace, metric.Selector) + } else if target.Value != nil { + replicaCountProposal, _, _, err = replicaCalc.GetExternalMetricReplicas(specReplicas, target.Value.MilliValue(), metric.Name, namespace, metric.Selector, selector) + } else { + return 0, fmt.Errorf("invalid external metric source: neither a value target nor an average value target was set") + } + if err != nil { + return 0, fmt.Errorf("failed to get external metric: %v", err) + } + return replicaCountProposal, nil +} diff --git a/pkg/portrait/generator/reactive/generator_test.go b/pkg/portrait/generator/reactive/generator_test.go index 6b02ffb..8d776c0 100644 --- a/pkg/portrait/generator/reactive/generator_test.go +++ b/pkg/portrait/generator/reactive/generator_test.go @@ -46,6 +46,7 @@ import ( autoscalingv1alpha1 "github.com/traas-stack/kapacity/apis/autoscaling/v1alpha1" "github.com/traas-stack/kapacity/pkg/metric/provider/metricsapi" pkgscale "github.com/traas-stack/kapacity/pkg/scale" + "github.com/traas-stack/kapacity/pkg/util" ) var ( @@ -72,6 +73,7 @@ func TestGenerateHorizontal(t *testing.T) { } scaleTargetRef := k8sautoscalingv2.CrossVersionObjectReference{Kind: "Deployment", APIVersion: "apps/v1beta2", Name: "foo"} fakeClient := fake.NewClientBuilder().WithObjects(preparePod(1)).Build() + fakePodLister := util.NewCtrlPodLister(fakeClient) algorithm := autoscalingv1alpha1.PortraitAlgorithm{Type: autoscalingv1alpha1.KubeHPAPortraitAlgorithmType} metricSpecs := prepareMetricSpec(corev1.ResourceCPU, 30) fakeScaler := prepareScaleClient(t) @@ -81,7 +83,7 @@ func TestGenerateHorizontal(t *testing.T) { prepareFakeMetricsClient(testCase.resourceName, testCase.podMetricsMap, testCase.timestamp).MetricsV1beta1(), ) - portraitGenerator := NewPortraitGenerator(fakeClient, fakeMetricProvider, fakeScaler) + portraitGenerator := NewPortraitGenerator(fakeMetricProvider, fakePodLister, fakeScaler) assert.NotNil(t, portraitGenerator) portraitData, _, _ := portraitGenerator.GenerateHorizontal(context.Background(), testNamespace, scaleTargetRef, metricSpecs, algorithm) diff --git a/pkg/portrait/generator/reactive/metrics.go b/pkg/portrait/generator/reactive/metrics.go deleted file mode 100644 index 7479220..0000000 --- a/pkg/portrait/generator/reactive/metrics.go +++ /dev/null @@ -1,168 +0,0 @@ -/* - Copyright 2023 The Kapacity Authors. - Copyright 2017 The Kubernetes Authors. - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. -*/ - -package reactive - -import ( - "context" - "fmt" - "time" - - corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/resource" - "k8s.io/apimachinery/pkg/labels" - "sigs.k8s.io/controller-runtime/pkg/log" - - "github.com/traas-stack/kapacity/pkg/metric" - metricprovider "github.com/traas-stack/kapacity/pkg/metric/provider" -) - -// podMetric contains pod metric value (the metric values are expected to be the metric as a milli-value). -type podMetric struct { - Timestamp time.Time - Window time.Duration - Value int64 -} - -// podMetricsInfo contains pod metrics as a map from pod names to podMetricsInfo. -type podMetricsInfo map[string]*podMetric - -// metricsClient knows how to query a remote interface to retrieve container-level -// resource metrics as well as pod-level arbitrary metrics. -type metricsClient struct { - MetricProvider metricprovider.Interface -} - -// GetResourceMetric gets the given resource metric for all pods matching the specified selector in the given namespace. -func (c *metricsClient) GetResourceMetric(ctx context.Context, resource corev1.ResourceName, namespace string, selector labels.Selector, container string) (podMetricsInfo, error) { - switch resource { - case corev1.ResourceCPU: - case corev1.ResourceMemory: - default: - return nil, fmt.Errorf("unsupported resource %q", resource) - } - - prq := &metric.PodResourceQuery{ - Namespace: namespace, - Selector: selector, - ResourceName: resource, - } - if container != "" { - metrics, err := c.MetricProvider.QueryLatest(ctx, &metric.Query{ - Type: metric.ContainerResourceQueryType, - ContainerResource: &metric.ContainerResourceQuery{ - PodResourceQuery: *prq, - ContainerName: container, - }, - }) - if err != nil { - return nil, fmt.Errorf("failed to query lastest container resource metrics: %v", err) - } - if len(metrics) == 0 { - return nil, fmt.Errorf("no metrics returned from lastest container resource metrics query") - } - return buildPodMetricsInfoFromSamples(ctx, metrics, resource), nil - } else { - metrics, err := c.MetricProvider.QueryLatest(ctx, &metric.Query{ - Type: metric.PodResourceQueryType, - PodResource: prq, - }) - if err != nil { - return nil, fmt.Errorf("failed to query lastest pod resource metrics: %v", err) - } - if len(metrics) == 0 { - return nil, fmt.Errorf("no metrics returned from lastest pod resource metrics query") - } - return buildPodMetricsInfoFromSamples(ctx, metrics, resource), nil - } -} - -func buildPodMetricsInfoFromSamples(ctx context.Context, samples []*metric.Sample, resource corev1.ResourceName) podMetricsInfo { - l := log.FromContext(ctx) - res := make(podMetricsInfo, len(samples)) - for _, s := range samples { - podName := string(s.Labels[metric.LabelPodName]) - if podName == "" { - l.Info("met invalid metric sample without pod name label", "sample", s) - continue - } - if s.Window == nil { - l.Info("met invalid metric sample without window", "sample", s) - continue - } - res[podName] = &podMetric{ - Timestamp: s.Timestamp.Time(), - Window: *s.Window, - Value: getResourceQuantityFromRawValue(resource, s.Value).MilliValue(), - } - } - return res -} - -func getResourceQuantityFromRawValue(name corev1.ResourceName, v float64) *resource.Quantity { - switch name { - case corev1.ResourceCPU: - return resource.NewMilliQuantity(int64(v*1000.0), resource.DecimalSI) - case corev1.ResourceMemory: - return resource.NewMilliQuantity(int64(v*1000.0), resource.BinarySI) - default: - return nil - } -} - -// getResourceUtilizationRatio takes in a set of metrics, a set of matching requests, -// and a target utilization percentage, and calculates the ratio of desired to actual utilization. -func getResourceUtilizationRatio(metrics podMetricsInfo, requests map[string]int64, targetUtilization int32) (utilizationRatio float64, err error) { - metricsTotal := int64(0) - requestsTotal := int64(0) - numEntries := 0 - - for podName, m := range metrics { - request, hasRequest := requests[podName] - if !hasRequest { - // we check for missing requests elsewhere, so assuming missing requests == extraneous metrics - continue - } - - metricsTotal += m.Value - requestsTotal += request - numEntries++ - } - - // if the set of requests is completely disjoint from the set of metrics, - // then we could have an issue where the requests total is zero - if requestsTotal == 0 { - return 0, fmt.Errorf("no metrics returned matched known pods") - } - - currentUtilization := int32((metricsTotal * 100) / requestsTotal) - - return float64(currentUtilization) / float64(targetUtilization), nil -} - -// getMetricUsageRatio takes in a set of metrics and a target usage value, -// and calculates the ratio of desired to actual usage. -func getMetricUsageRatio(metrics podMetricsInfo, targetUsage int64) float64 { - metricsTotal := int64(0) - for _, m := range metrics { - metricsTotal += m.Value - } - - currentUsage := metricsTotal / int64(len(metrics)) - - return float64(currentUsage) / float64(targetUsage) -} diff --git a/pkg/portrait/generator/reactive/metrics_client.go b/pkg/portrait/generator/reactive/metrics_client.go new file mode 100644 index 0000000..a45c3b1 --- /dev/null +++ b/pkg/portrait/generator/reactive/metrics_client.go @@ -0,0 +1,240 @@ +/* + Copyright 2023 The Kapacity Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package reactive + +import ( + "context" + "fmt" + "math" + "time" + + k8sautoscalingv2 "k8s.io/api/autoscaling/v2" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime/schema" + podautoscalermetrics "k8s.io/kubernetes/pkg/controller/podautoscaler/metrics" + "sigs.k8s.io/controller-runtime/pkg/log" + + "github.com/traas-stack/kapacity/pkg/metric" + metricprovider "github.com/traas-stack/kapacity/pkg/metric/provider" +) + +const ( + defaultMetricWindow = time.Minute +) + +// NewMetricsClient creates a podautoscalermetrics.MetricsClient backed by given metric provider. +func NewMetricsClient(metricProvider metricprovider.Interface) podautoscalermetrics.MetricsClient { + return &metricsClient{ + metricProvider: metricProvider, + } +} + +type metricsClient struct { + metricProvider metricprovider.Interface +} + +func (c *metricsClient) GetResourceMetric(ctx context.Context, resource corev1.ResourceName, namespace string, selector labels.Selector, container string) (podautoscalermetrics.PodMetricsInfo, time.Time, error) { + switch resource { + case corev1.ResourceCPU: + case corev1.ResourceMemory: + default: + return nil, time.Time{}, fmt.Errorf("unsupported resource %q", resource) + } + prq := &metric.PodResourceQuery{ + Namespace: namespace, + Selector: selector, + ResourceName: resource, + } + if container != "" { + metrics, err := c.metricProvider.QueryLatest(ctx, &metric.Query{ + Type: metric.ContainerResourceQueryType, + ContainerResource: &metric.ContainerResourceQuery{ + PodResourceQuery: *prq, + ContainerName: container, + }, + }) + if err != nil { + return nil, time.Time{}, fmt.Errorf("failed to query lastest container resource metrics: %v", err) + } + if len(metrics) == 0 { + return nil, time.Time{}, fmt.Errorf("no metrics returned from lastest container resource metrics query") + } + return buildResourcePodMetricsInfoFromSamples(ctx, metrics, resource), metrics[0].Timestamp.Time(), nil + } else { + metrics, err := c.metricProvider.QueryLatest(ctx, &metric.Query{ + Type: metric.PodResourceQueryType, + PodResource: prq, + }) + if err != nil { + return nil, time.Time{}, fmt.Errorf("failed to query lastest pod resource metrics: %v", err) + } + if len(metrics) == 0 { + return nil, time.Time{}, fmt.Errorf("no metrics returned from lastest pod resource metrics query") + } + return buildResourcePodMetricsInfoFromSamples(ctx, metrics, resource), metrics[0].Timestamp.Time(), nil + } +} + +func (c *metricsClient) GetRawMetric(metricName string, namespace string, selector labels.Selector, metricSelector labels.Selector) (podautoscalermetrics.PodMetricsInfo, time.Time, error) { + metricLabelSelector, err := metav1.ParseToLabelSelector(metricSelector.String()) + if err != nil { + return nil, time.Time{}, fmt.Errorf("failed to parse metric selector %q to label selector: %v", metricSelector, err) + } + metrics, err := c.metricProvider.QueryLatest(context.TODO(), &metric.Query{ + Type: metric.ObjectQueryType, + Object: &metric.ObjectQuery{ + GroupKind: schema.GroupKind{Kind: "Pod"}, + Namespace: namespace, + Selector: selector, + Metric: k8sautoscalingv2.MetricIdentifier{ + Name: metricName, + Selector: metricLabelSelector, + }, + }, + }) + if err != nil { + return nil, time.Time{}, fmt.Errorf("failed to query lastest pod metrics: %v", err) + } + if len(metrics) == 0 { + return nil, time.Time{}, fmt.Errorf("no metrics returned from lastest pod metrics query") + } + return buildPodMetricsInfoFromSamples(metrics), metrics[0].Timestamp.Time(), nil +} + +func (c *metricsClient) GetObjectMetric(metricName string, namespace string, objectRef *k8sautoscalingv2.CrossVersionObjectReference, metricSelector labels.Selector) (int64, time.Time, error) { + gvk := schema.FromAPIVersionAndKind(objectRef.APIVersion, objectRef.Kind) + metricLabelSelector, err := metav1.ParseToLabelSelector(metricSelector.String()) + if err != nil { + return 0, time.Time{}, fmt.Errorf("failed to parse metric selector %q to label selector: %v", metricSelector, err) + } + metrics, err := c.metricProvider.QueryLatest(context.TODO(), &metric.Query{ + Type: metric.ObjectQueryType, + Object: &metric.ObjectQuery{ + GroupKind: schema.GroupKind{ + Group: gvk.Group, + Kind: gvk.Kind, + }, + Namespace: namespace, + Name: objectRef.Name, + Metric: k8sautoscalingv2.MetricIdentifier{ + Name: metricName, + Selector: metricLabelSelector, + }, + }, + }) + if err != nil { + return 0, time.Time{}, fmt.Errorf("failed to query lastest object metrics: %v", err) + } + if len(metrics) == 0 { + return 0, time.Time{}, fmt.Errorf("no metrics returned from lastest object metrics query") + } + return getQuantityFromRawValue(metrics[0].Value).MilliValue(), metrics[0].Timestamp.Time(), nil +} + +func (c *metricsClient) GetExternalMetric(metricName string, namespace string, selector labels.Selector) ([]int64, time.Time, error) { + metricLabelSelector, err := metav1.ParseToLabelSelector(selector.String()) + if err != nil { + return nil, time.Time{}, fmt.Errorf("failed to parse metric selector %q to label selector: %v", selector, err) + } + metrics, err := c.metricProvider.QueryLatest(context.TODO(), &metric.Query{ + Type: metric.ExternalQueryType, + External: &metric.ExternalQuery{ + Namespace: namespace, + Metric: k8sautoscalingv2.MetricIdentifier{ + Name: metricName, + Selector: metricLabelSelector, + }, + }, + }) + if err != nil { + return nil, time.Time{}, fmt.Errorf("failed to query lastest external metrics: %v", err) + } + if len(metrics) == 0 { + return nil, time.Time{}, fmt.Errorf("no metrics returned from lastest external metrics query") + } + res := make([]int64, 0, len(metrics)) + for _, m := range metrics { + res = append(res, getQuantityFromRawValue(m.Value).MilliValue()) + } + return res, metrics[0].Timestamp.Time(), nil +} + +func buildResourcePodMetricsInfoFromSamples(ctx context.Context, samples []*metric.Sample, resource corev1.ResourceName) podautoscalermetrics.PodMetricsInfo { + l := log.FromContext(ctx) + res := make(podautoscalermetrics.PodMetricsInfo, len(samples)) + for _, s := range samples { + podName := string(s.Labels[metric.LabelPodName]) + if podName == "" { + l.Info("met invalid metric sample without pod name label", "sample", s) + continue + } + if s.Window == nil { + l.Info("met invalid metric sample without window", "sample", s) + continue + } + res[podName] = podautoscalermetrics.PodMetric{ + Timestamp: s.Timestamp.Time(), + Window: *s.Window, + Value: getResourceQuantityFromRawValue(resource, s.Value).MilliValue(), + } + } + return res +} + +func buildPodMetricsInfoFromSamples(samples []*metric.Sample) podautoscalermetrics.PodMetricsInfo { + res := make(podautoscalermetrics.PodMetricsInfo, len(samples)) + for _, s := range samples { + podName := string(s.Labels[metric.LabelPodName]) + if podName == "" { + continue + } + window := defaultMetricWindow + if s.Window != nil { + window = *s.Window + } + res[podName] = podautoscalermetrics.PodMetric{ + Timestamp: s.Timestamp.Time(), + Window: window, + Value: getQuantityFromRawValue(s.Value).MilliValue(), + } + } + return res +} + +func getResourceQuantityFromRawValue(name corev1.ResourceName, v float64) *resource.Quantity { + if math.IsNaN(v) || v < 0 { + v = 0 + } + switch name { + case corev1.ResourceCPU: + return resource.NewMilliQuantity(int64(v*1000.0), resource.DecimalSI) + case corev1.ResourceMemory: + return resource.NewMilliQuantity(int64(v*1000.0), resource.BinarySI) + default: + return nil + } +} + +func getQuantityFromRawValue(v float64) *resource.Quantity { + if math.IsNaN(v) { + return resource.NewQuantity(0, resource.DecimalSI) + } + return resource.NewMilliQuantity(int64(v*1000.0), resource.DecimalSI) +} diff --git a/pkg/portrait/generator/reactive/metrics_test.go b/pkg/portrait/generator/reactive/metrics_client_test.go similarity index 88% rename from pkg/portrait/generator/reactive/metrics_test.go rename to pkg/portrait/generator/reactive/metrics_client_test.go index 19df2c4..8eee9be 100644 --- a/pkg/portrait/generator/reactive/metrics_test.go +++ b/pkg/portrait/generator/reactive/metrics_client_test.go @@ -44,7 +44,7 @@ func TestGetResourceMetric_UnsupportedResource(t *testing.T) { metricsClient := metricsClient{} selector, _ := labels.Parse("foo=bar") - _, err := metricsClient.GetResourceMetric(context.Background(), corev1.ResourceStorage, testNamespace, selector, "test-container") + _, _, err := metricsClient.GetResourceMetric(context.Background(), corev1.ResourceStorage, testNamespace, selector, "test-container") assert.NotNil(t, err, "unsupported resource for %s", corev1.ResourceStorage) } @@ -62,14 +62,14 @@ func TestGetResourceMetric(t *testing.T) { for _, testCase := range testCases { fakeMetricsClient := prepareFakeMetricsClient(testCase.resourceName, testCase.podMetricsMap, testCase.timestamp) metricsClient := metricsClient{ - MetricProvider: metricsapi.NewMetricProvider( + metricProvider: metricsapi.NewMetricProvider( fakeMetricsClient.MetricsV1beta1(), ), } selector, _ := labels.Parse("name=test-pod") // pod resources - podMetrics, err := metricsClient.GetResourceMetric(context.Background(), testCase.resourceName, testNamespace, selector, "") + podMetrics, _, err := metricsClient.GetResourceMetric(context.Background(), testCase.resourceName, testNamespace, selector, "") assert.Nil(t, err) for podName, resValues := range testCase.podMetricsMap { @@ -79,7 +79,7 @@ func TestGetResourceMetric(t *testing.T) { // container resources containerName := buildContainerName(podName, index+1) - containerMetrics, err := metricsClient.GetResourceMetric(context.Background(), testCase.resourceName, testNamespace, selector, containerName) + containerMetrics, _, err := metricsClient.GetResourceMetric(context.Background(), testCase.resourceName, testNamespace, selector, containerName) assert.Nil(t, err, "failed to get resource metrics") assert.NotNil(t, containerMetrics, "container metrics not found for %s", containerName) assert.Equal(t, containerValue, containerMetrics[podName].Value, "container metrics not expected for %s", containerName) @@ -92,7 +92,7 @@ func TestGetResourceMetric(t *testing.T) { type metricsTestCase struct { resourceName corev1.ResourceName - //key is pod name, value is container resource metric values + // key is pod name, value is container resource metric values podMetricsMap map[string][]int64 timestamp time.Time } diff --git a/pkg/portrait/generator/reactive/replica_calculator.go b/pkg/portrait/generator/reactive/replica_calculator.go deleted file mode 100644 index 16232f4..0000000 --- a/pkg/portrait/generator/reactive/replica_calculator.go +++ /dev/null @@ -1,322 +0,0 @@ -/* - Copyright 2023 The Kapacity Authors. - Copyright 2016 The Kubernetes Authors. - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. -*/ - -package reactive - -import ( - "context" - "fmt" - "math" - "time" - - k8sautoscalingv2 "k8s.io/api/autoscaling/v2" - corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/util/sets" - "sigs.k8s.io/controller-runtime/pkg/client" - - autoscalingv1alpha1 "github.com/traas-stack/kapacity/apis/autoscaling/v1alpha1" - "github.com/traas-stack/kapacity/pkg/util" -) - -// replicaCalculator bundles all needed information to calculate the target amount of replicas. -type replicaCalculator struct { - Client client.Client - MetricsClient metricsClient - Tolerance float64 - CPUInitializationPeriod time.Duration - DelayOfInitialReadinessStatus time.Duration -} - -// ComputeReplicasForMetric computes the desired number of replicas for a specific metric specification. -func (c *replicaCalculator) ComputeReplicasForMetric(ctx context.Context, currentReplicas int32, metric autoscalingv1alpha1.MetricSpec, namespace string, selector labels.Selector) (int32, error) { - switch metric.Type { - case k8sautoscalingv2.ResourceMetricSourceType: - return c.ComputeReplicasForResourceMetric(ctx, currentReplicas, metric.Resource.Target, metric.Resource.Name, namespace, "", selector) - case k8sautoscalingv2.ContainerResourceMetricSourceType: - return c.ComputeReplicasForResourceMetric(ctx, currentReplicas, metric.ContainerResource.Target, metric.ContainerResource.Name, namespace, metric.ContainerResource.Container, selector) - // TODO(zqzten): support more metric types - default: - return 0, fmt.Errorf("unsupported metric source type %q", metric.Type) - } -} - -// ComputeReplicasForResourceMetric computes the desired number of replicas for a specific resource metric specification. -func (c *replicaCalculator) ComputeReplicasForResourceMetric(ctx context.Context, currentReplicas int32, target k8sautoscalingv2.MetricTarget, - resourceName corev1.ResourceName, namespace string, container string, selector labels.Selector) (int32, error) { - if target.AverageValue != nil { - replicaCountProposal, err := c.getRawResourceReplicas(ctx, currentReplicas, target.AverageValue.MilliValue(), resourceName, namespace, selector, container) - if err != nil { - return 0, fmt.Errorf("failed to get %s usage: %v", resourceName, err) - } - return replicaCountProposal, nil - } - - if target.AverageUtilization == nil { - return 0, fmt.Errorf("invalid resource metric source: neither an average utilization target nor an average value (usage) target was set") - } - - targetUtilization := *target.AverageUtilization - replicaCountProposal, err := c.getResourceReplicas(ctx, currentReplicas, targetUtilization, resourceName, namespace, selector, container) - if err != nil { - return 0, fmt.Errorf("failed to get %s utilization: %v", resourceName, err) - } - return replicaCountProposal, nil -} - -// getResourceReplicas calculates the desired replica count based on a target resource utilization percentage -// of the given resource for pods matching the given selector in the given namespace, and the current replica count. -func (c *replicaCalculator) getResourceReplicas(ctx context.Context, currentReplicas int32, targetUtilization int32, resource corev1.ResourceName, namespace string, selector labels.Selector, container string) (replicaCount int32, err error) { - metrics, err := c.MetricsClient.GetResourceMetric(ctx, resource, namespace, selector, container) - if err != nil { - return 0, fmt.Errorf("unable to get metrics for resource %s: %v", resource, err) - } - - podList := &corev1.PodList{} - if err := c.Client.List(ctx, podList, client.InNamespace(namespace), client.MatchingLabelsSelector{Selector: selector}); err != nil { - return 0, fmt.Errorf("unable to get pods while calculating replica count: %v", err) - } - if len(podList.Items) == 0 { - return 0, fmt.Errorf("no pods returned by selector while calculating replica count") - } - - readyPodCount, unreadyPods, missingPods, ignoredPods := groupPods(podList.Items, metrics, resource, c.CPUInitializationPeriod, c.DelayOfInitialReadinessStatus) - removeMetricsForPods(metrics, ignoredPods) - removeMetricsForPods(metrics, unreadyPods) - if len(metrics) == 0 { - return 0, fmt.Errorf("did not receive metrics for any ready pods") - } - - requests, err := calculatePodRequests(podList.Items, container, resource) - if err != nil { - return 0, err - } - - usageRatio, err := getResourceUtilizationRatio(metrics, requests, targetUtilization) - if err != nil { - return 0, err - } - - scaleUpWithUnready := len(unreadyPods) > 0 && usageRatio > 1.0 - if !scaleUpWithUnready && len(missingPods) == 0 { - if math.Abs(1.0-usageRatio) <= c.Tolerance { - // return the current replicas if the change would be too small - return currentReplicas, nil - } - - // if we don't have any unready or missing pods, we can calculate the new replica count now - return int32(math.Ceil(usageRatio * float64(readyPodCount))), nil - } - - if len(missingPods) > 0 { - if usageRatio < 1.0 { - // on a scale-down, treat missing pods as using 100% (all) of the resource request - // or the utilization target for targets higher than 100% - fallbackUtilization := int64(util.MaxInt32(100, targetUtilization)) - for podName := range missingPods { - metrics[podName] = &podMetric{Value: requests[podName] * fallbackUtilization / 100} - } - } else if usageRatio > 1.0 { - // on a scale-up, treat missing pods as using 0% of the resource request - for podName := range missingPods { - metrics[podName] = &podMetric{Value: 0} - } - } - } - - if scaleUpWithUnready { - // on a scale-up, treat unready pods as using 0% of the resource request - for podName := range unreadyPods { - metrics[podName] = &podMetric{Value: 0} - } - } - - // re-run the utilization calculation with our new numbers - newUsageRatio, err := getResourceUtilizationRatio(metrics, requests, targetUtilization) - if err != nil { - return 0, err - } - - if math.Abs(1.0-newUsageRatio) <= c.Tolerance || (usageRatio < 1.0 && newUsageRatio > 1.0) || (usageRatio > 1.0 && newUsageRatio < 1.0) { - // return the current replicas if the change would be too small, - // or if the new usage ratio would cause a change in scale direction - return currentReplicas, nil - } - - newReplicas := int32(math.Ceil(newUsageRatio * float64(len(metrics)))) - if (newUsageRatio < 1.0 && newReplicas > currentReplicas) || (newUsageRatio > 1.0 && newReplicas < currentReplicas) { - // return the current replicas if the change of metrics length would cause a change in scale direction - return currentReplicas, nil - } - - // return the result, where the number of replicas considered is - // however many replicas factored into our calculation - return newReplicas, nil -} - -// getRawResourceReplicas calculates the desired replica count based on a target resource usage (as a raw milli-value) -// for pods matching the given selector in the given namespace, and the current replica count. -func (c *replicaCalculator) getRawResourceReplicas(ctx context.Context, currentReplicas int32, targetUsage int64, resource corev1.ResourceName, namespace string, selector labels.Selector, container string) (replicaCount int32, err error) { - metrics, err := c.MetricsClient.GetResourceMetric(ctx, resource, namespace, selector, container) - if err != nil { - return 0, fmt.Errorf("failed to get metrics for resource %q: %v", resource, err) - } - return c.calcPlainMetricReplicas(ctx, metrics, currentReplicas, targetUsage, namespace, selector, resource) -} - -// calcPlainMetricReplicas calculates the desired replicas for plain (i.e. non-utilization percentage) metrics. -func (c *replicaCalculator) calcPlainMetricReplicas(ctx context.Context, metrics podMetricsInfo, currentReplicas int32, targetUsage int64, namespace string, selector labels.Selector, resource corev1.ResourceName) (replicaCount int32, err error) { - podList := &corev1.PodList{} - if err := c.Client.List(ctx, podList, client.InNamespace(namespace), client.MatchingLabelsSelector{Selector: selector}); err != nil { - return 0, fmt.Errorf("unable to get pods while calculating replica count: %v", err) - } - if len(podList.Items) == 0 { - return 0, fmt.Errorf("no pods returned by selector while calculating replica count") - } - - readyPodCount, unreadyPods, missingPods, ignoredPods := groupPods(podList.Items, metrics, resource, c.CPUInitializationPeriod, c.DelayOfInitialReadinessStatus) - removeMetricsForPods(metrics, ignoredPods) - removeMetricsForPods(metrics, unreadyPods) - - if len(metrics) == 0 { - return 0, fmt.Errorf("did not receive metrics for any ready pods") - } - - usageRatio := getMetricUsageRatio(metrics, targetUsage) - - scaleUpWithUnready := len(unreadyPods) > 0 && usageRatio > 1.0 - - if !scaleUpWithUnready && len(missingPods) == 0 { - if math.Abs(1.0-usageRatio) <= c.Tolerance { - // return the current replicas if the change would be too small - return currentReplicas, nil - } - - // if we don't have any unready or missing pods, we can calculate the new replica count now - return int32(math.Ceil(usageRatio * float64(readyPodCount))), nil - } - - if len(missingPods) > 0 { - if usageRatio < 1.0 { - // on a scale-down, treat missing pods as using exactly the target amount - for podName := range missingPods { - metrics[podName] = &podMetric{Value: targetUsage} - } - } else { - // on a scale-up, treat missing pods as using 0% of the resource request - for podName := range missingPods { - metrics[podName] = &podMetric{Value: 0} - } - } - } - - if scaleUpWithUnready { - // on a scale-up, treat unready pods as using 0% of the resource request - for podName := range unreadyPods { - metrics[podName] = &podMetric{Value: 0} - } - } - - // re-run the usage calculation with our new numbers - newUsageRatio := getMetricUsageRatio(metrics, targetUsage) - - if math.Abs(1.0-newUsageRatio) <= c.Tolerance || (usageRatio < 1.0 && newUsageRatio > 1.0) || (usageRatio > 1.0 && newUsageRatio < 1.0) { - // return the current replicas if the change would be too small, - // or if the new usage ratio would cause a change in scale direction - return currentReplicas, nil - } - - newReplicas := int32(math.Ceil(newUsageRatio * float64(len(metrics)))) - if (newUsageRatio < 1.0 && newReplicas > currentReplicas) || (newUsageRatio > 1.0 && newReplicas < currentReplicas) { - // return the current replicas if the change of metrics length would cause a change in scale direction - return currentReplicas, nil - } - - // return the result, where the number of replicas considered is - // however many replicas factored into our calculation - return newReplicas, nil -} - -func groupPods(pods []corev1.Pod, metrics podMetricsInfo, resource corev1.ResourceName, cpuInitializationPeriod, delayOfInitialReadinessStatus time.Duration) (readyPodCount int, unreadyPods, missingPods, ignoredPods sets.String) { - missingPods = sets.NewString() - unreadyPods = sets.NewString() - ignoredPods = sets.NewString() - for _, pod := range pods { - if pod.DeletionTimestamp != nil || pod.Status.Phase == corev1.PodFailed { - ignoredPods.Insert(pod.Name) - continue - } - // Pending pods are ignored. - if pod.Status.Phase == corev1.PodPending { - unreadyPods.Insert(pod.Name) - continue - } - // Pods missing metrics. - metric, found := metrics[pod.Name] - if !found { - missingPods.Insert(pod.Name) - continue - } - // Unready pods are ignored. - if resource == corev1.ResourceCPU { - var unready bool - _, condition := util.GetPodCondition(&pod.Status, corev1.PodReady) - if condition == nil || pod.Status.StartTime == nil { - unready = true - } else { - // Pod still within possible initialisation period. - if pod.Status.StartTime.Add(cpuInitializationPeriod).After(time.Now()) { - // Ignore sample if pod is unready or one window of metric wasn't collected since last state transition. - unready = condition.Status == corev1.ConditionFalse || metric.Timestamp.Before(condition.LastTransitionTime.Time.Add(metric.Window)) - } else { - // Ignore metric if pod is unready and it has never been ready. - unready = condition.Status == corev1.ConditionFalse && pod.Status.StartTime.Add(delayOfInitialReadinessStatus).After(condition.LastTransitionTime.Time) - } - } - if unready { - unreadyPods.Insert(pod.Name) - continue - } - } - readyPodCount++ - } - return -} - -func calculatePodRequests(pods []corev1.Pod, container string, resource corev1.ResourceName) (map[string]int64, error) { - requests := make(map[string]int64, len(pods)) - for _, pod := range pods { - podSum := int64(0) - for _, c := range pod.Spec.Containers { - if container == "" || container == c.Name { - if containerRequest, ok := c.Resources.Requests[resource]; ok { - podSum += containerRequest.MilliValue() - } else { - return nil, fmt.Errorf("missing request for %s in container %q of Pod %q", resource, c.Name, pod.Name) - } - } - } - requests[pod.Name] = podSum - } - return requests, nil -} - -func removeMetricsForPods(metrics podMetricsInfo, pods sets.String) { - for _, pod := range pods.UnsortedList() { - delete(metrics, pod) - } -} diff --git a/pkg/util/client.go b/pkg/util/client.go new file mode 100644 index 0000000..8dbf434 --- /dev/null +++ b/pkg/util/client.go @@ -0,0 +1,82 @@ +/* + Copyright 2023 The Kapacity Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package util + +import ( + "context" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/types" + corev1listers "k8s.io/client-go/listers/core/v1" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +// NewCtrlPodLister creates a corev1listers.PodLister wrapper for given controller-runtime client. +func NewCtrlPodLister(client client.Client) corev1listers.PodLister { + return &ctrlPodLister{ + client: client, + } +} + +type ctrlPodLister struct { + client client.Client +} + +type ctrlPodNamespaceLister struct { + client client.Client + namespace string +} + +func (l *ctrlPodLister) List(selector labels.Selector) ([]*corev1.Pod, error) { + podList := &corev1.PodList{} + if err := l.client.List(context.TODO(), podList, client.MatchingLabelsSelector{Selector: selector}); err != nil { + return nil, err + } + return convertPodListToPointerSlice(podList), nil +} + +func (l *ctrlPodLister) Pods(namespace string) corev1listers.PodNamespaceLister { + return &ctrlPodNamespaceLister{ + client: l.client, + namespace: namespace, + } +} + +func (l *ctrlPodNamespaceLister) List(selector labels.Selector) ([]*corev1.Pod, error) { + podList := &corev1.PodList{} + if err := l.client.List(context.TODO(), podList, client.InNamespace(l.namespace), client.MatchingLabelsSelector{Selector: selector}); err != nil { + return nil, err + } + return convertPodListToPointerSlice(podList), nil +} + +func (l *ctrlPodNamespaceLister) Get(name string) (*corev1.Pod, error) { + pod := &corev1.Pod{} + if err := l.client.Get(context.TODO(), types.NamespacedName{Namespace: l.namespace, Name: name}, pod); err != nil { + return nil, err + } + return pod, nil +} + +func convertPodListToPointerSlice(podList *corev1.PodList) []*corev1.Pod { + s := make([]*corev1.Pod, len(podList.Items)) + for i := range podList.Items { + s[i] = &podList.Items[i] + } + return s +} diff --git a/pkg/util/client_test.go b/pkg/util/client_test.go new file mode 100644 index 0000000..eaf386a --- /dev/null +++ b/pkg/util/client_test.go @@ -0,0 +1,72 @@ +/* + Copyright 2023 The Kapacity Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package util + +import ( + "testing" + + "github.com/stretchr/testify/assert" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "sigs.k8s.io/controller-runtime/pkg/client/fake" +) + +func TestCtrlPodLister(t *testing.T) { + fakeClient := fake.NewClientBuilder().WithObjects( + &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod1", + Namespace: "ns1", + Labels: map[string]string{ + "key1": "value1", + }, + }, + }, + &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod2", + Namespace: "ns1", + Labels: map[string]string{ + "key2": "value2", + }, + }, + }, + &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod3", + Namespace: "ns2", + Labels: map[string]string{ + "key1": "value1", + }, + }, + }, + ).Build() + lister := NewCtrlPodLister(fakeClient) + + pods, err := lister.List(labels.SelectorFromSet(map[string]string{"key1": "value1"})) + assert.Nil(t, err) + assert.Equal(t, 2, len(pods)) + for _, pod := range pods { + assert.NotEqual(t, "pod2", pod.Name) + } + + pods, err = lister.Pods("ns1").List(labels.SelectorFromSet(map[string]string{"key1": "value1"})) + assert.Nil(t, err) + assert.Equal(t, 1, len(pods)) + assert.Equal(t, "pod1", pods[0].Name) +}