Skip to content

Commit

Permalink
address race condition and aggregate errors
Browse files Browse the repository at this point in the history
Signed-off-by: wayner0628 <[email protected]>
  • Loading branch information
wayner0628 committed Nov 8, 2024
1 parent bf65836 commit db481d0
Showing 1 changed file with 15 additions and 12 deletions.
27 changes: 15 additions & 12 deletions flytecopilot/data/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ func (d Downloader) handleBlob(ctx context.Context, blob *core.Blob, toPath stri
}
}

success := 0
// Track the count of successful downloads and the total number of items
downloadSuccess := 0
itemCount := len(absPaths)
// Track successful closures of readers and writers in deferred functions
readerCloseSuccessCount := 0
Expand Down Expand Up @@ -110,7 +111,9 @@ func (d Downloader) handleBlob(ctx context.Context, blob *core.Blob, toPath stri
if err != nil {
logger.Errorf(ctx, "failed to close Blob read stream @ref [%s]. Error: %s", ref, err)
}

Check warning on line 113 in flytecopilot/data/download.go

View check run for this annotation

Codecov / codecov/patch

flytecopilot/data/download.go#L112-L113

Added lines #L112 - L113 were not covered by tests
mu.Lock()
readerCloseSuccessCount += 1
mu.Unlock()
}()

_, _, k, err := ref.Split()
Expand All @@ -136,7 +139,9 @@ func (d Downloader) handleBlob(ctx context.Context, blob *core.Blob, toPath stri
if err != nil {
logger.Errorf(ctx, "failed to close File write stream. Error: %s", err)
}

Check warning on line 141 in flytecopilot/data/download.go

View check run for this annotation

Codecov / codecov/patch

flytecopilot/data/download.go#L140-L141

Added lines #L140 - L141 were not covered by tests
mu.Lock()
writerCloseSuccessCount += 1
mu.Unlock()
}()

_, err = io.Copy(writer, reader)
Expand All @@ -145,22 +150,20 @@ func (d Downloader) handleBlob(ctx context.Context, blob *core.Blob, toPath stri
return
}

Check warning on line 151 in flytecopilot/data/download.go

View check run for this annotation

Codecov / codecov/patch

flytecopilot/data/download.go#L149-L151

Added lines #L149 - L151 were not covered by tests
mu.Lock()
success += 1
downloadSuccess += 1
mu.Unlock()
}()
}
// Go routines are synchronized with a WaitGroup to prevent goroutine leaks.
wg.Wait()
if success != itemCount {
return nil, errors.Errorf("failed to copy %d out of %d remote files from [%s] to local [%s]", itemCount-success, itemCount, blobRef, toPath)
} else if readerCloseSuccessCount != itemCount {
return nil, errors.Errorf("failed to close %d out of %d remote file readers", itemCount-readerCloseSuccessCount, itemCount)
} else if writerCloseSuccessCount != itemCount {
return nil, errors.Errorf("failed to close %d out of %d local file writers", itemCount-writerCloseSuccessCount, itemCount)
} else {
logger.Infof(ctx, "successfully copied %d remote files from [%s] to local [%s]", success, blobRef, toPath)
return toPath, nil
if downloadSuccess != itemCount || readerCloseSuccessCount != itemCount || writerCloseSuccessCount != itemCount {
return nil, errors.Errorf(
"Failed to copy %d out of %d remote files from [%s] to local [%s]. Failed to close %d readers; Failed to close %d writers.",
itemCount-downloadSuccess, itemCount, blobRef, toPath, itemCount-readerCloseSuccessCount, itemCount-writerCloseSuccessCount,
)
}

Check warning on line 164 in flytecopilot/data/download.go

View check run for this annotation

Codecov / codecov/patch

flytecopilot/data/download.go#L160-L164

Added lines #L160 - L164 were not covered by tests
logger.Infof(ctx, "successfully copied %d remote files from [%s] to local [%s]", downloadSuccess, blobRef, toPath)
return toPath, nil
} else if blob.GetMetadata().GetType().Dimensionality == core.BlobType_SINGLE {
// reader should be declared here (avoid being shared across all goroutines)
var reader io.ReadCloser
Expand Down Expand Up @@ -197,7 +200,7 @@ func (d Downloader) handleBlob(ctx context.Context, blob *core.Blob, toPath stri
logger.Infof(ctx, "Successfully copied [%d] bytes remote data from [%s] to local [%s]", v, blobRef, toPath)
return toPath, nil
}

return nil, errors.Errorf("unexpected Blob type encountered")

Check warning on line 204 in flytecopilot/data/download.go

View check run for this annotation

Codecov / codecov/patch

flytecopilot/data/download.go#L204

Added line #L204 was not covered by tests
}

Expand Down

0 comments on commit db481d0

Please sign in to comment.