Skip to content

Commit

Permalink
add ConnectionPool connect_timeout and idle_timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
samoconnor committed Mar 3, 2018
1 parent b24f49a commit 5cf9161
Showing 1 changed file with 47 additions and 8 deletions.
55 changes: 47 additions & 8 deletions src/ConnectionPool.jl
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ Fields:
- `host::String`
- `port::String`, exactly as specified in the URI (i.e. may be empty).
- `pipeline_limit`, number of requests to send before waiting for responses.
- `idle_timeout`, No. of sconds to maintain connection after last transaction.
- `peerport`, remote TCP port number (used for debug messages).
- `localport`, local TCP port number (used for debug messages).
- `io::T`, the `TCPSocket` or `SSLContext.
Expand All @@ -68,6 +69,7 @@ mutable struct Connection{T <: IO}
host::String
port::String
pipeline_limit::Int
idle_timeout::Int
peerport::UInt16
localport::UInt16
io::T
Expand Down Expand Up @@ -95,16 +97,17 @@ struct Transaction{T <: IO} <: IO
end

Connection(host::AbstractString, port::AbstractString,
pipeline_limit::Int, io::T) where T <: IO =
Connection{T}(host, port, pipeline_limit,
pipeline_limit::Int, idle_timeout::Int, io::T) where T <: IO =
Connection{T}(host, port,
pipeline_limit, idle_timeout,
peerport(io), localport(io),
io, nobytes,
-1,
0, false, Condition(),
0, false, Condition(),
0)

Connection(io) = Connection("", "", default_pipeline_limit, io)
Connection(io) = Connection("", "", 0, default_pipeline_limit, 0, io)

Transaction(c::Connection{T}) where T <: IO =
Transaction{T}(c, (c.sequence += 1))
Expand Down Expand Up @@ -385,7 +388,11 @@ end
Remove closed connections from `pool`.
"""
function purge()
isdeletable(c) = !isopen(c.io) && (@debug 1 "🗑 Deleted: $c"; true)
isdeletable(c) = (!isopen(c.io) || (c.idle_timeout > 0 &&
!c.readbusy &&
!c.writebusy &&
time()-c.timestamp > c.idle_timeout)) &&
(@debug 1 "🗑 Deleted: $c"; true)
deleteat!(pool, map(isdeletable, pool))
end

Expand All @@ -400,6 +407,7 @@ function getconnection(::Type{Transaction{T}},
port::AbstractString;
connection_limit::Int=default_connection_limit,
pipeline_limit::Int=default_pipeline_limit,
idle_timeout::Int=0,
reuse_limit::Int=nolimit,
kw...)::Transaction{T} where T <: IO

Expand Down Expand Up @@ -431,7 +439,9 @@ function getconnection(::Type{Transaction{T}},
busy = findall(T, host, port, pipeline_limit)
if length(busy) < connection_limit
io = getconnection(T, host, port; kw...)
c = Connection(host, port, pipeline_limit, io)
c = Connection(host, port,
pipeline_limit, idle_timeout,
io)
push!(pool, c) ;@debug 1 "🔗 New: $c"
return client_transaction(c)
end
Expand Down Expand Up @@ -459,16 +469,45 @@ function keepalive!(tcp)
return
end

struct ConnectTimeout <: Exception
host
port
end

function getconnection(::Type{TCPSocket},
host::AbstractString,
port::AbstractString;
keepalive::Bool=false,
connect_timeout::Int=0,
kw...)::TCPSocket

p::UInt = isempty(port) ? UInt(80) : parse(UInt, port)

@debug 2 "TCP connect: $host:$p..."
tcp = connect(getaddrinfo(host), p)
keepalive && keepalive!(tcp)
return tcp

if connect_timeout == 0
tcp = connect(getaddrinfo(host), p)
keepalive && keepalive!(tcp)
return tcp
end

# FIXME this currently causes the Julia runtime to hang on exit...
result = Ref{TCPSocket}()
error = Ref{Any}()
@schedule try
result[] = connect(getaddrinfo(host), p)
catch e
error[] = e
end
sleep(connect_timeout)
if isassigned(error)
throw(error[])
end
if !isassigned(result)
throw(ConnectTimeout(host, port))
end
keepalive && keepalive!(result[])
return result[]
end

const nosslconfig = SSLConfig()
Expand Down

0 comments on commit 5cf9161

Please sign in to comment.