diff --git a/command/app.go b/command/app.go index 23f4588e6..6f324b842 100644 --- a/command/app.go +++ b/command/app.go @@ -173,16 +173,17 @@ var app = &cli.App{ // NewStorageOpts creates storage.Options object from the given context. func NewStorageOpts(c *cli.Context) storage.Options { return storage.Options{ - DryRun: c.Bool("dry-run"), - Endpoint: c.String("endpoint-url"), - MaxRetries: c.Int("retry-count"), - NoSignRequest: c.Bool("no-sign-request"), - NoVerifySSL: c.Bool("no-verify-ssl"), - RequestPayer: c.String("request-payer"), - UseListObjectsV1: c.Bool("use-list-objects-v1"), - Profile: c.String("profile"), - CredentialFile: c.String("credentials-file"), - LogLevel: log.LevelFromString(c.String("log")), + DryRun: c.Bool("dry-run"), + Endpoint: c.String("endpoint-url"), + MaxRetries: c.Int("retry-count"), + NoSignRequest: c.Bool("no-sign-request"), + NoVerifySSL: c.Bool("no-verify-ssl"), + RequestPayer: c.String("request-payer"), + UseListObjectsV1: c.Bool("use-list-objects-v1"), + Profile: c.String("profile"), + CredentialFile: c.String("credentials-file"), + LogLevel: log.LevelFromString(c.String("log")), + NoSuchUploadRetryCount: c.Int("no-such-upload-retry-count"), } } diff --git a/command/cp.go b/command/cp.go index cba3c4296..bae23f88e 100644 --- a/command/cp.go +++ b/command/cp.go @@ -177,6 +177,12 @@ func NewSharedFlags() []cli.Flag { Name: "content-encoding", Usage: "set content encoding for target: defines content encoding header for object, e.g. 
--content-encoding gzip", }, + &cli.IntFlag{ + Name: "no-such-upload-retry-count", + Usage: "number of times that a request will be retried on NoSuchUpload error; you should not use this unless you really know what you're doing", + DefaultText: "0", + Hidden: true, + }, } } @@ -251,8 +257,8 @@ type Copy struct { acl string forceGlacierTransfer bool ignoreGlacierWarnings bool - exclude []string raw bool + exclude []string cacheControl string expires string contentType string diff --git a/e2e/cp_test.go b/e2e/cp_test.go index cf8d2ddf5..4a7d1debc 100644 --- a/e2e/cp_test.go +++ b/e2e/cp_test.go @@ -4146,3 +4146,42 @@ func TestCopyExpectExitCode1OnUnreachableHost(t *testing.T) { result.Assert(t, icmd.Expected{ExitCode: 1}) } + +func TestCopySingleFileToS3WithNoSuchUploadRetryCount(t *testing.T) { + t.Parallel() + + bucket := s3BucketFromTestName(t) + + s3client, s5cmd, cleanup := setup(t) + defer cleanup() + + createBucket(t, s3client, bucket) + + const ( + filename = "example.txt" + content = "Some example text" + ) + + workdir := fs.NewDir(t, bucket, fs.WithFile(filename, content)) + defer workdir.Remove() + + srcpath := workdir.Join(filename) + dstpath := fmt.Sprintf("s3://%v/", bucket) + + srcpath = filepath.ToSlash(srcpath) + cmd := s5cmd("cp", "--no-such-upload-retry-count", "5", srcpath, dstpath) + result := icmd.RunCmd(cmd) + + result.Assert(t, icmd.Success) + + assertLines(t, result.Stdout(), map[int]compareFunc{ + 0: suffix(`cp %v %v%v`, srcpath, dstpath, filename), + }) + + // assert local filesystem + expected := fs.Expected(t, fs.WithFile(filename, content)) + assert.Assert(t, fs.Equal(workdir.Path(), expected)) + + // assert S3 + assert.Assert(t, ensureS3Object(s3client, bucket, filename, content)) +} diff --git a/storage/s3.go b/storage/s3.go index 2c780704d..82f10a49b 100644 --- a/storage/s3.go +++ b/storage/s3.go @@ -2,11 +2,14 @@ package storage import ( "context" + "crypto/rand" "crypto/tls" "encoding/json" "errors" "fmt" "io" + "math" + "math/big" 
"net/http" urlpkg "net/url" "os" @@ -43,6 +46,9 @@ const ( // Google Cloud Storage endpoint gcsEndpoint = "storage.googleapis.com" + + // the key of the object metadata which is used to handle retry decision on NoSuchUpload error + metadataKeyRetryID = "s5cmd-upload-retry-id" ) // Re-used AWS sessions dramatically improve performance. @@ -53,13 +59,14 @@ var globalSessionCache = &SessionCache{ // S3 is a storage type which interacts with S3API, DownloaderAPI and // UploaderAPI. type S3 struct { - api s3iface.S3API - downloader s3manageriface.DownloaderAPI - uploader s3manageriface.UploaderAPI - endpointURL urlpkg.URL - dryRun bool - useListObjectsV1 bool - requestPayer string + api s3iface.S3API + downloader s3manageriface.DownloaderAPI + uploader s3manageriface.UploaderAPI + endpointURL urlpkg.URL + dryRun bool + useListObjectsV1 bool + noSuchUploadRetryCount int + requestPayer string } func (s *S3) RequestPayer() *string { @@ -99,13 +106,14 @@ func newS3Storage(ctx context.Context, opts Options) (*S3, error) { } return &S3{ - api: s3.New(awsSession), - downloader: s3manager.NewDownloader(awsSession), - uploader: s3manager.NewUploader(awsSession), - endpointURL: endpointURL, - dryRun: opts.DryRun, - useListObjectsV1: opts.UseListObjectsV1, - requestPayer: opts.RequestPayer, + api: s3.New(awsSession), + downloader: s3manager.NewDownloader(awsSession), + uploader: s3manager.NewUploader(awsSession), + endpointURL: endpointURL, + dryRun: opts.DryRun, + useListObjectsV1: opts.UseListObjectsV1, + requestPayer: opts.RequestPayer, + noSuchUploadRetryCount: opts.NoSuchUploadRetryCount, }, nil } @@ -125,12 +133,21 @@ func (s *S3) Stat(ctx context.Context, url *url.URL) (*Object, error) { etag := aws.StringValue(output.ETag) mod := aws.TimeValue(output.LastModified) - return &Object{ + + obj := &Object{ URL: url, Etag: strings.Trim(etag, `"`), ModTime: &mod, Size: aws.Int64Value(output.ContentLength), - }, nil + } + + if s.noSuchUploadRetryCount > 0 { + if retryID, ok := 
output.Metadata[metadataKeyRetryID]; ok { + obj.retryID = *retryID + } + } + + return obj, nil } // List is a non-blocking S3 list operation which paginates and filters S3 @@ -514,6 +531,7 @@ func (s *S3) Put( Key: aws.String(to.Path), Body: reader, ContentType: aws.String(contentType), + Metadata: make(map[string]*string), RequestPayer: s.RequestPayer(), } @@ -554,11 +572,53 @@ func (s *S3) Put( input.ContentEncoding = aws.String(contentEncoding) } - _, err := s.uploader.UploadWithContext(ctx, input, func(u *s3manager.Uploader) { + // add retry ID to the object metadata + if s.noSuchUploadRetryCount > 0 { + input.Metadata[metadataKeyRetryID] = generateRetryID() + } + + uploaderOptsFn := func(u *s3manager.Uploader) { u.PartSize = partSize u.Concurrency = concurrency - }) + } + _, err := s.uploader.UploadWithContext(ctx, input, uploaderOptsFn) + + if errHasCode(err, s3.ErrCodeNoSuchUpload) && s.noSuchUploadRetryCount > 0 { + return s.retryOnNoSuchUpload(ctx, to, input, err, uploaderOptsFn) + } + + return err +} + +func (s *S3) retryOnNoSuchUpload(ctx aws.Context, to *url.URL, input *s3manager.UploadInput, + err error, uploaderOpts ...func(*s3manager.Uploader)) error { + + var expectedRetryID string + if ID, ok := input.Metadata[metadataKeyRetryID]; ok { + expectedRetryID = *ID + } + + attempts := 0 + for ; errHasCode(err, s3.ErrCodeNoSuchUpload) && attempts < s.noSuchUploadRetryCount; attempts++ { + // check if object exists and has the retry ID we provided, if it does + // then it means that one of the previous uploads was successful despite the received error. + obj, sErr := s.Stat(ctx, to) + if sErr == nil && obj.retryID == expectedRetryID { + err = nil + break + } + msg := log.DebugMessage{Err: fmt.Sprintf("Retrying to upload %v upon error: %q", to, err.Error())} + log.Debug(msg) + + _, err = s.uploader.UploadWithContext(ctx, input, uploaderOpts...) 
+ } + + if errHasCode(err, s3.ErrCodeNoSuchUpload) && s.noSuchUploadRetryCount > 0 { + err = awserr.New(s3.ErrCodeNoSuchUpload, fmt.Sprintf( + "RetryOnNoSuchUpload: %v attempts to retry resulted in %v", attempts, + s3.ErrCodeNoSuchUpload), err) + } return err } @@ -808,6 +868,10 @@ func (sc *SessionCache) newSession(ctx context.Context, opts Options) (*session. WithS3ForcePathStyle(!isVirtualHostStyle). WithS3UseAccelerate(useAccelerate). WithHTTPClient(httpClient). + // TODO WithLowerCaseHeaderMaps and WithDisableRestProtocolURICleaning options + // are going to be unnecessary and unsupported in AWS-SDK version 2. + // They should be removed during migration. + WithLowerCaseHeaderMaps(true). // Disable URI cleaning to allow adjacent slashes to be used in S3 object keys. WithDisableRestProtocolURICleaning(true) @@ -985,3 +1049,9 @@ func errHasCode(err error, code string) bool { func IsCancelationError(err error) bool { return errHasCode(err, request.CanceledErrorCode) } + +// generate a retry ID for this upload attempt +func generateRetryID() *string { + num, _ := rand.Int(rand.Reader, big.NewInt(math.MaxInt64)) + return aws.String(num.String()) +} diff --git a/storage/s3_test.go b/storage/s3_test.go index a31f2f0ce..8b84aac00 100644 --- a/storage/s3_test.go +++ b/storage/s3_test.go @@ -14,6 +14,7 @@ import ( "os" "reflect" "strings" + "sync/atomic" "testing" "time" @@ -565,6 +566,83 @@ func TestS3Retry(t *testing.T) { } } +func TestS3RetryOnNoSuchUpload(t *testing.T) { + log.Init("debug", false) + + noSuchUploadError := awserr.New(s3.ErrCodeNoSuchUpload, "The specified upload does not exist. The upload ID may be invalid, or the upload may have been aborted or completed. 
status code: 404, request id: PJXXXXX, host id: HOSTIDXX", nil) + testcases := []struct { + name string + err error + retryCount int32 + }{ + { + name: "Don't retry", + err: noSuchUploadError, + retryCount: 0, + }, { + name: "Retry 5 times on NoSuchUpload error", + err: noSuchUploadError, + retryCount: 5, + }, { + name: "No error", + err: nil, + retryCount: 0, + }, + } + + url, err := url.New("s3://bucket/key") + if err != nil { + t.Errorf("unexpected error: %v", err) + } + + for _, tc := range testcases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + mockApi := s3.New(unit.Session) + mockS3 := &S3{ + api: mockApi, + uploader: &s3manager.Uploader{ + S3: mockApi, + PartSize: s3manager.DefaultUploadPartSize, + Concurrency: s3manager.DefaultUploadConcurrency, + LeavePartsOnError: false, + MaxUploadParts: s3manager.MaxUploadParts, + }, + noSuchUploadRetryCount: int(tc.retryCount), + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + atomicCounter := new(int32) + atomic.StoreInt32(atomicCounter, 0) + + mockApi.Handlers.Send.Clear() + mockApi.Handlers.Unmarshal.Clear() + mockApi.Handlers.UnmarshalMeta.Clear() + mockApi.Handlers.ValidateResponse.Clear() + mockApi.Handlers.Unmarshal.PushBack(func(r *request.Request) { + r.Error = tc.err + r.HTTPResponse = &http.Response{} + }) + mockApi.Handlers.Unmarshal.PushBack(func(r *request.Request) { + atomic.AddInt32(atomicCounter, 1) + }) + + mockS3.Put(ctx, strings.NewReader(""), url, NewMetadata(), s3manager.DefaultUploadConcurrency, s3manager.DefaultUploadPartSize) + + // +1 is for the original request + // *2 is to account for the "Stat" requests that are made to obtain + // retry ID from object metadata. 
+ want := 2*tc.retryCount + 1 + counter := atomic.LoadInt32(atomicCounter) + if counter != want { + t.Errorf("expected retry request count %d, got %d", want, counter) + } + }) + } +} + func TestS3CopyEncryptionRequest(t *testing.T) { testcases := []struct { name string diff --git a/storage/storage.go b/storage/storage.go index 129e891ca..52f7f5647 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -56,18 +56,19 @@ func NewLocalClient(opts Options) *Filesystem { func NewRemoteClient(ctx context.Context, url *url.URL, opts Options) (*S3, error) { newOpts := Options{ - MaxRetries: opts.MaxRetries, - Endpoint: opts.Endpoint, - NoVerifySSL: opts.NoVerifySSL, - DryRun: opts.DryRun, - NoSignRequest: opts.NoSignRequest, - UseListObjectsV1: opts.UseListObjectsV1, - RequestPayer: opts.RequestPayer, - Profile: opts.Profile, - CredentialFile: opts.CredentialFile, - LogLevel: opts.LogLevel, - bucket: url.Bucket, - region: opts.region, + MaxRetries: opts.MaxRetries, + Endpoint: opts.Endpoint, + NoVerifySSL: opts.NoVerifySSL, + DryRun: opts.DryRun, + NoSignRequest: opts.NoSignRequest, + UseListObjectsV1: opts.UseListObjectsV1, + RequestPayer: opts.RequestPayer, + Profile: opts.Profile, + CredentialFile: opts.CredentialFile, + LogLevel: opts.LogLevel, + bucket: url.Bucket, + region: opts.region, + NoSuchUploadRetryCount: opts.NoSuchUploadRetryCount, } return newS3Storage(ctx, newOpts) } @@ -81,18 +82,19 @@ func NewClient(ctx context.Context, url *url.URL, opts Options) (Storage, error) // Options stores configuration for storage. 
type Options struct { - MaxRetries int - Endpoint string - NoVerifySSL bool - DryRun bool - NoSignRequest bool - UseListObjectsV1 bool - LogLevel log.LogLevel - RequestPayer string - Profile string - CredentialFile string - bucket string - region string + MaxRetries int + NoSuchUploadRetryCount int + Endpoint string + NoVerifySSL bool + DryRun bool + NoSignRequest bool + UseListObjectsV1 bool + LogLevel log.LogLevel + RequestPayer string + Profile string + CredentialFile string + bucket string + region string } func (o *Options) SetRegion(region string) { @@ -108,6 +110,7 @@ type Object struct { Size int64 `json:"size,omitempty"` StorageClass StorageClass `json:"storage_class,omitempty"` Err error `json:"error,omitempty"` + retryID string } // String returns the string representation of Object.