Skip to content

Commit

Permalink
service/s3/s3manager: Add unit test for S3 Upload manager part retries
Browse files Browse the repository at this point in the history
Adds a new unit test to S3 Upload Manager to verify the SDK will
correctly retry concurrent multipart uploads.

Update SDK's retry logic to include retrying write: connection reset and
broken pipe, but not read: connection reset.
  • Loading branch information
jasdel committed Jun 14, 2019
1 parent 96b0e0e commit c3d2710
Show file tree
Hide file tree
Showing 6 changed files with 325 additions and 23 deletions.
11 changes: 10 additions & 1 deletion aws/request/connection_reset_error.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,14 @@ import (
)

func isErrConnectionReset(err error) bool {
return strings.Contains(err.Error(), "connection reset")
if strings.Contains(err.Error(), "read: connection reset") {
return false
}

if strings.Contains(err.Error(), "connection reset") ||
strings.Contains(err.Error(), "broken pipe") {
return true
}

return false
}
13 changes: 11 additions & 2 deletions aws/request/connection_reset_error_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,22 @@ func TestSerializationErrConnectionReset_accept(t *testing.T) {
Err error
ExpectAttempts int
}{
"with temporary": {
"accept with temporary": {
Err: errAcceptConnectionResetStub,
ExpectAttempts: 6,
},
"not temporary": {
"read not temporary": {
Err: errReadConnectionResetStub,
ExpectAttempts: 1,
},
"write with temporary": {
Err: errWriteConnectionResetStub,
ExpectAttempts: 6,
},
"write broken pipe with temporary": {
Err: errWriteBrokenPipeStub,
ExpectAttempts: 6,
},
"generic connection reset": {
Err: errConnectionResetStub,
ExpectAttempts: 6,
Expand Down Expand Up @@ -86,6 +94,7 @@ func TestSerializationErrConnectionReset_accept(t *testing.T) {
}
cfg := unit.Session.Config.Copy()
cfg.MaxRetries = aws.Int(5)
cfg.SleepDelay = func(time.Duration) {}

req := request.New(
*cfg,
Expand Down
2 changes: 0 additions & 2 deletions aws/request/http_request_retry_test.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
// +build go1.5

package request_test

import (
Expand Down
43 changes: 27 additions & 16 deletions aws/request/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,10 @@ func (r *Request) WillRetry() bool {
return r.Error != nil && aws.BoolValue(r.Retryable) && r.RetryCount < r.MaxRetries()
}

func fmtAttemptCount(retryCount, maxRetries int) string {
return fmt.Sprintf("attempt %v/%v", retryCount, maxRetries)
}

// ParamsFilled returns if the request's parameters have been populated
// and the parameters are valid. False is returned if no parameters are
// provided or invalid.
Expand Down Expand Up @@ -330,16 +334,17 @@ func getPresignedURL(r *Request, expire time.Duration) (string, http.Header, err
return r.HTTPRequest.URL.String(), r.SignedHeaderVals, nil
}

func debugLogReqError(r *Request, stage string, retrying bool, err error) {
const (
willRetry = "will retry"
notRetrying = "not retrying"
retryCount = "retry %v/%v"
)

func debugLogReqError(r *Request, stage, retryStr string, err error) {
if !r.Config.LogLevel.Matches(aws.LogDebugWithRequestErrors) {
return
}

retryStr := "not retrying"
if retrying {
retryStr = "will retry"
}

r.Config.Logger.Log(fmt.Sprintf("DEBUG: %s %s/%s failed, %s, error %v",
stage, r.ClientInfo.ServiceName, r.Operation.Name, retryStr, err))
}
Expand All @@ -358,12 +363,12 @@ func (r *Request) Build() error {
if !r.built {
r.Handlers.Validate.Run(r)
if r.Error != nil {
debugLogReqError(r, "Validate Request", false, r.Error)
debugLogReqError(r, "Validate Request", notRetrying, r.Error)
return r.Error
}
r.Handlers.Build.Run(r)
if r.Error != nil {
debugLogReqError(r, "Build Request", false, r.Error)
debugLogReqError(r, "Build Request", notRetrying, r.Error)
return r.Error
}
r.built = true
Expand All @@ -379,7 +384,7 @@ func (r *Request) Build() error {
func (r *Request) Sign() error {
r.Build()
if r.Error != nil {
debugLogReqError(r, "Build Request", false, r.Error)
debugLogReqError(r, "Build Request", notRetrying, r.Error)
return r.Error
}

Expand Down Expand Up @@ -473,7 +478,7 @@ func (r *Request) Send() error {
r.AttemptTime = time.Now()

if err := r.Sign(); err != nil {
debugLogReqError(r, "Sign Request", false, err)
debugLogReqError(r, "Sign Request", notRetrying, err)
return err
}

Expand Down Expand Up @@ -520,21 +525,27 @@ func (r *Request) sendRequest() (sendErr error) {
r.Retryable = nil
r.Handlers.Send.Run(r)
if r.Error != nil {
debugLogReqError(r, "Send Request", r.WillRetry(), r.Error)
debugLogReqError(r, "Send Request",
fmtAttemptCount(r.RetryCount, r.MaxRetries()),
r.Error)
return r.Error
}

r.Handlers.UnmarshalMeta.Run(r)
r.Handlers.ValidateResponse.Run(r)
if r.Error != nil {
r.Handlers.UnmarshalError.Run(r)
debugLogReqError(r, "Validate Response", r.WillRetry(), r.Error)
debugLogReqError(r, "Validate Response",
fmtAttemptCount(r.RetryCount, r.MaxRetries()),
r.Error)
return r.Error
}

r.Handlers.Unmarshal.Run(r)
if r.Error != nil {
debugLogReqError(r, "Unmarshal Response", r.WillRetry(), r.Error)
debugLogReqError(r, "Unmarshal Response",
fmtAttemptCount(r.RetryCount, r.MaxRetries()),
r.Error)
return r.Error
}

Expand Down Expand Up @@ -565,8 +576,8 @@ type temporary interface {
Temporary() bool
}

func shouldRetryCancel(err error) bool {
switch err := err.(type) {
func shouldRetryCancel(origErr error) bool {
switch err := origErr.(type) {
case awserr.Error:
if err.Code() == CanceledErrorCode {
return false
Expand All @@ -585,7 +596,7 @@ func shouldRetryCancel(err error) bool {
case temporary:
// If the error is temporary, we want to allow continuation of the
// retry process
return err.Temporary()
return err.Temporary() || isErrConnectionReset(origErr)
case nil:
// `awserr.Error.OrigErr()` can be nil, meaning there was an error but
// because we don't know the cause, it is marked as retryable. See
Expand Down
20 changes: 18 additions & 2 deletions aws/request/request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,26 @@ func (e *tempNetworkError) Error() string {

var (
// net.OpError accept, are always temporary
errAcceptConnectionResetStub = &tempNetworkError{isTemp: true, op: "accept", msg: "connection reset"}
errAcceptConnectionResetStub = &tempNetworkError{
isTemp: true, op: "accept", msg: "connection reset",
}

// net.OpError read for ECONNRESET is not temporary.
errReadConnectionResetStub = &tempNetworkError{isTemp: false, op: "read", msg: "connection reset"}
errReadConnectionResetStub = &tempNetworkError{
isTemp: false, op: "read", msg: "connection reset",
}

// net.OpError write for ECONNRESET may not be temporary, but is treaded as
// temporary by the SDK.
errWriteConnectionResetStub = &tempNetworkError{
isTemp: false, op: "write", msg: "connection reset",
}

// net.OpError write for broken pipe may not be temporary, but is treaded as
// temporary by the SDK.
errWriteBrokenPipeStub = &tempNetworkError{
isTemp: false, op: "write", msg: "broken pipe",
}

// Generic connection reset error
errConnectionResetStub = errors.New("connection reset")
Expand Down
Loading

0 comments on commit c3d2710

Please sign in to comment.