Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleanup all timeout code and places where we might timeout #910

Merged
merged 2 commits into from
Aug 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 40 additions & 44 deletions src/ConnectionPool.jl
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ remotely closed, a connection will be reused.
"""
module ConnectionPool

export Connection, newconnection, releaseconnection, getrawstream, inactiveseconds
export Connection, newconnection, releaseconnection, getrawstream, inactiveseconds, shouldtimeout

using Sockets, LoggingExtras, NetworkOptions
using MbedTLS: SSLConfig, SSLContext, setup!, associate!, hostname!, handshake!
Expand Down Expand Up @@ -99,6 +99,8 @@ getrawstream(c::Connection) = c.io

inactiveseconds(c::Connection)::Float64 = time() - c.timestamp

shouldtimeout(c::Connection, readtimeout) = !isreadable(c) || inactiveseconds(c) > readtimeout

Base.unsafe_write(c::Connection, p::Ptr{UInt8}, n::UInt) =
unsafe_write(c.io, p, n)

Expand Down Expand Up @@ -379,6 +381,14 @@ struct ConnectTimeout <: Exception
port
end

function checkconnected(tcp)
if tcp.status == Base.StatusConnecting
close(tcp)
return false
end
return true
end

function getconnection(::Type{TCPSocket},
host::AbstractString,
port::AbstractString;
Expand All @@ -388,51 +398,29 @@ function getconnection(::Type{TCPSocket},
kw...)::TCPSocket

p::UInt = isempty(port) ? UInt(80) : parse(UInt, port)

@debugv 2 "TCP connect: $host:$p..."

addrs = Sockets.getalladdrinfo(host)

connect_timeout = connect_timeout == 0 && readtimeout > 0 ? readtimeout : connect_timeout

lasterr = ErrorException("unknown connection error")

for addr in addrs
if connect_timeout == 0
try
tcp = Sockets.connect(addr, p)
keepalive && keepalive!(tcp)
return tcp
catch err
lasterr = err
continue # to next ip addr
end
else
tcp = Sockets.TCPSocket()
Sockets.connect!(tcp, addr, p)

timeout = Ref{Bool}(false)
@async begin
sleep(connect_timeout)
if tcp.status == Base.StatusConnecting
timeout[] = true
tcp.status = Base.StatusClosing
ccall(:jl_forceclose_uv, Nothing, (Ptr{Nothing},), tcp.handle)
#close(tcp)
try
return if connect_timeout > 0
tcp = Sockets.TCPSocket()
Sockets.connect!(tcp, addr, p)
try_with_timeout(() -> checkconnected(tcp), connect_timeout, () -> close(tcp)) do
Sockets.wait_connected(tcp)
keepalive && keepalive!(tcp)
end
end
try
Sockets.wait_connected(tcp)
keepalive && keepalive!(tcp)
return tcp
catch err
if timeout[]
lasterr = ConnectTimeout(host, port)
else
lasterr = err
end
continue # to next ip addr
else
tcp = Sockets.connect(addr, p)
keepalive && keepalive!(tcp)
tcp
end
catch e
lasterr = e isa TimeoutError ? ConnectTimeout(host, port) : e
continue
end
end
# If no connetion could be set up, to any address, throw last error
Expand Down Expand Up @@ -484,15 +472,23 @@ end
function sslupgrade(c::Connection,
host::AbstractString;
require_ssl_verification::Bool=NetworkOptions.verify_host(host, "SSL"),
readtimeout::Int=0,
kw...)::Connection
# first we release the original connection, but we don't want it to be
# reused in the pool, because we're hijacking the TCPSocket
release(POOL, connectionkey(c), c; return_for_reuse=false)
# now we hijack the TCPSocket and upgrade to SSLContext
tls = sslconnection(c.io, host;
require_ssl_verification=require_ssl_verification,
kw...)
# initiate the upgrade to SSL
# if the upgrade fails, an error will be thrown and the original c will be closed
# in ConnectionRequest
tls = if readtimeout > 0
try_with_timeout(() -> shouldtimeout(c, readtimeout), readtimeout, () -> close(c)) do
sslconnection(c.io, host; require_ssl_verification=require_ssl_verification, kw...)
end
else
sslconnection(c.io, host; require_ssl_verification=require_ssl_verification, kw...)
end
# success, now we turn it into a new Connection
conn = Connection(host, "", 0, require_ssl_verification, tls)
# release the "old" one, but don't allow reuse since we're hijacking the socket
release(POOL, connectionkey(c), c; return_for_reuse=false)
# and return the new one
return acquire(POOL, connectionkey(conn), conn)
end

Expand Down
36 changes: 35 additions & 1 deletion src/Exceptions.jl
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
module Exceptions

export @try, HTTPError, ConnectError, TimeoutError, StatusError, RequestError
export @try, try_with_timeout, HTTPError, ConnectError, TimeoutError, StatusError, RequestError
using LoggingExtras
import ..HTTP # for doc references

@eval begin
Expand All @@ -24,6 +25,39 @@ macro $(:try)(exes...)
end
end # @eval

function try_with_timeout(f, shouldtimeout, delay, iftimeout=() -> nothing)
@assert delay > 0
cond = Condition()
# execute f async
t = @async try
notify(cond, f())
catch e
@debugv 1 "error executing f in try_with_timeout"
notify(cond, e)
end
# start a timer
timer = Timer(delay; interval=delay / 10) do tm
if shouldtimeout()
@debugv 1 "❗️ Timeout: $delay"
notify(cond, TimeoutError(delay))
iftimeout()
close(tm)
end
end
res = wait(cond)
@debugv 1 "try_with_timeout finished with: $res"
if res isa TimeoutError
# timedout
throw(res)
end
# didn't timeout
close(timer)
if res isa Exception
throw(res)
end
return res
end

abstract type HTTPError <: Exception end

"""
Expand Down
21 changes: 15 additions & 6 deletions src/clientlayers/ConnectionRequest.jl
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ Close the connection if the request throws an exception.
Otherwise leave it open so that it can be reused.
"""
function connectionlayer(handler)
return function(req; proxy=getproxy(req.url.scheme, req.url.host), socket_type::Type=TCPSocket, kw...)
return function(req; proxy=getproxy(req.url.scheme, req.url.host), socket_type::Type=TCPSocket, readtimeout::Int=0, kw...)
if proxy !== nothing
target_url = req.url
url = URI(proxy)
Expand All @@ -73,7 +73,7 @@ function connectionlayer(handler)
IOType = sockettype(url, socket_type)
local io
try
io = newconnection(IOType, url.host, url.port; kw...)
io = newconnection(IOType, url.host, url.port; readtimeout=readtimeout, kw...)
catch e
throw(ConnectError(string(url), e))
end
Expand All @@ -88,30 +88,36 @@ function connectionlayer(handler)
elseif target_url.scheme in ("ws", ) && target_url.port == ""
target_url = URI(target_url, port=80) # if there is no port info, connect_tunnel will fail
end
r = connect_tunnel(io, target_url, req)
r = if readtimeout > 0
try_with_timeout(() -> shouldtimeout(io, readtimeout, () -> close(io)), readtimeout) do
connect_tunnel(io, target_url, req)
end
else
connect_tunnel(io, target_url, req)
end
if r.status != 200
close(io)
return r
end
if target_url.scheme in ("https", "wss")
io = ConnectionPool.sslupgrade(io, target_url.host; kw...)
io = ConnectionPool.sslupgrade(io, target_url.host; readtimeout=readtimeout, kw...)
end
req.headers = filter(x->x.first != "Proxy-Authorization", req.headers)
end

stream = Stream(req.response, io)
return handler(stream; kw...)
return handler(stream; readtimeout=readtimeout, kw...)
catch e
@debugv 1 "❗️ ConnectionLayer $e. Closing: $io"
shouldreuse = false
@try Base.IOError close(io)
e isa HTTPError || throw(RequestError(req, e))
rethrow(e)
finally
releaseconnection(io, shouldreuse)
if !shouldreuse
@try Base.IOError close(io)
end
releaseconnection(io, shouldreuse)
end
end
end
Expand All @@ -126,8 +132,11 @@ function connect_tunnel(io, target_url, req)
headers["Proxy-Authorization"] = auth
end
request = Request("CONNECT", target, headers)
# @debugv 2 "connect_tunnel: writing headers"
writeheaders(io, request)
# @debugv 2 "connect_tunnel: reading headers"
readheaders(io, request.response)
# @debugv 2 "connect_tunnel: done reading headers"
return request.response
end

Expand Down
24 changes: 2 additions & 22 deletions src/clientlayers/TimeoutRequest.jl
Original file line number Diff line number Diff line change
Expand Up @@ -17,28 +17,8 @@ function timeoutlayer(handler)
return handler(stream; kw...)
end
io = stream.stream
wait_for_timeout = Ref{Bool}(true)
timedout = Ref{Bool}(false)

@async while wait_for_timeout[]
if isreadable(io) && inactiveseconds(io) > readtimeout
timedout[] = true
close(io)
@debugv 1 "💥 Read inactive > $(readtimeout)s: $io"
break
end
sleep(readtimeout / 10)
end

try
return handler(stream; kw...)
catch e
if timedout[]
throw(TimeoutError(readtimeout))
end
rethrow(e)
finally
wait_for_timeout[] = false
return try_with_timeout(() -> shouldtimeout(io, readtimeout), readtimeout, () -> close(io)) do
handler(stream; kw...)
end
end
end
Expand Down
1 change: 1 addition & 0 deletions src/connectionpools.jl
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ function acquire(f, pod::Pod)
# println("$(taskid()): checking idle_timeout connections for reuse")
conn = popfirst!(pod.conns)
if isvalid(pod, conn)
# println("$(taskid()): found a valid connection to reuse")
return trackconnection!(pod, conn)
else
# nothing, let the non-valid connection fall into GC oblivion
Expand Down