Skip to content

Commit

Permalink
Add string/json output string mc pipe (#5065)
Browse files Browse the repository at this point in the history
This commit adds a string message and json output to mc pipe.  The
implementation is heavily inspired by the one in mc cp, just adapted to
the different semantics of pipe.
  • Loading branch information
fennm authored Oct 20, 2024
1 parent d9b81b2 commit 65aa514
Showing 1 changed file with 38 additions and 7 deletions.
45 changes: 38 additions & 7 deletions cmd/pipe-main.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2015-2022 MinIO, Inc.
// Copyright (c) 2015-2024 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
Expand All @@ -18,15 +18,18 @@
package cmd

import (
"fmt"
"io"
"os"
"runtime/debug"
"syscall"

"github.com/dustin/go-humanize"
"github.com/minio/cli"
json "github.com/minio/colorjson"
"github.com/minio/mc/pkg/probe"
"github.com/minio/minio-go/v7"
"github.com/minio/pkg/v3/console"
)

func defaultPartSize() string {
Expand Down Expand Up @@ -114,7 +117,28 @@ EXAMPLES:
`,
}

func pipe(ctx *cli.Context, targetURL string, encKeyDB map[string][]prefixSSEPair, meta map[string]string, quiet bool) *probe.Error {
// pipeMessage container for pipe messages
type pipeMessage struct {
Status string `json:"status"`
Target string `json:"target"`
Size int64 `json:"size"`
}

// String colorized pipe message
func (p pipeMessage) String() string {
return console.Colorize("Pipe", fmt.Sprintf("%d bytes -> `%s`", p.Size, p.Target))
}

// JSON jsonified pipe message
func (p pipeMessage) JSON() string {
p.Status = "success"
pipeMessageBytes, e := json.MarshalIndent(p, "", " ")
fatalIf(probe.NewError(e), "Unable to marshal into JSON.")

return string(pipeMessageBytes)
}

func pipe(ctx *cli.Context, targetURL string, encKeyDB map[string][]prefixSSEPair, meta map[string]string, quiet bool, json bool) *probe.Error {
// If possible increase the pipe buffer size
if e := increasePipeBufferSize(os.Stdin, ctx.Int("pipe-max-size")); e != nil {
fatalIf(probe.NewError(e), "Unable to increase custom pipe-max-size")
Expand Down Expand Up @@ -159,21 +183,26 @@ func pipe(ctx *cli.Context, targetURL string, encKeyDB map[string][]prefixSSEPai
}

var reader io.Reader
if !quiet {
if !quiet && !json {
pg := newProgressBar(0)
reader = io.TeeReader(os.Stdin, pg)
} else {
reader = os.Stdin
}

_, err := putTargetStreamWithURL(targetURL, reader, -1, opts)
n, err := putTargetStreamWithURL(targetURL, reader, -1, opts)
// TODO: See if this check is necessary.
switch e := err.ToGoError().(type) {
case *os.PathError:
if e.Err == syscall.EPIPE {
// stdin closed by the user. Gracefully exit.
return nil
}
case nil:
printMsg(pipeMessage{
Target: targetURL,
Size: n,
})
}
return err.Trace(targetURL)
}
Expand All @@ -193,8 +222,10 @@ func mainPipe(ctx *cli.Context) error {
encKeyDB, err := validateAndCreateEncryptionKeys(ctx)
fatalIf(err, "Unable to parse encryption keys.")

// globalQuiet is true for no window size to get. We just need --quiet here.
// globalQuiet is true for no window size to get.
// We just need --quiet and --json here.
quiet := ctx.Bool("quiet")
json := ctx.Bool("json")

meta := map[string]string{}
if attr := ctx.String("attr"); attr != "" {
Expand All @@ -205,12 +236,12 @@ func mainPipe(ctx *cli.Context) error {
meta["X-Amz-Tagging"] = tags
}
if len(ctx.Args()) == 0 {
err = pipe(ctx, "", nil, meta, quiet)
err = pipe(ctx, "", nil, meta, quiet, json)
fatalIf(err.Trace("stdout"), "Unable to write to one or more targets.")
} else {
// extract URLs.
URLs := ctx.Args()
err = pipe(ctx, URLs[0], encKeyDB, meta, quiet)
err = pipe(ctx, URLs[0], encKeyDB, meta, quiet, json)
fatalIf(err.Trace(URLs[0]), "Unable to write to one or more targets.")
}

Expand Down

0 comments on commit 65aa514

Please sign in to comment.