From 84ce2b3f056a78af4348b2f510f5e7335106c713 Mon Sep 17 00:00:00 2001 From: Jacob Quinn Date: Fri, 9 Dec 2022 16:45:35 -0700 Subject: [PATCH 1/2] Allow more custom retry logic Allow user to pass `retry_delays` with a custom `ExponentialBackoff` object. Allow user to pass `retry_check` that is a function of the form: `check(s, ex, req, resp) -> Bool` which will be called if the default retry logic doesn't allow retrying. --- src/Messages.jl | 40 ++++++++++++++++------------ src/clientlayers/ExceptionRequest.jl | 2 +- src/clientlayers/MessageRequest.jl | 15 +++++++++-- src/clientlayers/RetryRequest.jl | 25 +++++++++-------- src/clientlayers/StreamRequest.jl | 19 ++++++------- test/client.jl | 20 ++++++++++---- 6 files changed, 76 insertions(+), 45 deletions(-) diff --git a/src/Messages.jl b/src/Messages.jl index ddb8f4a2d..db31d59fd 100644 --- a/src/Messages.jl +++ b/src/Messages.jl @@ -51,7 +51,7 @@ module Messages export Message, Request, Response, reset!, status, method, headers, uri, body, resource, - iserror, isredirect, retryable, ischunked, issafe, isidempotent, + iserror, isredirect, retryablebody, retryable, ischunked, issafe, isidempotent, header, hasheader, headercontains, setheader, defaultheader!, appendheader, removeheader, mkheaders, readheaders, headerscomplete, readchunksize, @@ -124,9 +124,8 @@ function reset!(r::Response) if !isempty(r.headers) empty!(r.headers) end - if r.body isa Vector{UInt8} && !isempty(r.body) - empty!(r.body) - end + delete!(r.request.context, :response_body) + return end status(r::Response) = getfield(r, :status) @@ -224,17 +223,6 @@ https://tools.ietf.org/html/rfc7231#section-4.2.1 issafe(r::Request) = issafe(r.method) issafe(method) = method in ["GET", "HEAD", "OPTIONS", "TRACE"] -""" - isidempotent(::Request) - -https://tools.ietf.org/html/rfc7231#section-4.2.2 -""" -isidempotent(r::Request) = isidempotent(r.method) -isidempotent(method) = issafe(method) || method in ["PUT", "DELETE"] -retry_non_idempotent(r::Request) = get(r.context, :retry_non_idempotent, false) -allow_retries(r::Request) = get(r.context, :allow_retries, false) -nothing_written(r::Request) = get(r.context, :nothingwritten, false) - """ iserror(::Response) @@ -258,6 +246,17 @@ isredirect(status) = status in (301, 302, 303, 307, 308) redirectlimitreached(r::Request) = get(r.context, :redirectlimitreached, false) allow_redirects(r::Request) = get(r.context, :allow_redirects, false) +""" + isidempotent(::Request) + +https://tools.ietf.org/html/rfc7231#section-4.2.2 +""" +isidempotent(r::Request) = isidempotent(r.method) +isidempotent(method) = issafe(method) || method in ["PUT", "DELETE"] +retry_non_idempotent(r::Request) = get(r.context, :retry_non_idempotent, false) +allow_retries(r::Request) = get(r.context, :allow_retries, false) +nothing_written(r::Request) = get(r.context, :nothingwritten, false) + # whether the retry limit has been reached for a given request # set in the RetryRequest layer once the limit is reached retrylimitreached(r::Request) = get(r.context, :retrylimitreached, false) @@ -272,8 +271,15 @@ function retryable end supportsmark(x) = false supportsmark(x::T) where {T <: IO} = length(Base.methods(mark, Tuple{T}, parentmodule(T))) > 0 || hasfield(T, :mark) -retryable(r::Request) = (isbytes(r.body) || r.body isa Union{Dict, NamedTuple} || (r.body isa Vector && all(isbytes, r.body)) || - (supportsmark(r.body) && ismarked(r.body))) && +# request body is retryable if it was provided as "bytes", a Dict or NamedTuple, +# or a chunked array of "bytes"; OR if it supports mark() and is marked +retryablebody(r::Request) = (isbytes(r.body) || r.body isa Union{Dict, NamedTuple} || + (r.body isa Vector && all(isbytes, r.body)) || (supportsmark(r.body) && ismarked(r.body))) + +# request is retryable if the body is retryable, the user is allowing retries at all, +# we haven't reached the retry limit, and either nothing has been written yet or +# the request is idempotent or the user has explicitly allowed non-idempotent retries +retryable(r::Request) = retryablebody(r) && allow_retries(r) && !retrylimitreached(r) && (nothing_written(r) || isidempotent(r) || retry_non_idempotent(r)) retryable(r::Response) = retryable(r.status) diff --git a/src/clientlayers/ExceptionRequest.jl b/src/clientlayers/ExceptionRequest.jl index 4fd8582ce..f91c84754 100644 --- a/src/clientlayers/ExceptionRequest.jl +++ b/src/clientlayers/ExceptionRequest.jl @@ -2,7 +2,7 @@ module ExceptionRequest export exceptionlayer -using ..Messages, ..Exceptions +using ..IOExtras, ..Messages, ..Exceptions """ exceptionlayer(handler) -> handler diff --git a/src/clientlayers/MessageRequest.jl b/src/clientlayers/MessageRequest.jl index ad9033a1e..47f34881b 100644 --- a/src/clientlayers/MessageRequest.jl +++ b/src/clientlayers/MessageRequest.jl @@ -1,7 +1,7 @@ module MessageRequest using URIs -using ..Messages, ..Parsers +using ..IOExtras, ..Messages, ..Parsers export messagelayer @@ -14,7 +14,18 @@ Hard-coded as the first layer in the request pipeline. function messagelayer(handler) return function(method::String, url::URI, headers::Headers, body; response_stream=nothing, http_version=v"1.1", kw...) req = Request(method, resource(url), headers, body; url=url, version=http_version, responsebody=response_stream) - return handler(req; response_stream=response_stream, kw...) + local resp + try + resp = handler(req; response_stream=response_stream, kw...) + finally + if @isdefined(resp) && iserror(resp) && haskey(resp.request.context, :response_body) + if isbytes(resp.body) + resp.body = resp.request.context[:response_body] + else + write(resp.body, resp.request.context[:response_body]) + end + end + end end end diff --git a/src/clientlayers/RetryRequest.jl b/src/clientlayers/RetryRequest.jl index 2d706fc78..5f48f204d 100644 --- a/src/clientlayers/RetryRequest.jl +++ b/src/clientlayers/RetryRequest.jl @@ -5,6 +5,8 @@ using ..IOExtras, ..Messages, ..Strings, ..ExceptionRequest, ..Exceptions export retrylayer +FALSE(x...) = false + """ retrylayer(handler) -> handler @@ -19,7 +21,9 @@ e.g. `Sockets.DNSError`, `Base.EOFError` and `HTTP.StatusError` (if status is `5xx`). """ function retrylayer(handler) - return function(req::Request; retry::Bool=true, retries::Int=4, retry_non_idempotent::Bool=false, kw...) + return function(req::Request; retry::Bool=true, retries::Int=4, + retry_delays::ExponentialBackOff=ExponentialBackOff(n = retries), retry_check=FALSE, + retry_non_idempotent::Bool=false, kw...) if !retry || retries == 0 # no retry return handler(req; kw...) @@ -37,11 +41,11 @@ function retrylayer(handler) end retryattempt = Ref(0) retry_request = Base.retry(handler, - delays=ExponentialBackOff(n = retries), + delays=retry_delays, check=(s, ex) -> begin retryattempt[] += 1 req.context[:retryattempt] = retryattempt[] - retry = isrecoverable(ex) && retryable(req) + retry = retryable(req) || retryablebody(req) && _retry_check(s, ex, req, retry_check) if retryattempt[] == retries req.context[:retrylimitreached] = true end @@ -63,20 +67,19 @@ function retrylayer(handler) end end -isrecoverable(e) = false -isrecoverable(e::Union{Base.EOFError, Base.IOError, MbedTLS.MbedException, OpenSSL.OpenSSLError}) = true -isrecoverable(e::ArgumentError) = e.msg == "stream is closed or unusable" -isrecoverable(e::Sockets.DNSError) = true -isrecoverable(e::ConnectError) = true -isrecoverable(e::RequestError) = isrecoverable(e.error) -isrecoverable(e::StatusError) = retryable(e.status) +function _retry_check(s, ex, req, check) + resp = req.response + if haskey(req.context, :response_body) + resp.body = req.context[:response_body] + end + return check(s, ex, req, resp) +end function no_retry_reason(ex, req) buf = IOBuffer() show(IOContext(buf, :compact => true), req) print(buf, ", ", ex isa StatusError ? "HTTP $(ex.status): " : - !isrecoverable(ex) ? "$ex not recoverable, " : "", !isbytes(req.body) ? "request streamed, " : "", !isbytes(req.response.body) ? "response streamed, " : "", !isidempotent(req) ? "$(req.method) non-idempotent" : "") diff --git a/src/clientlayers/StreamRequest.jl b/src/clientlayers/StreamRequest.jl index f8fba426e..2142157ee 100644 --- a/src/clientlayers/StreamRequest.jl +++ b/src/clientlayers/StreamRequest.jl @@ -129,16 +129,17 @@ function readbody(stream::Stream, res::Response, decompress::Union{Nothing, Bool end function readbody!(stream::Stream, res::Response, buf_or_stream) - if isbytes(res.body) - # normal response body path: read as Vector{UInt8} and store - res.body = read(buf_or_stream) - elseif isredirect(stream) || retryable(stream) - # if response body is a stream, but we're redirecting or - # retrying, store this "temporary" body in the request context - res.request.context[:response_body] = read(buf_or_stream) + if !iserror(res) + if isbytes(res.body) + res.body = read(buf_or_stream) + else + write(res.body, buf_or_stream) + end else - # normal streaming response body path: write response body out directly - write(res.body, buf_or_stream) + # read the response body into the request context so that it can be + # read by the user if they want to or set later if + # we end up not retrying/redirecting/etc. + res.request.context[:response_body] = read(buf_or_stream) end end diff --git a/test/client.jl b/test/client.jl index 023744e26..3741fca7e 100644 --- a/test/client.jl +++ b/test/client.jl @@ -496,6 +496,7 @@ end @testset "Retry with request/response body streams" begin shouldfail = Ref(true) + status = Ref(200) server = HTTP.listen!(8080) do http @assert !eof(http) msg = String(read(http)) @@ -503,6 +504,12 @@ end shouldfail[] = false error("500 unexpected error") end + HTTP.setstatus(http, status[]) + if status[] != 200 + HTTP.startwrite(http) + HTTP.write(http, "$(status[]) unexpected error") + status[] = 200 + end HTTP.startwrite(http) HTTP.write(http, msg) end @@ -518,14 +525,17 @@ end seekstart(req_body) resp = HTTP.get("http://localhost:8080/retry"; body=req_body, response_stream=res_body, retry=false, status_exception=false) @test String(take!(res_body)) == "500 unexpected error" - # when retrying, we can still get access to the most recent failed response body in the response's request context - shouldfail[] = true + # don't throw a 500, but set status to status we don't retry by default + shouldfail[] = false + status[] = 404 seekstart(req_body) - println("making 3rd request") - resp = HTTP.get("http://localhost:8080/retry"; body=req_body, response_stream=res_body) + check = (s, ex, req, resp) -> begin + @test String(resp.body) == "404 unexpected error" + resp.status == 404 + end + resp = HTTP.get("http://localhost:8080/retry"; body=req_body, response_stream=res_body, retry_check=(s, ex, req, resp) -> resp.status == 404) @test isok(resp) @test String(take!(res_body)) == "hey there sailor" - @test String(resp.request.context[:response_body]) == "500 unexpected error" finally close(server) HTTP.ConnectionPool.closeall() From ef84fa45e0953770485b6fdc79545900714562ca Mon Sep 17 00:00:00 2001 From: Jacob Quinn Date: Fri, 9 Dec 2022 16:48:48 -0700 Subject: [PATCH 2/2] Update docs --- src/HTTP.jl | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/HTTP.jl b/src/HTTP.jl index e432dfb3d..d0dc7af44 100644 --- a/src/HTTP.jl +++ b/src/HTTP.jl @@ -135,6 +135,9 @@ Retry arguments: - `retry = true`, retry idempotent requests in case of error. - `retries = 4`, number of times to retry. - `retry_non_idempotent = false`, retry non-idempotent requests too. e.g. POST. + - `retry_delay = ExponentialBackOff(n = retries)`, provide a custom `ExponentialBackOff` object to control the delay between retries. + - `retry_check = (s, ex, req, resp) -> Bool`, provide a custom function to control whether a retry should be attempted. + The function should accept 4 arguments: the delay state, exception, request, and response, and return `true` if a retry should be attempted. Redirect arguments: - `redirect = true`, follow 3xx redirect responses; i.e. additional requests will be made to the redirected location