scheduler: take all assigned cpu cores into account instead of only those part of the largest lifecycle #24304

Merged

Changes from all commits

3 changes: 3 additions & 0 deletions .changelog/24304.txt
@@ -0,0 +1,3 @@
```release-note:bug
scheduler: take all assigned cpu cores into account instead of only those part of the largest lifecycle
```
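
The gist of the fix described by this release note: reserved CPU cores are pinned for the entire lifespan of an allocation, so flattening per-lifecycle resources has to count every task's cores, not just the cores of whichever lifecycle bucket ends up largest. Below is a minimal, self-contained Go sketch of that accounting difference; the types and helpers (`lifecycleResources`, `coresLargestLifecycleOnly`, `coresAllLifecycles`) are hypothetical simplifications, not Nomad's actual `AllocatedResources`/`Comparable` code.

```go
package main

import "fmt"

// lifecycleResources is an illustrative stand-in for a task's allocated CPU.
type lifecycleResources struct {
	Lifecycle     string
	ReservedCores []uint16
}

// coresLargestLifecycleOnly mimics the old behavior: only one lifecycle's
// reserved cores survived the max across lifecycle buckets.
func coresLargestLifecycleOnly(tasks []lifecycleResources) []uint16 {
	var largest []uint16
	for _, t := range tasks {
		if len(t.ReservedCores) > len(largest) {
			largest = t.ReservedCores
		}
	}
	return largest
}

// coresAllLifecycles mimics the fixed behavior: reserved cores are claimed
// for the whole allocation lifespan, so every task's cores are counted.
func coresAllLifecycles(tasks []lifecycleResources) []uint16 {
	var all []uint16
	for _, t := range tasks {
		all = append(all, t.ReservedCores...)
	}
	return all
}

func main() {
	tasks := []lifecycleResources{
		{Lifecycle: "prestart", ReservedCores: []uint16{1}},
		{Lifecycle: "main", ReservedCores: []uint16{0}},
	}
	fmt.Println("old-style accounting:", coresLargestLifecycleOnly(tasks)) // [1]
	fmt.Println("fixed accounting:    ", coresAllLifecycles(tasks))        // [1 0]
}
```
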
12 changes: 10 additions & 2 deletions client/lib/cgroupslib/partition_linux.go
@@ -79,10 +79,18 @@ func (p *partition) Reserve(cores *idset.Set[hw.CoreID]) error {
p.lock.Lock()
defer p.lock.Unlock()

// Use the intersection with the usable cores to avoid adding more cores than available.
usableCores := p.usableCores.Intersect(cores)

overlappingCores := p.reserve.Intersect(usableCores)
if overlappingCores.Size() > 0 {
// COMPAT: prior to Nomad 1.9.X this would silently happen; this should probably return an error instead
p.log.Warn("Unable to exclusively reserve the requested cores", "cores", cores, "overlapping_cores", overlappingCores)
}

p.share.RemoveSet(cores)
p.reserve.InsertSet(usableCores)

// Use the intersection with the usable cores to avoid adding more cores than available.
p.reserve.InsertSet(p.usableCores.Intersect(cores))
return p.write()
}
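
To make the intersection and overlap logic above easier to follow, here is a hedged sketch of the same pattern using a plain map-based set instead of Nomad's internal `idset.Set[hw.CoreID]`. It omits the `p.share` bookkeeping and the cgroup write, and all names are illustrative.

```go
package main

import "fmt"

type coreSet map[uint16]struct{}

func (s coreSet) intersect(other coreSet) coreSet {
	out := coreSet{}
	for c := range s {
		if _, ok := other[c]; ok {
			out[c] = struct{}{}
		}
	}
	return out
}

func (s coreSet) insertSet(other coreSet) {
	for c := range other {
		s[c] = struct{}{}
	}
}

func reserve(usable, reserved, requested coreSet) {
	// Only cores that actually exist on the partition can be reserved.
	usableRequested := usable.intersect(requested)

	// Cores already held are flagged; the real code logs a warning here
	// rather than returning an error (see the COMPAT note above).
	if overlap := reserved.intersect(usableRequested); len(overlap) > 0 {
		fmt.Println("warning: cores already reserved:", len(overlap))
	}

	reserved.insertSet(usableRequested)
}

func main() {
	usable := coreSet{0: {}, 1: {}, 2: {}, 3: {}}
	reserved := coreSet{1: {}}

	// Core 9 does not exist on this partition and core 1 is already reserved.
	reserve(usable, reserved, coreSet{1: {}, 2: {}, 9: {}})
	fmt.Println("reserved core count:", len(reserved)) // 2 (cores 1 and 2)
}
```
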

73 changes: 73 additions & 0 deletions nomad/structs/funcs_test.go
@@ -248,6 +248,79 @@ func TestAllocsFit(t *testing.T) {
must.Eq(t, 1024, used.Flattened.Memory.MemoryMB)
}

func TestAllocsFit_Cores(t *testing.T) {
ci.Parallel(t)

n := node2k()

a1 := &Allocation{
AllocatedResources: &AllocatedResources{
Tasks: map[string]*AllocatedTaskResources{
"web": {
Cpu: AllocatedCpuResources{
CpuShares: 500,
ReservedCores: []uint16{0},
},
Memory: AllocatedMemoryResources{
MemoryMB: 1024,
},
},
},
},
}

a2 := &Allocation{
AllocatedResources: &AllocatedResources{
Tasks: map[string]*AllocatedTaskResources{
"web-prestart": {
Cpu: AllocatedCpuResources{
CpuShares: 500,
ReservedCores: []uint16{1},
},
Memory: AllocatedMemoryResources{
MemoryMB: 1024,
},
},
"web": {
Cpu: AllocatedCpuResources{
CpuShares: 500,
ReservedCores: []uint16{0},
},
Memory: AllocatedMemoryResources{
MemoryMB: 1024,
},
},
},
TaskLifecycles: map[string]*TaskLifecycleConfig{
"web-prestart": {
Hook: TaskLifecycleHookPrestart,
Sidecar: false,
},
},
},
}

// Should fit one allocation
fit, dim, used, err := AllocsFit(n, []*Allocation{a1}, nil, false)
must.NoError(t, err)
must.True(t, fit, must.Sprintf("failed for dimension %q", dim))
must.Eq(t, 500, used.Flattened.Cpu.CpuShares)
must.Eq(t, 1024, used.Flattened.Memory.MemoryMB)

// Should fit one allocation
fit, dim, used, err = AllocsFit(n, []*Allocation{a2}, nil, false)
must.NoError(t, err)
must.True(t, fit, must.Sprintf("failed for dimension %q", dim))
must.Eq(t, 1000, used.Flattened.Cpu.CpuShares)
must.Eq(t, 1024, used.Flattened.Memory.MemoryMB)

// Should not fit both allocations
fit, dim, used, err = AllocsFit(n, []*Allocation{a1, a2}, nil, false)
must.NoError(t, err)
must.False(t, fit)
must.Eq(t, dim, "cores")
}

func TestAllocsFit_TerminalAlloc(t *testing.T) {
ci.Parallel(t)

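For reference, a hypothetical sketch of the kind of "cores" dimension check the new `TestAllocsFit_Cores` exercises: a set of allocations fits only if every reserved core exists on the node and no core is claimed by two allocations. This is not Nomad's actual `AllocsFit` implementation, only an illustration under simplified assumptions.

```go
package main

import "fmt"

// coresFit reports whether the per-allocation reserved core sets fit on the
// node, and names the exhausted dimension when they do not.
func coresFit(nodeCores []uint16, allocCores [][]uint16) (bool, string) {
	available := map[uint16]bool{}
	for _, c := range nodeCores {
		available[c] = true
	}
	for _, cores := range allocCores {
		for _, c := range cores {
			if !available[c] {
				return false, "cores" // core missing or already claimed
			}
			available[c] = false
		}
	}
	return true, ""
}

func main() {
	node := []uint16{0, 1} // a node exposing cores 0 and 1 (illustrative)

	fit, _ := coresFit(node, [][]uint16{{0}}) // a1 alone
	fmt.Println(fit)                          // true

	fit, _ = coresFit(node, [][]uint16{{1, 0}}) // a2 alone (prestart + main)
	fmt.Println(fit)                            // true

	fit, dim := coresFit(node, [][]uint16{{0}, {1, 0}}) // a1 and a2 both claim core 0
	fmt.Println(fit, dim)                               // false cores
}
```
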
58 changes: 34 additions & 24 deletions nomad/structs/structs.go
@@ -3856,35 +3856,45 @@ func (a *AllocatedResources) Comparable() *ComparableResources {
Shared: a.Shared,
}

prestartSidecarTasks := &AllocatedTaskResources{}
prestartEphemeralTasks := &AllocatedTaskResources{}
main := &AllocatedTaskResources{}
poststartTasks := &AllocatedTaskResources{}
poststopTasks := &AllocatedTaskResources{}

for taskName, r := range a.Tasks {
lc := a.TaskLifecycles[taskName]
if lc == nil {
main.Add(r)
} else if lc.Hook == TaskLifecycleHookPrestart {
if lc.Sidecar {
prestartSidecarTasks.Add(r)
// The lifecycles in which a task could run
prestartLifecycle := &AllocatedTaskResources{}
mainLifecycle := &AllocatedTaskResources{}
stopLifecycle := &AllocatedTaskResources{}

for taskName, taskResources := range a.Tasks {
taskLifecycle := a.TaskLifecycles[taskName]
fungibleTaskResources := taskResources.Copy()

// Reserved cores (and their respective bandwidth) are not fungible,
// hence we should always include them as part of the Flattened resources.
if len(fungibleTaskResources.Cpu.ReservedCores) > 0 {
c.Flattened.Cpu.Add(&fungibleTaskResources.Cpu)
fungibleTaskResources.Cpu = AllocatedCpuResources{}
}

if taskLifecycle == nil {
mainLifecycle.Add(fungibleTaskResources)
} else if taskLifecycle.Hook == TaskLifecycleHookPrestart {
if taskLifecycle.Sidecar {
// These tasks span both the prestart and main lifecycle
prestartLifecycle.Add(fungibleTaskResources)
mainLifecycle.Add(fungibleTaskResources)
} else {
prestartEphemeralTasks.Add(r)
prestartLifecycle.Add(fungibleTaskResources)
}
} else if lc.Hook == TaskLifecycleHookPoststart {
poststartTasks.Add(r)
} else if lc.Hook == TaskLifecycleHookPoststop {
poststopTasks.Add(r)
} else if taskLifecycle.Hook == TaskLifecycleHookPoststart {
mainLifecycle.Add(fungibleTaskResources)
} else if taskLifecycle.Hook == TaskLifecycleHookPoststop {
stopLifecycle.Add(fungibleTaskResources)
}
}

// update this loop to account for lifecycle hook
main.Add(poststartTasks)
prestartEphemeralTasks.Max(main)
prestartEphemeralTasks.Max(poststopTasks)
prestartSidecarTasks.Add(prestartEphemeralTasks)
c.Flattened.Add(prestartSidecarTasks)
// Update the main lifecycle to reflect the largest fungible resource set
mainLifecycle.Max(prestartLifecycle)
mainLifecycle.Max(stopLifecycle)

// Add the fungible resources
c.Flattened.Add(mainLifecycle)

// Add network resources that are at the task group level
for _, network := range a.Shared.Networks {
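A hedged sketch of the flattening rule introduced above, with simplified stand-in types: CPU resources of tasks with reserved cores are treated as non-fungible and summed across every task, while purely fungible shares take the maximum across the lifecycle buckets. The example mirrors the third case added to `TestAllocatedResources_Comparable_Flattened` below (3000 MHz, 2 cores); it is not the real `Comparable` implementation.

```go
package main

import "fmt"

// taskCpu is an illustrative stand-in for AllocatedCpuResources plus its
// lifecycle label ("prestart", "main", "poststop").
type taskCpu struct {
	Lifecycle     string
	CpuShares     uint64
	ReservedCores []uint16
}

func flatten(tasks []taskCpu) (shares uint64, cores int) {
	fungible := map[string]uint64{}
	for _, t := range tasks {
		if len(t.ReservedCores) > 0 {
			// Non-fungible: pinned cores and their bandwidth always count.
			shares += t.CpuShares
			cores += len(t.ReservedCores)
			continue
		}
		// Fungible: accumulate per lifecycle bucket.
		fungible[t.Lifecycle] += t.CpuShares
	}
	// Fungible shares can be reused between lifecycles, so only the
	// largest bucket counts toward the flattened total.
	var largest uint64
	for _, v := range fungible {
		if v > largest {
			largest = v
		}
	}
	return shares + largest, cores
}

func main() {
	shares, cores := flatten([]taskCpu{
		{Lifecycle: "prestart", CpuShares: 1000},
		{Lifecycle: "main", CpuShares: 2000, ReservedCores: []uint16{1, 2}},
		{Lifecycle: "poststop", CpuShares: 1000},
	})
	fmt.Println(shares, cores) // 3000 2
}
```
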
122 changes: 120 additions & 2 deletions nomad/structs/structs_test.go
@@ -7763,6 +7763,124 @@ func TestAllocatedResources_Comparable_Flattened(t *testing.T) {
ci.Parallel(t)

allocationResources := AllocatedResources{
TaskLifecycles: map[string]*TaskLifecycleConfig{
"prestart-task": {
Hook: TaskLifecycleHookPrestart,
Sidecar: false,
},
"poststop-task": {
Hook: TaskLifecycleHookPoststop,
Sidecar: false,
},
},
Tasks: map[string]*AllocatedTaskResources{
"prestart-task": {
Cpu: AllocatedCpuResources{
CpuShares: 2000,
},
},
"main-task": {
Cpu: AllocatedCpuResources{
CpuShares: 4000,
},
},
"poststop-task": {
Cpu: AllocatedCpuResources{
CpuShares: 1000,
},
},
},
}

// The output of Flattened should return the resources required during the execution of the largest lifecycle
must.Eq(t, 4000, allocationResources.Comparable().Flattened.Cpu.CpuShares)
must.Len(t, 0, allocationResources.Comparable().Flattened.Cpu.ReservedCores)

allocationResources = AllocatedResources{
TaskLifecycles: map[string]*TaskLifecycleConfig{
"prestart-task": {
Hook: TaskLifecycleHookPrestart,
Sidecar: false,
},
"prestart-sidecar-task": {
Hook: TaskLifecycleHookPrestart,
Sidecar: true,
},
"poststop-task": {
Hook: TaskLifecycleHookPoststop,
Sidecar: false,
},
},
Tasks: map[string]*AllocatedTaskResources{
"prestart-task": {
Cpu: AllocatedCpuResources{
CpuShares: 1000,
ReservedCores: []uint16{0},
},
},
"prestart-sidecar-task": {
Cpu: AllocatedCpuResources{
CpuShares: 2000,
ReservedCores: []uint16{1, 2},
},
},
"main-task": {
Cpu: AllocatedCpuResources{
CpuShares: 2000,
ReservedCores: []uint16{3, 4},
},
},
"poststop-task": {
Cpu: AllocatedCpuResources{
CpuShares: 1000,
ReservedCores: []uint16{5},
},
},
},
}

// Reserved core resources are claimed throughout the lifespan of the allocation
must.Eq(t, 6000, allocationResources.Comparable().Flattened.Cpu.CpuShares)
must.Len(t, 6, allocationResources.Comparable().Flattened.Cpu.ReservedCores)

allocationResources = AllocatedResources{
TaskLifecycles: map[string]*TaskLifecycleConfig{
"prestart-task": {
Hook: TaskLifecycleHookPrestart,
Sidecar: false,
},
"poststop-task": {
Hook: TaskLifecycleHookPoststop,
Sidecar: false,
},
},
Tasks: map[string]*AllocatedTaskResources{
"prestart-task": {
Cpu: AllocatedCpuResources{
CpuShares: 1000,
},
},
"main-task": {
Cpu: AllocatedCpuResources{
CpuShares: 2000,
ReservedCores: []uint16{1, 2},
},
},
"poststop-task": {
Cpu: AllocatedCpuResources{
CpuShares: 1000,
},
},
},
}

// Reserved core resources are claimed throughout the lifespan of the allocation,
// but the prestart and poststop tasks can reuse the CpuShares. It's important to
// note that we will only claim 1000 MHz as part of the share slice.
must.Eq(t, 3000, allocationResources.Comparable().Flattened.Cpu.CpuShares)
must.Len(t, 2, allocationResources.Comparable().Flattened.Cpu.ReservedCores)

allocationResources = AllocatedResources{
TaskLifecycles: map[string]*TaskLifecycleConfig{
"prestart-task": {
Hook: TaskLifecycleHookPrestart,
@@ -7816,8 +7934,8 @@ func TestAllocatedResources_Comparable_Flattened(t *testing.T) {
}

// The output of Flattened should return the resource required during the execution of the largest lifecycle
must.Eq(t, 5000, allocationResources.Comparable().Flattened.Cpu.CpuShares)
must.Len(t, 5, allocationResources.Comparable().Flattened.Cpu.ReservedCores)
must.Eq(t, 9000, allocationResources.Comparable().Flattened.Cpu.CpuShares)
must.Len(t, 9, allocationResources.Comparable().Flattened.Cpu.ReservedCores)
}

func requireErrors(t *testing.T, err error, expected ...string) {