Skip to content

Commit

Permalink
Merge pull request #215 from JuliaWeb/idle_timeout
Browse files Browse the repository at this point in the history
Add ConnectionPool connect_timeout and idle_timeout
  • Loading branch information
samoconnor authored Mar 20, 2018
2 parents 9e41fc3 + a5330d5 commit f2ea5ef
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 8 deletions.
67 changes: 60 additions & 7 deletions src/ConnectionPool.jl
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ Fields:
- `host::String`
- `port::String`, exactly as specified in the URI (i.e. may be empty).
- `pipeline_limit`, number of requests to send before waiting for responses.
- `idle_timeout`, No. of sconds to maintain connection after last transaction.
- `peerport`, remote TCP port number (used for debug messages).
- `localport`, local TCP port number (used for debug messages).
- `io::T`, the `TCPSocket` or `SSLContext.
Expand All @@ -68,6 +69,7 @@ mutable struct Connection{T <: IO}
host::String
port::String
pipeline_limit::Int
idle_timeout::Int
peerport::UInt16
localport::UInt16
io::T
Expand Down Expand Up @@ -95,16 +97,17 @@ struct Transaction{T <: IO} <: IO
end

Connection(host::AbstractString, port::AbstractString,
pipeline_limit::Int, io::T) where T <: IO =
Connection{T}(host, port, pipeline_limit,
pipeline_limit::Int, idle_timeout::Int, io::T) where T <: IO =
Connection{T}(host, port,
pipeline_limit, idle_timeout,
peerport(io), localport(io),
io, nobytes,
-1,
0, false, Condition(),
0, false, Condition(),
0)

Connection(io) = Connection("", "", default_pipeline_limit, io)
Connection(io) = Connection("", "", default_pipeline_limit, 0, io)

Transaction(c::Connection{T}) where T <: IO =
Transaction{T}(c, (c.sequence += 1))
Expand Down Expand Up @@ -139,7 +142,7 @@ function Base.eof(t::Transaction)
@require isreadable(t) || !isopen(t)
if bytesavailable(t) > 0
return false
end ;@debug 4 "eof(::Transaction) -> eof($(typeof(c.io))): $t"
end ;@debug 4 "eof(::Transaction) -> eof($(typeof(t.c.io))): $t"
return eof(t.c.io)
end

Expand Down Expand Up @@ -392,6 +395,17 @@ end
Remove closed connections from `pool`.
"""
function purge()

for c in pool
if c.idle_timeout > 0 &&
!c.readbusy &&
!c.writebusy &&
time() - c.timestamp > c.idle_timeout

close(c.io) ;@debug 1 "⌛️ Timeout: $c"
end
end

isdeletable(c) = !isopen(c.io) && (@debug 1 "🗑 Deleted: $c"; true)
deleteat!(pool, map(isdeletable, pool))
end
Expand All @@ -407,6 +421,7 @@ function getconnection(::Type{Transaction{T}},
port::AbstractString;
connection_limit::Int=default_connection_limit,
pipeline_limit::Int=default_pipeline_limit,
idle_timeout::Int=0,
reuse_limit::Int=nolimit,
kw...)::Transaction{T} where T <: IO

Expand Down Expand Up @@ -438,7 +453,9 @@ function getconnection(::Type{Transaction{T}},
busy = findall(T, host, port, pipeline_limit)
if length(busy) < connection_limit
io = getconnection(T, host, port; kw...)
c = Connection(host, port, pipeline_limit, io)
c = Connection(host, port,
pipeline_limit, idle_timeout,
io)
push!(pool, c) ;@debug 1 "🔗 New: $c"
return client_transaction(c)
end
Expand Down Expand Up @@ -466,14 +483,50 @@ function keepalive!(tcp)
return
end

struct ConnectTimeout <: Exception
host
port
end

function getconnection(::Type{TCPSocket},
host::AbstractString,
port::AbstractString;
keepalive::Bool=false,
kw...)::TCPSocket
connect_timeout::Int=0,
kw...)::Sockets.TCP

p::UInt = isempty(port) ? UInt(80) : parse(UInt, port)

@debug 2 "TCP connect: $host:$p..."
tcp = Sockets.connect(Sockets.getaddrinfo(host), p)

if connect_timeout == 0
tcp = Sockets.connect(Sockets.getaddrinfo(host), p)
keepalive && keepalive!(tcp)
return tcp
end

tcp = Sockets.TCPSocket()
Base.connect!(tcp, Sockets.getaddrinfo(host), p)

timeout = Ref{Bool}(false)
@schedule begin
sleep(connect_timeout)
if tcp.status == Base.StatusConnecting
timeout[] = true
tcp.status = Base.StatusClosing
ccall(:jl_forceclose_uv, Void, (Ptr{Void},), tcp.handle)
#close(tcp)
end
end
try
Base.wait_connected(tcp)
catch e
if timeout[]
throw(ConnectTimeout(host, port))
end
rethrow(e)
end

keepalive && keepalive!(tcp)
return tcp
end
Expand Down
2 changes: 1 addition & 1 deletion src/Servers.jl
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ function listen(f::Function,
continue
end
io = ssl ? getsslcontext(io, sslconfig) : io
let io = Connection(host, string(port), pipeline_limit, io)
let io = Connection(host, string(port), pipeline_limit, 0, io)
@info "Accept: $io"
@async try
handle_connection(f, io; kw...)
Expand Down

0 comments on commit f2ea5ef

Please sign in to comment.