diff --git a/src/AWS.jl b/src/AWS.jl
index 78d35d751f..989e43504a 100644
--- a/src/AWS.jl
+++ b/src/AWS.jl
@@ -24,13 +24,16 @@ export JSONService, RestJSONService, RestXMLService, QueryService, set_features
 
 const DEFAULT_REGION = "us-east-1"
 
-include(joinpath("utilities", "utilities.jl"))
-
 include("AWSExceptions.jl")
+
+using ..AWSExceptions
+using ..AWSExceptions: AWSException
+
 include("AWSCredentials.jl")
 include("AWSConfig.jl")
 include("AWSMetadata.jl")
 
+include(joinpath("utilities", "utilities.jl"))
 include(joinpath("utilities", "request.jl"))
 include(joinpath("utilities", "response.jl"))
 include(joinpath("utilities", "sign.jl"))
@@ -38,9 +41,6 @@ include(joinpath("utilities", "downloads_backend.jl"))
 
 include("deprecated.jl")
 
-using ..AWSExceptions
-using ..AWSExceptions: AWSException
-
 const user_agent = Ref("AWS.jl/1.0.0")
 const aws_config = Ref{AbstractAWSConfig}()
 
diff --git a/src/utilities/downloads_backend.jl b/src/utilities/downloads_backend.jl
index 00c298ad06..3b36f8f509 100644
--- a/src/utilities/downloads_backend.jl
+++ b/src/utilities/downloads_backend.jl
@@ -6,22 +6,42 @@ using HTTP.MessageRequest: body_was_streamed
 This backend uses the Downloads.jl stdlib to use libcurl as an HTTP client
 to connect to the AWS REST API.
 
-It has one field,
-
-- `downloader::Union{Nothing,Downloads.Downloader}`
-
-which is the `Downloads.Downloader` to use. If set to `nothing`, the default,
-then a global downloader object will be used.
-
 Downloads.jl tends to perform better under concurrent operation than HTTP.jl, particularly with `@async` / `asyncmap`.
 As of March 2022, threading (e.g. `@spawn` or `@threads`) with Downloads.jl is broken on all releases of Julia ([Downloads.jl#110](https://github.com/JuliaLang/Downloads.jl/issues/110)),
 and there are still reported issues on the upcoming 1.7.3 and 1.8 releases ([Downloads.jl#182](https://github.com/JuliaLang/Downloads.jl/issues/182])).
+
+There are three constructors:
+
+    DownloadsBackend()
+
+This constructor uses AWS.jl's global downloader object. If a transient error occurs, the global downloader object is replaced with a fresh one. `DownloadsBackend(nothing)` is accepted as an equivalent legacy spelling.
+
+    DownloadsBackend(create_new_downloader)
+
+This constructor calls `create_new_downloader()` to create a new downloader which it uses. Upon encountering a transient error, another new downloader is created and used from then on.
+
+    DownloadsBackend(D::Downloader)
+
+This constructor uses the provided downloader `D`, and uses it always.
 """
-struct DownloadsBackend <: AWS.AbstractBackend
+mutable struct DownloadsBackend <: AWS.AbstractBackend
     downloader::Union{Nothing,Downloads.Downloader}
+    # `downloader_lock===nothing` signals that we don't want to replace
+    # the existing `downloader` field.
+    downloader_lock::Union{Nothing,ReentrantLock}
+    create_new_downloader::Any
 end
 
-DownloadsBackend() = DownloadsBackend(nothing)
+# Use global downloader; don't replace `downloader` field on transient errors
+DownloadsBackend() = DownloadsBackend(nothing, nothing, () -> get_downloader(; fresh=true))
+# Use `create_new_downloader` to create a new `downloader`; DO replace the `downloader` field on transient errors
+function DownloadsBackend(create_new_downloader)
+    return DownloadsBackend(create_new_downloader(), ReentrantLock(), create_new_downloader)
+end
+# Use provided downloader `D`; don't replace `downloader` on transient errors
+DownloadsBackend(D::Downloader) = DownloadsBackend(D, nothing, () -> D)
+# Backwards compatibility: `DownloadsBackend(nothing)` used to select the global downloader
+DownloadsBackend(::Nothing) = DownloadsBackend()
 
 const AWS_DOWNLOADER = Ref{Union{Nothing,Downloader}}(nothing)
 const AWS_DOWNLOAD_LOCK = ReentrantLock()
@@ -31,13 +51,16 @@ const AWS_DOWNLOAD_LOCK = ReentrantLock()
 # because we add a hook to avoid redirects in order to try to match the HTTPBackend's
 # implementation, and we don't want to mutate the global downloader from Downloads.jl.
 # https://github.com/JuliaLang/Downloads.jl/blob/84e948c02b8a0625552a764bf90f7d2ee97c949c/src/Downloads.jl#L293-L301
-function get_downloader(downloader=nothing)
+function get_downloader(; fresh=false)
+    downloader = nothing
     lock(AWS_DOWNLOAD_LOCK) do
         yield() # let other downloads finish
-        downloader isa Downloader && return nothing
         while true
             downloader = AWS_DOWNLOADER[]
-            downloader isa Downloader && return nothing
+            !fresh && downloader isa Downloader && return nothing
+            # Force the replacement only once: while `fresh` stays set, the early
+            # return above can never fire on the next pass through the loop.
+            fresh = false
             D = Downloader()
             D.easy_hook =
                 (easy, info) -> Curl.setopt(easy, Curl.CURLOPT_FOLLOWLOCATION, false)
@@ -57,14 +80,27 @@ function read_body(x::IO)
     return read(x)
 end
 
-function _http_request(backend::DownloadsBackend, request::Request, response_stream::IO)
+function _http_request(
+    backend::DownloadsBackend, request::Request, response_stream::IO; transient_retry=false
+)
     # HTTP.jl sets this header automatically.
     request.headers["Content-Length"] = string(body_length(request.content))
 
     # We pass an `input` only when we have content we wish to send.
     input = !isempty(request.content) ? IOBuffer(request.content) : nothing
 
-    downloader = @something(backend.downloader, get_downloader())
+    if transient_retry
+        downloader = backend.create_new_downloader()
+
+        # If we have a lock, use it to replace our existing downloader.
+        if backend.downloader_lock !== nothing
+            Base.@lock backend.downloader_lock begin
+                backend.downloader = downloader
+            end
+        end
+    else
+        downloader = @something(backend.downloader, get_downloader())
+    end
 
     # set the hook so that we don't follow redirects. Only
     # need to do this on per-request downloaders, because we
@@ -78,6 +114,18 @@ function _http_request(backend::DownloadsBackend, request::Request, response_str
     local response
 
     check = function (s, e)
+        if is_transient_error(e)
+
+            # TODO: remove this restriction. These valid transient errors
+            # are currently not supported by our tests.
+            if (isa(e, HTTP.StatusError) && AWS._http_status(e) < 500)
+                return false
+            end
+
+            # We want a new one, ref https://github.com/JuliaCloud/AWS.jl/issues/552
+            downloader = backend.create_new_downloader()
+            return true
+        end
         return (isa(e, HTTP.StatusError) && AWS._http_status(e) >= 500) ||
                isa(e, Downloads.RequestError)
     end
diff --git a/src/utilities/request.jl b/src/utilities/request.jl
index f3bea64a36..fadfc5406c 100644
--- a/src/utilities/request.jl
+++ b/src/utilities/request.jl
@@ -100,6 +100,9 @@ function submit_request(aws::AbstractAWSConfig, request::Request; return_headers
     TOO_MANY_REQUESTS = 429
     EXPIRED_ERROR_CODES = ["ExpiredToken", "ExpiredTokenException", "RequestExpired"]
     REDIRECT_ERROR_CODES = [301, 302, 303, 304, 305, 307, 308]
+
+    # https://boto3.amazonaws.com/v1/documentation/api/latest/guide/retries.html?highlight=retry
+    # https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-retries.html#cli-usage-retries-modes-standard.title
     THROTTLING_ERROR_CODES = [
         "Throttling",
         "ThrottlingException",
@@ -107,9 +110,13 @@ function submit_request(aws::AbstractAWSConfig, request::Request; return_headers
         "RequestThrottledException",
         "TooManyRequestsException",
         "ProvisionedThroughputExceededException",
+        "TransactionInProgressException",
+        "RequestLimitExceeded",
+        "BandwidthLimitExceeded",
         "LimitExceededException",
         "RequestThrottled",
-        "PriorRequestNotComplete",
+        "SlowDown",
+        "EC2ThrottledException",
     ]
 
     request.headers["User-Agent"] = user_agent[]
@@ -119,11 +126,15 @@ function submit_request(aws::AbstractAWSConfig, request::Request; return_headers
 
     local aws_response
     local response
 
+    transient_retry = false
+
     get_response = () -> begin
         credentials(aws) === nothing || sign!(aws, request)
 
-        aws_response = @mock _http_request(request.backend, request, stream)
+        aws_response = @mock _http_request(
+            request.backend, request, stream; transient_retry=transient_retry
+        )
         response = aws_response.response
 
         if response.status in REDIRECT_ERROR_CODES
@@ -153,6 +164,10 @@ function submit_request(aws::AbstractAWSConfig, request::Request; return_headers
         if !(e isa AWSException)
             return false
         end
+        if is_transient_error(e)
+            transient_retry = true
+            return true
+        end
 
         occursin("Signature expired", e.message) && return true
 
@@ -170,6 +185,12 @@ function submit_request(aws::AbstractAWSConfig, request::Request; return_headers
             return true
         end
 
+        if e.code == "PriorRequestNotComplete"
+            # Retry this transient error, because the
+            # HTTP backend currently doesn't have a check for it.
+            return true
+        end
+
         # Handle BadDigest error and CRC32 check sum failure
         if _header(e.cause, "crc32body") == "x-amz-crc32" ||
             e.code in ("BadDigest", "RequestTimeout", "RequestTimeoutException")
@@ -198,7 +219,9 @@ function submit_request(aws::AbstractAWSConfig, request::Request; return_headers
     end
 end
 
-function _http_request(http_backend::HTTPBackend, request::Request, response_stream::IO)
+function _http_request(
+    http_backend::HTTPBackend, request::Request, response_stream::IO; transient_retry=false
+)
     http_options = merge(http_backend.http_options, request.http_options)
 
     # HTTP options such as `status_exception` need to be used when creating the stack
diff --git a/src/utilities/utilities.jl b/src/utilities/utilities.jl
index 52eaca2518..6643bd4305 100644
--- a/src/utilities/utilities.jl
+++ b/src/utilities/utilities.jl
@@ -159,3 +159,28 @@ function Base.iterate(exp::AWSExponentialBackoff, i=1)
     delay = min(b * r^i, exp.max_backoff)
     return delay, i + 1
 end
+
+# https://docs.aws.amazon.com/sdkref/latest/guide/feature-retry-behavior.html
+function is_transient_error(e::HTTP.StatusError)
+    return AWS._http_status(e) in (400, 408, 500, 502, 503, 504)
+end
+
+# https://boto3.amazonaws.com/v1/documentation/api/latest/guide/retries.html?highlight=retry
+# https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-retries.html#cli-usage-retries-modes-standard.title
+const TRANSIENT_ERROR_CODES = (
+    "RequestTimeout",
+    "RequestTimeoutException",
+    "PriorRequestNotComplete",
+    "ConnectionError",
+    "HTTPClientError",
+)
+
+function is_transient_error(e::AWSException)
+    return e.code in TRANSIENT_ERROR_CODES || is_transient_error(e.cause)
+end
+
+is_transient_error(::Downloads.RequestError) = true
+
+# Fallback: retry predicates pass arbitrary exceptions to `is_transient_error`, so any
+# other exception type must answer `false` instead of raising a `MethodError`.
+is_transient_error(::Any) = false
diff --git a/test/issues.jl b/test/issues.jl
index a5180b5bb1..9d1d609ea7 100644
--- a/test/issues.jl
+++ b/test/issues.jl
@@ -218,14 +218,35 @@ try
             end
 
             io = IOBuffer()
-            apply(_incomplete_patch(; data=data, num_attempts_to_fail=4)) do
-                params = Dict("response_stream" => io)
-                @test_throws err_t S3.get_object(bucket, key, params; aws_config=config)
-
-                seekstart(io)
-                retrieved = read(io)
-                @test length(retrieved) == n - 1
-                @test retrieved == data[1:(n - 1)]
+            if AWS.DEFAULT_BACKEND[] isa AWS.DownloadsBackend
+                # We'll use a different `DownloadsBackend` with an extra test.
+                called_make_new_downloader = false
+                make_new_downloader = () -> begin
+                    called_make_new_downloader = true
+                    return AWS.get_downloader(; fresh=true)
+                end
+                AWS.DEFAULT_BACKEND[] = AWS.DownloadsBackend(
+                    nothing, nothing, make_new_downloader
+                )
+            end
+            try
+                apply(_incomplete_patch(; data=data, num_attempts_to_fail=4)) do
+                    params = Dict("response_stream" => io)
+                    @test_throws err_t S3.get_object(bucket, key, params; aws_config=config)
+
+                    seekstart(io)
+                    retrieved = read(io)
+                    @test length(retrieved) == n - 1
+                    @test retrieved == data[1:(n - 1)]
+                end
+            finally
+                # Reset the downloader
+                if AWS.DEFAULT_BACKEND[] isa AWS.DownloadsBackend
+                    AWS.DEFAULT_BACKEND[] = AWS.DownloadsBackend()
+                end
+            end
+            if AWS.DEFAULT_BACKEND[] isa AWS.DownloadsBackend
+                @test called_make_new_downloader
             end
         end
     end
diff --git a/test/patch.jl b/test/patch.jl
index 27b586285c..557dd0f050 100644
--- a/test/patch.jl
+++ b/test/patch.jl
@@ -62,7 +62,11 @@ function _response(;
 end
 
 function _aws_http_request_patch(response::AWS.Response=_response())
-    p = @patch AWS._http_request(::AWS.AbstractBackend, request::Request, ::IO) = response
+    p = @patch function AWS._http_request(
+        ::AWS.AbstractBackend, request::Request, ::IO; transient_retry=false
+    )
+        return response
+    end
     return p
 end
 