Skip to content

Commit

Permalink
Merge #548
Browse files Browse the repository at this point in the history
548: translate Retry to `Base.retry` r=ericphanson a=ericphanson

Suggested in #542 (comment)

I'm not sure we should merge this as-is, because before we could do immediate retries OR delay retries, and now we can only do delay retries. ~~I'm also not 100% sure the delay amounts are the same.~~

The functional approach is nicer than the macro approach though in my opinion, ~~since the retries don't need to be literals~~. So we could let folks request more retries on a per-request basis, or even put it in the Backend struct.

edit 1: Retry.jl expands to a for-loop, so non-literals are OK. I thought it unrolled the loop.

edit 2: the delay amounts are definitely not the same, but that's a good thing- now we copy the upstream recommendation from https://docs.aws.amazon.com/sdkref/latest/guide/feature-retry-behavior.html. In particular, putting a cap of 20s means we can increase the retries further without danger, xref #550 (we can't do that with Retry.jl since the retries are hard-coded with no cap and get too big- ref #543 (comment)).

---
12 May update:

I'm now in favor of merging this, because I want to get in #550. The first few retries are pretty short, e.g.
```julia
julia> collect(AWS.AWSExponentialBackoff(; max_attempts=10))
9-element Vector{Any}:
  0.3365311959885935
  1.2370247946870188
  2.925043257220695
 12.165245886682294
 20.0
 20.0
  5.5969335490336505
 20.0
 20.0
```
So I think having a short delay instead of an immediate retry in a few cases is probably OK. Also, the AWS docs don't talk about non-delayed retries, they seem to only do delayed retries.

Co-authored-by: Eric Hanson <[email protected]>
  • Loading branch information
bors[bot] and ericphanson authored May 25, 2022
2 parents bf568c3 + 15c6f60 commit d52c639
Show file tree
Hide file tree
Showing 8 changed files with 159 additions and 78 deletions.
1 change: 1 addition & 0 deletions .JuliaFormatter.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
style="blue"
8 changes: 4 additions & 4 deletions Project.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ JSON = "682c06a0-de6a-54ab-a142-c8b1cf79cde6"
MbedTLS = "739be429-bea8-5141-9913-cc70e7f3736d"
Mocking = "78c3b35d-d492-501b-9361-3d52fe80e533"
OrderedCollections = "bac558e1-5e72-5ebc-8fee-abe8a469f55d"
Retry = "20febd7b-183b-5ae2-ac4a-720e7ce64774"
Random = "9a3f8284-a2c9-5f02-9a11-845980a1fd5c"
Sockets = "6462fe0b-24de-5631-8697-dd941f90decc"
URIs = "5c2747f8-b7ea-4ff2-ba2e-563bfd36b1d4"
UUIDs = "cf7118a7-6976-5b1a-9a39-7adc72f591a4"
Expand All @@ -30,17 +30,17 @@ JSON = "0.18, 0.19, 0.20, 0.21"
MbedTLS = "0.6, 0.7, 1"
Mocking = "0.7"
OrderedCollections = "1.3"
Retry = "0.3, 0.4"
StableRNGs = "1"
URIs = "1"
XMLDict = "0.3, 0.4"
julia = "1.3"

[extras]
Pkg = "44cfe95a-1eb2-52ea-b672-e2afdf69b78f"
Random = "9a3f8284-a2c9-5f02-9a11-845980a1fd5c"
Suppressor = "fd094767-a336-5f1f-9728-57cf17d0bbfb"
StableRNGs = "860ef19b-820b-49d6-a774-d7a799459cd3"
Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40"
UUIDs = "cf7118a7-6976-5b1a-9a39-7adc72f591a4"

[targets]
test = ["Pkg", "Random", "Suppressor", "Test", "UUIDs"]
test = ["Pkg", "Suppressor", "StableRNGs", "Test", "UUIDs"]
2 changes: 1 addition & 1 deletion src/AWS.jl
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ using HTTP
using MbedTLS
using Mocking
using OrderedCollections: LittleDict, OrderedDict
using Retry
using Sockets
using URIs
using UUIDs: UUIDs
using XMLDict
using Random

export @service
export _merge
Expand Down
67 changes: 37 additions & 30 deletions src/utilities/downloads_backend.jl
Original file line number Diff line number Diff line change
Expand Up @@ -76,37 +76,44 @@ function _http_request(backend::DownloadsBackend, request::Request, response_str

local buffer
local response

check = function (s, e)
return (isa(e, HTTP.StatusError) && AWS._http_status(e) >= 500) ||
isa(e, Downloads.RequestError)
end

delays = AWSExponentialBackoff(; max_attempts=4)

get_response = function ()
# Use a sacrificial I/O stream so that we only write the `response_stream` once
# even with multiple attempts.
buffer = Base.BufferStream()

# Rewind the input on each attempt otherwise every subsequent attempt will send an
# empty payload.
input !== nothing && seekstart(input)

r = @mock Downloads.request(
request.url;
input=input,
# Compatibility with Downloads.jl versions below v1.5.2
# See: https://github.com/JuliaLang/Downloads.jl/issues/131
output=request.request_method != "HEAD" ? buffer : nothing,
method=request.request_method,
headers=request.headers,
verbose=false,
throw=true,
downloader=downloader,
)

response = _http_response(request, r; throw=true)
# We'll rely on lexical scoping; `buffer` and `response`
# are bindings in the outer scope, so we don't need to return here.
return nothing
end

try
@repeat 4 try
# Use a sacrificial I/O stream so that we only write the `response_stream` once
# even with multiple attempts.
buffer = Base.BufferStream()

# Rewind the input on each attempt otherwise every subsequent attempt will send an
# empty payload.
input !== nothing && seekstart(input)

r = @mock Downloads.request(
request.url;
input=input,
# Compatibility with Downloads.jl versions below v1.5.2
# See: https://github.com/JuliaLang/Downloads.jl/issues/131
output=request.request_method != "HEAD" ? buffer : nothing,
method=request.request_method,
headers=request.headers,
verbose=false,
throw=true,
downloader=downloader,
)

response = _http_response(request, r; throw=true)
catch e
@delay_retry if (
(isa(e, HTTP.StatusError) && AWS._http_status(e) >= 500) ||
isa(e, Downloads.RequestError)
)
end
end
retry(get_response; check=check, delays=delays)()
finally
close(buffer)

Expand Down
116 changes: 74 additions & 42 deletions src/utilities/request.jl
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,10 @@ function submit_request(aws::AbstractAWSConfig, request::Request; return_headers
request.headers["Host"] = HTTP.URI(request.url).host
stream = @something request.response_stream IOBuffer()

@repeat 3 try
local aws_response
local response

get_response = function ()
credentials(aws) === nothing || sign!(aws, request)

aws_response = @mock _http_request(request.backend, request, stream)
Expand All @@ -130,32 +133,46 @@ function submit_request(aws::AbstractAWSConfig, request::Request; return_headers
throw(AWSException(e, stream))
end
end
catch e
if e isa HTTP.StatusError
e = AWSException(e, stream)
elseif !(e isa AWSException)
rethrow(e)
end

function upgrade_error(f)
return () -> try
return f()
catch e
if e isa HTTP.StatusError
e = AWSException(e, stream)
rethrow(e)
end
rethrow()
end
end

@retry if occursin("Signature expired", e.message)
check = function (s, e)
# Pass on non-AWS exceptions.
if !(e isa AWSException)
return false
end

occursin("Signature expired", e.message) && return true

# Handle ExpiredToken...
# https://github.com/aws/aws-sdk-go/blob/v1.31.5/aws/request/retryer.go#L98
@retry if e isa AWSException && e.code in EXPIRED_ERROR_CODES
if e isa AWSException && e.code in EXPIRED_ERROR_CODES
check_credentials(credentials(aws); force_refresh=true)
return true
end

# Throttle handling
# https://github.com/boto/botocore/blob/1.16.17/botocore/data/_retry.json
# https://docs.aws.amazon.com/general/latest/gr/api-retries.html
@delay_retry if _http_status(e.cause) == TOO_MANY_REQUESTS ||
e.code in THROTTLING_ERROR_CODES
if _http_status(e.cause) == TOO_MANY_REQUESTS || e.code in THROTTLING_ERROR_CODES
return true
end

# Handle BadDigest error and CRC32 check sum failure
@retry if _header(e.cause, "crc32body") == "x-amz-crc32" ||
if _header(e.cause, "crc32body") == "x-amz-crc32" ||
e.code in ("BadDigest", "RequestTimeout", "RequestTimeoutException")
return true
end

if occursin("Missing Authentication Token", e.message) &&
Expand All @@ -166,8 +183,13 @@ function submit_request(aws::AbstractAWSConfig, request::Request; return_headers
),
)
end
return false
end

delays = AWSExponentialBackoff(; max_attempts=3)

retry(upgrade_error(get_response); check=check, delays=delays)()

if request.use_response_type
return aws_response
else
Expand All @@ -185,38 +207,48 @@ function _http_request(http_backend::HTTPBackend, request::Request, response_str

local buffer
local response

get_response = function ()
# Use a sacrificial I/O stream so that we only write to the `response_stream`
# once even with multiple attempted requests. Additionally this works around the
# HTTP.jl issue (https://github.com/JuliaWeb/HTTP.jl/issues/543) where the
# `response_stream` is closed automatically. Effectively, this works as if we're
# not using streaming I/O at all, as we write all data at once, but only
# returning data via I/O ensures we aren't relying on response's body being
# populated.
buffer = Base.BufferStream()

response = @mock HTTP.request(
http_stack,
request.request_method,
HTTP.URI(request.url),
HTTP.mkheaders(request.headers),
request.content;
require_ssl_verification=false,
response_stream=buffer,
http_options...,
)

# We'll rely on lexical scoping; `buffer` and `response`
# are bindings in the outer scope, so we don't need to return here.
return nothing
end

check = function (s, e)
# `Base.IOError` is needed because HTTP.jl can often have errors that aren't
# caught and wrapped in an `HTTP.IOError`
# https://github.com/JuliaWeb/HTTP.jl/issues/382
return isa(e, Sockets.DNSError) ||
isa(e, HTTP.ParseError) ||
isa(e, HTTP.IOError) ||
isa(e, Base.IOError) ||
(isa(e, HTTP.StatusError) && _http_status(e) >= 500)
end

delays = AWSExponentialBackoff(; max_attempts=4)

try
@repeat 4 try
# Use a sacrificial I/O stream so that we only write to the `response_stream`
# once even with multiple attempted requests. Additionally this works around the
# HTTP.jl issue (https://github.com/JuliaWeb/HTTP.jl/issues/543) where the
# `response_stream` is closed automatically. Effectively, this works as if we're
# not using streaming I/O at all, as we write all data at once, but only
# returning data via I/O ensures we aren't relying on response's body being
# populated.
buffer = Base.BufferStream()

response = @mock HTTP.request(
http_stack,
request.request_method,
HTTP.URI(request.url),
HTTP.mkheaders(request.headers),
request.content;
require_ssl_verification=false,
response_stream=buffer,
http_options...,
)
catch e
# `Base.IOError` is needed because HTTP.jl can often have errors that aren't
# caught and wrapped in an `HTTP.IOError`
# https://github.com/JuliaWeb/HTTP.jl/issues/382
@delay_retry if isa(e, Sockets.DNSError) ||
isa(e, HTTP.ParseError) ||
isa(e, HTTP.IOError) ||
isa(e, Base.IOError) ||
(isa(e, HTTP.StatusError) && _http_status(e) >= 500)
end
end
retry(get_response; check=check, delays=delays)()
finally
# We're unable to read from the `Base.BufferStream` until it has been closed.
# HTTP.jl will close passed in `response_stream` keyword. This ensures that it
Expand Down
20 changes: 20 additions & 0 deletions src/utilities/utilities.jl
Original file line number Diff line number Diff line change
Expand Up @@ -139,3 +139,23 @@ end
function _assignment_to_kw!(x)
return throw(ArgumentError("Expected assignment expression, instead found: `$x`"))
end

# https://docs.aws.amazon.com/sdkref/latest/guide/feature-retry-behavior.html
# Default values for AWS's `standard` retry mode. Note: these can be overridden elsewhere.
Base.@kwdef struct AWSExponentialBackoff
max_attempts::Int = 3
max_backoff::Float64 = 20.0
rng::AbstractRNG = Random.GLOBAL_RNG
end

# We make one more attempt than the number of delays
Base.length(exp::AWSExponentialBackoff) = exp.max_attempts - 1

function Base.iterate(exp::AWSExponentialBackoff, i=1)
i >= exp.max_attempts && return nothing
# rand() has values in [0, 1), so we use 1.0 - rand() which has values in (0, 1] required.
b = 1.0 - rand(exp.rng)
r = 2.0
delay = min(b * r^i, exp.max_backoff)
return delay, i + 1
end
2 changes: 1 addition & 1 deletion test/runtests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ using MbedTLS: digest, MD_SHA256, MD_MD5
using Mocking
using Pkg
using Random
using Retry
using Suppressor
using Test
using UUIDs
using XMLDict
using StableRNGs

Mocking.activate()

Expand Down
21 changes: 21 additions & 0 deletions test/utilities.jl
Original file line number Diff line number Diff line change
Expand Up @@ -79,3 +79,24 @@ end
@test ex == Expr(:kw, :a, true)
end
end

# Count the elements in an iterator without using `length`
function count_len(itr)
c = 0
for _ in itr
c += 1
end
return c
end

@testset "AWSExponentialBackoff" begin
for (n, max_backoff) in [(3, 5.0), (10, 20.0)]
itr = AWS.AWSExponentialBackoff(;
max_attempts=n, max_backoff=max_backoff, rng=StableRNG(1)
)
@test count_len(itr) == n - 1
@test length(collect(itr)) == n - 1
@test all(>(0), itr)
@test all(<=(max_backoff), itr)
end
end

0 comments on commit d52c639

Please sign in to comment.