From 55fd34413892c4030f3f9a32867123bad5744867 Mon Sep 17 00:00:00 2001 From: Dan Williams Date: Fri, 8 Sep 2023 16:01:15 -0500 Subject: [PATCH] update --- pkg/multus/multus.go | 59 +++++++++++++------------------- pkg/multus/multus_cni100_test.go | 58 +++++++++++++++++++++++++++++++ 2 files changed, 81 insertions(+), 36 deletions(-) diff --git a/pkg/multus/multus.go b/pkg/multus/multus.go index 7a74fd5e4..d8a878074 100644 --- a/pkg/multus/multus.go +++ b/pkg/multus/multus.go @@ -495,24 +495,21 @@ func cmdPluginErr(k8sArgs *types.K8sArgs, confName string, format string, args . return logging.Errorf(msg+format, args...) } -func isCriticalRequestRetriable(err error, otherFn func(error) bool) bool { +func isCriticalRequestRetriable(err error, retryOnNotFound bool) bool { logging.Debugf("isCriticalRequestRetriable: %v", err) errorTypesAllowingRetry := []func(error) bool{ errors.IsServiceUnavailable, errors.IsInternalError, k8snet.IsConnectionReset, k8snet.IsConnectionRefused} - if otherFn != nil { - errorTypesAllowingRetry = append(errorTypesAllowingRetry, otherFn) - } for _, f := range errorTypesAllowingRetry { if f(err) { return true } } - return false + return retryOnNotFound && errors.IsNotFound(err) } // GetPod retrieves Kubernetes Pod object from given namespace/name in k8sArgs (i.e. 
cni args) // GetPod also get pod UID, but it is not used to retrieve, but it is used for double check -func GetPod(kubeClient *k8s.ClientInfo, podInformer cache.SharedIndexInformer, k8sArgs *types.K8sArgs, warnOnly bool) (*v1.Pod, error) { +func GetPod(kubeClient *k8s.ClientInfo, podInformer cache.SharedIndexInformer, k8sArgs *types.K8sArgs, isDel bool) (*v1.Pod, error) { if kubeClient == nil { return nil, nil } @@ -528,56 +525,46 @@ func GetPod(kubeClient *k8s.ClientInfo, podInformer cache.SharedIndexInformer, k logging.Debugf("GetPod for [%s/%s] took %v", podNamespace, podName, time.Since(start)) }() - var pod *v1.Pod - var err error - var retryErrFunc func(error) bool - - // Default to direct apiserver request + // By default, get the pod directly from the apiserver podGetter := func(ns, name string) (*v1.Pod, error) { return kubeClient.GetPod(ns, name) } + // Use a fairly long 0.25 sec interval so we don't hammer the apiserver pollDuration := shortPollDuration + retryOnNotFound := false if podInformer != nil { logging.Debugf("GetPod for [%s/%s] will use informer cache", podNamespace, podName) - // Use the shared informer cache to reduce apiserver load + // If we have an informer, get the pod from the informer cache podGetter = func(ns, name string) (*v1.Pod, error) { return listers.NewPodLister(podInformer.GetIndexer()).Pods(ns).Get(name) } - // NotFound is a retriable error since the cache may be a bit behind the apiserver - retryErrFunc = errors.IsNotFound - // We can poll the informer cache more frequently since it's local + // Use short retry intervals with the informer since it's a local cache pollDuration = informerPollDuration + // Retry NotFound on ADD since the cache may be a bit behind the apiserver + retryOnNotFound = !isDel } - pod, err = podGetter(podNamespace, podName) - if err != nil { - // in case of a retriable error, retry 10 times with 0.25 sec interval - if isCriticalRequestRetriable(err, retryErrFunc) { - waitErr := 
wait.PollImmediate(pollDuration, shortPollTimeout, func() (bool, error) { - pod, err = podGetter(podNamespace, podName) - if retryErrFunc != nil && retryErrFunc(err) { - return false, nil - } - return pod != nil, err - }) - // retry failed, then return error with retry out - if waitErr != nil { - return nil, cmdErr(k8sArgs, "error waiting for pod: %v", err) - } - } else if warnOnly && errors.IsNotFound(err) { - // If not found, proceed to remove interface with cache + var pod *v1.Pod + if err := wait.PollImmediate(pollDuration, shortPollTimeout, func() (bool, error) { + var getErr error + pod, getErr = podGetter(podNamespace, podName) + if isCriticalRequestRetriable(getErr, retryOnNotFound) { + return false, nil + } + return pod != nil, getErr + }); err != nil { + if isDel && errors.IsNotFound(err) { + // On DEL pod may already be gone from apiserver/informer return nil, nil - } else { - // Other case, return error - return nil, cmdErr(k8sArgs, "error getting pod: %v", err) } + return nil, cmdErr(k8sArgs, "error waiting for pod: %v", err) } // In case of static pod, UID through kube api is different because of mirror pod, hence it is expected. if podUID != "" && string(pod.UID) != podUID && !k8s.IsStaticPod(pod) { msg := fmt.Sprintf("expected pod UID %q but got %q from Kube API", podUID, pod.UID) - if warnOnly { + if isDel { // On CNI DEL we just operate on the cache when these mismatch, we don't error out. // For example: stateful sets namespace/name can remain the same while podUID changes. 
logging.Verbosef("warning: %s", msg) diff --git a/pkg/multus/multus_cni100_test.go b/pkg/multus/multus_cni100_test.go index d60b659e9..fca8aac5b 100644 --- a/pkg/multus/multus_cni100_test.go +++ b/pkg/multus/multus_cni100_test.go @@ -19,6 +19,7 @@ import ( "fmt" "os" "reflect" + "sync" "time" "github.com/containernetworking/cni/pkg/skel" @@ -923,6 +924,63 @@ var _ = Describe("multus operations cniVersion 1.0.0 config", func() { Expect(fExec.delIndex).To(Equal(len(fExec.plugins))) }) + It("executes clusterNetwork delegate with a shared informer if pod is not immediately found", func() { + fakePod := testhelpers.NewFakePod("testpod", "", "kube-system/net1") + net1 := `{ + "name": "net1", + "type": "mynet", + "cniVersion": "1.0.0" + }` + expectedResult1 := &cni100.Result{ + CNIVersion: "1.0.0", + IPs: []*cni100.IPConfig{{ + Address: *testhelpers.EnsureCIDR("1.1.1.2/24"), + }, + }, + } + args := &skel.CmdArgs{ + ContainerID: "123456789", + Netns: testNS.Path(), + IfName: "eth0", + Args: fmt.Sprintf("K8S_POD_NAME=%s;K8S_POD_NAMESPACE=%s", fakePod.ObjectMeta.Name, fakePod.ObjectMeta.Namespace), + StdinData: []byte(`{ + "name": "node-cni-network", + "type": "multus", + "kubeconfig": "/etc/kubernetes/node-kubeconfig.yaml", + "defaultNetworks": [], + "clusterNetwork": "net1", + "delegates": [] + }`), + } + + fExec := newFakeExec() + fExec.addPlugin100(nil, "eth0", net1, expectedResult1, nil) + + fKubeClient := NewFakeClientInfo() + _, err := fKubeClient.AddNetAttachDef(testhelpers.NewFakeNetAttachDef("kube-system", "net1", net1)) + Expect(err).NotTo(HaveOccurred()) + + podInformer := newPodInformer(ctx, fKubeClient.Client) + + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + wg.Done() + time.Sleep(1 * time.Second) + fKubeClient.AddPod(fakePod) + }() + wg.Wait() + + result, err := CmdAdd(args, fExec, fKubeClient, podInformer) + Expect(err).NotTo(HaveOccurred()) + Expect(fExec.addIndex).To(Equal(len(fExec.plugins))) + Expect(reflect.DeepEqual(result, 
expectedResult1)).To(BeTrue()) + + err = CmdDel(args, fExec, fKubeClient, podInformer) + Expect(err).NotTo(HaveOccurred()) + Expect(fExec.delIndex).To(Equal(len(fExec.plugins))) + }) + It("Verify the cache is created in dataDir", func() { tmpCNIDir := tmpDir + "/cniData" err := os.Mkdir(tmpCNIDir, 0777)