Skip to content

Commit

Permalink
fix race condition between ca and scheduler
Browse files Browse the repository at this point in the history
Implement dynamically adjustment of NodeDeleteDelayAfterTaint based on round trip time between ca and apiserver
  • Loading branch information
damikag committed Sep 1, 2023
1 parent 893a51b commit dd3258a
Show file tree
Hide file tree
Showing 4 changed files with 152 additions and 28 deletions.
3 changes: 3 additions & 0 deletions cluster-autoscaler/config/autoscaling_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,4 +274,7 @@ type AutoscalingOptions struct {
ParallelDrain bool
// NodeGroupSetRatio is a collection of ratios used by CA used to make scaling decisions.
NodeGroupSetRatios NodeGroupDifferenceRatios
// dynamicNodeDeleteDelayAfterTaintEnabled is used to enable/disable dynamic adjustment of NodeDeleteDelayAfterTaint
// based on the latency between the CA and the api-server
DynamicNodeDeleteDelayAfterTaintEnabled bool
}
43 changes: 31 additions & 12 deletions cluster-autoscaler/core/scaledown/actuation/actuator.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,9 @@ type Actuator struct {
// TODO: Move budget processor to scaledown planner, potentially merge into PostFilteringScaleDownNodeProcessor
// This is a larger change to the code structure which impacts some existing actuator unit tests
// as well as Cluster Autoscaler implementations that may override ScaleDownSetProcessor
budgetProcessor *budgets.ScaleDownBudgetProcessor
configGetter actuatorNodeGroupConfigGetter
budgetProcessor *budgets.ScaleDownBudgetProcessor
configGetter actuatorNodeGroupConfigGetter
nodeDeleteDelayAfterTaint time.Duration
}

// actuatorNodeGroupConfigGetter is an interface to limit the functions that can be used
Expand All @@ -66,13 +67,14 @@ type actuatorNodeGroupConfigGetter interface {
func NewActuator(ctx *context.AutoscalingContext, csr *clusterstate.ClusterStateRegistry, ndt *deletiontracker.NodeDeletionTracker, deleteOptions simulator.NodeDeleteOptions, configGetter actuatorNodeGroupConfigGetter) *Actuator {
ndb := NewNodeDeletionBatcher(ctx, csr, ndt, ctx.NodeDeletionBatcherInterval)
return &Actuator{
ctx: ctx,
clusterState: csr,
nodeDeletionTracker: ndt,
nodeDeletionScheduler: NewGroupDeletionScheduler(ctx, ndt, ndb, NewDefaultEvictor(deleteOptions, ndt)),
budgetProcessor: budgets.NewScaleDownBudgetProcessor(ctx),
deleteOptions: deleteOptions,
configGetter: configGetter,
ctx: ctx,
clusterState: csr,
nodeDeletionTracker: ndt,
nodeDeletionScheduler: NewGroupDeletionScheduler(ctx, ndt, ndb, NewDefaultEvictor(deleteOptions, ndt)),
budgetProcessor: budgets.NewScaleDownBudgetProcessor(ctx),
deleteOptions: deleteOptions,
configGetter: configGetter,
nodeDeleteDelayAfterTaint: ctx.NodeDeleteDelayAfterTaint,
}
}

Expand Down Expand Up @@ -158,20 +160,37 @@ func (a *Actuator) deleteAsyncEmpty(NodeGroupViews []*budgets.NodeGroupView) (re
// applied taints are cleaned up.
func (a *Actuator) taintNodesSync(NodeGroupViews []*budgets.NodeGroupView) errors.AutoscalerError {
var taintedNodes []*apiv1.Node
updateLatencyTracker := NewUpdateLatencyTracker(a.ctx.AutoscalingKubeClients.ListerRegistry.AllNodeLister())
if a.ctx.AutoscalingOptions.DynamicNodeDeleteDelayAfterTaintEnabled {
go updateLatencyTracker.Start()
}
for _, bucket := range NodeGroupViews {
for _, node := range bucket.Nodes {
if a.ctx.AutoscalingOptions.DynamicNodeDeleteDelayAfterTaintEnabled {
updateLatencyTracker.startTimeChan <- NodeTaintStartTime{node.Name, time.Now()}
}
err := a.taintNode(node)
if err != nil {
a.ctx.Recorder.Eventf(node, apiv1.EventTypeWarning, "ScaleDownFailed", "failed to mark the node as toBeDeleted/unschedulable: %v", err)
// Clean up already applied taints in case of issues.
for _, taintedNode := range taintedNodes {
_, _ = taints.CleanToBeDeleted(taintedNode, a.ctx.ClientSet, a.ctx.CordonNodeBeforeTerminate)
}
if a.ctx.AutoscalingOptions.DynamicNodeDeleteDelayAfterTaintEnabled {
close(updateLatencyTracker.stopChan)
}
return errors.NewAutoscalerError(errors.ApiCallError, "couldn't taint node %q with ToBeDeleted", node)
}
taintedNodes = append(taintedNodes, node)
}
}
if a.ctx.AutoscalingOptions.DynamicNodeDeleteDelayAfterTaintEnabled {
close(updateLatencyTracker.stopChan)
latency, ok := <-updateLatencyTracker.resultChan
if ok {
a.nodeDeleteDelayAfterTaint = 2 * latency
}
}
return nil
}

Expand Down Expand Up @@ -207,9 +226,9 @@ func (a *Actuator) deleteNodesAsync(nodes []*apiv1.Node, nodeGroup cloudprovider
return
}

if a.ctx.NodeDeleteDelayAfterTaint > time.Duration(0) {
klog.V(0).Infof("Scale-down: waiting %v before trying to delete nodes", a.ctx.NodeDeleteDelayAfterTaint)
time.Sleep(a.ctx.NodeDeleteDelayAfterTaint)
if a.nodeDeleteDelayAfterTaint > time.Duration(0) {
klog.V(0).Infof("Scale-down: waiting %v before trying to delete nodes", a.nodeDeleteDelayAfterTaint)
time.Sleep(a.nodeDeleteDelayAfterTaint)
}

clusterSnapshot, err := a.createSnapshot(nodes)
Expand Down
100 changes: 100 additions & 0 deletions cluster-autoscaler/core/scaledown/actuation/update_latency_tracker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package actuation

import (
"time"

"k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
"k8s.io/autoscaler/cluster-autoscaler/utils/taints"
"k8s.io/klog/v2"
)

const sleepDurationWhenPolling = 50 * time.Millisecond
const watiForTaintingTimeoutDuration = 5 * time.Minute

type NodeTaintStartTime struct {
nodeName string
startTime time.Time
}

type UpdateLatencyTracker struct {
startTimestamp map[string]time.Time
finishTimestamp map[string]time.Time
remainingNodeCount int
nodeLister kubernetes.NodeLister
startTimeChan chan NodeTaintStartTime
stopChan chan struct{}
resultChan chan time.Duration
}

func NewUpdateLatencyTracker(nodeLister kubernetes.NodeLister) *UpdateLatencyTracker {
return &UpdateLatencyTracker{
startTimestamp: map[string]time.Time{},
finishTimestamp: map[string]time.Time{},
remainingNodeCount: 0,
nodeLister: nodeLister,
startTimeChan: make(chan NodeTaintStartTime),
stopChan: make(chan struct{}),
resultChan: make(chan time.Duration),
}
}

func (u *UpdateLatencyTracker) Start() {
for {
select {
case <-u.stopChan:
waitingForTaintingStartTime := time.Now()
for {
switch {
case u.remainingNodeCount == 0:
latency := u.calculateLatency()
u.resultChan <- latency
return
case time.Now().After(waitingForTaintingStartTime.Add(watiForTaintingTimeoutDuration)):
klog.Errorf("Timeout before tainting all nodes")
close(u.resultChan)
return
default:
time.Sleep(sleepDurationWhenPolling)
u.updateFinishTime()
}
}

case ntst := <-u.startTimeChan:
u.startTimestamp[ntst.nodeName] = ntst.startTime
u.remainingNodeCount += 1
continue
default:
}
u.updateFinishTime()
time.Sleep(sleepDurationWhenPolling)
}
}

func (u *UpdateLatencyTracker) updateFinishTime() {
for nodeName, _ := range u.startTimestamp {
if _, ok := u.finishTimestamp[nodeName]; ok {
continue
}
node, err := u.nodeLister.Get(nodeName)
if err != nil {
klog.Errorf("Error getting node: %v", err)
continue
}
if taints.HasToBeDeletedTaint(node) {
u.finishTimestamp[node.Name] = time.Now()
u.remainingNodeCount -= 1
}
}
}

func (u *UpdateLatencyTracker) calculateLatency() time.Duration {
var maxLatency time.Duration = 0
for node, startTime := range u.startTimestamp {
endTime, _ := u.finishTimestamp[node]
currentLatency := endTime.Sub(startTime)
if currentLatency > maxLatency {
maxLatency = currentLatency
}
}
return maxLatency
}
34 changes: 18 additions & 16 deletions cluster-autoscaler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,22 +218,23 @@ var (
"maxNodeGroupBackoffDuration is the maximum backoff duration for a NodeGroup after new nodes failed to start.")
nodeGroupBackoffResetTimeout = flag.Duration("node-group-backoff-reset-timeout", 3*time.Hour,
"nodeGroupBackoffResetTimeout is the time after last failed scale-up when the backoff duration is reset.")
maxScaleDownParallelismFlag = flag.Int("max-scale-down-parallelism", 10, "Maximum number of nodes (both empty and needing drain) that can be deleted in parallel.")
maxDrainParallelismFlag = flag.Int("max-drain-parallelism", 1, "Maximum number of nodes needing drain, that can be drained and deleted in parallel.")
recordDuplicatedEvents = flag.Bool("record-duplicated-events", false, "enable duplication of similar events within a 5 minute window.")
maxNodesPerScaleUp = flag.Int("max-nodes-per-scaleup", 1000, "Max nodes added in a single scale-up. This is intended strictly for optimizing CA algorithm latency and not a tool to rate-limit scale-up throughput.")
maxNodeGroupBinpackingDuration = flag.Duration("max-nodegroup-binpacking-duration", 10*time.Second, "Maximum time that will be spent in binpacking simulation for each NodeGroup.")
skipNodesWithSystemPods = flag.Bool("skip-nodes-with-system-pods", true, "If true cluster autoscaler will never delete nodes with pods from kube-system (except for DaemonSet or mirror pods)")
skipNodesWithLocalStorage = flag.Bool("skip-nodes-with-local-storage", true, "If true cluster autoscaler will never delete nodes with pods with local storage, e.g. EmptyDir or HostPath")
skipNodesWithCustomControllerPods = flag.Bool("skip-nodes-with-custom-controller-pods", true, "If true cluster autoscaler will never delete nodes with pods owned by custom controllers")
minReplicaCount = flag.Int("min-replica-count", 0, "Minimum number or replicas that a replica set or replication controller should have to allow their pods deletion in scale down")
nodeDeleteDelayAfterTaint = flag.Duration("node-delete-delay-after-taint", 5*time.Second, "How long to wait before deleting a node after tainting it")
scaleDownSimulationTimeout = flag.Duration("scale-down-simulation-timeout", 30*time.Second, "How long should we run scale down simulation.")
parallelDrain = flag.Bool("parallel-drain", true, "Whether to allow parallel drain of nodes. This flag is deprecated and will be removed in future releases.")
maxCapacityMemoryDifferenceRatio = flag.Float64("memory-difference-ratio", config.DefaultMaxCapacityMemoryDifferenceRatio, "Maximum difference in memory capacity between two similar node groups to be considered for balancing. Value is a ratio of the smaller node group's memory capacity.")
maxFreeDifferenceRatio = flag.Float64("max-free-difference-ratio", config.DefaultMaxFreeDifferenceRatio, "Maximum difference in free resources between two similar node groups to be considered for balancing. Value is a ratio of the smaller node group's free resource.")
maxAllocatableDifferenceRatio = flag.Float64("max-allocatable-difference-ratio", config.DefaultMaxAllocatableDifferenceRatio, "Maximum difference in allocatable resources between two similar node groups to be considered for balancing. Value is a ratio of the smaller node group's allocatable resource.")
forceDaemonSets = flag.Bool("force-ds", false, "Blocks scale-up of node groups too small for all suitable Daemon Sets pods.")
maxScaleDownParallelismFlag = flag.Int("max-scale-down-parallelism", 10, "Maximum number of nodes (both empty and needing drain) that can be deleted in parallel.")
maxDrainParallelismFlag = flag.Int("max-drain-parallelism", 1, "Maximum number of nodes needing drain, that can be drained and deleted in parallel.")
recordDuplicatedEvents = flag.Bool("record-duplicated-events", false, "enable duplication of similar events within a 5 minute window.")
maxNodesPerScaleUp = flag.Int("max-nodes-per-scaleup", 1000, "Max nodes added in a single scale-up. This is intended strictly for optimizing CA algorithm latency and not a tool to rate-limit scale-up throughput.")
maxNodeGroupBinpackingDuration = flag.Duration("max-nodegroup-binpacking-duration", 10*time.Second, "Maximum time that will be spent in binpacking simulation for each NodeGroup.")
skipNodesWithSystemPods = flag.Bool("skip-nodes-with-system-pods", true, "If true cluster autoscaler will never delete nodes with pods from kube-system (except for DaemonSet or mirror pods)")
skipNodesWithLocalStorage = flag.Bool("skip-nodes-with-local-storage", true, "If true cluster autoscaler will never delete nodes with pods with local storage, e.g. EmptyDir or HostPath")
skipNodesWithCustomControllerPods = flag.Bool("skip-nodes-with-custom-controller-pods", true, "If true cluster autoscaler will never delete nodes with pods owned by custom controllers")
minReplicaCount = flag.Int("min-replica-count", 0, "Minimum number or replicas that a replica set or replication controller should have to allow their pods deletion in scale down")
nodeDeleteDelayAfterTaint = flag.Duration("node-delete-delay-after-taint", 5*time.Second, "How long to wait before deleting a node after tainting it")
scaleDownSimulationTimeout = flag.Duration("scale-down-simulation-timeout", 30*time.Second, "How long should we run scale down simulation.")
parallelDrain = flag.Bool("parallel-drain", true, "Whether to allow parallel drain of nodes. This flag is deprecated and will be removed in future releases.")
maxCapacityMemoryDifferenceRatio = flag.Float64("memory-difference-ratio", config.DefaultMaxCapacityMemoryDifferenceRatio, "Maximum difference in memory capacity between two similar node groups to be considered for balancing. Value is a ratio of the smaller node group's memory capacity.")
maxFreeDifferenceRatio = flag.Float64("max-free-difference-ratio", config.DefaultMaxFreeDifferenceRatio, "Maximum difference in free resources between two similar node groups to be considered for balancing. Value is a ratio of the smaller node group's free resource.")
maxAllocatableDifferenceRatio = flag.Float64("max-allocatable-difference-ratio", config.DefaultMaxAllocatableDifferenceRatio, "Maximum difference in allocatable resources between two similar node groups to be considered for balancing. Value is a ratio of the smaller node group's allocatable resource.")
forceDaemonSets = flag.Bool("force-ds", false, "Blocks scale-up of node groups too small for all suitable Daemon Sets pods.")
dynamicNodeDeleteDelayAfterTaintEnabled = flag.Bool("dynamic-node-delete-delay-after-taint-enabled", true, "Enables dynamic adjustment of NodeDeleteDelayAfterTaint based of the latency between CA and api-server")
)

func isFlagPassed(name string) bool {
Expand Down Expand Up @@ -379,6 +380,7 @@ func createAutoscalingOptions() config.AutoscalingOptions {
MaxAllocatableDifferenceRatio: *maxAllocatableDifferenceRatio,
MaxFreeDifferenceRatio: *maxFreeDifferenceRatio,
},
DynamicNodeDeleteDelayAfterTaintEnabled: *dynamicNodeDeleteDelayAfterTaintEnabled,
}
}

Expand Down

0 comments on commit dd3258a

Please sign in to comment.