diff --git a/cluster-autoscaler/config/autoscaling_options.go b/cluster-autoscaler/config/autoscaling_options.go index ae1bc09b673e..3282b0bb64ea 100644 --- a/cluster-autoscaler/config/autoscaling_options.go +++ b/cluster-autoscaler/config/autoscaling_options.go @@ -274,4 +274,7 @@ type AutoscalingOptions struct { ParallelDrain bool // NodeGroupSetRatio is a collection of ratios used by CA used to make scaling decisions. NodeGroupSetRatios NodeGroupDifferenceRatios + // dynamicNodeDeleteDelayAfterTaintEnabled is used to enable/disable dynamic adjustment of NodeDeleteDelayAfterTaint + // based on the latency between the CA and the api-server + DynamicNodeDeleteDelayAfterTaintEnabled bool } diff --git a/cluster-autoscaler/core/scaledown/actuation/actuator.go b/cluster-autoscaler/core/scaledown/actuation/actuator.go index db9e239b17c1..3b8fd7868337 100644 --- a/cluster-autoscaler/core/scaledown/actuation/actuator.go +++ b/cluster-autoscaler/core/scaledown/actuation/actuator.go @@ -51,8 +51,9 @@ type Actuator struct { // TODO: Move budget processor to scaledown planner, potentially merge into PostFilteringScaleDownNodeProcessor // This is a larger change to the code structure which impacts some existing actuator unit tests // as well as Cluster Autoscaler implementations that may override ScaleDownSetProcessor - budgetProcessor *budgets.ScaleDownBudgetProcessor - configGetter actuatorNodeGroupConfigGetter + budgetProcessor *budgets.ScaleDownBudgetProcessor + configGetter actuatorNodeGroupConfigGetter + nodeDeleteDelayAfterTaint time.Duration } // actuatorNodeGroupConfigGetter is an interface to limit the functions that can be used @@ -66,13 +67,14 @@ type actuatorNodeGroupConfigGetter interface { func NewActuator(ctx *context.AutoscalingContext, csr *clusterstate.ClusterStateRegistry, ndt *deletiontracker.NodeDeletionTracker, deleteOptions simulator.NodeDeleteOptions, configGetter actuatorNodeGroupConfigGetter) *Actuator { ndb := NewNodeDeletionBatcher(ctx, csr, ndt, ctx.NodeDeletionBatcherInterval) return &Actuator{ - ctx: ctx, - clusterState: csr, - nodeDeletionTracker: ndt, - nodeDeletionScheduler: NewGroupDeletionScheduler(ctx, ndt, ndb, NewDefaultEvictor(deleteOptions, ndt)), - budgetProcessor: budgets.NewScaleDownBudgetProcessor(ctx), - deleteOptions: deleteOptions, - configGetter: configGetter, + ctx: ctx, + clusterState: csr, + nodeDeletionTracker: ndt, + nodeDeletionScheduler: NewGroupDeletionScheduler(ctx, ndt, ndb, NewDefaultEvictor(deleteOptions, ndt)), + budgetProcessor: budgets.NewScaleDownBudgetProcessor(ctx), + deleteOptions: deleteOptions, + configGetter: configGetter, + nodeDeleteDelayAfterTaint: ctx.NodeDeleteDelayAfterTaint, } } @@ -158,8 +160,16 @@ func (a *Actuator) deleteAsyncEmpty(NodeGroupViews []*budgets.NodeGroupView) (re // applied taints are cleaned up. func (a *Actuator) taintNodesSync(NodeGroupViews []*budgets.NodeGroupView) errors.AutoscalerError { var taintedNodes []*apiv1.Node + var updateLatencyTracker *UpdateLatencyTracker + if a.ctx.AutoscalingOptions.DynamicNodeDeleteDelayAfterTaintEnabled { + updateLatencyTracker = NewUpdateLatencyTracker(a.ctx.AutoscalingKubeClients.ListerRegistry.AllNodeLister(), time.Now) + go updateLatencyTracker.Start() + } for _, bucket := range NodeGroupViews { for _, node := range bucket.Nodes { + if a.ctx.AutoscalingOptions.DynamicNodeDeleteDelayAfterTaintEnabled { + updateLatencyTracker.StartTimeChan <- NodeTaintStartTime{node.Name, time.Now()} + } err := a.taintNode(node) if err != nil { a.ctx.Recorder.Eventf(node, apiv1.EventTypeWarning, "ScaleDownFailed", "failed to mark the node as toBeDeleted/unschedulable: %v", err) @@ -167,11 +177,24 @@ func (a *Actuator) taintNodesSync(NodeGroupViews []*budgets.NodeGroupView) error for _, taintedNode := range taintedNodes { _, _ = taints.CleanToBeDeleted(taintedNode, a.ctx.ClientSet, a.ctx.CordonNodeBeforeTerminate) } + if a.ctx.AutoscalingOptions.DynamicNodeDeleteDelayAfterTaintEnabled { + close(updateLatencyTracker.AwaitOrStopChan) + } return errors.NewAutoscalerError(errors.ApiCallError, "couldn't taint node %q with ToBeDeleted", node) } taintedNodes = append(taintedNodes, node) } } + if a.ctx.AutoscalingOptions.DynamicNodeDeleteDelayAfterTaintEnabled { + updateLatencyTracker.AwaitOrStopChan <- true + latency, ok := <-updateLatencyTracker.ResultChan + if ok { + // CA is expected to wait 3 times the round-trip time between CA and the api-server. + // Therefore, the nodeDeleteDelayAfterTaint is set 2 times the latency. + // A delay of one round trip time is implicitly there when measuring the latency. + a.nodeDeleteDelayAfterTaint = 2 * latency + } + } return nil } @@ -207,9 +230,9 @@ func (a *Actuator) deleteNodesAsync(nodes []*apiv1.Node, nodeGroup cloudprovider return } - if a.ctx.NodeDeleteDelayAfterTaint > time.Duration(0) { - klog.V(0).Infof("Scale-down: waiting %v before trying to delete nodes", a.ctx.NodeDeleteDelayAfterTaint) - time.Sleep(a.ctx.NodeDeleteDelayAfterTaint) + if a.nodeDeleteDelayAfterTaint > time.Duration(0) { + klog.V(0).Infof("Scale-down: waiting %v before trying to delete nodes", a.nodeDeleteDelayAfterTaint) + time.Sleep(a.nodeDeleteDelayAfterTaint) } clusterSnapshot, err := a.createSnapshot(nodes) diff --git a/cluster-autoscaler/core/scaledown/actuation/update_latency_tracker.go b/cluster-autoscaler/core/scaledown/actuation/update_latency_tracker.go new file mode 100644 index 000000000000..7d1e731fc92f --- /dev/null +++ b/cluster-autoscaler/core/scaledown/actuation/update_latency_tracker.go @@ -0,0 +1,116 @@ +package actuation + +import ( + "time" + + "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes" + "k8s.io/autoscaler/cluster-autoscaler/utils/taints" + "k8s.io/klog/v2" +) + +const sleepDurationWhenPolling = 50 * time.Millisecond +const waitForTaintingTimeoutDuration = 30 * time.Second + +type NodeTaintStartTime struct { + nodeName string + startTime time.Time +} + +// UpdateLatencyTracker can be used to calculate round-trip time between CA and api-server +// when adding ToBeDeletedTaint to nodes +type UpdateLatencyTracker struct { + startTimestamp map[string]time.Time + finishTimestamp map[string]time.Time + remainingNodeCount int + nodeLister kubernetes.NodeLister + StartTimeChan chan NodeTaintStartTime + // Passing a bool will wait for all the started nodes to get tainted and calculate + // latency based on latencies observed. (If all the nodes did not get tained within + // waitForTaintingTimeoutDuration after passing a bool, latency calculation will be + // aborted and the ResultChan will be closed without returning a value) Closing the + // AwaitOrStopChan without passing any bool will abort the latency calculation. + AwaitOrStopChan chan bool + ResultChan chan time.Duration + // now is used only to make the testing easier + now func() time.Time +} + +func NewUpdateLatencyTracker(nodeLister kubernetes.NodeLister, now func() time.Time) *UpdateLatencyTracker { + return &UpdateLatencyTracker{ + startTimestamp: map[string]time.Time{}, + finishTimestamp: map[string]time.Time{}, + remainingNodeCount: 0, + nodeLister: nodeLister, + StartTimeChan: make(chan NodeTaintStartTime), + AwaitOrStopChan: make(chan bool), + ResultChan: make(chan time.Duration), + now: now, + } +} + +func (u *UpdateLatencyTracker) Start() { + for { + select { + case _, ok := <-u.AwaitOrStopChan: + if ok { + u.await() + } + return + case ntst := <-u.StartTimeChan: + u.startTimestamp[ntst.nodeName] = ntst.startTime + u.remainingNodeCount += 1 + continue + default: + } + u.updateFinishTime() + time.Sleep(sleepDurationWhenPolling) + } +} + +func (u *UpdateLatencyTracker) updateFinishTime() { + for nodeName, _ := range u.startTimestamp { + if _, ok := u.finishTimestamp[nodeName]; ok { + continue + } + node, err := u.nodeLister.Get(nodeName) + if err != nil { + klog.Errorf("Error getting node: %v", err) + continue + } + if taints.HasToBeDeletedTaint(node) { + u.finishTimestamp[node.Name] = u.now() + u.remainingNodeCount -= 1 + } + } +} + +func (u *UpdateLatencyTracker) calculateLatency() time.Duration { + var maxLatency time.Duration = 0 + for node, startTime := range u.startTimestamp { + endTime, _ := u.finishTimestamp[node] + currentLatency := endTime.Sub(startTime) + if currentLatency > maxLatency { + maxLatency = currentLatency + } + } + return maxLatency +} + +func (u *UpdateLatencyTracker) await() { + waitingForTaintingStartTime := time.Now() + for { + switch { + case u.remainingNodeCount == 0: + latency := u.calculateLatency() + u.ResultChan <- latency + return + case time.Now().After(waitingForTaintingStartTime.Add(waitForTaintingTimeoutDuration)): + klog.Errorf("Timeout before tainting all nodes, latency measurement will be stale") + close(u.ResultChan) + return + default: + time.Sleep(sleepDurationWhenPolling) + u.updateFinishTime() + } + } +} diff --git a/cluster-autoscaler/core/scaledown/actuation/update_latency_tracker_test.go b/cluster-autoscaler/core/scaledown/actuation/update_latency_tracker_test.go new file mode 100644 index 000000000000..dc6141bca35a --- /dev/null +++ b/cluster-autoscaler/core/scaledown/actuation/update_latency_tracker_test.go @@ -0,0 +1,112 @@ +package actuation + +import ( + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + apiv1 "k8s.io/api/core/v1" + "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes" + "k8s.io/autoscaler/cluster-autoscaler/utils/taints" + "k8s.io/autoscaler/cluster-autoscaler/utils/test" +) + +// mockClock is used to mock time.Now() when testing UpdateLatencyTracker +// For the n th call to Now() it will return a timestamp after duration[n] to +// the startTime if n < the length of durations. Otherwise, it will return current time. +type mockClock struct { + startTime time.Time + durations []time.Duration + index int + mutex sync.Mutex +} + +func NewMockClock(startTime time.Time, durations []time.Duration) mockClock { + return mockClock{ + startTime: startTime, + durations: durations, + index: 0, + } +} + +func (m *mockClock) Now() time.Time { + m.mutex.Lock() + defer m.mutex.Unlock() + var timeToSend time.Time + if m.index < len(m.durations) { + timeToSend = m.startTime.Add(m.durations[m.index]) + } else { + timeToSend = time.Now() + } + m.index += 1 + return timeToSend +} + +func (m *mockClock) getIndex() int { + m.mutex.Lock() + defer m.mutex.Unlock() + return m.index +} + +func TestUpdateLatencyCalculation(t *testing.T) { + toBeDeletedTaint := apiv1.Taint{Key: taints.ToBeDeletedTaint, Effect: apiv1.TaintEffectNoSchedule} + n1 := test.BuildTestNode("n1", 100, 100) + n1.Spec.Taints = append(n1.Spec.Taints, toBeDeletedTaint) + n2 := test.BuildTestNode("n2", 100, 100) + n2.Spec.Taints = append(n2.Spec.Taints, toBeDeletedTaint) + n3 := test.BuildTestNode("n3", 100, 100) + + testCases := []struct { + description string + startTime time.Time + nodes []*apiv1.Node + durations []time.Duration + wantLatency time.Duration + wantResultChanNotClosed bool + }{ + { + description: "latency when tainting a single node", + startTime: time.Now(), + nodes: []*apiv1.Node{n1}, + durations: []time.Duration{100 * time.Millisecond}, + wantLatency: 100 * time.Millisecond, + wantResultChanNotClosed: true, + }, + { + description: "latency when tainting multiple nodes", + startTime: time.Now(), + nodes: []*apiv1.Node{n1, n2}, + durations: []time.Duration{100 * time.Millisecond, 150 * time.Millisecond}, + wantLatency: 150 * time.Millisecond, + wantResultChanNotClosed: true, + }, + { + description: "Some nodes fails to taint before timeout", + startTime: time.Now(), + nodes: []*apiv1.Node{n1, n3}, + durations: []time.Duration{100 * time.Millisecond, 150 * time.Millisecond}, + wantResultChanNotClosed: false, + }, + } + + for _, tc := range testCases { + t.Run(tc.description, func(t *testing.T) { + assert.Equal(t, len(tc.nodes), len(tc.durations)) + mc := NewMockClock(tc.startTime, tc.durations) + nodeLister := kubernetes.NewTestNodeLister(tc.nodes) + updateLatencyTracker := NewUpdateLatencyTracker(nodeLister, mc.Now) + go updateLatencyTracker.Start() + for _, node := range tc.nodes { + updateLatencyTracker.StartTimeChan <- NodeTaintStartTime{node.Name, tc.startTime} + } + updateLatencyTracker.AwaitOrStopChan <- true + latency, ok := <-updateLatencyTracker.ResultChan + assert.Equal(t, tc.wantResultChanNotClosed, ok) + if ok { + assert.Equal(t, tc.wantLatency, latency) + assert.Equal(t, len(tc.durations), mc.getIndex()) + } + }) + } +} diff --git a/cluster-autoscaler/main.go b/cluster-autoscaler/main.go index 5005a0ba4b32..2a3e30cd8287 100644 --- a/cluster-autoscaler/main.go +++ b/cluster-autoscaler/main.go @@ -218,22 +218,23 @@ var ( "maxNodeGroupBackoffDuration is the maximum backoff duration for a NodeGroup after new nodes failed to start.") nodeGroupBackoffResetTimeout = flag.Duration("node-group-backoff-reset-timeout", 3*time.Hour, "nodeGroupBackoffResetTimeout is the time after last failed scale-up when the backoff duration is reset.") - maxScaleDownParallelismFlag = flag.Int("max-scale-down-parallelism", 10, "Maximum number of nodes (both empty and needing drain) that can be deleted in parallel.") - maxDrainParallelismFlag = flag.Int("max-drain-parallelism", 1, "Maximum number of nodes needing drain, that can be drained and deleted in parallel.") - recordDuplicatedEvents = flag.Bool("record-duplicated-events", false, "enable duplication of similar events within a 5 minute window.") - maxNodesPerScaleUp = flag.Int("max-nodes-per-scaleup", 1000, "Max nodes added in a single scale-up. This is intended strictly for optimizing CA algorithm latency and not a tool to rate-limit scale-up throughput.") - maxNodeGroupBinpackingDuration = flag.Duration("max-nodegroup-binpacking-duration", 10*time.Second, "Maximum time that will be spent in binpacking simulation for each NodeGroup.") - skipNodesWithSystemPods = flag.Bool("skip-nodes-with-system-pods", true, "If true cluster autoscaler will never delete nodes with pods from kube-system (except for DaemonSet or mirror pods)") - skipNodesWithLocalStorage = flag.Bool("skip-nodes-with-local-storage", true, "If true cluster autoscaler will never delete nodes with pods with local storage, e.g. EmptyDir or HostPath") - skipNodesWithCustomControllerPods = flag.Bool("skip-nodes-with-custom-controller-pods", true, "If true cluster autoscaler will never delete nodes with pods owned by custom controllers") - minReplicaCount = flag.Int("min-replica-count", 0, "Minimum number or replicas that a replica set or replication controller should have to allow their pods deletion in scale down") - nodeDeleteDelayAfterTaint = flag.Duration("node-delete-delay-after-taint", 5*time.Second, "How long to wait before deleting a node after tainting it") - scaleDownSimulationTimeout = flag.Duration("scale-down-simulation-timeout", 30*time.Second, "How long should we run scale down simulation.") - parallelDrain = flag.Bool("parallel-drain", true, "Whether to allow parallel drain of nodes. This flag is deprecated and will be removed in future releases.") - maxCapacityMemoryDifferenceRatio = flag.Float64("memory-difference-ratio", config.DefaultMaxCapacityMemoryDifferenceRatio, "Maximum difference in memory capacity between two similar node groups to be considered for balancing. Value is a ratio of the smaller node group's memory capacity.") - maxFreeDifferenceRatio = flag.Float64("max-free-difference-ratio", config.DefaultMaxFreeDifferenceRatio, "Maximum difference in free resources between two similar node groups to be considered for balancing. Value is a ratio of the smaller node group's free resource.") - maxAllocatableDifferenceRatio = flag.Float64("max-allocatable-difference-ratio", config.DefaultMaxAllocatableDifferenceRatio, "Maximum difference in allocatable resources between two similar node groups to be considered for balancing. Value is a ratio of the smaller node group's allocatable resource.") - forceDaemonSets = flag.Bool("force-ds", false, "Blocks scale-up of node groups too small for all suitable Daemon Sets pods.") + maxScaleDownParallelismFlag = flag.Int("max-scale-down-parallelism", 10, "Maximum number of nodes (both empty and needing drain) that can be deleted in parallel.") + maxDrainParallelismFlag = flag.Int("max-drain-parallelism", 1, "Maximum number of nodes needing drain, that can be drained and deleted in parallel.") + recordDuplicatedEvents = flag.Bool("record-duplicated-events", false, "enable duplication of similar events within a 5 minute window.") + maxNodesPerScaleUp = flag.Int("max-nodes-per-scaleup", 1000, "Max nodes added in a single scale-up. This is intended strictly for optimizing CA algorithm latency and not a tool to rate-limit scale-up throughput.") + maxNodeGroupBinpackingDuration = flag.Duration("max-nodegroup-binpacking-duration", 10*time.Second, "Maximum time that will be spent in binpacking simulation for each NodeGroup.") + skipNodesWithSystemPods = flag.Bool("skip-nodes-with-system-pods", true, "If true cluster autoscaler will never delete nodes with pods from kube-system (except for DaemonSet or mirror pods)") + skipNodesWithLocalStorage = flag.Bool("skip-nodes-with-local-storage", true, "If true cluster autoscaler will never delete nodes with pods with local storage, e.g. EmptyDir or HostPath") + skipNodesWithCustomControllerPods = flag.Bool("skip-nodes-with-custom-controller-pods", true, "If true cluster autoscaler will never delete nodes with pods owned by custom controllers") + minReplicaCount = flag.Int("min-replica-count", 0, "Minimum number or replicas that a replica set or replication controller should have to allow their pods deletion in scale down") + nodeDeleteDelayAfterTaint = flag.Duration("node-delete-delay-after-taint", 5*time.Second, "How long to wait before deleting a node after tainting it") + scaleDownSimulationTimeout = flag.Duration("scale-down-simulation-timeout", 30*time.Second, "How long should we run scale down simulation.") + parallelDrain = flag.Bool("parallel-drain", true, "Whether to allow parallel drain of nodes. This flag is deprecated and will be removed in future releases.") + maxCapacityMemoryDifferenceRatio = flag.Float64("memory-difference-ratio", config.DefaultMaxCapacityMemoryDifferenceRatio, "Maximum difference in memory capacity between two similar node groups to be considered for balancing. Value is a ratio of the smaller node group's memory capacity.") + maxFreeDifferenceRatio = flag.Float64("max-free-difference-ratio", config.DefaultMaxFreeDifferenceRatio, "Maximum difference in free resources between two similar node groups to be considered for balancing. Value is a ratio of the smaller node group's free resource.") + maxAllocatableDifferenceRatio = flag.Float64("max-allocatable-difference-ratio", config.DefaultMaxAllocatableDifferenceRatio, "Maximum difference in allocatable resources between two similar node groups to be considered for balancing. Value is a ratio of the smaller node group's allocatable resource.") + forceDaemonSets = flag.Bool("force-ds", false, "Blocks scale-up of node groups too small for all suitable Daemon Sets pods.") + dynamicNodeDeleteDelayAfterTaintEnabled = flag.Bool("dynamic-node-delete-delay-after-taint-enabled", true, "Enables dynamic adjustment of NodeDeleteDelayAfterTaint based of the latency between CA and api-server") ) func isFlagPassed(name string) bool { @@ -379,6 +380,7 @@ func createAutoscalingOptions() config.AutoscalingOptions { MaxAllocatableDifferenceRatio: *maxAllocatableDifferenceRatio, MaxFreeDifferenceRatio: *maxFreeDifferenceRatio, }, + DynamicNodeDeleteDelayAfterTaintEnabled: *dynamicNodeDeleteDelayAfterTaintEnabled, } }