fix: track volume mounts per node
Assume nodes can support infinite volumes until they launch. Once the CSI driver
reports the actual number of mountable volumes, use that value instead, which may
require launching more nodes. Remove the PVC controller, since we no longer bind
pods to nodes.

Fixes #919 and #1888
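
A minimal Go sketch of the strategy the message describes — treat a node's volume
capacity as unlimited until its CSI driver reports a limit through the CSINode
object. The helper name and wiring here are illustrative assumptions, not the
actual Karpenter implementation:

package volumelimits

import (
	"context"
	"math"

	storagev1 "k8s.io/api/storage/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/types"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// MountableVolumes returns how many volumes the named CSI driver can still
// attach to the node. Until the node exists and the driver reports a count,
// we assume the node supports unlimited volumes.
func MountableVolumes(ctx context.Context, c client.Client, nodeName, driverName string) (int, error) {
	csiNode := &storagev1.CSINode{}
	if err := c.Get(ctx, types.NamespacedName{Name: nodeName}, csiNode); err != nil {
		if apierrors.IsNotFound(err) {
			return math.MaxInt32, nil // node hasn't launched yet: assume infinite
		}
		return 0, err
	}
	for _, d := range csiNode.Spec.Drivers {
		if d.Name == driverName && d.Allocatable != nil && d.Allocatable.Count != nil {
			return int(*d.Allocatable.Count), nil // driver-reported limit
		}
	}
	return math.MaxInt32, nil // driver hasn't reported a limit yet
}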
tzneal committed Jun 20, 2022
1 parent aa9777b commit 8e5bec6
Showing 26 changed files with 572 additions and 327 deletions.
3 changes: 3 additions & 0 deletions charts/karpenter/templates/clusterrole.yaml
@@ -39,3 +39,6 @@ rules:
 - apiGroups: ["admissionregistration.k8s.io"]
   resources: ["validatingwebhookconfigurations", "mutatingwebhookconfigurations"]
   verbs: ["get", "watch", "list", "update"]
+- apiGroups: ["storage.k8s.io"]
+  resources: ["csinodes"]
+  verbs: ["get", "watch", "list"]
4 changes: 1 addition & 3 deletions cmd/controller/main.go
@@ -52,7 +52,6 @@ import (
 	metricspod "github.com/aws/karpenter/pkg/controllers/metrics/pod"
 	metricsprovisioner "github.com/aws/karpenter/pkg/controllers/metrics/provisioner"
 	"github.com/aws/karpenter/pkg/controllers/node"
-	"github.com/aws/karpenter/pkg/controllers/persistentvolumeclaim"
 	"github.com/aws/karpenter/pkg/controllers/provisioning"
 	"github.com/aws/karpenter/pkg/controllers/termination"
 	"github.com/aws/karpenter/pkg/utils/injection"
@@ -107,13 +106,12 @@ func main() {
 		logging.FromContext(ctx).Errorf("watching configmaps, config changes won't be applied immediately, %s", err)
 	}

-	cluster := state.NewCluster(ctx, manager.GetClient(), cloudProvider)
+	cluster := state.NewCluster(manager.GetClient(), cloudProvider)

 	if err := manager.RegisterControllers(ctx,
 		provisioning.NewController(ctx, cfg, manager.GetClient(), clientSet.CoreV1(), recorder, cloudProvider, cluster),
 		state.NewNodeController(manager.GetClient(), cluster),
 		state.NewPodController(manager.GetClient(), cluster),
-		persistentvolumeclaim.NewController(manager.GetClient()),
 		termination.NewController(ctx, manager.GetClient(), clientSet.CoreV1(), cloudProvider),
 		node.NewController(manager.GetClient(), cloudProvider),
 		metricspod.NewController(manager.GetClient()),
6 changes: 6 additions & 0 deletions hack/docs/metrics_gen_docs.go
@@ -205,6 +205,12 @@ func handleVariableDeclaration(v *ast.GenDecl) []metricInfo {
 }

 func getFuncPackage(fun ast.Expr) string {
+	if pexpr, ok := fun.(*ast.ParenExpr); ok {
+		return getFuncPackage(pexpr.X)
+	}
+	if sexpr, ok := fun.(*ast.StarExpr); ok {
+		return getFuncPackage(sexpr.X)
+	}
 	if sel, ok := fun.(*ast.SelectorExpr); ok {
 		return fmt.Sprintf("%s", sel.X)
 	}
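
The two new unwrapping cases are likely what lets the doc generator tolerate the
compile-time interface assertions added elsewhere in this commit: in an expression
such as (*InstanceType)(nil), the AST's call target is a ParenExpr wrapping a
StarExpr rather than a plain selector. A small standalone example of those shapes:

package main

import (
	"fmt"
	"go/ast"
	"go/parser"
)

func main() {
	expr, _ := parser.ParseExpr("(*InstanceType)(nil)")
	call := expr.(*ast.CallExpr)       // the conversion parses as a call
	paren := call.Fun.(*ast.ParenExpr) // Fun is "(*InstanceType)"
	star := paren.X.(*ast.StarExpr)    // which wraps "*InstanceType"
	fmt.Println(star.X)                // prints the identifier: InstanceType
}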
2 changes: 1 addition & 1 deletion pkg/cloudprovider/aws/apis/v1alpha1/provider_validation.go
@@ -279,4 +279,4 @@ func (a *AWS) validateKubeletConfiguration(kubeletConfig *v1alpha5.KubeletConfig
 		}
 	}
 	return nil
-}
\ No newline at end of file
+}
2 changes: 2 additions & 0 deletions pkg/cloudprovider/aws/instancetype.go
@@ -36,6 +36,8 @@ import (
 	"github.com/aws/karpenter/pkg/utils/sets"
 )

+var _ cloudprovider.InstanceType = (*InstanceType)(nil)
+
 type InstanceType struct {
 	*ec2.InstanceTypeInfo
 	offerings []cloudprovider.Offering
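
The added line is the standard Go compile-time interface assertion: assigning a
typed nil to the blank identifier forces the compiler to verify that *InstanceType
satisfies cloudprovider.InstanceType, at zero runtime cost. A self-contained
illustration of the idiom:

package main

// The build fails here if Name ever stops implementing Stringer.
type Stringer interface{ String() string }

type Name string

func (n Name) String() string { return string(n) }

var _ Stringer = Name("") // compile-time check only; no runtime effect

func main() {}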
7 changes: 4 additions & 3 deletions pkg/cloudprovider/aws/instancetypes.go
@@ -89,11 +89,12 @@ func (p *InstanceTypeProvider) newInstanceType(ctx context.Context, info *ec2.In
 		provider:  provider,
 		offerings: offerings,
 	}
+	opts := injection.GetOptions(ctx)
 	// Precompute to minimize memory/compute overhead
-	instanceType.resources = instanceType.computeResources(injection.GetOptions(ctx).AWSEnablePodENI)
-	instanceType.overhead = instanceType.computeOverhead(injection.GetOptions(ctx).VMMemoryOverhead)
+	instanceType.resources = instanceType.computeResources(opts.AWSEnablePodENI)
+	instanceType.overhead = instanceType.computeOverhead(opts.VMMemoryOverhead)
 	instanceType.requirements = instanceType.computeRequirements()
-	if !injection.GetOptions(ctx).AWSENILimitedPodDensity {
+	if !opts.AWSENILimitedPodDensity {
 		instanceType.maxPods = ptr.Int32(110)
 	}
 	return instanceType
2 changes: 1 addition & 1 deletion pkg/cloudprovider/aws/suite_test.go
@@ -128,7 +128,7 @@ var _ = BeforeSuite(func() {
 		},
 	}
 	registry.RegisterOrDie(ctx, cloudProvider)
-	cluster = state.NewCluster(ctx, e.Client, cloudProvider)
+	cluster = state.NewCluster(e.Client, cloudProvider)
 	recorder = test.NewEventRecorder()
 	cfg = test.NewConfig()
 	controller = provisioning.NewController(ctx, cfg, e.Client, clientSet.CoreV1(), recorder, cloudProvider, cluster)
3 changes: 3 additions & 0 deletions pkg/cloudprovider/fake/cloudprovider.go
@@ -49,6 +49,9 @@ type CloudProvider struct {
 	CreateCalls []*cloudprovider.NodeRequest
 }

+var _ cloudprovider.CloudProvider = (*CloudProvider)(nil)
+var _ cloudprovider.InstanceType = (*InstanceType)(nil)
+
 func (c *CloudProvider) Create(ctx context.Context, nodeRequest *cloudprovider.NodeRequest) (*v1.Node, error) {
 	c.mu.Lock()
 	c.CreateCalls = append(c.CreateCalls, nodeRequest)
8 changes: 1 addition & 7 deletions pkg/config/suite_test.go
@@ -124,13 +124,7 @@ var _ = Describe("Batch Parameter", func() {
 		var changed int64
 		cfg.OnChange(func(c config.Config) {
 			defer GinkgoRecover()
-			// we can't unregister this, so just check for the one case we care about
-			if atomic.LoadInt64(&changed) == 0 {
-				atomic.StoreInt64(&changed, 1)
-				Expect(cfg.BatchIdleDuration()).To(Equal(1 * time.Second))
-				// shouldn't be changed
-				Expect(cfg.BatchMaxDuration()).To(Equal(10 * time.Second))
-			}
+			atomic.StoreInt64(&changed, 1)
 		})

 		// simulate user updating the config map with a bad max duration
126 changes: 0 additions & 126 deletions pkg/controllers/persistentvolumeclaim/controller.go

This file was deleted.

88 changes: 0 additions & 88 deletions pkg/controllers/persistentvolumeclaim/suite_test.go

This file was deleted.

2 changes: 1 addition & 1 deletion pkg/controllers/provisioning/provisioner.go
@@ -224,7 +224,7 @@ func (p *Provisioner) schedule(ctx context.Context, pods []*v1.Pod) ([]*schedule
 		return nil, fmt.Errorf("getting daemon overhead, %w", err)
 	}

-	return scheduler.NewScheduler(nodeTemplates, provisionerList.Items, p.cluster, topology, instanceTypes, daemonOverhead, p.recorder).Solve(ctx, pods)
+	return scheduler.NewScheduler(ctx, p.kubeClient, nodeTemplates, provisionerList.Items, p.cluster, topology, instanceTypes, daemonOverhead, p.recorder).Solve(ctx, pods)
 }

 func (p *Provisioner) launch(ctx context.Context, node *scheduler.Node) error {
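
Passing ctx and p.kubeClient into the scheduler suggests that volume lookups now
happen at scheduling time rather than in the removed PVC controller. A hypothetical
sketch of the kind of per-node accounting this enables (names and structure
assumed, not the actual scheduler code):

package scheduling

// VolumeTracker counts volumes committed to a node against its reported
// limit. The limit stays effectively infinite until the CSI driver reports
// a real value, matching the commit's "assume infinite until launch" rule.
type VolumeTracker struct {
	limit int // from CSINode.Spec.Drivers[].Allocatable.Count, or MaxInt32
	used  int
}

// CanFit reports whether a pod adding n volumes still fits on the node.
func (t *VolumeTracker) CanFit(n int) bool { return t.used+n <= t.limit }

// Add commits n additional volumes to the node.
func (t *VolumeTracker) Add(n int) { t.used += n }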