Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bugtool: Compress and copy with resume #671

Merged
merged 2 commits into from
Jan 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions internal/k8s/copy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// SPDX-License-Identifier: Apache-2.0
// Copyright 2020 Authors of Cilium
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: The copyright year should be 2021. This also applies to other files.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

my bad :(


package k8s

import (
"context"
"fmt"
"io"
"os"
)

const (
defaultReadFromByteCmd = "tail -c+%d %s"
defaultMaxTries = 5
)

// CopyFromPod is to copy srcFile in a given pod to local destFile with defaultMaxTries.
func (c *Client) CopyFromPod(ctx context.Context, namespace, pod, container string, srcFile, destFile string) error {
pipe := newPipe(&CopyOptions{
MaxTries: defaultMaxTries,
ReadFunc: readFromPod(ctx, c, namespace, pod, container, srcFile),
})

outFile, err := os.Create(destFile)
if err != nil {
return err
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function is missing a defer outFile.Close() or similar.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you are right, I forgot about this most of the time, and relying on linters. If I am not wrong, file is having finalizer i.e. fd will be garbage collected later, not idea in case of reuse of file descriptor.

Let me make a small update on this, thanks again.


if _, err = io.Copy(outFile, pipe); err != nil {
return err
}
return nil
}

func readFromPod(ctx context.Context, client *Client, namespace, pod, container, srcFile string) ReadFunc {
return func(offset uint64, writer io.Writer) error {
command := []string{"sh", "-c", fmt.Sprintf(defaultReadFromByteCmd, offset, srcFile)}
return client.execInPodWithWriters(ctx, ExecParameters{
Namespace: namespace,
Pod: pod,
Container: container,
Command: command,
}, writer, writer)
}
}
18 changes: 12 additions & 6 deletions internal/k8s/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@ type ExecParameters struct {
TTY bool // fuses stderr into stdout if 'true', needed for Ctrl-C support
}

func (c *Client) execInPod(ctx context.Context, p ExecParameters) (*ExecResult, error) {
func (c *Client) execInPodWithWriters(ctx context.Context, p ExecParameters, stdout, stderr io.Writer) error {
req := c.Clientset.CoreV1().RESTClient().Post().Resource("pods").Name(p.Pod).Namespace(p.Namespace).SubResource("exec")

scheme := runtime.NewScheme()
if err := corev1.AddToScheme(scheme); err != nil {
return nil, fmt.Errorf("error adding to scheme: %w", err)
return fmt.Errorf("error adding to scheme: %w", err)
}

parameterCodec := runtime.NewParameterCodec(scheme)
Expand All @@ -49,9 +49,8 @@ func (c *Client) execInPod(ctx context.Context, p ExecParameters) (*ExecResult,

exec, err := remotecommand.NewSPDYExecutor(c.Config, "POST", req.URL())
if err != nil {
return nil, fmt.Errorf("error while creating executor: %w", err)
return fmt.Errorf("error while creating executor: %w", err)
}
result := &ExecResult{}

var stdin io.ReadCloser
if p.TTY {
Expand All @@ -64,11 +63,18 @@ func (c *Client) execInPod(ctx context.Context, p ExecParameters) (*ExecResult,

err = exec.Stream(remotecommand.StreamOptions{
Stdin: stdin,
Stdout: &result.Stdout,
Stderr: &result.Stderr,
Stdout: stdout,
Stderr: stderr,
Tty: p.TTY,
})

return err
}

func (c *Client) execInPod(ctx context.Context, p ExecParameters) (*ExecResult, error) {
result := &ExecResult{}
err := c.execInPodWithWriters(ctx, p, &result.Stdout, &result.Stderr)

// TTY support may introduce "\r\n" sequences as line separators.
// Replace them with "\n" to allow callers to not care.
if p.TTY && bytes.Contains(result.Stdout.Bytes(), []byte("\r\n")) {
Expand Down
82 changes: 82 additions & 0 deletions internal/k8s/pipe.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// SPDX-License-Identifier: Apache-2.0
// Copyright 2020 Authors of Cilium

package k8s

import (
"errors"
"fmt"
"io"
)

// CopyOptions have the data required to perform the copy operation
type CopyOptions struct {
// Maximum number of retries, -1 for unlimited retries.
MaxTries int

// ReaderFunc is the actual implementation for reading file content
ReadFunc ReadFunc
}

// ReadFunc function is to support reading content from given offset till EOF.
// The content will be written to io.Writer.
type ReadFunc func(offset uint64, writer io.Writer) error

// CopyPipe struct is simple implementation to support copy files with retry.
type CopyPipe struct {
Options *CopyOptions

Reader *io.PipeReader
Writer *io.PipeWriter

bytesRead uint64
retries int
}

func newPipe(option *CopyOptions) *CopyPipe {
p := new(CopyPipe)
p.Options = option
p.startReadFrom(0)
return p
}

func (t *CopyPipe) startReadFrom(offset uint64) {
t.Reader, t.Writer = io.Pipe()
go func() {
var err error
defer func() {
// close with error here to make sure any read operation with Pipe Reader will have return the same error
// otherwise, by default, EOF will be returned.
_ = t.Writer.CloseWithError(err)
}()
err = t.Options.ReadFunc(offset, t.Writer)
}()
}

// Read function is to satisfy io.Reader interface.
// This is simple implementation to support resuming copy in case of there is any temporary issue (e.g. networking)
func (t *CopyPipe) Read(p []byte) (int, error) {
n, err := t.Reader.Read(p)
if err != nil {
// If EOF error happens, just bubble it up, no retry is required.
if errors.Is(err, io.EOF) {
return n, err
}

// Check if the number of retries is already exhausted
if t.Options.MaxTries >= 0 && t.retries >= t.Options.MaxTries {
return n, fmt.Errorf("dropping out copy after %d retries: %w", t.retries, err)
}

// Perform retry
if t.bytesRead == 0 {
t.startReadFrom(t.bytesRead)
} else {
t.startReadFrom(t.bytesRead + 1)
}
t.retries++
return 0, nil
}
t.bytesRead += uint64(n)
return n, err
}
96 changes: 96 additions & 0 deletions internal/k8s/pipe_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package k8s

import (
"bytes"
"io"
"testing"

"gopkg.in/check.v1"
)

func Test(t *testing.T) {
check.TestingT(t)
}

type CopyPipeSuites struct{}

var _ = check.Suite(&CopyPipeSuites{})

type remoteFile struct {
bytes []byte

maxFailures int
count int
}

func (r *remoteFile) Read(offset uint64, writer io.Writer) error {
if int(offset) > len(r.bytes) {
return io.EOF
}
_, err := writer.Write(r.bytes[offset:])
return err
}

func (r *remoteFile) ReadWithFailure(offset uint64, writer io.Writer) error {
if int(offset) > len(r.bytes) {
return io.EOF
}
if r.count < r.maxFailures {
r.count++
return io.ErrUnexpectedEOF
}

_, err := writer.Write(r.bytes[offset:])
return err

}

func (b *CopyPipeSuites) TestCopyWithoutRetry(c *check.C) {
remoteFile := &remoteFile{
bytes: []byte{1, 2, 3},
}

pipe := newPipe(&CopyOptions{
ReadFunc: remoteFile.Read,
})

res := &bytes.Buffer{}
_, err := io.Copy(res, pipe)
c.Assert(err, check.IsNil)
c.Assert(res.Bytes(), check.DeepEquals, remoteFile.bytes)
}

func (b *CopyPipeSuites) TestCopyWithRetry(c *check.C) {
remoteFile := &remoteFile{
bytes: []byte{1, 2, 3},
maxFailures: 2,
}

pipe := newPipe(&CopyOptions{
ReadFunc: remoteFile.ReadWithFailure,
MaxTries: 3,
})

res := &bytes.Buffer{}
_, err := io.Copy(res, pipe)
c.Assert(err, check.IsNil)
c.Assert(res.Bytes(), check.DeepEquals, remoteFile.bytes)
}

func (b *CopyPipeSuites) TestCopyWithExhaustedRetry(c *check.C) {
remoteFile := &remoteFile{
bytes: []byte{1, 2, 3},
maxFailures: 3,
}

pipe := newPipe(&CopyOptions{
ReadFunc: remoteFile.ReadWithFailure,
MaxTries: 2,
})

res := &bytes.Buffer{}
_, err := io.Copy(res, pipe)
c.Assert(err, check.NotNil)
c.Assert(err, check.ErrorMatches, "dropping out copy after 2 retries: unexpected EOF")
c.Assert(res.Bytes(), check.HasLen, 0)
}
1 change: 1 addition & 0 deletions sysdump/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

type KubernetesClient interface {
AutodetectFlavor(ctx context.Context) (f k8s.Flavor, err error)
CopyFromPod(ctx context.Context, namespace, pod, container string, fromFile, destFile string) error
ExecInPod(ctx context.Context, namespace, pod, container string, command []string) (bytes.Buffer, error)
ExecInPodWithStderr(ctx context.Context, namespace, pod, container string, command []string) (bytes.Buffer, bytes.Buffer, error)
GetConfigMap(ctx context.Context, namespace, name string, opts metav1.GetOptions) (*corev1.ConfigMap, error)
Expand Down
2 changes: 1 addition & 1 deletion sysdump/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ const (

const (
awsNodeDaemonSetFileName = "aws-node-daemonset-<ts>.yaml"
ciliumBugtoolFileName = "cilium-bugtool-%s-<ts>.tar"
ciliumBugtoolFileName = "cilium-bugtool-%s-<ts>.tar.gz"
ciliumClusterWideNetworkPoliciesFileName = "ciliumclusterwidenetworkpolicies-<ts>.yaml"
ciliumConfigMapFileName = "cilium-configmap-<ts>.yaml"
ciliumDaemonSetFileName = "cilium-daemonset-<ts>.yaml"
Expand Down
24 changes: 15 additions & 9 deletions sysdump/sysdump.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
"github.com/cilium/cilium-cli/internal/utils"

"github.com/cilium/workerpool"
archiver "github.com/mholt/archiver/v3"
"github.com/mholt/archiver/v3"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -899,21 +899,27 @@ func (c *Collector) submitBugtoolTasks(ctx context.Context, pods []*corev1.Pod,
if len(m) != 2 || len(m[1]) == 0 {
return fmt.Errorf("failed to collect 'cilium-bugtool' output for %q in namespace %q: output doesn't contain archive name: %s", p.Name, p.Namespace, outString)
}
// Grab the resulting archive's contents from the pod.
b, err := c.Client.ExecInPod(ctx, p.Namespace, p.Name, containerName, []string{"cat", m[1]})

// Gzip bugtool tar file to reduce the size of transferred file
o, e, err = c.Client.ExecInPodWithStderr(ctx, p.Namespace, p.Name, containerName, []string{"gzip", m[1]})
tklauser marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return fmt.Errorf("failed to collect 'cilium-bugtool' output for %q: %w", p.Name, err)
return fmt.Errorf("failed to compress 'cilium-bugtool' output for %q in namespace %q: %w: %s", p.Name, p.Namespace, err, e.String())
}
tarGzFile := m[1] + ".gz"

// Dump the resulting file's contents to the temporary directory.
f := c.AbsoluteTempPath(fmt.Sprintf(ciliumBugtoolFileName, p.Name))
if err := writeBytes(f, b.Bytes()); err != nil {
err = c.Client.CopyFromPod(ctx, p.Namespace, p.Name, containerName, tarGzFile, f)
if err != nil {
return fmt.Errorf("failed to collect 'cilium-bugtool' output for %q: %w", p.Name, err)
}
// Untar the resulting file.
t := archiver.Tar{
StripComponents: 1,
t := archiver.TarGz{
Tar: &archiver.Tar{
StripComponents: 1,
},
}
if err := t.Unarchive(f, strings.Replace(f, ".tar", "", -1)); err != nil {
if err := t.Unarchive(f, strings.Replace(f, ".tar.gz", "", -1)); err != nil {
c.logWarn("Failed to unarchive 'cilium-bugtool' output for %q: %v", p.Name, err)
return nil
}
Expand All @@ -923,7 +929,7 @@ func (c *Collector) submitBugtoolTasks(ctx context.Context, pods []*corev1.Pod,
return nil
}
// Remove the file from the pod.
if _, err = c.Client.ExecInPod(ctx, p.Namespace, p.Name, containerName, []string{rmCommand, m[1]}); err != nil {
if _, err = c.Client.ExecInPod(ctx, p.Namespace, p.Name, containerName, []string{rmCommand, tarGzFile}); err != nil {
c.logWarn("Failed to delete 'cilium-bugtool' output from pod %q in namespace %q: %w", p.Name, p.Namespace, err)
return nil
}
Expand Down
4 changes: 4 additions & 0 deletions sysdump/sysdump_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,10 @@ type fakeClient struct {
nodeList *corev1.NodeList
}

func (c *fakeClient) CopyFromPod(ctx context.Context, namespace, pod, container string, fromFile, destFile string) error {
panic("implement me")
}

func (c *fakeClient) AutodetectFlavor(ctx context.Context) (f k8s.Flavor, err error) {
panic("implement me")
}
Expand Down