CA: DRA integration MVP #7530

Open
wants to merge 10 commits into base: master
5 changes: 5 additions & 0 deletions cluster-autoscaler/cloudprovider/test/test_cloud_provider.go
@@ -297,6 +297,11 @@ func (tcp *TestCloudProvider) SetResourceLimiter(resourceLimiter *cloudprovider.
tcp.resourceLimiter = resourceLimiter
}

// SetMachineTemplates sets template NodeInfos per-machine-type.
func (tcp *TestCloudProvider) SetMachineTemplates(machineTemplates map[string]*framework.NodeInfo) {
tcp.machineTemplates = machineTemplates
}

// Cleanup this is a function to close resources associated with the cloud provider
func (tcp *TestCloudProvider) Cleanup() error {
return nil
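Reviewer sketch (not part of this diff): one way a test could use the new setter so that node groups backed by a machine type resolve their template through SetMachineTemplates. It assumes the BuildTestNode helper and a framework.NewTestNodeInfo constructor, as used elsewhere in cluster-autoscaler tests; the machine-type name is made up.

package example

import (
	testprovider "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/test"
	"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
	. "k8s.io/autoscaler/cluster-autoscaler/utils/test"
)

func machineTemplateProvider() *testprovider.TestCloudProvider {
	provider := testprovider.NewTestCloudProvider(nil, nil)
	// Template node for a hypothetical "n2-standard-4" machine type; the
	// NewTestNodeInfo helper is assumed to exist as in other CA tests.
	templateNode := BuildTestNode("n2-standard-4-template", 4000, 100)
	provider.SetMachineTemplates(map[string]*framework.NodeInfo{
		"n2-standard-4": framework.NewTestNodeInfo(templateNode),
	})
	return provider
}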
6 changes: 6 additions & 0 deletions cluster-autoscaler/core/autoscaler.go
@@ -36,6 +36,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/predicate"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/store"
"k8s.io/autoscaler/cluster-autoscaler/simulator/drainability/rules"
draprovider "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/provider"
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
"k8s.io/autoscaler/cluster-autoscaler/simulator/options"
"k8s.io/autoscaler/cluster-autoscaler/utils/backoff"
@@ -63,6 +64,7 @@ type AutoscalerOptions struct {
ScaleUpOrchestrator scaleup.Orchestrator
DeleteOptions options.NodeDeleteOptions
DrainabilityRules rules.Rules
DraProvider *draprovider.Provider
}

// Autoscaler is the main component of CA which scales up/down node groups according to its configuration
@@ -102,6 +104,7 @@ func NewAutoscaler(opts AutoscalerOptions, informerFactory informers.SharedInfor
opts.ScaleUpOrchestrator,
opts.DeleteOptions,
opts.DrainabilityRules,
opts.DraProvider,
), nil
}

@@ -165,6 +168,9 @@ func initializeDefaultOptions(opts *AutoscalerOptions, informerFactory informers
if opts.DrainabilityRules == nil {
opts.DrainabilityRules = rules.Default(opts.DeleteOptions)
}
if opts.DraProvider == nil && opts.DynamicResourceAllocationEnabled {
opts.DraProvider = draprovider.NewProviderFromInformers(informerFactory)
}

return nil
}
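Reviewer sketch (not part of this diff): what the new defaulting means for callers. With the DRA flag on and DraProvider left nil, NewAutoscaler builds the provider from the shared informer factory via draprovider.NewProviderFromInformers; a caller can also inject its own provider. Field names follow this diff; everything else is assumed.

package example

import (
	"k8s.io/autoscaler/cluster-autoscaler/config"
	"k8s.io/autoscaler/cluster-autoscaler/core"
)

func draEnabledOptions() core.AutoscalerOptions {
	return core.AutoscalerOptions{
		AutoscalingOptions: config.AutoscalingOptions{
			// DraProvider is intentionally left unset here: initializeDefaultOptions
			// creates one from informers because the flag below is enabled.
			DynamicResourceAllocationEnabled: true,
		},
	}
}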
18 changes: 15 additions & 3 deletions cluster-autoscaler/core/scaledown/actuation/actuator.go
@@ -36,6 +36,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/predicate"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/store"
"k8s.io/autoscaler/cluster-autoscaler/simulator/drainability/rules"
draprovider "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/provider"
drasnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/snapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/options"
"k8s.io/autoscaler/cluster-autoscaler/simulator/utilization"
@@ -64,6 +65,7 @@ type Actuator struct {
configGetter actuatorNodeGroupConfigGetter
nodeDeleteDelayAfterTaint time.Duration
pastLatencies *expiring.List
draProvider *draprovider.Provider
}

// actuatorNodeGroupConfigGetter is an interface to limit the functions that can be used
@@ -74,7 +76,7 @@ type actuatorNodeGroupConfigGetter interface {
}

// NewActuator returns a new instance of Actuator.
func NewActuator(ctx *context.AutoscalingContext, scaleStateNotifier nodegroupchange.NodeGroupChangeObserver, ndt *deletiontracker.NodeDeletionTracker, deleteOptions options.NodeDeleteOptions, drainabilityRules rules.Rules, configGetter actuatorNodeGroupConfigGetter) *Actuator {
func NewActuator(ctx *context.AutoscalingContext, scaleStateNotifier nodegroupchange.NodeGroupChangeObserver, ndt *deletiontracker.NodeDeletionTracker, deleteOptions options.NodeDeleteOptions, drainabilityRules rules.Rules, configGetter actuatorNodeGroupConfigGetter, draProvider *draprovider.Provider) *Actuator {
ndb := NewNodeDeletionBatcher(ctx, scaleStateNotifier, ndt, ctx.NodeDeletionBatcherInterval)
legacyFlagDrainConfig := SingleRuleDrainConfig(ctx.MaxGracefulTerminationSec)
var evictor Evictor
@@ -93,6 +95,7 @@ func NewActuator(ctx *context.AutoscalingContext, scaleStateNotifier nodegroupch
configGetter: configGetter,
nodeDeleteDelayAfterTaint: ctx.NodeDeleteDelayAfterTaint,
pastLatencies: expiring.NewList(),
draProvider: draProvider,
}
}

@@ -331,7 +334,7 @@ func (a *Actuator) scaleDownNodeToReport(node *apiv1.Node, drain bool) (*status.
}

gpuConfig := a.ctx.CloudProvider.GetNodeGpuConfig(node)
utilInfo, err := utilization.Calculate(nodeInfo, ignoreDaemonSetsUtilization, a.ctx.IgnoreMirrorPodsUtilization, gpuConfig, time.Now())
utilInfo, err := utilization.Calculate(nodeInfo, ignoreDaemonSetsUtilization, a.ctx.IgnoreMirrorPodsUtilization, a.ctx.DynamicResourceAllocationEnabled, gpuConfig, time.Now())
if err != nil {
return nil, err
}
@@ -368,7 +371,16 @@ func (a *Actuator) createSnapshot(nodes []*apiv1.Node) (clustersnapshot.ClusterS
scheduledPods := kube_util.ScheduledPods(pods)
nonExpendableScheduledPods := utils.FilterOutExpendablePods(scheduledPods, a.ctx.ExpendablePodsPriorityCutoff)

err = snapshot.SetClusterState(nodes, nonExpendableScheduledPods, drasnapshot.Snapshot{})
var draSnapshot drasnapshot.Snapshot
if a.ctx.DynamicResourceAllocationEnabled && a.draProvider != nil {
draSnapshot, err = a.draProvider.Snapshot()
if err != nil {
// TODO(DRA): Maybe proceed?
return nil, err
}
}

err = snapshot.SetClusterState(nodes, nonExpendableScheduledPods, draSnapshot)
if err != nil {
return nil, err
}
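Reviewer sketch (not part of this diff): existing NewActuator call sites gain a trailing draProvider argument. The fragment below assumes the usual static_autoscaler wiring is in scope (autoscalingContext, informerFactory, scaleStateNotifier, ndt, deleteOptions, drainabilityRules, configGetter are not defined here); passing a nil provider preserves the pre-DRA behaviour, because createSnapshot only takes a DRA snapshot when the flag is on and the provider is non-nil.

	// Fragment only: names other than the draprovider/actuation calls are assumed
	// from existing wiring, not introduced by this PR.
	var draProvider *draprovider.Provider
	if autoscalingContext.DynamicResourceAllocationEnabled {
		draProvider = draprovider.NewProviderFromInformers(informerFactory)
	}
	actuator := actuation.NewActuator(autoscalingContext, scaleStateNotifier, ndt,
		deleteOptions, drainabilityRules, configGetter, draProvider)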
3 changes: 2 additions & 1 deletion cluster-autoscaler/core/scaledown/eligibility/eligibility.go
@@ -139,9 +139,10 @@ func (c *Checker) unremovableReasonAndNodeUtilization(context *context.Autoscali
}

gpuConfig := context.CloudProvider.GetNodeGpuConfig(node)
utilInfo, err := utilization.Calculate(nodeInfo, ignoreDaemonSetsUtilization, context.IgnoreMirrorPodsUtilization, gpuConfig, timestamp)
utilInfo, err := utilization.Calculate(nodeInfo, ignoreDaemonSetsUtilization, context.IgnoreMirrorPodsUtilization, context.DynamicResourceAllocationEnabled, gpuConfig, timestamp)
if err != nil {
klog.Warningf("Failed to calculate utilization for %s: %v", node.Name, err)
return simulator.UnexpectedError, nil
}

// If scale down of unready nodes is disabled, skip the node if it is unready
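Reviewer note (not part of this diff): both utilization.Calculate call sites now pass the DRA flag in the same position. Annotated call-site shape below; the comments are this reviewer's reading of the arguments, not documentation from the utilization package.

	utilInfo, err := utilization.Calculate(
		nodeInfo,                                 // node plus its scheduled pods
		ignoreDaemonSetsUtilization,              // per-NodeGroup option
		context.IgnoreMirrorPodsUtilization,      // global option
		context.DynamicResourceAllocationEnabled, // new: also consider DRA devices/ResourceSlices
		gpuConfig,
		timestamp,
	)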
99 changes: 81 additions & 18 deletions cluster-autoscaler/core/scaledown/eligibility/eligibility_test.go
@@ -21,15 +21,18 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/google/go-cmp/cmp"

apiv1 "k8s.io/api/core/v1"
resourceapi "k8s.io/api/resource/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
testprovider "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/test"
"k8s.io/autoscaler/cluster-autoscaler/config"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/unremovable"
. "k8s.io/autoscaler/cluster-autoscaler/core/test"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupconfig"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator"
drasnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/snapshot"
"k8s.io/autoscaler/cluster-autoscaler/utils/taints"
. "k8s.io/autoscaler/cluster-autoscaler/utils/test"
"k8s.io/client-go/kubernetes/fake"
@@ -39,13 +42,15 @@ type testCase struct {
desc string
nodes []*apiv1.Node
pods []*apiv1.Pod
want []string
draSnapshot drasnapshot.Snapshot
draEnabled bool
wantUnneeded []string
wantUnremovable []*simulator.UnremovableNode
scaleDownUnready bool
ignoreDaemonSetsUtilization bool
}

func getTestCases(ignoreDaemonSetsUtilization bool, suffix string, now time.Time) []testCase {

regularNode := BuildTestNode("regular", 1000, 10)
SetNodeReadyState(regularNode, true, time.Time{})

@@ -69,51 +74,99 @@ func getTestCases(ignoreDaemonSetsUtilization bool, suffix string, now time.Time
dsPod := BuildTestPod("dsPod", 500, 0, WithDSController())
dsPod.Spec.NodeName = "regular"

brokenUtilNode := BuildTestNode("regular", 0, 0)
regularNodeIncompleteResourceSlice := &resourceapi.ResourceSlice{
ObjectMeta: metav1.ObjectMeta{Name: "regularNodeIncompleteResourceSlice", UID: "regularNodeIncompleteResourceSlice"},
Spec: resourceapi.ResourceSliceSpec{
Driver: "driver.foo.com",
NodeName: "regular",
Pool: resourceapi.ResourcePool{
Name: "regular-pool",
ResourceSliceCount: 999,
},
Devices: []resourceapi.Device{{Name: "dev1"}},
},
}
testCases := []testCase{
{
desc: "regular node stays",
nodes: []*apiv1.Node{regularNode},
want: []string{"regular"},
wantUnneeded: []string{"regular"},
wantUnremovable: []*simulator.UnremovableNode{},
scaleDownUnready: true,
},
{
desc: "recently deleted node is filtered out",
nodes: []*apiv1.Node{regularNode, justDeletedNode},
want: []string{"regular"},
wantUnneeded: []string{"regular"},
wantUnremovable: []*simulator.UnremovableNode{{Node: justDeletedNode, Reason: simulator.CurrentlyBeingDeleted}},
scaleDownUnready: true,
},
{
desc: "marked no scale down is filtered out",
nodes: []*apiv1.Node{noScaleDownNode, regularNode},
want: []string{"regular"},
wantUnneeded: []string{"regular"},
wantUnremovable: []*simulator.UnremovableNode{{Node: noScaleDownNode, Reason: simulator.ScaleDownDisabledAnnotation}},
scaleDownUnready: true,
},
{
desc: "highly utilized node is filtered out",
nodes: []*apiv1.Node{regularNode},
pods: []*apiv1.Pod{bigPod},
want: []string{},
wantUnneeded: []string{},
wantUnremovable: []*simulator.UnremovableNode{{Node: regularNode, Reason: simulator.NotUnderutilized}},
scaleDownUnready: true,
},
{
desc: "underutilized node stays",
nodes: []*apiv1.Node{regularNode},
pods: []*apiv1.Pod{smallPod},
want: []string{"regular"},
wantUnneeded: []string{"regular"},
wantUnremovable: []*simulator.UnremovableNode{},
scaleDownUnready: true,
},
{
desc: "node is filtered out if utilization can't be calculated",
nodes: []*apiv1.Node{brokenUtilNode},
pods: []*apiv1.Pod{smallPod},
wantUnneeded: []string{},
wantUnremovable: []*simulator.UnremovableNode{{Node: brokenUtilNode, Reason: simulator.UnexpectedError}},
scaleDownUnready: true,
},
{
desc: "unready node stays",
nodes: []*apiv1.Node{unreadyNode},
want: []string{"unready"},
wantUnneeded: []string{"unready"},
wantUnremovable: []*simulator.UnremovableNode{},
scaleDownUnready: true,
},
{
desc: "unready node is filtered oud when scale-down of unready is disabled",
nodes: []*apiv1.Node{unreadyNode},
want: []string{},
wantUnneeded: []string{},
wantUnremovable: []*simulator.UnremovableNode{{Node: unreadyNode, Reason: simulator.ScaleDownUnreadyDisabled}},
scaleDownUnready: false,
},
{
desc: "Node is not filtered out because of DRA issues if DRA is disabled",
nodes: []*apiv1.Node{regularNode},
pods: []*apiv1.Pod{smallPod},
draSnapshot: drasnapshot.NewSnapshot(nil, map[string][]*resourceapi.ResourceSlice{"regular": {regularNodeIncompleteResourceSlice}}, nil, nil),
draEnabled: false,
wantUnneeded: []string{"regular"},
wantUnremovable: []*simulator.UnremovableNode{},
scaleDownUnready: true,
},
{
desc: "Node is filtered out because of DRA issues if DRA is enabled",
nodes: []*apiv1.Node{regularNode},
pods: []*apiv1.Pod{smallPod},
draSnapshot: drasnapshot.NewSnapshot(nil, map[string][]*resourceapi.ResourceSlice{"regular": {regularNodeIncompleteResourceSlice}}, nil, nil),
draEnabled: true,
wantUnneeded: []string{},
wantUnremovable: []*simulator.UnremovableNode{{Node: regularNode, Reason: simulator.UnexpectedError}},
scaleDownUnready: true,
},
}

finalTestCases := []testCase{}
@@ -130,15 +183,17 @@ func getTestCases(ignoreDaemonSetsUtilization bool, suffix string, now time.Time
desc: "high utilization daemonsets node is filtered out",
nodes: []*apiv1.Node{regularNode},
pods: []*apiv1.Pod{smallPod, dsPod},
want: []string{},
wantUnneeded: []string{},
wantUnremovable: []*simulator.UnremovableNode{{Node: regularNode, Reason: simulator.NotUnderutilized}},
scaleDownUnready: true,
ignoreDaemonSetsUtilization: false,
},
testCase{
desc: "high utilization daemonsets node stays",
nodes: []*apiv1.Node{regularNode},
pods: []*apiv1.Pod{smallPod, dsPod},
want: []string{"regular"},
wantUnneeded: []string{"regular"},
wantUnremovable: []*simulator.UnremovableNode{},
scaleDownUnready: true,
ignoreDaemonSetsUtilization: true,
})
@@ -155,8 +210,9 @@ func TestFilterOutUnremovable(t *testing.T) {
t.Run(tc.desc, func(t *testing.T) {
t.Parallel()
options := config.AutoscalingOptions{
UnremovableNodeRecheckTimeout: 5 * time.Minute,
ScaleDownUnreadyEnabled: tc.scaleDownUnready,
DynamicResourceAllocationEnabled: tc.draEnabled,
UnremovableNodeRecheckTimeout: 5 * time.Minute,
ScaleDownUnreadyEnabled: tc.scaleDownUnready,
NodeGroupDefaults: config.NodeGroupAutoscalingOptions{
ScaleDownUtilizationThreshold: config.DefaultScaleDownUtilizationThreshold,
ScaleDownGpuUtilizationThreshold: config.DefaultScaleDownGpuUtilizationThreshold,
@@ -173,13 +229,20 @@ func TestFilterOutUnremovable(t *testing.T) {
provider.AddNode("ng1", n)
}
context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, nil, provider, nil, nil)
clustersnapshot.InitializeClusterSnapshotOrDie(t, context.ClusterSnapshot, tc.nodes, tc.pods)
if err != nil {
t.Fatalf("Could not create autoscaling context: %v", err)
}
if err := context.ClusterSnapshot.SetClusterState(tc.nodes, tc.pods, tc.draSnapshot); err != nil {
t.Fatalf("Could not SetClusterState: %v", err)
}
unremovableNodes := unremovable.NewNodes()
got, _, _ := c.FilterOutUnremovable(&context, tc.nodes, now, unremovableNodes)
assert.Equal(t, tc.want, got)
gotUnneeded, _, gotUnremovable := c.FilterOutUnremovable(&context, tc.nodes, now, unremovableNodes)
if diff := cmp.Diff(tc.wantUnneeded, gotUnneeded); diff != "" {
t.Errorf("FilterOutUnremovable(): unexpected unneeded (-want +got): %s", diff)
}
if diff := cmp.Diff(tc.wantUnremovable, gotUnremovable); diff != "" {
t.Errorf("FilterOutUnremovable(): unexpected unremovable (-want +got): %s", diff)
}
})
}
}
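Reviewer note (not part of this diff): the DRA test cases appear to rely on the slice belonging to an incomplete pool (Pool.ResourceSliceCount is 999 while only one slice is published), so the utilization path cannot resolve the node's DRA resources and reports UnexpectedError when the flag is on; with the flag off the slice is simply ignored and the node stays eligible. For contrast, a pool the snapshot can resolve would look like the sketch below (illustrative only, reusing the field shapes from this test).

	completeSlice := &resourceapi.ResourceSlice{
		ObjectMeta: metav1.ObjectMeta{Name: "regularNodeResourceSlice", UID: "regularNodeResourceSlice"},
		Spec: resourceapi.ResourceSliceSpec{
			Driver:   "driver.foo.com",
			NodeName: "regular",
			Pool: resourceapi.ResourcePool{
				Name:               "regular-pool",
				ResourceSliceCount: 1, // the pool is fully published by this single slice
			},
			Devices: []resourceapi.Device{{Name: "dev1"}},
		},
	}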