diff --git a/service/s3/s3manager/upload.go b/service/s3/s3manager/upload.go index c5d7773eac0..4132bcb8c18 100644 --- a/service/s3/s3manager/upload.go +++ b/service/s3/s3manager/upload.go @@ -508,6 +508,7 @@ func (u *multiuploader) upload(firstBuf io.ReadSeeker) (*UploadOutput, error) { // Read and queue the rest of the parts for u.geterr() == nil { + num++ // This upload exceeded maximum number of supported parts, error now. if num > int64(u.ctx.MaxUploadParts) || num > int64(MaxUploadParts) { var msg string @@ -521,7 +522,6 @@ func (u *multiuploader) upload(firstBuf io.ReadSeeker) (*UploadOutput, error) { u.seterr(awserr.New("TotalPartsExceeded", msg, nil)) break } - num++ buf, err := u.nextReader() if err == io.EOF { @@ -530,7 +530,9 @@ func (u *multiuploader) upload(firstBuf io.ReadSeeker) (*UploadOutput, error) { ch <- chunk{buf: buf, num: num} - if err != nil && err != io.ErrUnexpectedEOF { + if err == io.ErrUnexpectedEOF { + break + } else if err != nil { u.seterr(awserr.New( "ReadRequestBody", "read multipart upload data failed", diff --git a/service/s3/s3manager/upload_test.go b/service/s3/s3manager/upload_test.go index ead4bcda534..9b408dabb99 100644 --- a/service/s3/s3manager/upload_test.go +++ b/service/s3/s3manager/upload_test.go @@ -391,11 +391,15 @@ func TestUploadOrderReadFail2(t *testing.T) { type sizedReader struct { size int cur int + err error } func (s *sizedReader) Read(p []byte) (n int, err error) { if s.cur >= s.size { - return 0, io.EOF + if s.err == nil { + s.err = io.EOF + } + return 0, s.err } n = len(p) @@ -429,6 +433,52 @@ func TestUploadOrderMultiBufferedReader(t *testing.T) { assert.Equal(t, []int{1024 * 1024 * 2, 1024 * 1024 * 5, 1024 * 1024 * 5}, parts) } +func TestUploadOrderMultiBufferedReaderUnexpectedEOF(t *testing.T) { + s, ops, args := loggingSvc(emptyList) + mgr := s3manager.NewUploaderWithClient(s) + _, err := mgr.Upload(&s3manager.UploadInput{ + Bucket: aws.String("Bucket"), + Key: aws.String("Key"), + Body: 
&sizedReader{size: 1024 * 1024 * 12, err: io.ErrUnexpectedEOF}, + }) + + assert.NoError(t, err) + assert.Equal(t, []string{"CreateMultipartUpload", "UploadPart", "UploadPart", "UploadPart", "CompleteMultipartUpload"}, *ops) + + // Part lengths + parts := []int{ + buflen(val((*args)[1], "Body")), + buflen(val((*args)[2], "Body")), + buflen(val((*args)[3], "Body")), + } + sort.Ints(parts) + assert.Equal(t, []int{1024 * 1024 * 2, 1024 * 1024 * 5, 1024 * 1024 * 5}, parts) +} + +// TestUploadOrderMultiBufferedReaderEOF tests the edge case where the +// file size is an exact multiple of the part size, which means nextReader will +// return io.EOF rather than io.ErrUnexpectedEOF +func TestUploadOrderMultiBufferedReaderEOF(t *testing.T) { + s, ops, args := loggingSvc(emptyList) + mgr := s3manager.NewUploaderWithClient(s) + _, err := mgr.Upload(&s3manager.UploadInput{ + Bucket: aws.String("Bucket"), + Key: aws.String("Key"), + Body: &sizedReader{size: 1024 * 1024 * 10, err: io.EOF}, + }) + + assert.NoError(t, err) + assert.Equal(t, []string{"CreateMultipartUpload", "UploadPart", "UploadPart", "CompleteMultipartUpload"}, *ops) + + // Part lengths + parts := []int{ + buflen(val((*args)[1], "Body")), + buflen(val((*args)[2], "Body")), + } + sort.Ints(parts) + assert.Equal(t, []int{1024 * 1024 * 5, 1024 * 1024 * 5}, parts) +} + func TestUploadOrderMultiBufferedReaderExceedTotalParts(t *testing.T) { s, ops, _ := loggingSvc([]string{"UploadPart"}) mgr := s3manager.NewUploaderWithClient(s, func(u *s3manager.Uploader) {