From b6667b08caf59dd37d02405f6f2473fc4825cee5 Mon Sep 17 00:00:00 2001 From: Sam O'Connor Date: Sat, 3 Mar 2018 22:26:22 +1100 Subject: [PATCH 1/7] add ConnectionPool connect_timeout and idle_timeout --- src/ConnectionPool.jl | 57 ++++++++++++++++++++++++++++++++++++------- 1 file changed, 48 insertions(+), 9 deletions(-) diff --git a/src/ConnectionPool.jl b/src/ConnectionPool.jl index 9446bcb81..7a7dd1e10 100644 --- a/src/ConnectionPool.jl +++ b/src/ConnectionPool.jl @@ -51,6 +51,7 @@ Fields: - `host::String` - `port::String`, exactly as specified in the URI (i.e. may be empty). - `pipeline_limit`, number of requests to send before waiting for responses. +- `idle_timeout`, No. of sconds to maintain connection after last transaction. - `peerport`, remote TCP port number (used for debug messages). - `localport`, local TCP port number (used for debug messages). - `io::T`, the `Sockets.TCP` or `SSLContext. @@ -68,6 +69,7 @@ mutable struct Connection{T <: IO} host::String port::String pipeline_limit::Int + idle_timeout::Int peerport::UInt16 localport::UInt16 io::T @@ -95,8 +97,9 @@ struct Transaction{T <: IO} <: IO end Connection(host::AbstractString, port::AbstractString, - pipeline_limit::Int, io::T) where T <: IO = - Connection{T}(host, port, pipeline_limit, + pipeline_limit::Int, idle_timeout::Int, io::T) where T <: IO = + Connection{T}(host, port, + pipeline_limit, idle_timeout, peerport(io), localport(io), io, nobytes, -1, @@ -104,7 +107,7 @@ Connection(host::AbstractString, port::AbstractString, 0, false, Condition(), 0) -Connection(io) = Connection("", "", default_pipeline_limit, io) +Connection(io) = Connection("", "", 0, default_pipeline_limit, 0, io) Transaction(c::Connection{T}) where T <: IO = Transaction{T}(c, (c.sequence += 1)) @@ -385,7 +388,11 @@ end Remove closed connections from `pool`. """ function purge() - isdeletable(c) = !isopen(c.io) && (@debug 1 "🗑 Deleted: $c"; true) + isdeletable(c) = (!isopen(c.io) || (c.idle_timeout > 0 && + !c.readbusy && + !c.writebusy && + time()-c.timestamp > c.idle_timeout)) && + (@debug 1 "🗑 Deleted: $c"; true) deleteat!(pool, map(isdeletable, pool)) end @@ -400,6 +407,7 @@ function getconnection(::Type{Transaction{T}}, port::AbstractString; connection_limit::Int=default_connection_limit, pipeline_limit::Int=default_pipeline_limit, + idle_timeout::Int=0, reuse_limit::Int=nolimit, kw...)::Transaction{T} where T <: IO @@ -431,7 +439,9 @@ function getconnection(::Type{Transaction{T}}, busy = findall(T, host, port, pipeline_limit) if length(busy) < connection_limit io = getconnection(T, host, port; kw...) - c = Connection(host, port, pipeline_limit, io) + c = Connection(host, port, + pipeline_limit, idle_timeout, + io) push!(pool, c) ;@debug 1 "🔗 New: $c" return client_transaction(c) end @@ -459,16 +469,45 @@ function keepalive!(tcp) return end -function getconnection(::Type{Sockets.TCP}, +struct ConnectTimeout <: Exception + host + port +end + +function getconnection(::Type{TCPSocket}, host::AbstractString, port::AbstractString; keepalive::Bool=false, + connect_timeout::Int=0, kw...)::Sockets.TCP + p::UInt = isempty(port) ? UInt(80) : parse(UInt, port) + @debug 2 "TCP connect: $host:$p..." - tcp = Sockets.connect(Sockets.getaddrinfo(host), p) - keepalive && keepalive!(tcp) - return tcp + + if connect_timeout == 0 + tcp = Sockets.connect(Sockets.getaddrinfo(host), p) + keepalive && keepalive!(tcp) + return tcp + end + + # FIXME this currently causes the Julia runtime to hang on exit... + result = Ref{TCPSocket}() + error = Ref{Any}() + @schedule try + result[] = Sockets.connect(Sockets.getaddrinfo(host), p) + catch e + error[] = e + end + sleep(connect_timeout) + if isassigned(error) + throw(error[]) + end + if !isassigned(result) + throw(ConnectTimeout(host, port)) + end + keepalive && keepalive!(result[]) + return result[] end const nosslconfig = SSLConfig() From a636f3781f6eafdb80893946b38480ea732091ba Mon Sep 17 00:00:00 2001 From: Sam O'Connor Date: Sun, 4 Mar 2018 17:46:14 +1100 Subject: [PATCH 2/7] tweak Servers.jl for new Connection args --- src/ConnectionPool.jl | 2 +- src/Servers.jl | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/ConnectionPool.jl b/src/ConnectionPool.jl index 7a7dd1e10..a52d9cc48 100644 --- a/src/ConnectionPool.jl +++ b/src/ConnectionPool.jl @@ -474,7 +474,7 @@ struct ConnectTimeout <: Exception port end -function getconnection(::Type{TCPSocket}, +function getconnection(::Type{Sockets.TCP}, host::AbstractString, port::AbstractString; keepalive::Bool=false, diff --git a/src/Servers.jl b/src/Servers.jl index a844f2950..472adb1f0 100644 --- a/src/Servers.jl +++ b/src/Servers.jl @@ -309,7 +309,7 @@ function listen(f::Function, continue end io = ssl ? getsslcontext(io, sslconfig) : io - let io = Connection(host, string(port), pipeline_limit, io) + let io = Connection(host, string(port), pipeline_limit, 0, io) @info "Accept: $io" @async try handle_connection(f, io; kw...) From 483cbef31f9e6daf81a57e809dffa441ddb149a1 Mon Sep 17 00:00:00 2001 From: Sam O'Connor Date: Sun, 4 Mar 2018 18:41:26 +1100 Subject: [PATCH 3/7] close connections on idle_timeout --- src/ConnectionPool.jl | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/src/ConnectionPool.jl b/src/ConnectionPool.jl index a52d9cc48..ccd25b037 100644 --- a/src/ConnectionPool.jl +++ b/src/ConnectionPool.jl @@ -142,7 +142,7 @@ function Base.eof(t::Transaction) @require isreadable(t) || !isopen(t) if bytesavailable(t) > 0 return false - end ;@debug 4 "eof(::Transaction) -> eof($(typeof(c.io))): $t" + end ;@debug 4 "eof(::Transaction) -> eof($(typeof(t.c.io))): $t" return eof(t.c.io) end @@ -388,11 +388,18 @@ end Remove closed connections from `pool`. """ function purge() - isdeletable(c) = (!isopen(c.io) || (c.idle_timeout > 0 && - !c.readbusy && - !c.writebusy && - time()-c.timestamp > c.idle_timeout)) && - (@debug 1 "🗑 Deleted: $c"; true) + + for c in pool + if c.idle_timeout > 0 && + !c.readbusy && + !c.writebusy && + time() - c.timestamp > c.idle_timeout + + close(c.io) ;@debug 1 "⌛️ Timeout: $c" + end + end + + isdeletable(c) = !isopen(c.io) && (@debug 1 "🗑 Deleted: $c"; true) deleteat!(pool, map(isdeletable, pool)) end From 132e037b4b5b0f44e305b75bb644d6d212aec000 Mon Sep 17 00:00:00 2001 From: Sam O'Connor Date: Sun, 4 Mar 2018 18:44:27 +1100 Subject: [PATCH 4/7] fix Connection(io) constructor --- src/ConnectionPool.jl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ConnectionPool.jl b/src/ConnectionPool.jl index ccd25b037..0447b202c 100644 --- a/src/ConnectionPool.jl +++ b/src/ConnectionPool.jl @@ -107,7 +107,7 @@ Connection(host::AbstractString, port::AbstractString, 0, false, Condition(), 0) -Connection(io) = Connection("", "", 0, default_pipeline_limit, 0, io) +Connection(io) = Connection("", "", default_pipeline_limit, 0, io) Transaction(c::Connection{T}) where T <: IO = Transaction{T}(c, (c.sequence += 1)) From ce4906b84875b4325e6931a814267d0ff78c1a07 Mon Sep 17 00:00:00 2001 From: Sam O'Connor Date: Mon, 5 Mar 2018 23:03:13 +1100 Subject: [PATCH 5/7] revised connect_timeout implementation --- src/ConnectionPool.jl | 35 +++++++++++++++++++---------------- 1 file changed, 19 insertions(+), 16 deletions(-) diff --git a/src/ConnectionPool.jl b/src/ConnectionPool.jl index 0447b202c..8bbb21276 100644 --- a/src/ConnectionPool.jl +++ b/src/ConnectionPool.jl @@ -498,23 +498,26 @@ function getconnection(::Type{Sockets.TCP}, return tcp end - # FIXME this currently causes the Julia runtime to hang on exit... - result = Ref{TCPSocket}() - error = Ref{Any}() - @schedule try - result[] = Sockets.connect(Sockets.getaddrinfo(host), p) - catch e - error[] = e - end - sleep(connect_timeout) - if isassigned(error) - throw(error[]) - end - if !isassigned(result) - throw(ConnectTimeout(host, port)) + tcp = Sockets.TCPSocket() + Base.connect!(tcp, Sockets.getaddrinfo(host), p) + yield() + + # sum(delays) ~= connect_timeout + delays = ExponentialBackOff(n=connect_timeout + 10, + first_delay=0.001, + factor=1.871, + max_delay=1) + for d in delays + if tcp.status != Base.StatusConnecting + Base.check_open(tcp) + keepalive && keepalive!(tcp) + return tcp + end + sleep(d) end - keepalive && keepalive!(result[]) - return result[] + + ccall(:jl_forceclose_uv, Void, (Ptr{Void},), tcp.handle) + throw(ConnectTimeout(host, port)) end const nosslconfig = SSLConfig() From f594189d38f1ed3c71deee195207ca9985692737 Mon Sep 17 00:00:00 2001 From: Sam O'Connor Date: Tue, 6 Mar 2018 00:33:49 +1100 Subject: [PATCH 6/7] revised timeout --- src/ConnectionPool.jl | 33 ++++++++++++++++++--------------- 1 file changed, 18 insertions(+), 15 deletions(-) diff --git a/src/ConnectionPool.jl b/src/ConnectionPool.jl index 8bbb21276..ac522379b 100644 --- a/src/ConnectionPool.jl +++ b/src/ConnectionPool.jl @@ -500,24 +500,27 @@ function getconnection(::Type{Sockets.TCP}, tcp = Sockets.TCPSocket() Base.connect!(tcp, Sockets.getaddrinfo(host), p) - yield() - - # sum(delays) ~= connect_timeout - delays = ExponentialBackOff(n=connect_timeout + 10, - first_delay=0.001, - factor=1.871, - max_delay=1) - for d in delays - if tcp.status != Base.StatusConnecting - Base.check_open(tcp) - keepalive && keepalive!(tcp) - return tcp + + timeout = Ref{Bool}(false) + @schedule begin + sleep(connect_timeout) + if tcp.status == Base.StatusConnecting + timeout[] = true + ccall(:jl_forceclose_uv, Void, (Ptr{Void},), tcp.handle) + #close(tcp) + end + end + try + Base.wait_connected(tcp) + catch e + if timeout[] + throw(ConnectTimeout(host, port)) end - sleep(d) + rethrow(e) end - ccall(:jl_forceclose_uv, Void, (Ptr{Void},), tcp.handle) - throw(ConnectTimeout(host, port)) + keepalive && keepalive!(tcp) + return tcp end const nosslconfig = SSLConfig() From ed87912773a9b2ad6f6d0ddaacefae1814fd96a9 Mon Sep 17 00:00:00 2001 From: Sam O'Connor Date: Tue, 6 Mar 2018 00:42:39 +1100 Subject: [PATCH 7/7] set tcp.status = Base.StatusClosing when closing on timeout --- src/ConnectionPool.jl | 1 + 1 file changed, 1 insertion(+) diff --git a/src/ConnectionPool.jl b/src/ConnectionPool.jl index ac522379b..f36172fdc 100644 --- a/src/ConnectionPool.jl +++ b/src/ConnectionPool.jl @@ -506,6 +506,7 @@ function getconnection(::Type{Sockets.TCP}, sleep(connect_timeout) if tcp.status == Base.StatusConnecting timeout[] = true + tcp.status = Base.StatusClosing ccall(:jl_forceclose_uv, Void, (Ptr{Void},), tcp.handle) #close(tcp) end