Fixed a bug where daemonsets weren't correctly factored in
ellistarn committed Nov 19, 2021
1 parent db9765a commit cea24cb
Showing 7 changed files with 141 additions and 32 deletions.
5 changes: 5 additions & 0 deletions pkg/cloudprovider/fake/cloudprovider.go
@@ -82,6 +82,11 @@ func (c *CloudProvider) GetInstanceTypes(_ context.Context, _ *v1alpha5.Constrai
NewInstanceType(InstanceTypeOptions{
name: "default-instance-type",
}),
NewInstanceType(InstanceTypeOptions{
name: "small-instance-type",
cpu: resource.MustParse("2"),
memory: resource.MustParse("2Gi"),
}),
NewInstanceType(InstanceTypeOptions{
name: "nvidia-gpu-instance-type",
nvidiaGPUs: resource.MustParse("2"),
5 changes: 4 additions & 1 deletion pkg/cloudprovider/fake/instancetype.go
@@ -110,5 +110,8 @@ func (i *InstanceType) AWSNeurons() *resource.Quantity {
}

func (i *InstanceType) Overhead() v1.ResourceList {
return v1.ResourceList{}
return v1.ResourceList{
v1.ResourceCPU: resource.MustParse("100m"),
v1.ResourceMemory: resource.MustParse("10Mi"),
}
}
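
With the fake overhead now non-zero, the 2 CPU / 2Gi `small-instance-type` added above can no longer hold a 1 CPU / 1Gi daemonset pod plus a 1 CPU / 1Gi workload pod, which is what the updated tests rely on. A standalone sketch of that arithmetic (illustrative only, not code from this commit):

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/resource"
)

func main() {
	capacity := resource.MustParse("2") // small-instance-type CPU

	// CPU that must fit on the node: kubelet/system overhead, one
	// daemonset pod, and one workload pod (as in the tests below).
	needed := resource.MustParse("100m")
	needed.Add(resource.MustParse("1")) // daemonset pod
	needed.Add(resource.MustParse("1")) // workload pod

	// Cmp returns 1 when needed exceeds capacity, so the 2-CPU instance
	// type is excluded and the 4-CPU default-instance-type is used.
	fmt.Println(needed.String(), needed.Cmp(capacity) > 0) // 2100m true
}
```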
6 changes: 3 additions & 3 deletions pkg/controllers/provisioning/binpacking/packable.go
@@ -45,7 +45,7 @@ func PackablesFor(ctx context.Context, instanceTypes []cloudprovider.InstanceTyp
packables := []*Packable{}
for _, instanceType := range instanceTypes {
packable := PackableFor(instanceType)
// 1. First pass at filtering down to viable instance types;
// First pass at filtering down to viable instance types;
// additional filtering will be done by later steps (such as
// removing instance types that obviously lack resources, such
// as GPUs, for the workload being presented).
@@ -63,12 +63,12 @@ func PackablesFor(ctx context.Context, instanceTypes []cloudprovider.InstanceTyp
); err != nil {
continue
}
// 2. Calculate Kubelet Overhead
// Calculate Kubelet Overhead
if ok := packable.reserve(instanceType.Overhead()); !ok {
logging.FromContext(ctx).Debugf("Excluding instance type %s because there are not enough resources for kubelet and system overhead", packable.Name())
continue
}
// 3. Calculate Daemonset Overhead
// Calculate Daemonset Overhead
if len(packable.Pack(schedule.Daemons).unpacked) > 0 {
logging.FromContext(ctx).Debugf("Excluding instance type %s because there are not enough resources for daemons", packable.Name())
continue
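
Before any workload pods are packed, the loop above now reserves kubelet/system overhead and then the daemonset pods, excluding instance types that cannot hold them. A rough sketch of what a reservation step like this does (the `packable` struct below is illustrative, not the real `Packable` implementation):

```go
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
)

// packable is an illustrative stand-in for the real Packable: it tracks how
// much of an instance type's capacity is still unreserved.
type packable struct {
	remaining v1.ResourceList
}

// reserve subtracts the requested resources and reports whether they fit.
// If any resource would go negative, nothing is reserved and false is returned.
func (p *packable) reserve(requests v1.ResourceList) bool {
	candidate := v1.ResourceList{}
	for name, quantity := range p.remaining {
		left := quantity.DeepCopy()
		if requested, ok := requests[name]; ok {
			left.Sub(requested)
		}
		if left.Sign() < 0 {
			return false
		}
		candidate[name] = left
	}
	p.remaining = candidate
	return true
}

func main() {
	p := &packable{remaining: v1.ResourceList{
		v1.ResourceCPU:    resource.MustParse("2"),
		v1.ResourceMemory: resource.MustParse("2Gi"),
	}}
	// Kubelet/system overhead is reserved first, then each daemonset pod;
	// if either step fails, the instance type is excluded from packing.
	fmt.Println(p.reserve(v1.ResourceList{v1.ResourceCPU: resource.MustParse("100m"), v1.ResourceMemory: resource.MustParse("10Mi")})) // true
	fmt.Println(p.reserve(v1.ResourceList{v1.ResourceCPU: resource.MustParse("1"), v1.ResourceMemory: resource.MustParse("1Gi")}))     // true
	fmt.Println(p.reserve(v1.ResourceList{v1.ResourceCPU: resource.MustParse("1"), v1.ResourceMemory: resource.MustParse("1Gi")}))     // false: only 900m CPU remains
}
```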
6 changes: 2 additions & 4 deletions pkg/controllers/provisioning/scheduling/scheduler.go
@@ -147,17 +147,15 @@ func (s *Scheduler) getSchedules(ctx context.Context, constraints *v1alpha5.Cons
}

func (s *Scheduler) getDaemons(ctx context.Context, constraints *v1alpha5.Constraints) ([]*v1.Pod, error) {
// 1. Get DaemonSets
daemonSetList := &appsv1.DaemonSetList{}
if err := s.KubeClient.List(ctx, daemonSetList); err != nil {
return nil, fmt.Errorf("listing daemonsets, %w", err)
}

// 2. filter DaemonSets to include those that will schedule on this node
// Include daemonsets that will schedule on this node
pods := []*v1.Pod{}
for _, daemonSet := range daemonSetList.Items {
pod := &v1.Pod{Spec: daemonSet.Spec.Template.Spec}
if constraints.Supports(pod) != nil {
if constraints.Supports(pod) == nil {
pods = append(pods, pod)
}
}
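
The one-character change above (`== nil` instead of `!= nil`) is the actual bug fix: previously only daemonset pods that the provisioner's constraints could *not* support were kept, so the wrong pods were counted against node capacity. A minimal sketch of the corrected filter, with a hypothetical `supports` predicate standing in for `constraints.Supports`:

```go
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
)

// supports is a hypothetical stand-in for constraints.Supports: it returns
// nil when the daemonset pod can schedule against the provisioner's taints,
// labels, and requirements.
func supports(pod *v1.Pod) error {
	if pod.Spec.NodeSelector["node"] == "invalid" {
		return fmt.Errorf("node selector not supported")
	}
	return nil
}

// daemonsFor keeps only the daemonset pods that would actually land on a
// node created by this provisioner, so only their requests are reserved.
func daemonsFor(candidates []*v1.Pod) []*v1.Pod {
	pods := []*v1.Pod{}
	for _, pod := range candidates {
		if supports(pod) == nil { // a nil error means the pod is schedulable here
			pods = append(pods, pod)
		}
	}
	return pods
}

func main() {
	matching := &v1.Pod{}
	mismatched := &v1.Pod{Spec: v1.PodSpec{NodeSelector: map[string]string{"node": "invalid"}}}
	fmt.Println(len(daemonsFor([]*v1.Pod{matching, mismatched}))) // 1
}
```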
95 changes: 71 additions & 24 deletions pkg/controllers/provisioning/suite_test.go
@@ -25,7 +25,6 @@ import (
"github.com/awslabs/karpenter/pkg/test"
"github.com/awslabs/karpenter/pkg/utils/resources"

appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -134,33 +133,81 @@ var _ = Describe("Provisioning", func() {
ExpectScheduled(ctx, env.Client, pod)
}
})
It("should account for daemonsets", func() {
ExpectCreated(env.Client, &appsv1.DaemonSet{
ObjectMeta: metav1.ObjectMeta{Name: "daemons", Namespace: "default"},
Spec: appsv1.DaemonSetSpec{
Selector: &metav1.LabelSelector{MatchLabels: map[string]string{"app": "test"}},
Template: v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"app": "test"}},
Spec: test.UnschedulablePod(test.PodOptions{
ResourceRequirements: v1.ResourceRequirements{Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("1"), v1.ResourceMemory: resource.MustParse("1Gi")}},
}).Spec,
Context("Daemonsets and Node Overhead", func() {
It("should account for overhead", func() {
ExpectCreated(env.Client, test.DaemonSet(
test.DaemonSetOptions{PodOptions: test.PodOptions{
ResourceRequirements: v1.ResourceRequirements{Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("1"), v1.ResourceMemory: resource.MustParse("1Gi")}},
}},
})
for _, pod := range ExpectProvisioned(ctx, env.Client, scheduler, controller, provisioner,
test.UnschedulablePod(test.PodOptions{
ResourceRequirements: v1.ResourceRequirements{Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("1"), v1.ResourceMemory: resource.MustParse("1Gi")}},
}),
test.UnschedulablePod(test.PodOptions{
ResourceRequirements: v1.ResourceRequirements{Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("1"), v1.ResourceMemory: resource.MustParse("1Gi")}},
}),
test.UnschedulablePod(test.PodOptions{
ResourceRequirements: v1.ResourceRequirements{Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("1"), v1.ResourceMemory: resource.MustParse("1Gi")}},
}),
) {
))
pod := ExpectProvisioned(ctx, env.Client, scheduler, controller, provisioner, test.UnschedulablePod(
test.PodOptions{
ResourceRequirements: v1.ResourceRequirements{Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("1"), v1.ResourceMemory: resource.MustParse("1Gi")}},
},
))[0]
node := ExpectScheduled(ctx, env.Client, pod)
Expect(*node.Status.Allocatable.Cpu()).To(Equal(resource.MustParse("4")))
Expect(*node.Status.Allocatable.Memory()).To(Equal(resource.MustParse("4Gi")))
}
})
It("should not schedule if overhead is too large", func() {
ExpectCreated(env.Client, test.DaemonSet(
test.DaemonSetOptions{PodOptions: test.PodOptions{
ResourceRequirements: v1.ResourceRequirements{Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("10"), v1.ResourceMemory: resource.MustParse("10Gi")}},
}},
))
pod := ExpectProvisioned(ctx, env.Client, scheduler, controller, provisioner, test.UnschedulablePod(test.PodOptions{}))[0]
ExpectNotScheduled(ctx, env.Client, pod)
})
It("should ignore daemonsets without matching tolerations", func() {
provisioner.Spec.Taints = v1alpha5.Taints{{Key: "foo", Value: "bar", Effect: v1.TaintEffectNoSchedule}}
ExpectCreated(env.Client, test.DaemonSet(
test.DaemonSetOptions{PodOptions: test.PodOptions{
ResourceRequirements: v1.ResourceRequirements{Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("1"), v1.ResourceMemory: resource.MustParse("1Gi")}},
}},
))
pod := ExpectProvisioned(ctx, env.Client, scheduler, controller, provisioner, test.UnschedulablePod(
test.PodOptions{
Tolerations: []v1.Toleration{{Operator: v1.TolerationOperator(v1.NodeSelectorOpExists)}},
ResourceRequirements: v1.ResourceRequirements{Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("1"), v1.ResourceMemory: resource.MustParse("1Gi")}},
},
))[0]
node := ExpectScheduled(ctx, env.Client, pod)
Expect(*node.Status.Allocatable.Cpu()).To(Equal(resource.MustParse("2")))
Expect(*node.Status.Allocatable.Memory()).To(Equal(resource.MustParse("2Gi")))
})
It("should ignore daemonsets an invalid selector", func() {
ExpectCreated(env.Client, test.DaemonSet(
test.DaemonSetOptions{PodOptions: test.PodOptions{
NodeSelector: map[string]string{"node": "invalid"},
ResourceRequirements: v1.ResourceRequirements{Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("1"), v1.ResourceMemory: resource.MustParse("1Gi")}},
}},
))
pod := ExpectProvisioned(ctx, env.Client, scheduler, controller, provisioner, test.UnschedulablePod(
test.PodOptions{
ResourceRequirements: v1.ResourceRequirements{Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("1"), v1.ResourceMemory: resource.MustParse("1Gi")}},
},
))[0]
node := ExpectScheduled(ctx, env.Client, pod)
Expect(*node.Status.Allocatable.Cpu()).To(Equal(resource.MustParse("2")))
Expect(*node.Status.Allocatable.Memory()).To(Equal(resource.MustParse("2Gi")))
})
It("should ignore daemonsets that don't match pod constraints", func() {
ExpectCreated(env.Client, test.DaemonSet(
test.DaemonSetOptions{PodOptions: test.PodOptions{
NodeRequirements: []v1.NodeSelectorRequirement{{Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"test-zone-1"}}},
ResourceRequirements: v1.ResourceRequirements{Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("1"), v1.ResourceMemory: resource.MustParse("1Gi")}},
}},
))
pod := ExpectProvisioned(ctx, env.Client, scheduler, controller, provisioner, test.UnschedulablePod(
test.PodOptions{
NodeRequirements: []v1.NodeSelectorRequirement{{Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"test-zone-2"}}},
ResourceRequirements: v1.ResourceRequirements{Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("1"), v1.ResourceMemory: resource.MustParse("1Gi")}},
},
))[0]
node := ExpectScheduled(ctx, env.Client, pod)
Expect(*node.Status.Allocatable.Cpu()).To(Equal(resource.MustParse("2")))
Expect(*node.Status.Allocatable.Memory()).To(Equal(resource.MustParse("2Gi")))
})
})
Context("Labels", func() {
It("should label nodes", func() {
50 changes: 50 additions & 0 deletions pkg/test/daemonsets.go
@@ -0,0 +1,50 @@
package test

import (
"fmt"
"strings"

"github.com/Pallinder/go-randomdata"
"github.com/imdario/mergo"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// DaemonSetOptions customizes a DaemonSet.
type DaemonSetOptions struct {
Name string
Namespace string
Selector map[string]string
PodOptions PodOptions
}

// DaemonSet creates a test DaemonSet with defaults that can be overridden by DaemonSetOptions.
// Overrides are applied in order, with last-write-wins semantics.
func DaemonSet(overrides ...DaemonSetOptions) *appsv1.DaemonSet {
options := DaemonSetOptions{}
for _, opts := range overrides {
if err := mergo.Merge(&options, opts, mergo.WithOverride); err != nil {
panic(fmt.Sprintf("Failed to merge daemonset options: %s", err.Error()))
}
}
if options.Name == "" {
options.Name = strings.ToLower(randomdata.SillyName())
}
if options.Namespace == "" {
options.Namespace = "default"
}
if options.Selector == nil {
options.Selector = map[string]string{"app": options.Name}
}
return &appsv1.DaemonSet{
ObjectMeta: metav1.ObjectMeta{Name: options.Name, Namespace: options.Namespace},
Spec: appsv1.DaemonSetSpec{
Selector: &metav1.LabelSelector{MatchLabels: options.Selector},
Template: v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{Labels: options.Selector},
Spec: Pod(options.PodOptions).Spec,
}},
}
}
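
A short usage sketch (not a complete test) showing the helper's defaults and overrides; the `ResourceRequirements` field mirrors the `PodOptions` usage in the suite above:

```go
// Defaults: a random lower-cased name, the "default" namespace, and a
// selector of {"app": <name>} shared with the pod template labels.
ds := test.DaemonSet()

// Explicit fields win over the defaults (last write wins across options).
custom := test.DaemonSet(test.DaemonSetOptions{
	Name:      "logging-agent", // hypothetical name for illustration
	Namespace: "kube-system",
	PodOptions: test.PodOptions{
		ResourceRequirements: v1.ResourceRequirements{
			Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("100m")},
		},
	},
})
_, _ = ds, custom
```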
6 changes: 6 additions & 0 deletions pkg/test/expectations/expectations.go
@@ -22,6 +22,7 @@ import (

//nolint:revive,stylecheck
. "github.com/onsi/gomega"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/api/policy/v1beta1"
"k8s.io/apimachinery/pkg/api/errors"
@@ -133,6 +134,11 @@ func ExpectCleanedUp(c client.Client) {
for i := range provisioners.Items {
ExpectDeleted(c, &provisioners.Items[i])
}
daemonsets := appsv1.DaemonSetList{}
Expect(c.List(ctx, &daemonsets)).To(Succeed())
for i := range daemonsets.Items {
ExpectDeleted(c, &daemonsets.Items[i])
}
}

func ExpectProvisioned(ctx context.Context, c client.Client, scheduler *provisioning.Scheduler, controller *provisioning.Controller, provisioner *v1alpha5.Provisioner, pods ...*v1.Pod) (result []*v1.Pod) {
