Merge pull request #2031 from krzysztof-jastrzebski/master
Add functionality which delays node deletion to let other components prepare for deletion.
k8s-ci-robot authored May 20, 2019
2 parents 8d2ec08 + 4247c8b commit cb4e60f
Showing 9 changed files with 286 additions and 110 deletions.
2 changes: 2 additions & 0 deletions cluster-autoscaler/config/autoscaling_options.go
@@ -103,6 +103,8 @@ type AutoscalingOptions struct {
// The formula to calculate the number of additional candidates is the following:
// max(#nodes * ScaleDownCandidatesPoolRatio, ScaleDownCandidatesPoolMinCount)
ScaleDownCandidatesPoolMinCount int
// NodeDeletionDelayTimeout is the maximum time CA waits for the delay-deletion.cluster-autoscaler.kubernetes.io/ annotations to be removed before deleting a node.
NodeDeletionDelayTimeout time.Duration
// WriteStatusConfigMap tells if the status information should be written to a ConfigMap
WriteStatusConfigMap bool
// BalanceSimilarNodeGroups enables logic that identifies node groups with similar machines and tries to balance node count between them.
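The new option pairs with an annotation contract: a component that still needs a node being scaled down can put an annotation under the delay-deletion.cluster-autoscaler.kubernetes.io/ prefix on it, and CA will wait up to NodeDeletionDelayTimeout for all such annotations to disappear before deleting the node. The following is a rough sketch, not part of this PR, of how an external controller might use that contract; the annotation suffix, the package name and the pre-context client-go call signatures are assumptions.

package delaydemo

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// Hypothetical key; the autoscaler matches on the prefix, not on a specific suffix.
const delayAnnotation = "delay-deletion.cluster-autoscaler.kubernetes.io/my-controller"

// requestDelay marks a node so that CA waits (up to NodeDeletionDelayTimeout) before deleting it.
func requestDelay(client kubernetes.Interface, nodeName string) error {
	node, err := client.CoreV1().Nodes().Get(nodeName, metav1.GetOptions{})
	if err != nil {
		return err
	}
	if node.Annotations == nil {
		node.Annotations = map[string]string{}
	}
	node.Annotations[delayAnnotation] = "true"
	_, err = client.CoreV1().Nodes().Update(node)
	return err
}

// clearDelay removes the mark once the component has finished preparing, letting CA
// delete the node before the timeout expires.
func clearDelay(client kubernetes.Interface, nodeName string) error {
	node, err := client.CoreV1().Nodes().Get(nodeName, metav1.GetOptions{})
	if err != nil {
		return err
	}
	delete(node.Annotations, delayAnnotation)
	_, err = client.CoreV1().Nodes().Update(node)
	return err
}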
247 changes: 159 additions & 88 deletions cluster-autoscaler/core/scale_down.go

Large diffs are not rendered by default.
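Because the scale_down.go hunk is collapsed, the shape of the new delay logic can only be inferred from the flag description and the tests in scale_down_test.go. Below is a minimal sketch of what waitForDelayDeletion might look like, consistent with TestWaitForDelayDeletion further down; the 5-second poll interval, the zero-timeout short circuit, the hasDelayDeletionAnnotation helper and the decision to swallow the timeout rather than return it as an error are assumptions.

import (
	"strings"
	"time"

	apiv1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/util/wait"

	kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
)

// DelayDeletionAnnotationPrefix marks annotations that ask CA to hold off deleting a node.
const DelayDeletionAnnotationPrefix = "delay-deletion.cluster-autoscaler.kubernetes.io/"

// hasDelayDeletionAnnotation reports whether any annotation on the node uses the prefix.
func hasDelayDeletionAnnotation(node *apiv1.Node) bool {
	for key := range node.Annotations {
		if strings.HasPrefix(key, DelayDeletionAnnotationPrefix) {
			return true
		}
	}
	return false
}

// waitForDelayDeletion blocks until every delay-deletion annotation has been removed from
// the node or the timeout expires; hitting the timeout is not treated as an error.
func waitForDelayDeletion(node *apiv1.Node, nodeLister kube_util.NodeLister, timeout time.Duration) error {
	if timeout == 0 || !hasDelayDeletionAnnotation(node) {
		return nil
	}
	err := wait.PollImmediate(5*time.Second, timeout, func() (bool, error) {
		freshNode, err := nodeLister.Get(node.Name)
		if err != nil {
			return false, err
		}
		return !hasDelayDeletionAnnotation(freshNode), nil
	})
	if err != nil && err != wait.ErrWaitTimeout {
		return err
	}
	return nil
}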

98 changes: 86 additions & 12 deletions cluster-autoscaler/core/scale_down_test.go
@@ -22,6 +22,7 @@ import (
"testing"
"time"

"github.com/stretchr/testify/mock"
batchv1 "k8s.io/api/batch/v1"
apiv1 "k8s.io/api/core/v1"
policyv1 "k8s.io/api/policy/v1beta1"
@@ -599,13 +600,14 @@ func TestDeleteNode(t *testing.T) {
fakeClient.Fake.AddReactor("get", "pods", podNotFoundFunc)

// build context
context := NewScaleTestAutoscalingContext(config.AutoscalingOptions{}, fakeClient, nil, provider, nil)
registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
context := NewScaleTestAutoscalingContext(config.AutoscalingOptions{}, fakeClient, registry, provider, nil)

clusterStateRegistry := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff())
sd := NewScaleDown(&context, clusterStateRegistry)

// attempt delete
result := sd.deleteNode(n1, pods)
result := sd.deleteNode(n1, pods, provider.GetNodeGroup("ng1"))

// verify
if scenario.expectedDeletion {
@@ -950,7 +952,7 @@ func TestScaleDown(t *testing.T) {

func waitForDeleteToFinish(t *testing.T, sd *ScaleDown) {
for start := time.Now(); time.Since(start) < 20*time.Second; time.Sleep(100 * time.Millisecond) {
if !sd.nodeDeleteStatus.IsDeleteInProgress() {
if !sd.nodeDeletionTracker.IsNonEmptyNodeDeleteInProgress() {
return
}
}
@@ -1088,6 +1090,24 @@ func TestScaleDownEmptyMinGroupSizeLimitHit(t *testing.T) {
simpleScaleDownEmpty(t, config)
}

func TestScaleDownEmptyMinGroupSizeLimitHitWhenOneNodeIsBeingDeleted(t *testing.T) {
nodeDeletionTracker := NewNodeDeletionTracker()
nodeDeletionTracker.StartDeletion("ng1")
nodeDeletionTracker.StartDeletion("ng1")
options := defaultScaleDownOptions
config := &scaleTestConfig{
nodes: []nodeConfig{
{"n1", 2000, 1000, 0, true, "ng1"},
{"n2", 2000, 1000, 0, true, "ng1"},
{"n3", 2000, 1000, 0, true, "ng1"},
},
options: options,
expectedScaleDowns: []string{},
nodeDeletionTracker: nodeDeletionTracker,
}
simpleScaleDownEmpty(t, config)
}
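NodeDeletionTracker is also defined in the collapsed scale_down.go diff. From the way the tests and static_autoscaler.go use it, it replaces the old nodeDeleteStatus flag and additionally counts in-flight deletions per node group, so nodes that are already being deleted still count toward the group's minimum size; that appears to be why the test above expects no further scale-downs. A plausible sketch follows. Only NewNodeDeletionTracker, StartDeletion and IsNonEmptyNodeDeleteInProgress appear in this diff; the remaining methods and all of the bodies are assumptions.

import "sync"

// NodeDeletionTracker keeps track of node deletions that are currently in progress.
type NodeDeletionTracker struct {
	sync.Mutex
	nonEmptyNodeDeleteInProgress bool
	// deletionsPerNodeGroup counts in-flight deletions keyed by node group id.
	deletionsPerNodeGroup map[string]int
}

// NewNodeDeletionTracker creates an empty tracker.
func NewNodeDeletionTracker() *NodeDeletionTracker {
	return &NodeDeletionTracker{deletionsPerNodeGroup: make(map[string]int)}
}

// StartDeletion records that a node from the given group has started being deleted.
func (t *NodeDeletionTracker) StartDeletion(nodeGroupId string) {
	t.Lock()
	defer t.Unlock()
	t.deletionsPerNodeGroup[nodeGroupId]++
}

// EndDeletion records that a deletion from the given group has finished or failed.
func (t *NodeDeletionTracker) EndDeletion(nodeGroupId string) {
	t.Lock()
	defer t.Unlock()
	if t.deletionsPerNodeGroup[nodeGroupId] > 0 {
		t.deletionsPerNodeGroup[nodeGroupId]--
	}
	if t.deletionsPerNodeGroup[nodeGroupId] == 0 {
		delete(t.deletionsPerNodeGroup, nodeGroupId)
	}
}

// GetDeletionsInProgress returns how many nodes of the group are currently being deleted,
// so scale-down can subtract them when checking the group's minimum size.
func (t *NodeDeletionTracker) GetDeletionsInProgress(nodeGroupId string) int {
	t.Lock()
	defer t.Unlock()
	return t.deletionsPerNodeGroup[nodeGroupId]
}

// SetNonEmptyNodeDeleteInProgress and IsNonEmptyNodeDeleteInProgress replace the old
// nodeDeleteStatus flag that the main loop consults before starting a new scale-down.
func (t *NodeDeletionTracker) SetNonEmptyNodeDeleteInProgress(status bool) {
	t.Lock()
	defer t.Unlock()
	t.nonEmptyNodeDeleteInProgress = status
}

func (t *NodeDeletionTracker) IsNonEmptyNodeDeleteInProgress() bool {
	t.Lock()
	defer t.Unlock()
	return t.nonEmptyNodeDeleteInProgress
}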

func simpleScaleDownEmpty(t *testing.T, config *scaleTestConfig) {
updatedNodes := make(chan string, 10)
deletedNodes := make(chan string, 10)
@@ -1147,22 +1167,23 @@ func simpleScaleDownEmpty(t *testing.T, config *scaleTestConfig) {

assert.NotNil(t, provider)

context := NewScaleTestAutoscalingContext(config.options, fakeClient, nil, provider, nil)
registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
context := NewScaleTestAutoscalingContext(config.options, fakeClient, registry, provider, nil)

clusterStateRegistry := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff())
scaleDown := NewScaleDown(&context, clusterStateRegistry)
if config.nodeDeletionTracker != nil {
scaleDown.nodeDeletionTracker = config.nodeDeletionTracker
}
scaleDown.UpdateUnneededNodes(nodes,
nodes, []*apiv1.Pod{}, time.Now().Add(-5*time.Minute), nil)
scaleDownStatus, err := scaleDown.TryToScaleDown(nodes, []*apiv1.Pod{}, nil, time.Now())
waitForDeleteToFinish(t, scaleDown)
// This helps to verify that TryToScaleDown doesn't attempt to remove anything
// after delete in progress status is gone.
close(deletedNodes)
assert.False(t, scaleDown.nodeDeletionTracker.IsNonEmptyNodeDeleteInProgress())

assert.NoError(t, err)
var expectedScaleDownResult status.ScaleDownResult
if len(config.expectedScaleDowns) > 0 {
expectedScaleDownResult = status.ScaleDownNodeDeleted
expectedScaleDownResult = status.ScaleDownNodeDeleteStarted
} else {
expectedScaleDownResult = status.ScaleDownNoUnneeded
}
@@ -1172,8 +1193,8 @@ func simpleScaleDownEmpty(t *testing.T, config *scaleTestConfig) {
// Report only up to 10 extra nodes found.
deleted := make([]string, 0, len(config.expectedScaleDowns)+10)
for i := 0; i < len(config.expectedScaleDowns)+10; i++ {
d := getStringFromChanImmediately(deletedNodes)
if d == "" { // a closed channel yields empty value
d := getStringFromChan(deletedNodes)
if d == nothingReturned { // a closed channel yields empty value
break
}
deleted = append(deleted, d)
@@ -1222,7 +1243,8 @@ func TestNoScaleDownUnready(t *testing.T) {
ScaleDownUnreadyTime: time.Hour,
MaxGracefulTerminationSec: 60,
}
context := NewScaleTestAutoscalingContext(options, fakeClient, nil, provider, nil)
registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
context := NewScaleTestAutoscalingContext(options, fakeClient, registry, provider, nil)

// N1 is unready so it requires a bigger unneeded time.
clusterStateRegistry := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff())
@@ -1730,3 +1752,55 @@ func TestSoftTaintTimeLimit(t *testing.T) {
assert.Empty(t, errs)
assert.Equal(t, 0, countDeletionCandidateTaints(t, fakeClient))
}

func TestWaitForDelayDeletion(t *testing.T) {
type testcase struct {
name string
addAnnotation bool
removeAnnotation bool
expectCallingGetNode bool
}
tests := []testcase{
{
name: "annotation not set",
addAnnotation: false,
removeAnnotation: false,
},
{
name: "annotation set and removed",
addAnnotation: true,
removeAnnotation: true,
},
{
name: "annotation set but not removed",
addAnnotation: true,
removeAnnotation: false,
},
}

for _, test := range tests {
test := test
t.Run(test.name, func(t *testing.T) {
t.Parallel()
node := BuildTestNode("n1", 1000, 10)
nodeWithAnnotation := BuildTestNode("n1", 1000, 10)
nodeWithAnnotation.Annotations = map[string]string{DelayDeletionAnnotationPrefix + "ingress": "true"}
allNodeListerMock := &nodeListerMock{}
if test.addAnnotation {
if test.removeAnnotation {
allNodeListerMock.On("Get").Return(node, nil).Once()
} else {
allNodeListerMock.On("Get").Return(nodeWithAnnotation, nil).Twice()
}
}
var err error
if test.addAnnotation {
err = waitForDelayDeletion(nodeWithAnnotation, allNodeListerMock, 6*time.Second)
} else {
err = waitForDelayDeletion(node, allNodeListerMock, 6*time.Second)
}
assert.NoError(t, err)
mock.AssertExpectationsForObjects(t, allNodeListerMock)
})
}
}
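The mock expectations above pin down the intended behavior of the wait: when the annotation is already gone the lister is consulted once, and when it is never removed the lister is consulted twice within the 6-second timeout, which suggests a polling interval of roughly five seconds; in every case the function returns without an error, so running into the timeout is deliberately not a failure.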
1 change: 1 addition & 0 deletions cluster-autoscaler/core/scale_test_common.go
@@ -75,6 +75,7 @@ type scaleTestConfig struct {
expectedFinalScaleUp groupSizeChange // we expect this to be delivered via scale-up event
expectedScaleDowns []string
options config.AutoscalingOptions
nodeDeletionTracker *NodeDeletionTracker
}

// NewScaleTestAutoscalingContext creates a new test autoscaling context for scaling tests.
6 changes: 3 additions & 3 deletions cluster-autoscaler/core/static_autoscaler.go
@@ -398,16 +398,16 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError
a.lastScaleDownFailTime.Add(a.ScaleDownDelayAfterFailure).After(currentTime) ||
a.lastScaleDownDeleteTime.Add(a.ScaleDownDelayAfterDelete).After(currentTime)
// In dry run only utilization is updated
calculateUnneededOnly := scaleDownInCooldown || scaleDown.nodeDeleteStatus.IsDeleteInProgress()
calculateUnneededOnly := scaleDownInCooldown || scaleDown.nodeDeletionTracker.IsNonEmptyNodeDeleteInProgress()

klog.V(4).Infof("Scale down status: unneededOnly=%v lastScaleUpTime=%s "+
"lastScaleDownDeleteTime=%v lastScaleDownFailTime=%s scaleDownForbidden=%v isDeleteInProgress=%v",
calculateUnneededOnly, a.lastScaleUpTime, a.lastScaleDownDeleteTime, a.lastScaleDownFailTime,
a.processorCallbacks.disableScaleDownForLoop, scaleDown.nodeDeleteStatus.IsDeleteInProgress())
a.processorCallbacks.disableScaleDownForLoop, scaleDown.nodeDeletionTracker.IsNonEmptyNodeDeleteInProgress())

if scaleDownInCooldown {
scaleDownStatus.Result = status.ScaleDownInCooldown
} else if scaleDown.nodeDeleteStatus.IsDeleteInProgress() {
} else if scaleDown.nodeDeletionTracker.IsNonEmptyNodeDeleteInProgress() {
scaleDownStatus.Result = status.ScaleDownInProgress
} else {
klog.V(4).Infof("Starting scale down")
4 changes: 4 additions & 0 deletions cluster-autoscaler/core/static_autoscaler_test.go
@@ -53,6 +53,10 @@ func (l *nodeListerMock) List() ([]*apiv1.Node, error) {
args := l.Called()
return args.Get(0).([]*apiv1.Node), args.Error(1)
}
func (l *nodeListerMock) Get(name string) (*apiv1.Node, error) {
args := l.Called()
return args.Get(0).(*apiv1.Node), args.Error(1)
}

type podListerMock struct {
mock.Mock
4 changes: 4 additions & 0 deletions cluster-autoscaler/expander/price/preferred_test.go
@@ -33,6 +33,10 @@ func (n *testNodeLister) List() ([]*apiv1.Node, error) {
return n.list, nil
}

func (n *testNodeLister) Get(name string) (*apiv1.Node, error) {
return nil, nil
}

func testPreferredNodeSingleCase(t *testing.T, currentNodes int, expectedNodeSize int) {
nodes := []*apiv1.Node{}
for i := 1; i <= currentNodes; i++ {
15 changes: 8 additions & 7 deletions cluster-autoscaler/main.go
@@ -114,12 +114,13 @@ var (
"for scale down when some candidates from previous iteration are no longer valid."+
"When calculating the pool size for additional candidates we take"+
"max(#nodes * scale-down-candidates-pool-ratio, scale-down-candidates-pool-min-count).")
scanInterval = flag.Duration("scan-interval", 10*time.Second, "How often cluster is reevaluated for scale up or down")
maxNodesTotal = flag.Int("max-nodes-total", 0, "Maximum number of nodes in all node groups. Cluster autoscaler will not grow the cluster beyond this number.")
coresTotal = flag.String("cores-total", minMaxFlagString(0, config.DefaultMaxClusterCores), "Minimum and maximum number of cores in cluster, in the format <min>:<max>. Cluster autoscaler will not scale the cluster beyond these numbers.")
memoryTotal = flag.String("memory-total", minMaxFlagString(0, config.DefaultMaxClusterMemory), "Minimum and maximum number of gigabytes of memory in cluster, in the format <min>:<max>. Cluster autoscaler will not scale the cluster beyond these numbers.")
gpuTotal = multiStringFlag("gpu-total", "Minimum and maximum number of different GPUs in cluster, in the format <gpu_type>:<min>:<max>. Cluster autoscaler will not scale the cluster beyond these numbers. Can be passed multiple times. CURRENTLY THIS FLAG ONLY WORKS ON GKE.")
cloudProviderFlag = flag.String("cloud-provider", cloudBuilder.DefaultCloudProvider,
nodeDeletionDelayTimeout = flag.Duration("node-deletion-delay-timeout", 2*time.Minute, "Maximum time CA waits for the delay-deletion.cluster-autoscaler.kubernetes.io/ annotations to be removed before deleting a node.")
scanInterval = flag.Duration("scan-interval", 10*time.Second, "How often cluster is reevaluated for scale up or down")
maxNodesTotal = flag.Int("max-nodes-total", 0, "Maximum number of nodes in all node groups. Cluster autoscaler will not grow the cluster beyond this number.")
coresTotal = flag.String("cores-total", minMaxFlagString(0, config.DefaultMaxClusterCores), "Minimum and maximum number of cores in cluster, in the format <min>:<max>. Cluster autoscaler will not scale the cluster beyond these numbers.")
memoryTotal = flag.String("memory-total", minMaxFlagString(0, config.DefaultMaxClusterMemory), "Minimum and maximum number of gigabytes of memory in cluster, in the format <min>:<max>. Cluster autoscaler will not scale the cluster beyond these numbers.")
gpuTotal = multiStringFlag("gpu-total", "Minimum and maximum number of different GPUs in cluster, in the format <gpu_type>:<min>:<max>. Cluster autoscaler will not scale the cluster beyond these numbers. Can be passed multiple times. CURRENTLY THIS FLAG ONLY WORKS ON GKE.")
cloudProviderFlag = flag.String("cloud-provider", cloudBuilder.DefaultCloudProvider,
"Cloud provider type. Available values: ["+strings.Join(cloudBuilder.AvailableCloudProviders, ",")+"]")
maxBulkSoftTaintCount = flag.Int("max-bulk-soft-taint-count", 10, "Maximum number of nodes that can be tainted/untainted PreferNoSchedule at the same time. Set to 0 to turn off such tainting.")
maxBulkSoftTaintTime = flag.Duration("max-bulk-soft-taint-time", 3*time.Second, "Maximum duration of tainting/untainting nodes as PreferNoSchedule at the same time.")
@@ -185,7 +186,6 @@ func createAutoscalingOptions() config.AutoscalingOptions {
if err != nil {
klog.Fatalf("Failed to parse flags: %v", err)
}

return config.AutoscalingOptions{
CloudConfig: *cloudConfig,
CloudProviderName: *cloudProviderFlag,
@@ -231,6 +231,7 @@ func createAutoscalingOptions() config.AutoscalingOptions {
NewPodScaleUpDelay: *newPodScaleUpDelay,
FilterOutSchedulablePodsUsesPacking: *filterOutSchedulablePodsUsesPacking,
IgnoredTaints: *ignoreTaintsFlag,
NodeDeletionDelayTimeout: *nodeDeletionDelayTimeout,
}
}

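Usage note: with the 2-minute default, a node selected for scale-down that still carries a delay-deletion.cluster-autoscaler.kubernetes.io/ annotation is kept for up to two additional minutes before it is deleted. An operator who wants to give other components more time could pass, for example, --node-deletion-delay-timeout=5m; the value is illustrative and not something set by this PR.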
19 changes: 19 additions & 0 deletions cluster-autoscaler/utils/kubernetes/listers.go
@@ -223,6 +223,7 @@ func NewScheduledPodLister(kubeClient client.Interface, stopchannel <-chan struc
// NodeLister lists nodes.
type NodeLister interface {
List() ([]*apiv1.Node, error)
Get(name string) (*apiv1.Node, error)
}

// ReadyNodeLister lists ready nodes.
@@ -245,6 +246,15 @@ func (readyNodeLister *ReadyNodeLister) List() ([]*apiv1.Node, error) {
return readyNodes, nil
}

// Get returns the node with the given name.
func (readyNodeLister *ReadyNodeLister) Get(name string) (*apiv1.Node, error) {
node, err := readyNodeLister.nodeLister.Get(name)
if err != nil {
return nil, err
}
return node, nil
}

// NewReadyNodeLister builds a node lister.
func NewReadyNodeLister(kubeClient client.Interface, stopChannel <-chan struct{}) NodeLister {
listWatcher := cache.NewListWatchFromClient(kubeClient.CoreV1().RESTClient(), "nodes", apiv1.NamespaceAll, fields.Everything())
@@ -272,6 +282,15 @@ func (allNodeLister *AllNodeLister) List() ([]*apiv1.Node, error) {
return allNodes, nil
}

// Get returns the node with the given name.
func (allNodeLister *AllNodeLister) Get(name string) (*apiv1.Node, error) {
node, err := allNodeLister.nodeLister.Get(name)
if err != nil {
return nil, err
}
return node, nil
}

// NewAllNodeLister builds a node lister that returns all nodes (ready and unready)
func NewAllNodeLister(kubeClient client.Interface, stopchannel <-chan struct{}) NodeLister {
listWatcher := cache.NewListWatchFromClient(kubeClient.CoreV1().RESTClient(), "nodes", apiv1.NamespaceAll, fields.Everything())
