Skip to content

Commit

Permalink
aws/request: Reorganize retry behavior to reduce separation (aws#2744)
Browse files Browse the repository at this point in the history
Reorganizes the SDK's retry utilities to follow a consistent code path.
Request.IsErrorRetryable is the primary entry pointer for determining if
a request error should be retryable instead of split between
Request.Send and DefaultRetryer calling IsErrorRetryable. This also
gives the implementation of the Retryer interface consistent control
over when a request will be retried.

Also adds support for request and service specific API error retry and
throttling to be enabled by specifying API error codes before a request
is sent. The "RetryErrorCodes" and "ThrottleErrorCodes" members were added to
request.Request struct. These are used by the Request's IsErrorRetryable
and IsErrorThrottle respectively to specify additional API error codes
that identify the failed request attempt as needing to be retried or
throttled. The DefaultRetryer uses both of these methods when
determining if a failed request attempt should be retried.

Related to aws#1376
  • Loading branch information
jasdel authored Aug 13, 2019
1 parent 0b6d7d6 commit 2a80185
Show file tree
Hide file tree
Showing 8 changed files with 259 additions and 148 deletions.
31 changes: 9 additions & 22 deletions aws/client/default_retryer.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@ import (
// struct and override the specific methods. For example, to override only
// the MaxRetries method:
//
// type retryer struct {
// client.DefaultRetryer
// }
// type retryer struct {
// client.DefaultRetryer
// }
//
// // This implementation always has 100 max retries
// func (d retryer) MaxRetries() int { return 100 }
// // This implementation always has 100 max retries
// func (d retryer) MaxRetries() int { return 100 }
type DefaultRetryer struct {
NumMaxRetries int
}
Expand All @@ -34,8 +34,8 @@ func (d DefaultRetryer) MaxRetries() int {
func (d DefaultRetryer) RetryRules(r *request.Request) time.Duration {
// Set the upper limit of delay in retrying at ~five minutes
minTime := 30
throttle := d.shouldThrottle(r)
if throttle {
isThrottle := r.IsErrorThrottle()
if isThrottle {
if delay, ok := getRetryDelay(r); ok {
return delay
}
Expand All @@ -44,7 +44,7 @@ func (d DefaultRetryer) RetryRules(r *request.Request) time.Duration {
}

retryCount := r.RetryCount
if throttle && retryCount > 8 {
if isThrottle && retryCount > 8 {
retryCount = 8
} else if retryCount > 13 {
retryCount = 13
Expand All @@ -65,21 +65,8 @@ func (d DefaultRetryer) ShouldRetry(r *request.Request) bool {
if r.HTTPResponse.StatusCode >= 500 && r.HTTPResponse.StatusCode != 501 {
return true
}
return r.IsErrorRetryable() || d.shouldThrottle(r)
}

// ShouldThrottle returns true if the request should be throttled.
func (d DefaultRetryer) shouldThrottle(r *request.Request) bool {
switch r.HTTPResponse.StatusCode {
case 429:
case 502:
case 503:
case 504:
default:
return r.IsErrorThrottle()
}

return true
return r.IsErrorRetryable() || r.IsErrorThrottle()
}

// This will look in the Retry-After header, RFC 7231, for how long
Expand Down
2 changes: 1 addition & 1 deletion aws/client/default_retryer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func TestRetryThrottleStatusCodes(t *testing.T) {

d := DefaultRetryer{NumMaxRetries: 10}
for i, c := range cases {
throttle := d.shouldThrottle(&c.r)
throttle := c.r.IsErrorThrottle()
retry := d.ShouldRetry(&c.r)

if e, a := c.expectThrottle, throttle; e != a {
Expand Down
60 changes: 31 additions & 29 deletions aws/corehandlers/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,9 +159,9 @@ func handleSendError(r *request.Request, err error) {
Body: ioutil.NopCloser(bytes.NewReader([]byte{})),
}
}
// Catch all other request errors.
// Catch all request errors, and let the default retrier determine
// if the error is retryable.
r.Error = awserr.New("RequestError", "send request failed", err)
r.Retryable = aws.Bool(true) // network errors are retryable

// Override the error with a context canceled error, if that was canceled.
ctx := r.Context()
Expand All @@ -184,37 +184,39 @@ var ValidateResponseHandler = request.NamedHandler{Name: "core.ValidateResponseH

// AfterRetryHandler performs final checks to determine if the request should
// be retried and how long to delay.
var AfterRetryHandler = request.NamedHandler{Name: "core.AfterRetryHandler", Fn: func(r *request.Request) {
// If one of the other handlers already set the retry state
// we don't want to override it based on the service's state
if r.Retryable == nil || aws.BoolValue(r.Config.EnforceShouldRetryCheck) {
r.Retryable = aws.Bool(r.ShouldRetry(r))
}
var AfterRetryHandler = request.NamedHandler{
Name: "core.AfterRetryHandler",
Fn: func(r *request.Request) {
// If one of the other handlers already set the retry state
// we don't want to override it based on the service's state
if r.Retryable == nil || aws.BoolValue(r.Config.EnforceShouldRetryCheck) {
r.Retryable = aws.Bool(r.ShouldRetry(r))
}

if r.WillRetry() {
r.RetryDelay = r.RetryRules(r)
if r.WillRetry() {
r.RetryDelay = r.RetryRules(r)

if sleepFn := r.Config.SleepDelay; sleepFn != nil {
// Support SleepDelay for backwards compatibility and testing
sleepFn(r.RetryDelay)
} else if err := aws.SleepWithContext(r.Context(), r.RetryDelay); err != nil {
r.Error = awserr.New(request.CanceledErrorCode,
"request context canceled", err)
r.Retryable = aws.Bool(false)
return
}
if sleepFn := r.Config.SleepDelay; sleepFn != nil {
// Support SleepDelay for backwards compatibility and testing
sleepFn(r.RetryDelay)
} else if err := aws.SleepWithContext(r.Context(), r.RetryDelay); err != nil {
r.Error = awserr.New(request.CanceledErrorCode,
"request context canceled", err)
r.Retryable = aws.Bool(false)
return
}

// when the expired token exception occurs the credentials
// need to be expired locally so that the next request to
// get credentials will trigger a credentials refresh.
if r.IsErrorExpired() {
r.Config.Credentials.Expire()
}
// when the expired token exception occurs the credentials
// need to be expired locally so that the next request to
// get credentials will trigger a credentials refresh.
if r.IsErrorExpired() {
r.Config.Credentials.Expire()
}

r.RetryCount++
r.Error = nil
}
}}
r.RetryCount++
r.Error = nil
}
}}

// ValidateEndpointHandler is a request handler to validate a request had the
// appropriate Region and Endpoint set. Will set r.Error if the endpoint or
Expand Down
6 changes: 3 additions & 3 deletions aws/request/http_request_retry_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package request_test

import (
"errors"
"strings"
"testing"

Expand All @@ -14,14 +13,15 @@ func TestRequestCancelRetry(t *testing.T) {
c := make(chan struct{})

reqNum := 0
s := mock.NewMockClient(aws.NewConfig().WithMaxRetries(10))
s := mock.NewMockClient(&aws.Config{
MaxRetries: aws.Int(1),
})
s.Handlers.Validate.Clear()
s.Handlers.Unmarshal.Clear()
s.Handlers.UnmarshalMeta.Clear()
s.Handlers.UnmarshalError.Clear()
s.Handlers.Send.PushFront(func(r *request.Request) {
reqNum++
r.Error = errors.New("net/http: request canceled")
})
out := &testData{}
r := s.NewRequest(&request.Operation{Name: "Operation"}, nil, out)
Expand Down
79 changes: 19 additions & 60 deletions aws/request/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"bytes"
"fmt"
"io"
"net"
"net/http"
"net/url"
"reflect"
Expand Down Expand Up @@ -65,6 +64,15 @@ type Request struct {
LastSignedAt time.Time
DisableFollowRedirects bool

// Additional API error codes that should be retried. IsErrorRetryable
// will consider these codes in addition to its built in cases.
RetryErrorCodes []string

// Additional API error codes that should be retried with throttle backoff
// delay. IsErrorThrottle will consider these codes in addition to its
// built in cases.
ThrottleErrorCodes []string

// A value greater than 0 instructs the request to be signed as Presigned URL
// You should not set this field directly. Instead use Request's
// Presign or PresignRequest methods.
Expand Down Expand Up @@ -498,21 +506,17 @@ func (r *Request) Send() error {

if err := r.sendRequest(); err == nil {
return nil
} else if !shouldRetryError(r.Error) {
}
r.Handlers.Retry.Run(r)
r.Handlers.AfterRetry.Run(r)

if r.Error != nil || !aws.BoolValue(r.Retryable) {
return r.Error
}

if err := r.prepareRetry(); err != nil {
r.Error = err
return err
} else {
r.Handlers.Retry.Run(r)
r.Handlers.AfterRetry.Run(r)

if r.Error != nil || !aws.BoolValue(r.Retryable) {
return r.Error
}

if err := r.prepareRetry(); err != nil {
r.Error = err
return err
}
continue
}
}
}
Expand Down Expand Up @@ -596,51 +600,6 @@ func AddToUserAgent(r *Request, s string) {
r.HTTPRequest.Header.Set("User-Agent", s)
}

type temporary interface {
Temporary() bool
}

func shouldRetryError(origErr error) bool {
switch err := origErr.(type) {
case awserr.Error:
if err.Code() == CanceledErrorCode {
return false
}
return shouldRetryError(err.OrigErr())
case *url.Error:
if strings.Contains(err.Error(), "connection refused") {
// Refused connections should be retried as the service may not yet
// be running on the port. Go TCP dial considers refused
// connections as not temporary.
return true
}
// *url.Error only implements Temporary after golang 1.6 but since
// url.Error only wraps the error:
return shouldRetryError(err.Err)
case temporary:
if netErr, ok := err.(*net.OpError); ok && netErr.Op == "dial" {
return true
}
// If the error is temporary, we want to allow continuation of the
// retry process
return err.Temporary() || isErrConnectionReset(origErr)
case nil:
// `awserr.Error.OrigErr()` can be nil, meaning there was an error but
// because we don't know the cause, it is marked as retryable. See
// TestRequest4xxUnretryable for an example.
return true
default:
switch err.Error() {
case "net/http: request canceled",
"net/http: request canceled while waiting for connection":
// known 1.5 error case when an http request is cancelled
return false
}
// here we don't know the error; so we allow a retry.
return true
}
}

// SanitizeHostForHeader removes default port from host and updates request.Host
func SanitizeHostForHeader(r *http.Request) {
host := getHost(r)
Expand Down
72 changes: 64 additions & 8 deletions aws/request/request_retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@ func TestShouldRetryError_timeout(t *testing.T) {

tr := &http.Transport{}
defer tr.CloseIdleConnections()
cli := http.Client{
client := http.Client{
Timeout: time.Nanosecond,
Transport: tr,
}

resp, err := cli.Do(newRequest(t, "https://179.179.179.179/no/such/host"))
resp, err := client.Do(newRequest(t, "https://179.179.179.179/no/such/host"))
if resp != nil {
resp.Body.Close()
}
Expand All @@ -53,7 +53,7 @@ func TestShouldRetryError_timeout(t *testing.T) {
func TestShouldRetryError_cancelled(t *testing.T) {
tr := &http.Transport{}
defer tr.CloseIdleConnections()
cli := http.Client{
client := http.Client{
Transport: tr,
}

Expand Down Expand Up @@ -82,7 +82,7 @@ func TestShouldRetryError_cancelled(t *testing.T) {
close(ch) // request is cancelled before anything
}()

resp, err := cli.Do(r)
resp, err := client.Do(r)
if resp != nil {
resp.Body.Close()
}
Expand Down Expand Up @@ -131,10 +131,7 @@ func debugerr(t *testing.T, err error) {
t.Logf("%s is a temporary error: %t", err, err.Temporary())
return
case *url.Error:
// we should be before 1.5
// that's our case !
t.Logf("err: %s", err)
t.Logf("err: %#v", err.Err)
t.Logf("err: %s, nested err: %#v", err, err.Err)
if operr, ok := err.Err.(*net.OpError); ok {
t.Logf("operr: %#v", operr)
}
Expand All @@ -144,3 +141,62 @@ func debugerr(t *testing.T, err error) {
return
}
}

func TestRequest_retryCustomCodes(t *testing.T) {
cases := map[string]struct {
Code string
RetryErrorCodes []string
ThrottleErrorCodes []string
Retryable bool
Throttle bool
}{
"retry code": {
Code: "RetryMePlease",
RetryErrorCodes: []string{
"RetryMePlease",
"SomeOtherError",
},
Retryable: true,
},
"throttle code": {
Code: "AThrottleableError",
RetryErrorCodes: []string{
"RetryMePlease",
"SomeOtherError",
},
ThrottleErrorCodes: []string{
"AThrottleableError",
"SomeOtherError",
},
Throttle: true,
},
"unknown code": {
Code: "UnknownCode",
RetryErrorCodes: []string{
"RetryMePlease",
"SomeOtherError",
},
Retryable: false,
},
}

for name, c := range cases {
req := Request{
HTTPRequest: &http.Request{},
HTTPResponse: &http.Response{},
Error: awserr.New(c.Code, "some error", nil),
RetryErrorCodes: c.RetryErrorCodes,
ThrottleErrorCodes: c.ThrottleErrorCodes,
}

retryable := req.IsErrorRetryable()
if e, a := c.Retryable, retryable; e != a {
t.Errorf("%s, expect %v retryable, got %v", name, e, a)
}

throttle := req.IsErrorThrottle()
if e, a := c.Throttle, throttle; e != a {
t.Errorf("%s, expect %v throttle, got %v", name, e, a)
}
}
}
Loading

0 comments on commit 2a80185

Please sign in to comment.