Skip to content

Commit

Permalink
Merge pull request #4828 from x13n/ndt
Browse files Browse the repository at this point in the history
Make NodeDeletionTracker implement ActuationStatus interface
  • Loading branch information
k8s-ci-robot authored Apr 28, 2022
2 parents 7fe1e45 + c550b77 commit 561a9da
Show file tree
Hide file tree
Showing 10 changed files with 337 additions and 128 deletions.
143 changes: 102 additions & 41 deletions cluster-autoscaler/core/scaledown/deletiontracker/nodedeletiontracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,92 +18,153 @@ package deletiontracker

import (
"sync"
"time"

"k8s.io/autoscaler/cluster-autoscaler/processors/status"
"k8s.io/autoscaler/cluster-autoscaler/utils/expiring"

apiv1 "k8s.io/api/core/v1"
klog "k8s.io/klog/v2"
"k8s.io/utils/clock"
)

// NodeDeletionTracker keeps track of node deletions.
// TODO: extend to implement ActuationStatus interface
type NodeDeletionTracker struct {
sync.Mutex
nonEmptyNodeDeleteInProgress bool
// A map of node delete results by node name. It's being constantly emptied into ScaleDownStatus
// objects in order to notify the ScaleDownStatusProcessor that the node drain has ended or that
// an error occurred during the deletion process.
nodeDeleteResults map[string]status.NodeDeleteResult
// A map which keeps track of deletions in progress for nodepools.
// Key is a node group id and value is a number of node deletions in progress.
deletionsInProgress map[string]int
deletionsPerNodeGroup map[string]int
// This mapping contains node names of all empty nodes currently undergoing deletion.
emptyNodeDeletions map[string]bool
// This mapping contains node names of all nodes currently undergoing drain and deletion.
drainedNodeDeletions map[string]bool
// Clock for checking current time.
clock clock.PassiveClock
// Helper struct for tracking pod evictions.
evictions *expiring.List
// How long evictions are considered as recent.
evictionsTTL time.Duration
// Helper struct for tracking deletion results.
deletionResults *expiring.List
}

type deletionResult struct {
nodeName string
result status.NodeDeleteResult
}

// NewNodeDeletionTracker creates new NodeDeletionTracker.
func NewNodeDeletionTracker() *NodeDeletionTracker {
func NewNodeDeletionTracker(podEvictionsTTL time.Duration) *NodeDeletionTracker {
return &NodeDeletionTracker{
nodeDeleteResults: make(map[string]status.NodeDeleteResult),
deletionsInProgress: make(map[string]int),
deletionsPerNodeGroup: make(map[string]int),
emptyNodeDeletions: make(map[string]bool),
drainedNodeDeletions: make(map[string]bool),
clock: clock.RealClock{},
evictions: expiring.NewList(),
evictionsTTL: podEvictionsTTL,
deletionResults: expiring.NewList(),
}
}

// IsNonEmptyNodeDeleteInProgress returns true if a non empty node is being deleted.
func (n *NodeDeletionTracker) IsNonEmptyNodeDeleteInProgress() bool {
n.Lock()
defer n.Unlock()
return n.nonEmptyNodeDeleteInProgress
}

// SetNonEmptyNodeDeleteInProgress sets non empty node deletion in progress status.
func (n *NodeDeletionTracker) SetNonEmptyNodeDeleteInProgress(status bool) {
// StartDeletion increments node deletion in progress counter for the given nodegroup.
func (n *NodeDeletionTracker) StartDeletion(nodeGroupId, nodeName string) {
n.Lock()
defer n.Unlock()
n.nonEmptyNodeDeleteInProgress = status
n.deletionsPerNodeGroup[nodeGroupId]++
n.emptyNodeDeletions[nodeName] = true
}

// StartDeletion increments node deletion in progress counter for the given nodegroup.
func (n *NodeDeletionTracker) StartDeletion(nodeGroupId string) {
// StartDeletionWithDrain is equivalent to StartDeletion, but for counting nodes that are drained first.
func (n *NodeDeletionTracker) StartDeletionWithDrain(nodeGroupId, nodeName string) {
n.Lock()
defer n.Unlock()
n.deletionsInProgress[nodeGroupId]++
n.deletionsPerNodeGroup[nodeGroupId]++
n.drainedNodeDeletions[nodeName] = true
}

// EndDeletion decrements node deletion in progress counter for the given nodegroup.
func (n *NodeDeletionTracker) EndDeletion(nodeGroupId string) {
func (n *NodeDeletionTracker) EndDeletion(nodeGroupId, nodeName string, result status.NodeDeleteResult) {
n.Lock()
defer n.Unlock()

value, found := n.deletionsInProgress[nodeGroupId]
n.deletionResults.RegisterElement(&deletionResult{nodeName, result})
value, found := n.deletionsPerNodeGroup[nodeGroupId]
if !found {
klog.Errorf("This should never happen, counter for %s in DelayedNodeDeletionStatus wasn't found", nodeGroupId)
klog.Errorf("This should never happen, counter for %s in NodeDeletionTracker wasn't found", nodeGroupId)
return
}
if value <= 0 {
klog.Errorf("This should never happen, counter for %s in DelayedNodeDeletionStatus isn't greater than 0, counter value is %d", nodeGroupId, value)
klog.Errorf("This should never happen, counter for %s in NodeDeletionTracker isn't greater than 0, counter value is %d", nodeGroupId, value)
}
n.deletionsPerNodeGroup[nodeGroupId]--
if n.deletionsPerNodeGroup[nodeGroupId] <= 0 {
delete(n.deletionsPerNodeGroup, nodeGroupId)
}
delete(n.emptyNodeDeletions, nodeName)
delete(n.drainedNodeDeletions, nodeName)
}

// DeletionsInProgress returns a list of all node names currently undergoing deletion.
func (n *NodeDeletionTracker) DeletionsInProgress() ([]string, []string) {
n.Lock()
defer n.Unlock()
return mapKeysSlice(n.emptyNodeDeletions), mapKeysSlice(n.drainedNodeDeletions)
}

func mapKeysSlice(m map[string]bool) []string {
s := make([]string, len(m))
i := 0
for k := range m {
s[i] = k
i++
}
n.deletionsInProgress[nodeGroupId]--
if n.deletionsInProgress[nodeGroupId] <= 0 {
delete(n.deletionsInProgress, nodeGroupId)
return s
}

// RegisterEviction stores information about a pod that was recently evicted.
func (n *NodeDeletionTracker) RegisterEviction(pod *apiv1.Pod) {
n.Lock()
defer n.Unlock()
n.evictions.RegisterElement(pod)
}

// RecentEvictions returns a list of pods that were recently evicted by Cluster Autoscaler.
func (n *NodeDeletionTracker) RecentEvictions() []*apiv1.Pod {
n.Lock()
defer n.Unlock()
n.evictions.DropNotNewerThan(n.clock.Now().Add(-n.evictionsTTL))
els := n.evictions.ToSlice()
pods := make([]*apiv1.Pod, 0, len(els))
for _, el := range els {
pods = append(pods, el.(*apiv1.Pod))
}
return pods
}

// GetDeletionsInProgress returns the number of deletions in progress for the given node group.
func (n *NodeDeletionTracker) GetDeletionsInProgress(nodeGroupId string) int {
// DeletionsCount returns the number of deletions in progress for the given node group.
func (n *NodeDeletionTracker) DeletionsCount(nodeGroupId string) int {
n.Lock()
defer n.Unlock()
return n.deletionsInProgress[nodeGroupId]
return n.deletionsPerNodeGroup[nodeGroupId]
}

// AddNodeDeleteResult adds a node delete result to the result map.
func (n *NodeDeletionTracker) AddNodeDeleteResult(nodeName string, result status.NodeDeleteResult) {
// DeletionResults returns deletion results in a map form, along with the timestamp of last result.
func (n *NodeDeletionTracker) DeletionResults() (map[string]status.NodeDeleteResult, time.Time) {
n.Lock()
defer n.Unlock()
n.nodeDeleteResults[nodeName] = result
els, ts := n.deletionResults.ToSliceWithTimestamp()
drs := make(map[string]status.NodeDeleteResult)
for _, el := range els {
dr := el.(*deletionResult)
drs[dr.nodeName] = dr.result
}
return drs, ts
}

// GetAndClearNodeDeleteResults returns the whole result map and replaces it with a new empty one.
func (n *NodeDeletionTracker) GetAndClearNodeDeleteResults() map[string]status.NodeDeleteResult {
// ClearResultsNotNewerThan iterates over existing deletion results and keeps
// only the ones that are newer than the provided timestamp.
func (n *NodeDeletionTracker) ClearResultsNotNewerThan(t time.Time) {
n.Lock()
defer n.Unlock()
results := n.nodeDeleteResults
n.nodeDeleteResults = make(map[string]status.NodeDeleteResult)
return results
n.deletionResults.DropNotNewerThan(t)
}
38 changes: 13 additions & 25 deletions cluster-autoscaler/core/scaledown/legacy/legacy.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ func NewScaleDown(context *context.AutoscalingContext, processors *processors.Au
nodeUtilizationMap: make(map[string]utilization.Info),
usageTracker: simulator.NewUsageTracker(),
unneededNodesList: make([]*apiv1.Node, 0),
nodeDeletionTracker: deletiontracker.NewNodeDeletionTracker(),
nodeDeletionTracker: deletiontracker.NewNodeDeletionTracker(0 * time.Second),
unremovableNodeReasons: make(map[string]*simulator.UnremovableNode),
}
}
Expand Down Expand Up @@ -637,11 +637,6 @@ func (sd *ScaleDown) UnremovableNodes() []*simulator.UnremovableNode {
return ns
}

// IsNonEmptyNodeDeleteInProgress returns true if any nodes are being deleted.
func (sd *ScaleDown) IsNonEmptyNodeDeleteInProgress() bool {
return sd.nodeDeletionTracker.IsNonEmptyNodeDeleteInProgress()
}

// markSimulationError indicates a simulation error by clearing relevant scale
// down state and returning an appropriate error.
func (sd *ScaleDown) markSimulationError(simulatorErr errors.AutoscalerError,
Expand Down Expand Up @@ -692,8 +687,8 @@ func (sd *ScaleDown) TryToScaleDown(
currentTime time.Time,
pdbs []*policyv1.PodDisruptionBudget,
) (*status.ScaleDownStatus, errors.AutoscalerError) {

scaleDownStatus := &status.ScaleDownStatus{NodeDeleteResults: sd.nodeDeletionTracker.GetAndClearNodeDeleteResults()}
ndr, ts := sd.nodeDeletionTracker.DeletionResults()
scaleDownStatus := &status.ScaleDownStatus{NodeDeleteResults: ndr, NodeDeleteResultsAsOf: ts}
nodeDeletionDuration := time.Duration(0)
findNodesToRemoveDuration := time.Duration(0)
defer updateScaleDownMetrics(time.Now(), &findNodesToRemoveDuration, &nodeDeletionDuration)
Expand Down Expand Up @@ -790,7 +785,7 @@ func (sd *ScaleDown) TryToScaleDown(
continue
}

deletionsInProgress := sd.nodeDeletionTracker.GetDeletionsInProgress(nodeGroup.Id())
deletionsInProgress := sd.nodeDeletionTracker.DeletionsCount(nodeGroup.Id())
if size-deletionsInProgress <= nodeGroup.MinSize() {
klog.V(1).Infof("Skipping %s - node group min size reached", node.Name)
sd.addUnremovableNodeReason(node, simulator.NodeGroupMinSizeReached)
Expand Down Expand Up @@ -890,19 +885,16 @@ func (sd *ScaleDown) TryToScaleDown(

// Starting deletion.
nodeDeletionDuration = time.Now().Sub(nodeDeletionStart)
sd.nodeDeletionTracker.SetNonEmptyNodeDeleteInProgress(true)
nodeGroup, found := candidateNodeGroups[toRemove.Node.Name]
if !found {
return scaleDownStatus, errors.NewAutoscalerError(errors.InternalError, "failed to find node group for %s", toRemove.Node.Name)
}
sd.nodeDeletionTracker.StartDeletionWithDrain(nodeGroup.Id(), toRemove.Node.Name)

go func() {
// Finishing the delete process once this goroutine is over.
var result status.NodeDeleteResult
defer func() { sd.nodeDeletionTracker.AddNodeDeleteResult(toRemove.Node.Name, result) }()
defer sd.nodeDeletionTracker.SetNonEmptyNodeDeleteInProgress(false)
nodeGroup, found := candidateNodeGroups[toRemove.Node.Name]
if !found {
result = status.NodeDeleteResult{ResultType: status.NodeDeleteErrorFailedToDelete, Err: errors.NewAutoscalerError(
errors.InternalError, "failed to find node group for %s", toRemove.Node.Name)}
return
}
defer func() { sd.nodeDeletionTracker.EndDeletion(nodeGroup.Id(), toRemove.Node.Name, result) }()
result = sd.deleteNode(toRemove.Node, toRemove.PodsToReschedule, toRemove.DaemonSetPods, nodeGroup)
if result.ResultType != status.NodeDeleteOk {
klog.Errorf("Failed to delete %s: %v", toRemove.Node.Name, result.Err)
Expand Down Expand Up @@ -968,7 +960,7 @@ func (sd *ScaleDown) getEmptyNodesToRemove(candidates []string, resourcesLimits
klog.Errorf("Failed to get size for %s: %v ", nodeGroup.Id(), err)
continue
}
deletionsInProgress := sd.nodeDeletionTracker.GetDeletionsInProgress(nodeGroup.Id())
deletionsInProgress := sd.nodeDeletionTracker.DeletionsCount(nodeGroup.Id())
available = size - nodeGroup.MinSize() - deletionsInProgress
if available < 0 {
available = 0
Expand Down Expand Up @@ -1016,10 +1008,9 @@ func (sd *ScaleDown) scheduleDeleteEmptyNodes(emptyNodesToRemove []simulator.Nod
}
deletedNodes = append(deletedNodes, empty.Node)
go func(nodeToDelete *apiv1.Node, nodeGroupForDeletedNode cloudprovider.NodeGroup, evictByDefault bool) {
sd.nodeDeletionTracker.StartDeletion(nodeGroupForDeletedNode.Id())
defer sd.nodeDeletionTracker.EndDeletion(nodeGroupForDeletedNode.Id())
sd.nodeDeletionTracker.StartDeletion(nodeGroupForDeletedNode.Id(), nodeToDelete.Name)
var result status.NodeDeleteResult
defer func() { sd.nodeDeletionTracker.AddNodeDeleteResult(nodeToDelete.Name, result) }()
defer func() { sd.nodeDeletionTracker.EndDeletion(nodeGroupForDeletedNode.Id(), nodeToDelete.Name, result) }()

var deleteErr errors.AutoscalerError
// If we fail to delete the node we want to remove delete taint
Expand Down Expand Up @@ -1110,9 +1101,6 @@ func (sd *ScaleDown) deleteNode(node *apiv1.Node, pods []*apiv1.Pod, daemonSetPo
return status.NodeDeleteResult{ResultType: status.NodeDeleteErrorFailedToMarkToBeDeleted, Err: errors.ToAutoscalerError(errors.ApiCallError, err)}
}

sd.nodeDeletionTracker.StartDeletion(nodeGroup.Id())
defer sd.nodeDeletionTracker.EndDeletion(nodeGroup.Id())

// If we fail to evict all the pods from the node we want to remove delete taint
defer func() {
if !deleteSuccessful {
Expand Down
12 changes: 7 additions & 5 deletions cluster-autoscaler/core/scaledown/legacy/legacy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1220,7 +1220,8 @@ func TestScaleDown(t *testing.T) {

func waitForDeleteToFinish(t *testing.T, sd *ScaleDown) {
for start := time.Now(); time.Since(start) < 20*time.Second; time.Sleep(100 * time.Millisecond) {
if !sd.IsNonEmptyNodeDeleteInProgress() {
_, drained := sd.nodeDeletionTracker.DeletionsInProgress()
if len(drained) == 0 {
return
}
}
Expand Down Expand Up @@ -1530,9 +1531,9 @@ func TestScaleDownEmptyMinGroupSizeLimitHit(t *testing.T) {
}

func TestScaleDownEmptyMinGroupSizeLimitHitWhenOneNodeIsBeingDeleted(t *testing.T) {
nodeDeletionTracker := deletiontracker.NewNodeDeletionTracker()
nodeDeletionTracker.StartDeletion("ng1")
nodeDeletionTracker.StartDeletion("ng1")
nodeDeletionTracker := deletiontracker.NewNodeDeletionTracker(0 * time.Second)
nodeDeletionTracker.StartDeletion("ng1", "n1")
nodeDeletionTracker.StartDeletion("ng1", "n2")
options := defaultScaleDownOptions
config := &ScaleTestConfig{
Nodes: []NodeConfig{
Expand Down Expand Up @@ -1622,7 +1623,6 @@ func simpleScaleDownEmpty(t *testing.T, config *ScaleTestConfig) {
autoscalererr = scaleDown.UpdateUnneededNodes(nodes, nodes, time.Now().Add(-5*time.Minute), nil)
assert.NoError(t, autoscalererr)
scaleDownStatus, err := scaleDown.TryToScaleDown(time.Now(), nil)
assert.False(t, scaleDown.IsNonEmptyNodeDeleteInProgress())

assert.NoError(t, err)
var expectedScaleDownResult status.ScaleDownResult
Expand Down Expand Up @@ -1652,6 +1652,8 @@ func simpleScaleDownEmpty(t *testing.T, config *ScaleTestConfig) {

assert.Equal(t, expectedScaleDownCount, len(deleted))
assert.Subset(t, config.ExpectedScaleDowns, deleted)
_, nonEmptyDeletions := scaleDown.nodeDeletionTracker.DeletionsInProgress()
assert.Equal(t, 0, len(nonEmptyDeletions))
}

func TestNoScaleDownUnready(t *testing.T) {
Expand Down
43 changes: 2 additions & 41 deletions cluster-autoscaler/core/scaledown/legacy/wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"time"

"k8s.io/autoscaler/cluster-autoscaler/core/scaledown"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/deletiontracker"
"k8s.io/autoscaler/cluster-autoscaler/processors/status"
"k8s.io/autoscaler/cluster-autoscaler/simulator"
"k8s.io/autoscaler/cluster-autoscaler/simulator/utilization"
Expand Down Expand Up @@ -89,49 +88,11 @@ func (p *ScaleDownWrapper) StartDeletion(empty, needDrain []*apiv1.Node, current
func (p *ScaleDownWrapper) CheckStatus() scaledown.ActuationStatus {
// TODO: snapshot information from the tracker instead of keeping live
// updated object.
return &actuationStatus{
ndt: p.sd.nodeDeletionTracker,
}
return p.sd.nodeDeletionTracker
}

// ClearResultsNotNewerThan clears old node deletion results kept by the
// Actuator.
func (p *ScaleDownWrapper) ClearResultsNotNewerThan(t time.Time) {
// TODO: implement this once results are not cleared while being
// fetched.
}

type actuationStatus struct {
ndt *deletiontracker.NodeDeletionTracker
}

// DeletionsInProgress returns node names of currently deleted nodes.
// Current implementation is not aware of the actual nodes names, so it returns
// a fake node name instead.
// TODO: Return real node names
func (a *actuationStatus) DeletionsInProgress() []string {
if a.ndt.IsNonEmptyNodeDeleteInProgress() {
return []string{"fake-node-name"}
}
return nil
}

// DeletionsCount returns total number of ongoing deletions in a given node
// group.
func (a *actuationStatus) DeletionsCount(nodeGroupId string) int {
return a.ndt.GetDeletionsInProgress(nodeGroupId)
}

// RecentEvictions should return a list of recently evicted pods. Since legacy
// scale down logic only drains at most one node at a time, this safeguard is
// not really needed there, so we can just return an empty list.
func (a *actuationStatus) RecentEvictions() []*apiv1.Pod {
return nil
}

// DeletionResults returns a map of recent node deletion results.
func (a *actuationStatus) DeletionResults() map[string]status.NodeDeleteResult {
// TODO: update nodeDeletionTracker so it doesn't get & clear in the
// same step.
return a.ndt.GetAndClearNodeDeleteResults()
p.sd.nodeDeletionTracker.ClearResultsNotNewerThan(t)
}
Loading

0 comments on commit 561a9da

Please sign in to comment.