From 99d956fc568c5792ed61699b7e6a3b31c8b73ae8 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jose=20Luis=20Ojosnegros=20Manch=C3=B3n?=
Date: Wed, 6 Sep 2023 12:31:40 +0200
Subject: [PATCH] topology-updater: Add Node OwnerReference to NRTs

Add the corresponding Node as an OwnerReference in the NRT, so the NRT
gets deleted when the Node object is deleted, avoiding leftovers in the
cluster.
---
 pkg/config/config.go                          |   1 +
 pkg/nrtupdater/node_getter.go                 |  80 ++++++++++++
 pkg/nrtupdater/nrtupdater.go                  |  46 +++++--
 pkg/nrtupdater/nrtupdater_test.go             | 115 +++++++++++++++++-
 .../resourcetopologyexporter.go               |  10 +-
 test/data/TestDefaults.expected.json          |   2 +-
 test/e2e/topology_updater/topology_updater.go |  28 +++++
 tools/nrtstress/main.go                       |   4 +-
 8 files changed, 272 insertions(+), 14 deletions(-)
 create mode 100644 pkg/nrtupdater/node_getter.go

diff --git a/pkg/config/config.go b/pkg/config/config.go
index 8f20be1c..66bb7191 100644
--- a/pkg/config/config.go
+++ b/pkg/config/config.go
@@ -111,6 +111,7 @@ func LoadArgs(args ...string) (ProgArgs, error) {
 	flags.StringVar(&pArgs.RTE.KubeletConfigFile, "kubelet-config-file", "/podresources/config.yaml", "Kubelet config file path.")
 	flags.StringVar(&pArgs.RTE.PodResourcesSocketPath, "podresources-socket", "unix:///podresources/kubelet.sock", "Pod Resource Socket path to use.")
 	flags.BoolVar(&pArgs.RTE.PodReadinessEnable, "podreadiness", true, "Custom condition injection using Podreadiness.")
+	flags.BoolVar(&pArgs.RTE.AddNRTOwnerEnable, "add-nrt-owner", true, "RTE will add the NRT's related Node as an OwnerReference, to ensure the NRT is cleaned up when the Node is deleted.")
 
 	refCnt := flags.String("reference-container", "", "Reference container, used to learn about the shared cpu pool\n See: https://github.com/kubernetes/kubernetes/issues/102190\n format of spec is namespace/podname/containername.\n Alternatively, you can use the env vars REFERENCE_NAMESPACE, REFERENCE_POD_NAME, REFERENCE_CONTAINER_NAME.")
diff --git a/pkg/nrtupdater/node_getter.go b/pkg/nrtupdater/node_getter.go
new file mode 100644
index 00000000..460da1b0
--- /dev/null
+++ b/pkg/nrtupdater/node_getter.go
@@ -0,0 +1,80 @@
+package nrtupdater
+
+import (
+	"context"
+	"errors"
+	"fmt"
+
+	corev1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/client-go/kubernetes"
+)
+
+var NotConfigured = errors.New("unconfigured feature")
+
+type NotFound struct {
+	NodeName string
+}
+
+func (err NotFound) Error() string {
+	return "node " + err.NodeName + " not found"
+}
+
+type ConnectionError struct {
+	Err error
+}
+
+func (err ConnectionError) Error() string {
+	return fmt.Sprintf("error connecting to k8s: %v", err.Err)
+}
+func (err ConnectionError) Unwrap() error {
+	return err.Err
+}
+
+type NodeGetter interface {
+	Get(ctx context.Context, nodeName string, opts metav1.GetOptions) (*corev1.Node, error)
+}
+
+type DisabledNodeGetter struct {
+}
+
+func (ng *DisabledNodeGetter) Get(ctx context.Context, nodeName string, opts metav1.GetOptions) (*corev1.Node, error) {
+	return nil, fmt.Errorf("%w", NotConfigured)
+}
+
+type CachedNodeGetter struct {
+	nodes map[string]*corev1.Node
+}
+
+func NewCachedNodeGetter(k8sInterface kubernetes.Interface, ctx context.Context) *CachedNodeGetter {
+	nodelist, err := k8sInterface.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
+	if err != nil {
+		return nil
+	}
+
+	retVal := &CachedNodeGetter{nodes: make(map[string]*corev1.Node, len(nodelist.Items))}
+	for idx := range nodelist.Items {
+		node := &nodelist.Items[idx]
+		retVal.nodes[node.Name] = node
+	}
+
+	return retVal
+}
+
+func (ng *CachedNodeGetter) Get(ctx context.Context, nodeName string, _ metav1.GetOptions) (*corev1.Node, error) {
+	if node, found := ng.nodes[nodeName]; found {
+		return node, nil
+	}
+	return nil, fmt.Errorf("%w", NotFound{NodeName: nodeName})
+}
+
+func NewNodeGetter(enabled bool, k8scli kubernetes.Interface, ctx context.Context) NodeGetter {
+	if enabled {
+		ret := NewCachedNodeGetter(k8scli, ctx)
+		if ret == nil {
+			return nil
+		}
+		return ret
+	}
+	return &DisabledNodeGetter{}
+}
diff --git a/pkg/nrtupdater/nrtupdater.go b/pkg/nrtupdater/nrtupdater.go
index e12abec6..7b9ea9f4 100644
--- a/pkg/nrtupdater/nrtupdater.go
+++ b/pkg/nrtupdater/nrtupdater.go
@@ -2,11 +2,12 @@ package nrtupdater
 
 import (
 	"context"
+	"errors"
 	"fmt"
 	"time"
 
 	v1 "k8s.io/api/core/v1"
-	"k8s.io/apimachinery/pkg/api/errors"
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 
 	"k8s.io/klog/v2"
@@ -42,9 +43,10 @@ func (conf TMConfig) IsValid() bool {
 }
 
 type NRTUpdater struct {
-	args     Args
-	tmConfig TMConfig
-	stopChan chan struct{}
+	args       Args
+	tmConfig   TMConfig
+	stopChan   chan struct{}
+	nodeGetter NodeGetter
 }
 
 type MonitorInfo struct {
@@ -61,11 +63,12 @@ func (mi MonitorInfo) UpdateReason() string {
 	return RTEUpdateReactive
 }
 
-func NewNRTUpdater(args Args, tmconf TMConfig) *NRTUpdater {
+func NewNRTUpdater(nodeGetter NodeGetter, args Args, tmconf TMConfig) *NRTUpdater {
 	return &NRTUpdater{
-		args:     args,
-		tmConfig: tmconf,
-		stopChan: make(chan struct{}),
+		args:       args,
+		tmConfig:   tmconf,
+		stopChan:   make(chan struct{}),
+		nodeGetter: nodeGetter,
 	}
 }
 
@@ -89,7 +92,7 @@ func (te *NRTUpdater) UpdateWithClient(cli topologyclientset.Interface, info Mon
 	}
 
 	nrt, err := cli.TopologyV1alpha2().NodeResourceTopologies().Get(context.TODO(), te.args.Hostname, metav1.GetOptions{})
-	if errors.IsNotFound(err) {
+	if apierrors.IsNotFound(err) {
 		nrtNew := v1alpha2.NodeResourceTopology{
 			ObjectMeta: metav1.ObjectMeta{
 				Name: te.args.Hostname,
@@ -130,6 +133,31 @@ func (te *NRTUpdater) updateNRTInfo(nrt *v1alpha2.NodeResourceTopology, info Mon
 	nrt.Attributes = info.Attributes.DeepCopy()
 	nrt.Attributes = append(nrt.Attributes, te.makeAttributes()...)
 	// TODO: check for duplicate attributes?
+
+	te.updateOwnerReferences(nrt)
+}
+
+// updateOwnerReferences ensures nrt.OwnerReferences includes a reference to the Node with the same name as the NRT.
+//
+// Any existing owner references are replaced, so the list ends up with exactly one Node reference:
+// the one to the Node with the same name as the NRT.
+func (te *NRTUpdater) updateOwnerReferences(nrt *v1alpha2.NodeResourceTopology) {
+	node, err := te.nodeGetter.Get(context.TODO(), nrt.Name, metav1.GetOptions{})
+	if err != nil {
+		if errors.Is(err, NotConfigured) {
+			return
+		}
+		klog.V(7).Infof("nrtupdater: unable to get Node %q, cannot add owner reference: %v", nrt.Name, err)
+		return
+	}
+	nodeReference := metav1.OwnerReference{
+		APIVersion: "v1",
+		Kind:       "Node",
+		Name:       node.Name,
+		UID:        node.UID,
+	}
+
+	nrt.OwnerReferences = []metav1.OwnerReference{nodeReference}
 }
 
 func (te *NRTUpdater) makeAttributes() v1alpha2.AttributeList {
diff --git a/pkg/nrtupdater/nrtupdater_test.go b/pkg/nrtupdater/nrtupdater_test.go
index 624652c9..0575ea06 100644
--- a/pkg/nrtupdater/nrtupdater_test.go
+++ b/pkg/nrtupdater/nrtupdater_test.go
@@ -17,13 +17,16 @@ limitations under the License.
 package nrtupdater
 
 import (
+	"context"
 	"reflect"
 	"testing"
 
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/runtime/schema"
+	clientk8sfake "k8s.io/client-go/kubernetes/fake"
 
 	"github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha2"
 	"github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/generated/clientset/versioned/fake"
@@ -48,7 +51,9 @@ func TestUpdateTMPolicy(t *testing.T) {
 	}
 
 	var err error
-	nrtUpd = NewNRTUpdater(args, tmConfInitial)
+	k8sClient := clientk8sfake.NewSimpleClientset()
+	nodeGetter := NewCachedNodeGetter(k8sClient, context.Background())
+	nrtUpd = NewNRTUpdater(nodeGetter, args, tmConfInitial)
 	err = nrtUpd.UpdateWithClient(
 		cli,
 		MonitorInfo{
@@ -85,7 +90,7 @@ func TestUpdateTMPolicy(t *testing.T) {
 	}
 	checkTMConfig(t, obj, tmConfInitial)
 
-	nrtUpd = NewNRTUpdater(args, tmConfUpdated)
+	nrtUpd = NewNRTUpdater(nodeGetter, args, tmConfUpdated)
 	err = nrtUpd.UpdateWithClient(
 		cli,
 		MonitorInfo{
@@ -121,6 +126,89 @@
 	}
 	checkTMConfig(t, obj, tmConfUpdated)
 }
+func TestUpdateOwnerReferences(t *testing.T) {
+	nodeName := "test-node"
+
+	args := Args{
+		Hostname: nodeName,
+	}
+	tmConfig := TMConfig{
+		Scope:  "scope-whatever",
+		Policy: "policy-whatever",
+	}
+
+	zoneInfo := v1alpha2.Zone{
+		Name: "test-zone-0",
+		Type: "node",
+		Resources: v1alpha2.ResourceInfoList{
+			{
+				Name:        string(corev1.ResourceCPU),
+				Capacity:    resource.MustParse("16"),
+				Allocatable: resource.MustParse("14"),
+				Available:   resource.MustParse("14"),
+			},
+			{
+				Name:        string(corev1.ResourceMemory),
+				Capacity:    resource.MustParse("32Gi"),
+				Allocatable: resource.MustParse("30Gi"),
+				Available:   resource.MustParse("30Gi"),
+			},
+		},
+	}
+
+	node := corev1.Node{
+		TypeMeta: metav1.TypeMeta{
+			Kind:       "Node",
+			APIVersion: "v1",
+		},
+		ObjectMeta: metav1.ObjectMeta{
+			Name: nodeName,
+		},
+	}
+
+	expected := metav1.OwnerReference{
+		Kind:       node.Kind,
+		Name:       node.Name,
+		APIVersion: node.APIVersion,
+		UID:        node.UID,
+	}
+
+	var nrtUpd *NRTUpdater
+
+	cli := fake.NewSimpleClientset()
+	var err error
+	k8sClient := clientk8sfake.NewSimpleClientset(&node)
+	nodeGetter := NewCachedNodeGetter(k8sClient, context.Background())
+	nrtUpd = NewNRTUpdater(nodeGetter, args, tmConfig)
+
+	err = nrtUpd.UpdateWithClient(
+		cli,
+		MonitorInfo{Zones: v1alpha2.ZoneList{zoneInfo}},
+	)
+	if err != nil {
+		t.Fatalf("failed to perform the initial creation: %v", err)
+	}
+
+	nrtResource := schema.GroupVersionResource{Group: "topology.node.k8s.io", Version: "v1alpha2", Resource: "noderesourcetopologies"}
+	obj, err := cli.Tracker().Get(nrtResource, "", nodeName)
+	if err != nil {
+		t.Fatalf("failed to get the NRT object from tracker: %v", err)
+	}
+	checkOwnerReferences(t, obj, expected)
+
+	err = nrtUpd.UpdateWithClient(
+		cli,
+		MonitorInfo{Zones: v1alpha2.ZoneList{zoneInfo}},
+	)
+	if err != nil {
+		t.Fatalf("failed to perform the update: %v", err)
+	}
+	obj, err = cli.Tracker().Get(nrtResource, "", nodeName)
+	if err != nil {
+		t.Fatalf("failed to get the NRT object from tracker: %v", err)
+	}
+	checkOwnerReferences(t, obj, expected)
+}
 
 func checkTMConfig(t *testing.T, obj runtime.Object, expectedConf TMConfig) {
 	t.Helper()
@@ -152,3 +240,26 @@ func tmConfigFromAttributes(attrs v1alpha2.AttributeList) TMConfig {
 	}
 	return conf
 }
+
+func checkOwnerReferences(t *testing.T, obj runtime.Object, expected metav1.OwnerReference) {
+	t.Helper()
+
+	nrtObj, ok := obj.(*v1alpha2.NodeResourceTopology)
+	if !ok {
+		t.Fatalf("provided object is not a NodeResourceTopology")
+	}
+
+	nodeReferences := []metav1.OwnerReference{}
+	for _, own := range nrtObj.OwnerReferences {
+		if own.Kind == "Node" {
+			nodeReferences = append(nodeReferences, own)
+		}
+	}
+
+	if len(nodeReferences) != 1 {
+		t.Fatalf("unexpected number of node OwnerReferences: %#v", nodeReferences)
+	}
+	if !reflect.DeepEqual(nodeReferences[0], expected) {
+		t.Fatalf("unexpected node OwnerReference. got=%+#v expected=%+#v", nodeReferences[0], expected)
+	}
+}
diff --git a/pkg/resourcetopologyexporter/resourcetopologyexporter.go b/pkg/resourcetopologyexporter/resourcetopologyexporter.go
index a9c0e8cc..196ecfcc 100644
--- a/pkg/resourcetopologyexporter/resourcetopologyexporter.go
+++ b/pkg/resourcetopologyexporter/resourcetopologyexporter.go
@@ -1,6 +1,7 @@
 package resourcetopologyexporter
 
 import (
+	"context"
 	"fmt"
 	"time"
 
@@ -28,6 +29,7 @@ type Args struct {
 	NotifyFilePath        string
 	MaxEventsPerTimeUnit  int64
 	TimeUnitToLimitEvents time.Duration
+	AddNRTOwnerEnable     bool
 }
 
 type tmSettings struct {
@@ -40,6 +42,12 @@ func Execute(hnd resourcemonitor.Handle, nrtupdaterArgs nrtupdater.Args, resourc
 		return err
 	}
 
+	nodeGetter := nrtupdater.NewNodeGetter(rteArgs.AddNRTOwnerEnable, hnd.K8SCli, context.TODO())
+	if nodeGetter == nil {
+		klog.V(2).Info("cannot enable 'add-nrt-owner': unable to get node info")
+		return fmt.Errorf("cannot enable 'add-nrt-owner': unable to get node info")
+	}
+
 	var condChan chan v1.PodCondition
 	if rteArgs.PodReadinessEnable {
 		condChan = make(chan v1.PodCondition)
@@ -61,7 +69,7 @@ func Execute(hnd resourcemonitor.Handle, nrtupdaterArgs nrtupdater.Args, resourc
 	}
 	go resObs.Run(eventSource.Events(), condChan)
 
-	upd := nrtupdater.NewNRTUpdater(nrtupdaterArgs, tmConf.config)
+	upd := nrtupdater.NewNRTUpdater(nodeGetter, nrtupdaterArgs, tmConf.config)
 	go upd.Run(resObs.Infos, condChan)
 	go eventSource.Run()
diff --git a/test/data/TestDefaults.expected.json b/test/data/TestDefaults.expected.json
index 24b0adf6..e195e792 100644
--- a/test/data/TestDefaults.expected.json
+++ b/test/data/TestDefaults.expected.json
@@ -1 +1 @@
-{"NRTupdater":{"NoPublish":false,"Oneshot":false,"Hostname":"TEST_NODE"},"Resourcemonitor":{"Namespace":"","SysfsRoot":"/sys","ResourceExclude":null,"RefreshNodeResources":false,"PodSetFingerprint":true,"PodSetFingerprintMethod":"with-exclusive-resources","ExposeTiming":false,"PodSetFingerprintStatusFile":"","PodExclude":null,"ExcludeTerminalPods":false},"RTE":{"Debug":false,"ReferenceContainer":{"Namespace":"TEST_NS","PodName":"TEST_POD","ContainerName":"TEST_CONT"},"TopologyManagerPolicy":"","TopologyManagerScope":"","KubeletConfigFile":"/podresources/config.yaml","PodResourcesSocketPath":"unix:///podresources/kubelet.sock","SleepInterval":60000000000,"PodReadinessEnable":true,"NotifyFilePath":"","MaxEventsPerTimeUnit":1,"TimeUnitToLimitEvents":1000000000},"Version":false,"DumpConfig":""}
\ No newline at end of file
+{"NRTupdater":{"NoPublish":false,"Oneshot":false,"Hostname":"TEST_NODE"},"Resourcemonitor":{"Namespace":"","SysfsRoot":"/sys","ResourceExclude":null,"RefreshNodeResources":false,"PodSetFingerprint":true,"PodSetFingerprintMethod":"with-exclusive-resources","ExposeTiming":false,"PodSetFingerprintStatusFile":"","PodExclude":null,"ExcludeTerminalPods":false},"RTE":{"Debug":false,"ReferenceContainer":{"Namespace":"TEST_NS","PodName":"TEST_POD","ContainerName":"TEST_CONT"},"TopologyManagerPolicy":"","TopologyManagerScope":"","KubeletConfigFile":"/podresources/config.yaml","PodResourcesSocketPath":"unix:///podresources/kubelet.sock","SleepInterval":60000000000,"PodReadinessEnable":true,"NotifyFilePath":"","MaxEventsPerTimeUnit":1,"TimeUnitToLimitEvents":1000000000,"AddNRTOwnerEnable":true},"Version":false,"DumpConfig":""}
\ No newline at end of file
diff --git a/test/e2e/topology_updater/topology_updater.go b/test/e2e/topology_updater/topology_updater.go
index 81de87ab..cf0dd808 100644
--- a/test/e2e/topology_updater/topology_updater.go
+++ b/test/e2e/topology_updater/topology_updater.go
@@ -24,6 +24,7 @@ package topology_updater
 
 import (
 	"context"
 	"fmt"
+	"reflect"
 	"time"
 
 	"github.com/onsi/ginkgo/v2"
@@ -96,6 +97,33 @@ var _ = ginkgo.Describe("[TopologyUpdater][InfraConsuming] Node topology updater
 	})
 
 	ginkgo.Context("[release] with cluster configured", func() {
+		ginkgo.It("should have Node as Owner Reference", func() {
+			ginkgo.By("getting the initial topology information")
+			expectedOwnerReference := metav1.OwnerReference{
+				Name:       topologyUpdaterNode.Name,
+				APIVersion: "v1",
+				Kind:       "Node",
+				UID:        topologyUpdaterNode.UID,
+			}
+			gomega.Eventually(func() bool {
+				finalNodeTopo, err := f.TopoCli.TopologyV1alpha2().NodeResourceTopologies().Get(context.TODO(), topologyUpdaterNode.Name, metav1.GetOptions{})
+				if err != nil {
+					klog.Infof("failed to get the node topology resource: %v", err)
+					return false
+				}
+
+				if len(finalNodeTopo.OwnerReferences) != 1 {
+					klog.Infof("unexpected number of OwnerReferences: expected=1; got=%d", len(finalNodeTopo.OwnerReferences))
+					klog.Infof("%+#v", finalNodeTopo.OwnerReferences)
+					return false
+				}
+				if !reflect.DeepEqual(finalNodeTopo.OwnerReferences[0], expectedOwnerReference) {
+					klog.Infof("unexpected OwnerReference: expected=%+#v; got=%+#v", expectedOwnerReference, finalNodeTopo.OwnerReferences[0])
+					return false
+				}
+				return true
+			}).WithTimeout(5*timeout).WithPolling(5*time.Second).Should(gomega.BeTrue(), "NRT object never got the expected Node OwnerReference")
+		})
 		ginkgo.It("it should not account for any cpus if a container doesn't request exclusive cpus (best effort QOS)", func() {
 			devName := e2etestenv.GetDeviceName()
diff --git a/tools/nrtstress/main.go b/tools/nrtstress/main.go
index afa16408..c12c8333 100644
--- a/tools/nrtstress/main.go
+++ b/tools/nrtstress/main.go
@@ -53,6 +53,8 @@ func main() {
 		Policy: tmPolicy,
 		Scope:  tmScope,
 	}
-	upd := nrtupdater.NewNRTUpdater(nrtupdaterArgs, tmConf)
+
+	nodeGetter := &nrtupdater.DisabledNodeGetter{}
+	upd := nrtupdater.NewNRTUpdater(nodeGetter, nrtupdaterArgs, tmConf)
 	upd.Run(gen.Infos, nil)
 }
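-- 
Reviewer note, below the signature separator and not part of the applied
patch: with add-nrt-owner left at its default (true), the published NRT
object is expected to carry owner metadata roughly like the sketch below.
The node name "worker-0" and the UID are made-up examples:

  apiVersion: topology.node.k8s.io/v1alpha2
  kind: NodeResourceTopology
  metadata:
    name: worker-0            # the NRT name matches the Node name
    ownerReferences:
    - apiVersion: v1
      kind: Node
      name: worker-0
      uid: 8b4a254e-xxxx      # UID copied from the live Node object

Since the Node becomes the sole owner, the Kubernetes garbage collector
removes the NRT once the Node object is deleted, which can be checked
manually, for example:

  kubectl delete node worker-0
  kubectl get noderesourcetopologies worker-0   # eventually NotFound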