From 941db1ea06cb2651df83f79cf12d01cd5ba3f789 Mon Sep 17 00:00:00 2001 From: Jacob Quinn Date: Mon, 11 Jul 2022 17:54:50 -0600 Subject: [PATCH 1/2] Allow user-configurable retry delay and check It's been reported that users sometimes feel the need to disable HTTP's retry abilities and wrap HTTP methods with their own retry logic due to the lack of configurability around delays and overriding the retry check. Since there is some tricky logic in getting retries just right, especially in the presence of streaming request or response bodies, this PR proposes more configurability for retry logic in the hopes that users won't opt to roll their own incorrect retry logic. --- src/Messages.jl | 5 +++-- src/clientlayers/RetryRequest.jl | 8 +++++--- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/src/Messages.jl b/src/Messages.jl index 37cd08444..122d3e718 100644 --- a/src/Messages.jl +++ b/src/Messages.jl @@ -261,8 +261,9 @@ Whether a `Request` is eligible to be retried. """ function retryable end -retryable(r::Request) = (isbytes(r.body) || (r.body !== nothing && ismarked(r.body))) && - allow_retries(r) && (isidempotent(r) || retry_non_idempotent(r)) && !retrylimitreached(r) +retryable_requestbody(r::Request) = isbytes(r.body) || (r.body !== nothing && ismarked(r.body)) +retryable(r::Request) = retryable_requestbody(r) && allow_retries(r) && + (isidempotent(r) || retry_non_idempotent(r)) && !retrylimitreached(r) retryable(r::Response) = retryable(r.status) retryable(status) = status in (403, 408, 409, 429, 500, 502, 503, 504, 599) diff --git a/src/clientlayers/RetryRequest.jl b/src/clientlayers/RetryRequest.jl index 11038697c..51722c371 100644 --- a/src/clientlayers/RetryRequest.jl +++ b/src/clientlayers/RetryRequest.jl @@ -5,6 +5,8 @@ using ..IOExtras, ..Messages, ..Strings, ..ExceptionRequest, ..Exceptions export retrylayer +FALSE(args...) = false + """ retrylayer(handler) -> handler @@ -19,7 +21,7 @@ e.g. `Sockets.DNSError`, `Base.EOFError` and `HTTP.StatusError` (if status is `5xx`). """ function retrylayer(handler) - return function(req::Request; retry::Bool=true, retries::Int=4, retry_non_idempotent::Bool=false, kw...) + return function(req::Request; retry::Bool=true, retries::Int=4, retry_non_idempotent::Bool=false, retry_delays=ExponentialBackOff(n = retries), retry_check=FALSE, kw...) if !retry || retries == 0 # no retry return handler(req; kw...) @@ -36,10 +38,10 @@ function retrylayer(handler) end retryattempt = Ref(0) retry_request = Base.retry(handler, - delays=ExponentialBackOff(n = retries), + delays=retry_delays, check=(s, ex) -> begin retryattempt[] += 1 - retry = isrecoverable(ex) && retryable(req) + retry = (isrecoverable(ex) && retryable(req)) || (Messages.retryable_requestbody(req) && retry_check(req, req.resp, ex)) if retryattempt[] == retries req.context[:retrylimitreached] = true end From e72faa27868908ebbf51a7347bf339749b26637d Mon Sep 17 00:00:00 2001 From: Jacob Quinn Date: Wed, 13 Jul 2022 20:43:27 -0600 Subject: [PATCH 2/2] more ideas --- src/clientlayers/RetryRequest.jl | 4 +++- src/clientlayers/StreamRequest.jl | 16 ++++++++-------- test/client.jl | 14 +++++++++++++- 3 files changed, 24 insertions(+), 10 deletions(-) diff --git a/src/clientlayers/RetryRequest.jl b/src/clientlayers/RetryRequest.jl index 51722c371..bad43bcb4 100644 --- a/src/clientlayers/RetryRequest.jl +++ b/src/clientlayers/RetryRequest.jl @@ -41,7 +41,7 @@ function retrylayer(handler) delays=retry_delays, check=(s, ex) -> begin retryattempt[] += 1 - retry = (isrecoverable(ex) && retryable(req)) || (Messages.retryable_requestbody(req) && retry_check(req, req.resp, ex)) + retry = (isrecoverable(ex) && retryable(req)) || (Messages.retryable_requestbody(req) && retry_check(req, req.response, get_maybe_ephemeral_response_body(req), ex)) if retryattempt[] == retries req.context[:retrylimitreached] = true end @@ -63,6 +63,8 @@ function retrylayer(handler) end end +get_maybe_ephemeral_response_body(req::Request) = isbytes(req.response.body) ? req.response.body : get(() -> UInt8[], req.context, :ephemeral_response_body) + supportsmark(x) = false supportsmark(x::T) where {T <: IO} = length(Base.methods(mark, Tuple{T}, parentmodule(T))) > 0 || hasfield(T, :mark) diff --git a/src/clientlayers/StreamRequest.jl b/src/clientlayers/StreamRequest.jl index ff718bee1..0f6f573de 100644 --- a/src/clientlayers/StreamRequest.jl +++ b/src/clientlayers/StreamRequest.jl @@ -17,7 +17,7 @@ immediately so that the transmission can be aborted if the `Response` status indicates that the server does not wish to receive the message body. [RFC7230 6.5](https://tools.ietf.org/html/rfc7230#section-6.5). """ -function streamlayer(stream::Stream; iofunction=nothing, decompress::Bool=true, kw...)::Response +function streamlayer(stream::Stream; iofunction=nothing, decompress::Bool=true, retry_status=nothing, kw...)::Response response = stream.message req = response.request io = stream.stream @@ -45,7 +45,7 @@ function streamlayer(stream::Stream; iofunction=nothing, decompress::Bool=true, end @debugv 2 "client startread" startread(stream) - readbody(stream, response, decompress) + readbody(stream, response, decompress, retry_status) else iofunction(stream) end @@ -103,7 +103,7 @@ writechunk(stream, body::IO) = writebodystream(stream, body) writechunk(stream, body::Union{Dict, NamedTuple}) = writebodystream(stream, body) writechunk(stream, body) = write(stream, body) -function readbody(stream::Stream, res::Response, decompress::Bool) +function readbody(stream::Stream, res::Response, decompress::Bool, retry_status) if decompress && header(res, "Content-Encoding") == "gzip" # Plug in a buffer stream in between so that we can (i) read the http stream in # chunks instead of byte-by-byte and (ii) make sure to stop reading the http stream @@ -121,21 +121,21 @@ function readbody(stream::Stream, res::Response, decompress::Bool) close(gzstream) end end - readbody!(stream, res, buf) + readbody!(stream, res, buf, retry_status) wait(tsk) else - readbody!(stream, res, stream) + readbody!(stream, res, stream, retry_status) end end -function readbody!(stream::Stream, res::Response, buf_or_stream) +function readbody!(stream::Stream, res::Response, buf_or_stream, retry_status) if isbytes(res.body) # normal response body path: read as Vector{UInt8} and store res.body = read(buf_or_stream) - elseif isredirect(stream) || retryable(stream) + elseif isredirect(stream) || retryable(stream) || (retry_status !== nothing && retry_status(res.status)) # if response body is a stream, but we're redirecting or # retrying, store this "temporary" body in the request context - res.request.context[:response_body] = read(buf_or_stream) + res.request.context[:ephemeral_response_body] = read(buf_or_stream) else # normal streaming response body path: write response body out directly write(res.body, buf_or_stream) diff --git a/test/client.jl b/test/client.jl index ee3b44704..058a9a824 100644 --- a/test/client.jl +++ b/test/client.jl @@ -543,7 +543,19 @@ end resp = HTTP.get("http://localhost:8080/retry"; body=req_body, response_stream=res_body) @test resp.status == 200 @test String(take!(res_body)) == "hey there sailor" - @test String(resp.request.context[:response_body]) == "500 unexpected error" + @test String(resp.request.context[:ephemeral_response_body]) == "500 unexpected error" + # custom retry_delays and retry_check + shouldfail[] = true + seekstart(req_body) + chk = (req, resp, resp_body, ex) -> begin + @show resp_body + @test String(resp_body) == "500 unexpected error" + return true + end + # HTTP.post is non-idempotent, so shouldn't be retried, but our custom retry_check says retry + resp = HTTP.post("http://localhost:8080/retry"; body=req_body, response_stream=res_body, retry_non_idempotent=true, retry_check=chk) + @test resp.status == 200 + @test String(take!(res_body)) == "hey there sailor" finally if server !== nothing close(server)