Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New Downloader for transient retries #553

Closed
wants to merge 16 commits into from
Closed
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions src/AWS.jl
Original file line number Diff line number Diff line change
Expand Up @@ -23,23 +23,23 @@ export JSONService, RestJSONService, RestXMLService, QueryService, set_features

const DEFAULT_REGION = "us-east-1"

include(joinpath("utilities", "utilities.jl"))

include("AWSExceptions.jl")

using ..AWSExceptions
using ..AWSExceptions: AWSException

include("AWSCredentials.jl")
include("AWSConfig.jl")
include("AWSMetadata.jl")

include(joinpath("utilities", "utilities.jl"))
include(joinpath("utilities", "request.jl"))
include(joinpath("utilities", "response.jl"))
include(joinpath("utilities", "sign.jl"))
include(joinpath("utilities", "downloads_backend.jl"))

include("deprecated.jl")

using ..AWSExceptions
using ..AWSExceptions: AWSException

const user_agent = Ref("AWS.jl/1.0.0")
const aws_config = Ref{AbstractAWSConfig}()

Expand Down
33 changes: 23 additions & 10 deletions src/utilities/downloads_backend.jl
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,23 @@ using HTTP.MessageRequest: body_was_streamed
This backend uses the Downloads.jl stdlib to use libcurl
as an HTTP client to connect to the AWS REST API.

It has one field,
It has two fields,

- `downloader::Union{Nothing,Downloads.Downloader}`

which is the `Downloads.Downloader` to use. If set to `nothing`, the default,
then a global downloader object will be used.
- `downloader::Union{Nothing,Downloads.Downloader}`: if `nothing`, use a global Downloader object. Otherwise, uses the given Downloader.
- `create_new_downloader::Any`: a zero-argument function which returns a new Downloader object to use.
Defaults to creating a new global downloader. This is called when a transient error occurs.

Downloads.jl tends to perform better under concurrent operation than HTTP.jl,
particularly with `@async` / `asyncmap`. As of March 2022, threading (e.g. `@spawn` or `@threads`) with Downloads.jl is broken on all releases of Julia ([Downloads.jl#110](https://github.com/JuliaLang/Downloads.jl/issues/110)), and there are still reported issues on the upcoming
1.7.3 and 1.8 releases ([Downloads.jl#182](https://github.com/JuliaLang/Downloads.jl/issues/182])).
"""
struct DownloadsBackend <: AWS.AbstractBackend
downloader::Union{Nothing,Downloads.Downloader}
create_new_downloader::Any
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
create_new_downloader::Any
create_new_downloader::Base.Callable

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That isn't documented; is it really better than Any?

end

DownloadsBackend() = DownloadsBackend(nothing)
DownloadsBackend() = DownloadsBackend(nothing, () -> get_downloader(; fresh=true))
DownloadsBackend(D::Downloader) = DownloadsBackend(D, () -> get_downloader(; fresh=true))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not restrict the constructor to only accepting create_new_downloader and the downloader field can just be used internally?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Downloaders are stateful, and the original API promise was that you could decide how to share them, e.g. you could have 1 downloader per thread that you provision out. However this issue shows that we need the ability to create new ones as well. That makes me think actually the original API was bad and we probably should only have as input create_new_downloader like you say. However I think that's probably breaking, since we started by allowing you to pass in a Downloader. So my compromise here is that we'll use whatever downloader you pass in, but sometimes we will make a new one.

But... it's even more complicated, because if we make a new one because we think the old one might have a problem, we don't want to use the old one anymore. But DownloadsBackend is immutable, so I don't have a way to replace the old one. We can replace AWS's global downloader, which is probably what most people are using, but if you were using the ability to provision downloaders on a per-request basis then we don't have a way to replace those.

This API problem still has me stumped. The current implementation ONLY fixes things in a good way for users of the global downloader.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would suggest you have the following for backwards compatibility:

DownloadsBackend(D::Downloader) = DownloadsBackend(() -> D)

Using this doesn't work with your transient fixes but is effectively just uses the old behaviour. If you want the fix you need to pass in a function. I can't see another option for this as we can't copy instances of Downloaders.

But... it's even more complicated, because if we make a new one because we think the old one might have a problem, we don't want to use the old one anymore. But DownloadsBackend is immutable

Why not make DownloadsBackend mutable or use a Ref for the downloader field?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am worried about concurrent access to the field: we can have multiple readers and at least one writer to the field. I suppose we can add a lock though.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added a lock and updated the constructors


const AWS_DOWNLOADER = Ref{Union{Nothing,Downloader}}(nothing)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not to derail this PR but using undef when the downloader is not yet defined seems preferable

Suggested change
const AWS_DOWNLOADER = Ref{Union{Nothing,Downloader}}(nothing)
const AWS_DOWNLOADER = Ref{Downloader}()

You'd just need to change some code in get_downloader to use isassigned(AWS_DOWNLOADER) before dereferencing

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's do that separately if we are going to change it. I vaguely remember some problems with that approach when trying it in the original implementation. Maybe it was useful to be able to reset back to nothing, or something like that...

const AWS_DOWNLOAD_LOCK = ReentrantLock()
Expand All @@ -31,13 +32,14 @@ const AWS_DOWNLOAD_LOCK = ReentrantLock()
# because we add a hook to avoid redirects in order to try to match the HTTPBackend's
# implementation, and we don't want to mutate the global downloader from Downloads.jl.
# https://github.com/JuliaLang/Downloads.jl/blob/84e948c02b8a0625552a764bf90f7d2ee97c949c/src/Downloads.jl#L293-L301
function get_downloader(downloader=nothing)
function get_downloader(; fresh=false)
downloader = nothing
lock(AWS_DOWNLOAD_LOCK) do
yield() # let other downloads finish
downloader isa Downloader && return nothing
ericphanson marked this conversation as resolved.
Show resolved Hide resolved
while true
omus marked this conversation as resolved.
Show resolved Hide resolved
downloader = AWS_DOWNLOADER[]
downloader isa Downloader && return nothing
!fresh && downloader isa Downloader && return nothing
D = Downloader()
D.easy_hook =
(easy, info) -> Curl.setopt(easy, Curl.CURLOPT_FOLLOWLOCATION, false)
Expand All @@ -57,14 +59,20 @@ function read_body(x::IO)
return read(x)
end

function _http_request(backend::DownloadsBackend, request::Request, response_stream::IO)
function _http_request(
backend::DownloadsBackend, request::Request, response_stream::IO; transient_retry=false
)
# HTTP.jl sets this header automatically.
request.headers["Content-Length"] = string(body_length(request.content))

# We pass an `input` only when we have content we wish to send.
input = !isempty(request.content) ? IOBuffer(request.content) : nothing

downloader = @something(backend.downloader, get_downloader())
if transient_retry
downloader = backend.create_new_downloader()
else
downloader = @something(backend.downloader, get_downloader())
end

# set the hook so that we don't follow redirects. Only
# need to do this on per-request downloaders, because we
Expand Down Expand Up @@ -108,6 +116,11 @@ function _http_request(backend::DownloadsBackend, request::Request, response_str

check =
(s, e) -> begin
if is_transient_error(e)
# We want a new one, ref https://github.com/JuliaCloud/AWS.jl/issues/552
downloader = backend.create_new_downloader()
return true
end
return (isa(e, HTTP.StatusError) && AWS._http_status(e) >= 500) ||
isa(e, Downloads.RequestError)
end
Expand Down
29 changes: 26 additions & 3 deletions src/utilities/request.jl
Original file line number Diff line number Diff line change
Expand Up @@ -100,16 +100,23 @@ function submit_request(aws::AbstractAWSConfig, request::Request; return_headers
TOO_MANY_REQUESTS = 429
EXPIRED_ERROR_CODES = ["ExpiredToken", "ExpiredTokenException", "RequestExpired"]
REDIRECT_ERROR_CODES = [301, 302, 303, 304, 305, 307, 308]

# https://boto3.amazonaws.com/v1/documentation/api/latest/guide/retries.html?highlight=retry
# https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-retries.html#cli-usage-retries-modes-standard.title
THROTTLING_ERROR_CODES = [
"Throttling",
"ThrottlingException",
"ThrottledException",
"RequestThrottledException",
"TooManyRequestsException",
"ProvisionedThroughputExceededException",
"TransactionInProgressException",
"RequestLimitExceeded",
"BandwidthLimitExceeded",
"LimitExceededException",
"RequestThrottled",
"PriorRequestNotComplete",
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a transient error, not a throttling one, according to https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-retries.html#cli-usage-retries-modes-standard.title

However, in this PR only Downloads backend is getting special transient error handling, so maybe I should add this back with a comment, so we don't mess up the HTTP behavior.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added it later on as a separate check

"SlowDown",
"EC2ThrottledException",
]

request.headers["User-Agent"] = user_agent[]
Expand All @@ -119,11 +126,15 @@ function submit_request(aws::AbstractAWSConfig, request::Request; return_headers
local aws_response
local response

transient_retry = false

get_response =
() -> begin
credentials(aws) === nothing || sign!(aws, request)

aws_response = @mock _http_request(request.backend, request, stream)
aws_response = @mock _http_request(
request.backend, request, stream; transient_retry=transient_retry
)
response = aws_response.response

if response.status in REDIRECT_ERROR_CODES
Expand Down Expand Up @@ -154,6 +165,10 @@ function submit_request(aws::AbstractAWSConfig, request::Request; return_headers
if !(e isa AWSException)
return false
end
if is_transient_error(e)
transient_retry = true
return true
end

occursin("Signature expired", e.message) && return true

Expand All @@ -172,6 +187,12 @@ function submit_request(aws::AbstractAWSConfig, request::Request; return_headers
return true
end

if e.code == "PriorRequestNotComplete"
# Retry this transient error, because the
# HTTP backend currently doesn't have a check for it.
return true
end

# Handle BadDigest error and CRC32 check sum failure
if _header(e.cause, "crc32body") == "x-amz-crc32" ||
e.code in ("BadDigest", "RequestTimeout", "RequestTimeoutException")
Expand Down Expand Up @@ -202,7 +223,9 @@ function submit_request(aws::AbstractAWSConfig, request::Request; return_headers
end
end

function _http_request(http_backend::HTTPBackend, request::Request, response_stream::IO)
function _http_request(
http_backend::HTTPBackend, request::Request, response_stream::IO; transient_retry=false
)
http_options = merge(http_backend.http_options, request.http_options)

# HTTP options such as `status_exception` need to be used when creating the stack
Expand Down
21 changes: 21 additions & 0 deletions src/utilities/utilities.jl
Original file line number Diff line number Diff line change
Expand Up @@ -162,3 +162,24 @@ function Base.iterate(exp::AWSExponentialBackoff, i=1)
delay = min(b * r^i, exp.max_backoff)
return delay, i + 1
end

# https://docs.aws.amazon.com/sdkref/latest/guide/feature-retry-behavior.html
function is_transient_error(e::HTTP.StatusError)
return AWS._http_status(e) in (400, 408, 500, 502, 503, 504)
end

# https://boto3.amazonaws.com/v1/documentation/api/latest/guide/retries.html?highlight=retry
# https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-retries.html#cli-usage-retries-modes-standard.title
const TRANSIENT_ERROR_CODES = (
"RequestTimeout",
"RequestTimeoutException",
"PriorRequestNotComplete",
"ConnectionError",
"HTTPClientError",
)

function is_transient_error(e::AWSException)
return e.code in TRANSIENT_ERROR_CODES || is_transient_error(e.cause)
end

is_transient_error(::Downloads.RequestError) = true
2 changes: 1 addition & 1 deletion test/patch.jl
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ function _response(;
end

function _aws_http_request_patch(response::AWS.Response=_response())
p = @patch AWS._http_request(::AWS.AbstractBackend, request::Request, ::IO) = response
p = @patch AWS._http_request(::AWS.AbstractBackend, request::Request, ::IO; transient_retry=false) = response
ericphanson marked this conversation as resolved.
Show resolved Hide resolved
ericphanson marked this conversation as resolved.
Show resolved Hide resolved
return p
end

Expand Down