From 17a7fc5e7804303dc14bed361849541b437c4286 Mon Sep 17 00:00:00 2001 From: Martijn Vegter Date: Thu, 21 Nov 2024 18:21:48 +0000 Subject: [PATCH] backport of commit 997da25cdb49c634749be97874955024492b9d43 --- .changelog/24304.txt | 3 + client/lib/cgroupslib/partition_linux.go | 12 ++- nomad/structs/funcs_test.go | 73 ++++++++++++++ nomad/structs/structs.go | 58 ++++++----- nomad/structs/structs_test.go | 122 ++++++++++++++++++++++- 5 files changed, 240 insertions(+), 28 deletions(-) create mode 100644 .changelog/24304.txt diff --git a/.changelog/24304.txt b/.changelog/24304.txt new file mode 100644 index 00000000000..e8bce7fb3bf --- /dev/null +++ b/.changelog/24304.txt @@ -0,0 +1,3 @@ +```release-note:bug +scheduler: take all assigned cpu cores into account instead of only those part of the largest lifecycle +``` diff --git a/client/lib/cgroupslib/partition_linux.go b/client/lib/cgroupslib/partition_linux.go index 165454dd0f0..a3bfe294ad7 100644 --- a/client/lib/cgroupslib/partition_linux.go +++ b/client/lib/cgroupslib/partition_linux.go @@ -79,10 +79,18 @@ func (p *partition) Reserve(cores *idset.Set[hw.CoreID]) error { p.lock.Lock() defer p.lock.Unlock() + // Use the intersection with the usable cores to avoid adding more cores than available. + usableCores := p.usableCores.Intersect(cores) + + overlappingCores := p.reserve.Intersect(usableCores) + if overlappingCores.Size() > 0 { + // COMPAT: prior to Nomad 1.9.X this would silently happen, this should probably return an error instead + p.log.Warn("Unable to exclusively reserve the requested cores", "cores", cores, "overlapping_cores", overlappingCores) + } + p.share.RemoveSet(cores) + p.reserve.InsertSet(usableCores) - // Use the intersection with the usable cores to avoid adding more cores than available. - p.reserve.InsertSet(p.usableCores.Intersect(cores)) return p.write() } diff --git a/nomad/structs/funcs_test.go b/nomad/structs/funcs_test.go index 55c8ce91b66..2b9476c8769 100644 --- a/nomad/structs/funcs_test.go +++ b/nomad/structs/funcs_test.go @@ -248,6 +248,79 @@ func TestAllocsFit(t *testing.T) { must.Eq(t, 1024, used.Flattened.Memory.MemoryMB) } +func TestAllocsFit_Cores(t *testing.T) { + ci.Parallel(t) + + n := node2k() + + a1 := &Allocation{ + AllocatedResources: &AllocatedResources{ + Tasks: map[string]*AllocatedTaskResources{ + "web": { + Cpu: AllocatedCpuResources{ + CpuShares: 500, + ReservedCores: []uint16{0}, + }, + Memory: AllocatedMemoryResources{ + MemoryMB: 1024, + }, + }, + }, + }, + } + + a2 := &Allocation{ + AllocatedResources: &AllocatedResources{ + Tasks: map[string]*AllocatedTaskResources{ + "web-prestart": { + Cpu: AllocatedCpuResources{ + CpuShares: 500, + ReservedCores: []uint16{1}, + }, + Memory: AllocatedMemoryResources{ + MemoryMB: 1024, + }, + }, + "web": { + Cpu: AllocatedCpuResources{ + CpuShares: 500, + ReservedCores: []uint16{0}, + }, + Memory: AllocatedMemoryResources{ + MemoryMB: 1024, + }, + }, + }, + TaskLifecycles: map[string]*TaskLifecycleConfig{ + "web-prestart": { + Hook: TaskLifecycleHookPrestart, + Sidecar: false, + }, + }, + }, + } + + // Should fit one allocation + fit, dim, used, err := AllocsFit(n, []*Allocation{a1}, nil, false) + must.NoError(t, err) + must.True(t, fit, must.Sprintf("failed for dimension %q", dim)) + must.Eq(t, 500, used.Flattened.Cpu.CpuShares) + must.Eq(t, 1024, used.Flattened.Memory.MemoryMB) + + // Should fit one allocation + fit, dim, used, err = AllocsFit(n, []*Allocation{a2}, nil, false) + must.NoError(t, err) + must.True(t, fit, must.Sprintf("failed for dimension %q", dim)) + must.Eq(t, 1000, used.Flattened.Cpu.CpuShares) + must.Eq(t, 1024, used.Flattened.Memory.MemoryMB) + + // Should not fit both allocations + fit, dim, used, err = AllocsFit(n, []*Allocation{a1, a2}, nil, false) + must.NoError(t, err) + must.False(t, fit) + must.Eq(t, dim, "cores") +} + func TestAllocsFit_TerminalAlloc(t *testing.T) { ci.Parallel(t) diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 21d9c483aa5..c832fab0a82 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -3856,35 +3856,45 @@ func (a *AllocatedResources) Comparable() *ComparableResources { Shared: a.Shared, } - prestartSidecarTasks := &AllocatedTaskResources{} - prestartEphemeralTasks := &AllocatedTaskResources{} - main := &AllocatedTaskResources{} - poststartTasks := &AllocatedTaskResources{} - poststopTasks := &AllocatedTaskResources{} - - for taskName, r := range a.Tasks { - lc := a.TaskLifecycles[taskName] - if lc == nil { - main.Add(r) - } else if lc.Hook == TaskLifecycleHookPrestart { - if lc.Sidecar { - prestartSidecarTasks.Add(r) + // The lifecycle in which a task could run + prestartLifecycle := &AllocatedTaskResources{} + mainLifecycle := &AllocatedTaskResources{} + stopLifecycle := &AllocatedTaskResources{} + + for taskName, taskResources := range a.Tasks { + taskLifecycle := a.TaskLifecycles[taskName] + fungibleTaskResources := taskResources.Copy() + + // Reserved cores (and their respective bandwidth) are not fungible, + // hence we should always include it as part of the Flattened resources. + if len(fungibleTaskResources.Cpu.ReservedCores) > 0 { + c.Flattened.Cpu.Add(&fungibleTaskResources.Cpu) + fungibleTaskResources.Cpu = AllocatedCpuResources{} + } + + if taskLifecycle == nil { + mainLifecycle.Add(fungibleTaskResources) + } else if taskLifecycle.Hook == TaskLifecycleHookPrestart { + if taskLifecycle.Sidecar { + // These tasks span both the prestart and main lifecycle + prestartLifecycle.Add(fungibleTaskResources) + mainLifecycle.Add(fungibleTaskResources) } else { - prestartEphemeralTasks.Add(r) + prestartLifecycle.Add(fungibleTaskResources) } - } else if lc.Hook == TaskLifecycleHookPoststart { - poststartTasks.Add(r) - } else if lc.Hook == TaskLifecycleHookPoststop { - poststopTasks.Add(r) + } else if taskLifecycle.Hook == TaskLifecycleHookPoststart { + mainLifecycle.Add(fungibleTaskResources) + } else if taskLifecycle.Hook == TaskLifecycleHookPoststop { + stopLifecycle.Add(fungibleTaskResources) } } - // update this loop to account for lifecycle hook - main.Add(poststartTasks) - prestartEphemeralTasks.Max(main) - prestartEphemeralTasks.Max(poststopTasks) - prestartSidecarTasks.Add(prestartEphemeralTasks) - c.Flattened.Add(prestartSidecarTasks) + // Update the main lifecycle to reflect the largest fungible resource set + mainLifecycle.Max(prestartLifecycle) + mainLifecycle.Max(stopLifecycle) + + // Add the fungible resources + c.Flattened.Add(mainLifecycle) // Add network resources that are at the task group level for _, network := range a.Shared.Networks { diff --git a/nomad/structs/structs_test.go b/nomad/structs/structs_test.go index b54cb25484f..4f44a8b3758 100644 --- a/nomad/structs/structs_test.go +++ b/nomad/structs/structs_test.go @@ -7763,6 +7763,124 @@ func TestAllocatedResources_Comparable_Flattened(t *testing.T) { ci.Parallel(t) allocationResources := AllocatedResources{ + TaskLifecycles: map[string]*TaskLifecycleConfig{ + "prestart-task": { + Hook: TaskLifecycleHookPrestart, + Sidecar: false, + }, + "poststop-task": { + Hook: TaskLifecycleHookPoststop, + Sidecar: false, + }, + }, + Tasks: map[string]*AllocatedTaskResources{ + "prestart-task": { + Cpu: AllocatedCpuResources{ + CpuShares: 2000, + }, + }, + "main-task": { + Cpu: AllocatedCpuResources{ + CpuShares: 4000, + }, + }, + "poststop-task": { + Cpu: AllocatedCpuResources{ + CpuShares: 1000, + }, + }, + }, + } + + // The output of Flattened should return the resource required during the execution of the largest lifecycle + must.Eq(t, 4000, allocationResources.Comparable().Flattened.Cpu.CpuShares) + must.Len(t, 0, allocationResources.Comparable().Flattened.Cpu.ReservedCores) + + allocationResources = AllocatedResources{ + TaskLifecycles: map[string]*TaskLifecycleConfig{ + "prestart-task": { + Hook: TaskLifecycleHookPrestart, + Sidecar: false, + }, + "prestart-sidecar-task": { + Hook: TaskLifecycleHookPrestart, + Sidecar: true, + }, + "poststop-task": { + Hook: TaskLifecycleHookPoststop, + Sidecar: false, + }, + }, + Tasks: map[string]*AllocatedTaskResources{ + "prestart-task": { + Cpu: AllocatedCpuResources{ + CpuShares: 1000, + ReservedCores: []uint16{0}, + }, + }, + "prestart-sidecar-task": { + Cpu: AllocatedCpuResources{ + CpuShares: 2000, + ReservedCores: []uint16{1, 2}, + }, + }, + "main-task": { + Cpu: AllocatedCpuResources{ + CpuShares: 2000, + ReservedCores: []uint16{3, 4}, + }, + }, + "poststop-task": { + Cpu: AllocatedCpuResources{ + CpuShares: 1000, + ReservedCores: []uint16{5}, + }, + }, + }, + } + + // Reserved core resources are claimed throughout the lifespan of the allocation + must.Eq(t, 6000, allocationResources.Comparable().Flattened.Cpu.CpuShares) + must.Len(t, 6, allocationResources.Comparable().Flattened.Cpu.ReservedCores) + + allocationResources = AllocatedResources{ + TaskLifecycles: map[string]*TaskLifecycleConfig{ + "prestart-task": { + Hook: TaskLifecycleHookPrestart, + Sidecar: false, + }, + "poststop-task": { + Hook: TaskLifecycleHookPoststop, + Sidecar: false, + }, + }, + Tasks: map[string]*AllocatedTaskResources{ + "prestart-task": { + Cpu: AllocatedCpuResources{ + CpuShares: 1000, + }, + }, + "main-task": { + Cpu: AllocatedCpuResources{ + CpuShares: 2000, + ReservedCores: []uint16{1, 2}, + }, + }, + "poststop-task": { + Cpu: AllocatedCpuResources{ + CpuShares: 1000, + }, + }, + }, + } + + // Reserved core resources are claimed throughout the lifespan of the allocation, + // but the prestart and poststop task can reuse the CpuShares. It's important to + // note that we will only claim 1000 MHz as part of the share slice. + must.Eq(t, 3000, allocationResources.Comparable().Flattened.Cpu.CpuShares) + must.Len(t, 2, allocationResources.Comparable().Flattened.Cpu.ReservedCores) + + allocationResources = AllocatedResources{ TaskLifecycles: map[string]*TaskLifecycleConfig{ "prestart-task": { Hook: TaskLifecycleHookPrestart, @@ -7816,8 +7934,8 @@ func TestAllocatedResources_Comparable_Flattened(t *testing.T) { } // The output of Flattened should return the resource required during the execution of the largest lifecycle - must.Eq(t, 5000, allocationResources.Comparable().Flattened.Cpu.CpuShares) - must.Len(t, 5, allocationResources.Comparable().Flattened.Cpu.ReservedCores) + must.Eq(t, 9000, allocationResources.Comparable().Flattened.Cpu.CpuShares) + must.Len(t, 9, allocationResources.Comparable().Flattened.Cpu.ReservedCores) } func requireErrors(t *testing.T, err error, expected ...string) {