Skip to content

Commit

Permalink
topology-updater: Add Node OwnerReference to NRTs
Browse files Browse the repository at this point in the history
Add the corresponding Node as OwnerReference in thr NRT, so it gets
deleted if the Node object gets deleted, to avoid leftovers in the
cluster.
  • Loading branch information
jlojosnegros committed Sep 12, 2023
1 parent c26ee82 commit 99d956f
Show file tree
Hide file tree
Showing 8 changed files with 272 additions and 14 deletions.
1 change: 1 addition & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ func LoadArgs(args ...string) (ProgArgs, error) {
flags.StringVar(&pArgs.RTE.KubeletConfigFile, "kubelet-config-file", "/podresources/config.yaml", "Kubelet config file path.")
flags.StringVar(&pArgs.RTE.PodResourcesSocketPath, "podresources-socket", "unix:///podresources/kubelet.sock", "Pod Resource Socket path to use.")
flags.BoolVar(&pArgs.RTE.PodReadinessEnable, "podreadiness", true, "Custom condition injection using Podreadiness.")
flags.BoolVar(&pArgs.RTE.AddNRTOwnerEnable, "add-nrt-owner", true, "RTE will inject NRT's related node as OwnerReference to ensure cleanup if the node is deleted.")

refCnt := flags.String("reference-container", "", "Reference container, used to learn about the shared cpu pool\n See: https://github.com/kubernetes/kubernetes/issues/102190\n format of spec is namespace/podname/containername.\n Alternatively, you can use the env vars REFERENCE_NAMESPACE, REFERENCE_POD_NAME, REFERENCE_CONTAINER_NAME.")

Expand Down
80 changes: 80 additions & 0 deletions pkg/nrtupdater/node_getter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package nrtupdater

import (
"context"
"errors"
"fmt"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
)

var NotConfigured = errors.New("unconfigured feature")

type NotFound struct {
NodeName string
}

func (err NotFound) Error() string {
return "node " + err.NodeName + " Not Found"
}

type ConnectionError struct {
Err error
}

func (err ConnectionError) Error() string {
return fmt.Sprintf("error connection k8s: %v", err.Err)
}
func (err ConnectionError) Unwrap() error {
return err.Err
}

type NodeGetter interface {
Get(ctx context.Context, nodeName string, opts metav1.GetOptions) (*corev1.Node, error)
}

type DisabledNodeGetter struct {
}

func (ng *DisabledNodeGetter) Get(ctx context.Context, nodeName string, opts metav1.GetOptions) (*corev1.Node, error) {
return nil, fmt.Errorf("%w", NotConfigured)
}

type CachedNodeGetter struct {
nodes map[string]*corev1.Node
}

func NewCachedNodeGetter(k8sInterface kubernetes.Interface, ctx context.Context) *CachedNodeGetter {
nodelist, err := k8sInterface.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
if err != nil {
return nil
}

retVal := &CachedNodeGetter{nodes: make(map[string]*corev1.Node, len(nodelist.Items))}
for idx := range nodelist.Items {
node := &nodelist.Items[idx]
retVal.nodes[node.Name] = node
}

return retVal
}

func (ng *CachedNodeGetter) Get(ctx context.Context, nodeName string, _ metav1.GetOptions) (*corev1.Node, error) {
if node, found := ng.nodes[nodeName]; found {
return node, nil
}
return nil, fmt.Errorf("%w", NotFound{NodeName: nodeName})
}

func NewNodeGetter(enabled bool, k8xcli kubernetes.Interface, ctx context.Context) NodeGetter {
if enabled {
ret := NewCachedNodeGetter(k8xcli, ctx)
if ret == nil {
return nil
}
return ret
}
return &DisabledNodeGetter{}
}
46 changes: 37 additions & 9 deletions pkg/nrtupdater/nrtupdater.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@ package nrtupdater

import (
"context"
"errors"
"fmt"
"time"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog/v2"

Expand Down Expand Up @@ -42,9 +43,10 @@ func (conf TMConfig) IsValid() bool {
}

type NRTUpdater struct {
args Args
tmConfig TMConfig
stopChan chan struct{}
args Args
tmConfig TMConfig
stopChan chan struct{}
nodeGetter NodeGetter
}

type MonitorInfo struct {
Expand All @@ -61,11 +63,12 @@ func (mi MonitorInfo) UpdateReason() string {
return RTEUpdateReactive
}

func NewNRTUpdater(args Args, tmconf TMConfig) *NRTUpdater {
func NewNRTUpdater(nodeGetter NodeGetter, args Args, tmconf TMConfig) *NRTUpdater {
return &NRTUpdater{
args: args,
tmConfig: tmconf,
stopChan: make(chan struct{}),
args: args,
tmConfig: tmconf,
stopChan: make(chan struct{}),
nodeGetter: nodeGetter,
}
}

Expand All @@ -89,7 +92,7 @@ func (te *NRTUpdater) UpdateWithClient(cli topologyclientset.Interface, info Mon
}

nrt, err := cli.TopologyV1alpha2().NodeResourceTopologies().Get(context.TODO(), te.args.Hostname, metav1.GetOptions{})
if errors.IsNotFound(err) {
if apierrors.IsNotFound(err) {
nrtNew := v1alpha2.NodeResourceTopology{
ObjectMeta: metav1.ObjectMeta{
Name: te.args.Hostname,
Expand Down Expand Up @@ -130,6 +133,31 @@ func (te *NRTUpdater) updateNRTInfo(nrt *v1alpha2.NodeResourceTopology, info Mon
nrt.Attributes = info.Attributes.DeepCopy()
nrt.Attributes = append(nrt.Attributes, te.makeAttributes()...)
// TODO: check for duplicate attributes?

te.updateOwnerReferences(nrt)
}

// updateOwnerReferences ensure nrt.OwnerReferences include a reference to the Node with the same name as the NRT
//
// Check nrt.OwnerReferences for Node references and update it so it has only one Node reference,
// the one to the Node with the same name as the NRT.
func (te *NRTUpdater) updateOwnerReferences(nrt *v1alpha2.NodeResourceTopology) {
node, err := te.nodeGetter.Get(context.TODO(), nrt.Name, metav1.GetOptions{})
if err != nil {
if errors.Is(err, NotConfigured) {
return
}
klog.V(7).Infof("nrtupdater unable to get Node %s. Can't add Owner reference. error: %v", nrt.Name, err)
return
}
nodeReference := metav1.OwnerReference{
APIVersion: "v1",
Kind: "Node",
Name: node.Name,
UID: node.UID,
}

nrt.OwnerReferences = []metav1.OwnerReference{nodeReference}
}

func (te *NRTUpdater) makeAttributes() v1alpha2.AttributeList {
Expand Down
115 changes: 113 additions & 2 deletions pkg/nrtupdater/nrtupdater_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,16 @@ limitations under the License.
package nrtupdater

import (
"context"
"reflect"
"testing"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
clientk8sfake "k8s.io/client-go/kubernetes/fake"

"github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha2"
"github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/generated/clientset/versioned/fake"
Expand All @@ -48,7 +51,9 @@ func TestUpdateTMPolicy(t *testing.T) {
}

var err error
nrtUpd = NewNRTUpdater(args, tmConfInitial)
k8sClient := clientk8sfake.NewSimpleClientset()
nodeGetter := NewCachedNodeGetter(k8sClient, context.Background())
nrtUpd = NewNRTUpdater(nodeGetter, args, tmConfInitial)
err = nrtUpd.UpdateWithClient(
cli,
MonitorInfo{
Expand Down Expand Up @@ -85,7 +90,7 @@ func TestUpdateTMPolicy(t *testing.T) {
}
checkTMConfig(t, obj, tmConfInitial)

nrtUpd = NewNRTUpdater(args, tmConfUpdated)
nrtUpd = NewNRTUpdater(nodeGetter, args, tmConfUpdated)
err = nrtUpd.UpdateWithClient(
cli,
MonitorInfo{
Expand Down Expand Up @@ -121,6 +126,89 @@ func TestUpdateTMPolicy(t *testing.T) {
}
checkTMConfig(t, obj, tmConfUpdated)
}
func TestUpdateOwnerReferences(t *testing.T) {
nodeName := "test-node"

args := Args{
Hostname: nodeName,
}
tmConfig := TMConfig{
Scope: "scope-whatever",
Policy: "policy-whatever",
}

zoneInfo := v1alpha2.Zone{
Name: "test-zone-0",
Type: "node",
Resources: v1alpha2.ResourceInfoList{
{
Name: string(corev1.ResourceCPU),
Capacity: resource.MustParse("16"),
Allocatable: resource.MustParse("14"),
Available: resource.MustParse("14"),
},
{
Name: string(corev1.ResourceMemory),
Capacity: resource.MustParse("32Gi"),
Allocatable: resource.MustParse("30Gi"),
Available: resource.MustParse("30Gi"),
},
},
}

node := corev1.Node{
TypeMeta: metav1.TypeMeta{
Kind: "Node",
APIVersion: "v1",
},
ObjectMeta: metav1.ObjectMeta{
Name: nodeName,
},
}

expected := metav1.OwnerReference{
Kind: node.Kind,
Name: node.Name,
APIVersion: node.APIVersion,
UID: node.UID,
}

var nrtUpd *NRTUpdater

cli := fake.NewSimpleClientset()
var err error
k8sClient := clientk8sfake.NewSimpleClientset(&node)
nodeGetter := NewCachedNodeGetter(k8sClient, context.Background())
nrtUpd = NewNRTUpdater(nodeGetter, args, tmConfig)

err = nrtUpd.UpdateWithClient(
cli,
MonitorInfo{Zones: v1alpha2.ZoneList{zoneInfo}},
)
if err != nil {
t.Fatalf("failed to perform the initial creation: %v", err)
}

nrtResource := schema.GroupVersionResource{Group: "topology.node.k8s.io", Version: "v1alpha2", Resource: "noderesourcetopologies"}
obj, err := cli.Tracker().Get(nrtResource, "", nodeName)
if err != nil {
t.Fatalf("failed to get the NRT object from tracker: %v", err)
}
checkOwnerReferences(t, obj, expected)

err = nrtUpd.UpdateWithClient(
cli,
MonitorInfo{Zones: v1alpha2.ZoneList{zoneInfo}},
)
if err != nil {
t.Fatalf("failed to perform the initial creation: %v", err)
}
obj, err = cli.Tracker().Get(nrtResource, "", nodeName)
if err != nil {
t.Fatalf("failed to get the NRT object from tracker: %v", err)
}
checkOwnerReferences(t, obj, expected)
}

func checkTMConfig(t *testing.T, obj runtime.Object, expectedConf TMConfig) {
t.Helper()
Expand Down Expand Up @@ -152,3 +240,26 @@ func tmConfigFromAttributes(attrs v1alpha2.AttributeList) TMConfig {
}
return conf
}

func checkOwnerReferences(t *testing.T, obj runtime.Object, expected metav1.OwnerReference) {
t.Helper()

nrtObj, ok := obj.(*v1alpha2.NodeResourceTopology)
if !ok {
t.Fatalf("provided object is not a NodeResourceTopology")
}

nodeReferences := []metav1.OwnerReference{}
for _, own := range nrtObj.OwnerReferences {
if own.Kind == "Node" {
nodeReferences = append(nodeReferences, own)
}
}

if len(nodeReferences) != 1 {
t.Fatalf("unexpected number of node OwnerReferences: %#v", nodeReferences)
}
if !reflect.DeepEqual(nodeReferences[0], expected) {
t.Fatalf("unexpected node OwnerReference. got=%+#v expected=%+#v", nodeReferences[0], expected)
}
}
10 changes: 9 additions & 1 deletion pkg/resourcetopologyexporter/resourcetopologyexporter.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package resourcetopologyexporter

import (
"context"
"fmt"
"time"

Expand Down Expand Up @@ -28,6 +29,7 @@ type Args struct {
NotifyFilePath string
MaxEventsPerTimeUnit int64
TimeUnitToLimitEvents time.Duration
AddNRTOwnerEnable bool
}

type tmSettings struct {
Expand All @@ -40,6 +42,12 @@ func Execute(hnd resourcemonitor.Handle, nrtupdaterArgs nrtupdater.Args, resourc
return err
}

nodeGetter := nrtupdater.NewNodeGetter(rteArgs.AddNRTOwnerEnable, hnd.K8SCli, context.TODO())
if nodeGetter == nil {
klog.V(2).Info("Cannot enable 'add-nrt-owner'. Unable to get node info")
return fmt.Errorf("Cannot enable 'add-nrt-owner'. Unable to get node info")
}

var condChan chan v1.PodCondition
if rteArgs.PodReadinessEnable {
condChan = make(chan v1.PodCondition)
Expand All @@ -61,7 +69,7 @@ func Execute(hnd resourcemonitor.Handle, nrtupdaterArgs nrtupdater.Args, resourc
}
go resObs.Run(eventSource.Events(), condChan)

upd := nrtupdater.NewNRTUpdater(nrtupdaterArgs, tmConf.config)
upd := nrtupdater.NewNRTUpdater(nodeGetter, nrtupdaterArgs, tmConf.config)
go upd.Run(resObs.Infos, condChan)

go eventSource.Run()
Expand Down
2 changes: 1 addition & 1 deletion test/data/TestDefaults.expected.json
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"NRTupdater":{"NoPublish":false,"Oneshot":false,"Hostname":"TEST_NODE"},"Resourcemonitor":{"Namespace":"","SysfsRoot":"/sys","ResourceExclude":null,"RefreshNodeResources":false,"PodSetFingerprint":true,"PodSetFingerprintMethod":"with-exclusive-resources","ExposeTiming":false,"PodSetFingerprintStatusFile":"","PodExclude":null,"ExcludeTerminalPods":false},"RTE":{"Debug":false,"ReferenceContainer":{"Namespace":"TEST_NS","PodName":"TEST_POD","ContainerName":"TEST_CONT"},"TopologyManagerPolicy":"","TopologyManagerScope":"","KubeletConfigFile":"/podresources/config.yaml","PodResourcesSocketPath":"unix:///podresources/kubelet.sock","SleepInterval":60000000000,"PodReadinessEnable":true,"NotifyFilePath":"","MaxEventsPerTimeUnit":1,"TimeUnitToLimitEvents":1000000000},"Version":false,"DumpConfig":""}
{"NRTupdater":{"NoPublish":false,"Oneshot":false,"Hostname":"TEST_NODE"},"Resourcemonitor":{"Namespace":"","SysfsRoot":"/sys","ResourceExclude":null,"RefreshNodeResources":false,"PodSetFingerprint":true,"PodSetFingerprintMethod":"with-exclusive-resources","ExposeTiming":false,"PodSetFingerprintStatusFile":"","PodExclude":null,"ExcludeTerminalPods":false},"RTE":{"Debug":false,"ReferenceContainer":{"Namespace":"TEST_NS","PodName":"TEST_POD","ContainerName":"TEST_CONT"},"TopologyManagerPolicy":"","TopologyManagerScope":"","KubeletConfigFile":"/podresources/config.yaml","PodResourcesSocketPath":"unix:///podresources/kubelet.sock","SleepInterval":60000000000,"PodReadinessEnable":true,"NotifyFilePath":"","MaxEventsPerTimeUnit":1,"TimeUnitToLimitEvents":1000000000,"AddNRTOwnerEnable":true},"Version":false,"DumpConfig":""}
Loading

0 comments on commit 99d956f

Please sign in to comment.