Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow user-configurable retry delay and check #877

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions src/Messages.jl
Original file line number Diff line number Diff line change
Expand Up @@ -261,8 +261,9 @@ Whether a `Request` is eligible to be retried.
"""
function retryable end

retryable(r::Request) = (isbytes(r.body) || (r.body !== nothing && ismarked(r.body))) &&
allow_retries(r) && (isidempotent(r) || retry_non_idempotent(r)) && !retrylimitreached(r)
retryable_requestbody(r::Request) = isbytes(r.body) || (r.body !== nothing && ismarked(r.body))
retryable(r::Request) = retryable_requestbody(r) && allow_retries(r) &&
(isidempotent(r) || retry_non_idempotent(r)) && !retrylimitreached(r)
retryable(r::Response) = retryable(r.status)
retryable(status) = status in (403, 408, 409, 429, 500, 502, 503, 504, 599)

Expand Down
10 changes: 7 additions & 3 deletions src/clientlayers/RetryRequest.jl
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ using ..IOExtras, ..Messages, ..Strings, ..ExceptionRequest, ..Exceptions

export retrylayer

FALSE(args...) = false

"""
retrylayer(handler) -> handler

Expand All @@ -19,7 +21,7 @@ e.g. `Sockets.DNSError`, `Base.EOFError` and `HTTP.StatusError`
(if status is `5xx`).
"""
function retrylayer(handler)
return function(req::Request; retry::Bool=true, retries::Int=4, retry_non_idempotent::Bool=false, kw...)
return function(req::Request; retry::Bool=true, retries::Int=4, retry_non_idempotent::Bool=false, retry_delays=ExponentialBackOff(n = retries), retry_check=FALSE, kw...)
if !retry || retries == 0
# no retry
return handler(req; kw...)
Expand All @@ -36,10 +38,10 @@ function retrylayer(handler)
end
retryattempt = Ref(0)
retry_request = Base.retry(handler,
delays=ExponentialBackOff(n = retries),
delays=retry_delays,
check=(s, ex) -> begin
retryattempt[] += 1
retry = isrecoverable(ex) && retryable(req)
retry = (isrecoverable(ex) && retryable(req)) || (Messages.retryable_requestbody(req) && retry_check(req, req.response, get_maybe_ephemeral_response_body(req), ex))
if retryattempt[] == retries
req.context[:retrylimitreached] = true
end
Expand All @@ -61,6 +63,8 @@ function retrylayer(handler)
end
end

get_maybe_ephemeral_response_body(req::Request) = isbytes(req.response.body) ? req.response.body : get(() -> UInt8[], req.context, :ephemeral_response_body)

supportsmark(x) = false
supportsmark(x::T) where {T <: IO} = length(Base.methods(mark, Tuple{T}, parentmodule(T))) > 0 || hasfield(T, :mark)

Expand Down
16 changes: 8 additions & 8 deletions src/clientlayers/StreamRequest.jl
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ immediately so that the transmission can be aborted if the `Response` status
indicates that the server does not wish to receive the message body.
[RFC7230 6.5](https://tools.ietf.org/html/rfc7230#section-6.5).
"""
function streamlayer(stream::Stream; iofunction=nothing, decompress::Bool=true, kw...)::Response
function streamlayer(stream::Stream; iofunction=nothing, decompress::Bool=true, retry_status=nothing, kw...)::Response
response = stream.message
req = response.request
io = stream.stream
Expand Down Expand Up @@ -45,7 +45,7 @@ function streamlayer(stream::Stream; iofunction=nothing, decompress::Bool=true,
end
@debugv 2 "client startread"
startread(stream)
readbody(stream, response, decompress)
readbody(stream, response, decompress, retry_status)
else
iofunction(stream)
end
Expand Down Expand Up @@ -103,7 +103,7 @@ writechunk(stream, body::IO) = writebodystream(stream, body)
writechunk(stream, body::Union{Dict, NamedTuple}) = writebodystream(stream, body)
writechunk(stream, body) = write(stream, body)

function readbody(stream::Stream, res::Response, decompress::Bool)
function readbody(stream::Stream, res::Response, decompress::Bool, retry_status)
if decompress && header(res, "Content-Encoding") == "gzip"
# Plug in a buffer stream in between so that we can (i) read the http stream in
# chunks instead of byte-by-byte and (ii) make sure to stop reading the http stream
Expand All @@ -121,21 +121,21 @@ function readbody(stream::Stream, res::Response, decompress::Bool)
close(gzstream)
end
end
readbody!(stream, res, buf)
readbody!(stream, res, buf, retry_status)
wait(tsk)
else
readbody!(stream, res, stream)
readbody!(stream, res, stream, retry_status)
end
end

function readbody!(stream::Stream, res::Response, buf_or_stream)
function readbody!(stream::Stream, res::Response, buf_or_stream, retry_status)
if isbytes(res.body)
# normal response body path: read as Vector{UInt8} and store
res.body = read(buf_or_stream)
elseif isredirect(stream) || retryable(stream)
elseif isredirect(stream) || retryable(stream) || (retry_status !== nothing && retry_status(res.status))
# if response body is a stream, but we're redirecting or
# retrying, store this "temporary" body in the request context
res.request.context[:response_body] = read(buf_or_stream)
res.request.context[:ephemeral_response_body] = read(buf_or_stream)
else
# normal streaming response body path: write response body out directly
write(res.body, buf_or_stream)
Expand Down
14 changes: 13 additions & 1 deletion test/client.jl
Original file line number Diff line number Diff line change
Expand Up @@ -543,7 +543,19 @@ end
resp = HTTP.get("http://localhost:8080/retry"; body=req_body, response_stream=res_body)
@test resp.status == 200
@test String(take!(res_body)) == "hey there sailor"
@test String(resp.request.context[:response_body]) == "500 unexpected error"
@test String(resp.request.context[:ephemeral_response_body]) == "500 unexpected error"
# custom retry_delays and retry_check
shouldfail[] = true
seekstart(req_body)
chk = (req, resp, resp_body, ex) -> begin
@show resp_body
@test String(resp_body) == "500 unexpected error"
return true
end
# HTTP.post is non-idempotent, so shouldn't be retried, but our custom retry_check says retry
resp = HTTP.post("http://localhost:8080/retry"; body=req_body, response_stream=res_body, retry_non_idempotent=true, retry_check=chk)
@test resp.status == 200
@test String(take!(res_body)) == "hey there sailor"
finally
if server !== nothing
close(server)
Expand Down