From 84767bd94de51e8583f0c0506cdc4c7ade152eec Mon Sep 17 00:00:00 2001 From: Damika Gamlath Date: Fri, 23 Feb 2024 13:08:51 +0000 Subject: [PATCH] implement time limiter for binpacking --- cluster-autoscaler/main.go | 3 + .../processors/binpacking/combined_limiter.go | 66 +++++++++++++++++++ .../processors/binpacking/time_limiter.go | 61 +++++++++++++++++ 3 files changed, 130 insertions(+) create mode 100644 cluster-autoscaler/processors/binpacking/combined_limiter.go create mode 100644 cluster-autoscaler/processors/binpacking/time_limiter.go diff --git a/cluster-autoscaler/main.go b/cluster-autoscaler/main.go index 8f388dbf0d27..7ac90a40725e 100644 --- a/cluster-autoscaler/main.go +++ b/cluster-autoscaler/main.go @@ -31,6 +31,7 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/actuation" "k8s.io/autoscaler/cluster-autoscaler/core/scaleup/orchestrator" "k8s.io/autoscaler/cluster-autoscaler/debuggingsnapshot" + "k8s.io/autoscaler/cluster-autoscaler/processors/binpacking" "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/checkcapacity" "k8s.io/autoscaler/cluster-autoscaler/simulator/predicatechecker" kubelet_config "k8s.io/kubernetes/pkg/kubelet/apis/config" @@ -196,6 +197,7 @@ var ( writeStatusConfigMapFlag = flag.Bool("write-status-configmap", true, "Should CA write status information to a configmap") statusConfigMapName = flag.String("status-config-map-name", "cluster-autoscaler-status", "Status configmap name") maxInactivityTimeFlag = flag.Duration("max-inactivity", 10*time.Minute, "Maximum time from last recorded autoscaler activity before automatic restart") + maxBinpackingTimeFlag = flag.Duration("max-binpacking-time", 5*time.Minute, "Maximum time spend on binpacking for a single scale-up. If binpacking is limited by this, scale-up will continue with the already calculated scale-up options.") maxFailingTimeFlag = flag.Duration("max-failing-time", 15*time.Minute, "Maximum time from last recorded successful autoscaler run before automatic restart") balanceSimilarNodeGroupsFlag = flag.Bool("balance-similar-node-groups", false, "Detect similar node groups and balance the number of nodes between them") nodeAutoprovisioningEnabled = flag.Bool("node-autoprovisioning-enabled", false, "Should CA autoprovision node groups when needed.This flag is deprecated and will be removed in future releases.") @@ -487,6 +489,7 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter opts.Processors = ca_processors.DefaultProcessors(autoscalingOptions) opts.Processors.TemplateNodeInfoProvider = nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nodeInfoCacheExpireTime, *forceDaemonSets) + opts.Processors.BinpackingLimiter = binpacking.NewCombinedLimiter([]binpacking.BinpackingLimiter{opts.Processors.BinpackingLimiter, binpacking.NewTimeLimiter(*maxBinpackingTimeFlag)}) podListProcessor := podlistprocessor.NewDefaultPodListProcessor(opts.PredicateChecker) if autoscalingOptions.ProvisioningRequestEnabled { diff --git a/cluster-autoscaler/processors/binpacking/combined_limiter.go b/cluster-autoscaler/processors/binpacking/combined_limiter.go new file mode 100644 index 000000000000..ec712d4cb22b --- /dev/null +++ b/cluster-autoscaler/processors/binpacking/combined_limiter.go @@ -0,0 +1,66 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package binpacking + +import ( + "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" + "k8s.io/autoscaler/cluster-autoscaler/context" + "k8s.io/autoscaler/cluster-autoscaler/expander" +) + +// CombinedLimiter combines the outcome of multiple limiters. It will limit +// binpacking when at least one limiter meets the stop condition. +type CombinedLimiter struct { + limiters []BinpackingLimiter +} + +// NewCombinedLimiter returns an instance of a new CombinedLimiter +func NewCombinedLimiter(limiters []BinpackingLimiter) *CombinedLimiter { + return &CombinedLimiter{ + limiters: limiters, + } +} + +// InitBinpacking initialises all the underline limiters. +func (l *CombinedLimiter) InitBinpacking(context *context.AutoscalingContext, nodeGroups []cloudprovider.NodeGroup) { + for _, limiter := range l.limiters { + limiter.InitBinpacking(context, nodeGroups) + } +} + +// MarkProcessed marks the nodegroup as processed in all underline limiters. +func (l *CombinedLimiter) MarkProcessed(context *context.AutoscalingContext, nodegroupId string) { + for _, limiter := range l.limiters { + limiter.MarkProcessed(context, nodegroupId) + } +} + +// StopBinpacking returns true if at least one of the underline limiters met the stop condition. +func (l *CombinedLimiter) StopBinpacking(context *context.AutoscalingContext, evaluatedOptions []expander.Option) bool { + stopCondition := false + for _, limiter := range l.limiters { + stopCondition = limiter.StopBinpacking(context, evaluatedOptions) || stopCondition + } + return stopCondition +} + +// FinalizeBinpacking will call FinalizeBinpacking for all the underline limiters. +func (l *CombinedLimiter) FinalizeBinpacking(context *context.AutoscalingContext, finalOptions []expander.Option) { + for _, limiter := range l.limiters { + limiter.FinalizeBinpacking(context, finalOptions) + } +} diff --git a/cluster-autoscaler/processors/binpacking/time_limiter.go b/cluster-autoscaler/processors/binpacking/time_limiter.go new file mode 100644 index 000000000000..4d4f6161ab49 --- /dev/null +++ b/cluster-autoscaler/processors/binpacking/time_limiter.go @@ -0,0 +1,61 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package binpacking + +import ( + "time" + + "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" + "k8s.io/autoscaler/cluster-autoscaler/context" + "k8s.io/autoscaler/cluster-autoscaler/expander" + "k8s.io/klog/v2" +) + +// TimeLimiter imits binpacking based on the total time spends on binpacking. +type TimeLimiter struct { + startTime time.Time + maxBinpackingDuration time.Duration +} + +// NewTimeLimiter returns an instance of a new TimeLimiter. +func NewTimeLimiter(maxBinpackingDuration time.Duration) *TimeLimiter { + return &TimeLimiter{ + maxBinpackingDuration: maxBinpackingDuration, + } +} + +// InitBinpacking initialises the TimeLimiter. +func (b *TimeLimiter) InitBinpacking(context *context.AutoscalingContext, nodeGroups []cloudprovider.NodeGroup) { + b.startTime = time.Now() +} + +// MarkProcessed marks the nodegroup as processed. +func (b *TimeLimiter) MarkProcessed(context *context.AutoscalingContext, nodegroupId string) { +} + +// StopBinpacking returns true if the binpacking time exceeds maxBinpackingDuration. +func (b *TimeLimiter) StopBinpacking(context *context.AutoscalingContext, evaluatedOptions []expander.Option) bool { + if time.Now().After(b.startTime.Add(b.maxBinpackingDuration)) { + klog.Info("Binpacking is cut short due to maxBinpackingDuration reached.") + return true + } + return false +} + +// FinalizeBinpacking is called to finalize the BinpackingLimiter. +func (b *TimeLimiter) FinalizeBinpacking(context *context.AutoscalingContext, finalOptions []expander.Option) { +}