Skip to content

Commit

Permalink
command/sync: handle object listing errors (#597)
Browse files Browse the repository at this point in the history
Resolves #564 

Changes are made:

- Added `exit-on-error` flag. Its value is `false` by default.
- Added `shouldStopSync` function. It determines whether the sync process
should be stopped. The errors `AccessDenied` and `NoSuchBucket` are never
ignored, regardless of the value of the `exit-on-error` flag.
- `sync` command stops if an error is received while listing objects from
the source or destination and the `exit-on-error` flag is `true`. The
`ErrNoObjectFound` error is always ignored.

---------

Co-authored-by: İlkin Balkanay <[email protected]>
Co-authored-by: Onur Sönmez <[email protected]>
  • Loading branch information
3 people authored Aug 4, 2023
1 parent 987b4ae commit cb948fa
Show file tree
Hide file tree
Showing 4 changed files with 183 additions and 10 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#### Bugfixes
- Fixed a bug introduced with `external sort` support in `sync` command which prevents `sync` to an empty destination with `--delete` option. ([#576](https://github.com/peak/s5cmd/issues/576))
- Fixed a bug in `sync` command, which previously caused the command to continue running even if an error was received from the destination bucket. ([#564](https://github.com/peak/s5cmd/issues/564))
- Fixed a bug that causes local files to be lost if downloads fail. ([#479](https://github.com/peak/s5cmd/issues/479))

## v2.1.0 - 19 Jun 2023
Expand Down
4 changes: 4 additions & 0 deletions command/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package command
import (
"bufio"
"context"
"errors"
"flag"
"fmt"
"io"
Expand Down Expand Up @@ -197,6 +198,9 @@ func (r *Reader) read() {
}
if err != nil {
if err == io.EOF {
if errors.Is(r.ctx.Err(), context.Canceled) {
r.err = r.ctx.Err()
}
return
}
r.err = multierror.Append(r.err, err)
Expand Down
61 changes: 51 additions & 10 deletions command/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@ import (
"strings"
"sync"

"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/hashicorp/go-multierror"
"github.com/lanrat/extsort"
"github.com/urfave/cli/v2"

errorpkg "github.com/peak/s5cmd/v2/error"
"github.com/peak/s5cmd/v2/log"
"github.com/peak/s5cmd/v2/log/stat"
"github.com/peak/s5cmd/v2/parallel"
"github.com/peak/s5cmd/v2/storage"
Expand Down Expand Up @@ -76,6 +78,10 @@ func NewSyncCommandFlags() []cli.Flag {
Name: "size-only",
Usage: "make size of object only criteria to decide whether an object should be synced",
},
&cli.BoolFlag{
Name: "exit-on-error",
Usage: "stops the sync process if an error is received",
},
}
sharedFlags := NewSharedFlags()
return append(syncFlags, sharedFlags...)
Expand Down Expand Up @@ -119,8 +125,9 @@ type Sync struct {
fullCommand string

// flags
delete bool
sizeOnly bool
delete bool
sizeOnly bool
exitOnError bool

// s3 options
storageOpts storage.Options
Expand All @@ -142,8 +149,9 @@ func NewSync(c *cli.Context) Sync {
fullCommand: commandFromContext(c),

// flags
delete: c.Bool("delete"),
sizeOnly: c.Bool("size-only"),
delete: c.Bool("delete"),
sizeOnly: c.Bool("size-only"),
exitOnError: c.Bool("exit-on-error"),

// flags
followSymlinks: !c.Bool("no-follow-symlinks"),
Expand All @@ -169,20 +177,22 @@ func (s Sync) Run(c *cli.Context) error {
return err
}

sourceObjects, destObjects, err := s.getSourceAndDestinationObjects(c.Context, srcurl, dsturl)
ctx, cancel := context.WithCancel(c.Context)

sourceObjects, destObjects, err := s.getSourceAndDestinationObjects(ctx, cancel, srcurl, dsturl)
if err != nil {
printError(s.fullCommand, s.op, err)
return err
}

isBatch := srcurl.IsWildcard()
if !isBatch && !srcurl.IsRemote() {
sourceClient, err := storage.NewClient(c.Context, srcurl, s.storageOpts)
sourceClient, err := storage.NewClient(ctx, srcurl, s.storageOpts)
if err != nil {
return err
}

obj, err := sourceClient.Stat(c.Context, srcurl)
obj, err := sourceClient.Stat(ctx, srcurl)
if err != nil {
return err
}
Expand Down Expand Up @@ -221,7 +231,7 @@ func (s Sync) Run(c *cli.Context) error {
// Create commands in background.
go s.planRun(c, onlySource, onlyDest, commonObjects, dsturl, strategy, pipeWriter, isBatch)

err = NewRun(c, pipeReader).Run(c.Context)
err = NewRun(c, pipeReader).Run(ctx)
return multierror.Append(err, merrorWaiter).ErrorOrNil()
}

Expand Down Expand Up @@ -284,7 +294,7 @@ func compareObjects(sourceObjects, destObjects chan *storage.Object) (chan *url.
// getSourceAndDestinationObjects returns source and destination objects from
// given URLs. The returned channels gives objects sorted in ascending order
// with respect to their url.Relative path. See also storage.Less.
func (s Sync) getSourceAndDestinationObjects(ctx context.Context, srcurl, dsturl *url.URL) (chan *storage.Object, chan *storage.Object, error) {
func (s Sync) getSourceAndDestinationObjects(ctx context.Context, cancel context.CancelFunc, srcurl, dsturl *url.URL) (chan *storage.Object, chan *storage.Object, error) {
sourceClient, err := storage.NewClient(ctx, srcurl, s.storageOpts)
if err != nil {
return nil, nil, err
Expand Down Expand Up @@ -332,6 +342,15 @@ func (s Sync) getSourceAndDestinationObjects(ctx context.Context, srcurl, dsturl
defer close(filteredSrcObjectChannel)
// filter and redirect objects
for st := range unfilteredSrcObjectChannel {
if st.Err != nil && s.shouldStopSync(st.Err) {
msg := log.ErrorMessage{
Err: cleanupError(st.Err),
Command: s.fullCommand,
Operation: s.op,
}
log.Error(msg)
cancel()
}
if s.shouldSkipObject(st, true) {
continue
}
Expand Down Expand Up @@ -368,9 +387,17 @@ func (s Sync) getSourceAndDestinationObjects(ctx context.Context, srcurl, dsturl

go func() {
defer close(filteredDstObjectChannel)

// filter and redirect objects
for dt := range unfilteredDestObjectsChannel {
if dt.Err != nil && s.shouldStopSync(dt.Err) {
msg := log.ErrorMessage{
Err: cleanupError(dt.Err),
Command: s.fullCommand,
Operation: s.op,
}
log.Error(msg)
cancel()
}
if s.shouldSkipObject(dt, false) {
continue
}
Expand Down Expand Up @@ -538,3 +565,17 @@ func (s Sync) shouldSkipObject(object *storage.Object, verbose bool) bool {
}
return false
}

// shouldStopSync reports whether the sync operation must be aborted after
// encountering err. ErrNoObjectFound never aborts the sync; the AWS error
// codes "AccessDenied" and "NoSuchBucket" always abort it; any other error
// aborts only when the exit-on-error flag was set.
func (s Sync) shouldStopSync(err error) bool {
	if err == storage.ErrNoObjectFound {
		return false
	}
	if awsErr, ok := err.(awserr.Error); ok {
		code := awsErr.Code()
		if code == "AccessDenied" || code == "NoSuchBucket" {
			return true
		}
	}
	return s.exitOnError
}
127 changes: 127 additions & 0 deletions e2e/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1793,3 +1793,130 @@ func TestIssue435(t *testing.T) {
assertError(t, err, errS3NoSuchKey)
}
}

// sync s3://bucket/* s3://bucket/ (dest bucket is empty)
func TestSyncS3BucketToEmptyS3BucketWithExitOnErrorFlag(t *testing.T) {
	t.Parallel()
	s3client, s5cmd := setup(t)

	bucket := s3BucketFromTestName(t)
	dstbucket := s3BucketFromTestNameWithPrefix(t, "dst")

	const prefix = "prefix"

	createBucket(t, s3client, bucket)
	createBucket(t, s3client, dstbucket)

	// objects seeded into the source bucket.
	files := map[string]string{
		"testfile.txt":            "S: this is a test file",
		"readme.md":               "S: this is a readme file",
		"a/another_test_file.txt": "S: yet another txt file",
		"abc/def/test.py":         "S: file in nested folders",
	}

	for name, body := range files {
		putFile(t, s3client, bucket, name, body)
	}

	bucketPath := fmt.Sprintf("s3://%v", bucket)
	src := fmt.Sprintf("%v/*", bucketPath)
	dst := fmt.Sprintf("s3://%v/%v/", dstbucket, prefix)

	result := icmd.RunCmd(s5cmd("sync", "--exit-on-error", src, dst))
	result.Assert(t, icmd.Success)

	assertLines(t, result.Stdout(), map[int]compareFunc{
		0: equals(`cp %v/a/another_test_file.txt %va/another_test_file.txt`, bucketPath, dst),
		1: equals(`cp %v/abc/def/test.py %vabc/def/test.py`, bucketPath, dst),
		2: equals(`cp %v/readme.md %vreadme.md`, bucketPath, dst),
		3: equals(`cp %v/testfile.txt %vtestfile.txt`, bucketPath, dst),
	}, sortInput(true))

	// source objects must be untouched.
	for key, body := range files {
		assert.Assert(t, ensureS3Object(s3client, bucket, key, body))
	}

	// destination objects must exist under the prefix.
	for key, body := range files {
		assert.Assert(t, ensureS3Object(s3client, dstbucket, fmt.Sprintf("%s/%s", prefix, key), body))
	}
}

// sync --exit-on-error s3://bucket/* s3://NotExistingBucket/ (dest bucket doesn't exist)
func TestSyncExitOnErrorS3BucketToS3BucketThatDoesNotExist(t *testing.T) {
t.Parallel()

now := time.Now()
timeSource := newFixedTimeSource(now)
s3client, s5cmd := setup(t, withTimeSource(timeSource))

bucket := s3BucketFromTestName(t)
destbucket := "NotExistingBucket"

createBucket(t, s3client, bucket)

S3Content := map[string]string{
"testfile.txt": "S: this is a test file",
"readme.md": "S: this is a readme file",
"a/another_test_file.txt": "S: yet another txt file",
"abc/def/test.py": "S: file in nested folders",
}

for filename, content := range S3Content {
putFile(t, s3client, bucket, filename, content)
}

src := fmt.Sprintf("s3://%v/*", bucket)
dst := fmt.Sprintf("s3://%v/", destbucket)

cmd := s5cmd("sync", "--exit-on-error", src, dst)
result := icmd.RunCmd(cmd)

result.Assert(t, icmd.Expected{ExitCode: 1})

assertLines(t, result.Stderr(), map[int]compareFunc{
0: contains(`status code: 404`),
})
}

// sync s3://bucket/* s3://NotExistingBucket/ (dest bucket doesn't exist)
func TestSyncS3BucketToS3BucketThatDoesNotExist(t *testing.T) {
t.Parallel()

now := time.Now()
timeSource := newFixedTimeSource(now)
s3client, s5cmd := setup(t, withTimeSource(timeSource))

bucket := s3BucketFromTestName(t)
destbucket := "NotExistingBucket"

createBucket(t, s3client, bucket)

S3Content := map[string]string{
"testfile.txt": "S: this is a test file",
"readme.md": "S: this is a readme file",
"a/another_test_file.txt": "S: yet another txt file",
"abc/def/test.py": "S: file in nested folders",
}

for filename, content := range S3Content {
putFile(t, s3client, bucket, filename, content)
}

src := fmt.Sprintf("s3://%v/*", bucket)
dst := fmt.Sprintf("s3://%v/", destbucket)

cmd := s5cmd("sync", src, dst)
result := icmd.RunCmd(cmd)

result.Assert(t, icmd.Expected{ExitCode: 1})

assertLines(t, result.Stderr(), map[int]compareFunc{
0: contains(`status code: 404`),
})
}

0 comments on commit cb948fa

Please sign in to comment.