Skip to content

Commit

Permalink
fix: Ensure shallow copy of data when returning back cached data (aws…
Browse files Browse the repository at this point in the history
  • Loading branch information
jonathan-innis committed May 13, 2024
1 parent b981570 commit 6eaf34d
Show file tree
Hide file tree
Showing 9 changed files with 317 additions and 43 deletions.
10 changes: 7 additions & 3 deletions pkg/providers/amifamily/ami.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,9 @@ func (p *Provider) Get(ctx context.Context, nodeClass *v1beta1.EC2NodeClass, opt

func (p *Provider) getDefaultAMIs(ctx context.Context, nodeClass *v1beta1.EC2NodeClass, options *Options) (res AMIs, err error) {
if images, ok := p.cache.Get(lo.FromPtr(nodeClass.Spec.AMIFamily)); ok {
return images.(AMIs), nil
// Ensure what's returned from this function is a deep-copy of AMIs so alterations
// to the data don't affect the original
return append(AMIs{}, images.(AMIs)...), nil
}
amiFamily := GetAMIFamily(nodeClass.Spec.AMIFamily, options)
kubernetesVersion, err := p.versionProvider.Get(ctx)
Expand Down Expand Up @@ -189,8 +191,10 @@ func (p *Provider) getAMIs(ctx context.Context, terms []v1beta1.AMISelectorTerm)
if err != nil {
return nil, err
}
if images, ok := p.cache.Get(fmt.Sprint(hash)); ok {
return images.(AMIs), nil
if images, ok := p.cache.Get(fmt.Sprintf("%d", hash)); ok {
// Ensure what's returned from this function is a deep-copy of AMIs so alterations
// to the data don't affect the original
return append(AMIs{}, images.(AMIs)...), nil
}
images := map[uint64]AMI{}
for _, filtersAndOwners := range filterAndOwnerSets {
Expand Down
121 changes: 84 additions & 37 deletions pkg/providers/amifamily/ami_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"fmt"
"sort"
"sync"
"testing"
"time"

Expand All @@ -26,9 +27,11 @@ import (
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/samber/lo"
v1 "k8s.io/api/core/v1"
. "knative.dev/pkg/logging/testing"

coresettings "github.com/aws/karpenter-core/pkg/apis/settings"
corev1beta1 "github.com/aws/karpenter-core/pkg/apis/v1beta1"
"github.com/aws/karpenter-core/pkg/operator/scheme"
"github.com/aws/karpenter-core/pkg/scheduling"
coretest "github.com/aws/karpenter-core/pkg/test"
Expand Down Expand Up @@ -71,7 +74,7 @@ var _ = BeforeEach(func() {
{
Name: aws.String(amd64AMI),
ImageId: aws.String("amd64-ami-id"),
CreationDate: aws.String(time.Now().Format(time.RFC3339)),
CreationDate: aws.String(time.Time{}.Format(time.RFC3339)),
Architecture: aws.String("x86_64"),
Tags: []*ec2.Tag{
{Key: aws.String("Name"), Value: aws.String(amd64AMI)},
Expand All @@ -81,7 +84,7 @@ var _ = BeforeEach(func() {
{
Name: aws.String(arm64AMI),
ImageId: aws.String("arm64-ami-id"),
CreationDate: aws.String(time.Now().Add(time.Minute).Format(time.RFC3339)),
CreationDate: aws.String(time.Time{}.Add(time.Minute).Format(time.RFC3339)),
Architecture: aws.String("arm64"),
Tags: []*ec2.Tag{
{Key: aws.String("Name"), Value: aws.String(arm64AMI)},
Expand All @@ -91,7 +94,7 @@ var _ = BeforeEach(func() {
{
Name: aws.String(amd64NvidiaAMI),
ImageId: aws.String("amd64-nvidia-ami-id"),
CreationDate: aws.String(time.Now().Add(2 * time.Minute).Format(time.RFC3339)),
CreationDate: aws.String(time.Time{}.Add(2 * time.Minute).Format(time.RFC3339)),
Architecture: aws.String("x86_64"),
Tags: []*ec2.Tag{
{Key: aws.String("Name"), Value: aws.String(amd64NvidiaAMI)},
Expand All @@ -101,7 +104,7 @@ var _ = BeforeEach(func() {
{
Name: aws.String(arm64NvidiaAMI),
ImageId: aws.String("arm64-nvidia-ami-id"),
CreationDate: aws.String(time.Now().Add(2 * time.Minute).Format(time.RFC3339)),
CreationDate: aws.String(time.Time{}.Add(2 * time.Minute).Format(time.RFC3339)),
Architecture: aws.String("arm64"),
Tags: []*ec2.Tag{
{Key: aws.String("Name"), Value: aws.String(arm64NvidiaAMI)},
Expand Down Expand Up @@ -183,42 +186,86 @@ var _ = Describe("AMIProvider", func() {
Expect(err).ToNot(HaveOccurred())
Expect(amis).To(HaveLen(0))
})

It("should succeed to partially resolve AMIs if all SSM aliases don't exist (Al2)", func() {
nodeClass.Spec.AMIFamily = &v1beta1.AMIFamilyAL2
// No GPU AMI exists here
awsEnv.SSMAPI.Parameters = map[string]string{
fmt.Sprintf("/aws/service/eks/optimized-ami/%s/amazon-linux-2/recommended/image_id", version): amd64AMI,
fmt.Sprintf("/aws/service/eks/optimized-ami/%s/amazon-linux-2-arm64/recommended/image_id", version): arm64AMI,
It("should not cause data races when calling Get() simultaneously", func() {
nodeClass.Spec.AMISelectorTerms = []v1beta1.AMISelectorTerm{
{
ID: "amd64-ami-id",
},
{
ID: "arm64-ami-id",
},
}
// Only 2 of the requirements sets for the SSM aliases will resolve
amis, err := awsEnv.AMIProvider.Get(ctx, nodeClass, &amifamily.Options{})
Expect(err).ToNot(HaveOccurred())
Expect(amis).To(HaveLen(2))
})
It("should succeed to partially resolve AMIs if all SSM aliases don't exist (Bottlerocket)", func() {
nodeClass.Spec.AMIFamily = &v1beta1.AMIFamilyBottlerocket
// No GPU AMI exists for AM64 here
awsEnv.SSMAPI.Parameters = map[string]string{
fmt.Sprintf("/aws/service/bottlerocket/aws-k8s-%s/x86_64/latest/image_id", version): amd64AMI,
fmt.Sprintf("/aws/service/bottlerocket/aws-k8s-%s-nvidia/x86_64/latest/image_id", version): amd64NvidiaAMI,
fmt.Sprintf("/aws/service/bottlerocket/aws-k8s-%s/arm64/latest/image_id", version): arm64AMI,
wg := sync.WaitGroup{}
for i := 0; i < 10000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
defer GinkgoRecover()
images, err := awsEnv.AMIProvider.Get(ctx, nodeClass, &amifamily.Options{})
Expect(err).ToNot(HaveOccurred())

Expect(images).To(HaveLen(2))
// Sort everything in parallel and ensure that we don't get data races
images.Sort()
Expect(images).To(BeEquivalentTo([]amifamily.AMI{
{
Name: arm64AMI,
AmiID: "arm64-ami-id",
CreationDate: time.Time{}.Add(time.Minute).Format(time.RFC3339),
Requirements: scheduling.NewLabelRequirements(map[string]string{
v1.LabelArchStable: corev1beta1.ArchitectureArm64,
}),
},
{
Name: amd64AMI,
AmiID: "amd64-ami-id",
CreationDate: time.Time{}.Format(time.RFC3339),
Requirements: scheduling.NewLabelRequirements(map[string]string{
v1.LabelArchStable: corev1beta1.ArchitectureAmd64,
}),
},
}))
}()
}
// Only 4 of the requirements sets for the SSM aliases will resolve
amis, err := awsEnv.AMIProvider.Get(ctx, nodeClass, &amifamily.Options{})
Expect(err).ToNot(HaveOccurred())
Expect(amis).To(HaveLen(4))
wg.Wait()
})
It("should succeed to partially resolve AMIs if all SSM aliases don't exist (Ubuntu)", func() {
nodeClass.Spec.AMIFamily = &v1beta1.AMIFamilyUbuntu
// No AMD64 AMI exists here
awsEnv.SSMAPI.Parameters = map[string]string{
fmt.Sprintf("/aws/service/canonical/ubuntu/eks/20.04/%s/stable/current/arm64/hvm/ebs-gp2/ami-id", version): arm64AMI,
}
// Only 1 of the requirements sets for the SSM aliases will resolve
amis, err := awsEnv.AMIProvider.Get(ctx, nodeClass, &amifamily.Options{})
Expect(err).ToNot(HaveOccurred())
Expect(amis).To(HaveLen(1))
Context("SSM Alias Missing", func() {
It("should succeed to partially resolve AMIs if all SSM aliases don't exist (Al2)", func() {
nodeClass.Spec.AMIFamily = &v1beta1.AMIFamilyAL2
// No GPU AMI exists here
awsEnv.SSMAPI.Parameters = map[string]string{
fmt.Sprintf("/aws/service/eks/optimized-ami/%s/amazon-linux-2/recommended/image_id", version): amd64AMI,
fmt.Sprintf("/aws/service/eks/optimized-ami/%s/amazon-linux-2-arm64/recommended/image_id", version): arm64AMI,
}
// Only 2 of the requirements sets for the SSM aliases will resolve
amis, err := awsEnv.AMIProvider.Get(ctx, nodeClass, &amifamily.Options{})
Expect(err).ToNot(HaveOccurred())
Expect(amis).To(HaveLen(2))
})
It("should succeed to partially resolve AMIs if all SSM aliases don't exist (Bottlerocket)", func() {
nodeClass.Spec.AMIFamily = &v1beta1.AMIFamilyBottlerocket
// No GPU AMI exists for AM64 here
awsEnv.SSMAPI.Parameters = map[string]string{
fmt.Sprintf("/aws/service/bottlerocket/aws-k8s-%s/x86_64/latest/image_id", version): amd64AMI,
fmt.Sprintf("/aws/service/bottlerocket/aws-k8s-%s-nvidia/x86_64/latest/image_id", version): amd64NvidiaAMI,
fmt.Sprintf("/aws/service/bottlerocket/aws-k8s-%s/arm64/latest/image_id", version): arm64AMI,
}
// Only 4 of the requirements sets for the SSM aliases will resolve
amis, err := awsEnv.AMIProvider.Get(ctx, nodeClass, &amifamily.Options{})
Expect(err).ToNot(HaveOccurred())
Expect(amis).To(HaveLen(4))
})
It("should succeed to partially resolve AMIs if all SSM aliases don't exist (Ubuntu)", func() {
nodeClass.Spec.AMIFamily = &v1beta1.AMIFamilyUbuntu
// No AMD64 AMI exists here
awsEnv.SSMAPI.Parameters = map[string]string{
fmt.Sprintf("/aws/service/canonical/ubuntu/eks/20.04/%s/stable/current/arm64/hvm/ebs-gp2/ami-id", version): arm64AMI,
}
// Only 1 of the requirements sets for the SSM aliases will resolve
amis, err := awsEnv.AMIProvider.Get(ctx, nodeClass, &amifamily.Options{})
Expect(err).ToNot(HaveOccurred())
Expect(amis).To(HaveLen(1))
})
})
Context("AMI Selectors", func() {
It("should have default owners and use tags when prefixes aren't set", func() {
Expand Down
4 changes: 3 additions & 1 deletion pkg/providers/instancetype/instancetype.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,9 @@ func (p *Provider) List(ctx context.Context, kc *corev1beta1.KubeletConfiguratio
key := fmt.Sprintf("%d-%d-%s-%016x-%016x", p.instanceTypesSeqNum, p.unavailableOfferings.SeqNum, nodeClass.UID, instanceTypeZonesHash, kcHash)

if item, ok := p.cache.Get(key); ok {
return item.([]*cloudprovider.InstanceType), nil
// Ensure what's returned from this function is a shallow-copy of the slice (not a deep-copy of the data itself)
// so that modifications to the ordering of the data don't affect the original
return append([]*cloudprovider.InstanceType{}, item.([]*cloudprovider.InstanceType)...), nil
}
// Reject any instance types that don't have any offerings due to zone
result := lo.Reject(lo.Map(instanceTypes, func(i *ec2.InstanceTypeInfo, _ int) *cloudprovider.InstanceType {
Expand Down
36 changes: 36 additions & 0 deletions pkg/providers/instancetype/nodeclass_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"math"
"sort"
"strings"
"sync"
"time"

"github.com/aws/aws-sdk-go/aws"
Expand Down Expand Up @@ -1523,4 +1524,39 @@ var _ = Describe("NodeClass/InstanceTypes", func() {
})
})
})
It("should not cause data races when calling List() simultaneously", func() {
mu := sync.RWMutex{}
var instanceTypeOrder []string
wg := sync.WaitGroup{}
for i := 0; i < 10000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
defer GinkgoRecover()
instanceTypes, err := awsEnv.InstanceTypesProvider.List(ctx, &corev1beta1.KubeletConfiguration{}, &v1beta1.EC2NodeClass{})
Expect(err).ToNot(HaveOccurred())

// Sort everything in parallel and ensure that we don't get data races
sort.Slice(instanceTypes, func(i, j int) bool {
return instanceTypes[i].Name < instanceTypes[j].Name
})
// Get the ordering of the instance types based on name
tempInstanceTypeOrder := lo.Map(instanceTypes, func(i *corecloudprovider.InstanceType, _ int) string {
return i.Name
})
// Expect that all the elements in the instance type list are unique
Expect(lo.Uniq(tempInstanceTypeOrder)).To(HaveLen(len(tempInstanceTypeOrder)))

// We have to lock since we are doing simultaneous access to this value
mu.Lock()
if len(instanceTypeOrder) == 0 {
instanceTypeOrder = tempInstanceTypeOrder
} else {
Expect(tempInstanceTypeOrder).To(BeEquivalentTo(instanceTypeOrder))
}
mu.Unlock()
}()
}
wg.Wait()
})
})
36 changes: 36 additions & 0 deletions pkg/providers/instancetype/nodetemplate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"math"
"sort"
"strings"
"sync"
"time"

"github.com/aws/aws-sdk-go/aws"
Expand Down Expand Up @@ -1550,4 +1551,39 @@ var _ = Describe("NodeTemplate/InstanceTypes", func() {
})
})
})
It("should not cause data races when calling List() simultaneously", func() {
mu := sync.RWMutex{}
var instanceTypeOrder []string
wg := sync.WaitGroup{}
for i := 0; i < 10000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
defer GinkgoRecover()
instanceTypes, err := awsEnv.InstanceTypesProvider.List(ctx, nodepoolutil.NewKubeletConfiguration(&v1alpha5.KubeletConfiguration{}), nodeclassutil.New(&v1alpha1.AWSNodeTemplate{}))
Expect(err).ToNot(HaveOccurred())

// Sort everything in parallel and ensure that we don't get data races
sort.Slice(instanceTypes, func(i, j int) bool {
return instanceTypes[i].Name < instanceTypes[j].Name
})
// Get the ordering of the instance types based on name
tempInstanceTypeOrder := lo.Map(instanceTypes, func(i *corecloudprovider.InstanceType, _ int) string {
return i.Name
})
// Expect that all the elements in the instance type list are unique
Expect(lo.Uniq(tempInstanceTypeOrder)).To(HaveLen(len(tempInstanceTypeOrder)))

// We have to lock since we are doing simultaneous access to this value
mu.Lock()
if len(instanceTypeOrder) == 0 {
instanceTypeOrder = tempInstanceTypeOrder
} else {
Expect(tempInstanceTypeOrder).To(BeEquivalentTo(instanceTypeOrder))
}
mu.Unlock()
}()
}
wg.Wait()
})
})
4 changes: 3 additions & 1 deletion pkg/providers/securitygroup/securitygroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,9 @@ func (p *Provider) getSecurityGroups(ctx context.Context, filterSets [][]*ec2.Fi
return nil, err
}
if sg, ok := p.cache.Get(fmt.Sprint(hash)); ok {
return sg.([]*ec2.SecurityGroup), nil
// Ensure what's returned from this function is a shallow-copy of the slice (not a deep-copy of the data itself)
// so that modifications to the ordering of the data don't affect the original
return append([]*ec2.SecurityGroup{}, sg.([]*ec2.SecurityGroup)...), nil
}
securityGroups := map[string]*ec2.SecurityGroup{}
for _, filters := range filterSets {
Expand Down
68 changes: 68 additions & 0 deletions pkg/providers/securitygroup/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package securitygroup_test

import (
"context"
"sort"
"sync"
"testing"

"github.com/aws/aws-sdk-go/aws"
Expand Down Expand Up @@ -268,6 +270,72 @@ var _ = Describe("SecurityGroupProvider", func() {
},
}, securityGroups)
})
It("should not cause data races when calling List() simultaneously", func() {
wg := sync.WaitGroup{}
for i := 0; i < 10000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
defer GinkgoRecover()
securityGroups, err := awsEnv.SecurityGroupProvider.List(ctx, nodeClass)
Expect(err).ToNot(HaveOccurred())

Expect(securityGroups).To(HaveLen(3))
// Sort everything in parallel and ensure that we don't get data races
sort.Slice(securityGroups, func(i, j int) bool {
return *securityGroups[i].GroupId < *securityGroups[j].GroupId
})
Expect(securityGroups).To(BeEquivalentTo([]*ec2.SecurityGroup{
{
GroupId: lo.ToPtr("sg-test1"),
GroupName: lo.ToPtr("securityGroup-test1"),
Tags: []*ec2.Tag{
{
Key: lo.ToPtr("Name"),
Value: lo.ToPtr("test-security-group-1"),
},
{
Key: lo.ToPtr("foo"),
Value: lo.ToPtr("bar"),
},
},
},
{
GroupId: lo.ToPtr("sg-test2"),
GroupName: lo.ToPtr("securityGroup-test2"),
Tags: []*ec2.Tag{
{
Key: lo.ToPtr("Name"),
Value: lo.ToPtr("test-security-group-2"),
},
{
Key: lo.ToPtr("foo"),
Value: lo.ToPtr("bar"),
},
},
},
{
GroupId: lo.ToPtr("sg-test3"),
GroupName: lo.ToPtr("securityGroup-test3"),
Tags: []*ec2.Tag{
{
Key: lo.ToPtr("Name"),
Value: lo.ToPtr("test-security-group-3"),
},
{
Key: lo.ToPtr("TestTag"),
},
{
Key: lo.ToPtr("foo"),
Value: lo.ToPtr("bar"),
},
},
},
}))
}()
}
wg.Wait()
})
})

func ExpectConsistsOfSecurityGroups(expected, actual []*ec2.SecurityGroup) {
Expand Down
Loading

0 comments on commit 6eaf34d

Please sign in to comment.