Skip to content

Commit

Permalink
remove nodes stuck creating even if this takes us below nodeGroup min…
Browse files Browse the repository at this point in the history
…Size
  • Loading branch information
David Morrison committed Oct 4, 2021
1 parent 82bdc4a commit 6f00826
Show file tree
Hide file tree
Showing 7 changed files with 74 additions and 32 deletions.
10 changes: 2 additions & 8 deletions cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@ import (
)

const (
scaleToZeroSupported = true
placeholderInstanceNamePrefix = "i-placeholder"
scaleToZeroSupported = true
)

type asgCache struct {
Expand Down Expand Up @@ -264,7 +263,7 @@ func (m *asgCache) DeleteInstances(instances []*AwsInstanceRef) error {
for _, instance := range instances {
// check if the instance is a placeholder - a requested instance that was never created by the node group
// if it is, just decrease the size of the node group, as there's no specific instance we can remove
if m.isPlaceholderInstance(instance) {
if instance.isPlaceholder() {
klog.V(4).Infof("instance %s is detected as a placeholder, decreasing ASG requested size instead "+
"of deleting instance", instance.Name)
m.decreaseAsgSizeByOneNoLock(commonAsg)
Expand All @@ -288,11 +287,6 @@ func (m *asgCache) DeleteInstances(instances []*AwsInstanceRef) error {
return nil
}

// isPlaceholderInstance checks if the given instance is only a placeholder
func (m *asgCache) isPlaceholderInstance(instance *AwsInstanceRef) bool {
return strings.HasPrefix(instance.Name, placeholderInstanceNamePrefix)
}

// Fetch automatically discovered ASGs. These ASGs should be unregistered if
// they no longer exist in AWS.
func (m *asgCache) fetchAutoAsgNames() ([]string, error) {
Expand Down
18 changes: 16 additions & 2 deletions cluster-autoscaler/cloudprovider/aws/aws_cloud_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ import (

const (
// GPULabel is the label added to nodes with GPU resource.
GPULabel = "k8s.amazonaws.com/accelerator"
GPULabel = "k8s.amazonaws.com/accelerator"
placeholderInstanceNamePrefix = "i-placeholder"
)

var (
Expand Down Expand Up @@ -157,6 +158,11 @@ type AwsInstanceRef struct {
Name string
}

// isPlaceholder checks if the given instance is only a placeholder
func (i *AwsInstanceRef) isPlaceholder() bool {
return strings.HasPrefix(i.Name, placeholderInstanceNamePrefix)
}

var validAwsRefIdRegex = regexp.MustCompile(fmt.Sprintf(`^aws\:\/\/\/[-0-9a-z]*\/[-0-9a-z]*(\/[-0-9a-z\.]*)?$|aws\:\/\/\/[-0-9a-z]*\/%s.*$`, placeholderInstanceNamePrefix))

// AwsRefFromProviderId creates AwsInstanceRef object from provider id which
Expand Down Expand Up @@ -316,7 +322,15 @@ func (ng *AwsNodeGroup) Nodes() ([]cloudprovider.Instance, error) {
instances := make([]cloudprovider.Instance, len(asgNodes))

for i, asgNode := range asgNodes {
instances[i] = cloudprovider.Instance{Id: asgNode.ProviderID}
status := cloudprovider.InstanceStatus{State: cloudprovider.InstanceRunning}
// Placeholder instances have not been provisioned (created) by AWS yet
if asgNode.isPlaceholder() {
status.State = cloudprovider.InstanceCreating
}
instances[i] = cloudprovider.Instance{
Id: asgNode.ProviderID,
Status: &status,
}
}
return instances, nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,12 @@ func TestNodeGroupForNode(t *testing.T) {

assert.NoError(t, err)

assert.Equal(t, []cloudprovider.Instance{{Id: "aws:///us-east-1a/test-instance-id"}}, nodes)
assert.Equal(t, []cloudprovider.Instance{
{
Id: "aws:///us-east-1a/test-instance-id",
Status: &cloudprovider.InstanceStatus{State: cloudprovider.InstanceRunning},
},
}, nodes)
service.AssertNumberOfCalls(t, "DescribeAutoScalingGroupsPages", 1)

// test node in cluster that is not in a group managed by cluster autoscaler
Expand Down
30 changes: 20 additions & 10 deletions cluster-autoscaler/cloudprovider/test/test_cloud_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,15 @@ type OnNodeGroupCreateFunc func(string) error
// OnNodeGroupDeleteFunc is a function called when a node group is deleted.
type OnNodeGroupDeleteFunc func(string) error

type TestNode struct {
groupName string
status *cloudprovider.InstanceStatus
}

// TestCloudProvider is a dummy cloud provider to be used in tests.
type TestCloudProvider struct {
sync.Mutex
nodes map[string]string
nodes map[string]TestNode
groups map[string]cloudprovider.NodeGroup
onScaleUp func(string, int) error
onScaleDown func(string, string) error
Expand All @@ -59,7 +64,7 @@ type TestCloudProvider struct {
// NewTestCloudProvider builds new TestCloudProvider
func NewTestCloudProvider(onScaleUp OnScaleUpFunc, onScaleDown OnScaleDownFunc) *TestCloudProvider {
return &TestCloudProvider{
nodes: make(map[string]string),
nodes: make(map[string]TestNode),
groups: make(map[string]cloudprovider.NodeGroup),
onScaleUp: onScaleUp,
onScaleDown: onScaleDown,
Expand All @@ -72,7 +77,7 @@ func NewTestAutoprovisioningCloudProvider(onScaleUp OnScaleUpFunc, onScaleDown O
onNodeGroupCreate OnNodeGroupCreateFunc, onNodeGroupDelete OnNodeGroupDeleteFunc,
machineTypes []string, machineTemplates map[string]*schedulerframework.NodeInfo) *TestCloudProvider {
return &TestCloudProvider{
nodes: make(map[string]string),
nodes: make(map[string]TestNode),
groups: make(map[string]cloudprovider.NodeGroup),
onScaleUp: onScaleUp,
onScaleDown: onScaleDown,
Expand Down Expand Up @@ -129,11 +134,11 @@ func (tcp *TestCloudProvider) NodeGroupForNode(node *apiv1.Node) (cloudprovider.
tcp.Lock()
defer tcp.Unlock()

groupName, found := tcp.nodes[node.Name]
nodeInfo, found := tcp.nodes[node.Name]
if !found {
return nil, nil
}
group, found := tcp.groups[groupName]
group, found := tcp.groups[nodeInfo.groupName]
if !found {
return nil, nil
}
Expand Down Expand Up @@ -245,12 +250,17 @@ func (tcp *TestCloudProvider) DeleteNodeGroup(id string) {
delete(tcp.groups, id)
}

// AddNode adds the given node to the group.
// AddNode adds the given node to the group
func (tcp *TestCloudProvider) AddNode(nodeGroupId string, node *apiv1.Node) {
tcp.AddNodeWithStatus(nodeGroupId, node, nil)
}

// AddNodeWithStatus adds the given node to the group with the specified Status
func (tcp *TestCloudProvider) AddNodeWithStatus(nodeGroupId string, node *apiv1.Node, status *cloudprovider.InstanceStatus) {
tcp.Lock()
defer tcp.Unlock()

tcp.nodes[node.Name] = nodeGroupId
tcp.nodes[node.Name] = TestNode{groupName: nodeGroupId, status: status}
}

// GetResourceLimiter returns struct containing limits (max, min) for resources (cores, memory etc.).
Expand Down Expand Up @@ -429,9 +439,9 @@ func (tng *TestNodeGroup) Nodes() ([]cloudprovider.Instance, error) {
defer tng.Unlock()

instances := make([]cloudprovider.Instance, 0)
for node, nodegroup := range tng.cloudProvider.nodes {
if nodegroup == tng.id {
instances = append(instances, cloudprovider.Instance{Id: node})
for node, nodeInfo := range tng.cloudProvider.nodes {
if nodeInfo.groupName == tng.id {
instances = append(instances, cloudprovider.Instance{Id: node, Status: nodeInfo.status})
}
}
return instances, nil
Expand Down
13 changes: 11 additions & 2 deletions cluster-autoscaler/clusterstate/clusterstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,13 +100,15 @@ type IncorrectNodeGroupSize struct {
FirstObserved time.Time
}

// UnregisteredNode contains information about nodes that are present on the cluster provider side
// UnregisteredNode contains information about nodes that are requested on the cluster provider side
// but failed to register in Kubernetes.
type UnregisteredNode struct {
// Node is a dummy node that contains only the name of the node.
Node *apiv1.Node
// UnregisteredSince is the time when the node was first spotted.
UnregisteredSince time.Time
// Status is whether or not the node is still being created by the cloud provider
Status *cloudprovider.InstanceStatus
}

// ScaleUpFailure contains information about a failure of a scale-up.
Expand Down Expand Up @@ -651,7 +653,13 @@ func (csr *ClusterStateRegistry) updateUnregisteredNodes(unregisteredNodes []Unr
result := make(map[string]UnregisteredNode)
for _, unregistered := range unregisteredNodes {
if prev, found := csr.unregisteredNodes[unregistered.Node.Name]; found {
result[unregistered.Node.Name] = prev
// If the state of the unregistered node has changed (i.e. from "creating" to "running")
// then we reset the UnregisteredSince time
if unregistered.Status != csr.unregisteredNodes[unregistered.Node.Name].Status {
result[unregistered.Node.Name] = unregistered
} else {
result[unregistered.Node.Name] = prev
}
} else {
result[unregistered.Node.Name] = unregistered
}
Expand Down Expand Up @@ -951,6 +959,7 @@ func getNotRegisteredNodes(allNodes []*apiv1.Node, cloudProviderNodeInstances ma
notRegistered = append(notRegistered, UnregisteredNode{
Node: fakeNode(instance, cloudprovider.FakeNodeUnregistered),
UnregisteredSince: time,
Status: instance.Status,
})
}
}
Expand Down
10 changes: 7 additions & 3 deletions cluster-autoscaler/core/static_autoscaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -604,7 +604,7 @@ func removeOldUnregisteredNodes(unregisteredNodes []clusterstate.UnregisteredNod
removedAny := false
for _, unregisteredNode := range unregisteredNodes {
if unregisteredNode.UnregisteredSince.Add(context.MaxNodeProvisionTime).Before(currentTime) {
klog.V(0).Infof("Removing unregistered node %v", unregisteredNode.Node.Name)
klog.V(0).Infof("Removing unregistered node %v with status %v", unregisteredNode.Node.Name, unregisteredNode.Status)
nodeGroup, err := context.CloudProvider.NodeGroupForNode(unregisteredNode.Node)
if err != nil {
klog.Warningf("Failed to get node group for %s: %v", unregisteredNode.Node.Name, err)
Expand All @@ -619,8 +619,12 @@ func removeOldUnregisteredNodes(unregisteredNodes []clusterstate.UnregisteredNod
klog.Warningf("Failed to get node group size; unregisteredNode=%v; nodeGroup=%v; err=%v", unregisteredNode.Node.Name, nodeGroup.Id(), err)
continue
}
if nodeGroup.MinSize() >= size {
klog.Warningf("Failed to remove node %s: node group min size reached, skipping unregistered node removal", unregisteredNode.Node.Name)
// If the node is stuck in the `InstanceCreating` state, that might mean that the cloud
// provider isn't able to fulfill our request, so we should go ahead and delete this
// node even if it would take us under our minimum size; if we don't, we can get stuck
// in a state where we never back off this node group even though we will never scale up
if nodeGroup.MinSize() >= size && (unregisteredNode.Status == nil || unregisteredNode.Status.State != cloudprovider.InstanceCreating) {
klog.Warningf("Failed to remove node %s: node group min size reached and node status is not 'creating', skipping unregistered node removal", unregisteredNode.Node.Name)
continue
}
err = nodeGroup.DeleteNodes([]*apiv1.Node{unregisteredNode.Node})
Expand Down
18 changes: 12 additions & 6 deletions cluster-autoscaler/core/static_autoscaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,7 @@ func TestStaticAutoscalerRunOnceWithAutoprovisionedEnabled(t *testing.T) {
podDisruptionBudgetListerMock, daemonSetListerMock, onScaleUpMock, onScaleDownMock)
}

func TestStaticAutoscalerRunOnceWithALongUnregisteredNode(t *testing.T) {
func TestStaticAutoscalerRunOnceWithLongUnregisteredNodes(t *testing.T) {
readyNodeLister := kubernetes.NewTestNodeLister(nil)
allNodeLister := kubernetes.NewTestNodeLister(nil)
scheduledPodMock := &podListerMock{}
Expand Down Expand Up @@ -487,10 +487,15 @@ func TestStaticAutoscalerRunOnceWithALongUnregisteredNode(t *testing.T) {
provider.AddNode("ng1", n1)

// broken node, that will be just hanging out there during
// the test (it can't be removed since that would validate group min size)
// the test (it can't be removed since that would violate group min size)
brokenNode := BuildTestNode("broken", 1000, 1000)
provider.AddNode("ng1", brokenNode)

// This node is stuck in the "instance creating" state, and will be removed
// even though min size might be violated
stuckCreatingNode := BuildTestNode("stuck_creating", 1000, 1000)
provider.AddNodeWithStatus("ng1", stuckCreatingNode, &cloudprovider.InstanceStatus{State: cloudprovider.InstanceCreating})

ng1 := reflect.ValueOf(provider.GetNodeGroup("ng1")).Interface().(*testprovider.TestNodeGroup)
assert.NotNil(t, ng1)
assert.NotNil(t, provider)
Expand Down Expand Up @@ -530,7 +535,7 @@ func TestStaticAutoscalerRunOnceWithALongUnregisteredNode(t *testing.T) {
//nodeInfos, _ := getNodeInfosForGroups(nodes, provider, listerRegistry, []*appsv1.DaemonSet{}, context.PredicateChecker)
clusterState.UpdateNodes(nodes, nil, now)

// broken node failed to register in time
// broken/stuck nodes failed to register in time
clusterState.UpdateNodes(nodes, nil, later)

processors := NewTestProcessors()
Expand Down Expand Up @@ -560,14 +565,15 @@ func TestStaticAutoscalerRunOnceWithALongUnregisteredNode(t *testing.T) {
mock.AssertExpectationsForObjects(t, scheduledPodMock, unschedulablePodMock,
podDisruptionBudgetListerMock, daemonSetListerMock, onScaleUpMock, onScaleDownMock)

// Remove broken node after going over min size
// Remove broken/stuck nodes after going over min size
provider.AddNode("ng1", n2)
ng1.SetTargetSize(3)
ng1.SetTargetSize(4)

readyNodeLister.SetNodes([]*apiv1.Node{n1, n2})
allNodeLister.SetNodes([]*apiv1.Node{n1, n2})
scheduledPodMock.On("List").Return([]*apiv1.Pod{p1}, nil).Twice()
onScaleDownMock.On("ScaleDown", "ng1", "broken").Return(nil).Once()
onScaleDownMock.On("ScaleDown", "ng1", "broken").Return(nil).Once().
On("ScaleDown", "ng1", "stuck_creating").Return(nil).Once()
daemonSetListerMock.On("List", labels.Everything()).Return([]*appsv1.DaemonSet{}, nil).Once()

err = autoscaler.RunOnce(later.Add(2 * time.Hour))
Expand Down

0 comments on commit 6f00826

Please sign in to comment.