Skip to content

Commit

Permalink
Fixed a bug where daemonsets weren't correctly included in binpacking calculations (#814)

Browse files Browse the repository at this point in the history

* Fixed a bug where daemonsets weren't correctly factored in
  • Loading branch information
ellistarn authored Nov 20, 2021
1 parent 596a192 commit 3ed8570
Show file tree
Hide file tree
Showing 9 changed files with 159 additions and 36 deletions.
4 changes: 2 additions & 2 deletions pkg/apis/provisioning/v1alpha5/constraints.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ type Constraints struct {
Provider *runtime.RawExtension `json:"provider,omitempty"`
}

// Supports returns true if the pod's requirements are met by the constraints
func (c *Constraints) Supports(pod *v1.Pod) error {
// ValidatePod returns an error if the pod's requirements are not met by the constraints
func (c *Constraints) ValidatePod(pod *v1.Pod) error {
// Tolerate Taints
if err := c.Taints.Tolerates(pod); err != nil {
return err
Expand Down
5 changes: 5 additions & 0 deletions pkg/cloudprovider/fake/cloudprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,11 @@ func (c *CloudProvider) GetInstanceTypes(_ context.Context, _ *v1alpha5.Constrai
NewInstanceType(InstanceTypeOptions{
name: "default-instance-type",
}),
NewInstanceType(InstanceTypeOptions{
name: "small-instance-type",
cpu: resource.MustParse("2"),
memory: resource.MustParse("2Gi"),
}),
NewInstanceType(InstanceTypeOptions{
name: "nvidia-gpu-instance-type",
nvidiaGPUs: resource.MustParse("2"),
Expand Down
5 changes: 4 additions & 1 deletion pkg/cloudprovider/fake/instancetype.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,5 +110,8 @@ func (i *InstanceType) AWSNeurons() *resource.Quantity {
}

func (i *InstanceType) Overhead() v1.ResourceList {
return v1.ResourceList{}
return v1.ResourceList{
v1.ResourceCPU: resource.MustParse("100m"),
v1.ResourceMemory: resource.MustParse("10Mi"),
}
}
6 changes: 3 additions & 3 deletions pkg/controllers/provisioning/binpacking/packable.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func PackablesFor(ctx context.Context, instanceTypes []cloudprovider.InstanceTyp
packables := []*Packable{}
for _, instanceType := range instanceTypes {
packable := PackableFor(instanceType)
// 1. First pass at filtering down to viable instance types;
// First pass at filtering down to viable instance types;
// additional filtering will be done by later steps (such as
// removing instance types that obviously lack resources, such
// as GPUs, for the workload being presented).
Expand All @@ -63,12 +63,12 @@ func PackablesFor(ctx context.Context, instanceTypes []cloudprovider.InstanceTyp
); err != nil {
continue
}
// 2. Calculate Kubelet Overhead
// Calculate Kubelet Overhead
if ok := packable.reserve(instanceType.Overhead()); !ok {
logging.FromContext(ctx).Debugf("Excluding instance type %s because there are not enough resources for kubelet and system overhead", packable.Name())
continue
}
// 3. Calculate Daemonset Overhead
// Calculate Daemonset Overhead
if len(packable.Pack(schedule.Daemons).unpacked) > 0 {
logging.FromContext(ctx).Debugf("Excluding instance type %s because there are not enough resources for daemons", packable.Name())
continue
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/provisioning/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func (s *Scheduler) Schedule(ctx context.Context, pod *v1.Pod) error {
var provisioner *Provisioner
var errs error
for _, candidate := range s.controller.List(ctx) {
if err := candidate.Spec.DeepCopy().Supports(pod); err != nil {
if err := candidate.Spec.DeepCopy().ValidatePod(pod); err != nil {
errs = multierr.Append(errs, fmt.Errorf("tried provisioner/%s: %w", candidate.Name, err))
} else {
provisioner = candidate
Expand Down
8 changes: 3 additions & 5 deletions pkg/controllers/provisioning/scheduling/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func (s *Scheduler) getSchedules(ctx context.Context, constraints *v1alpha5.Cons
// schedule uniqueness is tracked by hash(Constraints)
schedules := map[uint64]*Schedule{}
for _, pod := range pods {
if err := constraints.Supports(pod); err != nil {
if err := constraints.ValidatePod(pod); err != nil {
logging.FromContext(ctx).Infof("Unable to schedule pod %s/%s, %s", pod.Name, pod.Namespace, err.Error())
continue
}
Expand Down Expand Up @@ -147,17 +147,15 @@ func (s *Scheduler) getSchedules(ctx context.Context, constraints *v1alpha5.Cons
}

func (s *Scheduler) getDaemons(ctx context.Context, constraints *v1alpha5.Constraints) ([]*v1.Pod, error) {
// 1. Get DaemonSets
daemonSetList := &appsv1.DaemonSetList{}
if err := s.KubeClient.List(ctx, daemonSetList); err != nil {
return nil, fmt.Errorf("listing daemonsets, %w", err)
}

// 2. filter DaemonSets to include those that will schedule on this node
// Include daemonsets that will schedule on this node
pods := []*v1.Pod{}
for _, daemonSet := range daemonSetList.Items {
pod := &v1.Pod{Spec: daemonSet.Spec.Template.Spec}
if constraints.Supports(pod) != nil {
if constraints.ValidatePod(pod) == nil {
pods = append(pods, pod)
}
}
Expand Down
95 changes: 71 additions & 24 deletions pkg/controllers/provisioning/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"github.com/awslabs/karpenter/pkg/test"
"github.com/awslabs/karpenter/pkg/utils/resources"

appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -134,33 +133,81 @@ var _ = Describe("Provisioning", func() {
ExpectScheduled(ctx, env.Client, pod)
}
})
It("should account for daemonsets", func() {
ExpectCreated(env.Client, &appsv1.DaemonSet{
ObjectMeta: metav1.ObjectMeta{Name: "daemons", Namespace: "default"},
Spec: appsv1.DaemonSetSpec{
Selector: &metav1.LabelSelector{MatchLabels: map[string]string{"app": "test"}},
Template: v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"app": "test"}},
Spec: test.UnschedulablePod(test.PodOptions{
ResourceRequirements: v1.ResourceRequirements{Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("1"), v1.ResourceMemory: resource.MustParse("1Gi")}},
}).Spec,
Context("Daemonsets and Node Overhead", func() {
It("should account for overhead", func() {
ExpectCreated(env.Client, test.DaemonSet(
test.DaemonSetOptions{PodOptions: test.PodOptions{
ResourceRequirements: v1.ResourceRequirements{Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("1"), v1.ResourceMemory: resource.MustParse("1Gi")}},
}},
})
for _, pod := range ExpectProvisioned(ctx, env.Client, scheduler, controller, provisioner,
test.UnschedulablePod(test.PodOptions{
ResourceRequirements: v1.ResourceRequirements{Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("1"), v1.ResourceMemory: resource.MustParse("1Gi")}},
}),
test.UnschedulablePod(test.PodOptions{
ResourceRequirements: v1.ResourceRequirements{Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("1"), v1.ResourceMemory: resource.MustParse("1Gi")}},
}),
test.UnschedulablePod(test.PodOptions{
ResourceRequirements: v1.ResourceRequirements{Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("1"), v1.ResourceMemory: resource.MustParse("1Gi")}},
}),
) {
))
pod := ExpectProvisioned(ctx, env.Client, scheduler, controller, provisioner, test.UnschedulablePod(
test.PodOptions{
ResourceRequirements: v1.ResourceRequirements{Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("1"), v1.ResourceMemory: resource.MustParse("1Gi")}},
},
))[0]
node := ExpectScheduled(ctx, env.Client, pod)
Expect(*node.Status.Allocatable.Cpu()).To(Equal(resource.MustParse("4")))
Expect(*node.Status.Allocatable.Memory()).To(Equal(resource.MustParse("4Gi")))
}
})
It("should not schedule if overhead is too large", func() {
ExpectCreated(env.Client, test.DaemonSet(
test.DaemonSetOptions{PodOptions: test.PodOptions{
ResourceRequirements: v1.ResourceRequirements{Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("10000"), v1.ResourceMemory: resource.MustParse("10000Gi")}},
}},
))
pod := ExpectProvisioned(ctx, env.Client, scheduler, controller, provisioner, test.UnschedulablePod(test.PodOptions{}))[0]
ExpectNotScheduled(ctx, env.Client, pod)
})
It("should ignore daemonsets without matching tolerations", func() {
provisioner.Spec.Taints = v1alpha5.Taints{{Key: "foo", Value: "bar", Effect: v1.TaintEffectNoSchedule}}
ExpectCreated(env.Client, test.DaemonSet(
test.DaemonSetOptions{PodOptions: test.PodOptions{
ResourceRequirements: v1.ResourceRequirements{Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("1"), v1.ResourceMemory: resource.MustParse("1Gi")}},
}},
))
pod := ExpectProvisioned(ctx, env.Client, scheduler, controller, provisioner, test.UnschedulablePod(
test.PodOptions{
Tolerations: []v1.Toleration{{Operator: v1.TolerationOperator(v1.NodeSelectorOpExists)}},
ResourceRequirements: v1.ResourceRequirements{Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("1"), v1.ResourceMemory: resource.MustParse("1Gi")}},
},
))[0]
node := ExpectScheduled(ctx, env.Client, pod)
Expect(*node.Status.Allocatable.Cpu()).To(Equal(resource.MustParse("2")))
Expect(*node.Status.Allocatable.Memory()).To(Equal(resource.MustParse("2Gi")))
})
It("should ignore daemonsets with an invalid selector", func() {
ExpectCreated(env.Client, test.DaemonSet(
test.DaemonSetOptions{PodOptions: test.PodOptions{
NodeSelector: map[string]string{"node": "invalid"},
ResourceRequirements: v1.ResourceRequirements{Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("1"), v1.ResourceMemory: resource.MustParse("1Gi")}},
}},
))
pod := ExpectProvisioned(ctx, env.Client, scheduler, controller, provisioner, test.UnschedulablePod(
test.PodOptions{
ResourceRequirements: v1.ResourceRequirements{Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("1"), v1.ResourceMemory: resource.MustParse("1Gi")}},
},
))[0]
node := ExpectScheduled(ctx, env.Client, pod)
Expect(*node.Status.Allocatable.Cpu()).To(Equal(resource.MustParse("2")))
Expect(*node.Status.Allocatable.Memory()).To(Equal(resource.MustParse("2Gi")))
})
It("should ignore daemonsets that don't match pod constraints", func() {
ExpectCreated(env.Client, test.DaemonSet(
test.DaemonSetOptions{PodOptions: test.PodOptions{
NodeRequirements: []v1.NodeSelectorRequirement{{Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"test-zone-1"}}},
ResourceRequirements: v1.ResourceRequirements{Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("1"), v1.ResourceMemory: resource.MustParse("1Gi")}},
}},
))
pod := ExpectProvisioned(ctx, env.Client, scheduler, controller, provisioner, test.UnschedulablePod(
test.PodOptions{
NodeRequirements: []v1.NodeSelectorRequirement{{Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"test-zone-2"}}},
ResourceRequirements: v1.ResourceRequirements{Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("1"), v1.ResourceMemory: resource.MustParse("1Gi")}},
},
))[0]
node := ExpectScheduled(ctx, env.Client, pod)
Expect(*node.Status.Allocatable.Cpu()).To(Equal(resource.MustParse("2")))
Expect(*node.Status.Allocatable.Memory()).To(Equal(resource.MustParse("2Gi")))
})
})
Context("Labels", func() {
It("should label nodes", func() {
Expand Down
64 changes: 64 additions & 0 deletions pkg/test/daemonsets.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package test

import (
"fmt"
"strings"

"github.com/Pallinder/go-randomdata"
"github.com/imdario/mergo"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// DaemonSetOptions customizes a DaemonSet.
// Zero-valued fields are defaulted by the DaemonSet constructor.
type DaemonSetOptions struct {
Name string // Object name; defaulted to a random lowercase silly name when empty.
Namespace string // Namespace; defaulted to "default" when empty.
Selector map[string]string // Label selector and pod template labels; defaulted to {"app": <Name>} when nil.
PodOptions PodOptions // Options applied to the DaemonSet's pod template spec.
}

// DaemonSet creates a test DaemonSet with defaults that can be overridden by DaemonSetOptions.
// Overrides are applied in order, with a last write wins semantic.
// Panics if the override options cannot be merged (programmer error in a test fixture).
func DaemonSet(overrides ...DaemonSetOptions) *appsv1.DaemonSet {
	options := DaemonSetOptions{}
	for _, opts := range overrides {
		if err := mergo.Merge(&options, opts, mergo.WithOverride); err != nil {
			// Fixed copy-paste: this merges DaemonSetOptions, not pod options.
			panic(fmt.Sprintf("Failed to merge daemonset options: %s", err.Error()))
		}
	}
	// Default any fields not supplied by the caller.
	if options.Name == "" {
		options.Name = strings.ToLower(randomdata.SillyName())
	}
	if options.Namespace == "" {
		options.Namespace = "default"
	}
	if options.Selector == nil {
		options.Selector = map[string]string{"app": options.Name}
	}
	// The selector labels double as the pod template labels so the
	// DaemonSet's selector matches the pods it creates.
	return &appsv1.DaemonSet{
		ObjectMeta: metav1.ObjectMeta{Name: options.Name, Namespace: options.Namespace},
		Spec: appsv1.DaemonSetSpec{
			Selector: &metav1.LabelSelector{MatchLabels: options.Selector},
			Template: v1.PodTemplateSpec{
				ObjectMeta: metav1.ObjectMeta{Labels: options.Selector},
				Spec:       Pod(options.PodOptions).Spec,
			},
		},
	}
}
6 changes: 6 additions & 0 deletions pkg/test/expectations/expectations.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

//nolint:revive,stylecheck
. "github.com/onsi/gomega"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/api/policy/v1beta1"
"k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -133,6 +134,11 @@ func ExpectCleanedUp(c client.Client) {
for i := range provisioners.Items {
ExpectDeleted(c, &provisioners.Items[i])
}
daemonsets := appsv1.DaemonSetList{}
Expect(c.List(ctx, &daemonsets)).To(Succeed())
for i := range daemonsets.Items {
ExpectDeleted(c, &daemonsets.Items[i])
}
}

func ExpectProvisioned(ctx context.Context, c client.Client, scheduler *provisioning.Scheduler, controller *provisioning.Controller, provisioner *v1alpha5.Provisioner, pods ...*v1.Pod) (result []*v1.Pod) {
Expand Down

0 comments on commit 3ed8570

Please sign in to comment.