Skip to content

Commit

Permalink
Add multipart handling commands
Browse files Browse the repository at this point in the history
Adding command to list active multipart uploads.
Adding command to abort active multipart upload.
  • Loading branch information
voyvodov committed Sep 11, 2024
1 parent c280956 commit f07aa14
Show file tree
Hide file tree
Showing 11 changed files with 827 additions and 49 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ storage services and local filesystems.
- Print object contents to stdout
- Select JSON records from objects using SQL expressions
- Create or remove buckets
- List or abort multipart uploads
- Summarize objects sizes, grouping by storage class
- Wildcard support for all operations
- Multiple arguments support for delete operation
Expand Down
90 changes: 90 additions & 0 deletions command/abortmp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package command

import (
"fmt"

"github.com/peak/s5cmd/v2/storage"
"github.com/peak/s5cmd/v2/storage/url"
"github.com/urfave/cli/v2"
)

var abortmpHelpTemplate = `Name:
{{.HelpName}} - {{.Usage}}
Usage:
{{.HelpName}} [options] object-path upload-id
Options:
{{range .VisibleFlags}}{{.}}
{{end}}
Examples:
1. Abort multipart upload
> s5cmd {{.HelpName}} s3://bucket/object 01000191-daf9-7547-5278-71bd81953ffe
`

func NewAbortMultipartCommand() *cli.Command {
cmd := &cli.Command{
Name: "abortmp",
HelpName: "abortmp",
Usage: "abort multipart uploads",
CustomHelpTemplate: abortmpHelpTemplate,
Flags: []cli.Flag{},
Before: func(c *cli.Context) error {
err := validateAbortMultipartCommand(c)
if err != nil {
printError(commandFromContext(c), c.Command.Name, err)
}
return err
},
Action: func(c *cli.Context) (err error) {

// var merror error

fullCommand := commandFromContext(c)

objurl, err := url.New(c.Args().First())
if err != nil {
printError(fullCommand, c.Command.Name, err)
return err
}
uploadID := c.Args().Get(1)

client, err := storage.NewRemoteClient(c.Context, objurl, NewStorageOpts(c))
if err != nil {
printError(fullCommand, c.Command.Name, err)
return err
}

err = client.AbortMultipartUpload(c.Context, objurl, uploadID)
if err != nil && err != storage.ErrNoObjectFound {
printError(fullCommand, c.Command.Name, err)
return err
}

return nil
},
}

cmd.BashComplete = getBashCompleteFn(cmd, false, false)
return cmd
}

func validateAbortMultipartCommand(c *cli.Context) error {
if c.Args().Len() != 2 {
return fmt.Errorf("expected object path and upload id arguments")
}

objectPath := c.Args().Get(0)
uploadID := c.Args().Get(1)

_, err := url.New(objectPath)
if err != nil {
return err
}

if uploadID == "" {
return fmt.Errorf("expected upload id, got empty string")
}

return nil
}
3 changes: 3 additions & 0 deletions command/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,9 @@ func Commands() []*cli.Command {
NewDeleteCommand(),
NewMoveCommand(),
NewMakeBucketCommand(),
NewAbortMultipartCommand(),
NewListMultipartCommand(),
NewMultipartPartsCommand(),
NewRemoveBucketCommand(),
NewSelectCommand(),
NewSizeCommand(),
Expand Down
4 changes: 3 additions & 1 deletion command/cp.go
Original file line number Diff line number Diff line change
Expand Up @@ -565,7 +565,9 @@ func (c Copy) Run(ctx context.Context) error {
waiter.Wait()
<-errDoneCh

return multierror.Append(merrorWaiter, merrorObjects).ErrorOrNil()
err = multierror.Append(merrorWaiter, merrorObjects).ErrorOrNil()

return handleMultipartError(c.fullCommand, c.op, err)
}

func (c Copy) prepareCopyTask(
Expand Down
27 changes: 27 additions & 0 deletions command/error.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package command

import (
"errors"
"fmt"
"strings"

"github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/hashicorp/go-multierror"

errorpkg "github.com/peak/s5cmd/v2/error"
Expand Down Expand Up @@ -94,3 +96,28 @@ func cleanupError(err error) string {
s = strings.TrimSpace(s)
return s
}

func handleMultipartError(command, op string, err error) error {
var pkgErr *errorpkg.Error
if err == nil {
return nil
}

if multiErr, ok := err.(*multierror.Error); ok {
for _, merr := range multiErr.Errors {
if errors.As(merr, &pkgErr) {
if awsErr, ok := pkgErr.Err.(s3manager.MultiUploadFailure); ok {
printError(command, op, fmt.Errorf("multipart upload fail. To resume use the following id: %s", awsErr.UploadID()))
}
}
}
} else {
if errors.As(err, &pkgErr) {
if awsErr, ok := pkgErr.Err.(s3manager.MultiUploadFailure); ok {
printError(command, op, fmt.Errorf("multipart upload fail. To resume use the following id: %s", awsErr.UploadID()))
}
}
}

return err
}
134 changes: 134 additions & 0 deletions command/lsmp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package command

import (
"fmt"

"github.com/hashicorp/go-multierror"
"github.com/peak/s5cmd/v2/log"
"github.com/peak/s5cmd/v2/storage"
"github.com/peak/s5cmd/v2/storage/url"
"github.com/peak/s5cmd/v2/strutil"
"github.com/urfave/cli/v2"
)

var lsmpHelpTemplate = `Name:
{{.HelpName}} - {{.Usage}}
Usage:
{{.HelpName}} [options] prefix
Options:
{{range .VisibleFlags}}{{.}}
{{end}}
Examples:
1. List multipart uploads for bucket
> s5cmd {{.HelpName}} s3://bucket
2. List multipart uploads for specific object
> s5cmd {{.HelpName}} s3://bucket/object
3. List multipart uploads with full path to the object
> s5cmd {{.HelpName}} --show-fullpath s3://bucket/object
`

func NewListMultipartCommand() *cli.Command {
cmd := &cli.Command{
Name: "lsmp",
HelpName: "lsmp",
Usage: "list multipart uploads",
CustomHelpTemplate: lsmpHelpTemplate,
Flags: []cli.Flag{
&cli.BoolFlag{
Name: "show-fullpath",
Usage: "show the fullpath names of the object(s)",
},
},
Before: func(c *cli.Context) error {
err := validateListMultipartCommand(c)
if err != nil {
printError(commandFromContext(c), c.Command.Name, err)
}
return err
},
Action: func(c *cli.Context) (err error) {

var merror error

fullCommand := commandFromContext(c)

srcurl, err := url.New(c.Args().First())
if err != nil {
printError(fullCommand, c.Command.Name, err)
return err
}

client, err := storage.NewRemoteClient(c.Context, srcurl, NewStorageOpts(c))
if err != nil {
printError(fullCommand, c.Command.Name, err)
return err
}

for object := range client.ListMultipartUploads(c.Context, srcurl) {
if err := object.Err; err != nil {
merror = multierror.Append(merror, err)
printError(fullCommand, c.Command.Name, err)
continue
}
msg := ListMPUploadMessage{
Object: object,
showFullPath: c.Bool("show-fullpath"),
}
log.Info(msg)
}

return nil
},
}

cmd.BashComplete = getBashCompleteFn(cmd, false, false)
return cmd
}

type ListMPUploadMessage struct {
Object *storage.UploadObject `json:"object"`

showFullPath bool
}

// String returns the string representation of ListMessage.
func (l ListMPUploadMessage) String() string {
// date and storage fields
var listFormat = "%19s"

listFormat = listFormat + " %s %s"

var s string

var path string
if l.showFullPath {
path = l.Object.URL.String()
} else {
path = l.Object.URL.Relative()
}

s = fmt.Sprintf(
listFormat,
l.Object.Initiated.Format(dateFormat),
path,
l.Object.UploadID,
)

return s
}

// JSON returns the JSON representation of ListMessage.
func (l ListMPUploadMessage) JSON() string {
return strutil.JSON(l.Object)
}

func validateListMultipartCommand(c *cli.Context) error {
if c.Args().Len() != 1 {
return fmt.Errorf("expected 1 argument")
}

_, err := url.New(c.Args().First())
return err
}
Loading

0 comments on commit f07aa14

Please sign in to comment.