From 86e627d46b01a78d9f7980a5ae2bd42bce82342c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Pi=C4=85tkowski?= Date: Thu, 9 May 2019 15:30:05 +0200 Subject: [PATCH] correctly handle lack of capacity of AWS spot ASGs --- .../cloudprovider/aws/auto_scaling_groups.go | 72 ++++++++++++++++--- .../aws/aws_cloud_provider_test.go | 7 +- .../cloudprovider/aws/aws_manager.go | 6 +- .../cloudprovider/aws/aws_manager_test.go | 12 +++- cluster-autoscaler/core/static_autoscaler.go | 3 +- 5 files changed, 84 insertions(+), 16 deletions(-) diff --git a/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go b/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go index c3ed1fdc7518..6d88eeeab2f0 100644 --- a/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go +++ b/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go @@ -19,6 +19,7 @@ package aws import ( "fmt" "reflect" + "regexp" "strings" "sync" @@ -30,7 +31,10 @@ import ( "github.com/golang/glog" ) -const scaleToZeroSupported = true +const ( + scaleToZeroSupported = true + placeholderInstanceNamePrefix = "i-placeholder" +) type asgCache struct { registeredAsgs []*asg @@ -195,6 +199,10 @@ func (m *asgCache) SetAsgSize(asg *asg, size int) error { m.mutex.Lock() defer m.mutex.Unlock() + return m.setAsgSizeNoLock(asg, size) +} + +func (m *asgCache) setAsgSizeNoLock(asg *asg, size int) error { params := &autoscaling.SetDesiredCapacityInput{ AutoScalingGroupName: aws.String(asg.Name), DesiredCapacity: aws.Int64(int64(size)), @@ -212,6 +220,10 @@ func (m *asgCache) SetAsgSize(asg *asg, size int) error { return nil } +func (m *asgCache) decreaseAsgSizeByOneNoLock(asg *asg) error { + return m.setAsgSizeNoLock(asg, asg.curSize-1) +} + // DeleteInstances deletes the given instances. All instances must be controlled by the same ASG. func (m *asgCache) DeleteInstances(instances []*AwsInstanceRef) error { m.mutex.Lock() @@ -239,24 +251,36 @@ func (m *asgCache) DeleteInstances(instances []*AwsInstanceRef) error { } for _, instance := range instances { - params := &autoscaling.TerminateInstanceInAutoScalingGroupInput{ - InstanceId: aws.String(instance.Name), - ShouldDecrementDesiredCapacity: aws.Bool(true), - } - resp, err := m.service.TerminateInstanceInAutoScalingGroup(params) - if err != nil { - return err + // check if the instance is a placeholder - a requested instance that was never created by the node group + // if it is, just decrease the size of the node group, as there's no specific instance we can remove + if m.isPlaceholderInstance(instance) { + glog.V(4).Infof("instance %s is detected as a placeholder, decreasing ASG requested size instead "+ + "of deleting instance", instance.Name) + m.decreaseAsgSizeByOneNoLock(commonAsg) + } else { + params := &autoscaling.TerminateInstanceInAutoScalingGroupInput{ + InstanceId: aws.String(instance.Name), + ShouldDecrementDesiredCapacity: aws.Bool(true), + } + resp, err := m.service.TerminateInstanceInAutoScalingGroup(params) + if err != nil { + return err + } + glog.V(4).Infof(*resp.Activity.Description) } // Proactively decrement the size so autoscaler makes better decisions commonAsg.curSize-- - - glog.V(4).Infof(*resp.Activity.Description) } - return nil } +// isPlaceholderInstance checks if the given instance is only a placeholder +func (m *asgCache) isPlaceholderInstance(instance *AwsInstanceRef) bool { + matched, _ := regexp.MatchString(fmt.Sprintf("^%s.*\\d+$", placeholderInstanceNamePrefix), instance.Name) + return matched +} + // Fetch automatically discovered ASGs. These ASGs should be unregistered if // they no longer exist in AWS. func (m *asgCache) fetchAutoAsgNames() ([]string, error) { @@ -323,6 +347,11 @@ func (m *asgCache) regenerate() error { return err } + // If currently any ASG has more Desired than running Instances, introduce placeholders + // for the instances to come up. This is required to track Desired instances that + // will never come up, like with Spot Request that can't be fulfilled + groups = m.createPlaceholdersForDesiredNonStartedInstances(groups) + // Register or update ASGs exists := make(map[AwsRef]bool) for _, group := range groups { @@ -355,6 +384,27 @@ func (m *asgCache) regenerate() error { return nil } +func (m *asgCache) createPlaceholdersForDesiredNonStartedInstances(groups []*autoscaling.Group) []*autoscaling.Group { + for _, g := range groups { + desired := *g.DesiredCapacity + real := int64(len(g.Instances)) + if desired <= real { + continue + } + + for i := real; i < desired; i++ { + id := fmt.Sprintf("%s-%s-%d", placeholderInstanceNamePrefix, *g.AutoScalingGroupName, i) + glog.V(4).Infof("Instance group %s has only %d instances created while requested count is %d. "+ + "Creating placeholder instance with ID %s.", *g.AutoScalingGroupName, real, desired, id) + g.Instances = append(g.Instances, &autoscaling.Instance{ + InstanceId: &id, + AvailabilityZone: g.AvailabilityZones[0], + }) + } + } + return groups +} + func (m *asgCache) buildAsgFromAWS(g *autoscaling.Group) (*asg, error) { spec := dynamic.NodeGroupSpec{ Name: aws.StringValue(g.AutoScalingGroupName), diff --git a/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go b/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go index 09f6f630d10d..28473123420f 100644 --- a/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go +++ b/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go @@ -398,6 +398,7 @@ func TestDeleteNodes(t *testing.T) { }) // Look up the current number of instances... + var expectedInstancesCount int64 = 2 service.On("DescribeAutoScalingGroupsPages", &autoscaling.DescribeAutoScalingGroupsInput{ AutoScalingGroupNames: aws.StringSlice([]string{"test-asg"}), @@ -406,7 +407,9 @@ func TestDeleteNodes(t *testing.T) { mock.AnythingOfType("func(*autoscaling.DescribeAutoScalingGroupsOutput, bool) bool"), ).Run(func(args mock.Arguments) { fn := args.Get(1).(func(*autoscaling.DescribeAutoScalingGroupsOutput, bool) bool) - fn(testNamedDescribeAutoScalingGroupsOutput("test-asg", 2, "test-instance-id", "second-test-instance-id"), false) + fn(testNamedDescribeAutoScalingGroupsOutput("test-asg", expectedInstancesCount, "test-instance-id", "second-test-instance-id"), false) + // we expect the instance count to be 1 after the call to DeleteNodes + expectedInstancesCount = 1 }).Return(nil) provider.Refresh() @@ -423,7 +426,7 @@ func TestDeleteNodes(t *testing.T) { err = asgs[0].DeleteNodes([]*apiv1.Node{node}) assert.NoError(t, err) service.AssertNumberOfCalls(t, "TerminateInstanceInAutoScalingGroup", 1) - service.AssertNumberOfCalls(t, "DescribeAutoScalingGroupsPages", 1) + service.AssertNumberOfCalls(t, "DescribeAutoScalingGroupsPages", 2) newSize, err := asgs[0].TargetSize() assert.NoError(t, err) diff --git a/cluster-autoscaler/cloudprovider/aws/aws_manager.go b/cluster-autoscaler/cloudprovider/aws/aws_manager.go index 6de24e76a594..724a952fa465 100644 --- a/cluster-autoscaler/cloudprovider/aws/aws_manager.go +++ b/cluster-autoscaler/cloudprovider/aws/aws_manager.go @@ -180,7 +180,11 @@ func (m *AwsManager) SetAsgSize(asg *asg, size int) error { // DeleteInstances deletes the given instances. All instances must be controlled by the same ASG. func (m *AwsManager) DeleteInstances(instances []*AwsInstanceRef) error { - return m.asgCache.DeleteInstances(instances) + if err := m.asgCache.DeleteInstances(instances); err != nil { + return err + } + glog.V(2).Infof("Some ASG instances might have been deleted, forcing ASG list refresh") + return m.forceRefresh() } // GetAsgNodes returns Asg nodes. diff --git a/cluster-autoscaler/cloudprovider/aws/aws_manager_test.go b/cluster-autoscaler/cloudprovider/aws/aws_manager_test.go index 8f1c2d6628a4..0f9113d69baf 100644 --- a/cluster-autoscaler/cloudprovider/aws/aws_manager_test.go +++ b/cluster-autoscaler/cloudprovider/aws/aws_manager_test.go @@ -179,9 +179,16 @@ func TestFetchExplicitAsgs(t *testing.T) { mock.AnythingOfType("func(*autoscaling.DescribeAutoScalingGroupsOutput, bool) bool"), ).Run(func(args mock.Arguments) { fn := args.Get(1).(func(*autoscaling.DescribeAutoScalingGroupsOutput, bool) bool) + zone := "test-1a" fn(&autoscaling.DescribeAutoScalingGroupsOutput{ AutoScalingGroups: []*autoscaling.Group{ - {AutoScalingGroupName: aws.String(groupname)}, + { + AvailabilityZones: []*string{&zone}, + AutoScalingGroupName: aws.String(groupname), + MinSize: aws.Int64(int64(min)), + MaxSize: aws.Int64(int64(max)), + DesiredCapacity: aws.Int64(int64(min)), + }, }}, false) }).Return(nil) @@ -275,11 +282,14 @@ func TestFetchAutoAsgs(t *testing.T) { mock.AnythingOfType("func(*autoscaling.DescribeAutoScalingGroupsOutput, bool) bool"), ).Run(func(args mock.Arguments) { fn := args.Get(1).(func(*autoscaling.DescribeAutoScalingGroupsOutput, bool) bool) + zone := "test-1a" fn(&autoscaling.DescribeAutoScalingGroupsOutput{ AutoScalingGroups: []*autoscaling.Group{{ + AvailabilityZones: []*string{&zone}, AutoScalingGroupName: aws.String(groupname), MinSize: aws.Int64(int64(min)), MaxSize: aws.Int64(int64(max)), + DesiredCapacity: aws.Int64(int64(min)), }}}, false) }).Return(nil).Twice() diff --git a/cluster-autoscaler/core/static_autoscaler.go b/cluster-autoscaler/core/static_autoscaler.go index 2b86796dbfd5..9b48aa42a44c 100644 --- a/cluster-autoscaler/core/static_autoscaler.go +++ b/cluster-autoscaler/core/static_autoscaler.go @@ -166,7 +166,8 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError unregisteredNodes := a.clusterStateRegistry.GetUnregisteredNodes() if len(unregisteredNodes) > 0 { glog.V(1).Infof("%d unregistered nodes present", len(unregisteredNodes)) - removedAny, err := removeOldUnregisteredNodes(unregisteredNodes, autoscalingContext, currentTime, autoscalingContext.LogRecorder) + removedAny, err := removeOldUnregisteredNodes(unregisteredNodes, autoscalingContext, + currentTime, autoscalingContext.LogRecorder) // There was a problem with removing unregistered nodes. Retry in the next loop. if err != nil { if removedAny {