Skip to content

Commit

Permalink
Add support for tags on AWS managed nodegroups to indicate resources
Browse files Browse the repository at this point in the history
  • Loading branch information
tbalzer authored and ClareCat committed Oct 31, 2023
1 parent f8aa91c commit 68dbbaa
Show file tree
Hide file tree
Showing 6 changed files with 240 additions and 8 deletions.
33 changes: 33 additions & 0 deletions cluster-autoscaler/cloudprovider/aws/aws_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,18 @@ func (m *AwsManager) buildNodeFromTemplate(asg *asg, template *asgTemplate) (*ap
node.Spec.Taints = append(node.Spec.Taints, mngTaints...)
klog.V(5).Infof("node.Spec.Taints : %+v\n", node.Spec.Taints)
}

mngTags, err := m.managedNodegroupCache.getManagedNodegroupTags(nodegroupName, clusterName)
if err != nil {
klog.Errorf("Failed to get tags from EKS DescribeNodegroup API for nodegroup %s in cluster %s because %s.", nodegroupName, clusterName, err)
} else if mngTags != nil && len(mngTags) > 0 {
resourcesFromMngTags := extractAllocatableResourcesFromTags(mngTags)
klog.V(5).Infof("Extracted resources from EKS nodegroup tags %v", resourcesFromTags)
// ManagedNodeGroup resource-indicating tags override conflicting tags on the ASG if they exist
for resourceName, val := range resourcesFromMngTags {
node.Status.Capacity[apiv1.ResourceName(resourceName)] = *val
}
}
}

node.Status.Conditions = cloudprovider.BuildReadyConditions()
Expand Down Expand Up @@ -460,6 +472,27 @@ func extractAllocatableResourcesFromAsg(tags []*autoscaling.TagDescription) map[
return result
}

// extractAllocatableResourcesFromTags builds a resource-name -> quantity map
// from a plain tag map (for example the tags of an EKS managed nodegroup).
//
// Only tags whose key starts with
// "k8s.io/cluster-autoscaler/node-template/resources/" are considered; the
// remainder of the key is the resource name and the tag value is parsed with
// resource.ParseQuantity. Tags with an empty resource name are ignored, and
// tags whose value cannot be parsed are logged and skipped. The returned map
// is never nil.
func extractAllocatableResourcesFromTags(tags map[string]string) map[string]*resource.Quantity {
	const resourceTagPrefix = "k8s.io/cluster-autoscaler/node-template/resources/"

	result := make(map[string]*resource.Quantity)

	for k, v := range tags {
		// Match on the key prefix rather than splitting on the marker anywhere
		// in the key, so unrelated tags that merely contain the marker are not
		// misread as resource hints and the resource name is never truncated.
		if !strings.HasPrefix(k, resourceTagPrefix) {
			continue
		}
		label := strings.TrimPrefix(k, resourceTagPrefix)
		if label == "" {
			continue
		}

		quantity, err := resource.ParseQuantity(v)
		if err != nil {
			// Include the parse error so a bad tag value can be diagnosed from the log.
			klog.Warningf("Failed to parse resource quantity '%s' for resource '%s': %v", v, label, err)
			continue
		}
		// quantity is declared inside the loop body, so each entry gets its own pointer.
		result[label] = &quantity
	}

	return result
}

func extractTaintsFromAsg(tags []*autoscaling.TagDescription) []apiv1.Taint {
taints := make([]apiv1.Taint, 0)

Expand Down
30 changes: 30 additions & 0 deletions cluster-autoscaler/cloudprovider/aws/aws_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,27 @@ func TestExtractAllocatableResourcesFromAsg(t *testing.T) {
assert.Equal(t, resource.NewQuantity(5, resource.DecimalSI).String(), labels["custom-resource"].String())
}

// TestExtractAllocatableResourcesFromTags checks that resource quantities are
// extracted from node-template resource tags and that a tag with an
// unparsable value ("GG") is left out of the result.
func TestExtractAllocatableResourcesFromTags(t *testing.T) {
	input := map[string]string{
		"k8s.io/cluster-autoscaler/node-template/resources/cpu":               "100m",
		"k8s.io/cluster-autoscaler/node-template/resources/memory":            "100M",
		"k8s.io/cluster-autoscaler/node-template/resources/ephemeral-storage": "20G",
		"k8s.io/cluster-autoscaler/node-template/resources/custom-resource":   "5",
		"k8s.io/cluster-autoscaler/node-template/resources/error-resource":    "GG",
	}

	extracted := extractAllocatableResourcesFromTags(input)

	// Four of the five tags carry a parsable quantity.
	assert.Equal(t, 4, len(extracted))
	assert.NotContains(t, extracted, "error-resource")

	memoryQty := resource.MustParse("100M")
	storageQty := resource.MustParse("20G")

	// Expected string form of each extracted quantity, checked in a fixed order.
	expectations := []struct {
		resourceName string
		want         string
	}{
		{"cpu", resource.NewMilliQuantity(100, resource.DecimalSI).String()},
		{"memory", memoryQty.String()},
		{"ephemeral-storage", storageQty.String()},
		{"custom-resource", resource.NewQuantity(5, resource.DecimalSI).String()},
	}
	for _, e := range expectations {
		assert.Equal(t, e.want, extracted[e.resourceName].String())
	}
}

func TestGetAsgOptions(t *testing.T) {
defaultOptions := config.NodeGroupAutoscalingOptions{
ScaleDownUtilizationThreshold: 0.1,
Expand Down Expand Up @@ -215,11 +236,17 @@ func TestBuildNodeFromTemplateWithManagedNodegroup(t *testing.T) {
Value: taintValue2,
}

ephemeralStorageKey := "ephemeral-storage"
diskSizeGb := 80
tagKey1 := fmt.Sprintf("k8s.io/cluster-autoscaler/node-template/resources/%s", ephemeralStorageKey)
tagValue1 := fmt.Sprintf("%dGi", diskSizeGb)

err := mngCache.Add(managedNodegroupCachedObject{
name: ngNameLabelValue,
clusterName: clusterNameLabelValue,
taints: []apiv1.Taint{taint1, taint2},
labels: map[string]string{labelKey1: labelValue1, labelKey2: labelValue2},
tags: map[string]string{tagKey1: tagValue1},
})
require.NoError(t, err)

Expand All @@ -242,6 +269,9 @@ func TestBuildNodeFromTemplateWithManagedNodegroup(t *testing.T) {
},
})
assert.NoError(t, observedErr)
esValue, esExist := observedNode.Status.Capacity[apiv1.ResourceName(ephemeralStorageKey)]
assert.True(t, esExist)
assert.Equal(t, int64(diskSizeGb*1024*1024*1024), esValue.Value())
assert.GreaterOrEqual(t, len(observedNode.Labels), 4)
ngNameValue, ngLabelExist := observedNode.Labels["nodegroup-name"]
assert.True(t, ngLabelExist)
Expand Down
16 changes: 13 additions & 3 deletions cluster-autoscaler/cloudprovider/aws/aws_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ type awsWrapper struct {
eksI
}

func (m *awsWrapper) getManagedNodegroupInfo(nodegroupName string, clusterName string) ([]apiv1.Taint, map[string]string, error) {
func (m *awsWrapper) getManagedNodegroupInfo(nodegroupName string, clusterName string) ([]apiv1.Taint, map[string]string, map[string]string, error) {
params := &eks.DescribeNodegroupInput{
ClusterName: &clusterName,
NodegroupName: &nodegroupName,
Expand All @@ -66,13 +66,14 @@ func (m *awsWrapper) getManagedNodegroupInfo(nodegroupName string, clusterName s
r, err := m.DescribeNodegroup(params)
observeAWSRequest("DescribeNodegroup", err, start)
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}

klog.V(6).Infof("DescribeNodegroup output : %+v\n", r)

taints := make([]apiv1.Taint, 0)
labels := make(map[string]string)
tags := make(map[string]string)

// Labels will include diskSize, amiType, capacityType, version
if r.Nodegroup.DiskSize != nil {
Expand Down Expand Up @@ -104,6 +105,15 @@ func (m *awsWrapper) getManagedNodegroupInfo(nodegroupName string, clusterName s
}
}

if r.Nodegroup.Tags != nil && len(r.Nodegroup.Tags) > 0 {
tagsMap := r.Nodegroup.Tags
for k, v := range tagsMap {
if v != nil {
tags[k] = *v
}
}
}

if r.Nodegroup.Taints != nil && len(r.Nodegroup.Taints) > 0 {
taintList := r.Nodegroup.Taints
for _, taint := range taintList {
Expand All @@ -117,7 +127,7 @@ func (m *awsWrapper) getManagedNodegroupInfo(nodegroupName string, clusterName s
}
}

return taints, labels, nil
return taints, labels, tags, nil
}

func (m *awsWrapper) getInstanceTypeByLaunchConfigNames(launchConfigToQuery []*string) (map[string]string, error) {
Expand Down
19 changes: 16 additions & 3 deletions cluster-autoscaler/cloudprovider/aws/aws_wrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,11 @@ func TestGetManagedNodegroup(t *testing.T) {
capacityType := "testCapacityType"
k8sVersion := "1.19"

tagKey1 := "tag 1"
tagValue1 := "value 1"
tagKey2 := "tag 2"
tagValue2 := "value 2"

// Create test nodegroup
testNodegroup := eks.Nodegroup{
AmiType: &amiType,
Expand All @@ -147,14 +152,15 @@ func TestGetManagedNodegroup(t *testing.T) {
CapacityType: &capacityType,
Version: &k8sVersion,
Taints: []*eks.Taint{&taint1, &taint2},
Tags: map[string]*string{tagKey1: &tagValue1, tagKey2: &tagValue2},
}

k.On("DescribeNodegroup", &eks.DescribeNodegroupInput{
ClusterName: &clusterName,
NodegroupName: &nodegroupName,
}).Return(&eks.DescribeNodegroupOutput{Nodegroup: &testNodegroup}, nil)

taintList, labelMap, err := awsWrapper.getManagedNodegroupInfo(nodegroupName, clusterName)
taintList, labelMap, tagMap, err := awsWrapper.getManagedNodegroupInfo(nodegroupName, clusterName)
assert.Nil(t, err)
assert.Equal(t, len(taintList), 2)
assert.Equal(t, taintList[0].Effect, apiv1.TaintEffect(taintEffect1))
Expand All @@ -171,6 +177,9 @@ func TestGetManagedNodegroup(t *testing.T) {
assert.Equal(t, labelMap["capacityType"], capacityType)
assert.Equal(t, labelMap["k8sVersion"], k8sVersion)
assert.Equal(t, labelMap["eks.amazonaws.com/nodegroup"], nodegroupName)
assert.Equal(t, len(tagMap), 2)
assert.Equal(t, tagMap[tagKey1], tagValue1)
assert.Equal(t, tagMap[tagKey2], tagValue2)
}

func TestGetManagedNodegroupWithNilValues(t *testing.T) {
Expand Down Expand Up @@ -198,21 +207,23 @@ func TestGetManagedNodegroupWithNilValues(t *testing.T) {
CapacityType: &capacityType,
Version: &k8sVersion,
Taints: nil,
Tags: nil,
}

k.On("DescribeNodegroup", &eks.DescribeNodegroupInput{
ClusterName: &clusterName,
NodegroupName: &nodegroupName,
}).Return(&eks.DescribeNodegroupOutput{Nodegroup: &testNodegroup}, nil)

taintList, labelMap, err := awsWrapper.getManagedNodegroupInfo(nodegroupName, clusterName)
taintList, labelMap, tagMap, err := awsWrapper.getManagedNodegroupInfo(nodegroupName, clusterName)
assert.Nil(t, err)
assert.Equal(t, len(taintList), 0)
assert.Equal(t, len(labelMap), 4)
assert.Equal(t, labelMap["amiType"], amiType)
assert.Equal(t, labelMap["capacityType"], capacityType)
assert.Equal(t, labelMap["k8sVersion"], k8sVersion)
assert.Equal(t, labelMap["eks.amazonaws.com/nodegroup"], nodegroupName)
assert.Equal(t, len(tagMap), 0)
}

func TestGetManagedNodegroupWithEmptyValues(t *testing.T) {
Expand Down Expand Up @@ -240,21 +251,23 @@ func TestGetManagedNodegroupWithEmptyValues(t *testing.T) {
CapacityType: &capacityType,
Version: &k8sVersion,
Taints: make([]*eks.Taint, 0),
Tags: make(map[string]*string),
}

k.On("DescribeNodegroup", &eks.DescribeNodegroupInput{
ClusterName: &clusterName,
NodegroupName: &nodegroupName,
}).Return(&eks.DescribeNodegroupOutput{Nodegroup: &testNodegroup}, nil)

taintList, labelMap, err := awsWrapper.getManagedNodegroupInfo(nodegroupName, clusterName)
taintList, labelMap, tagMap, err := awsWrapper.getManagedNodegroupInfo(nodegroupName, clusterName)
assert.Nil(t, err)
assert.Equal(t, len(taintList), 0)
assert.Equal(t, len(labelMap), 4)
assert.Equal(t, labelMap["amiType"], amiType)
assert.Equal(t, labelMap["capacityType"], capacityType)
assert.Equal(t, labelMap["k8sVersion"], k8sVersion)
assert.Equal(t, labelMap["eks.amazonaws.com/nodegroup"], nodegroupName)
assert.Equal(t, len(tagMap), 0)
}

func TestMoreThen100Groups(t *testing.T) {
Expand Down
16 changes: 14 additions & 2 deletions cluster-autoscaler/cloudprovider/aws/managed_nodegroup_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ type managedNodegroupCachedObject struct {
clusterName string
taints []apiv1.Taint
labels map[string]string
tags map[string]string
}

type mngJitterClock struct {
Expand Down Expand Up @@ -92,14 +93,15 @@ func (c *mngJitterClock) Since(ts time.Time) time.Duration {
}

func (m *managedNodegroupCache) getManagedNodegroup(nodegroupName string, clusterName string) (*managedNodegroupCachedObject, error) {
taintList, labelMap, err := m.awsService.getManagedNodegroupInfo(nodegroupName, clusterName)
taintList, labelMap, tagMap, err := m.awsService.getManagedNodegroupInfo(nodegroupName, clusterName)
if err != nil {
// If there's an error cache an empty nodegroup to limit failed calls to the EKS API
newEmptyNodegroup := managedNodegroupCachedObject{
name: nodegroupName,
clusterName: clusterName,
taints: nil,
labels: nil,
tags: nil,
}

m.Add(newEmptyNodegroup)
Expand All @@ -111,6 +113,7 @@ func (m *managedNodegroupCache) getManagedNodegroup(nodegroupName string, cluste
clusterName: clusterName,
taints: taintList,
labels: labelMap,
tags: tagMap,
}

m.Add(newNodegroup)
Expand All @@ -130,7 +133,7 @@ func (m managedNodegroupCache) getManagedNodegroupInfoObject(nodegroupName strin

managedNodegroupInfo, err := m.getManagedNodegroup(nodegroupName, clusterName)
if err != nil {
klog.Errorf("Failed to query the managed nodegroup %s for the cluster %s while looking for labels/taints: %v", nodegroupName, clusterName, err)
klog.Errorf("Failed to query the managed nodegroup %s for the cluster %s while looking for labels/taints/tags: %v", nodegroupName, clusterName, err)
return nil, err
}
return managedNodegroupInfo, nil
Expand All @@ -145,6 +148,15 @@ func (m managedNodegroupCache) getManagedNodegroupLabels(nodegroupName string, c
return getManagedNodegroupInfoObject.labels, nil
}

// getManagedNodegroupTags returns the tags held on the managed nodegroup info
// object for the given nodegroup and cluster. If looking up the info object
// fails, the error is returned unchanged together with a nil map.
func (m managedNodegroupCache) getManagedNodegroupTags(nodegroupName string, clusterName string) (map[string]string, error) {
	info, lookupErr := m.getManagedNodegroupInfoObject(nodegroupName, clusterName)
	if lookupErr != nil {
		return nil, lookupErr
	}

	nodegroupTags := info.tags
	return nodegroupTags, nil
}

func (m managedNodegroupCache) getManagedNodegroupTaints(nodegroupName string, clusterName string) ([]apiv1.Taint, error) {
getManagedNodegroupInfoObject, err := m.getManagedNodegroupInfoObject(nodegroupName, clusterName)
if err != nil {
Expand Down
Loading

0 comments on commit 68dbbaa

Please sign in to comment.