From 6dd0dfa5941425b2cf6efd272a7df981ee681d60 Mon Sep 17 00:00:00 2001 From: Ellis Tarn Date: Mon, 6 Dec 2021 15:54:52 -0800 Subject: [PATCH] First phase of scheduler refactor --- .../provisioning/binpacking/packable.go | 60 +++---- .../provisioning/binpacking/packer.go | 55 ++++-- pkg/controllers/provisioning/controller.go | 34 +--- pkg/controllers/provisioning/launcher.go | 145 --------------- pkg/controllers/provisioning/provisioner.go | 165 +++++++++++++++--- .../provisioning/scheduling/scheduler.go | 32 +--- 6 files changed, 223 insertions(+), 268 deletions(-) delete mode 100644 pkg/controllers/provisioning/launcher.go diff --git a/pkg/controllers/provisioning/binpacking/packable.go b/pkg/controllers/provisioning/binpacking/packable.go index affac7d84e51..394aaf3da3a0 100644 --- a/pkg/controllers/provisioning/binpacking/packable.go +++ b/pkg/controllers/provisioning/binpacking/packable.go @@ -18,8 +18,8 @@ import ( "context" "fmt" + "github.com/aws/karpenter/pkg/apis/provisioning/v1alpha5" "github.com/aws/karpenter/pkg/cloudprovider" - "github.com/aws/karpenter/pkg/controllers/provisioning/scheduling" "github.com/aws/karpenter/pkg/utils/resources" "go.uber.org/multierr" v1 "k8s.io/api/core/v1" @@ -39,9 +39,9 @@ type Result struct { unpacked []*v1.Pod } -// PackablesFor creates viable packables for the provided schedule, excluding -// those that can't fit resources or violate schedule. -func PackablesFor(ctx context.Context, instanceTypes []cloudprovider.InstanceType, schedule *scheduling.Schedule) []*Packable { +// PackablesFor creates viable packables for the provided constraints, excluding +// those that can't fit resources or violate constraints. +func PackablesFor(ctx context.Context, instanceTypes []cloudprovider.InstanceType, constraints *v1alpha5.Constraints, pods []*v1.Pod, daemons []*v1.Pod) []*Packable { packables := []*Packable{} for _, instanceType := range instanceTypes { packable := PackableFor(instanceType) @@ -50,16 +50,16 @@ func PackablesFor(ctx context.Context, instanceTypes []cloudprovider.InstanceTyp // removing instance types that obviously lack resources, such // as GPUs, for the workload being presented). if err := multierr.Combine( - packable.validateZones(schedule), - packable.validateInstanceType(schedule), - packable.validateArchitecture(schedule), - packable.validateCapacityTypes(schedule), + packable.validateZones(constraints), + packable.validateInstanceType(constraints), + packable.validateArchitecture(constraints), + packable.validateCapacityTypes(constraints), // Although this will remove instances that have GPUs when // not required, removal of instance types that *lack* // GPUs will be done later. 
- packable.validateNvidiaGpus(schedule), - packable.validateAMDGpus(schedule), - packable.validateAWSNeurons(schedule), + packable.validateNvidiaGpus(pods), + packable.validateAMDGpus(pods), + packable.validateAWSNeurons(pods), ); err != nil { continue } @@ -69,7 +69,7 @@ func PackablesFor(ctx context.Context, instanceTypes []cloudprovider.InstanceTyp continue } // Calculate Daemonset Overhead - if len(packable.Pack(schedule.Daemons).unpacked) > 0 { + if len(packable.Pack(daemons).unpacked) > 0 { logging.FromContext(ctx).Debugf("Excluding instance type %s because there are not enough resources for daemons", packable.Name()) continue } @@ -151,47 +151,47 @@ func (p *Packable) reservePod(pod *v1.Pod) bool { return p.reserve(requests) } -func (p *Packable) validateInstanceType(schedule *scheduling.Schedule) error { - if !schedule.Requirements.InstanceTypes().Has(p.Name()) { - return fmt.Errorf("instance type %s is not in %v", p.Name(), schedule.Requirements.InstanceTypes().List()) +func (p *Packable) validateInstanceType(constraints *v1alpha5.Constraints) error { + if !constraints.Requirements.InstanceTypes().Has(p.Name()) { + return fmt.Errorf("instance type %s is not in %v", p.Name(), constraints.Requirements.InstanceTypes().List()) } return nil } -func (p *Packable) validateArchitecture(schedule *scheduling.Schedule) error { - if !schedule.Requirements.Architectures().Has(p.Architecture()) { - return fmt.Errorf("architecture %s is not in %v", p.Architecture(), schedule.Requirements.Architectures().List()) +func (p *Packable) validateArchitecture(constraints *v1alpha5.Constraints) error { + if !constraints.Requirements.Architectures().Has(p.Architecture()) { + return fmt.Errorf("architecture %s is not in %v", p.Architecture(), constraints.Requirements.Architectures().List()) } return nil } -func (p *Packable) validateZones(schedule *scheduling.Schedule) error { +func (p *Packable) validateZones(constraints *v1alpha5.Constraints) error { zones := sets.String{} for _, offering := range p.Offerings() { zones.Insert(offering.Zone) } - if schedule.Requirements.Zones().Intersection(zones).Len() == 0 { - return fmt.Errorf("zones %v are not in %v", zones, schedule.Requirements.Zones().List()) + if constraints.Requirements.Zones().Intersection(zones).Len() == 0 { + return fmt.Errorf("zones %v are not in %v", zones, constraints.Requirements.Zones().List()) } return nil } -func (p *Packable) validateCapacityTypes(schedule *scheduling.Schedule) error { +func (p *Packable) validateCapacityTypes(constraints *v1alpha5.Constraints) error { capacityTypes := sets.String{} for _, offering := range p.Offerings() { capacityTypes.Insert(offering.CapacityType) } - if schedule.Requirements.CapacityTypes().Intersection(capacityTypes).Len() == 0 { - return fmt.Errorf("capacity types %v are not in %v", capacityTypes, schedule.Requirements.CapacityTypes().List()) + if constraints.Requirements.CapacityTypes().Intersection(capacityTypes).Len() == 0 { + return fmt.Errorf("capacity types %v are not in %v", capacityTypes, constraints.Requirements.CapacityTypes().List()) } return nil } -func (p *Packable) validateNvidiaGpus(schedule *scheduling.Schedule) error { +func (p *Packable) validateNvidiaGpus(pods []*v1.Pod) error { if p.InstanceType.NvidiaGPUs().IsZero() { return nil } - for _, pod := range schedule.Pods { + for _, pod := range pods { for _, container := range pod.Spec.Containers { if _, ok := container.Resources.Requests[resources.NvidiaGPU]; ok { return nil @@ -201,11 +201,11 @@ func (p *Packable) 
validateNvidiaGpus(schedule *scheduling.Schedule) error { return fmt.Errorf("nvidia gpu is not required") } -func (p *Packable) validateAMDGpus(schedule *scheduling.Schedule) error { +func (p *Packable) validateAMDGpus(pods []*v1.Pod) error { if p.InstanceType.AMDGPUs().IsZero() { return nil } - for _, pod := range schedule.Pods { + for _, pod := range pods { for _, container := range pod.Spec.Containers { if _, ok := container.Resources.Requests[resources.AMDGPU]; ok { return nil @@ -215,11 +215,11 @@ func (p *Packable) validateAMDGpus(schedule *scheduling.Schedule) error { return fmt.Errorf("amd gpu is not required") } -func (p *Packable) validateAWSNeurons(schedule *scheduling.Schedule) error { +func (p *Packable) validateAWSNeurons(pods []*v1.Pod) error { if p.InstanceType.AWSNeurons().IsZero() { return nil } - for _, pod := range schedule.Pods { + for _, pod := range pods { for _, container := range pod.Spec.Containers { if _, ok := container.Resources.Requests[resources.AWSNeuron]; ok { return nil diff --git a/pkg/controllers/provisioning/binpacking/packer.go b/pkg/controllers/provisioning/binpacking/packer.go index ff8831a7b738..3eb34c807a61 100644 --- a/pkg/controllers/provisioning/binpacking/packer.go +++ b/pkg/controllers/provisioning/binpacking/packer.go @@ -16,18 +16,19 @@ package binpacking import ( "context" + "fmt" "math" "sort" "github.com/aws/karpenter/pkg/apis/provisioning/v1alpha5" "github.com/aws/karpenter/pkg/cloudprovider" - "github.com/aws/karpenter/pkg/controllers/provisioning/scheduling" "github.com/aws/karpenter/pkg/metrics" "github.com/aws/karpenter/pkg/utils/apiobject" "github.com/aws/karpenter/pkg/utils/injection" "github.com/aws/karpenter/pkg/utils/resources" "github.com/mitchellh/hashstructure/v2" "github.com/prometheus/client_golang/prometheus" + appsv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" "knative.dev/pkg/logging" @@ -55,8 +56,14 @@ func init() { crmetrics.Registry.MustRegister(packDuration) } +func NewPacker(kubeClient client.Client, cloudProvider cloudprovider.CloudProvider) *Packer { + return &Packer{kubeClient: kubeClient, cloudProvider: cloudProvider} +} + // Packer packs pods and calculates efficient placement on the instances. type Packer struct { + kubeClient client.Client + cloudProvider cloudprovider.CloudProvider } // Packing is a binpacking solution of equivalently schedulable pods to a set of @@ -66,7 +73,6 @@ type Packing struct { Pods [][]*v1.Pod `hash:"ignore"` NodeQuantity int `hash:"ignore"` InstanceTypeOptions []cloudprovider.InstanceType - Constraints *v1alpha5.Constraints } // Pack returns the node packings for the provided pods. It computes a set of viable @@ -75,19 +81,28 @@ type Packing struct { // Pods provided are all schedulable in the same zone as tightly as possible. 
// It follows the First Fit Decreasing bin packing technique, reference- // https://en.wikipedia.org/wiki/Bin_packing_problem#First_Fit_Decreasing_(FFD) -func (p *Packer) Pack(ctx context.Context, schedule *scheduling.Schedule, instances []cloudprovider.InstanceType) []*Packing { +func (p *Packer) Pack(ctx context.Context, constraints *v1alpha5.Constraints, pods []*v1.Pod) ([]*Packing, error) { defer metrics.Measure(packDuration.WithLabelValues(injection.GetNamespacedName(ctx).Name))() - + // Get instance type options + instanceTypes, err := p.cloudProvider.GetInstanceTypes(ctx, constraints) + if err != nil { + return nil, fmt.Errorf("getting instance types, %w", err) + } + // Get daemons for overhead calculations + daemons, err := p.getDaemons(ctx, constraints) + if err != nil { + return nil, fmt.Errorf("getting schedulable daemon pods, %w", err) + } // Sort pods in decreasing order by the amount of CPU requested, if // CPU requested is equal compare memory requested. - sort.Sort(sort.Reverse(ByResourcesRequested{SortablePods: schedule.Pods})) + sort.Sort(sort.Reverse(ByResourcesRequested{SortablePods: pods})) packs := map[uint64]*Packing{} var packings []*Packing var packing *Packing - remainingPods := schedule.Pods + remainingPods := pods for len(remainingPods) > 0 { - packables := PackablesFor(ctx, instances, schedule) - packing, remainingPods = p.packWithLargestPod(schedule.Constraints, remainingPods, packables) + packables := PackablesFor(ctx, instanceTypes, constraints, pods, daemons) + packing, remainingPods = p.packWithLargestPod(remainingPods, packables) // checked all instance types and found no packing option if flattenedLen(packing.Pods...) == 0 { logging.FromContext(ctx).Errorf("Failed to compute packing, pod(s) %s did not fit in instance type option(s) %v", apiobject.PodNamespacedNames(remainingPods), packableNames(packables)) @@ -96,7 +111,7 @@ func (p *Packer) Pack(ctx context.Context, schedule *scheduling.Schedule, instan } key, err := hashstructure.Hash(packing, hashstructure.FormatV2, &hashstructure.HashOptions{SlicesAsSets: true}) if err != nil { - logging.FromContext(ctx).Fatalf("Unable to hash packings while binpacking: %s", err.Error()) + return nil, fmt.Errorf("hashing packings, %w", err) } if mainPack, ok := packs[key]; ok { mainPack.NodeQuantity++ @@ -109,13 +124,29 @@ func (p *Packer) Pack(ctx context.Context, schedule *scheduling.Schedule, instan for _, pack := range packings { logging.FromContext(ctx).Infof("Computed packing of %d node(s) for %d pod(s) with instance type option(s) %s", pack.NodeQuantity, flattenedLen(pack.Pods...), instanceTypeNames(pack.InstanceTypeOptions)) } - return packings + return packings, nil +} + +func (p *Packer) getDaemons(ctx context.Context, constraints *v1alpha5.Constraints) ([]*v1.Pod, error) { + daemonSetList := &appsv1.DaemonSetList{} + if err := p.kubeClient.List(ctx, daemonSetList); err != nil { + return nil, fmt.Errorf("listing daemonsets, %w", err) + } + // Include DaemonSets that will schedule on this node + pods := []*v1.Pod{} + for _, daemonSet := range daemonSetList.Items { + pod := &v1.Pod{Spec: daemonSet.Spec.Template.Spec} + if err := constraints.ValidatePod(pod); err == nil { + pods = append(pods, pod) + } + } + return pods, nil } // packWithLargestPod will try to pack max number of pods with largest pod in // pods across all available node capacities. 
It returns Packing: max pod count // that fit; with their node capacities and list of leftover pods -func (p *Packer) packWithLargestPod(constraints *v1alpha5.Constraints, unpackedPods []*v1.Pod, packables []*Packable) (*Packing, []*v1.Pod) { +func (p *Packer) packWithLargestPod(unpackedPods []*v1.Pod, packables []*Packable) (*Packing, []*v1.Pod) { bestPackedPods := []*v1.Pod{} bestInstances := []cloudprovider.InstanceType{} remainingPods := unpackedPods @@ -143,7 +174,7 @@ func (p *Packer) packWithLargestPod(constraints *v1alpha5.Constraints, unpackedP if len(bestInstances) > MaxInstanceTypes { bestInstances = bestInstances[:MaxInstanceTypes] } - return &Packing{Pods: [][]*v1.Pod{bestPackedPods}, Constraints: constraints, InstanceTypeOptions: bestInstances, NodeQuantity: 1}, remainingPods + return &Packing{Pods: [][]*v1.Pod{bestPackedPods}, InstanceTypeOptions: bestInstances, NodeQuantity: 1}, remainingPods } func (*Packer) podsMatch(first, second []*v1.Pod) bool { diff --git a/pkg/controllers/provisioning/controller.go b/pkg/controllers/provisioning/controller.go index 376f091661f6..08657bfbf0fc 100644 --- a/pkg/controllers/provisioning/controller.go +++ b/pkg/controllers/provisioning/controller.go @@ -32,7 +32,6 @@ import ( "github.com/aws/karpenter/pkg/apis/provisioning/v1alpha5" "github.com/aws/karpenter/pkg/cloudprovider" - "github.com/aws/karpenter/pkg/controllers/provisioning/binpacking" "github.com/aws/karpenter/pkg/controllers/provisioning/scheduling" "github.com/aws/karpenter/pkg/utils/functional" "github.com/aws/karpenter/pkg/utils/injection" @@ -44,7 +43,7 @@ type Controller struct { ctx context.Context provisioners *sync.Map scheduler *scheduling.Scheduler - launcher *Launcher + coreV1Client corev1.CoreV1Interface kubeClient client.Client cloudProvider cloudprovider.CloudProvider } @@ -55,9 +54,9 @@ func NewController(ctx context.Context, kubeClient client.Client, coreV1Client c ctx: ctx, provisioners: &sync.Map{}, kubeClient: kubeClient, + coreV1Client: coreV1Client, cloudProvider: cloudProvider, scheduler: scheduling.NewScheduler(kubeClient, cloudProvider), - launcher: &Launcher{KubeClient: kubeClient, CoreV1Client: coreV1Client, CloudProvider: cloudProvider, Packer: &binpacking.Packer{}}, } } @@ -69,7 +68,7 @@ func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (reco provisioner := &v1alpha5.Provisioner{} if err := c.kubeClient.Get(ctx, req.NamespacedName, provisioner); err != nil { if errors.IsNotFound(err) { - c.Delete(ctx, req.Name) + c.Delete(req.Name) return reconcile.Result{}, nil } return reconcile.Result{}, err @@ -82,7 +81,7 @@ func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (reco } // Delete stops and removes a provisioner. Enqueued pods will be provisioned. -func (c *Controller) Delete(ctx context.Context, name string) { +func (c *Controller) Delete(name string) { if p, ok := c.provisioners.LoadAndDelete(name); ok { p.(*Provisioner).Stop() } @@ -100,27 +99,10 @@ func (c *Controller) Apply(ctx context.Context, provisioner *v1alpha5.Provisione With(requirements(instanceTypes)). With(v1alpha5.LabelRequirements(provisioner.Spec.Labels)). Consolidate() - if !c.hasChanged(ctx, provisioner) { - // If the provisionerSpecs haven't changed, we don't need to stop and drain the current Provisioner. 
- return nil - } - ctx, cancelFunc := context.WithCancel(ctx) - p := &Provisioner{ - Provisioner: provisioner, - pods: make(chan *v1.Pod), - results: make(chan error), - done: ctx.Done(), - Stop: cancelFunc, - cloudProvider: c.cloudProvider, - kubeClient: c.kubeClient, - scheduler: c.scheduler, - launcher: c.launcher, - } - p.Start(ctx) - // Update the provisioner; stop and drain an existing provisioner if exists. - if existing, ok := c.provisioners.LoadOrStore(provisioner.Name, p); ok { - c.provisioners.Store(provisioner.Name, p) - existing.(*Provisioner).Stop() + // Update the provisioner if anything has changed + if c.hasChanged(ctx, provisioner) { + c.Delete(provisioner.Name) + c.provisioners.Store(provisioner.Name, NewProvisioner(ctx, provisioner, c.kubeClient, c.coreV1Client, c.cloudProvider)) } return nil } diff --git a/pkg/controllers/provisioning/launcher.go b/pkg/controllers/provisioning/launcher.go deleted file mode 100644 index 60de63fb8fbb..000000000000 --- a/pkg/controllers/provisioning/launcher.go +++ /dev/null @@ -1,145 +0,0 @@ -/* -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package provisioning - -import ( - "context" - "fmt" - "sync/atomic" - - "github.com/aws/karpenter/pkg/apis/provisioning/v1alpha5" - "github.com/aws/karpenter/pkg/cloudprovider" - "github.com/aws/karpenter/pkg/controllers/provisioning/binpacking" - "github.com/aws/karpenter/pkg/controllers/provisioning/scheduling" - "github.com/aws/karpenter/pkg/metrics" - "github.com/aws/karpenter/pkg/utils/functional" - "github.com/aws/karpenter/pkg/utils/injection" - "github.com/prometheus/client_golang/prometheus" - "go.uber.org/multierr" - v1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - corev1 "k8s.io/client-go/kubernetes/typed/core/v1" - "k8s.io/client-go/util/workqueue" - "knative.dev/pkg/logging" - "sigs.k8s.io/controller-runtime/pkg/client" - crmetrics "sigs.k8s.io/controller-runtime/pkg/metrics" -) - -type Launcher struct { - Packer *binpacking.Packer - KubeClient client.Client - CoreV1Client corev1.CoreV1Interface - CloudProvider cloudprovider.CloudProvider -} - -// Thread safe channel to pop off packed pod slices -func queueFor(pods [][]*v1.Pod) <-chan []*v1.Pod { - queue := make(chan []*v1.Pod, len(pods)) - defer close(queue) - for _, ps := range pods { - queue <- ps - } - return queue -} - -func (l *Launcher) Launch(ctx context.Context, provisioner *v1alpha5.Provisioner, schedules []*scheduling.Schedule, - instanceTypes []cloudprovider.InstanceType) error { - // Pack and bind pods - errs := make([]error, len(schedules)) - workqueue.ParallelizeUntil(ctx, len(schedules), len(schedules), func(index int) { - for _, packing := range l.Packer.Pack(ctx, schedules[index], instanceTypes) { - if err := l.verifyResourceLimits(ctx, provisioner); err != nil { - errs[index] = multierr.Append(errs[index], fmt.Errorf("verifying limits, %w", err)) - continue - } - packedPods := queueFor(packing.Pods) - if err 
:= <-l.CloudProvider.Create(ctx, packing.Constraints, packing.InstanceTypeOptions, packing.NodeQuantity, func(node *v1.Node) error { - node.Labels = functional.UnionStringMaps(node.Labels, packing.Constraints.Labels) - node.Spec.Taints = append(node.Spec.Taints, packing.Constraints.Taints...) - return l.bind(ctx, node, <-packedPods) - }); err != nil { - errs[index] = multierr.Append(errs[index], err) - } - } - }) - return multierr.Combine(errs...) -} - -func (l *Launcher) bind(ctx context.Context, node *v1.Node, pods []*v1.Pod) (err error) { - defer metrics.Measure(bindTimeHistogram.WithLabelValues(injection.GetNamespacedName(ctx).Name))() - - // Add the Karpenter finalizer to the node to enable the termination workflow - node.Finalizers = append(node.Finalizers, v1alpha5.TerminationFinalizer) - // Taint karpenter.sh/not-ready=NoSchedule to prevent the kube scheduler - // from scheduling pods before we're able to bind them ourselves. The kube - // scheduler has an eventually consistent cache of nodes and pods, so it's - // possible for it to see a provisioned node before it sees the pods bound - // to it. This creates an edge case where other pending pods may be bound to - // the node by the kube scheduler, causing OutOfCPU errors when the - // binpacked pods race to bind to the same node. The system eventually - // heals, but causes delays from additional provisioning (thrash). This - // taint will be removed by the node controller when a node is marked ready. - node.Spec.Taints = append(node.Spec.Taints, v1.Taint{ - Key: v1alpha5.NotReadyTaintKey, - Effect: v1.TaintEffectNoSchedule, - }) - // Idempotently create a node. In rare cases, nodes can come online and - // self register before the controller is able to register a node object - // with the API server. In the common case, we create the node object - // ourselves to enforce the binding decision and enable images to be pulled - // before the node is fully Ready. - if _, err := l.CoreV1Client.Nodes().Create(ctx, node, metav1.CreateOptions{}); err != nil { - if !errors.IsAlreadyExists(err) { - return fmt.Errorf("creating node %s, %w", node.Name, err) - } - } - // Bind pods - var bound int64 - workqueue.ParallelizeUntil(ctx, len(pods), len(pods), func(i int) { - pod := pods[i] - binding := &v1.Binding{TypeMeta: pod.TypeMeta, ObjectMeta: pod.ObjectMeta, Target: v1.ObjectReference{Name: node.Name}} - if err := l.CoreV1Client.Pods(pods[i].Namespace).Bind(ctx, binding, metav1.CreateOptions{}); err != nil { - logging.FromContext(ctx).Errorf("Failed to bind %s/%s to %s, %s", pod.Namespace, pod.Name, node.Name, err.Error()) - } else { - atomic.AddInt64(&bound, 1) - } - }) - logging.FromContext(ctx).Infof("Bound %d pod(s) to node %s", bound, node.Name) - return nil -} - -func (l *Launcher) verifyResourceLimits(ctx context.Context, provisioner *v1alpha5.Provisioner) error { - latest := &v1alpha5.Provisioner{} - if err := l.KubeClient.Get(ctx, client.ObjectKeyFromObject(provisioner), latest); err != nil { - return fmt.Errorf("getting current resource usage, %w", err) - } - return provisioner.Spec.Limits.ExceededBy(latest.Status.Resources) -} - -var bindTimeHistogram = prometheus.NewHistogramVec( - prometheus.HistogramOpts{ - Namespace: metrics.Namespace, - Subsystem: "allocation_controller", - Name: "bind_duration_seconds", - Help: "Duration of bind process in seconds. 
Broken down by result.", - Buckets: metrics.DurationBuckets(), - }, - []string{metrics.ProvisionerLabel}, -) - -func init() { - crmetrics.Registry.MustRegister(bindTimeHistogram) -} diff --git a/pkg/controllers/provisioning/provisioner.go b/pkg/controllers/provisioning/provisioner.go index 5fca69832328..4ac17a668ee4 100644 --- a/pkg/controllers/provisioning/provisioner.go +++ b/pkg/controllers/provisioning/provisioner.go @@ -17,15 +17,26 @@ package provisioning import ( "context" "fmt" + "sync/atomic" "time" "github.com/aws/karpenter/pkg/apis/provisioning/v1alpha5" "github.com/aws/karpenter/pkg/cloudprovider" + "github.com/aws/karpenter/pkg/controllers/provisioning/binpacking" "github.com/aws/karpenter/pkg/controllers/provisioning/scheduling" + "github.com/aws/karpenter/pkg/metrics" + "github.com/aws/karpenter/pkg/utils/functional" + "github.com/aws/karpenter/pkg/utils/injection" + "github.com/prometheus/client_golang/prometheus" v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + corev1 "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/client-go/util/workqueue" "knative.dev/pkg/logging" "sigs.k8s.io/controller-runtime/pkg/client" + crmetrics "sigs.k8s.io/controller-runtime/pkg/metrics" ) var ( @@ -35,24 +46,21 @@ var ( MaxPodsPerBatch = 2_000 ) -// Provisioner waits for enqueued pods, batches them, creates capacity and binds the pods to the capacity. -type Provisioner struct { - // State - *v1alpha5.Provisioner - pods chan *v1.Pod - results chan error - done <-chan struct{} - Stop context.CancelFunc - // Dependencies - cloudProvider cloudprovider.CloudProvider - kubeClient client.Client - scheduler *scheduling.Scheduler - launcher *Launcher -} - -func (p *Provisioner) Start(ctx context.Context) { +func NewProvisioner(ctx context.Context, provisioner *v1alpha5.Provisioner, kubeClient client.Client, coreV1Client corev1.CoreV1Interface, cloudProvider cloudprovider.CloudProvider) *Provisioner { + c, stop := context.WithCancel(ctx) + p := &Provisioner{ + Provisioner: provisioner, + pods: make(chan *v1.Pod), + results: make(chan error), + done: c.Done(), + Stop: stop, + cloudProvider: cloudProvider, + kubeClient: kubeClient, + coreV1Client: coreV1Client, + scheduler: scheduling.NewScheduler(kubeClient, cloudProvider), + packer: binpacking.NewPacker(kubeClient, cloudProvider), + } go func() { - logging.FromContext(ctx).Info("Starting provisioner") for { select { case <-p.done: @@ -65,6 +73,23 @@ func (p *Provisioner) Start(ctx context.Context) { } } }() + return p +} + +// Provisioner waits for enqueued pods, batches them, creates capacity and binds the pods to the capacity. 
+type Provisioner struct { + // State + *v1alpha5.Provisioner + pods chan *v1.Pod + results chan error + Stop context.CancelFunc + done <-chan struct{} + // Dependencies + cloudProvider cloudprovider.CloudProvider + kubeClient client.Client + coreV1Client corev1.CoreV1Interface + scheduler *scheduling.Scheduler + packer *binpacking.Packer } func (p *Provisioner) provision(ctx context.Context) (err error) { @@ -84,14 +109,18 @@ func (p *Provisioner) provision(ctx context.Context) (err error) { if err != nil { return fmt.Errorf("solving scheduling constraints, %w", err) } - // Get Instance Types, offering availability may vary over time - instanceTypes, err := p.cloudProvider.GetInstanceTypes(ctx, &p.Spec.Constraints) - if err != nil { - return fmt.Errorf("getting instance types") - } // Launch capacity and bind pods - if err := p.launcher.Launch(ctx, p.Provisioner, schedules, instanceTypes); err != nil { - return fmt.Errorf("launching capacity, %w", err) + for _, schedule := range schedules { + packings, err := p.packer.Pack(ctx, schedule.Constraints, schedule.Pods) + if err != nil { + return fmt.Errorf("binpacking pods, %w", err) + } + for _, packing := range packings { + if err := p.launch(ctx, schedule.Constraints, packing); err != nil { + logging.FromContext(ctx).Error("Could not launch node, %s", err.Error()) + continue + } + } } return nil } @@ -161,3 +190,91 @@ func (p *Provisioner) FilterProvisionable(ctx context.Context, pods []*v1.Pod) [ } return provisionable } + +func (p *Provisioner) launch(ctx context.Context, constraints *v1alpha5.Constraints, packing *binpacking.Packing) error { + if err := p.verifyResourceLimits(ctx, p.Provisioner); err != nil { + return fmt.Errorf("limits exceeded, %w", err) + } + packedPods := queueFor(packing.Pods) + return <-p.cloudProvider.Create(ctx, constraints, packing.InstanceTypeOptions, packing.NodeQuantity, func(node *v1.Node) error { + node.Labels = functional.UnionStringMaps(node.Labels, constraints.Labels) + node.Spec.Taints = append(node.Spec.Taints, constraints.Taints...) + return p.bind(ctx, node, <-packedPods) + }) +} + +func (p *Provisioner) bind(ctx context.Context, node *v1.Node, pods []*v1.Pod) (err error) { + defer metrics.Measure(bindTimeHistogram.WithLabelValues(injection.GetNamespacedName(ctx).Name))() + + // Add the Karpenter finalizer to the node to enable the termination workflow + node.Finalizers = append(node.Finalizers, v1alpha5.TerminationFinalizer) + // Taint karpenter.sh/not-ready=NoSchedule to prevent the kube scheduler + // from scheduling pods before we're able to bind them ourselves. The kube + // scheduler has an eventually consistent cache of nodes and pods, so it's + // possible for it to see a provisioned node before it sees the pods bound + // to it. This creates an edge case where other pending pods may be bound to + // the node by the kube scheduler, causing OutOfCPU errors when the + // binpacked pods race to bind to the same node. The system eventually + // heals, but causes delays from additional provisioning (thrash). This + // taint will be removed by the node controller when a node is marked ready. + node.Spec.Taints = append(node.Spec.Taints, v1.Taint{ + Key: v1alpha5.NotReadyTaintKey, + Effect: v1.TaintEffectNoSchedule, + }) + // Idempotently create a node. In rare cases, nodes can come online and + // self register before the controller is able to register a node object + // with the API server. 
In the common case, we create the node object + // ourselves to enforce the binding decision and enable images to be pulled + // before the node is fully Ready. + if _, err := p.coreV1Client.Nodes().Create(ctx, node, metav1.CreateOptions{}); err != nil { + if !errors.IsAlreadyExists(err) { + return fmt.Errorf("creating node %s, %w", node.Name, err) + } + } + // Bind pods + var bound int64 + workqueue.ParallelizeUntil(ctx, len(pods), len(pods), func(i int) { + pod := pods[i] + binding := &v1.Binding{TypeMeta: pod.TypeMeta, ObjectMeta: pod.ObjectMeta, Target: v1.ObjectReference{Name: node.Name}} + if err := p.coreV1Client.Pods(pods[i].Namespace).Bind(ctx, binding, metav1.CreateOptions{}); err != nil { + logging.FromContext(ctx).Errorf("Failed to bind %s/%s to %s, %s", pod.Namespace, pod.Name, node.Name, err.Error()) + } else { + atomic.AddInt64(&bound, 1) + } + }) + logging.FromContext(ctx).Infof("Bound %d pod(s) to node %s", bound, node.Name) + return nil +} + +func (p *Provisioner) verifyResourceLimits(ctx context.Context, provisioner *v1alpha5.Provisioner) error { + latest := &v1alpha5.Provisioner{} + if err := p.kubeClient.Get(ctx, client.ObjectKeyFromObject(provisioner), latest); err != nil { + return fmt.Errorf("getting current resource usage, %w", err) + } + return provisioner.Spec.Limits.ExceededBy(latest.Status.Resources) +} + +// Thread safe channel to pop off packed pod slices +func queueFor(pods [][]*v1.Pod) <-chan []*v1.Pod { + queue := make(chan []*v1.Pod, len(pods)) + defer close(queue) + for _, ps := range pods { + queue <- ps + } + return queue +} + +var bindTimeHistogram = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: metrics.Namespace, + Subsystem: "allocation_controller", + Name: "bind_duration_seconds", + Help: "Duration of bind process in seconds. Broken down by result.", + Buckets: metrics.DurationBuckets(), + }, + []string{metrics.ProvisionerLabel}, +) + +func init() { + crmetrics.Registry.MustRegister(bindTimeHistogram) +} diff --git a/pkg/controllers/provisioning/scheduling/scheduler.go b/pkg/controllers/provisioning/scheduling/scheduler.go index e360e4c27918..b96e4fa24820 100644 --- a/pkg/controllers/provisioning/scheduling/scheduler.go +++ b/pkg/controllers/provisioning/scheduling/scheduler.go @@ -24,7 +24,6 @@ import ( "github.com/aws/karpenter/pkg/utils/injection" "github.com/mitchellh/hashstructure/v2" "github.com/prometheus/client_golang/prometheus" - appsv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" "knative.dev/pkg/logging" "sigs.k8s.io/controller-runtime/pkg/client" @@ -48,7 +47,6 @@ func init() { type Scheduler struct { CloudProvider cloudprovider.CloudProvider - KubeClient client.Client Topology *Topology } @@ -56,14 +54,11 @@ type Schedule struct { *v1alpha5.Constraints // Pods is a set of pods that may schedule to the node; used for binpacking. Pods []*v1.Pod - // Daemons are a set of daemons that will schedule to the node; used for overhead. - Daemons []*v1.Pod } func NewScheduler(kubeClient client.Client, cloudProvider cloudprovider.CloudProvider) *Scheduler { return &Scheduler{ CloudProvider: cloudProvider, - KubeClient: kubeClient, Topology: &Topology{kubeClient: kubeClient}, } } @@ -104,16 +99,7 @@ func (s *Scheduler) getSchedules(ctx context.Context, constraints *v1alpha5.Cons } // Create new schedule if one doesn't exist if _, ok := schedules[key]; !ok { - // Uses a theoretical node object to compute schedulablility of daemonset overhead. 
- daemons, err := s.getDaemons(ctx, tightened) - if err != nil { - return nil, fmt.Errorf("computing node overhead, %w", err) - } - schedules[key] = &Schedule{ - Constraints: tightened, - Pods: []*v1.Pod{}, - Daemons: daemons, - } + schedules[key] = &Schedule{Constraints: tightened, Pods: []*v1.Pod{}} } // Append pod to schedule, guaranteed to exist schedules[key].Pods = append(schedules[key].Pods, pod) @@ -125,19 +111,3 @@ func (s *Scheduler) getSchedules(ctx context.Context, constraints *v1alpha5.Cons } return result, nil } - -func (s *Scheduler) getDaemons(ctx context.Context, constraints *v1alpha5.Constraints) ([]*v1.Pod, error) { - daemonSetList := &appsv1.DaemonSetList{} - if err := s.KubeClient.List(ctx, daemonSetList); err != nil { - return nil, fmt.Errorf("listing daemonsets, %w", err) - } - // Include DaemonSets that will schedule on this node - pods := []*v1.Pod{} - for _, daemonSet := range daemonSetList.Items { - pod := &v1.Pod{Spec: daemonSet.Spec.Template.Spec} - if constraints.ValidatePod(pod) == nil { - pods = append(pods, pod) - } - } - return pods, nil -}