Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implemented pod batching in the allocation/provisioner controller #503

Merged
merged 18 commits into from
Jul 14, 2021
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 4 additions & 6 deletions cmd/controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,16 +75,14 @@ func main() {

clientSet := kubernetes.NewForConfigOrDie(manager.GetConfig())
cloudProvider := registry.NewCloudProvider(cloudprovider.Options{ClientSet: clientSet})
ctx := controllerruntime.SetupSignalHandler()

if err := expiration.NewController(manager.GetClient()).Register(manager); err != nil {
panic(err)
}

if err := manager.RegisterControllers(
if err := manager.RegisterControllers(ctx,
expiration.NewController(manager.GetClient()),
allocation.NewController(manager.GetClient(), clientSet.CoreV1(), cloudProvider),
reallocation.NewController(manager.GetClient(), clientSet.CoreV1(), cloudProvider),
termination.NewController(manager.GetClient(), clientSet.CoreV1(), cloudProvider),
).Start(controllerruntime.SetupSignalHandler()); err != nil {
).Start(ctx); err != nil {
panic(fmt.Sprintf("Unable to start manager, %s", err.Error()))
}
}
4 changes: 0 additions & 4 deletions pkg/cloudprovider/aws/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/aws/aws-sdk-go/service/ec2/ec2iface"
"github.com/awslabs/karpenter/pkg/cloudprovider"

"go.uber.org/zap"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
)
Expand Down Expand Up @@ -102,9 +101,6 @@ func (p *InstanceProvider) Create(ctx context.Context,
if count := len(createFleetOutput.Instances[0].InstanceIds); count != 1 {
return nil, fmt.Errorf("expected 1 instance ids, but got %d due to errors %v", count, createFleetOutput.Errors)
}
if count := len(createFleetOutput.Errors); count > 0 {
zap.S().Warnf("CreateFleet encountered %d errors, but still launched instances, %v", count, createFleetOutput.Errors)
bwagner5 marked this conversation as resolved.
Show resolved Hide resolved
}
return createFleetOutput.Instances[0].InstanceIds[0], nil
}

Expand Down
46 changes: 41 additions & 5 deletions pkg/cloudprovider/aws/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,19 @@ import (
"context"
"strings"
"testing"
"time"

"github.com/Pallinder/go-randomdata"
"github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha2"
"github.com/awslabs/karpenter/pkg/cloudprovider/aws/fake"
"github.com/awslabs/karpenter/pkg/cloudprovider/registry"
"github.com/awslabs/karpenter/pkg/controllers"
"github.com/awslabs/karpenter/pkg/controllers/allocation"
"github.com/awslabs/karpenter/pkg/packing"
"github.com/awslabs/karpenter/pkg/test"
. "github.com/awslabs/karpenter/pkg/test/expectations"
"github.com/awslabs/karpenter/pkg/utils/resources"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/ec2"
Expand All @@ -48,7 +51,7 @@ func TestAPIs(t *testing.T) {

var launchTemplateCache = cache.New(CacheTTL, CacheCleanupInterval)
var fakeEC2API *fake.EC2API
var controller controllers.Controller
var controller reconcile.Reconciler

var env = test.NewEnvironment(func(e *test.Environment) {
clientSet := kubernetes.NewForConfigOrDie(e.Config)
Expand All @@ -66,7 +69,15 @@ var env = test.NewEnvironment(func(e *test.Environment) {
instanceProvider: &InstanceProvider{fakeEC2API},
}
registry.RegisterOrDie(cloudProvider)
controller = allocation.NewController(e.Client, clientSet.CoreV1(), cloudProvider)
controller = &allocation.Controller{
Filter: &allocation.Filter{KubeClient: e.Client},
Binder: &allocation.Binder{KubeClient: e.Client, CoreV1Client: clientSet.CoreV1()},
Batcher: allocation.NewBatcher(1*time.Millisecond, 1*time.Millisecond),
Constraints: &allocation.Constraints{KubeClient: e.Client},
Packer: packing.NewPacker(),
CloudProvider: cloudProvider,
KubeClient: e.Client,
}
})

var _ = BeforeSuite(func() {
Expand Down Expand Up @@ -107,6 +118,7 @@ var _ = Describe("Allocation", func() {
Context("Reconciliation", func() {
Context("Reserved Labels", func() {
It("should not schedule a pod with cloud provider reserved labels", func() {
ExpectCreated(env.Client, provisioner)
pods := ExpectProvisioningSucceeded(env.Client, controller, provisioner,
test.PendingPod(test.PodOptions{NodeSelector: map[string]string{AWSLabelPrefix + "unknown": randomdata.SillyName()}}),
)
Expand Down Expand Up @@ -137,8 +149,9 @@ var _ = Describe("Allocation", func() {
Limits: v1.ResourceList{resources.NvidiaGPU: resource.MustParse("4")},
},
})
ExpectCreated(env.Client, provisioner)
ExpectCreatedWithStatus(env.Client, pod1, pod2, pod3)
ExpectControllerSucceeded(controller, provisioner)
ExpectReconcileSucceeded(controller, client.ObjectKeyFromObject(provisioner))
// Assertions
scheduled1 := ExpectPodExists(env.Client, pod1.GetName(), pod1.GetNamespace())
scheduled2 := ExpectPodExists(env.Client, pod2.GetName(), pod2.GetNamespace())
Expand Down Expand Up @@ -186,8 +199,9 @@ var _ = Describe("Allocation", func() {
Limits: v1.ResourceList{resources.AWSNeuron: resource.MustParse("4")},
},
})
ExpectCreated(env.Client, provisioner)
ExpectCreatedWithStatus(env.Client, pod1, pod2, pod3)
ExpectControllerSucceeded(controller, provisioner)
ExpectReconcileSucceeded(controller, client.ObjectKeyFromObject(provisioner))
// Assertions
scheduled1 := ExpectPodExists(env.Client, pod1.GetName(), pod1.GetNamespace())
scheduled2 := ExpectPodExists(env.Client, pod2.GetName(), pod2.GetNamespace())
Expand Down Expand Up @@ -217,6 +231,7 @@ var _ = Describe("Allocation", func() {
Context("CapacityType", func() {
It("should default to on demand", func() {
// Setup
ExpectCreated(env.Client, provisioner)
pods := ExpectProvisioningSucceeded(env.Client, controller, provisioner, test.PendingPod())
// Assertions
node := ExpectNodeExists(env.Client, pods[0].Spec.NodeName)
Expand All @@ -228,6 +243,7 @@ var _ = Describe("Allocation", func() {
It("should default to a provisioner's specified capacity type", func() {
// Setup
provisioner.Spec.Labels = map[string]string{CapacityTypeLabel: CapacityTypeSpot}
ExpectCreated(env.Client, provisioner)
pods := ExpectProvisioningSucceeded(env.Client, controller, provisioner, test.PendingPod())
// Assertions
node := ExpectNodeExists(env.Client, pods[0].Spec.NodeName)
Expand All @@ -238,6 +254,7 @@ var _ = Describe("Allocation", func() {
})
It("should allow a pod to override the capacity type", func() {
// Setup
ExpectCreated(env.Client, provisioner)
pods := ExpectProvisioningSucceeded(env.Client, controller, provisioner,
test.PendingPod(test.PodOptions{NodeSelector: map[string]string{CapacityTypeLabel: CapacityTypeSpot}}),
)
Expand All @@ -250,6 +267,7 @@ var _ = Describe("Allocation", func() {
})
It("should not schedule a pod with an invalid capacityType", func() {
// Setup
ExpectCreated(env.Client, provisioner)
pods := ExpectProvisioningSucceeded(env.Client, controller, provisioner,
test.PendingPod(test.PodOptions{NodeSelector: map[string]string{CapacityTypeLabel: "unknown"}}),
)
Expand All @@ -260,6 +278,7 @@ var _ = Describe("Allocation", func() {
Context("LaunchTemplates", func() {
It("should default to a generated launch template", func() {
// Setup
ExpectCreated(env.Client, provisioner)
pods := ExpectProvisioningSucceeded(env.Client, controller, provisioner, test.PendingPod())
// Assertions
node := ExpectNodeExists(env.Client, pods[0].Spec.NodeName)
Expand All @@ -277,6 +296,7 @@ var _ = Describe("Allocation", func() {
LaunchTemplateIdLabel: randomdata.SillyName(),
LaunchTemplateVersionLabel: randomdata.SillyName(),
}
ExpectCreated(env.Client, provisioner)
pods := ExpectProvisioningSucceeded(env.Client, controller, provisioner, test.PendingPod())
// Assertions
node := ExpectNodeExists(env.Client, pods[0].Spec.NodeName)
Expand All @@ -291,6 +311,7 @@ var _ = Describe("Allocation", func() {
It("should default to a provisioner's launch template and the default launch template version", func() {
// Setup
provisioner.Spec.Labels = map[string]string{LaunchTemplateIdLabel: randomdata.SillyName()}
ExpectCreated(env.Client, provisioner)
pods := ExpectProvisioningSucceeded(env.Client, controller, provisioner, test.PendingPod())
// Assertions
node := ExpectNodeExists(env.Client, pods[0].Spec.NodeName)
Expand All @@ -308,6 +329,7 @@ var _ = Describe("Allocation", func() {
LaunchTemplateIdLabel: randomdata.SillyName(),
LaunchTemplateVersionLabel: randomdata.SillyName(),
}
ExpectCreated(env.Client, provisioner)
pods := ExpectProvisioningSucceeded(env.Client, controller, provisioner,
test.PendingPod(test.PodOptions{NodeSelector: map[string]string{
LaunchTemplateIdLabel: randomdata.SillyName(),
Expand All @@ -327,6 +349,7 @@ var _ = Describe("Allocation", func() {
It("should allow a pod to override the launch template id and use the default launch template version", func() {
// Setup
provisioner.Spec.Labels = map[string]string{LaunchTemplateIdLabel: randomdata.SillyName()}
ExpectCreated(env.Client, provisioner)
pods := ExpectProvisioningSucceeded(env.Client, controller, provisioner,
test.PendingPod(test.PodOptions{NodeSelector: map[string]string{LaunchTemplateIdLabel: randomdata.SillyName()}}),
)
Expand All @@ -346,6 +369,7 @@ var _ = Describe("Allocation", func() {
LaunchTemplateIdLabel: randomdata.SillyName(),
LaunchTemplateVersionLabel: randomdata.SillyName(),
}
ExpectCreated(env.Client, provisioner)
pods := ExpectProvisioningSucceeded(env.Client, controller, provisioner,
test.PendingPod(test.PodOptions{NodeSelector: map[string]string{LaunchTemplateIdLabel: randomdata.SillyName()}}),
)
Expand All @@ -364,6 +388,7 @@ var _ = Describe("Allocation", func() {
It("should default to the clusters subnets", func() {
// Setup
provisioner.Spec.InstanceTypes = []string{"m5.large"} // limit instance type to simplify ConsistOf checks
ExpectCreated(env.Client, provisioner)
pods := ExpectProvisioningSucceeded(env.Client, controller, provisioner, test.PendingPod())
// Assertions
node := ExpectNodeExists(env.Client, pods[0].Spec.NodeName)
Expand All @@ -381,6 +406,7 @@ var _ = Describe("Allocation", func() {
// Setup
provisioner.Spec.Labels = map[string]string{SubnetNameLabel: "test-subnet-2"}
provisioner.Spec.InstanceTypes = []string{"m5.large"} // limit instance type to simplify ConsistOf checks
ExpectCreated(env.Client, provisioner)
pods := ExpectProvisioningSucceeded(env.Client, controller, provisioner, test.PendingPod())
// Assertions
node := ExpectNodeExists(env.Client, pods[0].Spec.NodeName)
Expand All @@ -395,6 +421,7 @@ var _ = Describe("Allocation", func() {
It("should default to a provisioner's specified subnet tag key", func() {
provisioner.Spec.Labels = map[string]string{SubnetTagKeyLabel: "TestTag"}
provisioner.Spec.InstanceTypes = []string{"m5.large"} // limit instance type to simplify ConsistOf checks
ExpectCreated(env.Client, provisioner)
pods := ExpectProvisioningSucceeded(env.Client, controller, provisioner, test.PendingPod())
// Assertions
node := ExpectNodeExists(env.Client, pods[0].Spec.NodeName)
Expand All @@ -409,6 +436,7 @@ var _ = Describe("Allocation", func() {
It("should allow a pod to override the subnet name", func() {
// Setup
provisioner.Spec.InstanceTypes = []string{"m5.large"} // limit instance type to simplify ConsistOf checks
ExpectCreated(env.Client, provisioner)
pods := ExpectProvisioningSucceeded(env.Client, controller, provisioner,
test.PendingPod(test.PodOptions{NodeSelector: map[string]string{SubnetNameLabel: "test-subnet-2"}}),
)
Expand All @@ -424,6 +452,7 @@ var _ = Describe("Allocation", func() {
})
It("should allow a pod to override the subnet tags", func() {
provisioner.Spec.InstanceTypes = []string{"m5.large"} // limit instance type to simplify ConsistOf checks
ExpectCreated(env.Client, provisioner)
pods := ExpectProvisioningSucceeded(env.Client, controller, provisioner,
test.PendingPod(test.PodOptions{NodeSelector: map[string]string{SubnetTagKeyLabel: "TestTag"}}),
)
Expand All @@ -439,6 +468,7 @@ var _ = Describe("Allocation", func() {
})
It("should not schedule a pod with an invalid subnet", func() {
provisioner.Spec.InstanceTypes = []string{"m5.large"} // limit instance type to simplify ConsistOf checks
ExpectCreated(env.Client, provisioner)
pods := ExpectProvisioningFailed(env.Client, controller, provisioner,
test.PendingPod(test.PodOptions{NodeSelector: map[string]string{SubnetTagKeyLabel: "Invalid"}}),
)
Expand All @@ -449,6 +479,7 @@ var _ = Describe("Allocation", func() {
Context("Security Groups", func() {
It("should default to the clusters security groups", func() {
// Setup
ExpectCreated(env.Client, provisioner)
pods := ExpectProvisioningSucceeded(env.Client, controller, provisioner, test.PendingPod())
// Assertions
node := ExpectNodeExists(env.Client, pods[0].Spec.NodeName)
Expand All @@ -464,6 +495,7 @@ var _ = Describe("Allocation", func() {
It("should default to a provisioner's specified security groups name", func() {
// Setup
provisioner.Spec.Labels = map[string]string{SecurityGroupNameLabel: "test-security-group-2"}
ExpectCreated(env.Client, provisioner)
pods := ExpectProvisioningSucceeded(env.Client, controller, provisioner, test.PendingPod())
// Assertions
node := ExpectNodeExists(env.Client, pods[0].Spec.NodeName)
Expand All @@ -476,6 +508,7 @@ var _ = Describe("Allocation", func() {
})
It("should default to a provisioner's specified security groups tag key", func() {
provisioner.Spec.Labels = map[string]string{SecurityGroupTagKeyLabel: "TestTag"}
ExpectCreated(env.Client, provisioner)
pods := ExpectProvisioningSucceeded(env.Client, controller, provisioner, test.PendingPod())
// Assertions
node := ExpectNodeExists(env.Client, pods[0].Spec.NodeName)
Expand All @@ -488,6 +521,7 @@ var _ = Describe("Allocation", func() {
})
It("should allow a pod to override the security groups name", func() {
// Setup
ExpectCreated(env.Client, provisioner)
pods := ExpectProvisioningSucceeded(env.Client, controller, provisioner,
test.PendingPod(test.PodOptions{NodeSelector: map[string]string{SecurityGroupNameLabel: "test-security-group-2"}}),
)
Expand All @@ -501,6 +535,7 @@ var _ = Describe("Allocation", func() {
Expect(node.Labels).ToNot(HaveKey(SecurityGroupTagKeyLabel))
})
It("should allow a pod to override the security groups tags", func() {
ExpectCreated(env.Client, provisioner)
pods := ExpectProvisioningSucceeded(env.Client, controller, provisioner,
test.PendingPod(test.PodOptions{NodeSelector: map[string]string{SecurityGroupTagKeyLabel: "TestTag"}}),
)
Expand All @@ -516,6 +551,7 @@ var _ = Describe("Allocation", func() {
Expect(node.Labels).To(HaveKeyWithValue(SecurityGroupTagKeyLabel, pods[0].Spec.NodeSelector[SecurityGroupTagKeyLabel]))
})
It("should not schedule a pod with an invalid security group", func() {
ExpectCreated(env.Client, provisioner)
pods := ExpectProvisioningFailed(env.Client, controller, provisioner,
test.PendingPod(test.PodOptions{NodeSelector: map[string]string{SecurityGroupTagKeyLabel: "Invalid"}}),
)
Expand Down
Loading