Skip to content

Commit

Permalink
Add retry mechanism while copying files
Browse files Browse the repository at this point in the history
This commit is to add CopyFromPod function in current k8s client, which
supports resuming copy from last byte offset. Small refactor for exec in pod
method was done to avoid copying reader multiple times.

Relates to #616 (comment)

Signed-off-by: Tam Mach <[email protected]>
  • Loading branch information
sayboras committed Dec 24, 2021
1 parent 2ea7f4d commit a0aa326
Show file tree
Hide file tree
Showing 7 changed files with 244 additions and 13 deletions.
46 changes: 46 additions & 0 deletions internal/k8s/copy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// SPDX-License-Identifier: Apache-2.0
// Copyright 2020 Authors of Cilium

package k8s

import (
"context"
"fmt"
"io"
"os"
)

const (
defaultReadFromByteCmd = "tail -c+%d %s"
defaultMaxTries = 5
)

// CopyFromPod is to copy srcFile in a given pod to local destFile with defaultMaxTries.
func (c *Client) CopyFromPod(ctx context.Context, namespace, pod, container string, srcFile, destFile string) error {
pipe := newPipe(&CopyOptions{
MaxTries: defaultMaxTries,
ReadFunc: readFromPod(ctx, c, namespace, pod, container, srcFile),
})

outFile, err := os.Create(destFile)
if err != nil {
return err
}

if _, err = io.Copy(outFile, pipe); err != nil {
return err
}
return nil
}

func readFromPod(ctx context.Context, client *Client, namespace, pod, container, srcFile string) ReadFunc {
return func(offset uint64, writer io.Writer) error {
command := []string{"sh", "-c", fmt.Sprintf(defaultReadFromByteCmd, offset, srcFile)}
return client.execInPodWithWriters(ctx, ExecParameters{
Namespace: namespace,
Pod: pod,
Container: container,
Command: command,
}, writer, writer)
}
}
18 changes: 12 additions & 6 deletions internal/k8s/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@ type ExecParameters struct {
TTY bool // fuses stderr into stdout if 'true', needed for Ctrl-C support
}

func (c *Client) execInPod(ctx context.Context, p ExecParameters) (*ExecResult, error) {
func (c *Client) execInPodWithWriters(ctx context.Context, p ExecParameters, stdout, stderr io.Writer) error {
req := c.Clientset.CoreV1().RESTClient().Post().Resource("pods").Name(p.Pod).Namespace(p.Namespace).SubResource("exec")

scheme := runtime.NewScheme()
if err := corev1.AddToScheme(scheme); err != nil {
return nil, fmt.Errorf("error adding to scheme: %w", err)
return fmt.Errorf("error adding to scheme: %w", err)
}

parameterCodec := runtime.NewParameterCodec(scheme)
Expand All @@ -49,9 +49,8 @@ func (c *Client) execInPod(ctx context.Context, p ExecParameters) (*ExecResult,

exec, err := remotecommand.NewSPDYExecutor(c.Config, "POST", req.URL())
if err != nil {
return nil, fmt.Errorf("error while creating executor: %w", err)
return fmt.Errorf("error while creating executor: %w", err)
}
result := &ExecResult{}

var stdin io.ReadCloser
if p.TTY {
Expand All @@ -64,11 +63,18 @@ func (c *Client) execInPod(ctx context.Context, p ExecParameters) (*ExecResult,

err = exec.Stream(remotecommand.StreamOptions{
Stdin: stdin,
Stdout: &result.Stdout,
Stderr: &result.Stderr,
Stdout: stdout,
Stderr: stderr,
Tty: p.TTY,
})

return err
}

func (c *Client) execInPod(ctx context.Context, p ExecParameters) (*ExecResult, error) {
result := &ExecResult{}
err := c.execInPodWithWriters(ctx, p, &result.Stdout, &result.Stderr)

// TTY support may introduce "\r\n" sequences as line separators.
// Replace them with "\n" to allow callers to not care.
if p.TTY && bytes.Contains(result.Stdout.Bytes(), []byte("\r\n")) {
Expand Down
82 changes: 82 additions & 0 deletions internal/k8s/pipe.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// SPDX-License-Identifier: Apache-2.0
// Copyright 2020 Authors of Cilium

package k8s

import (
"errors"
"fmt"
"io"
)

// CopyOptions have the data required to perform the copy operation
type CopyOptions struct {
// Maximum number of retries, -1 for unlimited retries.
MaxTries int

// ReaderFunc is the actual implementation for reading file content
ReadFunc ReadFunc
}

// ReadFunc function is to support reading content from given offset till EOF.
// The content will be written to io.Writer.
type ReadFunc func(offset uint64, writer io.Writer) error

// CopyPipe struct is simple implementation to support copy files with retry.
type CopyPipe struct {
Options *CopyOptions

Reader *io.PipeReader
Writer *io.PipeWriter

bytesRead uint64
retries int
}

func newPipe(option *CopyOptions) *CopyPipe {
p := new(CopyPipe)
p.Options = option
p.startReadFrom(0)
return p
}

func (t *CopyPipe) startReadFrom(offset uint64) {
t.Reader, t.Writer = io.Pipe()
go func() {
var err error
defer func() {
// close with error here to make sure any read operation with Pipe Reader will have return the same error
// otherwise, by default, EOF will be returned.
_ = t.Writer.CloseWithError(err)
}()
err = t.Options.ReadFunc(offset, t.Writer)
}()
}

// Read function is to satisfy io.Reader interface.
// This is simple implementation to support resuming copy in case of there is any temporary issue (e.g. networking)
func (t *CopyPipe) Read(p []byte) (int, error) {
n, err := t.Reader.Read(p)
if err != nil {
// If EOF error happens, just bubble it up, no retry is required.
if errors.Is(err, io.EOF) {
return n, err
}

// Check if the number of retries is already exhausted
if t.Options.MaxTries >= 0 && t.retries >= t.Options.MaxTries {
return n, fmt.Errorf("dropping out copy after %d retries: %w", t.retries, err)
}

// Perform retry
if t.bytesRead == 0 {
t.startReadFrom(t.bytesRead)
} else {
t.startReadFrom(t.bytesRead + 1)
}
t.retries++
return 0, nil
}
t.bytesRead += uint64(n)
return n, err
}
96 changes: 96 additions & 0 deletions internal/k8s/pipe_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package k8s

import (
"bytes"
"io"
"testing"

"gopkg.in/check.v1"
)

func Test(t *testing.T) {
check.TestingT(t)
}

type CopyPipeSuites struct{}

var _ = check.Suite(&CopyPipeSuites{})

type remoteFile struct {
bytes []byte

maxFailures int
count int
}

func (r *remoteFile) Read(offset uint64, writer io.Writer) error {
if int(offset) > len(r.bytes) {
return io.EOF
}
_, err := writer.Write(r.bytes[offset:])
return err
}

func (r *remoteFile) ReadWithFailure(offset uint64, writer io.Writer) error {
if int(offset) > len(r.bytes) {
return io.EOF
}
if r.count < r.maxFailures {
r.count++
return io.ErrUnexpectedEOF
}

_, err := writer.Write(r.bytes[offset:])
return err

}

func (b *CopyPipeSuites) TestCopyWithoutRetry(c *check.C) {
remoteFile := &remoteFile{
bytes: []byte{1, 2, 3},
}

pipe := newPipe(&CopyOptions{
ReadFunc: remoteFile.Read,
})

res := &bytes.Buffer{}
_, err := io.Copy(res, pipe)
c.Assert(err, check.IsNil)
c.Assert(res.Bytes(), check.DeepEquals, remoteFile.bytes)
}

func (b *CopyPipeSuites) TestCopyWithRetry(c *check.C) {
remoteFile := &remoteFile{
bytes: []byte{1, 2, 3},
maxFailures: 2,
}

pipe := newPipe(&CopyOptions{
ReadFunc: remoteFile.ReadWithFailure,
MaxTries: 3,
})

res := &bytes.Buffer{}
_, err := io.Copy(res, pipe)
c.Assert(err, check.IsNil)
c.Assert(res.Bytes(), check.DeepEquals, remoteFile.bytes)
}

func (b *CopyPipeSuites) TestCopyWithExhaustedRetry(c *check.C) {
remoteFile := &remoteFile{
bytes: []byte{1, 2, 3},
maxFailures: 3,
}

pipe := newPipe(&CopyOptions{
ReadFunc: remoteFile.ReadWithFailure,
MaxTries: 2,
})

res := &bytes.Buffer{}
_, err := io.Copy(res, pipe)
c.Assert(err, check.NotNil)
c.Assert(err, check.ErrorMatches, "dropping out copy after 2 retries: unexpected EOF")
c.Assert(res.Bytes(), check.HasLen, 0)
}
1 change: 1 addition & 0 deletions sysdump/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

type KubernetesClient interface {
AutodetectFlavor(ctx context.Context) (f k8s.Flavor, err error)
CopyFromPod(ctx context.Context, namespace, pod, container string, fromFile, destFile string) error
ExecInPod(ctx context.Context, namespace, pod, container string, command []string) (bytes.Buffer, error)
ExecInPodWithStderr(ctx context.Context, namespace, pod, container string, command []string) (bytes.Buffer, bytes.Buffer, error)
GetConfigMap(ctx context.Context, namespace, name string, opts metav1.GetOptions) (*corev1.ConfigMap, error)
Expand Down
10 changes: 3 additions & 7 deletions sysdump/sysdump.go
Original file line number Diff line number Diff line change
Expand Up @@ -907,14 +907,10 @@ func (c *Collector) submitBugtoolTasks(ctx context.Context, pods []*corev1.Pod,
}
tarGzFile := m[1] + ".gz"

// Grab the resulting archive's contents from the pod.
b, err := c.Client.ExecInPod(ctx, p.Namespace, p.Name, containerName, []string{"cat", tarGzFile})
if err != nil {
return fmt.Errorf("failed to collect 'cilium-bugtool' output for %q: %w", p.Name, err)
}

// Dump the resulting file's contents to the temporary directory.
f := c.AbsoluteTempPath(fmt.Sprintf(ciliumBugtoolFileName, p.Name))
if err := writeBytes(f, b.Bytes()); err != nil {
err = c.Client.CopyFromPod(ctx, p.Namespace, p.Name, containerName, tarGzFile, f)
if err != nil {
return fmt.Errorf("failed to collect 'cilium-bugtool' output for %q: %w", p.Name, err)
}
// Untar the resulting file.
Expand Down
4 changes: 4 additions & 0 deletions sysdump/sysdump_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,10 @@ type fakeClient struct {
nodeList *corev1.NodeList
}

func (c *fakeClient) CopyFromPod(ctx context.Context, namespace, pod, container string, fromFile, destFile string) error {
panic("implement me")
}

func (c *fakeClient) AutodetectFlavor(ctx context.Context) (f k8s.Flavor, err error) {
panic("implement me")
}
Expand Down

0 comments on commit a0aa326

Please sign in to comment.