track volume mounts per node #1927

Merged 1 commit on Jun 21, 2022
3 changes: 3 additions & 0 deletions charts/karpenter/templates/clusterrole.yaml
@@ -39,3 +39,6 @@ rules:
- apiGroups: ["admissionregistration.k8s.io"]
resources: ["validatingwebhookconfigurations", "mutatingwebhookconfigurations"]
verbs: ["get", "watch", "list", "update"]
- apiGroups: ["storage.k8s.io"]
resources: ["csinodes"]
verbs: ["get", "watch", "list"]
4 changes: 1 addition & 3 deletions cmd/controller/main.go
@@ -47,7 +47,6 @@ import (
metricspod "github.com/aws/karpenter/pkg/controllers/metrics/pod"
metricsprovisioner "github.com/aws/karpenter/pkg/controllers/metrics/provisioner"
"github.com/aws/karpenter/pkg/controllers/node"
"github.com/aws/karpenter/pkg/controllers/persistentvolumeclaim"
"github.com/aws/karpenter/pkg/controllers/provisioning"
"github.com/aws/karpenter/pkg/controllers/state"
"github.com/aws/karpenter/pkg/controllers/termination"
@@ -110,13 +109,12 @@ func main() {
logging.FromContext(ctx).Errorf("watching configmaps, config changes won't be applied immediately, %s", err)
}

cluster := state.NewCluster(ctx, manager.GetClient(), cloudProvider)
cluster := state.NewCluster(manager.GetClient(), cloudProvider)

if err := manager.RegisterControllers(ctx,
provisioning.NewController(ctx, cfg, manager.GetClient(), clientSet.CoreV1(), recorder, cloudProvider, cluster),
state.NewNodeController(manager.GetClient(), cluster),
state.NewPodController(manager.GetClient(), cluster),
persistentvolumeclaim.NewController(manager.GetClient()),
termination.NewController(ctx, manager.GetClient(), clientSet.CoreV1(), cloudProvider),
node.NewController(manager.GetClient(), cloudProvider),
metricspod.NewController(manager.GetClient()),
6 changes: 6 additions & 0 deletions hack/docs/metrics_gen_docs.go
@@ -205,6 +205,12 @@ func handleVariableDeclaration(v *ast.GenDecl) []metricInfo {
}

func getFuncPackage(fun ast.Expr) string {
if pexpr, ok := fun.(*ast.ParenExpr); ok {
return getFuncPackage(pexpr.X)
}
if sexpr, ok := fun.(*ast.StarExpr); ok {
return getFuncPackage(sexpr.X)
}
if sel, ok := fun.(*ast.SelectorExpr); ok {
return fmt.Sprintf("%s", sel.X)
}
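getFuncPackage now unwraps parenthesized and pointer expressions before looking for the selector that names a package. Compile-time assertions like the `var _ cloudprovider.InstanceType = (*InstanceType)(nil)` lines added elsewhere in this PR parse as exactly that shape: a call whose Fun is a ParenExpr wrapping a StarExpr. The snippet below (illustrative only, not part of the PR) shows the AST layers the generator has to peel off:

```go
package main

import (
	"fmt"
	"go/ast"
	"go/parser"
)

func main() {
	// The right-hand side of: var _ cloudprovider.InstanceType = (*InstanceType)(nil)
	expr, err := parser.ParseExpr(`(*aws.InstanceType)(nil)`)
	if err != nil {
		panic(err)
	}
	call := expr.(*ast.CallExpr)
	paren := call.Fun.(*ast.ParenExpr) // the (*aws.InstanceType) part
	star := paren.X.(*ast.StarExpr)    // the *aws.InstanceType part
	sel := star.X.(*ast.SelectorExpr)  // the aws.InstanceType selector
	fmt.Println(sel.X)                 // prints "aws", the package name the generator is after
}
```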
2 changes: 2 additions & 0 deletions pkg/cloudprovider/aws/instancetype.go
@@ -38,6 +38,8 @@ import (
"github.com/aws/karpenter/pkg/utils/sets"
)

var _ cloudprovider.InstanceType = (*InstanceType)(nil)

type InstanceType struct {
*ec2.InstanceTypeInfo
offerings []cloudprovider.Offering
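The `var _ cloudprovider.InstanceType = (*InstanceType)(nil)` line added above is the usual Go compile-time interface check: it allocates nothing at runtime and fails the build if *InstanceType ever stops satisfying the interface. A generic sketch of the idiom with made-up names (Stringer and thing are stand-ins, not Karpenter types):

```go
package main

import "fmt"

// Stringer is a stand-in for an interface such as cloudprovider.InstanceType.
type Stringer interface {
	String() string
}

// thing is a stand-in for a concrete type such as *aws.InstanceType.
type thing struct{}

func (t *thing) String() string { return "thing" }

// Compile-time assertion: if *thing ever stops implementing Stringer,
// this declaration no longer compiles, with zero runtime cost.
var _ Stringer = (*thing)(nil)

func main() {
	fmt.Println((&thing{}).String())
}
```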
2 changes: 1 addition & 1 deletion pkg/cloudprovider/aws/suite_test.go
@@ -130,7 +130,7 @@ var _ = BeforeSuite(func() {
kubeClient: e.Client,
}
registry.RegisterOrDie(ctx, cloudProvider)
cluster = state.NewCluster(ctx, e.Client, cloudProvider)
cluster = state.NewCluster(e.Client, cloudProvider)
recorder = test.NewEventRecorder()
cfg = test.NewConfig()
controller = provisioning.NewController(ctx, cfg, e.Client, clientSet.CoreV1(), recorder, cloudProvider, cluster)
3 changes: 3 additions & 0 deletions pkg/cloudprovider/fake/cloudprovider.go
@@ -49,6 +49,9 @@ type CloudProvider struct {
CreateCalls []*cloudprovider.NodeRequest
}

var _ cloudprovider.CloudProvider = (*CloudProvider)(nil)
var _ cloudprovider.InstanceType = (*InstanceType)(nil)

func (c *CloudProvider) Create(ctx context.Context, nodeRequest *cloudprovider.NodeRequest) (*v1.Node, error) {
c.mu.Lock()
c.CreateCalls = append(c.CreateCalls, nodeRequest)
8 changes: 1 addition & 7 deletions pkg/config/suite_test.go
@@ -124,13 +124,7 @@ var _ = Describe("Batch Parameter", func() {
var changed int64
cfg.OnChange(func(c config.Config) {
defer GinkgoRecover()
// we can't unregister this, so just check for the one case we care about
if atomic.LoadInt64(&changed) == 0 {
atomic.StoreInt64(&changed, 1)
Expect(cfg.BatchIdleDuration()).To(Equal(1 * time.Second))
// shouldn't be changed
Expect(cfg.BatchMaxDuration()).To(Equal(10 * time.Second))
}
atomic.StoreInt64(&changed, 1)
})

// simulate user updating the config map with a bad max duration
126 changes: 0 additions & 126 deletions pkg/controllers/persistentvolumeclaim/controller.go

This file was deleted.

88 changes: 0 additions & 88 deletions pkg/controllers/persistentvolumeclaim/suite_test.go

This file was deleted.

2 changes: 1 addition & 1 deletion pkg/controllers/provisioning/provisioner.go
@@ -224,7 +224,7 @@ func (p *Provisioner) schedule(ctx context.Context, pods []*v1.Pod) ([]*schedule
return nil, fmt.Errorf("getting daemon overhead, %w", err)
}

return scheduler.NewScheduler(nodeTemplates, provisionerList.Items, p.cluster, topology, instanceTypes, daemonOverhead, p.recorder).Solve(ctx, pods)
return scheduler.NewScheduler(ctx, p.kubeClient, nodeTemplates, provisionerList.Items, p.cluster, topology, instanceTypes, daemonOverhead, p.recorder).Solve(ctx, pods)
}

func (p *Provisioner) launch(ctx context.Context, node *scheduler.Node) error {
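NewScheduler now takes the context and the provisioner's kube client. A plausible reading, consistent with the volume tracking added in this PR, is that counting mounts per CSI driver requires resolving a pod's PersistentVolumeClaims to a StorageClass and then to the provisioning driver. The helper below sketches that lookup under that assumption; csiDriverForVolume is a name invented for this example, not code from the PR.

```go
package example

import (
	"context"
	"fmt"

	v1 "k8s.io/api/core/v1"
	storagev1 "k8s.io/api/storage/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// csiDriverForVolume maps a pod volume to the CSI driver that would provision it,
// or "" if the volume is not PVC-backed or has no storage class.
func csiDriverForVolume(ctx context.Context, c client.Client, namespace string, vol v1.Volume) (string, error) {
	if vol.PersistentVolumeClaim == nil {
		return "", nil // not backed by a PVC, so it doesn't count against CSI attach limits
	}
	pvc := &v1.PersistentVolumeClaim{}
	nn := client.ObjectKey{Namespace: namespace, Name: vol.PersistentVolumeClaim.ClaimName}
	if err := c.Get(ctx, nn, pvc); err != nil {
		return "", fmt.Errorf("getting pvc %s, %w", nn, err)
	}
	if pvc.Spec.StorageClassName == nil || *pvc.Spec.StorageClassName == "" {
		return "", nil
	}
	sc := &storagev1.StorageClass{}
	if err := c.Get(ctx, client.ObjectKey{Name: *pvc.Spec.StorageClassName}, sc); err != nil {
		return "", fmt.Errorf("getting storage class %s, %w", *pvc.Spec.StorageClassName, err)
	}
	return sc.Provisioner, nil // e.g. "ebs.csi.aws.com"
}
```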
23 changes: 19 additions & 4 deletions pkg/controllers/provisioning/scheduling/inflightnode.go
@@ -15,6 +15,7 @@ limitations under the License.
package scheduling

import (
"context"
"fmt"

"github.com/aws/karpenter/pkg/apis/provisioning/v1alpha5"
@@ -35,20 +36,23 @@ type InFlightNode struct {
requirements scheduling.Requirements
available v1.ResourceList
startupTolerations []v1.Toleration
hostPortUsage *state.HostPortUsage
hostPortUsage *scheduling.HostPortUsage
volumeUsage *scheduling.VolumeLimits
volumeLimits scheduling.VolumeCount
}

func NewInFlightNode(n *state.Node, topology *Topology, startupTaints []v1.Taint, daemonResources v1.ResourceList) *InFlightNode {
// the remaining daemonResources to schedule are the total daemonResources minus what has already scheduled
remainingDaemonResources := resources.Subtract(daemonResources, n.DaemonSetRequested)

node := &InFlightNode{
Node: n.Node,
available: n.Available,
topology: topology,
requests: remainingDaemonResources,
requirements: scheduling.NewLabelRequirements(n.Node.Labels),
hostPortUsage: n.HostPortUsage.Copy(),
volumeUsage: n.VolumeUsage.Copy(),
volumeLimits: n.VolumeLimits,
}

if n.Node.Labels[v1alpha5.LabelNodeInitialized] != "true" {
@@ -81,15 +85,24 @@
return node
}

func (n *InFlightNode) Add(pod *v1.Pod) error {
func (n *InFlightNode) Add(ctx context.Context, pod *v1.Pod) error {
// Check Taints
if err := scheduling.Taints(n.Node.Spec.Taints).Tolerates(pod, n.startupTolerations...); err != nil {
return err
}

if err := n.hostPortUsage.Add(pod); err != nil {
if err := n.hostPortUsage.Validate(pod); err != nil {
return err
}

// determine the number of volumes that will be mounted if the pod schedules
mountedVolumeCount, err := n.volumeUsage.Validate(ctx, pod)
if err != nil {
return err
}
if mountedVolumeCount.Exceeds(n.volumeLimits) {
return fmt.Errorf("would exceed node volume limits")
}

// check resource requests first since that's a pretty likely reason the pod won't schedule on an in-flight
// node, which at this point can't be increased in size
@@ -122,5 +135,7 @@
n.requests = requests
n.requirements = nodeRequirements
n.topology.Record(pod, nodeRequirements)
n.hostPortUsage.Add(ctx, pod)
n.volumeUsage.Add(ctx, pod)
return nil
}
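Add now validates host ports and volume mounts up front and only mutates hostPortUsage and volumeUsage once every check has passed, so a rejected pod leaves the in-flight node's state untouched. The VolumeCount and VolumeLimits types live elsewhere in the PR; the sketch below shows one plausible shape for the Exceeds comparison, assuming a per-driver count (illustrative, not the actual implementation).

```go
package scheduling

// VolumeCount tracks, per CSI driver name, how many volumes are (or would be) mounted.
type VolumeCount map[string]int

// Exceeds reports whether any driver's count in c goes over the node's limit for
// that driver. Drivers without a recorded limit are treated as unlimited in this sketch.
func (c VolumeCount) Exceeds(limits VolumeCount) bool {
	for driver, count := range c {
		if limit, ok := limits[driver]; ok && count > limit {
			return true
		}
	}
	return false
}
```

Under that shape, the check in Add rejects a pod when mounting its volumes would push any driver past the limit recorded from the node's CSINode object.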