Skip to content

Commit

Permalink
Reduce describe ASG calls when regenerating the instance to ASG cache
Browse files Browse the repository at this point in the history
Previously we were calling the describe ASGs API once for each ASG managed by
the cluster autoscaler. We now make one call.
  • Loading branch information
Nic Cope committed Nov 18, 2017
1 parent 807ca22 commit 8f56e0b
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 41 deletions.
23 changes: 15 additions & 8 deletions cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"k8s.io/apimachinery/pkg/util/wait"

"github.com/aws/aws-sdk-go/aws"
"github.com/golang/glog"
)

Expand Down Expand Up @@ -136,16 +137,22 @@ func (m *autoScalingGroups) regenerateCacheWithoutLock() error {

newCache := make(map[AwsRef]*Asg)

for _, asg := range m.registeredAsgs {
glog.V(4).Infof("Regenerating ASG information for %s", asg.config.Name)
names := make([]string, len(m.registeredAsgs))
configs := make(map[string]*Asg)
for i, asg := range m.registeredAsgs {
names[i] = asg.config.Name
configs[asg.config.Name] = asg.config
}

group, err := m.service.getAutoscalingGroupByName(asg.config.Name)
if err != nil {
return err
}
glog.V(4).Infof("Regenerating instance to ASG map for ASGs: %v", names)
groups, err := m.service.getAutoscalingGroupsByNames(names)
if err != nil {
return err
}
for _, group := range groups {
for _, instance := range group.Instances {
ref := AwsRef{Name: *instance.InstanceId}
newCache[ref] = asg.config
ref := AwsRef{Name: aws.StringValue(instance.InstanceId)}
newCache[ref] = configs[aws.StringValue(group.AutoScalingGroupName)]
}
}

Expand Down
61 changes: 46 additions & 15 deletions cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ func newTestAwsManagerWithService(service autoScaling) *AwsManager {
}

func testDescribeAutoScalingGroupsOutput(desiredCap int64, instanceIds ...string) *autoscaling.DescribeAutoScalingGroupsOutput {
return testNamedDescribeAutoScalingGroupsOutput("UNUSED", desiredCap, instanceIds...)
}

func testNamedDescribeAutoScalingGroupsOutput(groupName string, desiredCap int64, instanceIds ...string) *autoscaling.DescribeAutoScalingGroupsOutput {
instances := []*autoscaling.Instance{}
for _, id := range instanceIds {
instances = append(instances, &autoscaling.Instance{
Expand All @@ -98,8 +102,9 @@ func testDescribeAutoScalingGroupsOutput(desiredCap int64, instanceIds ...string
return &autoscaling.DescribeAutoScalingGroupsOutput{
AutoScalingGroups: []*autoscaling.Group{
{
DesiredCapacity: aws.Int64(desiredCap),
Instances: instances,
AutoScalingGroupName: aws.String(groupName),
DesiredCapacity: aws.Int64(desiredCap),
Instances: instances,
},
},
}
Expand Down Expand Up @@ -160,18 +165,24 @@ func TestNodeGroupForNode(t *testing.T) {
err := provider.addNodeGroup("1:5:test-asg")
assert.NoError(t, err)

service.On("DescribeAutoScalingGroups", &autoscaling.DescribeAutoScalingGroupsInput{
AutoScalingGroupNames: aws.StringSlice([]string{provider.asgs[0].Name}),
MaxRecords: aws.Int64(1),
}).Return(testDescribeAutoScalingGroupsOutput(1, "test-instance-id"))
service.On("DescribeAutoScalingGroupsPages",
&autoscaling.DescribeAutoScalingGroupsInput{
AutoScalingGroupNames: aws.StringSlice([]string{provider.asgs[0].Name}),
MaxRecords: aws.Int64(maxRecordsReturnedByAPI),
},
mock.AnythingOfType("func(*autoscaling.DescribeAutoScalingGroupsOutput, bool) bool"),
).Run(func(args mock.Arguments) {
fn := args.Get(1).(func(*autoscaling.DescribeAutoScalingGroupsOutput, bool) bool)
fn(testNamedDescribeAutoScalingGroupsOutput("test-asg", 1, "test-instance-id"), false)
}).Return(nil)

group, err := provider.NodeGroupForNode(node)

assert.NoError(t, err)
assert.Equal(t, group.Id(), "test-asg")
assert.Equal(t, group.MinSize(), 1)
assert.Equal(t, group.MaxSize(), 5)
service.AssertNumberOfCalls(t, "DescribeAutoScalingGroups", 1)
service.AssertNumberOfCalls(t, "DescribeAutoScalingGroupsPages", 1)

// test node in cluster that is not in a group managed by cluster autoscaler
nodeNotInGroup := &apiv1.Node{
Expand All @@ -184,7 +195,7 @@ func TestNodeGroupForNode(t *testing.T) {

assert.NoError(t, err)
assert.Nil(t, group)
service.AssertNumberOfCalls(t, "DescribeAutoScalingGroups", 2)
service.AssertNumberOfCalls(t, "DescribeAutoScalingGroupsPages", 2)
}

func TestAwsRefFromProviderId(t *testing.T) {
Expand Down Expand Up @@ -265,10 +276,16 @@ func TestBelongs(t *testing.T) {
err := provider.addNodeGroup("1:5:test-asg")
assert.NoError(t, err)

service.On("DescribeAutoScalingGroups", &autoscaling.DescribeAutoScalingGroupsInput{
AutoScalingGroupNames: aws.StringSlice([]string{provider.asgs[0].Name}),
MaxRecords: aws.Int64(1),
}).Return(testDescribeAutoScalingGroupsOutput(1, "test-instance-id"))
service.On("DescribeAutoScalingGroupsPages",
&autoscaling.DescribeAutoScalingGroupsInput{
AutoScalingGroupNames: aws.StringSlice([]string{provider.asgs[0].Name}),
MaxRecords: aws.Int64(maxRecordsReturnedByAPI),
},
mock.AnythingOfType("func(*autoscaling.DescribeAutoScalingGroupsOutput, bool) bool"),
).Run(func(args mock.Arguments) {
fn := args.Get(1).(func(*autoscaling.DescribeAutoScalingGroupsOutput, bool) bool)
fn(testNamedDescribeAutoScalingGroupsOutput("test-asg", 1, "test-instance-id"), false)
}).Return(nil)

invalidNode := &apiv1.Node{
Spec: apiv1.NodeSpec{
Expand All @@ -277,7 +294,7 @@ func TestBelongs(t *testing.T) {
}
_, err = provider.asgs[0].Belongs(invalidNode)
assert.Error(t, err)
service.AssertNumberOfCalls(t, "DescribeAutoScalingGroups", 1)
service.AssertNumberOfCalls(t, "DescribeAutoScalingGroupsPages", 1)

validNode := &apiv1.Node{
Spec: apiv1.NodeSpec{
Expand All @@ -289,7 +306,7 @@ func TestBelongs(t *testing.T) {
assert.NoError(t, err)
// As "test-instance-id" is already known to be managed by test-asg since the first `Belongs` call,
// No additional DescribAutoScalingGroup call is made
service.AssertNumberOfCalls(t, "DescribeAutoScalingGroups", 1)
service.AssertNumberOfCalls(t, "DescribeAutoScalingGroupsPages", 1)
}

func TestDeleteNodes(t *testing.T) {
Expand All @@ -307,11 +324,24 @@ func TestDeleteNodes(t *testing.T) {
err := provider.addNodeGroup("1:5:test-asg")
assert.NoError(t, err)

// Look up the current number of instances...
service.On("DescribeAutoScalingGroups", &autoscaling.DescribeAutoScalingGroupsInput{
AutoScalingGroupNames: aws.StringSlice([]string{provider.asgs[0].Name}),
MaxRecords: aws.Int64(1),
}).Return(testDescribeAutoScalingGroupsOutput(2, "test-instance-id", "second-test-instance-id"))

// Refresh the instance to ASG cache...
service.On("DescribeAutoScalingGroupsPages",
&autoscaling.DescribeAutoScalingGroupsInput{
AutoScalingGroupNames: aws.StringSlice([]string{provider.asgs[0].Name}),
MaxRecords: aws.Int64(maxRecordsReturnedByAPI),
},
mock.AnythingOfType("func(*autoscaling.DescribeAutoScalingGroupsOutput, bool) bool"),
).Run(func(args mock.Arguments) {
fn := args.Get(1).(func(*autoscaling.DescribeAutoScalingGroupsOutput, bool) bool)
fn(testNamedDescribeAutoScalingGroupsOutput("test-asg", 2, "test-instance-id", "second-test-instance-id"), false)
}).Return(nil)

node := &apiv1.Node{
Spec: apiv1.NodeSpec{
ProviderID: "aws:///us-east-1a/test-instance-id",
Expand All @@ -320,7 +350,8 @@ func TestDeleteNodes(t *testing.T) {
err = provider.asgs[0].DeleteNodes([]*apiv1.Node{node})
assert.NoError(t, err)
service.AssertNumberOfCalls(t, "TerminateInstanceInAutoScalingGroup", 1)
service.AssertNumberOfCalls(t, "DescribeAutoScalingGroups", 2)
service.AssertNumberOfCalls(t, "DescribeAutoScalingGroups", 1)
service.AssertNumberOfCalls(t, "DescribeAutoScalingGroupsPages", 1)
}

func TestGetResourceLimiter(t *testing.T) {
Expand Down
35 changes: 17 additions & 18 deletions cluster-autoscaler/cloudprovider/aws/aws_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,20 @@ func TestFetchExplicitAsgs(t *testing.T) {
},
})

s.On("DescribeAutoScalingGroupsPages",
&autoscaling.DescribeAutoScalingGroupsInput{
AutoScalingGroupNames: aws.StringSlice([]string{groupname}),
MaxRecords: aws.Int64(maxRecordsReturnedByAPI),
},
mock.AnythingOfType("func(*autoscaling.DescribeAutoScalingGroupsOutput, bool) bool"),
).Run(func(args mock.Arguments) {
fn := args.Get(1).(func(*autoscaling.DescribeAutoScalingGroupsOutput, bool) bool)
fn(&autoscaling.DescribeAutoScalingGroupsOutput{
AutoScalingGroups: []*autoscaling.Group{
{AutoScalingGroupName: aws.String(groupname)},
}}, false)
}).Return(nil)

do := cloudprovider.NodeGroupDiscoveryOptions{
// Register the same node group twice with different max nodes.
// The intention is to test that the asgs.Register method will update
Expand Down Expand Up @@ -159,7 +173,8 @@ func TestFetchAutoAsgs(t *testing.T) {
}}, false)
}).Return(nil).Once()

// Describe the group to register it
// Describe the group to register it, then again to generate the instance
// cache.
s.On("DescribeAutoScalingGroupsPages",
&autoscaling.DescribeAutoScalingGroupsInput{
AutoScalingGroupNames: aws.StringSlice([]string{groupname}),
Expand All @@ -174,23 +189,7 @@ func TestFetchAutoAsgs(t *testing.T) {
MinSize: aws.Int64(int64(min)),
MaxSize: aws.Int64(int64(max)),
}}}, false)
}).Return(nil)

// Regenerate the instance cache
s.On("DescribeAutoScalingGroups",
&autoscaling.DescribeAutoScalingGroupsInput{
AutoScalingGroupNames: aws.StringSlice([]string{groupname}),
MaxRecords: aws.Int64(1),
},
).Return(&autoscaling.DescribeAutoScalingGroupsOutput{
AutoScalingGroups: []*autoscaling.Group{
{
AutoScalingGroupName: aws.String(groupname),
MinSize: aws.Int64(int64(min)),
MaxSize: aws.Int64(int64(max)),
},
},
})
}).Return(nil).Twice()

do := cloudprovider.NodeGroupDiscoveryOptions{
NodeGroupAutoDiscoverySpecs: []string{fmt.Sprintf("asg:tag=%s", strings.Join(tags, ","))},
Expand Down

0 comments on commit 8f56e0b

Please sign in to comment.