Add ConnectionPool connect_timeout and idle_timeout #215
@@ -51,6 +51,7 @@ Fields:
 - `host::String`
 - `port::String`, exactly as specified in the URI (i.e. may be empty).
 - `pipeline_limit`, number of requests to send before waiting for responses.
+- `idle_timeout`, number of seconds to maintain the connection after the last transaction.
 - `peerport`, remote TCP port number (used for debug messages).
 - `localport`, local TCP port number (used for debug messages).
 - `io::T`, the `Sockets.TCP` or `SSLContext`.
@@ -68,6 +69,7 @@ mutable struct Connection{T <: IO}
     host::String
     port::String
     pipeline_limit::Int
+    idle_timeout::Int
     peerport::UInt16
     localport::UInt16
     io::T
@@ -95,16 +97,17 @@ struct Transaction{T <: IO} <: IO
 end

 Connection(host::AbstractString, port::AbstractString,
-           pipeline_limit::Int, io::T) where T <: IO =
-    Connection{T}(host, port, pipeline_limit,
+           pipeline_limit::Int, idle_timeout::Int, io::T) where T <: IO =
+    Connection{T}(host, port,
+                  pipeline_limit, idle_timeout,
                   peerport(io), localport(io),
                   io, nobytes,
                   -1,
                   0, false, Condition(),
                   0, false, Condition(),
                   0)

-Connection(io) = Connection("", "", default_pipeline_limit, io)
+Connection(io) = Connection("", "", default_pipeline_limit, 0, io)

 Transaction(c::Connection{T}) where T <: IO =
     Transaction{T}(c, (c.sequence += 1))
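For reference, a minimal sketch of how the updated inner constructor might be called. The host, port, and numeric values are made-up examples, and `io` stands for an already-open socket; this is not code from the PR:

```julia
# Sketch only: `io` is any open Sockets.TCP or SSLContext, e.g. as
# returned by getconnection below.
c = Connection("example.com", "80",
               4,    # pipeline_limit
               30,   # idle_timeout in seconds; 0 disables the idle check
               io)

# The convenience constructor keeps the old behaviour: idle_timeout = 0.
c2 = Connection(io)
```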
@@ -139,7 +142,7 @@ function Base.eof(t::Transaction)
     @require isreadable(t) || !isopen(t)
     if bytesavailable(t) > 0
         return false
-    end ;@debug 4 "eof(::Transaction) -> eof($(typeof(c.io))): $t"
+    end ;@debug 4 "eof(::Transaction) -> eof($(typeof(t.c.io))): $t"
     return eof(t.c.io)
 end
@@ -385,6 +388,17 @@ end
 Remove closed connections from `pool`.
 """
 function purge()
+
+    for c in pool
+        if c.idle_timeout > 0 &&
+           !c.readbusy &&
+           !c.writebusy &&
+           time() - c.timestamp > c.idle_timeout
+
+            close(c.io) ;@debug 1 "⌛️ Timeout: $c"
+        end
+    end
+
     isdeletable(c) = !isopen(c.io) && (@debug 1 "🗑 Deleted: $c"; true)
     deleteat!(pool, map(isdeletable, pool))
 end
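In other words, `purge()` now closes a pooled connection only when an idle timeout is configured, the connection is neither reading nor writing, and it has been unused for longer than the timeout. A standalone restatement of that predicate (not part of the diff; field names as in the struct above, with `timestamp` being the time of last use):

```julia
# The idle-expiry condition purge() checks, written as a separate helper.
is_idle_expired(c, now = time()) =
    c.idle_timeout > 0 &&          # idle timeout enabled for this connection
    !c.readbusy &&                 # no response currently being read
    !c.writebusy &&                # no request currently being written
    now - c.timestamp > c.idle_timeout
```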
@@ -400,6 +414,7 @@ function getconnection(::Type{Transaction{T}},
                        port::AbstractString;
                        connection_limit::Int=default_connection_limit,
                        pipeline_limit::Int=default_pipeline_limit,
+                       idle_timeout::Int=0,
                        reuse_limit::Int=nolimit,
                        kw...)::Transaction{T} where T <: IO
@@ -431,7 +446,9 @@ function getconnection(::Type{Transaction{T}},
     busy = findall(T, host, port, pipeline_limit)
     if length(busy) < connection_limit
         io = getconnection(T, host, port; kw...)
-        c = Connection(host, port, pipeline_limit, io)
+        c = Connection(host, port,
+                       pipeline_limit, idle_timeout,
+                       io)
         push!(pool, c) ;@debug 1 "🔗 New: $c"
         return client_transaction(c)
     end
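Assuming request-level keyword arguments are forwarded down to `getconnection` (as the `kw...` splatting above suggests), the new option could be set per request. A hedged usage sketch with an illustrative URL:

```julia
import HTTP

# Assumption: HTTP.request forwards unrecognised keyword arguments down to
# getconnection, so these settings reach the connection pool for this host.
r = HTTP.request("GET", "http://httpbin.org/ip";
                 pipeline_limit = 4,
                 idle_timeout = 30)   # close pooled connections idle > 30 s
println(r.status)
```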
@@ -459,14 +476,49 @@ function keepalive!(tcp)
     return
 end

+struct ConnectTimeout <: Exception
+    host
+    port
+end
+
 function getconnection(::Type{Sockets.TCP},
                        host::AbstractString,
                        port::AbstractString;
                        keepalive::Bool=false,
+                       connect_timeout::Int=0,
                        kw...)::Sockets.TCP

     p::UInt = isempty(port) ? UInt(80) : parse(UInt, port)

     @debug 2 "TCP connect: $host:$p..."
-    tcp = Sockets.connect(Sockets.getaddrinfo(host), p)
+
+    if connect_timeout == 0
+        tcp = Sockets.connect(Sockets.getaddrinfo(host), p)
+        keepalive && keepalive!(tcp)
+        return tcp
+    end
+
+    tcp = Sockets.TCPSocket()
+    Base.connect!(tcp, Sockets.getaddrinfo(host), p)
+
+    timeout = Ref{Bool}(false)
+    @schedule begin
+        sleep(connect_timeout)
+        if tcp.status == Base.StatusConnecting
+            timeout[] = true
+            ccall(:jl_forceclose_uv, Void, (Ptr{Void},), tcp.handle)
+            #close(tcp)
+        end
+    end
+    try
+        Base.wait_connected(tcp)
+    catch e
+        if timeout[]
+            throw(ConnectTimeout(host, port))
Why do you want to make sure it throws this exception?

Because I think application level code will want to handle "I set a short timeout and it wasn't met" differently from "something bad happened deep in the network stack". Elsewhere in HTTP.jl we wrap things like … In higher level code it is important to handle some specific errors by retrying, but make sure everything else finds its way up to create a crash, a stack trace, an issue report and a fix. e.g. https://github.com/JuliaCloud/AWSCore.jl/blob/master/src/http.jl#L47-L51

Hi @vtjnash, I just noticed that …

I'm not sure how that's relevant here, since you don't have a …

I was assuming that I could get the … 👍🙃

(a) the socket isn't open, so you can't poll it (b) it's invalid to pass libuv-derived fd handles to libuv

thx for taking the time to explain :)
+        end
+        rethrow(e)
+    end
+
     keepalive && keepalive!(tcp)
     return tcp
 end
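Relating this to the review discussion above (retry only specific errors, let everything else surface), here is a caller-side sketch, not part of this PR, of how a distinct `ConnectTimeout` type could be used. The module path and the assumption that `connect_timeout` is forwarded from `HTTP.request` to `getconnection` are mine, not the PR's:

```julia
import HTTP
import HTTP.ConnectionPool: ConnectTimeout   # assumed location of the new exception type

# Hypothetical retry wrapper: retry only when the connect attempt timed out,
# rethrow everything else so genuine failures still produce a stack trace.
function get_with_retry(url; attempts = 3)
    for i in 1:attempts
        try
            return HTTP.request("GET", url; connect_timeout = 2)
        catch e
            if e isa ConnectTimeout && i < attempts
                continue            # timed out: try again
            end
            rethrow(e)              # not a timeout, or out of attempts
        end
    end
end

r = get_with_retry("http://httpbin.org/delay/1")
```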
@vtjnash `close` didn't work here (0.6.2). But I assume that's some small detail of handling status flags in `Base.close`, because closing while opening is not a common operation.

Yeah, maybe fixed on master? This is right, but it should also set the state to `StatusClosing`.
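A sketch of what that suggestion might look like inside the `@schedule` timeout task above (my reading of the reviewer's comment, assuming Julia 0.6-era internals where the status constants live in `Base`):

```julia
# Reviewer-suggested variant of the timeout task body: after force-closing
# the half-open handle, also mark the socket as closing so that later
# close()/isopen() calls see a consistent state.
if tcp.status == Base.StatusConnecting
    timeout[] = true
    ccall(:jl_forceclose_uv, Void, (Ptr{Void},), tcp.handle)
    tcp.status = Base.StatusClosing
end
```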