From 8e5bec64678bf63c2580458eb510157b2c568bba Mon Sep 17 00:00:00 2001
From: Todd Neal <tnealt@amazon.com>
Date: Mon, 13 Jun 2022 09:10:44 -0500
Subject: [PATCH] fix: track volume mounts per node

Assume a node can support an unlimited number of volumes until it has launched.
Once the CSI driver reports the node's mountable volume count (via its CSINode
object), use that value instead, which may require launching more nodes. Remove
the PVC controller since we no longer bind pods to nodes.

Fixes #919, fixes #1888
---
 charts/karpenter/templates/clusterrole.yaml   |   3 +
 cmd/controller/main.go                        |   4 +-
 hack/docs/metrics_gen_docs.go                 |   6 +
 .../aws/apis/v1alpha1/provider_validation.go  |   2 +-
 pkg/cloudprovider/aws/instancetype.go         |   2 +
 pkg/cloudprovider/aws/instancetypes.go        |   7 +-
 pkg/cloudprovider/aws/suite_test.go           |   2 +-
 pkg/cloudprovider/fake/cloudprovider.go       |   3 +
 pkg/config/suite_test.go                      |   8 +-
 .../persistentvolumeclaim/controller.go       | 126 -----------
 .../persistentvolumeclaim/suite_test.go       |  88 --------
 pkg/controllers/provisioning/provisioner.go   |   2 +-
 .../provisioning/scheduling/inflightnode.go   |  23 +-
 .../provisioning/scheduling/node.go           |  15 +-
 .../provisioning/scheduling/scheduler.go      |  16 +-
 .../scheduling/scheduling_benchmark_test.go   |   5 +-
 .../provisioning/scheduling/suite_test.go     | 142 ++++++++++++-
 pkg/controllers/provisioning/suite_test.go    |   2 +-
 pkg/controllers/state/cluster.go              | 129 +++++++-----
 pkg/controllers/state/node.go                 |   5 +-
 pkg/controllers/state/pod.go                  |   4 +-
 pkg/controllers/state/suite_test.go           |  67 ++++--
 .../state => scheduling}/hostportusage.go     |  26 ++-
 pkg/scheduling/suite_test.go                  |   3 +-
 pkg/scheduling/volumelimits.go                | 199 ++++++++++++++++++
 pkg/test/storage.go                           |  10 +-
 26 files changed, 572 insertions(+), 327 deletions(-)
 delete mode 100644 pkg/controllers/persistentvolumeclaim/controller.go
 delete mode 100644 pkg/controllers/persistentvolumeclaim/suite_test.go
 rename pkg/{controllers/state => scheduling}/hostportusage.go (82%)
 create mode 100644 pkg/scheduling/volumelimits.go

diff --git a/charts/karpenter/templates/clusterrole.yaml b/charts/karpenter/templates/clusterrole.yaml
index 9ea6cf85fd64..fbd14a44cc93 100644
--- a/charts/karpenter/templates/clusterrole.yaml
+++ b/charts/karpenter/templates/clusterrole.yaml
@@ -39,3 +39,6 @@ rules:
   - apiGroups: ["admissionregistration.k8s.io"]
     resources: ["validatingwebhookconfigurations", "mutatingwebhookconfigurations"]
     verbs: ["get", "watch", "list", "update"]
+  - apiGroups: ["storage.k8s.io"]
+    resources: ["csinodes"]
+    verbs: ["get", "watch", "list"]
\ No newline at end of file
diff --git a/cmd/controller/main.go b/cmd/controller/main.go
index 65a666105e8c..bdee7268ebe7 100644
--- a/cmd/controller/main.go
+++ b/cmd/controller/main.go
@@ -52,7 +52,6 @@ import (
 	metricspod "github.com/aws/karpenter/pkg/controllers/metrics/pod"
 	metricsprovisioner "github.com/aws/karpenter/pkg/controllers/metrics/provisioner"
 	"github.com/aws/karpenter/pkg/controllers/node"
-	"github.com/aws/karpenter/pkg/controllers/persistentvolumeclaim"
 	"github.com/aws/karpenter/pkg/controllers/provisioning"
 	"github.com/aws/karpenter/pkg/controllers/termination"
 	"github.com/aws/karpenter/pkg/utils/injection"
@@ -107,13 +106,12 @@ func main() {
 		logging.FromContext(ctx).Errorf("watching configmaps, config changes won't be applied immediately, %s", err)
 	}
 
-	cluster := state.NewCluster(ctx, manager.GetClient(), cloudProvider)
+	cluster := state.NewCluster(manager.GetClient(), cloudProvider)
 
 	if err := manager.RegisterControllers(ctx,
 		provisioning.NewController(ctx, cfg, manager.GetClient(), clientSet.CoreV1(), recorder, cloudProvider, cluster),
 		state.NewNodeController(manager.GetClient(), cluster),
 		state.NewPodController(manager.GetClient(), cluster),
-		persistentvolumeclaim.NewController(manager.GetClient()),
 		termination.NewController(ctx, manager.GetClient(), clientSet.CoreV1(), cloudProvider),
 		node.NewController(manager.GetClient(), cloudProvider),
 		metricspod.NewController(manager.GetClient()),
diff --git a/hack/docs/metrics_gen_docs.go b/hack/docs/metrics_gen_docs.go
index fe5f461881ba..0eb3de01ae12 100644
--- a/hack/docs/metrics_gen_docs.go
+++ b/hack/docs/metrics_gen_docs.go
@@ -205,6 +205,12 @@ func handleVariableDeclaration(v *ast.GenDecl) []metricInfo {
 }
 
 func getFuncPackage(fun ast.Expr) string {
+	if pexpr, ok := fun.(*ast.ParenExpr); ok {
+		return getFuncPackage(pexpr.X)
+	}
+	if sexpr, ok := fun.(*ast.StarExpr); ok {
+		return getFuncPackage(sexpr.X)
+	}
 	if sel, ok := fun.(*ast.SelectorExpr); ok {
 		return fmt.Sprintf("%s", sel.X)
 	}
diff --git a/pkg/cloudprovider/aws/apis/v1alpha1/provider_validation.go b/pkg/cloudprovider/aws/apis/v1alpha1/provider_validation.go
index 710f18fdc50d..c37fa8ddd871 100644
--- a/pkg/cloudprovider/aws/apis/v1alpha1/provider_validation.go
+++ b/pkg/cloudprovider/aws/apis/v1alpha1/provider_validation.go
@@ -279,4 +279,4 @@ func (a *AWS) validateKubeletConfiguration(kubeletConfig *v1alpha5.KubeletConfig
 		}
 	}
 	return nil
-}
\ No newline at end of file
+}
diff --git a/pkg/cloudprovider/aws/instancetype.go b/pkg/cloudprovider/aws/instancetype.go
index 5fc3ce19d480..8c650bc4262d 100644
--- a/pkg/cloudprovider/aws/instancetype.go
+++ b/pkg/cloudprovider/aws/instancetype.go
@@ -36,6 +36,8 @@ import (
 	"github.com/aws/karpenter/pkg/utils/sets"
 )
 
+var _ cloudprovider.InstanceType = (*InstanceType)(nil)
+
 type InstanceType struct {
 	*ec2.InstanceTypeInfo
 	offerings    []cloudprovider.Offering
diff --git a/pkg/cloudprovider/aws/instancetypes.go b/pkg/cloudprovider/aws/instancetypes.go
index 8d870cd5d6e5..1f05aaac15f1 100644
--- a/pkg/cloudprovider/aws/instancetypes.go
+++ b/pkg/cloudprovider/aws/instancetypes.go
@@ -89,11 +89,12 @@ func (p *InstanceTypeProvider) newInstanceType(ctx context.Context, info *ec2.In
 		provider:         provider,
 		offerings:        offerings,
 	}
+	opts := injection.GetOptions(ctx)
 	// Precompute to minimize memory/compute overhead
-	instanceType.resources = instanceType.computeResources(injection.GetOptions(ctx).AWSEnablePodENI)
-	instanceType.overhead = instanceType.computeOverhead(injection.GetOptions(ctx).VMMemoryOverhead)
+	instanceType.resources = instanceType.computeResources(opts.AWSEnablePodENI)
+	instanceType.overhead = instanceType.computeOverhead(opts.VMMemoryOverhead)
 	instanceType.requirements = instanceType.computeRequirements()
-	if !injection.GetOptions(ctx).AWSENILimitedPodDensity {
+	if !opts.AWSENILimitedPodDensity {
 		instanceType.maxPods = ptr.Int32(110)
 	}
 	return instanceType
diff --git a/pkg/cloudprovider/aws/suite_test.go b/pkg/cloudprovider/aws/suite_test.go
index bb2d70cbd6cc..835321629ad8 100644
--- a/pkg/cloudprovider/aws/suite_test.go
+++ b/pkg/cloudprovider/aws/suite_test.go
@@ -128,7 +128,7 @@ var _ = BeforeSuite(func() {
 			},
 		}
 		registry.RegisterOrDie(ctx, cloudProvider)
-		cluster = state.NewCluster(ctx, e.Client, cloudProvider)
+		cluster = state.NewCluster(e.Client, cloudProvider)
 		recorder = test.NewEventRecorder()
 		cfg = test.NewConfig()
 		controller = provisioning.NewController(ctx, cfg, e.Client, clientSet.CoreV1(), recorder, cloudProvider, cluster)
diff --git a/pkg/cloudprovider/fake/cloudprovider.go b/pkg/cloudprovider/fake/cloudprovider.go
index 47104927a09b..03146a34d730 100644
--- a/pkg/cloudprovider/fake/cloudprovider.go
+++ b/pkg/cloudprovider/fake/cloudprovider.go
@@ -49,6 +49,9 @@ type CloudProvider struct {
 	CreateCalls []*cloudprovider.NodeRequest
 }
 
+var _ cloudprovider.CloudProvider = (*CloudProvider)(nil)
+var _ cloudprovider.InstanceType = (*InstanceType)(nil)
+
 func (c *CloudProvider) Create(ctx context.Context, nodeRequest *cloudprovider.NodeRequest) (*v1.Node, error) {
 	c.mu.Lock()
 	c.CreateCalls = append(c.CreateCalls, nodeRequest)
diff --git a/pkg/config/suite_test.go b/pkg/config/suite_test.go
index bd5edb0701b3..d1a12f4680f3 100644
--- a/pkg/config/suite_test.go
+++ b/pkg/config/suite_test.go
@@ -124,13 +124,7 @@ var _ = Describe("Batch Parameter", func() {
 		var changed int64
 		cfg.OnChange(func(c config.Config) {
 			defer GinkgoRecover()
-			// we can't unregister this, so just check for the one case we care about
-			if atomic.LoadInt64(&changed) == 0 {
-				atomic.StoreInt64(&changed, 1)
-				Expect(cfg.BatchIdleDuration()).To(Equal(1 * time.Second))
-				// shouldn't be changed
-				Expect(cfg.BatchMaxDuration()).To(Equal(10 * time.Second))
-			}
+			atomic.StoreInt64(&changed, 1)
 		})
 
 		// simulate user updating the config map with a bad max duration
diff --git a/pkg/controllers/persistentvolumeclaim/controller.go b/pkg/controllers/persistentvolumeclaim/controller.go
deleted file mode 100644
index cc0d220931e9..000000000000
--- a/pkg/controllers/persistentvolumeclaim/controller.go
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package persistentvolumeclaim
-
-import (
-	"context"
-	"fmt"
-
-	v1 "k8s.io/api/core/v1"
-	"k8s.io/apimachinery/pkg/api/errors"
-	"k8s.io/apimachinery/pkg/types"
-	"knative.dev/pkg/logging"
-	controllerruntime "sigs.k8s.io/controller-runtime"
-	"sigs.k8s.io/controller-runtime/pkg/client"
-	"sigs.k8s.io/controller-runtime/pkg/handler"
-	"sigs.k8s.io/controller-runtime/pkg/manager"
-	"sigs.k8s.io/controller-runtime/pkg/reconcile"
-	"sigs.k8s.io/controller-runtime/pkg/source"
-
-	"github.com/aws/karpenter/pkg/utils/functional"
-	"github.com/aws/karpenter/pkg/utils/injection"
-	"github.com/aws/karpenter/pkg/utils/pod"
-)
-
-const (
-	controllerName         = "volume"
-	SelectedNodeAnnotation = "volume.kubernetes.io/selected-node"
-)
-
-// Controller for the resource
-type Controller struct {
-	kubeClient client.Client
-}
-
-// NewController is a constructor
-func NewController(kubeClient client.Client) *Controller {
-	return &Controller{kubeClient: kubeClient}
-}
-
-// Register the controller to the manager
-func (c *Controller) Register(ctx context.Context, m manager.Manager) error {
-	return controllerruntime.
-		NewControllerManagedBy(m).
-		Named(controllerName).
-		For(&v1.PersistentVolumeClaim{}).
-		Watches(&source.Kind{Type: &v1.Pod{}}, handler.EnqueueRequestsFromMapFunc(c.pvcForPod)).
-		Complete(c)
-}
-
-// Reconcile a control loop for the resource
-func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
-	ctx = logging.WithLogger(ctx, logging.FromContext(ctx).Named(controllerName).With("resource", req.String()))
-	ctx = injection.WithNamespacedName(ctx, req.NamespacedName)
-	ctx = injection.WithControllerName(ctx, controllerName)
-
-	pvc := &v1.PersistentVolumeClaim{}
-	if err := c.kubeClient.Get(ctx, req.NamespacedName, pvc); err != nil {
-		if errors.IsNotFound(err) {
-			return reconcile.Result{}, nil
-		}
-		return reconcile.Result{}, err
-	}
-	pod, err := c.podForPvc(ctx, pvc)
-	if err != nil {
-		return reconcile.Result{}, err
-	}
-	if pod == nil {
-		return reconcile.Result{}, nil
-	}
-	if nodeName, ok := pvc.Annotations[SelectedNodeAnnotation]; ok && nodeName == pod.Spec.NodeName {
-		return reconcile.Result{}, nil
-	}
-	if !c.isBindable(pod) {
-		return reconcile.Result{}, nil
-	}
-	pvc.Annotations = functional.UnionStringMaps(pvc.Annotations, map[string]string{SelectedNodeAnnotation: pod.Spec.NodeName})
-	if err := c.kubeClient.Update(ctx, pvc); err != nil {
-		return reconcile.Result{}, fmt.Errorf("binding persistent volume claim for pod %s/%s to node %q, %w", pod.Namespace, pod.Name, pod.Spec.NodeName, err)
-	}
-	logging.FromContext(ctx).Infof("Bound persistent volume claim to node %s", pod.Spec.NodeName)
-	return reconcile.Result{}, nil
-}
-
-func (c *Controller) podForPvc(ctx context.Context, pvc *v1.PersistentVolumeClaim) (*v1.Pod, error) {
-	pods := &v1.PodList{}
-	if err := c.kubeClient.List(ctx, pods, client.InNamespace(pvc.Namespace)); err != nil {
-		return nil, err
-	}
-	for _, pod := range pods.Items {
-		for _, volume := range pod.Spec.Volumes {
-			if volume.PersistentVolumeClaim != nil && volume.PersistentVolumeClaim.ClaimName == pvc.Name {
-				return &pod, nil
-			}
-		}
-	}
-	return nil, nil
-}
-
-func (c *Controller) pvcForPod(o client.Object) (requests []reconcile.Request) {
-	if !c.isBindable(o.(*v1.Pod)) {
-		return requests
-	}
-	for _, volume := range o.(*v1.Pod).Spec.Volumes {
-		if volume.PersistentVolumeClaim == nil {
-			continue
-		}
-		requests = append(requests, reconcile.Request{NamespacedName: types.NamespacedName{Namespace: o.GetNamespace(), Name: volume.PersistentVolumeClaim.ClaimName}})
-	}
-	return requests
-}
-
-func (c *Controller) isBindable(p *v1.Pod) bool {
-	return pod.IsScheduled(p) && !pod.IsTerminal(p) && !pod.IsTerminating(p)
-}
diff --git a/pkg/controllers/persistentvolumeclaim/suite_test.go b/pkg/controllers/persistentvolumeclaim/suite_test.go
deleted file mode 100644
index ade21a137425..000000000000
--- a/pkg/controllers/persistentvolumeclaim/suite_test.go
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package persistentvolumeclaim_test
-
-import (
-	"context"
-	"strings"
-	"testing"
-
-	"github.com/Pallinder/go-randomdata"
-	"github.com/aws/karpenter/pkg/controllers/persistentvolumeclaim"
-	"github.com/aws/karpenter/pkg/test"
-	. "github.com/aws/karpenter/pkg/test/expectations"
-	. "github.com/onsi/ginkgo"
-	. "github.com/onsi/gomega"
-	v1 "k8s.io/api/core/v1"
-	. "knative.dev/pkg/logging/testing"
-	"sigs.k8s.io/controller-runtime/pkg/client"
-)
-
-var ctx context.Context
-var controller *persistentvolumeclaim.Controller
-var env *test.Environment
-
-func TestAPIs(t *testing.T) {
-	ctx = TestContextWithLogger(t)
-	RegisterFailHandler(Fail)
-	RunSpecs(t, "Volume")
-}
-
-var _ = BeforeSuite(func() {
-	env = test.NewEnvironment(ctx, func(e *test.Environment) {
-		controller = persistentvolumeclaim.NewController(e.Client)
-	})
-	Expect(env.Start()).To(Succeed(), "Failed to start environment")
-})
-
-var _ = AfterSuite(func() {
-	Expect(env.Stop()).To(Succeed(), "Failed to stop environment")
-})
-
-var _ = Describe("Reconcile", func() {
-	var pvc *v1.PersistentVolumeClaim
-
-	BeforeEach(func() {
-		pvc = test.PersistentVolumeClaim()
-	})
-
-	AfterEach(func() {
-		ExpectCleanedUp(ctx, env.Client)
-	})
-
-	It("should ignore a pvc without pods", func() {
-		ExpectApplied(ctx, env.Client, pvc)
-		ExpectReconcileSucceeded(ctx, controller, client.ObjectKeyFromObject(pvc))
-		Expect(env.Client.Get(ctx, client.ObjectKeyFromObject(pvc), pvc)).To(Succeed())
-		Expect(pvc.Annotations[persistentvolumeclaim.SelectedNodeAnnotation]).To(BeEmpty())
-	})
-	It("should ignore a pvc with unscheduled or terminal pods", func() {
-		ExpectApplied(ctx, env.Client, pvc,
-			test.Pod(test.PodOptions{Phase: v1.PodPending}),
-			test.Pod(test.PodOptions{NodeName: strings.ToLower(randomdata.SillyName()), Phase: v1.PodSucceeded}),
-			test.Pod(test.PodOptions{NodeName: strings.ToLower(randomdata.SillyName()), Phase: v1.PodFailed}),
-		)
-		ExpectReconcileSucceeded(ctx, controller, client.ObjectKeyFromObject(pvc))
-		Expect(env.Client.Get(ctx, client.ObjectKeyFromObject(pvc), pvc)).To(Succeed())
-		Expect(pvc.Annotations[persistentvolumeclaim.SelectedNodeAnnotation]).To(BeEmpty())
-	})
-	It("should bind a pvc to a pod's node", func() {
-		pod := test.Pod(test.PodOptions{NodeName: strings.ToLower(randomdata.SillyName()), PersistentVolumeClaims: []string{pvc.Name}})
-		ExpectApplied(ctx, env.Client, pvc, pod)
-		ExpectReconcileSucceeded(ctx, controller, client.ObjectKeyFromObject(pvc))
-		Expect(env.Client.Get(ctx, client.ObjectKeyFromObject(pvc), pvc)).To(Succeed())
-		Expect(pvc.Annotations[persistentvolumeclaim.SelectedNodeAnnotation]).To(Equal(pod.Spec.NodeName))
-	})
-})
diff --git a/pkg/controllers/provisioning/provisioner.go b/pkg/controllers/provisioning/provisioner.go
index c1cdad12b238..0e037dd58fd7 100644
--- a/pkg/controllers/provisioning/provisioner.go
+++ b/pkg/controllers/provisioning/provisioner.go
@@ -224,7 +224,7 @@ func (p *Provisioner) schedule(ctx context.Context, pods []*v1.Pod) ([]*schedule
 		return nil, fmt.Errorf("getting daemon overhead, %w", err)
 	}
 
-	return scheduler.NewScheduler(nodeTemplates, provisionerList.Items, p.cluster, topology, instanceTypes, daemonOverhead, p.recorder).Solve(ctx, pods)
+	return scheduler.NewScheduler(ctx, p.kubeClient, nodeTemplates, provisionerList.Items, p.cluster, topology, instanceTypes, daemonOverhead, p.recorder).Solve(ctx, pods)
 }
 
 func (p *Provisioner) launch(ctx context.Context, node *scheduler.Node) error {
diff --git a/pkg/controllers/provisioning/scheduling/inflightnode.go b/pkg/controllers/provisioning/scheduling/inflightnode.go
index 839651994c53..b06d4d071df8 100644
--- a/pkg/controllers/provisioning/scheduling/inflightnode.go
+++ b/pkg/controllers/provisioning/scheduling/inflightnode.go
@@ -15,6 +15,7 @@ limitations under the License.
 package scheduling
 
 import (
+	"context"
 	"fmt"
 
 	"github.com/aws/karpenter/pkg/apis/provisioning/v1alpha5"
@@ -35,13 +36,14 @@ type InFlightNode struct {
 	requirements       scheduling.Requirements
 	available          v1.ResourceList
 	startupTolerations []v1.Toleration
-	hostPortUsage      *state.HostPortUsage
+	hostPortUsage      *scheduling.HostPortUsage
+	volumeUsage        *scheduling.VolumeLimits
+	volumeLimits       scheduling.VolumeCount
 }
 
 func NewInFlightNode(n *state.Node, topology *Topology, startupTaints []v1.Taint, daemonResources v1.ResourceList) *InFlightNode {
 	// the remaining daemonResources to schedule are the total daemonResources minus what has already scheduled
 	remainingDaemonResources := resources.Subtract(daemonResources, n.DaemonSetRequested)
-
 	node := &InFlightNode{
 		Node:          n.Node,
 		available:     n.Available,
@@ -49,6 +51,8 @@ func NewInFlightNode(n *state.Node, topology *Topology, startupTaints []v1.Taint
 		requests:      remainingDaemonResources,
 		requirements:  scheduling.NewLabelRequirements(n.Node.Labels),
 		hostPortUsage: n.HostPortUsage.Copy(),
+		volumeUsage:   n.VolumeUsage.Copy(),
+		volumeLimits:  n.VolumeLimits,
 	}
 
 	if n.Node.Labels[v1alpha5.LabelNodeInitialized] != "true" {
@@ -81,15 +85,24 @@ func NewInFlightNode(n *state.Node, topology *Topology, startupTaints []v1.Taint
 	return node
 }
 
-func (n *InFlightNode) Add(pod *v1.Pod) error {
+func (n *InFlightNode) Add(ctx context.Context, pod *v1.Pod) error {
 	// Check Taints
 	if err := scheduling.Taints(n.Node.Spec.Taints).Tolerates(pod, n.startupTolerations...); err != nil {
 		return err
 	}
 
-	if err := n.hostPortUsage.Add(pod); err != nil {
+	if err := n.hostPortUsage.Validate(pod); err != nil {
+		return err
+	}
+
+	// determine the number of volumes that will be mounted if the pod schedules
+	mountedVolumeCount, err := n.volumeUsage.Validate(ctx, pod)
+	if err != nil {
 		return err
 	}
+	if mountedVolumeCount.Exceeds(n.volumeLimits) {
+		return fmt.Errorf("would exceed node volume limits")
+	}
 
 	// check resource requests first since that's a pretty likely reason the pod won't schedule on an in-flight
 	// node, which at this point can't be increased in size
@@ -122,5 +135,7 @@ func (n *InFlightNode) Add(pod *v1.Pod) error {
 	n.requests = requests
 	n.requirements = nodeRequirements
 	n.topology.Record(pod, nodeRequirements)
+	n.hostPortUsage.Add(ctx, pod)
+	n.volumeUsage.Add(ctx, pod)
 	return nil
 }
diff --git a/pkg/controllers/provisioning/scheduling/node.go b/pkg/controllers/provisioning/scheduling/node.go
index bb67a1b8f7c6..0ceef8b3c1d0 100644
--- a/pkg/controllers/provisioning/scheduling/node.go
+++ b/pkg/controllers/provisioning/scheduling/node.go
@@ -15,17 +15,16 @@ limitations under the License.
 package scheduling
 
 import (
+	"context"
 	"fmt"
 	"strings"
 	"sync/atomic"
 
-	v1 "k8s.io/api/core/v1"
-
 	"github.com/samber/lo"
+	v1 "k8s.io/api/core/v1"
 
 	"github.com/aws/karpenter/pkg/apis/provisioning/v1alpha5"
 	"github.com/aws/karpenter/pkg/cloudprovider"
-	"github.com/aws/karpenter/pkg/controllers/state"
 	"github.com/aws/karpenter/pkg/scheduling"
 	"github.com/aws/karpenter/pkg/utils/resources"
 	"github.com/aws/karpenter/pkg/utils/sets"
@@ -40,7 +39,7 @@ type Node struct {
 
 	topology      *Topology
 	requests      v1.ResourceList
-	hostPortUsage *state.HostPortUsage
+	hostPortUsage *scheduling.HostPortUsage
 }
 
 var nodeID int64
@@ -54,19 +53,20 @@ func NewNode(nodeTemplate *scheduling.NodeTemplate, topology *Topology, daemonRe
 	return &Node{
 		NodeTemplate:        template,
 		InstanceTypeOptions: instanceTypes,
-		hostPortUsage:       state.NewHostPortUsage(),
+		hostPortUsage:       scheduling.NewHostPortUsage(),
 		topology:            topology,
 		requests:            daemonResources,
 	}
 }
 
-func (n *Node) Add(pod *v1.Pod) error {
+func (n *Node) Add(ctx context.Context, pod *v1.Pod) error {
 	// Check Taints
 	if err := n.Taints.Tolerates(pod); err != nil {
 		return err
 	}
 
-	if err := n.hostPortUsage.Add(pod); err != nil {
+	// validate the pod's host ports against those already in use on this node
+	if err := n.hostPortUsage.Validate(pod); err != nil {
 		return err
 	}
 
@@ -102,6 +102,7 @@ func (n *Node) Add(pod *v1.Pod) error {
 	n.requests = requests
 	n.Requirements = nodeRequirements
 	n.topology.Record(pod, nodeRequirements)
+	n.hostPortUsage.Add(ctx, pod)
 	return nil
 }
 
diff --git a/pkg/controllers/provisioning/scheduling/scheduler.go b/pkg/controllers/provisioning/scheduling/scheduler.go
index 1382962a60a7..7881e90b48c4 100644
--- a/pkg/controllers/provisioning/scheduling/scheduler.go
+++ b/pkg/controllers/provisioning/scheduling/scheduler.go
@@ -33,13 +33,15 @@ import (
 	"github.com/aws/karpenter/pkg/utils/resources"
 )
 
-func NewScheduler(nodeTemplates []*scheduling.NodeTemplate, provisioners []v1alpha5.Provisioner, cluster *state.Cluster, topology *Topology, instanceTypes map[string][]cloudprovider.InstanceType, daemonOverhead map[*scheduling.NodeTemplate]v1.ResourceList, recorder events.Recorder) *Scheduler {
+func NewScheduler(ctx context.Context, kubeClient client.Client, nodeTemplates []*scheduling.NodeTemplate, provisioners []v1alpha5.Provisioner, cluster *state.Cluster, topology *Topology, instanceTypes map[string][]cloudprovider.InstanceType, daemonOverhead map[*scheduling.NodeTemplate]v1.ResourceList, recorder events.Recorder) *Scheduler {
 	for provisioner := range instanceTypes {
 		sort.Slice(instanceTypes[provisioner], func(i, j int) bool {
 			return instanceTypes[provisioner][i].Price() < instanceTypes[provisioner][j].Price()
 		})
 	}
 	s := &Scheduler{
+		ctx:                ctx,
+		kubeClient:         kubeClient,
 		nodeTemplates:      nodeTemplates,
 		topology:           topology,
 		cluster:            cluster,
@@ -84,6 +86,7 @@ func NewScheduler(nodeTemplates []*scheduling.NodeTemplate, provisioners []v1alp
 }
 
 type Scheduler struct {
+	ctx                context.Context
 	nodes              []*Node
 	inflight           []*InFlightNode
 	nodeTemplates      []*scheduling.NodeTemplate
@@ -94,6 +97,7 @@ type Scheduler struct {
 	topology           *Topology
 	cluster            *state.Cluster
 	recorder           events.Recorder
+	kubeClient         client.Client
 }
 
 func (s *Scheduler) Solve(ctx context.Context, pods []*v1.Pod) ([]*Node, error) {
@@ -112,7 +116,7 @@ func (s *Scheduler) Solve(ctx context.Context, pods []*v1.Pod) ([]*Node, error)
 		}
 
 		// Schedule to existing nodes or create a new node
-		if errors[pod] = s.add(pod); errors[pod] == nil {
+		if errors[pod] = s.add(ctx, pod); errors[pod] == nil {
 			continue
 		}
 
@@ -165,10 +169,10 @@ func (s *Scheduler) recordSchedulingResults(ctx context.Context, pods []*v1.Pod,
 	logging.FromContext(ctx).Infof("Computed %d unready node(s) will fit %d pod(s)", inflightCount, existingCount)
 }
 
-func (s *Scheduler) add(pod *v1.Pod) error {
+func (s *Scheduler) add(ctx context.Context, pod *v1.Pod) error {
 	// first try to schedule against an in-flight real node
 	for _, node := range s.inflight {
-		if err := node.Add(pod); err == nil {
+		if err := node.Add(ctx, pod); err == nil {
 			return nil
 		}
 	}
@@ -178,7 +182,7 @@ func (s *Scheduler) add(pod *v1.Pod) error {
 
 	// Pick existing node that we are about to create
 	for _, node := range s.nodes {
-		if err := node.Add(pod); err == nil {
+		if err := node.Add(ctx, pod); err == nil {
 			return nil
 		}
 	}
@@ -197,7 +201,7 @@ func (s *Scheduler) add(pod *v1.Pod) error {
 		}
 
 		node := NewNode(nodeTemplate, s.topology, s.daemonOverhead[nodeTemplate], instanceTypes)
-		if err := node.Add(pod); err != nil {
+		if err := node.Add(ctx, pod); err != nil {
 			errs = multierr.Append(errs, fmt.Errorf("incompatible with provisioner %q, %w", nodeTemplate.ProvisionerName, err))
 			continue
 		}
diff --git a/pkg/controllers/provisioning/scheduling/scheduling_benchmark_test.go b/pkg/controllers/provisioning/scheduling/scheduling_benchmark_test.go
index 9349f0e9f12e..d645bd6fa33c 100644
--- a/pkg/controllers/provisioning/scheduling/scheduling_benchmark_test.go
+++ b/pkg/controllers/provisioning/scheduling/scheduling_benchmark_test.go
@@ -112,10 +112,11 @@ func benchmarkScheduler(b *testing.B, instanceCount, podCount int) {
 
 	instanceTypes := fake.InstanceTypes(instanceCount)
 	cloudProv := &fake.CloudProvider{InstanceTypes: instanceTypes}
-	scheduler := NewScheduler(
+	scheduler := NewScheduler(ctx,
+		nil,
 		[]*scheduling.NodeTemplate{scheduling.NewNodeTemplate(provisioner)},
 		nil,
-		state.NewCluster(ctx, nil, cloudProv),
+		state.NewCluster(nil, cloudProv),
 		&Topology{},
 		map[string][]cloudprovider.InstanceType{provisioner.Name: instanceTypes},
 		map[*scheduling.NodeTemplate]v1.ResourceList{},
diff --git a/pkg/controllers/provisioning/scheduling/suite_test.go b/pkg/controllers/provisioning/scheduling/suite_test.go
index 283ceb51a479..bd23a1bef861 100644
--- a/pkg/controllers/provisioning/scheduling/suite_test.go
+++ b/pkg/controllers/provisioning/scheduling/suite_test.go
@@ -16,6 +16,7 @@ package scheduling_test
 
 import (
 	"context"
+	"fmt"
 	"math"
 	"math/rand"
 	"strings"
@@ -27,6 +28,7 @@ import (
 	"github.com/Pallinder/go-randomdata"
 	"github.com/aws/aws-sdk-go/aws"
 	v1 "k8s.io/api/core/v1"
+	storagev1 "k8s.io/api/storage/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/util/sets"
@@ -72,7 +74,7 @@ var _ = BeforeSuite(func() {
 		instanceTypes, _ := cloudProv.GetInstanceTypes(ctx, nil)
 		// set these on the cloud provider so we can manipulate them if needed
 		cloudProv.InstanceTypes = instanceTypes
-		cluster = state.NewCluster(ctx, e.Client, cloudProv)
+		cluster = state.NewCluster(e.Client, cloudProv)
 		nodeStateController = state.NewNodeController(e.Client, cluster)
 		podStateController = state.NewPodController(e.Client, cluster)
 		recorder = test.NewEventRecorder()
@@ -2958,11 +2960,11 @@ var _ = Describe("Instance Type Compatibility", func() {
 			ExpectApplied(ctx, env.Client, provisioner)
 			pods := ExpectProvisioned(ctx, env.Client, controller,
 				test.UnschedulablePod(test.PodOptions{NodeSelector: map[string]string{
-					fake.LabelInstanceSize:  "large",
+					fake.LabelInstanceSize:     "large",
 					v1.LabelInstanceTypeStable: cloudProv.InstanceTypes[0].Name(),
 				}}),
 				test.UnschedulablePod(test.PodOptions{NodeSelector: map[string]string{
-					fake.LabelInstanceSize:  "small",
+					fake.LabelInstanceSize:     "small",
 					v1.LabelInstanceTypeStable: cloudProv.InstanceTypes[4].Name(),
 				}}),
 			)
@@ -3470,6 +3472,7 @@ var _ = Describe("In-Flight Nodes", func() {
 		}}
 		initialPod := ExpectProvisioned(ctx, env.Client, controller, test.UnschedulablePod(opts))
 		node1 := ExpectScheduled(ctx, env.Client, initialPod[0])
+		ExpectReconcileSucceeded(ctx, nodeStateController, client.ObjectKeyFromObject(node1))
 
 		// the node will have 2000m CPU, so these two pods can't both fit on it
 		opts.ResourceRequirements.Limits[v1.ResourceCPU] = resource.MustParse("1")
@@ -3486,6 +3489,7 @@ var _ = Describe("In-Flight Nodes", func() {
 		}}
 		initialPod := ExpectProvisioned(ctx, env.Client, controller, test.UnschedulablePod(opts))
 		node1 := ExpectScheduled(ctx, env.Client, initialPod[0])
+		ExpectReconcileSucceeded(ctx, nodeStateController, client.ObjectKeyFromObject(node1))
 
 		secondPod := ExpectProvisioned(ctx, env.Client, controller,
 			test.UnschedulablePod(test.PodOptions{NodeSelector: map[string]string{v1.LabelArchStable: "arm64"}}))
@@ -3848,6 +3852,138 @@ var _ = Describe("No Pre-Binding", func() {
 	})
 })
 
+var _ = Describe("Volume Limits", func() {
+	It("should launch multiple nodes if required due to volume limits", func() {
+		const csiProvider = "fake.csi.provider"
+		cloudProv.InstanceTypes = []cloudprovider.InstanceType{
+			fake.NewInstanceType(
+				fake.InstanceTypeOptions{
+					Name: "instance-type",
+					Resources: map[v1.ResourceName]resource.Quantity{
+						v1.ResourceCPU:  resource.MustParse("1024"),
+						v1.ResourcePods: resource.MustParse("1024"),
+					},
+				}),
+		}
+
+		provisioner.Spec.Limits = nil
+		ExpectApplied(ctx, env.Client, provisioner)
+		initialPods := ExpectProvisioned(ctx, env.Client, controller, test.UnschedulablePod())
+		node := ExpectScheduled(ctx, env.Client, initialPods[0])
+		csiNode := &storagev1.CSINode{
+			ObjectMeta: metav1.ObjectMeta{
+				Name: node.Name,
+			},
+			Spec: storagev1.CSINodeSpec{
+				Drivers: []storagev1.CSINodeDriver{
+					{
+						Name:   csiProvider,
+						NodeID: "fake-node-id",
+						Allocatable: &storagev1.VolumeNodeResources{
+							Count: aws.Int32(10),
+						},
+					},
+				},
+			},
+		}
+		ExpectApplied(ctx, env.Client, csiNode)
+		ExpectReconcileSucceeded(ctx, nodeStateController, client.ObjectKeyFromObject(node))
+
+		sc := test.StorageClass(test.StorageClassOptions{
+			ObjectMeta:  metav1.ObjectMeta{Name: "my-storage-class"},
+			Provisioner: aws.String(csiProvider),
+			Zones:       []string{"test-zone-1"}})
+		ExpectApplied(ctx, env.Client, sc)
+
+		var pods []*v1.Pod
+		for i := 0; i < 6; i++ {
+			pvcA := test.PersistentVolumeClaim(test.PersistentVolumeClaimOptions{
+				StorageClassName: aws.String("my-storage-class"),
+				ObjectMeta:       metav1.ObjectMeta{Name: fmt.Sprintf("my-claim-a-%d", i)},
+			})
+			pvcB := test.PersistentVolumeClaim(test.PersistentVolumeClaimOptions{
+				StorageClassName: aws.String("my-storage-class"),
+				ObjectMeta:       metav1.ObjectMeta{Name: fmt.Sprintf("my-claim-b-%d", i)},
+			})
+			ExpectApplied(ctx, env.Client, pvcA, pvcB)
+			pods = append(pods, test.UnschedulablePod(test.PodOptions{
+				PersistentVolumeClaims: []string{pvcA.Name, pvcB.Name},
+			}))
+		}
+		ExpectProvisioned(ctx, env.Client, controller, pods...)
+		var nodeList v1.NodeList
+		Expect(env.Client.List(ctx, &nodeList)).To(Succeed())
+		// we need to create a new node as the in-flight one can only contain 5 pods due to the CSINode volume limit
+		Expect(nodeList.Items).To(HaveLen(2))
+	})
+	It("should launch a single node if all pods use the same PVC", func() {
+		const csiProvider = "fake.csi.provider"
+		cloudProv.InstanceTypes = []cloudprovider.InstanceType{
+			fake.NewInstanceType(
+				fake.InstanceTypeOptions{
+					Name: "instance-type",
+					Resources: map[v1.ResourceName]resource.Quantity{
+						v1.ResourceCPU:  resource.MustParse("1024"),
+						v1.ResourcePods: resource.MustParse("1024"),
+					},
+				}),
+		}
+
+		provisioner.Spec.Limits = nil
+		ExpectApplied(ctx, env.Client, provisioner)
+		initialPods := ExpectProvisioned(ctx, env.Client, controller, test.UnschedulablePod())
+		node := ExpectScheduled(ctx, env.Client, initialPods[0])
+		csiNode := &storagev1.CSINode{
+			ObjectMeta: metav1.ObjectMeta{
+				Name: node.Name,
+			},
+			Spec: storagev1.CSINodeSpec{
+				Drivers: []storagev1.CSINodeDriver{
+					{
+						Name:   csiProvider,
+						NodeID: "fake-node-id",
+						Allocatable: &storagev1.VolumeNodeResources{
+							Count: aws.Int32(10),
+						},
+					},
+				},
+			},
+		}
+		ExpectApplied(ctx, env.Client, csiNode)
+		ExpectReconcileSucceeded(ctx, nodeStateController, client.ObjectKeyFromObject(node))
+
+		sc := test.StorageClass(test.StorageClassOptions{
+			ObjectMeta:  metav1.ObjectMeta{Name: "my-storage-class"},
+			Provisioner: aws.String(csiProvider),
+			Zones:       []string{"test-zone-1"}})
+		ExpectApplied(ctx, env.Client, sc)
+
+		pv := test.PersistentVolume(test.PersistentVolumeOptions{
+			ObjectMeta: metav1.ObjectMeta{Name: "my-volume"},
+			Zones:      []string{"test-zone-1"}})
+
+		pvc := test.PersistentVolumeClaim(test.PersistentVolumeClaimOptions{
+			ObjectMeta:       metav1.ObjectMeta{Name: "my-claim"},
+			StorageClassName: aws.String("my-storage-class"),
+			VolumeName:       pv.Name,
+		})
+		ExpectApplied(ctx, env.Client, pv, pvc)
+
+		var pods []*v1.Pod
+		for i := 0; i < 100; i++ {
+			pods = append(pods, test.UnschedulablePod(test.PodOptions{
+				PersistentVolumeClaims: []string{pvc.Name, pvc.Name},
+			}))
+		}
+		ExpectApplied(ctx, env.Client, provisioner)
+		ExpectProvisioned(ctx, env.Client, controller, pods...)
+		var nodeList v1.NodeList
+		Expect(env.Client.List(ctx, &nodeList)).To(Succeed())
+		// 100 pods that all use the same PVC should be schedulable on the same node
+		Expect(nodeList.Items).To(HaveLen(1))
+	})
+})
+
 func MakePods(count int, options test.PodOptions) (pods []*v1.Pod) {
 	for i := 0; i < count; i++ {
 		pods = append(pods, test.UnschedulablePod(options))
diff --git a/pkg/controllers/provisioning/suite_test.go b/pkg/controllers/provisioning/suite_test.go
index 6bc8861ebcd9..8632b7f5de7d 100644
--- a/pkg/controllers/provisioning/suite_test.go
+++ b/pkg/controllers/provisioning/suite_test.go
@@ -66,7 +66,7 @@ var _ = BeforeSuite(func() {
 		for _, it := range instanceTypes {
 			instanceTypeMap[it.Name()] = it
 		}
-		cluster := state.NewCluster(ctx, e.Client, cloudProvider)
+		cluster := state.NewCluster(e.Client, cloudProvider)
 		controller = provisioning.NewController(ctx, cfg, e.Client, corev1.NewForConfigOrDie(e.Config), recorder, cloudProvider, cluster)
 	})
 	Expect(env.Start()).To(Succeed(), "Failed to start environment")
diff --git a/pkg/controllers/state/cluster.go b/pkg/controllers/state/cluster.go
index 1d276fd631bb..d893c64ac27b 100644
--- a/pkg/controllers/state/cluster.go
+++ b/pkg/controllers/state/cluster.go
@@ -20,23 +20,26 @@ import (
 	"sort"
 	"sync"
 
-	"github.com/aws/karpenter/pkg/cloudprovider"
+	"go.uber.org/multierr"
 
-	"github.com/aws/karpenter/pkg/apis/provisioning/v1alpha5"
-
-	"knative.dev/pkg/logging"
+	"github.com/aws/aws-sdk-go/aws"
+	"k8s.io/apimachinery/pkg/api/errors"
 
 	v1 "k8s.io/api/core/v1"
+	storagev1 "k8s.io/api/storage/v1"
 	"k8s.io/apimachinery/pkg/types"
+	"knative.dev/pkg/logging"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 
+	"github.com/aws/karpenter/pkg/apis/provisioning/v1alpha5"
+	"github.com/aws/karpenter/pkg/cloudprovider"
+	"github.com/aws/karpenter/pkg/scheduling"
 	podutils "github.com/aws/karpenter/pkg/utils/pod"
 	"github.com/aws/karpenter/pkg/utils/resources"
 )
 
 // Cluster maintains cluster state that is often needed but expensive to compute.
 type Cluster struct {
-	ctx           context.Context
 	kubeClient    client.Client
 	cloudProvider cloudprovider.CloudProvider
 
@@ -49,9 +52,8 @@ type Cluster struct {
 	bindings map[types.NamespacedName]string // pod namespaced named -> node name
 }
 
-func NewCluster(ctx context.Context, client client.Client, cp cloudprovider.CloudProvider) *Cluster {
+func NewCluster(client client.Client, cp cloudprovider.CloudProvider) *Cluster {
 	c := &Cluster{
-		ctx:           ctx,
 		kubeClient:    client,
 		cloudProvider: cp,
 		nodes:         map[string]*Node{},
@@ -76,7 +78,10 @@ type Node struct {
 	// included in the calculation for Available.
 	DaemonSetRequested v1.ResourceList
 	// HostPort usage of all pods that are bound to the node
-	HostPortUsage *HostPortUsage
+	HostPortUsage *scheduling.HostPortUsage
+	VolumeUsage   *scheduling.VolumeLimits
+	VolumeLimits  scheduling.VolumeCount
+
 	// Provisioner is the provisioner used to create the node.
 	Provisioner *v1alpha5.Provisioner
 
@@ -126,33 +131,31 @@ func (c *Cluster) ForEachNode(f func(n *Node) bool) {
 	}
 }
 
-func (c *Cluster) newNode(node *v1.Node) *Node {
+func (c *Cluster) newNode(ctx context.Context, node *v1.Node) (*Node, error) {
 	n := &Node{
 		Node:          node,
-		HostPortUsage: NewHostPortUsage(),
+		HostPortUsage: scheduling.NewHostPortUsage(),
+		VolumeUsage:   scheduling.NewVolumeLimits(c.kubeClient),
+		VolumeLimits:  scheduling.VolumeCount{},
 		podRequests:   map[types.NamespacedName]v1.ResourceList{},
 	}
-
-	// store the provisioner if it exists
-	if provisionerName, ok := node.Labels[v1alpha5.ProvisionerNameLabelKey]; ok {
-		var provisioner v1alpha5.Provisioner
-		if err := c.kubeClient.Get(c.ctx, client.ObjectKey{Name: provisionerName}, &provisioner); err != nil {
-			logging.FromContext(c.ctx).Errorf("getting provisioner, %s", err)
-		} else {
-			n.Provisioner = &provisioner
-		}
-	}
-
 	var err error
+	err = multierr.Append(err, c.populateProvisioner(ctx, node, n))
+	err = multierr.Append(err, c.populateVolumeLimits(ctx, node, n))
+	err = multierr.Append(err, c.populateResourceRequests(ctx, node, n))
+	return n, err
+}
 
-	n.InstanceType, err = c.getInstanceType(c.ctx, n.Provisioner, node.Labels[v1.LabelInstanceTypeStable])
+func (c *Cluster) populateResourceRequests(ctx context.Context, node *v1.Node, n *Node) error {
+	var err error
+	n.InstanceType, err = c.getInstanceType(ctx, n.Provisioner, node.Labels[v1.LabelInstanceTypeStable])
 	if err != nil {
-		logging.FromContext(c.ctx).Errorf("getting instance type, %s", err)
+		return fmt.Errorf("getting instance type, %w", err)
 	}
 
 	var pods v1.PodList
-	if err := c.kubeClient.List(c.ctx, &pods, client.MatchingFields{"spec.nodeName": node.Name}); err != nil {
-		logging.FromContext(c.ctx).Errorf("listing pods, %s", err)
+	if err := c.kubeClient.List(ctx, &pods, client.MatchingFields{"spec.nodeName": node.Name}); err != nil {
+		return fmt.Errorf("listing pods, %w", err)
 	}
 	var requested []v1.ResourceList
 	var daemonsetRequested []v1.ResourceList
@@ -167,9 +170,8 @@ func (c *Cluster) newNode(node *v1.Node) *Node {
 			daemonsetRequested = append(daemonsetRequested, requests)
 		}
 		requested = append(requested, requests)
-		if err := n.HostPortUsage.Add(pod); err != nil {
-			logging.FromContext(c.ctx).Errorf("inconsistent state, error tracking host port usage on node %s, %s", n.Node.Name, err)
-		}
+		n.HostPortUsage.Add(ctx, pod)
+		n.VolumeUsage.Add(ctx, pod)
 	}
 
 	n.DaemonSetRequested = resources.Merge(daemonsetRequested...)
@@ -179,15 +181,42 @@ func (c *Cluster) newNode(node *v1.Node) *Node {
 	if len(n.Capacity) == 0 && n.InstanceType != nil {
 		n.Capacity = n.InstanceType.Resources()
 	}
-	n.Available = resources.Subtract(c.getNodeAllocatable(node, n.Provisioner), resources.Merge(requested...))
-	return n
+	n.Available = resources.Subtract(c.getNodeAllocatable(ctx, node, n.Provisioner), resources.Merge(requested...))
+	return nil
+}
+
+func (c *Cluster) populateVolumeLimits(ctx context.Context, node *v1.Node, n *Node) error {
+	var csiNode storagev1.CSINode
+	if err := c.kubeClient.Get(ctx, client.ObjectKey{Name: node.Name}, &csiNode); err != nil && !errors.IsNotFound(err) {
+		return fmt.Errorf("getting CSINode to determine volume limit for %s, %w", node.Name, err)
+	}
+
+	for _, driver := range csiNode.Spec.Drivers {
+		if driver.Allocatable == nil {
+			continue
+		}
+		n.VolumeLimits[driver.Name] = int(aws.Int32Value(driver.Allocatable.Count))
+	}
+	return nil
+}
+
+func (c *Cluster) populateProvisioner(ctx context.Context, node *v1.Node, n *Node) error {
+	// store the provisioner if it exists
+	if provisionerName, ok := node.Labels[v1alpha5.ProvisionerNameLabelKey]; ok {
+		var provisioner v1alpha5.Provisioner
+		if err := c.kubeClient.Get(ctx, client.ObjectKey{Name: provisionerName}, &provisioner); err != nil {
+			return fmt.Errorf("getting provisioner, %w", err)
+		}
+		n.Provisioner = &provisioner
+	}
+	return nil
 }
 
 // getNodeAllocatable gets the allocatable resources for the node.
-func (c *Cluster) getNodeAllocatable(node *v1.Node, provisioner *v1alpha5.Provisioner) v1.ResourceList {
-	instanceType, err := c.getInstanceType(c.ctx, provisioner, node.Labels[v1.LabelInstanceTypeStable])
+func (c *Cluster) getNodeAllocatable(ctx context.Context, node *v1.Node, provisioner *v1alpha5.Provisioner) v1.ResourceList {
+	instanceType, err := c.getInstanceType(ctx, provisioner, node.Labels[v1.LabelInstanceTypeStable])
 	if err != nil {
-		logging.FromContext(c.ctx).Errorf("error finding instance type, %s", err)
+		logging.FromContext(ctx).Errorf("finding instance type for %s, %s", node.Name, err)
 		return node.Status.Allocatable
 	}
 
@@ -221,10 +250,12 @@ func (c *Cluster) deleteNode(nodeName string) {
 }
 
 // updateNode is called for every node reconciliation
-func (c *Cluster) updateNode(node *v1.Node) {
+func (c *Cluster) updateNode(ctx context.Context, node *v1.Node) error {
 	c.mu.Lock()
 	defer c.mu.Unlock()
-	c.nodes[node.Name] = c.newNode(node)
+	var err error
+	c.nodes[node.Name], err = c.newNode(ctx, node)
+	return err
 }
 
 // deletePod is called when the pod has been deleted
@@ -253,6 +284,7 @@ func (c *Cluster) updateNodeUsageFromPodDeletion(podKey types.NamespacedName) {
 	n.Available = resources.Merge(n.Available, n.podRequests[podKey])
 	delete(n.podRequests, podKey)
 	n.HostPortUsage.DeletePod(podKey)
+	n.VolumeUsage.DeletePod(podKey)
 
 	// We can't easily track the changes to the DaemonsetRequested here as we no longer have the pod.  We could keep up
 	// with this separately, but if a daemonset pod is being deleted, it usually means the node is going down.  In the
@@ -260,9 +292,10 @@ func (c *Cluster) updateNodeUsageFromPodDeletion(podKey types.NamespacedName) {
 }
 
 // updatePod is called every time the pod is reconciled
-func (c *Cluster) updatePod(pod *v1.Pod) {
-	c.updateNodeUsageFromPod(pod)
+func (c *Cluster) updatePod(ctx context.Context, pod *v1.Pod) error {
+	err := c.updateNodeUsageFromPod(ctx, pod)
 	c.updatePodAntiAffinities(pod)
+	return err
 }
 
 func (c *Cluster) updatePodAntiAffinities(pod *v1.Pod) {
@@ -279,10 +312,10 @@ func (c *Cluster) updatePodAntiAffinities(pod *v1.Pod) {
 
 // updateNodeUsageFromPod is called every time a reconcile event occurs for the pod. If the pods binding has changed
 // (unbound to bound), we need to update the resource requests on the node.
-func (c *Cluster) updateNodeUsageFromPod(pod *v1.Pod) {
+func (c *Cluster) updateNodeUsageFromPod(ctx context.Context, pod *v1.Pod) error {
 	// nothing to do if the pod isn't bound, checking early allows avoiding unnecessary locking
 	if pod.Spec.NodeName == "" {
-		return
+		return nil
 	}
 
 	c.mu.Lock()
@@ -293,11 +326,10 @@ func (c *Cluster) updateNodeUsageFromPod(pod *v1.Pod) {
 	if bindingKnown {
 		if oldNodeName == pod.Spec.NodeName {
 			// we are already tracking the pod binding, so nothing to update
-			return
+			return nil
 		}
 		// the pod has switched nodes, this can occur if a pod name was re-used and it was deleted/re-created rapidly,
 		// binding to a different node the second time
-		logging.FromContext(c.ctx).Infof("pod %s has moved from node %s to %s", podKey, oldNodeName, pod.Spec.NodeName)
 		n, ok := c.nodes[oldNodeName]
 		if ok {
 			// we were tracking the old node, so we need to reduce its capacity by the amount of the pod that has
@@ -313,14 +345,15 @@ func (c *Cluster) updateNodeUsageFromPod(pod *v1.Pod) {
 	n, ok := c.nodes[pod.Spec.NodeName]
 	if !ok {
 		var node v1.Node
-		if err := c.kubeClient.Get(c.ctx, client.ObjectKey{Name: pod.Spec.NodeName}, &node); err != nil {
-			logging.FromContext(c.ctx).Errorf("getting node, %s", err)
+		if err := c.kubeClient.Get(ctx, client.ObjectKey{Name: pod.Spec.NodeName}, &node); err != nil && !errors.IsNotFound(err) {
+			return fmt.Errorf("getting node, %w", err)
 		}
 
 		// node didn't exist, but creating it will pick up this newly bound pod as well
-		n = c.newNode(&node)
+		var err error
+		n, err = c.newNode(ctx, &node)
 		c.nodes[pod.Spec.NodeName] = n
-		return
+		return err
 	}
 
 	// sum the newly bound pod's requests into the existing node and record the binding
@@ -331,11 +364,11 @@ func (c *Cluster) updateNodeUsageFromPod(pod *v1.Pod) {
 	if podutils.IsOwnedByDaemonSet(pod) {
 		n.DaemonSetRequested = resources.Merge(n.DaemonSetRequested, podRequests)
 	}
-	if err := n.HostPortUsage.Add(pod); err != nil {
-		logging.FromContext(c.ctx).Errorf("inconsistent state, error tracking host port usage on node %s, %s", n.Node.Name, err)
-	}
+	n.HostPortUsage.Add(ctx, pod)
+	n.VolumeUsage.Add(ctx, pod)
 	n.podRequests[podKey] = podRequests
 	c.bindings[podKey] = n.Node.Name
+	return nil
 }
 
 func (c *Cluster) getInstanceType(ctx context.Context, provisioner *v1alpha5.Provisioner, instanceTypeName string) (cloudprovider.InstanceType, error) {
diff --git a/pkg/controllers/state/node.go b/pkg/controllers/state/node.go
index 3c2f9a2c2bb5..7cd9df63c374 100644
--- a/pkg/controllers/state/node.go
+++ b/pkg/controllers/state/node.go
@@ -53,9 +53,10 @@ func (c *NodeController) Reconcile(ctx context.Context, req reconcile.Request) (
 		}
 		return reconcile.Result{}, err
 	}
+	if err := c.cluster.updateNode(ctx, node); err != nil {
+		return reconcile.Result{}, err
+	}
 	// ensure it's aware of any nodes we discover, this is a no-op if the node is already known to our cluster state
-	c.cluster.updateNode(node)
-
 	return reconcile.Result{Requeue: true, RequeueAfter: stateRetryPeriod}, nil
 }
 
diff --git a/pkg/controllers/state/pod.go b/pkg/controllers/state/pod.go
index 7b3cbc22379b..2049b89359f5 100644
--- a/pkg/controllers/state/pod.go
+++ b/pkg/controllers/state/pod.go
@@ -55,7 +55,9 @@ func (c *PodController) Reconcile(ctx context.Context, req reconcile.Request) (r
 		return reconcile.Result{}, err
 	}
 
-	c.cluster.updatePod(stored)
+	if err := c.cluster.updatePod(ctx, stored); err != nil {
+		return reconcile.Result{}, err
+	}
 
 	return reconcile.Result{Requeue: true, RequeueAfter: stateRetryPeriod}, nil
 }
diff --git a/pkg/controllers/state/suite_test.go b/pkg/controllers/state/suite_test.go
index a891eb1a7bc9..e34671c67c6d 100644
--- a/pkg/controllers/state/suite_test.go
+++ b/pkg/controllers/state/suite_test.go
@@ -65,7 +65,7 @@ var _ = AfterSuite(func() {
 
 var _ = BeforeEach(func() {
 	cloudProvider = &fake.CloudProvider{InstanceTypes: fake.InstanceTypesAssorted()}
-	cluster = state.NewCluster(ctx, env.Client, cloudProvider)
+	cluster = state.NewCluster(env.Client, cloudProvider)
 	nodeController = state.NewNodeController(env.Client, cluster)
 	podController = state.NewPodController(env.Client, cluster)
 	provisioner = test.Provisioner(test.ProvisionerOptions{ObjectMeta: metav1.ObjectMeta{Name: "default"}})
@@ -90,7 +90,10 @@ var _ = Describe("Node Resource Level", func() {
 				}},
 		})
 		node := test.Node(test.NodeOptions{
-			ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{v1alpha5.ProvisionerNameLabelKey: provisioner.Name}},
+			ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{
+				v1alpha5.ProvisionerNameLabelKey: provisioner.Name,
+				v1.LabelInstanceTypeStable:       cloudProvider.InstanceTypes[0].Name(),
+			}},
 			Allocatable: map[v1.ResourceName]resource.Quantity{
 				v1.ResourceCPU: resource.MustParse("4"),
 			}})
@@ -118,7 +121,10 @@ var _ = Describe("Node Resource Level", func() {
 				}},
 		})
 		node := test.Node(test.NodeOptions{
-			ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{v1alpha5.ProvisionerNameLabelKey: provisioner.Name}},
+			ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{
+				v1alpha5.ProvisionerNameLabelKey: provisioner.Name,
+				v1.LabelInstanceTypeStable:       cloudProvider.InstanceTypes[0].Name(),
+			}},
 			Allocatable: map[v1.ResourceName]resource.Quantity{
 				v1.ResourceCPU: resource.MustParse("4"),
 			}})
@@ -153,7 +159,10 @@ var _ = Describe("Node Resource Level", func() {
 				}},
 		})
 		node := test.Node(test.NodeOptions{
-			ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{v1alpha5.ProvisionerNameLabelKey: provisioner.Name}},
+			ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{
+				v1alpha5.ProvisionerNameLabelKey: provisioner.Name,
+				v1.LabelInstanceTypeStable:       cloudProvider.InstanceTypes[0].Name(),
+			}},
 			Allocatable: map[v1.ResourceName]resource.Quantity{
 				v1.ResourceCPU: resource.MustParse("4"),
 			}})
@@ -182,7 +191,10 @@ var _ = Describe("Node Resource Level", func() {
 				}},
 		})
 		node := test.Node(test.NodeOptions{
-			ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{v1alpha5.ProvisionerNameLabelKey: provisioner.Name}},
+			ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{
+				v1alpha5.ProvisionerNameLabelKey: provisioner.Name,
+				v1.LabelInstanceTypeStable:       cloudProvider.InstanceTypes[0].Name(),
+			}},
 			Allocatable: map[v1.ResourceName]resource.Quantity{
 				v1.ResourceCPU: resource.MustParse("4"),
 			}})
@@ -217,7 +229,10 @@ var _ = Describe("Node Resource Level", func() {
 				}},
 		})
 		node := test.Node(test.NodeOptions{
-			ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{v1alpha5.ProvisionerNameLabelKey: provisioner.Name}},
+			ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{
+				v1alpha5.ProvisionerNameLabelKey: provisioner.Name,
+				v1.LabelInstanceTypeStable:       cloudProvider.InstanceTypes[0].Name(),
+			}},
 			Allocatable: map[v1.ResourceName]resource.Quantity{
 				v1.ResourceCPU: resource.MustParse("4"),
 			}})
@@ -256,7 +271,10 @@ var _ = Describe("Node Resource Level", func() {
 		})
 
 		node1 := test.Node(test.NodeOptions{
-			ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{v1alpha5.ProvisionerNameLabelKey: provisioner.Name}},
+			ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{
+				v1alpha5.ProvisionerNameLabelKey: provisioner.Name,
+				v1.LabelInstanceTypeStable:       cloudProvider.InstanceTypes[0].Name(),
+			}},
 			Allocatable: map[v1.ResourceName]resource.Quantity{
 				v1.ResourceCPU: resource.MustParse("4"),
 			}})
@@ -279,7 +297,10 @@ var _ = Describe("Node Resource Level", func() {
 
 		// second node has more capacity
 		node2 := test.Node(test.NodeOptions{
-			ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{v1alpha5.ProvisionerNameLabelKey: provisioner.Name}},
+			ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{
+				v1alpha5.ProvisionerNameLabelKey: provisioner.Name,
+				v1.LabelInstanceTypeStable:       cloudProvider.InstanceTypes[0].Name(),
+			}},
 			Allocatable: map[v1.ResourceName]resource.Quantity{
 				v1.ResourceCPU: resource.MustParse("8"),
 			}})
@@ -325,7 +346,10 @@ var _ = Describe("Node Resource Level", func() {
 			}))
 		}
 		node := test.Node(test.NodeOptions{
-			ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{v1alpha5.ProvisionerNameLabelKey: provisioner.Name}},
+			ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{
+				v1alpha5.ProvisionerNameLabelKey: provisioner.Name,
+				v1.LabelInstanceTypeStable:       cloudProvider.InstanceTypes[0].Name(),
+			}},
 			Allocatable: map[v1.ResourceName]resource.Quantity{
 				v1.ResourceCPU:  resource.MustParse("200"),
 				v1.ResourcePods: resource.MustParse("500"),
@@ -401,7 +425,10 @@ var _ = Describe("Node Resource Level", func() {
 		})
 
 		node := test.Node(test.NodeOptions{
-			ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{v1alpha5.ProvisionerNameLabelKey: provisioner.Name}},
+			ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{
+				v1alpha5.ProvisionerNameLabelKey: provisioner.Name,
+				v1.LabelInstanceTypeStable:       cloudProvider.InstanceTypes[0].Name(),
+			}},
 			Allocatable: map[v1.ResourceName]resource.Quantity{
 				v1.ResourceCPU:    resource.MustParse("4"),
 				v1.ResourceMemory: resource.MustParse("8Gi"),
@@ -450,7 +477,10 @@ var _ = Describe("Pod Anti-Affinity", func() {
 		})
 
 		node := test.Node(test.NodeOptions{
-			ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{v1alpha5.ProvisionerNameLabelKey: provisioner.Name}},
+			ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{
+				v1alpha5.ProvisionerNameLabelKey: provisioner.Name,
+				v1.LabelInstanceTypeStable:       cloudProvider.InstanceTypes[0].Name(),
+			}},
 			Allocatable: map[v1.ResourceName]resource.Quantity{
 				v1.ResourceCPU: resource.MustParse("4"),
 			}})
@@ -489,7 +519,10 @@ var _ = Describe("Pod Anti-Affinity", func() {
 		})
 
 		node := test.Node(test.NodeOptions{
-			ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{v1alpha5.ProvisionerNameLabelKey: provisioner.Name}},
+			ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{
+				v1alpha5.ProvisionerNameLabelKey: provisioner.Name,
+				v1.LabelInstanceTypeStable:       cloudProvider.InstanceTypes[0].Name(),
+			}},
 			Allocatable: map[v1.ResourceName]resource.Quantity{
 				v1.ResourceCPU: resource.MustParse("4"),
 			}})
@@ -525,7 +558,10 @@ var _ = Describe("Pod Anti-Affinity", func() {
 		})
 
 		node := test.Node(test.NodeOptions{
-			ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{v1alpha5.ProvisionerNameLabelKey: provisioner.Name}},
+			ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{
+				v1alpha5.ProvisionerNameLabelKey: provisioner.Name,
+				v1.LabelInstanceTypeStable:       cloudProvider.InstanceTypes[0].Name(),
+			}},
 			Allocatable: map[v1.ResourceName]resource.Quantity{
 				v1.ResourceCPU: resource.MustParse("4"),
 			}})
@@ -571,7 +607,10 @@ var _ = Describe("Pod Anti-Affinity", func() {
 		})
 
 		node := test.Node(test.NodeOptions{
-			ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{v1alpha5.ProvisionerNameLabelKey: provisioner.Name}},
+			ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{
+				v1alpha5.ProvisionerNameLabelKey: provisioner.Name,
+				v1.LabelInstanceTypeStable:       cloudProvider.InstanceTypes[0].Name(),
+			}},
 			Allocatable: map[v1.ResourceName]resource.Quantity{
 				v1.ResourceCPU: resource.MustParse("4"),
 			}})
diff --git a/pkg/controllers/state/hostportusage.go b/pkg/scheduling/hostportusage.go
similarity index 82%
rename from pkg/controllers/state/hostportusage.go
rename to pkg/scheduling/hostportusage.go
index 3f01c75af489..8f38e79fd4d1 100644
--- a/pkg/controllers/state/hostportusage.go
+++ b/pkg/scheduling/hostportusage.go
@@ -12,14 +12,16 @@ See the License for the specific language governing permissions and
 limitations under the License.
 */
 
-package state
+package scheduling
 
 import (
+	"context"
 	"fmt"
 	"net"
 
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/types"
+	"knative.dev/pkg/logging"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 )
 
@@ -61,17 +63,31 @@ func NewHostPortUsage() *HostPortUsage {
 }
 
 // Add adds a port to the HostPortUsage, returning an error in the case of a conflict
-func (u *HostPortUsage) Add(pod *v1.Pod) error {
+func (u *HostPortUsage) Add(ctx context.Context, pod *v1.Pod) {
+	newUsage, err := u.validate(pod)
+	if err != nil {
+		logging.FromContext(ctx).Errorf("invariant violated registering host port usage, %s, please file an issue", err)
+	}
+	u.reserved = append(u.reserved, newUsage...)
+}
+
+// Validate performs host port conflict validation to allow for determining if we can schedule the pod to the node
+// before doing so.
+func (u *HostPortUsage) Validate(pod *v1.Pod) error {
+	_, err := u.validate(pod)
+	return err
+}
+
+func (u *HostPortUsage) validate(pod *v1.Pod) ([]entry, error) {
 	newUsage := getHostPorts(pod)
 	for _, newEntry := range newUsage {
 		for _, existing := range u.reserved {
 			if newEntry.matches(existing) {
-				return fmt.Errorf("%s conflicts with existing HostPort configuration %s", newEntry, existing)
+				return nil, fmt.Errorf("%s conflicts with existing HostPort configuration %s", newEntry, existing)
 			}
 		}
 	}
-	u.reserved = append(u.reserved, newUsage...)
-	return nil
+	return newUsage, nil
 }
 
 // DeletePod deletes all host port usage from the HostPortUsage that were created by the pod with the given name.
diff --git a/pkg/scheduling/suite_test.go b/pkg/scheduling/suite_test.go
index ef5c1792870b..91b0bcbb5ef6 100644
--- a/pkg/scheduling/suite_test.go
+++ b/pkg/scheduling/suite_test.go
@@ -15,12 +15,11 @@ limitations under the License.
 package scheduling_test
 
 import (
-	v1 "k8s.io/api/core/v1"
-
 	"github.com/aws/karpenter/pkg/apis/provisioning/v1alpha5"
 	"github.com/aws/karpenter/pkg/scheduling"
 	. "github.com/onsi/ginkgo"
 	. "github.com/onsi/gomega"
+	v1 "k8s.io/api/core/v1"
 )
 
 var _ = Describe("Scheduling", func() {
diff --git a/pkg/scheduling/volumelimits.go b/pkg/scheduling/volumelimits.go
new file mode 100644
index 000000000000..d7f49f20cec1
--- /dev/null
+++ b/pkg/scheduling/volumelimits.go
@@ -0,0 +1,199 @@
+/*
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package scheduling
+
+import (
+	"context"
+	"fmt"
+
+	"knative.dev/pkg/logging"
+
+	v1 "k8s.io/api/core/v1"
+	storagev1 "k8s.io/api/storage/v1"
+	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/apimachinery/pkg/util/sets"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+)
+
+// VolumeLimits tracks volume usage on a per node basis, grouped by storage provisioner.  The number of volumes that
+// can be mounted varies by instance type, so we need to track the mounted volume usage to determine which pods can
+// schedule to which nodes.
+type VolumeLimits struct {
+	volumes    volumeUsage
+	podVolumes map[types.NamespacedName]volumeUsage
+	kubeClient client.Client
+}
+
+type volumeUsage map[string]sets.String
+
+func (u volumeUsage) Add(provisioner string, pvcID string) {
+	existing, ok := u[provisioner]
+	if !ok {
+		existing = sets.NewString()
+		u[provisioner] = existing
+	}
+	existing.Insert(pvcID)
+}
+
+func (u volumeUsage) union(volumes volumeUsage) volumeUsage {
+	cp := volumeUsage{}
+	for k, v := range u {
+		cp[k] = sets.NewString(v.List()...)
+	}
+	for k, v := range volumes {
+		existing, ok := cp[k]
+		if !ok {
+			existing = sets.NewString()
+			cp[k] = existing
+		}
+		existing.Insert(v.List()...)
+	}
+	return cp
+}
+
+func (u volumeUsage) insert(volumes volumeUsage) {
+	for k, v := range volumes {
+		existing, ok := u[k]
+		if !ok {
+			existing = sets.NewString()
+			u[k] = existing
+		}
+		existing.Insert(v.List()...)
+	}
+}
+
+func (u volumeUsage) copy() volumeUsage {
+	cp := volumeUsage{}
+	for k, v := range u {
+		cp[k] = sets.NewString(v.List()...)
+	}
+	return cp
+}
+
+func NewVolumeLimits(kubeClient client.Client) *VolumeLimits {
+	return &VolumeLimits{
+		kubeClient: kubeClient,
+		volumes:    volumeUsage{},
+		podVolumes: map[types.NamespacedName]volumeUsage{},
+	}
+}
+
+func (v *VolumeLimits) Add(ctx context.Context, pod *v1.Pod) {
+	podVolumes, err := v.validate(ctx, pod)
+	if err != nil {
+		logging.FromContext(ctx).Errorf("inconsistent state error adding volume, %s, please file an issue", err)
+	}
+	v.podVolumes[client.ObjectKeyFromObject(pod)] = podVolumes
+	v.volumes = v.volumes.union(podVolumes)
+}
+
+type VolumeCount map[string]int
+
+// Exceeds returns true if the volume count exceeds the limits provided.  If there is no value for a storage provider, it
+// is treated as unlimited.
+func (c VolumeCount) Exceeds(limits VolumeCount) bool {
+	for k, v := range c {
+		limit, hasLimit := limits[k]
+		if !hasLimit {
+			continue
+		}
+		if v > limit {
+			return true
+		}
+	}
+	return false
+}
+
+// Fits returns true if every count in rhs is within the limit held in c.  Providers with no limit in c are unlimited.
+func (c VolumeCount) Fits(rhs VolumeCount) bool {
+	for k, v := range rhs {
+		limit, hasLimit := c[k]
+		if !hasLimit {
+			continue
+		}
+		if v > limit {
+			return false
+		}
+	}
+	return true
+}
+
+func (v *VolumeLimits) Validate(ctx context.Context, pod *v1.Pod) (VolumeCount, error) {
+	podVolumes, err := v.validate(ctx, pod)
+	if err != nil {
+		return nil, err
+	}
+	result := VolumeCount{}
+	for k, v := range v.volumes.union(podVolumes) {
+		result[k] += len(v)
+	}
+	return result, nil
+}
+
+func (v *VolumeLimits) validate(ctx context.Context, pod *v1.Pod) (volumeUsage, error) {
+	podPVCs := volumeUsage{}
+
+	for _, volume := range pod.Spec.Volumes {
+		var pvcID string
+		var storageClassName *string
+		if volume.PersistentVolumeClaim != nil {
+			var pvc v1.PersistentVolumeClaim
+			if err := v.kubeClient.Get(ctx, client.ObjectKey{Namespace: pod.Namespace, Name: volume.PersistentVolumeClaim.ClaimName}, &pvc); err != nil {
+				return nil, err
+			}
+
+			pvcID = fmt.Sprintf("%s/%s", pod.Namespace, volume.PersistentVolumeClaim.ClaimName)
+			storageClassName = pvc.Spec.StorageClassName
+		} else if volume.Ephemeral != nil {
+			// generated name per https://kubernetes.io/docs/concepts/storage/ephemeral-volumes/#persistentvolumeclaim-naming
+			pvcID = fmt.Sprintf("%s/%s-%s", pod.Namespace, pod.Name, volume.Name)
+			storageClassName = volume.Ephemeral.VolumeClaimTemplate.Spec.StorageClassName
+		} else {
+			continue
+		}
+
+		provisioner := "unspecified"
+		if storageClassName != nil {
+			var sc storagev1.StorageClass
+			if err := v.kubeClient.Get(ctx, client.ObjectKey{Name: *storageClassName}, &sc); err != nil {
+				return nil, err
+			}
+			provisioner = sc.Provisioner
+		}
+		podPVCs.Add(provisioner, pvcID)
+	}
+	return podPVCs, nil
+}
+
+func (v *VolumeLimits) DeletePod(key types.NamespacedName) {
+	delete(v.podVolumes, key)
+	// the same volume can be used by several pods, so rebuild the aggregate usage from the pods that remain
+	v.volumes = volumeUsage{}
+	for _, c := range v.podVolumes {
+		v.volumes.insert(c)
+	}
+}
+
+func (v *VolumeLimits) Copy() *VolumeLimits {
+	cp := &VolumeLimits{
+		kubeClient: v.kubeClient,
+		volumes:    v.volumes.copy(),
+		podVolumes: map[types.NamespacedName]volumeUsage{},
+	}
+	for k, v := range v.podVolumes {
+		cp.podVolumes[k] = v.copy()
+	}
+	return cp
+}
diff --git a/pkg/test/storage.go b/pkg/test/storage.go
index 8386cb81636b..ab9b507dade9 100644
--- a/pkg/test/storage.go
+++ b/pkg/test/storage.go
@@ -17,6 +17,8 @@ package test
 import (
 	"fmt"
 
+	"github.com/aws/aws-sdk-go/aws"
+
 	"github.com/imdario/mergo"
 	v1 "k8s.io/api/core/v1"
 	storagev1 "k8s.io/api/storage/v1"
@@ -77,7 +79,8 @@ func PersistentVolumeClaim(overrides ...PersistentVolumeClaimOptions) *v1.Persis
 
 type StorageClassOptions struct {
 	metav1.ObjectMeta
-	Zones []string
+	Zones       []string
+	Provisioner *string
 }
 
 func StorageClass(overrides ...StorageClassOptions) *storagev1.StorageClass {
@@ -92,10 +95,13 @@ func StorageClass(overrides ...StorageClassOptions) *storagev1.StorageClass {
 	if options.Zones != nil {
 		allowedTopologies = []v1.TopologySelectorTerm{{MatchLabelExpressions: []v1.TopologySelectorLabelRequirement{{Key: v1.LabelTopologyZone, Values: options.Zones}}}}
 	}
+	if options.Provisioner == nil {
+		options.Provisioner = aws.String("test-provisioner")
+	}
 
 	return &storagev1.StorageClass{
 		ObjectMeta:        ObjectMeta(options.ObjectMeta),
-		Provisioner:       "test-provisioner",
+		Provisioner:       *options.Provisioner,
 		AllowedTopologies: allowedTopologies,
 	}
 }