Skip to content

Commit

Permalink
Merge pull request kubernetes#6589 from kawych/nowait
Browse files Browse the repository at this point in the history
Add an option to trigger iterations more frequently
  • Loading branch information
k8s-ci-robot authored Mar 15, 2024
2 parents 75e7df6 + 702883d commit 109998d
Show file tree
Hide file tree
Showing 6 changed files with 220 additions and 18 deletions.
4 changes: 4 additions & 0 deletions cluster-autoscaler/core/autoscaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ type Autoscaler interface {
RunOnce(currentTime time.Time) errors.AutoscalerError
// ExitCleanUp is a clean-up performed just before process termination.
ExitCleanUp()
// LastScaleUpTime is a time of the last scale up
LastScaleUpTime() time.Time
// LastScaleUpTime is a time of the last scale down
LastScaleDownDeleteTime() time.Time
}

// NewAutoscaler creates an autoscaler of an appropriate type according to the parameters
Expand Down
10 changes: 10 additions & 0 deletions cluster-autoscaler/core/static_autoscaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,16 @@ func NewStaticAutoscaler(
}
}

// LastScaleUpTime returns last scale up time
func (a *StaticAutoscaler) LastScaleUpTime() time.Time {
return a.lastScaleUpTime
}

// LastScaleDownDeleteTime returns the last successful scale down time
func (a *StaticAutoscaler) LastScaleDownDeleteTime() time.Time {
return a.lastScaleDownDeleteTime
}

// Start starts components running in background.
func (a *StaticAutoscaler) Start() error {
a.clusterStateRegistry.Start()
Expand Down
44 changes: 44 additions & 0 deletions cluster-autoscaler/loop/run.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package loop

import (
"time"

"k8s.io/autoscaler/cluster-autoscaler/metrics"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
)

type autoscaler interface {
// RunOnce represents an iteration in the control-loop of CA.
RunOnce(currentTime time.Time) errors.AutoscalerError
}

// RunAutoscalerOnce triggers a single autoscaling iteration.
func RunAutoscalerOnce(autoscaler autoscaler, healthCheck *metrics.HealthCheck, loopStart time.Time) {
metrics.UpdateLastTime(metrics.Main, loopStart)
healthCheck.UpdateLastActivity(loopStart)

err := autoscaler.RunOnce(loopStart)
if err != nil && err.Type() != errors.TransientError {
metrics.RegisterError(err)
} else {
healthCheck.UpdateLastSuccessfulRun(time.Now())
}

metrics.UpdateDurationFromStart(metrics.Main, loopStart)
}
144 changes: 144 additions & 0 deletions cluster-autoscaler/loop/trigger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package loop

import (
"context"
"time"

apiv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/autoscaler/cluster-autoscaler/metrics"
kube_client "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
podv1 "k8s.io/kubernetes/pkg/api/v1/pod"
)

const maxPodChangeAge = 10 * time.Second

var (
podsResource = "pods"
unschedulablePodSelector = fields.ParseSelectorOrDie("spec.nodeName==" + "" + ",status.phase!=" +
string(apiv1.PodSucceeded) + ",status.phase!=" + string(apiv1.PodFailed))
)

// scalingTimesGetter exposes recent autoscaler activity
type scalingTimesGetter interface {
LastScaleUpTime() time.Time
LastScaleDownDeleteTime() time.Time
}

// LoopTrigger object implements criteria used to start new autoscaling iteration
type LoopTrigger struct {
podObserver *UnschedulablePodObserver
scanInterval time.Duration
scalingTimesGetter scalingTimesGetter
}

// NewLoopTrigger creates a LoopTrigger object
func NewLoopTrigger(podObserver *UnschedulablePodObserver, scalingTimesGetter scalingTimesGetter, scanInterval time.Duration) *LoopTrigger {
return &LoopTrigger{
podObserver: podObserver,
scanInterval: scanInterval,
scalingTimesGetter: scalingTimesGetter,
}
}

// Wait waits for the next autoscaling iteration
func (t *LoopTrigger) Wait(lastRun time.Time) {
sleepStart := time.Now()
defer metrics.UpdateDurationFromStart(metrics.LoopWait, sleepStart)

// To improve scale-up throughput, Cluster Autoscaler starts new iteration
// immediately if the previous one was productive.
if !t.scalingTimesGetter.LastScaleUpTime().Before(lastRun) ||
!t.scalingTimesGetter.LastScaleDownDeleteTime().Before(lastRun) {
select {
case <-t.podObserver.unschedulablePodChan:
klog.Info("Autoscaler loop triggered by unschedulable pod appearing")
default:
klog.Infof("Autoscaler loop triggered immediately after a productive iteration")
}
return
}

// Unschedulable pod triggers autoscaling immediately.
select {
case <-time.After(t.scanInterval):
klog.Infof("Autoscaler loop triggered by a %v timer", t.scanInterval)
case <-t.podObserver.unschedulablePodChan:
klog.Info("Autoscaler loop triggered by unschedulable pod appearing")
}
}

// UnschedulablePodObserver triggers a new loop if there are new unschedulable pods
type UnschedulablePodObserver struct {
unschedulablePodChan <-chan any
}

// StartPodObserver creates an informer and starts a goroutine watching for newly added
// or updated pods. Each time a new unschedulable pod appears or a change causes a pod to become
// unschedulable, a message is sent to the UnschedulablePodObserver's channel.
func StartPodObserver(ctx context.Context, kubeClient kube_client.Interface) *UnschedulablePodObserver {
podChan := make(chan any, 1)
listWatch := cache.NewListWatchFromClient(kubeClient.CoreV1().RESTClient(), podsResource, apiv1.NamespaceAll, unschedulablePodSelector)
informer := cache.NewSharedInformer(listWatch, &apiv1.Pod{}, time.Hour)
addEventHandlerFunc := func(obj any) {
if isRecentUnschedulablePod(obj) {
klog.V(5).Infof(" filterPodChanUntilClose emits signal")
select {
case podChan <- struct{}{}:
default:
}
}
}
updateEventHandlerFunc := func(old any, newOjb any) { addEventHandlerFunc(newOjb) }
_, _ = informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: addEventHandlerFunc,
UpdateFunc: updateEventHandlerFunc,
})
go informer.Run(ctx.Done())
return &UnschedulablePodObserver{
unschedulablePodChan: podChan,
}
}

// isRecentUnschedulablePod checks if the object is an unschedulable pod observed recently.
func isRecentUnschedulablePod(obj any) bool {
pod, ok := obj.(*apiv1.Pod)
if !ok {
return false
}
if pod.Status.Phase == apiv1.PodSucceeded || pod.Status.Phase == apiv1.PodFailed {
return false
}
if pod.Spec.NodeName != "" {
return false
}
_, scheduledCondition := podv1.GetPodCondition(&pod.Status, apiv1.PodScheduled)
if scheduledCondition == nil {
return false
}
if scheduledCondition.Status != apiv1.ConditionFalse || scheduledCondition.Reason != "Unschedulable" {
return false
}
if scheduledCondition.LastTransitionTime.Time.Add(maxPodChangeAge).Before(time.Now()) {
return false
}
return true
}
35 changes: 17 additions & 18 deletions cluster-autoscaler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/actuation"
"k8s.io/autoscaler/cluster-autoscaler/core/scaleup/orchestrator"
"k8s.io/autoscaler/cluster-autoscaler/debuggingsnapshot"
"k8s.io/autoscaler/cluster-autoscaler/loop"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/checkcapacity"
"k8s.io/autoscaler/cluster-autoscaler/simulator/predicatechecker"
kubelet_config "k8s.io/kubernetes/pkg/kubelet/apis/config"
Expand Down Expand Up @@ -62,7 +63,6 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/drainability/rules"
"k8s.io/autoscaler/cluster-autoscaler/simulator/options"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
scheduler_util "k8s.io/autoscaler/cluster-autoscaler/utils/scheduler"
"k8s.io/autoscaler/cluster-autoscaler/utils/units"
Expand Down Expand Up @@ -258,6 +258,7 @@ var (
"Priority evictor reuses the concepts of drain logic in kubelet(https://github.com/kubernetes/enhancements/tree/master/keps/sig-node/2712-pod-priority-based-graceful-node-shutdown#migration-from-the-node-graceful-shutdown-feature)."+
"Eg. flag usage: '10000:20,1000:100,0:60'")
provisioningRequestsEnabled = flag.Bool("enable-provisioning-requests", false, "Whether the clusterautoscaler will be handling the ProvisioningRequest CRs.")
frequentLoopsEnabled = flag.Bool("frequent-loops-enabled", false, "Whether clusterautoscaler triggers new iterations more frequently when it's needed")
)

func isFlagPassed(name string) bool {
Expand Down Expand Up @@ -591,23 +592,21 @@ func run(healthCheck *metrics.HealthCheck, debuggingSnapshotter debuggingsnapsho
}

// Autoscale ad infinitum.
for {
select {
case <-time.After(*scanInterval):
{
loopStart := time.Now()
metrics.UpdateLastTime(metrics.Main, loopStart)
healthCheck.UpdateLastActivity(loopStart)

err := autoscaler.RunOnce(loopStart)
if err != nil && err.Type() != errors.TransientError {
metrics.RegisterError(err)
} else {
healthCheck.UpdateLastSuccessfulRun(time.Now())
}

metrics.UpdateDurationFromStart(metrics.Main, loopStart)
}
context, cancel := ctx.WithCancel(ctx.Background())
defer cancel()
if *frequentLoopsEnabled {
podObserver := loop.StartPodObserver(context, kube_util.CreateKubeClient(createAutoscalingOptions().KubeClientOpts))
trigger := loop.NewLoopTrigger(podObserver, autoscaler, *scanInterval)
lastRun := time.Now()
for {
trigger.Wait(lastRun)
lastRun = time.Now()
loop.RunAutoscalerOnce(autoscaler, healthCheck, lastRun)
}
} else {
for {
time.Sleep(*scanInterval)
loop.RunAutoscalerOnce(autoscaler, healthCheck, time.Now())
}
}
}
Expand Down
1 change: 1 addition & 0 deletions cluster-autoscaler/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ const (
Poll FunctionLabel = "poll"
Reconfigure FunctionLabel = "reconfigure"
Autoscaling FunctionLabel = "autoscaling"
LoopWait FunctionLabel = "loopWait"
)

var (
Expand Down

0 comments on commit 109998d

Please sign in to comment.