From 3a77ccaac61b8634a75b67eb36a02256a737af22 Mon Sep 17 00:00:00 2001 From: Todd Neal Date: Mon, 13 Jun 2022 09:10:44 -0500 Subject: [PATCH] fix: track volume mounts per node Assume nodes can support infinite volumes until they launch. Once the CSI driver is reporting the current number of mountable volumes, use that value which may require launching more nodes. Remove the PVC controller since we no longer bind pods to nodes. Fixes #919 and #1888 --- charts/karpenter/templates/clusterrole.yaml | 3 + cmd/controller/main.go | 4 +- hack/docs/metrics_gen_docs.go | 6 + pkg/cloudprovider/aws/instancetype.go | 2 + pkg/cloudprovider/aws/suite_test.go | 2 +- pkg/cloudprovider/fake/cloudprovider.go | 3 + pkg/config/suite_test.go | 8 +- .../persistentvolumeclaim/controller.go | 126 ----------- .../persistentvolumeclaim/suite_test.go | 88 -------- pkg/controllers/provisioning/provisioner.go | 2 +- .../provisioning/scheduling/inflightnode.go | 23 +- .../provisioning/scheduling/node.go | 15 +- .../provisioning/scheduling/scheduler.go | 16 +- .../scheduling/scheduling_benchmark_test.go | 5 +- .../provisioning/scheduling/suite_test.go | 142 ++++++++++++- pkg/controllers/provisioning/suite_test.go | 2 +- pkg/controllers/state/cluster.go | 168 +++++++++------ pkg/controllers/state/node.go | 5 +- pkg/controllers/state/pod.go | 4 +- pkg/controllers/state/suite_test.go | 67 ++++-- .../state => scheduling}/hostportusage.go | 26 ++- pkg/scheduling/suite_test.go | 3 +- pkg/scheduling/volumelimits.go | 199 ++++++++++++++++++ pkg/test/storage.go | 10 +- 24 files changed, 587 insertions(+), 342 deletions(-) delete mode 100644 pkg/controllers/persistentvolumeclaim/controller.go delete mode 100644 pkg/controllers/persistentvolumeclaim/suite_test.go rename pkg/{controllers/state => scheduling}/hostportusage.go (82%) create mode 100644 pkg/scheduling/volumelimits.go diff --git a/charts/karpenter/templates/clusterrole.yaml b/charts/karpenter/templates/clusterrole.yaml index 7d0f318beca0..8a8e30f6cf7c 100644 --- a/charts/karpenter/templates/clusterrole.yaml +++ b/charts/karpenter/templates/clusterrole.yaml @@ -39,3 +39,6 @@ rules: - apiGroups: ["admissionregistration.k8s.io"] resources: ["validatingwebhookconfigurations", "mutatingwebhookconfigurations"] verbs: ["get", "watch", "list", "update"] + - apiGroups: ["storage.k8s.io"] + resources: ["csinodes"] + verbs: ["get", "watch", "list"] \ No newline at end of file diff --git a/cmd/controller/main.go b/cmd/controller/main.go index af317ebd7457..21c6a3408d7f 100644 --- a/cmd/controller/main.go +++ b/cmd/controller/main.go @@ -47,7 +47,6 @@ import ( metricspod "github.com/aws/karpenter/pkg/controllers/metrics/pod" metricsprovisioner "github.com/aws/karpenter/pkg/controllers/metrics/provisioner" "github.com/aws/karpenter/pkg/controllers/node" - "github.com/aws/karpenter/pkg/controllers/persistentvolumeclaim" "github.com/aws/karpenter/pkg/controllers/provisioning" "github.com/aws/karpenter/pkg/controllers/state" "github.com/aws/karpenter/pkg/controllers/termination" @@ -110,13 +109,12 @@ func main() { logging.FromContext(ctx).Errorf("watching configmaps, config changes won't be applied immediately, %s", err) } - cluster := state.NewCluster(ctx, manager.GetClient(), cloudProvider) + cluster := state.NewCluster(manager.GetClient(), cloudProvider) if err := manager.RegisterControllers(ctx, provisioning.NewController(ctx, cfg, manager.GetClient(), clientSet.CoreV1(), recorder, cloudProvider, cluster), 
state.NewNodeController(manager.GetClient(), cluster), state.NewPodController(manager.GetClient(), cluster), - persistentvolumeclaim.NewController(manager.GetClient()), termination.NewController(ctx, manager.GetClient(), clientSet.CoreV1(), cloudProvider), node.NewController(manager.GetClient(), cloudProvider), metricspod.NewController(manager.GetClient()), diff --git a/hack/docs/metrics_gen_docs.go b/hack/docs/metrics_gen_docs.go index fe5f461881ba..0eb3de01ae12 100644 --- a/hack/docs/metrics_gen_docs.go +++ b/hack/docs/metrics_gen_docs.go @@ -205,6 +205,12 @@ func handleVariableDeclaration(v *ast.GenDecl) []metricInfo { } func getFuncPackage(fun ast.Expr) string { + if pexpr, ok := fun.(*ast.ParenExpr); ok { + return getFuncPackage(pexpr.X) + } + if sexpr, ok := fun.(*ast.StarExpr); ok { + return getFuncPackage(sexpr.X) + } if sel, ok := fun.(*ast.SelectorExpr); ok { return fmt.Sprintf("%s", sel.X) } diff --git a/pkg/cloudprovider/aws/instancetype.go b/pkg/cloudprovider/aws/instancetype.go index efb1e6c7ef33..e447496dc722 100644 --- a/pkg/cloudprovider/aws/instancetype.go +++ b/pkg/cloudprovider/aws/instancetype.go @@ -38,6 +38,8 @@ import ( "github.com/aws/karpenter/pkg/utils/sets" ) +var _ cloudprovider.InstanceType = (*InstanceType)(nil) + type InstanceType struct { *ec2.InstanceTypeInfo offerings []cloudprovider.Offering diff --git a/pkg/cloudprovider/aws/suite_test.go b/pkg/cloudprovider/aws/suite_test.go index a1630af8b8fb..149547bb5164 100644 --- a/pkg/cloudprovider/aws/suite_test.go +++ b/pkg/cloudprovider/aws/suite_test.go @@ -130,7 +130,7 @@ var _ = BeforeSuite(func() { kubeClient: e.Client, } registry.RegisterOrDie(ctx, cloudProvider) - cluster = state.NewCluster(ctx, e.Client, cloudProvider) + cluster = state.NewCluster(e.Client, cloudProvider) recorder = test.NewEventRecorder() cfg = test.NewConfig() controller = provisioning.NewController(ctx, cfg, e.Client, clientSet.CoreV1(), recorder, cloudProvider, cluster) diff --git a/pkg/cloudprovider/fake/cloudprovider.go b/pkg/cloudprovider/fake/cloudprovider.go index c877bc9872c6..dedfcc5bd5bd 100644 --- a/pkg/cloudprovider/fake/cloudprovider.go +++ b/pkg/cloudprovider/fake/cloudprovider.go @@ -49,6 +49,9 @@ type CloudProvider struct { CreateCalls []*cloudprovider.NodeRequest } +var _ cloudprovider.CloudProvider = (*CloudProvider)(nil) +var _ cloudprovider.InstanceType = (*InstanceType)(nil) + func (c *CloudProvider) Create(ctx context.Context, nodeRequest *cloudprovider.NodeRequest) (*v1.Node, error) { c.mu.Lock() c.CreateCalls = append(c.CreateCalls, nodeRequest) diff --git a/pkg/config/suite_test.go b/pkg/config/suite_test.go index bd5edb0701b3..d1a12f4680f3 100644 --- a/pkg/config/suite_test.go +++ b/pkg/config/suite_test.go @@ -124,13 +124,7 @@ var _ = Describe("Batch Parameter", func() { var changed int64 cfg.OnChange(func(c config.Config) { defer GinkgoRecover() - // we can't unregister this, so just check for the one case we care about - if atomic.LoadInt64(&changed) == 0 { - atomic.StoreInt64(&changed, 1) - Expect(cfg.BatchIdleDuration()).To(Equal(1 * time.Second)) - // shouldn't be changed - Expect(cfg.BatchMaxDuration()).To(Equal(10 * time.Second)) - } + atomic.StoreInt64(&changed, 1) }) // simulate user updating the config map with a bad max duration diff --git a/pkg/controllers/persistentvolumeclaim/controller.go b/pkg/controllers/persistentvolumeclaim/controller.go deleted file mode 100644 index cc0d220931e9..000000000000 --- a/pkg/controllers/persistentvolumeclaim/controller.go +++ /dev/null @@ -1,126 +0,0 
@@ -/* -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package persistentvolumeclaim - -import ( - "context" - "fmt" - - v1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/types" - "knative.dev/pkg/logging" - controllerruntime "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/handler" - "sigs.k8s.io/controller-runtime/pkg/manager" - "sigs.k8s.io/controller-runtime/pkg/reconcile" - "sigs.k8s.io/controller-runtime/pkg/source" - - "github.com/aws/karpenter/pkg/utils/functional" - "github.com/aws/karpenter/pkg/utils/injection" - "github.com/aws/karpenter/pkg/utils/pod" -) - -const ( - controllerName = "volume" - SelectedNodeAnnotation = "volume.kubernetes.io/selected-node" -) - -// Controller for the resource -type Controller struct { - kubeClient client.Client -} - -// NewController is a constructor -func NewController(kubeClient client.Client) *Controller { - return &Controller{kubeClient: kubeClient} -} - -// Register the controller to the manager -func (c *Controller) Register(ctx context.Context, m manager.Manager) error { - return controllerruntime. - NewControllerManagedBy(m). - Named(controllerName). - For(&v1.PersistentVolumeClaim{}). - Watches(&source.Kind{Type: &v1.Pod{}}, handler.EnqueueRequestsFromMapFunc(c.pvcForPod)). 
- Complete(c) -} - -// Reconcile a control loop for the resource -func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) { - ctx = logging.WithLogger(ctx, logging.FromContext(ctx).Named(controllerName).With("resource", req.String())) - ctx = injection.WithNamespacedName(ctx, req.NamespacedName) - ctx = injection.WithControllerName(ctx, controllerName) - - pvc := &v1.PersistentVolumeClaim{} - if err := c.kubeClient.Get(ctx, req.NamespacedName, pvc); err != nil { - if errors.IsNotFound(err) { - return reconcile.Result{}, nil - } - return reconcile.Result{}, err - } - pod, err := c.podForPvc(ctx, pvc) - if err != nil { - return reconcile.Result{}, err - } - if pod == nil { - return reconcile.Result{}, nil - } - if nodeName, ok := pvc.Annotations[SelectedNodeAnnotation]; ok && nodeName == pod.Spec.NodeName { - return reconcile.Result{}, nil - } - if !c.isBindable(pod) { - return reconcile.Result{}, nil - } - pvc.Annotations = functional.UnionStringMaps(pvc.Annotations, map[string]string{SelectedNodeAnnotation: pod.Spec.NodeName}) - if err := c.kubeClient.Update(ctx, pvc); err != nil { - return reconcile.Result{}, fmt.Errorf("binding persistent volume claim for pod %s/%s to node %q, %w", pod.Namespace, pod.Name, pod.Spec.NodeName, err) - } - logging.FromContext(ctx).Infof("Bound persistent volume claim to node %s", pod.Spec.NodeName) - return reconcile.Result{}, nil -} - -func (c *Controller) podForPvc(ctx context.Context, pvc *v1.PersistentVolumeClaim) (*v1.Pod, error) { - pods := &v1.PodList{} - if err := c.kubeClient.List(ctx, pods, client.InNamespace(pvc.Namespace)); err != nil { - return nil, err - } - for _, pod := range pods.Items { - for _, volume := range pod.Spec.Volumes { - if volume.PersistentVolumeClaim != nil && volume.PersistentVolumeClaim.ClaimName == pvc.Name { - return &pod, nil - } - } - } - return nil, nil -} - -func (c *Controller) pvcForPod(o client.Object) (requests []reconcile.Request) { - if !c.isBindable(o.(*v1.Pod)) { - return requests - } - for _, volume := range o.(*v1.Pod).Spec.Volumes { - if volume.PersistentVolumeClaim == nil { - continue - } - requests = append(requests, reconcile.Request{NamespacedName: types.NamespacedName{Namespace: o.GetNamespace(), Name: volume.PersistentVolumeClaim.ClaimName}}) - } - return requests -} - -func (c *Controller) isBindable(p *v1.Pod) bool { - return pod.IsScheduled(p) && !pod.IsTerminal(p) && !pod.IsTerminating(p) -} diff --git a/pkg/controllers/persistentvolumeclaim/suite_test.go b/pkg/controllers/persistentvolumeclaim/suite_test.go deleted file mode 100644 index ade21a137425..000000000000 --- a/pkg/controllers/persistentvolumeclaim/suite_test.go +++ /dev/null @@ -1,88 +0,0 @@ -/* -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package persistentvolumeclaim_test - -import ( - "context" - "strings" - "testing" - - "github.com/Pallinder/go-randomdata" - "github.com/aws/karpenter/pkg/controllers/persistentvolumeclaim" - "github.com/aws/karpenter/pkg/test" - . 
"github.com/aws/karpenter/pkg/test/expectations" - . "github.com/onsi/ginkgo" - . "github.com/onsi/gomega" - v1 "k8s.io/api/core/v1" - . "knative.dev/pkg/logging/testing" - "sigs.k8s.io/controller-runtime/pkg/client" -) - -var ctx context.Context -var controller *persistentvolumeclaim.Controller -var env *test.Environment - -func TestAPIs(t *testing.T) { - ctx = TestContextWithLogger(t) - RegisterFailHandler(Fail) - RunSpecs(t, "Volume") -} - -var _ = BeforeSuite(func() { - env = test.NewEnvironment(ctx, func(e *test.Environment) { - controller = persistentvolumeclaim.NewController(e.Client) - }) - Expect(env.Start()).To(Succeed(), "Failed to start environment") -}) - -var _ = AfterSuite(func() { - Expect(env.Stop()).To(Succeed(), "Failed to stop environment") -}) - -var _ = Describe("Reconcile", func() { - var pvc *v1.PersistentVolumeClaim - - BeforeEach(func() { - pvc = test.PersistentVolumeClaim() - }) - - AfterEach(func() { - ExpectCleanedUp(ctx, env.Client) - }) - - It("should ignore a pvc without pods", func() { - ExpectApplied(ctx, env.Client, pvc) - ExpectReconcileSucceeded(ctx, controller, client.ObjectKeyFromObject(pvc)) - Expect(env.Client.Get(ctx, client.ObjectKeyFromObject(pvc), pvc)).To(Succeed()) - Expect(pvc.Annotations[persistentvolumeclaim.SelectedNodeAnnotation]).To(BeEmpty()) - }) - It("should ignore a pvc with unscheduled or terminal pods", func() { - ExpectApplied(ctx, env.Client, pvc, - test.Pod(test.PodOptions{Phase: v1.PodPending}), - test.Pod(test.PodOptions{NodeName: strings.ToLower(randomdata.SillyName()), Phase: v1.PodSucceeded}), - test.Pod(test.PodOptions{NodeName: strings.ToLower(randomdata.SillyName()), Phase: v1.PodFailed}), - ) - ExpectReconcileSucceeded(ctx, controller, client.ObjectKeyFromObject(pvc)) - Expect(env.Client.Get(ctx, client.ObjectKeyFromObject(pvc), pvc)).To(Succeed()) - Expect(pvc.Annotations[persistentvolumeclaim.SelectedNodeAnnotation]).To(BeEmpty()) - }) - It("should bind a pvc to a pod's node", func() { - pod := test.Pod(test.PodOptions{NodeName: strings.ToLower(randomdata.SillyName()), PersistentVolumeClaims: []string{pvc.Name}}) - ExpectApplied(ctx, env.Client, pvc, pod) - ExpectReconcileSucceeded(ctx, controller, client.ObjectKeyFromObject(pvc)) - Expect(env.Client.Get(ctx, client.ObjectKeyFromObject(pvc), pvc)).To(Succeed()) - Expect(pvc.Annotations[persistentvolumeclaim.SelectedNodeAnnotation]).To(Equal(pod.Spec.NodeName)) - }) -}) diff --git a/pkg/controllers/provisioning/provisioner.go b/pkg/controllers/provisioning/provisioner.go index e19a7659c73b..f98923829f3c 100644 --- a/pkg/controllers/provisioning/provisioner.go +++ b/pkg/controllers/provisioning/provisioner.go @@ -224,7 +224,7 @@ func (p *Provisioner) schedule(ctx context.Context, pods []*v1.Pod) ([]*schedule return nil, fmt.Errorf("getting daemon overhead, %w", err) } - return scheduler.NewScheduler(nodeTemplates, provisionerList.Items, p.cluster, topology, instanceTypes, daemonOverhead, p.recorder).Solve(ctx, pods) + return scheduler.NewScheduler(ctx, p.kubeClient, nodeTemplates, provisionerList.Items, p.cluster, topology, instanceTypes, daemonOverhead, p.recorder).Solve(ctx, pods) } func (p *Provisioner) launch(ctx context.Context, node *scheduler.Node) error { diff --git a/pkg/controllers/provisioning/scheduling/inflightnode.go b/pkg/controllers/provisioning/scheduling/inflightnode.go index 839651994c53..b06d4d071df8 100644 --- a/pkg/controllers/provisioning/scheduling/inflightnode.go +++ 
b/pkg/controllers/provisioning/scheduling/inflightnode.go @@ -15,6 +15,7 @@ limitations under the License. package scheduling import ( + "context" "fmt" "github.com/aws/karpenter/pkg/apis/provisioning/v1alpha5" @@ -35,13 +36,14 @@ type InFlightNode struct { requirements scheduling.Requirements available v1.ResourceList startupTolerations []v1.Toleration - hostPortUsage *state.HostPortUsage + hostPortUsage *scheduling.HostPortUsage + volumeUsage *scheduling.VolumeLimits + volumeLimits scheduling.VolumeCount } func NewInFlightNode(n *state.Node, topology *Topology, startupTaints []v1.Taint, daemonResources v1.ResourceList) *InFlightNode { // the remaining daemonResources to schedule are the total daemonResources minus what has already scheduled remainingDaemonResources := resources.Subtract(daemonResources, n.DaemonSetRequested) - node := &InFlightNode{ Node: n.Node, available: n.Available, @@ -49,6 +51,8 @@ func NewInFlightNode(n *state.Node, topology *Topology, startupTaints []v1.Taint requests: remainingDaemonResources, requirements: scheduling.NewLabelRequirements(n.Node.Labels), hostPortUsage: n.HostPortUsage.Copy(), + volumeUsage: n.VolumeUsage.Copy(), + volumeLimits: n.VolumeLimits, } if n.Node.Labels[v1alpha5.LabelNodeInitialized] != "true" { @@ -81,15 +85,24 @@ func NewInFlightNode(n *state.Node, topology *Topology, startupTaints []v1.Taint return node } -func (n *InFlightNode) Add(pod *v1.Pod) error { +func (n *InFlightNode) Add(ctx context.Context, pod *v1.Pod) error { // Check Taints if err := scheduling.Taints(n.Node.Spec.Taints).Tolerates(pod, n.startupTolerations...); err != nil { return err } - if err := n.hostPortUsage.Add(pod); err != nil { + if err := n.hostPortUsage.Validate(pod); err != nil { + return err + } + + // determine the number of volumes that will be mounted if the pod schedules + mountedVolumeCount, err := n.volumeUsage.Validate(ctx, pod) + if err != nil { return err } + if mountedVolumeCount.Exceeds(n.volumeLimits) { + return fmt.Errorf("would exceed node volume limits") + } // check resource requests first since that's a pretty likely reason the pod won't schedule on an in-flight // node, which at this point can't be increased in size @@ -122,5 +135,7 @@ func (n *InFlightNode) Add(pod *v1.Pod) error { n.requests = requests n.requirements = nodeRequirements n.topology.Record(pod, nodeRequirements) + n.hostPortUsage.Add(ctx, pod) + n.volumeUsage.Add(ctx, pod) return nil } diff --git a/pkg/controllers/provisioning/scheduling/node.go b/pkg/controllers/provisioning/scheduling/node.go index bb67a1b8f7c6..0ceef8b3c1d0 100644 --- a/pkg/controllers/provisioning/scheduling/node.go +++ b/pkg/controllers/provisioning/scheduling/node.go @@ -15,17 +15,16 @@ limitations under the License. 
package scheduling import ( + "context" "fmt" "strings" "sync/atomic" - v1 "k8s.io/api/core/v1" - "github.com/samber/lo" + v1 "k8s.io/api/core/v1" "github.com/aws/karpenter/pkg/apis/provisioning/v1alpha5" "github.com/aws/karpenter/pkg/cloudprovider" - "github.com/aws/karpenter/pkg/controllers/state" "github.com/aws/karpenter/pkg/scheduling" "github.com/aws/karpenter/pkg/utils/resources" "github.com/aws/karpenter/pkg/utils/sets" @@ -40,7 +39,7 @@ type Node struct { topology *Topology requests v1.ResourceList - hostPortUsage *state.HostPortUsage + hostPortUsage *scheduling.HostPortUsage } var nodeID int64 @@ -54,19 +53,20 @@ func NewNode(nodeTemplate *scheduling.NodeTemplate, topology *Topology, daemonRe return &Node{ NodeTemplate: template, InstanceTypeOptions: instanceTypes, - hostPortUsage: state.NewHostPortUsage(), + hostPortUsage: scheduling.NewHostPortUsage(), topology: topology, requests: daemonResources, } } -func (n *Node) Add(pod *v1.Pod) error { +func (n *Node) Add(ctx context.Context, pod *v1.Pod) error { // Check Taints if err := n.Taints.Tolerates(pod); err != nil { return err } - if err := n.hostPortUsage.Add(pod); err != nil { + // exposed host ports on the node + if err := n.hostPortUsage.Validate(pod); err != nil { return err } @@ -102,6 +102,7 @@ func (n *Node) Add(pod *v1.Pod) error { n.requests = requests n.Requirements = nodeRequirements n.topology.Record(pod, nodeRequirements) + n.hostPortUsage.Add(ctx, pod) return nil } diff --git a/pkg/controllers/provisioning/scheduling/scheduler.go b/pkg/controllers/provisioning/scheduling/scheduler.go index 1382962a60a7..7881e90b48c4 100644 --- a/pkg/controllers/provisioning/scheduling/scheduler.go +++ b/pkg/controllers/provisioning/scheduling/scheduler.go @@ -33,13 +33,15 @@ import ( "github.com/aws/karpenter/pkg/utils/resources" ) -func NewScheduler(nodeTemplates []*scheduling.NodeTemplate, provisioners []v1alpha5.Provisioner, cluster *state.Cluster, topology *Topology, instanceTypes map[string][]cloudprovider.InstanceType, daemonOverhead map[*scheduling.NodeTemplate]v1.ResourceList, recorder events.Recorder) *Scheduler { +func NewScheduler(ctx context.Context, kubeClient client.Client, nodeTemplates []*scheduling.NodeTemplate, provisioners []v1alpha5.Provisioner, cluster *state.Cluster, topology *Topology, instanceTypes map[string][]cloudprovider.InstanceType, daemonOverhead map[*scheduling.NodeTemplate]v1.ResourceList, recorder events.Recorder) *Scheduler { for provisioner := range instanceTypes { sort.Slice(instanceTypes[provisioner], func(i, j int) bool { return instanceTypes[provisioner][i].Price() < instanceTypes[provisioner][j].Price() }) } s := &Scheduler{ + ctx: ctx, + kubeClient: kubeClient, nodeTemplates: nodeTemplates, topology: topology, cluster: cluster, @@ -84,6 +86,7 @@ func NewScheduler(nodeTemplates []*scheduling.NodeTemplate, provisioners []v1alp } type Scheduler struct { + ctx context.Context nodes []*Node inflight []*InFlightNode nodeTemplates []*scheduling.NodeTemplate @@ -94,6 +97,7 @@ type Scheduler struct { topology *Topology cluster *state.Cluster recorder events.Recorder + kubeClient client.Client } func (s *Scheduler) Solve(ctx context.Context, pods []*v1.Pod) ([]*Node, error) { @@ -112,7 +116,7 @@ func (s *Scheduler) Solve(ctx context.Context, pods []*v1.Pod) ([]*Node, error) } // Schedule to existing nodes or create a new node - if errors[pod] = s.add(pod); errors[pod] == nil { + if errors[pod] = s.add(ctx, pod); errors[pod] == nil { 
continue } @@ -165,10 +169,10 @@ func (s *Scheduler) recordSchedulingResults(ctx context.Context, pods []*v1.Pod, logging.FromContext(ctx).Infof("Computed %d unready node(s) will fit %d pod(s)", inflightCount, existingCount) } -func (s *Scheduler) add(pod *v1.Pod) error { +func (s *Scheduler) add(ctx context.Context, pod *v1.Pod) error { // first try to schedule against an in-flight real node for _, node := range s.inflight { - if err := node.Add(pod); err == nil { + if err := node.Add(ctx, pod); err == nil { return nil } } @@ -178,7 +182,7 @@ func (s *Scheduler) add(pod *v1.Pod) error { // Pick existing node that we are about to create for _, node := range s.nodes { - if err := node.Add(pod); err == nil { + if err := node.Add(ctx, pod); err == nil { return nil } } @@ -197,7 +201,7 @@ func (s *Scheduler) add(pod *v1.Pod) error { } node := NewNode(nodeTemplate, s.topology, s.daemonOverhead[nodeTemplate], instanceTypes) - if err := node.Add(pod); err != nil { + if err := node.Add(ctx, pod); err != nil { errs = multierr.Append(errs, fmt.Errorf("incompatible with provisioner %q, %w", nodeTemplate.ProvisionerName, err)) continue } diff --git a/pkg/controllers/provisioning/scheduling/scheduling_benchmark_test.go b/pkg/controllers/provisioning/scheduling/scheduling_benchmark_test.go index 9349f0e9f12e..d645bd6fa33c 100644 --- a/pkg/controllers/provisioning/scheduling/scheduling_benchmark_test.go +++ b/pkg/controllers/provisioning/scheduling/scheduling_benchmark_test.go @@ -112,10 +112,11 @@ func benchmarkScheduler(b *testing.B, instanceCount, podCount int) { instanceTypes := fake.InstanceTypes(instanceCount) cloudProv := &fake.CloudProvider{InstanceTypes: instanceTypes} - scheduler := NewScheduler( + scheduler := NewScheduler(ctx, + nil, []*scheduling.NodeTemplate{scheduling.NewNodeTemplate(provisioner)}, nil, - state.NewCluster(ctx, nil, cloudProv), + state.NewCluster(nil, cloudProv), &Topology{}, map[string][]cloudprovider.InstanceType{provisioner.Name: instanceTypes}, map[*scheduling.NodeTemplate]v1.ResourceList{}, diff --git a/pkg/controllers/provisioning/scheduling/suite_test.go b/pkg/controllers/provisioning/scheduling/suite_test.go index 283ceb51a479..bd23a1bef861 100644 --- a/pkg/controllers/provisioning/scheduling/suite_test.go +++ b/pkg/controllers/provisioning/scheduling/suite_test.go @@ -16,6 +16,7 @@ package scheduling_test import ( "context" + "fmt" "math" "math/rand" "strings" @@ -27,6 +28,7 @@ import ( "github.com/Pallinder/go-randomdata" "github.com/aws/aws-sdk-go/aws" v1 "k8s.io/api/core/v1" + storagev1 "k8s.io/api/storage/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/sets" @@ -72,7 +74,7 @@ var _ = BeforeSuite(func() { instanceTypes, _ := cloudProv.GetInstanceTypes(ctx, nil) // set these on the cloud provider so we can manipulate them if needed cloudProv.InstanceTypes = instanceTypes - cluster = state.NewCluster(ctx, e.Client, cloudProv) + cluster = state.NewCluster(e.Client, cloudProv) nodeStateController = state.NewNodeController(e.Client, cluster) podStateController = state.NewPodController(e.Client, cluster) recorder = test.NewEventRecorder() @@ -2958,11 +2960,11 @@ var _ = Describe("Instance Type Compatibility", func() { ExpectApplied(ctx, env.Client, provisioner) pods := ExpectProvisioned(ctx, env.Client, controller, test.UnschedulablePod(test.PodOptions{NodeSelector: map[string]string{ - fake.LabelInstanceSize: "large", + fake.LabelInstanceSize: "large", v1.LabelInstanceTypeStable: 
cloudProv.InstanceTypes[0].Name(), }}), test.UnschedulablePod(test.PodOptions{NodeSelector: map[string]string{ - fake.LabelInstanceSize: "small", + fake.LabelInstanceSize: "small", v1.LabelInstanceTypeStable: cloudProv.InstanceTypes[4].Name(), }}), ) @@ -3470,6 +3472,7 @@ var _ = Describe("In-Flight Nodes", func() { }} initialPod := ExpectProvisioned(ctx, env.Client, controller, test.UnschedulablePod(opts)) node1 := ExpectScheduled(ctx, env.Client, initialPod[0]) + ExpectReconcileSucceeded(ctx, nodeStateController, client.ObjectKeyFromObject(node1)) // the node will have 2000m CPU, so these two pods can't both fit on it opts.ResourceRequirements.Limits[v1.ResourceCPU] = resource.MustParse("1") @@ -3486,6 +3489,7 @@ var _ = Describe("In-Flight Nodes", func() { }} initialPod := ExpectProvisioned(ctx, env.Client, controller, test.UnschedulablePod(opts)) node1 := ExpectScheduled(ctx, env.Client, initialPod[0]) + ExpectReconcileSucceeded(ctx, nodeStateController, client.ObjectKeyFromObject(node1)) secondPod := ExpectProvisioned(ctx, env.Client, controller, test.UnschedulablePod(test.PodOptions{NodeSelector: map[string]string{v1.LabelArchStable: "arm64"}})) @@ -3848,6 +3852,138 @@ var _ = Describe("No Pre-Binding", func() { }) }) +var _ = Describe("Volume Limits", func() { + It("should launch multiple nodes if required due to volume limits", func() { + const csiProvider = "fake.csi.provider" + cloudProv.InstanceTypes = []cloudprovider.InstanceType{ + fake.NewInstanceType( + fake.InstanceTypeOptions{ + Name: "instance-type", + Resources: map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: resource.MustParse("1024"), + v1.ResourcePods: resource.MustParse("1024"), + }, + }), + } + + provisioner.Spec.Limits = nil + ExpectApplied(ctx, env.Client, provisioner) + initialPods := ExpectProvisioned(ctx, env.Client, controller, test.UnschedulablePod()) + node := ExpectScheduled(ctx, env.Client, initialPods[0]) + csiNode := &storagev1.CSINode{ + ObjectMeta: metav1.ObjectMeta{ + Name: node.Name, + }, + Spec: storagev1.CSINodeSpec{ + Drivers: []storagev1.CSINodeDriver{ + { + Name: csiProvider, + NodeID: "fake-node-id", + Allocatable: &storagev1.VolumeNodeResources{ + Count: aws.Int32(10), + }, + }, + }, + }, + } + ExpectApplied(ctx, env.Client, csiNode) + ExpectReconcileSucceeded(ctx, nodeStateController, client.ObjectKeyFromObject(node)) + + sc := test.StorageClass(test.StorageClassOptions{ + ObjectMeta: metav1.ObjectMeta{Name: "my-storage-class"}, + Provisioner: aws.String(csiProvider), + Zones: []string{"test-zone-1"}}) + ExpectApplied(ctx, env.Client, sc) + + var pods []*v1.Pod + for i := 0; i < 6; i++ { + pvcA := test.PersistentVolumeClaim(test.PersistentVolumeClaimOptions{ + StorageClassName: aws.String("my-storage-class"), + ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("my-claim-a-%d", i)}, + }) + pvcB := test.PersistentVolumeClaim(test.PersistentVolumeClaimOptions{ + StorageClassName: aws.String("my-storage-class"), + ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("my-claim-b-%d", i)}, + }) + ExpectApplied(ctx, env.Client, pvcA, pvcB) + pods = append(pods, test.UnschedulablePod(test.PodOptions{ + PersistentVolumeClaims: []string{pvcA.Name, pvcB.Name}, + })) + } + ExpectProvisioned(ctx, env.Client, controller, pods...) 
+ var nodeList v1.NodeList + Expect(env.Client.List(ctx, &nodeList)).To(Succeed()) + // we need to create a new node as the in-flight one can only contain 5 pods due to the CSINode volume limit + Expect(nodeList.Items).To(HaveLen(2)) + }) + It("should launch a single node if all pods use the same PVC", func() { + const csiProvider = "fake.csi.provider" + cloudProv.InstanceTypes = []cloudprovider.InstanceType{ + fake.NewInstanceType( + fake.InstanceTypeOptions{ + Name: "instance-type", + Resources: map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: resource.MustParse("1024"), + v1.ResourcePods: resource.MustParse("1024"), + }, + }), + } + + provisioner.Spec.Limits = nil + ExpectApplied(ctx, env.Client, provisioner) + initialPods := ExpectProvisioned(ctx, env.Client, controller, test.UnschedulablePod()) + node := ExpectScheduled(ctx, env.Client, initialPods[0]) + csiNode := &storagev1.CSINode{ + ObjectMeta: metav1.ObjectMeta{ + Name: node.Name, + }, + Spec: storagev1.CSINodeSpec{ + Drivers: []storagev1.CSINodeDriver{ + { + Name: csiProvider, + NodeID: "fake-node-id", + Allocatable: &storagev1.VolumeNodeResources{ + Count: aws.Int32(10), + }, + }, + }, + }, + } + ExpectApplied(ctx, env.Client, csiNode) + ExpectReconcileSucceeded(ctx, nodeStateController, client.ObjectKeyFromObject(node)) + + sc := test.StorageClass(test.StorageClassOptions{ + ObjectMeta: metav1.ObjectMeta{Name: "my-storage-class"}, + Provisioner: aws.String(csiProvider), + Zones: []string{"test-zone-1"}}) + ExpectApplied(ctx, env.Client, sc) + + pv := test.PersistentVolume(test.PersistentVolumeOptions{ + ObjectMeta: metav1.ObjectMeta{Name: "my-volume"}, + Zones: []string{"test-zone-1"}}) + + pvc := test.PersistentVolumeClaim(test.PersistentVolumeClaimOptions{ + ObjectMeta: metav1.ObjectMeta{Name: "my-claim"}, + StorageClassName: aws.String("my-storage-class"), + VolumeName: pv.Name, + }) + ExpectApplied(ctx, env.Client, pv, pvc) + + var pods []*v1.Pod + for i := 0; i < 100; i++ { + pods = append(pods, test.UnschedulablePod(test.PodOptions{ + PersistentVolumeClaims: []string{pvc.Name, pvc.Name}, + })) + } + ExpectApplied(ctx, env.Client, provisioner) + ExpectProvisioned(ctx, env.Client, controller, pods...) 
+ var nodeList v1.NodeList + Expect(env.Client.List(ctx, &nodeList)).To(Succeed()) + // 100 of the same PVC should all be schedulable on the same node + Expect(nodeList.Items).To(HaveLen(1)) + }) +}) + func MakePods(count int, options test.PodOptions) (pods []*v1.Pod) { for i := 0; i < count; i++ { pods = append(pods, test.UnschedulablePod(options)) diff --git a/pkg/controllers/provisioning/suite_test.go b/pkg/controllers/provisioning/suite_test.go index 6bc8861ebcd9..8632b7f5de7d 100644 --- a/pkg/controllers/provisioning/suite_test.go +++ b/pkg/controllers/provisioning/suite_test.go @@ -66,7 +66,7 @@ var _ = BeforeSuite(func() { for _, it := range instanceTypes { instanceTypeMap[it.Name()] = it } - cluster := state.NewCluster(ctx, e.Client, cloudProvider) + cluster := state.NewCluster(e.Client, cloudProvider) controller = provisioning.NewController(ctx, cfg, e.Client, corev1.NewForConfigOrDie(e.Config), recorder, cloudProvider, cluster) }) Expect(env.Start()).To(Succeed(), "Failed to start environment") diff --git a/pkg/controllers/state/cluster.go b/pkg/controllers/state/cluster.go index ed96ae5b5a0f..ff37cd3b7133 100644 --- a/pkg/controllers/state/cluster.go +++ b/pkg/controllers/state/cluster.go @@ -20,23 +20,25 @@ import ( "sort" "sync" - "github.com/aws/karpenter/pkg/cloudprovider" - - "github.com/aws/karpenter/pkg/apis/provisioning/v1alpha5" + "go.uber.org/multierr" - "knative.dev/pkg/logging" + "github.com/aws/aws-sdk-go/aws" + "k8s.io/apimachinery/pkg/api/errors" v1 "k8s.io/api/core/v1" + storagev1 "k8s.io/api/storage/v1" "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" + "github.com/aws/karpenter/pkg/apis/provisioning/v1alpha5" + "github.com/aws/karpenter/pkg/cloudprovider" + "github.com/aws/karpenter/pkg/scheduling" podutils "github.com/aws/karpenter/pkg/utils/pod" "github.com/aws/karpenter/pkg/utils/resources" ) // Cluster maintains cluster state that is often needed but expensive to compute. type Cluster struct { - ctx context.Context kubeClient client.Client cloudProvider cloudprovider.CloudProvider @@ -49,9 +51,8 @@ type Cluster struct { bindings map[types.NamespacedName]string // pod namespaced named -> node name } -func NewCluster(ctx context.Context, client client.Client, cp cloudprovider.CloudProvider) *Cluster { +func NewCluster(client client.Client, cp cloudprovider.CloudProvider) *Cluster { c := &Cluster{ - ctx: ctx, kubeClient: client, cloudProvider: cp, nodes: map[string]*Node{}, @@ -76,12 +77,14 @@ type Node struct { // included in the calculation for Available. DaemonSetRequested v1.ResourceList // HostPort usage of all pods that are bound to the node - HostPortUsage *HostPortUsage + HostPortUsage *scheduling.HostPortUsage + VolumeUsage *scheduling.VolumeLimits + VolumeLimits scheduling.VolumeCount + InstanceType cloudprovider.InstanceType // Provisioner is the provisioner used to create the node. 
Provisioner *v1alpha5.Provisioner - podRequests map[types.NamespacedName]v1.ResourceList - InstanceType cloudprovider.InstanceType + podRequests map[types.NamespacedName]v1.ResourceList } // ForPodsWithAntiAffinity calls the supplied function once for each pod with required anti affinity terms that is @@ -126,33 +129,30 @@ func (c *Cluster) ForEachNode(f func(n *Node) bool) { } } -func (c *Cluster) newNode(node *v1.Node) *Node { +// newNode always returns a node, even if some portion of the update has failed +func (c *Cluster) newNode(ctx context.Context, node *v1.Node) (*Node, error) { n := &Node{ Node: node, - HostPortUsage: NewHostPortUsage(), + HostPortUsage: scheduling.NewHostPortUsage(), + VolumeUsage: scheduling.NewVolumeLimits(c.kubeClient), + VolumeLimits: scheduling.VolumeCount{}, podRequests: map[types.NamespacedName]v1.ResourceList{}, } - - // store the provisioner if it exists - if provisionerName, ok := node.Labels[v1alpha5.ProvisionerNameLabelKey]; ok { - var provisioner v1alpha5.Provisioner - if err := c.kubeClient.Get(c.ctx, client.ObjectKey{Name: provisionerName}, &provisioner); err != nil { - logging.FromContext(c.ctx).Errorf("getting provisioner, %s", err) - } else { - n.Provisioner = &provisioner - } - } - - var err error - - n.InstanceType, err = c.getInstanceType(c.ctx, n.Provisioner, node.Labels[v1.LabelInstanceTypeStable]) - if err != nil { - logging.FromContext(c.ctx).Errorf("getting instance type, %s", err) + if err := multierr.Combine( + c.populateProvisioner(ctx, node, n), + c.populateInstanceType(ctx, node, n), + c.populateVolumeLimits(ctx, node, n), + c.populateResourceRequests(ctx, node, n), + ); err != nil { + return nil, err } + return n, nil +} +func (c *Cluster) populateResourceRequests(ctx context.Context, node *v1.Node, n *Node) error { var pods v1.PodList - if err := c.kubeClient.List(c.ctx, &pods, client.MatchingFields{"spec.nodeName": node.Name}); err != nil { - logging.FromContext(c.ctx).Errorf("listing pods, %s", err) + if err := c.kubeClient.List(ctx, &pods, client.MatchingFields{"spec.nodeName": node.Name}); err != nil { + return fmt.Errorf("listing pods, %w", err) } var requested []v1.ResourceList var daemonsetRequested []v1.ResourceList @@ -167,9 +167,8 @@ func (c *Cluster) newNode(node *v1.Node) *Node { daemonsetRequested = append(daemonsetRequested, requests) } requested = append(requested, requests) - if err := n.HostPortUsage.Add(pod); err != nil { - logging.FromContext(c.ctx).Errorf("inconsistent state, error tracking host port usage on node %s, %s", n.Node.Name, err) - } + n.HostPortUsage.Add(ctx, pod) + n.VolumeUsage.Add(ctx, pod) } n.DaemonSetRequested = resources.Merge(daemonsetRequested...) @@ -179,23 +178,44 @@ func (c *Cluster) newNode(node *v1.Node) *Node { if len(n.Capacity) == 0 && n.InstanceType != nil { n.Capacity = n.InstanceType.Resources() } - n.Available = resources.Subtract(c.getNodeAllocatable(node, n.Provisioner), resources.Merge(requested...)) - return n + n.Available = resources.Subtract(c.getNodeAllocatable(node, n), resources.Merge(requested...)) + return nil } -// getNodeAllocatable gets the allocatable resources for the node. 
-func (c *Cluster) getNodeAllocatable(node *v1.Node, provisioner *v1alpha5.Provisioner) v1.ResourceList { - instanceType, err := c.getInstanceType(c.ctx, provisioner, node.Labels[v1.LabelInstanceTypeStable]) - if err != nil { - logging.FromContext(c.ctx).Errorf("error finding instance type, %s", err) - return node.Status.Allocatable +func (c *Cluster) populateVolumeLimits(ctx context.Context, node *v1.Node, n *Node) error { + var csiNode storagev1.CSINode + if err := c.kubeClient.Get(ctx, client.ObjectKey{Name: node.Name}, &csiNode); err != nil && !errors.IsNotFound(err) { + return fmt.Errorf("getting CSINode to determine volume limit for %s, %w", node.Name, err) } + for _, driver := range csiNode.Spec.Drivers { + if driver.Allocatable == nil { + continue + } + n.VolumeLimits[driver.Name] = int(aws.Int32Value(driver.Allocatable.Count)) + } + return nil +} + +func (c *Cluster) populateProvisioner(ctx context.Context, node *v1.Node, n *Node) error { + // store the provisioner if it exists + if provisionerName, ok := node.Labels[v1alpha5.ProvisionerNameLabelKey]; ok { + var provisioner v1alpha5.Provisioner + if err := c.kubeClient.Get(ctx, client.ObjectKey{Name: provisionerName}, &provisioner); err != nil { + return fmt.Errorf("getting provisioner, %w", err) + } + n.Provisioner = &provisioner + } + return nil +} + +// getNodeAllocatable gets the allocatable resources for the node. +func (c *Cluster) getNodeAllocatable(node *v1.Node, n *Node) v1.ResourceList { // If the node is ready, don't take into consideration possible kubelet resource zeroing. This is to handle the // case where a node comes up with a resource and the hardware fails in some way so that the device-plugin zeros // out the resource. We don't want to assume that it will always come back. The instance type may be nil if // the node was created from a provisioner that has since been deleted. - if instanceType == nil || node.Labels[v1alpha5.LabelNodeInitialized] == "true" { + if n.InstanceType == nil || node.Labels[v1alpha5.LabelNodeInitialized] == "true" { return node.Status.Allocatable } @@ -203,7 +223,7 @@ func (c *Cluster) getNodeAllocatable(node *v1.Node, provisioner *v1alpha5.Provis for k, v := range node.Status.Allocatable { allocatable[k] = v } - for resourceName, quantity := range instanceType.Resources() { + for resourceName, quantity := range n.InstanceType.Resources() { // kubelet will zero out both the capacity and allocatable for an extended resource on startup if resources.IsZero(node.Status.Capacity[resourceName]) && resources.IsZero(node.Status.Allocatable[resourceName]) && @@ -221,10 +241,17 @@ func (c *Cluster) deleteNode(nodeName string) { } // updateNode is called for every node reconciliation -func (c *Cluster) updateNode(node *v1.Node) { +func (c *Cluster) updateNode(ctx context.Context, node *v1.Node) error { c.mu.Lock() defer c.mu.Unlock() - c.nodes[node.Name] = c.newNode(node) + n, err := c.newNode(ctx, node) + if err != nil { + // ensure that the out of date node is forgotten + delete(c.nodes, node.Name) + return err + } + c.nodes[node.Name] = n + return nil } // deletePod is called when the pod has been deleted @@ -253,6 +280,7 @@ func (c *Cluster) updateNodeUsageFromPodDeletion(podKey types.NamespacedName) { n.Available = resources.Merge(n.Available, n.podRequests[podKey]) delete(n.podRequests, podKey) n.HostPortUsage.DeletePod(podKey) + n.VolumeUsage.DeletePod(podKey) // We can't easily track the changes to the DaemonsetRequested here as we no longer have the pod. 
We could keep up // with this separately, but if a daemonset pod is being deleted, it usually means the node is going down. In the @@ -260,9 +288,10 @@ func (c *Cluster) updateNodeUsageFromPodDeletion(podKey types.NamespacedName) { } // updatePod is called every time the pod is reconciled -func (c *Cluster) updatePod(pod *v1.Pod) { - c.updateNodeUsageFromPod(pod) +func (c *Cluster) updatePod(ctx context.Context, pod *v1.Pod) error { + err := c.updateNodeUsageFromPod(ctx, pod) c.updatePodAntiAffinities(pod) + return err } func (c *Cluster) updatePodAntiAffinities(pod *v1.Pod) { @@ -279,10 +308,10 @@ func (c *Cluster) updatePodAntiAffinities(pod *v1.Pod) { // updateNodeUsageFromPod is called every time a reconcile event occurs for the pod. If the pods binding has changed // (unbound to bound), we need to update the resource requests on the node. -func (c *Cluster) updateNodeUsageFromPod(pod *v1.Pod) { +func (c *Cluster) updateNodeUsageFromPod(ctx context.Context, pod *v1.Pod) error { // nothing to do if the pod isn't bound, checking early allows avoiding unnecessary locking if pod.Spec.NodeName == "" { - return + return nil } c.mu.Lock() @@ -293,11 +322,10 @@ func (c *Cluster) updateNodeUsageFromPod(pod *v1.Pod) { if bindingKnown { if oldNodeName == pod.Spec.NodeName { // we are already tracking the pod binding, so nothing to update - return + return nil } // the pod has switched nodes, this can occur if a pod name was re-used and it was deleted/re-created rapidly, // binding to a different node the second time - logging.FromContext(c.ctx).Infof("pod %s has moved from node %s to %s", podKey, oldNodeName, pod.Spec.NodeName) n, ok := c.nodes[oldNodeName] if ok { // we were tracking the old node, so we need to reduce its capacity by the amount of the pod that has @@ -313,14 +341,19 @@ func (c *Cluster) updateNodeUsageFromPod(pod *v1.Pod) { n, ok := c.nodes[pod.Spec.NodeName] if !ok { var node v1.Node - if err := c.kubeClient.Get(c.ctx, client.ObjectKey{Name: pod.Spec.NodeName}, &node); err != nil { - logging.FromContext(c.ctx).Errorf("getting node, %s", err) + if err := c.kubeClient.Get(ctx, client.ObjectKey{Name: pod.Spec.NodeName}, &node); err != nil && !errors.IsNotFound(err) { + return fmt.Errorf("getting node, %w", err) } + var err error // node didn't exist, but creating it will pick up this newly bound pod as well - n = c.newNode(&node) - c.nodes[pod.Spec.NodeName] = n - return + n, err = c.newNode(ctx, &node) + if err != nil { + // no need to delete c.nodes[node.Name] as it wasn't stored previously + return err + } + c.nodes[node.Name] = n + return nil } // sum the newly bound pod's requests into the existing node and record the binding @@ -331,26 +364,27 @@ func (c *Cluster) updateNodeUsageFromPod(pod *v1.Pod) { if podutils.IsOwnedByDaemonSet(pod) { n.DaemonSetRequested = resources.Merge(n.DaemonSetRequested, podRequests) } - if err := n.HostPortUsage.Add(pod); err != nil { - logging.FromContext(c.ctx).Errorf("inconsistent state, error tracking host port usage on node %s, %s", n.Node.Name, err) - } + n.HostPortUsage.Add(ctx, pod) + n.VolumeUsage.Add(ctx, pod) n.podRequests[podKey] = podRequests c.bindings[podKey] = n.Node.Name + return nil } -func (c *Cluster) getInstanceType(ctx context.Context, provisioner *v1alpha5.Provisioner, instanceTypeName string) (cloudprovider.InstanceType, error) { - if provisioner == nil { - // no provisioner means we cant lookup the instance type - return nil, nil +func (c *Cluster) populateInstanceType(ctx context.Context, node *v1.Node, n *Node) error { 
+ if n.Provisioner == nil || n.Provisioner.Spec.Provider == nil { + return nil } - instanceTypes, err := c.cloudProvider.GetInstanceTypes(ctx, provisioner) + instanceTypes, err := c.cloudProvider.GetInstanceTypes(ctx, n.Provisioner) if err != nil { - return nil, err + return err } + instanceTypeName := node.Labels[v1.LabelInstanceTypeStable] for _, it := range instanceTypes { if it.Name() == instanceTypeName { - return it, nil + n.InstanceType = it + return nil } } - return nil, fmt.Errorf("instance type '%s' not found", instanceTypeName) + return fmt.Errorf("instance type '%s' not found", instanceTypeName) } diff --git a/pkg/controllers/state/node.go b/pkg/controllers/state/node.go index 3c2f9a2c2bb5..7cd9df63c374 100644 --- a/pkg/controllers/state/node.go +++ b/pkg/controllers/state/node.go @@ -53,9 +53,10 @@ func (c *NodeController) Reconcile(ctx context.Context, req reconcile.Request) ( } return reconcile.Result{}, err } + if err := c.cluster.updateNode(ctx, node); err != nil { + return reconcile.Result{}, err + } // ensure it's aware of any nodes we discover, this is a no-op if the node is already known to our cluster state - c.cluster.updateNode(node) - return reconcile.Result{Requeue: true, RequeueAfter: stateRetryPeriod}, nil } diff --git a/pkg/controllers/state/pod.go b/pkg/controllers/state/pod.go index 7b3cbc22379b..2049b89359f5 100644 --- a/pkg/controllers/state/pod.go +++ b/pkg/controllers/state/pod.go @@ -55,7 +55,9 @@ func (c *PodController) Reconcile(ctx context.Context, req reconcile.Request) (r return reconcile.Result{}, err } - c.cluster.updatePod(stored) + if err := c.cluster.updatePod(ctx, stored); err != nil { + return reconcile.Result{}, err + } return reconcile.Result{Requeue: true, RequeueAfter: stateRetryPeriod}, nil } diff --git a/pkg/controllers/state/suite_test.go b/pkg/controllers/state/suite_test.go index a891eb1a7bc9..e34671c67c6d 100644 --- a/pkg/controllers/state/suite_test.go +++ b/pkg/controllers/state/suite_test.go @@ -65,7 +65,7 @@ var _ = AfterSuite(func() { var _ = BeforeEach(func() { cloudProvider = &fake.CloudProvider{InstanceTypes: fake.InstanceTypesAssorted()} - cluster = state.NewCluster(ctx, env.Client, cloudProvider) + cluster = state.NewCluster(env.Client, cloudProvider) nodeController = state.NewNodeController(env.Client, cluster) podController = state.NewPodController(env.Client, cluster) provisioner = test.Provisioner(test.ProvisionerOptions{ObjectMeta: metav1.ObjectMeta{Name: "default"}}) @@ -90,7 +90,10 @@ var _ = Describe("Node Resource Level", func() { }}, }) node := test.Node(test.NodeOptions{ - ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{v1alpha5.ProvisionerNameLabelKey: provisioner.Name}}, + ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{ + v1alpha5.ProvisionerNameLabelKey: provisioner.Name, + v1.LabelInstanceTypeStable: cloudProvider.InstanceTypes[0].Name(), + }}, Allocatable: map[v1.ResourceName]resource.Quantity{ v1.ResourceCPU: resource.MustParse("4"), }}) @@ -118,7 +121,10 @@ var _ = Describe("Node Resource Level", func() { }}, }) node := test.Node(test.NodeOptions{ - ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{v1alpha5.ProvisionerNameLabelKey: provisioner.Name}}, + ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{ + v1alpha5.ProvisionerNameLabelKey: provisioner.Name, + v1.LabelInstanceTypeStable: cloudProvider.InstanceTypes[0].Name(), + }}, Allocatable: map[v1.ResourceName]resource.Quantity{ v1.ResourceCPU: resource.MustParse("4"), }}) @@ -153,7 +159,10 @@ var _ = Describe("Node Resource 
Level", func() { }}, }) node := test.Node(test.NodeOptions{ - ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{v1alpha5.ProvisionerNameLabelKey: provisioner.Name}}, + ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{ + v1alpha5.ProvisionerNameLabelKey: provisioner.Name, + v1.LabelInstanceTypeStable: cloudProvider.InstanceTypes[0].Name(), + }}, Allocatable: map[v1.ResourceName]resource.Quantity{ v1.ResourceCPU: resource.MustParse("4"), }}) @@ -182,7 +191,10 @@ var _ = Describe("Node Resource Level", func() { }}, }) node := test.Node(test.NodeOptions{ - ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{v1alpha5.ProvisionerNameLabelKey: provisioner.Name}}, + ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{ + v1alpha5.ProvisionerNameLabelKey: provisioner.Name, + v1.LabelInstanceTypeStable: cloudProvider.InstanceTypes[0].Name(), + }}, Allocatable: map[v1.ResourceName]resource.Quantity{ v1.ResourceCPU: resource.MustParse("4"), }}) @@ -217,7 +229,10 @@ var _ = Describe("Node Resource Level", func() { }}, }) node := test.Node(test.NodeOptions{ - ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{v1alpha5.ProvisionerNameLabelKey: provisioner.Name}}, + ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{ + v1alpha5.ProvisionerNameLabelKey: provisioner.Name, + v1.LabelInstanceTypeStable: cloudProvider.InstanceTypes[0].Name(), + }}, Allocatable: map[v1.ResourceName]resource.Quantity{ v1.ResourceCPU: resource.MustParse("4"), }}) @@ -256,7 +271,10 @@ var _ = Describe("Node Resource Level", func() { }) node1 := test.Node(test.NodeOptions{ - ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{v1alpha5.ProvisionerNameLabelKey: provisioner.Name}}, + ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{ + v1alpha5.ProvisionerNameLabelKey: provisioner.Name, + v1.LabelInstanceTypeStable: cloudProvider.InstanceTypes[0].Name(), + }}, Allocatable: map[v1.ResourceName]resource.Quantity{ v1.ResourceCPU: resource.MustParse("4"), }}) @@ -279,7 +297,10 @@ var _ = Describe("Node Resource Level", func() { // second node has more capacity node2 := test.Node(test.NodeOptions{ - ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{v1alpha5.ProvisionerNameLabelKey: provisioner.Name}}, + ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{ + v1alpha5.ProvisionerNameLabelKey: provisioner.Name, + v1.LabelInstanceTypeStable: cloudProvider.InstanceTypes[0].Name(), + }}, Allocatable: map[v1.ResourceName]resource.Quantity{ v1.ResourceCPU: resource.MustParse("8"), }}) @@ -325,7 +346,10 @@ var _ = Describe("Node Resource Level", func() { })) } node := test.Node(test.NodeOptions{ - ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{v1alpha5.ProvisionerNameLabelKey: provisioner.Name}}, + ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{ + v1alpha5.ProvisionerNameLabelKey: provisioner.Name, + v1.LabelInstanceTypeStable: cloudProvider.InstanceTypes[0].Name(), + }}, Allocatable: map[v1.ResourceName]resource.Quantity{ v1.ResourceCPU: resource.MustParse("200"), v1.ResourcePods: resource.MustParse("500"), @@ -401,7 +425,10 @@ var _ = Describe("Node Resource Level", func() { }) node := test.Node(test.NodeOptions{ - ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{v1alpha5.ProvisionerNameLabelKey: provisioner.Name}}, + ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{ + v1alpha5.ProvisionerNameLabelKey: provisioner.Name, + v1.LabelInstanceTypeStable: cloudProvider.InstanceTypes[0].Name(), + }}, Allocatable: map[v1.ResourceName]resource.Quantity{ v1.ResourceCPU: 
resource.MustParse("4"), v1.ResourceMemory: resource.MustParse("8Gi"), @@ -450,7 +477,10 @@ var _ = Describe("Pod Anti-Affinity", func() { }) node := test.Node(test.NodeOptions{ - ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{v1alpha5.ProvisionerNameLabelKey: provisioner.Name}}, + ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{ + v1alpha5.ProvisionerNameLabelKey: provisioner.Name, + v1.LabelInstanceTypeStable: cloudProvider.InstanceTypes[0].Name(), + }}, Allocatable: map[v1.ResourceName]resource.Quantity{ v1.ResourceCPU: resource.MustParse("4"), }}) @@ -489,7 +519,10 @@ var _ = Describe("Pod Anti-Affinity", func() { }) node := test.Node(test.NodeOptions{ - ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{v1alpha5.ProvisionerNameLabelKey: provisioner.Name}}, + ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{ + v1alpha5.ProvisionerNameLabelKey: provisioner.Name, + v1.LabelInstanceTypeStable: cloudProvider.InstanceTypes[0].Name(), + }}, Allocatable: map[v1.ResourceName]resource.Quantity{ v1.ResourceCPU: resource.MustParse("4"), }}) @@ -525,7 +558,10 @@ var _ = Describe("Pod Anti-Affinity", func() { }) node := test.Node(test.NodeOptions{ - ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{v1alpha5.ProvisionerNameLabelKey: provisioner.Name}}, + ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{ + v1alpha5.ProvisionerNameLabelKey: provisioner.Name, + v1.LabelInstanceTypeStable: cloudProvider.InstanceTypes[0].Name(), + }}, Allocatable: map[v1.ResourceName]resource.Quantity{ v1.ResourceCPU: resource.MustParse("4"), }}) @@ -571,7 +607,10 @@ var _ = Describe("Pod Anti-Affinity", func() { }) node := test.Node(test.NodeOptions{ - ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{v1alpha5.ProvisionerNameLabelKey: provisioner.Name}}, + ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{ + v1alpha5.ProvisionerNameLabelKey: provisioner.Name, + v1.LabelInstanceTypeStable: cloudProvider.InstanceTypes[0].Name(), + }}, Allocatable: map[v1.ResourceName]resource.Quantity{ v1.ResourceCPU: resource.MustParse("4"), }}) diff --git a/pkg/controllers/state/hostportusage.go b/pkg/scheduling/hostportusage.go similarity index 82% rename from pkg/controllers/state/hostportusage.go rename to pkg/scheduling/hostportusage.go index 3f01c75af489..8f38e79fd4d1 100644 --- a/pkg/controllers/state/hostportusage.go +++ b/pkg/scheduling/hostportusage.go @@ -12,14 +12,16 @@ See the License for the specific language governing permissions and limitations under the License. */ -package state +package scheduling import ( + "context" "fmt" "net" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" + "knative.dev/pkg/logging" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -61,17 +63,31 @@ func NewHostPortUsage() *HostPortUsage { } // Add adds a port to the HostPortUsage, returning an error in the case of a conflict -func (u *HostPortUsage) Add(pod *v1.Pod) error { +func (u *HostPortUsage) Add(ctx context.Context, pod *v1.Pod) { + newUsage, err := u.validate(pod) + if err != nil { + logging.FromContext(ctx).Errorf("invariant violated registering host port usage, %s, please file an issue", err) + } + u.reserved = append(u.reserved, newUsage...) +} + +// Validate performs host port conflict validation to allow for determining if we can schedule the pod to the node +// before doing so. 
+func (u *HostPortUsage) Validate(pod *v1.Pod) error { + _, err := u.validate(pod) + return err +} + +func (u *HostPortUsage) validate(pod *v1.Pod) ([]entry, error) { newUsage := getHostPorts(pod) for _, newEntry := range newUsage { for _, existing := range u.reserved { if newEntry.matches(existing) { - return fmt.Errorf("%s conflicts with existing HostPort configuration %s", newEntry, existing) + return nil, fmt.Errorf("%s conflicts with existing HostPort configuration %s", newEntry, existing) } } } - u.reserved = append(u.reserved, newUsage...) - return nil + return newUsage, nil } // DeletePod deletes all host port usage from the HostPortUsage that were created by the pod with the given name. diff --git a/pkg/scheduling/suite_test.go b/pkg/scheduling/suite_test.go index ef5c1792870b..91b0bcbb5ef6 100644 --- a/pkg/scheduling/suite_test.go +++ b/pkg/scheduling/suite_test.go @@ -15,12 +15,11 @@ limitations under the License. package scheduling_test import ( - v1 "k8s.io/api/core/v1" - "github.com/aws/karpenter/pkg/apis/provisioning/v1alpha5" "github.com/aws/karpenter/pkg/scheduling" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" + v1 "k8s.io/api/core/v1" ) var _ = Describe("Scheduling", func() { diff --git a/pkg/scheduling/volumelimits.go b/pkg/scheduling/volumelimits.go new file mode 100644 index 000000000000..d7f49f20cec1 --- /dev/null +++ b/pkg/scheduling/volumelimits.go @@ -0,0 +1,199 @@ +/* +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package scheduling + +import ( + "context" + "fmt" + + "knative.dev/pkg/logging" + + v1 "k8s.io/api/core/v1" + storagev1 "k8s.io/api/storage/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +// VolumeLimits tracks volume limits on a per node basis. The number of volumes that can be mounted varies by instance +// type. We need to be aware and track the mounted volume usage to inform our awareness of which pods can schedule to +// which nodes. +type VolumeLimits struct { + volumes volumeUsage + podVolumes map[types.NamespacedName]volumeUsage + kubeClient client.Client +} + +type volumeUsage map[string]sets.String + +func (u volumeUsage) Add(provisioner string, pvcID string) { + existing, ok := u[provisioner] + if !ok { + existing = sets.NewString() + u[provisioner] = existing + } + existing.Insert(pvcID) +} + +func (u volumeUsage) union(volumes volumeUsage) volumeUsage { + cp := volumeUsage{} + for k, v := range u { + cp[k] = sets.NewString(v.List()...) + } + for k, v := range volumes { + existing, ok := cp[k] + if !ok { + existing = sets.NewString() + cp[k] = existing + } + existing.Insert(v.List()...) + } + return cp +} + +func (u volumeUsage) insert(volumes volumeUsage) { + for k, v := range volumes { + existing, ok := u[k] + if !ok { + existing = sets.NewString() + u[k] = existing + } + existing.Insert(v.List()...) + } +} + +func (u volumeUsage) copy() volumeUsage { + cp := volumeUsage{} + for k, v := range u { + cp[k] = sets.NewString(v.List()...) 
+ } + return cp +} + +func NewVolumeLimits(kubeClient client.Client) *VolumeLimits { + return &VolumeLimits{ + kubeClient: kubeClient, + volumes: volumeUsage{}, + podVolumes: map[types.NamespacedName]volumeUsage{}, + } +} + +func (v *VolumeLimits) Add(ctx context.Context, pod *v1.Pod) { + podVolumes, err := v.validate(ctx, pod) + if err != nil { + logging.FromContext(ctx).Errorf("inconsistent state error adding volume, %s, please file an issue", err) + } + v.podVolumes[client.ObjectKeyFromObject(pod)] = podVolumes + v.volumes = v.volumes.union(podVolumes) +} + +type VolumeCount map[string]int + +// Exceeds returns true if the volume count exceeds the limits provided. If there is no value for a storage provider, it +// is treated as unlimited. +func (c VolumeCount) Exceeds(limits VolumeCount) bool { + for k, v := range c { + limit, hasLimit := limits[k] + if !hasLimit { + continue + } + if v > limit { + return true + } + } + return false +} + +// Fits returns true if the rhs 'fits' within the volume count. +func (c VolumeCount) Fits(rhs VolumeCount) bool { + for k, v := range rhs { + limit, hasLimit := c[k] + if !hasLimit { + continue + } + if v > limit { + return false + } + } + return true +} + +func (v *VolumeLimits) Validate(ctx context.Context, pod *v1.Pod) (VolumeCount, error) { + podVolumes, err := v.validate(ctx, pod) + if err != nil { + return nil, err + } + result := VolumeCount{} + for k, v := range v.volumes.union(podVolumes) { + result[k] += len(v) + } + return result, nil +} + +func (v *VolumeLimits) validate(ctx context.Context, pod *v1.Pod) (volumeUsage, error) { + podPVCs := volumeUsage{} + + for _, volume := range pod.Spec.Volumes { + var pvcID string + var storageClassName *string + if volume.PersistentVolumeClaim != nil { + var pvc v1.PersistentVolumeClaim + if err := v.kubeClient.Get(ctx, client.ObjectKey{Namespace: pod.Namespace, Name: volume.PersistentVolumeClaim.ClaimName}, &pvc); err != nil { + return nil, err + } + + pvcID = fmt.Sprintf("%s/%s", pod.Namespace, volume.PersistentVolumeClaim.ClaimName) + storageClassName = pvc.Spec.StorageClassName + } else if volume.Ephemeral != nil { + // generated name per https://kubernetes.io/docs/concepts/storage/ephemeral-volumes/#persistentvolumeclaim-naming + pvcID = fmt.Sprintf("%s/%s-%s", pod.Namespace, pod.Name, volume.Name) + storageClassName = volume.Ephemeral.VolumeClaimTemplate.Spec.StorageClassName + } else { + continue + } + + provisioner := "unspecified" + if storageClassName != nil { + var sc storagev1.StorageClass + if err := v.kubeClient.Get(ctx, client.ObjectKey{Name: *storageClassName}, &sc); err != nil { + return nil, err + } + provisioner = sc.Provisioner + } + podPVCs.Add(provisioner, pvcID) + } + return podPVCs, nil +} + +func (v *VolumeLimits) DeletePod(key types.NamespacedName) { + delete(v.podVolumes, key) + // volume names could be duplicated, so we re-create our volumes + v.volumes = volumeUsage{} + for _, c := range v.podVolumes { + v.volumes.insert(c) + } +} + +func (v *VolumeLimits) Copy() *VolumeLimits { + cp := &VolumeLimits{ + kubeClient: v.kubeClient, + volumes: v.volumes.copy(), + podVolumes: map[types.NamespacedName]volumeUsage{}, + } + for k, v := range v.podVolumes { + cp.podVolumes[k] = v.copy() + } + return cp +} diff --git a/pkg/test/storage.go b/pkg/test/storage.go index 8386cb81636b..ab9b507dade9 100644 --- a/pkg/test/storage.go +++ b/pkg/test/storage.go @@ -17,6 +17,8 @@ package test import ( "fmt" + "github.com/aws/aws-sdk-go/aws" + "github.com/imdario/mergo" v1 
"k8s.io/api/core/v1" storagev1 "k8s.io/api/storage/v1" @@ -77,7 +79,8 @@ func PersistentVolumeClaim(overrides ...PersistentVolumeClaimOptions) *v1.Persis type StorageClassOptions struct { metav1.ObjectMeta - Zones []string + Zones []string + Provisioner *string } func StorageClass(overrides ...StorageClassOptions) *storagev1.StorageClass { @@ -92,10 +95,13 @@ func StorageClass(overrides ...StorageClassOptions) *storagev1.StorageClass { if options.Zones != nil { allowedTopologies = []v1.TopologySelectorTerm{{MatchLabelExpressions: []v1.TopologySelectorLabelRequirement{{Key: v1.LabelTopologyZone, Values: options.Zones}}}} } + if options.Provisioner == nil { + options.Provisioner = aws.String("test-provisioner") + } return &storagev1.StorageClass{ ObjectMeta: ObjectMeta(options.ObjectMeta), - Provisioner: "test-provisioner", + Provisioner: *options.Provisioner, AllowedTopologies: allowedTopologies, } }