diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 1cd179533..3b81cc856 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -18,7 +18,7 @@ jobs: fail-fast: false matrix: version: - - '1.0' + - '1.6' - '1' # automatically expands to the latest stable 1.x release of Julia - 'nightly' os: diff --git a/Project.toml b/Project.toml index 1c6be8241..fea4fea25 100644 --- a/Project.toml +++ b/Project.toml @@ -17,7 +17,7 @@ URIs = "5c2747f8-b7ea-4ff2-ba2e-563bfd36b1d4" IniFile = "0.5" MbedTLS = "0.6.8, 0.7, 1" URIs = "1.3" -julia = "0.7, 1" +julia = "1.6" [extras] BufferedStreams = "e1450e63-4bb3-523b-b2a4-4ffa8c0fd77d" diff --git a/docs/src/internal_interface.md b/docs/src/internal_interface.md index a2a7608e5..c5da18a98 100644 --- a/docs/src/internal_interface.md +++ b/docs/src/internal_interface.md @@ -55,11 +55,10 @@ HTTP.Streams.isaborted ```@docs HTTP.ConnectionPool.Connection -HTTP.ConnectionPool.Transaction -HTTP.ConnectionPool.getconnection +HTTP.ConnectionPool.newconnection HTTP.ConnectionPool.POOL -HTTP.IOExtras.startwrite(::HTTP.ConnectionPool.Transaction) -HTTP.IOExtras.closewrite(::HTTP.ConnectionPool.Transaction) -HTTP.IOExtras.startread(::HTTP.ConnectionPool.Transaction) -HTTP.IOExtras.closeread(::HTTP.ConnectionPool.Transaction) +HTTP.IOExtras.startwrite(::HTTP.ConnectionPool.Connection) +HTTP.IOExtras.closewrite(::HTTP.ConnectionPool.Connection) +HTTP.IOExtras.startread(::HTTP.ConnectionPool.Connection) +HTTP.IOExtras.closeread(::HTTP.ConnectionPool.Connection) ``` diff --git a/src/ConnectionPool.jl b/src/ConnectionPool.jl index 3e3eaaf08..78e52b523 100644 --- a/src/ConnectionPool.jl +++ b/src/ConnectionPool.jl @@ -1,31 +1,25 @@ """ -This module provides the [`getconnection`](@ref) function with support for: +This module provides the [`newconnection`](@ref) function with support for: - Opening TCP and SSL connections. -- Reusing connections for multiple Request/Response Messages, -- Pipelining Request/Response Messages. i.e. allowing a new Request to be - sent before previous Responses have been read. +- Reusing connections for multiple Request/Response Messages This module defines a [`Connection`](@ref) -struct to manage pipelining and connection reuse and a -[`Transaction`](@ref)`<: IO` struct to manage a single -pipelined request. Methods are provided for `eof`, `readavailable`, +struct to manage the lifetime of a connection and its reuse. +Methods are provided for `eof`, `readavailable`, `unsafe_write` and `close`. -This allows the `Transaction` object to act as a proxy for the +This allows the `Connection` object to act as a proxy for the `TCPSocket` or `SSLContext` that it wraps. The [`POOL`](@ref) is used to manage connection pooling. Connections -are identified by their host, port, pipeline limit, whether they require +are identified by their host, port, whether they require ssl verification, and whether they are a client or server connection. If a subsequent request matches these properties of a previous connection and limits are respected (reuse limit, idle timeout), and it wasn't otherwise -remotely closed, a connection will be reused. Transactions pipeline their -requests and responses concurrently on a Connection by calling `startwrite` -and `closewrite`, with corresponding `startread` and `closeread`. +remotely closed, a connection will be reused. 
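+
+A rough sketch of the intended client-side flow (illustrative only; `request_bytes`
+is a placeholder, not part of the API):
+
+    io = newconnection(Sockets.TCPSocket, "example.com", "80")  # acquire or reuse a Connection
+    startwrite(io)
+    write(io, request_bytes)      # send one request
+    closewrite(io)
+    startread(io)
+    # ... read the response from io (it proxies the wrapped socket) ...
+    closeread(io)                 # releases io back to the POOL for reuse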
""" module ConnectionPool -export Connection, Transaction, - getconnection, getrawstream, inactiveseconds +export Connection, newconnection, getrawstream, inactiveseconds using ..IOExtras, ..Sockets @@ -38,23 +32,8 @@ const default_connection_limit = 8 const default_pipeline_limit = 16 const nolimit = typemax(Int) -# certain operations, like locking Channels and Conditions -# is only supported in >= 1.3 -macro v1_3(expr, elses=nothing) - esc(quote - @static if VERSION >= v"1.3" - $expr - else - $elses - end - end) -end - -@static if VERSION >= v"1.3" - const Cond = Threads.Condition -else - const Cond = Condition -end +include("connectionpools.jl") +using .ConnectionPools """ Connection{T <: IO} @@ -64,8 +43,7 @@ A `TCPSocket` or `SSLContext` connection to a HTTP `host` and `port`. Fields: - `host::String` - `port::String`, exactly as specified in the URI (i.e. may be empty). -- `pipeline_limit`, number of requests to send before waiting for responses. -- `idle_timeout`, No. of seconds to maintain connection after last transaction. +- `idle_timeout`, No. of seconds to maintain connection after last request/response. - `peerip`, remote IP adress (used for debug/log messages). - `peerport`, remote TCP port number (used for debug/log messages). - `localport`, local TCP port number (used for debug messages). @@ -74,120 +52,55 @@ Fields: - `buffer::IOBuffer`, left over bytes read from the connection after the end of a response header (or chunksize). These bytes are usually part of the response body. -- `sequence`, number of most recent `Transaction`. -- `writecount`, number of Messages that have been written, protected by `writelock` -- `writelock`, lock writecount and writebusy, and signal that `writecount` was incremented. -- `writebusy`, whether a Transaction currently holds the Connection write lock, protected by `writelock` -- `readcount`, number of Messages that have been read, protected by `readlock` -- `readlock`, lock readcount and readbusy, and signal that `readcount` was incremented. -- `readbusy`, whether a Transaction currently holds the Connection read lock, protectecd by `readlock` - `timestamp`, time data was last received. """ -mutable struct Connection{T <: IO} +mutable struct Connection <: IO host::String port::String - pipeline_limit::Int idle_timeout::Int require_ssl_verification::Bool peerip::IPAddr # for debugging/logging peerport::UInt16 # for debugging/logging localport::UInt16 # debug only - io::T + io::IO clientconnection::Bool buffer::IOBuffer - sequence::Threads.Atomic{Int} - writecount::Int - writelock::Cond # protects the writecount and writebusy fields, notifies on closewrite - writebusy::Bool - readcount::Int - readlock::Cond # protects the readcount and readbusy fields, notifies on closeread - readbusy::Bool timestamp::Float64 - closelock::ReentrantLock - closed::Bool + readable::Bool + writable::Bool end """ - hashconn + connectionkey Used for "hashing" a Connection object on just the key properties necessary for determining -connection re-useability. That is, when a new request calls `getconnection`, we take the -request parameters of what socket type, the host and port, any pipeline_limit and if ssl +connection re-useability. That is, when a new request calls `newconnection`, we take the +request parameters of what socket type, the host and port, and if ssl verification is required, and if an existing Connection was already created with the exact same parameters, we can re-use it (as long as it's not already being used, obviously). 
""" -function hashconn end - -hashconn(x::Connection{T}) where {T} = hashconn(T, x.host, x.port, x.pipeline_limit, x.require_ssl_verification, x.clientconnection) -hashconn(T, host, port, pipeline_limit, require_ssl_verification, client) = hash(T, hash(host, hash(port, hash(pipeline_limit, hash(require_ssl_verification, hash(client, UInt(0))))))) - -""" - Transaction - -A single pipelined HTTP Request/Response transaction. - -Fields: - - `c`, the shared [`Connection`](@ref) used for this `Transaction`. - - `sequence::Int`, identifies this `Transaction` among the others that share `c`. - - `writebusy::Bool`, whether this Transaction holds its parent Connection write lock, protected by c.writelock - - `readbusy::Bool`, whether this Transaction holds its parent Connection read lock, protected by c.readlock -""" -mutable struct Transaction{T <: IO} <: IO - c::Connection{T} - sequence::Int - writebusy::Bool - readbusy::Bool -end +connectionkey(x::Connection) = (typeof(x.io), x.host, x.port, x.require_ssl_verification, x.clientconnection) Connection(host::AbstractString, port::AbstractString, - pipeline_limit::Int, idle_timeout::Int, - require_ssl_verification::Bool, io::T, client=true) where T <: IO = - Connection{T}(host, port, - pipeline_limit, idle_timeout, - require_ssl_verification, - safe_getpeername(io)..., localport(io), - io, client, PipeBuffer(), Threads.Atomic{Int}(0), - 0, Cond(), false, - 0, Cond(), false, - time(), ReentrantLock(), false) + idle_timeout::Int, + require_ssl_verification::Bool, io::IO, client=true) = + Connection(host, port, idle_timeout, + require_ssl_verification, + safe_getpeername(io)..., localport(io), + io, client, PipeBuffer(), time(), false, false) Connection(io; require_ssl_verification::Bool=true) = - Connection("", "", default_pipeline_limit, 0, require_ssl_verification, io, false) + Connection("", "", 0, require_ssl_verification, io, false) -Transaction(c::Connection{T}) where T <: IO = - Transaction{T}(c, (Threads.atomic_add!(c.sequence, 1)), false, false) +getrawstream(c::Connection) = c.io -function client_transaction(c) - t = Transaction(c) - startwrite(t) - return t -end - -getrawstream(t::Transaction) = t.c.io +inactiveseconds(c::Connection)::Float64 = time() - c.timestamp -inactiveseconds(t::Transaction) = inactiveseconds(t.c) - -function inactiveseconds(c::Connection)::Float64 - return time() - c.timestamp -end - -Base.unsafe_write(t::Transaction, p::Ptr{UInt8}, n::UInt) = - unsafe_write(t.c.io, p, n) +Base.unsafe_write(c::Connection, p::Ptr{UInt8}, n::UInt) = + unsafe_write(c.io, p, n) Base.isopen(c::Connection) = isopen(c.io) -Base.isopen(t::Transaction) = isopen(t.c) && - readcount(t.c) <= t.sequence && - writecount(t.c) <= t.sequence - -writebusy(c::Connection) = @v1_3 lock(() -> c.writebusy, c.writelock) c.writebusy -writecount(c::Connection) = @v1_3 lock(() -> c.writecount, c.writelock) c.writecount -readbusy(c::Connection) = @v1_3 lock(() -> c.readbusy, c.readlock) c.readbusy -readcount(c::Connection) = @v1_3 lock(() -> c.readcount, c.readlock) c.readcount - -writebusy(t::Transaction) = @v1_3 lock(() -> t.writebusy, t.c.writelock) t.writebusy -readbusy(t::Transaction) = @v1_3 lock(() -> t.readbusy, t.c.readlock) t.readbusy - """ flush(c::Connection) @@ -208,61 +121,52 @@ function Base.flush(c::Connection) end end -""" -Is `c` currently in use or expecting a response to request already sent? 
-""" -isbusy(c::Connection) = isopen(c) && (writebusy(c) || readbusy(c) || - writecount(c) > readcount(c)) +Base.isreadable(c::Connection) = c.readable +Base.iswritable(c::Connection) = c.writable -function Base.eof(t::Transaction) - @require isreadable(t) || !isopen(t) - if bytesavailable(t) > 0 +function Base.eof(c::Connection) + @require isreadable(c) || !isopen(c) + if bytesavailable(c) > 0 return false - end ;@debug 4 "eof(::Transaction) -> eof($(typeof(t.c.io))): $t" - return eof(t.c.io) + end + return eof(c.io) end -Base.bytesavailable(t::Transaction) = bytesavailable(t.c) Base.bytesavailable(c::Connection) = bytesavailable(c.buffer) + bytesavailable(c.io) -Base.isreadable(t::Transaction) = readbusy(t) -Base.iswritable(t::Transaction) = writebusy(t) - -function Base.read(t::Transaction, nb::Int) - nb = min(nb, bytesavailable(t)) +function Base.read(c::Connection, nb::Int) + nb = min(nb, bytesavailable(c)) bytes = Base.StringVector(nb) - unsafe_read(t, pointer(bytes), nb) + GC.@preserve bytes unsafe_read(c, pointer(bytes), nb) return bytes end -function Base.read(t::Transaction, ::Type{UInt8}) - if bytesavailable(t.c.buffer) == 0 - read_to_buffer(t) +function Base.read(c::Connection, ::Type{UInt8}) + if bytesavailable(c.buffer) == 0 + read_to_buffer(c) end - return read(t.c.buffer, UInt8) + return read(c.buffer, UInt8) end -function Base.unsafe_read(t::Transaction, p::Ptr{UInt8}, n::UInt) - l = bytesavailable(t.c.buffer) +function Base.unsafe_read(c::Connection, p::Ptr{UInt8}, n::UInt) + l = bytesavailable(c.buffer) if l > 0 - nb = min(l,n) - unsafe_read(t.c.buffer, p, nb) + nb = min(l, n) + unsafe_read(c.buffer, p, nb) p += nb n -= nb - @debug 4 "↩️ read $nb-bytes from buffer." - t.c.timestamp = time() + c.timestamp = time() end if n > 0 - unsafe_read(t.c.io, p, n) - @debug 4 "⬅️ read $n-bytes from $(typeof(t.c.io))" - t.c.timestamp = time() + unsafe_read(c.io, p, n) + c.timestamp = time() end return nothing end -function read_to_buffer(t::Transaction, sizehint=4096) - buf = t.c.buffer +function read_to_buffer(c::Connection, sizehint=4096) + buf = c.buffer # Reset the buffer if it is empty. if bytesavailable(buf) == 0 @@ -271,15 +175,15 @@ function read_to_buffer(t::Transaction, sizehint=4096) end # Wait for data. - if eof(t.c.io) + if eof(c.io) throw(EOFError()) end # Read from stream into buffer. - n = min(sizehint, bytesavailable(t.c.io)) - buf = t.c.buffer + n = min(sizehint, bytesavailable(c.io)) + buf = c.buffer Base.ensureroom(buf, n) - unsafe_read(t.c.io, pointer(buf.data, buf.size + 1), n) + GC.@preserve buf unsafe_read(c.io, pointer(buf.data, buf.size + 1), n) buf.size += n end @@ -287,118 +191,49 @@ end Read until `find_delimiter(bytes)` returns non-zero. Return view of bytes up to the delimiter. """ -function Base.readuntil(t::Transaction, f::Function #=Vector{UInt8} -> Int=#, +function Base.readuntil(c::Connection, f::Function #=Vector{UInt8} -> Int=#, sizehint=4096)::ByteView - buf = t.c.buffer + buf = c.buffer if bytesavailable(buf) == 0 - read_to_buffer(t, sizehint) + read_to_buffer(c, sizehint) end while (bytes = readuntil(buf, f)) === nobytes - read_to_buffer(t, sizehint) + read_to_buffer(c, sizehint) end return bytes end """ - startwrite(::Transaction) - -Wait for prior pending writes to complete. 
+ startwrite(::Connection) """ -function IOExtras.startwrite(t::Transaction) - @require !iswritable(t) - - @v1_3 lock(t.c.writelock) - try - while writecount(t.c) != t.sequence - @debug 1 "⏳ Wait write: $t" - wait(t.c.writelock) - end - t.writebusy = true - t.c.writebusy = true - @ensure iswritable(t) - @debug 2 "👁 Start write:$t" - finally - @v1_3 unlock(t.c.writelock) - end - +function IOExtras.startwrite(c::Connection) + @require !iswritable(c) + c.writable = true + @debug 2 "👁 Start write:$c" return end """ - closewrite(::Transaction) + closewrite(::Connection) -Signal that an entire Request Message has been written to the `Transaction`. +Signal that an entire Request Message has been written to the `Connection`. """ -function IOExtras.closewrite(t::Transaction) - @require iswritable(t) - - @v1_3 lock(t.c.writelock) - try - t.writebusy = false - t.c.writecount += 1 ;@debug 2 "🗣 Write done: $t" - t.c.writebusy = false - notify(t.c.writelock) - @ensure !iswritable(t) - finally - @v1_3 unlock(t.c.writelock) - end - flush(t.c) - release(t.c) - - return -end - -""" - startread(::Transaction) - -Wait for prior pending reads to complete. -""" -function IOExtras.startread(t::Transaction) - @require !isreadable(t) - - t.c.timestamp = time() - @v1_3 lock(t.c.readlock) - try - while readcount(t.c) != t.sequence - @debug 1 "⏳ Wait read: $t" - wait(t.c.readlock) - end - t.readbusy = true - t.c.readbusy = true - @debug 2 "👁 Start read: $t" - @ensure isreadable(t) - finally - @v1_3 unlock(t.c.readlock) - end - +function IOExtras.closewrite(c::Connection) + @require iswritable(c) + c.writable = false + @debug 2 "🗣 Write done: $c" + flush(c) return end """ - closeread(::Transaction) - -Signal that an entire Response Message has been read from the `Transaction`. - -Increment `readcount` and wake up tasks waiting in `startread`. + startread(::Connection) """ -function IOExtras.closeread(t::Transaction) - @require isreadable(t) - - @v1_3 lock(t.c.readlock) - try - t.readbusy = false - t.c.readcount += 1 ;@debug 2 "✉️ Read done: $t" - t.c.readbusy = false - notify(t.c.readlock) - @ensure !isreadable(t) - if !isbusy(t.c) - @async monitor_idle_connection(t.c) - end - finally - @v1_3 unlock(t.c.readlock) - end - release(t.c) - +function IOExtras.startread(c::Connection) + @require !isreadable(c) + c.timestamp = time() + c.readable = true + @debug 2 "👁 Start read: $c" return end @@ -409,30 +244,35 @@ Close `c` on EOF or if response data arrives when no request was sent. function monitor_idle_connection(c::Connection) if eof(c.io) ;@debug 2 "💀 Closed: $c" close(c.io) - elseif !isbusy(c) ;@debug 1 "😈 Idle RX!!: $c" - close(c.io) end end -function monitor_idle_connection(c::Connection{SSLContext}) - # MbedTLS.jl monitors idle connections for TLS close_notify messages. - # https://github.com/JuliaWeb/MbedTLS.jl/pull/145 -end - -Base.wait_close(t::Transaction) = Base.wait_close(tcpsocket(t.c.io)) +""" + closeread(::Connection) -function Base.close(t::Transaction) - close(t.c) - if iswritable(t) - closewrite(t) - end - if isreadable(t) - closeread(t) +Signal that an entire Response Message has been read from the `Connection`. 
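+
+For client connections this is also the point where the connection is handed back
+to the pool. A minimal pairing sketch (`readresponse!` is a placeholder for whatever
+consumes the response bytes):
+
+    startread(c)
+    readresponse!(c)   # read status line, headers and body from c
+    closeread(c)       # release c to POOL; an idle-monitor task may be started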
+""" +function IOExtras.closeread(c::Connection) + @require isreadable(c) + c.readable = false + @debug 2 "✉️ Read done: $c" + if c.clientconnection + release(POOL, connectionkey(c), c) + # Ignore SSLContext as it already monitors idle connections for TLS close_notify messages + !(c.io isa SSLContext) && @async monitor_idle_connection(c) end return end +Base.wait_close(c::Connection) = Base.wait_close(tcpsocket(c.io)) + function Base.close(c::Connection) + if iswritable(c) + closewrite(c) + end + if isreadable(c) + closeread(c) + end close(c.io) if bytesavailable(c) > 0 purge(c) @@ -461,82 +301,16 @@ end Close all connections in `pool`. """ function closeall() - lock(POOL.lock) do - for pod in values(POOL.conns) - @v1_3 lock(pod.conns) - while isready(pod.conns) - close(take!(pod.conns)) - end - pod.numactive = 0 - @v1_3 unlock(pod.conns) - end - end - return -end - -mutable struct Pod - conns::Channel{Connection} - numactive::Int -end - -Pod() = Pod(Channel{Connection}(Inf), 0) - -function decr!(pod::Pod) - @v1_3 @assert islocked(pod.conns.cond_take) - pod.numactive -= 1 - return -end - -function incr!(pod::Pod) - @v1_3 @assert islocked(pod.conns.cond_take) - pod.numactive += 1 - return -end - - -function release(c::Connection) - h = hashconn(c) - if haskey(POOL.conns, h) - pod = getpod(POOL, h) - @debug 2 "returning connection to pod: $c" - put!(pod.conns, c) - end + ConnectionPools.reset!(POOL) return end -# "release" a Connection, without returning to pod for re-use -# used for https proxy tunnel upgrades which shouldn't be reused -function kill!(c::Connection) - h = hashconn(c) - if haskey(POOL.conns, h) - pod = getpod(POOL, h) - @v1_3 lock(pod.conns) - try - decr!(pod) - finally - @v1_3 unlock(pod.conns) - end - end - return -end - -struct Pool - lock::ReentrantLock - conns::Dict{UInt, Pod} -end - """ POOL Global connection pool keeping track of active connections. """ -const POOL = Pool(ReentrantLock(), Dict{UInt, Pod}()) - -function getpod(pool::Pool, x) - lock(pool.lock) do - get!(() -> Pod(), pool.conns, x) - end -end +const POOL = Pool(Connection) """ getconnection(type, host, port) -> Connection @@ -544,93 +318,28 @@ end Find a reusable `Connection` in the `pool`, or create a new `Connection` if required. """ -function getconnection(::Type{Transaction{T}}, +function newconnection(::Type{T}, host::AbstractString, port::AbstractString; - connection_limit::Int=default_connection_limit, - pipeline_limit::Int=default_pipeline_limit, - idle_timeout::Int=0, - reuse_limit::Int=nolimit, + connection_limit=default_connection_limit, + pipeline_limit=default_pipeline_limit, + idle_timeout=typemax(Int), + reuse_limit=nolimit, require_ssl_verification::Bool=NetworkOptions.verify_host(host, "SSL"), - kw...)::Transaction{T} where T <: IO - pod = getpod(POOL, hashconn(T, host, port, pipeline_limit, require_ssl_verification, true)) - @v1_3 lock(pod.conns) - try - while isready(pod.conns) - conn = take!(pod.conns) - if isvalid(pod, conn, reuse_limit, pipeline_limit) - # this is a reuseable connection, so use it - @debug 2 "1 reusing connection: $conn" - return client_transaction(conn) - end - end - # If there are not too many connections to this host:port, - # create a new connection... - if pod.numactive < connection_limit - return newconnection(pod, T, host, port, pipeline_limit, - require_ssl_verification, idle_timeout; kw...) 
- end - # wait for a Connection to be released - while true - conn = take!(pod.conns) - if isvalid(pod, conn, reuse_limit, pipeline_limit) - # this is a reuseable connection, so use it - @debug 2 "2 reusing connection: $conn" - return client_transaction(conn) - elseif pod.numactive < connection_limit - return newconnection(pod, T, host, port, pipeline_limit, - require_ssl_verification, idle_timeout; kw...) - end - end - finally - @v1_3 unlock(pod.conns) + kw...)::Connection where {T <: IO} + return acquire( + POOL, + (T, host, port, require_ssl_verification, true); + max_concurrent_connections=Int(connection_limit), + idle_timeout=Int(idle_timeout)) do + Connection(host, port, + idle_timeout, require_ssl_verification, + getconnection(T, host, port; + require_ssl_verification=require_ssl_verification, kw...) + ) end end -function isvalid(pod, conn, reuse_limit, pipeline_limit) - # Close connections that have reached the reuse limit... - if reuse_limit != nolimit - if readcount(conn) >= reuse_limit && !readbusy(conn) - @debug 2 "💀 overuse: $conn" - close(conn.io) - end - end - # Close connections that have reached the timeout limit... - if conn.idle_timeout > 0 - if !isbusy(conn) && inactiveseconds(conn) > conn.idle_timeout - @debug 2 "💀 idle timeout: $conn" - close(conn.io) - end - end - # For closed connections, we decrease active count in pod, and "continue" - # which effectively drops the connection - if !isopen(conn.io) - close(conn) - lock(conn.closelock) do - if !conn.closed - conn.closed = true - decr!(pod) - end - end - return false - end - # If we've hit our pipeline_limit, can't use this one, but don't close - if (writecount(conn) - readcount(conn)) >= pipeline_limit + 1 - return false - end - - return !writebusy(conn) -end - -function newconnection(pod, T, host, port, pipeline_limit, require_ssl_verification, idle_timeout; kw...) - io = getconnection(T, host, port; - require_ssl_verification=require_ssl_verification, kw...) - c = Connection(host, port, pipeline_limit, idle_timeout, require_ssl_verification, io) - incr!(pod) - @debug 1 "🔗 New: $c" - return client_transaction(c) -end - function keepalive!(tcp) @debug 2 "setting keepalive on tcp socket" err = ccall(:uv_tcp_keepalive, Cint, (Ptr{Nothing}, Cint, Cuint), @@ -746,16 +455,19 @@ function sslconnection(tcp::TCPSocket, host::AbstractString; return io end -function sslupgrade(t::Transaction{TCPSocket}, +function sslupgrade(c::Connection, host::AbstractString; require_ssl_verification::Bool=NetworkOptions.verify_host(host, "SSL"), - kw...)::Transaction{SSLContext} - tls = sslconnection(t.c.io, host; + kw...)::Connection + # first we release the original connection, but we don't want it to be + # reused in the pool, because we're hijacking the TCPSocket + release(POOL, connectionkey(c), c; return_for_reuse=false) + # now we hijack the TCPSocket and upgrade to SSLContext + tls = sslconnection(c.io, host; require_ssl_verification=require_ssl_verification, kw...) - c = Connection(tls; require_ssl_verification=require_ssl_verification) - kill!(t.c) - return client_transaction(c) + conn = Connection(host, "", 0, require_ssl_verification, tls) + return acquire(POOL, connectionkey(conn), conn) end function Base.show(io::IO, c::Connection) @@ -763,12 +475,9 @@ function Base.show(io::IO, c::Connection) print( io, tcpstatus(c), " ", - lpad(writecount(c), 3),"↑", writebusy(c) ? "🔒 " : " ", - lpad(readcount(c), 3), "↓", readbusy(c) ? "🔒 " : " ", "$(lpad(round(Int, time() - c.timestamp), 3))s ", c.host, ":", c.port != "" ? 
c.port : Int(c.peerport), ":", Int(c.localport), - " ≣", c.pipeline_limit, bytesavailable(c.buffer) > 0 ? " $(bytesavailable(c.buffer))-byte excess" : "", nwaiting > 0 ? " $nwaiting bytes waiting" : "", @@ -776,8 +485,6 @@ function Base.show(io::IO, c::Connection) " $(Base._fd(tcpsocket(c.io)))" : "") end -Base.show(io::IO, t::Transaction) = print(io, "T$(rpad(t.sequence,2)) ", t.c) - function tcpstatus(c::Connection) if !applicable(tcpsocket, c.io) return "" diff --git a/src/ConnectionRequest.jl b/src/ConnectionRequest.jl index 80de0c4b8..790c72f11 100644 --- a/src/ConnectionRequest.jl +++ b/src/ConnectionRequest.jl @@ -62,8 +62,7 @@ export ConnectionPoolLayer function request(::Type{ConnectionPoolLayer{Next}}, url::URI, req, body; proxy=getproxy(url.scheme, url.host), - socket_type::Type=TCPSocket, - reuse_limit::Int=ConnectionPool.nolimit, kw...) where Next + socket_type::Type=TCPSocket, kw...) where Next if proxy !== nothing target_url = url @@ -79,19 +78,14 @@ function request(::Type{ConnectionPoolLayer{Next}}, url::URI, req, body; end end - IOType = ConnectionPool.Transaction{sockettype(url, socket_type)} + IOType = sockettype(url, socket_type) local io try - io = getconnection(IOType, url.host, url.port; - reuse_limit=reuse_limit, kw...) + io = newconnection(IOType, url.host, url.port; kw...) catch e rethrow(isioerror(e) ? IOError(e, "during request($url)") : e) end - if io.sequence >= reuse_limit - defaultheader!(req, "Connection" => "close") - end - try if proxy !== nothing && target_url.scheme == "https" # tunnel request @@ -107,8 +101,7 @@ function request(::Type{ConnectionPoolLayer{Next}}, url::URI, req, body; r = request(Next, io, req, body; kw...) - if (io.sequence >= reuse_limit - || (proxy !== nothing && target_url.scheme == "https")) + if proxy !== nothing && target_url.scheme == "https" close(io) end @@ -132,7 +125,6 @@ function connect_tunnel(io, target_url, req) end request = Request("CONNECT", target, headers) writeheaders(io, request) - startread(io) readheaders(io, request.response) return request.response end diff --git a/src/HTTP.jl b/src/HTTP.jl index 752292a8b..ef0925428 100644 --- a/src/HTTP.jl +++ b/src/HTTP.jl @@ -114,9 +114,6 @@ Connection Pool options - `connect_timeout = 0`, close the connection after this many seconds if it is still attempting to connect. Use `connect_timeout = 0` to disable. - `connection_limit = 8`, number of concurrent connections to each host:port. - - `pipeline_limit = 16`, number of concurrent requests per connection. - - `reuse_limit = nolimit`, number of times a connection is reused after the - first request. 
- `socket_type = TCPSocket` @@ -548,7 +545,7 @@ relationship with [`HTTP.Response`](@ref), [`HTTP.Parsers`](@ref), │┌───────────────────────────║────────┼──║────────────────────────────────────┐ └▶ HTTP.ConnectionPool ║ │ ║ │ │ ┌──────────────▼────────┐ ┌───────────────────────┐ │ - │ getconnection() -> │ HTTP.Transaction <:IO │ │ HTTP.Transaction <:IO │ │ + │ getconnection() -> │ HTTP.Connection <:IO │ │ HTTP.Connection <:IO │ │ │ └───────────────────────┘ └───────────────────────┘ │ │ ║ ╲│╱ ║ ╲│╱ │ │ ║ │ ║ │ │ @@ -613,14 +610,14 @@ include("Handlers.jl") ;using .Handlers; using .Handlers: serve include("parsemultipart.jl") ;using .MultiPartParsing: parse_multipart_form include("WebSockets.jl") ;using .WebSockets -import .ConnectionPool: Transaction, Connection +import .ConnectionPool: Connection function Base.parse(::Type{T}, str::AbstractString)::T where T <: Message buffer = Base.BufferStream() write(buffer, str) close(buffer) m = T() - http = Stream(m, Transaction(Connection(buffer))) + http = Stream(m, Connection(buffer)) m.body = read(http) closeread(http) return m diff --git a/src/Servers.jl b/src/Servers.jl index dd38676a7..ce8ddc736 100644 --- a/src/Servers.jl +++ b/src/Servers.jl @@ -331,9 +331,10 @@ end """ Start a `check_readtimeout` task to close the `Connection` if it is inactive. -Create a `Transaction` object for each HTTP Request received. +Passes the `Connection` object to handle a single request/response transaction +for each HTTP Request received. After `reuse_limit + 1` transactions, signal `final_transaction` to the -transaction handler. +transaction handler, which will close the connection. """ function handle_connection(f, c::Connection, server, reuse_limit, readtimeout) if readtimeout > 0 @@ -344,7 +345,7 @@ function handle_connection(f, c::Connection, server, reuse_limit, readtimeout) count = 0 # if the connection socket or original server close, we stop taking requests while isopen(c) && isopen(server) && count <= reuse_limit - handle_transaction(f, Transaction(c), server; + handle_transaction(f, c, server; final_transaction=(count == reuse_limit)) count += 1 end @@ -376,21 +377,21 @@ function check_readtimeout(c, readtimeout, wait_for_timeout) end """ -Create a `HTTP.Stream` and parse the Request headers from a `HTTP.Transaction` +Create a `HTTP.Stream` and parse the Request headers from a `HTTP.Connection` (by calling `startread(::Stream`). If there is a parse error, send an error Response. Otherwise, execute stream processing function `f`. If `f` throws an exception, send an error Response and close the connection. """ -function handle_transaction(f, t::Transaction, server; final_transaction::Bool=false) +function handle_transaction(f, c::Connection, server; final_transaction::Bool=false) request = Request() - http = Stream(request, t) + http = Stream(request, c) try @debug 2 "server startread" startread(http) if !isopen(server) - close(t) + close(c) return end catch e @@ -398,8 +399,8 @@ function handle_transaction(f, t::Transaction, server; final_transaction::Bool=f return elseif e isa ParseError status = e.code == :HEADER_SIZE_EXCEEDS_LIMIT ? 
413 : 400 - write(t, Response(status, body = string(e.code))) - close(t) + write(c, Response(status, body = string(e.code))) + close(c) return else rethrow(e) @@ -411,9 +412,8 @@ function handle_transaction(f, t::Transaction, server; final_transaction::Bool=f setheader(request.response, "Connection" => "close") end - @async try + try f(http) - # If `startwrite()` was never called, throw an error so we send a 500 and log this if isopen(http) && !iswritable(http) error("Server never wrote a response") @@ -437,9 +437,9 @@ function handle_transaction(f, t::Transaction, server; final_transaction::Bool=f final_transaction = true finally if server.access_log !== nothing - try @info sprint(server.access_log, http) _group=:access; catch end + try; @info sprint(server.access_log, http) _group=:access; catch; end end - final_transaction && close(t.c.io) + final_transaction && close(c.io) end return end diff --git a/src/StreamRequest.jl b/src/StreamRequest.jl index 1146ad14d..6914c5f13 100644 --- a/src/StreamRequest.jl +++ b/src/StreamRequest.jl @@ -43,26 +43,18 @@ function request(::Type{StreamLayer{Next}}, io::IO, req::Request, body; end end - if !isidempotent(req) - # Wait for pipelined reads to complete - # before sending non-idempotent request body. - @debug 2 "non-idempotent client startread" - startread(io) - end - - aborted = false write_error = nothing try - @sync begin if iofunction === nothing @async try writebody(http, req, body) + @debug 2 "client closewrite" + closewrite(http) catch e write_error = e isopen(io) && try; close(io); catch; end end - yield() @debug 2 "client startread" startread(http) readbody(http, response, response_stream, reached_redirect_limit) @@ -74,10 +66,8 @@ function request(::Type{StreamLayer{Next}}, io::IO, req::Request, body; # The server may have closed the connection. # Don't propagate such errors. try; close(io); catch; end - aborted = true end end - catch e if write_error !== nothing throw(write_error) @@ -86,15 +76,10 @@ function request(::Type{StreamLayer{Next}}, io::IO, req::Request, body; end end - # Suppress errors from closing - try - @debug 2 "client closewrite" - closewrite(http) - @debug 2 "client closeread" - closeread(http) - catch e - e isa EOFError && rethrow() - end + @debug 2 "client closewrite" + closewrite(http) + @debug 2 "client closeread" + closeread(http) verbose == 1 && printlncompact(response) verbose == 2 && println(response) @@ -112,18 +97,7 @@ function writebody(http::Stream, req::Request, body) end req.txcount += 1 - - if isidempotent(req) - @debug 2 "client closewrite" - closewrite(http) - else - @debug 2 "🔒 $(req.method) non-idempotent, " * - "holding write lock: $(http.stream)" - # "A user agent SHOULD NOT pipeline requests after a - # non-idempotent method, until the final response - # status code for that method has been received" - # https://tools.ietf.org/html/rfc7230#section-6.3.2 - end + return end function writebodystream(http, req, body) diff --git a/src/Streams.jl b/src/Streams.jl index 35f373aed..e763a427d 100644 --- a/src/Streams.jl +++ b/src/Streams.jl @@ -33,22 +33,17 @@ Creates a `HTTP.Stream` that wraps an existing `IO` stream. - `startwrite(::Stream)` sends the `Request` headers to the `IO` stream. - `write(::Stream, body)` sends the `body` (or a chunk of the body). - `closewrite(::Stream)` sends the final `0` chunk (if needed) and calls - `closewrite` on the `IO` stream. 
When the `IO` stream is a - [`HTTP.ConnectionPool.Transaction`](@ref), calling `closewrite` releases - the [`HTTP.ConnectionPool.Connection`](@ref) back into the pool for use by the - next pipelined request. + `closewrite` on the `IO` stream. - `startread(::Stream)` calls `startread` on the `IO` stream then - reads and parses the `Response` headers. When the `IO` stream is a - [`HTTP.ConnectionPool.Transaction`](@ref), calling `startread` waits for other - pipelined responses to be read from the [`HTTP.ConnectionPool.Connection`](@ref). + reads and parses the `Response` headers. - `eof(::Stream)` and `readavailable(::Stream)` parse the body from the `IO` stream. - `closeread(::Stream)` reads the trailers and calls `closeread` on the `IO` - stream. When the `IO` stream is a [`HTTP.ConnectionPool.Transaction`](@ref), - calling `closeread` releases the readlock and allows the next pipelined - response to be read by another `Stream` that is waiting in `startread`. - If a complete response has not been received, `closeread` throws `EOFError`. + stream. When the `IO` stream is a [`HTTP.ConnectionPool.Connection`](@ref), + calling `closeread` releases the connection back to the connection pool + for reuse. If a complete response has not been received, `closeread` throws + `EOFError`. """ Stream(r::M, io::S) where {M, S} = Stream{M,S}(r, io, false, false, true, 0, 0) diff --git a/src/WebSockets.jl b/src/WebSockets.jl index 09ae5acc7..151459a4e 100644 --- a/src/WebSockets.jl +++ b/src/WebSockets.jl @@ -102,7 +102,7 @@ function open(f::Function, url; binary=false, verbose=false, headers = [], kw... ] HTTP.open("GET", url, headers; - reuse_limit=0, verbose=verbose ? 2 : 0, kw...) do http + verbose=verbose ? 2 : 0, kw...) do http startread(http) diff --git a/src/access_log.jl b/src/access_log.jl index 97876cc99..1d6939287 100644 --- a/src/access_log.jl +++ b/src/access_log.jl @@ -65,9 +65,9 @@ function symbol_mapping(s::Symbol) hdr = replace(String(m[1]), '_' => '-') :(HTTP.header(http.message.response, $hdr, "-")) elseif s === :remote_addr - :(http.stream.c.peerip) + :(http.stream.peerip) elseif s === :remote_port - :(http.stream.c.peerport) + :(http.stream.peerport) elseif s === :remote_user :("-") # TODO: find from Basic auth... elseif s === :time_iso8601 diff --git a/src/connectionpools.jl b/src/connectionpools.jl new file mode 100644 index 000000000..e6d4d1e7e --- /dev/null +++ b/src/connectionpools.jl @@ -0,0 +1,287 @@ +module ConnectionPools + +export Pod, Pool, acquire, release + +import Base: acquire, release + +connectionid(x) = objectid(x) + +""" + ConnectionTracker(conn::T) + +Wraps a `Connection` of type `T`. +A `Connection` object must support the following interface: + * `isopen(x)`: check if a `Connection` object is still open and can be used + * `close(x)`: close a `Connection` object; `isopen(x)` should return false after calling `close` + * `ConnectionPools.connectionid(x)`: optional method to distinguish `Connection` objects from each other; by default, calls `objectid(x)`, which is valid for `mutable struct` objects + +The `idle_timestamp` field is a timestamp to track when a `Connection` was returned to a `Pod` and became idle_timestamp. + +The `times_used` field keeps track of how many times the connection has been "used" (i.e. acquired then released). 
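+
+A minimal sketch of a type satisfying this interface (hypothetical, for illustration only):
+
+    mutable struct FakeConn
+        open::Bool
+    end
+    Base.isopen(c::FakeConn) = c.open
+    Base.close(c::FakeConn) = (c.open = false; nothing)
+    # connectionid falls back to objectid, which is fine for a mutable struct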
+""" +mutable struct ConnectionTracker{T} + conn::T + idle_timestamp::Float64 + times_used::Int +end + +ConnectionTracker(conn::T) where {T} = ConnectionTracker(conn, time(), 0) + +""" + Pod(T; max_concurrent_connections::Int, idle_timeout::Int) + +A threadsafe object for managing a pool of and the reuse of `Connection` objects (see [`ConnectionTracker`](@ref)). + +A Pod manages a collection of `Connection`s and the following keyword arguments allow configuring the management thereof: + + * `max_concurrent_connections::Int=typemax(Int)`: controls the max # of currently acquired `Connection`s allowed + * `idle_timeout::Int=typemax(Int)`: controls the max # of seconds a `Connection` may be idle_timeout before it should be closed and not reused + +After creating a `Pod`, `Connection`s can be acquired by calling [`acquire`](@ref) and MUST +be subsequently released by calling [`release`](@ref). +""" +struct Pod{T} + # this lock/condition protects the `conns` Vector and `active` Dict + # no changes to either field should be made without holding this lock + lock::Threads.Condition + conns::Vector{ConnectionTracker{T}} + active::Dict{Any, ConnectionTracker{T}} + max_concurrent_connections::Int + idle_timeout::Int +end + +const MAX = typemax(Int) + +function Pod(T; max_concurrent_connections::Int=MAX, idle_timeout::Int=MAX) + return Pod(Threads.Condition(), ConnectionTracker{T}[], Dict{Any, ConnectionTracker{T}}(), max_concurrent_connections, idle_timeout) +end + +# check if an idle_timeout `Connection` is still valid to be reused +function isvalid(pod::Pod{C}, conn::ConnectionTracker{C}) where {C} + if (time() - conn.idle_timestamp) > pod.idle_timeout + # println("connection idle_timeout timeout") + # if the connection has been idle_timeout too long, close it + close(conn.conn) + elseif isopen(conn.conn) + # println("found a valid connection to reuse") + # dump(conn.conn) + # otherwise, if the connection is open, this is a valid connection we can use! + return true + else + # println("connection no longer open") + end + return false +end + +function trackconnection!(pod::Pod{C}, conn::ConnectionTracker{C}) where {C} + conn.times_used += 1 + id = connectionid(conn.conn) + if haskey(pod.active, id) + error("connection to be acquired is already an active, tracked connection from the pod according to the `connectionid(conn)`") + end + pod.active[id] = conn + return conn.conn +end + +""" + acquire(f, pod::Pod{C}) -> C + +Check first for existing `Connection`s in a `Pod` still valid to reuse, +and if so, return one. If no existing `Connection` is available for reuse, +call the provided function `f()`, which must return a new connection instance of type `C`. +This new connection instance will be tracked by the `Pod` and MUST be returned to the `Pod` +after use by calling `release(pod, conn)`. 
+""" +function acquire(f, pod::Pod) + lock(pod.lock) + try + # if there are idle connections in the pod, + # let's check if they're still valid and can be used again + while !isempty(pod.conns) + # Pod connections are FIFO, so grab the earliest returned connection + # println("checking idle_timeout connections for reuse") + conn = popfirst!(pod.conns) + if isvalid(pod, conn) + return trackconnection!(pod, conn) + else + # nothing, let the non-valid connection fall into GC oblivion + end + end + # There were no idle connections able to be reused + # If there are not too many already-active connections, create new + if length(pod.active) < pod.max_concurrent_connections + # println("no idle_timeout connections to reuse; creating new") + return trackconnection!(pod, ConnectionTracker(f())) + end + # If we reach here, there were no valid idle connections and too many + # currently-active connections, so we need to wait until for a "release" + # event, which will mean a connection has been returned that can be reused, + # or a "slot" has opened up so we can create a new connection, otherwise, + # we'll just need to start the loop back over and wait again + while true + # this `wait` call will block on our Pod `lock` condition + # until a connection is `release`ed and the condition + # is notified + # println("connection pool maxxed out; waiting for connection to be released to the pod") + conn = wait(pod.lock) + if conn !== nothing + # println("checking recently released connection validity for reuse") + if isvalid(pod, conn) + return trackconnection!(pod, conn) + end + end + # if the Connection just returned to the Pod wasn't valid, the active + # count should have at least went down, so we should be able to create a new one + if length(pod.active) < pod.max_concurrent_connections + return trackconnection!(pod, ConnectionTracker(f())) + end + # If for some reason there were still too many active connections, let's + # start the loop back over waiting for connections to be returned + end + finally + unlock(pod.lock) + end +end + +# ability to provide an already created connection object to insert into the Pod +# if Pod is already at max_concurrent_connections, acquire will wait until an +# active connection is released back to the pod +# it will be tracked among active connections and must be released +function acquire(pod::Pod{C}, c::C) where {C} + lock(pod.lock) + try + if length(pod.active) < pod.max_concurrent_connections + return trackconnection!(pod, ConnectionTracker(c)) + else + while true + # wait until pod gets a connection released + conn = wait(pod.lock) + if conn !== nothing + push!(pod.conns, conn) + else + conn = ConnectionTracker(c) + end + return trackconnection!(pod, conn) + end + end + finally + unlock(pod.lock) + end +end + +function release(pod::Pod{C}, conn::C; return_for_reuse::Bool=true) where {C} + lock(pod.lock) + try + # We first want to look up the corresponding ConnectionTracker object in our + # Pod `active` Dict that tracks active connections + id = connectionid(conn) + # if, for some reason, it's not in our `active` tracking Dict + # then something is wrong; you're trying to release a `Connection` + # that this Pod currently doesn't think is active + if !haskey(pod.active, id) + error("couldn't find connection id in pod's current list of active connections; invalid release call; each acquired connection should be `release`ed ONLY once") + end + conn_tracker = pod.active[id] + # remove the ConnectionTracker from our `active` Dict tracker + delete!(pod.active, id) 
+ if return_for_reuse && isopen(conn) + # reset the idle_timestamp of the ConnectionTracker + conn_tracker.idle_timestamp = time() + # check if there are any tasks waiting to acquire a connection + if isempty(pod.lock) + # if not, we put the connection back in the pod idle queue + # in this case, there's no need to notify the pod lock/condition + # since there's no one waiting to be notified anyway + # println("returning connection (id='$(id)') to pod idle_timeout queue for reuse") + push!(pod.conns, conn_tracker) + else + # if there are waiters, we notify the pod condition and pass the + # ConnectionTracker object in the notification; we ensure to pass + # all=false, so only one waiter is woken up and receives the + # ConnectionTracker + # println("returning connection (id='$(id)') to a waiting task for reuse") + notify(pod.lock, conn_tracker; all=false) + end + else + # if the user has, for whatever reason, requested this connection not be reused + # anymore by passing `return_for_reuse=false`, then we've still removed it from + # the `active` tracking and want to notify the pod in case there are waiting + # acquire tasks that can now create a new connection + # println("connection not reuseable; notifying pod that a connection has been released though") + notify(pod.lock, nothing; all=false) + end + finally + unlock(pod.lock) + end + return +end + +""" + Pool(T) + +A threadsafe convenience object for managing multiple [`Pod`](@ref)s of connections. +A `Pod` of reuseable connections will be looked up by the `key` when calling `acquire(f, pool, key)`. +""" +struct Pool{C} + lock::ReentrantLock + pods::Dict{Any, Pod{C}} +end + +Pool(C) = Pool(ReentrantLock(), Dict{Any, Pod{C}}()) + +""" + acquire(f, pool::Pool{C}, key; max_concurrent_connections::Int, idle_timeout::Int, reuse::Int) -> C + +Get a connection from a `pool`, looking up a `Pod` of reuseable connections +by the provided `key`. If no `Pod` exists for the given key yet, one will be +created and passed the `max`, `idle_timeout`, and `reuse` keyword arguments if provided. +The provided function `f` must create a new connection instance of type `C`. +The acquired connection MUST be returned to the pool by calling `release(pool, key, conn)` exactly once. +""" +function acquire(f, pool::Pool{C}, key; kw...) where {C} + pod = lock(pool.lock) do + get!(() -> Pod(C; kw...), pool.pods, key) + end + return acquire(f, pod) +end + +function acquire(pool::Pool{C}, key, conn::C; kw...) where {C} + pod = lock(pool.lock) do + get!(() -> Pod(C; kw...), pool.pods, key) + end + return acquire(pod, conn) +end + +""" + release(pool::Pool{C}, key, conn::C) + +Return an acquired connection to a `pool` with the same `key` provided when it was acquired. +""" +function release(pool::Pool{C}, key, conn::C; kw...) where {C} + pod = lock(pool.lock) do + pool.pods[key] + end + release(pod, conn; kw...) 
+ return +end + +function reset!(pool::Pool) + lock(pool.lock) do + for pod in values(pool.pods) + lock(pod.lock) do + foreach(pod.conns) do conn + close(conn.conn) + end + empty!(pod.conns) + for conn in values(pod.active) + close(conn.conn) + end + empty!(pod.active) + end + end + empty!(pool.pods) + end + return +end + +end # module diff --git a/test/chunking.jl b/test/chunking.jl index 942390f29..08d027b53 100644 --- a/test/chunking.jl +++ b/test/chunking.jl @@ -18,7 +18,7 @@ using BufferedStreams t = @async HTTP.listen("127.0.0.1", port) do http startwrite(http) - tcp = http.stream.c.io + tcp = http.stream.io write(tcp, encoded_data[1:split1]) flush(tcp) diff --git a/test/client.jl b/test/client.jl index 65241514d..9bb730a42 100644 --- a/test/client.jl +++ b/test/client.jl @@ -124,26 +124,30 @@ end @testset "Incomplete response with known content length" begin server = Sockets.listen(ip"0.0.0.0", 8080) - task = @async HTTP.listen("0.0.0.0", 8080; server=server) do http - HTTP.setstatus(http, 200) - HTTP.setheader(http, "Content-Length" => "64") # Promise 64 bytes... - HTTP.startwrite(http) - HTTP.write(http, rand(UInt8, 63)) # ...but only send 63 bytes. - # Close the stream so that eof(stream) is true and the client isn't - # waiting forever for the last byte. - HTTP.close(http.stream) - end + try + task = @async HTTP.listen("0.0.0.0", 8080; server=server) do http + HTTP.setstatus(http, 200) + HTTP.setheader(http, "Content-Length" => "64") # Promise 64 bytes... + HTTP.startwrite(http) + HTTP.write(http, rand(UInt8, 63)) # ...but only send 63 bytes. + # Close the stream so that eof(stream) is true and the client isn't + # waiting forever for the last byte. + HTTP.close(http.stream) + end - err = try - HTTP.get("http://localhost:8080"; retry=false) - catch err - err - end - @test err isa HTTP.IOError - @test err.e isa EOFError + err = try + HTTP.get("http://localhost:8080"; retry=false) + catch err + err + end + @test err isa HTTP.IOError + @test err.e isa EOFError - # Shutdown - try; close(server); wait(task); catch; end + finally + # Shutdown + try; close(server); wait(task); catch; end + HTTP.ConnectionPool.closeall() + end end @testset "ASync Client Request Body" begin @@ -257,51 +261,58 @@ end if ip"127.0.0.1" in alladdrs && ip"::1" in alladdrs for interface in (IPv4(0), IPv6(0)) server = listen(interface, 8080) - @async HTTP.listen(string(interface), 8080; server=server) do http - HTTP.setstatus(http, 200) - HTTP.startwrite(http) - HTTP.write(http, "hello, world") + try + @async HTTP.listen(string(interface), 8080; server=server) do http + HTTP.setstatus(http, 200) + HTTP.startwrite(http) + HTTP.write(http, "hello, world") + end + req = HTTP.get("http://localhost:8080") + @test req.status == 200 + @test String(req.body) == "hello, world" + finally + close(server) + HTTP.ConnectionPool.closeall() end - req = HTTP.get("http://localhost:8080") - close(server) - @test req.status == 200 - @test String(req.body) == "hello, world" end end end @testset "Sockets.get(sock|peer)name(::HTTP.Stream)" begin server = listen(IPv4(0), 8080) - @async HTTP.listen("0.0.0.0", 8080; server=server) do http - sock = Sockets.getsockname(http) - peer = Sockets.getpeername(http) - str = sprint() do io - print(io, sock[1], ":", sock[2], " - ", peer[1], ":", peer[2]) + try + @async HTTP.listen("0.0.0.0", 8080; server=server) do http + sock = Sockets.getsockname(http) + peer = Sockets.getpeername(http) + str = sprint() do io + print(io, sock[1], ":", sock[2], " - ", peer[1], ":", peer[2]) + end + 
HTTP.setstatus(http, 200) + HTTP.setheader(http, "Content-Length" => string(sizeof(str))) + HTTP.startwrite(http) + HTTP.write(http, str) end - HTTP.setstatus(http, 200) - HTTP.setheader(http, "Content-Length" => string(sizeof(str))) - HTTP.startwrite(http) - HTTP.write(http, str) - end - # Tests for Stream{TCPSocket} - HTTP.open("GET", "http://localhost:8080") do http - # Test server peer/sock - reg = r"^127\.0\.0\.1:8080 - 127\.0\.0\.1:(\d+)$" - m = match(reg, read(http, String)) - @test m !== nothing - server_peerport = parse(Int, m[1]) - # Test client peer/sock - sock = Sockets.getsockname(http) - @test sock[1] == ip"127.0.0.1" - @test sock[2] == server_peerport - peer = Sockets.getpeername(http) - @test peer[1] == ip"127.0.0.1" - @test peer[2] == 8080 + # Tests for Stream{TCPSocket} + HTTP.open("GET", "http://localhost:8080") do http + # Test server peer/sock + reg = r"^127\.0\.0\.1:8080 - 127\.0\.0\.1:(\d+)$" + m = match(reg, read(http, String)) + @test m !== nothing + server_peerport = parse(Int, m[1]) + # Test client peer/sock + sock = Sockets.getsockname(http) + @test sock[1] == ip"127.0.0.1" + @test sock[2] == server_peerport + peer = Sockets.getpeername(http) + @test peer[1] == ip"127.0.0.1" + @test peer[2] == 8080 + end + finally + close(server) + HTTP.ConnectionPool.closeall() end - close(server) - # Tests for Stream{SSLContext} HTTP.open("GET", "https://julialang.org") do http sock = Sockets.getsockname(http) @@ -326,71 +337,79 @@ end @testset "Implicit request headers" begin server = listen(IPv4(0), 8080) - tsk = @async HTTP.listen("0.0.0.0", 8080; server=server) do http - data = Dict{String,String}(http.message.headers) - HTTP.setstatus(http, 200) - HTTP.startwrite(http) - HTTP.write(http, sprint(JSON.print, data)) + try + tsk = @async HTTP.listen("0.0.0.0", 8080; server=server) do http + data = Dict{String,String}(http.message.headers) + HTTP.setstatus(http, 200) + HTTP.startwrite(http) + HTTP.write(http, sprint(JSON.print, data)) + end + old_user_agent = HTTP.MessageRequest.USER_AGENT[] + default_user_agent = "HTTP.jl/$VERSION" + # Default values + HTTP.setuseragent!(default_user_agent) + d = JSON.parse(IOBuffer(HTTP.get("http://localhost:8080").body)) + @test d["Host"] == "localhost:8080" + @test d["Accept"] == "*/*" + @test d["User-Agent"] == default_user_agent + # Overwriting behavior + headers = ["Host" => "http.jl", "Accept" => "application/json"] + HTTP.setuseragent!("HTTP.jl test") + d = JSON.parse(IOBuffer(HTTP.get("http://localhost:8080", headers).body)) + @test d["Host"] == "http.jl" + @test d["Accept"] == "application/json" + @test d["User-Agent"] == "HTTP.jl test" + # No User-Agent + HTTP.setuseragent!(nothing) + d = JSON.parse(IOBuffer(HTTP.get("http://localhost:8080").body)) + @test !haskey(d, "User-Agent") + + HTTP.setuseragent!(old_user_agent) + finally + close(server) + HTTP.ConnectionPool.closeall() end - old_user_agent = HTTP.MessageRequest.USER_AGENT[] - default_user_agent = "HTTP.jl/$VERSION" - # Default values - HTTP.setuseragent!(default_user_agent) - d = JSON.parse(IOBuffer(HTTP.get("http://localhost:8080").body)) - @test d["Host"] == "localhost:8080" - @test d["Accept"] == "*/*" - @test d["User-Agent"] == default_user_agent - # Overwriting behavior - headers = ["Host" => "http.jl", "Accept" => "application/json"] - HTTP.setuseragent!("HTTP.jl test") - d = JSON.parse(IOBuffer(HTTP.get("http://localhost:8080", headers).body)) - @test d["Host"] == "http.jl" - @test d["Accept"] == "application/json" - @test d["User-Agent"] == "HTTP.jl test" - # No 
User-Agent - HTTP.setuseragent!(nothing) - d = JSON.parse(IOBuffer(HTTP.get("http://localhost:8080").body)) - @test !haskey(d, "User-Agent") - - HTTP.setuseragent!(old_user_agent) - close(server) end import NetworkOptions, MbedTLS @testset "NetworkOptions for host verification" begin # Set up server with self-signed cert server = listen(IPv4(0), 8443) - cert, key = joinpath.(@__DIR__, "resources", ("cert.pem", "key.pem")) - sslconfig = MbedTLS.SSLConfig(cert, key) - tsk = @async HTTP.listen("0.0.0.0", 8443; server=server, sslconfig=sslconfig) do http - HTTP.setstatus(http, 200) - HTTP.startwrite(http) - HTTP.write(http, "hello, world") - end - url = "https://localhost:8443" - env = ["JULIA_NO_VERIFY_HOSTS" => nothing, "JULIA_SSL_NO_VERIFY_HOSTS" => nothing, "JULIA_ALWAYS_VERIFY_HOSTS" => nothing] - withenv(env...) do - @test NetworkOptions.verify_host(url) - @test NetworkOptions.verify_host(url, "SSL") - @test_throws HTTP.IOError HTTP.get(url; retries=1) - @test_throws HTTP.IOError HTTP.get(url; require_ssl_verification=true, retries=1) - @test HTTP.get(url; require_ssl_verification=false).status == 200 - end - withenv(env..., "JULIA_NO_VERIFY_HOSTS" => "localhost") do - @test !NetworkOptions.verify_host(url) - @test !NetworkOptions.verify_host(url, "SSL") - @test HTTP.get(url).status == 200 - @test_throws HTTP.IOError HTTP.get(url; require_ssl_verification=true, retries=1) - @test HTTP.get(url; require_ssl_verification=false).status == 200 - end - withenv(env..., "JULIA_SSL_NO_VERIFY_HOSTS" => "localhost") do - @test NetworkOptions.verify_host(url) - @test !NetworkOptions.verify_host(url, "SSL") - @test HTTP.get(url).status == 200 - @test_throws HTTP.IOError HTTP.get(url; require_ssl_verification=true, retries=1) - @test HTTP.get(url; require_ssl_verification=false).status == 200 + try + cert, key = joinpath.(@__DIR__, "resources", ("cert.pem", "key.pem")) + sslconfig = MbedTLS.SSLConfig(cert, key) + tsk = @async HTTP.listen("0.0.0.0", 8443; server=server, sslconfig=sslconfig) do http + HTTP.setstatus(http, 200) + HTTP.startwrite(http) + HTTP.write(http, "hello, world") + end + url = "https://localhost:8443" + env = ["JULIA_NO_VERIFY_HOSTS" => nothing, "JULIA_SSL_NO_VERIFY_HOSTS" => nothing, "JULIA_ALWAYS_VERIFY_HOSTS" => nothing] + withenv(env...) do + @test NetworkOptions.verify_host(url) + @test NetworkOptions.verify_host(url, "SSL") + @test_throws HTTP.IOError HTTP.get(url; retries=1) + @test_throws HTTP.IOError HTTP.get(url; require_ssl_verification=true, retries=1) + @test HTTP.get(url; require_ssl_verification=false).status == 200 + end + withenv(env..., "JULIA_NO_VERIFY_HOSTS" => "localhost") do + @test !NetworkOptions.verify_host(url) + @test !NetworkOptions.verify_host(url, "SSL") + @test HTTP.get(url).status == 200 + @test_throws HTTP.IOError HTTP.get(url; require_ssl_verification=true, retries=1) + @test HTTP.get(url; require_ssl_verification=false).status == 200 + end + withenv(env..., "JULIA_SSL_NO_VERIFY_HOSTS" => "localhost") do + @test NetworkOptions.verify_host(url) + @test !NetworkOptions.verify_host(url, "SSL") + @test HTTP.get(url).status == 200 + @test_throws HTTP.IOError HTTP.get(url; require_ssl_verification=true, retries=1) + @test HTTP.get(url; require_ssl_verification=false).status == 200 + end + finally + close(server) + HTTP.ConnectionPool.closeall() end - close(server) end @testset "Public entry point of HTTP.request and friends (e.g. 
issue #463)" begin @@ -446,23 +465,26 @@ end # We are only interested in the request passed in by the client # Returns 400 after reading the http request into req proxy = listen(IPv4(0), 8082) - @async begin - sock = accept(proxy) - while isopen(sock) - line = readline(sock) - isempty(line) && break - - push!(req, line) + try + @async begin + sock = accept(proxy) + while isopen(sock) + line = readline(sock) + isempty(line) && break + + push!(req, line) + end + write(sock, "HTTP/1.1 400 Bad Request\r\n\r\n") end - write(sock, "HTTP/1.1 400 Bad Request\r\n\r\n") - end - # Make the HTTP request - HTTP.get("https://example.com"; proxy="http://localhost:8082", retry=false, status_exception=false) + # Make the HTTP request + HTTP.get("https://example.com"; proxy="http://localhost:8082", retry=false, status_exception=false) - # Test if the host header exist in the request - @test "Host: example.com:443" in req - - close(proxy) + # Test if the host header exist in the request + @test "Host: example.com:443" in req + finally + close(proxy) + HTTP.ConnectionPool.closeall() + end end end diff --git a/test/loopback.jl b/test/loopback.jl index 07323acc7..48663ba9a 100644 --- a/test/loopback.jl +++ b/test/loopback.jl @@ -42,7 +42,6 @@ Base.readavailable(lb::Loopback) = readavailable(lb.io) Base.unsafe_read(lb::Loopback, p::Ptr, n::Integer) = unsafe_read(lb.io, p, n) HTTP.IOExtras.tcpsocket(::Loopback) = Sockets.TCPSocket() -HTTP.ConnectionPool.tcpstatus(c::HTTP.ConnectionPool.Connection{Loopback}) = "🤖 " lbreq(req, headers, body; method="GET", kw...) = HTTP.request(method, "http://test/$req", headers, body; config..., kw...) @@ -320,61 +319,6 @@ end end end - @testset "ASync - Pipeline limit = 1" begin - server_events = [] - t = async_test(;pipeline_limit=1) - if haskey(ENV, "HTTP_JL_TEST_TIMING_SENSITIVE") - @test server_events == [ - "Request: GET /delay1 HTTP/1.1", - "Request: GET /delay2 HTTP/1.1", - "Response: HTTP/1.1 200 OK <= (GET /delay1 HTTP/1.1)", - "Request: GET /delay3 HTTP/1.1", - "Response: HTTP/1.1 200 OK <= (GET /delay2 HTTP/1.1)", - "Request: GET /delay4 HTTP/1.1", - "Response: HTTP/1.1 200 OK <= (GET /delay3 HTTP/1.1)", - "Request: GET /delay5 HTTP/1.1", - "Response: HTTP/1.1 200 OK <= (GET /delay4 HTTP/1.1)", - "Response: HTTP/1.1 200 OK <= (GET /delay5 HTTP/1.1)"] - end - end - - @testset "ASync - Pipeline limit = 2" begin - server_events = [] - t = async_test(;pipeline_limit=2) - if haskey(ENV, "HTTP_JL_TEST_TIMING_SENSITIVE") - @test server_events == [ - "Request: GET /delay1 HTTP/1.1", - "Request: GET /delay2 HTTP/1.1", - "Request: GET /delay3 HTTP/1.1", - "Response: HTTP/1.1 200 OK <= (GET /delay1 HTTP/1.1)", - "Request: GET /delay4 HTTP/1.1", - "Response: HTTP/1.1 200 OK <= (GET /delay2 HTTP/1.1)", - "Request: GET /delay5 HTTP/1.1", - "Response: HTTP/1.1 200 OK <= (GET /delay3 HTTP/1.1)", - "Response: HTTP/1.1 200 OK <= (GET /delay4 HTTP/1.1)", - "Response: HTTP/1.1 200 OK <= (GET /delay5 HTTP/1.1)"] - end - end - - @testset "ASync - Pipeline limit = 3" begin - server_events = [] - t = async_test(;pipeline_limit=3) - @show t - if haskey(ENV, "HTTP_JL_TEST_TIMING_SENSITIVE") - @test server_events == [ - "Request: GET /delay1 HTTP/1.1", - "Request: GET /delay2 HTTP/1.1", - "Request: GET /delay3 HTTP/1.1", - "Request: GET /delay4 HTTP/1.1", - "Response: HTTP/1.1 200 OK <= (GET /delay1 HTTP/1.1)", - "Request: GET /delay5 HTTP/1.1", - "Response: HTTP/1.1 200 OK <= (GET /delay2 HTTP/1.1)", - "Response: HTTP/1.1 200 OK <= (GET /delay3 HTTP/1.1)", - "Response: HTTP/1.1 200 OK <= (GET 
/delay4 HTTP/1.1)", - "Response: HTTP/1.1 200 OK <= (GET /delay5 HTTP/1.1)"] - end - end - @testset "ASync - " begin server_events = [] t = async_test() @@ -405,47 +349,4 @@ end "Request: POST /delay1 HTTP/1.1", "Response: HTTP/1.1 200 OK <= (POST /delay1 HTTP/1.1)"] end -# there were issues in travis testing for windows 32-bit where -# the slowness causes these tests to be pretty unreliable; i.e. -# the requests were still pipelined fine and in the correct order -# but we just didn't see the right order of the server receiving all -# three requests first then sending the 3 responses -@static if Sys.WORD_SIZE == 64 - @testset "ASync - " begin - server_events = [] - t = async_test(["GET","GET","POST", "GET","GET"]) - if !(server_events[1:6] == [ - "Request: GET /delay1 HTTP/1.1", - "Request: GET /delay2 HTTP/1.1", - "Request: POST /delay3 HTTP/1.1", - "Response: HTTP/1.1 200 OK <= (GET /delay1 HTTP/1.1)", - "Response: HTTP/1.1 200 OK <= (GET /delay2 HTTP/1.1)", - "Response: HTTP/1.1 200 OK <= (POST /delay3 HTTP/1.1)"] || - server_events[1:6] == [ - "Request: GET /delay1 HTTP/1.1", - "Request: GET /delay2 HTTP/1.1", - "Response: HTTP/1.1 200 OK <= (GET /delay1 HTTP/1.1)", - "Request: POST /delay3 HTTP/1.1", - "Response: HTTP/1.1 200 OK <= (GET /delay2 HTTP/1.1)", - "Response: HTTP/1.1 200 OK <= (POST /delay3 HTTP/1.1)"]) - @show server_events - end - @test server_events[1:6] == [ - "Request: GET /delay1 HTTP/1.1", - "Request: GET /delay2 HTTP/1.1", - "Request: POST /delay3 HTTP/1.1", - "Response: HTTP/1.1 200 OK <= (GET /delay1 HTTP/1.1)", - "Response: HTTP/1.1 200 OK <= (GET /delay2 HTTP/1.1)", - "Response: HTTP/1.1 200 OK <= (POST /delay3 HTTP/1.1)"] || - server_events[1:6] == [ - "Request: GET /delay1 HTTP/1.1", - "Request: GET /delay2 HTTP/1.1", - "Response: HTTP/1.1 200 OK <= (GET /delay1 HTTP/1.1)", - "Request: POST /delay3 HTTP/1.1", - "Response: HTTP/1.1 200 OK <= (GET /delay2 HTTP/1.1)", - "Response: HTTP/1.1 200 OK <= (POST /delay3 HTTP/1.1)"] - end - - HTTP.ConnectionPool.closeall() -end # @static end \ No newline at end of file diff --git a/test/runtests.jl b/test/runtests.jl index 08f16e3a5..3e831cbcc 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -1,7 +1,7 @@ using Test, HTTP, JSON const dir = joinpath(dirname(pathof(HTTP)), "..", "test") -include("resources/TestRequest.jl") +include(joinpath(dir, "resources/TestRequest.jl")) @testset "HTTP" begin for f in [ diff --git a/test/server.jl b/test/server.jl index e5af4544a..bc1e63e38 100644 --- a/test/server.jl +++ b/test/server.jl @@ -316,6 +316,7 @@ end # @testset sleep(1) # necessary to properly forget the closed connection from the previous call try HTTP.get("http://localhost:1234/close"; retry=false) catch end HTTP.get("http://localhost:1234", ["Connection" => "close"]) + sleep(1) # we want to make sure the server has time to finish logging before checking logs end @test length(logs) == 7 @test all(x -> x.group === :access, logs)
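
For reference, a hedged sketch of client usage after this refactor: the `reuse_limit` and `pipeline_limit` options no longer have any effect, while `connection_limit` and `idle_timeout` still control pooling, and connection reuse is handled transparently by `ConnectionPools.POOL`:

    using HTTP
    r1 = HTTP.get("https://example.com"; connection_limit=4)
    r2 = HTTP.get("https://example.com")  # likely reuses the connection opened for r1
    HTTP.ConnectionPool.closeall()        # drop all pooled connections (as the tests now do)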