diff --git a/.JuliaFormatter.toml b/.JuliaFormatter.toml new file mode 100644 index 0000000000..1e72b507ee --- /dev/null +++ b/.JuliaFormatter.toml @@ -0,0 +1 @@ +style="blue" diff --git a/Project.toml b/Project.toml index b42a527fa6..1cd86fa4a8 100644 --- a/Project.toml +++ b/Project.toml @@ -15,7 +15,7 @@ JSON = "682c06a0-de6a-54ab-a142-c8b1cf79cde6" MbedTLS = "739be429-bea8-5141-9913-cc70e7f3736d" Mocking = "78c3b35d-d492-501b-9361-3d52fe80e533" OrderedCollections = "bac558e1-5e72-5ebc-8fee-abe8a469f55d" -Retry = "20febd7b-183b-5ae2-ac4a-720e7ce64774" +Random = "9a3f8284-a2c9-5f02-9a11-845980a1fd5c" Sockets = "6462fe0b-24de-5631-8697-dd941f90decc" URIs = "5c2747f8-b7ea-4ff2-ba2e-563bfd36b1d4" UUIDs = "cf7118a7-6976-5b1a-9a39-7adc72f591a4" @@ -30,17 +30,17 @@ JSON = "0.18, 0.19, 0.20, 0.21" MbedTLS = "0.6, 0.7, 1" Mocking = "0.7" OrderedCollections = "1.3" -Retry = "0.3, 0.4" +StableRNGs = "1" URIs = "1" XMLDict = "0.3, 0.4" julia = "1.3" [extras] Pkg = "44cfe95a-1eb2-52ea-b672-e2afdf69b78f" -Random = "9a3f8284-a2c9-5f02-9a11-845980a1fd5c" Suppressor = "fd094767-a336-5f1f-9728-57cf17d0bbfb" +StableRNGs = "860ef19b-820b-49d6-a774-d7a799459cd3" Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40" UUIDs = "cf7118a7-6976-5b1a-9a39-7adc72f591a4" [targets] -test = ["Pkg", "Random", "Suppressor", "Test", "UUIDs"] +test = ["Pkg", "Suppressor", "StableRNGs", "Test", "UUIDs"] diff --git a/src/AWS.jl b/src/AWS.jl index 1b5508e583..78d35d751f 100644 --- a/src/AWS.jl +++ b/src/AWS.jl @@ -8,11 +8,11 @@ using HTTP using MbedTLS using Mocking using OrderedCollections: LittleDict, OrderedDict -using Retry using Sockets using URIs using UUIDs: UUIDs using XMLDict +using Random export @service export _merge diff --git a/src/utilities/downloads_backend.jl b/src/utilities/downloads_backend.jl index ab31e4756b..00c298ad06 100644 --- a/src/utilities/downloads_backend.jl +++ b/src/utilities/downloads_backend.jl @@ -76,37 +76,44 @@ function _http_request(backend::DownloadsBackend, request::Request, response_str local buffer local response + + check = function (s, e) + return (isa(e, HTTP.StatusError) && AWS._http_status(e) >= 500) || + isa(e, Downloads.RequestError) + end + + delays = AWSExponentialBackoff(; max_attempts=4) + + get_response = function () + # Use a sacrificial I/O stream so that we only write the `response_stream` once + # even with multiple attempts. + buffer = Base.BufferStream() + + # Rewind the input on each attempt otherwise every subsequent attempt will send an + # empty payload. + input !== nothing && seekstart(input) + + r = @mock Downloads.request( + request.url; + input=input, + # Compatibility with Downloads.jl versions below v1.5.2 + # See: https://github.com/JuliaLang/Downloads.jl/issues/131 + output=request.request_method != "HEAD" ? buffer : nothing, + method=request.request_method, + headers=request.headers, + verbose=false, + throw=true, + downloader=downloader, + ) + + response = _http_response(request, r; throw=true) + # We'll rely on lexical scoping; `buffer` and `response` + # are bindings in the outer scope, so we don't need to return here. + return nothing + end + try - @repeat 4 try - # Use a sacrificial I/O stream so that we only write the `response_stream` once - # even with multiple attempts. - buffer = Base.BufferStream() - - # Rewind the input on each attempt otherwise every subsequent attempt will send an - # empty payload. - input !== nothing && seekstart(input) - - r = @mock Downloads.request( - request.url; - input=input, - # Compatibility with Downloads.jl versions below v1.5.2 - # See: https://github.com/JuliaLang/Downloads.jl/issues/131 - output=request.request_method != "HEAD" ? buffer : nothing, - method=request.request_method, - headers=request.headers, - verbose=false, - throw=true, - downloader=downloader, - ) - - response = _http_response(request, r; throw=true) - catch e - @delay_retry if ( - (isa(e, HTTP.StatusError) && AWS._http_status(e) >= 500) || - isa(e, Downloads.RequestError) - ) - end - end + retry(get_response; check=check, delays=delays)() finally close(buffer) diff --git a/src/utilities/request.jl b/src/utilities/request.jl index fe1a48c1fc..8bc9914532 100644 --- a/src/utilities/request.jl +++ b/src/utilities/request.jl @@ -116,7 +116,10 @@ function submit_request(aws::AbstractAWSConfig, request::Request; return_headers request.headers["Host"] = HTTP.URI(request.url).host stream = @something request.response_stream IOBuffer() - @repeat 3 try + local aws_response + local response + + get_response = function () credentials(aws) === nothing || sign!(aws, request) aws_response = @mock _http_request(request.backend, request, stream) @@ -130,32 +133,46 @@ function submit_request(aws::AbstractAWSConfig, request::Request; return_headers throw(AWSException(e, stream)) end end - catch e - if e isa HTTP.StatusError - e = AWSException(e, stream) - elseif !(e isa AWSException) - rethrow(e) + end + + function upgrade_error(f) + return () -> try + return f() + catch e + if e isa HTTP.StatusError + e = AWSException(e, stream) + rethrow(e) + end + rethrow() end + end - @retry if occursin("Signature expired", e.message) + check = function (s, e) + # Pass on non-AWS exceptions. + if !(e isa AWSException) + return false end + occursin("Signature expired", e.message) && return true + # Handle ExpiredToken... # https://github.com/aws/aws-sdk-go/blob/v1.31.5/aws/request/retryer.go#L98 - @retry if e isa AWSException && e.code in EXPIRED_ERROR_CODES + if e isa AWSException && e.code in EXPIRED_ERROR_CODES check_credentials(credentials(aws); force_refresh=true) + return true end # Throttle handling # https://github.com/boto/botocore/blob/1.16.17/botocore/data/_retry.json # https://docs.aws.amazon.com/general/latest/gr/api-retries.html - @delay_retry if _http_status(e.cause) == TOO_MANY_REQUESTS || - e.code in THROTTLING_ERROR_CODES + if _http_status(e.cause) == TOO_MANY_REQUESTS || e.code in THROTTLING_ERROR_CODES + return true end # Handle BadDigest error and CRC32 check sum failure - @retry if _header(e.cause, "crc32body") == "x-amz-crc32" || + if _header(e.cause, "crc32body") == "x-amz-crc32" || e.code in ("BadDigest", "RequestTimeout", "RequestTimeoutException") + return true end if occursin("Missing Authentication Token", e.message) && @@ -166,8 +183,13 @@ function submit_request(aws::AbstractAWSConfig, request::Request; return_headers ), ) end + return false end + delays = AWSExponentialBackoff(; max_attempts=3) + + retry(upgrade_error(get_response); check=check, delays=delays)() + if request.use_response_type return aws_response else @@ -185,38 +207,48 @@ function _http_request(http_backend::HTTPBackend, request::Request, response_str local buffer local response + + get_response = function () + # Use a sacrificial I/O stream so that we only write to the `response_stream` + # once even with multiple attempted requests. Additionally this works around the + # HTTP.jl issue (https://github.com/JuliaWeb/HTTP.jl/issues/543) where the + # `response_stream` is closed automatically. Effectively, this works as if we're + # not using streaming I/O at all, as we write all data at once, but only + # returning data via I/O ensures we aren't relying on response's body being + # populated. + buffer = Base.BufferStream() + + response = @mock HTTP.request( + http_stack, + request.request_method, + HTTP.URI(request.url), + HTTP.mkheaders(request.headers), + request.content; + require_ssl_verification=false, + response_stream=buffer, + http_options..., + ) + + # We'll rely on lexical scoping; `buffer` and `response` + # are bindings in the outer scope, so we don't need to return here. + return nothing + end + + check = function (s, e) + # `Base.IOError` is needed because HTTP.jl can often have errors that aren't + # caught and wrapped in an `HTTP.IOError` + # https://github.com/JuliaWeb/HTTP.jl/issues/382 + return isa(e, Sockets.DNSError) || + isa(e, HTTP.ParseError) || + isa(e, HTTP.IOError) || + isa(e, Base.IOError) || + (isa(e, HTTP.StatusError) && _http_status(e) >= 500) + end + + delays = AWSExponentialBackoff(; max_attempts=4) + try - @repeat 4 try - # Use a sacrificial I/O stream so that we only write to the `response_stream` - # once even with multiple attempted requests. Additionally this works around the - # HTTP.jl issue (https://github.com/JuliaWeb/HTTP.jl/issues/543) where the - # `response_stream` is closed automatically. Effectively, this works as if we're - # not using streaming I/O at all, as we write all data at once, but only - # returning data via I/O ensures we aren't relying on response's body being - # populated. - buffer = Base.BufferStream() - - response = @mock HTTP.request( - http_stack, - request.request_method, - HTTP.URI(request.url), - HTTP.mkheaders(request.headers), - request.content; - require_ssl_verification=false, - response_stream=buffer, - http_options..., - ) - catch e - # `Base.IOError` is needed because HTTP.jl can often have errors that aren't - # caught and wrapped in an `HTTP.IOError` - # https://github.com/JuliaWeb/HTTP.jl/issues/382 - @delay_retry if isa(e, Sockets.DNSError) || - isa(e, HTTP.ParseError) || - isa(e, HTTP.IOError) || - isa(e, Base.IOError) || - (isa(e, HTTP.StatusError) && _http_status(e) >= 500) - end - end + retry(get_response; check=check, delays=delays)() finally # We're unable to read from the `Base.BufferStream` until it has been closed. # HTTP.jl will close passed in `response_stream` keyword. This ensures that it diff --git a/src/utilities/utilities.jl b/src/utilities/utilities.jl index 427ef49bc0..52eaca2518 100644 --- a/src/utilities/utilities.jl +++ b/src/utilities/utilities.jl @@ -139,3 +139,23 @@ end function _assignment_to_kw!(x) return throw(ArgumentError("Expected assignment expression, instead found: `$x`")) end + +# https://docs.aws.amazon.com/sdkref/latest/guide/feature-retry-behavior.html +# Default values for AWS's `standard` retry mode. Note: these can be overridden elsewhere. +Base.@kwdef struct AWSExponentialBackoff + max_attempts::Int = 3 + max_backoff::Float64 = 20.0 + rng::AbstractRNG = Random.GLOBAL_RNG +end + +# We make one more attempt than the number of delays +Base.length(exp::AWSExponentialBackoff) = exp.max_attempts - 1 + +function Base.iterate(exp::AWSExponentialBackoff, i=1) + i >= exp.max_attempts && return nothing + # rand() has values in [0, 1), so we use 1.0 - rand() which has values in (0, 1] required. + b = 1.0 - rand(exp.rng) + r = 2.0 + delay = min(b * r^i, exp.max_backoff) + return delay, i + 1 +end diff --git a/test/runtests.jl b/test/runtests.jl index e671e9939b..3df8afe121 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -30,11 +30,11 @@ using MbedTLS: digest, MD_SHA256, MD_MD5 using Mocking using Pkg using Random -using Retry using Suppressor using Test using UUIDs using XMLDict +using StableRNGs Mocking.activate() diff --git a/test/utilities.jl b/test/utilities.jl index f444983d93..db6aeaf4c7 100644 --- a/test/utilities.jl +++ b/test/utilities.jl @@ -79,3 +79,24 @@ end @test ex == Expr(:kw, :a, true) end end + +# Count the elements in an iterator without using `length` +function count_len(itr) + c = 0 + for _ in itr + c += 1 + end + return c +end + +@testset "AWSExponentialBackoff" begin + for (n, max_backoff) in [(3, 5.0), (10, 20.0)] + itr = AWS.AWSExponentialBackoff(; + max_attempts=n, max_backoff=max_backoff, rng=StableRNG(1) + ) + @test count_len(itr) == n - 1 + @test length(collect(itr)) == n - 1 + @test all(>(0), itr) + @test all(<=(max_backoff), itr) + end +end