Skip to content

Commit

Permalink
Ports S3 Manager Upload and Download Buffering Strategies (#404)
Browse files Browse the repository at this point in the history
* service/s3/s3manager: Add Strategies for Download and Upload Buffering

* Add S3 Upload and Download Manager Performance Benchmarking Tools

* service/s3/s3manager: Use sync.Pool for reuse of part buffers for streaming payloads
  • Loading branch information
skmcgrail authored Oct 1, 2019
1 parent c7fa04b commit 11a6d37
Show file tree
Hide file tree
Showing 34 changed files with 2,593 additions and 115 deletions.
11 changes: 11 additions & 0 deletions CHANGELOG_PENDING.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,14 @@ Deprecations

SDK Features
---
* `service/s3/s3manager`: Add Upload Buffer Provider ([#404](https://github.com/aws/aws-sdk-go-v2/pull/404))
* Adds a new `BufferProvider` member for specifying how part data can be buffered in memory.
* Windows platforms will now default to buffering 1MB per part to reduce contention when uploading files.
* Non-Windows platforms will continue to employ a non-buffering behavior.
* `service/s3/s3manager`: Add Download Buffer Provider ([#404](https://github.com/aws/aws-sdk-go-v2/pull/404))
* Adds a new `BufferProvider` member for specifying how part data can be buffered in memory when copying from the http response body.
* Windows platforms will now default to buffering 1MB per part to reduce contention when downloading files.
* Non-Windows platforms will continue to employ a non-buffering behavior.
* `service/dynamodb/dynamodbattribute`: New Encoder and Decoder Behavior for Empty Collections ([#401](https://github.com/aws/aws-sdk-go-v2/pull/401))
* The `Encoder` and `Decoder` types have been enhanced to support the marshaling of empty structures, maps, and slices to and from their respective DynamoDB AttributeValues.
* This change incorporates the behavior changes introduced via a marshal option in V1 ([#2834](https://github.com/aws/aws-sdk-go/pull/2834))
Expand All @@ -24,6 +32,9 @@ SDK Enhancements
* Related to [aws/aws-sdk-go#2310](https://github.com/aws/aws-sdk-go/pull/2310)
* Fixes [#251](https://github.com/aws/aws-sdk-go-v2/issues/251)
* `aws/request` : Retryer is now a named field on Request. ([#393](https://github.com/aws/aws-sdk-go-v2/pull/393))
* `service/s3/s3manager`: Adds `sync.Pool` to allow reuse of part buffers for streaming payloads ([#404](https://github.com/aws/aws-sdk-go-v2/pull/404))
* Fixes [#402](https://github.com/aws/aws-sdk-go-v2/issues/402)
* Uses the new behavior introduced in V1 [#2863](https://github.com/aws/aws-sdk-go/pull/2863), which allows the `sync.Pool` to be reused across multiple Upload requests that have matching part sizes.

SDK Bugs
---
Expand Down
8 changes: 6 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ LINTIGNOREDEPS='vendor/.+\.go'
LINTIGNOREPKGCOMMENT='service/[^/]+/doc_custom.go:.+package comment should be of the form'
LINTIGNOREENDPOINTS='aws/endpoints/defaults.go:.+(method|const) .+ should be '
UNIT_TEST_TAGS="example codegen awsinclude"
ALL_TAGS="example codegen awsinclude integration perftest"

# SDK's Core and client packages that are compatible with Go 1.9+.
SDK_CORE_PKGS=./aws/... ./private/... ./internal/...
Expand Down Expand Up @@ -56,11 +57,14 @@ cleanup-models:
###################
# Unit/CI Testing #
###################
unit: verify
build:
go build -o /dev/null -tags ${ALL_TAGS} ${SDK_ALL_PKGS}

unit: verify build
@echo "go test SDK and vendor packages"
@go test -tags ${UNIT_TEST_TAGS} ${SDK_ALL_PKGS}

unit-with-race-cover: verify
unit-with-race-cover: verify build
@echo "go test SDK and vendor packages"
@go test -tags ${UNIT_TEST_TAGS} -race -cpu=1,2,4 ${SDK_ALL_PKGS}

Expand Down
11 changes: 11 additions & 0 deletions internal/awstesting/discard.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package awstesting

// DiscardAt implements WriteAt by throwing away whatever it is handed.
// It holds no state, so the zero value is ready to use.
type DiscardAt struct{}

// WriteAt ignores both the bytes in p and the offset off. It reports the
// full length of p as written and its error result is always nil.
func (DiscardAt) WriteAt(p []byte, off int64) (n int, err error) {
	n = len(p)
	return n, nil
}
12 changes: 12 additions & 0 deletions internal/awstesting/endless_reader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package awstesting

// EndlessReader is a reader that never runs dry: every Read call claims
// to have filled the caller's buffer completely.
type EndlessReader struct{}

// Read leaves the contents of p untouched and reports len(p) bytes as
// read. The returned error is always nil, so the stream never ends.
func (EndlessReader) Read(p []byte) (int, error) {
	filled := len(p)
	return filled, nil
}
34 changes: 34 additions & 0 deletions internal/awstesting/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"crypto/rand"
"fmt"
"io"
"io/ioutil"
"os"

"github.com/aws/aws-sdk-go-v2/aws"
Expand Down Expand Up @@ -63,3 +64,36 @@ func ConfigWithDefaultRegion(region string) aws.Config {

return cfg
}

// CreateFileOfSize creates a temporary file named with the "s3Bench"
// prefix inside dir and truncates it to exactly size bytes.
//
// On success the open file is returned and the caller owns it. If the
// truncate step fails, the file is closed and removed before the error
// is returned.
func CreateFileOfSize(dir string, size int64) (*os.File, error) {
	f, err := ioutil.TempFile(dir, "s3Bench")
	if err != nil {
		return nil, err
	}

	if truncErr := f.Truncate(size); truncErr != nil {
		// Best-effort cleanup; the truncate error is the one reported.
		f.Close()
		os.Remove(f.Name())
		return nil, truncErr
	}

	return f, nil
}

// SizeToName returns a short human-readable label for a size given in
// bytes, using binary (1024-based) units from "B" up to "GB".
//
// The value is truncated, not rounded, at each step (1536 becomes "1KB").
// Sizes too large for the biggest unit stay expressed in that unit, so
// 1 TiB is rendered as "1024GB". Values below 1024, including zero and
// negatives, are printed unchanged with the "B" suffix.
func SizeToName(size int) string {
	units := []string{"B", "KB", "MB", "GB"}

	i := 0
	// Stop scaling at the last known unit; dividing past it would make
	// the printed number disagree with the printed suffix.
	for size >= 1024 && i < len(units)-1 {
		size /= 1024
		i++
	}

	return fmt.Sprintf("%d%s", size, units[i])
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
## Performance Utility

Downloads a test file from an S3 bucket using the SDK's S3 download manager. Allows passing
in custom configuration for the HTTP client and SDK's Download Manager behavior.

## Build
### Standalone
```sh
go build -tags "integration perftest" -o s3DownloadManager ./awstesting/integration/performance/s3DownloadManager
```
### Benchmarking
```sh
go test -tags "integration perftest" -c -o s3DownloadManager ./awstesting/integration/performance/s3DownloadManager
```

## Usage Example:
### Standalone
```sh
AWS_REGION=us-west-2 AWS_PROFILE=aws-go-sdk-team-test ./s3DownloadManager \
-bucket aws-sdk-go-data \
-size 10485760 \
-client.idle-conns 1000 \
-client.idle-conns-host 300 \
-client.timeout.connect=1s \
-client.timeout.response-header=1s
```

### Benchmarking
```sh
AWS_REGION=us-west-2 AWS_PROFILE=aws-go-sdk-team-test ./s3DownloadManager \
-test.bench=. \
-test.benchmem \
-test.benchtime 1x \
-bucket aws-sdk-go-data \
-client.idle-conns 1000 \
-client.idle-conns-host 300 \
-client.timeout.connect=1s \
-client.timeout.response-header=1s
```
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// +build integration,perftest

package main

import (
"net"
"net/http"
"time"
)

// NewClient returns an *http.Client backed by a fresh http.Transport
// configured from cfg.
//
// The connect, TLS handshake, expect-continue and response-header
// timeouts, the idle connection limits and the keep-alive switch all come
// from cfg. The proxy is taken from the environment, the dialer's TCP
// keep-alive period is fixed at 30 seconds and idle connections time out
// after 90 seconds.
func NewClient(cfg ClientConfig) *http.Client {
	dialer := &net.Dialer{
		Timeout:   cfg.Timeouts.Connect,
		KeepAlive: 30 * time.Second,
		DualStack: true,
	}

	transport := &http.Transport{
		Proxy:       http.ProxyFromEnvironment,
		DialContext: dialer.DialContext,

		// Connection pool sizing.
		MaxIdleConns:        cfg.MaxIdleConns,
		MaxIdleConnsPerHost: cfg.MaxIdleConnsPerHost,
		IdleConnTimeout:     90 * time.Second,

		// The config exposes "keep alive enabled"; the transport wants
		// the inverse.
		DisableKeepAlives: !cfg.KeepAlive,

		TLSHandshakeTimeout:   cfg.Timeouts.TLSHandshake,
		ExpectContinueTimeout: cfg.Timeouts.ExpectContinue,
		ResponseHeaderTimeout: cfg.Timeouts.ResponseHeader,
	}

	return &http.Client{Transport: transport}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
// +build integration,perftest

package main

import (
"flag"
"fmt"
"net/http"
"strings"
"time"

"github.com/aws/aws-sdk-go-v2/service/s3/s3manager"
)

// Config is the top-level set of options for the utility. Its fields are
// populated from command-line flags registered by SetupFlags and checked
// by Validate.
type Config struct {
// Bucket is the name of the S3 bucket to download the object from.
// Validate rejects an empty value.
Bucket string
// Size is the S3 object size in bytes. Validate rejects values <= 0.
Size int64
// LogVerbose requests verbose request information in the output log.
LogVerbose bool

// SDK holds the options registered by SDKConfig.SetupFlags.
SDK SDKConfig
// Client holds the HTTP client options registered by ClientConfig.SetupFlags.
Client ClientConfig
}

// SetupFlags registers every option of the utility on flagset.
//
// The "bucket", "size" and "verbose" flags are registered under fixed
// names that do not use prefix. prefix is passed on to the nested SDK and
// Client configurations, which build their own flag names from it.
func (c *Config) SetupFlags(prefix string, flagset *flag.FlagSet) {
	// Top-level flags.
	flagset.StringVar(
		&c.Bucket, "bucket", "",
		"The S3 bucket `name` to download the object from.",
	)
	flagset.Int64Var(
		&c.Size, "size", 0,
		"The S3 object size in bytes to be first uploaded then downloaded",
	)
	flagset.BoolVar(
		&c.LogVerbose, "verbose", false,
		"The output log will include verbose request information",
	)

	// Nested option groups.
	c.SDK.SetupFlags(prefix, flagset)
	c.Client.SetupFlags(prefix, flagset)
}

// Validate checks the configuration and reports every problem found.
//
// A bucket name and a positive size are required. The nested SDK and
// Client configurations are validated as well. All failures are collected
// and returned together as an Errors value; nil is returned when nothing
// is wrong.
func (c *Config) Validate() error {
	var problems Errors

	if c.Size <= 0 || len(c.Bucket) == 0 {
		problems = append(problems, fmt.Errorf("bucket and filename/size are required"))
	}

	// Run the nested validators in order: SDK first, then Client.
	for _, validate := range []func() error{c.SDK.Validate, c.Client.Validate} {
		if err := validate(); err != nil {
			problems = append(problems, err)
		}
	}

	if len(problems) == 0 {
		return nil
	}
	return problems
}

// SDKConfig groups the download manager options that SetupFlags registers
// under the "sdk." flag prefix.
type SDKConfig struct {
// PartSize is the size of the parts of the object to download.
PartSize int64
// Concurrency is the number of parts to download at once.
Concurrency int
// BufferProvider has no flag registered in SetupFlags; presumably it is
// assigned programmatically by the caller — TODO confirm.
BufferProvider s3manager.WriterReadFromProvider
}

// SetupFlags registers the SDK options on flagset. Flag names are built as
// prefix + "sdk." + option name, and the defaults are the s3manager
// download defaults.
func (c *SDKConfig) SetupFlags(prefix string, flagset *flag.FlagSet) {
	ns := prefix + "sdk."

	flagset.Int64Var(
		&c.PartSize, ns+"part-size", s3manager.DefaultDownloadPartSize,
		"Specifies the `size` of parts of the object to download.",
	)
	flagset.IntVar(
		&c.Concurrency, ns+"concurrency", s3manager.DefaultDownloadConcurrency,
		"Specifies the number of parts to download `at once`.",
	)
}

// Validate always returns nil; no SDK option is currently checked.
func (c *SDKConfig) Validate() error {
return nil
}

// ClientConfig groups the HTTP client options that SetupFlags registers
// under the "client." flag prefix.
type ClientConfig struct {
// KeepAlive specifies if HTTP keep alive is enabled.
KeepAlive bool
// Timeouts holds the individual client timeout settings.
Timeouts Timeouts

// MaxIdleConns is the max idle connection pool size.
MaxIdleConns int
// MaxIdleConnsPerHost is the max idle connection pool size per host.
MaxIdleConnsPerHost int
}

// SetupFlags registers the HTTP client options on flagset. Flag names are
// built as prefix + "client." + option name. The idle connection defaults
// come from the net/http defaults, and the same "client." namespace is
// handed on to the nested Timeouts flags.
func (c *ClientConfig) SetupFlags(prefix string, flagset *flag.FlagSet) {
	ns := prefix + "client."

	flagset.BoolVar(
		&c.KeepAlive, ns+"http-keep-alive", true,
		"Specifies if HTTP keep alive is enabled.",
	)

	// Borrow the pool size default from the standard library transport.
	stdTransport := http.DefaultTransport.(*http.Transport)

	flagset.IntVar(
		&c.MaxIdleConns, ns+"idle-conns", stdTransport.MaxIdleConns,
		"Specifies max idle connection pool size.",
	)
	flagset.IntVar(
		&c.MaxIdleConnsPerHost, ns+"idle-conns-host", http.DefaultMaxIdleConnsPerHost,
		"Specifies max idle connection pool per host, will be truncated by idle-conns.",
	)

	c.Timeouts.SetupFlags(ns, flagset)
}

// Validate checks the client options. Only the nested Timeouts are
// validated; a failure there is returned wrapped in an Errors value,
// otherwise the result is nil.
func (c *ClientConfig) Validate() error {
	err := c.Timeouts.Validate()
	if err == nil {
		return nil
	}
	return Errors{err}
}

// Timeouts groups the HTTP client timeouts that can be set from the
// command line.
type Timeouts struct {
	// Connect is the timeout for connecting to the remote host.
	Connect time.Duration
	// TLSHandshake is the timeout for completing the TLS handshake.
	TLSHandshake time.Duration
	// ExpectContinue is the timeout waiting for the server's first
	// response headers when the request carries "Expect: 100-continue".
	ExpectContinue time.Duration
	// ResponseHeader is the timeout waiting for the server's response
	// headers once the request has been written.
	ResponseHeader time.Duration
}

// SetupFlags registers the timeout options on flagset. Flag names are
// built as prefix + "timeout." + option name. Connect defaults to 30
// seconds; the other defaults are read from http.DefaultTransport.
func (c *Timeouts) SetupFlags(prefix string, flagset *flag.FlagSet) {
	prefix += "timeout."

	flagset.DurationVar(&c.Connect, prefix+"connect", 30*time.Second,
		"The `timeout` connecting to the remote host.")

	defTR := http.DefaultTransport.(*http.Transport)

	flagset.DurationVar(&c.TLSHandshake, prefix+"tls", defTR.TLSHandshakeTimeout,
		"The `timeout` waiting for the TLS handshake to complete.")

	// Each flag gets its own help text; these two previously repeated the
	// TLS handshake description.
	flagset.DurationVar(&c.ExpectContinue, prefix+"expect-continue", defTR.ExpectContinueTimeout,
		"The `timeout` waiting for the server's first response headers after sending a request with an Expect: 100-continue header.")

	flagset.DurationVar(&c.ResponseHeader, prefix+"response-header", defTR.ResponseHeaderTimeout,
		"The `timeout` waiting for the server's response headers after the request has been written.")
}

// Validate always returns nil; no timeout value is currently checked.
func (c *Timeouts) Validate() error {
	return nil
}

// Errors is a list of errors that is itself an error, used to report
// several validation failures at once.
type Errors []error

// Error returns the messages of all contained errors, one per line. A
// single error is rendered as its own message and an empty list yields
// the empty string.
func (es Errors) Error() string {
	var buf strings.Builder
	for i, e := range es {
		// Separate messages so consecutive errors do not run together.
		if i > 0 {
			buf.WriteString("\n")
		}
		buf.WriteString(e.Error())
	}

	return buf.String()
}
Loading

0 comments on commit 11a6d37

Please sign in to comment.