Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow customizing retry logic for http requests #974

Merged
merged 2 commits into from
Dec 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/HTTP.jl
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,9 @@ Retry arguments:
- `retry = true`, retry idempotent requests in case of error.
- `retries = 4`, number of times to retry.
- `retry_non_idempotent = false`, retry non-idempotent requests too. e.g. POST.
- `retry_delay = ExponentialBackOff(n = retries)`, provide a custom `ExponentialBackOff` object to control the delay between retries.
- `retry_check = (s, ex, req, resp) -> Bool`, provide a custom function to control whether a retry should be attempted.
The function should accept 4 arguments: the delay state, exception, request, and response, and return `true` if a retry should be attempted.

Redirect arguments:
- `redirect = true`, follow 3xx redirect responses; i.e. additional requests will be made to the redirected location
Expand Down
40 changes: 23 additions & 17 deletions src/Messages.jl
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ module Messages

export Message, Request, Response,
reset!, status, method, headers, uri, body, resource,
iserror, isredirect, retryable, ischunked, issafe, isidempotent,
iserror, isredirect, retryablebody, retryable, ischunked, issafe, isidempotent,
header, hasheader, headercontains, setheader, defaultheader!, appendheader,
removeheader, mkheaders, readheaders, headerscomplete,
readchunksize,
Expand Down Expand Up @@ -124,9 +124,8 @@ function reset!(r::Response)
if !isempty(r.headers)
empty!(r.headers)
end
if r.body isa Vector{UInt8} && !isempty(r.body)
empty!(r.body)
end
delete!(r.request.context, :response_body)
return
end

status(r::Response) = getfield(r, :status)
Expand Down Expand Up @@ -224,17 +223,6 @@ https://tools.ietf.org/html/rfc7231#section-4.2.1
issafe(r::Request) = issafe(r.method)
issafe(method) = method in ["GET", "HEAD", "OPTIONS", "TRACE"]

"""
isidempotent(::Request)

https://tools.ietf.org/html/rfc7231#section-4.2.2
"""
isidempotent(r::Request) = isidempotent(r.method)
isidempotent(method) = issafe(method) || method in ["PUT", "DELETE"]
retry_non_idempotent(r::Request) = get(r.context, :retry_non_idempotent, false)
allow_retries(r::Request) = get(r.context, :allow_retries, false)
nothing_written(r::Request) = get(r.context, :nothingwritten, false)

"""
iserror(::Response)

Expand All @@ -258,6 +246,17 @@ isredirect(status) = status in (301, 302, 303, 307, 308)
redirectlimitreached(r::Request) = get(r.context, :redirectlimitreached, false)
allow_redirects(r::Request) = get(r.context, :allow_redirects, false)

"""
isidempotent(::Request)

https://tools.ietf.org/html/rfc7231#section-4.2.2
"""
isidempotent(r::Request) = isidempotent(r.method)
isidempotent(method) = issafe(method) || method in ["PUT", "DELETE"]
retry_non_idempotent(r::Request) = get(r.context, :retry_non_idempotent, false)
allow_retries(r::Request) = get(r.context, :allow_retries, false)
nothing_written(r::Request) = get(r.context, :nothingwritten, false)

# whether the retry limit has been reached for a given request
# set in the RetryRequest layer once the limit is reached
retrylimitreached(r::Request) = get(r.context, :retrylimitreached, false)
Expand All @@ -272,8 +271,15 @@ function retryable end
supportsmark(x) = false
supportsmark(x::T) where {T <: IO} = length(Base.methods(mark, Tuple{T}, parentmodule(T))) > 0 || hasfield(T, :mark)

retryable(r::Request) = (isbytes(r.body) || r.body isa Union{Dict, NamedTuple} || (r.body isa Vector && all(isbytes, r.body)) ||
(supportsmark(r.body) && ismarked(r.body))) &&
# request body is retryable if it was provided as "bytes", a Dict or NamedTuple,
# or a chunked array of "bytes"; OR if it supports mark() and is marked
retryablebody(r::Request) = (isbytes(r.body) || r.body isa Union{Dict, NamedTuple} ||
(r.body isa Vector && all(isbytes, r.body)) || (supportsmark(r.body) && ismarked(r.body)))

# request is retryable if the body is retryable, the user is allowing retries at all,
# we haven't reached the retry limit, and either nothing has been written yet or
# the request is idempotent or the user has explicitly allowed non-idempotent retries
retryable(r::Request) = retryablebody(r) &&
allow_retries(r) && !retrylimitreached(r) &&
(nothing_written(r) || isidempotent(r) || retry_non_idempotent(r))
retryable(r::Response) = retryable(r.status)
Expand Down
2 changes: 1 addition & 1 deletion src/clientlayers/ExceptionRequest.jl
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ module ExceptionRequest

export exceptionlayer

using ..Messages, ..Exceptions
using ..IOExtras, ..Messages, ..Exceptions

"""
exceptionlayer(handler) -> handler
Expand Down
15 changes: 13 additions & 2 deletions src/clientlayers/MessageRequest.jl
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
module MessageRequest

using URIs
using ..Messages, ..Parsers
using ..IOExtras, ..Messages, ..Parsers

export messagelayer

Expand All @@ -14,7 +14,18 @@ Hard-coded as the first layer in the request pipeline.
function messagelayer(handler)
return function(method::String, url::URI, headers::Headers, body; response_stream=nothing, http_version=v"1.1", kw...)
req = Request(method, resource(url), headers, body; url=url, version=http_version, responsebody=response_stream)
return handler(req; response_stream=response_stream, kw...)
local resp
try
resp = handler(req; response_stream=response_stream, kw...)
finally
if @isdefined(resp) && iserror(resp) && haskey(resp.request.context, :response_body)
if isbytes(resp.body)
resp.body = resp.request.context[:response_body]
else
write(resp.body, resp.request.context[:response_body])
end
end
end
end
end

Expand Down
25 changes: 14 additions & 11 deletions src/clientlayers/RetryRequest.jl
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ using ..IOExtras, ..Messages, ..Strings, ..ExceptionRequest, ..Exceptions

export retrylayer

FALSE(x...) = false

"""
retrylayer(handler) -> handler

Expand All @@ -19,7 +21,9 @@ e.g. `Sockets.DNSError`, `Base.EOFError` and `HTTP.StatusError`
(if status is `5xx`).
"""
function retrylayer(handler)
return function(req::Request; retry::Bool=true, retries::Int=4, retry_non_idempotent::Bool=false, kw...)
return function(req::Request; retry::Bool=true, retries::Int=4,
retry_delays::ExponentialBackOff=ExponentialBackOff(n = retries), retry_check=FALSE,
retry_non_idempotent::Bool=false, kw...)
if !retry || retries == 0
# no retry
return handler(req; kw...)
Expand All @@ -37,11 +41,11 @@ function retrylayer(handler)
end
retryattempt = Ref(0)
retry_request = Base.retry(handler,
delays=ExponentialBackOff(n = retries),
delays=retry_delays,
check=(s, ex) -> begin
retryattempt[] += 1
req.context[:retryattempt] = retryattempt[]
retry = isrecoverable(ex) && retryable(req)
retry = retryable(req) || retryablebody(req) && _retry_check(s, ex, req, retry_check)
if retryattempt[] == retries
req.context[:retrylimitreached] = true
end
Expand All @@ -63,20 +67,19 @@ function retrylayer(handler)
end
end

isrecoverable(e) = false
isrecoverable(e::Union{Base.EOFError, Base.IOError, MbedTLS.MbedException, OpenSSL.OpenSSLError}) = true
isrecoverable(e::ArgumentError) = e.msg == "stream is closed or unusable"
isrecoverable(e::Sockets.DNSError) = true
isrecoverable(e::ConnectError) = true
isrecoverable(e::RequestError) = isrecoverable(e.error)
isrecoverable(e::StatusError) = retryable(e.status)
function _retry_check(s, ex, req, check)
resp = req.response
if haskey(req.context, :response_body)
resp.body = req.context[:response_body]
end
return check(s, ex, req, resp)
end

function no_retry_reason(ex, req)
buf = IOBuffer()
show(IOContext(buf, :compact => true), req)
print(buf, ", ",
ex isa StatusError ? "HTTP $(ex.status): " :
!isrecoverable(ex) ? "$ex not recoverable, " : "",
!isbytes(req.body) ? "request streamed, " : "",
!isbytes(req.response.body) ? "response streamed, " : "",
!isidempotent(req) ? "$(req.method) non-idempotent" : "")
Expand Down
19 changes: 10 additions & 9 deletions src/clientlayers/StreamRequest.jl
Original file line number Diff line number Diff line change
Expand Up @@ -129,16 +129,17 @@ function readbody(stream::Stream, res::Response, decompress::Union{Nothing, Bool
end

function readbody!(stream::Stream, res::Response, buf_or_stream)
if isbytes(res.body)
# normal response body path: read as Vector{UInt8} and store
res.body = read(buf_or_stream)
elseif isredirect(stream) || retryable(stream)
# if response body is a stream, but we're redirecting or
# retrying, store this "temporary" body in the request context
res.request.context[:response_body] = read(buf_or_stream)
if !iserror(res)
if isbytes(res.body)
res.body = read(buf_or_stream)
else
write(res.body, buf_or_stream)
end
else
# normal streaming response body path: write response body out directly
write(res.body, buf_or_stream)
# read the response body into the request context so that it can be
# read by the user if they want to or set later if
# we end up not retrying/redirecting/etc.
res.request.context[:response_body] = read(buf_or_stream)
end
end

Expand Down
20 changes: 15 additions & 5 deletions test/client.jl
Original file line number Diff line number Diff line change
Expand Up @@ -496,13 +496,20 @@ end

@testset "Retry with request/response body streams" begin
shouldfail = Ref(true)
status = Ref(200)
server = HTTP.listen!(8080) do http
@assert !eof(http)
msg = String(read(http))
if shouldfail[]
shouldfail[] = false
error("500 unexpected error")
end
HTTP.setstatus(http, status[])
if status[] != 200
HTTP.startwrite(http)
HTTP.write(http, "$(status[]) unexpected error")
status[] = 200
end
HTTP.startwrite(http)
HTTP.write(http, msg)
end
Expand All @@ -518,14 +525,17 @@ end
seekstart(req_body)
resp = HTTP.get("http://localhost:8080/retry"; body=req_body, response_stream=res_body, retry=false, status_exception=false)
@test String(take!(res_body)) == "500 unexpected error"
# when retrying, we can still get access to the most recent failed response body in the response's request context
shouldfail[] = true
# don't throw a 500, but set status to status we don't retry by default
shouldfail[] = false
status[] = 404
seekstart(req_body)
println("making 3rd request")
resp = HTTP.get("http://localhost:8080/retry"; body=req_body, response_stream=res_body)
check = (s, ex, req, resp) -> begin
@test String(resp.body) == "404 unexpected error"
resp.status == 404
end
resp = HTTP.get("http://localhost:8080/retry"; body=req_body, response_stream=res_body, retry_check=(s, ex, req, resp) -> resp.status == 404)
@test isok(resp)
@test String(take!(res_body)) == "hey there sailor"
@test String(resp.request.context[:response_body]) == "500 unexpected error"
finally
close(server)
HTTP.ConnectionPool.closeall()
Expand Down