Skip to content

Commit

Permalink
feat: Implemented response body streaming for GetObject action
Browse files Browse the repository at this point in the history
  • Loading branch information
0x180 committed Jul 8, 2024
1 parent 157f22b commit e773872
Show file tree
Hide file tree
Showing 10 changed files with 59 additions and 81 deletions.
9 changes: 2 additions & 7 deletions backend/azure/azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,7 @@ func (az *Azure) DeleteBucketTagging(ctx context.Context, bucket string) error {
return az.PutBucketTagging(ctx, bucket, nil)
}

func (az *Azure) GetObject(ctx context.Context, input *s3.GetObjectInput, writer io.Writer) (*s3.GetObjectOutput, error) {
func (az *Azure) GetObject(ctx context.Context, input *s3.GetObjectInput) (*s3.GetObjectOutput, error) {
var opts *azblob.DownloadStreamOptions
if *input.Range != "" {
offset, count, err := backend.ParseRange(0, *input.Range)
Expand All @@ -429,12 +429,6 @@ func (az *Azure) GetObject(ctx context.Context, input *s3.GetObjectInput, writer
if err != nil {
return nil, azureErrToS3Err(err)
}
defer blobDownloadResponse.Body.Close()

_, err = io.Copy(writer, blobDownloadResponse.Body)
if err != nil {
return nil, fmt.Errorf("copy data: %w", err)
}

var tagcount int32
if blobDownloadResponse.TagCount != nil {
Expand All @@ -451,6 +445,7 @@ func (az *Azure) GetObject(ctx context.Context, input *s3.GetObjectInput, writer
Metadata: parseAzMetadata(blobDownloadResponse.Metadata),
TagCount: &tagcount,
ContentRange: blobDownloadResponse.ContentRange,
Body: blobDownloadResponse.Body,
}, nil
}

Expand Down
5 changes: 2 additions & 3 deletions backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"bufio"
"context"
"fmt"
"io"

"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/aws-sdk-go-v2/service/s3/types"
Expand Down Expand Up @@ -60,7 +59,7 @@ type Backend interface {
// standard object operations
PutObject(context.Context, *s3.PutObjectInput) (string, error)
HeadObject(context.Context, *s3.HeadObjectInput) (*s3.HeadObjectOutput, error)
GetObject(context.Context, *s3.GetObjectInput, io.Writer) (*s3.GetObjectOutput, error)
GetObject(context.Context, *s3.GetObjectInput) (*s3.GetObjectOutput, error)
GetObjectAcl(context.Context, *s3.GetObjectAclInput) (*s3.GetObjectAclOutput, error)
GetObjectAttributes(context.Context, *s3.GetObjectAttributesInput) (s3response.GetObjectAttributesResult, error)
CopyObject(context.Context, *s3.CopyObjectInput) (*s3.CopyObjectOutput, error)
Expand Down Expand Up @@ -180,7 +179,7 @@ func (BackendUnsupported) PutObject(context.Context, *s3.PutObjectInput) (string
func (BackendUnsupported) HeadObject(context.Context, *s3.HeadObjectInput) (*s3.HeadObjectOutput, error) {
return nil, s3err.GetAPIError(s3err.ErrNotImplemented)
}
func (BackendUnsupported) GetObject(context.Context, *s3.GetObjectInput, io.Writer) (*s3.GetObjectOutput, error) {
func (BackendUnsupported) GetObject(context.Context, *s3.GetObjectInput) (*s3.GetObjectOutput, error) {
return nil, s3err.GetAPIError(s3err.ErrNotImplemented)
}
func (BackendUnsupported) GetObjectAcl(context.Context, *s3.GetObjectAclInput) (*s3.GetObjectAclOutput, error) {
Expand Down
15 changes: 15 additions & 0 deletions backend/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ import (
"crypto/md5"
"encoding/hex"
"fmt"
"io"
"net/http"
"os"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -128,3 +130,16 @@ func md5String(data []byte) string {
sum := md5.Sum(data)
return hex.EncodeToString(sum[:])
}

type FileSectionReadCloser struct {
R io.Reader
F *os.File
}

func (f *FileSectionReadCloser) Read(p []byte) (int, error) {
return f.R.Read(p)
}

func (f *FileSectionReadCloser) Close() error {
return f.F.Close()
}
34 changes: 15 additions & 19 deletions backend/posix/posix.go
Original file line number Diff line number Diff line change
Expand Up @@ -1593,7 +1593,7 @@ func (p *Posix) DeleteObjects(ctx context.Context, input *s3.DeleteObjectsInput)
}, nil
}

func (p *Posix) GetObject(_ context.Context, input *s3.GetObjectInput, writer io.Writer) (*s3.GetObjectOutput, error) {
func (p *Posix) GetObject(_ context.Context, input *s3.GetObjectInput) (*s3.GetObjectOutput, error) {
if input.Bucket == nil {
return nil, s3err.GetAPIError(s3err.ErrInvalidBucketName)
}
Expand Down Expand Up @@ -1637,11 +1637,11 @@ func (p *Posix) GetObject(_ context.Context, input *s3.GetObjectInput, writer io
}

if length == -1 {
length = objSize - startOffset + 1
length = objSize - startOffset
}

if startOffset+length > objSize+1 {
length = objSize - startOffset + 1
if startOffset+length > objSize {
length = objSize - startOffset
}

var contentRange string
Expand Down Expand Up @@ -1684,21 +1684,6 @@ func (p *Posix) GetObject(_ context.Context, input *s3.GetObjectInput, writer io
}, nil
}

f, err := os.Open(objPath)
if errors.Is(err, fs.ErrNotExist) {
return nil, s3err.GetAPIError(s3err.ErrNoSuchKey)
}
if err != nil {
return nil, fmt.Errorf("open object: %w", err)
}
defer f.Close()

rdr := io.NewSectionReader(f, startOffset, length)
_, err = io.Copy(writer, rdr)
if err != nil {
return nil, fmt.Errorf("copy data: %w", err)
}

userMetaData := make(map[string]string)

contentType, contentEncoding := p.loadUserMetaData(bucket, object, userMetaData)
Expand All @@ -1719,6 +1704,16 @@ func (p *Posix) GetObject(_ context.Context, input *s3.GetObjectInput, writer io
tagCount = &tgCount
}

f, err := os.Open(objPath)
if errors.Is(err, fs.ErrNotExist) {
return nil, s3err.GetAPIError(s3err.ErrNoSuchKey)
}
if err != nil {
return nil, fmt.Errorf("open object: %w", err)
}

rdr := io.NewSectionReader(f, startOffset, length)

return &s3.GetObjectOutput{
AcceptRanges: &acceptRange,
ContentLength: &length,
Expand All @@ -1729,6 +1724,7 @@ func (p *Posix) GetObject(_ context.Context, input *s3.GetObjectInput, writer io
Metadata: userMetaData,
TagCount: tagCount,
ContentRange: &contentRange,
Body: &backend.FileSectionReadCloser{R: rdr, F: f},
}, nil
}

Expand Down
8 changes: 1 addition & 7 deletions backend/s3proxy/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,17 +314,11 @@ func (s *S3Proxy) HeadObject(ctx context.Context, input *s3.HeadObjectInput) (*s
return out, handleError(err)
}

func (s *S3Proxy) GetObject(ctx context.Context, input *s3.GetObjectInput, w io.Writer) (*s3.GetObjectOutput, error) {
func (s *S3Proxy) GetObject(ctx context.Context, input *s3.GetObjectInput) (*s3.GetObjectOutput, error) {
output, err := s.client.GetObject(ctx, input)
if err != nil {
return nil, handleError(err)
}
defer output.Body.Close()

_, err = io.Copy(w, output.Body)
if err != nil {
return nil, err
}

return output, nil
}
Expand Down
8 changes: 2 additions & 6 deletions backend/scoutfs/scoutfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -589,7 +589,7 @@ func (s *ScoutFS) retrieveUploadId(bucket, object string) (string, [32]byte, err
return entries[0].Name(), sum, nil
}

func (s *ScoutFS) GetObject(_ context.Context, input *s3.GetObjectInput, writer io.Writer) (*s3.GetObjectOutput, error) {
func (s *ScoutFS) GetObject(_ context.Context, input *s3.GetObjectInput) (*s3.GetObjectOutput, error) {
bucket := *input.Bucket
object := *input.Key
acceptRange := *input.Range
Expand Down Expand Up @@ -658,13 +658,8 @@ func (s *ScoutFS) GetObject(_ context.Context, input *s3.GetObjectInput, writer
if err != nil {
return nil, fmt.Errorf("open object: %w", err)
}
defer f.Close()

rdr := io.NewSectionReader(f, startOffset, length)
_, err = io.Copy(writer, rdr)
if err != nil {
return nil, fmt.Errorf("copy data: %w", err)
}

userMetaData := make(map[string]string)

Expand Down Expand Up @@ -694,6 +689,7 @@ func (s *ScoutFS) GetObject(_ context.Context, input *s3.GetObjectInput, writer
TagCount: &tagCount,
StorageClass: types.StorageClassStandard,
ContentRange: &contentRange,
Body: &backend.FileSectionReadCloser{R: rdr, F: f},
}, nil
}

Expand Down
15 changes: 4 additions & 11 deletions s3api/controllers/backend_moq_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 5 additions & 14 deletions s3api/controllers/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ func (c S3ApiController) GetActions(ctx *fiber.Ctx) error {
Key: &key,
Range: &acceptRange,
VersionId: &versionId,
}, ctx.Response().BodyWriter())
})
if err != nil {
return SendResponse(ctx, err,
&MetaOpts{
Expand All @@ -412,15 +412,6 @@ func (c S3ApiController) GetActions(ctx *fiber.Ctx) error {
BucketOwner: parsedAcl.Owner,
})
}
if res == nil {
return SendResponse(ctx, fmt.Errorf("get object nil response"),
&MetaOpts{
Logger: c.logger,
MetricsMng: c.mm,
Action: metrics.ActionGetObject,
BucketOwner: parsedAcl.Owner,
})
}

utils.SetMetaHeaders(ctx, res.Metadata)
var lastmod string
Expand All @@ -429,10 +420,6 @@ func (c S3ApiController) GetActions(ctx *fiber.Ctx) error {
}

utils.SetResponseHeaders(ctx, []utils.CustomHeader{
{
Key: "Content-Length",
Value: fmt.Sprint(getint64(res.ContentLength)),
},
{
Key: "Content-Type",
Value: getstring(res.ContentType),
Expand Down Expand Up @@ -477,6 +464,10 @@ func (c S3ApiController) GetActions(ctx *fiber.Ctx) error {
status = http.StatusPartialContent
}

if res.Body != nil {
ctx.Response().SetBodyStream(res.Body, int(getint64(res.ContentLength)))
}

return SendResponse(ctx, nil,
&MetaOpts{
Logger: c.logger,
Expand Down
3 changes: 1 addition & 2 deletions s3api/controllers/base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"net/http/httptest"
"reflect"
Expand Down Expand Up @@ -191,7 +190,7 @@ func TestS3ApiController_GetActions(t *testing.T) {
GetObjectAttributesFunc: func(context.Context, *s3.GetObjectAttributesInput) (s3response.GetObjectAttributesResult, error) {
return s3response.GetObjectAttributesResult{}, nil
},
GetObjectFunc: func(context.Context, *s3.GetObjectInput, io.Writer) (*s3.GetObjectOutput, error) {
GetObjectFunc: func(context.Context, *s3.GetObjectInput) (*s3.GetObjectOutput, error) {
return &s3.GetObjectOutput{
Metadata: map[string]string{"hello": "world"},
ContentType: getPtr("application/xml"),
Expand Down
Loading

0 comments on commit e773872

Please sign in to comment.