From 173f4bdfdcbc8d78ab4bd829afac38a5dad4e2d5 Mon Sep 17 00:00:00 2001 From: Tomofumi Hayashi Date: Thu, 2 May 2024 21:14:27 +0900 Subject: [PATCH] Support GC and STATUS command for cluster network This change supports up to date CNI 1.1 command, GC and STATUS for cluster network. --- cmd/multus-shim/main.go | 26 ++++--- cmd/multus/main.go | 32 +++++--- pkg/k8sclient/k8sclient.go | 4 +- pkg/multus/multus.go | 96 ++++++++++++++++++++++++ pkg/multus/multus_cni100_test.go | 124 +++++++++++++++++++++++++++++++ pkg/multus/multus_suite_test.go | 10 +++ pkg/server/api/shim.go | 18 +++++ pkg/server/server.go | 26 +++++++ 8 files changed, 314 insertions(+), 22 deletions(-) diff --git a/cmd/multus-shim/main.go b/cmd/multus-shim/main.go index e5151c36f..43dffe364 100644 --- a/cmd/multus-shim/main.go +++ b/cmd/multus-shim/main.go @@ -44,15 +44,23 @@ func main() { return } - skel.PluginMain( - func(args *skel.CmdArgs) error { - return api.CmdAdd(args) - }, - func(args *skel.CmdArgs) error { - return api.CmdCheck(args) - }, - func(args *skel.CmdArgs) error { - return api.CmdDel(args) + skel.PluginMainFuncs( + skel.CNIFuncs{ + Add: func(args *skel.CmdArgs) error { + return api.CmdAdd(args) + }, + Check: func(args *skel.CmdArgs) error { + return api.CmdCheck(args) + }, + Del: func(args *skel.CmdArgs) error { + return api.CmdDel(args) + }, + GC: func(args *skel.CmdArgs) error { + return api.CmdGC(args) + }, + Status: func(args *skel.CmdArgs) error { + return api.CmdStatus(args) + }, }, cniversion.All, "meta-plugin that delegates to other CNI plugins") } diff --git a/cmd/multus/main.go b/cmd/multus/main.go index 5e9b6440e..a34630c3c 100644 --- a/cmd/multus/main.go +++ b/cmd/multus/main.go @@ -43,17 +43,27 @@ func main() { return } - skel.PluginMain( - func(args *skel.CmdArgs) error { - result, err := multus.CmdAdd(args, nil, nil) - if err != nil { - return err - } - return result.Print() + skel.PluginMainFuncs( + skel.CNIFuncs{ + Add: func(args *skel.CmdArgs) error { + result, err := multus.CmdAdd(args, nil, nil) + if err != nil { + return err + } + return result.Print() + }, + Del: func(args *skel.CmdArgs) error { + return multus.CmdDel(args, nil, nil) + }, + Check: func(args *skel.CmdArgs) error { + return multus.CmdCheck(args, nil, nil) + }, + GC: func(args *skel.CmdArgs) error { + return multus.CmdGC(args, nil, nil) + }, + Status: func(args *skel.CmdArgs) error { + return multus.CmdStatus(args, nil, nil) + }, }, - func(args *skel.CmdArgs) error { - return multus.CmdCheck(args, nil, nil) - }, - func(args *skel.CmdArgs) error { return multus.CmdDel(args, nil, nil) }, cniversion.All, "meta-plugin that delegates to other CNI plugins") } diff --git a/pkg/k8sclient/k8sclient.go b/pkg/k8sclient/k8sclient.go index 9c7005fda..3f26262dc 100644 --- a/pkg/k8sclient/k8sclient.go +++ b/pkg/k8sclient/k8sclient.go @@ -281,7 +281,7 @@ func getKubernetesDelegate(client *ClientInfo, net *types.NetworkSelectionElemen // Get resourceName annotation from NetworkAttachmentDefinition deviceID := "" resourceName, ok := customResource.GetAnnotations()[resourceNameAnnot] - if ok && pod.Name != "" && pod.Namespace != "" { + if ok && pod != nil && pod.Name != "" && pod.Namespace != "" { // ResourceName annotation is found; try to get device info from resourceMap logging.Debugf("getKubernetesDelegate: found resourceName annotation : %s", resourceName) @@ -568,7 +568,7 @@ func GetDefaultNetworks(pod *v1.Pod, conf *types.NetConf, kubeClient *ClientInfo delegates = append(delegates, delegate) // Pod in kube-system namespace does not have default network for now. - if !types.CheckSystemNamespaces(pod.ObjectMeta.Namespace, conf.SystemNamespaces) { + if pod != nil && !types.CheckSystemNamespaces(pod.ObjectMeta.Namespace, conf.SystemNamespaces) { for _, netname := range conf.DefaultNetworks { delegate, resourceMap, err := getNetDelegate(kubeClient, pod, netname, conf.ConfDir, conf.MultusNamespace, resourceMap) if err != nil { diff --git a/pkg/multus/multus.go b/pkg/multus/multus.go index 5452d9e7e..73e1bbd46 100644 --- a/pkg/multus/multus.go +++ b/pkg/multus/multus.go @@ -922,3 +922,99 @@ func CmdDel(args *skel.CmdArgs, exec invoke.Exec, kubeClient *k8s.ClientInfo) er return e } + +// CmdStatus... +func CmdStatus(args *skel.CmdArgs, exec invoke.Exec, kubeClient *k8s.ClientInfo) error { + n, err := types.LoadNetConf(args.StdinData) + logging.Debugf("CmdStatus: %v, %v, %v", args, exec, kubeClient) + if err != nil { + return cmdErr(nil, "error loading netconf: %v", err) + } + + kubeClient, err = k8s.GetK8sClient(n.Kubeconfig, kubeClient) + if err != nil { + return cmdErr(nil, "error getting k8s client: %v", err) + } + + if n.ReadinessIndicatorFile != "" { + if err := types.GetReadinessIndicatorFile(n.ReadinessIndicatorFile); err != nil { + return cmdErr(nil, "have you checked that your default network is ready? still waiting for readinessindicatorfile @ %v. pollimmediate error: %v", n.ReadinessIndicatorFile, err) + } + } + + if n.ClusterNetwork != "" { + _, err = k8s.GetDefaultNetworks(nil, n, kubeClient, nil) + if err != nil { + return cmdErr(nil, "failed to get clusterNetwork: %v", err) + } + // First delegate is always the master plugin + n.Delegates[0].MasterPlugin = true + } + + // invoke delegate's STATUS command + // we only need to check cluster network status + binDirs := filepath.SplitList(os.Getenv("CNI_PATH")) + binDirs = append([]string{n.BinDir}, binDirs...) + cniNet := libcni.NewCNIConfigWithCacheDir(binDirs, n.CNIDir, exec) + + conf, err := libcni.ConfListFromBytes(n.Delegates[0].Bytes) + if err != nil { + return logging.Errorf("error in converting the raw bytes to conf: %v", err) + } + + err = cniNet.GetStatusNetworkList(context.TODO(), conf) + if err != nil { + return logging.Errorf("error in STATUS command: %v", err) + } + + return nil +} + +// CmdGC ... +func CmdGC(args *skel.CmdArgs, exec invoke.Exec, kubeClient *k8s.ClientInfo) error { + n, err := types.LoadNetConf(args.StdinData) + logging.Debugf("CmdStatus: %v, %v, %v", args, exec, kubeClient) + if err != nil { + return cmdErr(nil, "error loading netconf: %v", err) + } + + kubeClient, err = k8s.GetK8sClient(n.Kubeconfig, kubeClient) + if err != nil { + return cmdErr(nil, "error getting k8s client: %v", err) + } + + if n.ReadinessIndicatorFile != "" { + if err := types.GetReadinessIndicatorFile(n.ReadinessIndicatorFile); err != nil { + return cmdErr(nil, "have you checked that your default network is ready? still waiting for readinessindicatorfile @ %v. pollimmediate error: %v", n.ReadinessIndicatorFile, err) + } + } + + if n.ClusterNetwork != "" { + _, err = k8s.GetDefaultNetworks(nil, n, kubeClient, nil) + if err != nil { + return cmdErr(nil, "failed to get clusterNetwork: %v", err) + } + // First delegate is always the master plugin + n.Delegates[0].MasterPlugin = true + } + + // invoke delegate's GC command + // we only need to check cluster network status + binDirs := filepath.SplitList(os.Getenv("CNI_PATH")) + binDirs = append([]string{n.BinDir}, binDirs...) + cniNet := libcni.NewCNIConfigWithCacheDir(binDirs, n.CNIDir, exec) + + conf, err := libcni.ConfListFromBytes(n.Delegates[0].Bytes) + if err != nil { + return logging.Errorf("error in converting the raw bytes to conf: %v", err) + } + + err = cniNet.GCNetworkList(context.TODO(), conf, &libcni.GCArgs{ + ValidAttachments: n.ValidAttachments, + }) + if err != nil { + return logging.Errorf("error in GC command: %v", err) + } + + return nil +} diff --git a/pkg/multus/multus_cni100_test.go b/pkg/multus/multus_cni100_test.go index 7c4f7013c..633d72a32 100644 --- a/pkg/multus/multus_cni100_test.go +++ b/pkg/multus/multus_cni100_test.go @@ -1227,3 +1227,127 @@ var _ = Describe("multus operations cniVersion 1.0.0 config", func() { Expect(err).To(HaveOccurred()) }) }) + +var _ = Describe("multus operations cniVersion 1.1.0 config", func() { + var testNS ns.NetNS + var tmpDir string + configPath := "/tmp/foo.multus.conf" + var cancel context.CancelFunc + + BeforeEach(func() { + // Create a new NetNS so we don't modify the host + var err error + testNS, err = testutils.NewNS() + Expect(err).NotTo(HaveOccurred()) + os.Setenv("CNI_NETNS", testNS.Path()) + os.Setenv("CNI_PATH", "/some/path") + + tmpDir, err = os.MkdirTemp("", "multus_tmp") + Expect(err).NotTo(HaveOccurred()) + + // Touch the default network file. + os.OpenFile(configPath, os.O_RDONLY|os.O_CREATE, 0755) + _, cancel = context.WithCancel(context.TODO()) + }) + + AfterEach(func() { + cancel() + + // Cleanup default network file. + if _, errStat := os.Stat(configPath); errStat == nil { + errRemove := os.Remove(configPath) + Expect(errRemove).NotTo(HaveOccurred()) + } + + Expect(testNS.Close()).To(Succeed()) + os.Unsetenv("CNI_PATH") + os.Unsetenv("CNI_ARGS") + err := os.RemoveAll(tmpDir) + Expect(err).NotTo(HaveOccurred()) + }) + + It("executes delegates with CNI Check", func() { + args := &skel.CmdArgs{ + ContainerID: "123456789", + Netns: testNS.Path(), + IfName: "eth0", + StdinData: []byte(`{ + "name": "node-cni-network", + "type": "multus", + "defaultnetworkfile": "/tmp/foo.multus.conf", + "defaultnetworkwaitseconds": 3, + "delegates": [{ + "name": "weave1", + "cniVersion": "1.1.0", + "plugins": [{ + "type": "weave-net" + }] + },{ + "name": "other1", + "cniVersion": "1.1.0", + "plugins": [{ + "type": "other-plugin" + }] + }] + }`), + } + + logging.SetLogLevel("verbose") + + fExec := newFakeExec() + expectedConf1 := `{ + "name": "weave1", + "cniVersion": "1.1.0", + "type": "weave-net" + }` + fExec.addPlugin100(nil, "", expectedConf1, nil, nil) + + err := CmdStatus(args, fExec, nil) + Expect(err).NotTo(HaveOccurred()) + // we only execute once for cluster network, not additional one + Expect(fExec.statusIndex).To(Equal(1)) + }) + + It("executes delegates with CNI GC", func() { + args := &skel.CmdArgs{ + ContainerID: "123456789", + Netns: testNS.Path(), + IfName: "eth0", + StdinData: []byte(`{ + "name": "node-cni-network", + "type": "multus", + "defaultnetworkfile": "/tmp/foo.multus.conf", + "defaultnetworkwaitseconds": 3, + "delegates": [{ + "name": "weave1", + "cniVersion": "1.1.0", + "plugins": [{ + "type": "weave-net" + }] + },{ + "name": "other1", + "cniVersion": "1.1.0", + "plugins": [{ + "type": "other-plugin" + }] + }] + }`), + } + + logging.SetLogLevel("verbose") + + fExec := newFakeExec() + expectedConf1 := `{ + "cni.dev/valid-attachments": null, + "name": "weave1", + "cniVersion": "1.1.0", + "type": "weave-net" + }` + fExec.addPlugin100(nil, "", expectedConf1, nil, nil) + + err := CmdGC(args, fExec, nil) + Expect(err).NotTo(HaveOccurred()) + // we only execute once for cluster network, not additional one + Expect(fExec.gcIndex).To(Equal(1)) + }) +}) diff --git a/pkg/multus/multus_suite_test.go b/pkg/multus/multus_suite_test.go index a9ad866f2..a4748382b 100644 --- a/pkg/multus/multus_suite_test.go +++ b/pkg/multus/multus_suite_test.go @@ -58,6 +58,8 @@ type fakeExec struct { addIndex int delIndex int chkIndex int + statusIndex int + gcIndex int expectedDelSkip int plugins map[string]*fakePlugin } @@ -168,6 +170,14 @@ func (f *fakeExec) ExecPlugin(_ context.Context, pluginPath string, stdinData [] Expect(len(f.plugins)).To(BeNumerically(">", f.delIndex)) index = len(f.plugins) - f.expectedDelSkip - f.delIndex - 1 f.delIndex++ + case "GC": + Expect(len(f.plugins)).To(BeNumerically(">", f.statusIndex)) + index = f.gcIndex + f.gcIndex++ + case "STATUS": + Expect(len(f.plugins)).To(BeNumerically(">", f.statusIndex)) + index = f.statusIndex + f.statusIndex++ default: // Should never be reached Expect(false).To(BeTrue()) diff --git a/pkg/server/api/shim.go b/pkg/server/api/shim.go index 168404ea4..b09b4d79e 100644 --- a/pkg/server/api/shim.go +++ b/pkg/server/api/shim.go @@ -71,6 +71,24 @@ func CmdDel(args *skel.CmdArgs) error { return nil } +// CmdGC implements the CNI spec GC command handler +func CmdGC(args *skel.CmdArgs) error { + _, _, err := postRequest(args) + if err != nil { + return logging.Errorf("CmdGC (shim): %v", err) + } + return nil +} + +// CmdStatus implements the CNI spec STATUS command handler +func CmdStatus(args *skel.CmdArgs) error { + _, _, err := postRequest(args) + if err != nil { + return logging.Errorf("CmdStatus (shim): %v", err) + } + return nil +} + func postRequest(args *skel.CmdArgs) (*Response, string, error) { multusShimConfig, err := shimConfig(args.StdinData) if err != nil { diff --git a/pkg/server/server.go b/pkg/server/server.go index d469af23a..0d3b013d2 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -95,6 +95,10 @@ func (s *Server) HandleCNIRequest(cmd string, k8sArgs *types.K8sArgs, cniCmdArgs err = s.cmdDel(cniCmdArgs, k8sArgs) case "CHECK": err = s.cmdCheck(cniCmdArgs, k8sArgs) + case "GC": + err = s.cmdGC(cniCmdArgs, k8sArgs) + case "STATUS": + err = s.cmdStatus(cniCmdArgs, k8sArgs) default: return []byte(""), fmt.Errorf("unknown cmd type: %s", cmd) } @@ -614,6 +618,28 @@ func (s *Server) cmdCheck(cmdArgs *skel.CmdArgs, k8sArgs *types.K8sArgs) error { return multus.CmdCheck(cmdArgs, s.exec, s.kubeclient) } +func (s *Server) cmdGC(cmdArgs *skel.CmdArgs, k8sArgs *types.K8sArgs) error { + namespace := string(k8sArgs.K8S_POD_NAMESPACE) + podName := string(k8sArgs.K8S_POD_NAME) + if namespace == "" || podName == "" { + return fmt.Errorf("required CNI variable missing. pod name: %s; pod namespace: %s", podName, namespace) + } + + logging.Debugf("CmdGC for [%s/%s]. CNI conf: %+v", namespace, podName, *cmdArgs) + return multus.CmdGC(cmdArgs, s.exec, s.kubeclient) +} + +func (s *Server) cmdStatus(cmdArgs *skel.CmdArgs, k8sArgs *types.K8sArgs) error { + namespace := string(k8sArgs.K8S_POD_NAMESPACE) + podName := string(k8sArgs.K8S_POD_NAME) + if namespace == "" || podName == "" { + return fmt.Errorf("required CNI variable missing. pod name: %s; pod namespace: %s", podName, namespace) + } + + logging.Debugf("CmdStatus for [%s/%s]. CNI conf: %+v", namespace, podName, *cmdArgs) + return multus.CmdStatus(cmdArgs, s.exec, s.kubeclient) +} + func serializeResult(result cnitypes.Result) ([]byte, error) { // cni result is converted to latest here and decoded to specific cni version at multus-shim realResult, err := cni100.NewResultFromResult(result)