Skip to content

Commit

Permalink
Moved scheduling tests to scheduling package and renamed scheduling c…
Browse files Browse the repository at this point in the history
…ontroller to selection
  • Loading branch information
ellistarn committed Dec 13, 2021
1 parent ca5ed8d commit 4b4530c
Show file tree
Hide file tree
Showing 9 changed files with 214 additions and 137 deletions.
8 changes: 4 additions & 4 deletions cmd/controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
"github.com/aws/karpenter/pkg/controllers/metrics"
"github.com/aws/karpenter/pkg/controllers/node"
"github.com/aws/karpenter/pkg/controllers/provisioning"
"github.com/aws/karpenter/pkg/controllers/scheduling"
"github.com/aws/karpenter/pkg/controllers/selection"
"github.com/aws/karpenter/pkg/controllers/termination"
"github.com/aws/karpenter/pkg/utils/injection"
"github.com/aws/karpenter/pkg/utils/options"
Expand Down Expand Up @@ -83,11 +83,11 @@ func main() {
HealthProbeBindAddress: fmt.Sprintf(":%d", opts.HealthProbePort),
})

provisioners := provisioning.NewController(ctx, manager.GetClient(), clientSet.CoreV1(), cloudProvider)
provisioningController := provisioning.NewController(ctx, manager.GetClient(), clientSet.CoreV1(), cloudProvider)

if err := manager.RegisterControllers(ctx,
provisioners,
scheduling.NewController(manager.GetClient(), provisioners),
provisioningController,
selection.NewController(manager.GetClient(), provisioningController),
termination.NewController(ctx, manager.GetClient(), clientSet.CoreV1(), cloudProvider),
node.NewController(manager.GetClient()),
metrics.NewController(manager.GetClient(), cloudProvider),
Expand Down
46 changes: 23 additions & 23 deletions pkg/cloudprovider/aws/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
"github.com/aws/karpenter/pkg/cloudprovider/aws/fake"
"github.com/aws/karpenter/pkg/cloudprovider/registry"
"github.com/aws/karpenter/pkg/controllers/provisioning"
"github.com/aws/karpenter/pkg/controllers/scheduling"
"github.com/aws/karpenter/pkg/controllers/selection"
"github.com/aws/karpenter/pkg/test"
. "github.com/aws/karpenter/pkg/test/expectations"
"github.com/aws/karpenter/pkg/utils/injection"
Expand Down Expand Up @@ -54,7 +54,7 @@ var launchTemplateCache *cache.Cache
var unavailableOfferingsCache *cache.Cache
var fakeEC2API *fake.EC2API
var provisioners *provisioning.Controller
var scheduler *scheduling.Controller
var selectionController *selection.Controller

func TestAPIs(t *testing.T) {
ctx = TestContextWithLogger(t)
Expand Down Expand Up @@ -91,7 +91,7 @@ var _ = BeforeSuite(func() {
}
registry.RegisterOrDie(ctx, cloudProvider)
provisioners = provisioning.NewController(ctx, e.Client, clientSet.CoreV1(), cloudProvider)
scheduler = scheduling.NewController(e.Client, provisioners)
selectionController = selection.NewController(e.Client, provisioners)
})

Expect(env.Start()).To(Succeed(), "Failed to start environment")
Expand Down Expand Up @@ -123,7 +123,7 @@ var _ = Describe("Allocation", func() {
Context("Reconciliation", func() {
Context("Specialized Hardware", func() {
It("should not launch AWS Pod ENI on a t3", func() {
for _, pod := range ExpectProvisioned(ctx, env.Client, scheduler, provisioners, provisioner,
for _, pod := range ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner,
test.UnschedulablePod(test.PodOptions{
NodeSelector: map[string]string{
v1.LabelInstanceTypeStable: "t3.large",
Expand All @@ -137,7 +137,7 @@ var _ = Describe("Allocation", func() {
}
})
It("should launch AWS Pod ENI on a compatible instance type", func() {
for _, pod := range ExpectProvisioned(ctx, env.Client, scheduler, provisioners, provisioner,
for _, pod := range ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner,
test.UnschedulablePod(test.PodOptions{
ResourceRequirements: v1.ResourceRequirements{
Requests: v1.ResourceList{resources.AWSPodENI: resource.MustParse("1")},
Expand All @@ -155,7 +155,7 @@ var _ = Describe("Allocation", func() {
})
It("should launch instances for Nvidia GPU resource requests", func() {
nodeNames := sets.NewString()
for _, pod := range ExpectProvisioned(ctx, env.Client, scheduler, provisioners, provisioner,
for _, pod := range ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner,
test.UnschedulablePod(test.PodOptions{
ResourceRequirements: v1.ResourceRequirements{
Requests: v1.ResourceList{resources.NvidiaGPU: resource.MustParse("1")},
Expand Down Expand Up @@ -184,7 +184,7 @@ var _ = Describe("Allocation", func() {
})
It("should launch instances for AWS Neuron resource requests", func() {
nodeNames := sets.NewString()
for _, pod := range ExpectProvisioned(ctx, env.Client, scheduler, provisioners, provisioner,
for _, pod := range ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner,
test.UnschedulablePod(test.PodOptions{
ResourceRequirements: v1.ResourceRequirements{
Requests: v1.ResourceList{resources.AWSNeuron: resource.MustParse("1")},
Expand Down Expand Up @@ -216,7 +216,7 @@ var _ = Describe("Allocation", func() {
Context("Insufficient Capacity Error Cache", func() {
It("should launch instances of different type on second reconciliation attempt with Insufficient Capacity Error Cache fallback", func() {
fakeEC2API.InsufficientCapacityPools = []fake.CapacityPool{{CapacityType: v1alpha1.CapacityTypeOnDemand, InstanceType: "inf1.6xlarge", Zone: "test-zone-1a"}}
pods := ExpectProvisioned(ctx, env.Client, scheduler, provisioners, provisioner,
pods := ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner,
test.UnschedulablePod(test.PodOptions{
NodeSelector: map[string]string{v1.LabelTopologyZone: "test-zone-1a"},
ResourceRequirements: v1.ResourceRequirements{
Expand All @@ -237,7 +237,7 @@ var _ = Describe("Allocation", func() {
ExpectNotScheduled(ctx, env.Client, pod)
}
nodeNames := sets.NewString()
for _, pod := range ExpectProvisioned(ctx, env.Client, scheduler, provisioners, provisioner, pods...) {
for _, pod := range ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner, pods...) {
node := ExpectScheduled(ctx, env.Client, pod)
Expect(node.Labels).To(HaveKeyWithValue(v1.LabelInstanceTypeStable, "inf1.2xlarge"))
nodeNames.Insert(node.Name)
Expand All @@ -259,19 +259,19 @@ var _ = Describe("Allocation", func() {
}},
},
}}}
pod = ExpectProvisioned(ctx, env.Client, scheduler, provisioners, provisioner, pod)[0]
pod = ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner, pod)[0]
// it should've tried to pack them in test-zone-1a on a p3.8xlarge then hit insufficient capacity, the next attempt will try test-zone-1b
ExpectNotScheduled(ctx, env.Client, pod)

pod = ExpectProvisioned(ctx, env.Client, scheduler, provisioners, provisioner, pod)[0]
pod = ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner, pod)[0]
node := ExpectScheduled(ctx, env.Client, pod)
Expect(node.Labels).To(SatisfyAll(
HaveKeyWithValue(v1.LabelInstanceTypeStable, "p3.8xlarge"),
HaveKeyWithValue(v1.LabelTopologyZone, "test-zone-1b")))
})
It("should launch instances on later reconciliation attempt with Insufficient Capacity Error Cache expiry", func() {
fakeEC2API.InsufficientCapacityPools = []fake.CapacityPool{{CapacityType: v1alpha1.CapacityTypeOnDemand, InstanceType: "inf1.6xlarge", Zone: "test-zone-1a"}}
pod := ExpectProvisioned(ctx, env.Client, scheduler, provisioners, provisioner,
pod := ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner,
test.UnschedulablePod(test.PodOptions{
NodeSelector: map[string]string{v1.LabelInstanceTypeStable: "inf1.6xlarge"},
ResourceRequirements: v1.ResourceRequirements{
Expand All @@ -284,7 +284,7 @@ var _ = Describe("Allocation", func() {
// capacity shortage is over - expire the item from the cache and try again
fakeEC2API.InsufficientCapacityPools = []fake.CapacityPool{}
unavailableOfferingsCache.Delete(UnavailableOfferingsCacheKey(v1alpha1.CapacityTypeOnDemand, "inf1.6xlarge", "test-zone-1a"))
pod = ExpectProvisioned(ctx, env.Client, scheduler, provisioners, provisioner, pod)[0]
pod = ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner, pod)[0]
node := ExpectScheduled(ctx, env.Client, pod)
Expect(node.Labels).To(HaveKeyWithValue(v1.LabelInstanceTypeStable, "inf1.6xlarge"))
})
Expand All @@ -296,23 +296,23 @@ var _ = Describe("Allocation", func() {
{Key: v1.LabelInstanceTypeStable, Operator: v1.NodeSelectorOpIn, Values: []string{"m5.large"}},
}
// Spot Unavailable
pod := ExpectProvisioned(ctx, env.Client, scheduler, provisioners, provisioner, test.UnschedulablePod())[0]
pod := ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner, test.UnschedulablePod())[0]
ExpectNotScheduled(ctx, env.Client, pod)
// Fallback to OD
pod = ExpectProvisioned(ctx, env.Client, scheduler, provisioners, provisioner, test.UnschedulablePod())[0]
pod = ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner, test.UnschedulablePod())[0]
node := ExpectScheduled(ctx, env.Client, pod)
Expect(node.Labels).To(HaveKeyWithValue(v1alpha5.LabelCapacityType, v1alpha1.CapacityTypeOnDemand))
})
})
Context("CapacityType", func() {
It("should default to on demand", func() {
pod := ExpectProvisioned(ctx, env.Client, scheduler, provisioners, provisioner, test.UnschedulablePod())[0]
pod := ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner, test.UnschedulablePod())[0]
node := ExpectScheduled(ctx, env.Client, pod)
Expect(node.Labels).To(HaveKeyWithValue(v1alpha5.LabelCapacityType, v1alpha1.CapacityTypeOnDemand))
})
It("should launch spot capacity if flexible to both spot and on demand", func() {
provisioner.Spec.Requirements = v1alpha5.Requirements{{Key: v1alpha5.LabelCapacityType, Operator: v1.NodeSelectorOpIn, Values: []string{v1alpha1.CapacityTypeSpot, v1alpha1.CapacityTypeOnDemand}}}
pod := ExpectProvisioned(ctx, env.Client, scheduler, provisioners, provisioner, test.UnschedulablePod())[0]
pod := ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner, test.UnschedulablePod())[0]
node := ExpectScheduled(ctx, env.Client, pod)
Expect(node.Labels).To(HaveKeyWithValue(v1alpha5.LabelCapacityType, v1alpha1.CapacityTypeSpot))
})
Expand All @@ -338,7 +338,7 @@ var _ = Describe("Allocation", func() {
Effect: "NoSchedule",
}

pod1 := ExpectProvisioned(ctx, env.Client, scheduler, provisioners, provisioner,
pod1 := ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner,
test.UnschedulablePod(test.PodOptions{
Tolerations: []v1.Toleration{t1, t2, t3},
}),
Expand All @@ -347,7 +347,7 @@ var _ = Describe("Allocation", func() {
Expect(fakeEC2API.CalledWithCreateFleetInput.Cardinality()).To(Equal(1))
name1 := fakeEC2API.CalledWithCreateFleetInput.Pop().(*ec2.CreateFleetInput).LaunchTemplateConfigs[0].LaunchTemplateSpecification.LaunchTemplateName

pod2 := ExpectProvisioned(ctx, env.Client, scheduler, provisioners, provisioner,
pod2 := ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner,
test.UnschedulablePod(test.PodOptions{
Tolerations: []v1.Toleration{t2, t3, t1},
}),
Expand All @@ -360,7 +360,7 @@ var _ = Describe("Allocation", func() {
})

It("should default to a generated launch template", func() {
pod := ExpectProvisioned(ctx, env.Client, scheduler, provisioners, provisioner, test.UnschedulablePod())[0]
pod := ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner, test.UnschedulablePod())[0]
ExpectScheduled(ctx, env.Client, pod)
Expect(fakeEC2API.CalledWithCreateFleetInput.Cardinality()).To(Equal(1))
input := fakeEC2API.CalledWithCreateFleetInput.Pop().(*ec2.CreateFleetInput)
Expand All @@ -370,7 +370,7 @@ var _ = Describe("Allocation", func() {
})
It("should allow a launch template to be specified", func() {
provider.LaunchTemplate = aws.String("test-launch-template")
pod := ExpectProvisioned(ctx, env.Client, scheduler, provisioners, ProvisionerWithProvider(provisioner, provider), test.UnschedulablePod())[0]
pod := ExpectProvisioned(ctx, env.Client, selectionController, provisioners, ProvisionerWithProvider(provisioner, provider), test.UnschedulablePod())[0]
ExpectScheduled(ctx, env.Client, pod)
Expect(fakeEC2API.CalledWithCreateFleetInput.Cardinality()).To(Equal(1))
input := fakeEC2API.CalledWithCreateFleetInput.Pop().(*ec2.CreateFleetInput)
Expand All @@ -382,7 +382,7 @@ var _ = Describe("Allocation", func() {
})
Context("Subnets", func() {
It("should default to the cluster's subnets", func() {
pod := ExpectProvisioned(ctx, env.Client, scheduler, provisioners, ProvisionerWithProvider(provisioner, provider), test.UnschedulablePod())[0]
pod := ExpectProvisioned(ctx, env.Client, selectionController, provisioners, ProvisionerWithProvider(provisioner, provider), test.UnschedulablePod())[0]
ExpectScheduled(ctx, env.Client, pod)
Expect(fakeEC2API.CalledWithCreateFleetInput.Cardinality()).To(Equal(1))
input := fakeEC2API.CalledWithCreateFleetInput.Pop().(*ec2.CreateFleetInput)
Expand All @@ -396,7 +396,7 @@ var _ = Describe("Allocation", func() {
})
Context("Security Groups", func() {
It("should default to the clusters security groups", func() {
pod := ExpectProvisioned(ctx, env.Client, scheduler, provisioners, ProvisionerWithProvider(provisioner, provider), test.UnschedulablePod())[0]
pod := ExpectProvisioned(ctx, env.Client, selectionController, provisioners, ProvisionerWithProvider(provisioner, provider), test.UnschedulablePod())[0]
ExpectScheduled(ctx, env.Client, pod)
Expect(fakeEC2API.CalledWithCreateLaunchTemplateInput.Cardinality()).To(Equal(1))
input := fakeEC2API.CalledWithCreateLaunchTemplateInput.Pop().(*ec2.CreateLaunchTemplateInput)
Expand Down
Loading

0 comments on commit 4b4530c

Please sign in to comment.