diff --git a/pkg/apis/provisioning/v1alpha3/provisioner.go b/pkg/apis/provisioning/v1alpha3/provisioner.go index 203db71a4eed..a24c9e2dbee4 100644 --- a/pkg/apis/provisioning/v1alpha3/provisioner.go +++ b/pkg/apis/provisioning/v1alpha3/provisioner.go @@ -18,6 +18,7 @@ import ( "github.com/awslabs/karpenter/pkg/utils/functional" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" ) // ProvisionerSpec is the top level provisioner specification. Provisioners @@ -130,6 +131,9 @@ var ( // Finalizers KarpenterFinalizer = SchemeGroupVersion.Group + "/termination" + + // Default provisioner + DefaultProvisioner = types.NamespacedName{Name: "default"} ) // Provisioner is the Schema for the Provisioners API diff --git a/pkg/cloudprovider/aws/suite_test.go b/pkg/cloudprovider/aws/suite_test.go index 4edd7bfd3830..ba5c5d23da02 100644 --- a/pkg/cloudprovider/aws/suite_test.go +++ b/pkg/cloudprovider/aws/suite_test.go @@ -16,7 +16,6 @@ package aws import ( "context" - "strings" "testing" "time" @@ -62,7 +61,7 @@ var env = test.NewEnvironment(func(e *test.Environment) { fakeEC2API, NewAMIProvider(&fake.SSMAPI{}, clientSet), NewSecurityGroupProvider(fakeEC2API), - cache.New(CacheTTL, CacheCleanupInterval), + launchTemplateCache, }, subnetProvider: NewSubnetProvider(fakeEC2API), instanceTypeProvider: NewInstanceTypeProvider(fakeEC2API), @@ -96,8 +95,7 @@ var _ = Describe("Allocation", func() { ctx = context.Background() provisioner = &v1alpha3.Provisioner{ ObjectMeta: metav1.ObjectMeta{ - Name: strings.ToLower(randomdata.SillyName()), - Namespace: "default", + Name: v1alpha3.DefaultProvisioner.Name, }, Spec: v1alpha3.ProvisionerSpec{ Cluster: v1alpha3.Cluster{ diff --git a/pkg/controllers/allocation/controller.go b/pkg/controllers/allocation/controller.go index 76d1202918f4..dff5db059bd3 100644 --- a/pkg/controllers/allocation/controller.go +++ b/pkg/controllers/allocation/controller.go @@ -33,10 +33,13 @@ import ( corev1 "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/util/workqueue" controllerruntime "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/source" ) @@ -73,16 +76,21 @@ func NewController(kubeClient client.Client, coreV1Client corev1.CoreV1Interface // Reconcile executes an allocation control loop for the resource func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) { // 1. Fetch provisioner - persistedProvisioner, provisionerWithDefaults, err := c.retrieveProvisionerFrom(ctx, req) + provisioner, err := c.provisionerFor(ctx, req.NamespacedName) if err != nil { + if errors.IsNotFound(err) { + c.Batcher.Wait(&v1alpha3.Provisioner{}) + zap.S().Errorf("Provisioner \"%s\" not found. Create the \"default\" provisioner or specify an alternative using the nodeSelector %s", req.Name, v1alpha3.ProvisionerNameLabelKey) + return reconcile.Result{}, nil + } return reconcile.Result{}, err } // 2. Wait on a pod batch - c.Batcher.Wait(provisionerWithDefaults) + c.Batcher.Wait(provisioner) // 3. Filter pods - pods, err := c.Filter.GetProvisionablePods(ctx, provisionerWithDefaults) + pods, err := c.Filter.GetProvisionablePods(ctx, provisioner) if err != nil { return reconcile.Result{}, fmt.Errorf("filtering pods, %w", err) } @@ -92,7 +100,7 @@ func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (reco zap.S().Infof("Found %d provisionable pods", len(pods)) // 4. Group by constraints - constraintGroups, err := c.Constraints.Group(ctx, provisionerWithDefaults, pods) + constraintGroups, err := c.Constraints.Group(ctx, provisioner, pods) if err != nil { return reconcile.Result{}, fmt.Errorf("building constraint groups, %w", err) } @@ -109,7 +117,7 @@ func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (reco // 6. Create packedNodes for packings and also copy all Status changes made by the // cloud provider to the original provisioner instance. - packedNodes, err := c.CloudProvider.Create(ctx, persistedProvisioner, packings) + packedNodes, err := c.CloudProvider.Create(ctx, provisioner, packings) if err != nil { return reconcile.Result{}, fmt.Errorf("creating capacity, %w", err) } @@ -133,6 +141,14 @@ func (c *Controller) Register(ctx context.Context, m manager.Manager) error { Watches( &source.Kind{Type: &v1.Pod{}}, handler.EnqueueRequestsFromMapFunc(c.podToProvisioner), + // Only process pod update events + builder.WithPredicates( + predicate.Funcs{ + CreateFunc: func(_ event.CreateEvent) bool { return false }, + DeleteFunc: func(_ event.DeleteEvent) bool { return false }, + GenericFunc: func(_ event.GenericEvent) bool { return false }, + }, + ), ). WithOptions( controller.Options{ @@ -149,34 +165,46 @@ func (c *Controller) Register(ctx context.Context, m manager.Manager) error { return err } -// retrieveProvisionerFrom fetches the provisioner and returns a raw provisioner that was persisted in the api server -// and a provisioner w/ default runtime values added that should not be persisted -func (c *Controller) retrieveProvisionerFrom(ctx context.Context, req reconcile.Request) (*v1alpha3.Provisioner, *v1alpha3.Provisioner, error) { - persistedProvisioner := &v1alpha3.Provisioner{} - if err := c.KubeClient.Get(ctx, req.NamespacedName, persistedProvisioner); err != nil { - if errors.IsNotFound(err) { - return nil, nil, nil - } - return nil, nil, err +// provisionerFor fetches the provisioner and returns a provisioner w/ default runtime values +func (c *Controller) provisionerFor(ctx context.Context, name types.NamespacedName) (*v1alpha3.Provisioner, error) { + provisioner := &v1alpha3.Provisioner{} + if err := c.KubeClient.Get(ctx, name, provisioner); err != nil { + return nil, err } // Hydrate provisioner with (dynamic) default values, which must not // be persisted into the original CRD as they might change with each reconciliation // loop iteration. - provisionerWithDefaults, err := persistedProvisioner.WithDynamicDefaults() + provisionerWithDefaults, err := provisioner.WithDynamicDefaults() if err != nil { - return persistedProvisioner, &provisionerWithDefaults, fmt.Errorf("setting dynamic default values, %w", err) + return &provisionerWithDefaults, fmt.Errorf("setting dynamic default values, %w", err) } - return persistedProvisioner, &provisionerWithDefaults, nil + return &provisionerWithDefaults, nil } // podToProvisioner is a function handler to transform pod objs to provisioner reconcile requests func (c *Controller) podToProvisioner(o client.Object) (requests []reconcile.Request) { pod := o.(*v1.Pod) ctx := context.Background() - provisioner, err := c.getProvisionerFor(ctx, pod) + if err := c.Filter.isUnschedulable(pod); err != nil { + return nil + } + provisionerKey := v1alpha3.DefaultProvisioner + if name, ok := pod.Spec.NodeSelector[v1alpha3.ProvisionerNameLabelKey]; ok { + provisionerKey.Name = name + } + provisioner, err := c.provisionerFor(ctx, provisionerKey) if err != nil { - zap.S().Errorf("Retrieving provisioner, %s", err.Error()) + if errors.IsNotFound(err) { + // Queue and batch a reconcile request for a non-existent, empty provisioner + // This will reduce the number of repeated error messages about a provisioner not existing + c.Batcher.Add(&v1alpha3.Provisioner{}) + notFoundProvisioner := v1alpha3.DefaultProvisioner.Name + if name, ok := pod.Spec.NodeSelector[v1alpha3.ProvisionerNameLabelKey]; ok { + notFoundProvisioner = name + } + return []reconcile.Request{{NamespacedName: types.NamespacedName{Name: notFoundProvisioner}}} + } return nil } if err = c.Filter.isProvisionable(ctx, pod, provisioner); err != nil { @@ -185,19 +213,3 @@ func (c *Controller) podToProvisioner(o client.Object) (requests []reconcile.Req c.Batcher.Add(provisioner) return []reconcile.Request{{NamespacedName: types.NamespacedName{Name: provisioner.Name}}} } - -// getProvisionerFor retrieves the provisioner responsible for the pod -func (c *Controller) getProvisionerFor(ctx context.Context, p *v1.Pod) (*v1alpha3.Provisioner, error) { - provisionerKey := client.ObjectKey{Name: "default"} - if name, ok := p.Spec.NodeSelector[v1alpha3.ProvisionerNameLabelKey]; ok { - provisionerKey.Name = name - } - provisioner := &v1alpha3.Provisioner{} - if err := c.KubeClient.Get(ctx, provisionerKey, provisioner); err != nil { - if errors.IsNotFound(err) { - return nil, fmt.Errorf("create a default provisioner, or specify an alternative using the nodeSelector %s", v1alpha3.ProvisionerNameLabelKey) - } - return nil, err - } - return provisioner, nil -} diff --git a/pkg/controllers/allocation/filter.go b/pkg/controllers/allocation/filter.go index bc2bb99a8233..02f9376285ce 100644 --- a/pkg/controllers/allocation/filter.go +++ b/pkg/controllers/allocation/filter.go @@ -88,14 +88,11 @@ func (f *Filter) hasSupportedSchedulingConstraints(pod *v1.Pod) error { } func (f *Filter) matchesProvisioner(pod *v1.Pod, provisioner *v1alpha3.Provisioner) error { - if pod.Spec.NodeSelector == nil { - return nil - } name, ok := pod.Spec.NodeSelector[v1alpha3.ProvisionerNameLabelKey] - if !ok { + if ok && provisioner.Name == name { return nil } - if name == provisioner.Name { + if !ok && provisioner.Name == v1alpha3.DefaultProvisioner.Name { return nil } return fmt.Errorf("matched another provisioner, %s", name) diff --git a/pkg/controllers/allocation/suite_test.go b/pkg/controllers/allocation/suite_test.go index c51b743fffe8..660886641151 100644 --- a/pkg/controllers/allocation/suite_test.go +++ b/pkg/controllers/allocation/suite_test.go @@ -16,11 +16,9 @@ package allocation_test import ( "context" - "strings" "testing" "time" - "github.com/Pallinder/go-randomdata" "github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha3" "github.com/awslabs/karpenter/pkg/cloudprovider/fake" "github.com/awslabs/karpenter/pkg/cloudprovider/registry" @@ -76,8 +74,7 @@ var _ = Describe("Allocation", func() { BeforeEach(func() { provisioner = &v1alpha3.Provisioner{ ObjectMeta: metav1.ObjectMeta{ - Name: strings.ToLower(randomdata.SillyName()), - Namespace: "default", + Name: v1alpha3.DefaultProvisioner.Name, }, Spec: v1alpha3.ProvisionerSpec{ Cluster: v1alpha3.Cluster{Name: ptr.String("test-cluster"), Endpoint: "http://test-cluster", CABundle: ptr.String("dGVzdC1jbHVzdGVyCg==")}, @@ -135,15 +132,9 @@ var _ = Describe("Allocation", func() { } }) It("should provision nodes for pods with supported node selectors", func() { - coschedulable := []client.Object{ - // Unconstrained - test.PendingPod(), - // Constrained by provisioner - test.PendingPod(test.PodOptions{ - NodeSelector: map[string]string{v1alpha3.ProvisionerNameLabelKey: provisioner.Name}, - }), - } schedulable := []client.Object{ + // Constrained by provisioner + test.PendingPod(test.PodOptions{NodeSelector: map[string]string{v1alpha3.ProvisionerNameLabelKey: provisioner.Name}}), // Constrained by zone test.PendingPod(test.PodOptions{NodeSelector: map[string]string{v1alpha3.ZoneLabelKey: "test-zone-1"}}), // Constrained by instanceType @@ -169,14 +160,13 @@ var _ = Describe("Allocation", func() { } ExpectCreated(env.Client, provisioner) ExpectCreatedWithStatus(env.Client, schedulable...) - ExpectCreatedWithStatus(env.Client, coschedulable...) ExpectCreatedWithStatus(env.Client, unschedulable...) ExpectReconcileSucceeded(controller, client.ObjectKeyFromObject(provisioner)) nodes := &v1.NodeList{} Expect(env.Client.List(ctx, nodes)).To(Succeed()) Expect(len(nodes.Items)).To(Equal(6)) // 5 schedulable -> 5 node, 2 coschedulable -> 1 node - for _, pod := range append(schedulable, coschedulable...) { + for _, pod := range schedulable { scheduled := ExpectPodExists(env.Client, pod.GetName(), pod.GetNamespace()) node := ExpectNodeExists(env.Client, scheduled.Spec.NodeName) for key, value := range scheduled.Spec.NodeSelector { diff --git a/pkg/controllers/expiration/suite_test.go b/pkg/controllers/expiration/suite_test.go index 64be25cba9ee..bcd7a2f44b87 100644 --- a/pkg/controllers/expiration/suite_test.go +++ b/pkg/controllers/expiration/suite_test.go @@ -15,10 +15,8 @@ limitations under the License. package expiration_test import ( - "strings" "testing" - "github.com/Pallinder/go-randomdata" "github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha3" "github.com/awslabs/karpenter/pkg/controllers/expiration" "github.com/awslabs/karpenter/pkg/test" @@ -53,7 +51,7 @@ var _ = Describe("Reconciliation", func() { BeforeEach(func() { provisioner = &v1alpha3.Provisioner{ - ObjectMeta: metav1.ObjectMeta{Name: strings.ToLower(randomdata.SillyName())}, + ObjectMeta: metav1.ObjectMeta{Name: v1alpha3.DefaultProvisioner.Name}, Spec: v1alpha3.ProvisionerSpec{ Cluster: v1alpha3.Cluster{Name: ptr.String("test-cluster"), Endpoint: "http://test-cluster", CABundle: ptr.String("dGVzdC1jbHVzdGVyCg==")}, TTLSecondsUntilExpired: ptr.Int64(30), diff --git a/pkg/controllers/reallocation/suite_test.go b/pkg/controllers/reallocation/suite_test.go index 1fdf98cade08..02dcce6335f4 100644 --- a/pkg/controllers/reallocation/suite_test.go +++ b/pkg/controllers/reallocation/suite_test.go @@ -67,7 +67,7 @@ var _ = Describe("Reallocation", func() { BeforeEach(func() { provisioner = &v1alpha3.Provisioner{ - ObjectMeta: metav1.ObjectMeta{Name: strings.ToLower(randomdata.SillyName())}, + ObjectMeta: metav1.ObjectMeta{Name: v1alpha3.DefaultProvisioner.Name}, Spec: v1alpha3.ProvisionerSpec{ Cluster: v1alpha3.Cluster{Name: ptr.String("test-cluster"), Endpoint: "http://test-cluster", CABundle: ptr.String("dGVzdC1jbHVzdGVyCg==")}, TTLSecondsAfterEmpty: ptr.Int64(300),