Skip to content

Commit

Permalink
fix(inventory): recalculate inventory if node state changes (#256)
Browse files Browse the repository at this point in the history
k8s node capabilities.allocatable may go to 0 when devices via device
plugin become unavailable due to various issues.

recalculate the current node's inventory if any of the fields in
capabilities.allocatable is changed

refs akash-network/support#249

Signed-off-by: Artur Troian <[email protected]>
  • Loading branch information
troian authored Dec 4, 2024
1 parent bf2a150 commit a489a33
Showing 1 changed file with 80 additions and 28 deletions.
108 changes: 80 additions & 28 deletions operator/inventory/node-discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"k8s.io/client-go/kubernetes"

v1 "github.com/akash-network/akash-api/go/inventory/v1"
types "github.com/akash-network/akash-api/go/node/types/v1beta3"

"github.com/akash-network/provider/cluster/kube/builder"
ctypes "github.com/akash-network/provider/cluster/types/v1beta3"
Expand Down Expand Up @@ -408,20 +409,27 @@ func (dp *nodeDiscovery) monitor() error {
currLabels = copyManagedLabels(knode.Labels)
}

node, err := dp.initNodeInfo(gpusIDs)
node, err := dp.initNodeInfo(gpusIDs, knode)
if err != nil {
log.Error(err, "unable to init node info")
return err
}

restartWatcher := func() error {
restartPodsWatcher := func() error {
if podsWatch != nil {
select {
case <-podsWatch.ResultChan():
default:
}
}

var terr error
podsWatch, terr = kc.CoreV1().Pods(corev1.NamespaceAll).Watch(dp.ctx, metav1.ListOptions{
FieldSelector: fields.OneTermEqualSelector("spec.nodeName", dp.name).String(),
})

if terr != nil {
log.Error(terr, "unable to watch start pods")
log.Error(terr, "unable to start pods watcher")
return terr
}

Expand All @@ -446,7 +454,7 @@ func (dp *nodeDiscovery) monitor() error {
return nil
}

err = restartWatcher()
err = restartPodsWatcher()
if err != nil {
return err
}
Expand Down Expand Up @@ -504,17 +512,25 @@ func (dp *nodeDiscovery) monitor() error {
switch obj := evt.Object.(type) {
case *corev1.Node:
if obj.Name == dp.name {
knode = obj.DeepCopy()
switch evt.Type {
case watch.Modified:
if nodeAllocatableChanged(knode, obj, &node) {
podsWatch.Stop()
if err = restartPodsWatcher(); err != nil {
return err
}
}

signalLabels()
}

knode = obj.DeepCopy()
}
}
case res, isopen := <-podsWatch.ResultChan():
if !isopen {
podsWatch.Stop()
if err = restartWatcher(); err != nil {
if err = restartPodsWatcher(); err != nil {
return err
}

Expand Down Expand Up @@ -596,17 +612,30 @@ func (dp *nodeDiscovery) monitor() error {
}
}

func (dp *nodeDiscovery) initNodeInfo(gpusIDs RegistryGPUVendors) (v1.Node, error) {
kc := fromctx.MustKubeClientFromCtx(dp.ctx)
func nodeAllocatableChanged(prev *corev1.Node, curr *corev1.Node, node *v1.Node) bool {
changed := len(prev.Status.Allocatable) != len(curr.Status.Allocatable)

cpuInfo := dp.parseCPUInfo(dp.ctx)
gpuInfo := dp.parseGPUInfo(dp.ctx, gpusIDs)
if !changed {
for pres, pval := range prev.Status.Allocatable {
cval, exists := curr.Status.Allocatable[pres]
if !exists || (pval.Value() != cval.Value()) {
changed = true
break
}
}
}

knode, err := kc.CoreV1().Nodes().Get(dp.ctx, dp.name, metav1.GetOptions{})
if err != nil {
return v1.Node{}, fmt.Errorf("%w: error fetching node %s", err, dp.name)
if changed {
updateNodeInfo(curr, node)
}

return changed
}

func (dp *nodeDiscovery) initNodeInfo(gpusIDs RegistryGPUVendors, knode *corev1.Node) (v1.Node, error) {

Check failure on line 635 in operator/inventory/node-discovery.go

View workflow job for this annotation

GitHub Actions / lint

(*nodeDiscovery).initNodeInfo - result 1 (error) is always nil (unparam)
cpuInfo := dp.parseCPUInfo(dp.ctx)
gpuInfo := dp.parseGPUInfo(dp.ctx, gpusIDs)

res := v1.Node{
Name: knode.Name,
Resources: v1.NodeResources{
Expand All @@ -628,31 +657,50 @@ func (dp *nodeDiscovery) initNodeInfo(gpusIDs RegistryGPUVendors) (v1.Node, erro
},
}

updateNodeInfo(knode, &res)

return res, nil
}

func updateNodeInfo(knode *corev1.Node, node *v1.Node) {
for name, r := range knode.Status.Allocatable {
switch name {
case corev1.ResourceCPU:
res.Resources.CPU.Quantity.Allocatable.SetMilli(r.MilliValue())
node.Resources.CPU.Quantity.Allocatable.SetMilli(r.MilliValue())
case corev1.ResourceMemory:
res.Resources.Memory.Quantity.Allocatable.Set(r.Value())
node.Resources.Memory.Quantity.Allocatable.Set(r.Value())
case corev1.ResourceEphemeralStorage:
res.Resources.EphemeralStorage.Allocatable.Set(r.Value())
node.Resources.EphemeralStorage.Allocatable.Set(r.Value())
case builder.ResourceGPUNvidia:
fallthrough
case builder.ResourceGPUAMD:
res.Resources.GPU.Quantity.Allocatable.Set(r.Value())
node.Resources.GPU.Quantity.Allocatable.Set(r.Value())
}
}

return res, nil
}

// // trimAllocated ensure allocated does not overrun allocatable
// // Deprecated to be replaced with function from akash-api after sdk-47 upgrade
// func trimAllocated(rp *v1.ResourcePair) {
// allocated := rp.Allocated.Value()
// allocatable := rp.Allocatable.Value()
//
// if allocated <= allocatable {
// return
// }
//
// allocated = allocatable
//
// rp.Allocated.Set(allocated)
// }

func nodeResetAllocated(node *v1.Node) {
node.Resources.CPU.Quantity.Allocated = resource.NewMilliQuantity(0, resource.DecimalSI)
node.Resources.GPU.Quantity.Allocated = resource.NewQuantity(0, resource.DecimalSI)
node.Resources.Memory.Quantity.Allocated = resource.NewMilliQuantity(0, resource.DecimalSI)
node.Resources.EphemeralStorage.Allocated = resource.NewMilliQuantity(0, resource.DecimalSI)
node.Resources.VolumesAttached.Allocated = resource.NewMilliQuantity(0, resource.DecimalSI)
node.Resources.VolumesMounted.Allocated = resource.NewMilliQuantity(0, resource.DecimalSI)
node.Resources.Memory.Quantity.Allocated = resource.NewQuantity(0, resource.DecimalSI)
node.Resources.EphemeralStorage.Allocated = resource.NewQuantity(0, resource.DecimalSI)
node.Resources.VolumesAttached.Allocated = resource.NewQuantity(0, resource.DecimalSI)
node.Resources.VolumesMounted.Allocated = resource.NewQuantity(0, resource.DecimalSI)
}

func addPodAllocatedResources(node *v1.Node, pod *corev1.Pod) {
Expand All @@ -669,6 +717,7 @@ func addPodAllocatedResources(node *v1.Node, pod *corev1.Pod) {
fallthrough
case builder.ResourceGPUAMD:
node.Resources.GPU.Quantity.Allocated.Add(quantity)
// GPU overcommit is not allowed, if that happens something is terribly wrong with the inventory
}
}

Expand All @@ -686,17 +735,18 @@ func addPodAllocatedResources(node *v1.Node, pod *corev1.Pod) {
func subPodAllocatedResources(node *v1.Node, pod *corev1.Pod) {
for _, container := range pod.Spec.Containers {
for name, quantity := range container.Resources.Requests {
rv := types.NewResourceValue(uint64(quantity.Value()))

Check failure on line 738 in operator/inventory/node-discovery.go

View workflow job for this annotation

GitHub Actions / lint

G115: integer overflow conversion int64 -> uint64 (gosec)
switch name {
case corev1.ResourceCPU:
node.Resources.CPU.Quantity.Allocated.Sub(quantity)
node.Resources.CPU.Quantity.SubMilliNLZ(rv)
case corev1.ResourceMemory:
node.Resources.Memory.Quantity.Allocated.Sub(quantity)
node.Resources.Memory.Quantity.SubNLZ(rv)
case corev1.ResourceEphemeralStorage:
node.Resources.EphemeralStorage.Allocated.Sub(quantity)
node.Resources.EphemeralStorage.SubNLZ(rv)
case builder.ResourceGPUNvidia:
fallthrough
case builder.ResourceGPUAMD:
node.Resources.GPU.Quantity.Allocated.Sub(quantity)
node.Resources.GPU.Quantity.SubNLZ(rv)
}
}

Expand All @@ -705,7 +755,9 @@ func subPodAllocatedResources(node *v1.Node, pod *corev1.Pod) {
continue
}

node.Resources.Memory.Quantity.Allocated.Sub(*vol.EmptyDir.SizeLimit)
rv := types.NewResourceValue(uint64((*vol.EmptyDir.SizeLimit).Value()))

Check failure on line 758 in operator/inventory/node-discovery.go

View workflow job for this annotation

GitHub Actions / lint

G115: integer overflow conversion int64 -> uint64 (gosec)

node.Resources.Memory.Quantity.SubNLZ(rv)
}
}
}
Expand Down

0 comments on commit a489a33

Please sign in to comment.