command/sync: handle object listing errors #597

Merged · 30 commits · Aug 4, 2023
Changes from 23 commits

Commits (30)
da7d6fa
command/sync: handle object listing errors
ahmethakanbesel Jul 18, 2023
4589b09
command/sync: fix typo in the comments
ahmethakanbesel Jul 18, 2023
899a195
command/sync: fix typo in the comments
ahmethakanbesel Jul 18, 2023
9482b08
command/sync: add --exit-on-error flag
ahmethakanbesel Jul 18, 2023
ea069b4
command/sync: better error handling
ahmethakanbesel Jul 18, 2023
2bf614d
command/sync: add new tests
ahmethakanbesel Jul 18, 2023
219c8a6
command/sync: update not found bucket test
ahmethakanbesel Jul 19, 2023
b89f6d3
command/sync: update tests
ahmethakanbesel Jul 19, 2023
d3aea4e
command/sync: refactor getSourceAndDestinationObjects
ahmethakanbesel Jul 20, 2023
127ff9b
command/run: check ctx cancelled
ahmethakanbesel Jul 21, 2023
c6318bb
Merge branch 'peak:master' into sync-handle-errors
ahmethakanbesel Jul 21, 2023
15b3d58
command/sync: revert refactoring
ahmethakanbesel Jul 21, 2023
10ee234
command/run: revert a change
ahmethakanbesel Jul 24, 2023
c85c3ba
command/sync: add a test case
ahmethakanbesel Jul 24, 2023
8230b85
changelog: add a bug fix
ahmethakanbesel Jul 24, 2023
17b199c
Merge branch 'master' into sync-handle-errors
ahmethakanbesel Jul 24, 2023
68f2b4a
changelog: fix typo
ahmethakanbesel Jul 24, 2023
c3fdf8b
Merge branch 'master' into sync-handle-errors
ahmethakanbesel Jul 25, 2023
966ce06
command/sync: add fallthrough
ahmethakanbesel Jul 25, 2023
723fcf3
Merge branch 'master' into sync-handle-errors
ahmethakanbesel Jul 25, 2023
209ae93
Merge branch 'master' into sync-handle-errors
ahmethakanbesel Jul 27, 2023
11c1435
changelog: update the entry
ahmethakanbesel Jul 27, 2023
427ffb4
Merge branch 'master' into sync-handle-errors
ahmethakanbesel Jul 27, 2023
ff52dea
Merge branch 'master' into sync-handle-errors
ilkinulas Jul 27, 2023
06619ef
command/sync: refactor `shouldStopSync`
ahmethakanbesel Jul 27, 2023
6f5e1ff
Merge branch 'master' into sync-handle-errors
ahmethakanbesel Jul 28, 2023
44261f2
command/sync: revert the ctx
ahmethakanbesel Jul 28, 2023
96fd166
command/run: set reader err if ctx is cancelled
ahmethakanbesel Jul 28, 2023
1189724
command/run: remove unnecessary `return`
ahmethakanbesel Jul 28, 2023
7b79b43
Merge branch 'master' into sync-handle-errors
sonmezonur Aug 1, 2023
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -15,6 +15,7 @@

#### Bugfixes
- Fixed a bug introduced with `external sort` support in `sync` command which prevents `sync` to an empty destination with `--delete` option. ([#576](https://github.com/peak/s5cmd/issues/576))
- Fixed a bug in `sync` command, which previously caused the command to continue running even if an error was received from the destination bucket. ([#564](https://github.com/peak/s5cmd/issues/564))
- Fixed a bug that causes local files to be lost if downloads fail. ([#479](https://github.com/peak/s5cmd/issues/479))

## v2.1.0 - 19 Jun 2023
63 changes: 53 additions & 10 deletions command/sync.go
@@ -9,11 +9,13 @@ import (
"strings"
"sync"

"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/hashicorp/go-multierror"
"github.com/lanrat/extsort"
"github.com/urfave/cli/v2"

errorpkg "github.com/peak/s5cmd/v2/error"
"github.com/peak/s5cmd/v2/log"
"github.com/peak/s5cmd/v2/log/stat"
"github.com/peak/s5cmd/v2/parallel"
"github.com/peak/s5cmd/v2/storage"
@@ -76,6 +78,10 @@ func NewSyncCommandFlags() []cli.Flag {
Name: "size-only",
Usage: "make size of object only criteria to decide whether an object should be synced",
},
&cli.BoolFlag{
Name: "exit-on-error",
Usage: "stops the sync process if an error is received",
},
}
sharedFlags := NewSharedFlags()
return append(syncFlags, sharedFlags...)
Expand Down Expand Up @@ -119,8 +125,9 @@ type Sync struct {
fullCommand string

// flags
delete bool
sizeOnly bool
delete bool
sizeOnly bool
exitOnError bool

// s3 options
storageOpts storage.Options
@@ -142,8 +149,9 @@ func NewSync(c *cli.Context) Sync {
fullCommand: commandFromContext(c),

// flags
delete: c.Bool("delete"),
sizeOnly: c.Bool("size-only"),
delete: c.Bool("delete"),
sizeOnly: c.Bool("size-only"),
exitOnError: c.Bool("exit-on-error"),

// flags
followSymlinks: !c.Bool("no-follow-symlinks"),
@@ -169,20 +177,22 @@ func (s Sync) Run(c *cli.Context) error {
return err
}

sourceObjects, destObjects, err := s.getSourceAndDestinationObjects(c.Context, srcurl, dsturl)
ctx, cancel := context.WithCancel(c.Context)

sourceObjects, destObjects, err := s.getSourceAndDestinationObjects(ctx, cancel, srcurl, dsturl)
if err != nil {
printError(s.fullCommand, s.op, err)
return err
}

isBatch := srcurl.IsWildcard()
if !isBatch && !srcurl.IsRemote() {
sourceClient, err := storage.NewClient(c.Context, srcurl, s.storageOpts)
sourceClient, err := storage.NewClient(ctx, srcurl, s.storageOpts)
if err != nil {
return err
}

obj, _ := sourceClient.Stat(c.Context, srcurl)
obj, _ := sourceClient.Stat(ctx, srcurl)
isBatch = obj != nil && obj.Type.IsDir()
}

@@ -217,7 +227,7 @@ func (s Sync) Run(c *cli.Context) error {
// Create commands in background.
go s.planRun(c, onlySource, onlyDest, commonObjects, dsturl, strategy, pipeWriter, isBatch)

err = NewRun(c, pipeReader).Run(c.Context)
err = NewRun(c, pipeReader).Run(ctx)
return multierror.Append(err, merrorWaiter).ErrorOrNil()
}

@@ -280,7 +290,7 @@ func compareObjects(sourceObjects, destObjects chan *storage.Object) (chan *url.
// getSourceAndDestinationObjects returns source and destination objects from
// given URLs. The returned channels gives objects sorted in ascending order
// with respect to their url.Relative path. See also storage.Less.
func (s Sync) getSourceAndDestinationObjects(ctx context.Context, srcurl, dsturl *url.URL) (chan *storage.Object, chan *storage.Object, error) {
func (s Sync) getSourceAndDestinationObjects(ctx context.Context, cancel context.CancelFunc, srcurl, dsturl *url.URL) (chan *storage.Object, chan *storage.Object, error) {
sourceClient, err := storage.NewClient(ctx, srcurl, s.storageOpts)
if err != nil {
return nil, nil, err
Expand Down Expand Up @@ -328,6 +338,15 @@ func (s Sync) getSourceAndDestinationObjects(ctx context.Context, srcurl, dsturl
defer close(filteredSrcObjectChannel)
// filter and redirect objects
for st := range unfilteredSrcObjectChannel {
if st.Err != nil && s.shouldStopSync(st.Err) {
msg := log.ErrorMessage{
Err: cleanupError(st.Err),
Command: s.fullCommand,
Operation: s.op,
}
log.Error(msg)
cancel()
}
if s.shouldSkipObject(st, true) {
continue
}
Expand Down Expand Up @@ -364,9 +383,17 @@ func (s Sync) getSourceAndDestinationObjects(ctx context.Context, srcurl, dsturl

go func() {
defer close(filteredDstObjectChannel)

// filter and redirect objects
for dt := range unfilteredDestObjectsChannel {
if dt.Err != nil && s.shouldStopSync(dt.Err) {
msg := log.ErrorMessage{
Err: cleanupError(dt.Err),
Command: s.fullCommand,
Operation: s.op,
}
log.Error(msg)
cancel()
}
if s.shouldSkipObject(dt, false) {
continue
}
Expand Down Expand Up @@ -534,3 +561,19 @@ func (s Sync) shouldSkipObject(object *storage.Object, verbose bool) bool {
}
return false
}

// shouldStopSync determines whether a sync process should be stopped or not.
func (s Sync) shouldStopSync(err error) bool {
if err == storage.ErrNoObjectFound {
return false
}
if awsErr, ok := err.(awserr.Error); ok {
switch awsErr.Code() {
case "AccessDenied":
fallthrough
case "NoSuchBucket":
return true
}
}
return s.exitOnError
}
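
For reference, the classification performed by `shouldStopSync` can be illustrated outside the s5cmd codebase. The following is a minimal, self-contained sketch and not the merged code: `errNoObjectFound` stands in for `storage.ErrNoObjectFound`, and the `Sync` receiver is replaced by an explicit `exitOnError` parameter. It also shows why the review comment in the e2e tests below applies: `NoSuchBucket` stops the sync whether or not `--exit-on-error` is set.

```go
package main

import (
	"errors"
	"fmt"

	"github.com/aws/aws-sdk-go/aws/awserr"
)

// errNoObjectFound stands in for s5cmd's storage.ErrNoObjectFound.
var errNoObjectFound = errors.New("no object found")

// shouldStopSync approximates the decision logic merged in this PR:
// "object not found" results never stop the sync, AccessDenied and
// NoSuchBucket always do, and any other listing error stops it only
// when --exit-on-error is set. (The merged code reaches the same
// result with a fallthrough between the two cases.)
func shouldStopSync(err error, exitOnError bool) bool {
	if err == errNoObjectFound {
		return false
	}
	if awsErr, ok := err.(awserr.Error); ok {
		switch awsErr.Code() {
		case "AccessDenied", "NoSuchBucket":
			return true
		}
	}
	return exitOnError
}

func main() {
	fmt.Println(shouldStopSync(errNoObjectFound, true))                                   // false: empty listings are fine
	fmt.Println(shouldStopSync(awserr.New("NoSuchBucket", "bucket missing", nil), false)) // true: fatal regardless of the flag
	fmt.Println(shouldStopSync(awserr.New("SlowDown", "throttled", nil), false))          // false: tolerated without the flag
	fmt.Println(shouldStopSync(awserr.New("SlowDown", "throttled", nil), true))           // true: --exit-on-error stops the sync
}
```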
127 changes: 127 additions & 0 deletions e2e/sync_test.go
@@ -1793,3 +1793,130 @@ func TestIssue435(t *testing.T) {
assertError(t, err, errS3NoSuchKey)
}
}

// sync s3://bucket/* s3://bucket/ (dest bucket is empty)
func TestSyncS3BucketToEmptyS3BucketWithExitOnErrorFlag(t *testing.T) {
t.Parallel()
s3client, s5cmd := setup(t)

bucket := s3BucketFromTestName(t)
dstbucket := s3BucketFromTestNameWithPrefix(t, "dst")

const (
prefix = "prefix"
)
createBucket(t, s3client, bucket)
createBucket(t, s3client, dstbucket)

S3Content := map[string]string{
"testfile.txt": "S: this is a test file",
"readme.md": "S: this is a readme file",
"a/another_test_file.txt": "S: yet another txt file",
"abc/def/test.py": "S: file in nested folders",
}

for filename, content := range S3Content {
putFile(t, s3client, bucket, filename, content)
}

bucketPath := fmt.Sprintf("s3://%v", bucket)
src := fmt.Sprintf("%v/*", bucketPath)
dst := fmt.Sprintf("s3://%v/%v/", dstbucket, prefix)

cmd := s5cmd("sync", "--exit-on-error", src, dst)
result := icmd.RunCmd(cmd)

result.Assert(t, icmd.Success)

assertLines(t, result.Stdout(), map[int]compareFunc{
0: equals(`cp %v/a/another_test_file.txt %va/another_test_file.txt`, bucketPath, dst),
1: equals(`cp %v/abc/def/test.py %vabc/def/test.py`, bucketPath, dst),
2: equals(`cp %v/readme.md %vreadme.md`, bucketPath, dst),
3: equals(`cp %v/testfile.txt %vtestfile.txt`, bucketPath, dst),
}, sortInput(true))

// assert s3 objects in source bucket.
for key, content := range S3Content {
assert.Assert(t, ensureS3Object(s3client, bucket, key, content))
}

// assert s3 objects in dest bucket
for key, content := range S3Content {
key = fmt.Sprintf("%s/%s", prefix, key) // add the prefix
assert.Assert(t, ensureS3Object(s3client, dstbucket, key, content))
}
}

// sync --exit-on-error s3://bucket/* s3://NotExistingBucket/ (dest bucket doesn't exist)
func TestSyncExitOnErrorS3BucketToS3BucketThatDoesNotExist(t *testing.T) {
Review comment (project member):
This test case does not adequately test the --exit-on-error flag, because if the bucket does not exist, the flag will have no effect. It would be better to test the flag with an error other than "AccessDenied" or "NoSuchBucket".

t.Parallel()

now := time.Now()
timeSource := newFixedTimeSource(now)
s3client, s5cmd := setup(t, withTimeSource(timeSource))

bucket := s3BucketFromTestName(t)
destbucket := "NotExistingBucket"

createBucket(t, s3client, bucket)

S3Content := map[string]string{
"testfile.txt": "S: this is a test file",
"readme.md": "S: this is a readme file",
"a/another_test_file.txt": "S: yet another txt file",
"abc/def/test.py": "S: file in nested folders",
}

for filename, content := range S3Content {
putFile(t, s3client, bucket, filename, content)
}

src := fmt.Sprintf("s3://%v/*", bucket)
dst := fmt.Sprintf("s3://%v/", destbucket)

cmd := s5cmd("sync", "--exit-on-error", src, dst)
result := icmd.RunCmd(cmd)

result.Assert(t, icmd.Expected{ExitCode: 1})

assertLines(t, result.Stderr(), map[int]compareFunc{
0: contains(`status code: 404`),
})
}

// sync s3://bucket/* s3://NotExistingBucket/ (dest bucket doesn't exist)
func TestSyncS3BucketToS3BucketThatDoesNotExist(t *testing.T) {
t.Parallel()

now := time.Now()
timeSource := newFixedTimeSource(now)
s3client, s5cmd := setup(t, withTimeSource(timeSource))

bucket := s3BucketFromTestName(t)
destbucket := "NotExistingBucket"

createBucket(t, s3client, bucket)

S3Content := map[string]string{
"testfile.txt": "S: this is a test file",
"readme.md": "S: this is a readme file",
"a/another_test_file.txt": "S: yet another txt file",
"abc/def/test.py": "S: file in nested folders",
}

for filename, content := range S3Content {
putFile(t, s3client, bucket, filename, content)
}

src := fmt.Sprintf("s3://%v/*", bucket)
dst := fmt.Sprintf("s3://%v/", destbucket)

cmd := s5cmd("sync", src, dst)
result := icmd.RunCmd(cmd)

result.Assert(t, icmd.Expected{ExitCode: 1})

assertLines(t, result.Stderr(), map[int]compareFunc{
0: contains(`status code: 404`),
})
}
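
Taken together, the new flag is exercised from the CLI exactly as in the tests above, e.g. `s5cmd sync --exit-on-error 's3://source-bucket/*' 's3://destination-bucket/prefix/'` (bucket names here are placeholders). Without the flag, only `AccessDenied` and `NoSuchBucket` listing errors stop the sync; with it, any listing error from either the source or the destination cancels the listing and stops the sync.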