diff --git a/src/Streams.jl b/src/Streams.jl index d89d3265c..e0f05cfdd 100644 --- a/src/Streams.jl +++ b/src/Streams.jl @@ -1,6 +1,6 @@ module Streams -export Stream, closebody, isaborted, setstatus +export Stream, closebody, isaborted, setstatus, readall! using Sockets, LoggingExtras using ..IOExtras, ..Messages, ..ConnectionPool, ..Conditions, ..Exceptions @@ -252,15 +252,13 @@ function Base.read(http::Stream, ::Type{UInt8}) end function http_unsafe_read(http::Stream, p::Ptr{UInt8}, n::UInt)::Int - ntr = UInt(ntoread(http)) - if ntr == 0 - return 0 - end + ntr == 0 && return 0 + # If there is spare space in `p` + # read two extra bytes + # (`\r\n` at end ofchunk). unsafe_read(http.stream, p, min(n, ntr + (http.readchunked ? 2 : 0))) - # If there is spare space in `p` - # read two extra bytes - n = min(n, ntr) # (`\r\n` at end ofchunk). + n = min(n, ntr) update_ntoread(http, n) return n end @@ -268,7 +266,7 @@ end function Base.readbytes!(http::Stream, buf::AbstractVector{UInt8}, n=length(buf)) @require n <= length(buf) - return http_unsafe_read(http, pointer(buf), UInt(n)) + return GC.@preserve buf http_unsafe_read(http, pointer(buf), UInt(n)) end function Base.unsafe_read(http::Stream, p::Ptr{UInt8}, n::UInt) @@ -282,14 +280,20 @@ function Base.unsafe_read(http::Stream, p::Ptr{UInt8}, n::UInt) nothing end -function Base.readbytes!(http::Stream, buf::IOBuffer, n=bytesavailable(http)) - Base.ensureroom(buf, n) - unsafe_read(http, pointer(buf.data, buf.size + 1), n) +@noinline bufcheck(buf, n) = ((buf.size + n) <= length(buf.data)) || throw(ArgumentError("Unable to grow response stream IOBuffer large enough for response body size")) + +function Base.readbytes!(http::Stream, buf::Base.GenericIOBuffer, n=bytesavailable(http)) + Base.ensureroom(buf, buf.size + n) + # check if there's enough room in buf to write n bytes + bufcheck(buf, n) + data = buf.data + GC.@preserve data unsafe_read(http, pointer(data, buf.size + 1), n) buf.size += n end -function Base.read(http::Stream) - buf = PipeBuffer() +Base.read(http::Stream, buf::Base.GenericIOBuffer=PipeBuffer()) = take!(readall!(http, buf)) + +function readall!(http::Stream, buf::Base.GenericIOBuffer=PipeBuffer()) if ntoread(http) == unknown_length while !eof(http) readbytes!(http, buf) @@ -299,7 +303,7 @@ function Base.read(http::Stream) readbytes!(http, buf, ntoread(http)) end end - return take!(buf) + return buf end function Base.readuntil(http::Stream, f::Function)::ByteView diff --git a/src/clientlayers/RetryRequest.jl b/src/clientlayers/RetryRequest.jl index e0c23f869..8fa5850ef 100644 --- a/src/clientlayers/RetryRequest.jl +++ b/src/clientlayers/RetryRequest.jl @@ -22,7 +22,7 @@ e.g. `Sockets.DNSError`, `Base.EOFError` and `HTTP.StatusError` """ function retrylayer(handler) return function(req::Request; retry::Bool=true, retries::Int=4, - retry_delays::ExponentialBackOff=ExponentialBackOff(n = retries), retry_check=FALSE, + retry_delays::ExponentialBackOff=ExponentialBackOff(n = retries, factor=3.0), retry_check=FALSE, retry_non_idempotent::Bool=false, kw...) if !retry || retries == 0 # no retry @@ -61,8 +61,8 @@ function retrylayer(handler) @debugv 1 "🚷 No Retry: $(no_retry_reason(ex, req))" end return s, retry - end) - + end + ) return retry_request(req; kw...) end end diff --git a/src/clientlayers/StreamRequest.jl b/src/clientlayers/StreamRequest.jl index 2142157ee..5dcb283c2 100644 --- a/src/clientlayers/StreamRequest.jl +++ b/src/clientlayers/StreamRequest.jl @@ -128,10 +128,36 @@ function readbody(stream::Stream, res::Response, decompress::Union{Nothing, Bool end end +# 2 most common types of IOBuffers +const IOBuffers = Union{IOBuffer, Base.GenericIOBuffer{SubArray{UInt8, 1, Vector{UInt8}, Tuple{UnitRange{Int64}}, true}}} + function readbody!(stream::Stream, res::Response, buf_or_stream) if !iserror(res) if isbytes(res.body) - res.body = read(buf_or_stream) + if length(res.body) > 0 + # user-provided buffer to read response body into + # specify write=true to make the buffer writable + # but also specify maxsize, which means it won't be grown + # (we don't want to be changing the user's buffer for them) + body = IOBuffer(res.body; write=true, maxsize=length(res.body)) + if buf_or_stream isa BufferStream + # if it's a BufferStream, the response body was gzip encoded + # so using the default write is fastest because it utilizes + # readavailable under the hood, for which BufferStream is optimized + write(body, buf_or_stream) + elseif buf_or_stream isa Stream + # for HTTP.Stream, there's already an optimized read method + # that just needs an IOBuffer to write into + readall!(buf_or_stream, body) + else + error("unreachable") + end + else + res.body = read(buf_or_stream) + end + elseif (res.body isa IOBuffers || res.body isa Base.GenericIOBuffer) && buf_or_stream isa Stream + # optimization for IOBuffer response_stream to avoid temporary allocations + readall!(buf_or_stream, res.body) else write(res.body, buf_or_stream) end diff --git a/test/client.jl b/test/client.jl index 6195025ec..f45cf3505 100644 --- a/test/client.jl +++ b/test/client.jl @@ -122,6 +122,34 @@ end x["headers"]["Host"] == y["headers"]["Host"] && x["headers"]["User-Agent"] == y["headers"]["User-Agent"] end + + # pass pre-allocated buffer + body = zeros(UInt8, 100) + r = HTTP.get("https://$httpbin/bytes/100"; response_stream=body, socket_type_tls=tls) + @test body === r.body + + # wrapping pre-allocated buffer in IOBuffer will write to buffer directly + io = IOBuffer(body; write=true) + r = HTTP.get("https://$httpbin/bytes/100"; response_stream=io, socket_type_tls=tls) + @test body === r.body.data + + # if provided buffer is too small, we won't grow it for user + body = zeros(UInt8, 10) + @test_throws HTTP.RequestError HTTP.get("https://$httpbin/bytes/100"; response_stream=body, socket_type_tls=tls, retry=false) + + # also won't shrink it if buffer provided is larger than response body + body = zeros(UInt8, 10) + r = HTTP.get("https://$httpbin/bytes/5"; response_stream=body, socket_type_tls=tls) + @test body === r.body + @test length(body) == 10 + @test HTTP.header(r, "Content-Length") == "5" + + # but if you wrap it in a writable IOBuffer, we will grow it + io = IOBuffer(body; write=true) + r = HTTP.get("https://$httpbin/bytes/100"; response_stream=io, socket_type_tls=tls) + # same Array, though it was resized larger + @test body === r.body.data + @test length(body) == 100 end @testset "Client Body Posting - Vector{UTF8}, String, IOStream, IOBuffer, BufferStream, Dict, NamedTuple" begin