Skip to content

Commit

Permalink
fix race condition between ca and scheduler
Browse files Browse the repository at this point in the history
Implement dynamic adjustment of NodeDeleteDelayAfterTaint based on the round-trip time between the cluster autoscaler (CA) and the apiserver
  • Loading branch information
damikag committed Aug 30, 2023
1 parent 893a51b commit e472bb1
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 0 deletions.
16 changes: 16 additions & 0 deletions cluster-autoscaler/core/scaledown/actuation/actuator.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package actuation

import (
"fmt"
"strings"
"time"

Expand Down Expand Up @@ -159,18 +160,33 @@ func (a *Actuator) deleteAsyncEmpty(NodeGroupViews []*budgets.NodeGroupView) (re
// taintNodesSync applies the ToBeDeleted taint to every node of every given
// node group view, processing one bucket at a time.
//
// While a bucket is being tainted, an UpdateLatencyTracker runs in a separate
// goroutine and reports (in milliseconds) how long it took for the taints to
// show up in the node lister. After each bucket,
// a.ctx.NodeDeleteDelayAfterTaint is overwritten with twice that latency, so
// the value left behind is the one derived from the last measured bucket.
//
// If tainting any node fails, the taints applied so far are removed on a
// best-effort basis and an ApiCallError is returned; nil is returned when all
// nodes were tainted.
func (a *Actuator) taintNodesSync(NodeGroupViews []*budgets.NodeGroupView) errors.AutoscalerError {
	var taintedNodes []*apiv1.Node
	for _, bucket := range NodeGroupViews {
		// Nothing to taint means nothing to measure. Starting a tracker for an
		// empty bucket would make it average over zero samples.
		if len(bucket.Nodes) == 0 {
			continue
		}
		startTimestamp := make(map[string]time.Time, len(bucket.Nodes))
		updateLatencyTracker := NewUpdateLatencyTracker(startTimestamp, a.ctx.AutoscalingKubeClients.ListerRegistry.AllNodeLister())
		stopChan := make(chan struct{})
		// Buffered so the tracker goroutine can deliver its result and exit even
		// when this function has already returned on the error path below.
		resultChan := make(chan int64, 1)
		go updateLatencyTracker.Start(stopChan, resultChan)
		for _, node := range bucket.Nodes {
			// Record the start time before the API call so the measured latency
			// includes the taint request itself.
			startTimestamp[node.Name] = time.Now()
			err := a.taintNode(node)
			if err != nil {
				a.ctx.Recorder.Eventf(node, apiv1.EventTypeWarning, "ScaleDownFailed", "failed to mark the node as toBeDeleted/unschedulable: %v", err)
				// Clean up already applied taints in case of issues.
				for _, taintedNode := range taintedNodes {
					// Best-effort cleanup: a failure here cannot be handled any
					// better than the error that is already being returned.
					_, _ = taints.CleanToBeDeleted(taintedNode, a.ctx.ClientSet, a.ctx.CordonNodeBeforeTerminate)
				}
				// NOTE(review): the tracker only finishes once it stops waiting
				// for the nodes recorded in startTimestamp; confirm it cannot
				// wait indefinitely for the node that failed to be tainted.
				close(stopChan)
				return errors.NewAutoscalerError(errors.ApiCallError, "couldn't taint node %q with ToBeDeleted", node.Name)
			}
			taintedNodes = append(taintedNodes, node)
		}
		close(stopChan)
		latency := <-resultChan
		// The tracker reports whole milliseconds; convert them to a Duration.
		latencyDuration, err := time.ParseDuration(fmt.Sprintf("%vms", latency))
		if err != nil {
			klog.Errorf("Error parsing latency: %v", err)
			continue
		}
		a.ctx.NodeDeleteDelayAfterTaint = 2 * latencyDuration
	}
	return nil
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package actuation

import (
"time"

"k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
"k8s.io/autoscaler/cluster-autoscaler/utils/taints"
"k8s.io/klog/v2"
)

// UpdateLatencyTracker measures how long it takes for a ToBeDeleted taint to
// become visible through a node lister.
type UpdateLatencyTracker struct {
	// Both maps are keyed by node name. startTimestamp is supplied by the
	// creator of the tracker; finishTimestamp holds the time at which a node
	// was first seen carrying the ToBeDeleted taint.
	startTimestamp, finishTimestamp map[string]time.Time
	// nodeLister is the source of the node objects inspected for the taint.
	nodeLister kubernetes.NodeLister
}

// NewUpdateLatencyTracker builds a tracker that reads start times from the
// given map (kept by reference, not copied) and observes nodes through
// nodeLister. The tracker starts with no recorded finish times.
func NewUpdateLatencyTracker(startTimestamp map[string]time.Time, nodeLister kubernetes.NodeLister) *UpdateLatencyTracker {
	tracker := new(UpdateLatencyTracker)
	tracker.startTimestamp = startTimestamp
	tracker.finishTimestamp = make(map[string]time.Time)
	tracker.nodeLister = nodeLister
	return tracker
}

// Start runs the tracking loop and is meant to be launched in its own
// goroutine.
//
// Until stopChan is closed (or receives a value) it polls the node lister
// every 50ms, recording when nodes are first seen with the ToBeDeleted taint.
// Once stopped, it keeps polling every 100ms until every node present in the
// start-timestamp map has been observed, then sends one value on resultChan
// and returns. The value is a latency in milliseconds:
//   - the average latency when all nodes were observed,
//   - 0 when the start-timestamp map is empty (nothing to measure),
//   - the wait timeout when some node was still unobserved after 30s, so the
//     reported latency is never lower than the time actually waited.
//
// If nobody receives from resultChan within a grace period the result is
// dropped, so the goroutine does not leak when the caller has given up.
func (u *UpdateLatencyTracker) Start(stopChan <-chan struct{}, resultChan chan int64) {
	const (
		pollInterval      = 50 * time.Millisecond
		drainPollInterval = 100 * time.Millisecond
		// drainTimeout bounds the wait for taints to become visible after the
		// caller signalled stop; without it a node that never gets tainted
		// would keep this goroutine (and a waiting caller) stuck forever.
		drainTimeout = 30 * time.Second
		// sendTimeout bounds the wait for a receiver on resultChan.
		sendTimeout = 5 * time.Second
	)
	for {
		select {
		case <-stopChan:
			pending := make([]string, 0, len(u.startTimestamp))
			for name := range u.startTimestamp {
				pending = append(pending, name)
			}
			deadline := time.Now().Add(drainTimeout)
			for len(pending) > 0 && time.Now().Before(deadline) {
				time.Sleep(drainPollInterval)
				u.updateFinishTime()
				pending = u.remainingNodesToFinish(pending)
			}

			var latency int64
			switch {
			case len(u.startTimestamp) == 0:
				// No samples: averaging would divide by zero.
				latency = 0
			case len(pending) > 0:
				klog.Errorf("Timed out waiting for %d node(s) to show the ToBeDeleted taint; reporting the timeout as latency", len(pending))
				latency = drainTimeout.Milliseconds()
			default:
				latency = u.calculateLatency()
			}

			select {
			case resultChan <- latency:
			case <-time.After(sendTimeout):
				klog.Warningf("Nobody received the measured update latency (%dms); dropping it", latency)
			}
			return
		default:
		}
		u.updateFinishTime()
		time.Sleep(pollInterval)
	}
}

// updateFinishTime lists the nodes known to the lister and, for each node
// that carries the ToBeDeleted taint and has no finish time yet, records the
// current time as its finish time. A listing error is logged and the returned
// nodes (if any) are still processed.
func (u *UpdateLatencyTracker) updateFinishTime() {
	allNodes, listErr := u.nodeLister.List()
	if listErr != nil {
		klog.Errorf("Error listing nodes: %v", listErr)
	}
	for _, current := range allNodes {
		// Only the first observation counts; later polls must not move it.
		if _, recorded := u.finishTimestamp[current.Name]; recorded {
			continue
		}
		if !taints.HasToBeDeletedTaint(current) {
			continue
		}
		u.finishTimestamp[current.Name] = time.Now()
	}
}

// remainingNodesToFinish filters the given node names down to those that do
// not have a finish time recorded yet, preserving their order. It returns a
// nil slice when every node has finished.
func (u *UpdateLatencyTracker) remainingNodesToFinish(nodes []string) []string {
	var pending []string
	for i := range nodes {
		name := nodes[i]
		if _, done := u.finishTimestamp[name]; done {
			continue
		}
		pending = append(pending, name)
	}
	return pending
}

// calculateLatency returns the average time, in whole milliseconds, between
// the start and finish timestamps of the tracked nodes.
//
// Nodes that have a start time but no finish time are left out of the
// average; counting them would subtract the start time from a zero
// time.Time and poison the result. When no node has both timestamps the
// function returns 0 instead of dividing by zero.
func (u *UpdateLatencyTracker) calculateLatency() int64 {
	var total time.Duration
	var samples int64
	for name, started := range u.startTimestamp {
		finished, ok := u.finishTimestamp[name]
		if !ok {
			continue
		}
		total += finished.Sub(started)
		samples++
	}
	if samples == 0 {
		return 0
	}
	return total.Milliseconds() / samples
}

0 comments on commit e472bb1

Please sign in to comment.