chore: Change deprovisioning to disruption controller and add metrics
jonathan-innis authored Oct 13, 2023
1 parent 4239902 commit 7dcc61d
Showing 25 changed files with 620 additions and 466 deletions.
4 changes: 2 additions & 2 deletions pkg/controllers/controllers.go
@@ -20,7 +20,7 @@ import (
 	"sigs.k8s.io/controller-runtime/pkg/client"

 	"github.com/aws/karpenter-core/pkg/cloudprovider"
-	"github.com/aws/karpenter-core/pkg/controllers/deprovisioning"
+	"github.com/aws/karpenter-core/pkg/controllers/disruption"
 	"github.com/aws/karpenter-core/pkg/controllers/leasegarbagecollection"
 	metricsnode "github.com/aws/karpenter-core/pkg/controllers/metrics/node"
 	metricspod "github.com/aws/karpenter-core/pkg/controllers/metrics/pod"
@@ -55,7 +55,7 @@ func NewControllers(

 	return []controller.Controller{
 		p, evictionQueue,
-		deprovisioning.NewController(clock, kubeClient, p, cloudProvider, recorder, cluster),
+		disruption.NewController(clock, kubeClient, p, cloudProvider, recorder, cluster),
 		provisioning.NewController(kubeClient, p, recorder),
 		nodepoolhash.NewProvisionerController(kubeClient),
 		informer.NewDaemonSetController(kubeClient, cluster),
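As an aside on the shape of this wiring: NewControllers returns a flat slice so the operator can start every controller the same way. Below is a minimal, self-contained sketch of that pattern; the Controller interface and type names here are hypothetical stand-ins, not karpenter's actual controller.Controller API.

package main

import (
	"context"
	"fmt"
)

// Controller is a hypothetical stand-in for karpenter's controller.Controller.
type Controller interface {
	Name() string
	Reconcile(ctx context.Context) error
}

type disruption struct{}

func (disruption) Name() string                    { return "disruption" }
func (disruption) Reconcile(context.Context) error { return nil }

type provisioning struct{}

func (provisioning) Name() string                    { return "provisioning" }
func (provisioning) Reconcile(context.Context) error { return nil }

// newControllers mirrors the slice-building shape in the diff above.
func newControllers() []Controller {
	return []Controller{disruption{}, provisioning{}}
}

func main() {
	// The operator can register and drive each controller uniformly.
	for _, c := range newControllers() {
		fmt.Println("starting:", c.Name(), c.Reconcile(context.Background()))
	}
}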
pkg/controllers/{deprovisioning → disruption}/consolidation.go
@@ -12,7 +12,7 @@ See the License for the specific language governing permissions and
 limitations under the License.
 */

-package deprovisioning
+package disruption

 import (
 	"context"
@@ -29,11 +29,10 @@ import (
 	"github.com/aws/karpenter-core/pkg/apis/v1alpha5"
 	"github.com/aws/karpenter-core/pkg/apis/v1beta1"
 	"github.com/aws/karpenter-core/pkg/cloudprovider"
-	deprovisioningevents "github.com/aws/karpenter-core/pkg/controllers/deprovisioning/events"
+	disruptionevents "github.com/aws/karpenter-core/pkg/controllers/disruption/events"
 	"github.com/aws/karpenter-core/pkg/controllers/provisioning"
 	"github.com/aws/karpenter-core/pkg/controllers/state"
 	"github.com/aws/karpenter-core/pkg/events"
-	"github.com/aws/karpenter-core/pkg/metrics"
 	"github.com/aws/karpenter-core/pkg/scheduling"
 )

@@ -64,12 +63,7 @@ func makeConsolidation(clock clock.Clock, cluster *state.Cluster, kubeClient cli
 	}
 }

-// string is the string representation of the deprovisioner
-func (c *consolidation) String() string {
-	return metrics.ConsolidationReason
-}
-
-// sortAndFilterCandidates orders deprovisionable candidates by the disruptionCost, removing any that we already know won't
+// sortAndFilterCandidates orders candidates by the disruptionCost, removing any that we already know won't
 // be viable consolidation options.
 func (c *consolidation) sortAndFilterCandidates(ctx context.Context, candidates []*Candidate) ([]*Candidate, error) {
 	candidates, err := filterCandidates(ctx, c.kubeClient, c.recorder, candidates)
@@ -93,15 +87,15 @@ func (c *consolidation) markConsolidated() {
 	c.lastConsolidationState = c.cluster.ConsolidationState()
 }

-// ShouldDeprovision is a predicate used to filter deprovisionable candidates
-func (c *consolidation) ShouldDeprovision(_ context.Context, cn *Candidate) bool {
+// ShouldDisrupt is a predicate used to filter candidates
+func (c *consolidation) ShouldDisrupt(_ context.Context, cn *Candidate) bool {
 	if cn.Annotations()[v1alpha5.DoNotConsolidateNodeAnnotationKey] == "true" {
-		c.recorder.Publish(deprovisioningevents.Unconsolidatable(cn.Node, cn.NodeClaim, fmt.Sprintf("%s annotation exists", v1alpha5.DoNotConsolidateNodeAnnotationKey))...)
+		c.recorder.Publish(disruptionevents.Unconsolidatable(cn.Node, cn.NodeClaim, fmt.Sprintf("%s annotation exists", v1alpha5.DoNotConsolidateNodeAnnotationKey))...)
 		return false
 	}
 	if cn.nodePool.Spec.Disruption.ConsolidationPolicy != v1beta1.ConsolidationPolicyWhenUnderutilized ||
 		(cn.nodePool.Spec.Disruption.ConsolidateAfter != nil && cn.nodePool.Spec.Disruption.ConsolidateAfter.Duration == nil) {
-		c.recorder.Publish(deprovisioningevents.Unconsolidatable(cn.Node, cn.NodeClaim, fmt.Sprintf("%s %q has consolidation disabled", lo.Ternary(cn.nodePool.IsProvisioner, "Provisioner", "NodePool"), cn.nodePool.Name))...)
+		c.recorder.Publish(disruptionevents.Unconsolidatable(cn.Node, cn.NodeClaim, fmt.Sprintf("%s %q has consolidation disabled", lo.Ternary(cn.nodePool.IsProvisioner, "Provisioner", "NodePool"), cn.nodePool.Name))...)
 		return false
 	}
 	return true
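To see the predicate in isolation, here is a minimal runnable sketch of the two gates ShouldDisrupt applies, using simplified hypothetical types. The real code also treats a ConsolidateAfter with a nil duration as consolidation being disabled; this sketch folds that into a single boolean.

package main

import "fmt"

// Candidate is a hypothetical reduction of the real Candidate/NodePool types.
type Candidate struct {
	Annotations          map[string]string
	ConsolidationEnabled bool // stands in for policy == WhenUnderutilized with a usable consolidateAfter
}

// doNotConsolidate stands in for v1alpha5.DoNotConsolidateNodeAnnotationKey.
const doNotConsolidate = "karpenter.sh/do-not-consolidate"

// shouldDisrupt mirrors the gates above: an explicit opt-out annotation wins,
// then the owning pool must have consolidation enabled.
func shouldDisrupt(cn Candidate) bool {
	if cn.Annotations[doNotConsolidate] == "true" {
		return false // annotation exists: never consolidate this node
	}
	if !cn.ConsolidationEnabled {
		return false // pool has consolidation disabled
	}
	return true
}

func main() {
	optedOut := Candidate{Annotations: map[string]string{doNotConsolidate: "true"}, ConsolidationEnabled: true}
	eligible := Candidate{ConsolidationEnabled: true}
	fmt.Println(shouldDisrupt(optedOut), shouldDisrupt(eligible)) // false true
}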
@@ -125,7 +119,7 @@ func (c *consolidation) computeConsolidation(ctx context.Context, candidates ...
 	if !results.AllNonPendingPodsScheduled() {
 		// This method is used by multi-node consolidation as well, so we'll only report in the single node case
 		if len(candidates) == 1 {
-			c.recorder.Publish(deprovisioningevents.Unconsolidatable(candidates[0].Node, candidates[0].NodeClaim, results.NonPendingPodSchedulingErrors())...)
+			c.recorder.Publish(disruptionevents.Unconsolidatable(candidates[0].Node, candidates[0].NodeClaim, results.NonPendingPodSchedulingErrors())...)
 		}
 		return Command{}, nil
 	}
@@ -140,7 +134,7 @@ func (c *consolidation) computeConsolidation(ctx context.Context, candidates ...
 	// we're not going to turn a single node into multiple candidates
 	if len(results.NewNodeClaims) != 1 {
 		if len(candidates) == 1 {
-			c.recorder.Publish(deprovisioningevents.Unconsolidatable(candidates[0].Node, candidates[0].NodeClaim, fmt.Sprintf("Can't remove without creating %d candidates", len(results.NewNodeClaims)))...)
+			c.recorder.Publish(disruptionevents.Unconsolidatable(candidates[0].Node, candidates[0].NodeClaim, fmt.Sprintf("Can't remove without creating %d candidates", len(results.NewNodeClaims)))...)
 		}
 		return Command{}, nil
 	}
@@ -154,7 +148,7 @@ func (c *consolidation) computeConsolidation(ctx context.Context, candidates ...
 	results.NewNodeClaims[0].InstanceTypeOptions = filterByPrice(results.NewNodeClaims[0].InstanceTypeOptions, results.NewNodeClaims[0].Requirements, candidatePrice)
 	if len(results.NewNodeClaims[0].InstanceTypeOptions) == 0 {
 		if len(candidates) == 1 {
-			c.recorder.Publish(deprovisioningevents.Unconsolidatable(candidates[0].Node, candidates[0].NodeClaim, "Can't replace with a cheaper node")...)
+			c.recorder.Publish(disruptionevents.Unconsolidatable(candidates[0].Node, candidates[0].NodeClaim, "Can't replace with a cheaper node")...)
 		}
 		// no instance types remain after filtering by price
 		return Command{}, nil
@@ -173,7 +167,7 @@ func (c *consolidation) computeConsolidation(ctx context.Context, candidates ...
 	if allExistingAreSpot &&
 		results.NewNodeClaims[0].Requirements.Get(v1beta1.CapacityTypeLabelKey).Has(v1beta1.CapacityTypeSpot) {
 		if len(candidates) == 1 {
-			c.recorder.Publish(deprovisioningevents.Unconsolidatable(candidates[0].Node, candidates[0].NodeClaim, "Can't replace a spot node with a spot node")...)
+			c.recorder.Publish(disruptionevents.Unconsolidatable(candidates[0].Node, candidates[0].NodeClaim, "Can't replace a spot node with a spot node")...)
 		}
 		return Command{}, nil
 	}
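Putting the last two guards together: a replacement is only launched when at least one instance type undercuts the candidate's current price, and a spot node is never traded for another spot node. The sketch below is a simplified, self-contained variant under those assumptions (hypothetical types and prices, not the real filterByPrice signature; the real code gives up entirely when the proposed replacement could itself be spot, rather than skipping per option).

package main

import "fmt"

type InstanceType struct {
	Name  string
	Price float64
	Spot  bool
}

// cheaperThan keeps only options that undercut the candidate's current price,
// mirroring the filterByPrice gate above.
func cheaperThan(options []InstanceType, candidatePrice float64) []InstanceType {
	var out []InstanceType
	for _, it := range options {
		if it.Price < candidatePrice {
			out = append(out, it)
		}
	}
	return out
}

// viableReplacement applies both remaining guards from the diff.
func viableReplacement(allExistingAreSpot bool, candidatePrice float64, options []InstanceType) (string, bool) {
	cheaper := cheaperThan(options, candidatePrice)
	if len(cheaper) == 0 {
		return "", false // "Can't replace with a cheaper node"
	}
	for _, it := range cheaper {
		if allExistingAreSpot && it.Spot {
			continue // "Can't replace a spot node with a spot node"
		}
		return it.Name, true
	}
	return "", false
}

func main() {
	options := []InstanceType{
		{Name: "m5.large", Price: 0.096},
		{Name: "m5.large (spot)", Price: 0.035, Spot: true},
	}
	// An on-demand candidate at $0.20/hr can consolidate to the cheaper on-demand option;
	// a spot candidate at $0.04/hr cannot, since the only cheaper option is also spot.
	fmt.Println(viableReplacement(false, 0.20, options)) // m5.large true
	fmt.Println(viableReplacement(true, 0.04, options))  //  false
}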
