First phase of scheduler refactor #931

Merged · 1 commit · Dec 7, 2021
60 changes: 30 additions & 30 deletions pkg/controllers/provisioning/binpacking/packable.go
@@ -18,8 +18,8 @@ import (
"context"
"fmt"

"github.com/aws/karpenter/pkg/apis/provisioning/v1alpha5"
"github.com/aws/karpenter/pkg/cloudprovider"
"github.com/aws/karpenter/pkg/controllers/provisioning/scheduling"
"github.com/aws/karpenter/pkg/utils/resources"
"go.uber.org/multierr"
v1 "k8s.io/api/core/v1"
@@ -39,9 +39,9 @@ type Result struct {
unpacked []*v1.Pod
}

// PackablesFor creates viable packables for the provided schedule, excluding
// those that can't fit resources or violate schedule.
func PackablesFor(ctx context.Context, instanceTypes []cloudprovider.InstanceType, schedule *scheduling.Schedule) []*Packable {
// PackablesFor creates viable packables for the provided constraints, excluding
// those that can't fit resources or violate constraints.
func PackablesFor(ctx context.Context, instanceTypes []cloudprovider.InstanceType, constraints *v1alpha5.Constraints, pods []*v1.Pod, daemons []*v1.Pod) []*Packable {
packables := []*Packable{}
for _, instanceType := range instanceTypes {
packable := PackableFor(instanceType)
@@ -50,16 +50,16 @@ func PackablesFor(ctx context.Context, instanceTypes []cloudprovider.InstanceTyp
// removing instance types that obviously lack resources, such
// as GPUs, for the workload being presented).
if err := multierr.Combine(
packable.validateZones(schedule),
packable.validateInstanceType(schedule),
packable.validateArchitecture(schedule),
packable.validateCapacityTypes(schedule),
packable.validateZones(constraints),
packable.validateInstanceType(constraints),
packable.validateArchitecture(constraints),
packable.validateCapacityTypes(constraints),
// Although this will remove instances that have GPUs when
// not required, removal of instance types that *lack*
// GPUs will be done later.
packable.validateNvidiaGpus(schedule),
packable.validateAMDGpus(schedule),
packable.validateAWSNeurons(schedule),
packable.validateNvidiaGpus(pods),
packable.validateAMDGpus(pods),
packable.validateAWSNeurons(pods),
); err != nil {
continue
}
@@ -69,7 +69,7 @@ func PackablesFor(ctx context.Context, instanceTypes []cloudprovider.InstanceTyp
continue
}
// Calculate Daemonset Overhead
if len(packable.Pack(schedule.Daemons).unpacked) > 0 {
if len(packable.Pack(daemons).unpacked) > 0 {
logging.FromContext(ctx).Debugf("Excluding instance type %s because there are not enough resources for daemons", packable.Name())
continue
}
@@ -151,47 +151,47 @@ func (p *Packable) reservePod(pod *v1.Pod) bool {
return p.reserve(requests)
}

func (p *Packable) validateInstanceType(schedule *scheduling.Schedule) error {
if !schedule.Requirements.InstanceTypes().Has(p.Name()) {
return fmt.Errorf("instance type %s is not in %v", p.Name(), schedule.Requirements.InstanceTypes().List())
func (p *Packable) validateInstanceType(constraints *v1alpha5.Constraints) error {
if !constraints.Requirements.InstanceTypes().Has(p.Name()) {
return fmt.Errorf("instance type %s is not in %v", p.Name(), constraints.Requirements.InstanceTypes().List())
}
return nil
}

func (p *Packable) validateArchitecture(schedule *scheduling.Schedule) error {
if !schedule.Requirements.Architectures().Has(p.Architecture()) {
return fmt.Errorf("architecture %s is not in %v", p.Architecture(), schedule.Requirements.Architectures().List())
func (p *Packable) validateArchitecture(constraints *v1alpha5.Constraints) error {
if !constraints.Requirements.Architectures().Has(p.Architecture()) {
return fmt.Errorf("architecture %s is not in %v", p.Architecture(), constraints.Requirements.Architectures().List())
}
return nil
}

func (p *Packable) validateZones(schedule *scheduling.Schedule) error {
func (p *Packable) validateZones(constraints *v1alpha5.Constraints) error {
zones := sets.String{}
for _, offering := range p.Offerings() {
zones.Insert(offering.Zone)
}
if schedule.Requirements.Zones().Intersection(zones).Len() == 0 {
return fmt.Errorf("zones %v are not in %v", zones, schedule.Requirements.Zones().List())
if constraints.Requirements.Zones().Intersection(zones).Len() == 0 {
return fmt.Errorf("zones %v are not in %v", zones, constraints.Requirements.Zones().List())
}
return nil
}

func (p *Packable) validateCapacityTypes(schedule *scheduling.Schedule) error {
func (p *Packable) validateCapacityTypes(constraints *v1alpha5.Constraints) error {
capacityTypes := sets.String{}
for _, offering := range p.Offerings() {
capacityTypes.Insert(offering.CapacityType)
}
if schedule.Requirements.CapacityTypes().Intersection(capacityTypes).Len() == 0 {
return fmt.Errorf("capacity types %v are not in %v", capacityTypes, schedule.Requirements.CapacityTypes().List())
if constraints.Requirements.CapacityTypes().Intersection(capacityTypes).Len() == 0 {
return fmt.Errorf("capacity types %v are not in %v", capacityTypes, constraints.Requirements.CapacityTypes().List())
}
return nil
}

func (p *Packable) validateNvidiaGpus(schedule *scheduling.Schedule) error {
func (p *Packable) validateNvidiaGpus(pods []*v1.Pod) error {
if p.InstanceType.NvidiaGPUs().IsZero() {
return nil
}
for _, pod := range schedule.Pods {
for _, pod := range pods {
for _, container := range pod.Spec.Containers {
if _, ok := container.Resources.Requests[resources.NvidiaGPU]; ok {
return nil
@@ -201,11 +201,11 @@ func (p *Packable) validateNvidiaGpus(schedule *scheduling.Schedule) error {
return fmt.Errorf("nvidia gpu is not required")
}

func (p *Packable) validateAMDGpus(schedule *scheduling.Schedule) error {
func (p *Packable) validateAMDGpus(pods []*v1.Pod) error {
if p.InstanceType.AMDGPUs().IsZero() {
return nil
}
for _, pod := range schedule.Pods {
for _, pod := range pods {
for _, container := range pod.Spec.Containers {
if _, ok := container.Resources.Requests[resources.AMDGPU]; ok {
return nil
@@ -215,11 +215,11 @@ func (p *Packable) validateAMDGpus(schedule *scheduling.Schedule) error {
return fmt.Errorf("amd gpu is not required")
}

func (p *Packable) validateAWSNeurons(schedule *scheduling.Schedule) error {
func (p *Packable) validateAWSNeurons(pods []*v1.Pod) error {
if p.InstanceType.AWSNeurons().IsZero() {
return nil
}
for _, pod := range schedule.Pods {
for _, pod := range pods {
for _, container := range pod.Spec.Containers {
if _, ok := container.Resources.Requests[resources.AWSNeuron]; ok {
return nil
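The packable.go changes above swap the *scheduling.Schedule parameter for the provisioner Constraints plus explicit pod and daemon slices. Below is a minimal sketch, written as if it sat alongside packable.go in the binpacking package, of how the new signature threads constraints into the validators and daemons into the overhead check. The helper name is illustrative and does not come from this PR.

```go
package binpacking

import (
	"context"

	"github.com/aws/karpenter/pkg/apis/provisioning/v1alpha5"
	"github.com/aws/karpenter/pkg/cloudprovider"
	v1 "k8s.io/api/core/v1"
	"knative.dev/pkg/logging"
)

// debugPackables is a hypothetical helper (not part of the PR) showing the
// refactored PackablesFor call: constraints drive the zone, architecture,
// instance-type, and capacity-type filters, while pods and daemons are passed
// separately for the GPU checks and the daemonset-overhead reservation.
func debugPackables(ctx context.Context, instanceTypes []cloudprovider.InstanceType, constraints *v1alpha5.Constraints, pods, daemons []*v1.Pod) {
	for _, packable := range PackablesFor(ctx, instanceTypes, constraints, pods, daemons) {
		// Daemon overhead was already reserved inside PackablesFor, so Pack
		// only needs the workload pods.
		result := packable.Pack(pods)
		logging.FromContext(ctx).Debugf("instance type %s left %d pod(s) unpacked", packable.Name(), len(result.unpacked))
	}
}
```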
55 changes: 43 additions & 12 deletions pkg/controllers/provisioning/binpacking/packer.go
@@ -16,18 +16,19 @@ package binpacking

import (
"context"
"fmt"
"math"
"sort"

"github.com/aws/karpenter/pkg/apis/provisioning/v1alpha5"
"github.com/aws/karpenter/pkg/cloudprovider"
"github.com/aws/karpenter/pkg/controllers/provisioning/scheduling"
"github.com/aws/karpenter/pkg/metrics"
"github.com/aws/karpenter/pkg/utils/apiobject"
"github.com/aws/karpenter/pkg/utils/injection"
"github.com/aws/karpenter/pkg/utils/resources"
"github.com/mitchellh/hashstructure/v2"
"github.com/prometheus/client_golang/prometheus"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"knative.dev/pkg/logging"
@@ -55,8 +56,14 @@ func init() {
crmetrics.Registry.MustRegister(packDuration)
}

func NewPacker(kubeClient client.Client, cloudProvider cloudprovider.CloudProvider) *Packer {
return &Packer{kubeClient: kubeClient, cloudProvider: cloudProvider}
}

// Packer packs pods and calculates efficient placement on the instances.
type Packer struct {
kubeClient client.Client
cloudProvider cloudprovider.CloudProvider
}

// Packing is a binpacking solution of equivalently schedulable pods to a set of
@@ -66,7 +73,6 @@ type Packing struct {
Pods [][]*v1.Pod `hash:"ignore"`
NodeQuantity int `hash:"ignore"`
InstanceTypeOptions []cloudprovider.InstanceType
Constraints *v1alpha5.Constraints
}

// Pack returns the node packings for the provided pods. It computes a set of viable
@@ -75,19 +81,28 @@ type Packing struct {
// Pods provided are all schedulable in the same zone as tightly as possible.
// It follows the First Fit Decreasing bin packing technique, reference-
// https://en.wikipedia.org/wiki/Bin_packing_problem#First_Fit_Decreasing_(FFD)
func (p *Packer) Pack(ctx context.Context, schedule *scheduling.Schedule, instances []cloudprovider.InstanceType) []*Packing {
func (p *Packer) Pack(ctx context.Context, constraints *v1alpha5.Constraints, pods []*v1.Pod) ([]*Packing, error) {
defer metrics.Measure(packDuration.WithLabelValues(injection.GetNamespacedName(ctx).Name))()

// Get instance type options
instanceTypes, err := p.cloudProvider.GetInstanceTypes(ctx, constraints)
if err != nil {
return nil, fmt.Errorf("getting instance types, %w", err)
}
// Get daemons for overhead calculations
daemons, err := p.getDaemons(ctx, constraints)
if err != nil {
return nil, fmt.Errorf("getting schedulable daemon pods, %w", err)
}
// Sort pods in decreasing order by the amount of CPU requested, if
// CPU requested is equal compare memory requested.
sort.Sort(sort.Reverse(ByResourcesRequested{SortablePods: schedule.Pods}))
sort.Sort(sort.Reverse(ByResourcesRequested{SortablePods: pods}))
packs := map[uint64]*Packing{}
var packings []*Packing
var packing *Packing
remainingPods := schedule.Pods
remainingPods := pods
for len(remainingPods) > 0 {
packables := PackablesFor(ctx, instances, schedule)
packing, remainingPods = p.packWithLargestPod(schedule.Constraints, remainingPods, packables)
packables := PackablesFor(ctx, instanceTypes, constraints, pods, daemons)
packing, remainingPods = p.packWithLargestPod(remainingPods, packables)
// checked all instance types and found no packing option
if flattenedLen(packing.Pods...) == 0 {
logging.FromContext(ctx).Errorf("Failed to compute packing, pod(s) %s did not fit in instance type option(s) %v", apiobject.PodNamespacedNames(remainingPods), packableNames(packables))
@@ -96,7 +111,7 @@ func (p *Packer) Pack(ctx context.Context, schedule *scheduling.Schedule, instan
}
key, err := hashstructure.Hash(packing, hashstructure.FormatV2, &hashstructure.HashOptions{SlicesAsSets: true})
if err != nil {
logging.FromContext(ctx).Fatalf("Unable to hash packings while binpacking: %s", err.Error())
return nil, fmt.Errorf("hashing packings, %w", err)
}
if mainPack, ok := packs[key]; ok {
mainPack.NodeQuantity++
@@ -109,13 +124,29 @@ func (p *Packer) Pack(ctx context.Context, schedule *scheduling.Schedule, instan
for _, pack := range packings {
logging.FromContext(ctx).Infof("Computed packing of %d node(s) for %d pod(s) with instance type option(s) %s", pack.NodeQuantity, flattenedLen(pack.Pods...), instanceTypeNames(pack.InstanceTypeOptions))
}
return packings
return packings, nil
}

func (p *Packer) getDaemons(ctx context.Context, constraints *v1alpha5.Constraints) ([]*v1.Pod, error) {
daemonSetList := &appsv1.DaemonSetList{}
if err := p.kubeClient.List(ctx, daemonSetList); err != nil {
return nil, fmt.Errorf("listing daemonsets, %w", err)
}
// Include DaemonSets that will schedule on this node
pods := []*v1.Pod{}
for _, daemonSet := range daemonSetList.Items {
pod := &v1.Pod{Spec: daemonSet.Spec.Template.Spec}
if err := constraints.ValidatePod(pod); err == nil {
pods = append(pods, pod)
}
}
return pods, nil
}

// packWithLargestPod will try to pack max number of pods with largest pod in
// pods across all available node capacities. It returns Packing: max pod count
// that fit; with their node capacities and list of leftover pods
func (p *Packer) packWithLargestPod(constraints *v1alpha5.Constraints, unpackedPods []*v1.Pod, packables []*Packable) (*Packing, []*v1.Pod) {
func (p *Packer) packWithLargestPod(unpackedPods []*v1.Pod, packables []*Packable) (*Packing, []*v1.Pod) {
bestPackedPods := []*v1.Pod{}
bestInstances := []cloudprovider.InstanceType{}
remainingPods := unpackedPods
@@ -143,7 +174,7 @@ func (p *Packer) packWithLargestPod(constraints *v1alpha5.Constraints, unpackedP
if len(bestInstances) > MaxInstanceTypes {
bestInstances = bestInstances[:MaxInstanceTypes]
}
return &Packing{Pods: [][]*v1.Pod{bestPackedPods}, Constraints: constraints, InstanceTypeOptions: bestInstances, NodeQuantity: 1}, remainingPods
return &Packing{Pods: [][]*v1.Pod{bestPackedPods}, InstanceTypeOptions: bestInstances, NodeQuantity: 1}, remainingPods
}

func (*Packer) podsMatch(first, second []*v1.Pod) bool {
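With packer.go now owning instance-type discovery and daemonset-overhead lookup, a caller only supplies the constraints and the pending pods and handles the returned error instead of relying on a fatal log. A hedged sketch of such a caller follows; the function name and the surrounding wiring are assumptions for illustration, not code from this PR.

```go
package provisioning

import (
	"context"
	"fmt"

	"github.com/aws/karpenter/pkg/apis/provisioning/v1alpha5"
	"github.com/aws/karpenter/pkg/cloudprovider"
	"github.com/aws/karpenter/pkg/controllers/provisioning/binpacking"
	v1 "k8s.io/api/core/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// launchCapacity is a hypothetical caller of the refactored Packer: the Packer
// now looks up instance types and daemonset overhead itself, so the caller
// only passes constraints and the pending pods.
func launchCapacity(ctx context.Context, kubeClient client.Client, cloudProvider cloudprovider.CloudProvider, constraints *v1alpha5.Constraints, pods []*v1.Pod) error {
	packer := binpacking.NewPacker(kubeClient, cloudProvider)
	packings, err := packer.Pack(ctx, constraints, pods)
	if err != nil {
		return fmt.Errorf("binpacking pods, %w", err)
	}
	for _, packing := range packings {
		// Each Packing describes one node shape: how many identical nodes to
		// create and which instance types could back them.
		fmt.Printf("create %d node(s) with %d instance type option(s) for %d pod(s)\n",
			packing.NodeQuantity, len(packing.InstanceTypeOptions), len(packing.Pods[0]))
	}
	return nil
}
```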
34 changes: 8 additions & 26 deletions pkg/controllers/provisioning/controller.go
@@ -32,7 +32,6 @@ import (

"github.com/aws/karpenter/pkg/apis/provisioning/v1alpha5"
"github.com/aws/karpenter/pkg/cloudprovider"
"github.com/aws/karpenter/pkg/controllers/provisioning/binpacking"
"github.com/aws/karpenter/pkg/controllers/provisioning/scheduling"
"github.com/aws/karpenter/pkg/utils/functional"
"github.com/aws/karpenter/pkg/utils/injection"
@@ -44,7 +43,7 @@ type Controller struct {
ctx context.Context
provisioners *sync.Map
scheduler *scheduling.Scheduler
launcher *Launcher
coreV1Client corev1.CoreV1Interface
kubeClient client.Client
cloudProvider cloudprovider.CloudProvider
}
@@ -55,9 +54,9 @@ func NewController(ctx context.Context, kubeClient client.Client, coreV1Client c
ctx: ctx,
provisioners: &sync.Map{},
kubeClient: kubeClient,
coreV1Client: coreV1Client,
cloudProvider: cloudProvider,
scheduler: scheduling.NewScheduler(kubeClient, cloudProvider),
launcher: &Launcher{KubeClient: kubeClient, CoreV1Client: coreV1Client, CloudProvider: cloudProvider, Packer: &binpacking.Packer{}},
}
}

@@ -69,7 +68,7 @@ func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (reco
provisioner := &v1alpha5.Provisioner{}
if err := c.kubeClient.Get(ctx, req.NamespacedName, provisioner); err != nil {
if errors.IsNotFound(err) {
c.Delete(ctx, req.Name)
c.Delete(req.Name)
return reconcile.Result{}, nil
}
return reconcile.Result{}, err
@@ -82,7 +81,7 @@ func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (reco
}

// Delete stops and removes a provisioner. Enqueued pods will be provisioned.
func (c *Controller) Delete(ctx context.Context, name string) {
func (c *Controller) Delete(name string) {
if p, ok := c.provisioners.LoadAndDelete(name); ok {
p.(*Provisioner).Stop()
}
@@ -100,27 +99,10 @@ func (c *Controller) Apply(ctx context.Context, provisioner *v1alpha5.Provisione
With(requirements(instanceTypes)).
With(v1alpha5.LabelRequirements(provisioner.Spec.Labels)).
Consolidate()
if !c.hasChanged(ctx, provisioner) {
// If the provisionerSpecs haven't changed, we don't need to stop and drain the current Provisioner.
return nil
}
ctx, cancelFunc := context.WithCancel(ctx)
p := &Provisioner{
Provisioner: provisioner,
pods: make(chan *v1.Pod),
results: make(chan error),
done: ctx.Done(),
Stop: cancelFunc,
cloudProvider: c.cloudProvider,
kubeClient: c.kubeClient,
scheduler: c.scheduler,
launcher: c.launcher,
}
p.Start(ctx)
// Update the provisioner; stop and drain an existing provisioner if exists.
if existing, ok := c.provisioners.LoadOrStore(provisioner.Name, p); ok {
c.provisioners.Store(provisioner.Name, p)
existing.(*Provisioner).Stop()
// Update the provisioner if anything has changed
if c.hasChanged(ctx, provisioner) {
c.Delete(provisioner.Name)
c.provisioners.Store(provisioner.Name, NewProvisioner(ctx, provisioner, c.kubeClient, c.coreV1Client, c.cloudProvider))
}
return nil
}
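In controller.go, Apply now replaces the running Provisioner only when its spec has changed, delegating construction to NewProvisioner instead of wiring channels inline. The PR does not include hasChanged itself; below is a minimal sketch of one way such change detection can be written with the hashstructure library the packer already imports. The cached-hash map, the function name, and the fail-open choice are assumptions, not code from the repository.

```go
package provisioning

import (
	"sync"

	"github.com/aws/karpenter/pkg/apis/provisioning/v1alpha5"
	"github.com/mitchellh/hashstructure/v2"
)

// specHashes caches the last-applied spec hash per provisioner name. It is an
// illustrative stand-in for whatever state the controller's real hasChanged
// consults, which this PR does not show.
var specHashes sync.Map

// hasChangedSketch reports whether a provisioner's spec differs from the one
// seen on the previous Apply. Hashing the spec avoids deep-equality checks and
// lets the controller skip the stop/drain cycle when nothing changed.
func hasChangedSketch(provisioner *v1alpha5.Provisioner) bool {
	hash, err := hashstructure.Hash(provisioner.Spec, hashstructure.FormatV2, nil)
	if err != nil {
		return true // fail open: treat an unhashable spec as changed
	}
	previous, ok := specHashes.Load(provisioner.Name)
	specHashes.Store(provisioner.Name, hash)
	return !ok || previous.(uint64) != hash
}
```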