diff --git a/cmd/multus/main.go b/cmd/multus/main.go index 5e9b6440e..777d62e3a 100644 --- a/cmd/multus/main.go +++ b/cmd/multus/main.go @@ -45,7 +45,7 @@ func main() { skel.PluginMain( func(args *skel.CmdArgs) error { - result, err := multus.CmdAdd(args, nil, nil) + result, err := multus.CmdAdd(args, nil, nil, nil) if err != nil { return err } @@ -54,6 +54,6 @@ func main() { func(args *skel.CmdArgs) error { return multus.CmdCheck(args, nil, nil) }, - func(args *skel.CmdArgs) error { return multus.CmdDel(args, nil, nil) }, + func(args *skel.CmdArgs) error { return multus.CmdDel(args, nil, nil, nil) }, cniversion.All, "meta-plugin that delegates to other CNI plugins") } diff --git a/pkg/multus/multus.go b/pkg/multus/multus.go index 6238b1d8d..16768f063 100644 --- a/pkg/multus/multus.go +++ b/pkg/multus/multus.go @@ -38,6 +38,8 @@ import ( "k8s.io/apimachinery/pkg/api/errors" k8snet "k8s.io/apimachinery/pkg/util/net" "k8s.io/apimachinery/pkg/util/wait" + listers "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/tools/cache" k8s "gopkg.in/k8snetworkplumbingwg/multus-cni.v4/pkg/k8sclient" "gopkg.in/k8snetworkplumbingwg/multus-cni.v4/pkg/logging" @@ -46,8 +48,9 @@ import ( ) const ( - shortPollDuration = 250 * time.Millisecond - shortPollTimeout = 2500 * time.Millisecond + shortPollDuration = 250 * time.Millisecond + informerPollDuration = 50 * time.Millisecond + shortPollTimeout = 2500 * time.Millisecond ) var ( @@ -506,7 +509,7 @@ func isCriticalRequestRetriable(err error) bool { // GetPod retrieves Kubernetes Pod object from given namespace/name in k8sArgs (i.e. cni args) // GetPod also get pod UID, but it is not used to retrieve, but it is used for double check -func GetPod(kubeClient *k8s.ClientInfo, k8sArgs *types.K8sArgs, warnOnly bool) (*v1.Pod, error) { +func GetPod(kubeClient *k8s.ClientInfo, podInformer cache.SharedIndexInformer, k8sArgs *types.K8sArgs, isDel bool) (*v1.Pod, error) { if kubeClient == nil { return nil, nil } @@ -515,31 +518,57 @@ func GetPod(kubeClient *k8s.ClientInfo, k8sArgs *types.K8sArgs, warnOnly bool) ( podName := string(k8sArgs.K8S_POD_NAME) podUID := string(k8sArgs.K8S_POD_UID) - pod, err := kubeClient.GetPod(podNamespace, podName) - if err != nil { - // in case of a retriable error, retry 10 times with 0.25 sec interval - if isCriticalRequestRetriable(err) { - waitErr := wait.PollImmediate(shortPollDuration, shortPollTimeout, func() (bool, error) { - pod, err = kubeClient.GetPod(podNamespace, podName) - return pod != nil, err - }) - // retry failed, then return error with retry out - if waitErr != nil { - return nil, cmdErr(k8sArgs, "error waiting for pod: %v", err) - } - } else if warnOnly && errors.IsNotFound(err) { - // If not found, proceed to remove interface with cache + // Keep track of how long getting the pod takes + logging.Debugf("GetPod for [%s/%s] starting", podNamespace, podName) + start := time.Now() + defer func() { + logging.Debugf("GetPod for [%s/%s] took %v", podNamespace, podName, time.Since(start)) + }() + + // Standard getter grabs pod directly from the apiserver + podGetter := func(ns, name string) (*v1.Pod, error) { + return kubeClient.GetPod(ns, name) + } + // Use a fairly long 0.25 sec interval so we don't hammer the apiserver + pollDuration := shortPollDuration + retryOnNotFound := func(error) bool { + return false + } + + if podInformer != nil { + logging.Debugf("GetPod for [%s/%s] will use informer cache", podNamespace, podName) + // If we have an informer get the pod from the informer cache + podGetter = func(ns, name string) (*v1.Pod, error) { + return listers.NewPodLister(podInformer.GetIndexer()).Pods(ns).Get(name) + } + // Use short retry intervals with the informer since it's a local cache + pollDuration = informerPollDuration + // Retry NotFound on ADD since the cache may be a bit behind the apiserver + retryOnNotFound = func(e error) bool { + return !isDel && errors.IsNotFound(e) + } + } + + var pod *v1.Pod + if err := wait.PollImmediate(pollDuration, shortPollTimeout, func() (bool, error) { + var getErr error + pod, getErr = podGetter(podNamespace, podName) + if isCriticalRequestRetriable(getErr) || retryOnNotFound(getErr) { + return false, nil + } + return pod != nil, getErr + }); err != nil { + if isDel && errors.IsNotFound(err) { + // On DEL pod may already be gone from apiserver/informer return nil, nil - } else { - // Other case, return error - return nil, cmdErr(k8sArgs, "error getting pod: %v", err) } + return nil, cmdErr(k8sArgs, "error waiting for pod: %v", err) } // In case of static pod, UID through kube api is different because of mirror pod, hence it is expected. if podUID != "" && string(pod.UID) != podUID && !k8s.IsStaticPod(pod) { msg := fmt.Sprintf("expected pod UID %q but got %q from Kube API", podUID, pod.UID) - if warnOnly { + if isDel { // On CNI DEL we just operate on the cache when these mismatch, we don't error out. // For example: stateful sets namespace/name can remain the same while podUID changes. logging.Verbosef("warning: %s", msg) @@ -552,7 +581,7 @@ func GetPod(kubeClient *k8s.ClientInfo, k8sArgs *types.K8sArgs, warnOnly bool) ( } // CmdAdd ... -func CmdAdd(args *skel.CmdArgs, exec invoke.Exec, kubeClient *k8s.ClientInfo) (cnitypes.Result, error) { +func CmdAdd(args *skel.CmdArgs, exec invoke.Exec, kubeClient *k8s.ClientInfo, podInformer cache.SharedIndexInformer) (cnitypes.Result, error) { n, err := types.LoadNetConf(args.StdinData) logging.Debugf("CmdAdd: %v, %v, %v", args, exec, kubeClient) if err != nil { @@ -575,7 +604,7 @@ func CmdAdd(args *skel.CmdArgs, exec invoke.Exec, kubeClient *k8s.ClientInfo) (c } } - pod, err := GetPod(kubeClient, k8sArgs, false) + pod, err := GetPod(kubeClient, podInformer, k8sArgs, false) if err != nil { return nil, err } @@ -772,7 +801,7 @@ func CmdCheck(args *skel.CmdArgs, exec invoke.Exec, kubeClient *k8s.ClientInfo) } // CmdDel ... -func CmdDel(args *skel.CmdArgs, exec invoke.Exec, kubeClient *k8s.ClientInfo) error { +func CmdDel(args *skel.CmdArgs, exec invoke.Exec, kubeClient *k8s.ClientInfo, podInformer cache.SharedIndexInformer) error { in, err := types.LoadNetConf(args.StdinData) logging.Debugf("CmdDel: %v, %v, %v", args, exec, kubeClient) if err != nil { @@ -814,7 +843,7 @@ func CmdDel(args *skel.CmdArgs, exec invoke.Exec, kubeClient *k8s.ClientInfo) er return cmdErr(nil, "error getting k8s client: %v", err) } - pod, err := GetPod(kubeClient, k8sArgs, true) + pod, err := GetPod(kubeClient, podInformer, k8sArgs, true) if err != nil { // GetPod may be failed but just do print error in its log and continue to delete logging.Errorf("Multus: GetPod failed: %v, but continue to delete", err) diff --git a/pkg/multus/multus_cni020_test.go b/pkg/multus/multus_cni020_test.go index 8868677d3..2d53ec1c7 100644 --- a/pkg/multus/multus_cni020_test.go +++ b/pkg/multus/multus_cni020_test.go @@ -142,14 +142,14 @@ var _ = Describe("multus operations cniVersion 0.2.0 config", func() { }` fExec.addPlugin020(nil, "net1", expectedConf2, expectedResult2, nil) - result, err := CmdAdd(args, fExec, nil) + result, err := CmdAdd(args, fExec, nil, nil) Expect(err).NotTo(HaveOccurred()) Expect(fExec.addIndex).To(Equal(len(fExec.plugins))) r := result.(*types020.Result) // plugin 1 is the masterplugin Expect(reflect.DeepEqual(r, expectedResult1)).To(BeTrue()) - err = CmdDel(args, fExec, nil) + err = CmdDel(args, fExec, nil, nil) Expect(err).NotTo(HaveOccurred()) Expect(fExec.delIndex).To(Equal(len(fExec.plugins))) }) @@ -203,14 +203,14 @@ var _ = Describe("multus operations cniVersion 0.2.0 config", func() { }` fExec.addPlugin020(nil, "net1", expectedConf2, expectedResult2, nil) - result, err := CmdAdd(args, fExec, nil) + result, err := CmdAdd(args, fExec, nil, nil) Expect(err).NotTo(HaveOccurred()) Expect(fExec.addIndex).To(Equal(len(fExec.plugins))) r := result.(*types020.Result) // plugin 1 is the masterplugin Expect(reflect.DeepEqual(r, expectedResult1)).To(BeTrue()) - err = CmdDel(args, fExec, nil) + err = CmdDel(args, fExec, nil, nil) Expect(err).NotTo(HaveOccurred()) Expect(fExec.delIndex).To(Equal(len(fExec.plugins))) }) @@ -265,7 +265,7 @@ var _ = Describe("multus operations cniVersion 0.2.0 config", func() { }` fExec.addPlugin020(nil, "net1", expectedConf2, expectedResult2, nil) - _, err := CmdAdd(args, fExec, nil) + _, err := CmdAdd(args, fExec, nil, nil) Expect(err).To(MatchError("[//:weave1]: error adding container to network \"weave1\": DelegateAdd: cannot set \"weave-net\" interface name to \"eth0\": validateIfName: no net namespace fsdadfad found: failed to Statfs \"fsdadfad\": no such file or directory")) }) @@ -319,10 +319,10 @@ var _ = Describe("multus operations cniVersion 0.2.0 config", func() { }` fExec.addPlugin020(nil, "net1", expectedConf2, expectedResult2, nil) - _, err := CmdAdd(args, fExec, nil) + _, err := CmdAdd(args, fExec, nil, nil) Expect(err).To(HaveOccurred()) - err = CmdDel(args, fExec, nil) + err = CmdDel(args, fExec, nil, nil) Expect(err).To(HaveOccurred()) }) @@ -363,7 +363,7 @@ var _ = Describe("multus operations cniVersion 0.2.0 config", func() { err := fmt.Errorf("expected plugin failure") fExec.addPlugin020(nil, "net1", expectedConf2, nil, err) - _, err = CmdAdd(args, fExec, nil) + _, err = CmdAdd(args, fExec, nil, nil) Expect(fExec.addIndex).To(Equal(2)) Expect(fExec.delIndex).To(Equal(2)) Expect(err).To(MatchError("[//:other1]: error adding container to network \"other1\": expected plugin failure")) @@ -409,7 +409,7 @@ var _ = Describe("multus operations cniVersion 0.2.0 config", func() { err := fmt.Errorf("expected plugin failure") fExec.addPlugin020(nil, "net1", expectedConf2, nil, err) - _, err = CmdAdd(args, fExec, nil) + _, err = CmdAdd(args, fExec, nil, nil) Expect(fExec.addIndex).To(Equal(1)) Expect(fExec.delIndex).To(Equal(2)) Expect(err).To(HaveOccurred()) @@ -491,7 +491,7 @@ var _ = Describe("multus operations cniVersion 0.2.0 config", func() { testhelpers.NewFakeNetAttachDef(fakePod.ObjectMeta.Namespace, "net3", net3)) Expect(err).NotTo(HaveOccurred()) - result, err := CmdAdd(args, fExec, clientInfo) + result, err := CmdAdd(args, fExec, clientInfo, nil) Expect(err).NotTo(HaveOccurred()) Expect(fExec.addIndex).To(Equal(len(fExec.plugins))) r := result.(*types020.Result) @@ -557,7 +557,7 @@ var _ = Describe("multus operations cniVersion 0.2.0 config", func() { testhelpers.NewFakeNetAttachDef(fakePod.ObjectMeta.Namespace, "net1", net1)) Expect(err).NotTo(HaveOccurred()) - result, err := CmdAdd(args, fExec, fKubeClient) + result, err := CmdAdd(args, fExec, fKubeClient, nil) Expect(err).NotTo(HaveOccurred()) Expect(fExec.addIndex).To(Equal(len(fExec.plugins))) r := result.(*types020.Result) @@ -568,7 +568,7 @@ var _ = Describe("multus operations cniVersion 0.2.0 config", func() { err = fKubeClient.DeletePod(fakePod.ObjectMeta.Namespace, fakePod.ObjectMeta.Name) Expect(err).NotTo(HaveOccurred()) - err = CmdDel(args, fExec, fKubeClient) + err = CmdDel(args, fExec, fKubeClient, nil) Expect(err).NotTo(HaveOccurred()) Expect(fExec.delIndex).To(Equal(len(fExec.plugins))) }) @@ -609,13 +609,13 @@ var _ = Describe("multus operations cniVersion 0.2.0 config", func() { _, err := fKubeClient.AddNetAttachDef(testhelpers.NewFakeNetAttachDef("kube-system", "net1", net1)) Expect(err).NotTo(HaveOccurred()) - result, err := CmdAdd(args, fExec, fKubeClient) + result, err := CmdAdd(args, fExec, fKubeClient, nil) Expect(err).NotTo(HaveOccurred()) Expect(fExec.addIndex).To(Equal(len(fExec.plugins))) r := result.(*types020.Result) Expect(reflect.DeepEqual(r, expectedResult1)).To(BeTrue()) - err = CmdDel(args, fExec, fKubeClient) + err = CmdDel(args, fExec, fKubeClient, nil) Expect(err).NotTo(HaveOccurred()) Expect(fExec.delIndex).To(Equal(len(fExec.plugins))) }) @@ -674,7 +674,7 @@ var _ = Describe("multus operations cniVersion 0.2.0 config", func() { _, err = fKubeClient.AddNetAttachDef( testhelpers.NewFakeNetAttachDef(fakePod.ObjectMeta.Namespace, "net1", net1)) Expect(err).NotTo(HaveOccurred()) - result, err := CmdAdd(args, fExec, fKubeClient) + result, err := CmdAdd(args, fExec, fKubeClient, nil) Expect(err).NotTo(HaveOccurred()) Expect(fExec.addIndex).To(Equal(len(fExec.plugins))) r := result.(*types020.Result) @@ -687,7 +687,7 @@ var _ = Describe("multus operations cniVersion 0.2.0 config", func() { Expect(err).NotTo(HaveOccurred()) By("Delete and check net count is not incremented") - err = CmdDel(args, fExec, fKubeClient) + err = CmdDel(args, fExec, fKubeClient, nil) Expect(err).NotTo(HaveOccurred()) Expect(fExec.delIndex).To(Equal(len(fExec.plugins))) }) @@ -746,7 +746,7 @@ var _ = Describe("multus operations cniVersion 0.2.0 config", func() { _, err = fKubeClient.AddNetAttachDef( testhelpers.NewFakeNetAttachDef(fakePod.ObjectMeta.Namespace, "net1", net1)) Expect(err).NotTo(HaveOccurred()) - result, err := CmdAdd(args, fExec, fKubeClient) + result, err := CmdAdd(args, fExec, fKubeClient, nil) Expect(err).NotTo(HaveOccurred()) Expect(fExec.addIndex).To(Equal(len(fExec.plugins))) r := result.(*types020.Result) @@ -762,7 +762,7 @@ var _ = Describe("multus operations cniVersion 0.2.0 config", func() { Expect(err).NotTo(HaveOccurred()) By("Delete and check pod/net count is incremented") - err = CmdDel(args, fExec, fKubeClient) + err = CmdDel(args, fExec, fKubeClient, nil) Expect(err).NotTo(HaveOccurred()) Expect(fExec.delIndex).To(Equal(len(fExec.plugins))) }) diff --git a/pkg/multus/multus_cni040_test.go b/pkg/multus/multus_cni040_test.go index 86ff62689..3e9f8f397 100644 --- a/pkg/multus/multus_cni040_test.go +++ b/pkg/multus/multus_cni040_test.go @@ -121,14 +121,14 @@ var _ = Describe("multus operations cniVersion 0.3.1 config", func() { }` fExec.addPlugin040(nil, "net1", expectedConf2, expectedResult2, nil) - result, err := CmdAdd(args, fExec, nil) + result, err := CmdAdd(args, fExec, nil, nil) Expect(err).NotTo(HaveOccurred()) Expect(fExec.addIndex).To(Equal(len(fExec.plugins))) r := result.(*cni040.Result) // plugin 1 is the masterplugin Expect(reflect.DeepEqual(r, expectedResult1)).To(BeTrue()) - err = CmdDel(args, fExec, nil) + err = CmdDel(args, fExec, nil, nil) Expect(err).NotTo(HaveOccurred()) Expect(fExec.delIndex).To(Equal(len(fExec.plugins))) }) @@ -239,7 +239,7 @@ var _ = Describe("multus operations cniVersion 0.3.1 config", func() { testhelpers.NewFakeNetAttachDef(fakePod.ObjectMeta.Namespace, "net1", net1)) Expect(err).NotTo(HaveOccurred()) - result, err := CmdAdd(args, fExec, clientInfo) + result, err := CmdAdd(args, fExec, clientInfo, nil) Expect(err).NotTo(HaveOccurred()) Expect(fExec.addIndex).To(Equal(len(fExec.plugins))) r := result.(*cni040.Result) @@ -296,14 +296,14 @@ var _ = Describe("multus operations cniVersion 0.3.1 config", func() { }` fExec.addPlugin040(nil, "net1", expectedConf2, expectedResult2, nil) - result, err := CmdAdd(args, fExec, nil) + result, err := CmdAdd(args, fExec, nil, nil) Expect(err).NotTo(HaveOccurred()) Expect(fExec.addIndex).To(Equal(len(fExec.plugins))) r := result.(*cni040.Result) // plugin 1 is the masterplugin Expect(reflect.DeepEqual(r, expectedResult1)).To(BeTrue()) - err = CmdDel(args, fExec, nil) + err = CmdDel(args, fExec, nil, nil) Expect(err).NotTo(HaveOccurred()) Expect(fExec.delIndex).To(Equal(len(fExec.plugins))) }) @@ -342,7 +342,7 @@ var _ = Describe("multus operations cniVersion 0.3.1 config", func() { testhelpers.NewFakeNetAttachDef(fakePod.Namespace, "net1", net1)) Expect(err).NotTo(HaveOccurred()) - _, err = CmdAdd(args, newFakeExec(), clientInfo) + _, err = CmdAdd(args, newFakeExec(), clientInfo, nil) Expect(err.Error()).To(ContainSubstring("expected pod UID \"foobar\" but got %q from Kube API", fakePod.UID)) }) @@ -406,7 +406,7 @@ var _ = Describe("multus operations cniVersion 0.3.1 config", func() { testhelpers.NewFakeNetAttachDef(fakePod.ObjectMeta.Namespace, "net1", net1)) Expect(err).NotTo(HaveOccurred()) - result, err := CmdAdd(args, fExec, clientInfo) + result, err := CmdAdd(args, fExec, clientInfo, nil) Expect(err).NotTo(HaveOccurred()) Expect(fExec.addIndex).To(Equal(len(fExec.plugins))) r := result.(*cni040.Result) @@ -458,7 +458,7 @@ var _ = Describe("multus operations cniVersion 0.3.1 config", func() { context.TODO(), fakePod, metav1.CreateOptions{}) Expect(err).NotTo(HaveOccurred()) - result, err := CmdAdd(args, fExec, clientInfo) + result, err := CmdAdd(args, fExec, clientInfo, nil) Expect(err).NotTo(HaveOccurred()) Expect(fExec.addIndex).To(Equal(len(fExec.plugins))) r := result.(*cni040.Result) @@ -510,7 +510,7 @@ var _ = Describe("multus operations cniVersion 0.3.1 config", func() { context.TODO(), fakePod, metav1.CreateOptions{}) Expect(err).NotTo(HaveOccurred()) - result, err := CmdAdd(args, fExec, clientInfo) + result, err := CmdAdd(args, fExec, clientInfo, nil) Expect(err).NotTo(HaveOccurred()) Expect(fExec.addIndex).To(Equal(len(fExec.plugins))) r := result.(*cni040.Result) @@ -557,7 +557,7 @@ var _ = Describe("multus operations cniVersion 0.3.1 config", func() { } }` fExec.addPlugin040(nil, "eth0", expectedConf1, nil, nil) - _, err := CmdAdd(args, fExec, nil) + _, err := CmdAdd(args, fExec, nil, nil) Expect(err).NotTo(HaveOccurred()) }) @@ -600,9 +600,9 @@ var _ = Describe("multus operations cniVersion 0.3.1 config", func() { } }` fExec.addPlugin040(nil, "eth0", expectedConf1, nil, nil) - _, err := CmdAdd(args, fExec, nil) + _, err := CmdAdd(args, fExec, nil, nil) Expect(err).NotTo(HaveOccurred()) - err = CmdDel(args, fExec, nil) + err = CmdDel(args, fExec, nil, nil) Expect(err).NotTo(HaveOccurred()) }) }) @@ -696,7 +696,7 @@ var _ = Describe("multus operations cniVersion 0.4.0 config", func() { }` fExec.addPlugin040(nil, "net1", expectedConf2, expectedResult2, nil) - result, err := CmdAdd(args, fExec, nil) + result, err := CmdAdd(args, fExec, nil, nil) Expect(err).NotTo(HaveOccurred()) Expect(fExec.addIndex).To(Equal(len(fExec.plugins))) @@ -706,7 +706,7 @@ var _ = Describe("multus operations cniVersion 0.4.0 config", func() { err = CmdCheck(args, fExec, nil) Expect(err).NotTo(HaveOccurred()) - err = CmdDel(args, fExec, nil) + err = CmdDel(args, fExec, nil, nil) Expect(err).NotTo(HaveOccurred()) Expect(fExec.delIndex).To(Equal(len(fExec.plugins))) }) @@ -763,7 +763,7 @@ var _ = Describe("multus operations cniVersion 0.4.0 config", func() { }` fExec.addPlugin040(nil, "net1", expectedConf2, expectedResult2, nil) - _, err := CmdAdd(args, fExec, nil) + _, err := CmdAdd(args, fExec, nil, nil) Expect(err).To(MatchError("[//:weave1]: error adding container to network \"weave1\": DelegateAdd: cannot set \"weave-net\" interface name to \"eth0\": validateIfName: no net namespace fsdadfad found: failed to Statfs \"fsdadfad\": no such file or directory")) }) @@ -828,7 +828,7 @@ var _ = Describe("multus operations cniVersion 0.4.0 config", func() { }` fExec.addPlugin040(nil, "net1", expectedConf2, expectedResult2, nil) - result, err := CmdAdd(args, fExec, nil) + result, err := CmdAdd(args, fExec, nil, nil) Expect(err).NotTo(HaveOccurred()) Expect(fExec.addIndex).To(Equal(len(fExec.plugins))) @@ -838,7 +838,7 @@ var _ = Describe("multus operations cniVersion 0.4.0 config", func() { err = CmdCheck(args, fExec, nil) Expect(err).NotTo(HaveOccurred()) - err = CmdDel(args, fExec, nil) + err = CmdDel(args, fExec, nil, nil) Expect(err).NotTo(HaveOccurred()) Expect(fExec.delIndex).To(Equal(len(fExec.plugins))) }) @@ -895,10 +895,10 @@ var _ = Describe("multus operations cniVersion 0.4.0 config", func() { }` fExec.addPlugin040(nil, "net1", expectedConf2, expectedResult2, nil) - _, err := CmdAdd(args, fExec, nil) + _, err := CmdAdd(args, fExec, nil, nil) Expect(err).To(HaveOccurred()) - err = CmdDel(args, fExec, nil) + err = CmdDel(args, fExec, nil, nil) Expect(err).To(HaveOccurred()) }) @@ -940,7 +940,7 @@ var _ = Describe("multus operations cniVersion 0.4.0 config", func() { err := fmt.Errorf("expected plugin failure") fExec.addPlugin040(nil, "net1", expectedConf2, nil, err) - _, err = CmdAdd(args, fExec, nil) + _, err = CmdAdd(args, fExec, nil, nil) Expect(fExec.addIndex).To(Equal(2)) Expect(fExec.delIndex).To(Equal(2)) Expect(err).To(MatchError("[//:other1]: error adding container to network \"other1\": expected plugin failure")) @@ -986,7 +986,7 @@ var _ = Describe("multus operations cniVersion 0.4.0 config", func() { err := fmt.Errorf("missing network name") fExec.addPlugin040(nil, "net1", expectedConf2, nil, err) - _, err = CmdAdd(args, fExec, nil) + _, err = CmdAdd(args, fExec, nil, nil) Expect(fExec.addIndex).To(Equal(1)) Expect(fExec.delIndex).To(Equal(1)) Expect(err).To(HaveOccurred()) @@ -1098,7 +1098,7 @@ var _ = Describe("multus operations cniVersion 0.4.0 config", func() { testhelpers.NewFakeNetAttachDef(fakePod.ObjectMeta.Namespace, "net1", net1)) Expect(err).NotTo(HaveOccurred()) - result, err := CmdAdd(args, fExec, clientInfo) + result, err := CmdAdd(args, fExec, clientInfo, nil) Expect(err).NotTo(HaveOccurred()) Expect(fExec.addIndex).To(Equal(len(fExec.plugins))) r := result.(*cni040.Result) @@ -1186,7 +1186,7 @@ var _ = Describe("multus operations cniVersion 0.4.0 config", func() { testhelpers.NewFakeNetAttachDef(fakePod.ObjectMeta.Namespace, "net3", net3)) Expect(err).NotTo(HaveOccurred()) - result, err := CmdAdd(args, fExec, clientInfo) + result, err := CmdAdd(args, fExec, clientInfo, nil) Expect(err).NotTo(HaveOccurred()) Expect(fExec.addIndex).To(Equal(len(fExec.plugins))) // plugin 1 is the masterplugin @@ -1247,7 +1247,7 @@ var _ = Describe("multus operations cniVersion 0.4.0 config", func() { testhelpers.NewFakeNetAttachDef(fakePod.ObjectMeta.Namespace, "net1", net1)) Expect(err).NotTo(HaveOccurred()) - result, err := CmdAdd(args, fExec, clientInfo) + result, err := CmdAdd(args, fExec, clientInfo, nil) Expect(err).NotTo(HaveOccurred()) Expect(fExec.addIndex).To(Equal(len(fExec.plugins))) // plugin 1 is the masterplugin @@ -1256,7 +1256,7 @@ var _ = Describe("multus operations cniVersion 0.4.0 config", func() { // set fKubeClient to nil to emulate no pod info err = clientInfo.DeletePod(fakePod.ObjectMeta.Namespace, fakePod.ObjectMeta.Name) Expect(err).NotTo(HaveOccurred()) - err = CmdDel(args, fExec, clientInfo) + err = CmdDel(args, fExec, clientInfo, nil) Expect(err).NotTo(HaveOccurred()) Expect(fExec.delIndex).To(Equal(len(fExec.plugins))) }) @@ -1300,7 +1300,7 @@ var _ = Describe("multus operations cniVersion 0.4.0 config", func() { } }` fExec.addPlugin040(nil, "eth0", expectedConf1, nil, nil) - _, err := CmdAdd(args, fExec, nil) + _, err := CmdAdd(args, fExec, nil, nil) Expect(err).NotTo(HaveOccurred()) }) @@ -1341,12 +1341,12 @@ var _ = Describe("multus operations cniVersion 0.4.0 config", func() { _, err := fKubeClient.AddNetAttachDef(testhelpers.NewFakeNetAttachDef("kube-system", "net1", net1)) Expect(err).NotTo(HaveOccurred()) - result, err := CmdAdd(args, fExec, fKubeClient) + result, err := CmdAdd(args, fExec, fKubeClient, nil) Expect(err).NotTo(HaveOccurred()) Expect(fExec.addIndex).To(Equal(len(fExec.plugins))) Expect(reflect.DeepEqual(result, expectedResult1)).To(BeTrue()) - err = CmdDel(args, fExec, fKubeClient) + err = CmdDel(args, fExec, fKubeClient, nil) Expect(err).NotTo(HaveOccurred()) Expect(fExec.delIndex).To(Equal(len(fExec.plugins))) }) @@ -1407,7 +1407,7 @@ var _ = Describe("multus operations cniVersion 0.4.0 config", func() { _, err = fKubeClient.AddNetAttachDef( testhelpers.NewFakeNetAttachDef(fakePod.ObjectMeta.Namespace, "net1", net1)) Expect(err).NotTo(HaveOccurred()) - result, err := CmdAdd(args, fExec, fKubeClient) + result, err := CmdAdd(args, fExec, fKubeClient, nil) Expect(err).NotTo(HaveOccurred()) Expect(fExec.addIndex).To(Equal(len(fExec.plugins))) // plugin 1 is the masterplugin @@ -1419,7 +1419,7 @@ var _ = Describe("multus operations cniVersion 0.4.0 config", func() { Expect(err).NotTo(HaveOccurred()) By("Delete and check net count is not incremented") - err = CmdDel(args, fExec, fKubeClient) + err = CmdDel(args, fExec, fKubeClient, nil) Expect(err).NotTo(HaveOccurred()) Expect(fExec.delIndex).To(Equal(len(fExec.plugins))) }) @@ -1480,7 +1480,7 @@ var _ = Describe("multus operations cniVersion 0.4.0 config", func() { _, err = fKubeClient.AddNetAttachDef( testhelpers.NewFakeNetAttachDef(fakePod.ObjectMeta.Namespace, "net1", net1)) Expect(err).NotTo(HaveOccurred()) - result, err := CmdAdd(args, fExec, fKubeClient) + result, err := CmdAdd(args, fExec, fKubeClient, nil) Expect(err).NotTo(HaveOccurred()) Expect(fExec.addIndex).To(Equal(len(fExec.plugins))) // plugin 1 is the masterplugin @@ -1495,7 +1495,7 @@ var _ = Describe("multus operations cniVersion 0.4.0 config", func() { Expect(err).NotTo(HaveOccurred()) By("Delete and check pod/net count is incremented") - err = CmdDel(args, fExec, fKubeClient) + err = CmdDel(args, fExec, fKubeClient, nil) Expect(err).NotTo(HaveOccurred()) Expect(fExec.delIndex).To(Equal(len(fExec.plugins))) }) diff --git a/pkg/multus/multus_cni100_test.go b/pkg/multus/multus_cni100_test.go index e193f4a56..fca8aac5b 100644 --- a/pkg/multus/multus_cni100_test.go +++ b/pkg/multus/multus_cni100_test.go @@ -15,9 +15,12 @@ package multus import ( + "context" "fmt" "os" "reflect" + "sync" + "time" "github.com/containernetworking/cni/pkg/skel" cni100 "github.com/containernetworking/cni/pkg/types/100" @@ -30,13 +33,44 @@ import ( . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" + + kapi "k8s.io/api/core/v1" + informerfactory "k8s.io/client-go/informers" + v1coreinformers "k8s.io/client-go/informers/core/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" ) +func newPodInformer(ctx context.Context, kclient kubernetes.Interface) cache.SharedIndexInformer { + informerFactory := informerfactory.NewSharedInformerFactory(kclient, 0 * time.Second) + + podInformer := informerFactory.InformerFor(&kapi.Pod{}, func(c kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer { + return v1coreinformers.NewFilteredPodInformer( + c, + kapi.NamespaceAll, + resyncPeriod, + cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, + nil) + }) + + informerFactory.Start(ctx.Done()) + + waitCtx, waitCancel := context.WithTimeout(ctx, 20 * time.Second) + if !cache.WaitForCacheSync(waitCtx.Done(), podInformer.HasSynced) { + logging.Errorf("failed to sync pod informer cache") + } + waitCancel() + + return podInformer +} + var _ = Describe("multus operations cniVersion 1.0.0 config", func() { var testNS ns.NetNS var tmpDir string resultCNIVersion := "1.0.0" configPath := "/tmp/foo.multus.conf" + var ctx context.Context + var cancel context.CancelFunc BeforeEach(func() { // Create a new NetNS so we don't modify the host @@ -52,9 +86,12 @@ var _ = Describe("multus operations cniVersion 1.0.0 config", func() { // Touch the default network file. os.OpenFile(configPath, os.O_RDONLY|os.O_CREATE, 0755) + ctx, cancel = context.WithCancel(context.TODO()) }) AfterEach(func() { + cancel() + // Cleanup default network file. if _, errStat := os.Stat(configPath); errStat == nil { errRemove := os.Remove(configPath) @@ -121,7 +158,7 @@ var _ = Describe("multus operations cniVersion 1.0.0 config", func() { }` fExec.addPlugin100(nil, "net1", expectedConf2, expectedResult2, nil) - result, err := CmdAdd(args, fExec, nil) + result, err := CmdAdd(args, fExec, nil, nil) Expect(err).NotTo(HaveOccurred()) Expect(fExec.addIndex).To(Equal(len(fExec.plugins))) @@ -131,7 +168,7 @@ var _ = Describe("multus operations cniVersion 1.0.0 config", func() { err = CmdCheck(args, fExec, nil) Expect(err).NotTo(HaveOccurred()) - err = CmdDel(args, fExec, nil) + err = CmdDel(args, fExec, nil, nil) Expect(err).NotTo(HaveOccurred()) Expect(fExec.delIndex).To(Equal(len(fExec.plugins))) }) @@ -188,7 +225,7 @@ var _ = Describe("multus operations cniVersion 1.0.0 config", func() { }` fExec.addPlugin100(nil, "net1", expectedConf2, expectedResult2, nil) - _, err := CmdAdd(args, fExec, nil) + _, err := CmdAdd(args, fExec, nil, nil) Expect(err).To(MatchError("[//:weave1]: error adding container to network \"weave1\": DelegateAdd: cannot set \"weave-net\" interface name to \"eth0\": validateIfName: no net namespace fsdadfad found: failed to Statfs \"fsdadfad\": no such file or directory")) }) @@ -242,7 +279,7 @@ var _ = Describe("multus operations cniVersion 1.0.0 config", func() { }` fExec.addPlugin100(nil, "net1", expectedConf2, expectedResult2, nil) - result, err := CmdAdd(args, fExec, nil) + result, err := CmdAdd(args, fExec, nil, nil) Expect(err).NotTo(HaveOccurred()) Expect(fExec.addIndex).To(Equal(len(fExec.plugins))) @@ -252,7 +289,7 @@ var _ = Describe("multus operations cniVersion 1.0.0 config", func() { err = CmdCheck(args, fExec, nil) Expect(err).NotTo(HaveOccurred()) - err = CmdDel(args, fExec, nil) + err = CmdDel(args, fExec, nil, nil) Expect(err).NotTo(HaveOccurred()) Expect(fExec.delIndex).To(Equal(len(fExec.plugins))) }) @@ -318,7 +355,7 @@ var _ = Describe("multus operations cniVersion 1.0.0 config", func() { }` fExec.addPlugin100(nil, "net1", expectedConf2, expectedResult2, nil) - result, err := CmdAdd(args, fExec, nil) + result, err := CmdAdd(args, fExec, nil, nil) Expect(err).NotTo(HaveOccurred()) Expect(fExec.addIndex).To(Equal(len(fExec.plugins))) @@ -328,7 +365,7 @@ var _ = Describe("multus operations cniVersion 1.0.0 config", func() { err = CmdCheck(args, fExec, nil) Expect(err).NotTo(HaveOccurred()) - err = CmdDel(args, fExec, nil) + err = CmdDel(args, fExec, nil, nil) Expect(err).NotTo(HaveOccurred()) Expect(fExec.delIndex).To(Equal(len(fExec.plugins))) }) @@ -385,10 +422,10 @@ var _ = Describe("multus operations cniVersion 1.0.0 config", func() { }` fExec.addPlugin100(nil, "net1", expectedConf2, expectedResult2, nil) - _, err := CmdAdd(args, fExec, nil) + _, err := CmdAdd(args, fExec, nil, nil) Expect(err).To(HaveOccurred()) - err = CmdDel(args, fExec, nil) + err = CmdDel(args, fExec, nil, nil) Expect(err).To(HaveOccurred()) }) @@ -430,7 +467,7 @@ var _ = Describe("multus operations cniVersion 1.0.0 config", func() { err := fmt.Errorf("expected plugin failure") fExec.addPlugin100(nil, "net1", expectedConf2, nil, err) - _, err = CmdAdd(args, fExec, nil) + _, err = CmdAdd(args, fExec, nil, nil) Expect(fExec.addIndex).To(Equal(2)) Expect(fExec.delIndex).To(Equal(2)) Expect(err).To(MatchError("[//:other1]: error adding container to network \"other1\": expected plugin failure")) @@ -476,7 +513,7 @@ var _ = Describe("multus operations cniVersion 1.0.0 config", func() { err := fmt.Errorf("missing network name") fExec.addPlugin100(nil, "net1", expectedConf2, nil, err) - _, err = CmdAdd(args, fExec, nil) + _, err = CmdAdd(args, fExec, nil, nil) Expect(fExec.addIndex).To(Equal(1)) Expect(fExec.delIndex).To(Equal(1)) Expect(err).To(HaveOccurred()) @@ -587,7 +624,7 @@ var _ = Describe("multus operations cniVersion 1.0.0 config", func() { testhelpers.NewFakeNetAttachDef(fakePod.ObjectMeta.Namespace, "net1", net1)) Expect(err).NotTo(HaveOccurred()) - result, err := CmdAdd(args, fExec, clientInfo) + result, err := CmdAdd(args, fExec, clientInfo, nil) Expect(err).NotTo(HaveOccurred()) Expect(fExec.addIndex).To(Equal(len(fExec.plugins))) r := result.(*cni100.Result) @@ -674,7 +711,7 @@ var _ = Describe("multus operations cniVersion 1.0.0 config", func() { testhelpers.NewFakeNetAttachDef(fakePod.ObjectMeta.Namespace, "net3", net3)) Expect(err).NotTo(HaveOccurred()) - result, err := CmdAdd(args, fExec, clientInfo) + result, err := CmdAdd(args, fExec, clientInfo, nil) Expect(err).NotTo(HaveOccurred()) Expect(fExec.addIndex).To(Equal(len(fExec.plugins))) // plugin 1 is the masterplugin @@ -735,7 +772,7 @@ var _ = Describe("multus operations cniVersion 1.0.0 config", func() { testhelpers.NewFakeNetAttachDef(fakePod.ObjectMeta.Namespace, "net1", net1)) Expect(err).NotTo(HaveOccurred()) - result, err := CmdAdd(args, fExec, clientInfo) + result, err := CmdAdd(args, fExec, clientInfo, nil) Expect(err).NotTo(HaveOccurred()) Expect(fExec.addIndex).To(Equal(len(fExec.plugins))) // plugin 1 is the masterplugin @@ -743,7 +780,7 @@ var _ = Describe("multus operations cniVersion 1.0.0 config", func() { // set fKubeClient to nil to emulate no pod info clientInfo.DeletePod(fakePod.ObjectMeta.Namespace, fakePod.ObjectMeta.Name) - err = CmdDel(args, fExec, clientInfo) + err = CmdDel(args, fExec, clientInfo, nil) Expect(err).NotTo(HaveOccurred()) Expect(fExec.delIndex).To(Equal(len(fExec.plugins))) }) @@ -787,7 +824,7 @@ var _ = Describe("multus operations cniVersion 1.0.0 config", func() { } }` fExec.addPlugin100(nil, "eth0", expectedConf1, nil, nil) - _, err := CmdAdd(args, fExec, nil) + _, err := CmdAdd(args, fExec, nil, nil) Expect(err).NotTo(HaveOccurred()) }) @@ -828,12 +865,118 @@ var _ = Describe("multus operations cniVersion 1.0.0 config", func() { _, err := fKubeClient.AddNetAttachDef(testhelpers.NewFakeNetAttachDef("kube-system", "net1", net1)) Expect(err).NotTo(HaveOccurred()) - result, err := CmdAdd(args, fExec, fKubeClient) + result, err := CmdAdd(args, fExec, fKubeClient, nil) + Expect(err).NotTo(HaveOccurred()) + Expect(fExec.addIndex).To(Equal(len(fExec.plugins))) + Expect(reflect.DeepEqual(result, expectedResult1)).To(BeTrue()) + + err = CmdDel(args, fExec, fKubeClient, nil) + Expect(err).NotTo(HaveOccurred()) + Expect(fExec.delIndex).To(Equal(len(fExec.plugins))) + }) + + It("executes clusterNetwork delegate with a shared informer", func() { + fakePod := testhelpers.NewFakePod("testpod", "", "kube-system/net1") + net1 := `{ + "name": "net1", + "type": "mynet", + "cniVersion": "1.0.0" + }` + expectedResult1 := &cni100.Result{ + CNIVersion: "1.0.0", + IPs: []*cni100.IPConfig{{ + Address: *testhelpers.EnsureCIDR("1.1.1.2/24"), + }, + }, + } + args := &skel.CmdArgs{ + ContainerID: "123456789", + Netns: testNS.Path(), + IfName: "eth0", + Args: fmt.Sprintf("K8S_POD_NAME=%s;K8S_POD_NAMESPACE=%s", fakePod.ObjectMeta.Name, fakePod.ObjectMeta.Namespace), + StdinData: []byte(`{ + "name": "node-cni-network", + "type": "multus", + "kubeconfig": "/etc/kubernetes/node-kubeconfig.yaml", + "defaultNetworks": [], + "clusterNetwork": "net1", + "delegates": [] + }`), + } + + fExec := newFakeExec() + fExec.addPlugin100(nil, "eth0", net1, expectedResult1, nil) + + fKubeClient := NewFakeClientInfo() + fKubeClient.AddPod(fakePod) + _, err := fKubeClient.AddNetAttachDef(testhelpers.NewFakeNetAttachDef("kube-system", "net1", net1)) + Expect(err).NotTo(HaveOccurred()) + + podInformer := newPodInformer(ctx, fKubeClient.Client) + + result, err := CmdAdd(args, fExec, fKubeClient, podInformer) + Expect(err).NotTo(HaveOccurred()) + Expect(fExec.addIndex).To(Equal(len(fExec.plugins))) + Expect(reflect.DeepEqual(result, expectedResult1)).To(BeTrue()) + + err = CmdDel(args, fExec, fKubeClient, podInformer) + Expect(err).NotTo(HaveOccurred()) + Expect(fExec.delIndex).To(Equal(len(fExec.plugins))) + }) + + It("executes clusterNetwork delegate with a shared informer if pod is not immediately found", func() { + fakePod := testhelpers.NewFakePod("testpod", "", "kube-system/net1") + net1 := `{ + "name": "net1", + "type": "mynet", + "cniVersion": "1.0.0" + }` + expectedResult1 := &cni100.Result{ + CNIVersion: "1.0.0", + IPs: []*cni100.IPConfig{{ + Address: *testhelpers.EnsureCIDR("1.1.1.2/24"), + }, + }, + } + args := &skel.CmdArgs{ + ContainerID: "123456789", + Netns: testNS.Path(), + IfName: "eth0", + Args: fmt.Sprintf("K8S_POD_NAME=%s;K8S_POD_NAMESPACE=%s", fakePod.ObjectMeta.Name, fakePod.ObjectMeta.Namespace), + StdinData: []byte(`{ + "name": "node-cni-network", + "type": "multus", + "kubeconfig": "/etc/kubernetes/node-kubeconfig.yaml", + "defaultNetworks": [], + "clusterNetwork": "net1", + "delegates": [] + }`), + } + + fExec := newFakeExec() + fExec.addPlugin100(nil, "eth0", net1, expectedResult1, nil) + + fKubeClient := NewFakeClientInfo() + _, err := fKubeClient.AddNetAttachDef(testhelpers.NewFakeNetAttachDef("kube-system", "net1", net1)) + Expect(err).NotTo(HaveOccurred()) + + podInformer := newPodInformer(ctx, fKubeClient.Client) + + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + wg.Done() + time.Sleep(1 * time.Second) + fKubeClient.AddPod(fakePod) + }() + wg.Wait() + + result, err := CmdAdd(args, fExec, fKubeClient, podInformer) Expect(err).NotTo(HaveOccurred()) Expect(fExec.addIndex).To(Equal(len(fExec.plugins))) Expect(reflect.DeepEqual(result, expectedResult1)).To(BeTrue()) - err = CmdDel(args, fExec, fKubeClient) + err = CmdDel(args, fExec, fKubeClient, podInformer) Expect(err).NotTo(HaveOccurred()) Expect(fExec.delIndex).To(Equal(len(fExec.plugins))) }) @@ -894,7 +1037,7 @@ var _ = Describe("multus operations cniVersion 1.0.0 config", func() { _, err = fKubeClient.AddNetAttachDef( testhelpers.NewFakeNetAttachDef(fakePod.ObjectMeta.Namespace, "net1", net1)) Expect(err).NotTo(HaveOccurred()) - result, err := CmdAdd(args, fExec, fKubeClient) + result, err := CmdAdd(args, fExec, fKubeClient, nil) Expect(err).NotTo(HaveOccurred()) Expect(fExec.addIndex).To(Equal(len(fExec.plugins))) // plugin 1 is the masterplugin @@ -906,7 +1049,7 @@ var _ = Describe("multus operations cniVersion 1.0.0 config", func() { Expect(err).NotTo(HaveOccurred()) By("Delete and check net count is not incremented") - err = CmdDel(args, fExec, fKubeClient) + err = CmdDel(args, fExec, fKubeClient, nil) Expect(err).NotTo(HaveOccurred()) Expect(fExec.delIndex).To(Equal(len(fExec.plugins))) }) @@ -967,7 +1110,7 @@ var _ = Describe("multus operations cniVersion 1.0.0 config", func() { _, err = fKubeClient.AddNetAttachDef( testhelpers.NewFakeNetAttachDef(fakePod.ObjectMeta.Namespace, "net1", net1)) Expect(err).NotTo(HaveOccurred()) - result, err := CmdAdd(args, fExec, fKubeClient) + result, err := CmdAdd(args, fExec, fKubeClient, nil) Expect(err).NotTo(HaveOccurred()) Expect(fExec.addIndex).To(Equal(len(fExec.plugins))) // plugin 1 is the masterplugin @@ -982,7 +1125,7 @@ var _ = Describe("multus operations cniVersion 1.0.0 config", func() { Expect(err).NotTo(HaveOccurred()) By("Delete and check pod/net count is incremented") - err = CmdDel(args, fExec, fKubeClient) + err = CmdDel(args, fExec, fKubeClient, nil) Expect(err).NotTo(HaveOccurred()) Expect(fExec.delIndex).To(Equal(len(fExec.plugins))) }) diff --git a/pkg/server/server.go b/pkg/server/server.go index f12382569..53e5dea7f 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -24,6 +24,7 @@ import ( "net/http" "os" "strings" + "time" "github.com/containernetworking/cni/pkg/invoke" "github.com/containernetworking/cni/pkg/skel" @@ -40,8 +41,16 @@ import ( "gopkg.in/k8snetworkplumbingwg/multus-cni.v4/pkg/server/config" "gopkg.in/k8snetworkplumbingwg/multus-cni.v4/pkg/types" + kapi "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" utilruntime "k8s.io/apimachinery/pkg/util/runtime" utilwait "k8s.io/apimachinery/pkg/util/wait" + informerfactory "k8s.io/client-go/informers" + v1coreinformers "k8s.io/client-go/informers/core/v1" + "k8s.io/client-go/informers/internalinterfaces" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" ) const ( @@ -136,6 +145,32 @@ func GetListener(socketPath string) (net.Listener, error) { return l, nil } +func newPodInformer(kubeClient kubernetes.Interface, nodeName string) (internalinterfaces.SharedInformerFactory, cache.SharedIndexInformer) { + var tweakFunc internalinterfaces.TweakListOptionsFunc + if nodeName != "" { + // Only watch for local pods + tweakFunc = func(opts *metav1.ListOptions) { + opts.FieldSelector = fields.OneTermEqualSelector("spec.nodeName", nodeName).String() + } + } + + // Multus only watches pods so there's no possibility of race conditions + // between multiple resources that might require a resync to resolve + const resyncInterval time.Duration = 0 * time.Second + + informerFactory := informerfactory.NewSharedInformerFactory(kubeClient, resyncInterval) + podInformer := informerFactory.InformerFor(&kapi.Pod{}, func(c kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer { + return v1coreinformers.NewFilteredPodInformer( + c, + kapi.NamespaceAll, + resyncPeriod, + cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, + tweakFunc) + }) + + return informerFactory, podInformer +} + // NewCNIServer creates and returns a new Server object which will listen on a socket in the given path func NewCNIServer(daemonConfig *ControllerNetConf, serverConfig []byte) (*Server, error) { kubeClient, err := k8s.InClusterK8sClient() @@ -165,6 +200,8 @@ func newCNIServer(rundir string, kubeClient *k8s.ClientInfo, exec invoke.Exec, s servConfig = bytes.Replace(servConfig, []byte("{"), []byte(","), 1) } + informerFactory, podInformer := newPodInformer(kubeClient.Client, os.Getenv("MULTUS_NODE_NAME")) + router := http.NewServeMux() s := &Server{ Server: http.Server{ @@ -183,6 +220,8 @@ func newCNIServer(rundir string, kubeClient *k8s.ClientInfo, exec invoke.Exec, s []string{"handler", "code", "method"}, ), }, + informerFactory: informerFactory, + podInformer: podInformer, } s.SetKeepAlivesEnabled(false) @@ -257,6 +296,16 @@ func newCNIServer(rundir string, kubeClient *k8s.ClientInfo, exec invoke.Exec, s // Start starts the server and begins serving on the given listener func (s *Server) Start(ctx context.Context, l net.Listener) { + s.informerFactory.Start(ctx.Done()) + + // Give the initial sync some time to complete in large clusters, but + // don't wait forever + waitCtx, waitCancel := context.WithTimeout(ctx, 20*time.Second) + if !cache.WaitForCacheSync(waitCtx.Done(), s.podInformer.HasSynced) { + logging.Errorf("failed to sync pod informer cache") + } + waitCancel() + go func() { utilwait.UntilWithContext(ctx, func(ctx context.Context) { logging.Debugf("open for business") @@ -440,7 +489,7 @@ func (s *Server) cmdAdd(cmdArgs *skel.CmdArgs, k8sArgs *types.K8sArgs) ([]byte, } logging.Debugf("CmdAdd for [%s/%s]. CNI conf: %+v", namespace, podName, *cmdArgs) - result, err := multus.CmdAdd(cmdArgs, s.exec, s.kubeclient) + result, err := multus.CmdAdd(cmdArgs, s.exec, s.kubeclient, s.podInformer) if err != nil { return nil, fmt.Errorf("error configuring pod [%s/%s] networking: %v", namespace, podName, err) } @@ -455,7 +504,7 @@ func (s *Server) cmdDel(cmdArgs *skel.CmdArgs, k8sArgs *types.K8sArgs) error { } logging.Debugf("CmdDel for [%s/%s]. CNI conf: %+v", namespace, podName, *cmdArgs) - return multus.CmdDel(cmdArgs, s.exec, s.kubeclient) + return multus.CmdDel(cmdArgs, s.exec, s.kubeclient, s.podInformer) } func (s *Server) cmdCheck(cmdArgs *skel.CmdArgs, k8sArgs *types.K8sArgs) error { @@ -489,7 +538,7 @@ func (s *Server) cmdDelegateAdd(cmdArgs *skel.CmdArgs, k8sArgs *types.K8sArgs, m if namespace == "" || podName == "" { return nil, fmt.Errorf("required CNI variable missing. pod name: %s; pod namespace: %s", podName, namespace) } - pod, err := multus.GetPod(s.kubeclient, k8sArgs, false) + pod, err := multus.GetPod(s.kubeclient, s.podInformer, k8sArgs, false) if err != nil { return nil, err } @@ -543,7 +592,7 @@ func (s *Server) cmdDelegateDel(cmdArgs *skel.CmdArgs, k8sArgs *types.K8sArgs, m if namespace == "" || podName == "" { return fmt.Errorf("required CNI variable missing. pod name: %s; pod namespace: %s", podName, namespace) } - pod, err := multus.GetPod(s.kubeclient, k8sArgs, false) + pod, err := multus.GetPod(s.kubeclient, s.podInformer, k8sArgs, false) if err != nil { return err } diff --git a/pkg/server/types.go b/pkg/server/types.go index d24ad7b76..9dd318352 100644 --- a/pkg/server/types.go +++ b/pkg/server/types.go @@ -22,6 +22,9 @@ import ( "github.com/prometheus/client_golang/prometheus" "gopkg.in/k8snetworkplumbingwg/multus-cni.v4/pkg/k8sclient" + + "k8s.io/client-go/informers/internalinterfaces" + "k8s.io/client-go/tools/cache" ) const ( @@ -42,11 +45,13 @@ type Metrics struct { // the CNI shim requests issued when a pod is added / removed. type Server struct { http.Server - rundir string - kubeclient *k8sclient.ClientInfo - exec invoke.Exec - serverConfig []byte - metrics *Metrics + rundir string + kubeclient *k8sclient.ClientInfo + exec invoke.Exec + serverConfig []byte + metrics *Metrics + informerFactory internalinterfaces.SharedInformerFactory + podInformer cache.SharedIndexInformer } // ControllerNetConf for the controller cni configuration