diff --git a/src/ConnectionPool.jl b/src/ConnectionPool.jl index 9a7a649f6..e7ef04619 100644 --- a/src/ConnectionPool.jl +++ b/src/ConnectionPool.jl @@ -51,6 +51,7 @@ Fields: - `host::String` - `port::String`, exactly as specified in the URI (i.e. may be empty). - `pipeline_limit`, number of requests to send before waiting for responses. +- `idle_timeout`, No. of sconds to maintain connection after last transaction. - `peerport`, remote TCP port number (used for debug messages). - `localport`, local TCP port number (used for debug messages). - `io::T`, the `TCPSocket` or `SSLContext. @@ -68,6 +69,7 @@ mutable struct Connection{T <: IO} host::String port::String pipeline_limit::Int + idle_timeout::Int peerport::UInt16 localport::UInt16 io::T @@ -95,8 +97,9 @@ struct Transaction{T <: IO} <: IO end Connection(host::AbstractString, port::AbstractString, - pipeline_limit::Int, io::T) where T <: IO = - Connection{T}(host, port, pipeline_limit, + pipeline_limit::Int, idle_timeout::Int, io::T) where T <: IO = + Connection{T}(host, port, + pipeline_limit, idle_timeout, peerport(io), localport(io), io, nobytes, -1, @@ -104,7 +107,7 @@ Connection(host::AbstractString, port::AbstractString, 0, false, Condition(), 0) -Connection(io) = Connection("", "", default_pipeline_limit, io) +Connection(io) = Connection("", "", default_pipeline_limit, 0, io) Transaction(c::Connection{T}) where T <: IO = Transaction{T}(c, (c.sequence += 1)) @@ -139,7 +142,7 @@ function Base.eof(t::Transaction) @require isreadable(t) || !isopen(t) if bytesavailable(t) > 0 return false - end ;@debug 4 "eof(::Transaction) -> eof($(typeof(c.io))): $t" + end ;@debug 4 "eof(::Transaction) -> eof($(typeof(t.c.io))): $t" return eof(t.c.io) end @@ -392,6 +395,17 @@ end Remove closed connections from `pool`. """ function purge() + + for c in pool + if c.idle_timeout > 0 && + !c.readbusy && + !c.writebusy && + time() - c.timestamp > c.idle_timeout + + close(c.io) ;@debug 1 "⌛️ Timeout: $c" + end + end + isdeletable(c) = !isopen(c.io) && (@debug 1 "🗑 Deleted: $c"; true) deleteat!(pool, map(isdeletable, pool)) end @@ -407,6 +421,7 @@ function getconnection(::Type{Transaction{T}}, port::AbstractString; connection_limit::Int=default_connection_limit, pipeline_limit::Int=default_pipeline_limit, + idle_timeout::Int=0, reuse_limit::Int=nolimit, kw...)::Transaction{T} where T <: IO @@ -438,7 +453,9 @@ function getconnection(::Type{Transaction{T}}, busy = findall(T, host, port, pipeline_limit) if length(busy) < connection_limit io = getconnection(T, host, port; kw...) - c = Connection(host, port, pipeline_limit, io) + c = Connection(host, port, + pipeline_limit, idle_timeout, + io) push!(pool, c) ;@debug 1 "🔗 New: $c" return client_transaction(c) end @@ -466,14 +483,50 @@ function keepalive!(tcp) return end +struct ConnectTimeout <: Exception + host + port +end + function getconnection(::Type{TCPSocket}, host::AbstractString, port::AbstractString; keepalive::Bool=false, - kw...)::TCPSocket + connect_timeout::Int=0, + kw...)::Sockets.TCP + p::UInt = isempty(port) ? UInt(80) : parse(UInt, port) + @debug 2 "TCP connect: $host:$p..." - tcp = Sockets.connect(Sockets.getaddrinfo(host), p) + + if connect_timeout == 0 + tcp = Sockets.connect(Sockets.getaddrinfo(host), p) + keepalive && keepalive!(tcp) + return tcp + end + + tcp = Sockets.TCPSocket() + Base.connect!(tcp, Sockets.getaddrinfo(host), p) + + timeout = Ref{Bool}(false) + @schedule begin + sleep(connect_timeout) + if tcp.status == Base.StatusConnecting + timeout[] = true + tcp.status = Base.StatusClosing + ccall(:jl_forceclose_uv, Void, (Ptr{Void},), tcp.handle) + #close(tcp) + end + end + try + Base.wait_connected(tcp) + catch e + if timeout[] + throw(ConnectTimeout(host, port)) + end + rethrow(e) + end + keepalive && keepalive!(tcp) return tcp end diff --git a/src/Servers.jl b/src/Servers.jl index 174b3aea6..ceaec717c 100644 --- a/src/Servers.jl +++ b/src/Servers.jl @@ -309,7 +309,7 @@ function listen(f::Function, continue end io = ssl ? getsslcontext(io, sslconfig) : io - let io = Connection(host, string(port), pipeline_limit, io) + let io = Connection(host, string(port), pipeline_limit, 0, io) @info "Accept: $io" @async try handle_connection(f, io; kw...)