Skip to content

Commit

Permalink
Try to get streaming read sizes to match up with chunk sizes where
possible.
Browse files Browse the repository at this point in the history

The IODebug logging for these two cases now has less `unread!`-ing
and less fragmentation.

```
HTTP.open(..., verbose=3) do http
   while !eof(http)
       buf = Vector{UInt8}(undef, 4096)
       @show readbytes!(http, buf)
   end
end

HTTP.open(..., verbose=3) do http
   while !eof(http)
       readavailable(http)
   end
end
```

Add readuntil_block_size = 4096. Attempt to avoid reading ahead too far
when looking for the end of headers / end of the chunk size.

Speculatively try to read and ignore 2 extra bytes in chunked mode.
Usually the trailing CRLF seems to be in the same packet as the last
part of the chunk data, so this avoids a follow-up fragmented read
when parsing the next chunk size. This should have no downside
for peers that send the trailing CRLF in the same packet as the
new chunk size, because the chunk-size parser will still ignore it
in that case.
  • Loading branch information
samoconnor committed Oct 21, 2018
1 parent 3cbd5d1 commit 6414e99
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 8 deletions.
5 changes: 3 additions & 2 deletions src/IOExtras.jl
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ end

const ByteView = typeof(view(UInt8[], 1:0))

# Read-ahead block size used by `readuntil` when scanning for a delimiter.
# Bounding each read to this size avoids reading too far past the delimiter,
# which reduces `unread!` calls and read fragmentation.
const readuntil_block_size = 4096

"""
Read from an `IO` stream until `find_delimiter(bytes)` returns non-zero.
Expand All @@ -141,7 +142,7 @@ function Base.readuntil(io::IO,

# Fast path, buffer already contains delimiter...
if !eof(io)
bytes = readavailable(io)
bytes = read(io, readuntil_block_size)
if (l = find_delimiter(bytes)) > 0
if l < length(bytes)
unread!(io, view(bytes, l+1:length(bytes)))
Expand All @@ -152,7 +153,7 @@ function Base.readuntil(io::IO,
# Otherwise, wait for delimiter...
buf = Vector{UInt8}(bytes)
while !eof(io)
bytes = readavailable(io)
bytes = read(io, readuntil_block_size)
append!(buf, bytes)
if (l = find_delimiter(buf)) > 0
if l < length(buf)
Expand Down
29 changes: 23 additions & 6 deletions src/Streams.jl
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,10 @@ end
@inline function ntoread(http::Stream)

@require headerscomplete(http.message)
# FIXME ?
#if !headerscomplete(http.message)
# startread(http)
#end

# Find length of next chunk
if http.ntoread == unknown_length && http.readchunked
Expand All @@ -206,6 +210,10 @@ end
return http.ntoread
end

# Number of extra bytes to speculatively read past the chunk data:
# in chunked transfer mode each chunk is terminated by a 2-byte CRLF,
# otherwise there is nothing extra to consume.
@inline nextra(http::Stream) = ifelse(http.readchunked, 2, 0)


@inline function update_ntoread(http::Stream, n)

if http.ntoread != unknown_length
Expand All @@ -229,8 +237,14 @@ function Base.readavailable(http::Stream, n::Int=typemax(Int))::ByteView
return nobytes
end

bytes = read(http.stream, n)
update_ntoread(http, length(bytes))
bytes = read(http.stream, n + nextra(http)) # Try to read (and ignore)
l = min(n, length(bytes)) # trailing CRLF after chunk.
if l > n
bytes = view(bytes, 1:n)
l = n
end

update_ntoread(http, l)

return bytes
end
Expand All @@ -239,19 +253,22 @@ Base.read(http::Stream, n::Integer) = readavailable(http, Int(n))

function http_unsafe_read(http::Stream, p::Ptr{UInt8}, n::UInt)::Int

n = min(n, UInt(ntoread(http)))
ntr = UInt(ntoread(http))

if n == 0
if ntr == 0
return 0
end

unsafe_read(http.stream, p, n)
n2 = min(n, ntr + nextra(http)) # Try to read (and ignore) trailing CRLF
n = min(n, ntr)
unsafe_read(http.stream, p, n2)
update_ntoread(http, n)

return n
end

function Base.readbytes!(http::Stream, buf, n)
function Base.readbytes!(http::Stream, buf::AbstractVector{UInt8},
n=length(buf))
@require n <= length(buf)
return http_unsafe_read(http, pointer(buf), UInt(n))
end
Expand Down

0 comments on commit 6414e99

Please sign in to comment.