Skip to content

Commit

Permalink
Merge pull request #338 from JuliaWeb/so/streamio
Browse files Browse the repository at this point in the history
Improve streaming interface. Reduce allocation and buffering.
  • Loading branch information
samoconnor authored Dec 18, 2018
2 parents 0fec7d0 + f28f47c commit a383583
Show file tree
Hide file tree
Showing 12 changed files with 236 additions and 226 deletions.
32 changes: 5 additions & 27 deletions Manifest.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,14 @@ uuid = "2a0f44e3-6c83-55bd-87e4-b1978d98bd5f"

[[BinaryProvider]]
deps = ["Libdl", "Pkg", "SHA", "Test"]
git-tree-sha1 = "b530fbeb6f41ab5a83fbe3db1fcbe879334bcd2d"
git-tree-sha1 = "9930c1a6cd49d9fcd7218df6be417e6ae4f1468a"
uuid = "b99e7846-7c00-51b0-8f62-c81ae34c0232"
version = "0.4.2"

[[Compat]]
deps = ["Base64", "Dates", "DelimitedFiles", "Distributed", "InteractiveUtils", "LibGit2", "Libdl", "LinearAlgebra", "Markdown", "Mmap", "Pkg", "Printf", "REPL", "Random", "Serialization", "SharedArrays", "Sockets", "SparseArrays", "Statistics", "Test", "UUIDs", "Unicode"]
git-tree-sha1 = "ae262fa91da6a74e8937add6b613f58cd56cdad4"
uuid = "34da2185-b29b-5c13-b0c7-acf172513d20"
version = "1.1.0"
version = "0.5.2"

[[Dates]]
deps = ["Printf"]
uuid = "ade2ca70-3891-5945-98fb-dc099432e06a"

[[DelimitedFiles]]
deps = ["Mmap"]
uuid = "8bb1440f-4735-579b-a4ab-409b98df4dab"

[[Distributed]]
deps = ["LinearAlgebra", "Random", "Serialization", "Sockets"]
uuid = "8ba89e20-285c-5b6f-9357-94700520ee1b"
Expand Down Expand Up @@ -59,10 +49,10 @@ deps = ["Base64"]
uuid = "d6f4376e-aef5-505a-96c1-9c027394607a"

[[MbedTLS]]
deps = ["BinaryProvider", "Compat", "Libdl", "Pkg", "Sockets"]
git-tree-sha1 = "17d5a81dbb1e682d4ff707c01f0afe5948068fa6"
deps = ["BinaryProvider", "Libdl", "Pkg", "Random", "Sockets", "Test"]
git-tree-sha1 = "4b890362c0c2fdb14a575ce927f1f4eeac6dda9f"
uuid = "739be429-bea8-5141-9913-cc70e7f3736d"
version = "0.6.0"
version = "0.6.4"

[[Mmap]]
uuid = "a63ad114-7e13-5084-954f-fe012c677804"
Expand All @@ -89,21 +79,9 @@ uuid = "ea8e919c-243c-51af-8825-aaa63cd721ce"
[[Serialization]]
uuid = "9e88b42a-f829-5b0c-bbe9-9e923198166b"

[[SharedArrays]]
deps = ["Distributed", "Mmap", "Random", "Serialization"]
uuid = "1a1011a3-84de-559e-8e89-a11a2f7dc383"

[[Sockets]]
uuid = "6462fe0b-24de-5631-8697-dd941f90decc"

[[SparseArrays]]
deps = ["LinearAlgebra", "Random"]
uuid = "2f01184e-e22b-5df5-ae63-d93ebab69eaf"

[[Statistics]]
deps = ["LinearAlgebra", "SparseArrays"]
uuid = "10745b16-79ce-11e8-11f9-7d13ad32a3b2"

[[Test]]
deps = ["Distributed", "InteractiveUtils", "Logging", "Random"]
uuid = "8dfed614-e22c-5e08-85e1-65c5234f0b40"
Expand Down
3 changes: 2 additions & 1 deletion Project.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@ Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40"
MbedTLS = "v0.6.0"

[extras]
BufferedStreams = "e1450e63-4bb3-523b-b2a4-4ffa8c0fd77d"
Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b"
JSON = "682c06a0-de6a-54ab-a142-c8b1cf79cde6"
Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40"
XMLDict = "228000da-037f-5747-90a9-8195ccbf91a5"

[targets]
test = ["Test", "JSON", "XMLDict", "Distributed"]
test = ["Test", "JSON", "XMLDict", "Distributed", "BufferedStreams"]
120 changes: 57 additions & 63 deletions src/ConnectionPool.jl
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ export Connection, Transaction,

using ..IOExtras, ..Sockets

import ..ByteView
import ..@debug, ..@debugshow, ..DEBUG_LEVEL, ..taskid
import ..@require, ..precondition_error, ..@ensure, ..postcondition_error
using MbedTLS: SSLConfig, SSLContext, setup!, associate!, hostname!, handshake!
Expand All @@ -38,9 +37,6 @@ const default_connection_limit = 8
const default_pipeline_limit = 16
const nolimit = typemax(Int)

const nobytes = view(UInt8[], 1:0)
byteview(bytes::ByteView) = bytes
byteview(bytes)::ByteView = view(bytes, 1:length(bytes))

"""
Connection{T <: IO}
Expand All @@ -55,9 +51,9 @@ Fields:
- `peerport`, remote TCP port number (used for debug messages).
- `localport`, local TCP port number (used for debug messages).
- `io::T`, the `TCPSocket` or `SSLContext.
- `excess::ByteView`, left over bytes read from the connection after
the end of a response message. These bytes are probably the start of the
next response message.
- `buffer::IOBuffer`, left over bytes read from the connection after
the end of a response header (or chunksize). These bytes are usually
part of the response body.
- `sequence`, number of most recent `Transaction`.
- `writecount`, number of Messages that have been written.
- `writedone`, signal that `writecount` was incremented.
Expand All @@ -74,7 +70,7 @@ mutable struct Connection{T <: IO}
peerport::UInt16 # debug only
localport::UInt16 # debug only
io::T
excess::ByteView
buffer::IOBuffer
sequence::Int
writecount::Int
writebusy::Bool
Expand Down Expand Up @@ -104,7 +100,7 @@ Connection(host::AbstractString, port::AbstractString,
pipeline_limit, idle_timeout,
require_ssl_verification,
peerport(io), localport(io),
io, nobytes,
io, PipeBuffer(),
-1,
0, false, Condition(),
0, false, Condition(),
Expand Down Expand Up @@ -157,84 +153,80 @@ function Base.eof(t::Transaction)
end

Base.bytesavailable(t::Transaction) = bytesavailable(t.c)
Base.bytesavailable(c::Connection) =
!isempty(c.excess) ? length(c.excess) : bytesavailable(c.io)
Base.bytesavailable(c::Connection) = bytesavailable(c.buffer) +
bytesavailable(c.io)

Base.isreadable(t::Transaction) = t.c.readbusy && t.c.readcount == t.sequence

Base.iswritable(t::Transaction) = t.c.writebusy && t.c.writecount == t.sequence

function Base.readavailable(t::Transaction)::ByteView
@require isreadable(t)
if !isempty(t.c.excess)
bytes = t.c.excess
@debug 4 "↩️ read $(length(bytes))-bytes from excess buffer."
t.c.excess = nobytes
else
bytes = byteview(readavailable(t.c.io))
@debug 4 "⬅️ read $(length(bytes))-bytes from $(typeof(t.c.io))"
end
t.c.timestamp = time()
function Base.read(t::Transaction, nb::Int)
nb = min(nb, bytesavailable(t))
bytes = Base.StringVector(nb)
unsafe_read(t, pointer(bytes), nb)
return bytes
end

function Base.read(t::Transaction, nb::Integer)::ByteView
bytes = t.c.excess
l = length(bytes)
if l > 0
if l > nb
t.c.excess = view(bytes, nb+1:l)
return view(bytes, 1:nb)
else
t.c.excess = nobytes
return bytes
end
function Base.read(t::Transaction, ::Type{UInt8})
if bytesavailable(t.c.buffer) == 0
read_to_buffer(t)
end
v = Base.StringVector(min(nb, bytesavailable(t.c.io)))
unsafe_read(t.c.io, pointer(v), length(v))
return byteview(v)
return read(t.c.buffer, UInt8)
end

function Base.unsafe_read(t::Transaction, p::Ptr{UInt8}, n::UInt)
bytes = t.c.excess
l = length(bytes)
l = bytesavailable(t.c.buffer)
if l > 0
nb = min(l,n)
unsafe_copyto!(p, pointer(bytes), nb)
p += nb;
unsafe_read(t.c.buffer, p, nb)
p += nb
n -= nb
if nb == l
t.c.excess = nobytes
else
t.c.excess = view(bytes, nb+1:l)
end
@debug 4 "↩️ read $nb-bytes from buffer."
end
if n > 0
unsafe_read(t.c.io, p, n)
@debug 4 "⬅️ read $n-bytes from $(typeof(t.c.io))"
end
return nothing
end

function read_to_buffer(t::Transaction, sizehint=4096)

"""
unread!(::Transaction, bytes)
buf = t.c.buffer

# Reset the buffer if it is empty.
if bytesavailable(buf) == 0
buf.size = 0
buf.ptr = 1
end

# Wait for data.
if eof(t.c.io)
throw(EOFError())
end

# Read from stream into buffer.
n = min(sizehint, bytesavailable(t.c.io))
buf = t.c.buffer
Base.ensureroom(buf, n)
unsafe_read(t.c.io, pointer(buf.data, buf.size + 1), n)
buf.size += n
end

Push bytes back into a connection's `excess` buffer
(to be returned by the next read).
"""
function IOExtras.unread!(t::Transaction, bytes::ByteView)
@require isreadable(t)
@require !isempty(bytes)
@require t.c.excess === nobytes || bytes.parent == t.c.excess.parent &&
bytes.indices[1].stop + 1 ==
t.c.excess.indices[1].start
if t.c.excess === nobytes
t.c.excess = bytes
else
t.c.excess = view(bytes.parent,
bytes.indices[1].start:t.c.excess.indices[1].stop)
Read until `find_delimiter(bytes)` returns non-zero.
Return view of bytes up to the delimiter.
"""
function Base.readuntil(t::Transaction, f::Function #=Vector{UInt8} -> Int=#,
sizehint=4096)::ByteView
buf = t.c.buffer
if bytesavailable(buf) == 0
read_to_buffer(t, sizehint)
end
return
while (bytes = readuntil(buf, f)) === nobytes
read_to_buffer(t, sizehint)
end
return bytes
end

"""
Expand Down Expand Up @@ -359,7 +351,8 @@ function purge(c::Connection)
while !eof(c.io)
readavailable(c.io)
end
c.excess = nobytes
c.buffer.size = 0
c.buffer.ptr = 1
@ensure bytesavailable(c) == 0
end

Expand Down Expand Up @@ -665,7 +658,8 @@ function Base.show(io::IO, c::Connection)
c.host, ":",
c.port != "" ? c.port : Int(c.peerport), ":", Int(c.localport),
"", c.pipeline_limit,
length(c.excess) > 0 ? " $(length(c.excess))-byte excess" : "",
bytesavailable(c.buffer) > 0 ?
" $(bytesavailable(c.buffer))-byte excess" : "",
nwaiting > 0 ? " $nwaiting bytes waiting" : "",
DEBUG_LEVEL[] > 1 && applicable(tcpsocket, c.io) ?
" $(Base._fd(tcpsocket(c.io)))" : "")
Expand Down
1 change: 0 additions & 1 deletion src/DebugRequest.jl
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ module DebugRequest

import ..Layer, ..request
using ..IOExtras
import ..ConnectionPool: ByteView, byteview

const live_mode = true

Expand Down
12 changes: 7 additions & 5 deletions src/IODebug.jl
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import ..debug_header

logwrite(iod::IODebug, f, x) = show_io_debug(stdout, "➡️ ", f, x)
logread(iod::IODebug, f, x) = show_io_debug(stdout, "⬅️ ", f, x)
logunread(iod::IODebug, f, x) = show_io_debug(stdout, "♻️ ", f, x)

else

Expand All @@ -23,7 +22,6 @@ else

logwrite(iod::IODebug, f, x) = push!(iod.log, ("➡️ ", f, x))
logread(iod::IODebug, f, x) = push!(iod.log, ("⬅️ ", f, x))
logunread(iod::IODebug, f, x) = push!(iod.log, ("♻️ ", f, x))

end

Expand Down Expand Up @@ -57,9 +55,13 @@ Base.readavailable(iod::IODebug) =
(r = readavailable(iod.io);
logread(iod, :readavailable, String(copy(r))); r)

IOExtras.unread!(iod::IODebug, bytes) =
(logunread(iod, :unread!, String(copy(bytes)));
unread!(iod.io, bytes))
Base.readuntil(iod::IODebug, f) =
(r = readuntil(iod.io, f);
logread(iod, :readuntil, String(copy(r))); r)

Base.readuntil(iod::IODebug, f, h) =
(r = readuntil(iod.io, f, h);
logread(iod, :readuntil, String(copy(r))); r)

Base.eof(iod::IODebug) = eof(iod.io)
Base.close(iod::IODebug) = close(iod.io)
Expand Down
Loading

0 comments on commit a383583

Please sign in to comment.