diff --git a/test/e2e/rte/rte.go b/test/e2e/rte/rte.go
index 45dee5fc6..fb2b2f471 100644
--- a/test/e2e/rte/rte.go
+++ b/test/e2e/rte/rte.go
@@ -29,10 +29,12 @@ import (
 
 	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/fields"
 	"k8s.io/kubernetes/test/e2e/framework"
 
 	"github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha1"
 	topologyclientset "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/generated/clientset/versioned"
+	"github.com/k8stopologyawareschedwg/podfingerprint"
 
 	"github.com/k8stopologyawareschedwg/resource-topology-exporter/pkg/k8sannotations"
 	"github.com/k8stopologyawareschedwg/resource-topology-exporter/pkg/nrtupdater"
@@ -45,6 +47,10 @@ import (
 	e2etestenv "github.com/k8stopologyawareschedwg/resource-topology-exporter/test/e2e/utils/testenv"
 )
 
+const (
+	updateIntervalExtraSafety = 10 * time.Second
+)
+
 var _ = ginkgo.Describe("[RTE][InfraConsuming] Resource topology exporter", func() {
 	var (
 		initialized bool
@@ -204,13 +210,167 @@ var _ = ginkgo.Describe("[RTE][InfraConsuming] Resource topology exporter", func
 			<-doneChan
 
 			gomega.Expect(finalNodeTopo.Annotations).ToNot(gomega.BeNil(), "missing annotations entirely")
-			reason := finalNodeTopo.Annotations[nrtupdater.AnnotationRTEUpdate]
+			reason := finalNodeTopo.Annotations[k8sannotations.RTEUpdate]
 			gomega.Expect(reason).To(gomega.Equal(nrtupdater.RTEUpdateReactive), "update reason error: expected %q got %q", nrtupdater.RTEUpdateReactive, reason)
 		})
+	})
 
+	ginkgo.Context("with pod fingerprinting enabled", func() {
+		ginkgo.It("[PodFingerprint] it should report stable value if the pods do not change", func() {
+			var err error
+			var currNrt *v1alpha1.NodeResourceTopology
+			prevNrt := e2enodetopology.GetNodeTopology(topologyClient, topologyUpdaterNode.Name)
+
+			if _, ok := prevNrt.Annotations[podfingerprint.Annotation]; !ok {
+				ginkgo.Skip("pod fingerprinting not found - assuming disabled")
+			}
+
+			dumpPods(f, topologyUpdaterNode.Name, "reference pods")
+
+			updateInterval, err := estimateUpdateInterval(*prevNrt)
+			gomega.Expect(err).NotTo(gomega.HaveOccurred())
+			framework.Logf("(estimated) update interval: %s", updateInterval)
+
+			// 3 checks is "long enough" - decided after quick tuning and trial and error
+			maxAttempts := 3
+			for attempt := 0; attempt < maxAttempts; attempt++ {
+				framework.Logf("PFP stability check: %d/%d", attempt, maxAttempts)
+
+				currNrt = getUpdatedNRT(topologyClient, topologyUpdaterNode.Name, *prevNrt, updateInterval)
+
+				// note we don't verify that no pods have been added or deleted: the suite is supposed to own the cluster while it runs.
+				// IOW, if we don't create/delete pods explicitly, no one else is supposed to
+				pfpStable := checkEqualPodFingerprintAnnotations(*prevNrt, *currNrt)
+				if !pfpStable {
+					dumpPods(f, topologyUpdaterNode.Name, "after PFP mismatch")
+				}
+				gomega.Expect(pfpStable).To(gomega.BeTrue(), "PFP changed across NRT updates")
+
+				prevNrt = currNrt
+			}
+		})
+
+		ginkgo.It("[PodFingerprint] it should report updated value if the set of running pods changes", func() {
+			nodes, err := e2enodes.FilterNodesWithEnoughCores(workerNodes, "1000m")
+			gomega.Expect(err).NotTo(gomega.HaveOccurred())
+			if len(nodes) < 1 {
+				ginkgo.Skip("not enough allocatable cores for this test")
+			}
+
+			var currNrt *v1alpha1.NodeResourceTopology
+			prevNrt := e2enodetopology.GetNodeTopology(topologyClient, topologyUpdaterNode.Name)
+
+			if _, ok := prevNrt.Annotations[podfingerprint.Annotation]; !ok {
+				ginkgo.Skip("pod fingerprinting not found - assuming disabled")
+			}
+			dumpPods(f, topologyUpdaterNode.Name, "reference pods")
+
+			updateInterval, err := estimateUpdateInterval(*prevNrt)
+			gomega.Expect(err).NotTo(gomega.HaveOccurred())
+			framework.Logf("(estimated) update interval: %s", updateInterval)
+
+			sleeperPod := e2epods.MakeGuaranteedSleeperPod("1000m")
+			defer e2epods.Cooldown(f)
+			pod := f.PodClient().CreateSync(sleeperPod)
+			// removing it twice is no bother
+			defer e2epods.DeletePodSyncByName(f, pod.Name)
+
+			currNrt = getUpdatedNRT(topologyClient, topologyUpdaterNode.Name, *prevNrt, updateInterval)
+
+			pfpStable := checkEqualPodFingerprintAnnotations(*prevNrt, *currNrt)
+			errMessage := "PFP did not change after pod creation"
+			if pfpStable {
+				dumpPods(f, topologyUpdaterNode.Name, errMessage)
+			}
+			gomega.Expect(pfpStable).To(gomega.BeFalse(), errMessage)
+
+			// since we need to delete the pod anyway, let's use this to run another check
+			prevNrt = currNrt
+			e2epods.DeletePodSyncByName(f, pod.Name)
+
+			currNrt = getUpdatedNRT(topologyClient, topologyUpdaterNode.Name, *prevNrt, updateInterval)
+
+			pfpStable = checkEqualPodFingerprintAnnotations(*prevNrt, *currNrt)
+			errMessage = "PFP did not change after pod deletion"
+			if pfpStable {
+				dumpPods(f, topologyUpdaterNode.Name, errMessage)
+			}
+			gomega.Expect(pfpStable).To(gomega.BeFalse(), errMessage)
+		})
 	})
 })
 
+func getUpdatedNRT(topologyClient *topologyclientset.Clientset, nodeName string, prevNrt v1alpha1.NodeResourceTopology, timeout time.Duration) *v1alpha1.NodeResourceTopology {
+	var err error
+	var currNrt *v1alpha1.NodeResourceTopology
+	gomega.EventuallyWithOffset(1, func() bool {
+		currNrt, err = topologyClient.TopologyV1alpha1().NodeResourceTopologies().Get(context.TODO(), nodeName, metav1.GetOptions{})
+		if err != nil {
+			framework.Logf("failed to get the node topology resource: %v", err)
+			return false
+		}
+		if currNrt.ObjectMeta.ResourceVersion == prevNrt.ObjectMeta.ResourceVersion {
+			framework.Logf("resource %s not yet updated - resource version not bumped", nodeName)
+			return false
+		}
+		return true
+	}, timeout+updateIntervalExtraSafety, 1*time.Second).Should(gomega.BeTrue(), "didn't get updated node topology info")
+	return currNrt
+}
+
+func dumpPods(f *framework.Framework, nodeName, message string) {
+	nodeSelector := fields.Set{
+		"spec.nodeName": nodeName,
+	}.AsSelector().String()
+
+	pods, err := f.ClientSet.CoreV1().Pods(e2etestenv.GetNamespaceName()).List(context.TODO(), metav1.ListOptions{FieldSelector: nodeSelector})
+	gomega.Expect(err).NotTo(gomega.HaveOccurred())
+
+	framework.Logf("BEGIN pods running on %q: %s", nodeName, message)
+	for _, pod := range pods.Items {
+		framework.Logf("%s %s/%s annotations=%v status=%s (%s %s)", nodeName, pod.Namespace, pod.Name, pod.Annotations, pod.Status.Phase, pod.Status.Message, pod.Status.Reason)
+	}
+	framework.Logf("END pods running on %q: %s", nodeName, message)
+}
+
+func checkEqualPodFingerprintAnnotations(nrt1, nrt2 v1alpha1.NodeResourceTopology) bool {
+	pfp1, ok1 := nrt1.Annotations[podfingerprint.Annotation]
+	if !ok1 {
+		framework.Logf("cannot find pod fingerprint annotation in NRT %q", nrt1.Name)
+		return false
+	}
+
+	pfp2, ok2 := nrt2.Annotations[podfingerprint.Annotation]
+	if !ok2 {
+		framework.Logf("cannot find pod fingerprint annotation in NRT %q", nrt2.Name)
+		return false
+	}
+
+	if pfp1 != pfp2 {
+		framework.Logf("fingerprint mismatch NRT %q PFP %q vs NRT %q PFP %q", nrt1.Name, pfp1, nrt2.Name, pfp2)
+		return false
+	}
+	return true
+}
+
+func estimateUpdateInterval(nrt v1alpha1.NodeResourceTopology) (time.Duration, error) {
+	fallbackInterval, err := time.ParseDuration(e2etestenv.GetPollInterval())
+	if err != nil {
+		return fallbackInterval, err
+	}
+	framework.Logf("Annotations for %q: %#v", nrt.Name, nrt.Annotations)
+	updateIntervalAnn, ok := nrt.Annotations[k8sannotations.UpdateInterval]
+	if !ok {
+		// no annotation, we need to guess
+		return fallbackInterval, nil
+	}
+	updateInterval, err := time.ParseDuration(updateIntervalAnn)
+	if err != nil {
+		return fallbackInterval, err
+	}
+	return updateInterval, nil
+}
+
 func execCommandInContainer(f *framework.Framework, namespace, podName, containerName string, cmd ...string) string {
 	stdout, stderr, err := f.ExecWithOptions(framework.ExecOptions{
 		Command: cmd,