fix race condition between ca and scheduler
Implement dynamic adjustment of NodeDeleteDelayAfterTaint based on the round-trip time between CA and the api-server.
damikag committed Sep 8, 2023
1 parent 893a51b commit e3bb2ce
Showing 5 changed files with 349 additions and 28 deletions.
3 changes: 3 additions & 0 deletions cluster-autoscaler/config/autoscaling_options.go
@@ -274,4 +274,7 @@ type AutoscalingOptions struct {
ParallelDrain bool
// NodeGroupSetRatio is a collection of ratios used by CA used to make scaling decisions.
NodeGroupSetRatios NodeGroupDifferenceRatios
// DynamicNodeDeleteDelayAfterTaintEnabled is used to enable/disable dynamic adjustment of NodeDeleteDelayAfterTaint
// based on the latency between CA and the api-server.
DynamicNodeDeleteDelayAfterTaintEnabled bool
}
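For orientation, a minimal sketch of how the new option could be set when building AutoscalingOptions. The helper name and the concrete values are illustrative assumptions, not part of this commit; NodeDeleteDelayAfterTaint is the existing static delay that the actuator reads from the autoscaling context.

package example

import (
	"time"

	"k8s.io/autoscaler/cluster-autoscaler/config"
)

// exampleOptions is a hypothetical helper; only the two fields relevant to this commit are shown.
func exampleOptions() config.AutoscalingOptions {
	return config.AutoscalingOptions{
		// Existing static delay between tainting a node and deleting it (value is illustrative).
		NodeDeleteDelayAfterTaint: 5 * time.Second,
		// New option from this commit: derive the delay from the measured
		// CA <-> api-server round-trip time instead of always using the static value.
		DynamicNodeDeleteDelayAfterTaintEnabled: true,
	}
}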
47 changes: 35 additions & 12 deletions cluster-autoscaler/core/scaledown/actuation/actuator.go
@@ -51,8 +51,9 @@ type Actuator struct {
// TODO: Move budget processor to scaledown planner, potentially merge into PostFilteringScaleDownNodeProcessor
// This is a larger change to the code structure which impacts some existing actuator unit tests
// as well as Cluster Autoscaler implementations that may override ScaleDownSetProcessor
budgetProcessor *budgets.ScaleDownBudgetProcessor
configGetter actuatorNodeGroupConfigGetter
budgetProcessor *budgets.ScaleDownBudgetProcessor
configGetter actuatorNodeGroupConfigGetter
nodeDeleteDelayAfterTaint time.Duration
}

// actuatorNodeGroupConfigGetter is an interface to limit the functions that can be used
@@ -66,13 +67,14 @@ type actuatorNodeGroupConfigGetter interface {
func NewActuator(ctx *context.AutoscalingContext, csr *clusterstate.ClusterStateRegistry, ndt *deletiontracker.NodeDeletionTracker, deleteOptions simulator.NodeDeleteOptions, configGetter actuatorNodeGroupConfigGetter) *Actuator {
ndb := NewNodeDeletionBatcher(ctx, csr, ndt, ctx.NodeDeletionBatcherInterval)
return &Actuator{
ctx: ctx,
clusterState: csr,
nodeDeletionTracker: ndt,
nodeDeletionScheduler: NewGroupDeletionScheduler(ctx, ndt, ndb, NewDefaultEvictor(deleteOptions, ndt)),
budgetProcessor: budgets.NewScaleDownBudgetProcessor(ctx),
deleteOptions: deleteOptions,
configGetter: configGetter,
ctx: ctx,
clusterState: csr,
nodeDeletionTracker: ndt,
nodeDeletionScheduler: NewGroupDeletionScheduler(ctx, ndt, ndb, NewDefaultEvictor(deleteOptions, ndt)),
budgetProcessor: budgets.NewScaleDownBudgetProcessor(ctx),
deleteOptions: deleteOptions,
configGetter: configGetter,
nodeDeleteDelayAfterTaint: ctx.NodeDeleteDelayAfterTaint,
}
}

@@ -158,20 +160,41 @@ func (a *Actuator) deleteAsyncEmpty(NodeGroupViews []*budgets.NodeGroupView) (re
// applied taints are cleaned up.
func (a *Actuator) taintNodesSync(NodeGroupViews []*budgets.NodeGroupView) errors.AutoscalerError {
var taintedNodes []*apiv1.Node
var updateLatencyTracker *UpdateLatencyTracker
if a.ctx.AutoscalingOptions.DynamicNodeDeleteDelayAfterTaintEnabled {
updateLatencyTracker = NewUpdateLatencyTracker(a.ctx.AutoscalingKubeClients.ListerRegistry.AllNodeLister(), 50*time.Millisecond, time.Now)
go updateLatencyTracker.Start()
}
for _, bucket := range NodeGroupViews {
for _, node := range bucket.Nodes {
if a.ctx.AutoscalingOptions.DynamicNodeDeleteDelayAfterTaintEnabled {
updateLatencyTracker.StartTimeChan <- NodeTaintStartTime{node.Name, time.Now()}
}
err := a.taintNode(node)
if err != nil {
a.ctx.Recorder.Eventf(node, apiv1.EventTypeWarning, "ScaleDownFailed", "failed to mark the node as toBeDeleted/unschedulable: %v", err)
// Clean up already applied taints in case of issues.
for _, taintedNode := range taintedNodes {
_, _ = taints.CleanToBeDeleted(taintedNode, a.ctx.ClientSet, a.ctx.CordonNodeBeforeTerminate)
}
if a.ctx.AutoscalingOptions.DynamicNodeDeleteDelayAfterTaintEnabled {
close(updateLatencyTracker.AwaitOrStopChan)
}
return errors.NewAutoscalerError(errors.ApiCallError, "couldn't taint node %q with ToBeDeleted", node)
}
taintedNodes = append(taintedNodes, node)
}
}
if a.ctx.AutoscalingOptions.DynamicNodeDeleteDelayAfterTaintEnabled {
updateLatencyTracker.AwaitOrStopChan <- true
latency, ok := <-updateLatencyTracker.ResultChan
if ok {
// CA is expected to wait 3 times the round-trip time between CA and the api-server.
// Therefore, nodeDeleteDelayAfterTaint is set to 2 times the measured latency;
// a delay of roughly one round-trip time is already implicitly included in the measurement.
a.nodeDeleteDelayAfterTaint = 2 * latency
}
}
return nil
}

@@ -207,9 +230,9 @@ func (a *Actuator) deleteNodesAsync(nodes []*apiv1.Node, nodeGroup cloudprovider
return
}

if a.ctx.NodeDeleteDelayAfterTaint > time.Duration(0) {
klog.V(0).Infof("Scale-down: waiting %v before trying to delete nodes", a.ctx.NodeDeleteDelayAfterTaint)
time.Sleep(a.ctx.NodeDeleteDelayAfterTaint)
if a.nodeDeleteDelayAfterTaint > time.Duration(0) {
klog.V(0).Infof("Scale-down: waiting %v before trying to delete nodes", a.nodeDeleteDelayAfterTaint)
time.Sleep(a.nodeDeleteDelayAfterTaint)
}

clusterSnapshot, err := a.createSnapshot(nodes)
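To make the arithmetic in taintNodesSync concrete: if the tracker reports a worst-case taint-propagation latency of 150ms, nodeDeleteDelayAfterTaint becomes 300ms, and since roughly one round trip was already spent observing the taints through the lister, about three round-trip times pass between tainting a node and starting to delete it. A tiny sketch with these illustrative numbers (not part of the commit):

package example

import "time"

// exampleDelay illustrates the 2x rule from taintNodesSync with hypothetical numbers.
func exampleDelay() time.Duration {
	measuredLatency := 150 * time.Millisecond // worst-case taint round trip reported by the tracker
	return 2 * measuredLatency                // explicit 300ms wait; with the ~1 RTT spent measuring, ~3 RTTs in total
}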
117 changes: 117 additions & 0 deletions cluster-autoscaler/core/scaledown/actuation/update_latency_tracker.go
@@ -0,0 +1,117 @@
package actuation

import (
"time"

"k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
"k8s.io/autoscaler/cluster-autoscaler/utils/taints"
"k8s.io/klog/v2"
)

const waitForTaintingTimeoutDuration = 30 * time.Second

// NodeTaintStartTime records the name of a node and the time CA started applying the ToBeDeleted taint to it.
type NodeTaintStartTime struct {
nodeName string
startTime time.Time
}

// UpdateLatencyTracker can be used to calculate round-trip time between CA and api-server
// when adding ToBeDeletedTaint to nodes
type UpdateLatencyTracker struct {
startTimestamp map[string]time.Time
finishTimestamp map[string]time.Time
remainingNodeCount int
nodeLister kubernetes.NodeLister
StartTimeChan chan NodeTaintStartTime
sleepDurationWhenPolling time.Duration
// Sending a value on AwaitOrStopChan waits for all the started nodes to get tainted and
// calculates the latency from the latencies observed. (If not all nodes get tainted within
// waitForTaintingTimeoutDuration after the value is sent, the latency calculation is
// aborted and ResultChan is closed without returning a value.) Closing
// AwaitOrStopChan without sending a value aborts the latency calculation.
AwaitOrStopChan chan bool
ResultChan chan time.Duration
// now is used only to make the testing easier
now func() time.Time
}

// NewUpdateLatencyTracker returns a new UpdateLatencyTracker.
func NewUpdateLatencyTracker(nodeLister kubernetes.NodeLister, sleepDurationWhenPolling time.Duration, now func() time.Time) *UpdateLatencyTracker {
return &UpdateLatencyTracker{
startTimestamp: map[string]time.Time{},
finishTimestamp: map[string]time.Time{},
remainingNodeCount: 0,
nodeLister: nodeLister,
StartTimeChan: make(chan NodeTaintStartTime),
sleepDurationWhenPolling: sleepDurationWhenPolling,
AwaitOrStopChan: make(chan bool),
ResultChan: make(chan time.Duration),
now: now,
}
}

// Start runs the tracker loop: it records taint start times sent on StartTimeChan and polls the
// node lister for applied taints until AwaitOrStopChan is used to request the result or abort.
func (u *UpdateLatencyTracker) Start() {
for {
select {
case _, ok := <-u.AwaitOrStopChan:
if ok {
u.await()
}
return
case ntst := <-u.StartTimeChan:
u.startTimestamp[ntst.nodeName] = ntst.startTime
u.remainingNodeCount += 1
continue
default:
}
u.updateFinishTime()
time.Sleep(u.sleepDurationWhenPolling)
}
}

func (u *UpdateLatencyTracker) updateFinishTime() {
for nodeName := range u.startTimestamp {
if _, ok := u.finishTimestamp[nodeName]; ok {
continue
}
node, err := u.nodeLister.Get(nodeName)
if err != nil {
klog.Errorf("Error getting node: %v", err)
continue
}
if taints.HasToBeDeletedTaint(node) {
u.finishTimestamp[node.Name] = u.now()
u.remainingNodeCount -= 1
}
}
}

func (u *UpdateLatencyTracker) calculateLatency() time.Duration {
var maxLatency time.Duration = 0
for node, startTime := range u.startTimestamp {
endTime := u.finishTimestamp[node]
currentLatency := endTime.Sub(startTime)
if currentLatency > maxLatency {
maxLatency = currentLatency
}
}
return maxLatency
}

func (u *UpdateLatencyTracker) await() {
waitingForTaintingStartTime := time.Now()
for {
switch {
case u.remainingNodeCount == 0:
latency := u.calculateLatency()
u.ResultChan <- latency
return
case time.Now().After(waitingForTaintingStartTime.Add(waitForTaintingTimeoutDuration)):
klog.Errorf("Timeout before tainting all nodes, latency measurement will be stale")
close(u.ResultChan)
return
default:
time.Sleep(u.sleepDurationWhenPolling)
u.updateFinishTime()
}
}
}
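The channel protocol above is easiest to see end to end in a short sketch. This is not part of the commit: measureTaintLatency is a hypothetical helper in the same actuation package (so it can build NodeTaintStartTime with its unexported fields), nodes and taintFn are assumed inputs, and the flow mirrors what taintNodesSync does in the actuator.

package actuation

import (
	"time"

	apiv1 "k8s.io/api/core/v1"
	"k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
)

// measureTaintLatency records how long it takes for taints applied via taintFn to become
// visible through nodeLister and returns the worst-case latency observed, or false on abort/timeout.
func measureTaintLatency(nodeLister kubernetes.NodeLister, nodes []*apiv1.Node, taintFn func(*apiv1.Node) error) (time.Duration, bool) {
	tracker := NewUpdateLatencyTracker(nodeLister, 50*time.Millisecond, time.Now)
	go tracker.Start()
	for _, node := range nodes {
		// Record when tainting starts, then apply the taint.
		tracker.StartTimeChan <- NodeTaintStartTime{node.Name, time.Now()}
		if err := taintFn(node); err != nil {
			// Abort the measurement without waiting for a result.
			close(tracker.AwaitOrStopChan)
			return 0, false
		}
	}
	// Request the result; the tracker waits (up to waitForTaintingTimeoutDuration) until every
	// started node is observed with the taint, then sends the maximum per-node latency.
	tracker.AwaitOrStopChan <- true
	latency, ok := <-tracker.ResultChan
	return latency, ok
}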
@@ -0,0 +1,176 @@
package actuation

import (
"fmt"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
apiv1 "k8s.io/api/core/v1"
"k8s.io/autoscaler/cluster-autoscaler/utils/taints"
"k8s.io/autoscaler/cluster-autoscaler/utils/test"
)

// mockClock is used to mock time.Now() when testing UpdateLatencyTracker.
// For the nth call to Now() it returns startTime plus durations[n] as long as
// n < len(durations). Otherwise, it returns the current time.
type mockClock struct {
startTime time.Time
durations []time.Duration
index int
mutex sync.Mutex
}

// NewMockClock returns a new mockClock object.
func NewMockClock(startTime time.Time, durations []time.Duration) mockClock {
return mockClock{
startTime: startTime,
durations: durations,
index: 0,
}
}

// Now returns startTime plus the Nth duration if N < len(durations).
// Otherwise, it returns the current time.
func (m *mockClock) Now() time.Time {
m.mutex.Lock()
defer m.mutex.Unlock()
var timeToSend time.Time
if m.index < len(m.durations) {
timeToSend = m.startTime.Add(m.durations[m.index])
} else {
timeToSend = time.Now()
}
m.index += 1
return timeToSend
}

// getIndex returns the number of times Now has been called.
func (m *mockClock) getIndex() int {
m.mutex.Lock()
defer m.mutex.Unlock()
return m.index
}

// TestCustomNodeLister can be used to mock nodeLister Get call when testing delayed tainting
type TestCustomNodeLister struct {
nodes map[string]*apiv1.Node
getCallCount map[string]int
nodeTaintAfterNthGetCall map[string]int
}

// List returns all nodes in test lister.
func (l *TestCustomNodeLister) List() ([]*apiv1.Node, error) {
var nodes []*apiv1.Node
for _, node := range l.nodes {
nodes = append(nodes, node)
}
return nodes, nil
}

// Get returns a node from the test lister. It adds the ToBeDeletedTaint to the node
// on the Nth Get call for that node, as specified in nodeTaintAfterNthGetCall.
func (l *TestCustomNodeLister) Get(name string) (*apiv1.Node, error) {
node, ok := l.nodes[name]
if !ok {
return nil, fmt.Errorf("node %s not found", name)
}
l.getCallCount[name] += 1
if l.getCallCount[name] == l.nodeTaintAfterNthGetCall[name] {
toBeDeletedTaint := apiv1.Taint{Key: taints.ToBeDeletedTaint, Effect: apiv1.TaintEffectNoSchedule}
node.Spec.Taints = append(node.Spec.Taints, toBeDeletedTaint)
}
return node, nil
}

// NewTestCustomNodeLister returns a new TestCustomNodeLister object.
func NewTestCustomNodeLister(nodes map[string]*apiv1.Node, nodeTaintAfterNthGetCall map[string]int) *TestCustomNodeLister {
getCallCounts := map[string]int{}
for name := range nodes {
getCallCounts[name] = 0
}
return &TestCustomNodeLister{
nodes: nodes,
getCallCount: getCallCounts,
nodeTaintAfterNthGetCall: nodeTaintAfterNthGetCall,
}
}

func TestUpdateLatencyCalculation(t *testing.T) {

testCases := []struct {
description string
startTime time.Time
// A value of N < 1 means the node will not get tainted before the timeout.
nodeTaintAfterNthGetCall map[string]int
durations []time.Duration
wantLatency time.Duration
wantResultChanOpen bool
}{
{
description: "latency when tainting a single node - node is tainted in the first call to the lister",
startTime: time.Now(),
nodeTaintAfterNthGetCall: map[string]int{"n1": 1},
durations: []time.Duration{100 * time.Millisecond},
wantLatency: 100 * time.Millisecond,
wantResultChanOpen: true,
},
{
description: "latency when tainting a single node - node is not tainted in the first call to the lister",
startTime: time.Now(),
nodeTaintAfterNthGetCall: map[string]int{"n1": 3},
durations: []time.Duration{100 * time.Millisecond},
wantLatency: 100 * time.Millisecond,
wantResultChanOpen: true,
},
{
description: "latency when tainting multiple nodes - nodes are tainted in the first calls to the lister",
startTime: time.Now(),
nodeTaintAfterNthGetCall: map[string]int{"n1": 1, "n2": 1},
durations: []time.Duration{100 * time.Millisecond, 150 * time.Millisecond},
wantLatency: 150 * time.Millisecond,
wantResultChanOpen: true,
},
{
description: "latency when tainting multiple nodes - nodes are not tainted in the first calls to the lister",
startTime: time.Now(),
nodeTaintAfterNthGetCall: map[string]int{"n1": 3, "n2": 5},
durations: []time.Duration{100 * time.Millisecond, 150 * time.Millisecond},
wantLatency: 150 * time.Millisecond,
wantResultChanOpen: true,
},
{
description: "Some nodes fails to taint before timeout",
startTime: time.Now(),
nodeTaintAfterNthGetCall: map[string]int{"n1": 1, "n3": -1},
durations: []time.Duration{100 * time.Millisecond, 150 * time.Millisecond},
wantResultChanOpen: false,
},
}

for _, tc := range testCases {
t.Run(tc.description, func(t *testing.T) {
mc := NewMockClock(tc.startTime, tc.durations)
nodes := map[string]*apiv1.Node{}
for name := range tc.nodeTaintAfterNthGetCall {
node := test.BuildTestNode(name, 100, 100)
nodes[name] = node
}
nodeLister := NewTestCustomNodeLister(nodes, tc.nodeTaintAfterNthGetCall)
updateLatencyTracker := NewUpdateLatencyTracker(nodeLister, time.Millisecond, mc.Now)
go updateLatencyTracker.Start()
for _, node := range nodes {
updateLatencyTracker.StartTimeChan <- NodeTaintStartTime{node.Name, tc.startTime}
}
updateLatencyTracker.AwaitOrStopChan <- true
latency, ok := <-updateLatencyTracker.ResultChan
assert.Equal(t, tc.wantResultChanOpen, ok)
if ok {
assert.Equal(t, tc.wantLatency, latency)
assert.Equal(t, len(tc.durations), mc.getIndex())
}
})
}
}
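As a quick, hypothetical illustration of the mock clock the cases above rely on: each taint observed by the tracker consumes one Now() call, so with durations of 100ms and 150ms the two observations land at startTime+100ms and startTime+150ms, and the reported latency is the maximum, 150ms. In isolation (not part of the commit):

package actuation

import (
	"testing"
	"time"
)

// TestMockClockSketch is a hypothetical illustration of mockClock semantics, not part of the commit.
func TestMockClockSketch(t *testing.T) {
	start := time.Now()
	mc := NewMockClock(start, []time.Duration{100 * time.Millisecond, 150 * time.Millisecond})
	if got := mc.Now().Sub(start); got != 100*time.Millisecond {
		t.Errorf("first Now() returned start+%v, want start+100ms", got)
	}
	if got := mc.Now().Sub(start); got != 150*time.Millisecond {
		t.Errorf("second Now() returned start+%v, want start+150ms", got)
	}
	// Any further calls fall back to the real time.Now().
}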