command: retry upload on s3.NoSuchUpload (#470)
With this change, the doUpload method retries the multipart upload on an s3.NoSuchUpload error if the no-such-upload-retry-count flag is used with cp or sync and the value of a special metadata field on the remote object does not match the one placed in the upload request (or does not exist at all). Otherwise (if it matches), it assumes the upload was successful and ignores the error.

The retry logic is placed in s3.Put, since the upload operation can be restarted there and the behavior can be unit tested.

For the retry condition, a user-defined metadata field is used. If no-such-upload-retry-count is given a positive value, a random string is placed in the upload request as user-defined metadata. Upon an s3.NoSuchUpload error, that field on the remote object is compared with the one placed in the request; if it does not match (or does not exist at all), the upload is retried.
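
For example, the retry behavior can be enabled by passing the hidden flag to cp, as in the e2e test added below (file and bucket names are illustrative):

    s5cmd cp --no-such-upload-retry-count 5 example.txt s3://bucket/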

Fixes #450
kucukaslan authored Aug 2, 2022
1 parent 123b1c7 commit 7c90f8a
Showing 6 changed files with 250 additions and 53 deletions.
21 changes: 11 additions & 10 deletions command/app.go
@@ -173,16 +173,17 @@ var app = &cli.App{
// NewStorageOpts creates storage.Options object from the given context.
func NewStorageOpts(c *cli.Context) storage.Options {
return storage.Options{
DryRun: c.Bool("dry-run"),
Endpoint: c.String("endpoint-url"),
MaxRetries: c.Int("retry-count"),
NoSignRequest: c.Bool("no-sign-request"),
NoVerifySSL: c.Bool("no-verify-ssl"),
RequestPayer: c.String("request-payer"),
UseListObjectsV1: c.Bool("use-list-objects-v1"),
Profile: c.String("profile"),
CredentialFile: c.String("credentials-file"),
LogLevel: log.LevelFromString(c.String("log")),
DryRun: c.Bool("dry-run"),
Endpoint: c.String("endpoint-url"),
MaxRetries: c.Int("retry-count"),
NoSignRequest: c.Bool("no-sign-request"),
NoVerifySSL: c.Bool("no-verify-ssl"),
RequestPayer: c.String("request-payer"),
UseListObjectsV1: c.Bool("use-list-objects-v1"),
Profile: c.String("profile"),
CredentialFile: c.String("credentials-file"),
LogLevel: log.LevelFromString(c.String("log")),
NoSuchUploadRetryCount: c.Int("no-such-upload-retry-count"),
}
}

8 changes: 7 additions & 1 deletion command/cp.go
@@ -177,6 +177,12 @@ func NewSharedFlags() []cli.Flag {
Name: "content-encoding",
Usage: "set content encoding for target: defines content encoding header for object, e.g. --content-encoding gzip",
},
&cli.IntFlag{
Name: "no-such-upload-retry-count",
Usage: "number of times that a request will be retried on NoSuchUpload error; you should not use this unless you really know what you're doing",
DefaultText: "0",
Hidden: true,
},
}
}

@@ -251,8 +257,8 @@ type Copy struct {
acl string
forceGlacierTransfer bool
ignoreGlacierWarnings bool
exclude []string
raw bool
exclude []string
cacheControl string
expires string
contentType string
39 changes: 39 additions & 0 deletions e2e/cp_test.go
@@ -4146,3 +4146,42 @@ func TestCopyExpectExitCode1OnUnreachableHost(t *testing.T) {

result.Assert(t, icmd.Expected{ExitCode: 1})
}

func TestCopySingleFileToS3WithNoSuchUploadRetryCount(t *testing.T) {
t.Parallel()

bucket := s3BucketFromTestName(t)

s3client, s5cmd, cleanup := setup(t)
defer cleanup()

createBucket(t, s3client, bucket)

const (
filename = "example.txt"
content = "Some example text"
)

workdir := fs.NewDir(t, bucket, fs.WithFile(filename, content))
defer workdir.Remove()

srcpath := workdir.Join(filename)
dstpath := fmt.Sprintf("s3://%v/", bucket)

srcpath = filepath.ToSlash(srcpath)
cmd := s5cmd("cp", "--no-such-upload-retry-count", "5", srcpath, dstpath)
result := icmd.RunCmd(cmd)

result.Assert(t, icmd.Success)

assertLines(t, result.Stdout(), map[int]compareFunc{
0: suffix(`cp %v %v%v`, srcpath, dstpath, filename),
})

// assert local filesystem
expected := fs.Expected(t, fs.WithFile(filename, content))
assert.Assert(t, fs.Equal(workdir.Path(), expected))

// assert S3
assert.Assert(t, ensureS3Object(s3client, bucket, filename, content))
}
106 changes: 88 additions & 18 deletions storage/s3.go
@@ -2,11 +2,14 @@ package storage

import (
"context"
"crypto/rand"
"crypto/tls"
"encoding/json"
"errors"
"fmt"
"io"
"math"
"math/big"
"net/http"
urlpkg "net/url"
"os"
@@ -43,6 +46,9 @@ const (

// Google Cloud Storage endpoint
gcsEndpoint = "storage.googleapis.com"

// the key of the object metadata which is used to handle retry decision on NoSuchUpload error
metadataKeyRetryID = "s5cmd-upload-retry-id"
)

// Re-used AWS sessions dramatically improve performance.
@@ -53,13 +59,14 @@ var globalSessionCache = &SessionCache{
// S3 is a storage type which interacts with S3API, DownloaderAPI and
// UploaderAPI.
type S3 struct {
api s3iface.S3API
downloader s3manageriface.DownloaderAPI
uploader s3manageriface.UploaderAPI
endpointURL urlpkg.URL
dryRun bool
useListObjectsV1 bool
requestPayer string
api s3iface.S3API
downloader s3manageriface.DownloaderAPI
uploader s3manageriface.UploaderAPI
endpointURL urlpkg.URL
dryRun bool
useListObjectsV1 bool
noSuchUploadRetryCount int
requestPayer string
}

func (s *S3) RequestPayer() *string {
@@ -99,13 +106,14 @@ func newS3Storage(ctx context.Context, opts Options) (*S3, error) {
}

return &S3{
api: s3.New(awsSession),
downloader: s3manager.NewDownloader(awsSession),
uploader: s3manager.NewUploader(awsSession),
endpointURL: endpointURL,
dryRun: opts.DryRun,
useListObjectsV1: opts.UseListObjectsV1,
requestPayer: opts.RequestPayer,
api: s3.New(awsSession),
downloader: s3manager.NewDownloader(awsSession),
uploader: s3manager.NewUploader(awsSession),
endpointURL: endpointURL,
dryRun: opts.DryRun,
useListObjectsV1: opts.UseListObjectsV1,
requestPayer: opts.RequestPayer,
noSuchUploadRetryCount: opts.NoSuchUploadRetryCount,
}, nil
}

@@ -125,12 +133,21 @@ func (s *S3) Stat(ctx context.Context, url *url.URL) (*Object, error) {

etag := aws.StringValue(output.ETag)
mod := aws.TimeValue(output.LastModified)
return &Object{

obj := &Object{
URL: url,
Etag: strings.Trim(etag, `"`),
ModTime: &mod,
Size: aws.Int64Value(output.ContentLength),
}, nil
}

if s.noSuchUploadRetryCount > 0 {
if retryID, ok := output.Metadata[metadataKeyRetryID]; ok {
obj.retryID = *retryID
}
}

return obj, nil
}

// List is a non-blocking S3 list operation which paginates and filters S3
@@ -514,6 +531,7 @@ func (s *S3) Put(
Key: aws.String(to.Path),
Body: reader,
ContentType: aws.String(contentType),
Metadata: make(map[string]*string),
RequestPayer: s.RequestPayer(),
}

@@ -554,11 +572,53 @@
input.ContentEncoding = aws.String(contentEncoding)
}

_, err := s.uploader.UploadWithContext(ctx, input, func(u *s3manager.Uploader) {
// add retry ID to the object metadata
if s.noSuchUploadRetryCount > 0 {
input.Metadata[metadataKeyRetryID] = generateRetryID()
}

uploaderOptsFn := func(u *s3manager.Uploader) {
u.PartSize = partSize
u.Concurrency = concurrency
})
}
_, err := s.uploader.UploadWithContext(ctx, input, uploaderOptsFn)

if errHasCode(err, s3.ErrCodeNoSuchUpload) && s.noSuchUploadRetryCount > 0 {
return s.retryOnNoSuchUpload(ctx, to, input, err, uploaderOptsFn)
}

return err
}

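// retryOnNoSuchUpload retries the upload while it keeps failing with the
// s3.ErrCodeNoSuchUpload error, up to noSuchUploadRetryCount times. Before
// each attempt it stats the remote object: if the object carries the retry ID
// that was sent with this upload, an earlier attempt already succeeded and
// the error is discarded.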
func (s *S3) retryOnNoSuchUpload(ctx aws.Context, to *url.URL, input *s3manager.UploadInput,
err error, uploaderOpts ...func(*s3manager.Uploader)) error {

var expectedRetryID string
if ID, ok := input.Metadata[metadataKeyRetryID]; ok {
expectedRetryID = *ID
}

attempts := 0
for ; errHasCode(err, s3.ErrCodeNoSuchUpload) && attempts < s.noSuchUploadRetryCount; attempts++ {
// check if the object exists and has the retry ID we provided; if it does,
// it means one of the previous uploads was successful despite the received error.
obj, sErr := s.Stat(ctx, to)
if sErr == nil && obj.retryID == expectedRetryID {
err = nil
break
}

msg := log.DebugMessage{Err: fmt.Sprintf("Retrying to upload %v upon error: %q", to, err.Error())}
log.Debug(msg)

_, err = s.uploader.UploadWithContext(ctx, input, uploaderOpts...)
}

if errHasCode(err, s3.ErrCodeNoSuchUpload) && s.noSuchUploadRetryCount > 0 {
err = awserr.New(s3.ErrCodeNoSuchUpload, fmt.Sprintf(
"RetryOnNoSuchUpload: %v attempts to retry resulted in %v", attempts,
s3.ErrCodeNoSuchUpload), err)
}
return err
}

@@ -808,6 +868,10 @@ func (sc *SessionCache) newSession(ctx context.Context, opts Options) (*session.
WithS3ForcePathStyle(!isVirtualHostStyle).
WithS3UseAccelerate(useAccelerate).
WithHTTPClient(httpClient).
// TODO WithLowerCaseHeaderMaps and WithDisableRestProtocolURICleaning options
// are going to be unnecessary and unsupported in AWS-SDK version 2.
// They should be removed during migration.
WithLowerCaseHeaderMaps(true).
// Disable URI cleaning to allow adjacent slashes to be used in S3 object keys.
WithDisableRestProtocolURICleaning(true)

@@ -985,3 +1049,9 @@ func errHasCode(err error, code string) bool {
func IsCancelationError(err error) bool {
return errHasCode(err, request.CanceledErrorCode)
}

// generate a retry ID for this upload attempt
func generateRetryID() *string {
num, _ := rand.Int(rand.Reader, big.NewInt(math.MaxInt64))
return aws.String(num.String())
}
78 changes: 78 additions & 0 deletions storage/s3_test.go
@@ -14,6 +14,7 @@ import (
"os"
"reflect"
"strings"
"sync/atomic"
"testing"
"time"

@@ -565,6 +566,83 @@ func TestS3Retry(t *testing.T) {
}
}

func TestS3RetryOnNoSuchUpload(t *testing.T) {
log.Init("debug", false)

noSuchUploadError := awserr.New(s3.ErrCodeNoSuchUpload, "The specified upload does not exist. The upload ID may be invalid, or the upload may have been aborted or completed. status code: 404, request id: PJXXXXX, host id: HOSTIDXX", nil)
testcases := []struct {
name string
err error
retryCount int32
}{
{
name: "Don't retry",
err: noSuchUploadError,
retryCount: 0,
}, {
name: "Retry 5 times on NoSuchUpload error",
err: noSuchUploadError,
retryCount: 5,
}, {
name: "No error",
err: nil,
retryCount: 0,
},
}

url, err := url.New("s3://bucket/key")
if err != nil {
t.Errorf("unexpected error: %v", err)
}

for _, tc := range testcases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
mockApi := s3.New(unit.Session)
mockS3 := &S3{
api: mockApi,
uploader: &s3manager.Uploader{
S3: mockApi,
PartSize: s3manager.DefaultUploadPartSize,
Concurrency: s3manager.DefaultUploadConcurrency,
LeavePartsOnError: false,
MaxUploadParts: s3manager.MaxUploadParts,
},
noSuchUploadRetryCount: int(tc.retryCount),
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

atomicCounter := new(int32)
atomic.StoreInt32(atomicCounter, 0)

mockApi.Handlers.Send.Clear()
mockApi.Handlers.Unmarshal.Clear()
mockApi.Handlers.UnmarshalMeta.Clear()
mockApi.Handlers.ValidateResponse.Clear()
mockApi.Handlers.Unmarshal.PushBack(func(r *request.Request) {
r.Error = tc.err
r.HTTPResponse = &http.Response{}
})
mockApi.Handlers.Unmarshal.PushBack(func(r *request.Request) {
atomic.AddInt32(atomicCounter, 1)
})

mockS3.Put(ctx, strings.NewReader(""), url, NewMetadata(), s3manager.DefaultUploadConcurrency, s3manager.DefaultUploadPartSize)

// +1 is for the original request; *2 accounts for the "Stat" requests
// made to obtain the retry ID from the object metadata
// (e.g. retryCount=5: 1 original + 5 retried uploads + 5 stats = 11).
want := 2*tc.retryCount + 1
counter := atomic.LoadInt32(atomicCounter)
if counter != want {
t.Errorf("expected retry request count %d, got %d", want, counter)
}
})
}
}

func TestS3CopyEncryptionRequest(t *testing.T) {
testcases := []struct {
name string