Fixed a bug where daemonsets weren't correctly included in binpacking calculations #814

Merged: 2 commits, Nov 20, 2021
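In short: getDaemons filtered daemonsets with an inverted comparison (Supports(pod) != nil), so binpacking reserved capacity for exactly the daemon pods that could not schedule on the node and ignored the ones that could. The diff below flips that condition, renames Constraints.Supports to ValidatePod to match its error-returning signature, teaches the fake cloud provider about kubelet overhead and a second, smaller instance type so the behavior is observable, and adds a test.DaemonSet helper plus regression tests.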
4 changes: 2 additions & 2 deletions pkg/apis/provisioning/v1alpha5/constraints.go
@@ -39,8 +39,8 @@ type Constraints struct {
 	Provider *runtime.RawExtension `json:"provider,omitempty"`
 }
 
-// Supports returns true if the pod's requirements are met by the constraints
-func (c *Constraints) Supports(pod *v1.Pod) error {
+// ValidatePod returns an error if the pod's requirements are not met by the constraints
+func (c *Constraints) ValidatePod(pod *v1.Pod) error {
 	// Tolerate Taints
 	if err := c.Taints.Tolerates(pod); err != nil {
 		return err
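Note on the rename: the old comment claimed Supports "returns true", but the method has always returned an error. ValidatePod, with the rewritten comment, states the actual contract and reads naturally at call sites such as constraints.ValidatePod(pod) == nil below.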
5 changes: 5 additions & 0 deletions pkg/cloudprovider/fake/cloudprovider.go
@@ -82,6 +82,11 @@ func (c *CloudProvider) GetInstanceTypes(_ context.Context, _ *v1alpha5.Constrai
 		NewInstanceType(InstanceTypeOptions{
 			name: "default-instance-type",
 		}),
+		NewInstanceType(InstanceTypeOptions{
+			name:   "small-instance-type",
+			cpu:    resource.MustParse("2"),
+			memory: resource.MustParse("2Gi"),
+		}),
 		NewInstanceType(InstanceTypeOptions{
 			name: "nvidia-gpu-instance-type",
 			nvidiaGPUs: resource.MustParse("2"),
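The new 2-CPU/2Gi small-instance-type gives the binpacker a second size to choose from; the tests below use the node's reported allocatable (2/2Gi versus 4/4Gi) to assert which instance type was actually selected.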
5 changes: 4 additions & 1 deletion pkg/cloudprovider/fake/instancetype.go
@@ -110,5 +110,8 @@ func (i *InstanceType) AWSNeurons() *resource.Quantity {
 }
 
 func (i *InstanceType) Overhead() v1.ResourceList {
-	return v1.ResourceList{}
+	return v1.ResourceList{
+		v1.ResourceCPU:    resource.MustParse("100m"),
+		v1.ResourceMemory: resource.MustParse("10Mi"),
+	}
 }
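Returning a fixed 100m/10Mi overhead from the fake makes the kubelet-overhead branch in packable.go (next file) reachable in tests. Packable.reserve itself is not part of this diff; what follows is only a minimal sketch of the semantics that branch relies on, not the actual implementation.

package binpacking

import (
	v1 "k8s.io/api/core/v1"
)

// reserve subtracts a reservation from the remaining capacity and reports
// whether it fit. Sketch only: the real Packable.reserve is not shown in
// this diff and likely differs in detail.
func reserve(remaining v1.ResourceList, overhead v1.ResourceList) bool {
	for name, qty := range overhead {
		r := remaining[name]
		r.Sub(qty) // e.g. 2 CPU capacity minus 100m overhead leaves 1900m
		if r.Sign() < 0 {
			return false // instance type cannot even fit kubelet/system overhead
		}
		remaining[name] = r
	}
	return true
}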
6 changes: 3 additions & 3 deletions pkg/controllers/provisioning/binpacking/packable.go
@@ -45,7 +45,7 @@ func PackablesFor(ctx context.Context, instanceTypes []cloudprovider.InstanceTyp
 	packables := []*Packable{}
 	for _, instanceType := range instanceTypes {
 		packable := PackableFor(instanceType)
-		// 1. First pass at filtering down to viable instance types;
+		// First pass at filtering down to viable instance types;
 		// additional filtering will be done by later steps (such as
 		// removing instance types that obviously lack resources, such
 		// as GPUs, for the workload being presented).
@@ -63,12 +63,12 @@ func PackablesFor(ctx context.Context, instanceTypes []cloudprovider.InstanceTyp
 		); err != nil {
 			continue
 		}
-		// 2. Calculate Kubelet Overhead
+		// Calculate Kubelet Overhead
 		if ok := packable.reserve(instanceType.Overhead()); !ok {
 			logging.FromContext(ctx).Debugf("Excluding instance type %s because there are not enough resources for kubelet and system overhead", packable.Name())
 			continue
 		}
-		// 3. Calculate Daemonset Overhead
+		// Calculate Daemonset Overhead
 		if len(packable.Pack(schedule.Daemons).unpacked) > 0 {
 			logging.FromContext(ctx).Debugf("Excluding instance type %s because there are not enough resources for daemons", packable.Name())
 			continue
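With the fake's numbers, these filters compose as follows in the tests further down: small-instance-type starts at 2 CPU / 2Gi; reserving the 100m / 10Mi kubelet overhead leaves 1900m / 2038Mi; packing a 1-CPU/1Gi daemon then leaves 900m / 1014Mi, so a 1-CPU/1Gi workload pod no longer fits and the provisioner falls back to the larger default-instance-type. That is why the "should account for overhead" test asserts an allocatable of 4 CPU / 4Gi.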
2 changes: 1 addition & 1 deletion pkg/controllers/provisioning/scheduler.go
@@ -80,7 +80,7 @@ func (s *Scheduler) Schedule(ctx context.Context, pod *v1.Pod) error {
 	var provisioner *Provisioner
 	var errs error
 	for _, candidate := range s.controller.List(ctx) {
-		if err := candidate.Spec.DeepCopy().Supports(pod); err != nil {
+		if err := candidate.Spec.DeepCopy().ValidatePod(pod); err != nil {
 			errs = multierr.Append(errs, fmt.Errorf("tried provisioner/%s: %w", candidate.Name, err))
 		} else {
 			provisioner = candidate
8 changes: 3 additions & 5 deletions pkg/controllers/provisioning/scheduling/scheduler.go
@@ -113,7 +113,7 @@ func (s *Scheduler) getSchedules(ctx context.Context, constraints *v1alpha5.Cons
 	// schedule uniqueness is tracked by hash(Constraints)
 	schedules := map[uint64]*Schedule{}
 	for _, pod := range pods {
-		if err := constraints.Supports(pod); err != nil {
+		if err := constraints.ValidatePod(pod); err != nil {
 			logging.FromContext(ctx).Infof("Unable to schedule pod %s/%s, %s", pod.Name, pod.Namespace, err.Error())
 			continue
 		}
@@ -147,17 +147,15 @@ func (s *Scheduler) getSchedules(ctx context.Context, constraints *v1alpha5.Cons
 }
 
 func (s *Scheduler) getDaemons(ctx context.Context, constraints *v1alpha5.Constraints) ([]*v1.Pod, error) {
-	// 1. Get DaemonSets
 	daemonSetList := &appsv1.DaemonSetList{}
 	if err := s.KubeClient.List(ctx, daemonSetList); err != nil {
 		return nil, fmt.Errorf("listing daemonsets, %w", err)
 	}
-
-	// 2. filter DaemonSets to include those that will schedule on this node
+	// Include daemonsets that will schedule on this node
 	pods := []*v1.Pod{}
 	for _, daemonSet := range daemonSetList.Items {
 		pod := &v1.Pod{Spec: daemonSet.Spec.Template.Spec}
-		if constraints.Supports(pod) != nil {
+		if constraints.ValidatePod(pod) == nil {
 			pods = append(pods, pod)
 		}
 	}
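This second hunk is the bug named in the PR title. The double negative in the old predicate is easy to misread, so spelled out:

// Before: Supports returned a non-nil error when the pod did NOT fit the
// provisioner's constraints, so this kept exactly the wrong set of daemons.
if constraints.Supports(pod) != nil { // error => daemon can't schedule here
	pods = append(pods, pod) // ...yet it was counted as binpacking overhead
}

// After: a nil error from ValidatePod means the daemon pod will land on the
// node, so only those daemons are handed to binpacking.
if constraints.ValidatePod(pod) == nil {
	pods = append(pods, pod)
}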
95 changes: 71 additions & 24 deletions pkg/controllers/provisioning/suite_test.go
@@ -25,7 +25,6 @@ import (
 	"github.com/awslabs/karpenter/pkg/test"
 	"github.com/awslabs/karpenter/pkg/utils/resources"
 
-	appsv1 "k8s.io/api/apps/v1"
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -134,33 +133,81 @@ var _ = Describe("Provisioning", func() {
 			ExpectScheduled(ctx, env.Client, pod)
 		}
 	})
-	It("should account for daemonsets", func() {
-		ExpectCreated(env.Client, &appsv1.DaemonSet{
-			ObjectMeta: metav1.ObjectMeta{Name: "daemons", Namespace: "default"},
-			Spec: appsv1.DaemonSetSpec{
-				Selector: &metav1.LabelSelector{MatchLabels: map[string]string{"app": "test"}},
-				Template: v1.PodTemplateSpec{
-					ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"app": "test"}},
-					Spec: test.UnschedulablePod(test.PodOptions{
-						ResourceRequirements: v1.ResourceRequirements{Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("1"), v1.ResourceMemory: resource.MustParse("1Gi")}},
-					}).Spec,
+	Context("Daemonsets and Node Overhead", func() {
+		It("should account for overhead", func() {
+			ExpectCreated(env.Client, test.DaemonSet(
+				test.DaemonSetOptions{PodOptions: test.PodOptions{
+					ResourceRequirements: v1.ResourceRequirements{Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("1"), v1.ResourceMemory: resource.MustParse("1Gi")}},
+				}},
-		})
-		for _, pod := range ExpectProvisioned(ctx, env.Client, scheduler, controller, provisioner,
-			test.UnschedulablePod(test.PodOptions{
-				ResourceRequirements: v1.ResourceRequirements{Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("1"), v1.ResourceMemory: resource.MustParse("1Gi")}},
-			}),
-			test.UnschedulablePod(test.PodOptions{
-				ResourceRequirements: v1.ResourceRequirements{Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("1"), v1.ResourceMemory: resource.MustParse("1Gi")}},
-			}),
-			test.UnschedulablePod(test.PodOptions{
-				ResourceRequirements: v1.ResourceRequirements{Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("1"), v1.ResourceMemory: resource.MustParse("1Gi")}},
-			}),
-		) {
+			))
+			pod := ExpectProvisioned(ctx, env.Client, scheduler, controller, provisioner, test.UnschedulablePod(
+				test.PodOptions{
+					ResourceRequirements: v1.ResourceRequirements{Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("1"), v1.ResourceMemory: resource.MustParse("1Gi")}},
+				},
+			))[0]
 			node := ExpectScheduled(ctx, env.Client, pod)
 			Expect(*node.Status.Allocatable.Cpu()).To(Equal(resource.MustParse("4")))
 			Expect(*node.Status.Allocatable.Memory()).To(Equal(resource.MustParse("4Gi")))
-		}
 		})
It("should not schedule if overhead is too large", func() {
+			ExpectCreated(env.Client, test.DaemonSet(
+				test.DaemonSetOptions{PodOptions: test.PodOptions{
+					ResourceRequirements: v1.ResourceRequirements{Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("10000"), v1.ResourceMemory: resource.MustParse("10000Gi")}},
+				}},
+			))
+			pod := ExpectProvisioned(ctx, env.Client, scheduler, controller, provisioner, test.UnschedulablePod(test.PodOptions{}))[0]
+			ExpectNotScheduled(ctx, env.Client, pod)
+		})
+		It("should ignore daemonsets without matching tolerations", func() {
+			provisioner.Spec.Taints = v1alpha5.Taints{{Key: "foo", Value: "bar", Effect: v1.TaintEffectNoSchedule}}
+			ExpectCreated(env.Client, test.DaemonSet(
+				test.DaemonSetOptions{PodOptions: test.PodOptions{
+					ResourceRequirements: v1.ResourceRequirements{Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("1"), v1.ResourceMemory: resource.MustParse("1Gi")}},
+				}},
+			))
+			pod := ExpectProvisioned(ctx, env.Client, scheduler, controller, provisioner, test.UnschedulablePod(
+				test.PodOptions{
+					Tolerations:          []v1.Toleration{{Operator: v1.TolerationOperator(v1.NodeSelectorOpExists)}},
+					ResourceRequirements: v1.ResourceRequirements{Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("1"), v1.ResourceMemory: resource.MustParse("1Gi")}},
+				},
+			))[0]
+			node := ExpectScheduled(ctx, env.Client, pod)
+			Expect(*node.Status.Allocatable.Cpu()).To(Equal(resource.MustParse("2")))
+			Expect(*node.Status.Allocatable.Memory()).To(Equal(resource.MustParse("2Gi")))
+		})
+		It("should ignore daemonsets with an invalid selector", func() {
+			ExpectCreated(env.Client, test.DaemonSet(
+				test.DaemonSetOptions{PodOptions: test.PodOptions{
+					NodeSelector:         map[string]string{"node": "invalid"},
+					ResourceRequirements: v1.ResourceRequirements{Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("1"), v1.ResourceMemory: resource.MustParse("1Gi")}},
+				}},
+			))
+			pod := ExpectProvisioned(ctx, env.Client, scheduler, controller, provisioner, test.UnschedulablePod(
+				test.PodOptions{
+					ResourceRequirements: v1.ResourceRequirements{Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("1"), v1.ResourceMemory: resource.MustParse("1Gi")}},
+				},
+			))[0]
+			node := ExpectScheduled(ctx, env.Client, pod)
+			Expect(*node.Status.Allocatable.Cpu()).To(Equal(resource.MustParse("2")))
+			Expect(*node.Status.Allocatable.Memory()).To(Equal(resource.MustParse("2Gi")))
+		})
+		It("should ignore daemonsets that don't match pod constraints", func() {
+			ExpectCreated(env.Client, test.DaemonSet(
+				test.DaemonSetOptions{PodOptions: test.PodOptions{
+					NodeRequirements:     []v1.NodeSelectorRequirement{{Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"test-zone-1"}}},
+					ResourceRequirements: v1.ResourceRequirements{Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("1"), v1.ResourceMemory: resource.MustParse("1Gi")}},
+				}},
+			))
+			pod := ExpectProvisioned(ctx, env.Client, scheduler, controller, provisioner, test.UnschedulablePod(
+				test.PodOptions{
+					NodeRequirements:     []v1.NodeSelectorRequirement{{Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"test-zone-2"}}},
+					ResourceRequirements: v1.ResourceRequirements{Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("1"), v1.ResourceMemory: resource.MustParse("1Gi")}},
+				},
+			))[0]
+			node := ExpectScheduled(ctx, env.Client, pod)
+			Expect(*node.Status.Allocatable.Cpu()).To(Equal(resource.MustParse("2")))
+			Expect(*node.Status.Allocatable.Memory()).To(Equal(resource.MustParse("2Gi")))
+		})
+	})
 	Context("Labels", func() {
 		It("should label nodes", func() {
64 changes: 64 additions & 0 deletions pkg/test/daemonsets.go
@@ -0,0 +1,64 @@
/*
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package test

import (
	"fmt"
	"strings"

	"github.com/Pallinder/go-randomdata"
	"github.com/imdario/mergo"
	appsv1 "k8s.io/api/apps/v1"
	v1 "k8s.io/api/core/v1"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// DaemonSetOptions customizes a DaemonSet.
type DaemonSetOptions struct {
	Name       string
	Namespace  string
	Selector   map[string]string
	PodOptions PodOptions
}

// DaemonSet creates a test DaemonSet with defaults that can be overridden by DaemonSetOptions.
// Overrides are applied in order, with last-write-wins semantics.
func DaemonSet(overrides ...DaemonSetOptions) *appsv1.DaemonSet {
	options := DaemonSetOptions{}
	for _, opts := range overrides {
		if err := mergo.Merge(&options, opts, mergo.WithOverride); err != nil {
			panic(fmt.Sprintf("Failed to merge daemonset options: %s", err.Error()))
		}
	}
	if options.Name == "" {
		options.Name = strings.ToLower(randomdata.SillyName())
	}
	if options.Namespace == "" {
		options.Namespace = "default"
	}
	if options.Selector == nil {
		options.Selector = map[string]string{"app": options.Name}
	}
	return &appsv1.DaemonSet{
		ObjectMeta: metav1.ObjectMeta{Name: options.Name, Namespace: options.Namespace},
		Spec: appsv1.DaemonSetSpec{
			Selector: &metav1.LabelSelector{MatchLabels: options.Selector},
			Template: v1.PodTemplateSpec{
				ObjectMeta: metav1.ObjectMeta{Labels: options.Selector},
				Spec:       Pod(options.PodOptions).Spec,
			}},
	}
}
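For reference, this is how the helper is exercised in suite_test.go above; everything defaults (random lowercase name, "default" namespace, app selector) except the pod resources:

// Create a daemonset whose pods request 1 CPU and 1Gi; name, namespace,
// and selector fall back to the defaults above.
ExpectCreated(env.Client, test.DaemonSet(
	test.DaemonSetOptions{PodOptions: test.PodOptions{
		ResourceRequirements: v1.ResourceRequirements{
			Requests: v1.ResourceList{
				v1.ResourceCPU:    resource.MustParse("1"),
				v1.ResourceMemory: resource.MustParse("1Gi"),
			},
		},
	}},
))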
6 changes: 6 additions & 0 deletions pkg/test/expectations/expectations.go
@@ -22,6 +22,7 @@ import (
 
 	//nolint:revive,stylecheck
 	. "github.com/onsi/gomega"
+	appsv1 "k8s.io/api/apps/v1"
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/api/policy/v1beta1"
 	"k8s.io/apimachinery/pkg/api/errors"
@@ -133,6 +134,11 @@ func ExpectCleanedUp(c client.Client) {
 	for i := range provisioners.Items {
 		ExpectDeleted(c, &provisioners.Items[i])
 	}
+	daemonsets := appsv1.DaemonSetList{}
+	Expect(c.List(ctx, &daemonsets)).To(Succeed())
+	for i := range daemonsets.Items {
+		ExpectDeleted(c, &daemonsets.Items[i])
+	}
 }
 
 func ExpectProvisioned(ctx context.Context, c client.Client, scheduler *provisioning.Scheduler, controller *provisioning.Controller, provisioner *v1alpha5.Provisioner, pods ...*v1.Pod) (result []*v1.Pod) {
Expand Down