fix: track volume mounts per node
Assume nodes can support an unlimited number of volumes until they launch. Once the CSI driver
reports the actual number of mountable volumes, use that value, which may require launching
additional nodes. Remove the PVC controller since we no longer bind pods to nodes.

Fixes #919 and #1888
tzneal committed Jun 21, 2022
1 parent 7b48a62 commit 3a77cca
Showing 24 changed files with 587 additions and 342 deletions.
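
The behavior described in the commit message boils down to a per-node volume budget that is unbounded until the node's CSI driver reports a limit. Below is a minimal, self-contained sketch of that idea using hypothetical names (`nodeVolumeState`, `canSchedule`) rather than Karpenter's real `scheduling` types:

```go
// Hypothetical sketch only -- not Karpenter's actual types or logic.
package main

import "fmt"

// nodeVolumeState tracks how many volumes are mounted on a node and the
// per-node limit reported by its CSI driver, if any.
type nodeVolumeState struct {
	mounted int32  // volumes already mounted (or committed) on the node
	limit   *int32 // nil until the CSI driver reports an allocatable count
}

// canSchedule reports whether adding newVolumes more mounts still fits on the node.
func (n *nodeVolumeState) canSchedule(newVolumes int32) bool {
	if n.limit == nil {
		// The node has not launched (or its CSI driver has not reported yet):
		// assume it can support an unlimited number of volumes.
		return true
	}
	return n.mounted+newVolumes <= *n.limit
}

func main() {
	limit := int32(25) // e.g. a typical per-node attachment limit reported via CSINode
	n := nodeVolumeState{mounted: 24, limit: &limit}
	fmt.Println(n.canSchedule(1)) // true
	fmt.Println(n.canSchedule(2)) // false: the scheduler would need another node
}
```

When a check like `canSchedule` fails for every existing and in-flight node, the provisioner has to launch an additional node, which is the "may require launching more nodes" case from the commit message.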
3 changes: 3 additions & 0 deletions charts/karpenter/templates/clusterrole.yaml
@@ -39,3 +39,6 @@ rules:
- apiGroups: ["admissionregistration.k8s.io"]
resources: ["validatingwebhookconfigurations", "mutatingwebhookconfigurations"]
verbs: ["get", "watch", "list", "update"]
- apiGroups: ["storage.k8s.io"]
resources: ["csinodes"]
verbs: ["get", "watch", "list"]
4 changes: 1 addition & 3 deletions cmd/controller/main.go
@@ -47,7 +47,6 @@ import (
metricspod "github.com/aws/karpenter/pkg/controllers/metrics/pod"
metricsprovisioner "github.com/aws/karpenter/pkg/controllers/metrics/provisioner"
"github.com/aws/karpenter/pkg/controllers/node"
"github.com/aws/karpenter/pkg/controllers/persistentvolumeclaim"
"github.com/aws/karpenter/pkg/controllers/provisioning"
"github.com/aws/karpenter/pkg/controllers/state"
"github.com/aws/karpenter/pkg/controllers/termination"
@@ -110,13 +109,12 @@ func main() {
logging.FromContext(ctx).Errorf("watching configmaps, config changes won't be applied immediately, %s", err)
}

cluster := state.NewCluster(ctx, manager.GetClient(), cloudProvider)
cluster := state.NewCluster(manager.GetClient(), cloudProvider)

if err := manager.RegisterControllers(ctx,
provisioning.NewController(ctx, cfg, manager.GetClient(), clientSet.CoreV1(), recorder, cloudProvider, cluster),
state.NewNodeController(manager.GetClient(), cluster),
state.NewPodController(manager.GetClient(), cluster),
persistentvolumeclaim.NewController(manager.GetClient()),
termination.NewController(ctx, manager.GetClient(), clientSet.CoreV1(), cloudProvider),
node.NewController(manager.GetClient(), cloudProvider),
metricspod.NewController(manager.GetClient()),
6 changes: 6 additions & 0 deletions hack/docs/metrics_gen_docs.go
@@ -205,6 +205,12 @@ func handleVariableDeclaration(v *ast.GenDecl) []metricInfo {
}

func getFuncPackage(fun ast.Expr) string {
if pexpr, ok := fun.(*ast.ParenExpr); ok {
return getFuncPackage(pexpr.X)
}
if sexpr, ok := fun.(*ast.StarExpr); ok {
return getFuncPackage(sexpr.X)
}
if sel, ok := fun.(*ast.SelectorExpr); ok {
return fmt.Sprintf("%s", sel.X)
}
2 changes: 2 additions & 0 deletions pkg/cloudprovider/aws/instancetype.go
@@ -38,6 +38,8 @@ import (
"github.com/aws/karpenter/pkg/utils/sets"
)

var _ cloudprovider.InstanceType = (*InstanceType)(nil)

type InstanceType struct {
*ec2.InstanceTypeInfo
offerings []cloudprovider.Offering
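
The added `var _ cloudprovider.InstanceType = (*InstanceType)(nil)` line is the usual Go compile-time assertion that a type satisfies an interface; it costs nothing at runtime and turns a missing method into a build error. A generic illustration with made-up names:

```go
package main

import "fmt"

type Greeter interface {
	Greet() string
}

type EnglishGreeter struct{}

func (EnglishGreeter) Greet() string { return "hello" }

// Compile-time check: the build breaks here if EnglishGreeter ever stops satisfying Greeter.
var _ Greeter = EnglishGreeter{}

func main() {
	fmt.Println(EnglishGreeter{}.Greet())
}
```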
2 changes: 1 addition & 1 deletion pkg/cloudprovider/aws/suite_test.go
@@ -130,7 +130,7 @@ var _ = BeforeSuite(func() {
kubeClient: e.Client,
}
registry.RegisterOrDie(ctx, cloudProvider)
cluster = state.NewCluster(ctx, e.Client, cloudProvider)
cluster = state.NewCluster(e.Client, cloudProvider)
recorder = test.NewEventRecorder()
cfg = test.NewConfig()
controller = provisioning.NewController(ctx, cfg, e.Client, clientSet.CoreV1(), recorder, cloudProvider, cluster)
3 changes: 3 additions & 0 deletions pkg/cloudprovider/fake/cloudprovider.go
@@ -49,6 +49,9 @@ type CloudProvider struct {
CreateCalls []*cloudprovider.NodeRequest
}

var _ cloudprovider.CloudProvider = (*CloudProvider)(nil)
var _ cloudprovider.InstanceType = (*InstanceType)(nil)

func (c *CloudProvider) Create(ctx context.Context, nodeRequest *cloudprovider.NodeRequest) (*v1.Node, error) {
c.mu.Lock()
c.CreateCalls = append(c.CreateCalls, nodeRequest)
8 changes: 1 addition & 7 deletions pkg/config/suite_test.go
@@ -124,13 +124,7 @@ var _ = Describe("Batch Parameter", func() {
var changed int64
cfg.OnChange(func(c config.Config) {
defer GinkgoRecover()
// we can't unregister this, so just check for the one case we care about
if atomic.LoadInt64(&changed) == 0 {
atomic.StoreInt64(&changed, 1)
Expect(cfg.BatchIdleDuration()).To(Equal(1 * time.Second))
// shouldn't be changed
Expect(cfg.BatchMaxDuration()).To(Equal(10 * time.Second))
}
atomic.StoreInt64(&changed, 1)
})

// simulate user updating the config map with a bad max duration
126 changes: 0 additions & 126 deletions pkg/controllers/persistentvolumeclaim/controller.go

This file was deleted.

88 changes: 0 additions & 88 deletions pkg/controllers/persistentvolumeclaim/suite_test.go

This file was deleted.

2 changes: 1 addition & 1 deletion pkg/controllers/provisioning/provisioner.go
@@ -224,7 +224,7 @@ func (p *Provisioner) schedule(ctx context.Context, pods []*v1.Pod) ([]*schedule
return nil, fmt.Errorf("getting daemon overhead, %w", err)
}

return scheduler.NewScheduler(nodeTemplates, provisionerList.Items, p.cluster, topology, instanceTypes, daemonOverhead, p.recorder).Solve(ctx, pods)
return scheduler.NewScheduler(ctx, p.kubeClient, nodeTemplates, provisionerList.Items, p.cluster, topology, instanceTypes, daemonOverhead, p.recorder).Solve(ctx, pods)
}

func (p *Provisioner) launch(ctx context.Context, node *scheduler.Node) error {
23 changes: 19 additions & 4 deletions pkg/controllers/provisioning/scheduling/inflightnode.go
@@ -15,6 +15,7 @@ limitations under the License.
package scheduling

import (
"context"
"fmt"

"github.com/aws/karpenter/pkg/apis/provisioning/v1alpha5"
@@ -35,20 +36,23 @@ type InFlightNode struct {
requirements scheduling.Requirements
available v1.ResourceList
startupTolerations []v1.Toleration
hostPortUsage *state.HostPortUsage
hostPortUsage *scheduling.HostPortUsage
volumeUsage *scheduling.VolumeLimits
volumeLimits scheduling.VolumeCount
}

func NewInFlightNode(n *state.Node, topology *Topology, startupTaints []v1.Taint, daemonResources v1.ResourceList) *InFlightNode {
// the remaining daemonResources to schedule are the total daemonResources minus what has already scheduled
remainingDaemonResources := resources.Subtract(daemonResources, n.DaemonSetRequested)

node := &InFlightNode{
Node: n.Node,
available: n.Available,
topology: topology,
requests: remainingDaemonResources,
requirements: scheduling.NewLabelRequirements(n.Node.Labels),
hostPortUsage: n.HostPortUsage.Copy(),
volumeUsage: n.VolumeUsage.Copy(),
volumeLimits: n.VolumeLimits,
}

if n.Node.Labels[v1alpha5.LabelNodeInitialized] != "true" {
@@ -81,15 +85,24 @@ func NewInFlightNode(n *state.Node, topology *Topology, startupTaints []v1.Taint
return node
}

func (n *InFlightNode) Add(pod *v1.Pod) error {
func (n *InFlightNode) Add(ctx context.Context, pod *v1.Pod) error {
// Check Taints
if err := scheduling.Taints(n.Node.Spec.Taints).Tolerates(pod, n.startupTolerations...); err != nil {
return err
}

if err := n.hostPortUsage.Add(pod); err != nil {
if err := n.hostPortUsage.Validate(pod); err != nil {
return err
}

// determine the number of volumes that will be mounted if the pod schedules
mountedVolumeCount, err := n.volumeUsage.Validate(ctx, pod)
if err != nil {
return err
}
if mountedVolumeCount.Exceeds(n.volumeLimits) {
return fmt.Errorf("would exceed node volume limits")
}

// check resource requests first since that's a pretty likely reason the pod won't schedule on an in-flight
// node, which at this point can't be increased in size
@@ -122,5 +135,7 @@ func (n *InFlightNode) Add(pod *v1.Pod) error {
n.requests = requests
n.requirements = nodeRequirements
n.topology.Record(pod, nodeRequirements)
n.hostPortUsage.Add(ctx, pod)
n.volumeUsage.Add(ctx, pod)
return nil
}
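
`scheduling.VolumeLimits` and `scheduling.VolumeCount` are not shown in this excerpt; a plausible reading of the `Exceeds` check above, written as an assumed sketch rather than the actual implementation, is a per-driver count compared against the node's reported limits, with unreported drivers treated as unlimited:

```go
// Assumed shape of the volume accounting used by Add above; the real types in
// pkg/scheduling may differ in detail.
package scheduling

// VolumeCount tracks mounted (or would-be-mounted) volumes per CSI driver name.
type VolumeCount map[string]int

// Exceeds returns true if any driver's count is over its limit. Drivers with no
// entry in limits are treated as unlimited, matching the "infinite until the CSI
// driver reports" assumption from the commit message.
func (vc VolumeCount) Exceeds(limits VolumeCount) bool {
	for driver, count := range vc {
		if limit, ok := limits[driver]; ok && count > limit {
			return true
		}
	}
	return false
}
```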