Skip to content

Commit

Permalink
node: podresources: implement GetAllocatableResources API
Browse files Browse the repository at this point in the history
Extend the podresources API implementing the GetAllocatableResources endpoint,
as specified in the KEPs:

https://github.com/kubernetes/enhancements/tree/master/keps/sig-node/2043-pod-resource-concrete-assigments
kubernetes/enhancements#2404

Signed-off-by: Francesco Romani <[email protected]>
  • Loading branch information
ffromani committed Mar 4, 2021
1 parent 40b8795 commit 1cd2577
Show file tree
Hide file tree
Showing 21 changed files with 373 additions and 38 deletions.
30 changes: 30 additions & 0 deletions pkg/kubelet/apis/podresources/server_v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,3 +70,33 @@ func (p *v1PodResourcesServer) List(ctx context.Context, req *v1.ListPodResource
PodResources: podResources,
}, nil
}

// GetAllocatableResources returns information about all the resources known by the server.
// This is more like the capacity, not like the current amount of free resources.
//
// Each allocatable device is reported as one ContainerDevices entry per NUMA node
// listed in its topology. A device that carries no topology information is still
// reported, as a single entry without Topology, so that clients can see it.
// Entries are produced by ranging over maps, so their order is not deterministic.
//
// The request is currently not inspected; the returned error is always nil.
func (p *v1PodResourcesServer) GetAllocatableResources(ctx context.Context, req *v1.AllocatableResourcesRequest) (*v1.AllocatableResourcesResponse, error) {
	metrics.PodResourcesEndpointRequestsTotalCount.WithLabelValues("v1").Inc()

	allDevices := p.devicesProvider.GetAllocatableDevices()
	var respDevs []*v1.ContainerDevices

	for resourceName, resourceDevs := range allDevices {
		for devID, dev := range resourceDevs {
			nodes := dev.GetTopology().GetNodes()
			if len(nodes) == 0 {
				// Not every device plugin reports topology. Dropping such
				// devices would hide them from clients, so report them
				// without NUMA information instead.
				respDevs = append(respDevs, &v1.ContainerDevices{
					ResourceName: resourceName,
					DeviceIds:    []string{devID},
				})
				continue
			}

			for _, node := range nodes {
				respDevs = append(respDevs, &v1.ContainerDevices{
					ResourceName: resourceName,
					DeviceIds:    []string{devID},
					Topology: &v1.TopologyInfo{
						Nodes: []*v1.NUMANode{
							{ID: node.GetID()},
						},
					},
				})
			}
		}
	}

	return &v1.AllocatableResourcesResponse{
		Devices: respDevs,
		CpuIds:  p.cpusProvider.GetAllocatableCPUs().ToSliceNoSortInt64(),
	}, nil
}
183 changes: 168 additions & 15 deletions pkg/kubelet/apis/podresources/server_v1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@ import (
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1"
"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
"k8s.io/kubernetes/pkg/kubelet/cm/devicemanager"
)

func TestListPodResourcesV1(t *testing.T) {
Expand Down Expand Up @@ -138,6 +140,8 @@ func TestListPodResourcesV1(t *testing.T) {
m.On("GetDevices", string(podUID), containerName).Return(tc.devices)
m.On("GetCPUs", string(podUID), containerName).Return(tc.cpus)
m.On("UpdateAllocatedDevices").Return()
m.On("GetAllocatableCPUs").Return(cpuset.CPUSet{})
m.On("GetAllocatableDevices").Return(devicemanager.NewResourceDeviceInstances())
server := NewV1PodResourcesServer(m, m, m)
resp, err := server.List(context.TODO(), &podresourcesapi.ListPodResourcesRequest{})
if err != nil {
Expand All @@ -150,6 +154,140 @@ func TestListPodResourcesV1(t *testing.T) {
}
}

// TestAllocatableResources checks that GetAllocatableResources reports the CPUs
// and devices handed out by the mocked providers, covering every combination of
// "no CPUs / all CPUs" and "no devices / some devices".
func TestAllocatableResources(t *testing.T) {
	allDevs := devicemanager.ResourceDeviceInstances{
		"resource": {
			"dev0": {
				ID:     "GPU-fef8089b-4820-abfc-e83e-94318197576e",
				Health: "Healthy",
				Topology: &pluginapi.TopologyInfo{
					Nodes: []*pluginapi.NUMANode{
						{
							ID: 0,
						},
					},
				},
			},
			"dev1": {
				ID:     "VF-8536e1e8-9dc6-4645-9aea-882db92e31e7",
				Health: "Healthy",
				Topology: &pluginapi.TopologyInfo{
					Nodes: []*pluginapi.NUMANode{
						{
							ID: 1,
						},
					},
				},
			},
		},
	}
	allCPUs := cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16)

	// Expected device entries for allDevs; shared by every case that passes allDevs.
	// NOTE(review): the server builds its device list by ranging over maps, so
	// the comparison helper should not depend on the order of these entries.
	expectedDevs := []*podresourcesapi.ContainerDevices{
		{
			ResourceName: "resource",
			DeviceIds:    []string{"dev0"},
			Topology: &podresourcesapi.TopologyInfo{
				Nodes: []*podresourcesapi.NUMANode{
					{
						ID: 0,
					},
				},
			},
		},
		{
			ResourceName: "resource",
			DeviceIds:    []string{"dev1"},
			Topology: &podresourcesapi.TopologyInfo{
				Nodes: []*podresourcesapi.NUMANode{
					{
						ID: 1,
					},
				},
			},
		},
	}

	for _, tc := range []struct {
		desc                                 string
		allCPUs                              cpuset.CPUSet
		allDevices                           devicemanager.ResourceDeviceInstances
		expectedAllocatableResourcesResponse *podresourcesapi.AllocatableResourcesResponse
	}{
		{
			desc:                                 "no devices, no CPUs",
			allCPUs:                              cpuset.CPUSet{},
			allDevices:                           devicemanager.NewResourceDeviceInstances(),
			expectedAllocatableResourcesResponse: &podresourcesapi.AllocatableResourcesResponse{},
		},
		{
			desc:       "no devices, all CPUs",
			allCPUs:    allCPUs,
			allDevices: devicemanager.NewResourceDeviceInstances(),
			expectedAllocatableResourcesResponse: &podresourcesapi.AllocatableResourcesResponse{
				CpuIds: allCPUs.ToSliceNoSortInt64(),
			},
		},
		{
			desc:       "with devices, all CPUs",
			allCPUs:    allCPUs,
			allDevices: allDevs,
			expectedAllocatableResourcesResponse: &podresourcesapi.AllocatableResourcesResponse{
				CpuIds:  allCPUs.ToSliceNoSortInt64(),
				Devices: expectedDevs,
			},
		},
		{
			desc:       "with devices, no CPUs",
			allCPUs:    cpuset.CPUSet{},
			allDevices: allDevs,
			expectedAllocatableResourcesResponse: &podresourcesapi.AllocatableResourcesResponse{
				Devices: expectedDevs,
			},
		},
	} {
		t.Run(tc.desc, func(t *testing.T) {
			m := new(mockProvider)
			m.On("GetDevices", "", "").Return([]*podresourcesapi.ContainerDevices{})
			m.On("GetCPUs", "", "").Return(cpuset.CPUSet{})
			m.On("UpdateAllocatedDevices").Return()
			m.On("GetAllocatableDevices").Return(tc.allDevices)
			m.On("GetAllocatableCPUs").Return(tc.allCPUs)
			server := NewV1PodResourcesServer(m, m, m)

			resp, err := server.GetAllocatableResources(context.TODO(), &podresourcesapi.AllocatableResourcesRequest{})
			if err != nil {
				// Stop here: resp is not meaningful on error, and comparing
				// against it below would dereference a nil response.
				t.Fatalf("want err = %v, got %q", nil, err)
			}

			if !equalAllocatableResourcesResponse(tc.expectedAllocatableResourcesResponse, resp) {
				t.Errorf("want resp = %s, got %s", tc.expectedAllocatableResourcesResponse.String(), resp.String())
			}
		})
	}
}

func equalListResponse(respA, respB *podresourcesapi.ListPodResourcesResponse) bool {
if len(respA.PodResources) != len(respB.PodResources) {
return false
Expand Down Expand Up @@ -177,26 +315,34 @@ func equalListResponse(respA, respB *podresourcesapi.ListPodResourcesResponse) b
return false
}

if len(cntA.Devices) != len(cntB.Devices) {
if !equalContainerDevices(cntA.Devices, cntB.Devices) {
return false
}
}
}
return true
}

for kdx := 0; kdx < len(cntA.Devices); kdx++ {
cntDevA := cntA.Devices[kdx]
cntDevB := cntB.Devices[kdx]

if cntDevA.ResourceName != cntDevB.ResourceName {
return false
}
if !equalTopology(cntDevA.Topology, cntDevB.Topology) {
return false
}
if !equalStrings(cntDevA.DeviceIds, cntDevB.DeviceIds) {
return false
}
}
// equalContainerDevices reports whether devA and devB have the same length and
// whether the entries at each index agree on resource name, topology
// (via equalTopology) and device IDs (via equalStrings). The comparison is
// positional: the same entries in a different order are not considered equal.
func equalContainerDevices(devA, devB []*podresourcesapi.ContainerDevices) bool {
	if len(devA) != len(devB) {
		return false
	}

	for i, left := range devA {
		right := devB[i]

		switch {
		case left.ResourceName != right.ResourceName:
			return false
		case !equalTopology(left.Topology, right.Topology):
			return false
		case !equalStrings(left.DeviceIds, right.DeviceIds):
			return false
		}
	}

	return true
}

Expand Down Expand Up @@ -231,3 +377,10 @@ func equalTopology(a, b *podresourcesapi.TopologyInfo) bool {
}
return reflect.DeepEqual(a, b)
}

// equalAllocatableResourcesResponse reports whether two allocatable-resources
// responses describe the same CPUs and the same devices.
//
// CPU IDs are compared with equalInt64s. Devices are compared as a multiset,
// ignoring order: the server assembles the device list by ranging over maps, so
// the position of an entry in the response is not stable between runs. Each
// entry of respA must match a distinct, not yet matched entry of respB, where a
// single pair is compared with equalContainerDevices.
func equalAllocatableResourcesResponse(respA, respB *podresourcesapi.AllocatableResourcesResponse) bool {
	if !equalInt64s(respA.CpuIds, respB.CpuIds) {
		return false
	}

	devsA, devsB := respA.Devices, respB.Devices
	if len(devsA) != len(devsB) {
		return false
	}

	// matched[i] is set once devsB[i] has been paired with an entry of devsA,
	// so that duplicates on one side need duplicates on the other.
	matched := make([]bool, len(devsB))
	for _, devA := range devsA {
		found := false
		for idx, devB := range devsB {
			if matched[idx] {
				continue
			}
			if equalContainerDevices(
				[]*podresourcesapi.ContainerDevices{devA},
				[]*podresourcesapi.ContainerDevices{devB},
			) {
				matched[idx] = true
				found = true
				break
			}
		}
		if !found {
			return false
		}
	}
	return true
}
11 changes: 11 additions & 0 deletions pkg/kubelet/apis/podresources/server_v1alpha1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
podresourcesv1 "k8s.io/kubelet/pkg/apis/podresources/v1"
"k8s.io/kubelet/pkg/apis/podresources/v1alpha1"
"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
"k8s.io/kubernetes/pkg/kubelet/cm/devicemanager"
)

type mockProvider struct {
Expand All @@ -53,6 +54,16 @@ func (m *mockProvider) UpdateAllocatedDevices() {
m.Called()
}

// GetAllocatableDevices records the call on the mock and hands back the first
// value configured for it, asserted to devicemanager.ResourceDeviceInstances.
// It panics if the configured value has a different type.
func (m *mockProvider) GetAllocatableDevices() devicemanager.ResourceDeviceInstances {
	return m.Called().Get(0).(devicemanager.ResourceDeviceInstances)
}

// GetAllocatableCPUs records the call on the mock and hands back the first
// value configured for it, asserted to cpuset.CPUSet. It panics if the
// configured value has a different type.
func (m *mockProvider) GetAllocatableCPUs() cpuset.CPUSet {
	return m.Called().Get(0).(cpuset.CPUSet)
}

func TestListPodResourcesV1alpha1(t *testing.T) {
podName := "pod-name"
podNamespace := "pod-namespace"
Expand Down
3 changes: 3 additions & 0 deletions pkg/kubelet/apis/podresources/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@ import (
"k8s.io/api/core/v1"
podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1"
"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
"k8s.io/kubernetes/pkg/kubelet/cm/devicemanager"
)

// DevicesProvider knows how to provide the devices used by the given container,
// and the devices that are allocatable on the node.
type DevicesProvider interface {
// GetDevices returns the devices used by the given container of the given pod.
GetDevices(podUID, containerName string) []*podresourcesapi.ContainerDevices
// UpdateAllocatedDevices frees any devices that are bound to terminated pods.
UpdateAllocatedDevices()
// GetAllocatableDevices returns information about all the devices known to the provider.
GetAllocatableDevices() devicemanager.ResourceDeviceInstances
}

// PodsProvider knows how to provide the pods admitted by the node
Expand All @@ -36,4 +38,5 @@ type PodsProvider interface {
// CPUsProvider knows how to provide the cpus used by the given container,
// and the cpus that are allocatable on the node.
type CPUsProvider interface {
// GetCPUs returns the cpus used by the given container of the given pod.
GetCPUs(podUID, containerName string) cpuset.CPUSet
// GetAllocatableCPUs returns the allocatable (not allocated) cpus.
GetAllocatableCPUs() cpuset.CPUSet
}
7 changes: 7 additions & 0 deletions pkg/kubelet/cm/container_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1"
kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
"k8s.io/kubernetes/pkg/kubelet/cm/devicemanager"
"k8s.io/kubernetes/pkg/kubelet/config"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
evictionapi "k8s.io/kubernetes/pkg/kubelet/eviction/api"
Expand Down Expand Up @@ -109,6 +110,9 @@ type ContainerManager interface {
// GetCPUs returns information about the cpus assigned to pods and containers
GetCPUs(podUID, containerName string) cpuset.CPUSet

// GetAllocatableCPUs returns the allocatable (not allocated) CPUs
GetAllocatableCPUs() cpuset.CPUSet

// ShouldResetExtendedResourceCapacity returns whether or not the extended resources should be zeroed,
// due to node recreation.
ShouldResetExtendedResourceCapacity() bool
Expand All @@ -118,6 +122,9 @@ type ContainerManager interface {

// UpdateAllocatedDevices frees any Devices that are bound to terminated pods.
UpdateAllocatedDevices()

// GetAllocatableDevices returns information about all the devices known to the manager
GetAllocatableDevices() devicemanager.ResourceDeviceInstances
}

type NodeConfig struct {
Expand Down
8 changes: 8 additions & 0 deletions pkg/kubelet/cm/container_manager_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -1084,3 +1084,11 @@ func (cm *containerManagerImpl) ShouldResetExtendedResourceCapacity() bool {
// UpdateAllocatedDevices forwards the call to the device manager.
func (cm *containerManagerImpl) UpdateAllocatedDevices() {
cm.deviceManager.UpdateAllocatedDevices()
}

// GetAllocatableDevices returns the allocatable devices as reported by the
// device manager; the result is passed through unchanged.
func (cm *containerManagerImpl) GetAllocatableDevices() devicemanager.ResourceDeviceInstances {
return cm.deviceManager.GetAllocatableDevices()
}

// GetAllocatableCPUs returns the allocatable CPUs as reported by the CPU
// manager; the result is passed through unchanged.
func (cm *containerManagerImpl) GetAllocatableCPUs() cpuset.CPUSet {
return cm.cpuManager.GetAllocatableCPUs()
}
9 changes: 9 additions & 0 deletions pkg/kubelet/cm/container_manager_stub.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
"k8s.io/kubernetes/pkg/kubelet/cm/devicemanager"
"k8s.io/kubernetes/pkg/kubelet/cm/memorymanager"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
"k8s.io/kubernetes/pkg/kubelet/config"
Expand Down Expand Up @@ -131,6 +132,14 @@ func (cm *containerManagerStub) GetCPUs(_, _ string) cpuset.CPUSet {
return cpuset.CPUSet{}
}

// GetAllocatableDevices always returns nil: the stub reports no allocatable devices.
func (cm *containerManagerStub) GetAllocatableDevices() devicemanager.ResourceDeviceInstances {
return nil
}

// GetAllocatableCPUs always returns an empty CPUSet: the stub reports no allocatable CPUs.
func (cm *containerManagerStub) GetAllocatableCPUs() cpuset.CPUSet {
return cpuset.CPUSet{}
}

// NewStubContainerManager returns a ContainerManager backed by a
// containerManagerStub whose shouldResetExtendedResourceCapacity field is false.
func NewStubContainerManager() ContainerManager {
return &containerManagerStub{shouldResetExtendedResourceCapacity: false}
}
Expand Down
10 changes: 10 additions & 0 deletions pkg/kubelet/cm/container_manager_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,12 @@ import (
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/tools/record"
internalapi "k8s.io/cri-api/pkg/apis"
pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1"
kubefeatures "k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/kubelet/cadvisor"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
"k8s.io/kubernetes/pkg/kubelet/cm/devicemanager"
"k8s.io/kubernetes/pkg/kubelet/cm/memorymanager"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
Expand Down Expand Up @@ -235,3 +237,11 @@ func (cm *containerManagerImpl) UpdateAllocatedDevices() {
// GetCPUs ignores both arguments and always returns nil in this implementation.
// NOTE(review): the ContainerManager interface and the stub implementation
// declare GetCPUs as returning cpuset.CPUSet, while this method returns
// []int64 — confirm this type still satisfies the interface.
func (cm *containerManagerImpl) GetCPUs(_, _ string) []int64 {
return nil
}

// GetAllocatableDevices always returns nil: this implementation reports no
// allocatable devices.
func (cm *containerManagerImpl) GetAllocatableDevices() devicemanager.ResourceDeviceInstances {
return nil
}

// GetAllocatableCPUs always returns an empty CPUSet: this implementation
// reports no allocatable CPUs. The empty value is spelled cpuset.CPUSet{},
// as in the stub container manager, rather than nil, so that it is a valid
// value of the declared return type.
func (cm *containerManagerImpl) GetAllocatableCPUs() cpuset.CPUSet {
	return cpuset.CPUSet{}
}
Loading

0 comments on commit 1cd2577

Please sign in to comment.