diff --git a/pkg/apis/provisioning/v1alpha5/constraints.go b/pkg/apis/provisioning/v1alpha5/constraints.go index 750a123819dd..268e68c70657 100644 --- a/pkg/apis/provisioning/v1alpha5/constraints.go +++ b/pkg/apis/provisioning/v1alpha5/constraints.go @@ -39,8 +39,8 @@ type Constraints struct { Provider *runtime.RawExtension `json:"provider,omitempty"` } -// Supports returns true if the pod's requirements are met by the constraints -func (c *Constraints) Supports(pod *v1.Pod) error { +// ValidatePod returns an error if the pod's requirements are not met by the constraints +func (c *Constraints) ValidatePod(pod *v1.Pod) error { // Tolerate Taints if err := c.Taints.Tolerates(pod); err != nil { return err diff --git a/pkg/cloudprovider/fake/cloudprovider.go b/pkg/cloudprovider/fake/cloudprovider.go index 6cda69d081ce..c85d0f632940 100644 --- a/pkg/cloudprovider/fake/cloudprovider.go +++ b/pkg/cloudprovider/fake/cloudprovider.go @@ -82,6 +82,11 @@ func (c *CloudProvider) GetInstanceTypes(_ context.Context, _ *v1alpha5.Constrai NewInstanceType(InstanceTypeOptions{ name: "default-instance-type", }), + NewInstanceType(InstanceTypeOptions{ + name: "small-instance-type", + cpu: resource.MustParse("2"), + memory: resource.MustParse("2Gi"), + }), NewInstanceType(InstanceTypeOptions{ name: "nvidia-gpu-instance-type", nvidiaGPUs: resource.MustParse("2"), diff --git a/pkg/cloudprovider/fake/instancetype.go b/pkg/cloudprovider/fake/instancetype.go index bc8eb54f9d2d..63ef5e3d7705 100644 --- a/pkg/cloudprovider/fake/instancetype.go +++ b/pkg/cloudprovider/fake/instancetype.go @@ -110,5 +110,8 @@ func (i *InstanceType) AWSNeurons() *resource.Quantity { } func (i *InstanceType) Overhead() v1.ResourceList { - return v1.ResourceList{} + return v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("100m"), + v1.ResourceMemory: resource.MustParse("10Mi"), + } } diff --git a/pkg/controllers/provisioning/binpacking/packable.go b/pkg/controllers/provisioning/binpacking/packable.go index daa63ffcb1f0..52f45747b135 100644 --- a/pkg/controllers/provisioning/binpacking/packable.go +++ b/pkg/controllers/provisioning/binpacking/packable.go @@ -45,7 +45,7 @@ func PackablesFor(ctx context.Context, instanceTypes []cloudprovider.InstanceTyp packables := []*Packable{} for _, instanceType := range instanceTypes { packable := PackableFor(instanceType) - // 1. First pass at filtering down to viable instance types; + // First pass at filtering down to viable instance types; // additional filtering will be done by later steps (such as // removing instance types that obviously lack resources, such // as GPUs, for the workload being presented). @@ -63,12 +63,12 @@ func PackablesFor(ctx context.Context, instanceTypes []cloudprovider.InstanceTyp ); err != nil { continue } - // 2. Calculate Kubelet Overhead + // Calculate Kubelet Overhead if ok := packable.reserve(instanceType.Overhead()); !ok { logging.FromContext(ctx).Debugf("Excluding instance type %s because there are not enough resources for kubelet and system overhead", packable.Name()) continue } - // 3. Calculate Daemonset Overhead + // Calculate Daemonset Overhead if len(packable.Pack(schedule.Daemons).unpacked) > 0 { logging.FromContext(ctx).Debugf("Excluding instance type %s because there are not enough resources for daemons", packable.Name()) continue diff --git a/pkg/controllers/provisioning/scheduler.go b/pkg/controllers/provisioning/scheduler.go index 6a5fbc27a22b..5319bfdf07c5 100644 --- a/pkg/controllers/provisioning/scheduler.go +++ b/pkg/controllers/provisioning/scheduler.go @@ -80,7 +80,7 @@ func (s *Scheduler) Schedule(ctx context.Context, pod *v1.Pod) error { var provisioner *Provisioner var errs error for _, candidate := range s.controller.List(ctx) { - if err := candidate.Spec.DeepCopy().Supports(pod); err != nil { + if err := candidate.Spec.DeepCopy().ValidatePod(pod); err != nil { errs = multierr.Append(errs, fmt.Errorf("tried provisioner/%s: %w", candidate.Name, err)) } else { provisioner = candidate diff --git a/pkg/controllers/provisioning/scheduling/scheduler.go b/pkg/controllers/provisioning/scheduling/scheduler.go index 406694649b30..6c241fc44363 100644 --- a/pkg/controllers/provisioning/scheduling/scheduler.go +++ b/pkg/controllers/provisioning/scheduling/scheduler.go @@ -113,7 +113,7 @@ func (s *Scheduler) getSchedules(ctx context.Context, constraints *v1alpha5.Cons // schedule uniqueness is tracked by hash(Constraints) schedules := map[uint64]*Schedule{} for _, pod := range pods { - if err := constraints.Supports(pod); err != nil { + if err := constraints.ValidatePod(pod); err != nil { logging.FromContext(ctx).Infof("Unable to schedule pod %s/%s, %s", pod.Name, pod.Namespace, err.Error()) continue } @@ -147,17 +147,15 @@ func (s *Scheduler) getSchedules(ctx context.Context, constraints *v1alpha5.Cons } func (s *Scheduler) getDaemons(ctx context.Context, constraints *v1alpha5.Constraints) ([]*v1.Pod, error) { - // 1. Get DaemonSets daemonSetList := &appsv1.DaemonSetList{} if err := s.KubeClient.List(ctx, daemonSetList); err != nil { return nil, fmt.Errorf("listing daemonsets, %w", err) } - - // 2. filter DaemonSets to include those that will schedule on this node + // Include daemonsets that will schedule on this node pods := []*v1.Pod{} for _, daemonSet := range daemonSetList.Items { pod := &v1.Pod{Spec: daemonSet.Spec.Template.Spec} - if constraints.Supports(pod) != nil { + if constraints.ValidatePod(pod) == nil { pods = append(pods, pod) } } diff --git a/pkg/controllers/provisioning/suite_test.go b/pkg/controllers/provisioning/suite_test.go index 624c4d5b8c9a..736ad0bea105 100644 --- a/pkg/controllers/provisioning/suite_test.go +++ b/pkg/controllers/provisioning/suite_test.go @@ -25,7 +25,6 @@ import ( "github.com/awslabs/karpenter/pkg/test" "github.com/awslabs/karpenter/pkg/utils/resources" - appsv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -134,33 +133,81 @@ var _ = Describe("Provisioning", func() { ExpectScheduled(ctx, env.Client, pod) } }) - It("should account for daemonsets", func() { - ExpectCreated(env.Client, &appsv1.DaemonSet{ - ObjectMeta: metav1.ObjectMeta{Name: "daemons", Namespace: "default"}, - Spec: appsv1.DaemonSetSpec{ - Selector: &metav1.LabelSelector{MatchLabels: map[string]string{"app": "test"}}, - Template: v1.PodTemplateSpec{ - ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"app": "test"}}, - Spec: test.UnschedulablePod(test.PodOptions{ - ResourceRequirements: v1.ResourceRequirements{Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("1"), v1.ResourceMemory: resource.MustParse("1Gi")}}, - }).Spec, + Context("Daemonsets and Node Overhead", func() { + It("should account for overhead", func() { + ExpectCreated(env.Client, test.DaemonSet( + test.DaemonSetOptions{PodOptions: test.PodOptions{ + ResourceRequirements: v1.ResourceRequirements{Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("1"), v1.ResourceMemory: resource.MustParse("1Gi")}}, }}, - }) - for _, pod := range ExpectProvisioned(ctx, env.Client, scheduler, controller, provisioner, - test.UnschedulablePod(test.PodOptions{ - ResourceRequirements: v1.ResourceRequirements{Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("1"), v1.ResourceMemory: resource.MustParse("1Gi")}}, - }), - test.UnschedulablePod(test.PodOptions{ - ResourceRequirements: v1.ResourceRequirements{Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("1"), v1.ResourceMemory: resource.MustParse("1Gi")}}, - }), - test.UnschedulablePod(test.PodOptions{ - ResourceRequirements: v1.ResourceRequirements{Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("1"), v1.ResourceMemory: resource.MustParse("1Gi")}}, - }), - ) { + )) + pod := ExpectProvisioned(ctx, env.Client, scheduler, controller, provisioner, test.UnschedulablePod( + test.PodOptions{ + ResourceRequirements: v1.ResourceRequirements{Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("1"), v1.ResourceMemory: resource.MustParse("1Gi")}}, + }, + ))[0] node := ExpectScheduled(ctx, env.Client, pod) Expect(*node.Status.Allocatable.Cpu()).To(Equal(resource.MustParse("4"))) Expect(*node.Status.Allocatable.Memory()).To(Equal(resource.MustParse("4Gi"))) - } + }) + It("should not schedule if overhead is too large", func() { + ExpectCreated(env.Client, test.DaemonSet( + test.DaemonSetOptions{PodOptions: test.PodOptions{ + ResourceRequirements: v1.ResourceRequirements{Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("10000"), v1.ResourceMemory: resource.MustParse("10000Gi")}}, + }}, + )) + pod := ExpectProvisioned(ctx, env.Client, scheduler, controller, provisioner, test.UnschedulablePod(test.PodOptions{}))[0] + ExpectNotScheduled(ctx, env.Client, pod) + }) + It("should ignore daemonsets without matching tolerations", func() { + provisioner.Spec.Taints = v1alpha5.Taints{{Key: "foo", Value: "bar", Effect: v1.TaintEffectNoSchedule}} + ExpectCreated(env.Client, test.DaemonSet( + test.DaemonSetOptions{PodOptions: test.PodOptions{ + ResourceRequirements: v1.ResourceRequirements{Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("1"), v1.ResourceMemory: resource.MustParse("1Gi")}}, + }}, + )) + pod := ExpectProvisioned(ctx, env.Client, scheduler, controller, provisioner, test.UnschedulablePod( + test.PodOptions{ + Tolerations: []v1.Toleration{{Operator: v1.TolerationOperator(v1.NodeSelectorOpExists)}}, + ResourceRequirements: v1.ResourceRequirements{Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("1"), v1.ResourceMemory: resource.MustParse("1Gi")}}, + }, + ))[0] + node := ExpectScheduled(ctx, env.Client, pod) + Expect(*node.Status.Allocatable.Cpu()).To(Equal(resource.MustParse("2"))) + Expect(*node.Status.Allocatable.Memory()).To(Equal(resource.MustParse("2Gi"))) + }) + It("should ignore daemonsets with an invalid selector", func() { + ExpectCreated(env.Client, test.DaemonSet( + test.DaemonSetOptions{PodOptions: test.PodOptions{ + NodeSelector: map[string]string{"node": "invalid"}, + ResourceRequirements: v1.ResourceRequirements{Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("1"), v1.ResourceMemory: resource.MustParse("1Gi")}}, + }}, + )) + pod := ExpectProvisioned(ctx, env.Client, scheduler, controller, provisioner, test.UnschedulablePod( + test.PodOptions{ + ResourceRequirements: v1.ResourceRequirements{Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("1"), v1.ResourceMemory: resource.MustParse("1Gi")}}, + }, + ))[0] + node := ExpectScheduled(ctx, env.Client, pod) + Expect(*node.Status.Allocatable.Cpu()).To(Equal(resource.MustParse("2"))) + Expect(*node.Status.Allocatable.Memory()).To(Equal(resource.MustParse("2Gi"))) + }) + It("should ignore daemonsets that don't match pod constraints", func() { + ExpectCreated(env.Client, test.DaemonSet( + test.DaemonSetOptions{PodOptions: test.PodOptions{ + NodeRequirements: []v1.NodeSelectorRequirement{{Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"test-zone-1"}}}, + ResourceRequirements: v1.ResourceRequirements{Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("1"), v1.ResourceMemory: resource.MustParse("1Gi")}}, + }}, + )) + pod := ExpectProvisioned(ctx, env.Client, scheduler, controller, provisioner, test.UnschedulablePod( + test.PodOptions{ + NodeRequirements: []v1.NodeSelectorRequirement{{Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"test-zone-2"}}}, + ResourceRequirements: v1.ResourceRequirements{Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("1"), v1.ResourceMemory: resource.MustParse("1Gi")}}, + }, + ))[0] + node := ExpectScheduled(ctx, env.Client, pod) + Expect(*node.Status.Allocatable.Cpu()).To(Equal(resource.MustParse("2"))) + Expect(*node.Status.Allocatable.Memory()).To(Equal(resource.MustParse("2Gi"))) + }) }) Context("Labels", func() { It("should label nodes", func() { diff --git a/pkg/test/daemonsets.go b/pkg/test/daemonsets.go new file mode 100644 index 000000000000..619caa98cf57 --- /dev/null +++ b/pkg/test/daemonsets.go @@ -0,0 +1,64 @@ +/* +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package test + +import ( + "fmt" + "strings" + + "github.com/Pallinder/go-randomdata" + "github.com/imdario/mergo" + appsv1 "k8s.io/api/apps/v1" + v1 "k8s.io/api/core/v1" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// DaemonSetOptions customizes a DaemonSet. +type DaemonSetOptions struct { + Name string + Namespace string + Selector map[string]string + PodOptions PodOptions +} + +// DaemonSet creates a test pod with defaults that can be overriden by DaemonSetOptions. +// Overrides are applied in order, with a last write wins semantic. +func DaemonSet(overrides ...DaemonSetOptions) *appsv1.DaemonSet { + options := DaemonSetOptions{} + for _, opts := range overrides { + if err := mergo.Merge(&options, opts, mergo.WithOverride); err != nil { + panic(fmt.Sprintf("Failed to merge pod options: %s", err.Error())) + } + } + if options.Name == "" { + options.Name = strings.ToLower(randomdata.SillyName()) + } + if options.Namespace == "" { + options.Namespace = "default" + } + if options.Selector == nil { + options.Selector = map[string]string{"app": options.Name} + } + return &appsv1.DaemonSet{ + ObjectMeta: metav1.ObjectMeta{Name: options.Name, Namespace: options.Namespace}, + Spec: appsv1.DaemonSetSpec{ + Selector: &metav1.LabelSelector{MatchLabels: options.Selector}, + Template: v1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{Labels: options.Selector}, + Spec: Pod(options.PodOptions).Spec, + }}, + } +} diff --git a/pkg/test/expectations/expectations.go b/pkg/test/expectations/expectations.go index d0151e01ea1a..9d8fecca8a9b 100644 --- a/pkg/test/expectations/expectations.go +++ b/pkg/test/expectations/expectations.go @@ -22,6 +22,7 @@ import ( //nolint:revive,stylecheck . "github.com/onsi/gomega" + appsv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" "k8s.io/api/policy/v1beta1" "k8s.io/apimachinery/pkg/api/errors" @@ -133,6 +134,11 @@ func ExpectCleanedUp(c client.Client) { for i := range provisioners.Items { ExpectDeleted(c, &provisioners.Items[i]) } + daemonsets := appsv1.DaemonSetList{} + Expect(c.List(ctx, &daemonsets)).To(Succeed()) + for i := range daemonsets.Items { + ExpectDeleted(c, &daemonsets.Items[i]) + } } func ExpectProvisioned(ctx context.Context, c client.Client, scheduler *provisioning.Scheduler, controller *provisioning.Controller, provisioner *v1alpha5.Provisioner, pods ...*v1.Pod) (result []*v1.Pod) {