Skip to content

Commit

Permalink
feat: exec and port-forward using websocket if available (#295)
Browse files Browse the repository at this point in the history
  • Loading branch information
wencaiwulue authored Jul 12, 2024
1 parent b0a6a0d commit 0e2a8f1
Show file tree
Hide file tree
Showing 8 changed files with 78 additions and 95 deletions.
21 changes: 18 additions & 3 deletions pkg/handler/connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,13 +452,19 @@ func (c *ConnectOptions) addRouteDynamic(ctx context.Context) error {
}

func (c *ConnectOptions) deleteFirewallRule(ctx context.Context) {
// Found those code looks like not works
if !util.FindAllowFirewallRule() {
util.AddAllowFirewallRule()
}
c.AddRolloutFunc(func() error {
util.DeleteAllowFirewallRule()
return nil
})

// The reason why delete firewall rule is:
// On windows use 'kubevpn proxy deploy/authors -H user=windows'
// Open terminal 'curl localhost:9080' ok
// Open terminal 'curl localTunIP:9080' not ok
go util.DeleteBlockFirewallRule(ctx)
}

Expand All @@ -469,7 +475,7 @@ func (c *ConnectOptions) setupDNS(ctx context.Context) error {
log.Errorf("get running pod list failed, err: %v", err)
return err
}
relovConf, err := util.GetDNSServiceIPFromPod(ctx, c.clientset, c.restclient, c.config, pod[0].GetName(), c.Namespace)
relovConf, err := util.GetDNSServiceIPFromPod(ctx, c.clientset, c.config, pod[0].GetName(), c.Namespace)
if err != nil {
log.Errorln(err)
return err
Expand Down Expand Up @@ -713,7 +719,7 @@ func (c *ConnectOptions) getCIDR(ctx context.Context) (err error) {
}

// (2) get cidr from cni
c.cidrs, err = util.GetCIDRElegant(ctx, c.clientset, c.restclient, c.config, c.Namespace)
c.cidrs, err = util.GetCIDRElegant(ctx, c.clientset, c.config, c.Namespace)
if err == nil {
s := sets.New[string]()
for _, cidr := range c.cidrs {
Expand Down Expand Up @@ -767,7 +773,7 @@ func (c *ConnectOptions) addExtraRoute(ctx context.Context, nameserver string) e
}
var ok = true
for _, domain := range c.ExtraRouteInfo.ExtraDomain {
ip, err := util.Shell(ctx, c.clientset, c.restclient, c.config, podList[0].Name, config.ContainerSidecarVPN, c.Namespace, []string{"dig", "+short", domain})
ip, err := util.Shell(ctx, c.clientset, c.config, podList[0].Name, config.ContainerSidecarVPN, c.Namespace, []string{"dig", "+short", domain})
if err == nil || net.ParseIP(ip) != nil {
addRouteFunc(domain, ip)
c.extraHost = append(c.extraHost, dns.Entry{IP: net.ParseIP(ip).String(), Domain: domain})
Expand Down Expand Up @@ -1059,6 +1065,15 @@ func (c *ConnectOptions) upgradeService(ctx context.Context) error {
return nil
}

// The reason why only Ping each other inner ip on Windows:
// On macOS use 'kubevpn proxy deploy/authors -H user=macos'
// On Windows use 'kubevpn proxy deploy/authors -H user=windows'
// On macOS/Windows:
// 'curl authors:9080/health -H "user: macos"' --> ok
// 'curl authors:9080/health -H "user: windows"' --> failed
// On Windows 'ping authors's inner tunIP' then
// 'curl authors:9080/health -H "user: macos"' --> ok
// 'curl authors:9080/health -H "user: windows"' --> ok
func (c *ConnectOptions) heartbeats(ctx context.Context) {
if !util.IsWindows() {
return
Expand Down
6 changes: 3 additions & 3 deletions pkg/util/cidr.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
// 2) grep cmdline
// 3) create svc + cat *.conflist
// 4) create svc + get pod ip with svc mask
func GetCIDRElegant(ctx context.Context, clientset *kubernetes.Clientset, restclient *rest.RESTClient, restconfig *rest.Config, namespace string) ([]*net.IPNet, error) {
func GetCIDRElegant(ctx context.Context, clientset *kubernetes.Clientset, restconfig *rest.Config, namespace string) ([]*net.IPNet, error) {
defer func() {
_ = clientset.CoreV1().Pods(namespace).Delete(context.Background(), config.CniNetName, v1.DeleteOptions{GracePeriodSeconds: pointer.Int64(0)})
}()
Expand All @@ -34,13 +34,13 @@ func GetCIDRElegant(ctx context.Context, clientset *kubernetes.Clientset, restcl
}

log.Infoln("get cidr from cni...")
cni, err := GetCIDRFromCNI(ctx, clientset, restclient, restconfig, namespace)
cni, err := GetCIDRFromCNI(ctx, clientset, restconfig, namespace)
if err == nil {
log.Infoln("get cidr from cni ok")
result = append(result, cni...)
}

pod, err := GetPodCIDRFromCNI(ctx, clientset, restclient, restconfig, namespace)
pod, err := GetPodCIDRFromCNI(ctx, clientset, restconfig, namespace)
if err == nil {
result = append(result, pod...)
}
Expand Down
12 changes: 3 additions & 9 deletions pkg/util/dns.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ import (
"github.com/wencaiwulue/kubevpn/v2/pkg/config"
)

func GetDNSServiceIPFromPod(ctx context.Context, clientset *kubernetes.Clientset, restclient *rest.RESTClient, config *rest.Config, podName, namespace string) (*dns.ClientConfig, error) {
str, err := Shell(ctx, clientset, restclient, config, podName, "", namespace, []string{"cat", "/etc/resolv.conf"})
func GetDNSServiceIPFromPod(ctx context.Context, clientset *kubernetes.Clientset, config *rest.Config, podName, namespace string) (*dns.ClientConfig, error) {
str, err := Shell(ctx, clientset, config, podName, "", namespace, []string{"cat", "/etc/resolv.conf"})
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -88,13 +88,7 @@ func GetDNS(ctx context.Context, f util.Factory, ns, pod string) (*dns.ClientCon
if err != nil {
return nil, err
}

client, err := f.RESTClient()
if err != nil {
return nil, err
}

clientConfig, err := GetDNSServiceIPFromPod(ctx, clientSet, client, restConfig, pod, ns)
clientConfig, err := GetDNSServiceIPFromPod(ctx, clientSet, restConfig, pod, ns)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/util/elevate_others.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func RunCmdWithElevated(exe string, args []string) error {
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
cmd.Stdin = os.Stdin
cmd.Env = append(os.Environ(), envStartSudoKubeVPNByKubeVPN+"=1", config.EnvDisableSyncthingLog+"=1")
cmd.Env = append(os.Environ(), config.EnvStartSudoKubeVPNByKubeVPN+"=1", config.EnvDisableSyncthingLog+"=1")
err := cmd.Start()
if err != nil {
return err
Expand All @@ -54,7 +54,7 @@ func RunCmd(exe string, args []string) error {
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
cmd.Stdin = os.Stdin
cmd.Env = append(os.Environ(), envStartSudoKubeVPNByKubeVPN+"=1", config.EnvDisableSyncthingLog+"=1")
cmd.Env = append(os.Environ(), config.EnvStartSudoKubeVPNByKubeVPN+"=1", config.EnvDisableSyncthingLog+"=1")
err := cmd.Start()
if err != nil {
return err
Expand Down
23 changes: 9 additions & 14 deletions pkg/util/elevatecheck_others.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,18 @@ package util

import (
"flag"
"fmt"
"os"
"os/exec"
"os/signal"
"runtime"
"syscall"
"time"

log "github.com/sirupsen/logrus"
"k8s.io/client-go/tools/clientcmd"

"github.com/wencaiwulue/kubevpn/v2/pkg/config"
)

const envStartSudoKubeVPNByKubeVPN = config.EnvStartSudoKubeVPNByKubeVPN

func RunWithElevated() {
// fix if startup with normal user, after elevated home dir will change to root user in linux
// but unix don't have this issue
Expand All @@ -33,7 +29,7 @@ func RunWithElevated() {
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
cmd.Stdin = os.Stdin
cmd.Env = append(os.Environ(), envStartSudoKubeVPNByKubeVPN+"=1", config.EnvDisableSyncthingLog+"=1")
cmd.Env = append(os.Environ(), config.EnvStartSudoKubeVPNByKubeVPN+"=1", config.EnvDisableSyncthingLog+"=1")
// while send single CTRL+C, command will quit immediately, but output will cut off and print util quit final
// so, mute single CTRL+C, let inner command handle single only
go func() {
Expand All @@ -48,22 +44,21 @@ func RunWithElevated() {
}

func IsAdmin() bool {
_, ok := os.LookupEnv(envStartSudoKubeVPNByKubeVPN)
/*_, ok := os.LookupEnv(config.EnvStartSudoKubeVPNByKubeVPN)
if os.Getuid() == 0 {
if !ok {
fmt.Println()
fmt.Println(`----------------------------------------------------------------------------------`)
fmt.Println(` Warn: Use sudo to execute command kubevpn can not use user env KUBECONFIG. `)
fmt.Println(` Because of sudo user env and user env are different. `)
fmt.Println(` Current env KUBECONFIG value: ` + os.Getenv(clientcmd.RecommendedConfigPathEnvVar))
fmt.Println(`----------------------------------------------------------------------------------`)
fmt.Println()
strings := []string{
"Warn: Use sudo to execute command kubevpn can not use user env KUBECONFIG.",
"Because of sudo user env and user env are different.",
"Current env KUBECONFIG value: " + os.Getenv(clientcmd.RecommendedConfigPathEnvVar),
}
PrintLine(nil, strings...)
for i := 0; i >= 0; i-- {
_, _ = fmt.Printf("\r %ds", i)
time.Sleep(time.Second * 1)
}
_, _ = fmt.Printf("\r")
}
}
}*/
return os.Getuid() == 0
}
8 changes: 4 additions & 4 deletions pkg/util/getcidr.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func GetCIDRByDumpClusterInfo(ctx context.Context, clientset *kubernetes.Clients
}

// GetCIDRFromCNI kube-controller-manager--allocate-node-cidrs=true--authentication-kubeconfig=/etc/kubernetes/controller-manager.conf--authorization-kubeconfig=/etc/kubernetes/controller-manager.conf--bind-address=0.0.0.0--client-ca-file=/etc/kubernetes/ssl/ca.crt--cluster-cidr=10.233.64.0/18--cluster-name=cluster.local--cluster-signing-cert-file=/etc/kubernetes/ssl/ca.crt--cluster-signing-key-file=/etc/kubernetes/ssl/ca.key--configure-cloud-routes=false--controllers=*,bootstrapsigner,tokencleaner--kubeconfig=/etc/kubernetes/controller-manager.conf--leader-elect=true--leader-elect-lease-duration=15s--leader-elect-renew-deadline=10s--node-cidr-mask-size=24--node-monitor-grace-period=40s--node-monitor-period=5s--port=0--profiling=False--requestheader-client-ca-file=/etc/kubernetes/ssl/front-proxy-ca.crt--root-ca-file=/etc/kubernetes/ssl/ca.crt--service-account-private-key-file=/etc/kubernetes/ssl/sa.key--service-cluster-ip-range=10.233.0.0/18--terminated-pod-gc-threshold=12500--use-service-account-credentials=true
func GetCIDRFromCNI(ctx context.Context, clientset *kubernetes.Clientset, restclient *rest.RESTClient, restconfig *rest.Config, namespace string) ([]*net.IPNet, error) {
func GetCIDRFromCNI(ctx context.Context, clientset *kubernetes.Clientset, restconfig *rest.Config, namespace string) ([]*net.IPNet, error) {
pod, err := CreateCIDRPod(ctx, clientset, namespace)
if err != nil {
return nil, err
Expand All @@ -61,7 +61,7 @@ func GetCIDRFromCNI(ctx context.Context, clientset *kubernetes.Clientset, restcl
var cmd = `grep -a -R "service-cluster-ip-range\|cluster-cidr" /etc/cni/proc/*/cmdline | grep -a -v grep | tr "\0" "\n"`

var content string
content, err = Shell(ctx, clientset, restclient, restconfig, pod.Name, "", pod.Namespace, []string{"sh", "-c", cmd})
content, err = Shell(ctx, clientset, restconfig, pod.Name, "", pod.Namespace, []string{"sh", "-c", cmd})
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -130,9 +130,9 @@ func GetServiceCIDRByCreateService(ctx context.Context, serviceInterface corev1.
]
}
*/
func GetPodCIDRFromCNI(ctx context.Context, clientset *kubernetes.Clientset, restclient *rest.RESTClient, restconfig *rest.Config, namespace string) ([]*net.IPNet, error) {
func GetPodCIDRFromCNI(ctx context.Context, clientset *kubernetes.Clientset, restconfig *rest.Config, namespace string) ([]*net.IPNet, error) {
//var cmd = "cat /etc/cni/net.d/*.conflist"
content, err := Shell(ctx, clientset, restclient, restconfig, config.CniNetName, "", namespace, []string{"cat", "/etc/cni/net.d/*.conflist"})
content, err := Shell(ctx, clientset, restconfig, config.CniNetName, "", namespace, []string{"cat", "/etc/cni/net.d/*.conflist"})
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/util/getcidr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func TestByCreateSvc(t *testing.T) {

func TestElegant(t *testing.T) {
before()
elegant, err := GetCIDRElegant(context.Background(), clientset, restclient, restconfig, namespace)
elegant, err := GetCIDRElegant(context.Background(), clientset, restconfig, namespace)
if err != nil {
t.Error(err)
}
Expand Down
97 changes: 38 additions & 59 deletions pkg/util/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,15 @@ import (
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/httpstream"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/cli-runtime/pkg/genericiooptions"
"k8s.io/cli-runtime/pkg/resource"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
v12 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/portforward"
"k8s.io/client-go/tools/remotecommand"
"k8s.io/client-go/transport/spdy"
"k8s.io/kubectl/pkg/cmd/exec"
"k8s.io/kubectl/pkg/cmd/util"
Expand Down Expand Up @@ -107,10 +106,6 @@ func GetEnv(ctx context.Context, f util.Factory, ns, pod string) (map[string][]s
if err2 != nil {
return nil, err2
}
client, err2 := f.RESTClient()
if err2 != nil {
return nil, err2
}
config, err2 := f.ToRESTConfig()
if err2 != nil {
return nil, err2
Expand All @@ -121,7 +116,7 @@ func GetEnv(ctx context.Context, f util.Factory, ns, pod string) (map[string][]s
}
result := map[string][]string{}
for _, c := range get.Spec.Containers {
env, err := Shell(ctx, set, client, config, pod, c.Name, ns, []string{"env"})
env, err := Shell(ctx, set, config, pod, c.Name, ns, []string{"env"})
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -152,6 +147,10 @@ func WaitPod(ctx context.Context, podInterface v12.PodInterface, list v1.ListOpt
}

func PortForwardPod(config *rest.Config, clientset *rest.RESTClient, podName, namespace string, portPair []string, readyChan chan struct{}, stopChan <-chan struct{}) error {
err := os.Setenv(string(util.RemoteCommandWebsockets), "true")
if err != nil {
return err
}
url := clientset.
Post().
Resource("pods").
Expand All @@ -165,7 +164,17 @@ func PortForwardPod(config *rest.Config, clientset *rest.RESTClient, podName, na
return err
}
dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, "POST", url)
forwarder, err := portforward.NewOnAddresses(dialer, []string{"localhost"}, portPair, stopChan, readyChan, nil, os.Stderr)
// Legacy SPDY executor is default. If feature gate enabled, fallback
// executor attempts websockets first--then SPDY.
if util.RemoteCommandWebsockets.IsEnabled() {
// WebSocketExecutor must be "GET" method as described in RFC 6455 Sec. 4.1 (page 17).
websocketDialer, err := portforward.NewSPDYOverWebsocketDialer(url, config)
if err != nil {
return err
}
dialer = portforward.NewFallbackDialer(websocketDialer, dialer, httpstream.IsUpgradeFailure)
}
forwarder, err := portforward.New(dialer, portPair, stopChan, readyChan, nil, os.Stderr)
if err != nil {
logrus.Errorf("create port forward error: %s", err.Error())
return err
Expand Down Expand Up @@ -217,62 +226,32 @@ func GetTopOwnerReferenceBySelector(factory util.Factory, namespace, selector st
return set, nil
}

func Shell(ctx context.Context, clientset *kubernetes.Clientset, restclient *rest.RESTClient, config *rest.Config, podName, containerName, namespace string, cmd []string) (string, error) {
pod, err := clientset.CoreV1().Pods(namespace).Get(ctx, podName, v1.GetOptions{})

func Shell(_ context.Context, clientset *kubernetes.Clientset, config *rest.Config, podName, containerName, namespace string, cmd []string) (string, error) {
err := os.Setenv(string(util.RemoteCommandWebsockets), "true")
if err != nil {
return "", err
}
if pod.Status.Phase == corev1.PodSucceeded || pod.Status.Phase == corev1.PodFailed {
err = fmt.Errorf("cannot exec into a container in a completed pod; current phase is %s", pod.Status.Phase)
return "", err
}
if containerName == "" {
containerName = pod.Spec.Containers[0].Name
}
stdin, _, _ := term.StdStreams()

stdoutBuf := bytes.NewBuffer(nil)
stdout := io.MultiWriter(stdoutBuf)
StreamOptions := exec.StreamOptions{
Namespace: namespace,
PodName: podName,
ContainerName: containerName,
IOStreams: genericiooptions.IOStreams{In: stdin, Out: stdout, ErrOut: nil},
}
Executor := &exec.DefaultRemoteExecutor{}
// ensure we can recover the terminal while attached
tt := StreamOptions.SetupTTY()

var sizeQueue remotecommand.TerminalSizeQueue
if tt.Raw {
// this call spawns a goroutine to monitor/update the terminal size
sizeQueue = tt.MonitorSize(tt.GetSize())

// unset p.Err if it was previously set because both stdout and stderr go over p.Out when tty is
// true
StreamOptions.ErrOut = nil
}

fn := func() error {
req := restclient.Post().
Resource("pods").
Name(pod.Name).
Namespace(pod.Namespace).
SubResource("exec")
req.VersionedParams(&corev1.PodExecOptions{
Container: containerName,
Command: cmd,
Stdin: StreamOptions.Stdin,
Stdout: StreamOptions.Out != nil,
Stderr: StreamOptions.ErrOut != nil,
TTY: tt.Raw,
}, scheme.ParameterCodec)
return Executor.Execute(req.URL(), config, StreamOptions.In, StreamOptions.Out, StreamOptions.ErrOut, tt.Raw, sizeQueue)
buf := bytes.NewBuffer(nil)
options := exec.ExecOptions{
StreamOptions: exec.StreamOptions{
Namespace: namespace,
PodName: podName,
ContainerName: containerName,
Stdin: false,
TTY: false,
Quiet: true,
IOStreams: genericiooptions.IOStreams{In: stdin, Out: buf, ErrOut: io.Discard},
},
Command: cmd,
Executor: &exec.DefaultRemoteExecutor{},
PodClient: clientset.CoreV1(),
Config: config,
}
if err = options.Run(); err != nil {
return "", err
}

err = tt.Safe(fn)
return strings.TrimRight(stdoutBuf.String(), "\n"), err
return strings.TrimRight(buf.String(), "\n"), err
}

func WaitPodToBeReady(ctx context.Context, podInterface v12.PodInterface, selector v1.LabelSelector) error {
Expand Down

0 comments on commit 0e2a8f1

Please sign in to comment.