From dd3258a32673b014876e16223c729084093a536f Mon Sep 17 00:00:00 2001 From: Damika Gamlath Date: Mon, 7 Aug 2023 11:51:36 +0000 Subject: [PATCH] fix race condition between ca and scheduler Implement dynamically adjustment of NodeDeleteDelayAfterTaint based on round trip time between ca and apiserver --- .../config/autoscaling_options.go | 3 + .../core/scaledown/actuation/actuator.go | 43 +++++--- .../actuation/update_latency_tracker.go | 100 ++++++++++++++++++ cluster-autoscaler/main.go | 34 +++--- 4 files changed, 152 insertions(+), 28 deletions(-) create mode 100644 cluster-autoscaler/core/scaledown/actuation/update_latency_tracker.go diff --git a/cluster-autoscaler/config/autoscaling_options.go b/cluster-autoscaler/config/autoscaling_options.go index ae1bc09b673e..3282b0bb64ea 100644 --- a/cluster-autoscaler/config/autoscaling_options.go +++ b/cluster-autoscaler/config/autoscaling_options.go @@ -274,4 +274,7 @@ type AutoscalingOptions struct { ParallelDrain bool // NodeGroupSetRatio is a collection of ratios used by CA used to make scaling decisions. NodeGroupSetRatios NodeGroupDifferenceRatios + // dynamicNodeDeleteDelayAfterTaintEnabled is used to enable/disable dynamic adjustment of NodeDeleteDelayAfterTaint + // based on the latency between the CA and the api-server + DynamicNodeDeleteDelayAfterTaintEnabled bool } diff --git a/cluster-autoscaler/core/scaledown/actuation/actuator.go b/cluster-autoscaler/core/scaledown/actuation/actuator.go index db9e239b17c1..c07325996322 100644 --- a/cluster-autoscaler/core/scaledown/actuation/actuator.go +++ b/cluster-autoscaler/core/scaledown/actuation/actuator.go @@ -51,8 +51,9 @@ type Actuator struct { // TODO: Move budget processor to scaledown planner, potentially merge into PostFilteringScaleDownNodeProcessor // This is a larger change to the code structure which impacts some existing actuator unit tests // as well as Cluster Autoscaler implementations that may override ScaleDownSetProcessor - budgetProcessor *budgets.ScaleDownBudgetProcessor - configGetter actuatorNodeGroupConfigGetter + budgetProcessor *budgets.ScaleDownBudgetProcessor + configGetter actuatorNodeGroupConfigGetter + nodeDeleteDelayAfterTaint time.Duration } // actuatorNodeGroupConfigGetter is an interface to limit the functions that can be used @@ -66,13 +67,14 @@ type actuatorNodeGroupConfigGetter interface { func NewActuator(ctx *context.AutoscalingContext, csr *clusterstate.ClusterStateRegistry, ndt *deletiontracker.NodeDeletionTracker, deleteOptions simulator.NodeDeleteOptions, configGetter actuatorNodeGroupConfigGetter) *Actuator { ndb := NewNodeDeletionBatcher(ctx, csr, ndt, ctx.NodeDeletionBatcherInterval) return &Actuator{ - ctx: ctx, - clusterState: csr, - nodeDeletionTracker: ndt, - nodeDeletionScheduler: NewGroupDeletionScheduler(ctx, ndt, ndb, NewDefaultEvictor(deleteOptions, ndt)), - budgetProcessor: budgets.NewScaleDownBudgetProcessor(ctx), - deleteOptions: deleteOptions, - configGetter: configGetter, + ctx: ctx, + clusterState: csr, + nodeDeletionTracker: ndt, + nodeDeletionScheduler: NewGroupDeletionScheduler(ctx, ndt, ndb, NewDefaultEvictor(deleteOptions, ndt)), + budgetProcessor: budgets.NewScaleDownBudgetProcessor(ctx), + deleteOptions: deleteOptions, + configGetter: configGetter, + nodeDeleteDelayAfterTaint: ctx.NodeDeleteDelayAfterTaint, } } @@ -158,8 +160,15 @@ func (a *Actuator) deleteAsyncEmpty(NodeGroupViews []*budgets.NodeGroupView) (re // applied taints are cleaned up. func (a *Actuator) taintNodesSync(NodeGroupViews []*budgets.NodeGroupView) errors.AutoscalerError { var taintedNodes []*apiv1.Node + updateLatencyTracker := NewUpdateLatencyTracker(a.ctx.AutoscalingKubeClients.ListerRegistry.AllNodeLister()) + if a.ctx.AutoscalingOptions.DynamicNodeDeleteDelayAfterTaintEnabled { + go updateLatencyTracker.Start() + } for _, bucket := range NodeGroupViews { for _, node := range bucket.Nodes { + if a.ctx.AutoscalingOptions.DynamicNodeDeleteDelayAfterTaintEnabled { + updateLatencyTracker.startTimeChan <- NodeTaintStartTime{node.Name, time.Now()} + } err := a.taintNode(node) if err != nil { a.ctx.Recorder.Eventf(node, apiv1.EventTypeWarning, "ScaleDownFailed", "failed to mark the node as toBeDeleted/unschedulable: %v", err) @@ -167,11 +176,21 @@ func (a *Actuator) taintNodesSync(NodeGroupViews []*budgets.NodeGroupView) error for _, taintedNode := range taintedNodes { _, _ = taints.CleanToBeDeleted(taintedNode, a.ctx.ClientSet, a.ctx.CordonNodeBeforeTerminate) } + if a.ctx.AutoscalingOptions.DynamicNodeDeleteDelayAfterTaintEnabled { + close(updateLatencyTracker.stopChan) + } return errors.NewAutoscalerError(errors.ApiCallError, "couldn't taint node %q with ToBeDeleted", node) } taintedNodes = append(taintedNodes, node) } } + if a.ctx.AutoscalingOptions.DynamicNodeDeleteDelayAfterTaintEnabled { + close(updateLatencyTracker.stopChan) + latency, ok := <-updateLatencyTracker.resultChan + if ok { + a.nodeDeleteDelayAfterTaint = 2 * latency + } + } return nil } @@ -207,9 +226,9 @@ func (a *Actuator) deleteNodesAsync(nodes []*apiv1.Node, nodeGroup cloudprovider return } - if a.ctx.NodeDeleteDelayAfterTaint > time.Duration(0) { - klog.V(0).Infof("Scale-down: waiting %v before trying to delete nodes", a.ctx.NodeDeleteDelayAfterTaint) - time.Sleep(a.ctx.NodeDeleteDelayAfterTaint) + if a.nodeDeleteDelayAfterTaint > time.Duration(0) { + klog.V(0).Infof("Scale-down: waiting %v before trying to delete nodes", a.nodeDeleteDelayAfterTaint) + time.Sleep(a.nodeDeleteDelayAfterTaint) } clusterSnapshot, err := a.createSnapshot(nodes) diff --git a/cluster-autoscaler/core/scaledown/actuation/update_latency_tracker.go b/cluster-autoscaler/core/scaledown/actuation/update_latency_tracker.go new file mode 100644 index 000000000000..c1c8b559d2de --- /dev/null +++ b/cluster-autoscaler/core/scaledown/actuation/update_latency_tracker.go @@ -0,0 +1,100 @@ +package actuation + +import ( + "time" + + "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes" + "k8s.io/autoscaler/cluster-autoscaler/utils/taints" + "k8s.io/klog/v2" +) + +const sleepDurationWhenPolling = 50 * time.Millisecond +const watiForTaintingTimeoutDuration = 5 * time.Minute + +type NodeTaintStartTime struct { + nodeName string + startTime time.Time +} + +type UpdateLatencyTracker struct { + startTimestamp map[string]time.Time + finishTimestamp map[string]time.Time + remainingNodeCount int + nodeLister kubernetes.NodeLister + startTimeChan chan NodeTaintStartTime + stopChan chan struct{} + resultChan chan time.Duration +} + +func NewUpdateLatencyTracker(nodeLister kubernetes.NodeLister) *UpdateLatencyTracker { + return &UpdateLatencyTracker{ + startTimestamp: map[string]time.Time{}, + finishTimestamp: map[string]time.Time{}, + remainingNodeCount: 0, + nodeLister: nodeLister, + startTimeChan: make(chan NodeTaintStartTime), + stopChan: make(chan struct{}), + resultChan: make(chan time.Duration), + } +} + +func (u *UpdateLatencyTracker) Start() { + for { + select { + case <-u.stopChan: + waitingForTaintingStartTime := time.Now() + for { + switch { + case u.remainingNodeCount == 0: + latency := u.calculateLatency() + u.resultChan <- latency + return + case time.Now().After(waitingForTaintingStartTime.Add(watiForTaintingTimeoutDuration)): + klog.Errorf("Timeout before tainting all nodes") + close(u.resultChan) + return + default: + time.Sleep(sleepDurationWhenPolling) + u.updateFinishTime() + } + } + + case ntst := <-u.startTimeChan: + u.startTimestamp[ntst.nodeName] = ntst.startTime + u.remainingNodeCount += 1 + continue + default: + } + u.updateFinishTime() + time.Sleep(sleepDurationWhenPolling) + } +} + +func (u *UpdateLatencyTracker) updateFinishTime() { + for nodeName, _ := range u.startTimestamp { + if _, ok := u.finishTimestamp[nodeName]; ok { + continue + } + node, err := u.nodeLister.Get(nodeName) + if err != nil { + klog.Errorf("Error getting node: %v", err) + continue + } + if taints.HasToBeDeletedTaint(node) { + u.finishTimestamp[node.Name] = time.Now() + u.remainingNodeCount -= 1 + } + } +} + +func (u *UpdateLatencyTracker) calculateLatency() time.Duration { + var maxLatency time.Duration = 0 + for node, startTime := range u.startTimestamp { + endTime, _ := u.finishTimestamp[node] + currentLatency := endTime.Sub(startTime) + if currentLatency > maxLatency { + maxLatency = currentLatency + } + } + return maxLatency +} diff --git a/cluster-autoscaler/main.go b/cluster-autoscaler/main.go index 5005a0ba4b32..2a3e30cd8287 100644 --- a/cluster-autoscaler/main.go +++ b/cluster-autoscaler/main.go @@ -218,22 +218,23 @@ var ( "maxNodeGroupBackoffDuration is the maximum backoff duration for a NodeGroup after new nodes failed to start.") nodeGroupBackoffResetTimeout = flag.Duration("node-group-backoff-reset-timeout", 3*time.Hour, "nodeGroupBackoffResetTimeout is the time after last failed scale-up when the backoff duration is reset.") - maxScaleDownParallelismFlag = flag.Int("max-scale-down-parallelism", 10, "Maximum number of nodes (both empty and needing drain) that can be deleted in parallel.") - maxDrainParallelismFlag = flag.Int("max-drain-parallelism", 1, "Maximum number of nodes needing drain, that can be drained and deleted in parallel.") - recordDuplicatedEvents = flag.Bool("record-duplicated-events", false, "enable duplication of similar events within a 5 minute window.") - maxNodesPerScaleUp = flag.Int("max-nodes-per-scaleup", 1000, "Max nodes added in a single scale-up. This is intended strictly for optimizing CA algorithm latency and not a tool to rate-limit scale-up throughput.") - maxNodeGroupBinpackingDuration = flag.Duration("max-nodegroup-binpacking-duration", 10*time.Second, "Maximum time that will be spent in binpacking simulation for each NodeGroup.") - skipNodesWithSystemPods = flag.Bool("skip-nodes-with-system-pods", true, "If true cluster autoscaler will never delete nodes with pods from kube-system (except for DaemonSet or mirror pods)") - skipNodesWithLocalStorage = flag.Bool("skip-nodes-with-local-storage", true, "If true cluster autoscaler will never delete nodes with pods with local storage, e.g. EmptyDir or HostPath") - skipNodesWithCustomControllerPods = flag.Bool("skip-nodes-with-custom-controller-pods", true, "If true cluster autoscaler will never delete nodes with pods owned by custom controllers") - minReplicaCount = flag.Int("min-replica-count", 0, "Minimum number or replicas that a replica set or replication controller should have to allow their pods deletion in scale down") - nodeDeleteDelayAfterTaint = flag.Duration("node-delete-delay-after-taint", 5*time.Second, "How long to wait before deleting a node after tainting it") - scaleDownSimulationTimeout = flag.Duration("scale-down-simulation-timeout", 30*time.Second, "How long should we run scale down simulation.") - parallelDrain = flag.Bool("parallel-drain", true, "Whether to allow parallel drain of nodes. This flag is deprecated and will be removed in future releases.") - maxCapacityMemoryDifferenceRatio = flag.Float64("memory-difference-ratio", config.DefaultMaxCapacityMemoryDifferenceRatio, "Maximum difference in memory capacity between two similar node groups to be considered for balancing. Value is a ratio of the smaller node group's memory capacity.") - maxFreeDifferenceRatio = flag.Float64("max-free-difference-ratio", config.DefaultMaxFreeDifferenceRatio, "Maximum difference in free resources between two similar node groups to be considered for balancing. Value is a ratio of the smaller node group's free resource.") - maxAllocatableDifferenceRatio = flag.Float64("max-allocatable-difference-ratio", config.DefaultMaxAllocatableDifferenceRatio, "Maximum difference in allocatable resources between two similar node groups to be considered for balancing. Value is a ratio of the smaller node group's allocatable resource.") - forceDaemonSets = flag.Bool("force-ds", false, "Blocks scale-up of node groups too small for all suitable Daemon Sets pods.") + maxScaleDownParallelismFlag = flag.Int("max-scale-down-parallelism", 10, "Maximum number of nodes (both empty and needing drain) that can be deleted in parallel.") + maxDrainParallelismFlag = flag.Int("max-drain-parallelism", 1, "Maximum number of nodes needing drain, that can be drained and deleted in parallel.") + recordDuplicatedEvents = flag.Bool("record-duplicated-events", false, "enable duplication of similar events within a 5 minute window.") + maxNodesPerScaleUp = flag.Int("max-nodes-per-scaleup", 1000, "Max nodes added in a single scale-up. This is intended strictly for optimizing CA algorithm latency and not a tool to rate-limit scale-up throughput.") + maxNodeGroupBinpackingDuration = flag.Duration("max-nodegroup-binpacking-duration", 10*time.Second, "Maximum time that will be spent in binpacking simulation for each NodeGroup.") + skipNodesWithSystemPods = flag.Bool("skip-nodes-with-system-pods", true, "If true cluster autoscaler will never delete nodes with pods from kube-system (except for DaemonSet or mirror pods)") + skipNodesWithLocalStorage = flag.Bool("skip-nodes-with-local-storage", true, "If true cluster autoscaler will never delete nodes with pods with local storage, e.g. EmptyDir or HostPath") + skipNodesWithCustomControllerPods = flag.Bool("skip-nodes-with-custom-controller-pods", true, "If true cluster autoscaler will never delete nodes with pods owned by custom controllers") + minReplicaCount = flag.Int("min-replica-count", 0, "Minimum number or replicas that a replica set or replication controller should have to allow their pods deletion in scale down") + nodeDeleteDelayAfterTaint = flag.Duration("node-delete-delay-after-taint", 5*time.Second, "How long to wait before deleting a node after tainting it") + scaleDownSimulationTimeout = flag.Duration("scale-down-simulation-timeout", 30*time.Second, "How long should we run scale down simulation.") + parallelDrain = flag.Bool("parallel-drain", true, "Whether to allow parallel drain of nodes. This flag is deprecated and will be removed in future releases.") + maxCapacityMemoryDifferenceRatio = flag.Float64("memory-difference-ratio", config.DefaultMaxCapacityMemoryDifferenceRatio, "Maximum difference in memory capacity between two similar node groups to be considered for balancing. Value is a ratio of the smaller node group's memory capacity.") + maxFreeDifferenceRatio = flag.Float64("max-free-difference-ratio", config.DefaultMaxFreeDifferenceRatio, "Maximum difference in free resources between two similar node groups to be considered for balancing. Value is a ratio of the smaller node group's free resource.") + maxAllocatableDifferenceRatio = flag.Float64("max-allocatable-difference-ratio", config.DefaultMaxAllocatableDifferenceRatio, "Maximum difference in allocatable resources between two similar node groups to be considered for balancing. Value is a ratio of the smaller node group's allocatable resource.") + forceDaemonSets = flag.Bool("force-ds", false, "Blocks scale-up of node groups too small for all suitable Daemon Sets pods.") + dynamicNodeDeleteDelayAfterTaintEnabled = flag.Bool("dynamic-node-delete-delay-after-taint-enabled", true, "Enables dynamic adjustment of NodeDeleteDelayAfterTaint based of the latency between CA and api-server") ) func isFlagPassed(name string) bool { @@ -379,6 +380,7 @@ func createAutoscalingOptions() config.AutoscalingOptions { MaxAllocatableDifferenceRatio: *maxAllocatableDifferenceRatio, MaxFreeDifferenceRatio: *maxFreeDifferenceRatio, }, + DynamicNodeDeleteDelayAfterTaintEnabled: *dynamicNodeDeleteDelayAfterTaintEnabled, } }