feat: add instance type drift for instance types that are no longer discoverable (#1787)
njtran authored Nov 8, 2024
1 parent 650bf3d commit d22742d
Showing 10 changed files with 205 additions and 50 deletions.
17 changes: 17 additions & 0 deletions pkg/controllers/disruption/consolidation.go
@@ -85,6 +85,23 @@ func (c *consolidation) markConsolidated() {

// ShouldDisrupt is a predicate used to filter candidates
func (c *consolidation) ShouldDisrupt(_ context.Context, cn *Candidate) bool {
// We need the following to determine the price of the instance for price comparison. If one of these doesn't exist, we can't
// compute consolidation decisions for this candidate.
// 1. Instance Type
// 2. Capacity Type
// 3. Zone
if cn.instanceType == nil {
c.recorder.Publish(disruptionevents.Unconsolidatable(cn.Node, cn.NodeClaim, fmt.Sprintf("Instance Type %q not found", cn.Labels()[corev1.LabelInstanceTypeStable]))...)
return false
}
if _, ok := cn.Labels()[v1.CapacityTypeLabelKey]; !ok {
c.recorder.Publish(disruptionevents.Unconsolidatable(cn.Node, cn.NodeClaim, fmt.Sprintf("Node does not have label %q", v1.CapacityTypeLabelKey))...)
return false
}
if _, ok := cn.Labels()[corev1.LabelTopologyZone]; !ok {
c.recorder.Publish(disruptionevents.Unconsolidatable(cn.Node, cn.NodeClaim, fmt.Sprintf("Node does not have label %q", corev1.LabelTopologyZone))...)
return false
}
if cn.nodePool.Spec.Disruption.ConsolidateAfter.Duration == nil {
c.recorder.Publish(disruptionevents.Unconsolidatable(cn.Node, cn.NodeClaim, fmt.Sprintf("NodePool %q has consolidation disabled", cn.nodePool.Name))...)
return false
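For reference, the added guard clauses all follow the same shape: if the data needed to price the node is missing, an Unconsolidatable event is published and the candidate is skipped. A minimal, self-contained sketch of that pattern — the candidate struct and shouldDisrupt helper below are simplified stand-ins, not the real karpenter types:

package main

import "fmt"

// candidate is a simplified stand-in for the disruption Candidate type.
type candidate struct {
	labels       map[string]string
	instanceType *string // nil when the instance type could not be resolved
}

// shouldDisrupt mirrors the new guard clauses: without a resolved instance type,
// a capacity type, and a zone, the node cannot be priced, so the candidate is
// skipped and the reason is surfaced (the real code publishes an event instead).
func shouldDisrupt(c candidate) (bool, string) {
	if c.instanceType == nil {
		return false, fmt.Sprintf("Instance Type %q not found", c.labels["node.kubernetes.io/instance-type"])
	}
	for _, key := range []string{"karpenter.sh/capacity-type", "topology.kubernetes.io/zone"} {
		if _, ok := c.labels[key]; !ok {
			return false, fmt.Sprintf("Node does not have label %q", key)
		}
	}
	return true, ""
}

func main() {
	it := "m5.large"
	ok, reason := shouldDisrupt(candidate{
		labels:       map[string]string{"node.kubernetes.io/instance-type": it},
		instanceType: &it,
	})
	fmt.Println(ok, reason) // false Node does not have label "karpenter.sh/capacity-type"
}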
39 changes: 39 additions & 0 deletions pkg/controllers/disruption/consolidation_test.go
@@ -137,6 +137,45 @@ var _ = Describe("Consolidation", func() {
// and each of the consolidation mechanisms specifies that this event should be fired
Expect(recorder.Calls("Unconsolidatable")).To(Equal(6))
})
It("should fire an event when a candidate does not have a resolvable instance type", func() {
pod := test.Pod()
delete(nodeClaim.Labels, corev1.LabelInstanceTypeStable)
delete(node.Labels, corev1.LabelInstanceTypeStable)

ExpectApplied(ctx, env.Client, pod, node, nodeClaim, nodePool)
ExpectManualBinding(ctx, env.Client, pod, node)

ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, []*corev1.Node{node}, []*v1.NodeClaim{nodeClaim})
ExpectSingletonReconciled(ctx, disruptionController)
// We get four calls here since empty node consolidation doesn't emit this event
Expect(recorder.Calls("Unconsolidatable")).To(Equal(4))
})
It("should fire an event when a candidate does not have the capacity type label", func() {
pod := test.Pod()
delete(nodeClaim.Labels, v1.CapacityTypeLabelKey)
delete(node.Labels, v1.CapacityTypeLabelKey)

ExpectApplied(ctx, env.Client, pod, node, nodeClaim, nodePool)
ExpectManualBinding(ctx, env.Client, pod, node)

ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, []*corev1.Node{node}, []*v1.NodeClaim{nodeClaim})
ExpectSingletonReconciled(ctx, disruptionController)
// We get four calls here since empty node consolidation doesn't emit this event
Expect(recorder.Calls("Unconsolidatable")).To(Equal(4))
})
It("should fire an event when a candidate does not have the zone label", func() {
pod := test.Pod()
delete(nodeClaim.Labels, corev1.LabelTopologyZone)
delete(node.Labels, corev1.LabelTopologyZone)

ExpectApplied(ctx, env.Client, pod, node, nodeClaim, nodePool)
ExpectManualBinding(ctx, env.Client, pod, node)

ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, []*corev1.Node{node}, []*v1.NodeClaim{nodeClaim})
ExpectSingletonReconciled(ctx, disruptionController)
// We get four calls here since empty node consolidation doesn't emit this event
Expect(recorder.Calls("Unconsolidatable")).To(Equal(4))
})
})
Context("Metrics", func() {
It("should correctly report eligible nodes", func() {
4 changes: 2 additions & 2 deletions pkg/controllers/disruption/helpers.go
@@ -141,7 +141,7 @@ func instanceTypesAreSubset(lhs []*cloudprovider.InstanceType, rhs []*cloudprovi

// GetCandidates returns nodes that appear to be currently deprovisionable based on their nodePool
func GetCandidates(ctx context.Context, cluster *state.Cluster, kubeClient client.Client, recorder events.Recorder, clk clock.Clock,
cloudProvider cloudprovider.CloudProvider, shouldDeprovision CandidateFilter, disruptionClass string, queue *orchestration.Queue,
cloudProvider cloudprovider.CloudProvider, shouldDisrupt CandidateFilter, disruptionClass string, queue *orchestration.Queue,
) ([]*Candidate, error) {
nodePoolMap, nodePoolToInstanceTypesMap, err := BuildNodePoolMap(ctx, kubeClient, cloudProvider)
if err != nil {
@@ -156,7 +156,7 @@ func GetCandidates(ctx context.Context, cluster *state.Cluster, kubeClient clien
return cn, e == nil
})
// Filter only the valid candidates that we should disrupt
return lo.Filter(candidates, func(c *Candidate, _ int) bool { return shouldDeprovision(ctx, c) }), nil
return lo.Filter(candidates, func(c *Candidate, _ int) bool { return shouldDisrupt(ctx, c) }), nil
}

// BuildNodePoolMap builds a provName -> nodePool map and a provName -> instanceName -> instance type map
24 changes: 8 additions & 16 deletions pkg/controllers/disruption/suite_test.go
@@ -1676,7 +1676,7 @@ var _ = Describe("Candidate Filtering", func() {
Expect(err.Error()).To(Equal(fmt.Sprintf("nodepool %q can't be resolved for state node", nodePool.Name)))
Expect(recorder.DetectedEvent(fmt.Sprintf("Cannot disrupt Node: NodePool %q not found", nodePool.Name))).To(BeTrue())
})
It("should not consider candidates that do not have the karpenter.sh/capacity-type label", func() {
It("should consider candidates that do not have the karpenter.sh/capacity-type label", func() {
nodeClaim, node := test.NodeClaimAndNode(v1.NodeClaim{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
Expand All @@ -1691,11 +1691,9 @@ var _ = Describe("Candidate Filtering", func() {

Expect(cluster.Nodes()).To(HaveLen(1))
_, err := disruption.NewCandidate(ctx, env.Client, recorder, fakeClock, cluster.Nodes()[0], pdbLimits, nodePoolMap, nodePoolInstanceTypeMap, queue, disruption.GracefulDisruptionClass)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(Equal(`state node doesn't have required label "karpenter.sh/capacity-type"`))
Expect(recorder.DetectedEvent(`Cannot disrupt Node: state node doesn't have required label "karpenter.sh/capacity-type"`)).To(BeTrue())
Expect(err).ToNot(HaveOccurred())
})
It("should not consider candidates that do not have the topology.kubernetes.io/zone label", func() {
It("should consider candidates that do not have the topology.kubernetes.io/zone label", func() {
nodeClaim, node := test.NodeClaimAndNode(v1.NodeClaim{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
Expand All @@ -1710,11 +1708,9 @@ var _ = Describe("Candidate Filtering", func() {

Expect(cluster.Nodes()).To(HaveLen(1))
_, err := disruption.NewCandidate(ctx, env.Client, recorder, fakeClock, cluster.Nodes()[0], pdbLimits, nodePoolMap, nodePoolInstanceTypeMap, queue, disruption.GracefulDisruptionClass)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(Equal(`state node doesn't have required label "topology.kubernetes.io/zone"`))
Expect(recorder.DetectedEvent(`Cannot disrupt Node: state node doesn't have required label "topology.kubernetes.io/zone"`)).To(BeTrue())
Expect(err).ToNot(HaveOccurred())
})
It("should not consider candidates that do not have the node.kubernetes.io/instance-type label", func() {
It("should consider candidates that do not have the node.kubernetes.io/instance-type label", func() {
nodeClaim, node := test.NodeClaimAndNode(v1.NodeClaim{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
Expand All @@ -1729,11 +1725,9 @@ var _ = Describe("Candidate Filtering", func() {

Expect(cluster.Nodes()).To(HaveLen(1))
_, err := disruption.NewCandidate(ctx, env.Client, recorder, fakeClock, cluster.Nodes()[0], pdbLimits, nodePoolMap, nodePoolInstanceTypeMap, queue, disruption.GracefulDisruptionClass)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(Equal(`state node doesn't have required label "node.kubernetes.io/instance-type"`))
Expect(recorder.DetectedEvent(`Cannot disrupt Node: state node doesn't have required label "node.kubernetes.io/instance-type"`)).To(BeTrue())
Expect(err).ToNot(HaveOccurred())
})
It("should not consider candidates that have an instance type that cannot be resolved", func() {
It("should consider candidates that have an instance type that cannot be resolved", func() {
nodeClaim, node := test.NodeClaimAndNode(v1.NodeClaim{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
Expand All @@ -1752,9 +1746,7 @@ var _ = Describe("Candidate Filtering", func() {

Expect(cluster.Nodes()).To(HaveLen(1))
_, err := disruption.NewCandidate(ctx, env.Client, recorder, fakeClock, cluster.Nodes()[0], pdbLimits, nodePoolMap, nodePoolInstanceTypeMap, queue, disruption.GracefulDisruptionClass)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(Equal(fmt.Sprintf("instance type %q can't be resolved", mostExpensiveInstance.Name)))
Expect(recorder.DetectedEvent(fmt.Sprintf("Cannot disrupt Node: Instance Type %q not found", mostExpensiveInstance.Name))).To(BeTrue())
Expect(err).ToNot(HaveOccurred())
})
It("should not consider candidates that are actively being processed in the queue", func() {
nodeClaim, node := test.NodeClaimAndNode(v1.NodeClaim{
10 changes: 3 additions & 7 deletions pkg/controllers/disruption/types.go
@@ -90,12 +90,8 @@ func NewCandidate(ctx context.Context, kubeClient client.Client, recorder events
recorder.Publish(disruptionevents.Blocked(node.Node, node.NodeClaim, fmt.Sprintf("NodePool %q not found", nodePoolName))...)
return nil, fmt.Errorf("nodepool %q can't be resolved for state node", nodePoolName)
}
// We only need the instanceType in non-empty consolidation to do price comparison.
instanceType := instanceTypeMap[node.Labels()[corev1.LabelInstanceTypeStable]]
// skip any candidates that we can't determine the instance of
if instanceType == nil {
recorder.Publish(disruptionevents.Blocked(node.Node, node.NodeClaim, fmt.Sprintf("Instance Type %q not found", node.Labels()[corev1.LabelInstanceTypeStable]))...)
return nil, fmt.Errorf("instance type %q can't be resolved", node.Labels()[corev1.LabelInstanceTypeStable])
}
if pods, err = node.ValidatePodsDisruptable(ctx, kubeClient, pdbs); err != nil {
// if the disruption class is not eventual or the nodepool has no TerminationGracePeriod, block disruption of pods
// if the error is anything but a PodBlockEvictionError, also block disruption of pods
@@ -149,8 +145,8 @@ func (c Command) String() string {
fmt.Fprint(&buf, ", ")
}
fmt.Fprintf(&buf, "%s", old.Name())
fmt.Fprintf(&buf, "/%s", old.instanceType.Name)
fmt.Fprintf(&buf, "/%s", old.capacityType)
fmt.Fprintf(&buf, "/%s", old.Labels()[corev1.LabelInstanceTypeStable])
fmt.Fprintf(&buf, "/%s", old.Labels()[v1.CapacityTypeLabelKey])
}
if len(c.replacements) == 0 {
return buf.String()
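The Command.String change follows from the same relaxation: a candidate's instanceType may now be nil, so the human-readable summary reads the node's labels (which are always plain strings) instead of dereferencing the resolved type. A small illustrative sketch under those assumptions, with simplified stand-in types:

package main

import (
	"fmt"
	"strings"
)

// oldNode is a simplified stand-in for a disruption candidate being summarized.
type oldNode struct {
	name   string
	labels map[string]string
}

// summarize formats candidates the way Command.String now does: from labels,
// rather than from a resolved instance type that may be nil when the type is
// no longer discoverable.
func summarize(nodes []oldNode) string {
	var buf strings.Builder
	for i, n := range nodes {
		if i != 0 {
			buf.WriteString(", ")
		}
		fmt.Fprintf(&buf, "%s/%s/%s",
			n.name,
			n.labels["node.kubernetes.io/instance-type"],
			n.labels["karpenter.sh/capacity-type"],
		)
	}
	return buf.String()
}

func main() {
	fmt.Println(summarize([]oldNode{{
		name: "node-a",
		labels: map[string]string{
			"node.kubernetes.io/instance-type": "m5.large",
			"karpenter.sh/capacity-type":       "spot",
		},
	}})) // node-a/m5.large/spot
}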
2 changes: 1 addition & 1 deletion pkg/controllers/nodeclaim/disruption/consolidation_test.go
@@ -40,7 +40,7 @@ var _ = Describe("Underutilized", func() {
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
v1.NodePoolLabelKey: nodePool.Name,
corev1.LabelInstanceTypeStable: "default-instance-type", // need the instance type for the cluster state update
corev1.LabelInstanceTypeStable: it.Name,
},
},
})
39 changes: 37 additions & 2 deletions pkg/controllers/nodeclaim/disruption/drift.go
@@ -25,14 +25,17 @@ import (
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

corev1 "k8s.io/api/core/v1"

v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
"sigs.k8s.io/karpenter/pkg/cloudprovider"
"sigs.k8s.io/karpenter/pkg/scheduling"
)

const (
NodePoolDrifted cloudprovider.DriftReason = "NodePoolDrifted"
RequirementsDrifted cloudprovider.DriftReason = "RequirementsDrifted"
NodePoolDrifted cloudprovider.DriftReason = "NodePoolDrifted"
RequirementsDrifted cloudprovider.DriftReason = "RequirementsDrifted"
InstanceTypeNotFound cloudprovider.DriftReason = "InstanceTypeNotFound"
)

// Drift is a nodeclaim sub-controller that adds or removes status conditions on drifted nodeclaims
@@ -81,13 +84,45 @@ func (d *Drift) isDrifted(ctx context.Context, nodePool *v1.NodePool, nodeClaim
}); reason != "" {
return reason, nil
}
// Keep the instance type check separate from the other two drift checks to reduce the number of times we fetch the instance types.
its, err := d.cloudProvider.GetInstanceTypes(ctx, nodePool)
if err != nil {
return "", err
}
if reason := instanceTypeNotFound(its, nodeClaim); reason != "" {
return reason, nil
}
// Then check if it's drifted from the cloud provider side.
driftedReason, err := d.cloudProvider.IsDrifted(ctx, nodeClaim)
if err != nil {
return "", err
}
return driftedReason, nil
}

// The cloud provider's instance type list should include the full set of allowed instance types, even if some
// offerings are temporarily unavailable. If we can't find the instance type that the NodeClaim is running with, or we
// can't find a compatible offering for that instance type (zone and capacity type being the only labels used as
// requirements here), we consider the NodeClaim drifted, since we no longer have the data for its instance type.
// Note that this is different from RequirementsDrifted, which only checks whether the NodePool is compatible with the
// NodeClaim based on the requirements constructed from the NodeClaim labels.
// InstanceTypeNotFound drift is computed by looking directly at the list of instance types and comparing the NodeClaim
// against those instance types and their offerings. A NodeClaim is considered drifted when:
// 1. The NodeClaim doesn't have the instance type label
// 2. The NodeClaim's instance type doesn't exist in the cloud provider's instance types
// 3. No offering of that instance type matches the NodeClaim's requirements
func instanceTypeNotFound(its []*cloudprovider.InstanceType, nodeClaim *v1.NodeClaim) cloudprovider.DriftReason {
it, ok := lo.Find(its, func(it *cloudprovider.InstanceType) bool {
return it.Name == nodeClaim.Labels[corev1.LabelInstanceTypeStable]
})
// Offerings should in most cases only have zone and capacity type. This likely shouldn't differ
// across cloud providers.
if !ok || !it.Offerings.HasCompatible(scheduling.NewLabelRequirements(nodeClaim.Labels)) {
return InstanceTypeNotFound
}
return ""
}

// Eligible fields for drift are described in the docs
// https://karpenter.sh/docs/concepts/deprovisioning/#drift
func areStaticFieldsDrifted(nodePool *v1.NodePool, nodeClaim *v1.NodeClaim) cloudprovider.DriftReason {
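As a rough illustration of the decision instanceTypeNotFound makes — a self-contained sketch with simplified stand-in types and literal label keys rather than the real cloudprovider and scheduling APIs:

package main

import "fmt"

// Offering and InstanceType are simplified stand-ins for the cloudprovider types.
type Offering struct {
	Zone         string
	CapacityType string
}

type InstanceType struct {
	Name      string
	Offerings []Offering
}

// instanceTypeNotFound mirrors the drift check above: a NodeClaim is drifted when its
// instance type is no longer discoverable, or when no offering is compatible with the
// claim's zone and capacity type labels.
func instanceTypeNotFound(its []InstanceType, labels map[string]string) string {
	for _, it := range its {
		if it.Name != labels["node.kubernetes.io/instance-type"] {
			continue
		}
		for _, o := range it.Offerings {
			if o.Zone == labels["topology.kubernetes.io/zone"] && o.CapacityType == labels["karpenter.sh/capacity-type"] {
				return "" // instance type found with a compatible offering; not drifted
			}
		}
	}
	return "InstanceTypeNotFound"
}

func main() {
	its := []InstanceType{{
		Name:      "m5.large",
		Offerings: []Offering{{Zone: "us-west-2a", CapacityType: "on-demand"}},
	}}
	// The instance type exists, but no offering matches the claim's zone,
	// so the claim would be marked as drifted.
	fmt.Println(instanceTypeNotFound(its, map[string]string{
		"node.kubernetes.io/instance-type": "m5.large",
		"topology.kubernetes.io/zone":      "us-west-2b",
		"karpenter.sh/capacity-type":       "on-demand",
	}))
}

Combined with the relaxed candidate filtering in types.go and suite_test.go above, nodes whose instance type can no longer be resolved are now surfaced through drift (and skipped by consolidation) rather than being excluded from disruption entirely.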