track volume mounts per node #1927
Conversation
Force-pushed from d75a5a2 to c2388d4
pkg/cloudprovider/types.go (outdated)
@@ -74,6 +74,8 @@ type InstanceType interface {
    // Overhead is the amount of resource overhead expected to be used by kubelet and any other system daemons outside
    // of Kubernetes.
    Overhead() v1.ResourceList
    // MaxVolumes returns the maximum number of volumes that can be mounted to the node.
    MaxVolumes() int
Can this be expressed via Resources()?
From a real node:
capacity:
attachable-volumes-aws-ebs: "25"
cpu: "8"
ephemeral-storage: 83873772Ki
hugepages-1Gi: "0"
hugepages-2Mi: "0"
memory: 32521312Ki
pods: "58"
See below, that number is incorrect.
node := &InFlightNode{
    Node:       n.Node,
    maxVolumes: n.InstanceType.MaxVolumes(),
Does it not make sense to store the entire state.Node if we're using multiple pieces of data from it?
A few of the pieces we use are intentionally deep copied so we can modify them.
Force-pushed from 34cdf72 to 759d88f
Force-pushed from 4bde825 to 9da3c3a
@@ -91,7 +96,13 @@ func (n *Node) Add(pod *v1.Pod) error {

    // Check instance type combinations
    requests := resources.Merge(n.requests, resources.RequestsForPods(pod))
    instanceTypes := filterInstanceTypes(n.InstanceTypeOptions, nodeRequirements, requests)
    // determine the number of volumes that will be mounted if the pod schedules
    mountedVolumeCount, err := n.volumeUsage.Validate(pod)
We call this variable mountedVolumeCount and volumesPerNode in different parts of the code. Can we simplify to a single concept to minimize cognitive load for the reader? Perhaps just volumeCount, with "mounted" and "node" implied.
The result of Validate() is called mountedVolumeCount both times it's used. This is the number of volumes that would be mounted on the node if the pod were added. I think renaming this to volumeCount is slightly less clear. I renamed volumesPerNode to mountedVolumeCount.
@@ -196,7 +200,7 @@ func (s *Scheduler) add(pod *v1.Pod) error {
        }
    }

    node := NewNode(nodeTemplate, s.topology, s.daemonOverhead[nodeTemplate], instanceTypes)
    node := NewNode(s.ctx, s.kubeClient, nodeTemplate, s.topology, s.daemonOverhead[nodeTemplate], instanceTypes)
Why are we using a different context object here? How does this differ from ctx on line 103?
It just wasn't passed down and was only used for logging. Modified to pass it through add().
pkg/scheduling/hostportusage.go (outdated)
}

// Add adds a port to the HostPortUsage, returning an error in the case of a conflict
func (u *HostPortUsage) Add(pod *v1.Pod) error {
func (u *HostPortUsage) Add(pod *v1.Pod) {
Why not pass the context in through the function arguments like we do everywhere else? IIUC, Go style recommends passing context through call chains and avoiding setting it as an object property. There are a few cases (e.g. provisioner.go) where this pattern must be violated, since it's not triggered by anything, but this isn't the case here.
Modified. I started this code in cluster state, which already had a stored context for the same reason provisioner does.
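For reference, a hedged sketch (hypothetical names, not the PR's code) of the style being discussed: the context travels through the call chain as the first argument rather than living on the struct:

package main

import (
    "context"
    "fmt"
)

// tracker deliberately has no stored context field.
type tracker struct {
    seen []string
}

// Add takes the caller's context as its first argument, per common Go style.
func (t *tracker) Add(ctx context.Context, name string) error {
    if err := ctx.Err(); err != nil {
        return err // respect cancellation from the caller
    }
    t.seen = append(t.seen, name)
    return nil
}

func main() {
    ctx := context.Background()
    t := &tracker{}
    if err := t.Add(ctx, "pod-a"); err != nil {
        fmt.Println("add failed:", err)
    }
    fmt.Println(t.seen)
}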
pkg/scheduling/volumelimits.go (outdated)
}

func (u *volumeUsage) insert(c volumeUsage) {
    for k, v := range c {
I'm having a hard time reading this. What's c stand for?
Collection; I just renamed it to volumes.
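As a sketch of the renamed version (the map type here is an assumption for illustration, not the exact type in the PR):

package main

import "fmt"

// volumeUsage is assumed here to map a CSI driver name to the set of volume names in use.
type volumeUsage map[string]map[string]struct{}

// insert merges another usage map into the receiver.
func (u volumeUsage) insert(volumes volumeUsage) {
    for driver, names := range volumes {
        if u[driver] == nil {
            u[driver] = map[string]struct{}{}
        }
        for name := range names {
            u[driver][name] = struct{}{}
        }
    }
}

func main() {
    u := volumeUsage{}
    u.insert(volumeUsage{"ebs.csi.aws.com": {"pvc-123": {}}})
    fmt.Println(len(u["ebs.csi.aws.com"])) // 1
}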
pkg/scheduling/volumelimits.go (outdated)
    return cp
}

func NewVolumeLimits(ctx context.Context, kubeClient client.Client) *VolumeLimits {
likewise
func NewVolumeLimits(ctx context.Context, kubeClient client.Client) *VolumeLimits {
func NewVolumeLimits(ctx context.Context, kubeClient client.Client) VolumeLimits {
VolumeLimits isn't a map type, it's a struct.
    }
    // https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/volume_limits.html
    switch i.Name() {
    case "d3.8xlarge", "d3en.12xlarge":
Am I right that AWS exposes no API for this?
@@ -293,6 +300,36 @@ func (i *InstanceType) eniLimitedPods() int64 {
    return *i.NetworkInfo.MaximumNetworkInterfaces*(*i.NetworkInfo.Ipv4AddressesPerInterface-1) + 2
}

func (i *InstanceType) computeVolumeLimits(ebsVolumeAttachLimit int) map[string]int {
    result := map[string]int{
        ebsCsiProvisioner: 27,
This default only makes sense for Nitro instances. Obviously that's the majority of instances now ... but this might behave funny on an older non-Nitro instance.
@@ -40,6 +40,8 @@ const (
    InstanceTypeZonesCacheKey          = "zones"
    InstanceTypesAndZonesCacheTTL      = 5 * time.Minute
    UnfulfillableCapacityErrorCacheTTL = 3 * time.Minute

    ebsCsiProvisioner = "ebs.csi.aws.com"
Am I missing where this is used?
@@ -293,6 +300,36 @@ func (i *InstanceType) eniLimitedPods() int64 {
    return *i.NetworkInfo.MaximumNetworkInterfaces*(*i.NetworkInfo.Ipv4AddressesPerInterface-1) + 2
}

func (i *InstanceType) computeVolumeLimits(ebsVolumeAttachLimit int) map[string]int {
Just a thought here... rather than hard-coding this list, what if Karpenter just used the CSINode data for each node in the cluster to decide whether or not an already-running node was really eligible to take on a new pod?
apiVersion: storage.k8s.io/v1
kind: CSINode
metadata:
creationTimestamp: "2022-06-14T17:13:09Z"
name: ip-100-64-1-228.us-west-2.compute.internal
ownerReferences:
- apiVersion: v1
kind: Node
name: ip-100-64-1-228.us-west-2.compute.internal
uid: 8126d39d-afa1-4967-8b5d-7d181e103545
resourceVersion: "693807159"
uid: 95d3d0d2-f92a-47ec-b0df-4abfd33ea3da
spec:
drivers:
- name: efs.csi.aws.com
nodeID: i-01b557d4a5dc348ab
topologyKeys: null
- allocatable:
count: 15
name: ebs.csi.aws.com
nodeID: i-01b557d4a5dc348ab
topologyKeys:
- topology.ebs.csi.aws.com/zone
Right now, as I understand it, Karpenter tries to replicate much of the kube-scheduler scheduling logic to determine whether or not a Pod is going to be schedulable against the current cluster hardware. When Karpenter sees a Pod in status: Pending, it could go through its normal logic loop... but as it's iterating over the nodes to determine whether or not the Pod should be schedulable, it could also take into account the allocatable.count values from the CSINodes API.
I guess the disadvantage of this model is that if you launch a large number of pods all at once, Karpenter might under- or over-launch nodes because it still needs some concept of "default number of volumes that can be mounted". So perhaps you'd still need the ebsVolumeAttachLimit setting.
The advantage though is that you don't have to guess what the volume limit is for existing hosts, because it is exposed via the API.
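For illustration, a hedged sketch of the idea (the function and variable names are mine, not the PR's actual implementation): read the per-driver allocatable count from the CSINode API with a controller-runtime client, and report whether the driver has published a limit yet.

package nodevolumes

import (
    "context"

    storagev1 "k8s.io/api/storage/v1"
    "sigs.k8s.io/controller-runtime/pkg/client"
)

// csiVolumeLimit returns the allocatable volume count a CSI driver reports for a node,
// and false if the driver hasn't reported one yet (e.g. the node is still starting up).
func csiVolumeLimit(ctx context.Context, kubeClient client.Client, nodeName, driverName string) (int32, bool, error) {
    var csiNode storagev1.CSINode
    if err := kubeClient.Get(ctx, client.ObjectKey{Name: nodeName}, &csiNode); err != nil {
        return 0, false, err
    }
    for _, d := range csiNode.Spec.Drivers {
        if d.Name == driverName && d.Allocatable != nil && d.Allocatable.Count != nil {
            return *d.Allocatable.Count, true, nil
        }
    }
    return 0, false, nil
}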
The downside is that we'll have slower provisioning in the event your pods hit the volume limit, but it probably doesn't matter much as it's somewhat unlikely to hit the volume limits. Looking into it.
Force-pushed from af94c35 to fc34204
Force-pushed from 717185e to 8e5bec6
pkg/controllers/state/cluster.go (outdated)
err = multierr.Append(err, c.populateProvisioner(ctx, node, n))
err = multierr.Append(err, c.populateVolumeLimits(ctx, node, n))
err = multierr.Append(err, c.populateResourceRequests(ctx, node, n))
return n, err
Optional: if you're not familiar with multierr.Combine, it can help you achieve declarative, functional code.
return &Node{
    Node:          node,
    HostPortUsage: scheduling.NewHostPortUsage(),
    VolumeUsage:   scheduling.NewVolumeLimits(c.kubeClient),
    VolumeLimits:  scheduling.VolumeCount{},
    podRequests:   map[types.NamespacedName]v1.ResourceList{},
}, multierr.Combine(
    c.populateProvisioner(ctx, node, n),
    c.populateVolumeLimits(ctx, node, n),
    c.populateResourceRequests(ctx, node, n),
)
pkg/controllers/state/cluster.go (outdated)
func (c *Cluster) getNodeAllocatable(node *v1.Node, provisioner *v1alpha5.Provisioner) v1.ResourceList {
    instanceType, err := c.getInstanceType(c.ctx, provisioner, node.Labels[v1.LabelInstanceTypeStable])
func (c *Cluster) getNodeAllocatable(ctx context.Context, node *v1.Node, provisioner *v1alpha5.Provisioner) v1.ResourceList {
    instanceType, err := c.getInstanceType(ctx, provisioner, node.Labels[v1.LabelInstanceTypeStable])
Is there a reason we're not handling this error and bubbling it up?
If we can't find the instance type, it only matters for not-ready nodes; we then treat them as having no capacity. Once kubelet is running, not finding an instance type means something weird has happened (the instance type is no longer reported by the EC2 API, or some other odd error), but it doesn't affect anything, so we just log it.
pkg/controllers/state/cluster.go (outdated)
c.mu.Lock()
defer c.mu.Unlock()
c.nodes[node.Name] = c.newNode(node)
var err error
c.nodes[node.Name], err = c.newNode(ctx, node)
If an error occurs, will we inadvertently set c.nodes[node.Name] to nil? This will cause it to show up when looping over the nodes.
Previously newNode() always returned a node object, even if it failed to populate some portion of it. I could see it going both ways, as some semi-failure modes aren't real failures in most cases (e.g. being unable to read the CSINode), but for now I've modified it to return nil in the event of an error for consistency's sake.
}

// node didn't exist, but creating it will pick up this newly bound pod as well
n = c.newNode(&node)
var err error
n, err = c.newNode(ctx, &node)
same re: setting if err != nil
func (u *HostPortUsage) Add(ctx context.Context, pod *v1.Pod) {
    newUsage, err := u.validate(pod)
    if err != nil {
        logging.FromContext(ctx).Errorf("invariant violated registering host port usage, %s, please file an issue", err)
I'm wondering if we should do a similar error propagation pattern here. This will bubble up to adding a pod to a node, which shouldn't block other pods.
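A hedged sketch (simplified types, not the actual HostPortUsage implementation) of the propagation pattern being suggested, where Add returns the conflict instead of logging it so the caller can skip just that pod:

package hostports

import (
    "fmt"

    v1 "k8s.io/api/core/v1"
)

type HostPortUsage struct {
    used map[int32]bool
}

func NewHostPortUsage() *HostPortUsage {
    return &HostPortUsage{used: map[int32]bool{}}
}

// Add records the pod's host ports, returning an error on conflict so the caller can
// skip this pod without blocking the rest of the scheduling pass.
func (u *HostPortUsage) Add(pod *v1.Pod) error {
    var ports []int32
    for _, c := range pod.Spec.Containers {
        for _, p := range c.Ports {
            if p.HostPort == 0 {
                continue
            }
            if u.used[p.HostPort] {
                return fmt.Errorf("host port %d already in use", p.HostPort)
            }
            ports = append(ports, p.HostPort)
        }
    }
    for _, p := range ports {
        u.used[p] = true
    }
    return nil
}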
Force-pushed from 8e5bec6 to a2a6e1d
Force-pushed from a2a6e1d to 3a77cca
This might be a silly question, but do we no longer need the PersistentVolumeClaim controller given we're no longer pre-binding pods?
Discussed with @tzneal offline and the short answer is "yes".
LGTM
func (c *Cluster) updatePod(pod *v1.Pod) {
    c.updateNodeUsageFromPod(pod)
func (c *Cluster) updatePod(ctx context.Context, pod *v1.Pod) error {
    err := c.updateNodeUsageFromPod(ctx, pod)
    c.updatePodAntiAffinities(pod)
Should we update if there's an error?
@@ -92,10 +95,13 @@ func StorageClass(overrides ...StorageClassOptions) *storagev1.StorageClass {
    if options.Zones != nil {
        allowedTopologies = []v1.TopologySelectorTerm{{MatchLabelExpressions: []v1.TopologySelectorLabelRequirement{{Key: v1.LabelTopologyZone, Values: options.Zones}}}}
    }
    if options.Provisioner == nil {
Consider just checking "" and not making options.Provisioner a pointer to avoid type juggling.
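A tiny sketch of that suggestion, using a hypothetical option struct and default value rather than the repo's actual test fixtures: the zero value "" stands in for "not set", so no pointer is needed.

package fixtures

// StorageClassOptions here is illustrative; the point is that "" can mean "not set",
// so the field needn't be a pointer.
type StorageClassOptions struct {
    Provisioner string
}

const defaultProvisioner = "ebs.csi.aws.com" // assumed default for the sketch

func provisionerOrDefault(o StorageClassOptions) string {
    if o.Provisioner == "" {
        return defaultProvisioner
    }
    return o.Provisioner
}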
1. Issue, if available:
Fixes #919 and #1888
2. Description of changes:
track volume mounts per node
Assume nodes can support infinite volumes until they launch. Once the CSI driver is reporting the current number of mountable volumes, use that value, which may require launching more nodes. (A minimal sketch of this behavior follows at the end of this description.)
3. How was this change tested?
Unit testing & deployed to EKS with my EBS CSI driver configured with --volume-attach-limit=3. When the driver created the CSINode, Karpenter launched another node for the remaining volumes.
4. Does this change impact docs?
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
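As referenced in item 2 above, a minimal sketch of the described behavior (names are illustrative, not the PR's code): treat the volume limit as unbounded until the CSI driver reports one, then enforce it.

package main

import (
    "fmt"
    "math"
)

type nodeVolumeState struct {
    reportedLimit *int // nil until the node's CSI driver publishes an allocatable count
    mounted       int
}

// fits reports whether n additional volumes would still fit on the node.
func (s nodeVolumeState) fits(n int) bool {
    limit := math.MaxInt32 // assume effectively infinite volumes before the node reports
    if s.reportedLimit != nil {
        limit = *s.reportedLimit
    }
    return s.mounted+n <= limit
}

func main() {
    // Node not launched yet: no limit known, so anything fits.
    fmt.Println(nodeVolumeState{}.fits(10)) // true

    // Node launched with --volume-attach-limit=3 and two volumes already mounted.
    three := 3
    fmt.Println(nodeVolumeState{reportedLimit: &three, mounted: 2}.fits(2)) // false
}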