Skip to content

Commit

Permalink
Fixed a bug where daemonsets weren't correctly factored in
Browse files Browse the repository at this point in the history
  • Loading branch information
ellistarn committed Nov 19, 2021
1 parent db9765a commit 7139b27
Show file tree
Hide file tree
Showing 7 changed files with 155 additions and 32 deletions.
5 changes: 5 additions & 0 deletions pkg/cloudprovider/fake/cloudprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,11 @@ func (c *CloudProvider) GetInstanceTypes(_ context.Context, _ *v1alpha5.Constrai
NewInstanceType(InstanceTypeOptions{
name: "default-instance-type",
}),
NewInstanceType(InstanceTypeOptions{
name: "small-instance-type",
cpu: resource.MustParse("2"),
memory: resource.MustParse("2Gi"),
}),
NewInstanceType(InstanceTypeOptions{
name: "nvidia-gpu-instance-type",
nvidiaGPUs: resource.MustParse("2"),
Expand Down
5 changes: 4 additions & 1 deletion pkg/cloudprovider/fake/instancetype.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,5 +110,8 @@ func (i *InstanceType) AWSNeurons() *resource.Quantity {
}

func (i *InstanceType) Overhead() v1.ResourceList {
return v1.ResourceList{}
return v1.ResourceList{
v1.ResourceCPU: resource.MustParse("100m"),
v1.ResourceMemory: resource.MustParse("10Mi"),
}
}
6 changes: 3 additions & 3 deletions pkg/controllers/provisioning/binpacking/packable.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func PackablesFor(ctx context.Context, instanceTypes []cloudprovider.InstanceTyp
packables := []*Packable{}
for _, instanceType := range instanceTypes {
packable := PackableFor(instanceType)
// 1. First pass at filtering down to viable instance types;
// First pass at filtering down to viable instance types;
// additional filtering will be done by later steps (such as
// removing instance types that obviously lack resources, such
// as GPUs, for the workload being presented).
Expand All @@ -63,12 +63,12 @@ func PackablesFor(ctx context.Context, instanceTypes []cloudprovider.InstanceTyp
); err != nil {
continue
}
// 2. Calculate Kubelet Overhead
// Calculate Kubelet Overhead
if ok := packable.reserve(instanceType.Overhead()); !ok {
logging.FromContext(ctx).Debugf("Excluding instance type %s because there are not enough resources for kubelet and system overhead", packable.Name())
continue
}
// 3. Calculate Daemonset Overhead
// Calculate Daemonset Overhead
if len(packable.Pack(schedule.Daemons).unpacked) > 0 {
logging.FromContext(ctx).Debugf("Excluding instance type %s because there are not enough resources for daemons", packable.Name())
continue
Expand Down
6 changes: 2 additions & 4 deletions pkg/controllers/provisioning/scheduling/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,17 +147,15 @@ func (s *Scheduler) getSchedules(ctx context.Context, constraints *v1alpha5.Cons
}

func (s *Scheduler) getDaemons(ctx context.Context, constraints *v1alpha5.Constraints) ([]*v1.Pod, error) {
// 1. Get DaemonSets
daemonSetList := &appsv1.DaemonSetList{}
if err := s.KubeClient.List(ctx, daemonSetList); err != nil {
return nil, fmt.Errorf("listing daemonsets, %w", err)
}

// 2. filter DaemonSets to include those that will schedule on this node
// Include daemonsets that will schedule on this node
pods := []*v1.Pod{}
for _, daemonSet := range daemonSetList.Items {
pod := &v1.Pod{Spec: daemonSet.Spec.Template.Spec}
if constraints.Supports(pod) != nil {
if constraints.Supports(pod) == nil {
pods = append(pods, pod)
}
}
Expand Down
95 changes: 71 additions & 24 deletions pkg/controllers/provisioning/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"github.com/awslabs/karpenter/pkg/test"
"github.com/awslabs/karpenter/pkg/utils/resources"

appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -134,33 +133,81 @@ var _ = Describe("Provisioning", func() {
ExpectScheduled(ctx, env.Client, pod)
}
})
It("should account for daemonsets", func() {
ExpectCreated(env.Client, &appsv1.DaemonSet{
ObjectMeta: metav1.ObjectMeta{Name: "daemons", Namespace: "default"},
Spec: appsv1.DaemonSetSpec{
Selector: &metav1.LabelSelector{MatchLabels: map[string]string{"app": "test"}},
Template: v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"app": "test"}},
Spec: test.UnschedulablePod(test.PodOptions{
ResourceRequirements: v1.ResourceRequirements{Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("1"), v1.ResourceMemory: resource.MustParse("1Gi")}},
}).Spec,
Context("Daemonsets and Node Overhead", func() {
It("should account for overhead", func() {
ExpectCreated(env.Client, test.DaemonSet(
test.DaemonSetOptions{PodOptions: test.PodOptions{
ResourceRequirements: v1.ResourceRequirements{Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("1"), v1.ResourceMemory: resource.MustParse("1Gi")}},
}},
})
for _, pod := range ExpectProvisioned(ctx, env.Client, scheduler, controller, provisioner,
test.UnschedulablePod(test.PodOptions{
ResourceRequirements: v1.ResourceRequirements{Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("1"), v1.ResourceMemory: resource.MustParse("1Gi")}},
}),
test.UnschedulablePod(test.PodOptions{
ResourceRequirements: v1.ResourceRequirements{Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("1"), v1.ResourceMemory: resource.MustParse("1Gi")}},
}),
test.UnschedulablePod(test.PodOptions{
ResourceRequirements: v1.ResourceRequirements{Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("1"), v1.ResourceMemory: resource.MustParse("1Gi")}},
}),
) {
))
pod := ExpectProvisioned(ctx, env.Client, scheduler, controller, provisioner, test.UnschedulablePod(
test.PodOptions{
ResourceRequirements: v1.ResourceRequirements{Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("1"), v1.ResourceMemory: resource.MustParse("1Gi")}},
},
))[0]
node := ExpectScheduled(ctx, env.Client, pod)
Expect(*node.Status.Allocatable.Cpu()).To(Equal(resource.MustParse("4")))
Expect(*node.Status.Allocatable.Memory()).To(Equal(resource.MustParse("4Gi")))
}
})
It("should not schedule if overhead is too large", func() {
ExpectCreated(env.Client, test.DaemonSet(
test.DaemonSetOptions{PodOptions: test.PodOptions{
ResourceRequirements: v1.ResourceRequirements{Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("10"), v1.ResourceMemory: resource.MustParse("10Gi")}},
}},
))
pod := ExpectProvisioned(ctx, env.Client, scheduler, controller, provisioner, test.UnschedulablePod(test.PodOptions{}))[0]
ExpectNotScheduled(ctx, env.Client, pod)
})
It("should ignore daemonsets without matching tolerations", func() {
provisioner.Spec.Taints = v1alpha5.Taints{{Key: "foo", Value: "bar", Effect: v1.TaintEffectNoSchedule}}
ExpectCreated(env.Client, test.DaemonSet(
test.DaemonSetOptions{PodOptions: test.PodOptions{
ResourceRequirements: v1.ResourceRequirements{Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("1"), v1.ResourceMemory: resource.MustParse("1Gi")}},
}},
))
pod := ExpectProvisioned(ctx, env.Client, scheduler, controller, provisioner, test.UnschedulablePod(
test.PodOptions{
Tolerations: []v1.Toleration{{Operator: v1.TolerationOperator(v1.NodeSelectorOpExists)}},
ResourceRequirements: v1.ResourceRequirements{Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("1"), v1.ResourceMemory: resource.MustParse("1Gi")}},
},
))[0]
node := ExpectScheduled(ctx, env.Client, pod)
Expect(*node.Status.Allocatable.Cpu()).To(Equal(resource.MustParse("2")))
Expect(*node.Status.Allocatable.Memory()).To(Equal(resource.MustParse("2Gi")))
})
It("should ignore daemonsets with an invalid selector", func() {
ExpectCreated(env.Client, test.DaemonSet(
test.DaemonSetOptions{PodOptions: test.PodOptions{
NodeSelector: map[string]string{"node": "invalid"},
ResourceRequirements: v1.ResourceRequirements{Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("1"), v1.ResourceMemory: resource.MustParse("1Gi")}},
}},
))
pod := ExpectProvisioned(ctx, env.Client, scheduler, controller, provisioner, test.UnschedulablePod(
test.PodOptions{
ResourceRequirements: v1.ResourceRequirements{Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("1"), v1.ResourceMemory: resource.MustParse("1Gi")}},
},
))[0]
node := ExpectScheduled(ctx, env.Client, pod)
Expect(*node.Status.Allocatable.Cpu()).To(Equal(resource.MustParse("2")))
Expect(*node.Status.Allocatable.Memory()).To(Equal(resource.MustParse("2Gi")))
})
It("should ignore daemonsets that don't match pod constraints", func() {
ExpectCreated(env.Client, test.DaemonSet(
test.DaemonSetOptions{PodOptions: test.PodOptions{
NodeRequirements: []v1.NodeSelectorRequirement{{Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"test-zone-1"}}},
ResourceRequirements: v1.ResourceRequirements{Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("1"), v1.ResourceMemory: resource.MustParse("1Gi")}},
}},
))
pod := ExpectProvisioned(ctx, env.Client, scheduler, controller, provisioner, test.UnschedulablePod(
test.PodOptions{
NodeRequirements: []v1.NodeSelectorRequirement{{Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"test-zone-2"}}},
ResourceRequirements: v1.ResourceRequirements{Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("1"), v1.ResourceMemory: resource.MustParse("1Gi")}},
},
))[0]
node := ExpectScheduled(ctx, env.Client, pod)
Expect(*node.Status.Allocatable.Cpu()).To(Equal(resource.MustParse("2")))
Expect(*node.Status.Allocatable.Memory()).To(Equal(resource.MustParse("2Gi")))
})
})
Context("Labels", func() {
It("should label nodes", func() {
Expand Down
64 changes: 64 additions & 0 deletions pkg/test/daemonsets.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package test

import (
"fmt"
"strings"

"github.com/Pallinder/go-randomdata"
"github.com/imdario/mergo"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// DaemonSetOptions customizes a test DaemonSet.
type DaemonSetOptions struct {
	// Name of the DaemonSet; a random lowercase silly name is generated when empty.
	Name string
	// Namespace the DaemonSet is created in; defaults to "default" when empty.
	Namespace string
	// Selector holds the labels used for both the label selector and the pod
	// template labels; defaults to {"app": <Name>} when nil.
	Selector map[string]string
	// PodOptions customizes the DaemonSet's pod template spec.
	PodOptions PodOptions
}

// DaemonSet creates a test DaemonSet with defaults that can be overridden by
// DaemonSetOptions. Overrides are applied in order, with a last write wins
// semantic. Unset fields are defaulted: a random name, the "default"
// namespace, and an {"app": <name>} selector shared by the label selector and
// the pod template labels.
func DaemonSet(overrides ...DaemonSetOptions) *appsv1.DaemonSet {
	options := DaemonSetOptions{}
	for _, opts := range overrides {
		if err := mergo.Merge(&options, opts, mergo.WithOverride); err != nil {
			// Programmer error in test setup; fail loudly.
			panic(fmt.Sprintf("Failed to merge daemonset options: %s", err.Error()))
		}
	}
	if options.Name == "" {
		options.Name = strings.ToLower(randomdata.SillyName())
	}
	if options.Namespace == "" {
		options.Namespace = "default"
	}
	if options.Selector == nil {
		options.Selector = map[string]string{"app": options.Name}
	}
	return &appsv1.DaemonSet{
		ObjectMeta: metav1.ObjectMeta{Name: options.Name, Namespace: options.Namespace},
		Spec: appsv1.DaemonSetSpec{
			Selector: &metav1.LabelSelector{MatchLabels: options.Selector},
			Template: v1.PodTemplateSpec{
				ObjectMeta: metav1.ObjectMeta{Labels: options.Selector},
				// Reuse the Pod test helper so pod-level defaults apply here too.
				Spec: Pod(options.PodOptions).Spec,
			},
		},
	}
}
6 changes: 6 additions & 0 deletions pkg/test/expectations/expectations.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

//nolint:revive,stylecheck
. "github.com/onsi/gomega"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/api/policy/v1beta1"
"k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -133,6 +134,11 @@ func ExpectCleanedUp(c client.Client) {
for i := range provisioners.Items {
ExpectDeleted(c, &provisioners.Items[i])
}
daemonsets := appsv1.DaemonSetList{}
Expect(c.List(ctx, &daemonsets)).To(Succeed())
for i := range daemonsets.Items {
ExpectDeleted(c, &daemonsets.Items[i])
}
}

func ExpectProvisioned(ctx context.Context, c client.Client, scheduler *provisioning.Scheduler, controller *provisioning.Controller, provisioner *v1alpha5.Provisioner, pods ...*v1.Pod) (result []*v1.Pod) {
Expand Down

0 comments on commit 7139b27

Please sign in to comment.