Skip to content

Commit

Permalink
backport of commit 997da25
Browse files Browse the repository at this point in the history
  • Loading branch information
mvegter authored Nov 21, 2024
1 parent 22e7876 commit 17a7fc5
Show file tree
Hide file tree
Showing 5 changed files with 240 additions and 28 deletions.
3 changes: 3 additions & 0 deletions .changelog/24304.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
scheduler: take all assigned cpu cores into account instead of only those part of the largest lifecycle
```
12 changes: 10 additions & 2 deletions client/lib/cgroupslib/partition_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,18 @@ func (p *partition) Reserve(cores *idset.Set[hw.CoreID]) error {
p.lock.Lock()
defer p.lock.Unlock()

// Use the intersection with the usable cores to avoid adding more cores than available.
usableCores := p.usableCores.Intersect(cores)

overlappingCores := p.reserve.Intersect(usableCores)
if overlappingCores.Size() > 0 {
// COMPAT: prior to Nomad 1.9.X this would silently happen, this should probably return an error instead
p.log.Warn("Unable to exclusively reserve the requested cores", "cores", cores, "overlapping_cores", overlappingCores)
}

p.share.RemoveSet(cores)
p.reserve.InsertSet(usableCores)

// Use the intersection with the usable cores to avoid adding more cores than available.
p.reserve.InsertSet(p.usableCores.Intersect(cores))
return p.write()
}

Expand Down
73 changes: 73 additions & 0 deletions nomad/structs/funcs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,79 @@ func TestAllocsFit(t *testing.T) {
must.Eq(t, 1024, used.Flattened.Memory.MemoryMB)
}

func TestAllocsFit_Cores(t *testing.T) {
ci.Parallel(t)

n := node2k()

a1 := &Allocation{
AllocatedResources: &AllocatedResources{
Tasks: map[string]*AllocatedTaskResources{
"web": {
Cpu: AllocatedCpuResources{
CpuShares: 500,
ReservedCores: []uint16{0},
},
Memory: AllocatedMemoryResources{
MemoryMB: 1024,
},
},
},
},
}

a2 := &Allocation{
AllocatedResources: &AllocatedResources{
Tasks: map[string]*AllocatedTaskResources{
"web-prestart": {
Cpu: AllocatedCpuResources{
CpuShares: 500,
ReservedCores: []uint16{1},
},
Memory: AllocatedMemoryResources{
MemoryMB: 1024,
},
},
"web": {
Cpu: AllocatedCpuResources{
CpuShares: 500,
ReservedCores: []uint16{0},
},
Memory: AllocatedMemoryResources{
MemoryMB: 1024,
},
},
},
TaskLifecycles: map[string]*TaskLifecycleConfig{
"web-prestart": {
Hook: TaskLifecycleHookPrestart,
Sidecar: false,
},
},
},
}

// Should fit one allocation
fit, dim, used, err := AllocsFit(n, []*Allocation{a1}, nil, false)
must.NoError(t, err)
must.True(t, fit, must.Sprintf("failed for dimension %q", dim))
must.Eq(t, 500, used.Flattened.Cpu.CpuShares)
must.Eq(t, 1024, used.Flattened.Memory.MemoryMB)

// Should fit one allocation
fit, dim, used, err = AllocsFit(n, []*Allocation{a2}, nil, false)
must.NoError(t, err)
must.True(t, fit, must.Sprintf("failed for dimension %q", dim))
must.Eq(t, 1000, used.Flattened.Cpu.CpuShares)
must.Eq(t, 1024, used.Flattened.Memory.MemoryMB)

// Should not fit both allocations
fit, dim, used, err = AllocsFit(n, []*Allocation{a1, a2}, nil, false)
must.NoError(t, err)
must.False(t, fit)
must.Eq(t, dim, "cores")
}

func TestAllocsFit_TerminalAlloc(t *testing.T) {
ci.Parallel(t)

Expand Down
58 changes: 34 additions & 24 deletions nomad/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -3856,35 +3856,45 @@ func (a *AllocatedResources) Comparable() *ComparableResources {
Shared: a.Shared,
}

prestartSidecarTasks := &AllocatedTaskResources{}
prestartEphemeralTasks := &AllocatedTaskResources{}
main := &AllocatedTaskResources{}
poststartTasks := &AllocatedTaskResources{}
poststopTasks := &AllocatedTaskResources{}

for taskName, r := range a.Tasks {
lc := a.TaskLifecycles[taskName]
if lc == nil {
main.Add(r)
} else if lc.Hook == TaskLifecycleHookPrestart {
if lc.Sidecar {
prestartSidecarTasks.Add(r)
// The lifecycle in which a task could run
prestartLifecycle := &AllocatedTaskResources{}
mainLifecycle := &AllocatedTaskResources{}
stopLifecycle := &AllocatedTaskResources{}

for taskName, taskResources := range a.Tasks {
taskLifecycle := a.TaskLifecycles[taskName]
fungibleTaskResources := taskResources.Copy()

// Reserved cores (and their respective bandwidth) are not fungible,
// hence we should always include it as part of the Flattened resources.
if len(fungibleTaskResources.Cpu.ReservedCores) > 0 {
c.Flattened.Cpu.Add(&fungibleTaskResources.Cpu)
fungibleTaskResources.Cpu = AllocatedCpuResources{}
}

if taskLifecycle == nil {
mainLifecycle.Add(fungibleTaskResources)
} else if taskLifecycle.Hook == TaskLifecycleHookPrestart {
if taskLifecycle.Sidecar {
// These tasks span both the prestart and main lifecycle
prestartLifecycle.Add(fungibleTaskResources)
mainLifecycle.Add(fungibleTaskResources)
} else {
prestartEphemeralTasks.Add(r)
prestartLifecycle.Add(fungibleTaskResources)
}
} else if lc.Hook == TaskLifecycleHookPoststart {
poststartTasks.Add(r)
} else if lc.Hook == TaskLifecycleHookPoststop {
poststopTasks.Add(r)
} else if taskLifecycle.Hook == TaskLifecycleHookPoststart {
mainLifecycle.Add(fungibleTaskResources)
} else if taskLifecycle.Hook == TaskLifecycleHookPoststop {
stopLifecycle.Add(fungibleTaskResources)
}
}

// update this loop to account for lifecycle hook
main.Add(poststartTasks)
prestartEphemeralTasks.Max(main)
prestartEphemeralTasks.Max(poststopTasks)
prestartSidecarTasks.Add(prestartEphemeralTasks)
c.Flattened.Add(prestartSidecarTasks)
// Update the main lifecycle to reflect the largest fungible resource set
mainLifecycle.Max(prestartLifecycle)
mainLifecycle.Max(stopLifecycle)

// Add the fungible resources
c.Flattened.Add(mainLifecycle)

// Add network resources that are at the task group level
for _, network := range a.Shared.Networks {
Expand Down
122 changes: 120 additions & 2 deletions nomad/structs/structs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7763,6 +7763,124 @@ func TestAllocatedResources_Comparable_Flattened(t *testing.T) {
ci.Parallel(t)

allocationResources := AllocatedResources{
TaskLifecycles: map[string]*TaskLifecycleConfig{
"prestart-task": {
Hook: TaskLifecycleHookPrestart,
Sidecar: false,
},
"poststop-task": {
Hook: TaskLifecycleHookPoststop,
Sidecar: false,
},
},
Tasks: map[string]*AllocatedTaskResources{
"prestart-task": {
Cpu: AllocatedCpuResources{
CpuShares: 2000,
},
},
"main-task": {
Cpu: AllocatedCpuResources{
CpuShares: 4000,
},
},
"poststop-task": {
Cpu: AllocatedCpuResources{
CpuShares: 1000,
},
},
},
}

// The output of Flattened should return the resource required during the execution of the largest lifecycle
must.Eq(t, 4000, allocationResources.Comparable().Flattened.Cpu.CpuShares)
must.Len(t, 0, allocationResources.Comparable().Flattened.Cpu.ReservedCores)

allocationResources = AllocatedResources{
TaskLifecycles: map[string]*TaskLifecycleConfig{
"prestart-task": {
Hook: TaskLifecycleHookPrestart,
Sidecar: false,
},
"prestart-sidecar-task": {
Hook: TaskLifecycleHookPrestart,
Sidecar: true,
},
"poststop-task": {
Hook: TaskLifecycleHookPoststop,
Sidecar: false,
},
},
Tasks: map[string]*AllocatedTaskResources{
"prestart-task": {
Cpu: AllocatedCpuResources{
CpuShares: 1000,
ReservedCores: []uint16{0},
},
},
"prestart-sidecar-task": {
Cpu: AllocatedCpuResources{
CpuShares: 2000,
ReservedCores: []uint16{1, 2},
},
},
"main-task": {
Cpu: AllocatedCpuResources{
CpuShares: 2000,
ReservedCores: []uint16{3, 4},
},
},
"poststop-task": {
Cpu: AllocatedCpuResources{
CpuShares: 1000,
ReservedCores: []uint16{5},
},
},
},
}

// Reserved core resources are claimed throughout the lifespan of the allocation
must.Eq(t, 6000, allocationResources.Comparable().Flattened.Cpu.CpuShares)
must.Len(t, 6, allocationResources.Comparable().Flattened.Cpu.ReservedCores)

allocationResources = AllocatedResources{
TaskLifecycles: map[string]*TaskLifecycleConfig{
"prestart-task": {
Hook: TaskLifecycleHookPrestart,
Sidecar: false,
},
"poststop-task": {
Hook: TaskLifecycleHookPoststop,
Sidecar: false,
},
},
Tasks: map[string]*AllocatedTaskResources{
"prestart-task": {
Cpu: AllocatedCpuResources{
CpuShares: 1000,
},
},
"main-task": {
Cpu: AllocatedCpuResources{
CpuShares: 2000,
ReservedCores: []uint16{1, 2},
},
},
"poststop-task": {
Cpu: AllocatedCpuResources{
CpuShares: 1000,
},
},
},
}

// Reserved core resources are claimed throughout the lifespan of the allocation,
// but the prestart and poststop task can reuse the CpuShares. It's important to
// note that we will only claim 1000 MHz as part of the share slice.
must.Eq(t, 3000, allocationResources.Comparable().Flattened.Cpu.CpuShares)
must.Len(t, 2, allocationResources.Comparable().Flattened.Cpu.ReservedCores)

allocationResources = AllocatedResources{
TaskLifecycles: map[string]*TaskLifecycleConfig{
"prestart-task": {
Hook: TaskLifecycleHookPrestart,
Expand Down Expand Up @@ -7816,8 +7934,8 @@ func TestAllocatedResources_Comparable_Flattened(t *testing.T) {
}

// The output of Flattened should return the resource required during the execution of the largest lifecycle
must.Eq(t, 5000, allocationResources.Comparable().Flattened.Cpu.CpuShares)
must.Len(t, 5, allocationResources.Comparable().Flattened.Cpu.ReservedCores)
must.Eq(t, 9000, allocationResources.Comparable().Flattened.Cpu.CpuShares)
must.Len(t, 9, allocationResources.Comparable().Flattened.Cpu.ReservedCores)
}

func requireErrors(t *testing.T, err error, expected ...string) {
Expand Down

0 comments on commit 17a7fc5

Please sign in to comment.