diff --git a/connectivity/tests/encryption.go b/connectivity/tests/encryption.go index 4b8376557a..f62c652449 100644 --- a/connectivity/tests/encryption.go +++ b/connectivity/tests/encryption.go @@ -45,8 +45,6 @@ func (s *podToPodEncryption) Run(ctx context.Context, t *check.Test) { } } - bgCtx, bgCancel := context.WithCancel(ctx) - // clientHost is a pod running on the same node as the client pod, just in // the host netns. clientHost := t.Context().HostNetNSPodsByNode()[client.Pod.Spec.NodeName] @@ -78,6 +76,7 @@ func (s *podToPodEncryption) Run(ctx context.Context, t *check.Test) { bgStdout := &safeBuffer{} bgStderr := &safeBuffer{} bgExited := make(chan struct{}) + killCmdCtx, killCmd := context.WithCancel(context.Background()) // Start kubectl exec in bg (=goroutine) go func() { // Run tcpdump with -w instead of directly printing captured pkts. This @@ -95,9 +94,9 @@ func (s *podToPodEncryption) Run(ctx context.Context, t *check.Test) { // to be captured "-c", "1"} t.Debugf("Running in bg: %s", strings.Join(cmd, " ")) - err := clientHost.K8sClient.ExecInPodWithWriters(bgCtx, clientHost.Pod.Namespace, - clientHost.Pod.Name, "", cmd, bgStdout, bgStderr) - if err != nil { + err := clientHost.K8sClient.ExecInPodWithWriters(ctx, killCmdCtx, + clientHost.Pod.Namespace, clientHost.Pod.Name, "", cmd, bgStdout, bgStderr) + if err != nil && !errors.Is(err, context.Canceled) { t.Fatalf("Failed to execute tcpdump: %s", err) } close(bgExited) @@ -128,7 +127,7 @@ func (s *podToPodEncryption) Run(ctx context.Context, t *check.Test) { }) // Wait until tcpdump has exited - bgCancel() + killCmd() <-bgExited // Redirect stderr to /dev/null, as tcpdump logs to stderr, and ExecInPod diff --git a/internal/utils/ctrlcreader.go b/internal/utils/ctrlcreader.go new file mode 100644 index 0000000000..fdd06e6ce4 --- /dev/null +++ b/internal/utils/ctrlcreader.go @@ -0,0 +1,58 @@ +// SPDX-License-Identifier: Apache-2.0 +// Copyright Authors of Cilium + +package utils + +import ( + "context" + "io" + "sync" +) + +// CtrlCReader implements a simple Reader/Closer that returns Ctrl-C and EOF +// on Read() after it has been closed, and nothing before it. +type CtrlCReader struct { + ctx context.Context + closeOnce sync.Once + closed chan struct{} +} + +// NewCtrlCReader returns a new CtrlCReader instance +func NewCtrlCReader(ctx context.Context) *CtrlCReader { + return &CtrlCReader{ + ctx: ctx, + closed: make(chan struct{}), + } +} + +// Read implements io.Reader. +// Blocks until we are done. +func (cc *CtrlCReader) Read(p []byte) (n int, err error) { + if len(p) == 0 { + return 0, nil + } + select { + case <-cc.closed: + // Graceful close, EOF without any data + return 0, io.EOF + case <-cc.ctx.Done(): + // Context cancelled, send Ctrl-C/Ctrl-D + p[0] = byte(3) // Ctrl-C + if len(p) > 1 { + // Add Ctrl-D for the case Ctrl-C alone is ineffective. + // We skip this in the odd case where the buffer is too small. + p[1] = byte(4) // Ctrl-D + return 2, io.EOF + } + return 1, io.EOF + } +} + +// Close implements io.Closer. Note that we do not return an error on +// second close, not do we wait for the close to have any effect. +func (cc *CtrlCReader) Close() error { + cc.closeOnce.Do(func() { + close(cc.closed) + }) + return nil +} diff --git a/k8s/client.go b/k8s/client.go index f3edecb2f4..75d0d8ae7a 100644 --- a/k8s/client.go +++ b/k8s/client.go @@ -403,13 +403,17 @@ func (c *Client) ExecInPod(ctx context.Context, namespace, pod, container string return result.Stdout, nil } -func (c *Client) ExecInPodWithWriters(ctx context.Context, namespace, pod, container string, command []string, stdout, stderr io.Writer) error { - err := c.execInPodWithWriters(ctx, ExecParameters{ +func (c *Client) ExecInPodWithWriters(connCtx, killCmdCtx context.Context, namespace, pod, container string, command []string, stdout, stderr io.Writer) error { + execParams := ExecParameters{ Namespace: namespace, Pod: pod, Container: container, Command: command, - }, stdout, stderr) + } + if killCmdCtx != nil { + execParams.TTY = true + } + err := c.execInPodWithWriters(connCtx, killCmdCtx, execParams, stdout, stderr) if err != nil { return err } diff --git a/k8s/copy.go b/k8s/copy.go index ce223d4ab8..65704773fd 100644 --- a/k8s/copy.go +++ b/k8s/copy.go @@ -37,7 +37,7 @@ func (c *Client) CopyFromPod(ctx context.Context, namespace, pod, container stri func readFromPod(ctx context.Context, client *Client, namespace, pod, container, srcFile string) ReadFunc { return func(offset uint64, writer io.Writer) error { command := []string{"sh", "-c", fmt.Sprintf(defaultReadFromByteCmd, offset, srcFile)} - return client.execInPodWithWriters(ctx, ExecParameters{ + return client.execInPodWithWriters(ctx, nil, ExecParameters{ Namespace: namespace, Pod: pod, Container: container, diff --git a/k8s/exec.go b/k8s/exec.go index fdbbf92cca..4482656eb7 100644 --- a/k8s/exec.go +++ b/k8s/exec.go @@ -12,6 +12,8 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/tools/remotecommand" + + "github.com/cilium/cilium-cli/internal/utils" ) type ExecResult struct { @@ -24,10 +26,11 @@ type ExecParameters struct { Pod string Container string Command []string + TTY bool // fuses stderr into stdout if 'true', needed for Ctrl-C support } -func (c *Client) execInPodWithWriters(ctx context.Context, p ExecParameters, stdout, stderr io.Writer) error { - req := c.Clientset.CoreV1().RESTClient().Post().Namespace(p.Namespace).Resource("pods").Name(p.Pod).SubResource("exec") +func (c *Client) execInPodWithWriters(connCtx, killCmdCtx context.Context, p ExecParameters, stdout, stderr io.Writer) error { + req := c.Clientset.CoreV1().RESTClient().Post().Resource("pods").Name(p.Pod).Namespace(p.Namespace).SubResource("exec") scheme := runtime.NewScheme() if err := corev1.AddToScheme(scheme); err != nil { @@ -36,26 +39,41 @@ func (c *Client) execInPodWithWriters(ctx context.Context, p ExecParameters, std parameterCodec := runtime.NewParameterCodec(scheme) - req.VersionedParams(&corev1.PodExecOptions{ + execOpts := &corev1.PodExecOptions{ Command: p.Command, Container: p.Container, + Stdin: p.TTY, Stdout: true, Stderr: true, - }, parameterCodec) + TTY: p.TTY, + } + req.VersionedParams(execOpts, parameterCodec) exec, err := remotecommand.NewSPDYExecutor(c.Config, "POST", req.URL()) if err != nil { return fmt.Errorf("error while creating executor: %w", err) } - return exec.StreamWithContext(ctx, remotecommand.StreamOptions{ + var stdin io.ReadCloser + if p.TTY { + // CtrlCReader sends Ctrl-C/D sequence if context is cancelled + stdin = utils.NewCtrlCReader(killCmdCtx) + // Graceful close of stdin once we are done, no Ctrl-C is sent + // if execution finishes before the context expires. + defer stdin.Close() + } + + return exec.StreamWithContext(connCtx, remotecommand.StreamOptions{ + Stdin: stdin, Stdout: stdout, Stderr: stderr, + Tty: p.TTY, }) } func (c *Client) execInPod(ctx context.Context, p ExecParameters) (*ExecResult, error) { result := &ExecResult{} - err := c.execInPodWithWriters(ctx, p, &result.Stdout, &result.Stderr) + err := c.execInPodWithWriters(ctx, nil, p, &result.Stdout, &result.Stderr) + return result, err }