From a0aa32679f07a268175fdf17372548aae57c93ac Mon Sep 17 00:00:00 2001 From: Tam Mach Date: Fri, 24 Dec 2021 18:05:48 +1100 Subject: [PATCH] Add retry mechanism while copying files This commit is to add CopyFromPod function in current k8s client, which supports resuming copy from last byte offset. Small refactor for exec in pod method was done to avoid copying reader multiple times. Relates to https://github.com/cilium/cilium-cli/issues/616#issuecomment-990747365 Signed-off-by: Tam Mach --- internal/k8s/copy.go | 46 +++++++++++++++++++ internal/k8s/exec.go | 18 +++++--- internal/k8s/pipe.go | 82 +++++++++++++++++++++++++++++++++ internal/k8s/pipe_test.go | 96 +++++++++++++++++++++++++++++++++++++++ sysdump/client.go | 1 + sysdump/sysdump.go | 10 ++-- sysdump/sysdump_test.go | 4 ++ 7 files changed, 244 insertions(+), 13 deletions(-) create mode 100644 internal/k8s/copy.go create mode 100644 internal/k8s/pipe.go create mode 100644 internal/k8s/pipe_test.go diff --git a/internal/k8s/copy.go b/internal/k8s/copy.go new file mode 100644 index 0000000000..b0ad82d36f --- /dev/null +++ b/internal/k8s/copy.go @@ -0,0 +1,46 @@ +// SPDX-License-Identifier: Apache-2.0 +// Copyright 2020 Authors of Cilium + +package k8s + +import ( + "context" + "fmt" + "io" + "os" +) + +const ( + defaultReadFromByteCmd = "tail -c+%d %s" + defaultMaxTries = 5 +) + +// CopyFromPod is to copy srcFile in a given pod to local destFile with defaultMaxTries. +func (c *Client) CopyFromPod(ctx context.Context, namespace, pod, container string, srcFile, destFile string) error { + pipe := newPipe(&CopyOptions{ + MaxTries: defaultMaxTries, + ReadFunc: readFromPod(ctx, c, namespace, pod, container, srcFile), + }) + + outFile, err := os.Create(destFile) + if err != nil { + return err + } + + if _, err = io.Copy(outFile, pipe); err != nil { + return err + } + return nil +} + +func readFromPod(ctx context.Context, client *Client, namespace, pod, container, srcFile string) ReadFunc { + return func(offset uint64, writer io.Writer) error { + command := []string{"sh", "-c", fmt.Sprintf(defaultReadFromByteCmd, offset, srcFile)} + return client.execInPodWithWriters(ctx, ExecParameters{ + Namespace: namespace, + Pod: pod, + Container: container, + Command: command, + }, writer, writer) + } +} diff --git a/internal/k8s/exec.go b/internal/k8s/exec.go index d96b702225..099a067090 100644 --- a/internal/k8s/exec.go +++ b/internal/k8s/exec.go @@ -28,12 +28,12 @@ type ExecParameters struct { TTY bool // fuses stderr into stdout if 'true', needed for Ctrl-C support } -func (c *Client) execInPod(ctx context.Context, p ExecParameters) (*ExecResult, error) { +func (c *Client) execInPodWithWriters(ctx context.Context, p ExecParameters, stdout, stderr io.Writer) error { req := c.Clientset.CoreV1().RESTClient().Post().Resource("pods").Name(p.Pod).Namespace(p.Namespace).SubResource("exec") scheme := runtime.NewScheme() if err := corev1.AddToScheme(scheme); err != nil { - return nil, fmt.Errorf("error adding to scheme: %w", err) + return fmt.Errorf("error adding to scheme: %w", err) } parameterCodec := runtime.NewParameterCodec(scheme) @@ -49,9 +49,8 @@ func (c *Client) execInPod(ctx context.Context, p ExecParameters) (*ExecResult, exec, err := remotecommand.NewSPDYExecutor(c.Config, "POST", req.URL()) if err != nil { - return nil, fmt.Errorf("error while creating executor: %w", err) + return fmt.Errorf("error while creating executor: %w", err) } - result := &ExecResult{} var stdin io.ReadCloser if p.TTY { @@ -64,11 +63,18 @@ func (c *Client) execInPod(ctx context.Context, p ExecParameters) (*ExecResult, err = exec.Stream(remotecommand.StreamOptions{ Stdin: stdin, - Stdout: &result.Stdout, - Stderr: &result.Stderr, + Stdout: stdout, + Stderr: stderr, Tty: p.TTY, }) + return err +} + +func (c *Client) execInPod(ctx context.Context, p ExecParameters) (*ExecResult, error) { + result := &ExecResult{} + err := c.execInPodWithWriters(ctx, p, &result.Stdout, &result.Stderr) + // TTY support may introduce "\r\n" sequences as line separators. // Replace them with "\n" to allow callers to not care. if p.TTY && bytes.Contains(result.Stdout.Bytes(), []byte("\r\n")) { diff --git a/internal/k8s/pipe.go b/internal/k8s/pipe.go new file mode 100644 index 0000000000..b190719326 --- /dev/null +++ b/internal/k8s/pipe.go @@ -0,0 +1,82 @@ +// SPDX-License-Identifier: Apache-2.0 +// Copyright 2020 Authors of Cilium + +package k8s + +import ( + "errors" + "fmt" + "io" +) + +// CopyOptions have the data required to perform the copy operation +type CopyOptions struct { + // Maximum number of retries, -1 for unlimited retries. + MaxTries int + + // ReaderFunc is the actual implementation for reading file content + ReadFunc ReadFunc +} + +// ReadFunc function is to support reading content from given offset till EOF. +// The content will be written to io.Writer. +type ReadFunc func(offset uint64, writer io.Writer) error + +// CopyPipe struct is simple implementation to support copy files with retry. +type CopyPipe struct { + Options *CopyOptions + + Reader *io.PipeReader + Writer *io.PipeWriter + + bytesRead uint64 + retries int +} + +func newPipe(option *CopyOptions) *CopyPipe { + p := new(CopyPipe) + p.Options = option + p.startReadFrom(0) + return p +} + +func (t *CopyPipe) startReadFrom(offset uint64) { + t.Reader, t.Writer = io.Pipe() + go func() { + var err error + defer func() { + // close with error here to make sure any read operation with Pipe Reader will have return the same error + // otherwise, by default, EOF will be returned. + _ = t.Writer.CloseWithError(err) + }() + err = t.Options.ReadFunc(offset, t.Writer) + }() +} + +// Read function is to satisfy io.Reader interface. +// This is simple implementation to support resuming copy in case of there is any temporary issue (e.g. networking) +func (t *CopyPipe) Read(p []byte) (int, error) { + n, err := t.Reader.Read(p) + if err != nil { + // If EOF error happens, just bubble it up, no retry is required. + if errors.Is(err, io.EOF) { + return n, err + } + + // Check if the number of retries is already exhausted + if t.Options.MaxTries >= 0 && t.retries >= t.Options.MaxTries { + return n, fmt.Errorf("dropping out copy after %d retries: %w", t.retries, err) + } + + // Perform retry + if t.bytesRead == 0 { + t.startReadFrom(t.bytesRead) + } else { + t.startReadFrom(t.bytesRead + 1) + } + t.retries++ + return 0, nil + } + t.bytesRead += uint64(n) + return n, err +} diff --git a/internal/k8s/pipe_test.go b/internal/k8s/pipe_test.go new file mode 100644 index 0000000000..9bc3107858 --- /dev/null +++ b/internal/k8s/pipe_test.go @@ -0,0 +1,96 @@ +package k8s + +import ( + "bytes" + "io" + "testing" + + "gopkg.in/check.v1" +) + +func Test(t *testing.T) { + check.TestingT(t) +} + +type CopyPipeSuites struct{} + +var _ = check.Suite(&CopyPipeSuites{}) + +type remoteFile struct { + bytes []byte + + maxFailures int + count int +} + +func (r *remoteFile) Read(offset uint64, writer io.Writer) error { + if int(offset) > len(r.bytes) { + return io.EOF + } + _, err := writer.Write(r.bytes[offset:]) + return err +} + +func (r *remoteFile) ReadWithFailure(offset uint64, writer io.Writer) error { + if int(offset) > len(r.bytes) { + return io.EOF + } + if r.count < r.maxFailures { + r.count++ + return io.ErrUnexpectedEOF + } + + _, err := writer.Write(r.bytes[offset:]) + return err + +} + +func (b *CopyPipeSuites) TestCopyWithoutRetry(c *check.C) { + remoteFile := &remoteFile{ + bytes: []byte{1, 2, 3}, + } + + pipe := newPipe(&CopyOptions{ + ReadFunc: remoteFile.Read, + }) + + res := &bytes.Buffer{} + _, err := io.Copy(res, pipe) + c.Assert(err, check.IsNil) + c.Assert(res.Bytes(), check.DeepEquals, remoteFile.bytes) +} + +func (b *CopyPipeSuites) TestCopyWithRetry(c *check.C) { + remoteFile := &remoteFile{ + bytes: []byte{1, 2, 3}, + maxFailures: 2, + } + + pipe := newPipe(&CopyOptions{ + ReadFunc: remoteFile.ReadWithFailure, + MaxTries: 3, + }) + + res := &bytes.Buffer{} + _, err := io.Copy(res, pipe) + c.Assert(err, check.IsNil) + c.Assert(res.Bytes(), check.DeepEquals, remoteFile.bytes) +} + +func (b *CopyPipeSuites) TestCopyWithExhaustedRetry(c *check.C) { + remoteFile := &remoteFile{ + bytes: []byte{1, 2, 3}, + maxFailures: 3, + } + + pipe := newPipe(&CopyOptions{ + ReadFunc: remoteFile.ReadWithFailure, + MaxTries: 2, + }) + + res := &bytes.Buffer{} + _, err := io.Copy(res, pipe) + c.Assert(err, check.NotNil) + c.Assert(err, check.ErrorMatches, "dropping out copy after 2 retries: unexpected EOF") + c.Assert(res.Bytes(), check.HasLen, 0) +} diff --git a/sysdump/client.go b/sysdump/client.go index 016ef4bab6..4a4a194430 100644 --- a/sysdump/client.go +++ b/sysdump/client.go @@ -21,6 +21,7 @@ import ( type KubernetesClient interface { AutodetectFlavor(ctx context.Context) (f k8s.Flavor, err error) + CopyFromPod(ctx context.Context, namespace, pod, container string, fromFile, destFile string) error ExecInPod(ctx context.Context, namespace, pod, container string, command []string) (bytes.Buffer, error) ExecInPodWithStderr(ctx context.Context, namespace, pod, container string, command []string) (bytes.Buffer, bytes.Buffer, error) GetConfigMap(ctx context.Context, namespace, name string, opts metav1.GetOptions) (*corev1.ConfigMap, error) diff --git a/sysdump/sysdump.go b/sysdump/sysdump.go index ef630cdae4..fd80107860 100644 --- a/sysdump/sysdump.go +++ b/sysdump/sysdump.go @@ -907,14 +907,10 @@ func (c *Collector) submitBugtoolTasks(ctx context.Context, pods []*corev1.Pod, } tarGzFile := m[1] + ".gz" - // Grab the resulting archive's contents from the pod. - b, err := c.Client.ExecInPod(ctx, p.Namespace, p.Name, containerName, []string{"cat", tarGzFile}) - if err != nil { - return fmt.Errorf("failed to collect 'cilium-bugtool' output for %q: %w", p.Name, err) - } - + // Dump the resulting file's contents to the temporary directory. f := c.AbsoluteTempPath(fmt.Sprintf(ciliumBugtoolFileName, p.Name)) - if err := writeBytes(f, b.Bytes()); err != nil { + err = c.Client.CopyFromPod(ctx, p.Namespace, p.Name, containerName, tarGzFile, f) + if err != nil { return fmt.Errorf("failed to collect 'cilium-bugtool' output for %q: %w", p.Name, err) } // Untar the resulting file. diff --git a/sysdump/sysdump_test.go b/sysdump/sysdump_test.go index 125133c2da..4225c617e3 100644 --- a/sysdump/sysdump_test.go +++ b/sysdump/sysdump_test.go @@ -102,6 +102,10 @@ type fakeClient struct { nodeList *corev1.NodeList } +func (c *fakeClient) CopyFromPod(ctx context.Context, namespace, pod, container string, fromFile, destFile string) error { + panic("implement me") +} + func (c *fakeClient) AutodetectFlavor(ctx context.Context) (f k8s.Flavor, err error) { panic("implement me") }