diff --git a/src/ConnectionPool.jl b/src/ConnectionPool.jl index 15420c0fe..d32122506 100644 --- a/src/ConnectionPool.jl +++ b/src/ConnectionPool.jl @@ -51,6 +51,7 @@ Fields: - `host::String` - `port::String`, exactly as specified in the URI (i.e. may be empty). - `pipeline_limit`, number of requests to send before waiting for responses. +- `idle_timeout`, No. of sconds to maintain connection after last transaction. - `peerport`, remote TCP port number (used for debug messages). - `localport`, local TCP port number (used for debug messages). - `io::T`, the `TCPSocket` or `SSLContext. @@ -68,6 +69,7 @@ mutable struct Connection{T <: IO} host::String port::String pipeline_limit::Int + idle_timeout::Int peerport::UInt16 localport::UInt16 io::T @@ -95,8 +97,9 @@ struct Transaction{T <: IO} <: IO end Connection(host::AbstractString, port::AbstractString, - pipeline_limit::Int, io::T) where T <: IO = - Connection{T}(host, port, pipeline_limit, + pipeline_limit::Int, idle_timeout::Int, io::T) where T <: IO = + Connection{T}(host, port, + pipeline_limit, idle_timeout, peerport(io), localport(io), io, nobytes, -1, @@ -104,7 +107,7 @@ Connection(host::AbstractString, port::AbstractString, 0, false, Condition(), 0) -Connection(io) = Connection("", "", default_pipeline_limit, io) +Connection(io) = Connection("", "", 0, default_pipeline_limit, 0, io) Transaction(c::Connection{T}) where T <: IO = Transaction{T}(c, (c.sequence += 1)) @@ -385,7 +388,11 @@ end Remove closed connections from `pool`. """ function purge() - isdeletable(c) = !isopen(c.io) && (@debug 1 "🗑 Deleted: $c"; true) + isdeletable(c) = (!isopen(c.io) || (c.idle_timeout > 0 && + !c.readbusy && + !c.writebusy && + time()-c.timestamp > c.idle_timeout)) && + (@debug 1 "🗑 Deleted: $c"; true) deleteat!(pool, map(isdeletable, pool)) end @@ -400,6 +407,7 @@ function getconnection(::Type{Transaction{T}}, port::AbstractString; connection_limit::Int=default_connection_limit, pipeline_limit::Int=default_pipeline_limit, + idle_timeout::Int=0, reuse_limit::Int=nolimit, kw...)::Transaction{T} where T <: IO @@ -431,7 +439,9 @@ function getconnection(::Type{Transaction{T}}, busy = findall(T, host, port, pipeline_limit) if length(busy) < connection_limit io = getconnection(T, host, port; kw...) - c = Connection(host, port, pipeline_limit, io) + c = Connection(host, port, + pipeline_limit, idle_timeout, + io) push!(pool, c) ;@debug 1 "🔗 New: $c" return client_transaction(c) end @@ -459,16 +469,45 @@ function keepalive!(tcp) return end +struct ConnectTimeout <: Exception + host + port +end + function getconnection(::Type{TCPSocket}, host::AbstractString, port::AbstractString; keepalive::Bool=false, + connect_timeout::Int=0, kw...)::TCPSocket + p::UInt = isempty(port) ? UInt(80) : parse(UInt, port) + @debug 2 "TCP connect: $host:$p..." - tcp = connect(getaddrinfo(host), p) - keepalive && keepalive!(tcp) - return tcp + + if connect_timeout == 0 + tcp = connect(getaddrinfo(host), p) + keepalive && keepalive!(tcp) + return tcp + end + + # FIXME this currently causes the Julia runtime to hang on exit... + result = Ref{TCPSocket}() + error = Ref{Any}() + @schedule try + result[] = connect(getaddrinfo(host), p) + catch e + error[] = e + end + sleep(connect_timeout) + if isassigned(error) + throw(error[]) + end + if !isassigned(result) + throw(ConnectTimeout(host, port)) + end + keepalive && keepalive!(result[]) + return result[] end const nosslconfig = SSLConfig()