-
-
Notifications
You must be signed in to change notification settings - Fork 43
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
This changes TcpClient.new and TcpClient.with_timeout to support connecting to multiple IP addresses based on RFC 8305, also known as "Happy Eyeballs version 2". In addition, TcpClient.new now uses a default timeout of 60 seconds instead of waiting forever, and std.time.Duration is extended with a few extra methods such as Duration.positive? and Duration./. This fixes #795. Changelog: added
- Loading branch information
1 parent
98c7f1e
commit a86b484
Showing
4 changed files
with
350 additions
and
46 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,237 @@ | ||
# Support for connecting TCP sockets using [RFC | ||
# 8305](https://datatracker.ietf.org/doc/html/rfc8305). | ||
import std.cmp (min) | ||
import std.drop (Drop) | ||
import std.io (Error) | ||
import std.iter (Stream) | ||
import std.net.ip (IpAddress) | ||
import std.net.socket (TcpClient) | ||
import std.sync (Channel) | ||
import std.time (Duration, Instant) | ||
|
||
# The amount of milliseconds to wait for a socket to connect. | ||
let TIMEOUT = 250 | ||
|
||
# Returns an iterator that yields IP addresses in alternating order, starting | ||
# with an IPv6 address. | ||
fn interleave(ips: ref Array[IpAddress]) -> Stream[IpAddress] { | ||
let mut v6_idx = 0 | ||
let mut v4_idx = 0 | ||
let mut v6 = true | ||
|
||
Stream.new(fn move { | ||
if v6 := v6.false? { | ||
loop { | ||
match ips.opt(v6_idx := v6_idx + 1) { | ||
case Some(V6(ip)) -> return Option.Some(IpAddress.V6(ip)) | ||
case Some(_) -> {} | ||
case _ -> break | ||
} | ||
} | ||
} | ||
|
||
loop { | ||
match ips.opt(v4_idx := v4_idx + 1) { | ||
case Some(V4(ip)) -> return Option.Some(IpAddress.V4(ip)) | ||
case Some(_) -> {} | ||
case _ -> return Option.None | ||
} | ||
} | ||
}) | ||
} | ||
|
||
fn connect( | ||
ips: ref Array[IpAddress], | ||
port: Int, | ||
timeout_after: Instant, | ||
) -> Result[TcpClient, Error] { | ||
let size = ips.size | ||
|
||
# It's possible the list of IPs is passed directly from "user input" such as | ||
# a DNS record. If this list is empty we don't want to panic and abort, but | ||
# instead give callers a chance to handle the error. As such, we return | ||
# `InvalidArgument` instead. | ||
# | ||
# When there's only one IP address we can skip the Happy Eyeballs algorithm | ||
# and just connect to it directly. | ||
match size { | ||
case 0 -> throw Error.InvalidArgument | ||
case 1 -> return TcpClient.connect(ips.get(0), port, timeout_after) | ||
case _ -> {} | ||
} | ||
|
||
let ips = interleave(ips) | ||
let cons = Connections.new(port, timeout_after) | ||
let mut pending = 0 | ||
|
||
while timeout_after.remaining.positive? { | ||
let id = match ips.next { | ||
case Some(ip) -> { | ||
pending += 1 | ||
cons.connect(ip) | ||
} | ||
case _ -> break | ||
} | ||
|
||
let wait = Duration.from_millis(TIMEOUT) | ||
let deadline = min(timeout_after, wait.to_instant) | ||
|
||
loop { | ||
match cons.receive(deadline) { | ||
case Some(Ok(v)) -> return Result.Ok(v) | ||
case Some(Error(v)) if v == id -> { | ||
# If the socket we're waiting for produces an error then there's no | ||
# point in waiting any longer, so we just move on. | ||
pending -= 1 | ||
break | ||
} | ||
case Some(_) -> { | ||
# If a socket we tried to use previously produces an error we just | ||
# ignore it and continue waiting for the current socket. | ||
pending -= 1 | ||
} | ||
case _ -> { | ||
# We waited long enough and so we need to move on to the next socket. | ||
break | ||
} | ||
} | ||
} | ||
} | ||
|
||
# None of the sockets could connect within the initial timeout, but they | ||
# might connect before our supplied deadline (if this hasn't already expired | ||
# at this point). | ||
while pending > 0 { | ||
match cons.receive(timeout_after) { | ||
case Some(Ok(v)) -> return Result.Ok(v) | ||
case Some(_) -> pending -= 1 | ||
case _ -> break | ||
} | ||
} | ||
|
||
Result.Error( | ||
if timeout_after.remaining.positive? { | ||
Error.ConnectionRefused | ||
} else { | ||
Error.TimedOut | ||
}, | ||
) | ||
} | ||
|
||
# A type for connecting a `TcpClient` asynchronously. | ||
type async Connection { | ||
# The ID of the current connection. | ||
# | ||
# This is used to determine when an error is produced what socket that error | ||
# belongs to. | ||
let @id: Int | ||
|
||
# The IP address to connect to. | ||
let @ip: IpAddress | ||
|
||
# The port to connect to. | ||
let @port: Int | ||
|
||
# The deadline after which we should give up. | ||
let @deadline: Instant | ||
|
||
# The channel to send the results back to. | ||
let @output: Channel[Result[TcpClient, Int]] | ||
|
||
# A flag indicating if we should continue trying to connect or if we should | ||
# stop. | ||
let @run: Bool | ||
|
||
fn static new( | ||
id: Int, | ||
ip: IpAddress, | ||
port: Int, | ||
deadline: Instant, | ||
output: uni Channel[Result[TcpClient, Int]], | ||
) -> Connection { | ||
Connection( | ||
id: id, | ||
ip: ip, | ||
port: port, | ||
deadline: deadline, | ||
output: output, | ||
run: true, | ||
) | ||
} | ||
|
||
fn async mut cancel { | ||
@run = false | ||
} | ||
|
||
fn async connect { | ||
if @run.false? { return } | ||
|
||
# To support cancellation we use an internal timeout. This way we don't just | ||
# sit around for e.g. 60 seconds even though another socket connected | ||
# successfully. | ||
let interval = Duration.from_millis(TIMEOUT) | ||
let deadline = min(interval.to_instant, @deadline) | ||
let res = recover { | ||
match TcpClient.connect(@ip, @port, deadline) { | ||
case Ok(v) -> Result.Ok(v) | ||
case Error(TimedOut) if @deadline.remaining.to_nanos > 0 -> { | ||
# We finished one cycle but there's still time left, so let's try | ||
# again until the user-provided deadline is also exceeded. | ||
return connect | ||
} | ||
case Error(_) -> { | ||
# We wan out of time or encountered a non-timeout error (e.g. the | ||
# connection is refused). In this case we need to report back to the | ||
# parent process such that it doesn't hang waiting for a result | ||
# forever. | ||
Result.Error(@id) | ||
} | ||
} | ||
} | ||
|
||
@output.send(res) | ||
} | ||
} | ||
|
||
type inline Connections { | ||
# The post to connect the IPs to. | ||
let @port: Int | ||
|
||
# The deadline after which all attempts should time out. | ||
let @timeout_after: Instant | ||
|
||
# The channel to use for communicating results back to the parent process. | ||
let @channel: Channel[Result[TcpClient, Int]] | ||
|
||
# The processes used to establish connections | ||
let @connections: Array[Connection] | ||
|
||
fn static new(port: Int, timeout_after: Instant) -> Connections { | ||
Connections( | ||
port: port, | ||
timeout_after: timeout_after, | ||
channel: Channel.new, | ||
connections: [], | ||
) | ||
} | ||
|
||
fn mut connect(ip: IpAddress) -> Int { | ||
let id = @connections.size | ||
let chan = recover @channel.clone | ||
let proc = Connection.new(id, ip, @port, @timeout_after, chan) | ||
|
||
proc.connect | ||
@connections.push(proc) | ||
id | ||
} | ||
|
||
fn mut receive(timeout_after: Instant) -> Option[Result[TcpClient, Int]] { | ||
@channel.receive_until(timeout_after) | ||
} | ||
} | ||
|
||
impl Drop for Connections { | ||
fn mut drop { | ||
@connections.iter_mut.each(fn (c) { c.cancel }) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.