From 235783a8798b21f06725736b23d320768060f9b1 Mon Sep 17 00:00:00 2001 From: Dan Williams Date: Fri, 8 Sep 2023 11:26:10 -0500 Subject: [PATCH] server: use a shared informer pod cache rather than direct apiserver access When running in server mode we can use a shared informer to listen for Pod events from the apiserver, and grab pod info from that cache rather than doing direct apiserver requests each time. This reduces apiserver load and retry latency, since multus can poll the local cache more frequently than it should do direct apiserver requests. Signed-off-by: Dan Williams --- cmd/multus/main.go | 4 +- pkg/multus/multus.go | 62 ++++++++++++--- pkg/multus/multus_cni020_test.go | 36 ++++----- pkg/multus/multus_cni040_test.go | 64 +++++++-------- pkg/multus/multus_cni100_test.go | 129 +++++++++++++++++++++++++------ pkg/server/server.go | 57 +++++++++++++- pkg/server/types.go | 15 ++-- 7 files changed, 272 insertions(+), 95 deletions(-) diff --git a/cmd/multus/main.go b/cmd/multus/main.go index 5e9b6440e..777d62e3a 100644 --- a/cmd/multus/main.go +++ b/cmd/multus/main.go @@ -45,7 +45,7 @@ func main() { skel.PluginMain( func(args *skel.CmdArgs) error { - result, err := multus.CmdAdd(args, nil, nil) + result, err := multus.CmdAdd(args, nil, nil, nil) if err != nil { return err } @@ -54,6 +54,6 @@ func main() { func(args *skel.CmdArgs) error { return multus.CmdCheck(args, nil, nil) }, - func(args *skel.CmdArgs) error { return multus.CmdDel(args, nil, nil) }, + func(args *skel.CmdArgs) error { return multus.CmdDel(args, nil, nil, nil) }, cniversion.All, "meta-plugin that delegates to other CNI plugins") } diff --git a/pkg/multus/multus.go b/pkg/multus/multus.go index 6238b1d8d..7a74fd5e4 100644 --- a/pkg/multus/multus.go +++ b/pkg/multus/multus.go @@ -38,6 +38,8 @@ import ( "k8s.io/apimachinery/pkg/api/errors" k8snet "k8s.io/apimachinery/pkg/util/net" "k8s.io/apimachinery/pkg/util/wait" + listers "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/tools/cache" k8s "gopkg.in/k8snetworkplumbingwg/multus-cni.v4/pkg/k8sclient" "gopkg.in/k8snetworkplumbingwg/multus-cni.v4/pkg/logging" @@ -46,8 +48,9 @@ import ( ) const ( - shortPollDuration = 250 * time.Millisecond - shortPollTimeout = 2500 * time.Millisecond + shortPollDuration = 250 * time.Millisecond + informerPollDuration = 50 * time.Millisecond + shortPollTimeout = 2500 * time.Millisecond ) var ( @@ -492,10 +495,13 @@ func cmdPluginErr(k8sArgs *types.K8sArgs, confName string, format string, args . return logging.Errorf(msg+format, args...) } -func isCriticalRequestRetriable(err error) bool { +func isCriticalRequestRetriable(err error, otherFn func(error) bool) bool { logging.Debugf("isCriticalRequestRetriable: %v", err) errorTypesAllowingRetry := []func(error) bool{ errors.IsServiceUnavailable, errors.IsInternalError, k8snet.IsConnectionReset, k8snet.IsConnectionRefused} + if otherFn != nil { + errorTypesAllowingRetry = append(errorTypesAllowingRetry, otherFn) + } for _, f := range errorTypesAllowingRetry { if f(err) { return true @@ -506,7 +512,7 @@ func isCriticalRequestRetriable(err error) bool { // GetPod retrieves Kubernetes Pod object from given namespace/name in k8sArgs (i.e. cni args) // GetPod also get pod UID, but it is not used to retrieve, but it is used for double check -func GetPod(kubeClient *k8s.ClientInfo, k8sArgs *types.K8sArgs, warnOnly bool) (*v1.Pod, error) { +func GetPod(kubeClient *k8s.ClientInfo, podInformer cache.SharedIndexInformer, k8sArgs *types.K8sArgs, warnOnly bool) (*v1.Pod, error) { if kubeClient == nil { return nil, nil } @@ -515,12 +521,44 @@ func GetPod(kubeClient *k8s.ClientInfo, k8sArgs *types.K8sArgs, warnOnly bool) ( podName := string(k8sArgs.K8S_POD_NAME) podUID := string(k8sArgs.K8S_POD_UID) - pod, err := kubeClient.GetPod(podNamespace, podName) + // Keep track of how long getting the pod takes + logging.Debugf("GetPod for [%s/%s] starting", podNamespace, podName) + start := time.Now() + defer func() { + logging.Debugf("GetPod for [%s/%s] took %v", podNamespace, podName, time.Since(start)) + }() + + var pod *v1.Pod + var err error + var retryErrFunc func(error) bool + + // Default to direct apiserver request + podGetter := func(ns, name string) (*v1.Pod, error) { + return kubeClient.GetPod(ns, name) + } + pollDuration := shortPollDuration + + if podInformer != nil { + logging.Debugf("GetPod for [%s/%s] will use informer cache", podNamespace, podName) + // Use the shared informer cache to reduce apiserver load + podGetter = func(ns, name string) (*v1.Pod, error) { + return listers.NewPodLister(podInformer.GetIndexer()).Pods(ns).Get(name) + } + // NotFound is a retriable error since the cache may be a bit behind the apiserver + retryErrFunc = errors.IsNotFound + // We can poll the informer cache more frequently since it's local + pollDuration = informerPollDuration + } + + pod, err = podGetter(podNamespace, podName) if err != nil { // in case of a retriable error, retry 10 times with 0.25 sec interval - if isCriticalRequestRetriable(err) { - waitErr := wait.PollImmediate(shortPollDuration, shortPollTimeout, func() (bool, error) { - pod, err = kubeClient.GetPod(podNamespace, podName) + if isCriticalRequestRetriable(err, retryErrFunc) { + waitErr := wait.PollImmediate(pollDuration, shortPollTimeout, func() (bool, error) { + pod, err = podGetter(podNamespace, podName) + if retryErrFunc != nil && retryErrFunc(err) { + return false, nil + } return pod != nil, err }) // retry failed, then return error with retry out @@ -552,7 +590,7 @@ func GetPod(kubeClient *k8s.ClientInfo, k8sArgs *types.K8sArgs, warnOnly bool) ( } // CmdAdd ... -func CmdAdd(args *skel.CmdArgs, exec invoke.Exec, kubeClient *k8s.ClientInfo) (cnitypes.Result, error) { +func CmdAdd(args *skel.CmdArgs, exec invoke.Exec, kubeClient *k8s.ClientInfo, podInformer cache.SharedIndexInformer) (cnitypes.Result, error) { n, err := types.LoadNetConf(args.StdinData) logging.Debugf("CmdAdd: %v, %v, %v", args, exec, kubeClient) if err != nil { @@ -575,7 +613,7 @@ func CmdAdd(args *skel.CmdArgs, exec invoke.Exec, kubeClient *k8s.ClientInfo) (c } } - pod, err := GetPod(kubeClient, k8sArgs, false) + pod, err := GetPod(kubeClient, podInformer, k8sArgs, false) if err != nil { return nil, err } @@ -772,7 +810,7 @@ func CmdCheck(args *skel.CmdArgs, exec invoke.Exec, kubeClient *k8s.ClientInfo) } // CmdDel ... -func CmdDel(args *skel.CmdArgs, exec invoke.Exec, kubeClient *k8s.ClientInfo) error { +func CmdDel(args *skel.CmdArgs, exec invoke.Exec, kubeClient *k8s.ClientInfo, podInformer cache.SharedIndexInformer) error { in, err := types.LoadNetConf(args.StdinData) logging.Debugf("CmdDel: %v, %v, %v", args, exec, kubeClient) if err != nil { @@ -814,7 +852,7 @@ func CmdDel(args *skel.CmdArgs, exec invoke.Exec, kubeClient *k8s.ClientInfo) er return cmdErr(nil, "error getting k8s client: %v", err) } - pod, err := GetPod(kubeClient, k8sArgs, true) + pod, err := GetPod(kubeClient, podInformer, k8sArgs, true) if err != nil { // GetPod may be failed but just do print error in its log and continue to delete logging.Errorf("Multus: GetPod failed: %v, but continue to delete", err) diff --git a/pkg/multus/multus_cni020_test.go b/pkg/multus/multus_cni020_test.go index 8868677d3..2d53ec1c7 100644 --- a/pkg/multus/multus_cni020_test.go +++ b/pkg/multus/multus_cni020_test.go @@ -142,14 +142,14 @@ var _ = Describe("multus operations cniVersion 0.2.0 config", func() { }` fExec.addPlugin020(nil, "net1", expectedConf2, expectedResult2, nil) - result, err := CmdAdd(args, fExec, nil) + result, err := CmdAdd(args, fExec, nil, nil) Expect(err).NotTo(HaveOccurred()) Expect(fExec.addIndex).To(Equal(len(fExec.plugins))) r := result.(*types020.Result) // plugin 1 is the masterplugin Expect(reflect.DeepEqual(r, expectedResult1)).To(BeTrue()) - err = CmdDel(args, fExec, nil) + err = CmdDel(args, fExec, nil, nil) Expect(err).NotTo(HaveOccurred()) Expect(fExec.delIndex).To(Equal(len(fExec.plugins))) }) @@ -203,14 +203,14 @@ var _ = Describe("multus operations cniVersion 0.2.0 config", func() { }` fExec.addPlugin020(nil, "net1", expectedConf2, expectedResult2, nil) - result, err := CmdAdd(args, fExec, nil) + result, err := CmdAdd(args, fExec, nil, nil) Expect(err).NotTo(HaveOccurred()) Expect(fExec.addIndex).To(Equal(len(fExec.plugins))) r := result.(*types020.Result) // plugin 1 is the masterplugin Expect(reflect.DeepEqual(r, expectedResult1)).To(BeTrue()) - err = CmdDel(args, fExec, nil) + err = CmdDel(args, fExec, nil, nil) Expect(err).NotTo(HaveOccurred()) Expect(fExec.delIndex).To(Equal(len(fExec.plugins))) }) @@ -265,7 +265,7 @@ var _ = Describe("multus operations cniVersion 0.2.0 config", func() { }` fExec.addPlugin020(nil, "net1", expectedConf2, expectedResult2, nil) - _, err := CmdAdd(args, fExec, nil) + _, err := CmdAdd(args, fExec, nil, nil) Expect(err).To(MatchError("[//:weave1]: error adding container to network \"weave1\": DelegateAdd: cannot set \"weave-net\" interface name to \"eth0\": validateIfName: no net namespace fsdadfad found: failed to Statfs \"fsdadfad\": no such file or directory")) }) @@ -319,10 +319,10 @@ var _ = Describe("multus operations cniVersion 0.2.0 config", func() { }` fExec.addPlugin020(nil, "net1", expectedConf2, expectedResult2, nil) - _, err := CmdAdd(args, fExec, nil) + _, err := CmdAdd(args, fExec, nil, nil) Expect(err).To(HaveOccurred()) - err = CmdDel(args, fExec, nil) + err = CmdDel(args, fExec, nil, nil) Expect(err).To(HaveOccurred()) }) @@ -363,7 +363,7 @@ var _ = Describe("multus operations cniVersion 0.2.0 config", func() { err := fmt.Errorf("expected plugin failure") fExec.addPlugin020(nil, "net1", expectedConf2, nil, err) - _, err = CmdAdd(args, fExec, nil) + _, err = CmdAdd(args, fExec, nil, nil) Expect(fExec.addIndex).To(Equal(2)) Expect(fExec.delIndex).To(Equal(2)) Expect(err).To(MatchError("[//:other1]: error adding container to network \"other1\": expected plugin failure")) @@ -409,7 +409,7 @@ var _ = Describe("multus operations cniVersion 0.2.0 config", func() { err := fmt.Errorf("expected plugin failure") fExec.addPlugin020(nil, "net1", expectedConf2, nil, err) - _, err = CmdAdd(args, fExec, nil) + _, err = CmdAdd(args, fExec, nil, nil) Expect(fExec.addIndex).To(Equal(1)) Expect(fExec.delIndex).To(Equal(2)) Expect(err).To(HaveOccurred()) @@ -491,7 +491,7 @@ var _ = Describe("multus operations cniVersion 0.2.0 config", func() { testhelpers.NewFakeNetAttachDef(fakePod.ObjectMeta.Namespace, "net3", net3)) Expect(err).NotTo(HaveOccurred()) - result, err := CmdAdd(args, fExec, clientInfo) + result, err := CmdAdd(args, fExec, clientInfo, nil) Expect(err).NotTo(HaveOccurred()) Expect(fExec.addIndex).To(Equal(len(fExec.plugins))) r := result.(*types020.Result) @@ -557,7 +557,7 @@ var _ = Describe("multus operations cniVersion 0.2.0 config", func() { testhelpers.NewFakeNetAttachDef(fakePod.ObjectMeta.Namespace, "net1", net1)) Expect(err).NotTo(HaveOccurred()) - result, err := CmdAdd(args, fExec, fKubeClient) + result, err := CmdAdd(args, fExec, fKubeClient, nil) Expect(err).NotTo(HaveOccurred()) Expect(fExec.addIndex).To(Equal(len(fExec.plugins))) r := result.(*types020.Result) @@ -568,7 +568,7 @@ var _ = Describe("multus operations cniVersion 0.2.0 config", func() { err = fKubeClient.DeletePod(fakePod.ObjectMeta.Namespace, fakePod.ObjectMeta.Name) Expect(err).NotTo(HaveOccurred()) - err = CmdDel(args, fExec, fKubeClient) + err = CmdDel(args, fExec, fKubeClient, nil) Expect(err).NotTo(HaveOccurred()) Expect(fExec.delIndex).To(Equal(len(fExec.plugins))) }) @@ -609,13 +609,13 @@ var _ = Describe("multus operations cniVersion 0.2.0 config", func() { _, err := fKubeClient.AddNetAttachDef(testhelpers.NewFakeNetAttachDef("kube-system", "net1", net1)) Expect(err).NotTo(HaveOccurred()) - result, err := CmdAdd(args, fExec, fKubeClient) + result, err := CmdAdd(args, fExec, fKubeClient, nil) Expect(err).NotTo(HaveOccurred()) Expect(fExec.addIndex).To(Equal(len(fExec.plugins))) r := result.(*types020.Result) Expect(reflect.DeepEqual(r, expectedResult1)).To(BeTrue()) - err = CmdDel(args, fExec, fKubeClient) + err = CmdDel(args, fExec, fKubeClient, nil) Expect(err).NotTo(HaveOccurred()) Expect(fExec.delIndex).To(Equal(len(fExec.plugins))) }) @@ -674,7 +674,7 @@ var _ = Describe("multus operations cniVersion 0.2.0 config", func() { _, err = fKubeClient.AddNetAttachDef( testhelpers.NewFakeNetAttachDef(fakePod.ObjectMeta.Namespace, "net1", net1)) Expect(err).NotTo(HaveOccurred()) - result, err := CmdAdd(args, fExec, fKubeClient) + result, err := CmdAdd(args, fExec, fKubeClient, nil) Expect(err).NotTo(HaveOccurred()) Expect(fExec.addIndex).To(Equal(len(fExec.plugins))) r := result.(*types020.Result) @@ -687,7 +687,7 @@ var _ = Describe("multus operations cniVersion 0.2.0 config", func() { Expect(err).NotTo(HaveOccurred()) By("Delete and check net count is not incremented") - err = CmdDel(args, fExec, fKubeClient) + err = CmdDel(args, fExec, fKubeClient, nil) Expect(err).NotTo(HaveOccurred()) Expect(fExec.delIndex).To(Equal(len(fExec.plugins))) }) @@ -746,7 +746,7 @@ var _ = Describe("multus operations cniVersion 0.2.0 config", func() { _, err = fKubeClient.AddNetAttachDef( testhelpers.NewFakeNetAttachDef(fakePod.ObjectMeta.Namespace, "net1", net1)) Expect(err).NotTo(HaveOccurred()) - result, err := CmdAdd(args, fExec, fKubeClient) + result, err := CmdAdd(args, fExec, fKubeClient, nil) Expect(err).NotTo(HaveOccurred()) Expect(fExec.addIndex).To(Equal(len(fExec.plugins))) r := result.(*types020.Result) @@ -762,7 +762,7 @@ var _ = Describe("multus operations cniVersion 0.2.0 config", func() { Expect(err).NotTo(HaveOccurred()) By("Delete and check pod/net count is incremented") - err = CmdDel(args, fExec, fKubeClient) + err = CmdDel(args, fExec, fKubeClient, nil) Expect(err).NotTo(HaveOccurred()) Expect(fExec.delIndex).To(Equal(len(fExec.plugins))) }) diff --git a/pkg/multus/multus_cni040_test.go b/pkg/multus/multus_cni040_test.go index 86ff62689..3e9f8f397 100644 --- a/pkg/multus/multus_cni040_test.go +++ b/pkg/multus/multus_cni040_test.go @@ -121,14 +121,14 @@ var _ = Describe("multus operations cniVersion 0.3.1 config", func() { }` fExec.addPlugin040(nil, "net1", expectedConf2, expectedResult2, nil) - result, err := CmdAdd(args, fExec, nil) + result, err := CmdAdd(args, fExec, nil, nil) Expect(err).NotTo(HaveOccurred()) Expect(fExec.addIndex).To(Equal(len(fExec.plugins))) r := result.(*cni040.Result) // plugin 1 is the masterplugin Expect(reflect.DeepEqual(r, expectedResult1)).To(BeTrue()) - err = CmdDel(args, fExec, nil) + err = CmdDel(args, fExec, nil, nil) Expect(err).NotTo(HaveOccurred()) Expect(fExec.delIndex).To(Equal(len(fExec.plugins))) }) @@ -239,7 +239,7 @@ var _ = Describe("multus operations cniVersion 0.3.1 config", func() { testhelpers.NewFakeNetAttachDef(fakePod.ObjectMeta.Namespace, "net1", net1)) Expect(err).NotTo(HaveOccurred()) - result, err := CmdAdd(args, fExec, clientInfo) + result, err := CmdAdd(args, fExec, clientInfo, nil) Expect(err).NotTo(HaveOccurred()) Expect(fExec.addIndex).To(Equal(len(fExec.plugins))) r := result.(*cni040.Result) @@ -296,14 +296,14 @@ var _ = Describe("multus operations cniVersion 0.3.1 config", func() { }` fExec.addPlugin040(nil, "net1", expectedConf2, expectedResult2, nil) - result, err := CmdAdd(args, fExec, nil) + result, err := CmdAdd(args, fExec, nil, nil) Expect(err).NotTo(HaveOccurred()) Expect(fExec.addIndex).To(Equal(len(fExec.plugins))) r := result.(*cni040.Result) // plugin 1 is the masterplugin Expect(reflect.DeepEqual(r, expectedResult1)).To(BeTrue()) - err = CmdDel(args, fExec, nil) + err = CmdDel(args, fExec, nil, nil) Expect(err).NotTo(HaveOccurred()) Expect(fExec.delIndex).To(Equal(len(fExec.plugins))) }) @@ -342,7 +342,7 @@ var _ = Describe("multus operations cniVersion 0.3.1 config", func() { testhelpers.NewFakeNetAttachDef(fakePod.Namespace, "net1", net1)) Expect(err).NotTo(HaveOccurred()) - _, err = CmdAdd(args, newFakeExec(), clientInfo) + _, err = CmdAdd(args, newFakeExec(), clientInfo, nil) Expect(err.Error()).To(ContainSubstring("expected pod UID \"foobar\" but got %q from Kube API", fakePod.UID)) }) @@ -406,7 +406,7 @@ var _ = Describe("multus operations cniVersion 0.3.1 config", func() { testhelpers.NewFakeNetAttachDef(fakePod.ObjectMeta.Namespace, "net1", net1)) Expect(err).NotTo(HaveOccurred()) - result, err := CmdAdd(args, fExec, clientInfo) + result, err := CmdAdd(args, fExec, clientInfo, nil) Expect(err).NotTo(HaveOccurred()) Expect(fExec.addIndex).To(Equal(len(fExec.plugins))) r := result.(*cni040.Result) @@ -458,7 +458,7 @@ var _ = Describe("multus operations cniVersion 0.3.1 config", func() { context.TODO(), fakePod, metav1.CreateOptions{}) Expect(err).NotTo(HaveOccurred()) - result, err := CmdAdd(args, fExec, clientInfo) + result, err := CmdAdd(args, fExec, clientInfo, nil) Expect(err).NotTo(HaveOccurred()) Expect(fExec.addIndex).To(Equal(len(fExec.plugins))) r := result.(*cni040.Result) @@ -510,7 +510,7 @@ var _ = Describe("multus operations cniVersion 0.3.1 config", func() { context.TODO(), fakePod, metav1.CreateOptions{}) Expect(err).NotTo(HaveOccurred()) - result, err := CmdAdd(args, fExec, clientInfo) + result, err := CmdAdd(args, fExec, clientInfo, nil) Expect(err).NotTo(HaveOccurred()) Expect(fExec.addIndex).To(Equal(len(fExec.plugins))) r := result.(*cni040.Result) @@ -557,7 +557,7 @@ var _ = Describe("multus operations cniVersion 0.3.1 config", func() { } }` fExec.addPlugin040(nil, "eth0", expectedConf1, nil, nil) - _, err := CmdAdd(args, fExec, nil) + _, err := CmdAdd(args, fExec, nil, nil) Expect(err).NotTo(HaveOccurred()) }) @@ -600,9 +600,9 @@ var _ = Describe("multus operations cniVersion 0.3.1 config", func() { } }` fExec.addPlugin040(nil, "eth0", expectedConf1, nil, nil) - _, err := CmdAdd(args, fExec, nil) + _, err := CmdAdd(args, fExec, nil, nil) Expect(err).NotTo(HaveOccurred()) - err = CmdDel(args, fExec, nil) + err = CmdDel(args, fExec, nil, nil) Expect(err).NotTo(HaveOccurred()) }) }) @@ -696,7 +696,7 @@ var _ = Describe("multus operations cniVersion 0.4.0 config", func() { }` fExec.addPlugin040(nil, "net1", expectedConf2, expectedResult2, nil) - result, err := CmdAdd(args, fExec, nil) + result, err := CmdAdd(args, fExec, nil, nil) Expect(err).NotTo(HaveOccurred()) Expect(fExec.addIndex).To(Equal(len(fExec.plugins))) @@ -706,7 +706,7 @@ var _ = Describe("multus operations cniVersion 0.4.0 config", func() { err = CmdCheck(args, fExec, nil) Expect(err).NotTo(HaveOccurred()) - err = CmdDel(args, fExec, nil) + err = CmdDel(args, fExec, nil, nil) Expect(err).NotTo(HaveOccurred()) Expect(fExec.delIndex).To(Equal(len(fExec.plugins))) }) @@ -763,7 +763,7 @@ var _ = Describe("multus operations cniVersion 0.4.0 config", func() { }` fExec.addPlugin040(nil, "net1", expectedConf2, expectedResult2, nil) - _, err := CmdAdd(args, fExec, nil) + _, err := CmdAdd(args, fExec, nil, nil) Expect(err).To(MatchError("[//:weave1]: error adding container to network \"weave1\": DelegateAdd: cannot set \"weave-net\" interface name to \"eth0\": validateIfName: no net namespace fsdadfad found: failed to Statfs \"fsdadfad\": no such file or directory")) }) @@ -828,7 +828,7 @@ var _ = Describe("multus operations cniVersion 0.4.0 config", func() { }` fExec.addPlugin040(nil, "net1", expectedConf2, expectedResult2, nil) - result, err := CmdAdd(args, fExec, nil) + result, err := CmdAdd(args, fExec, nil, nil) Expect(err).NotTo(HaveOccurred()) Expect(fExec.addIndex).To(Equal(len(fExec.plugins))) @@ -838,7 +838,7 @@ var _ = Describe("multus operations cniVersion 0.4.0 config", func() { err = CmdCheck(args, fExec, nil) Expect(err).NotTo(HaveOccurred()) - err = CmdDel(args, fExec, nil) + err = CmdDel(args, fExec, nil, nil) Expect(err).NotTo(HaveOccurred()) Expect(fExec.delIndex).To(Equal(len(fExec.plugins))) }) @@ -895,10 +895,10 @@ var _ = Describe("multus operations cniVersion 0.4.0 config", func() { }` fExec.addPlugin040(nil, "net1", expectedConf2, expectedResult2, nil) - _, err := CmdAdd(args, fExec, nil) + _, err := CmdAdd(args, fExec, nil, nil) Expect(err).To(HaveOccurred()) - err = CmdDel(args, fExec, nil) + err = CmdDel(args, fExec, nil, nil) Expect(err).To(HaveOccurred()) }) @@ -940,7 +940,7 @@ var _ = Describe("multus operations cniVersion 0.4.0 config", func() { err := fmt.Errorf("expected plugin failure") fExec.addPlugin040(nil, "net1", expectedConf2, nil, err) - _, err = CmdAdd(args, fExec, nil) + _, err = CmdAdd(args, fExec, nil, nil) Expect(fExec.addIndex).To(Equal(2)) Expect(fExec.delIndex).To(Equal(2)) Expect(err).To(MatchError("[//:other1]: error adding container to network \"other1\": expected plugin failure")) @@ -986,7 +986,7 @@ var _ = Describe("multus operations cniVersion 0.4.0 config", func() { err := fmt.Errorf("missing network name") fExec.addPlugin040(nil, "net1", expectedConf2, nil, err) - _, err = CmdAdd(args, fExec, nil) + _, err = CmdAdd(args, fExec, nil, nil) Expect(fExec.addIndex).To(Equal(1)) Expect(fExec.delIndex).To(Equal(1)) Expect(err).To(HaveOccurred()) @@ -1098,7 +1098,7 @@ var _ = Describe("multus operations cniVersion 0.4.0 config", func() { testhelpers.NewFakeNetAttachDef(fakePod.ObjectMeta.Namespace, "net1", net1)) Expect(err).NotTo(HaveOccurred()) - result, err := CmdAdd(args, fExec, clientInfo) + result, err := CmdAdd(args, fExec, clientInfo, nil) Expect(err).NotTo(HaveOccurred()) Expect(fExec.addIndex).To(Equal(len(fExec.plugins))) r := result.(*cni040.Result) @@ -1186,7 +1186,7 @@ var _ = Describe("multus operations cniVersion 0.4.0 config", func() { testhelpers.NewFakeNetAttachDef(fakePod.ObjectMeta.Namespace, "net3", net3)) Expect(err).NotTo(HaveOccurred()) - result, err := CmdAdd(args, fExec, clientInfo) + result, err := CmdAdd(args, fExec, clientInfo, nil) Expect(err).NotTo(HaveOccurred()) Expect(fExec.addIndex).To(Equal(len(fExec.plugins))) // plugin 1 is the masterplugin @@ -1247,7 +1247,7 @@ var _ = Describe("multus operations cniVersion 0.4.0 config", func() { testhelpers.NewFakeNetAttachDef(fakePod.ObjectMeta.Namespace, "net1", net1)) Expect(err).NotTo(HaveOccurred()) - result, err := CmdAdd(args, fExec, clientInfo) + result, err := CmdAdd(args, fExec, clientInfo, nil) Expect(err).NotTo(HaveOccurred()) Expect(fExec.addIndex).To(Equal(len(fExec.plugins))) // plugin 1 is the masterplugin @@ -1256,7 +1256,7 @@ var _ = Describe("multus operations cniVersion 0.4.0 config", func() { // set fKubeClient to nil to emulate no pod info err = clientInfo.DeletePod(fakePod.ObjectMeta.Namespace, fakePod.ObjectMeta.Name) Expect(err).NotTo(HaveOccurred()) - err = CmdDel(args, fExec, clientInfo) + err = CmdDel(args, fExec, clientInfo, nil) Expect(err).NotTo(HaveOccurred()) Expect(fExec.delIndex).To(Equal(len(fExec.plugins))) }) @@ -1300,7 +1300,7 @@ var _ = Describe("multus operations cniVersion 0.4.0 config", func() { } }` fExec.addPlugin040(nil, "eth0", expectedConf1, nil, nil) - _, err := CmdAdd(args, fExec, nil) + _, err := CmdAdd(args, fExec, nil, nil) Expect(err).NotTo(HaveOccurred()) }) @@ -1341,12 +1341,12 @@ var _ = Describe("multus operations cniVersion 0.4.0 config", func() { _, err := fKubeClient.AddNetAttachDef(testhelpers.NewFakeNetAttachDef("kube-system", "net1", net1)) Expect(err).NotTo(HaveOccurred()) - result, err := CmdAdd(args, fExec, fKubeClient) + result, err := CmdAdd(args, fExec, fKubeClient, nil) Expect(err).NotTo(HaveOccurred()) Expect(fExec.addIndex).To(Equal(len(fExec.plugins))) Expect(reflect.DeepEqual(result, expectedResult1)).To(BeTrue()) - err = CmdDel(args, fExec, fKubeClient) + err = CmdDel(args, fExec, fKubeClient, nil) Expect(err).NotTo(HaveOccurred()) Expect(fExec.delIndex).To(Equal(len(fExec.plugins))) }) @@ -1407,7 +1407,7 @@ var _ = Describe("multus operations cniVersion 0.4.0 config", func() { _, err = fKubeClient.AddNetAttachDef( testhelpers.NewFakeNetAttachDef(fakePod.ObjectMeta.Namespace, "net1", net1)) Expect(err).NotTo(HaveOccurred()) - result, err := CmdAdd(args, fExec, fKubeClient) + result, err := CmdAdd(args, fExec, fKubeClient, nil) Expect(err).NotTo(HaveOccurred()) Expect(fExec.addIndex).To(Equal(len(fExec.plugins))) // plugin 1 is the masterplugin @@ -1419,7 +1419,7 @@ var _ = Describe("multus operations cniVersion 0.4.0 config", func() { Expect(err).NotTo(HaveOccurred()) By("Delete and check net count is not incremented") - err = CmdDel(args, fExec, fKubeClient) + err = CmdDel(args, fExec, fKubeClient, nil) Expect(err).NotTo(HaveOccurred()) Expect(fExec.delIndex).To(Equal(len(fExec.plugins))) }) @@ -1480,7 +1480,7 @@ var _ = Describe("multus operations cniVersion 0.4.0 config", func() { _, err = fKubeClient.AddNetAttachDef( testhelpers.NewFakeNetAttachDef(fakePod.ObjectMeta.Namespace, "net1", net1)) Expect(err).NotTo(HaveOccurred()) - result, err := CmdAdd(args, fExec, fKubeClient) + result, err := CmdAdd(args, fExec, fKubeClient, nil) Expect(err).NotTo(HaveOccurred()) Expect(fExec.addIndex).To(Equal(len(fExec.plugins))) // plugin 1 is the masterplugin @@ -1495,7 +1495,7 @@ var _ = Describe("multus operations cniVersion 0.4.0 config", func() { Expect(err).NotTo(HaveOccurred()) By("Delete and check pod/net count is incremented") - err = CmdDel(args, fExec, fKubeClient) + err = CmdDel(args, fExec, fKubeClient, nil) Expect(err).NotTo(HaveOccurred()) Expect(fExec.delIndex).To(Equal(len(fExec.plugins))) }) diff --git a/pkg/multus/multus_cni100_test.go b/pkg/multus/multus_cni100_test.go index e193f4a56..d60b659e9 100644 --- a/pkg/multus/multus_cni100_test.go +++ b/pkg/multus/multus_cni100_test.go @@ -15,9 +15,11 @@ package multus import ( + "context" "fmt" "os" "reflect" + "time" "github.com/containernetworking/cni/pkg/skel" cni100 "github.com/containernetworking/cni/pkg/types/100" @@ -30,13 +32,44 @@ import ( . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" + + kapi "k8s.io/api/core/v1" + informerfactory "k8s.io/client-go/informers" + v1coreinformers "k8s.io/client-go/informers/core/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" ) +func newPodInformer(ctx context.Context, kclient kubernetes.Interface) cache.SharedIndexInformer { + informerFactory := informerfactory.NewSharedInformerFactory(kclient, 0 * time.Second) + + podInformer := informerFactory.InformerFor(&kapi.Pod{}, func(c kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer { + return v1coreinformers.NewFilteredPodInformer( + c, + kapi.NamespaceAll, + resyncPeriod, + cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, + nil) + }) + + informerFactory.Start(ctx.Done()) + + waitCtx, waitCancel := context.WithTimeout(ctx, 20 * time.Second) + if !cache.WaitForCacheSync(waitCtx.Done(), podInformer.HasSynced) { + logging.Errorf("failed to sync pod informer cache") + } + waitCancel() + + return podInformer +} + var _ = Describe("multus operations cniVersion 1.0.0 config", func() { var testNS ns.NetNS var tmpDir string resultCNIVersion := "1.0.0" configPath := "/tmp/foo.multus.conf" + var ctx context.Context + var cancel context.CancelFunc BeforeEach(func() { // Create a new NetNS so we don't modify the host @@ -52,9 +85,12 @@ var _ = Describe("multus operations cniVersion 1.0.0 config", func() { // Touch the default network file. os.OpenFile(configPath, os.O_RDONLY|os.O_CREATE, 0755) + ctx, cancel = context.WithCancel(context.TODO()) }) AfterEach(func() { + cancel() + // Cleanup default network file. if _, errStat := os.Stat(configPath); errStat == nil { errRemove := os.Remove(configPath) @@ -121,7 +157,7 @@ var _ = Describe("multus operations cniVersion 1.0.0 config", func() { }` fExec.addPlugin100(nil, "net1", expectedConf2, expectedResult2, nil) - result, err := CmdAdd(args, fExec, nil) + result, err := CmdAdd(args, fExec, nil, nil) Expect(err).NotTo(HaveOccurred()) Expect(fExec.addIndex).To(Equal(len(fExec.plugins))) @@ -131,7 +167,7 @@ var _ = Describe("multus operations cniVersion 1.0.0 config", func() { err = CmdCheck(args, fExec, nil) Expect(err).NotTo(HaveOccurred()) - err = CmdDel(args, fExec, nil) + err = CmdDel(args, fExec, nil, nil) Expect(err).NotTo(HaveOccurred()) Expect(fExec.delIndex).To(Equal(len(fExec.plugins))) }) @@ -188,7 +224,7 @@ var _ = Describe("multus operations cniVersion 1.0.0 config", func() { }` fExec.addPlugin100(nil, "net1", expectedConf2, expectedResult2, nil) - _, err := CmdAdd(args, fExec, nil) + _, err := CmdAdd(args, fExec, nil, nil) Expect(err).To(MatchError("[//:weave1]: error adding container to network \"weave1\": DelegateAdd: cannot set \"weave-net\" interface name to \"eth0\": validateIfName: no net namespace fsdadfad found: failed to Statfs \"fsdadfad\": no such file or directory")) }) @@ -242,7 +278,7 @@ var _ = Describe("multus operations cniVersion 1.0.0 config", func() { }` fExec.addPlugin100(nil, "net1", expectedConf2, expectedResult2, nil) - result, err := CmdAdd(args, fExec, nil) + result, err := CmdAdd(args, fExec, nil, nil) Expect(err).NotTo(HaveOccurred()) Expect(fExec.addIndex).To(Equal(len(fExec.plugins))) @@ -252,7 +288,7 @@ var _ = Describe("multus operations cniVersion 1.0.0 config", func() { err = CmdCheck(args, fExec, nil) Expect(err).NotTo(HaveOccurred()) - err = CmdDel(args, fExec, nil) + err = CmdDel(args, fExec, nil, nil) Expect(err).NotTo(HaveOccurred()) Expect(fExec.delIndex).To(Equal(len(fExec.plugins))) }) @@ -318,7 +354,7 @@ var _ = Describe("multus operations cniVersion 1.0.0 config", func() { }` fExec.addPlugin100(nil, "net1", expectedConf2, expectedResult2, nil) - result, err := CmdAdd(args, fExec, nil) + result, err := CmdAdd(args, fExec, nil, nil) Expect(err).NotTo(HaveOccurred()) Expect(fExec.addIndex).To(Equal(len(fExec.plugins))) @@ -328,7 +364,7 @@ var _ = Describe("multus operations cniVersion 1.0.0 config", func() { err = CmdCheck(args, fExec, nil) Expect(err).NotTo(HaveOccurred()) - err = CmdDel(args, fExec, nil) + err = CmdDel(args, fExec, nil, nil) Expect(err).NotTo(HaveOccurred()) Expect(fExec.delIndex).To(Equal(len(fExec.plugins))) }) @@ -385,10 +421,10 @@ var _ = Describe("multus operations cniVersion 1.0.0 config", func() { }` fExec.addPlugin100(nil, "net1", expectedConf2, expectedResult2, nil) - _, err := CmdAdd(args, fExec, nil) + _, err := CmdAdd(args, fExec, nil, nil) Expect(err).To(HaveOccurred()) - err = CmdDel(args, fExec, nil) + err = CmdDel(args, fExec, nil, nil) Expect(err).To(HaveOccurred()) }) @@ -430,7 +466,7 @@ var _ = Describe("multus operations cniVersion 1.0.0 config", func() { err := fmt.Errorf("expected plugin failure") fExec.addPlugin100(nil, "net1", expectedConf2, nil, err) - _, err = CmdAdd(args, fExec, nil) + _, err = CmdAdd(args, fExec, nil, nil) Expect(fExec.addIndex).To(Equal(2)) Expect(fExec.delIndex).To(Equal(2)) Expect(err).To(MatchError("[//:other1]: error adding container to network \"other1\": expected plugin failure")) @@ -476,7 +512,7 @@ var _ = Describe("multus operations cniVersion 1.0.0 config", func() { err := fmt.Errorf("missing network name") fExec.addPlugin100(nil, "net1", expectedConf2, nil, err) - _, err = CmdAdd(args, fExec, nil) + _, err = CmdAdd(args, fExec, nil, nil) Expect(fExec.addIndex).To(Equal(1)) Expect(fExec.delIndex).To(Equal(1)) Expect(err).To(HaveOccurred()) @@ -587,7 +623,7 @@ var _ = Describe("multus operations cniVersion 1.0.0 config", func() { testhelpers.NewFakeNetAttachDef(fakePod.ObjectMeta.Namespace, "net1", net1)) Expect(err).NotTo(HaveOccurred()) - result, err := CmdAdd(args, fExec, clientInfo) + result, err := CmdAdd(args, fExec, clientInfo, nil) Expect(err).NotTo(HaveOccurred()) Expect(fExec.addIndex).To(Equal(len(fExec.plugins))) r := result.(*cni100.Result) @@ -674,7 +710,7 @@ var _ = Describe("multus operations cniVersion 1.0.0 config", func() { testhelpers.NewFakeNetAttachDef(fakePod.ObjectMeta.Namespace, "net3", net3)) Expect(err).NotTo(HaveOccurred()) - result, err := CmdAdd(args, fExec, clientInfo) + result, err := CmdAdd(args, fExec, clientInfo, nil) Expect(err).NotTo(HaveOccurred()) Expect(fExec.addIndex).To(Equal(len(fExec.plugins))) // plugin 1 is the masterplugin @@ -735,7 +771,7 @@ var _ = Describe("multus operations cniVersion 1.0.0 config", func() { testhelpers.NewFakeNetAttachDef(fakePod.ObjectMeta.Namespace, "net1", net1)) Expect(err).NotTo(HaveOccurred()) - result, err := CmdAdd(args, fExec, clientInfo) + result, err := CmdAdd(args, fExec, clientInfo, nil) Expect(err).NotTo(HaveOccurred()) Expect(fExec.addIndex).To(Equal(len(fExec.plugins))) // plugin 1 is the masterplugin @@ -743,7 +779,7 @@ var _ = Describe("multus operations cniVersion 1.0.0 config", func() { // set fKubeClient to nil to emulate no pod info clientInfo.DeletePod(fakePod.ObjectMeta.Namespace, fakePod.ObjectMeta.Name) - err = CmdDel(args, fExec, clientInfo) + err = CmdDel(args, fExec, clientInfo, nil) Expect(err).NotTo(HaveOccurred()) Expect(fExec.delIndex).To(Equal(len(fExec.plugins))) }) @@ -787,7 +823,7 @@ var _ = Describe("multus operations cniVersion 1.0.0 config", func() { } }` fExec.addPlugin100(nil, "eth0", expectedConf1, nil, nil) - _, err := CmdAdd(args, fExec, nil) + _, err := CmdAdd(args, fExec, nil, nil) Expect(err).NotTo(HaveOccurred()) }) @@ -828,12 +864,61 @@ var _ = Describe("multus operations cniVersion 1.0.0 config", func() { _, err := fKubeClient.AddNetAttachDef(testhelpers.NewFakeNetAttachDef("kube-system", "net1", net1)) Expect(err).NotTo(HaveOccurred()) - result, err := CmdAdd(args, fExec, fKubeClient) + result, err := CmdAdd(args, fExec, fKubeClient, nil) + Expect(err).NotTo(HaveOccurred()) + Expect(fExec.addIndex).To(Equal(len(fExec.plugins))) + Expect(reflect.DeepEqual(result, expectedResult1)).To(BeTrue()) + + err = CmdDel(args, fExec, fKubeClient, nil) + Expect(err).NotTo(HaveOccurred()) + Expect(fExec.delIndex).To(Equal(len(fExec.plugins))) + }) + + It("executes clusterNetwork delegate with a shared informer", func() { + fakePod := testhelpers.NewFakePod("testpod", "", "kube-system/net1") + net1 := `{ + "name": "net1", + "type": "mynet", + "cniVersion": "1.0.0" + }` + expectedResult1 := &cni100.Result{ + CNIVersion: "1.0.0", + IPs: []*cni100.IPConfig{{ + Address: *testhelpers.EnsureCIDR("1.1.1.2/24"), + }, + }, + } + args := &skel.CmdArgs{ + ContainerID: "123456789", + Netns: testNS.Path(), + IfName: "eth0", + Args: fmt.Sprintf("K8S_POD_NAME=%s;K8S_POD_NAMESPACE=%s", fakePod.ObjectMeta.Name, fakePod.ObjectMeta.Namespace), + StdinData: []byte(`{ + "name": "node-cni-network", + "type": "multus", + "kubeconfig": "/etc/kubernetes/node-kubeconfig.yaml", + "defaultNetworks": [], + "clusterNetwork": "net1", + "delegates": [] + }`), + } + + fExec := newFakeExec() + fExec.addPlugin100(nil, "eth0", net1, expectedResult1, nil) + + fKubeClient := NewFakeClientInfo() + fKubeClient.AddPod(fakePod) + _, err := fKubeClient.AddNetAttachDef(testhelpers.NewFakeNetAttachDef("kube-system", "net1", net1)) + Expect(err).NotTo(HaveOccurred()) + + podInformer := newPodInformer(ctx, fKubeClient.Client) + + result, err := CmdAdd(args, fExec, fKubeClient, podInformer) Expect(err).NotTo(HaveOccurred()) Expect(fExec.addIndex).To(Equal(len(fExec.plugins))) Expect(reflect.DeepEqual(result, expectedResult1)).To(BeTrue()) - err = CmdDel(args, fExec, fKubeClient) + err = CmdDel(args, fExec, fKubeClient, podInformer) Expect(err).NotTo(HaveOccurred()) Expect(fExec.delIndex).To(Equal(len(fExec.plugins))) }) @@ -894,7 +979,7 @@ var _ = Describe("multus operations cniVersion 1.0.0 config", func() { _, err = fKubeClient.AddNetAttachDef( testhelpers.NewFakeNetAttachDef(fakePod.ObjectMeta.Namespace, "net1", net1)) Expect(err).NotTo(HaveOccurred()) - result, err := CmdAdd(args, fExec, fKubeClient) + result, err := CmdAdd(args, fExec, fKubeClient, nil) Expect(err).NotTo(HaveOccurred()) Expect(fExec.addIndex).To(Equal(len(fExec.plugins))) // plugin 1 is the masterplugin @@ -906,7 +991,7 @@ var _ = Describe("multus operations cniVersion 1.0.0 config", func() { Expect(err).NotTo(HaveOccurred()) By("Delete and check net count is not incremented") - err = CmdDel(args, fExec, fKubeClient) + err = CmdDel(args, fExec, fKubeClient, nil) Expect(err).NotTo(HaveOccurred()) Expect(fExec.delIndex).To(Equal(len(fExec.plugins))) }) @@ -967,7 +1052,7 @@ var _ = Describe("multus operations cniVersion 1.0.0 config", func() { _, err = fKubeClient.AddNetAttachDef( testhelpers.NewFakeNetAttachDef(fakePod.ObjectMeta.Namespace, "net1", net1)) Expect(err).NotTo(HaveOccurred()) - result, err := CmdAdd(args, fExec, fKubeClient) + result, err := CmdAdd(args, fExec, fKubeClient, nil) Expect(err).NotTo(HaveOccurred()) Expect(fExec.addIndex).To(Equal(len(fExec.plugins))) // plugin 1 is the masterplugin @@ -982,7 +1067,7 @@ var _ = Describe("multus operations cniVersion 1.0.0 config", func() { Expect(err).NotTo(HaveOccurred()) By("Delete and check pod/net count is incremented") - err = CmdDel(args, fExec, fKubeClient) + err = CmdDel(args, fExec, fKubeClient, nil) Expect(err).NotTo(HaveOccurred()) Expect(fExec.delIndex).To(Equal(len(fExec.plugins))) }) diff --git a/pkg/server/server.go b/pkg/server/server.go index 6f2d46ceb..7cfd9352b 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -24,6 +24,7 @@ import ( "net/http" "os" "strings" + "time" "github.com/containernetworking/cni/pkg/invoke" "github.com/containernetworking/cni/pkg/skel" @@ -40,8 +41,16 @@ import ( "gopkg.in/k8snetworkplumbingwg/multus-cni.v4/pkg/server/config" "gopkg.in/k8snetworkplumbingwg/multus-cni.v4/pkg/types" + kapi "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" utilruntime "k8s.io/apimachinery/pkg/util/runtime" utilwait "k8s.io/apimachinery/pkg/util/wait" + informerfactory "k8s.io/client-go/informers" + v1coreinformers "k8s.io/client-go/informers/core/v1" + "k8s.io/client-go/informers/internalinterfaces" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" ) const ( @@ -136,6 +145,32 @@ func GetListener(socketPath string) (net.Listener, error) { return l, nil } +func newPodInformer(kubeClient kubernetes.Interface, nodeName string) (internalinterfaces.SharedInformerFactory, cache.SharedIndexInformer) { + var tweakFunc internalinterfaces.TweakListOptionsFunc + if nodeName != "" { + // Only watch for local pods + tweakFunc = func(opts *metav1.ListOptions) { + opts.FieldSelector = fields.OneTermEqualSelector("spec.nodeName", nodeName).String() + } + } + + // Multus only watches pods so there's no possibility of race conditions + // between multiple resources that might require a resync to resolve + const resyncInterval time.Duration = 0 * time.Second + + informerFactory := informerfactory.NewSharedInformerFactory(kubeClient, resyncInterval) + podInformer := informerFactory.InformerFor(&kapi.Pod{}, func(c kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer { + return v1coreinformers.NewFilteredPodInformer( + c, + kapi.NamespaceAll, + resyncPeriod, + cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, + tweakFunc) + }) + + return informerFactory, podInformer +} + // NewCNIServer creates and returns a new Server object which will listen on a socket in the given path func NewCNIServer(daemonConfig *ControllerNetConf, serverConfig []byte) (*Server, error) { kubeClient, err := k8s.InClusterK8sClient() @@ -165,6 +200,8 @@ func newCNIServer(rundir string, kubeClient *k8s.ClientInfo, exec invoke.Exec, s servConfig = bytes.Replace(servConfig, []byte("{"), []byte(","), 1) } + informerFactory, podInformer := newPodInformer(kubeClient.Client, os.Getenv("MULTUS_NODE_NAME")) + router := http.NewServeMux() s := &Server{ Server: http.Server{ @@ -183,6 +220,8 @@ func newCNIServer(rundir string, kubeClient *k8s.ClientInfo, exec invoke.Exec, s []string{"handler", "code", "method"}, ), }, + informerFactory: informerFactory, + podInformer: podInformer, } s.SetKeepAlivesEnabled(false) @@ -256,6 +295,16 @@ func newCNIServer(rundir string, kubeClient *k8s.ClientInfo, exec invoke.Exec, s } func (s *Server) Start(ctx context.Context, l net.Listener) { + s.informerFactory.Start(ctx.Done()) + + // Give the initial sync some time to complete in large clusters, but + // don't wait forever + waitCtx, waitCancel := context.WithTimeout(ctx, 20*time.Second) + if !cache.WaitForCacheSync(waitCtx.Done(), s.podInformer.HasSynced) { + logging.Errorf("failed to sync pod informer cache") + } + waitCancel() + go func() { utilwait.UntilWithContext(ctx, func(ctx context.Context) { logging.Debugf("open for business") @@ -439,7 +488,7 @@ func (s *Server) cmdAdd(cmdArgs *skel.CmdArgs, k8sArgs *types.K8sArgs) ([]byte, } logging.Debugf("CmdAdd for [%s/%s]. CNI conf: %+v", namespace, podName, *cmdArgs) - result, err := multus.CmdAdd(cmdArgs, s.exec, s.kubeclient) + result, err := multus.CmdAdd(cmdArgs, s.exec, s.kubeclient, s.podInformer) if err != nil { return nil, fmt.Errorf("error configuring pod [%s/%s] networking: %v", namespace, podName, err) } @@ -454,7 +503,7 @@ func (s *Server) cmdDel(cmdArgs *skel.CmdArgs, k8sArgs *types.K8sArgs) error { } logging.Debugf("CmdDel for [%s/%s]. CNI conf: %+v", namespace, podName, *cmdArgs) - return multus.CmdDel(cmdArgs, s.exec, s.kubeclient) + return multus.CmdDel(cmdArgs, s.exec, s.kubeclient, s.podInformer) } func (s *Server) cmdCheck(cmdArgs *skel.CmdArgs, k8sArgs *types.K8sArgs) error { @@ -488,7 +537,7 @@ func (s *Server) cmdDelegateAdd(cmdArgs *skel.CmdArgs, k8sArgs *types.K8sArgs, m if namespace == "" || podName == "" { return nil, fmt.Errorf("required CNI variable missing. pod name: %s; pod namespace: %s", podName, namespace) } - pod, err := multus.GetPod(s.kubeclient, k8sArgs, false) + pod, err := multus.GetPod(s.kubeclient, s.podInformer, k8sArgs, false) if err != nil { return nil, err } @@ -542,7 +591,7 @@ func (s *Server) cmdDelegateDel(cmdArgs *skel.CmdArgs, k8sArgs *types.K8sArgs, m if namespace == "" || podName == "" { return fmt.Errorf("required CNI variable missing. pod name: %s; pod namespace: %s", podName, namespace) } - pod, err := multus.GetPod(s.kubeclient, k8sArgs, false) + pod, err := multus.GetPod(s.kubeclient, s.podInformer, k8sArgs, false) if err != nil { return err } diff --git a/pkg/server/types.go b/pkg/server/types.go index d24ad7b76..9dd318352 100644 --- a/pkg/server/types.go +++ b/pkg/server/types.go @@ -22,6 +22,9 @@ import ( "github.com/prometheus/client_golang/prometheus" "gopkg.in/k8snetworkplumbingwg/multus-cni.v4/pkg/k8sclient" + + "k8s.io/client-go/informers/internalinterfaces" + "k8s.io/client-go/tools/cache" ) const ( @@ -42,11 +45,13 @@ type Metrics struct { // the CNI shim requests issued when a pod is added / removed. type Server struct { http.Server - rundir string - kubeclient *k8sclient.ClientInfo - exec invoke.Exec - serverConfig []byte - metrics *Metrics + rundir string + kubeclient *k8sclient.ClientInfo + exec invoke.Exec + serverConfig []byte + metrics *Metrics + informerFactory internalinterfaces.SharedInformerFactory + podInformer cache.SharedIndexInformer } // ControllerNetConf for the controller cni configuration