diff --git a/charts/karpenter/templates/clusterrole.yaml b/charts/karpenter/templates/clusterrole.yaml index 7d0f318beca0..8a8e30f6cf7c 100644 --- a/charts/karpenter/templates/clusterrole.yaml +++ b/charts/karpenter/templates/clusterrole.yaml @@ -39,3 +39,6 @@ rules: - apiGroups: ["admissionregistration.k8s.io"] resources: ["validatingwebhookconfigurations", "mutatingwebhookconfigurations"] verbs: ["get", "watch", "list", "update"] + - apiGroups: ["storage.k8s.io"] + resources: ["csinodes"] + verbs: ["get", "watch", "list"] \ No newline at end of file diff --git a/cmd/controller/main.go b/cmd/controller/main.go index af317ebd7457..21c6a3408d7f 100644 --- a/cmd/controller/main.go +++ b/cmd/controller/main.go @@ -47,7 +47,6 @@ import ( metricspod "github.com/aws/karpenter/pkg/controllers/metrics/pod" metricsprovisioner "github.com/aws/karpenter/pkg/controllers/metrics/provisioner" "github.com/aws/karpenter/pkg/controllers/node" - "github.com/aws/karpenter/pkg/controllers/persistentvolumeclaim" "github.com/aws/karpenter/pkg/controllers/provisioning" "github.com/aws/karpenter/pkg/controllers/state" "github.com/aws/karpenter/pkg/controllers/termination" @@ -110,13 +109,12 @@ func main() { logging.FromContext(ctx).Errorf("watching configmaps, config changes won't be applied immediately, %s", err) } - cluster := state.NewCluster(ctx, manager.GetClient(), cloudProvider) + cluster := state.NewCluster(manager.GetClient(), cloudProvider) if err := manager.RegisterControllers(ctx, provisioning.NewController(ctx, cfg, manager.GetClient(), clientSet.CoreV1(), recorder, cloudProvider, cluster), state.NewNodeController(manager.GetClient(), cluster), state.NewPodController(manager.GetClient(), cluster), - persistentvolumeclaim.NewController(manager.GetClient()), termination.NewController(ctx, manager.GetClient(), clientSet.CoreV1(), cloudProvider), node.NewController(manager.GetClient(), cloudProvider), metricspod.NewController(manager.GetClient()), diff --git a/hack/docs/metrics_gen_docs.go b/hack/docs/metrics_gen_docs.go index fe5f461881ba..0eb3de01ae12 100644 --- a/hack/docs/metrics_gen_docs.go +++ b/hack/docs/metrics_gen_docs.go @@ -205,6 +205,12 @@ func handleVariableDeclaration(v *ast.GenDecl) []metricInfo { } func getFuncPackage(fun ast.Expr) string { + if pexpr, ok := fun.(*ast.ParenExpr); ok { + return getFuncPackage(pexpr.X) + } + if sexpr, ok := fun.(*ast.StarExpr); ok { + return getFuncPackage(sexpr.X) + } if sel, ok := fun.(*ast.SelectorExpr); ok { return fmt.Sprintf("%s", sel.X) } diff --git a/pkg/cloudprovider/aws/instancetype.go b/pkg/cloudprovider/aws/instancetype.go index efb1e6c7ef33..e447496dc722 100644 --- a/pkg/cloudprovider/aws/instancetype.go +++ b/pkg/cloudprovider/aws/instancetype.go @@ -38,6 +38,8 @@ import ( "github.com/aws/karpenter/pkg/utils/sets" ) +var _ cloudprovider.InstanceType = (*InstanceType)(nil) + type InstanceType struct { *ec2.InstanceTypeInfo offerings []cloudprovider.Offering diff --git a/pkg/cloudprovider/aws/suite_test.go b/pkg/cloudprovider/aws/suite_test.go index a1630af8b8fb..149547bb5164 100644 --- a/pkg/cloudprovider/aws/suite_test.go +++ b/pkg/cloudprovider/aws/suite_test.go @@ -130,7 +130,7 @@ var _ = BeforeSuite(func() { kubeClient: e.Client, } registry.RegisterOrDie(ctx, cloudProvider) - cluster = state.NewCluster(ctx, e.Client, cloudProvider) + cluster = state.NewCluster(e.Client, cloudProvider) recorder = test.NewEventRecorder() cfg = test.NewConfig() controller = provisioning.NewController(ctx, cfg, e.Client, clientSet.CoreV1(), recorder, cloudProvider, cluster) diff --git a/pkg/cloudprovider/fake/cloudprovider.go b/pkg/cloudprovider/fake/cloudprovider.go index c877bc9872c6..dedfcc5bd5bd 100644 --- a/pkg/cloudprovider/fake/cloudprovider.go +++ b/pkg/cloudprovider/fake/cloudprovider.go @@ -49,6 +49,9 @@ type CloudProvider struct { CreateCalls []*cloudprovider.NodeRequest } +var _ cloudprovider.CloudProvider = (*CloudProvider)(nil) +var _ cloudprovider.InstanceType = (*InstanceType)(nil) + func (c *CloudProvider) Create(ctx context.Context, nodeRequest *cloudprovider.NodeRequest) (*v1.Node, error) { c.mu.Lock() c.CreateCalls = append(c.CreateCalls, nodeRequest) diff --git a/pkg/config/suite_test.go b/pkg/config/suite_test.go index bd5edb0701b3..d1a12f4680f3 100644 --- a/pkg/config/suite_test.go +++ b/pkg/config/suite_test.go @@ -124,13 +124,7 @@ var _ = Describe("Batch Parameter", func() { var changed int64 cfg.OnChange(func(c config.Config) { defer GinkgoRecover() - // we can't unregister this, so just check for the one case we care about - if atomic.LoadInt64(&changed) == 0 { - atomic.StoreInt64(&changed, 1) - Expect(cfg.BatchIdleDuration()).To(Equal(1 * time.Second)) - // shouldn't be changed - Expect(cfg.BatchMaxDuration()).To(Equal(10 * time.Second)) - } + atomic.StoreInt64(&changed, 1) }) // simulate user updating the config map with a bad max duration diff --git a/pkg/controllers/persistentvolumeclaim/controller.go b/pkg/controllers/persistentvolumeclaim/controller.go deleted file mode 100644 index cc0d220931e9..000000000000 --- a/pkg/controllers/persistentvolumeclaim/controller.go +++ /dev/null @@ -1,126 +0,0 @@ -/* -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package persistentvolumeclaim - -import ( - "context" - "fmt" - - v1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/types" - "knative.dev/pkg/logging" - controllerruntime "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/handler" - "sigs.k8s.io/controller-runtime/pkg/manager" - "sigs.k8s.io/controller-runtime/pkg/reconcile" - "sigs.k8s.io/controller-runtime/pkg/source" - - "github.com/aws/karpenter/pkg/utils/functional" - "github.com/aws/karpenter/pkg/utils/injection" - "github.com/aws/karpenter/pkg/utils/pod" -) - -const ( - controllerName = "volume" - SelectedNodeAnnotation = "volume.kubernetes.io/selected-node" -) - -// Controller for the resource -type Controller struct { - kubeClient client.Client -} - -// NewController is a constructor -func NewController(kubeClient client.Client) *Controller { - return &Controller{kubeClient: kubeClient} -} - -// Register the controller to the manager -func (c *Controller) Register(ctx context.Context, m manager.Manager) error { - return controllerruntime. - NewControllerManagedBy(m). - Named(controllerName). - For(&v1.PersistentVolumeClaim{}). - Watches(&source.Kind{Type: &v1.Pod{}}, handler.EnqueueRequestsFromMapFunc(c.pvcForPod)). - Complete(c) -} - -// Reconcile a control loop for the resource -func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) { - ctx = logging.WithLogger(ctx, logging.FromContext(ctx).Named(controllerName).With("resource", req.String())) - ctx = injection.WithNamespacedName(ctx, req.NamespacedName) - ctx = injection.WithControllerName(ctx, controllerName) - - pvc := &v1.PersistentVolumeClaim{} - if err := c.kubeClient.Get(ctx, req.NamespacedName, pvc); err != nil { - if errors.IsNotFound(err) { - return reconcile.Result{}, nil - } - return reconcile.Result{}, err - } - pod, err := c.podForPvc(ctx, pvc) - if err != nil { - return reconcile.Result{}, err - } - if pod == nil { - return reconcile.Result{}, nil - } - if nodeName, ok := pvc.Annotations[SelectedNodeAnnotation]; ok && nodeName == pod.Spec.NodeName { - return reconcile.Result{}, nil - } - if !c.isBindable(pod) { - return reconcile.Result{}, nil - } - pvc.Annotations = functional.UnionStringMaps(pvc.Annotations, map[string]string{SelectedNodeAnnotation: pod.Spec.NodeName}) - if err := c.kubeClient.Update(ctx, pvc); err != nil { - return reconcile.Result{}, fmt.Errorf("binding persistent volume claim for pod %s/%s to node %q, %w", pod.Namespace, pod.Name, pod.Spec.NodeName, err) - } - logging.FromContext(ctx).Infof("Bound persistent volume claim to node %s", pod.Spec.NodeName) - return reconcile.Result{}, nil -} - -func (c *Controller) podForPvc(ctx context.Context, pvc *v1.PersistentVolumeClaim) (*v1.Pod, error) { - pods := &v1.PodList{} - if err := c.kubeClient.List(ctx, pods, client.InNamespace(pvc.Namespace)); err != nil { - return nil, err - } - for _, pod := range pods.Items { - for _, volume := range pod.Spec.Volumes { - if volume.PersistentVolumeClaim != nil && volume.PersistentVolumeClaim.ClaimName == pvc.Name { - return &pod, nil - } - } - } - return nil, nil -} - -func (c *Controller) pvcForPod(o client.Object) (requests []reconcile.Request) { - if !c.isBindable(o.(*v1.Pod)) { - return requests - } - for _, volume := range o.(*v1.Pod).Spec.Volumes { - if volume.PersistentVolumeClaim == nil { - continue - } - requests = append(requests, reconcile.Request{NamespacedName: types.NamespacedName{Namespace: o.GetNamespace(), Name: volume.PersistentVolumeClaim.ClaimName}}) - } - return requests -} - -func (c *Controller) isBindable(p *v1.Pod) bool { - return pod.IsScheduled(p) && !pod.IsTerminal(p) && !pod.IsTerminating(p) -} diff --git a/pkg/controllers/persistentvolumeclaim/suite_test.go b/pkg/controllers/persistentvolumeclaim/suite_test.go deleted file mode 100644 index ade21a137425..000000000000 --- a/pkg/controllers/persistentvolumeclaim/suite_test.go +++ /dev/null @@ -1,88 +0,0 @@ -/* -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package persistentvolumeclaim_test - -import ( - "context" - "strings" - "testing" - - "github.com/Pallinder/go-randomdata" - "github.com/aws/karpenter/pkg/controllers/persistentvolumeclaim" - "github.com/aws/karpenter/pkg/test" - . "github.com/aws/karpenter/pkg/test/expectations" - . "github.com/onsi/ginkgo" - . "github.com/onsi/gomega" - v1 "k8s.io/api/core/v1" - . "knative.dev/pkg/logging/testing" - "sigs.k8s.io/controller-runtime/pkg/client" -) - -var ctx context.Context -var controller *persistentvolumeclaim.Controller -var env *test.Environment - -func TestAPIs(t *testing.T) { - ctx = TestContextWithLogger(t) - RegisterFailHandler(Fail) - RunSpecs(t, "Volume") -} - -var _ = BeforeSuite(func() { - env = test.NewEnvironment(ctx, func(e *test.Environment) { - controller = persistentvolumeclaim.NewController(e.Client) - }) - Expect(env.Start()).To(Succeed(), "Failed to start environment") -}) - -var _ = AfterSuite(func() { - Expect(env.Stop()).To(Succeed(), "Failed to stop environment") -}) - -var _ = Describe("Reconcile", func() { - var pvc *v1.PersistentVolumeClaim - - BeforeEach(func() { - pvc = test.PersistentVolumeClaim() - }) - - AfterEach(func() { - ExpectCleanedUp(ctx, env.Client) - }) - - It("should ignore a pvc without pods", func() { - ExpectApplied(ctx, env.Client, pvc) - ExpectReconcileSucceeded(ctx, controller, client.ObjectKeyFromObject(pvc)) - Expect(env.Client.Get(ctx, client.ObjectKeyFromObject(pvc), pvc)).To(Succeed()) - Expect(pvc.Annotations[persistentvolumeclaim.SelectedNodeAnnotation]).To(BeEmpty()) - }) - It("should ignore a pvc with unscheduled or terminal pods", func() { - ExpectApplied(ctx, env.Client, pvc, - test.Pod(test.PodOptions{Phase: v1.PodPending}), - test.Pod(test.PodOptions{NodeName: strings.ToLower(randomdata.SillyName()), Phase: v1.PodSucceeded}), - test.Pod(test.PodOptions{NodeName: strings.ToLower(randomdata.SillyName()), Phase: v1.PodFailed}), - ) - ExpectReconcileSucceeded(ctx, controller, client.ObjectKeyFromObject(pvc)) - Expect(env.Client.Get(ctx, client.ObjectKeyFromObject(pvc), pvc)).To(Succeed()) - Expect(pvc.Annotations[persistentvolumeclaim.SelectedNodeAnnotation]).To(BeEmpty()) - }) - It("should bind a pvc to a pod's node", func() { - pod := test.Pod(test.PodOptions{NodeName: strings.ToLower(randomdata.SillyName()), PersistentVolumeClaims: []string{pvc.Name}}) - ExpectApplied(ctx, env.Client, pvc, pod) - ExpectReconcileSucceeded(ctx, controller, client.ObjectKeyFromObject(pvc)) - Expect(env.Client.Get(ctx, client.ObjectKeyFromObject(pvc), pvc)).To(Succeed()) - Expect(pvc.Annotations[persistentvolumeclaim.SelectedNodeAnnotation]).To(Equal(pod.Spec.NodeName)) - }) -}) diff --git a/pkg/controllers/provisioning/provisioner.go b/pkg/controllers/provisioning/provisioner.go index e19a7659c73b..f98923829f3c 100644 --- a/pkg/controllers/provisioning/provisioner.go +++ b/pkg/controllers/provisioning/provisioner.go @@ -224,7 +224,7 @@ func (p *Provisioner) schedule(ctx context.Context, pods []*v1.Pod) ([]*schedule return nil, fmt.Errorf("getting daemon overhead, %w", err) } - return scheduler.NewScheduler(nodeTemplates, provisionerList.Items, p.cluster, topology, instanceTypes, daemonOverhead, p.recorder).Solve(ctx, pods) + return scheduler.NewScheduler(ctx, p.kubeClient, nodeTemplates, provisionerList.Items, p.cluster, topology, instanceTypes, daemonOverhead, p.recorder).Solve(ctx, pods) } func (p *Provisioner) launch(ctx context.Context, node *scheduler.Node) error { diff --git a/pkg/controllers/provisioning/scheduling/inflightnode.go b/pkg/controllers/provisioning/scheduling/inflightnode.go index 839651994c53..b06d4d071df8 100644 --- a/pkg/controllers/provisioning/scheduling/inflightnode.go +++ b/pkg/controllers/provisioning/scheduling/inflightnode.go @@ -15,6 +15,7 @@ limitations under the License. package scheduling import ( + "context" "fmt" "github.com/aws/karpenter/pkg/apis/provisioning/v1alpha5" @@ -35,13 +36,14 @@ type InFlightNode struct { requirements scheduling.Requirements available v1.ResourceList startupTolerations []v1.Toleration - hostPortUsage *state.HostPortUsage + hostPortUsage *scheduling.HostPortUsage + volumeUsage *scheduling.VolumeLimits + volumeLimits scheduling.VolumeCount } func NewInFlightNode(n *state.Node, topology *Topology, startupTaints []v1.Taint, daemonResources v1.ResourceList) *InFlightNode { // the remaining daemonResources to schedule are the total daemonResources minus what has already scheduled remainingDaemonResources := resources.Subtract(daemonResources, n.DaemonSetRequested) - node := &InFlightNode{ Node: n.Node, available: n.Available, @@ -49,6 +51,8 @@ func NewInFlightNode(n *state.Node, topology *Topology, startupTaints []v1.Taint requests: remainingDaemonResources, requirements: scheduling.NewLabelRequirements(n.Node.Labels), hostPortUsage: n.HostPortUsage.Copy(), + volumeUsage: n.VolumeUsage.Copy(), + volumeLimits: n.VolumeLimits, } if n.Node.Labels[v1alpha5.LabelNodeInitialized] != "true" { @@ -81,15 +85,24 @@ func NewInFlightNode(n *state.Node, topology *Topology, startupTaints []v1.Taint return node } -func (n *InFlightNode) Add(pod *v1.Pod) error { +func (n *InFlightNode) Add(ctx context.Context, pod *v1.Pod) error { // Check Taints if err := scheduling.Taints(n.Node.Spec.Taints).Tolerates(pod, n.startupTolerations...); err != nil { return err } - if err := n.hostPortUsage.Add(pod); err != nil { + if err := n.hostPortUsage.Validate(pod); err != nil { + return err + } + + // determine the number of volumes that will be mounted if the pod schedules + mountedVolumeCount, err := n.volumeUsage.Validate(ctx, pod) + if err != nil { return err } + if mountedVolumeCount.Exceeds(n.volumeLimits) { + return fmt.Errorf("would exceed node volume limits") + } // check resource requests first since that's a pretty likely reason the pod won't schedule on an in-flight // node, which at this point can't be increased in size @@ -122,5 +135,7 @@ func (n *InFlightNode) Add(pod *v1.Pod) error { n.requests = requests n.requirements = nodeRequirements n.topology.Record(pod, nodeRequirements) + n.hostPortUsage.Add(ctx, pod) + n.volumeUsage.Add(ctx, pod) return nil } diff --git a/pkg/controllers/provisioning/scheduling/node.go b/pkg/controllers/provisioning/scheduling/node.go index bb67a1b8f7c6..0ceef8b3c1d0 100644 --- a/pkg/controllers/provisioning/scheduling/node.go +++ b/pkg/controllers/provisioning/scheduling/node.go @@ -15,17 +15,16 @@ limitations under the License. package scheduling import ( + "context" "fmt" "strings" "sync/atomic" - v1 "k8s.io/api/core/v1" - "github.com/samber/lo" + v1 "k8s.io/api/core/v1" "github.com/aws/karpenter/pkg/apis/provisioning/v1alpha5" "github.com/aws/karpenter/pkg/cloudprovider" - "github.com/aws/karpenter/pkg/controllers/state" "github.com/aws/karpenter/pkg/scheduling" "github.com/aws/karpenter/pkg/utils/resources" "github.com/aws/karpenter/pkg/utils/sets" @@ -40,7 +39,7 @@ type Node struct { topology *Topology requests v1.ResourceList - hostPortUsage *state.HostPortUsage + hostPortUsage *scheduling.HostPortUsage } var nodeID int64 @@ -54,19 +53,20 @@ func NewNode(nodeTemplate *scheduling.NodeTemplate, topology *Topology, daemonRe return &Node{ NodeTemplate: template, InstanceTypeOptions: instanceTypes, - hostPortUsage: state.NewHostPortUsage(), + hostPortUsage: scheduling.NewHostPortUsage(), topology: topology, requests: daemonResources, } } -func (n *Node) Add(pod *v1.Pod) error { +func (n *Node) Add(ctx context.Context, pod *v1.Pod) error { // Check Taints if err := n.Taints.Tolerates(pod); err != nil { return err } - if err := n.hostPortUsage.Add(pod); err != nil { + // exposed host ports on the node + if err := n.hostPortUsage.Validate(pod); err != nil { return err } @@ -102,6 +102,7 @@ func (n *Node) Add(pod *v1.Pod) error { n.requests = requests n.Requirements = nodeRequirements n.topology.Record(pod, nodeRequirements) + n.hostPortUsage.Add(ctx, pod) return nil } diff --git a/pkg/controllers/provisioning/scheduling/scheduler.go b/pkg/controllers/provisioning/scheduling/scheduler.go index 1382962a60a7..7881e90b48c4 100644 --- a/pkg/controllers/provisioning/scheduling/scheduler.go +++ b/pkg/controllers/provisioning/scheduling/scheduler.go @@ -33,13 +33,15 @@ import ( "github.com/aws/karpenter/pkg/utils/resources" ) -func NewScheduler(nodeTemplates []*scheduling.NodeTemplate, provisioners []v1alpha5.Provisioner, cluster *state.Cluster, topology *Topology, instanceTypes map[string][]cloudprovider.InstanceType, daemonOverhead map[*scheduling.NodeTemplate]v1.ResourceList, recorder events.Recorder) *Scheduler { +func NewScheduler(ctx context.Context, kubeClient client.Client, nodeTemplates []*scheduling.NodeTemplate, provisioners []v1alpha5.Provisioner, cluster *state.Cluster, topology *Topology, instanceTypes map[string][]cloudprovider.InstanceType, daemonOverhead map[*scheduling.NodeTemplate]v1.ResourceList, recorder events.Recorder) *Scheduler { for provisioner := range instanceTypes { sort.Slice(instanceTypes[provisioner], func(i, j int) bool { return instanceTypes[provisioner][i].Price() < instanceTypes[provisioner][j].Price() }) } s := &Scheduler{ + ctx: ctx, + kubeClient: kubeClient, nodeTemplates: nodeTemplates, topology: topology, cluster: cluster, @@ -84,6 +86,7 @@ func NewScheduler(nodeTemplates []*scheduling.NodeTemplate, provisioners []v1alp } type Scheduler struct { + ctx context.Context nodes []*Node inflight []*InFlightNode nodeTemplates []*scheduling.NodeTemplate @@ -94,6 +97,7 @@ type Scheduler struct { topology *Topology cluster *state.Cluster recorder events.Recorder + kubeClient client.Client } func (s *Scheduler) Solve(ctx context.Context, pods []*v1.Pod) ([]*Node, error) { @@ -112,7 +116,7 @@ func (s *Scheduler) Solve(ctx context.Context, pods []*v1.Pod) ([]*Node, error) } // Schedule to existing nodes or create a new node - if errors[pod] = s.add(pod); errors[pod] == nil { + if errors[pod] = s.add(ctx, pod); errors[pod] == nil { continue } @@ -165,10 +169,10 @@ func (s *Scheduler) recordSchedulingResults(ctx context.Context, pods []*v1.Pod, logging.FromContext(ctx).Infof("Computed %d unready node(s) will fit %d pod(s)", inflightCount, existingCount) } -func (s *Scheduler) add(pod *v1.Pod) error { +func (s *Scheduler) add(ctx context.Context, pod *v1.Pod) error { // first try to schedule against an in-flight real node for _, node := range s.inflight { - if err := node.Add(pod); err == nil { + if err := node.Add(ctx, pod); err == nil { return nil } } @@ -178,7 +182,7 @@ func (s *Scheduler) add(pod *v1.Pod) error { // Pick existing node that we are about to create for _, node := range s.nodes { - if err := node.Add(pod); err == nil { + if err := node.Add(ctx, pod); err == nil { return nil } } @@ -197,7 +201,7 @@ func (s *Scheduler) add(pod *v1.Pod) error { } node := NewNode(nodeTemplate, s.topology, s.daemonOverhead[nodeTemplate], instanceTypes) - if err := node.Add(pod); err != nil { + if err := node.Add(ctx, pod); err != nil { errs = multierr.Append(errs, fmt.Errorf("incompatible with provisioner %q, %w", nodeTemplate.ProvisionerName, err)) continue } diff --git a/pkg/controllers/provisioning/scheduling/scheduling_benchmark_test.go b/pkg/controllers/provisioning/scheduling/scheduling_benchmark_test.go index 9349f0e9f12e..d645bd6fa33c 100644 --- a/pkg/controllers/provisioning/scheduling/scheduling_benchmark_test.go +++ b/pkg/controllers/provisioning/scheduling/scheduling_benchmark_test.go @@ -112,10 +112,11 @@ func benchmarkScheduler(b *testing.B, instanceCount, podCount int) { instanceTypes := fake.InstanceTypes(instanceCount) cloudProv := &fake.CloudProvider{InstanceTypes: instanceTypes} - scheduler := NewScheduler( + scheduler := NewScheduler(ctx, + nil, []*scheduling.NodeTemplate{scheduling.NewNodeTemplate(provisioner)}, nil, - state.NewCluster(ctx, nil, cloudProv), + state.NewCluster(nil, cloudProv), &Topology{}, map[string][]cloudprovider.InstanceType{provisioner.Name: instanceTypes}, map[*scheduling.NodeTemplate]v1.ResourceList{}, diff --git a/pkg/controllers/provisioning/scheduling/suite_test.go b/pkg/controllers/provisioning/scheduling/suite_test.go index 283ceb51a479..bd23a1bef861 100644 --- a/pkg/controllers/provisioning/scheduling/suite_test.go +++ b/pkg/controllers/provisioning/scheduling/suite_test.go @@ -16,6 +16,7 @@ package scheduling_test import ( "context" + "fmt" "math" "math/rand" "strings" @@ -27,6 +28,7 @@ import ( "github.com/Pallinder/go-randomdata" "github.com/aws/aws-sdk-go/aws" v1 "k8s.io/api/core/v1" + storagev1 "k8s.io/api/storage/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/sets" @@ -72,7 +74,7 @@ var _ = BeforeSuite(func() { instanceTypes, _ := cloudProv.GetInstanceTypes(ctx, nil) // set these on the cloud provider so we can manipulate them if needed cloudProv.InstanceTypes = instanceTypes - cluster = state.NewCluster(ctx, e.Client, cloudProv) + cluster = state.NewCluster(e.Client, cloudProv) nodeStateController = state.NewNodeController(e.Client, cluster) podStateController = state.NewPodController(e.Client, cluster) recorder = test.NewEventRecorder() @@ -2958,11 +2960,11 @@ var _ = Describe("Instance Type Compatibility", func() { ExpectApplied(ctx, env.Client, provisioner) pods := ExpectProvisioned(ctx, env.Client, controller, test.UnschedulablePod(test.PodOptions{NodeSelector: map[string]string{ - fake.LabelInstanceSize: "large", + fake.LabelInstanceSize: "large", v1.LabelInstanceTypeStable: cloudProv.InstanceTypes[0].Name(), }}), test.UnschedulablePod(test.PodOptions{NodeSelector: map[string]string{ - fake.LabelInstanceSize: "small", + fake.LabelInstanceSize: "small", v1.LabelInstanceTypeStable: cloudProv.InstanceTypes[4].Name(), }}), ) @@ -3470,6 +3472,7 @@ var _ = Describe("In-Flight Nodes", func() { }} initialPod := ExpectProvisioned(ctx, env.Client, controller, test.UnschedulablePod(opts)) node1 := ExpectScheduled(ctx, env.Client, initialPod[0]) + ExpectReconcileSucceeded(ctx, nodeStateController, client.ObjectKeyFromObject(node1)) // the node will have 2000m CPU, so these two pods can't both fit on it opts.ResourceRequirements.Limits[v1.ResourceCPU] = resource.MustParse("1") @@ -3486,6 +3489,7 @@ var _ = Describe("In-Flight Nodes", func() { }} initialPod := ExpectProvisioned(ctx, env.Client, controller, test.UnschedulablePod(opts)) node1 := ExpectScheduled(ctx, env.Client, initialPod[0]) + ExpectReconcileSucceeded(ctx, nodeStateController, client.ObjectKeyFromObject(node1)) secondPod := ExpectProvisioned(ctx, env.Client, controller, test.UnschedulablePod(test.PodOptions{NodeSelector: map[string]string{v1.LabelArchStable: "arm64"}})) @@ -3848,6 +3852,138 @@ var _ = Describe("No Pre-Binding", func() { }) }) +var _ = Describe("Volume Limits", func() { + It("should launch multiple nodes if required due to volume limits", func() { + const csiProvider = "fake.csi.provider" + cloudProv.InstanceTypes = []cloudprovider.InstanceType{ + fake.NewInstanceType( + fake.InstanceTypeOptions{ + Name: "instance-type", + Resources: map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: resource.MustParse("1024"), + v1.ResourcePods: resource.MustParse("1024"), + }, + }), + } + + provisioner.Spec.Limits = nil + ExpectApplied(ctx, env.Client, provisioner) + initialPods := ExpectProvisioned(ctx, env.Client, controller, test.UnschedulablePod()) + node := ExpectScheduled(ctx, env.Client, initialPods[0]) + csiNode := &storagev1.CSINode{ + ObjectMeta: metav1.ObjectMeta{ + Name: node.Name, + }, + Spec: storagev1.CSINodeSpec{ + Drivers: []storagev1.CSINodeDriver{ + { + Name: csiProvider, + NodeID: "fake-node-id", + Allocatable: &storagev1.VolumeNodeResources{ + Count: aws.Int32(10), + }, + }, + }, + }, + } + ExpectApplied(ctx, env.Client, csiNode) + ExpectReconcileSucceeded(ctx, nodeStateController, client.ObjectKeyFromObject(node)) + + sc := test.StorageClass(test.StorageClassOptions{ + ObjectMeta: metav1.ObjectMeta{Name: "my-storage-class"}, + Provisioner: aws.String(csiProvider), + Zones: []string{"test-zone-1"}}) + ExpectApplied(ctx, env.Client, sc) + + var pods []*v1.Pod + for i := 0; i < 6; i++ { + pvcA := test.PersistentVolumeClaim(test.PersistentVolumeClaimOptions{ + StorageClassName: aws.String("my-storage-class"), + ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("my-claim-a-%d", i)}, + }) + pvcB := test.PersistentVolumeClaim(test.PersistentVolumeClaimOptions{ + StorageClassName: aws.String("my-storage-class"), + ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("my-claim-b-%d", i)}, + }) + ExpectApplied(ctx, env.Client, pvcA, pvcB) + pods = append(pods, test.UnschedulablePod(test.PodOptions{ + PersistentVolumeClaims: []string{pvcA.Name, pvcB.Name}, + })) + } + ExpectProvisioned(ctx, env.Client, controller, pods...) + var nodeList v1.NodeList + Expect(env.Client.List(ctx, &nodeList)).To(Succeed()) + // we need to create a new node as the in-flight one can only contain 5 pods due to the CSINode volume limit + Expect(nodeList.Items).To(HaveLen(2)) + }) + It("should launch a single node if all pods use the same PVC", func() { + const csiProvider = "fake.csi.provider" + cloudProv.InstanceTypes = []cloudprovider.InstanceType{ + fake.NewInstanceType( + fake.InstanceTypeOptions{ + Name: "instance-type", + Resources: map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: resource.MustParse("1024"), + v1.ResourcePods: resource.MustParse("1024"), + }, + }), + } + + provisioner.Spec.Limits = nil + ExpectApplied(ctx, env.Client, provisioner) + initialPods := ExpectProvisioned(ctx, env.Client, controller, test.UnschedulablePod()) + node := ExpectScheduled(ctx, env.Client, initialPods[0]) + csiNode := &storagev1.CSINode{ + ObjectMeta: metav1.ObjectMeta{ + Name: node.Name, + }, + Spec: storagev1.CSINodeSpec{ + Drivers: []storagev1.CSINodeDriver{ + { + Name: csiProvider, + NodeID: "fake-node-id", + Allocatable: &storagev1.VolumeNodeResources{ + Count: aws.Int32(10), + }, + }, + }, + }, + } + ExpectApplied(ctx, env.Client, csiNode) + ExpectReconcileSucceeded(ctx, nodeStateController, client.ObjectKeyFromObject(node)) + + sc := test.StorageClass(test.StorageClassOptions{ + ObjectMeta: metav1.ObjectMeta{Name: "my-storage-class"}, + Provisioner: aws.String(csiProvider), + Zones: []string{"test-zone-1"}}) + ExpectApplied(ctx, env.Client, sc) + + pv := test.PersistentVolume(test.PersistentVolumeOptions{ + ObjectMeta: metav1.ObjectMeta{Name: "my-volume"}, + Zones: []string{"test-zone-1"}}) + + pvc := test.PersistentVolumeClaim(test.PersistentVolumeClaimOptions{ + ObjectMeta: metav1.ObjectMeta{Name: "my-claim"}, + StorageClassName: aws.String("my-storage-class"), + VolumeName: pv.Name, + }) + ExpectApplied(ctx, env.Client, pv, pvc) + + var pods []*v1.Pod + for i := 0; i < 100; i++ { + pods = append(pods, test.UnschedulablePod(test.PodOptions{ + PersistentVolumeClaims: []string{pvc.Name, pvc.Name}, + })) + } + ExpectApplied(ctx, env.Client, provisioner) + ExpectProvisioned(ctx, env.Client, controller, pods...) + var nodeList v1.NodeList + Expect(env.Client.List(ctx, &nodeList)).To(Succeed()) + // 100 of the same PVC should all be schedulable on the same node + Expect(nodeList.Items).To(HaveLen(1)) + }) +}) + func MakePods(count int, options test.PodOptions) (pods []*v1.Pod) { for i := 0; i < count; i++ { pods = append(pods, test.UnschedulablePod(options)) diff --git a/pkg/controllers/provisioning/suite_test.go b/pkg/controllers/provisioning/suite_test.go index 6bc8861ebcd9..8632b7f5de7d 100644 --- a/pkg/controllers/provisioning/suite_test.go +++ b/pkg/controllers/provisioning/suite_test.go @@ -66,7 +66,7 @@ var _ = BeforeSuite(func() { for _, it := range instanceTypes { instanceTypeMap[it.Name()] = it } - cluster := state.NewCluster(ctx, e.Client, cloudProvider) + cluster := state.NewCluster(e.Client, cloudProvider) controller = provisioning.NewController(ctx, cfg, e.Client, corev1.NewForConfigOrDie(e.Config), recorder, cloudProvider, cluster) }) Expect(env.Start()).To(Succeed(), "Failed to start environment") diff --git a/pkg/controllers/state/cluster.go b/pkg/controllers/state/cluster.go index ed96ae5b5a0f..ff37cd3b7133 100644 --- a/pkg/controllers/state/cluster.go +++ b/pkg/controllers/state/cluster.go @@ -20,23 +20,25 @@ import ( "sort" "sync" - "github.com/aws/karpenter/pkg/cloudprovider" - - "github.com/aws/karpenter/pkg/apis/provisioning/v1alpha5" + "go.uber.org/multierr" - "knative.dev/pkg/logging" + "github.com/aws/aws-sdk-go/aws" + "k8s.io/apimachinery/pkg/api/errors" v1 "k8s.io/api/core/v1" + storagev1 "k8s.io/api/storage/v1" "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" + "github.com/aws/karpenter/pkg/apis/provisioning/v1alpha5" + "github.com/aws/karpenter/pkg/cloudprovider" + "github.com/aws/karpenter/pkg/scheduling" podutils "github.com/aws/karpenter/pkg/utils/pod" "github.com/aws/karpenter/pkg/utils/resources" ) // Cluster maintains cluster state that is often needed but expensive to compute. type Cluster struct { - ctx context.Context kubeClient client.Client cloudProvider cloudprovider.CloudProvider @@ -49,9 +51,8 @@ type Cluster struct { bindings map[types.NamespacedName]string // pod namespaced named -> node name } -func NewCluster(ctx context.Context, client client.Client, cp cloudprovider.CloudProvider) *Cluster { +func NewCluster(client client.Client, cp cloudprovider.CloudProvider) *Cluster { c := &Cluster{ - ctx: ctx, kubeClient: client, cloudProvider: cp, nodes: map[string]*Node{}, @@ -76,12 +77,14 @@ type Node struct { // included in the calculation for Available. DaemonSetRequested v1.ResourceList // HostPort usage of all pods that are bound to the node - HostPortUsage *HostPortUsage + HostPortUsage *scheduling.HostPortUsage + VolumeUsage *scheduling.VolumeLimits + VolumeLimits scheduling.VolumeCount + InstanceType cloudprovider.InstanceType // Provisioner is the provisioner used to create the node. Provisioner *v1alpha5.Provisioner - podRequests map[types.NamespacedName]v1.ResourceList - InstanceType cloudprovider.InstanceType + podRequests map[types.NamespacedName]v1.ResourceList } // ForPodsWithAntiAffinity calls the supplied function once for each pod with required anti affinity terms that is @@ -126,33 +129,30 @@ func (c *Cluster) ForEachNode(f func(n *Node) bool) { } } -func (c *Cluster) newNode(node *v1.Node) *Node { +// newNode always returns a node, even if some portion of the update has failed +func (c *Cluster) newNode(ctx context.Context, node *v1.Node) (*Node, error) { n := &Node{ Node: node, - HostPortUsage: NewHostPortUsage(), + HostPortUsage: scheduling.NewHostPortUsage(), + VolumeUsage: scheduling.NewVolumeLimits(c.kubeClient), + VolumeLimits: scheduling.VolumeCount{}, podRequests: map[types.NamespacedName]v1.ResourceList{}, } - - // store the provisioner if it exists - if provisionerName, ok := node.Labels[v1alpha5.ProvisionerNameLabelKey]; ok { - var provisioner v1alpha5.Provisioner - if err := c.kubeClient.Get(c.ctx, client.ObjectKey{Name: provisionerName}, &provisioner); err != nil { - logging.FromContext(c.ctx).Errorf("getting provisioner, %s", err) - } else { - n.Provisioner = &provisioner - } - } - - var err error - - n.InstanceType, err = c.getInstanceType(c.ctx, n.Provisioner, node.Labels[v1.LabelInstanceTypeStable]) - if err != nil { - logging.FromContext(c.ctx).Errorf("getting instance type, %s", err) + if err := multierr.Combine( + c.populateProvisioner(ctx, node, n), + c.populateInstanceType(ctx, node, n), + c.populateVolumeLimits(ctx, node, n), + c.populateResourceRequests(ctx, node, n), + ); err != nil { + return nil, err } + return n, nil +} +func (c *Cluster) populateResourceRequests(ctx context.Context, node *v1.Node, n *Node) error { var pods v1.PodList - if err := c.kubeClient.List(c.ctx, &pods, client.MatchingFields{"spec.nodeName": node.Name}); err != nil { - logging.FromContext(c.ctx).Errorf("listing pods, %s", err) + if err := c.kubeClient.List(ctx, &pods, client.MatchingFields{"spec.nodeName": node.Name}); err != nil { + return fmt.Errorf("listing pods, %w", err) } var requested []v1.ResourceList var daemonsetRequested []v1.ResourceList @@ -167,9 +167,8 @@ func (c *Cluster) newNode(node *v1.Node) *Node { daemonsetRequested = append(daemonsetRequested, requests) } requested = append(requested, requests) - if err := n.HostPortUsage.Add(pod); err != nil { - logging.FromContext(c.ctx).Errorf("inconsistent state, error tracking host port usage on node %s, %s", n.Node.Name, err) - } + n.HostPortUsage.Add(ctx, pod) + n.VolumeUsage.Add(ctx, pod) } n.DaemonSetRequested = resources.Merge(daemonsetRequested...) @@ -179,23 +178,44 @@ func (c *Cluster) newNode(node *v1.Node) *Node { if len(n.Capacity) == 0 && n.InstanceType != nil { n.Capacity = n.InstanceType.Resources() } - n.Available = resources.Subtract(c.getNodeAllocatable(node, n.Provisioner), resources.Merge(requested...)) - return n + n.Available = resources.Subtract(c.getNodeAllocatable(node, n), resources.Merge(requested...)) + return nil } -// getNodeAllocatable gets the allocatable resources for the node. -func (c *Cluster) getNodeAllocatable(node *v1.Node, provisioner *v1alpha5.Provisioner) v1.ResourceList { - instanceType, err := c.getInstanceType(c.ctx, provisioner, node.Labels[v1.LabelInstanceTypeStable]) - if err != nil { - logging.FromContext(c.ctx).Errorf("error finding instance type, %s", err) - return node.Status.Allocatable +func (c *Cluster) populateVolumeLimits(ctx context.Context, node *v1.Node, n *Node) error { + var csiNode storagev1.CSINode + if err := c.kubeClient.Get(ctx, client.ObjectKey{Name: node.Name}, &csiNode); err != nil && !errors.IsNotFound(err) { + return fmt.Errorf("getting CSINode to determine volume limit for %s, %w", node.Name, err) } + for _, driver := range csiNode.Spec.Drivers { + if driver.Allocatable == nil { + continue + } + n.VolumeLimits[driver.Name] = int(aws.Int32Value(driver.Allocatable.Count)) + } + return nil +} + +func (c *Cluster) populateProvisioner(ctx context.Context, node *v1.Node, n *Node) error { + // store the provisioner if it exists + if provisionerName, ok := node.Labels[v1alpha5.ProvisionerNameLabelKey]; ok { + var provisioner v1alpha5.Provisioner + if err := c.kubeClient.Get(ctx, client.ObjectKey{Name: provisionerName}, &provisioner); err != nil { + return fmt.Errorf("getting provisioner, %w", err) + } + n.Provisioner = &provisioner + } + return nil +} + +// getNodeAllocatable gets the allocatable resources for the node. +func (c *Cluster) getNodeAllocatable(node *v1.Node, n *Node) v1.ResourceList { // If the node is ready, don't take into consideration possible kubelet resource zeroing. This is to handle the // case where a node comes up with a resource and the hardware fails in some way so that the device-plugin zeros // out the resource. We don't want to assume that it will always come back. The instance type may be nil if // the node was created from a provisioner that has since been deleted. - if instanceType == nil || node.Labels[v1alpha5.LabelNodeInitialized] == "true" { + if n.InstanceType == nil || node.Labels[v1alpha5.LabelNodeInitialized] == "true" { return node.Status.Allocatable } @@ -203,7 +223,7 @@ func (c *Cluster) getNodeAllocatable(node *v1.Node, provisioner *v1alpha5.Provis for k, v := range node.Status.Allocatable { allocatable[k] = v } - for resourceName, quantity := range instanceType.Resources() { + for resourceName, quantity := range n.InstanceType.Resources() { // kubelet will zero out both the capacity and allocatable for an extended resource on startup if resources.IsZero(node.Status.Capacity[resourceName]) && resources.IsZero(node.Status.Allocatable[resourceName]) && @@ -221,10 +241,17 @@ func (c *Cluster) deleteNode(nodeName string) { } // updateNode is called for every node reconciliation -func (c *Cluster) updateNode(node *v1.Node) { +func (c *Cluster) updateNode(ctx context.Context, node *v1.Node) error { c.mu.Lock() defer c.mu.Unlock() - c.nodes[node.Name] = c.newNode(node) + n, err := c.newNode(ctx, node) + if err != nil { + // ensure that the out of date node is forgotten + delete(c.nodes, node.Name) + return err + } + c.nodes[node.Name] = n + return nil } // deletePod is called when the pod has been deleted @@ -253,6 +280,7 @@ func (c *Cluster) updateNodeUsageFromPodDeletion(podKey types.NamespacedName) { n.Available = resources.Merge(n.Available, n.podRequests[podKey]) delete(n.podRequests, podKey) n.HostPortUsage.DeletePod(podKey) + n.VolumeUsage.DeletePod(podKey) // We can't easily track the changes to the DaemonsetRequested here as we no longer have the pod. We could keep up // with this separately, but if a daemonset pod is being deleted, it usually means the node is going down. In the @@ -260,9 +288,10 @@ func (c *Cluster) updateNodeUsageFromPodDeletion(podKey types.NamespacedName) { } // updatePod is called every time the pod is reconciled -func (c *Cluster) updatePod(pod *v1.Pod) { - c.updateNodeUsageFromPod(pod) +func (c *Cluster) updatePod(ctx context.Context, pod *v1.Pod) error { + err := c.updateNodeUsageFromPod(ctx, pod) c.updatePodAntiAffinities(pod) + return err } func (c *Cluster) updatePodAntiAffinities(pod *v1.Pod) { @@ -279,10 +308,10 @@ func (c *Cluster) updatePodAntiAffinities(pod *v1.Pod) { // updateNodeUsageFromPod is called every time a reconcile event occurs for the pod. If the pods binding has changed // (unbound to bound), we need to update the resource requests on the node. -func (c *Cluster) updateNodeUsageFromPod(pod *v1.Pod) { +func (c *Cluster) updateNodeUsageFromPod(ctx context.Context, pod *v1.Pod) error { // nothing to do if the pod isn't bound, checking early allows avoiding unnecessary locking if pod.Spec.NodeName == "" { - return + return nil } c.mu.Lock() @@ -293,11 +322,10 @@ func (c *Cluster) updateNodeUsageFromPod(pod *v1.Pod) { if bindingKnown { if oldNodeName == pod.Spec.NodeName { // we are already tracking the pod binding, so nothing to update - return + return nil } // the pod has switched nodes, this can occur if a pod name was re-used and it was deleted/re-created rapidly, // binding to a different node the second time - logging.FromContext(c.ctx).Infof("pod %s has moved from node %s to %s", podKey, oldNodeName, pod.Spec.NodeName) n, ok := c.nodes[oldNodeName] if ok { // we were tracking the old node, so we need to reduce its capacity by the amount of the pod that has @@ -313,14 +341,19 @@ func (c *Cluster) updateNodeUsageFromPod(pod *v1.Pod) { n, ok := c.nodes[pod.Spec.NodeName] if !ok { var node v1.Node - if err := c.kubeClient.Get(c.ctx, client.ObjectKey{Name: pod.Spec.NodeName}, &node); err != nil { - logging.FromContext(c.ctx).Errorf("getting node, %s", err) + if err := c.kubeClient.Get(ctx, client.ObjectKey{Name: pod.Spec.NodeName}, &node); err != nil && !errors.IsNotFound(err) { + return fmt.Errorf("getting node, %w", err) } + var err error // node didn't exist, but creating it will pick up this newly bound pod as well - n = c.newNode(&node) - c.nodes[pod.Spec.NodeName] = n - return + n, err = c.newNode(ctx, &node) + if err != nil { + // no need to delete c.nodes[node.Name] as it wasn't stored previously + return err + } + c.nodes[node.Name] = n + return nil } // sum the newly bound pod's requests into the existing node and record the binding @@ -331,26 +364,27 @@ func (c *Cluster) updateNodeUsageFromPod(pod *v1.Pod) { if podutils.IsOwnedByDaemonSet(pod) { n.DaemonSetRequested = resources.Merge(n.DaemonSetRequested, podRequests) } - if err := n.HostPortUsage.Add(pod); err != nil { - logging.FromContext(c.ctx).Errorf("inconsistent state, error tracking host port usage on node %s, %s", n.Node.Name, err) - } + n.HostPortUsage.Add(ctx, pod) + n.VolumeUsage.Add(ctx, pod) n.podRequests[podKey] = podRequests c.bindings[podKey] = n.Node.Name + return nil } -func (c *Cluster) getInstanceType(ctx context.Context, provisioner *v1alpha5.Provisioner, instanceTypeName string) (cloudprovider.InstanceType, error) { - if provisioner == nil { - // no provisioner means we cant lookup the instance type - return nil, nil +func (c *Cluster) populateInstanceType(ctx context.Context, node *v1.Node, n *Node) error { + if n.Provisioner == nil || n.Provisioner.Spec.Provider == nil { + return nil } - instanceTypes, err := c.cloudProvider.GetInstanceTypes(ctx, provisioner) + instanceTypes, err := c.cloudProvider.GetInstanceTypes(ctx, n.Provisioner) if err != nil { - return nil, err + return err } + instanceTypeName := node.Labels[v1.LabelInstanceTypeStable] for _, it := range instanceTypes { if it.Name() == instanceTypeName { - return it, nil + n.InstanceType = it + return nil } } - return nil, fmt.Errorf("instance type '%s' not found", instanceTypeName) + return fmt.Errorf("instance type '%s' not found", instanceTypeName) } diff --git a/pkg/controllers/state/node.go b/pkg/controllers/state/node.go index 3c2f9a2c2bb5..7cd9df63c374 100644 --- a/pkg/controllers/state/node.go +++ b/pkg/controllers/state/node.go @@ -53,9 +53,10 @@ func (c *NodeController) Reconcile(ctx context.Context, req reconcile.Request) ( } return reconcile.Result{}, err } + if err := c.cluster.updateNode(ctx, node); err != nil { + return reconcile.Result{}, err + } // ensure it's aware of any nodes we discover, this is a no-op if the node is already known to our cluster state - c.cluster.updateNode(node) - return reconcile.Result{Requeue: true, RequeueAfter: stateRetryPeriod}, nil } diff --git a/pkg/controllers/state/pod.go b/pkg/controllers/state/pod.go index 7b3cbc22379b..2049b89359f5 100644 --- a/pkg/controllers/state/pod.go +++ b/pkg/controllers/state/pod.go @@ -55,7 +55,9 @@ func (c *PodController) Reconcile(ctx context.Context, req reconcile.Request) (r return reconcile.Result{}, err } - c.cluster.updatePod(stored) + if err := c.cluster.updatePod(ctx, stored); err != nil { + return reconcile.Result{}, err + } return reconcile.Result{Requeue: true, RequeueAfter: stateRetryPeriod}, nil } diff --git a/pkg/controllers/state/suite_test.go b/pkg/controllers/state/suite_test.go index a891eb1a7bc9..e34671c67c6d 100644 --- a/pkg/controllers/state/suite_test.go +++ b/pkg/controllers/state/suite_test.go @@ -65,7 +65,7 @@ var _ = AfterSuite(func() { var _ = BeforeEach(func() { cloudProvider = &fake.CloudProvider{InstanceTypes: fake.InstanceTypesAssorted()} - cluster = state.NewCluster(ctx, env.Client, cloudProvider) + cluster = state.NewCluster(env.Client, cloudProvider) nodeController = state.NewNodeController(env.Client, cluster) podController = state.NewPodController(env.Client, cluster) provisioner = test.Provisioner(test.ProvisionerOptions{ObjectMeta: metav1.ObjectMeta{Name: "default"}}) @@ -90,7 +90,10 @@ var _ = Describe("Node Resource Level", func() { }}, }) node := test.Node(test.NodeOptions{ - ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{v1alpha5.ProvisionerNameLabelKey: provisioner.Name}}, + ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{ + v1alpha5.ProvisionerNameLabelKey: provisioner.Name, + v1.LabelInstanceTypeStable: cloudProvider.InstanceTypes[0].Name(), + }}, Allocatable: map[v1.ResourceName]resource.Quantity{ v1.ResourceCPU: resource.MustParse("4"), }}) @@ -118,7 +121,10 @@ var _ = Describe("Node Resource Level", func() { }}, }) node := test.Node(test.NodeOptions{ - ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{v1alpha5.ProvisionerNameLabelKey: provisioner.Name}}, + ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{ + v1alpha5.ProvisionerNameLabelKey: provisioner.Name, + v1.LabelInstanceTypeStable: cloudProvider.InstanceTypes[0].Name(), + }}, Allocatable: map[v1.ResourceName]resource.Quantity{ v1.ResourceCPU: resource.MustParse("4"), }}) @@ -153,7 +159,10 @@ var _ = Describe("Node Resource Level", func() { }}, }) node := test.Node(test.NodeOptions{ - ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{v1alpha5.ProvisionerNameLabelKey: provisioner.Name}}, + ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{ + v1alpha5.ProvisionerNameLabelKey: provisioner.Name, + v1.LabelInstanceTypeStable: cloudProvider.InstanceTypes[0].Name(), + }}, Allocatable: map[v1.ResourceName]resource.Quantity{ v1.ResourceCPU: resource.MustParse("4"), }}) @@ -182,7 +191,10 @@ var _ = Describe("Node Resource Level", func() { }}, }) node := test.Node(test.NodeOptions{ - ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{v1alpha5.ProvisionerNameLabelKey: provisioner.Name}}, + ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{ + v1alpha5.ProvisionerNameLabelKey: provisioner.Name, + v1.LabelInstanceTypeStable: cloudProvider.InstanceTypes[0].Name(), + }}, Allocatable: map[v1.ResourceName]resource.Quantity{ v1.ResourceCPU: resource.MustParse("4"), }}) @@ -217,7 +229,10 @@ var _ = Describe("Node Resource Level", func() { }}, }) node := test.Node(test.NodeOptions{ - ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{v1alpha5.ProvisionerNameLabelKey: provisioner.Name}}, + ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{ + v1alpha5.ProvisionerNameLabelKey: provisioner.Name, + v1.LabelInstanceTypeStable: cloudProvider.InstanceTypes[0].Name(), + }}, Allocatable: map[v1.ResourceName]resource.Quantity{ v1.ResourceCPU: resource.MustParse("4"), }}) @@ -256,7 +271,10 @@ var _ = Describe("Node Resource Level", func() { }) node1 := test.Node(test.NodeOptions{ - ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{v1alpha5.ProvisionerNameLabelKey: provisioner.Name}}, + ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{ + v1alpha5.ProvisionerNameLabelKey: provisioner.Name, + v1.LabelInstanceTypeStable: cloudProvider.InstanceTypes[0].Name(), + }}, Allocatable: map[v1.ResourceName]resource.Quantity{ v1.ResourceCPU: resource.MustParse("4"), }}) @@ -279,7 +297,10 @@ var _ = Describe("Node Resource Level", func() { // second node has more capacity node2 := test.Node(test.NodeOptions{ - ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{v1alpha5.ProvisionerNameLabelKey: provisioner.Name}}, + ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{ + v1alpha5.ProvisionerNameLabelKey: provisioner.Name, + v1.LabelInstanceTypeStable: cloudProvider.InstanceTypes[0].Name(), + }}, Allocatable: map[v1.ResourceName]resource.Quantity{ v1.ResourceCPU: resource.MustParse("8"), }}) @@ -325,7 +346,10 @@ var _ = Describe("Node Resource Level", func() { })) } node := test.Node(test.NodeOptions{ - ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{v1alpha5.ProvisionerNameLabelKey: provisioner.Name}}, + ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{ + v1alpha5.ProvisionerNameLabelKey: provisioner.Name, + v1.LabelInstanceTypeStable: cloudProvider.InstanceTypes[0].Name(), + }}, Allocatable: map[v1.ResourceName]resource.Quantity{ v1.ResourceCPU: resource.MustParse("200"), v1.ResourcePods: resource.MustParse("500"), @@ -401,7 +425,10 @@ var _ = Describe("Node Resource Level", func() { }) node := test.Node(test.NodeOptions{ - ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{v1alpha5.ProvisionerNameLabelKey: provisioner.Name}}, + ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{ + v1alpha5.ProvisionerNameLabelKey: provisioner.Name, + v1.LabelInstanceTypeStable: cloudProvider.InstanceTypes[0].Name(), + }}, Allocatable: map[v1.ResourceName]resource.Quantity{ v1.ResourceCPU: resource.MustParse("4"), v1.ResourceMemory: resource.MustParse("8Gi"), @@ -450,7 +477,10 @@ var _ = Describe("Pod Anti-Affinity", func() { }) node := test.Node(test.NodeOptions{ - ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{v1alpha5.ProvisionerNameLabelKey: provisioner.Name}}, + ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{ + v1alpha5.ProvisionerNameLabelKey: provisioner.Name, + v1.LabelInstanceTypeStable: cloudProvider.InstanceTypes[0].Name(), + }}, Allocatable: map[v1.ResourceName]resource.Quantity{ v1.ResourceCPU: resource.MustParse("4"), }}) @@ -489,7 +519,10 @@ var _ = Describe("Pod Anti-Affinity", func() { }) node := test.Node(test.NodeOptions{ - ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{v1alpha5.ProvisionerNameLabelKey: provisioner.Name}}, + ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{ + v1alpha5.ProvisionerNameLabelKey: provisioner.Name, + v1.LabelInstanceTypeStable: cloudProvider.InstanceTypes[0].Name(), + }}, Allocatable: map[v1.ResourceName]resource.Quantity{ v1.ResourceCPU: resource.MustParse("4"), }}) @@ -525,7 +558,10 @@ var _ = Describe("Pod Anti-Affinity", func() { }) node := test.Node(test.NodeOptions{ - ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{v1alpha5.ProvisionerNameLabelKey: provisioner.Name}}, + ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{ + v1alpha5.ProvisionerNameLabelKey: provisioner.Name, + v1.LabelInstanceTypeStable: cloudProvider.InstanceTypes[0].Name(), + }}, Allocatable: map[v1.ResourceName]resource.Quantity{ v1.ResourceCPU: resource.MustParse("4"), }}) @@ -571,7 +607,10 @@ var _ = Describe("Pod Anti-Affinity", func() { }) node := test.Node(test.NodeOptions{ - ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{v1alpha5.ProvisionerNameLabelKey: provisioner.Name}}, + ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{ + v1alpha5.ProvisionerNameLabelKey: provisioner.Name, + v1.LabelInstanceTypeStable: cloudProvider.InstanceTypes[0].Name(), + }}, Allocatable: map[v1.ResourceName]resource.Quantity{ v1.ResourceCPU: resource.MustParse("4"), }}) diff --git a/pkg/controllers/state/hostportusage.go b/pkg/scheduling/hostportusage.go similarity index 82% rename from pkg/controllers/state/hostportusage.go rename to pkg/scheduling/hostportusage.go index 3f01c75af489..8f38e79fd4d1 100644 --- a/pkg/controllers/state/hostportusage.go +++ b/pkg/scheduling/hostportusage.go @@ -12,14 +12,16 @@ See the License for the specific language governing permissions and limitations under the License. */ -package state +package scheduling import ( + "context" "fmt" "net" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" + "knative.dev/pkg/logging" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -61,17 +63,31 @@ func NewHostPortUsage() *HostPortUsage { } // Add adds a port to the HostPortUsage, returning an error in the case of a conflict -func (u *HostPortUsage) Add(pod *v1.Pod) error { +func (u *HostPortUsage) Add(ctx context.Context, pod *v1.Pod) { + newUsage, err := u.validate(pod) + if err != nil { + logging.FromContext(ctx).Errorf("invariant violated registering host port usage, %s, please file an issue", err) + } + u.reserved = append(u.reserved, newUsage...) +} + +// Validate performs host port conflict validation to allow for determining if we can schedule the pod to the node +// before doing so. +func (u *HostPortUsage) Validate(pod *v1.Pod) error { + _, err := u.validate(pod) + return err +} + +func (u *HostPortUsage) validate(pod *v1.Pod) ([]entry, error) { newUsage := getHostPorts(pod) for _, newEntry := range newUsage { for _, existing := range u.reserved { if newEntry.matches(existing) { - return fmt.Errorf("%s conflicts with existing HostPort configuration %s", newEntry, existing) + return nil, fmt.Errorf("%s conflicts with existing HostPort configuration %s", newEntry, existing) } } } - u.reserved = append(u.reserved, newUsage...) - return nil + return newUsage, nil } // DeletePod deletes all host port usage from the HostPortUsage that were created by the pod with the given name. diff --git a/pkg/scheduling/suite_test.go b/pkg/scheduling/suite_test.go index ef5c1792870b..91b0bcbb5ef6 100644 --- a/pkg/scheduling/suite_test.go +++ b/pkg/scheduling/suite_test.go @@ -15,12 +15,11 @@ limitations under the License. package scheduling_test import ( - v1 "k8s.io/api/core/v1" - "github.com/aws/karpenter/pkg/apis/provisioning/v1alpha5" "github.com/aws/karpenter/pkg/scheduling" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" + v1 "k8s.io/api/core/v1" ) var _ = Describe("Scheduling", func() { diff --git a/pkg/scheduling/volumelimits.go b/pkg/scheduling/volumelimits.go new file mode 100644 index 000000000000..d7f49f20cec1 --- /dev/null +++ b/pkg/scheduling/volumelimits.go @@ -0,0 +1,199 @@ +/* +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package scheduling + +import ( + "context" + "fmt" + + "knative.dev/pkg/logging" + + v1 "k8s.io/api/core/v1" + storagev1 "k8s.io/api/storage/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +// VolumeLimits tracks volume limits on a per node basis. The number of volumes that can be mounted varies by instance +// type. We need to be aware and track the mounted volume usage to inform our awareness of which pods can schedule to +// which nodes. +type VolumeLimits struct { + volumes volumeUsage + podVolumes map[types.NamespacedName]volumeUsage + kubeClient client.Client +} + +type volumeUsage map[string]sets.String + +func (u volumeUsage) Add(provisioner string, pvcID string) { + existing, ok := u[provisioner] + if !ok { + existing = sets.NewString() + u[provisioner] = existing + } + existing.Insert(pvcID) +} + +func (u volumeUsage) union(volumes volumeUsage) volumeUsage { + cp := volumeUsage{} + for k, v := range u { + cp[k] = sets.NewString(v.List()...) + } + for k, v := range volumes { + existing, ok := cp[k] + if !ok { + existing = sets.NewString() + cp[k] = existing + } + existing.Insert(v.List()...) + } + return cp +} + +func (u volumeUsage) insert(volumes volumeUsage) { + for k, v := range volumes { + existing, ok := u[k] + if !ok { + existing = sets.NewString() + u[k] = existing + } + existing.Insert(v.List()...) + } +} + +func (u volumeUsage) copy() volumeUsage { + cp := volumeUsage{} + for k, v := range u { + cp[k] = sets.NewString(v.List()...) + } + return cp +} + +func NewVolumeLimits(kubeClient client.Client) *VolumeLimits { + return &VolumeLimits{ + kubeClient: kubeClient, + volumes: volumeUsage{}, + podVolumes: map[types.NamespacedName]volumeUsage{}, + } +} + +func (v *VolumeLimits) Add(ctx context.Context, pod *v1.Pod) { + podVolumes, err := v.validate(ctx, pod) + if err != nil { + logging.FromContext(ctx).Errorf("inconsistent state error adding volume, %s, please file an issue", err) + } + v.podVolumes[client.ObjectKeyFromObject(pod)] = podVolumes + v.volumes = v.volumes.union(podVolumes) +} + +type VolumeCount map[string]int + +// Exceeds returns true if the volume count exceeds the limits provided. If there is no value for a storage provider, it +// is treated as unlimited. +func (c VolumeCount) Exceeds(limits VolumeCount) bool { + for k, v := range c { + limit, hasLimit := limits[k] + if !hasLimit { + continue + } + if v > limit { + return true + } + } + return false +} + +// Fits returns true if the rhs 'fits' within the volume count. +func (c VolumeCount) Fits(rhs VolumeCount) bool { + for k, v := range rhs { + limit, hasLimit := c[k] + if !hasLimit { + continue + } + if v > limit { + return false + } + } + return true +} + +func (v *VolumeLimits) Validate(ctx context.Context, pod *v1.Pod) (VolumeCount, error) { + podVolumes, err := v.validate(ctx, pod) + if err != nil { + return nil, err + } + result := VolumeCount{} + for k, v := range v.volumes.union(podVolumes) { + result[k] += len(v) + } + return result, nil +} + +func (v *VolumeLimits) validate(ctx context.Context, pod *v1.Pod) (volumeUsage, error) { + podPVCs := volumeUsage{} + + for _, volume := range pod.Spec.Volumes { + var pvcID string + var storageClassName *string + if volume.PersistentVolumeClaim != nil { + var pvc v1.PersistentVolumeClaim + if err := v.kubeClient.Get(ctx, client.ObjectKey{Namespace: pod.Namespace, Name: volume.PersistentVolumeClaim.ClaimName}, &pvc); err != nil { + return nil, err + } + + pvcID = fmt.Sprintf("%s/%s", pod.Namespace, volume.PersistentVolumeClaim.ClaimName) + storageClassName = pvc.Spec.StorageClassName + } else if volume.Ephemeral != nil { + // generated name per https://kubernetes.io/docs/concepts/storage/ephemeral-volumes/#persistentvolumeclaim-naming + pvcID = fmt.Sprintf("%s/%s-%s", pod.Namespace, pod.Name, volume.Name) + storageClassName = volume.Ephemeral.VolumeClaimTemplate.Spec.StorageClassName + } else { + continue + } + + provisioner := "unspecified" + if storageClassName != nil { + var sc storagev1.StorageClass + if err := v.kubeClient.Get(ctx, client.ObjectKey{Name: *storageClassName}, &sc); err != nil { + return nil, err + } + provisioner = sc.Provisioner + } + podPVCs.Add(provisioner, pvcID) + } + return podPVCs, nil +} + +func (v *VolumeLimits) DeletePod(key types.NamespacedName) { + delete(v.podVolumes, key) + // volume names could be duplicated, so we re-create our volumes + v.volumes = volumeUsage{} + for _, c := range v.podVolumes { + v.volumes.insert(c) + } +} + +func (v *VolumeLimits) Copy() *VolumeLimits { + cp := &VolumeLimits{ + kubeClient: v.kubeClient, + volumes: v.volumes.copy(), + podVolumes: map[types.NamespacedName]volumeUsage{}, + } + for k, v := range v.podVolumes { + cp.podVolumes[k] = v.copy() + } + return cp +} diff --git a/pkg/test/storage.go b/pkg/test/storage.go index 8386cb81636b..ab9b507dade9 100644 --- a/pkg/test/storage.go +++ b/pkg/test/storage.go @@ -17,6 +17,8 @@ package test import ( "fmt" + "github.com/aws/aws-sdk-go/aws" + "github.com/imdario/mergo" v1 "k8s.io/api/core/v1" storagev1 "k8s.io/api/storage/v1" @@ -77,7 +79,8 @@ func PersistentVolumeClaim(overrides ...PersistentVolumeClaimOptions) *v1.Persis type StorageClassOptions struct { metav1.ObjectMeta - Zones []string + Zones []string + Provisioner *string } func StorageClass(overrides ...StorageClassOptions) *storagev1.StorageClass { @@ -92,10 +95,13 @@ func StorageClass(overrides ...StorageClassOptions) *storagev1.StorageClass { if options.Zones != nil { allowedTopologies = []v1.TopologySelectorTerm{{MatchLabelExpressions: []v1.TopologySelectorLabelRequirement{{Key: v1.LabelTopologyZone, Values: options.Zones}}}} } + if options.Provisioner == nil { + options.Provisioner = aws.String("test-provisioner") + } return &storagev1.StorageClass{ ObjectMeta: ObjectMeta(options.ObjectMeta), - Provisioner: "test-provisioner", + Provisioner: *options.Provisioner, AllowedTopologies: allowedTopologies, } }