Skip to content

Commit

Permalink
Exclude non-ready nodes from scheduling. (#3413)
Browse files Browse the repository at this point in the history
* Exclude non-ready nodes from scheduling.

* Update after code review

* Move TAS indexer to subpackage

* Use TAS indexer to filter out non-ready nodes

* Update integration tests

* Update to match verify
  • Loading branch information
mszadkow authored Nov 4, 2024
1 parent 8ac14ac commit dad65ef
Show file tree
Hide file tree
Showing 17 changed files with 342 additions and 48 deletions.
3 changes: 2 additions & 1 deletion cmd/kueue/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ import (
"sigs.k8s.io/kueue/pkg/controller/core/indexer"
"sigs.k8s.io/kueue/pkg/controller/jobframework"
"sigs.k8s.io/kueue/pkg/controller/tas"
tasindexer "sigs.k8s.io/kueue/pkg/controller/tas/indexer"
"sigs.k8s.io/kueue/pkg/debugger"
"sigs.k8s.io/kueue/pkg/features"
"sigs.k8s.io/kueue/pkg/metrics"
Expand Down Expand Up @@ -218,7 +219,7 @@ func setupIndexes(ctx context.Context, mgr ctrl.Manager, cfg *configapi.Configur
}

if features.Enabled(features.TopologyAwareScheduling) {
if err := tas.SetupIndexes(ctx, mgr.GetFieldIndexer()); err != nil {
if err := tasindexer.SetupIndexes(ctx, mgr.GetFieldIndexer()); err != nil {
setupLog.Error(err, "Could not setup TAS indexer")
os.Exit(1)
}
Expand Down
163 changes: 163 additions & 0 deletions pkg/cache/tas_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"

kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
tasindexer "sigs.k8s.io/kueue/pkg/controller/tas/indexer"
"sigs.k8s.io/kueue/pkg/resources"
utiltesting "sigs.k8s.io/kueue/pkg/util/testing"
testingpod "sigs.k8s.io/kueue/pkg/util/testingjobs/pod"
Expand Down Expand Up @@ -55,6 +56,12 @@ func TestFindTopologyAssignment(t *testing.T) {
corev1.ResourceCPU: resource.MustParse("1"),
corev1.ResourceMemory: resource.MustParse("1Gi"),
},
Conditions: []corev1.NodeCondition{
{
Type: corev1.NodeReady,
Status: corev1.ConditionTrue,
},
},
},
},
{
Expand All @@ -71,6 +78,12 @@ func TestFindTopologyAssignment(t *testing.T) {
corev1.ResourceCPU: resource.MustParse("1"),
corev1.ResourceMemory: resource.MustParse("1Gi"),
},
Conditions: []corev1.NodeCondition{
{
Type: corev1.NodeReady,
Status: corev1.ConditionTrue,
},
},
},
},
{
Expand All @@ -87,6 +100,12 @@ func TestFindTopologyAssignment(t *testing.T) {
corev1.ResourceCPU: resource.MustParse("1"),
corev1.ResourceMemory: resource.MustParse("1Gi"),
},
Conditions: []corev1.NodeCondition{
{
Type: corev1.NodeReady,
Status: corev1.ConditionTrue,
},
},
},
},
{
Expand All @@ -103,6 +122,12 @@ func TestFindTopologyAssignment(t *testing.T) {
corev1.ResourceCPU: resource.MustParse("1"),
corev1.ResourceMemory: resource.MustParse("1Gi"),
},
Conditions: []corev1.NodeCondition{
{
Type: corev1.NodeReady,
Status: corev1.ConditionTrue,
},
},
},
},
{
Expand All @@ -119,6 +144,12 @@ func TestFindTopologyAssignment(t *testing.T) {
corev1.ResourceCPU: resource.MustParse("1"),
corev1.ResourceMemory: resource.MustParse("1Gi"),
},
Conditions: []corev1.NodeCondition{
{
Type: corev1.NodeReady,
Status: corev1.ConditionTrue,
},
},
},
},
{
Expand All @@ -135,6 +166,12 @@ func TestFindTopologyAssignment(t *testing.T) {
corev1.ResourceCPU: resource.MustParse("2"),
corev1.ResourceMemory: resource.MustParse("4Gi"),
},
Conditions: []corev1.NodeCondition{
{
Type: corev1.NodeReady,
Status: corev1.ConditionTrue,
},
},
},
},
}
Expand Down Expand Up @@ -185,6 +222,12 @@ func TestFindTopologyAssignment(t *testing.T) {
Allocatable: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("2"),
},
Conditions: []corev1.NodeCondition{
{
Type: corev1.NodeReady,
Status: corev1.ConditionTrue,
},
},
},
},
{
Expand All @@ -200,6 +243,12 @@ func TestFindTopologyAssignment(t *testing.T) {
Allocatable: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("2"),
},
Conditions: []corev1.NodeCondition{
{
Type: corev1.NodeReady,
Status: corev1.ConditionTrue,
},
},
},
},
{
Expand All @@ -215,6 +264,12 @@ func TestFindTopologyAssignment(t *testing.T) {
Allocatable: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("1"),
},
Conditions: []corev1.NodeCondition{
{
Type: corev1.NodeReady,
Status: corev1.ConditionTrue,
},
},
},
},
{
Expand All @@ -230,6 +285,12 @@ func TestFindTopologyAssignment(t *testing.T) {
Allocatable: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("1"),
},
Conditions: []corev1.NodeCondition{
{
Type: corev1.NodeReady,
Status: corev1.ConditionTrue,
},
},
},
},
{
Expand All @@ -245,6 +306,12 @@ func TestFindTopologyAssignment(t *testing.T) {
Allocatable: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("1"),
},
Conditions: []corev1.NodeCondition{
{
Type: corev1.NodeReady,
Status: corev1.ConditionTrue,
},
},
},
},
{
Expand All @@ -260,6 +327,12 @@ func TestFindTopologyAssignment(t *testing.T) {
Allocatable: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("1"),
},
Conditions: []corev1.NodeCondition{
{
Type: corev1.NodeReady,
Status: corev1.ConditionTrue,
},
},
},
},
},
Expand Down Expand Up @@ -656,6 +729,12 @@ func TestFindTopologyAssignment(t *testing.T) {
corev1.ResourceCPU: resource.MustParse("1"),
corev1.ResourceMemory: resource.MustParse("1Gi"),
},
Conditions: []corev1.NodeCondition{
{
Type: corev1.NodeReady,
Status: corev1.ConditionTrue,
},
},
},
},
},
Expand Down Expand Up @@ -698,6 +777,12 @@ func TestFindTopologyAssignment(t *testing.T) {
corev1.ResourceCPU: resource.MustParse("1"),
corev1.ResourceMemory: resource.MustParse("1Gi"),
},
Conditions: []corev1.NodeCondition{
{
Type: corev1.NodeReady,
Status: corev1.ConditionTrue,
},
},
},
},
},
Expand Down Expand Up @@ -726,6 +811,12 @@ func TestFindTopologyAssignment(t *testing.T) {
corev1.ResourceCPU: resource.MustParse("1"),
corev1.ResourceMemory: resource.MustParse("1Gi"),
},
Conditions: []corev1.NodeCondition{
{
Type: corev1.NodeReady,
Status: corev1.ConditionTrue,
},
},
},
},
},
Expand Down Expand Up @@ -768,6 +859,12 @@ func TestFindTopologyAssignment(t *testing.T) {
corev1.ResourceCPU: resource.MustParse("1"),
corev1.ResourceMemory: resource.MustParse("1Gi"),
},
Conditions: []corev1.NodeCondition{
{
Type: corev1.NodeReady,
Status: corev1.ConditionTrue,
},
},
},
},
},
Expand Down Expand Up @@ -816,6 +913,12 @@ func TestFindTopologyAssignment(t *testing.T) {
corev1.ResourceCPU: resource.MustParse("1"),
corev1.ResourceMemory: resource.MustParse("1Gi"),
},
Conditions: []corev1.NodeCondition{
{
Type: corev1.NodeReady,
Status: corev1.ConditionTrue,
},
},
},
},
},
Expand Down Expand Up @@ -850,6 +953,12 @@ func TestFindTopologyAssignment(t *testing.T) {
corev1.ResourceCPU: resource.MustParse("1"),
corev1.ResourceMemory: resource.MustParse("1Gi"),
},
Conditions: []corev1.NodeCondition{
{
Type: corev1.NodeReady,
Status: corev1.ConditionTrue,
},
},
},
},
},
Expand Down Expand Up @@ -885,6 +994,12 @@ func TestFindTopologyAssignment(t *testing.T) {
corev1.ResourceCPU: resource.MustParse("1"),
corev1.ResourceMemory: resource.MustParse("1Gi"),
},
Conditions: []corev1.NodeCondition{
{
Type: corev1.NodeReady,
Status: corev1.ConditionTrue,
},
},
},
},
{
Expand All @@ -899,6 +1014,12 @@ func TestFindTopologyAssignment(t *testing.T) {
corev1.ResourceCPU: resource.MustParse("1"),
corev1.ResourceMemory: resource.MustParse("1Gi"),
},
Conditions: []corev1.NodeCondition{
{
Type: corev1.NodeReady,
Status: corev1.ConditionTrue,
},
},
},
},
},
Expand Down Expand Up @@ -927,6 +1048,47 @@ func TestFindTopologyAssignment(t *testing.T) {
},
},
},
"no assignment as node is not ready": {
nodes: []corev1.Node{
{
ObjectMeta: metav1.ObjectMeta{
Name: "b1-r1-x1",
Labels: map[string]string{
"zone": "zone-a",
tasHostLabel: "x1",
},
},
Status: corev1.NodeStatus{
Allocatable: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("1"),
corev1.ResourceMemory: resource.MustParse("1Gi"),
},
Conditions: []corev1.NodeCondition{
{
Type: corev1.NodeReady,
Status: corev1.ConditionFalse,
},
{
Type: corev1.NodeNetworkUnavailable,
Status: corev1.ConditionTrue,
},
},
},
},
},
request: kueue.PodSetTopologyRequest{
Required: ptr.To(tasHostLabel),
},
nodeLabels: map[string]string{
"zone": "zone-a",
},
levels: defaultOneLevel,
requests: resources.Requests{
corev1.ResourceCPU: 1000,
},
count: 1,
wantAssignment: nil,
},
}
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
Expand All @@ -941,6 +1103,7 @@ func TestFindTopologyAssignment(t *testing.T) {
}
clientBuilder := utiltesting.NewClientBuilder()
clientBuilder.WithObjects(initialObjects...)
_ = tasindexer.SetupIndexes(ctx, utiltesting.AsIndexer(clientBuilder))
client := clientBuilder.Build()

tasCache := NewTASCache(client)
Expand Down
3 changes: 2 additions & 1 deletion pkg/cache/tas_flavor.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"

kueuealpha "sigs.k8s.io/kueue/apis/kueue/v1alpha1"
"sigs.k8s.io/kueue/pkg/controller/tas/indexer"
"sigs.k8s.io/kueue/pkg/resources"
"sigs.k8s.io/kueue/pkg/util/limitrange"
utiltas "sigs.k8s.io/kueue/pkg/util/tas"
Expand Down Expand Up @@ -80,7 +81,7 @@ func (c *TASFlavorCache) snapshot(ctx context.Context) (*TASFlavorSnapshot, erro
}
requiredLabelKeys := client.HasLabels{}
requiredLabelKeys = append(requiredLabelKeys, c.Levels...)
err := c.client.List(ctx, nodes, requiredLabels, requiredLabelKeys)
err := c.client.List(ctx, nodes, requiredLabels, requiredLabelKeys, client.MatchingFields{indexer.ReadyNode: "true"})
if err != nil {
return nil, fmt.Errorf("failed to list nodes for TAS: %w", err)
}
Expand Down
Loading

0 comments on commit dad65ef

Please sign in to comment.