From 0e2a8f1ce6f41dd581f38efe1e0783e4d98a1e01 Mon Sep 17 00:00:00 2001 From: naison <895703375@qq.com> Date: Fri, 12 Jul 2024 22:09:50 +0800 Subject: [PATCH] feat: exec and port-forward using websocket if available (#295) --- pkg/handler/connect.go | 21 ++++++- pkg/util/cidr.go | 6 +- pkg/util/dns.go | 12 +--- pkg/util/elevate_others.go | 4 +- pkg/util/elevatecheck_others.go | 23 +++----- pkg/util/getcidr.go | 8 +-- pkg/util/getcidr_test.go | 2 +- pkg/util/pod.go | 97 +++++++++++++-------------------- 8 files changed, 78 insertions(+), 95 deletions(-) diff --git a/pkg/handler/connect.go b/pkg/handler/connect.go index 2b425fb9a..c22ab15cc 100644 --- a/pkg/handler/connect.go +++ b/pkg/handler/connect.go @@ -452,6 +452,7 @@ func (c *ConnectOptions) addRouteDynamic(ctx context.Context) error { } func (c *ConnectOptions) deleteFirewallRule(ctx context.Context) { + // Found those code looks like not works if !util.FindAllowFirewallRule() { util.AddAllowFirewallRule() } @@ -459,6 +460,11 @@ func (c *ConnectOptions) deleteFirewallRule(ctx context.Context) { util.DeleteAllowFirewallRule() return nil }) + + // The reason why delete firewall rule is: + // On windows use 'kubevpn proxy deploy/authors -H user=windows' + // Open terminal 'curl localhost:9080' ok + // Open terminal 'curl localTunIP:9080' not ok go util.DeleteBlockFirewallRule(ctx) } @@ -469,7 +475,7 @@ func (c *ConnectOptions) setupDNS(ctx context.Context) error { log.Errorf("get running pod list failed, err: %v", err) return err } - relovConf, err := util.GetDNSServiceIPFromPod(ctx, c.clientset, c.restclient, c.config, pod[0].GetName(), c.Namespace) + relovConf, err := util.GetDNSServiceIPFromPod(ctx, c.clientset, c.config, pod[0].GetName(), c.Namespace) if err != nil { log.Errorln(err) return err @@ -713,7 +719,7 @@ func (c *ConnectOptions) getCIDR(ctx context.Context) (err error) { } // (2) get cidr from cni - c.cidrs, err = util.GetCIDRElegant(ctx, c.clientset, c.restclient, c.config, c.Namespace) + c.cidrs, err = util.GetCIDRElegant(ctx, c.clientset, c.config, c.Namespace) if err == nil { s := sets.New[string]() for _, cidr := range c.cidrs { @@ -767,7 +773,7 @@ func (c *ConnectOptions) addExtraRoute(ctx context.Context, nameserver string) e } var ok = true for _, domain := range c.ExtraRouteInfo.ExtraDomain { - ip, err := util.Shell(ctx, c.clientset, c.restclient, c.config, podList[0].Name, config.ContainerSidecarVPN, c.Namespace, []string{"dig", "+short", domain}) + ip, err := util.Shell(ctx, c.clientset, c.config, podList[0].Name, config.ContainerSidecarVPN, c.Namespace, []string{"dig", "+short", domain}) if err == nil || net.ParseIP(ip) != nil { addRouteFunc(domain, ip) c.extraHost = append(c.extraHost, dns.Entry{IP: net.ParseIP(ip).String(), Domain: domain}) @@ -1059,6 +1065,15 @@ func (c *ConnectOptions) upgradeService(ctx context.Context) error { return nil } +// The reason why only Ping each other inner ip on Windows: +// On macOS use 'kubevpn proxy deploy/authors -H user=macos' +// On Windows use 'kubevpn proxy deploy/authors -H user=windows' +// On macOS/Windows: +// 'curl authors:9080/health -H "user: macos"' --> ok +// 'curl authors:9080/health -H "user: windows"' --> failed +// On Windows 'ping authors's inner tunIP' then +// 'curl authors:9080/health -H "user: macos"' --> ok +// 'curl authors:9080/health -H "user: windows"' --> ok func (c *ConnectOptions) heartbeats(ctx context.Context) { if !util.IsWindows() { return diff --git a/pkg/util/cidr.go b/pkg/util/cidr.go index 760b37bbd..87155d075 100644 --- a/pkg/util/cidr.go +++ b/pkg/util/cidr.go @@ -20,7 +20,7 @@ import ( // 2) grep cmdline // 3) create svc + cat *.conflist // 4) create svc + get pod ip with svc mask -func GetCIDRElegant(ctx context.Context, clientset *kubernetes.Clientset, restclient *rest.RESTClient, restconfig *rest.Config, namespace string) ([]*net.IPNet, error) { +func GetCIDRElegant(ctx context.Context, clientset *kubernetes.Clientset, restconfig *rest.Config, namespace string) ([]*net.IPNet, error) { defer func() { _ = clientset.CoreV1().Pods(namespace).Delete(context.Background(), config.CniNetName, v1.DeleteOptions{GracePeriodSeconds: pointer.Int64(0)}) }() @@ -34,13 +34,13 @@ func GetCIDRElegant(ctx context.Context, clientset *kubernetes.Clientset, restcl } log.Infoln("get cidr from cni...") - cni, err := GetCIDRFromCNI(ctx, clientset, restclient, restconfig, namespace) + cni, err := GetCIDRFromCNI(ctx, clientset, restconfig, namespace) if err == nil { log.Infoln("get cidr from cni ok") result = append(result, cni...) } - pod, err := GetPodCIDRFromCNI(ctx, clientset, restclient, restconfig, namespace) + pod, err := GetPodCIDRFromCNI(ctx, clientset, restconfig, namespace) if err == nil { result = append(result, pod...) } diff --git a/pkg/util/dns.go b/pkg/util/dns.go index e0c3022cb..a9c667483 100644 --- a/pkg/util/dns.go +++ b/pkg/util/dns.go @@ -17,8 +17,8 @@ import ( "github.com/wencaiwulue/kubevpn/v2/pkg/config" ) -func GetDNSServiceIPFromPod(ctx context.Context, clientset *kubernetes.Clientset, restclient *rest.RESTClient, config *rest.Config, podName, namespace string) (*dns.ClientConfig, error) { - str, err := Shell(ctx, clientset, restclient, config, podName, "", namespace, []string{"cat", "/etc/resolv.conf"}) +func GetDNSServiceIPFromPod(ctx context.Context, clientset *kubernetes.Clientset, config *rest.Config, podName, namespace string) (*dns.ClientConfig, error) { + str, err := Shell(ctx, clientset, config, podName, "", namespace, []string{"cat", "/etc/resolv.conf"}) if err != nil { return nil, err } @@ -88,13 +88,7 @@ func GetDNS(ctx context.Context, f util.Factory, ns, pod string) (*dns.ClientCon if err != nil { return nil, err } - - client, err := f.RESTClient() - if err != nil { - return nil, err - } - - clientConfig, err := GetDNSServiceIPFromPod(ctx, clientSet, client, restConfig, pod, ns) + clientConfig, err := GetDNSServiceIPFromPod(ctx, clientSet, restConfig, pod, ns) if err != nil { return nil, err } diff --git a/pkg/util/elevate_others.go b/pkg/util/elevate_others.go index 3978c18dd..d73f5ee70 100644 --- a/pkg/util/elevate_others.go +++ b/pkg/util/elevate_others.go @@ -27,7 +27,7 @@ func RunCmdWithElevated(exe string, args []string) error { cmd.Stdout = os.Stdout cmd.Stderr = os.Stderr cmd.Stdin = os.Stdin - cmd.Env = append(os.Environ(), envStartSudoKubeVPNByKubeVPN+"=1", config.EnvDisableSyncthingLog+"=1") + cmd.Env = append(os.Environ(), config.EnvStartSudoKubeVPNByKubeVPN+"=1", config.EnvDisableSyncthingLog+"=1") err := cmd.Start() if err != nil { return err @@ -54,7 +54,7 @@ func RunCmd(exe string, args []string) error { cmd.Stdout = os.Stdout cmd.Stderr = os.Stderr cmd.Stdin = os.Stdin - cmd.Env = append(os.Environ(), envStartSudoKubeVPNByKubeVPN+"=1", config.EnvDisableSyncthingLog+"=1") + cmd.Env = append(os.Environ(), config.EnvStartSudoKubeVPNByKubeVPN+"=1", config.EnvDisableSyncthingLog+"=1") err := cmd.Start() if err != nil { return err diff --git a/pkg/util/elevatecheck_others.go b/pkg/util/elevatecheck_others.go index d9c9f76fa..32633db22 100644 --- a/pkg/util/elevatecheck_others.go +++ b/pkg/util/elevatecheck_others.go @@ -4,13 +4,11 @@ package util import ( "flag" - "fmt" "os" "os/exec" "os/signal" "runtime" "syscall" - "time" log "github.com/sirupsen/logrus" "k8s.io/client-go/tools/clientcmd" @@ -18,8 +16,6 @@ import ( "github.com/wencaiwulue/kubevpn/v2/pkg/config" ) -const envStartSudoKubeVPNByKubeVPN = config.EnvStartSudoKubeVPNByKubeVPN - func RunWithElevated() { // fix if startup with normal user, after elevated home dir will change to root user in linux // but unix don't have this issue @@ -33,7 +29,7 @@ func RunWithElevated() { cmd.Stdout = os.Stdout cmd.Stderr = os.Stderr cmd.Stdin = os.Stdin - cmd.Env = append(os.Environ(), envStartSudoKubeVPNByKubeVPN+"=1", config.EnvDisableSyncthingLog+"=1") + cmd.Env = append(os.Environ(), config.EnvStartSudoKubeVPNByKubeVPN+"=1", config.EnvDisableSyncthingLog+"=1") // while send single CTRL+C, command will quit immediately, but output will cut off and print util quit final // so, mute single CTRL+C, let inner command handle single only go func() { @@ -48,22 +44,21 @@ func RunWithElevated() { } func IsAdmin() bool { - _, ok := os.LookupEnv(envStartSudoKubeVPNByKubeVPN) + /*_, ok := os.LookupEnv(config.EnvStartSudoKubeVPNByKubeVPN) if os.Getuid() == 0 { if !ok { - fmt.Println() - fmt.Println(`----------------------------------------------------------------------------------`) - fmt.Println(` Warn: Use sudo to execute command kubevpn can not use user env KUBECONFIG. `) - fmt.Println(` Because of sudo user env and user env are different. `) - fmt.Println(` Current env KUBECONFIG value: ` + os.Getenv(clientcmd.RecommendedConfigPathEnvVar)) - fmt.Println(`----------------------------------------------------------------------------------`) - fmt.Println() + strings := []string{ + "Warn: Use sudo to execute command kubevpn can not use user env KUBECONFIG.", + "Because of sudo user env and user env are different.", + "Current env KUBECONFIG value: " + os.Getenv(clientcmd.RecommendedConfigPathEnvVar), + } + PrintLine(nil, strings...) for i := 0; i >= 0; i-- { _, _ = fmt.Printf("\r %ds", i) time.Sleep(time.Second * 1) } _, _ = fmt.Printf("\r") } - } + }*/ return os.Getuid() == 0 } diff --git a/pkg/util/getcidr.go b/pkg/util/getcidr.go index a1b2d5a8c..fe4ce3b83 100644 --- a/pkg/util/getcidr.go +++ b/pkg/util/getcidr.go @@ -52,7 +52,7 @@ func GetCIDRByDumpClusterInfo(ctx context.Context, clientset *kubernetes.Clients } // GetCIDRFromCNI kube-controller-manager--allocate-node-cidrs=true--authentication-kubeconfig=/etc/kubernetes/controller-manager.conf--authorization-kubeconfig=/etc/kubernetes/controller-manager.conf--bind-address=0.0.0.0--client-ca-file=/etc/kubernetes/ssl/ca.crt--cluster-cidr=10.233.64.0/18--cluster-name=cluster.local--cluster-signing-cert-file=/etc/kubernetes/ssl/ca.crt--cluster-signing-key-file=/etc/kubernetes/ssl/ca.key--configure-cloud-routes=false--controllers=*,bootstrapsigner,tokencleaner--kubeconfig=/etc/kubernetes/controller-manager.conf--leader-elect=true--leader-elect-lease-duration=15s--leader-elect-renew-deadline=10s--node-cidr-mask-size=24--node-monitor-grace-period=40s--node-monitor-period=5s--port=0--profiling=False--requestheader-client-ca-file=/etc/kubernetes/ssl/front-proxy-ca.crt--root-ca-file=/etc/kubernetes/ssl/ca.crt--service-account-private-key-file=/etc/kubernetes/ssl/sa.key--service-cluster-ip-range=10.233.0.0/18--terminated-pod-gc-threshold=12500--use-service-account-credentials=true -func GetCIDRFromCNI(ctx context.Context, clientset *kubernetes.Clientset, restclient *rest.RESTClient, restconfig *rest.Config, namespace string) ([]*net.IPNet, error) { +func GetCIDRFromCNI(ctx context.Context, clientset *kubernetes.Clientset, restconfig *rest.Config, namespace string) ([]*net.IPNet, error) { pod, err := CreateCIDRPod(ctx, clientset, namespace) if err != nil { return nil, err @@ -61,7 +61,7 @@ func GetCIDRFromCNI(ctx context.Context, clientset *kubernetes.Clientset, restcl var cmd = `grep -a -R "service-cluster-ip-range\|cluster-cidr" /etc/cni/proc/*/cmdline | grep -a -v grep | tr "\0" "\n"` var content string - content, err = Shell(ctx, clientset, restclient, restconfig, pod.Name, "", pod.Namespace, []string{"sh", "-c", cmd}) + content, err = Shell(ctx, clientset, restconfig, pod.Name, "", pod.Namespace, []string{"sh", "-c", cmd}) if err != nil { return nil, err } @@ -130,9 +130,9 @@ func GetServiceCIDRByCreateService(ctx context.Context, serviceInterface corev1. ] } */ -func GetPodCIDRFromCNI(ctx context.Context, clientset *kubernetes.Clientset, restclient *rest.RESTClient, restconfig *rest.Config, namespace string) ([]*net.IPNet, error) { +func GetPodCIDRFromCNI(ctx context.Context, clientset *kubernetes.Clientset, restconfig *rest.Config, namespace string) ([]*net.IPNet, error) { //var cmd = "cat /etc/cni/net.d/*.conflist" - content, err := Shell(ctx, clientset, restclient, restconfig, config.CniNetName, "", namespace, []string{"cat", "/etc/cni/net.d/*.conflist"}) + content, err := Shell(ctx, clientset, restconfig, config.CniNetName, "", namespace, []string{"cat", "/etc/cni/net.d/*.conflist"}) if err != nil { return nil, err } diff --git a/pkg/util/getcidr_test.go b/pkg/util/getcidr_test.go index 4619895cf..a11b4a6c0 100644 --- a/pkg/util/getcidr_test.go +++ b/pkg/util/getcidr_test.go @@ -61,7 +61,7 @@ func TestByCreateSvc(t *testing.T) { func TestElegant(t *testing.T) { before() - elegant, err := GetCIDRElegant(context.Background(), clientset, restclient, restconfig, namespace) + elegant, err := GetCIDRElegant(context.Background(), clientset, restconfig, namespace) if err != nil { t.Error(err) } diff --git a/pkg/util/pod.go b/pkg/util/pod.go index 78f97adfc..80c3ef06c 100644 --- a/pkg/util/pod.go +++ b/pkg/util/pod.go @@ -20,16 +20,15 @@ import ( "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/httpstream" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/watch" "k8s.io/cli-runtime/pkg/genericiooptions" "k8s.io/cli-runtime/pkg/resource" "k8s.io/client-go/kubernetes" - "k8s.io/client-go/kubernetes/scheme" v12 "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/rest" "k8s.io/client-go/tools/portforward" - "k8s.io/client-go/tools/remotecommand" "k8s.io/client-go/transport/spdy" "k8s.io/kubectl/pkg/cmd/exec" "k8s.io/kubectl/pkg/cmd/util" @@ -107,10 +106,6 @@ func GetEnv(ctx context.Context, f util.Factory, ns, pod string) (map[string][]s if err2 != nil { return nil, err2 } - client, err2 := f.RESTClient() - if err2 != nil { - return nil, err2 - } config, err2 := f.ToRESTConfig() if err2 != nil { return nil, err2 @@ -121,7 +116,7 @@ func GetEnv(ctx context.Context, f util.Factory, ns, pod string) (map[string][]s } result := map[string][]string{} for _, c := range get.Spec.Containers { - env, err := Shell(ctx, set, client, config, pod, c.Name, ns, []string{"env"}) + env, err := Shell(ctx, set, config, pod, c.Name, ns, []string{"env"}) if err != nil { return nil, err } @@ -152,6 +147,10 @@ func WaitPod(ctx context.Context, podInterface v12.PodInterface, list v1.ListOpt } func PortForwardPod(config *rest.Config, clientset *rest.RESTClient, podName, namespace string, portPair []string, readyChan chan struct{}, stopChan <-chan struct{}) error { + err := os.Setenv(string(util.RemoteCommandWebsockets), "true") + if err != nil { + return err + } url := clientset. Post(). Resource("pods"). @@ -165,7 +164,17 @@ func PortForwardPod(config *rest.Config, clientset *rest.RESTClient, podName, na return err } dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, "POST", url) - forwarder, err := portforward.NewOnAddresses(dialer, []string{"localhost"}, portPair, stopChan, readyChan, nil, os.Stderr) + // Legacy SPDY executor is default. If feature gate enabled, fallback + // executor attempts websockets first--then SPDY. + if util.RemoteCommandWebsockets.IsEnabled() { + // WebSocketExecutor must be "GET" method as described in RFC 6455 Sec. 4.1 (page 17). + websocketDialer, err := portforward.NewSPDYOverWebsocketDialer(url, config) + if err != nil { + return err + } + dialer = portforward.NewFallbackDialer(websocketDialer, dialer, httpstream.IsUpgradeFailure) + } + forwarder, err := portforward.New(dialer, portPair, stopChan, readyChan, nil, os.Stderr) if err != nil { logrus.Errorf("create port forward error: %s", err.Error()) return err @@ -217,62 +226,32 @@ func GetTopOwnerReferenceBySelector(factory util.Factory, namespace, selector st return set, nil } -func Shell(ctx context.Context, clientset *kubernetes.Clientset, restclient *rest.RESTClient, config *rest.Config, podName, containerName, namespace string, cmd []string) (string, error) { - pod, err := clientset.CoreV1().Pods(namespace).Get(ctx, podName, v1.GetOptions{}) - +func Shell(_ context.Context, clientset *kubernetes.Clientset, config *rest.Config, podName, containerName, namespace string, cmd []string) (string, error) { + err := os.Setenv(string(util.RemoteCommandWebsockets), "true") if err != nil { return "", err } - if pod.Status.Phase == corev1.PodSucceeded || pod.Status.Phase == corev1.PodFailed { - err = fmt.Errorf("cannot exec into a container in a completed pod; current phase is %s", pod.Status.Phase) - return "", err - } - if containerName == "" { - containerName = pod.Spec.Containers[0].Name - } stdin, _, _ := term.StdStreams() - - stdoutBuf := bytes.NewBuffer(nil) - stdout := io.MultiWriter(stdoutBuf) - StreamOptions := exec.StreamOptions{ - Namespace: namespace, - PodName: podName, - ContainerName: containerName, - IOStreams: genericiooptions.IOStreams{In: stdin, Out: stdout, ErrOut: nil}, - } - Executor := &exec.DefaultRemoteExecutor{} - // ensure we can recover the terminal while attached - tt := StreamOptions.SetupTTY() - - var sizeQueue remotecommand.TerminalSizeQueue - if tt.Raw { - // this call spawns a goroutine to monitor/update the terminal size - sizeQueue = tt.MonitorSize(tt.GetSize()) - - // unset p.Err if it was previously set because both stdout and stderr go over p.Out when tty is - // true - StreamOptions.ErrOut = nil - } - - fn := func() error { - req := restclient.Post(). - Resource("pods"). - Name(pod.Name). - Namespace(pod.Namespace). - SubResource("exec") - req.VersionedParams(&corev1.PodExecOptions{ - Container: containerName, - Command: cmd, - Stdin: StreamOptions.Stdin, - Stdout: StreamOptions.Out != nil, - Stderr: StreamOptions.ErrOut != nil, - TTY: tt.Raw, - }, scheme.ParameterCodec) - return Executor.Execute(req.URL(), config, StreamOptions.In, StreamOptions.Out, StreamOptions.ErrOut, tt.Raw, sizeQueue) + buf := bytes.NewBuffer(nil) + options := exec.ExecOptions{ + StreamOptions: exec.StreamOptions{ + Namespace: namespace, + PodName: podName, + ContainerName: containerName, + Stdin: false, + TTY: false, + Quiet: true, + IOStreams: genericiooptions.IOStreams{In: stdin, Out: buf, ErrOut: io.Discard}, + }, + Command: cmd, + Executor: &exec.DefaultRemoteExecutor{}, + PodClient: clientset.CoreV1(), + Config: config, + } + if err = options.Run(); err != nil { + return "", err } - - err = tt.Safe(fn) - return strings.TrimRight(stdoutBuf.String(), "\n"), err + return strings.TrimRight(buf.String(), "\n"), err } func WaitPodToBeReady(ctx context.Context, podInterface v12.PodInterface, selector v1.LabelSelector) error {