Skip to content

Commit

Permalink
Merge pull request #6019 from damikag/fix-rc-ca-sched
Browse files Browse the repository at this point in the history
Implement dynamically adjustment of NodeDeleteDelayAfterTaint based on round trip time between CA and api-server
  • Loading branch information
k8s-ci-robot authored Sep 8, 2023
2 parents 0260231 + 42691d3 commit 9622a50
Show file tree
Hide file tree
Showing 5 changed files with 402 additions and 28 deletions.
3 changes: 3 additions & 0 deletions cluster-autoscaler/config/autoscaling_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,4 +274,7 @@ type AutoscalingOptions struct {
ParallelDrain bool
// NodeGroupSetRatio is a collection of ratios used by CA used to make scaling decisions.
NodeGroupSetRatios NodeGroupDifferenceRatios
// dynamicNodeDeleteDelayAfterTaintEnabled is used to enable/disable dynamic adjustment of NodeDeleteDelayAfterTaint
// based on the latency between the CA and the api-server
DynamicNodeDeleteDelayAfterTaintEnabled bool
}
47 changes: 35 additions & 12 deletions cluster-autoscaler/core/scaledown/actuation/actuator.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,9 @@ type Actuator struct {
// TODO: Move budget processor to scaledown planner, potentially merge into PostFilteringScaleDownNodeProcessor
// This is a larger change to the code structure which impacts some existing actuator unit tests
// as well as Cluster Autoscaler implementations that may override ScaleDownSetProcessor
budgetProcessor *budgets.ScaleDownBudgetProcessor
configGetter actuatorNodeGroupConfigGetter
budgetProcessor *budgets.ScaleDownBudgetProcessor
configGetter actuatorNodeGroupConfigGetter
nodeDeleteDelayAfterTaint time.Duration
}

// actuatorNodeGroupConfigGetter is an interface to limit the functions that can be used
Expand All @@ -66,13 +67,14 @@ type actuatorNodeGroupConfigGetter interface {
func NewActuator(ctx *context.AutoscalingContext, csr *clusterstate.ClusterStateRegistry, ndt *deletiontracker.NodeDeletionTracker, deleteOptions simulator.NodeDeleteOptions, configGetter actuatorNodeGroupConfigGetter) *Actuator {
ndb := NewNodeDeletionBatcher(ctx, csr, ndt, ctx.NodeDeletionBatcherInterval)
return &Actuator{
ctx: ctx,
clusterState: csr,
nodeDeletionTracker: ndt,
nodeDeletionScheduler: NewGroupDeletionScheduler(ctx, ndt, ndb, NewDefaultEvictor(deleteOptions, ndt)),
budgetProcessor: budgets.NewScaleDownBudgetProcessor(ctx),
deleteOptions: deleteOptions,
configGetter: configGetter,
ctx: ctx,
clusterState: csr,
nodeDeletionTracker: ndt,
nodeDeletionScheduler: NewGroupDeletionScheduler(ctx, ndt, ndb, NewDefaultEvictor(deleteOptions, ndt)),
budgetProcessor: budgets.NewScaleDownBudgetProcessor(ctx),
deleteOptions: deleteOptions,
configGetter: configGetter,
nodeDeleteDelayAfterTaint: ctx.NodeDeleteDelayAfterTaint,
}
}

Expand Down Expand Up @@ -158,20 +160,41 @@ func (a *Actuator) deleteAsyncEmpty(NodeGroupViews []*budgets.NodeGroupView) (re
// applied taints are cleaned up.
func (a *Actuator) taintNodesSync(NodeGroupViews []*budgets.NodeGroupView) errors.AutoscalerError {
var taintedNodes []*apiv1.Node
var updateLatencyTracker *UpdateLatencyTracker
if a.ctx.AutoscalingOptions.DynamicNodeDeleteDelayAfterTaintEnabled {
updateLatencyTracker = NewUpdateLatencyTracker(a.ctx.AutoscalingKubeClients.ListerRegistry.AllNodeLister())
go updateLatencyTracker.Start()
}
for _, bucket := range NodeGroupViews {
for _, node := range bucket.Nodes {
if a.ctx.AutoscalingOptions.DynamicNodeDeleteDelayAfterTaintEnabled {
updateLatencyTracker.StartTimeChan <- nodeTaintStartTime{node.Name, time.Now()}
}
err := a.taintNode(node)
if err != nil {
a.ctx.Recorder.Eventf(node, apiv1.EventTypeWarning, "ScaleDownFailed", "failed to mark the node as toBeDeleted/unschedulable: %v", err)
// Clean up already applied taints in case of issues.
for _, taintedNode := range taintedNodes {
_, _ = taints.CleanToBeDeleted(taintedNode, a.ctx.ClientSet, a.ctx.CordonNodeBeforeTerminate)
}
if a.ctx.AutoscalingOptions.DynamicNodeDeleteDelayAfterTaintEnabled {
close(updateLatencyTracker.AwaitOrStopChan)
}
return errors.NewAutoscalerError(errors.ApiCallError, "couldn't taint node %q with ToBeDeleted", node)
}
taintedNodes = append(taintedNodes, node)
}
}
if a.ctx.AutoscalingOptions.DynamicNodeDeleteDelayAfterTaintEnabled {
updateLatencyTracker.AwaitOrStopChan <- true
latency, ok := <-updateLatencyTracker.ResultChan
if ok {
// CA is expected to wait 3 times the round-trip time between CA and the api-server.
// Therefore, the nodeDeleteDelayAfterTaint is set 2 times the latency.
// A delay of one round trip time is implicitly there when measuring the latency.
a.nodeDeleteDelayAfterTaint = 2 * latency
}
}
return nil
}

Expand Down Expand Up @@ -207,9 +230,9 @@ func (a *Actuator) deleteNodesAsync(nodes []*apiv1.Node, nodeGroup cloudprovider
return
}

if a.ctx.NodeDeleteDelayAfterTaint > time.Duration(0) {
klog.V(0).Infof("Scale-down: waiting %v before trying to delete nodes", a.ctx.NodeDeleteDelayAfterTaint)
time.Sleep(a.ctx.NodeDeleteDelayAfterTaint)
if a.nodeDeleteDelayAfterTaint > time.Duration(0) {
klog.V(0).Infof("Scale-down: waiting %v before trying to delete nodes", a.nodeDeleteDelayAfterTaint)
time.Sleep(a.nodeDeleteDelayAfterTaint)
}

clusterSnapshot, err := a.createSnapshot(nodes)
Expand Down
148 changes: 148 additions & 0 deletions cluster-autoscaler/core/scaledown/actuation/update_latency_tracker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package actuation

import (
"time"

"k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
"k8s.io/autoscaler/cluster-autoscaler/utils/taints"
"k8s.io/klog/v2"
)

const sleepDurationWhenPolling = 50 * time.Millisecond
const waitForTaintingTimeoutDuration = 30 * time.Second

type nodeTaintStartTime struct {
nodeName string
startTime time.Time
}

// UpdateLatencyTracker can be used to calculate round-trip time between CA and api-server
// when adding ToBeDeletedTaint to nodes
type UpdateLatencyTracker struct {
startTimestamp map[string]time.Time
finishTimestamp map[string]time.Time
remainingNodeCount int
nodeLister kubernetes.NodeLister
// Sends node tainting start timestamps to the tracker
StartTimeChan chan nodeTaintStartTime
sleepDurationWhenPolling time.Duration
// Passing a bool will wait for all the started nodes to get tainted and calculate
// latency based on latencies observed. (If all the nodes did not get tained within
// waitForTaintingTimeoutDuration after passing a bool, latency calculation will be
// aborted and the ResultChan will be closed without returning a value) Closing the
// AwaitOrStopChan without passing any bool will abort the latency calculation.
AwaitOrStopChan chan bool
// Communicate back the measured latency
ResultChan chan time.Duration
// now is used only to make the testing easier
now func() time.Time
}

// NewUpdateLatencyTracker returns a new NewUpdateLatencyTracker object
func NewUpdateLatencyTracker(nodeLister kubernetes.NodeLister) *UpdateLatencyTracker {
return &UpdateLatencyTracker{
startTimestamp: map[string]time.Time{},
finishTimestamp: map[string]time.Time{},
remainingNodeCount: 0,
nodeLister: nodeLister,
StartTimeChan: make(chan nodeTaintStartTime),
sleepDurationWhenPolling: sleepDurationWhenPolling,
AwaitOrStopChan: make(chan bool),
ResultChan: make(chan time.Duration),
now: time.Now,
}
}

// Start starts listening for node tainting start timestamps and update the timestamps that
// the taint appears for the first time for a particular node. Listen AwaitOrStopChan for stop/await signals
func (u *UpdateLatencyTracker) Start() {
for {
select {
case _, ok := <-u.AwaitOrStopChan:
if ok {
u.await()
}
return
case ntst := <-u.StartTimeChan:
u.startTimestamp[ntst.nodeName] = ntst.startTime
u.remainingNodeCount += 1
continue
default:
}
u.updateFinishTime()
time.Sleep(u.sleepDurationWhenPolling)
}
}

func (u *UpdateLatencyTracker) updateFinishTime() {
for nodeName := range u.startTimestamp {
if _, ok := u.finishTimestamp[nodeName]; ok {
continue
}
node, err := u.nodeLister.Get(nodeName)
if err != nil {
klog.Errorf("Error getting node: %v", err)
continue
}
if taints.HasToBeDeletedTaint(node) {
u.finishTimestamp[node.Name] = u.now()
u.remainingNodeCount -= 1
}
}
}

func (u *UpdateLatencyTracker) calculateLatency() time.Duration {
var maxLatency time.Duration = 0
for node, startTime := range u.startTimestamp {
endTime, _ := u.finishTimestamp[node]
currentLatency := endTime.Sub(startTime)
if currentLatency > maxLatency {
maxLatency = currentLatency
}
}
return maxLatency
}

func (u *UpdateLatencyTracker) await() {
waitingForTaintingStartTime := time.Now()
for {
switch {
case u.remainingNodeCount == 0:
latency := u.calculateLatency()
u.ResultChan <- latency
return
case time.Now().After(waitingForTaintingStartTime.Add(waitForTaintingTimeoutDuration)):
klog.Errorf("Timeout before tainting all nodes, latency measurement will be stale")
close(u.ResultChan)
return
default:
time.Sleep(u.sleepDurationWhenPolling)
u.updateFinishTime()
}
}
}

// NewUpdateLatencyTrackerForTesting returns a UpdateLatencyTracker object with
// reduced sleepDurationWhenPolling and mock clock for testing
func NewUpdateLatencyTrackerForTesting(nodeLister kubernetes.NodeLister, now func() time.Time) *UpdateLatencyTracker {
updateLatencyTracker := NewUpdateLatencyTracker(nodeLister)
updateLatencyTracker.now = now
updateLatencyTracker.sleepDurationWhenPolling = time.Millisecond
return updateLatencyTracker
}
Loading

0 comments on commit 9622a50

Please sign in to comment.