From 5ef94d77465cd829f06a668c02e544d61545f359 Mon Sep 17 00:00:00 2001
From: Loren Segal
Date: Thu, 28 May 2015 01:42:46 -0700
Subject: [PATCH] s3/s3manager: Buffer parts with io.SectionReader when
 possible

Improves memory consumption for io.ReaderAt streams (files, in-memory
buffers) by avoiding the creation of an individual byte buffer per part.
Increasing the concurrency threshold for seekable streams will no longer
impact memory usage.

Fixes #232
---
 service/s3/s3manager/upload.go      | 96 +++++++++++++++++++++++------
 service/s3/s3manager/upload_test.go | 59 ++++++++++++++++++
 2 files changed, 137 insertions(+), 18 deletions(-)

diff --git a/service/s3/s3manager/upload.go b/service/s3/s3manager/upload.go
index 3b540c0e84d..40f21f7c262 100644
--- a/service/s3/s3manager/upload.go
+++ b/service/s3/s3manager/upload.go
@@ -231,31 +231,89 @@ type uploader struct {
 	s    *s3.S3
 	in   *UploadInput
 	opts *UploadOptions
+
+	readerPos int64 // current reader position
+	totalSize int64 // set to -1 if the size is not known
 }
 
 // internal logic for deciding whether to upload a single part or use a
 // multipart upload.
 func (u *uploader) upload() (*UploadOutput, error) {
+	// Try to get the total size for some optimizations
+	u.initSize()
+
 	// Do one read to determine if we have more than one part
-	packet := make([]byte, u.opts.PartSize)
-	n, err := io.ReadFull(u.in.Body, packet)
+	buf, err := u.nextReader()
 	if err == io.EOF || err == io.ErrUnexpectedEOF { // single part
-		return u.singlePart(packet[0:n])
+		return u.singlePart(buf)
 	} else if err != nil {
 		return nil, apierr.New("ReadRequestBody", "read upload data failed", err)
 	}
 
 	mu := multiuploader{uploader: u}
-	return mu.upload(packet)
+	return mu.upload(buf)
+}
+
+// initSize tries to detect the total stream size, setting u.totalSize. If
+// the size is not known, totalSize is set to -1.
+func (u *uploader) initSize() {
+	u.totalSize = -1
+
+	switch r := u.in.Body.(type) {
+	case io.Seeker:
+		pos, _ := r.Seek(0, 1)
+		defer r.Seek(pos, 0)
+
+		n, err := r.Seek(0, 2)
+		if err != nil {
+			return
+		}
+		u.totalSize = n
+	}
+}
+
+// nextReader returns a seekable reader representing the next packet of data.
+// This operation increases the shared u.readerPos counter, but note that it
+// does not need to be wrapped in a mutex because nextReader is only called
+// from the main thread.
+func (u *uploader) nextReader() (io.ReadSeeker, error) {
+	switch r := u.in.Body.(type) {
+	case io.ReaderAt:
+		var err error
+
+		n := u.opts.PartSize
+		if u.totalSize >= 0 {
+			bytesLeft := u.totalSize - u.readerPos
+
+			if bytesLeft == 0 {
+				err = io.EOF
+			} else if bytesLeft <= u.opts.PartSize {
+				err = io.ErrUnexpectedEOF
+				n = bytesLeft
+			}
+		}
+
+		buf := io.NewSectionReader(r, u.readerPos, n)
+		u.readerPos += n
+
+		return buf, err
+
+	default:
+		packet := make([]byte, u.opts.PartSize)
+		n, err := io.ReadFull(u.in.Body, packet)
+		u.readerPos += int64(n)
+
+		return bytes.NewReader(packet[0:n]), err
+	}
 }
 
 // singlePart contains upload logic for uploading a single chunk via
 // a regular PutObject request. Multipart requests require at least two
 // parts, or at least 5MB of data.
-func (u *uploader) singlePart(part []byte) (*UploadOutput, error) {
+func (u *uploader) singlePart(buf io.ReadSeeker) (*UploadOutput, error) {
 	params := &s3.PutObjectInput{}
 	awsutil.Copy(params, u.in)
-	params.Body = bytes.NewReader(part)
+	params.Body = buf
 
 	req, _ := u.s.PutObjectRequest(params)
 	if err := req.Send(); err != nil {
@@ -278,7 +336,7 @@ type multiuploader struct {
 
 // keeps track of a single chunk of data being sent to S3.
 type chunk struct {
-	buf []byte
+	buf io.ReadSeeker
 	num int64
 }
 
@@ -290,9 +348,9 @@ func (a completedParts) Len() int { return len(a) }
 func (a completedParts) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
 func (a completedParts) Less(i, j int) bool { return *a[i].PartNumber < *a[j].PartNumber }
 
-// upload will perform a multipart upload using the firstPart buffer containing
+// upload will perform a multipart upload using the firstBuf buffer containing
 // the first chunk of data.
-func (u *multiuploader) upload(firstPart []byte) (*UploadOutput, error) {
+func (u *multiuploader) upload(firstBuf io.ReadSeeker) (*UploadOutput, error) {
 	params := &s3.CreateMultipartUploadInput{}
 	awsutil.Copy(params, u.in)
 
@@ -312,19 +370,21 @@ func (u *multiuploader) upload(firstPart []byte) (*UploadOutput, error) {
 
 	// Send part 1 to the workers
 	var num int64 = 1
-	ch <- chunk{buf: firstPart, num: num}
+	ch <- chunk{buf: firstBuf, num: num}
 
 	// Read and queue the rest of the parts
 	for u.geterr() == nil {
 		num++
-		packet := make([]byte, u.opts.PartSize)
-		n, err := io.ReadFull(u.in.Body, packet)
-		ch <- chunk{buf: packet[0:n], num: num}
-		if err != nil {
-			if err != io.EOF && err != io.ErrUnexpectedEOF {
-				u.seterr(apierr.New("ReadRequestBody", "read multipart upload data failed", err))
-			}
+		buf, err := u.nextReader()
+		if err == io.EOF {
+			break
+		}
+
+		ch <- chunk{buf: buf, num: num}
+
+		if err != nil && err != io.ErrUnexpectedEOF {
+			u.seterr(apierr.New("ReadRequestBody", "read multipart upload data failed", err))
 			break
 		}
 	}
@@ -370,7 +430,7 @@ func (u *multiuploader) send(c chunk) error {
 	resp, err := u.s.UploadPart(&s3.UploadPartInput{
 		Bucket:     u.in.Bucket,
 		Key:        u.in.Key,
-		Body:       bytes.NewReader(c.buf),
+		Body:       c.buf,
 		UploadID:   &u.uploadID,
 		PartNumber: &c.num,
 	})
diff --git a/service/s3/s3manager/upload_test.go b/service/s3/s3manager/upload_test.go
index 89f95bdad95..25b429ed764 100644
--- a/service/s3/s3manager/upload_test.go
+++ b/service/s3/s3manager/upload_test.go
@@ -6,6 +6,7 @@ import (
 	"io"
 	"io/ioutil"
 	"net/http"
+	"sort"
 	"sync"
 	"testing"
 
@@ -286,3 +287,61 @@ func TestUploadOrderReadFail2(t *testing.T) {
 	assert.EqualError(t, err.(awserr.Error).OrigErr().(awserr.Error).OrigErr(), "random failure")
 	assert.Equal(t, []string{"CreateMultipartUpload", "AbortMultipartUpload"}, *ops)
 }
+
+type sizedReaderImpl struct {
+	size int
+	cur  int
+}
+
+type sizedReader struct {
+	*sizedReaderImpl
+}
+
+func (s sizedReader) Read(p []byte) (n int, err error) {
+	if s.cur >= s.size {
+		return 0, io.EOF
+	}
+
+	n = len(p)
+	s.cur += len(p)
+	if s.cur > s.size {
+		n -= s.cur - s.size
+	}
+
+	return
+}
+
+func TestUploadOrderMultiBufferedReader(t *testing.T) {
+	s, ops, args := loggingSvc()
+	_, err := s3manager.Upload(s, &s3manager.UploadInput{
+		Bucket: aws.String("Bucket"),
+		Key:    aws.String("Key"),
+		Body:   sizedReader{&sizedReaderImpl{size: 1024 * 1024 * 12}},
+	}, nil)
+
+	assert.NoError(t, err)
+	assert.Equal(t, []string{"CreateMultipartUpload", "UploadPart", "UploadPart", "UploadPart", "CompleteMultipartUpload"}, *ops)
+
+	// Part lengths
+	parts := []int{
+		buflen(val((*args)[1], "Body")),
+		buflen(val((*args)[2], "Body")),
+		buflen(val((*args)[3], "Body")),
+	}
+	sort.Ints(parts)
+	assert.Equal(t, []int{1024 * 1024 * 2, 1024 * 1024 * 5, 1024 * 1024 * 5}, parts)
+}
+
+func TestUploadOrderSingleBufferedReader(t *testing.T) {
+	s, ops, _ := loggingSvc()
+	resp, err := s3manager.Upload(s, &s3manager.UploadInput{
+		Bucket: aws.String("Bucket"),
+		Key:    aws.String("Key"),
+		Body:   sizedReader{&sizedReaderImpl{size: 1024 * 1024 * 2}},
+	}, nil)
+
+	assert.NoError(t, err)
+	assert.Equal(t, []string{"PutObject"}, *ops)
+	assert.NotEqual(t, "", resp.Location)
+	assert.Equal(t, "", resp.UploadID)
+}