diff --git a/README.md b/README.md index 57bd03519..a6fa8f4ca 100644 --- a/README.md +++ b/README.md @@ -35,6 +35,7 @@ storage services and local filesystems. - Print object contents to stdout - Select JSON records from objects using SQL expressions - Create or remove buckets +- List or abort multipart uploads - Summarize objects sizes, grouping by storage class - Wildcard support for all operations - Multiple arguments support for delete operation diff --git a/command/abortmp.go b/command/abortmp.go new file mode 100644 index 000000000..0185d5d48 --- /dev/null +++ b/command/abortmp.go @@ -0,0 +1,90 @@ +package command + +import ( + "fmt" + + "github.com/peak/s5cmd/v2/storage" + "github.com/peak/s5cmd/v2/storage/url" + "github.com/urfave/cli/v2" +) + +var abortmpHelpTemplate = `Name: + {{.HelpName}} - {{.Usage}} + +Usage: + {{.HelpName}} [options] object-path upload-id + +Options: + {{range .VisibleFlags}}{{.}} + {{end}} +Examples: + 1. Abort multipart upload + > s5cmd {{.HelpName}} s3://bucket/object 01000191-daf9-7547-5278-71bd81953ffe +` + +func NewAbortMultipartCommand() *cli.Command { + cmd := &cli.Command{ + Name: "abortmp", + HelpName: "abortmp", + Usage: "abort multipart uploads", + CustomHelpTemplate: abortmpHelpTemplate, + Flags: []cli.Flag{}, + Before: func(c *cli.Context) error { + err := validateAbortMultipartCommand(c) + if err != nil { + printError(commandFromContext(c), c.Command.Name, err) + } + return err + }, + Action: func(c *cli.Context) (err error) { + + // var merror error + + fullCommand := commandFromContext(c) + + objurl, err := url.New(c.Args().First()) + if err != nil { + printError(fullCommand, c.Command.Name, err) + return err + } + uploadID := c.Args().Get(1) + + client, err := storage.NewRemoteClient(c.Context, objurl, NewStorageOpts(c)) + if err != nil { + printError(fullCommand, c.Command.Name, err) + return err + } + + err = client.AbortMultipartUpload(c.Context, objurl, uploadID) + if err != nil && err != storage.ErrNoObjectFound { + printError(fullCommand, c.Command.Name, err) + return err + } + + return nil + }, + } + + cmd.BashComplete = getBashCompleteFn(cmd, false, false) + return cmd +} + +func validateAbortMultipartCommand(c *cli.Context) error { + if c.Args().Len() != 2 { + return fmt.Errorf("expected object path and upload id arguments") + } + + objectPath := c.Args().Get(0) + uploadID := c.Args().Get(1) + + _, err := url.New(objectPath) + if err != nil { + return err + } + + if uploadID == "" { + return fmt.Errorf("expected upload id, got empty string") + } + + return nil +} diff --git a/command/app.go b/command/app.go index f1699e43d..de0a459f5 100644 --- a/command/app.go +++ b/command/app.go @@ -200,6 +200,9 @@ func Commands() []*cli.Command { NewDeleteCommand(), NewMoveCommand(), NewMakeBucketCommand(), + NewAbortMultipartCommand(), + NewListMultipartCommand(), + NewMultipartPartsCommand(), NewRemoveBucketCommand(), NewSelectCommand(), NewSizeCommand(), diff --git a/command/cp.go b/command/cp.go index 7ed06bfee..65f144132 100644 --- a/command/cp.go +++ b/command/cp.go @@ -565,7 +565,9 @@ func (c Copy) Run(ctx context.Context) error { waiter.Wait() <-errDoneCh - return multierror.Append(merrorWaiter, merrorObjects).ErrorOrNil() + err = multierror.Append(merrorWaiter, merrorObjects).ErrorOrNil() + + return handleMultipartError(c.fullCommand, c.op, err) } func (c Copy) prepareCopyTask( diff --git a/command/error.go b/command/error.go index 37294ce4f..dfe59731a 100644 --- a/command/error.go +++ b/command/error.go @@ -1,9 +1,11 @@ package command import ( + "errors" "fmt" "strings" + "github.com/aws/aws-sdk-go/service/s3/s3manager" "github.com/hashicorp/go-multierror" errorpkg "github.com/peak/s5cmd/v2/error" @@ -94,3 +96,28 @@ func cleanupError(err error) string { s = strings.TrimSpace(s) return s } + +func handleMultipartError(command, op string, err error) error { + var pkgErr *errorpkg.Error + if err == nil { + return nil + } + + if multiErr, ok := err.(*multierror.Error); ok { + for _, merr := range multiErr.Errors { + if errors.As(merr, &pkgErr) { + if awsErr, ok := pkgErr.Err.(s3manager.MultiUploadFailure); ok { + printError(command, op, fmt.Errorf("multipart upload fail. To resume use the following id: %s", awsErr.UploadID())) + } + } + } + } else { + if errors.As(err, &pkgErr) { + if awsErr, ok := pkgErr.Err.(s3manager.MultiUploadFailure); ok { + printError(command, op, fmt.Errorf("multipart upload fail. To resume use the following id: %s", awsErr.UploadID())) + } + } + } + + return err +} diff --git a/command/lsmp.go b/command/lsmp.go new file mode 100644 index 000000000..12850af5d --- /dev/null +++ b/command/lsmp.go @@ -0,0 +1,134 @@ +package command + +import ( + "fmt" + + "github.com/hashicorp/go-multierror" + "github.com/peak/s5cmd/v2/log" + "github.com/peak/s5cmd/v2/storage" + "github.com/peak/s5cmd/v2/storage/url" + "github.com/peak/s5cmd/v2/strutil" + "github.com/urfave/cli/v2" +) + +var lsmpHelpTemplate = `Name: + {{.HelpName}} - {{.Usage}} + +Usage: + {{.HelpName}} [options] prefix + +Options: + {{range .VisibleFlags}}{{.}} + {{end}} +Examples: + 1. List multipart uploads for bucket + > s5cmd {{.HelpName}} s3://bucket + 2. List multipart uploads for specific object + > s5cmd {{.HelpName}} s3://bucket/object + 3. List multipart uploads with full path to the object + > s5cmd {{.HelpName}} --show-fullpath s3://bucket/object +` + +func NewListMultipartCommand() *cli.Command { + cmd := &cli.Command{ + Name: "lsmp", + HelpName: "lsmp", + Usage: "list multipart uploads", + CustomHelpTemplate: lsmpHelpTemplate, + Flags: []cli.Flag{ + &cli.BoolFlag{ + Name: "show-fullpath", + Usage: "show the fullpath names of the object(s)", + }, + }, + Before: func(c *cli.Context) error { + err := validateListMultipartCommand(c) + if err != nil { + printError(commandFromContext(c), c.Command.Name, err) + } + return err + }, + Action: func(c *cli.Context) (err error) { + + var merror error + + fullCommand := commandFromContext(c) + + srcurl, err := url.New(c.Args().First()) + if err != nil { + printError(fullCommand, c.Command.Name, err) + return err + } + + client, err := storage.NewRemoteClient(c.Context, srcurl, NewStorageOpts(c)) + if err != nil { + printError(fullCommand, c.Command.Name, err) + return err + } + + for object := range client.ListMultipartUploads(c.Context, srcurl) { + if err := object.Err; err != nil { + merror = multierror.Append(merror, err) + printError(fullCommand, c.Command.Name, err) + continue + } + msg := ListMPUploadMessage{ + Object: object, + showFullPath: c.Bool("show-fullpath"), + } + log.Info(msg) + } + + return nil + }, + } + + cmd.BashComplete = getBashCompleteFn(cmd, false, false) + return cmd +} + +type ListMPUploadMessage struct { + Object *storage.UploadObject `json:"object"` + + showFullPath bool +} + +// String returns the string representation of ListMessage. +func (l ListMPUploadMessage) String() string { + // date and storage fields + var listFormat = "%19s" + + listFormat = listFormat + " %s %s" + + var s string + + var path string + if l.showFullPath { + path = l.Object.URL.String() + } else { + path = l.Object.URL.Relative() + } + + s = fmt.Sprintf( + listFormat, + l.Object.Initiated.Format(dateFormat), + path, + l.Object.UploadID, + ) + + return s +} + +// JSON returns the JSON representation of ListMessage. +func (l ListMPUploadMessage) JSON() string { + return strutil.JSON(l.Object) +} + +func validateListMultipartCommand(c *cli.Context) error { + if c.Args().Len() != 1 { + return fmt.Errorf("expected 1 argument") + } + + _, err := url.New(c.Args().First()) + return err +} diff --git a/command/parts.go b/command/parts.go new file mode 100644 index 000000000..c73167cf0 --- /dev/null +++ b/command/parts.go @@ -0,0 +1,126 @@ +package command + +import ( + "fmt" + + "github.com/hashicorp/go-multierror" + "github.com/peak/s5cmd/v2/log" + "github.com/peak/s5cmd/v2/storage" + "github.com/peak/s5cmd/v2/storage/url" + "github.com/peak/s5cmd/v2/strutil" + "github.com/urfave/cli/v2" +) + +var partsHelpTemplate = `Name: + {{.HelpName}} - {{.Usage}} + +Usage: + {{.HelpName}} [options] object_path uploadID + +Options: + {{range .VisibleFlags}}{{.}} + {{end}} +Examples: + 1. List multipart upload parts + > s5cmd {{.HelpName}} s3://bucket/object 0a6d5ad3-3cab-4d88-aa8b-b735de98877f +` + +func NewMultipartPartsCommand() *cli.Command { + cmd := &cli.Command{ + Name: "parts", + HelpName: "parts", + Usage: "list multipart upload parts", + CustomHelpTemplate: partsHelpTemplate, + Flags: []cli.Flag{ + &cli.BoolFlag{ + Name: "humanize", + Aliases: []string{"H"}, + Usage: "human-readable output for object sizes", + }, + }, + // Before: func(c *cli.Context) error { + // err := validateListMultipartCommand(c) + // if err != nil { + // printError(commandFromContext(c), c.Command.Name, err) + // } + // return err + // }, + Action: func(c *cli.Context) (err error) { + + var merror error + + fullCommand := commandFromContext(c) + + parturl, err := url.New(c.Args().First()) + if err != nil { + printError(fullCommand, c.Command.Name, err) + return err + } + uploadID := c.Args().Get(1) + + client, err := storage.NewRemoteClient(c.Context, parturl, NewStorageOpts(c)) + if err != nil { + printError(fullCommand, c.Command.Name, err) + return err + } + + for part := range client.ListMultipartUploadParts(c.Context, parturl, uploadID) { + if err := part.Err; err != nil { + merror = multierror.Append(merror, err) + printError(fullCommand, c.Command.Name, err) + continue + } + msg := MPPartsMessage{ + Part: part, + showHumanized: c.Bool("humanize"), + } + log.Info(msg) + } + + return nil + }, + } + + cmd.BashComplete = getBashCompleteFn(cmd, false, false) + return cmd +} + +type MPPartsMessage struct { + Part *storage.MPPartObject `json:"part"` + + showHumanized bool +} + +func (pm MPPartsMessage) humanize() string { + var size string + if pm.showHumanized { + size = strutil.HumanizeBytes(pm.Part.Size) + } else { + size = fmt.Sprintf("%d", pm.Part.Size) + } + return size +} + +// String returns the string representation of ListMessage. +func (pm MPPartsMessage) String() string { + // date + var listFormat = "%19s" + + // Part number, etag, size + listFormat = listFormat + " %d %s %s" + + s := fmt.Sprintf( + listFormat, + pm.Part.ModTime.Format(dateFormat), + pm.Part.PartNumber, + pm.Part.ETag, + pm.humanize(), + ) + + return s +} + +// JSON returns the JSON representation of ListMessage. +func (pm MPPartsMessage) JSON() string { + return strutil.JSON(pm.Part) +} diff --git a/command/pipe.go b/command/pipe.go index 14a0f9251..d752b121a 100644 --- a/command/pipe.go +++ b/command/pipe.go @@ -241,7 +241,7 @@ func (c Pipe) Run(ctx context.Context) error { err = client.Put(ctx, &stdin{file: os.Stdin}, c.dst, metadata, c.concurrency, c.partSize) if err != nil { - return err + return handleMultipartError(c.fullCommand, c.op, err) } msg := log.InfoMessage{ diff --git a/storage/s3.go b/storage/s3.go index 7c85468ca..d12ed4713 100644 --- a/storage/s3.go +++ b/storage/s3.go @@ -314,63 +314,64 @@ func (s *S3) listObjectsV2(ctx context.Context, url *url.URL) <-chan *Object { var now time.Time - err := s.api.ListObjectsV2PagesWithContext(ctx, &listInput, func(p *s3.ListObjectsV2Output, lastPage bool) bool { - for _, c := range p.CommonPrefixes { - prefix := aws.StringValue(c.Prefix) - if !url.Match(prefix) { - continue - } + err := s.api.ListObjectsV2PagesWithContext(ctx, &listInput, + func(p *s3.ListObjectsV2Output, lastPage bool) bool { + for _, c := range p.CommonPrefixes { + prefix := aws.StringValue(c.Prefix) + if !url.Match(prefix) { + continue + } - newurl := url.Clone() - newurl.Path = prefix - objCh <- &Object{ - URL: newurl, - Type: ObjectType{os.ModeDir}, + newurl := url.Clone() + newurl.Path = prefix + objCh <- &Object{ + URL: newurl, + Type: ObjectType{os.ModeDir}, + } + + objectFound = true + } + // track the instant object iteration began, + // so it can be used to bypass objects created after this instant + if now.IsZero() { + now = time.Now().UTC() } - objectFound = true - } - // track the instant object iteration began, - // so it can be used to bypass objects created after this instant - if now.IsZero() { - now = time.Now().UTC() - } + for _, c := range p.Contents { + key := aws.StringValue(c.Key) + if !url.Match(key) { + continue + } - for _, c := range p.Contents { - key := aws.StringValue(c.Key) - if !url.Match(key) { - continue - } + mod := aws.TimeValue(c.LastModified).UTC() + if mod.After(now) { + objectFound = true + continue + } - mod := aws.TimeValue(c.LastModified).UTC() - if mod.After(now) { - objectFound = true - continue - } + var objtype os.FileMode + if strings.HasSuffix(key, "/") { + objtype = os.ModeDir + } - var objtype os.FileMode - if strings.HasSuffix(key, "/") { - objtype = os.ModeDir - } + newurl := url.Clone() + newurl.Path = aws.StringValue(c.Key) + etag := aws.StringValue(c.ETag) - newurl := url.Clone() - newurl.Path = aws.StringValue(c.Key) - etag := aws.StringValue(c.ETag) + objCh <- &Object{ + URL: newurl, + Etag: strings.Trim(etag, `"`), + ModTime: &mod, + Type: ObjectType{objtype}, + Size: aws.Int64Value(c.Size), + StorageClass: StorageClass(aws.StringValue(c.StorageClass)), + } - objCh <- &Object{ - URL: newurl, - Etag: strings.Trim(etag, `"`), - ModTime: &mod, - Type: ObjectType{objtype}, - Size: aws.Int64Value(c.Size), - StorageClass: StorageClass(aws.StringValue(c.StorageClass)), + objectFound = true } - objectFound = true - } - - return !lastPage - }) + return !lastPage + }) if err != nil { objCh <- &Object{Err: err} return @@ -1211,6 +1212,151 @@ func (s *S3) HeadObject(ctx context.Context, url *url.URL) (*Object, *Metadata, return obj, metadata, nil } +// ListMultipartUploads is listing all currently active multi-part uploads +// under specific prefix. +func (s *S3) ListMultipartUploads(ctx context.Context, url *url.URL) <-chan *UploadObject { + input := &s3.ListMultipartUploadsInput{ + Bucket: aws.String(url.Bucket), + Prefix: aws.String(url.Prefix), + } + + objCh := make(chan *UploadObject) + + go func() { + defer close(objCh) + objectFound := false + + var now time.Time + + err := s.api.ListMultipartUploadsPagesWithContext(ctx, input, + func(lmuo *s3.ListMultipartUploadsOutput, lastPage bool) bool { + + // track the instant object iteration began, + // so it can be used to bypass objects created after this instant + if now.IsZero() { + now = time.Now().UTC() + } + + for _, c := range lmuo.Uploads { + key := aws.StringValue(c.Key) + if !url.Match(key) { + continue + } + + mod := aws.TimeValue(c.Initiated).UTC() + if mod.After(now) { + objectFound = true + continue + } + + newurl := url.Clone() + newurl.Path = aws.StringValue(c.Key) + + objCh <- &UploadObject{ + URL: newurl, + Initiated: c.Initiated, + UploadID: aws.StringValue(c.UploadId), + StorageClass: StorageClass(aws.StringValue(c.StorageClass)), + } + + objectFound = true + } + return !lastPage + }) + + if err != nil { + objCh <- &UploadObject{Err: err} + return + } + + if !objectFound && !url.IsBucket() { + objCh <- &UploadObject{Err: ErrNoObjectFound} + } + }() + + return objCh +} + +// AbortMultipartUpload will abort active multi-part upload. +// Needs exact url to object and upload id which should be aborted +func (s *S3) AbortMultipartUpload(ctx context.Context, url *url.URL, uploadID string) error { + input := &s3.AbortMultipartUploadInput{ + Bucket: aws.String(url.Bucket), + Key: aws.String(url.Path), + UploadId: aws.String(uploadID), + } + + _, err := s.api.AbortMultipartUploadWithContext(ctx, input) + + return err +} + +// ListMultipartUploadParts will list currently uploaded parts of multi-part upload. +func (s *S3) ListMultipartUploadParts(ctx context.Context, url *url.URL, uploadID string) <-chan *MPPartObject { + input := &s3.ListPartsInput{ + Bucket: aws.String(url.Bucket), + Key: aws.String(url.Path), + UploadId: aws.String(uploadID), + } + + objCh := make(chan *MPPartObject) + + go func() { + defer close(objCh) + objectFound := false + + var now time.Time + + err := s.api.ListPartsPagesWithContext(ctx, input, + func(lpo *s3.ListPartsOutput, lastPage bool) bool { + + // track the instant object iteration began, + // so it can be used to bypass objects created after this instant + if now.IsZero() { + now = time.Now().UTC() + } + + for _, c := range lpo.Parts { + + key := aws.StringValue(lpo.Key) + if !url.Match(key) { + continue + } + + mod := aws.TimeValue(c.LastModified).UTC() + if mod.After(now) { + objectFound = true + continue + } + + newurl := url.Clone() + newurl.Path = aws.StringValue(lpo.Key) + + objCh <- &MPPartObject{ + ModTime: c.LastModified, + PartNumber: aws.Int64Value(c.PartNumber), + ETag: aws.StringValue(c.ETag), + Size: aws.Int64Value(c.Size), + } + + objectFound = true + } + return !lastPage + }) + + if err != nil { + objCh <- &MPPartObject{Err: err} + return + } + + if !objectFound && !url.IsBucket() { + objCh <- &MPPartObject{Err: ErrNoObjectFound} + } + }() + + return objCh +} + type sdkLogger struct{} func (l sdkLogger) Log(args ...interface{}) { diff --git a/storage/s3_test.go b/storage/s3_test.go index 11b52ebc3..1a2867bc1 100644 --- a/storage/s3_test.go +++ b/storage/s3_test.go @@ -1303,6 +1303,218 @@ func TestS3HeadObject(t *testing.T) { } } +func TestS3ListMultiparts(t *testing.T) { + const ( + numObjectsToReturn = 10 + pre = "s3://bucket/key" + ) + + u, err := url.New(pre) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + + mapReturnObjNameToModtime := map[string]time.Time{} + + s3objs := make([]*s3.MultipartUpload, 0, numObjectsToReturn) + + for i := 0; i < numObjectsToReturn; i++ { + fname := fmt.Sprintf("%s/%d", pre, i) + now := time.Now() + + mapReturnObjNameToModtime[pre+"/"+fname] = now + s3objs = append(s3objs, &s3.MultipartUpload{ + Key: aws.String("key/" + fname), + Initiated: aws.Time(now), + UploadId: aws.String("random-id-" + fname), + }) + } + + // shuffle the objects array to remove possible assumptions about how objects + // are stored. + rand.Shuffle(len(s3objs), func(i, j int) { + s3objs[i], s3objs[j] = s3objs[j], s3objs[i] + }) + + mockAPI := s3.New(unit.Session) + + mockAPI.Handlers.Unmarshal.Clear() + mockAPI.Handlers.UnmarshalMeta.Clear() + mockAPI.Handlers.UnmarshalError.Clear() + mockAPI.Handlers.Send.Clear() + + mockAPI.Handlers.Send.PushBack(func(r *request.Request) { + r.HTTPResponse = &http.Response{ + StatusCode: http.StatusOK, + Body: io.NopCloser(strings.NewReader("")), + } + + r.Data = &s3.ListMultipartUploadsOutput{ + Uploads: s3objs, + } + }) + + mockS3 := &S3{ + api: mockAPI, + } + + ouputCh := mockS3.ListMultipartUploads(context.Background(), u) + + for obj := range ouputCh { + if _, ok := mapReturnObjNameToModtime[obj.String()]; ok { + delete(mapReturnObjNameToModtime, obj.String()) + continue + } + t.Errorf("%v should not have been returned\n", obj) + } + assert.Equal(t, len(mapReturnObjNameToModtime), 0) +} + +func TestS3AbortMultipartUpload(t *testing.T) { + const ( + pre = "s3://bucket/key" + uID = "87567b2a-287d-401a-b92a-104f726eeb30" + ) + + u, err := url.New(pre) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + + mockAPI := s3.New(unit.Session) + + mockAPI.Handlers.Unmarshal.Clear() + mockAPI.Handlers.UnmarshalMeta.Clear() + mockAPI.Handlers.UnmarshalError.Clear() + mockAPI.Handlers.Send.Clear() + + mockAPI.Handlers.Send.PushBack(func(r *request.Request) { + r.HTTPResponse = &http.Response{ + StatusCode: http.StatusOK, + Body: io.NopCloser(strings.NewReader("")), + } + + r.Data = &s3.AbortMultipartUploadOutput{} + }) + + mockS3 := &S3{ + api: mockAPI, + } + + err = mockS3.AbortMultipartUpload(context.Background(), u, uID) + assert.NilError(t, err) +} + +func TestS3AbortMultipartUploadError(t *testing.T) { + const ( + pre = "s3://bucket/key" + uID = "87567b2a-287d-401a-b92a-104f726eeb30" + ) + + u, err := url.New(pre) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + + mockErr := fmt.Errorf("No such upload") + + mockAPI := s3.New(unit.Session) + mockS3 := &S3{api: mockAPI} + + mockAPI.Handlers.Unmarshal.Clear() + mockAPI.Handlers.UnmarshalMeta.Clear() + mockAPI.Handlers.UnmarshalError.Clear() + mockAPI.Handlers.Send.Clear() + + mockAPI.Handlers.Send.PushBack(func(r *request.Request) { + r.Error = mockErr + }) + + err = mockS3.AbortMultipartUpload(context.Background(), u, uID) + if err != mockErr { + t.Errorf("error got = %v, want %v", err, mockErr) + } +} + +func TestS3ListMultipartsParts(t *testing.T) { + testCases := []struct { + url string + uploadID string + numPartsToReturn int + partsSize int64 + }{ + { + url: "s3://bucket/object1", + uploadID: "random-id-object1", + numPartsToReturn: 10, + partsSize: 5000000, // 5MiB part size + }, + { + url: "s3://bucket/object2", + uploadID: "random-id-object1", + numPartsToReturn: 60, + partsSize: 467000000, // 467MiB part size + }, + } + + mockAPI := s3.New(unit.Session) + mockS3 := &S3{api: mockAPI} + + mockAPI.Handlers.Unmarshal.Clear() + mockAPI.Handlers.UnmarshalMeta.Clear() + mockAPI.Handlers.UnmarshalError.Clear() + + for _, tc := range testCases { + u, err := url.New(tc.url) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + + s3parts := make([]*s3.Part, 0, tc.numPartsToReturn) + mapReturnPartEtagToModtime := map[string]time.Time{} + + for i := 0; i < tc.numPartsToReturn; i++ { + + now := time.Now() + etag := "etag-" + now.String() + + mapReturnPartEtagToModtime[fmt.Sprintf("%d/%s", i, etag)] = now + s3parts = append(s3parts, &s3.Part{ + PartNumber: aws.Int64(int64(i)), + ETag: aws.String(etag), + Size: aws.Int64(tc.partsSize), + LastModified: aws.Time(now), + }) + } + + mockAPI.Handlers.Send.Clear() + mockAPI.Handlers.Send.PushBack(func(r *request.Request) { + r.HTTPResponse = &http.Response{ + StatusCode: http.StatusOK, + Body: io.NopCloser(strings.NewReader("")), + } + + r.Data = &s3.ListPartsOutput{ + Bucket: aws.String(u.Bucket), + Key: aws.String(u.Path), + UploadId: aws.String(tc.uploadID), + Parts: s3parts, + } + }) + ouputCh := mockS3.ListMultipartUploadParts(context.Background(), u, tc.uploadID) + + for obj := range ouputCh { + key := fmt.Sprintf("%d/%s", obj.PartNumber, obj.ETag) + if mapReturnPartEtagToModtime[key] != aws.TimeValue(obj.ModTime) { + t.Errorf("returned part is not correct") + continue + } + delete(mapReturnPartEtagToModtime, key) + } + assert.Equal(t, len(mapReturnPartEtagToModtime), 0) + } +} + func valueAtPath(i interface{}, s string) interface{} { v, err := awsutil.ValuesAtPath(i, s) if err != nil || len(v) == 0 { diff --git a/storage/storage.go b/storage/storage.go index 1a6171263..9a2e651f7 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -119,6 +119,23 @@ type Object struct { VersionID string `json:"version_id,omitempty"` } +type UploadObject struct { + URL *url.URL `json:"key,omitempty"` + Initiated *time.Time `json:"initiated,omitempty"` + UploadID string `json:"uploadID,omitempty"` + + StorageClass StorageClass `json:"storage_class,omitempty"` + Err error `json:"error,omitempty"` +} + +type MPPartObject struct { + ModTime *time.Time `json:"last_modified,omitempty"` + PartNumber int64 `json:"part_number,omitempty"` + Size int64 `json:"size,omitempty"` + ETag string `json:"etag,omitempty"` + Err error `json:"error,omitempty"` +} + // String returns the string representation of Object. func (o *Object) String() string { return o.URL.String() @@ -132,6 +149,26 @@ func (o *Object) JSON() string { return strutil.JSON(o) } +// String returns the string representation of UploadObject. +func (uo *UploadObject) String() string { + return uo.URL.String() +} + +// JSON returns the JSON representation of UploadObject. +func (uo *UploadObject) JSON() string { + return strutil.JSON(uo) +} + +// String returns the string representation of MPPartObject. +func (po *MPPartObject) String() string { + return fmt.Sprintf("%d-%s", po.PartNumber, po.ETag) +} + +// JSON returns the JSON representation of MPPartObject. +func (po *MPPartObject) JSON() string { + return strutil.JSON(po) +} + // ObjectType is the type of Object. type ObjectType struct { mode os.FileMode