From c0675c6b4dafbe6a5372ce64581bf08002850002 Mon Sep 17 00:00:00 2001 From: gabesaba <15304068+gabesaba@users.noreply.github.com> Date: Fri, 13 Sep 2024 15:27:12 +0200 Subject: [PATCH] Implement Cohort cycle detection in hierarchy.Manager (#3040) --- pkg/cache/snapshot_test.go | 1 + pkg/hierarchy/cycle.go | 42 +++++++ pkg/hierarchy/manager.go | 8 ++ pkg/hierarchy/manager_test.go | 123 ++++++++++++++++++++ pkg/scheduler/preemption/preemption_test.go | 1 + 5 files changed, 175 insertions(+) create mode 100644 pkg/hierarchy/cycle.go diff --git a/pkg/cache/snapshot_test.go b/pkg/cache/snapshot_test.go index 422fc43a5d..36ee70074e 100644 --- a/pkg/cache/snapshot_test.go +++ b/pkg/cache/snapshot_test.go @@ -43,6 +43,7 @@ var snapCmpOpts = []cmp.Option{ cmpopts.IgnoreUnexported(hierarchy.Cohort[*ClusterQueueSnapshot, *CohortSnapshot]{}), cmpopts.IgnoreUnexported(hierarchy.ClusterQueue[*CohortSnapshot]{}), cmpopts.IgnoreUnexported(hierarchy.Manager[*ClusterQueueSnapshot, *CohortSnapshot]{}), + cmpopts.IgnoreUnexported(hierarchy.CycleChecker{}), cmpopts.IgnoreFields(metav1.Condition{}, "LastTransitionTime"), } diff --git a/pkg/hierarchy/cycle.go b/pkg/hierarchy/cycle.go new file mode 100644 index 0000000000..3d97684b3d --- /dev/null +++ b/pkg/hierarchy/cycle.go @@ -0,0 +1,42 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package hierarchy + +// cycleChecker checks for cycles in Cohorts, while memoizing the +// result. +type CycleChecker struct { + cycles map[string]bool +} + +type CycleCheckable interface { + GetName() string + HasParent() bool + CCParent() CycleCheckable +} + +func (c *CycleChecker) HasCycle(cohort CycleCheckable) bool { + if cycle, seen := c.cycles[cohort.GetName()]; seen { + return cycle + } + if !cohort.HasParent() { + c.cycles[cohort.GetName()] = false + return c.cycles[cohort.GetName()] + } + c.cycles[cohort.GetName()] = true + c.cycles[cohort.GetName()] = c.HasCycle(cohort.CCParent()) + return c.cycles[cohort.GetName()] +} diff --git a/pkg/hierarchy/manager.go b/pkg/hierarchy/manager.go index f00f63b70d..a3231ed8d0 100644 --- a/pkg/hierarchy/manager.go +++ b/pkg/hierarchy/manager.go @@ -22,6 +22,7 @@ type Manager[CQ clusterQueueNode[C], C cohortNode[CQ, C]] struct { Cohorts map[string]C ClusterQueues map[string]CQ cohortFactory func(string) C + CycleChecker CycleChecker } // NewManager creates a new Manager. A newCohort function must @@ -32,6 +33,7 @@ func NewManager[CQ clusterQueueNode[C], C cohortNode[CQ, C]](newCohort func(stri make(map[string]C), make(map[string]CQ), newCohort, + CycleChecker{make(map[string]bool)}, } } @@ -68,6 +70,7 @@ func (m *Manager[CQ, C]) AddCohort(cohortName string) { } func (m *Manager[CQ, C]) UpdateCohortEdge(name, parentName string) { + m.resetCycleChecker() cohort := m.Cohorts[name] m.detachCohortFromParent(cohort) if parentName != "" { @@ -78,6 +81,7 @@ func (m *Manager[CQ, C]) UpdateCohortEdge(name, parentName string) { } func (m *Manager[CQ, C]) DeleteCohort(name string) { + m.resetCycleChecker() cohort, ok := m.Cohorts[name] if !ok { return @@ -138,6 +142,10 @@ func (m *Manager[CQ, C]) cleanupCohort(cohort C) { } } +func (m *Manager[CQ, C]) resetCycleChecker() { + m.CycleChecker = CycleChecker{make(map[string]bool, len(m.Cohorts))} +} + type nodeBase interface { GetName() string comparable diff --git a/pkg/hierarchy/manager_test.go b/pkg/hierarchy/manager_test.go index 5f6ca71ffd..55ad311b44 100644 --- a/pkg/hierarchy/manager_test.go +++ b/pkg/hierarchy/manager_test.go @@ -374,6 +374,125 @@ func TestManager(t *testing.T) { } } +func TestCycles(t *testing.T) { + type M = Manager[*testClusterQueue, *testCohort] + cases := map[string]struct { + operations func(M) + wantCycles map[string]bool + }{ + "no cycles": { + operations: func(m M) { + m.AddCohort("root") + m.AddCohort("left") + m.AddCohort("right") + m.UpdateCohortEdge("left", "root") + m.UpdateCohortEdge("right", "root") + }, + wantCycles: map[string]bool{ + "root": false, + "left": false, + "right": false, + }, + }, + "self-cycle": { + operations: func(m M) { + m.AddCohort("root") + m.UpdateCohortEdge("root", "root") + }, + wantCycles: map[string]bool{ + "root": true, + }, + }, + "remove self-cycle": { + operations: func(m M) { + m.AddCohort("root") + m.UpdateCohortEdge("root", "root") + // we call HasCycle to test invalidation + m.CycleChecker.HasCycle(m.Cohorts["root"]) + m.UpdateCohortEdge("root", "") + }, + wantCycles: map[string]bool{ + "root": false, + }, + }, + "cycle": { + operations: func(m M) { + m.AddCohort("cohort-a") + m.AddCohort("cohort-b") + m.UpdateCohortEdge("cohort-a", "cohort-b") + m.UpdateCohortEdge("cohort-b", "cohort-a") + }, + wantCycles: map[string]bool{ + "cohort-a": true, + "cohort-b": true, + }, + }, + "remove cycle via edge update": { + operations: func(m M) { + m.AddCohort("cohort-a") + m.AddCohort("cohort-b") + m.UpdateCohortEdge("cohort-a", "cohort-b") + m.UpdateCohortEdge("cohort-b", "cohort-a") + + // we call HasCycle to test invalidation + m.CycleChecker.HasCycle(m.Cohorts["cohort-a"]) + + m.UpdateCohortEdge("cohort-a", "cohort-c") + }, + wantCycles: map[string]bool{ + "cohort-a": false, + "cohort-b": false, + "cohort-c": false, + }, + }, + "remove cycle via edge deletion": { + operations: func(m M) { + m.AddCohort("cohort-a") + m.AddCohort("cohort-b") + m.UpdateCohortEdge("cohort-a", "cohort-b") + m.UpdateCohortEdge("cohort-b", "cohort-a") + m.CycleChecker.HasCycle(m.Cohorts["cohort-a"]) + + m.UpdateCohortEdge("cohort-a", "") + }, + wantCycles: map[string]bool{ + "cohort-a": false, + "cohort-b": false, + }, + }, + "remove cycle via node deletion": { + operations: func(m M) { + m.AddCohort("cohort-a") + m.AddCohort("cohort-b") + m.UpdateCohortEdge("cohort-a", "cohort-b") + m.UpdateCohortEdge("cohort-b", "cohort-a") + m.CycleChecker.HasCycle(m.Cohorts["cohort-a"]) + m.DeleteCohort("cohort-b") + }, + wantCycles: map[string]bool{ + "cohort-a": false, + "cohort-b": false, + }, + }, + } + + for name, tc := range cases { + t.Run(name, func(t *testing.T) { + mgr := NewManager(newCohort) + tc.operations(mgr) + for _, cohort := range mgr.Cohorts { + got := mgr.CycleChecker.HasCycle(cohort) + if got != tc.wantCycles[cohort.GetName()] { + t.Errorf("-want +got: %v %v", tc.wantCycles[cohort.GetName()], got) + } + } + if diff := cmp.Diff(mgr.CycleChecker.cycles, tc.wantCycles); diff != "" { + t.Errorf("-want +got: %v", diff) + } + }) + } +} + type testCohort struct { name string Cohort[*testClusterQueue, *testCohort] @@ -390,6 +509,10 @@ func (t *testCohort) GetName() string { return t.name } +func (t *testCohort) CCParent() CycleCheckable { + return t.Parent() +} + type testClusterQueue struct { name string ClusterQueue[*testCohort] diff --git a/pkg/scheduler/preemption/preemption_test.go b/pkg/scheduler/preemption/preemption_test.go index 3f1bf20b8c..4fb65978de 100644 --- a/pkg/scheduler/preemption/preemption_test.go +++ b/pkg/scheduler/preemption/preemption_test.go @@ -53,6 +53,7 @@ var snapCmpOpts = []cmp.Option{ cmpopts.IgnoreUnexported(hierarchy.Cohort[*cache.ClusterQueueSnapshot, *cache.CohortSnapshot]{}), cmpopts.IgnoreUnexported(hierarchy.ClusterQueue[*cache.CohortSnapshot]{}), cmpopts.IgnoreUnexported(hierarchy.Manager[*cache.ClusterQueueSnapshot, *cache.CohortSnapshot]{}), + cmpopts.IgnoreUnexported(hierarchy.CycleChecker{}), cmpopts.IgnoreFields(cache.ClusterQueueSnapshot{}, "AllocatableResourceGeneration"), cmp.Transformer("Cohort.Members", func(s sets.Set[*cache.ClusterQueueSnapshot]) sets.Set[string] { result := make(sets.Set[string], len(s))