diff --git a/cmd/controller/main.go b/cmd/controller/main.go index 801081397f5a..90d9feffbc84 100644 --- a/cmd/controller/main.go +++ b/cmd/controller/main.go @@ -75,16 +75,14 @@ func main() { clientSet := kubernetes.NewForConfigOrDie(manager.GetConfig()) cloudProvider := registry.NewCloudProvider(cloudprovider.Options{ClientSet: clientSet}) + ctx := controllerruntime.SetupSignalHandler() - if err := expiration.NewController(manager.GetClient()).Register(manager); err != nil { - panic(err) - } - - if err := manager.RegisterControllers( + if err := manager.RegisterControllers(ctx, + expiration.NewController(manager.GetClient()), allocation.NewController(manager.GetClient(), clientSet.CoreV1(), cloudProvider), reallocation.NewController(manager.GetClient(), clientSet.CoreV1(), cloudProvider), termination.NewController(manager.GetClient(), clientSet.CoreV1(), cloudProvider), - ).Start(controllerruntime.SetupSignalHandler()); err != nil { + ).Start(ctx); err != nil { panic(fmt.Sprintf("Unable to start manager, %s", err.Error())) } } diff --git a/pkg/cloudprovider/aws/instance.go b/pkg/cloudprovider/aws/instance.go index 7103e62ae6a8..e8173e45513e 100644 --- a/pkg/cloudprovider/aws/instance.go +++ b/pkg/cloudprovider/aws/instance.go @@ -24,7 +24,6 @@ import ( "github.com/aws/aws-sdk-go/service/ec2/ec2iface" "github.com/awslabs/karpenter/pkg/cloudprovider" - "go.uber.org/zap" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" ) @@ -102,9 +101,6 @@ func (p *InstanceProvider) Create(ctx context.Context, if count := len(createFleetOutput.Instances[0].InstanceIds); count != 1 { return nil, fmt.Errorf("expected 1 instance ids, but got %d due to errors %v", count, createFleetOutput.Errors) } - if count := len(createFleetOutput.Errors); count > 0 { - zap.S().Warnf("CreateFleet encountered %d errors, but still launched instances, %v", count, createFleetOutput.Errors) - } return createFleetOutput.Instances[0].InstanceIds[0], nil } diff --git a/pkg/cloudprovider/aws/suite_test.go b/pkg/cloudprovider/aws/suite_test.go index 8d250c0eef68..900ea282dfe0 100644 --- a/pkg/cloudprovider/aws/suite_test.go +++ b/pkg/cloudprovider/aws/suite_test.go @@ -18,16 +18,19 @@ import ( "context" "strings" "testing" + "time" "github.com/Pallinder/go-randomdata" "github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha2" "github.com/awslabs/karpenter/pkg/cloudprovider/aws/fake" "github.com/awslabs/karpenter/pkg/cloudprovider/registry" - "github.com/awslabs/karpenter/pkg/controllers" "github.com/awslabs/karpenter/pkg/controllers/allocation" + "github.com/awslabs/karpenter/pkg/packing" "github.com/awslabs/karpenter/pkg/test" . 
"github.com/awslabs/karpenter/pkg/test/expectations" "github.com/awslabs/karpenter/pkg/utils/resources" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/reconcile" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/ec2" @@ -48,7 +51,7 @@ func TestAPIs(t *testing.T) { var launchTemplateCache = cache.New(CacheTTL, CacheCleanupInterval) var fakeEC2API *fake.EC2API -var controller controllers.Controller +var controller reconcile.Reconciler var env = test.NewEnvironment(func(e *test.Environment) { clientSet := kubernetes.NewForConfigOrDie(e.Config) @@ -66,7 +69,15 @@ var env = test.NewEnvironment(func(e *test.Environment) { instanceProvider: &InstanceProvider{fakeEC2API}, } registry.RegisterOrDie(cloudProvider) - controller = allocation.NewController(e.Client, clientSet.CoreV1(), cloudProvider) + controller = &allocation.Controller{ + Filter: &allocation.Filter{KubeClient: e.Client}, + Binder: &allocation.Binder{KubeClient: e.Client, CoreV1Client: clientSet.CoreV1()}, + Batcher: allocation.NewBatcher(1*time.Millisecond, 1*time.Millisecond), + Constraints: &allocation.Constraints{KubeClient: e.Client}, + Packer: packing.NewPacker(), + CloudProvider: cloudProvider, + KubeClient: e.Client, + } }) var _ = BeforeSuite(func() { @@ -107,6 +118,7 @@ var _ = Describe("Allocation", func() { Context("Reconciliation", func() { Context("Reserved Labels", func() { It("should not schedule a pod with cloud provider reserved labels", func() { + ExpectCreated(env.Client, provisioner) pods := ExpectProvisioningSucceeded(env.Client, controller, provisioner, test.PendingPod(test.PodOptions{NodeSelector: map[string]string{AWSLabelPrefix + "unknown": randomdata.SillyName()}}), ) @@ -137,8 +149,9 @@ var _ = Describe("Allocation", func() { Limits: v1.ResourceList{resources.NvidiaGPU: resource.MustParse("4")}, }, }) + ExpectCreated(env.Client, provisioner) ExpectCreatedWithStatus(env.Client, pod1, pod2, pod3) - ExpectControllerSucceeded(controller, provisioner) + ExpectReconcileSucceeded(controller, client.ObjectKeyFromObject(provisioner)) // Assertions scheduled1 := ExpectPodExists(env.Client, pod1.GetName(), pod1.GetNamespace()) scheduled2 := ExpectPodExists(env.Client, pod2.GetName(), pod2.GetNamespace()) @@ -186,8 +199,9 @@ var _ = Describe("Allocation", func() { Limits: v1.ResourceList{resources.AWSNeuron: resource.MustParse("4")}, }, }) + ExpectCreated(env.Client, provisioner) ExpectCreatedWithStatus(env.Client, pod1, pod2, pod3) - ExpectControllerSucceeded(controller, provisioner) + ExpectReconcileSucceeded(controller, client.ObjectKeyFromObject(provisioner)) // Assertions scheduled1 := ExpectPodExists(env.Client, pod1.GetName(), pod1.GetNamespace()) scheduled2 := ExpectPodExists(env.Client, pod2.GetName(), pod2.GetNamespace()) @@ -217,6 +231,7 @@ var _ = Describe("Allocation", func() { Context("CapacityType", func() { It("should default to on demand", func() { // Setup + ExpectCreated(env.Client, provisioner) pods := ExpectProvisioningSucceeded(env.Client, controller, provisioner, test.PendingPod()) // Assertions node := ExpectNodeExists(env.Client, pods[0].Spec.NodeName) @@ -228,6 +243,7 @@ var _ = Describe("Allocation", func() { It("should default to a provisioner's specified capacity type", func() { // Setup provisioner.Spec.Labels = map[string]string{CapacityTypeLabel: CapacityTypeSpot} + ExpectCreated(env.Client, provisioner) pods := ExpectProvisioningSucceeded(env.Client, controller, provisioner, test.PendingPod()) // 
Assertions node := ExpectNodeExists(env.Client, pods[0].Spec.NodeName) @@ -238,6 +254,7 @@ var _ = Describe("Allocation", func() { }) It("should allow a pod to override the capacity type", func() { // Setup + ExpectCreated(env.Client, provisioner) pods := ExpectProvisioningSucceeded(env.Client, controller, provisioner, test.PendingPod(test.PodOptions{NodeSelector: map[string]string{CapacityTypeLabel: CapacityTypeSpot}}), ) @@ -250,6 +267,7 @@ var _ = Describe("Allocation", func() { }) It("should not schedule a pod with an invalid capacityType", func() { // Setup + ExpectCreated(env.Client, provisioner) pods := ExpectProvisioningSucceeded(env.Client, controller, provisioner, test.PendingPod(test.PodOptions{NodeSelector: map[string]string{CapacityTypeLabel: "unknown"}}), ) @@ -260,6 +278,7 @@ var _ = Describe("Allocation", func() { Context("LaunchTemplates", func() { It("should default to a generated launch template", func() { // Setup + ExpectCreated(env.Client, provisioner) pods := ExpectProvisioningSucceeded(env.Client, controller, provisioner, test.PendingPod()) // Assertions node := ExpectNodeExists(env.Client, pods[0].Spec.NodeName) @@ -277,6 +296,7 @@ var _ = Describe("Allocation", func() { LaunchTemplateIdLabel: randomdata.SillyName(), LaunchTemplateVersionLabel: randomdata.SillyName(), } + ExpectCreated(env.Client, provisioner) pods := ExpectProvisioningSucceeded(env.Client, controller, provisioner, test.PendingPod()) // Assertions node := ExpectNodeExists(env.Client, pods[0].Spec.NodeName) @@ -291,6 +311,7 @@ var _ = Describe("Allocation", func() { It("should default to a provisioner's launch template and the default launch template version", func() { // Setup provisioner.Spec.Labels = map[string]string{LaunchTemplateIdLabel: randomdata.SillyName()} + ExpectCreated(env.Client, provisioner) pods := ExpectProvisioningSucceeded(env.Client, controller, provisioner, test.PendingPod()) // Assertions node := ExpectNodeExists(env.Client, pods[0].Spec.NodeName) @@ -308,6 +329,7 @@ var _ = Describe("Allocation", func() { LaunchTemplateIdLabel: randomdata.SillyName(), LaunchTemplateVersionLabel: randomdata.SillyName(), } + ExpectCreated(env.Client, provisioner) pods := ExpectProvisioningSucceeded(env.Client, controller, provisioner, test.PendingPod(test.PodOptions{NodeSelector: map[string]string{ LaunchTemplateIdLabel: randomdata.SillyName(), @@ -327,6 +349,7 @@ var _ = Describe("Allocation", func() { It("should allow a pod to override the launch template id and use the default launch template version", func() { // Setup provisioner.Spec.Labels = map[string]string{LaunchTemplateIdLabel: randomdata.SillyName()} + ExpectCreated(env.Client, provisioner) pods := ExpectProvisioningSucceeded(env.Client, controller, provisioner, test.PendingPod(test.PodOptions{NodeSelector: map[string]string{LaunchTemplateIdLabel: randomdata.SillyName()}}), ) @@ -346,6 +369,7 @@ var _ = Describe("Allocation", func() { LaunchTemplateIdLabel: randomdata.SillyName(), LaunchTemplateVersionLabel: randomdata.SillyName(), } + ExpectCreated(env.Client, provisioner) pods := ExpectProvisioningSucceeded(env.Client, controller, provisioner, test.PendingPod(test.PodOptions{NodeSelector: map[string]string{LaunchTemplateIdLabel: randomdata.SillyName()}}), ) @@ -364,6 +388,7 @@ var _ = Describe("Allocation", func() { It("should default to the clusters subnets", func() { // Setup provisioner.Spec.InstanceTypes = []string{"m5.large"} // limit instance type to simplify ConsistOf checks + ExpectCreated(env.Client, provisioner) pods := 
ExpectProvisioningSucceeded(env.Client, controller, provisioner, test.PendingPod()) // Assertions node := ExpectNodeExists(env.Client, pods[0].Spec.NodeName) @@ -381,6 +406,7 @@ var _ = Describe("Allocation", func() { // Setup provisioner.Spec.Labels = map[string]string{SubnetNameLabel: "test-subnet-2"} provisioner.Spec.InstanceTypes = []string{"m5.large"} // limit instance type to simplify ConsistOf checks + ExpectCreated(env.Client, provisioner) pods := ExpectProvisioningSucceeded(env.Client, controller, provisioner, test.PendingPod()) // Assertions node := ExpectNodeExists(env.Client, pods[0].Spec.NodeName) @@ -395,6 +421,7 @@ var _ = Describe("Allocation", func() { It("should default to a provisioner's specified subnet tag key", func() { provisioner.Spec.Labels = map[string]string{SubnetTagKeyLabel: "TestTag"} provisioner.Spec.InstanceTypes = []string{"m5.large"} // limit instance type to simplify ConsistOf checks + ExpectCreated(env.Client, provisioner) pods := ExpectProvisioningSucceeded(env.Client, controller, provisioner, test.PendingPod()) // Assertions node := ExpectNodeExists(env.Client, pods[0].Spec.NodeName) @@ -409,6 +436,7 @@ var _ = Describe("Allocation", func() { It("should allow a pod to override the subnet name", func() { // Setup provisioner.Spec.InstanceTypes = []string{"m5.large"} // limit instance type to simplify ConsistOf checks + ExpectCreated(env.Client, provisioner) pods := ExpectProvisioningSucceeded(env.Client, controller, provisioner, test.PendingPod(test.PodOptions{NodeSelector: map[string]string{SubnetNameLabel: "test-subnet-2"}}), ) @@ -424,6 +452,7 @@ var _ = Describe("Allocation", func() { }) It("should allow a pod to override the subnet tags", func() { provisioner.Spec.InstanceTypes = []string{"m5.large"} // limit instance type to simplify ConsistOf checks + ExpectCreated(env.Client, provisioner) pods := ExpectProvisioningSucceeded(env.Client, controller, provisioner, test.PendingPod(test.PodOptions{NodeSelector: map[string]string{SubnetTagKeyLabel: "TestTag"}}), ) @@ -439,6 +468,7 @@ var _ = Describe("Allocation", func() { }) It("should not schedule a pod with an invalid subnet", func() { provisioner.Spec.InstanceTypes = []string{"m5.large"} // limit instance type to simplify ConsistOf checks + ExpectCreated(env.Client, provisioner) pods := ExpectProvisioningFailed(env.Client, controller, provisioner, test.PendingPod(test.PodOptions{NodeSelector: map[string]string{SubnetTagKeyLabel: "Invalid"}}), ) @@ -449,6 +479,7 @@ var _ = Describe("Allocation", func() { Context("Security Groups", func() { It("should default to the clusters security groups", func() { // Setup + ExpectCreated(env.Client, provisioner) pods := ExpectProvisioningSucceeded(env.Client, controller, provisioner, test.PendingPod()) // Assertions node := ExpectNodeExists(env.Client, pods[0].Spec.NodeName) @@ -464,6 +495,7 @@ var _ = Describe("Allocation", func() { It("should default to a provisioner's specified security groups name", func() { // Setup provisioner.Spec.Labels = map[string]string{SecurityGroupNameLabel: "test-security-group-2"} + ExpectCreated(env.Client, provisioner) pods := ExpectProvisioningSucceeded(env.Client, controller, provisioner, test.PendingPod()) // Assertions node := ExpectNodeExists(env.Client, pods[0].Spec.NodeName) @@ -476,6 +508,7 @@ var _ = Describe("Allocation", func() { }) It("should default to a provisioner's specified security groups tag key", func() { provisioner.Spec.Labels = map[string]string{SecurityGroupTagKeyLabel: "TestTag"} + 
ExpectCreated(env.Client, provisioner) pods := ExpectProvisioningSucceeded(env.Client, controller, provisioner, test.PendingPod()) // Assertions node := ExpectNodeExists(env.Client, pods[0].Spec.NodeName) @@ -488,6 +521,7 @@ var _ = Describe("Allocation", func() { }) It("should allow a pod to override the security groups name", func() { // Setup + ExpectCreated(env.Client, provisioner) pods := ExpectProvisioningSucceeded(env.Client, controller, provisioner, test.PendingPod(test.PodOptions{NodeSelector: map[string]string{SecurityGroupNameLabel: "test-security-group-2"}}), ) @@ -501,6 +535,7 @@ var _ = Describe("Allocation", func() { Expect(node.Labels).ToNot(HaveKey(SecurityGroupTagKeyLabel)) }) It("should allow a pod to override the security groups tags", func() { + ExpectCreated(env.Client, provisioner) pods := ExpectProvisioningSucceeded(env.Client, controller, provisioner, test.PendingPod(test.PodOptions{NodeSelector: map[string]string{SecurityGroupTagKeyLabel: "TestTag"}}), ) @@ -516,6 +551,7 @@ var _ = Describe("Allocation", func() { Expect(node.Labels).To(HaveKeyWithValue(SecurityGroupTagKeyLabel, pods[0].Spec.NodeSelector[SecurityGroupTagKeyLabel])) }) It("should not schedule a pod with an invalid security group", func() { + ExpectCreated(env.Client, provisioner) pods := ExpectProvisioningFailed(env.Client, controller, provisioner, test.PendingPod(test.PodOptions{NodeSelector: map[string]string{SecurityGroupTagKeyLabel: "Invalid"}}), ) diff --git a/pkg/controllers/allocation/batch.go b/pkg/controllers/allocation/batch.go new file mode 100644 index 000000000000..6207aa41f0bc --- /dev/null +++ b/pkg/controllers/allocation/batch.go @@ -0,0 +1,173 @@ +/* +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package allocation + +import ( + "context" + "time" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" +) + +const ( + opAdd = "add" + opWait = "wait" +) + +// Batcher is a batch manager for multiple objects +type Batcher struct { + // MaxPeriod is the maximum amount of time to batch incoming pods before flushing + MaxPeriod time.Duration + // IdlePeriod is the amount of time to wait to flush a batch when there are no incoming pods but the batch is not empty + // It should be a smaller duration than MaxPeriod + IdlePeriod time.Duration + + // windows keeps a mapping of a key (like a provisioner name and namespace) to a specific object's batch window + windows map[types.UID]*window + // ops is a stream of add and wait operations on a batch window + ops chan *batchOp + // isMonitorRunning indicates if the monitor go routine has been started + isMonitorRunning bool +} + +type batchOp struct { + kind string + key types.UID + waitEnd chan bool +} + +// window is an individual batch window +type window struct { + lastUpdated time.Time + started time.Time + closed []chan bool +} + +// NewBatcher creates a new batch manager to start multiple batch windows +func NewBatcher(maxPeriod time.Duration, idlePeriod time.Duration) *Batcher { + return &Batcher{ + MaxPeriod: maxPeriod, + IdlePeriod: idlePeriod, + windows: map[types.UID]*window{}, + } +} + +// Start should be called before Add or Wait +// It is not safe to call Start concurrently +// but Start can be called synchronously multiple times w/ no effect +func (b *Batcher) Start(ctx context.Context) { + if !b.isMonitorRunning { + b.ops = make(chan *batchOp, 1000) + go b.monitor(ctx) + b.isMonitorRunning = true + } +} + +// Add starts a batching window or adds to an existing in-progress window +// Add is safe to be called concurrently +func (b *Batcher) Add(obj metav1.Object) { + select { + case b.ops <- &batchOp{kind: opAdd, key: obj.GetUID()}: + // Do not block if the channel is full + default: + } +} + +// Wait blocks until a batching window ends +// If the batch is empty, it will block until something is added or the window times out +func (b *Batcher) Wait(obj metav1.Object) { + waitBatchOp := &batchOp{kind: opWait, key: obj.GetUID(), waitEnd: make(chan bool, 1)} + timeout := time.NewTimer(b.MaxPeriod) + select { + case b.ops <- waitBatchOp: + <-waitBatchOp.waitEnd + // if the ops channel is full (should be very rare), allow wait to block until the MaxPeriod + case <-timeout.C: + } +} + +// monitor is a synchronous loop that controls the window start, update, and end +// monitor should be executed in one go routine and will handle all object batch windows +func (b *Batcher) monitor(ctx context.Context) { + defer func() { b.isMonitorRunning = false }() + ticker := time.NewTicker(b.IdlePeriod / 2) + for { + select { + // Wake and check for any timed out batch windows + case <-ticker.C: + for key, batch := range b.windows { + b.checkForWindowEndAndNotify(key, batch) + } + // Process window operations + case op := <-b.ops: + switch op.kind { + // Start a new window or update progress on a window + case opAdd: + b.startOrUpdateWindow(op.key) + // Register a waiter and start a window if no window has been started + case opWait: + window, ok := b.windows[op.key] + if !ok { + window = b.startOrUpdateWindow(op.key) + } + window.closed = append(window.closed, op.waitEnd) + } + // Stop monitor routine on shutdown + case <-ctx.Done(): + for key, window := range b.windows { + b.endWindow(key, window) + } + return + } 
+ } +} + +// checkForWindowEndAndNotify checks if a window has timed out due to inactivity (IdlePeriod) or has reached the MaxBatchPeriod. +// If the batch window has ended, then the batch closed channel will be notified and the window will be removed +func (b *Batcher) checkForWindowEndAndNotify(key types.UID, window *window) { + if time.Since(window.lastUpdated) < b.IdlePeriod && time.Since(window.started) < b.MaxPeriod { + return + } + b.endWindow(key, window) +} + +// endWindow signals the end of a window to all wait consumers and deletes the window +func (b *Batcher) endWindow(key types.UID, window *window) { + for _, end := range window.closed { + select { + case end <- true: + close(end) + default: + } + } + delete(b.windows, key) +} + +// startOrUpdateWindow starts a new window for the object key if one does not already exist +// if a window already exists for the object key, then the lastUpdate time is set +func (b *Batcher) startOrUpdateWindow(key types.UID) *window { + batchWindow, ok := b.windows[key] + if !ok { + batchWindow = &window{lastUpdated: time.Now(), started: time.Now()} + b.windows[key] = batchWindow + return batchWindow + } + batchWindow.lastUpdated = time.Now() + if batchWindow.started.IsZero() { + batchWindow.started = time.Now() + } + return batchWindow +} diff --git a/pkg/controllers/allocation/bind.go b/pkg/controllers/allocation/bind.go index 7cd40db4684b..268daa9b8118 100644 --- a/pkg/controllers/allocation/bind.go +++ b/pkg/controllers/allocation/bind.go @@ -28,8 +28,8 @@ import ( ) type Binder struct { - kubeClient client.Client - coreV1Client corev1.CoreV1Interface + KubeClient client.Client + CoreV1Client corev1.CoreV1Interface } func (b *Binder) Bind(ctx context.Context, node *v1.Node, pods []*v1.Pod) error { @@ -52,7 +52,7 @@ func (b *Binder) Bind(ctx context.Context, node *v1.Node, pods []*v1.Pod) error // with the API server. In the common case, we create the node object // ourselves to enforce the binding decision and enable images to be pulled // before the node is fully Ready. - if _, err := b.coreV1Client.Nodes().Create(ctx, node, metav1.CreateOptions{}); err != nil { + if _, err := b.CoreV1Client.Nodes().Create(ctx, node, metav1.CreateOptions{}); err != nil { if !errors.IsAlreadyExists(err) { return fmt.Errorf("creating node %s, %w", node.Name, err) } @@ -69,7 +69,7 @@ func (b *Binder) Bind(ctx context.Context, node *v1.Node, pods []*v1.Pod) error func (b *Binder) bind(ctx context.Context, node *v1.Node, pod *v1.Pod) error { // TODO, Stop using deprecated v1.Binding - if err := b.coreV1Client.Pods(pod.Namespace).Bind(ctx, &v1.Binding{ + if err := b.CoreV1Client.Pods(pod.Namespace).Bind(ctx, &v1.Binding{ TypeMeta: pod.TypeMeta, ObjectMeta: pod.ObjectMeta, Target: v1.ObjectReference{Name: node.Name}, diff --git a/pkg/controllers/allocation/constraints.go b/pkg/controllers/allocation/constraints.go index bf25a2caa50f..44a5a140ec66 100644 --- a/pkg/controllers/allocation/constraints.go +++ b/pkg/controllers/allocation/constraints.go @@ -30,7 +30,7 @@ import ( ) type Constraints struct { - kubeClient client.Client + KubeClient client.Client } // Group separates pods into a set of equivalent scheduling groups. All pods in @@ -79,7 +79,7 @@ func (c *Constraints) Group(ctx context.Context, provisioner *v1alpha2.Provision func (c *Constraints) getDaemons(ctx context.Context, node *v1.Node) ([]*v1.Pod, error) { // 1. 
Get DaemonSets daemonSetList := &appsv1.DaemonSetList{} - if err := c.kubeClient.List(ctx, daemonSetList); err != nil { + if err := c.KubeClient.List(ctx, daemonSetList); err != nil { return nil, fmt.Errorf("listing daemonsets, %w", err) } diff --git a/pkg/controllers/allocation/controller.go b/pkg/controllers/allocation/controller.go index cd69d14d013f..2b56a5e3b457 100644 --- a/pkg/controllers/allocation/controller.go +++ b/pkg/controllers/allocation/controller.go @@ -23,97 +23,184 @@ import ( "github.com/awslabs/karpenter/pkg/cloudprovider" "github.com/awslabs/karpenter/pkg/packing" "github.com/awslabs/karpenter/pkg/utils/apiobject" + "golang.org/x/time/rate" + "go.uber.org/multierr" "go.uber.org/zap" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/types" corev1 "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/client-go/util/workqueue" + controllerruntime "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/reconcile" + "sigs.k8s.io/controller-runtime/pkg/source" +) + +const ( + maxBatchWindow = 10 * time.Second + batchIdleTimeout = 2 * time.Second ) // Controller for the resource type Controller struct { - filter *Filter - binder *Binder - constraints *Constraints - packer packing.Packer - cloudProvider cloudprovider.CloudProvider -} - -// For returns the resource this controller is for. -func (c *Controller) For() client.Object { - return &v1alpha2.Provisioner{} -} - -func (c *Controller) Interval() time.Duration { - return 5 * time.Second -} - -func (c *Controller) Name() string { - return "provisioner/allocator" + Batcher *Batcher + Filter *Filter + Binder *Binder + Constraints *Constraints + Packer packing.Packer + CloudProvider cloudprovider.CloudProvider + KubeClient client.Client } // NewController constructs a controller instance func NewController(kubeClient client.Client, coreV1Client corev1.CoreV1Interface, cloudProvider cloudprovider.CloudProvider) *Controller { return &Controller{ - filter: &Filter{kubeClient: kubeClient}, - binder: &Binder{kubeClient: kubeClient, coreV1Client: coreV1Client}, - constraints: &Constraints{kubeClient: kubeClient}, - packer: packing.NewPacker(), - cloudProvider: cloudProvider, + Filter: &Filter{KubeClient: kubeClient}, + Binder: &Binder{KubeClient: kubeClient, CoreV1Client: coreV1Client}, + Batcher: NewBatcher(maxBatchWindow, batchIdleTimeout), + Constraints: &Constraints{KubeClient: kubeClient}, + Packer: packing.NewPacker(), + CloudProvider: cloudProvider, + KubeClient: kubeClient, } } // Reconcile executes an allocation control loop for the resource -func (c *Controller) Reconcile(ctx context.Context, object client.Object) (reconcile.Result, error) { - persistedProvisioner := object.(*v1alpha2.Provisioner) - - // 1. Hydrate provisioner with (dynamic) default values, which must not - // be persisted into the original CRD as they might change with each reconciliation - // loop iteration. - provisionerWithDefaults, err := persistedProvisioner.WithDynamicDefaults() +func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) { + // 1. 
Fetch provisioner + persistedProvisioner, provisionerWithDefaults, err := c.retrieveProvisionerFrom(ctx, req) if err != nil { - return reconcile.Result{}, fmt.Errorf("setting dynamic default values, %w", err) + return reconcile.Result{}, err } - // 2. Filter pods - pods, err := c.filter.GetProvisionablePods(ctx, &provisionerWithDefaults) + // 2. Wait on a pod batch + c.Batcher.Wait(provisionerWithDefaults) + + // 3. Filter pods + pods, err := c.Filter.GetProvisionablePods(ctx, provisionerWithDefaults) if err != nil { return reconcile.Result{}, fmt.Errorf("filtering pods, %w", err) } if len(pods) == 0 { - return reconcile.Result{RequeueAfter: c.Interval()}, nil + return reconcile.Result{}, nil } zap.S().Infof("Found %d provisionable pods", len(pods)) - // 3. Group by constraints - constraintGroups, err := c.constraints.Group(ctx, &provisionerWithDefaults, pods) + // 4. Group by constraints + constraintGroups, err := c.Constraints.Group(ctx, provisionerWithDefaults, pods) if err != nil { return reconcile.Result{}, fmt.Errorf("building constraint groups, %w", err) } - // 4. Binpack each group + // 5. Binpack each group packings := []*cloudprovider.Packing{} for _, constraintGroup := range constraintGroups { - instanceTypes, err := c.cloudProvider.GetInstanceTypes(ctx) + instanceTypes, err := c.CloudProvider.GetInstanceTypes(ctx) if err != nil { return reconcile.Result{}, fmt.Errorf("getting instance types, %w", err) } - packings = append(packings, c.packer.Pack(ctx, constraintGroup, instanceTypes)...) + packings = append(packings, c.Packer.Pack(ctx, constraintGroup, instanceTypes)...) } - // 5. Create packedNodes for packings and also copy all Status changes made by the + // 6. Create packedNodes for packings and also copy all Status changes made by the // cloud provider to the original provisioner instance. - packedNodes, err := c.cloudProvider.Create(ctx, &provisionerWithDefaults, packings) + packedNodes, err := c.CloudProvider.Create(ctx, persistedProvisioner, packings) if err != nil { return reconcile.Result{}, fmt.Errorf("creating capacity, %w", err) } - // 6. Bind pods to nodes + // 7. Bind pods to nodes + var errs error for _, packedNode := range packedNodes { zap.S().Infof("Binding pods %v to node %s", apiobject.PodNamespacedNames(packedNode.Pods), packedNode.Node.Name) - if err := c.binder.Bind(ctx, packedNode.Node, packedNode.Pods); err != nil { - zap.S().Errorf("Continuing after failing to bind, %s", err.Error()) + if err := c.Binder.Bind(ctx, packedNode.Node, packedNode.Pods); err != nil { + errs = multierr.Append(errs, err) + } + } + return reconcile.Result{}, errs +} + +func (c *Controller) Register(ctx context.Context, m manager.Manager) error { + err := controllerruntime. + NewControllerManagedBy(m). + Named("Allocation"). + For(&v1alpha2.Provisioner{}). + Watches( + &source.Kind{Type: &v1.Pod{}}, + handler.EnqueueRequestsFromMapFunc(c.podToProvisioner), + ). + WithOptions( + controller.Options{ + RateLimiter: workqueue.NewMaxOfRateLimiter( + workqueue.NewItemExponentialFailureRateLimiter(100*time.Millisecond, 10*time.Second), + // 10 qps, 100 bucket size + &workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)}, + ), + MaxConcurrentReconciles: 4, + }, + ). 
+ Complete(c) + c.Batcher.Start(ctx) + return err +} + +// retrieveProvisionerFrom fetches the provisioner and returns a raw provisioner that was persisted in the api server +// and a provisioner w/ default runtime values added that should not be persisted +func (c *Controller) retrieveProvisionerFrom(ctx context.Context, req reconcile.Request) (*v1alpha2.Provisioner, *v1alpha2.Provisioner, error) { + persistedProvisioner := &v1alpha2.Provisioner{} + if err := c.KubeClient.Get(ctx, req.NamespacedName, persistedProvisioner); err != nil { + if errors.IsNotFound(err) { + return nil, nil, nil + } + return nil, nil, err + } + + // Hydrate provisioner with (dynamic) default values, which must not + // be persisted into the original CRD as they might change with each reconciliation + // loop iteration. + provisionerWithDefaults, err := persistedProvisioner.WithDynamicDefaults() + if err != nil { + return persistedProvisioner, &provisionerWithDefaults, fmt.Errorf("setting dynamic default values, %w", err) + } + return persistedProvisioner, &provisionerWithDefaults, nil +} + +// podToProvisioner is a function handler to transform pod objs to provisioner reconcile requests +func (c *Controller) podToProvisioner(o client.Object) (requests []reconcile.Request) { + pod := o.(*v1.Pod) + ctx := context.Background() + provisioner, err := c.getProvisionerFor(ctx, pod) + if err != nil { + zap.S().Errorf("Retrieving provisioner, %s", err.Error()) + return nil + } + if err = c.Filter.isProvisionable(ctx, pod, provisioner); err != nil { + return nil + } + c.Batcher.Add(provisioner) + return []reconcile.Request{{NamespacedName: types.NamespacedName{Name: provisioner.Name, Namespace: provisioner.Namespace}}} +} + +// getProvisionerFor retrieves the provisioner responsible for the pod +func (c *Controller) getProvisionerFor(ctx context.Context, p *v1.Pod) (*v1alpha2.Provisioner, error) { + provisionerKey := client.ObjectKey{Namespace: "default", Name: "default"} + if name, ok := p.Spec.NodeSelector[v1alpha2.ProvisionerNameLabelKey]; ok { + provisionerKey.Name = name + } + if namespace, ok := p.Spec.NodeSelector[v1alpha2.ProvisionerNamespaceLabelKey]; ok { + provisionerKey.Namespace = namespace + } + provisioner := &v1alpha2.Provisioner{} + if err := c.KubeClient.Get(ctx, provisionerKey, provisioner); err != nil { + if errors.IsNotFound(err) { + return nil, fmt.Errorf("create a default provisioner, or specify an alternative using the nodeSelector %s", v1alpha2.ProvisionerNameLabelKey) } + return nil, err } - return reconcile.Result{RequeueAfter: c.Interval()}, nil + return provisioner, nil } diff --git a/pkg/controllers/allocation/filter.go b/pkg/controllers/allocation/filter.go index dc6b4bc8319c..82d576c547a1 100644 --- a/pkg/controllers/allocation/filter.go +++ b/pkg/controllers/allocation/filter.go @@ -28,13 +28,13 @@ import ( ) type Filter struct { - kubeClient client.Client + KubeClient client.Client } func (f *Filter) GetProvisionablePods(ctx context.Context, provisioner *v1alpha2.Provisioner) ([]*v1.Pod, error) { // 1. List Pods that aren't scheduled pods := &v1.PodList{} - if err := f.kubeClient.List(ctx, pods, client.MatchingFields{"spec.nodeName": ""}); err != nil { + if err := f.KubeClient.List(ctx, pods, client.MatchingFields{"spec.nodeName": ""}); err != nil { return nil, fmt.Errorf("listing unscheduled pods, %w", err) } if len(pods.Items) == 0 { @@ -44,13 +44,7 @@ func (f *Filter) GetProvisionablePods(ctx context.Context, provisioner *v1alpha2 // 2. 
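
For reference, the getProvisionerFor lookup above resolves which provisioner owns a pod from two well-known node selectors and otherwise falls back to the provisioner named "default" in the "default" namespace. A hedged sketch of a pod targeting a non-default provisioner follows; the names "gpu" and "karpenter" are made up for illustration.

```go
package main

import (
	"fmt"

	"github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha2"
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	// Without these selectors, the allocation controller looks up default/default.
	pod := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{Name: "workload", Namespace: "default"},
		Spec: v1.PodSpec{
			NodeSelector: map[string]string{
				v1alpha2.ProvisionerNameLabelKey:      "gpu",       // hypothetical provisioner name
				v1alpha2.ProvisionerNamespaceLabelKey: "karpenter", // hypothetical namespace
			},
			Containers: []v1.Container{{Name: "app", Image: "app:latest"}},
		},
	}
	fmt.Println(pod.Spec.NodeSelector)
}
```
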
Filter pods that aren't provisionable provisionable := []*v1.Pod{} for _, p := range pods.Items { - if err := functional.ValidateAll( - func() error { return f.isUnschedulable(&p) }, - func() error { return f.matchesProvisioner(&p, provisioner) }, - func() error { return f.hasSupportedSchedulingConstraints(&p) }, - func() error { return pod.ToleratesTaints(&p.Spec, provisioner.Spec.Taints...) }, - func() error { return f.withValidConstraints(ctx, &p, provisioner) }, - ); err != nil { + if err := f.isProvisionable(ctx, &p, provisioner); err != nil { zap.S().Debugf("Ignored pod %s/%s when allocating for provisioner %s/%s, %s", p.Name, p.Namespace, provisioner.Name, provisioner.Namespace, @@ -63,6 +57,16 @@ func (f *Filter) GetProvisionablePods(ctx context.Context, provisioner *v1alpha2 return provisionable, nil } +func (f *Filter) isProvisionable(ctx context.Context, p *v1.Pod, provisioner *v1alpha2.Provisioner) error { + return functional.ValidateAll( + func() error { return f.isUnschedulable(p) }, + func() error { return f.matchesProvisioner(p, provisioner) }, + func() error { return f.hasSupportedSchedulingConstraints(p) }, + func() error { return pod.ToleratesTaints(&p.Spec, provisioner.Spec.Taints...) }, + func() error { return f.withValidConstraints(ctx, p, provisioner) }, + ) +} + func (f *Filter) isUnschedulable(p *v1.Pod) error { if !pod.FailedToSchedule(p) { return fmt.Errorf("awaiting scheduling") diff --git a/pkg/controllers/allocation/suite_test.go b/pkg/controllers/allocation/suite_test.go index 5ddc6056f9b7..191481ac090e 100644 --- a/pkg/controllers/allocation/suite_test.go +++ b/pkg/controllers/allocation/suite_test.go @@ -12,17 +12,20 @@ See the License for the specific language governing permissions and limitations under the License. 
*/ -package allocation +package allocation_test import ( "context" "strings" "testing" + "time" "github.com/Pallinder/go-randomdata" "github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha2" "github.com/awslabs/karpenter/pkg/cloudprovider/fake" "github.com/awslabs/karpenter/pkg/cloudprovider/registry" + "github.com/awslabs/karpenter/pkg/controllers/allocation" + "github.com/awslabs/karpenter/pkg/packing" "github.com/awslabs/karpenter/pkg/test" "knative.dev/pkg/ptr" @@ -44,15 +47,19 @@ func TestAPIs(t *testing.T) { RunSpecs(t, "Provisioner/Allocator") } -var controller *Controller +var controller *allocation.Controller var env = test.NewEnvironment(func(e *test.Environment) { cloudProvider := &fake.CloudProvider{} registry.RegisterOrDie(cloudProvider) - controller = NewController( - e.Client, - corev1.NewForConfigOrDie(e.Config), - cloudProvider, - ) + controller = &allocation.Controller{ + Filter: &allocation.Filter{KubeClient: e.Client}, + Binder: &allocation.Binder{KubeClient: e.Client, CoreV1Client: corev1.NewForConfigOrDie(e.Config)}, + Batcher: allocation.NewBatcher(1*time.Millisecond, 1*time.Millisecond), + Constraints: &allocation.Constraints{KubeClient: e.Client}, + Packer: packing.NewPacker(), + CloudProvider: cloudProvider, + KubeClient: e.Client, + } }) var _ = BeforeSuite(func() { @@ -87,6 +94,8 @@ var _ = Describe("Allocation", func() { Context("Zones", func() { It("should default to a cluster zone", func() { // Setup + ExpectCreated(env.Client, provisioner) + ExpectReconcileSucceeded(controller, client.ObjectKeyFromObject(provisioner)) pods := ExpectProvisioningSucceeded(env.Client, controller, provisioner, test.PendingPod()) // Assertions node := ExpectNodeExists(env.Client, pods[0].Spec.NodeName) @@ -95,6 +104,7 @@ var _ = Describe("Allocation", func() { It("should default to a provisioner's zone", func() { // Setup provisioner.Spec.Zones = []string{"test-zone-2"} + ExpectCreated(env.Client, provisioner) pods := ExpectProvisioningSucceeded(env.Client, controller, provisioner, test.PendingPod()) // Assertions node := ExpectNodeExists(env.Client, pods[0].Spec.NodeName) @@ -103,6 +113,7 @@ var _ = Describe("Allocation", func() { It("should allow a pod to override the zone", func() { // Setup provisioner.Spec.Zones = []string{"test-zone-1"} + ExpectCreated(env.Client, provisioner) pods := ExpectProvisioningSucceeded(env.Client, controller, provisioner, test.PendingPod(test.PodOptions{NodeSelector: map[string]string{v1alpha2.ZoneLabelKey: "test-zone-2"}}), ) @@ -112,6 +123,7 @@ var _ = Describe("Allocation", func() { }) }) It("should provision nodes for unconstrained pods", func() { + ExpectCreated(env.Client, provisioner) pods := ExpectProvisioningSucceeded(env.Client, controller, provisioner, test.PendingPod(), test.PendingPod(), ) @@ -155,10 +167,11 @@ var _ = Describe("Allocation", func() { // Ignored, invalid operating system test.PendingPod(test.PodOptions{NodeSelector: map[string]string{v1alpha2.OperatingSystemLabelKey: "unknown"}}), } + ExpectCreated(env.Client, provisioner) ExpectCreatedWithStatus(env.Client, schedulable...) ExpectCreatedWithStatus(env.Client, coschedulable...) ExpectCreatedWithStatus(env.Client, unschedulable...) 
- ExpectControllerSucceeded(controller, provisioner) + ExpectReconcileSucceeded(controller, client.ObjectKeyFromObject(provisioner)) nodes := &v1.NodeList{} Expect(env.Client.List(ctx, nodes)).To(Succeed()) @@ -203,9 +216,10 @@ var _ = Describe("Allocation", func() { Tolerations: []v1.Toleration{{Key: "invalid", Value: "test-value", Operator: v1.TolerationOpEqual}}, }), } + ExpectCreated(env.Client, provisioner) ExpectCreatedWithStatus(env.Client, schedulable...) ExpectCreatedWithStatus(env.Client, unschedulable...) - ExpectControllerSucceeded(controller, provisioner) + ExpectReconcileSucceeded(controller, client.ObjectKeyFromObject(provisioner)) nodes := &v1.NodeList{} Expect(env.Client.List(ctx, nodes)).To(Succeed()) @@ -220,6 +234,7 @@ var _ = Describe("Allocation", func() { } }) It("should provision nodes for accelerators", func() { + ExpectCreated(env.Client, provisioner) pods := ExpectProvisioningSucceeded(env.Client, controller, provisioner, test.PendingPod(test.PodOptions{ ResourceRequirements: v1.ResourceRequirements{Limits: v1.ResourceList{resources.NvidiaGPU: resource.MustParse("1")}}, @@ -264,9 +279,10 @@ var _ = Describe("Allocation", func() { ResourceRequirements: v1.ResourceRequirements{Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("1"), v1.ResourceMemory: resource.MustParse("1Gi")}}, }), } + ExpectCreated(env.Client, provisioner) ExpectCreatedWithStatus(env.Client, daemonsets...) ExpectCreatedWithStatus(env.Client, schedulable...) - ExpectControllerSucceeded(controller, provisioner) + ExpectReconcileSucceeded(controller, client.ObjectKeyFromObject(provisioner)) nodes := &v1.NodeList{} Expect(env.Client.List(ctx, nodes)).To(Succeed()) diff --git a/pkg/controllers/controller.go b/pkg/controllers/controller.go deleted file mode 100644 index d26def47cbe9..000000000000 --- a/pkg/controllers/controller.go +++ /dev/null @@ -1,60 +0,0 @@ -/* -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package controllers - -import ( - "context" - "fmt" - - "go.uber.org/zap" - v1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/errors" - "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/reconcile" -) - -// GenericController implements controllerruntime.Reconciler and runs a -// standardized reconciliation workflow against incoming resource watch events. -type GenericController struct { - Controller - client.Client -} - -// Reconcile executes a control loop for the resource -func (c *GenericController) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) { - // 1. Read Spec - resource := c.For() - if err := c.Get(ctx, req.NamespacedName, resource); err != nil { - if errors.IsNotFound(err) { - return reconcile.Result{}, nil - } - return reconcile.Result{}, err - } - // 2. Copy object for merge patch base - persisted := resource.DeepCopyObject() - // 3. 
Reconcile - result, err := c.Controller.Reconcile(ctx, resource) - if err != nil { - zap.S().Errorf("Controller failed to reconcile kind %s, %s", resource.GetObjectKind().GroupVersionKind().Kind, err.Error()) - } - // 5. Update Status using a merge patch - // If the controller is reconciling nodes, don't patch - if _, ok := resource.(*v1.Node); !ok { - if err := c.Status().Patch(ctx, resource, client.MergeFrom(persisted)); err != nil { - return result, fmt.Errorf("Failed to persist changes to %s, %w", req.NamespacedName, err) - } - } - return result, err -} diff --git a/pkg/controllers/expiration/controller.go b/pkg/controllers/expiration/controller.go index 41ceaebda0af..cc9ef10760c0 100644 --- a/pkg/controllers/expiration/controller.go +++ b/pkg/controllers/expiration/controller.go @@ -109,7 +109,7 @@ func (c *Controller) provisionerToNodes(o client.Object) (requests []reconcile.R return requests } -func (c *Controller) Register(m manager.Manager) error { +func (c *Controller) Register(_ context.Context, m manager.Manager) error { return controllerruntime. NewControllerManagedBy(m). Named("Expiration"). diff --git a/pkg/controllers/expiration/suite_test.go b/pkg/controllers/expiration/suite_test.go index 801f48a5db07..fe0634d2acfa 100644 --- a/pkg/controllers/expiration/suite_test.go +++ b/pkg/controllers/expiration/suite_test.go @@ -12,7 +12,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package expiration +package expiration_test import ( "strings" @@ -20,6 +20,7 @@ import ( "github.com/Pallinder/go-randomdata" "github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha2" + "github.com/awslabs/karpenter/pkg/controllers/expiration" "github.com/awslabs/karpenter/pkg/test" . "github.com/awslabs/karpenter/pkg/test/expectations" . "github.com/onsi/ginkgo" @@ -34,9 +35,9 @@ func TestAPIs(t *testing.T) { RunSpecs(t, "Expiration") } -var controller *Controller +var controller *expiration.Controller var env = test.NewEnvironment(func(e *test.Environment) { - controller = NewController(e.Client) + controller = expiration.NewController(e.Client) }) var _ = BeforeSuite(func() { diff --git a/pkg/controllers/manager.go b/pkg/controllers/manager.go index b1bbd21e52d3..1c43aaf7b44c 100644 --- a/pkg/controllers/manager.go +++ b/pkg/controllers/manager.go @@ -17,15 +17,11 @@ package controllers import ( "context" "fmt" - "time" - "golang.org/x/time/rate" v1 "k8s.io/api/core/v1" "k8s.io/client-go/rest" - "k8s.io/client-go/util/workqueue" controllerruntime "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/healthz" "sigs.k8s.io/controller-runtime/pkg/manager" ) @@ -47,23 +43,10 @@ func NewManagerOrDie(config *rest.Config, options controllerruntime.Options) Man } // RegisterControllers registers a set of controllers to the controller manager -func (m *GenericControllerManager) RegisterControllers(controllers ...Controller) Manager { +func (m *GenericControllerManager) RegisterControllers(ctx context.Context, controllers ...Controller) Manager { for _, c := range controllers { - controlledObject := c.For() - builder := controllerruntime.NewControllerManagedBy(m). - For(controlledObject). 
- WithOptions(controller.Options{ - RateLimiter: workqueue.NewMaxOfRateLimiter( - workqueue.NewItemExponentialFailureRateLimiter(100*time.Millisecond, 10*time.Second), - // 10 qps, 100 bucket size - &workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)}, - ), - }) - if namedController, ok := c.(NamedController); ok { - builder.Named(namedController.Name()) - } - if err := builder.Complete(&GenericController{Controller: c, Client: m.GetClient()}); err != nil { - panic(fmt.Sprintf("Failed to register controller to manager for %s", controlledObject)) + if err := c.Register(ctx, m); err != nil { + panic(err) } } if err := m.AddHealthzCheck("healthz", healthz.Ping); err != nil { diff --git a/pkg/controllers/reallocation/controller.go b/pkg/controllers/reallocation/controller.go index a650ab2e5249..8ee85f60b9d5 100644 --- a/pkg/controllers/reallocation/controller.go +++ b/pkg/controllers/reallocation/controller.go @@ -21,9 +21,15 @@ import ( "github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha2" "github.com/awslabs/karpenter/pkg/cloudprovider" + "golang.org/x/time/rate" + "k8s.io/apimachinery/pkg/api/errors" corev1 "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/client-go/util/workqueue" + controllerruntime "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/reconcile" ) @@ -31,19 +37,7 @@ import ( type Controller struct { utilization *Utilization cloudProvider cloudprovider.CloudProvider -} - -// For returns the resource this controller is for. -func (c *Controller) For() client.Object { - return &v1alpha2.Provisioner{} -} - -func (c *Controller) Interval() time.Duration { - return 5 * time.Second -} - -func (c *Controller) Name() string { - return "provisioner/reallocator" + kubeClient client.Client } // NewController constructs a controller instance @@ -51,29 +45,56 @@ func NewController(kubeClient client.Client, coreV1Client corev1.CoreV1Interface return &Controller{ utilization: &Utilization{kubeClient: kubeClient}, cloudProvider: cloudProvider, + kubeClient: kubeClient, } } // Reconcile executes a reallocation control loop for the resource -func (c *Controller) Reconcile(ctx context.Context, object client.Object) (reconcile.Result, error) { - provisioner := object.(*v1alpha2.Provisioner) +func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) { + // 1. Retrieve provisioner from reconcile request + provisioner := &v1alpha2.Provisioner{} + if err := c.kubeClient.Get(ctx, req.NamespacedName, provisioner); err != nil { + if errors.IsNotFound(err) { + return reconcile.Result{}, nil + } + return reconcile.Result{}, err + } + // Skip reconciliation if utilization ttl is not defined. if provisioner.Spec.TTLSecondsAfterEmpty == nil { return reconcile.Result{}, nil } - // 1. Set TTL on TTLable Nodes + // 2. Set TTL on TTLable Nodes if err := c.utilization.markUnderutilized(ctx, provisioner); err != nil { return reconcile.Result{}, fmt.Errorf("adding ttl and underutilized label, %w", err) } - // 2. Remove TTL from Utilized Nodes + // 3. Remove TTL from Utilized Nodes if err := c.utilization.clearUnderutilized(ctx, provisioner); err != nil { return reconcile.Result{}, fmt.Errorf("removing ttl from node, %w", err) } - // 3. Delete any node past its TTL + // 4. 
Delete any node past its TTL if err := c.utilization.terminateExpired(ctx, provisioner); err != nil { return reconcile.Result{}, fmt.Errorf("marking nodes terminable, %w", err) } - return reconcile.Result{RequeueAfter: c.Interval()}, nil + return reconcile.Result{RequeueAfter: 5 * time.Second}, nil +} + +func (c *Controller) Register(_ context.Context, m manager.Manager) error { + return controllerruntime. + NewControllerManagedBy(m). + Named("Reallocation"). + For(&v1alpha2.Provisioner{}). + WithOptions( + controller.Options{ + RateLimiter: workqueue.NewMaxOfRateLimiter( + workqueue.NewItemExponentialFailureRateLimiter(100*time.Millisecond, 10*time.Second), + // 10 qps, 100 bucket size + &workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)}, + ), + MaxConcurrentReconciles: 1, + }, + ). + Complete(c) } diff --git a/pkg/controllers/reallocation/suite_test.go b/pkg/controllers/reallocation/suite_test.go index 66cfbd696cc4..4f1dc2ece899 100644 --- a/pkg/controllers/reallocation/suite_test.go +++ b/pkg/controllers/reallocation/suite_test.go @@ -12,7 +12,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package reallocation +package reallocation_test import ( "context" @@ -24,6 +24,7 @@ import ( "github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha2" "github.com/awslabs/karpenter/pkg/cloudprovider/fake" "github.com/awslabs/karpenter/pkg/cloudprovider/registry" + "github.com/awslabs/karpenter/pkg/controllers/reallocation" "github.com/awslabs/karpenter/pkg/test" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -41,11 +42,11 @@ func TestAPIs(t *testing.T) { RunSpecs(t, "Provisioner/Reallocator") } -var controller *Controller +var controller *reallocation.Controller var env = test.NewEnvironment(func(e *test.Environment) { cloudProvider := &fake.CloudProvider{} registry.RegisterOrDie(cloudProvider) - controller = NewController( + controller = reallocation.NewController( e.Client, corev1.NewForConfigOrDie(e.Config), cloudProvider, @@ -89,8 +90,9 @@ var _ = Describe("Reallocation", func() { v1alpha2.ProvisionerNamespaceLabelKey: provisioner.Namespace, }, }) + ExpectCreated(env.Client, provisioner) ExpectCreatedWithStatus(env.Client, node) - ExpectControllerSucceeded(controller, provisioner) + ExpectReconcileSucceeded(controller, client.ObjectKeyFromObject(provisioner)) updatedNode := &v1.Node{} Expect(env.Client.Get(ctx, client.ObjectKey{Name: node.Name}, updatedNode)).To(Succeed()) @@ -108,6 +110,7 @@ var _ = Describe("Reallocation", func() { v1alpha2.ProvisionerTTLAfterEmptyKey: time.Now().Add(time.Duration(100) * time.Second).Format(time.RFC3339), }, }) + ExpectCreated(env.Client, provisioner) ExpectCreatedWithStatus(env.Client, node) ExpectCreatedWithStatus(env.Client, test.Pod(test.PodOptions{ Name: strings.ToLower(randomdata.SillyName()), @@ -115,7 +118,7 @@ var _ = Describe("Reallocation", func() { NodeName: node.Name, Conditions: []v1.PodCondition{{Type: v1.PodReady, Status: v1.ConditionTrue}}, })) - ExpectControllerSucceeded(controller, provisioner) + ExpectReconcileSucceeded(controller, client.ObjectKeyFromObject(provisioner)) updatedNode := &v1.Node{} Expect(env.Client.Get(ctx, client.ObjectKey{Name: node.Name}, updatedNode)).To(Succeed()) diff --git a/pkg/controllers/termination/controller.go b/pkg/controllers/termination/controller.go index e522efeb140d..1f6f5c359892 100644 --- a/pkg/controllers/termination/controller.go +++ 
b/pkg/controllers/termination/controller.go @@ -17,14 +17,21 @@ package termination import ( "context" "fmt" + "time" provisioning "github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha2" "github.com/awslabs/karpenter/pkg/cloudprovider" "github.com/awslabs/karpenter/pkg/utils/functional" + "golang.org/x/time/rate" v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" corev1 "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/client-go/util/workqueue" + controllerruntime "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/reconcile" ) @@ -32,15 +39,7 @@ import ( type Controller struct { terminator *Terminator cloudProvider cloudprovider.CloudProvider -} - -// For returns the resource this controller is for. -func (c *Controller) For() client.Object { - return &v1.Node{} -} - -func (c *Controller) Name() string { - return "terminator" + kubeClient client.Client } // NewController constructs a controller instance @@ -48,26 +47,35 @@ func NewController(kubeClient client.Client, coreV1Client corev1.CoreV1Interface return &Controller{ terminator: &Terminator{kubeClient: kubeClient, cloudProvider: cloudProvider, coreV1Client: coreV1Client}, cloudProvider: cloudProvider, + kubeClient: kubeClient, } } // Reconcile executes a termination control loop for the resource -func (c *Controller) Reconcile(ctx context.Context, object client.Object) (reconcile.Result, error) { - node := object.(*v1.Node) - // 1. Check if node is terminable +func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) { + // 1. Retrieve node from reconcile request + node := &v1.Node{} + if err := c.kubeClient.Get(ctx, req.NamespacedName, node); err != nil { + if errors.IsNotFound(err) { + return reconcile.Result{}, nil + } + return reconcile.Result{}, err + } + + // 2. Check if node is terminable if node.DeletionTimestamp.IsZero() || !functional.ContainsString(node.Finalizers, provisioning.KarpenterFinalizer) { return reconcile.Result{}, nil } - // 2. Cordon node + // 3. Cordon node if err := c.terminator.cordon(ctx, node); err != nil { return reconcile.Result{}, fmt.Errorf("cordoning node %s, %w", node.Name, err) } - // 3. Drain node + // 4. Drain node drained, err := c.terminator.drain(ctx, node) if err != nil { return reconcile.Result{}, fmt.Errorf("draining node %s, %w", node.Name, err) } - // 4. If fully drained, terminate the node + // 5. If fully drained, terminate the node if drained { if err := c.terminator.terminate(ctx, node); err != nil { return reconcile.Result{}, fmt.Errorf("terminating node %s, %w", node.Name, err) @@ -75,3 +83,21 @@ func (c *Controller) Reconcile(ctx context.Context, object client.Object) (recon } return reconcile.Result{Requeue: !drained}, nil } + +func (c *Controller) Register(_ context.Context, m manager.Manager) error { + return controllerruntime. + NewControllerManagedBy(m). + Named("Termination"). + For(&v1.Node{}). + WithOptions( + controller.Options{ + RateLimiter: workqueue.NewMaxOfRateLimiter( + workqueue.NewItemExponentialFailureRateLimiter(100*time.Millisecond, 10*time.Second), + // 10 qps, 100 bucket size + &workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)}, + ), + MaxConcurrentReconciles: 1, + }, + ). 
+ Complete(c) +} diff --git a/pkg/controllers/termination/suite_test.go b/pkg/controllers/termination/suite_test.go index 3e5a88362002..b7f94a09de2a 100644 --- a/pkg/controllers/termination/suite_test.go +++ b/pkg/controllers/termination/suite_test.go @@ -12,7 +12,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package termination +package termination_test import ( "context" @@ -21,7 +21,9 @@ import ( "github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha2" "github.com/awslabs/karpenter/pkg/cloudprovider/fake" "github.com/awslabs/karpenter/pkg/cloudprovider/registry" + "github.com/awslabs/karpenter/pkg/controllers/termination" "github.com/awslabs/karpenter/pkg/test" + "sigs.k8s.io/controller-runtime/pkg/client" . "github.com/awslabs/karpenter/pkg/test/expectations" . "github.com/onsi/ginkgo" @@ -35,11 +37,11 @@ func TestAPIs(t *testing.T) { RunSpecs(t, "Termination") } -var controller *Controller +var controller *termination.Controller var env = test.NewEnvironment(func(e *test.Environment) { cloudProvider := &fake.CloudProvider{} registry.RegisterOrDie(cloudProvider) - controller = NewController( + controller = termination.NewController( e.Client, corev1.NewForConfigOrDie(e.Config), cloudProvider, @@ -72,7 +74,7 @@ var _ = Describe("Termination", func() { ExpectCreated(env.Client, node) Expect(env.Client.Delete(ctx, node)).To(Succeed()) node = ExpectNodeExists(env.Client, node.Name) - ExpectControllerSucceeded(controller, node) + ExpectReconcileSucceeded(controller, client.ObjectKeyFromObject(node)) ExpectNotFound(env.Client, node) }) It("should not evict pods that tolerate unschedulable taint", func() { @@ -86,7 +88,7 @@ var _ = Describe("Termination", func() { // Trigger Termination Controller Expect(env.Client.Delete(ctx, node)).To(Succeed()) node = ExpectNodeExists(env.Client, node.Name) - ExpectControllerSucceeded(controller, node) + ExpectReconcileSucceeded(controller, client.ObjectKeyFromObject(node)) // Expect podToEvict to be evicting, and delete it podEvict = ExpectPodExists(env.Client, podEvict.Name, podEvict.Namespace) @@ -98,7 +100,7 @@ var _ = Describe("Termination", func() { // Reconcile to delete node node = ExpectNodeExists(env.Client, node.Name) - ExpectControllerSucceeded(controller, node) + ExpectReconcileSucceeded(controller, client.ObjectKeyFromObject(node)) ExpectNotFound(env.Client, node) }) It("should not terminate nodes that have a do-not-evict pod", func() { @@ -112,7 +114,7 @@ var _ = Describe("Termination", func() { Expect(env.Client.Delete(ctx, node)).To(Succeed()) node = ExpectNodeExists(env.Client, node.Name) - ExpectControllerSucceeded(controller, node) + ExpectReconcileSucceeded(controller, client.ObjectKeyFromObject(node)) // Expect node to exist, but be cordoned node = ExpectNodeExists(env.Client, node.Name) @@ -129,7 +131,7 @@ var _ = Describe("Termination", func() { // Reconcile node to evict pod node = ExpectNodeExists(env.Client, node.Name) - ExpectControllerSucceeded(controller, node) + ExpectReconcileSucceeded(controller, client.ObjectKeyFromObject(node)) pod := ExpectPodExists(env.Client, podEvict.Name, podEvict.Namespace) Expect(pod.GetObjectMeta().GetDeletionTimestamp().IsZero()).To(BeFalse()) @@ -138,7 +140,7 @@ var _ = Describe("Termination", func() { // Terminate Node node = ExpectNodeExists(env.Client, node.Name) - ExpectControllerSucceeded(controller, node) + ExpectReconcileSucceeded(controller, 
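
The allocation, reallocation, and termination Register functions above all configure the same work-queue rate limiter. As a standalone sketch (not part of the change), the combination behaves as the maximum of two limiters: per-item exponential backoff plus an overall token bucket.

```go
package main

import (
	"fmt"
	"time"

	"golang.org/x/time/rate"
	"k8s.io/client-go/util/workqueue"
)

func main() {
	limiter := workqueue.NewMaxOfRateLimiter(
		// Per-item exponential backoff: 100ms, 200ms, 400ms, ... capped at 10s.
		workqueue.NewItemExponentialFailureRateLimiter(100*time.Millisecond, 10*time.Second),
		// Overall limit across all items: 10 requeues per second, bursts up to 100.
		&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
	)
	for i := 0; i < 5; i++ {
		// When returns the larger of the two delays for this item.
		fmt.Println(limiter.When("provisioner/default"))
	}
}
```
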
client.ObjectKeyFromObject(node)) ExpectNotFound(env.Client, node) }) }) diff --git a/pkg/controllers/types.go b/pkg/controllers/types.go index 19861dfaa026..8d040aa79f0a 100644 --- a/pkg/controllers/types.go +++ b/pkg/controllers/types.go @@ -17,7 +17,6 @@ package controllers import ( "context" - "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/reconcile" ) @@ -27,22 +26,13 @@ type Controller interface { // Reconcile hands a hydrated kubernetes resource to the controller for // reconciliation. Any changes made to the resource's status are persisted // after Reconcile returns, even if it returns an error. - Reconcile(context.Context, client.Object) (reconcile.Result, error) - // For returns a default instantiation of the resource and is injected by - // data from the API Server at the start of the reconciliation loop. - For() client.Object -} - -// NamedController allows controllers to optionally implement a Name() function which will be used instead of the -// reconciled resource's name. This is useful when writing multiple controllers for a single resource type. -type NamedController interface { - Controller - // Name returns the name of the controller - Name() string + Reconcile(context.Context, reconcile.Request) (reconcile.Result, error) + // Register will register the controller with the manager + Register(context.Context, manager.Manager) error } // Manager manages a set of controllers and webhooks. type Manager interface { manager.Manager - RegisterControllers(controllers ...Controller) Manager + RegisterControllers(context.Context, ...Controller) Manager } diff --git a/pkg/test/expectations/expectations.go b/pkg/test/expectations/expectations.go index a36f35adcf92..d81e66b1cef7 100644 --- a/pkg/test/expectations/expectations.go +++ b/pkg/test/expectations/expectations.go @@ -20,7 +20,6 @@ import ( "time" "github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha2" - "github.com/awslabs/karpenter/pkg/controllers" . 
"github.com/onsi/gomega" v1 "k8s.io/api/core/v1" @@ -105,11 +104,11 @@ func ExpectCleanedUp(c client.Client) { } } -func ExpectProvisioningSucceeded(c client.Client, controller controllers.Controller, provisioner *v1alpha2.Provisioner, pods ...*v1.Pod) []*v1.Pod { +func ExpectProvisioningSucceeded(c client.Client, reconciler reconcile.Reconciler, provisioner *v1alpha2.Provisioner, pods ...*v1.Pod) []*v1.Pod { for _, pod := range pods { ExpectCreatedWithStatus(c, pod) } - ExpectControllerSucceeded(controller, provisioner) + ExpectReconcileSucceeded(reconciler, client.ObjectKeyFromObject(provisioner)) result := []*v1.Pod{} for _, pod := range pods { result = append(result, ExpectPodExists(c, pod.GetName(), pod.GetNamespace())) @@ -117,11 +116,11 @@ func ExpectProvisioningSucceeded(c client.Client, controller controllers.Control return result } -func ExpectProvisioningFailed(c client.Client, controller controllers.Controller, provisioner *v1alpha2.Provisioner, pods ...*v1.Pod) []*v1.Pod { +func ExpectProvisioningFailed(c client.Client, reconciler reconcile.Reconciler, provisioner *v1alpha2.Provisioner, pods ...*v1.Pod) []*v1.Pod { for _, pod := range pods { ExpectCreatedWithStatus(c, pod) } - ExpectControllerFailed(controller, provisioner) + ExpectReconcileFailed(reconciler, client.ObjectKeyFromObject(provisioner)) result := []*v1.Pod{} for _, pod := range pods { result = append(result, ExpectPodExists(c, pod.GetName(), pod.GetNamespace())) @@ -129,18 +128,6 @@ func ExpectProvisioningFailed(c client.Client, controller controllers.Controller return result } -// ExpectControllerFailed should be deprecated in favor of ExpectReconcileFailed (TODO, once we move off of generic controller) -func ExpectControllerFailed(controller controllers.Controller, object client.Object) { - _, err := controller.Reconcile(context.Background(), object) - Expect(err).To(HaveOccurred()) -} - -// ExpectControllerSucceeded should be deprecated in favor of ExpectReconcileSucceeded (TODO, once we move off of generic controller) -func ExpectControllerSucceeded(controller controllers.Controller, object client.Object) { - _, err := controller.Reconcile(context.Background(), object) - Expect(err).ToNot(HaveOccurred()) -} - func ExpectReconcileFailed(reconciler reconcile.Reconciler, key client.ObjectKey) { _, err := reconciler.Reconcile(context.Background(), reconcile.Request{NamespacedName: key}) Expect(err).To(HaveOccurred())