diff --git a/CHANGELOG.md b/CHANGELOG.md index 0c0172622..ad435b3cd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ #### Bugfixes - Fixed a bug introduced with `external sort` support in `sync` command which prevents `sync` to an empty destination with `--delete` option. ([#576](https://github.com/peak/s5cmd/issues/576)) +- Fixed a bug in `sync` command, which previously caused the command to continue running even if an error was received from the destination bucket. ([#564](https://github.com/peak/s5cmd/issues/564)) - Fixed a bug that causes local files to be lost if downloads fail. ([#479](https://github.com/peak/s5cmd/issues/479)) ## v2.1.0 - 19 Jun 2023 diff --git a/command/run.go b/command/run.go index f6ac6687f..5ef3c7890 100644 --- a/command/run.go +++ b/command/run.go @@ -3,6 +3,7 @@ package command import ( "bufio" "context" + "errors" "flag" "fmt" "io" @@ -197,6 +198,9 @@ func (r *Reader) read() { } if err != nil { if err == io.EOF { + if errors.Is(r.ctx.Err(), context.Canceled) { + r.err = r.ctx.Err() + } return } r.err = multierror.Append(r.err, err) diff --git a/command/sync.go b/command/sync.go index 39f497f6f..f6a65adf5 100644 --- a/command/sync.go +++ b/command/sync.go @@ -9,11 +9,13 @@ import ( "strings" "sync" + "github.com/aws/aws-sdk-go/aws/awserr" "github.com/hashicorp/go-multierror" "github.com/lanrat/extsort" "github.com/urfave/cli/v2" errorpkg "github.com/peak/s5cmd/v2/error" + "github.com/peak/s5cmd/v2/log" "github.com/peak/s5cmd/v2/log/stat" "github.com/peak/s5cmd/v2/parallel" "github.com/peak/s5cmd/v2/storage" @@ -76,6 +78,10 @@ func NewSyncCommandFlags() []cli.Flag { Name: "size-only", Usage: "make size of object only criteria to decide whether an object should be synced", }, + &cli.BoolFlag{ + Name: "exit-on-error", + Usage: "stops the sync process if an error is received", + }, } sharedFlags := NewSharedFlags() return append(syncFlags, sharedFlags...) 
@@ -119,8 +125,9 @@ type Sync struct { fullCommand string // flags - delete bool - sizeOnly bool + delete bool + sizeOnly bool + exitOnError bool // s3 options storageOpts storage.Options @@ -142,8 +149,9 @@ func NewSync(c *cli.Context) Sync { fullCommand: commandFromContext(c), // flags - delete: c.Bool("delete"), - sizeOnly: c.Bool("size-only"), + delete: c.Bool("delete"), + sizeOnly: c.Bool("size-only"), + exitOnError: c.Bool("exit-on-error"), // flags followSymlinks: !c.Bool("no-follow-symlinks"), @@ -169,7 +177,9 @@ func (s Sync) Run(c *cli.Context) error { return err } - sourceObjects, destObjects, err := s.getSourceAndDestinationObjects(c.Context, srcurl, dsturl) + ctx, cancel := context.WithCancel(c.Context) + + sourceObjects, destObjects, err := s.getSourceAndDestinationObjects(ctx, cancel, srcurl, dsturl) if err != nil { printError(s.fullCommand, s.op, err) return err @@ -177,12 +187,12 @@ func (s Sync) Run(c *cli.Context) error { isBatch := srcurl.IsWildcard() if !isBatch && !srcurl.IsRemote() { - sourceClient, err := storage.NewClient(c.Context, srcurl, s.storageOpts) + sourceClient, err := storage.NewClient(ctx, srcurl, s.storageOpts) if err != nil { return err } - obj, err := sourceClient.Stat(c.Context, srcurl) + obj, err := sourceClient.Stat(ctx, srcurl) if err != nil { return err } @@ -221,7 +231,7 @@ func (s Sync) Run(c *cli.Context) error { // Create commands in background. go s.planRun(c, onlySource, onlyDest, commonObjects, dsturl, strategy, pipeWriter, isBatch) - err = NewRun(c, pipeReader).Run(c.Context) + err = NewRun(c, pipeReader).Run(ctx) return multierror.Append(err, merrorWaiter).ErrorOrNil() } @@ -284,7 +294,7 @@ func compareObjects(sourceObjects, destObjects chan *storage.Object) (chan *url. // getSourceAndDestinationObjects returns source and destination objects from // given URLs. The returned channels gives objects sorted in ascending order // with respect to their url.Relative path. See also storage.Less. 
-func (s Sync) getSourceAndDestinationObjects(ctx context.Context, srcurl, dsturl *url.URL) (chan *storage.Object, chan *storage.Object, error) { +func (s Sync) getSourceAndDestinationObjects(ctx context.Context, cancel context.CancelFunc, srcurl, dsturl *url.URL) (chan *storage.Object, chan *storage.Object, error) { sourceClient, err := storage.NewClient(ctx, srcurl, s.storageOpts) if err != nil { return nil, nil, err @@ -332,6 +342,15 @@ func (s Sync) getSourceAndDestinationObjects(ctx context.Context, srcurl, dsturl defer close(filteredSrcObjectChannel) // filter and redirect objects for st := range unfilteredSrcObjectChannel { + if st.Err != nil && s.shouldStopSync(st.Err) { + msg := log.ErrorMessage{ + Err: cleanupError(st.Err), + Command: s.fullCommand, + Operation: s.op, + } + log.Error(msg) + cancel() + } if s.shouldSkipObject(st, true) { continue } @@ -368,9 +387,17 @@ func (s Sync) getSourceAndDestinationObjects(ctx context.Context, srcurl, dsturl go func() { defer close(filteredDstObjectChannel) - // filter and redirect objects for dt := range unfilteredDestObjectsChannel { + if dt.Err != nil && s.shouldStopSync(dt.Err) { + msg := log.ErrorMessage{ + Err: cleanupError(dt.Err), + Command: s.fullCommand, + Operation: s.op, + } + log.Error(msg) + cancel() + } if s.shouldSkipObject(dt, false) { continue } @@ -538,3 +565,17 @@ func (s Sync) shouldSkipObject(object *storage.Object, verbose bool) bool { } return false } + +// shouldStopSync determines whether a sync process should be stopped or not. 
+func (s Sync) shouldStopSync(err error) bool {
+	if err == storage.ErrNoObjectFound { // never stops the sync, even with --exit-on-error
+		return false
+	}
+	if awsErr, ok := err.(awserr.Error); ok { // NOTE(review): plain assertion; a wrapped AWS error would not match - consider errors.As
+		switch awsErr.Code() {
+		case "AccessDenied", "NoSuchBucket": // always stop, with or without --exit-on-error
+			return true
+		}
+	}
+	return s.exitOnError // every other error stops the sync only when --exit-on-error is set
+}
diff --git a/e2e/sync_test.go b/e2e/sync_test.go
index 3cc2894af..0af4aa48d 100644
--- a/e2e/sync_test.go
+++ b/e2e/sync_test.go
@@ -1793,3 +1793,112 @@ func TestIssue435(t *testing.T) {
 		assertError(t, err, errS3NoSuchKey)
 	}
 }
+
+// syncExitOnErrorObjects returns the key/content pairs that the sync tests
+// below upload to the source bucket.
+func syncExitOnErrorObjects() map[string]string {
+	return map[string]string{
+		"testfile.txt":            "S: this is a test file",
+		"readme.md":               "S: this is a readme file",
+		"a/another_test_file.txt": "S: yet another txt file",
+		"abc/def/test.py":         "S: file in nested folders",
+	}
+}
+
+// sync --exit-on-error s3://bucket/* s3://dstbucket/prefix/ (dest bucket is empty)
+func TestSyncS3BucketToEmptyS3BucketWithExitOnErrorFlag(t *testing.T) {
+	t.Parallel()
+	s3client, s5cmd := setup(t)
+
+	bucket := s3BucketFromTestName(t)
+	dstbucket := s3BucketFromTestNameWithPrefix(t, "dst")
+
+	const prefix = "prefix"
+
+	createBucket(t, s3client, bucket)
+	createBucket(t, s3client, dstbucket)
+
+	objects := syncExitOnErrorObjects()
+	for filename, content := range objects {
+		putFile(t, s3client, bucket, filename, content)
+	}
+
+	bucketPath := fmt.Sprintf("s3://%v", bucket)
+	src := fmt.Sprintf("%v/*", bucketPath)
+	dst := fmt.Sprintf("s3://%v/%v/", dstbucket, prefix)
+
+	cmd := s5cmd("sync", "--exit-on-error", src, dst)
+	result := icmd.RunCmd(cmd)
+
+	// Both buckets exist, so the run is expected to succeed with the flag set.
+	result.Assert(t, icmd.Success)
+
+	assertLines(t, result.Stdout(), map[int]compareFunc{
+		0: equals(`cp %v/a/another_test_file.txt %va/another_test_file.txt`, bucketPath, dst),
+		1: equals(`cp %v/abc/def/test.py %vabc/def/test.py`, bucketPath, dst),
+		2: equals(`cp %v/readme.md %vreadme.md`, bucketPath, dst),
+		3: equals(`cp %v/testfile.txt %vtestfile.txt`, bucketPath, dst),
+	}, sortInput(true))
+
+	// assert s3 objects in source bucket.
+	for key, content := range objects {
+		assert.Assert(t, ensureS3Object(s3client, bucket, key, content))
+	}
+
+	// assert s3 objects in dest bucket
+	for key, content := range objects {
+		key = fmt.Sprintf("%s/%s", prefix, key) // add the prefix
+		assert.Assert(t, ensureS3Object(s3client, dstbucket, key, content))
+	}
+}
+
+// assertSyncToMissingBucketFails uploads the shared fixture objects to a fresh
+// source bucket and runs `sync [flags...] s3://bucket/* s3://NotExistingBucket/`;
+// the destination bucket is never created by this helper. It asserts that the
+// command exits with code 1 and that the stderr line at index 0 contains
+// `status code: 404`.
+func assertSyncToMissingBucketFails(t *testing.T, flags ...string) {
+	t.Helper()
+
+	now := time.Now()
+	timeSource := newFixedTimeSource(now)
+	s3client, s5cmd := setup(t, withTimeSource(timeSource))
+
+	bucket := s3BucketFromTestName(t)
+	destbucket := "NotExistingBucket"
+
+	createBucket(t, s3client, bucket)
+
+	for filename, content := range syncExitOnErrorObjects() {
+		putFile(t, s3client, bucket, filename, content)
+	}
+
+	src := fmt.Sprintf("s3://%v/*", bucket)
+	dst := fmt.Sprintf("s3://%v/", destbucket)
+
+	args := append([]string{"sync"}, flags...)
+	args = append(args, src, dst)
+
+	cmd := s5cmd(args...)
+	result := icmd.RunCmd(cmd)
+
+	result.Assert(t, icmd.Expected{ExitCode: 1})
+
+	assertLines(t, result.Stderr(), map[int]compareFunc{
+		0: contains(`status code: 404`),
+	})
+}
+
+// sync --exit-on-error s3://bucket/* s3://NotExistingBucket/ (dest bucket doesn't exist)
+func TestSyncExitOnErrorS3BucketToS3BucketThatDoesNotExist(t *testing.T) {
+	t.Parallel()
+
+	assertSyncToMissingBucketFails(t, "--exit-on-error")
+}
+
+// sync s3://bucket/* s3://NotExistingBucket/ (dest bucket doesn't exist)
+func TestSyncS3BucketToS3BucketThatDoesNotExist(t *testing.T) {
+	t.Parallel()
+
+	assertSyncToMissingBucketFails(t)
+}