Implement dynamic adjustment of NodeDeleteDelayAfterTaint based on round-trip time between CA and api-server #6019

Merged
merged 1 commit on Sep 8, 2023
3 changes: 3 additions & 0 deletions cluster-autoscaler/config/autoscaling_options.go
@@ -274,4 +274,7 @@ type AutoscalingOptions struct {
ParallelDrain bool
// NodeGroupSetRatios is a collection of ratios used by CA to make scaling decisions.
NodeGroupSetRatios NodeGroupDifferenceRatios
// DynamicNodeDeleteDelayAfterTaintEnabled is used to enable/disable dynamic adjustment of NodeDeleteDelayAfterTaint
// based on the latency between the CA and the api-server
DynamicNodeDeleteDelayAfterTaintEnabled bool
}
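
The new option is a plain boolean on AutoscalingOptions. Below is a minimal sketch of how it could be surfaced as a command-line flag and plumbed into the struct; the flag name, default, and wiring are assumptions for illustration and are not taken from this PR (only the DynamicNodeDeleteDelayAfterTaintEnabled field is).

// Hypothetical wiring sketch: the flag name and default are assumptions.
package main

import (
    "flag"

    "k8s.io/autoscaler/cluster-autoscaler/config"
)

var dynamicNodeDeleteDelayAfterTaintEnabled = flag.Bool(
    "dynamic-node-delete-delay-after-taint-enabled", false,
    "Enable dynamic adjustment of NodeDeleteDelayAfterTaint based on the round-trip time between CA and the api-server")

func buildAutoscalingOptions() config.AutoscalingOptions {
    flag.Parse()
    return config.AutoscalingOptions{
        DynamicNodeDeleteDelayAfterTaintEnabled: *dynamicNodeDeleteDelayAfterTaintEnabled,
    }
}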
47 changes: 35 additions & 12 deletions cluster-autoscaler/core/scaledown/actuation/actuator.go
@@ -51,8 +51,9 @@ type Actuator struct {
// TODO: Move budget processor to scaledown planner, potentially merge into PostFilteringScaleDownNodeProcessor
// This is a larger change to the code structure which impacts some existing actuator unit tests
// as well as Cluster Autoscaler implementations that may override ScaleDownSetProcessor
budgetProcessor *budgets.ScaleDownBudgetProcessor
configGetter actuatorNodeGroupConfigGetter
budgetProcessor *budgets.ScaleDownBudgetProcessor
configGetter actuatorNodeGroupConfigGetter
nodeDeleteDelayAfterTaint time.Duration
}

// actuatorNodeGroupConfigGetter is an interface to limit the functions that can be used
@@ -66,13 +67,14 @@ type actuatorNodeGroupConfigGetter interface {
func NewActuator(ctx *context.AutoscalingContext, csr *clusterstate.ClusterStateRegistry, ndt *deletiontracker.NodeDeletionTracker, deleteOptions simulator.NodeDeleteOptions, configGetter actuatorNodeGroupConfigGetter) *Actuator {
ndb := NewNodeDeletionBatcher(ctx, csr, ndt, ctx.NodeDeletionBatcherInterval)
return &Actuator{
ctx: ctx,
clusterState: csr,
nodeDeletionTracker: ndt,
nodeDeletionScheduler: NewGroupDeletionScheduler(ctx, ndt, ndb, NewDefaultEvictor(deleteOptions, ndt)),
budgetProcessor: budgets.NewScaleDownBudgetProcessor(ctx),
deleteOptions: deleteOptions,
configGetter: configGetter,
ctx: ctx,
clusterState: csr,
nodeDeletionTracker: ndt,
nodeDeletionScheduler: NewGroupDeletionScheduler(ctx, ndt, ndb, NewDefaultEvictor(deleteOptions, ndt)),
budgetProcessor: budgets.NewScaleDownBudgetProcessor(ctx),
deleteOptions: deleteOptions,
configGetter: configGetter,
nodeDeleteDelayAfterTaint: ctx.NodeDeleteDelayAfterTaint,
}
}

@@ -158,20 +160,41 @@ func (a *Actuator) deleteAsyncEmpty(NodeGroupViews []*budgets.NodeGroupView) (re
// applied taints are cleaned up.
func (a *Actuator) taintNodesSync(NodeGroupViews []*budgets.NodeGroupView) errors.AutoscalerError {
var taintedNodes []*apiv1.Node
var updateLatencyTracker *UpdateLatencyTracker
if a.ctx.AutoscalingOptions.DynamicNodeDeleteDelayAfterTaintEnabled {
updateLatencyTracker = NewUpdateLatencyTracker(a.ctx.AutoscalingKubeClients.ListerRegistry.AllNodeLister())
go updateLatencyTracker.Start()
}
for _, bucket := range NodeGroupViews {
for _, node := range bucket.Nodes {
if a.ctx.AutoscalingOptions.DynamicNodeDeleteDelayAfterTaintEnabled {
updateLatencyTracker.StartTimeChan <- nodeTaintStartTime{node.Name, time.Now()}
}
err := a.taintNode(node)
if err != nil {
a.ctx.Recorder.Eventf(node, apiv1.EventTypeWarning, "ScaleDownFailed", "failed to mark the node as toBeDeleted/unschedulable: %v", err)
// Clean up already applied taints in case of issues.
for _, taintedNode := range taintedNodes {
_, _ = taints.CleanToBeDeleted(taintedNode, a.ctx.ClientSet, a.ctx.CordonNodeBeforeTerminate)
}
if a.ctx.AutoscalingOptions.DynamicNodeDeleteDelayAfterTaintEnabled {
close(updateLatencyTracker.AwaitOrStopChan)
}
return errors.NewAutoscalerError(errors.ApiCallError, "couldn't taint node %q with ToBeDeleted", node)
}
taintedNodes = append(taintedNodes, node)
}
}
if a.ctx.AutoscalingOptions.DynamicNodeDeleteDelayAfterTaintEnabled {
updateLatencyTracker.AwaitOrStopChan <- true
latency, ok := <-updateLatencyTracker.ResultChan
if ok {
// CA is expected to wait 3 times the round-trip time between the CA and the api-server.
// Therefore, nodeDeleteDelayAfterTaint is set to 2 times the measured latency.
// A delay of one round-trip time is already implicitly included in the measured latency.
a.nodeDeleteDelayAfterTaint = 2 * latency
}
}
return nil
}
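
For context, taintNodesSync drives the UpdateLatencyTracker through a small channel protocol: report a start timestamp for each node just before tainting it, then either send on AwaitOrStopChan to wait for the measurement or close the channel to abort it. Because the measured latency already contains one round trip (the time until the taint is observed through the lister), setting the delay to 2x the latency makes the total wait roughly three round trips; e.g. a measured latency of 100ms yields a 200ms delay. The sketch below restates the same protocol as a hypothetical helper inside this package; nodesToTaint, nodeLister, and the tainting step itself are assumptions.

// Hypothetical helper illustrating the tracker protocol used in taintNodesSync above.
func measureNodeDeleteDelayAfterTaint(nodeLister kubernetes.NodeLister, nodesToTaint []*apiv1.Node) time.Duration {
    tracker := NewUpdateLatencyTracker(nodeLister)
    go tracker.Start()

    for _, node := range nodesToTaint {
        // Report the start timestamp before applying the ToBeDeleted taint.
        tracker.StartTimeChan <- nodeTaintStartTime{node.Name, time.Now()}
        // ... taint the node here; on failure, close(tracker.AwaitOrStopChan) and return ...
    }

    // Wait until every reported node shows the taint, or the tracker's timeout fires.
    tracker.AwaitOrStopChan <- true
    if latency, ok := <-tracker.ResultChan; ok {
        return 2 * latency // total wait is roughly 3x the round-trip time
    }
    return 0 // measurement aborted or timed out; the caller keeps the configured default
}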

@@ -207,9 +230,9 @@ func (a *Actuator) deleteNodesAsync(nodes []*apiv1.Node, nodeGroup cloudprovider
return
}

if a.ctx.NodeDeleteDelayAfterTaint > time.Duration(0) {
klog.V(0).Infof("Scale-down: waiting %v before trying to delete nodes", a.ctx.NodeDeleteDelayAfterTaint)
time.Sleep(a.ctx.NodeDeleteDelayAfterTaint)
if a.nodeDeleteDelayAfterTaint > time.Duration(0) {
klog.V(0).Infof("Scale-down: waiting %v before trying to delete nodes", a.nodeDeleteDelayAfterTaint)
time.Sleep(a.nodeDeleteDelayAfterTaint)
}

clusterSnapshot, err := a.createSnapshot(nodes)
148 changes: 148 additions & 0 deletions cluster-autoscaler/core/scaledown/actuation/update_latency_tracker.go
@@ -0,0 +1,148 @@
/*
Copyright 2022 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package actuation

import (
"time"

"k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
"k8s.io/autoscaler/cluster-autoscaler/utils/taints"
"k8s.io/klog/v2"
)

const sleepDurationWhenPolling = 50 * time.Millisecond
const waitForTaintingTimeoutDuration = 30 * time.Second

type nodeTaintStartTime struct {
nodeName string
startTime time.Time
}

// UpdateLatencyTracker can be used to calculate the round-trip time between the CA and the api-server
// when adding the ToBeDeletedTaint to nodes
type UpdateLatencyTracker struct {
startTimestamp map[string]time.Time
finishTimestamp map[string]time.Time
remainingNodeCount int
nodeLister kubernetes.NodeLister
// Sends node tainting start timestamps to the tracker
StartTimeChan chan nodeTaintStartTime
sleepDurationWhenPolling time.Duration
// Sending a value on this channel waits for all started nodes to be tainted and calculates
// the latency from the observed per-node latencies. (If not all nodes are tainted within
// waitForTaintingTimeoutDuration after the value is sent, the latency calculation is
// aborted and ResultChan is closed without returning a value.) Closing the
// AwaitOrStopChan without sending a value aborts the latency calculation.
AwaitOrStopChan chan bool
// Communicate back the measured latency
ResultChan chan time.Duration
// now is used only to make testing easier
now func() time.Time
}

// NewUpdateLatencyTracker returns a new UpdateLatencyTracker object
func NewUpdateLatencyTracker(nodeLister kubernetes.NodeLister) *UpdateLatencyTracker {
return &UpdateLatencyTracker{
startTimestamp: map[string]time.Time{},
finishTimestamp: map[string]time.Time{},
remainingNodeCount: 0,
nodeLister: nodeLister,
StartTimeChan: make(chan nodeTaintStartTime),
sleepDurationWhenPolling: sleepDurationWhenPolling,
AwaitOrStopChan: make(chan bool),
ResultChan: make(chan time.Duration),
now: time.Now,
}
}

// Start listens for node tainting start timestamps and records, per node, the time at which
// the taint is first observed. It watches AwaitOrStopChan for await/stop signals.
func (u *UpdateLatencyTracker) Start() {
for {
select {
case _, ok := <-u.AwaitOrStopChan:
if ok {
u.await()
}
return
case ntst := <-u.StartTimeChan:
u.startTimestamp[ntst.nodeName] = ntst.startTime
u.remainingNodeCount += 1
continue
default:
}
u.updateFinishTime()
time.Sleep(u.sleepDurationWhenPolling)
}
}

func (u *UpdateLatencyTracker) updateFinishTime() {
for nodeName := range u.startTimestamp {
if _, ok := u.finishTimestamp[nodeName]; ok {
continue
}
node, err := u.nodeLister.Get(nodeName)
if err != nil {
klog.Errorf("Error getting node: %v", err)
continue
}
if taints.HasToBeDeletedTaint(node) {
u.finishTimestamp[node.Name] = u.now()
u.remainingNodeCount -= 1
}
}
}

func (u *UpdateLatencyTracker) calculateLatency() time.Duration {
var maxLatency time.Duration = 0
for node, startTime := range u.startTimestamp {
endTime, _ := u.finishTimestamp[node]
currentLatency := endTime.Sub(startTime)
if currentLatency > maxLatency {
maxLatency = currentLatency
}
}
return maxLatency
}

func (u *UpdateLatencyTracker) await() {
waitingForTaintingStartTime := time.Now()
for {
switch {
case u.remainingNodeCount == 0:
latency := u.calculateLatency()
u.ResultChan <- latency
return
case time.Now().After(waitingForTaintingStartTime.Add(waitForTaintingTimeoutDuration)):
klog.Errorf("Timeout before tainting all nodes, latency measurement will be stale")
close(u.ResultChan)
return
default:
time.Sleep(u.sleepDurationWhenPolling)
u.updateFinishTime()
}
}
}

// NewUpdateLatencyTrackerForTesting returns a UpdateLatencyTracker object with
// reduced sleepDurationWhenPolling and mock clock for testing
func NewUpdateLatencyTrackerForTesting(nodeLister kubernetes.NodeLister, now func() time.Time) *UpdateLatencyTracker {
updateLatencyTracker := NewUpdateLatencyTracker(nodeLister)
updateLatencyTracker.now = now
updateLatencyTracker.sleepDurationWhenPolling = time.Millisecond
return updateLatencyTracker
}
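
A hypothetical test-style sketch of how the tracker could be exercised end to end with NewUpdateLatencyTrackerForTesting, a fake NodeLister, and a fixed clock; fakeNodeLister and the 100ms fixture are assumptions, while the channels, constants, and taint helper come from the file above.

// Hypothetical test sketch in package actuation.
package actuation

import (
    "fmt"
    "testing"
    "time"

    apiv1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/autoscaler/cluster-autoscaler/utils/taints"
)

// fakeNodeLister is a minimal in-memory NodeLister used only for this sketch.
type fakeNodeLister struct{ nodes map[string]*apiv1.Node }

func (f *fakeNodeLister) List() ([]*apiv1.Node, error) {
    var out []*apiv1.Node
    for _, n := range f.nodes {
        out = append(out, n)
    }
    return out, nil
}

func (f *fakeNodeLister) Get(name string) (*apiv1.Node, error) {
    if n, ok := f.nodes[name]; ok {
        return n, nil
    }
    return nil, fmt.Errorf("node %s not found", name)
}

func TestUpdateLatencyTrackerSketch(t *testing.T) {
    now := time.Now()
    node := &apiv1.Node{
        ObjectMeta: metav1.ObjectMeta{Name: "n1"},
        // The taint is already present, so the tracker observes it on its first poll.
        Spec: apiv1.NodeSpec{Taints: []apiv1.Taint{{Key: taints.ToBeDeletedTaint, Effect: apiv1.TaintEffectNoSchedule}}},
    }
    lister := &fakeNodeLister{nodes: map[string]*apiv1.Node{"n1": node}}

    tracker := NewUpdateLatencyTrackerForTesting(lister, func() time.Time { return now })
    go tracker.Start()

    // Pretend tainting started 100ms before the fixed observation time.
    tracker.StartTimeChan <- nodeTaintStartTime{"n1", now.Add(-100 * time.Millisecond)}
    tracker.AwaitOrStopChan <- true

    if latency, ok := <-tracker.ResultChan; !ok || latency != 100*time.Millisecond {
        t.Errorf("expected 100ms latency, got %v (ok=%v)", latency, ok)
    }
}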